basic position durability for tail operation (#128)

This commit is contained in:
Michael Quigley 2023-03-06 14:20:44 -05:00
parent 2192c23760
commit e4877a20ed
No known key found for this signature in database
GPG Key ID: 9B60314A9DD20A62
2 changed files with 34 additions and 11 deletions

View File

@ -12,7 +12,6 @@ type MetricsAgent struct {
func Run(cfg *Config) (*MetricsAgent, error) { func Run(cfg *Config) (*MetricsAgent, error) {
logrus.Info("starting") logrus.Info("starting")
defer logrus.Warn("stopping")
if cfg.Source == nil { if cfg.Source == nil {
return nil, errors.New("no 'source' configured; exiting") return nil, errors.New("no 'source' configured; exiting")
@ -42,6 +41,7 @@ func Run(cfg *Config) (*MetricsAgent, error) {
} }
func (ma *MetricsAgent) Stop() { func (ma *MetricsAgent) Stop() {
logrus.Info("stopping")
ma.src.Stop() ma.src.Stop()
} }

View File

@ -1,6 +1,7 @@
package metrics package metrics
import ( import (
"encoding/binary"
"encoding/json" "encoding/json"
"github.com/michaelquigley/cf" "github.com/michaelquigley/cf"
"github.com/nxadm/tail" "github.com/nxadm/tail"
@ -10,7 +11,8 @@ import (
) )
type FileSourceConfig struct { type FileSourceConfig struct {
Path string Path string
DataPath string
} }
func loadFileSourceConfig(v interface{}, opts *cf.Options) (interface{}, error) { func loadFileSourceConfig(v interface{}, opts *cf.Options) (interface{}, error) {
@ -29,20 +31,33 @@ type fileSource struct {
t *tail.Tail t *tail.Tail
} }
func (s *fileSource) Start(events chan map[string]interface{}) (chan struct{}, error) { func (s *fileSource) Start(events chan map[string]interface{}) (join chan struct{}, err error) {
f, err := os.Open(s.cfg.Path) f, err := os.Open(s.cfg.Path)
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "error opening '%v'", s.cfg.Path) return nil, errors.Wrapf(err, "error opening '%v'", s.cfg.Path)
} }
_ = f.Close() _ = f.Close()
ch := make(chan struct{}) idxPath := s.cfg.Path + ".idx"
idxF, err := os.OpenFile(idxPath, os.O_CREATE|os.O_RDWR, os.ModePerm)
if err != nil {
return nil, errors.Wrapf(err, "error opening '%v'", idxPath)
}
pos := int64(0)
posBuf := make([]byte, 8)
if n, err := idxF.Read(posBuf); err == nil && n == 8 {
pos = int64(binary.LittleEndian.Uint64(posBuf))
logrus.Infof("recovered stored position: %d", pos)
}
join = make(chan struct{})
go func() { go func() {
s.tail(events) s.tail(pos, events, idxF)
close(ch) close(join)
}() }()
return ch, nil return join, nil
} }
func (s *fileSource) Stop() { func (s *fileSource) Stop() {
@ -51,14 +66,17 @@ func (s *fileSource) Stop() {
} }
} }
func (s *fileSource) tail(events chan map[string]interface{}) { func (s *fileSource) tail(pos int64, events chan map[string]interface{}, idxF *os.File) {
logrus.Infof("started") logrus.Infof("started")
defer logrus.Infof("stopped") defer logrus.Infof("stopped")
posBuf := make([]byte, 8)
var err error var err error
s.t, err = tail.TailFile(s.cfg.Path, tail.Config{ s.t, err = tail.TailFile(s.cfg.Path, tail.Config{
ReOpen: true, ReOpen: true,
Follow: true, Follow: true,
Location: &tail.SeekInfo{Offset: pos},
}) })
if err != nil { if err != nil {
logrus.Error(err) logrus.Error(err)
@ -68,7 +86,12 @@ func (s *fileSource) tail(events chan map[string]interface{}) {
for line := range s.t.Lines { for line := range s.t.Lines {
event := make(map[string]interface{}) event := make(map[string]interface{})
if err := json.Unmarshal([]byte(line.Text), &event); err == nil { if err := json.Unmarshal([]byte(line.Text), &event); err == nil {
logrus.Infof("seekinfo: offset: %d", line.SeekInfo.Offset) binary.LittleEndian.PutUint64(posBuf, uint64(line.SeekInfo.Offset))
if n, err := idxF.Seek(0, 0); err == nil && n == 0 {
if n, err := idxF.Write(posBuf); err != nil || n != 8 {
logrus.Errorf("error writing index (%d): %v", n, err)
}
}
events <- event events <- event
} else { } else {
logrus.Errorf("error parsing line #%d: %v", line.Num, err) logrus.Errorf("error parsing line #%d: %v", line.Num, err)