mirror of
https://github.com/openziti/zrok.git
synced 2024-11-26 18:13:52 +01:00
basic file source and usage ingester (#128)
This commit is contained in:
parent
44a613e7db
commit
90126e1617
@ -18,11 +18,24 @@ func Run(cfg *Config) error {
|
||||
return errors.New("invalid 'source'; exiting")
|
||||
}
|
||||
|
||||
srcJoin, err := src.Start()
|
||||
events := make(chan map[string]interface{}, 1024)
|
||||
srcJoin, err := src.Start(events)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error starting source")
|
||||
}
|
||||
|
||||
go func() {
|
||||
ingester := &UsageIngester{}
|
||||
for {
|
||||
select {
|
||||
case event := <-events:
|
||||
if err := ingester.Ingest(event); err != nil {
|
||||
logrus.Error(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
<-srcJoin
|
||||
|
||||
return nil
|
||||
|
@ -1,8 +1,11 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/michaelquigley/cf"
|
||||
"github.com/nxadm/tail"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
"os"
|
||||
)
|
||||
|
||||
@ -25,18 +28,38 @@ type fileSource struct {
|
||||
cfg *FileSourceConfig
|
||||
}
|
||||
|
||||
func (s *fileSource) Start() (chan struct{}, error) {
|
||||
func (s *fileSource) Start(events chan map[string]interface{}) (chan struct{}, error) {
|
||||
f, err := os.Open(s.cfg.Path)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "error opening '%v'", s.cfg.Path)
|
||||
}
|
||||
_ = f.Close()
|
||||
|
||||
ch := make(chan struct{})
|
||||
go func() {
|
||||
f.Close()
|
||||
s.tail(events)
|
||||
close(ch)
|
||||
}()
|
||||
|
||||
return ch, nil
|
||||
}
|
||||
|
||||
func (s *fileSource) Stop() {
|
||||
}
|
||||
|
||||
func (s *fileSource) tail(events chan map[string]interface{}) {
|
||||
t, err := tail.TailFile(s.cfg.Path, tail.Config{Follow: true, ReOpen: true})
|
||||
if err != nil {
|
||||
logrus.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
for line := range t.Lines {
|
||||
event := make(map[string]interface{})
|
||||
if err := json.Unmarshal([]byte(line.Text), &event); err == nil {
|
||||
events <- event
|
||||
} else {
|
||||
logrus.Errorf("error parsing line #%d: %v", line.Num, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,7 +1,7 @@
|
||||
package metrics
|
||||
|
||||
type Source interface {
|
||||
Start() (chan struct{}, error)
|
||||
Start(chan map[string]interface{}) (chan struct{}, error)
|
||||
Stop()
|
||||
}
|
||||
|
||||
|
@ -3,6 +3,7 @@ package metrics
|
||||
import (
|
||||
"github.com/openziti/zrok/util"
|
||||
"github.com/sirupsen/logrus"
|
||||
"reflect"
|
||||
)
|
||||
|
||||
type UsageIngester struct{}
|
||||
@ -70,7 +71,7 @@ func (i *UsageIngester) Ingest(event map[string]interface{}) error {
|
||||
logrus.Error("missing 'usage/egress.rx'")
|
||||
}
|
||||
} else {
|
||||
logrus.Error("unabel to assert 'usage'")
|
||||
logrus.Errorf("unable to assert 'usage' (%v) %v", reflect.TypeOf(v), event)
|
||||
}
|
||||
} else {
|
||||
logrus.Error("missing 'usage'")
|
||||
|
@ -12,6 +12,9 @@ func loadWebsocketSourceConfig(v interface{}, opts *cf.Options) (interface{}, er
|
||||
|
||||
type websocketSource struct{}
|
||||
|
||||
func (s *websocketSource) Start() (chan struct{}, error) {
|
||||
func (s *websocketSource) Start(events chan map[string]interface{}) (chan struct{}, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (s *websocketSource) Stop() {
|
||||
}
|
||||
|
2
go.mod
2
go.mod
@ -25,6 +25,7 @@ require (
|
||||
github.com/michaelquigley/cf v0.0.13
|
||||
github.com/michaelquigley/pfxlog v0.6.9
|
||||
github.com/muesli/reflow v0.3.0
|
||||
github.com/nxadm/tail v1.4.8
|
||||
github.com/openziti/channel/v2 v2.0.30
|
||||
github.com/openziti/edge v0.22.39
|
||||
github.com/openziti/fabric v0.22.33
|
||||
@ -110,6 +111,7 @@ require (
|
||||
golang.org/x/term v0.5.0 // indirect
|
||||
golang.org/x/text v0.7.0 // indirect
|
||||
google.golang.org/protobuf v1.28.1 // indirect
|
||||
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
|
||||
gopkg.in/yaml.v2 v2.4.0 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
)
|
||||
|
1
go.sum
1
go.sum
@ -471,6 +471,7 @@ github.com/netfoundry/secretstream v0.1.3/go.mod h1:4wlgjAsXXlEa0tNJ7XEYvoBh/Ev0
|
||||
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
|
||||
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
|
||||
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
|
||||
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
|
||||
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
|
||||
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
|
||||
github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
|
||||
|
Loading…
Reference in New Issue
Block a user