From f5846681e75d08235f74ba75e5e3fa8566c0ca09 Mon Sep 17 00:00:00 2001 From: Michael Quigley Date: Fri, 3 Mar 2023 14:22:22 -0500 Subject: [PATCH] basic file source and usage ingester (#128) --- controller/metrics/agent.go | 15 ++++++++++++++- controller/metrics/fileSource.go | 27 +++++++++++++++++++++++++-- controller/metrics/model.go | 2 +- controller/metrics/usageIngester.go | 3 ++- controller/metrics/websocketSource.go | 5 ++++- go.mod | 2 ++ go.sum | 1 + 7 files changed, 49 insertions(+), 6 deletions(-) diff --git a/controller/metrics/agent.go b/controller/metrics/agent.go index 9d5c768f..047ffdc9 100644 --- a/controller/metrics/agent.go +++ b/controller/metrics/agent.go @@ -18,11 +18,24 @@ func Run(cfg *Config) error { return errors.New("invalid 'source'; exiting") } - srcJoin, err := src.Start() + events := make(chan map[string]interface{}, 1024) + srcJoin, err := src.Start(events) if err != nil { return errors.Wrap(err, "error starting source") } + go func() { + ingester := &UsageIngester{} + for { + select { + case event := <-events: + if err := ingester.Ingest(event); err != nil { + logrus.Error(err) + } + } + } + }() + <-srcJoin return nil diff --git a/controller/metrics/fileSource.go b/controller/metrics/fileSource.go index c545bd12..db692966 100644 --- a/controller/metrics/fileSource.go +++ b/controller/metrics/fileSource.go @@ -1,8 +1,11 @@ package metrics import ( + "encoding/json" "github.com/michaelquigley/cf" + "github.com/nxadm/tail" "github.com/pkg/errors" + "github.com/sirupsen/logrus" "os" ) @@ -25,18 +28,38 @@ type fileSource struct { cfg *FileSourceConfig } -func (s *fileSource) Start() (chan struct{}, error) { +func (s *fileSource) Start(events chan map[string]interface{}) (chan struct{}, error) { f, err := os.Open(s.cfg.Path) if err != nil { return nil, errors.Wrapf(err, "error opening '%v'", s.cfg.Path) } + _ = f.Close() + ch := make(chan struct{}) go func() { - f.Close() + s.tail(events) close(ch) }() + return ch, nil } func (s *fileSource) Stop() { } + +func (s *fileSource) tail(events chan map[string]interface{}) { + t, err := tail.TailFile(s.cfg.Path, tail.Config{Follow: true, ReOpen: true}) + if err != nil { + logrus.Error(err) + return + } + + for line := range t.Lines { + event := make(map[string]interface{}) + if err := json.Unmarshal([]byte(line.Text), &event); err == nil { + events <- event + } else { + logrus.Errorf("error parsing line #%d: %v", line.Num, err) + } + } +} diff --git a/controller/metrics/model.go b/controller/metrics/model.go index a932e0d0..223f3a83 100644 --- a/controller/metrics/model.go +++ b/controller/metrics/model.go @@ -1,7 +1,7 @@ package metrics type Source interface { - Start() (chan struct{}, error) + Start(chan map[string]interface{}) (chan struct{}, error) Stop() } diff --git a/controller/metrics/usageIngester.go b/controller/metrics/usageIngester.go index ea294195..90293eea 100644 --- a/controller/metrics/usageIngester.go +++ b/controller/metrics/usageIngester.go @@ -3,6 +3,7 @@ package metrics import ( "github.com/openziti/zrok/util" "github.com/sirupsen/logrus" + "reflect" ) type UsageIngester struct{} @@ -70,7 +71,7 @@ func (i *UsageIngester) Ingest(event map[string]interface{}) error { logrus.Error("missing 'usage/egress.rx'") } } else { - logrus.Error("unabel to assert 'usage'") + logrus.Errorf("unable to assert 'usage' (%v) %v", reflect.TypeOf(v), event) } } else { logrus.Error("missing 'usage'") diff --git a/controller/metrics/websocketSource.go b/controller/metrics/websocketSource.go index c4dd1faa..c0c0afe3 100644 --- a/controller/metrics/websocketSource.go +++ b/controller/metrics/websocketSource.go @@ -12,6 +12,9 @@ func loadWebsocketSourceConfig(v interface{}, opts *cf.Options) (interface{}, er type websocketSource struct{} -func (s *websocketSource) Start() (chan struct{}, error) { +func (s *websocketSource) Start(events chan map[string]interface{}) (chan struct{}, error) { return nil, nil } + +func (s *websocketSource) Stop() { +} diff --git a/go.mod b/go.mod index 31b0f504..40142ea6 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( github.com/michaelquigley/cf v0.0.13 github.com/michaelquigley/pfxlog v0.6.9 github.com/muesli/reflow v0.3.0 + github.com/nxadm/tail v1.4.8 github.com/openziti/channel/v2 v2.0.30 github.com/openziti/edge v0.22.39 github.com/openziti/fabric v0.22.33 @@ -110,6 +111,7 @@ require ( golang.org/x/term v0.5.0 // indirect golang.org/x/text v0.7.0 // indirect google.golang.org/protobuf v1.28.1 // indirect + gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 71d4ca5c..e7446a34 100644 --- a/go.sum +++ b/go.sum @@ -471,6 +471,7 @@ github.com/netfoundry/secretstream v0.1.3/go.mod h1:4wlgjAsXXlEa0tNJ7XEYvoBh/Ev0 github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=