From 728ae4b7b0c7113121cb474b1e55d5c12fe86e51 Mon Sep 17 00:00:00 2001 From: Michael Quigley Date: Mon, 6 Mar 2023 14:20:44 -0500 Subject: [PATCH] basic position durability for tail operation (#128) --- controller/metrics/agent.go | 2 +- controller/metrics/fileSource.go | 43 ++++++++++++++++++++++++-------- 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/controller/metrics/agent.go b/controller/metrics/agent.go index 757d033f..82d356f7 100644 --- a/controller/metrics/agent.go +++ b/controller/metrics/agent.go @@ -12,7 +12,6 @@ type MetricsAgent struct { func Run(cfg *Config) (*MetricsAgent, error) { logrus.Info("starting") - defer logrus.Warn("stopping") if cfg.Source == nil { return nil, errors.New("no 'source' configured; exiting") @@ -42,6 +41,7 @@ func Run(cfg *Config) (*MetricsAgent, error) { } func (ma *MetricsAgent) Stop() { + logrus.Info("stopping") ma.src.Stop() } diff --git a/controller/metrics/fileSource.go b/controller/metrics/fileSource.go index 9aba3937..d9e4432f 100644 --- a/controller/metrics/fileSource.go +++ b/controller/metrics/fileSource.go @@ -1,6 +1,7 @@ package metrics import ( + "encoding/binary" "encoding/json" "github.com/michaelquigley/cf" "github.com/nxadm/tail" @@ -10,7 +11,8 @@ import ( ) type FileSourceConfig struct { - Path string + Path string + DataPath string } func loadFileSourceConfig(v interface{}, opts *cf.Options) (interface{}, error) { @@ -29,20 +31,33 @@ type fileSource struct { t *tail.Tail } -func (s *fileSource) Start(events chan map[string]interface{}) (chan struct{}, error) { +func (s *fileSource) Start(events chan map[string]interface{}) (join chan struct{}, err error) { f, err := os.Open(s.cfg.Path) if err != nil { return nil, errors.Wrapf(err, "error opening '%v'", s.cfg.Path) } _ = f.Close() - ch := make(chan struct{}) + idxPath := s.cfg.Path + ".idx" + idxF, err := os.OpenFile(idxPath, os.O_CREATE|os.O_RDWR, os.ModePerm) + if err != nil { + return nil, errors.Wrapf(err, "error opening '%v'", idxPath) + } + + pos := int64(0) + posBuf := make([]byte, 8) + if n, err := idxF.Read(posBuf); err == nil && n == 8 { + pos = int64(binary.LittleEndian.Uint64(posBuf)) + logrus.Infof("recovered stored position: %d", pos) + } + + join = make(chan struct{}) go func() { - s.tail(events) - close(ch) + s.tail(pos, events, idxF) + close(join) }() - return ch, nil + return join, nil } func (s *fileSource) Stop() { @@ -51,14 +66,17 @@ func (s *fileSource) Stop() { } } -func (s *fileSource) tail(events chan map[string]interface{}) { +func (s *fileSource) tail(pos int64, events chan map[string]interface{}, idxF *os.File) { logrus.Infof("started") defer logrus.Infof("stopped") + posBuf := make([]byte, 8) + var err error s.t, err = tail.TailFile(s.cfg.Path, tail.Config{ - ReOpen: true, - Follow: true, + ReOpen: true, + Follow: true, + Location: &tail.SeekInfo{Offset: pos}, }) if err != nil { logrus.Error(err) @@ -68,7 +86,12 @@ func (s *fileSource) tail(events chan map[string]interface{}) { for line := range s.t.Lines { event := make(map[string]interface{}) if err := json.Unmarshal([]byte(line.Text), &event); err == nil { - logrus.Infof("seekinfo: offset: %d", line.SeekInfo.Offset) + binary.LittleEndian.PutUint64(posBuf, uint64(line.SeekInfo.Offset)) + if n, err := idxF.Seek(0, 0); err == nil && n == 0 { + if n, err := idxF.Write(posBuf); err != nil || n != 8 { + logrus.Errorf("error writing index (%d): %v", n, err) + } + } events <- event } else { logrus.Errorf("error parsing line #%d: %v", line.Num, err)