From 2192c23760c3ccacdb6ce7db6992dbe8e2ecf96b Mon Sep 17 00:00:00 2001 From: Michael Quigley Date: Mon, 6 Mar 2023 13:56:54 -0500 Subject: [PATCH] start of infrastructure for positionally resuming tail (#128) --- cmd/zrok/metrics.go | 20 +++++++++++++++++++- controller/metrics/agent.go | 27 +++++++++++++++++++-------- controller/metrics/fileSource.go | 18 +++++++++++++++--- 3 files changed, 53 insertions(+), 12 deletions(-) diff --git a/cmd/zrok/metrics.go b/cmd/zrok/metrics.go index 445348d5..4001c224 100644 --- a/cmd/zrok/metrics.go +++ b/cmd/zrok/metrics.go @@ -5,6 +5,10 @@ import ( "github.com/openziti/zrok/controller/metrics" "github.com/sirupsen/logrus" "github.com/spf13/cobra" + "os" + "os/signal" + "syscall" + "time" ) func init() { @@ -33,7 +37,21 @@ func (cmd *metricsCommand) run(_ *cobra.Command, args []string) { } logrus.Infof(cf.Dump(cfg, metrics.GetCfOptions())) - if err := metrics.Run(cfg); err != nil { + ma, err := metrics.Run(cfg) + if err != nil { panic(err) } + + c := make(chan os.Signal) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + go func() { + <-c + ma.Stop() + ma.Join() + os.Exit(0) + }() + + for { + time.Sleep(30 * time.Minute) + } } diff --git a/controller/metrics/agent.go b/controller/metrics/agent.go index f793b43a..757d033f 100644 --- a/controller/metrics/agent.go +++ b/controller/metrics/agent.go @@ -5,23 +5,28 @@ import ( "github.com/sirupsen/logrus" ) -func Run(cfg *Config) error { +type MetricsAgent struct { + src Source + join chan struct{} +} + +func Run(cfg *Config) (*MetricsAgent, error) { logrus.Info("starting") defer logrus.Warn("stopping") if cfg.Source == nil { - return errors.New("no 'source' configured; exiting") + return nil, errors.New("no 'source' configured; exiting") } src, ok := cfg.Source.(Source) if !ok { - return errors.New("invalid 'source'; exiting") + return nil, errors.New("invalid 'source'; exiting") } events := make(chan map[string]interface{}, 1024) - srcJoin, err := src.Start(events) + join, err := src.Start(events) if err != nil { - return errors.Wrap(err, "error starting source") + return nil, errors.Wrap(err, "error starting source") } go func() { @@ -33,7 +38,13 @@ func Run(cfg *Config) error { } }() - <-srcJoin - - return nil + return &MetricsAgent{src, join}, nil +} + +func (ma *MetricsAgent) Stop() { + ma.src.Stop() +} + +func (ma *MetricsAgent) Join() { + <-ma.join } diff --git a/controller/metrics/fileSource.go b/controller/metrics/fileSource.go index db692966..9aba3937 100644 --- a/controller/metrics/fileSource.go +++ b/controller/metrics/fileSource.go @@ -19,13 +19,14 @@ func loadFileSourceConfig(v interface{}, opts *cf.Options) (interface{}, error) if err := cf.Bind(cfg, submap, cf.DefaultOptions()); err != nil { return nil, err } - return &fileSource{cfg}, nil + return &fileSource{cfg: cfg}, nil } return nil, errors.New("invalid config structure for 'file' source") } type fileSource struct { cfg *FileSourceConfig + t *tail.Tail } func (s *fileSource) Start(events chan map[string]interface{}) (chan struct{}, error) { @@ -45,18 +46,29 @@ func (s *fileSource) Start(events chan map[string]interface{}) (chan struct{}, e } func (s *fileSource) Stop() { + if err := s.t.Stop(); err != nil { + logrus.Error(err) + } } func (s *fileSource) tail(events chan map[string]interface{}) { - t, err := tail.TailFile(s.cfg.Path, tail.Config{Follow: true, ReOpen: true}) + logrus.Infof("started") + defer logrus.Infof("stopped") + + var err error + s.t, err = tail.TailFile(s.cfg.Path, tail.Config{ + ReOpen: true, + Follow: true, + }) if err != nil { logrus.Error(err) return } - for line := range t.Lines { + for line := range s.t.Lines { event := make(map[string]interface{}) if err := json.Unmarshal([]byte(line.Text), &event); err == nil { + logrus.Infof("seekinfo: offset: %d", line.SeekInfo.Offset) events <- event } else { logrus.Errorf("error parsing line #%d: %v", line.Num, err)