basic file source and usage ingester (#128)

This commit is contained in:
Michael Quigley 2023-03-03 14:22:22 -05:00
parent 55523ae7ed
commit f5846681e7
No known key found for this signature in database
GPG Key ID: 9B60314A9DD20A62
7 changed files with 49 additions and 6 deletions

View File

@ -18,11 +18,24 @@ func Run(cfg *Config) error {
return errors.New("invalid 'source'; exiting")
}
srcJoin, err := src.Start()
events := make(chan map[string]interface{}, 1024)
srcJoin, err := src.Start(events)
if err != nil {
return errors.Wrap(err, "error starting source")
}
go func() {
ingester := &UsageIngester{}
for {
select {
case event := <-events:
if err := ingester.Ingest(event); err != nil {
logrus.Error(err)
}
}
}
}()
<-srcJoin
return nil

View File

@ -1,8 +1,11 @@
package metrics
import (
"encoding/json"
"github.com/michaelquigley/cf"
"github.com/nxadm/tail"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"os"
)
@ -25,18 +28,38 @@ type fileSource struct {
cfg *FileSourceConfig
}
func (s *fileSource) Start() (chan struct{}, error) {
func (s *fileSource) Start(events chan map[string]interface{}) (chan struct{}, error) {
f, err := os.Open(s.cfg.Path)
if err != nil {
return nil, errors.Wrapf(err, "error opening '%v'", s.cfg.Path)
}
_ = f.Close()
ch := make(chan struct{})
go func() {
f.Close()
s.tail(events)
close(ch)
}()
return ch, nil
}
func (s *fileSource) Stop() {
}
func (s *fileSource) tail(events chan map[string]interface{}) {
t, err := tail.TailFile(s.cfg.Path, tail.Config{Follow: true, ReOpen: true})
if err != nil {
logrus.Error(err)
return
}
for line := range t.Lines {
event := make(map[string]interface{})
if err := json.Unmarshal([]byte(line.Text), &event); err == nil {
events <- event
} else {
logrus.Errorf("error parsing line #%d: %v", line.Num, err)
}
}
}

View File

@ -1,7 +1,7 @@
package metrics
type Source interface {
Start() (chan struct{}, error)
Start(chan map[string]interface{}) (chan struct{}, error)
Stop()
}

View File

@ -3,6 +3,7 @@ package metrics
import (
"github.com/openziti/zrok/util"
"github.com/sirupsen/logrus"
"reflect"
)
type UsageIngester struct{}
@ -70,7 +71,7 @@ func (i *UsageIngester) Ingest(event map[string]interface{}) error {
logrus.Error("missing 'usage/egress.rx'")
}
} else {
logrus.Error("unabel to assert 'usage'")
logrus.Errorf("unable to assert 'usage' (%v) %v", reflect.TypeOf(v), event)
}
} else {
logrus.Error("missing 'usage'")

View File

@ -12,6 +12,9 @@ func loadWebsocketSourceConfig(v interface{}, opts *cf.Options) (interface{}, er
type websocketSource struct{}
func (s *websocketSource) Start() (chan struct{}, error) {
func (s *websocketSource) Start(events chan map[string]interface{}) (chan struct{}, error) {
return nil, nil
}
func (s *websocketSource) Stop() {
}

2
go.mod
View File

@ -25,6 +25,7 @@ require (
github.com/michaelquigley/cf v0.0.13
github.com/michaelquigley/pfxlog v0.6.9
github.com/muesli/reflow v0.3.0
github.com/nxadm/tail v1.4.8
github.com/openziti/channel/v2 v2.0.30
github.com/openziti/edge v0.22.39
github.com/openziti/fabric v0.22.33
@ -110,6 +111,7 @@ require (
golang.org/x/term v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

1
go.sum
View File

@ -471,6 +471,7 @@ github.com/netfoundry/secretstream v0.1.3/go.mod h1:4wlgjAsXXlEa0tNJ7XEYvoBh/Ev0
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=