start of infrastructure for positionally resuming tail (#128)

This commit is contained in:
Michael Quigley 2023-03-06 13:56:54 -05:00
parent 1b1ecd91e1
commit 2192c23760
No known key found for this signature in database
GPG Key ID: 9B60314A9DD20A62
3 changed files with 53 additions and 12 deletions

View File

@ -5,6 +5,10 @@ import (
"github.com/openziti/zrok/controller/metrics" "github.com/openziti/zrok/controller/metrics"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"os"
"os/signal"
"syscall"
"time"
) )
func init() { func init() {
@ -33,7 +37,21 @@ func (cmd *metricsCommand) run(_ *cobra.Command, args []string) {
} }
logrus.Infof(cf.Dump(cfg, metrics.GetCfOptions())) logrus.Infof(cf.Dump(cfg, metrics.GetCfOptions()))
if err := metrics.Run(cfg); err != nil { ma, err := metrics.Run(cfg)
if err != nil {
panic(err) panic(err)
} }
c := make(chan os.Signal)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c
ma.Stop()
ma.Join()
os.Exit(0)
}()
for {
time.Sleep(30 * time.Minute)
}
} }

View File

@ -5,23 +5,28 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
func Run(cfg *Config) error { type MetricsAgent struct {
src Source
join chan struct{}
}
func Run(cfg *Config) (*MetricsAgent, error) {
logrus.Info("starting") logrus.Info("starting")
defer logrus.Warn("stopping") defer logrus.Warn("stopping")
if cfg.Source == nil { if cfg.Source == nil {
return errors.New("no 'source' configured; exiting") return nil, errors.New("no 'source' configured; exiting")
} }
src, ok := cfg.Source.(Source) src, ok := cfg.Source.(Source)
if !ok { if !ok {
return errors.New("invalid 'source'; exiting") return nil, errors.New("invalid 'source'; exiting")
} }
events := make(chan map[string]interface{}, 1024) events := make(chan map[string]interface{}, 1024)
srcJoin, err := src.Start(events) join, err := src.Start(events)
if err != nil { if err != nil {
return errors.Wrap(err, "error starting source") return nil, errors.Wrap(err, "error starting source")
} }
go func() { go func() {
@ -33,7 +38,13 @@ func Run(cfg *Config) error {
} }
}() }()
<-srcJoin return &MetricsAgent{src, join}, nil
}
return nil
func (ma *MetricsAgent) Stop() {
ma.src.Stop()
}
func (ma *MetricsAgent) Join() {
<-ma.join
} }

View File

@ -19,13 +19,14 @@ func loadFileSourceConfig(v interface{}, opts *cf.Options) (interface{}, error)
if err := cf.Bind(cfg, submap, cf.DefaultOptions()); err != nil { if err := cf.Bind(cfg, submap, cf.DefaultOptions()); err != nil {
return nil, err return nil, err
} }
return &fileSource{cfg}, nil return &fileSource{cfg: cfg}, nil
} }
return nil, errors.New("invalid config structure for 'file' source") return nil, errors.New("invalid config structure for 'file' source")
} }
type fileSource struct { type fileSource struct {
cfg *FileSourceConfig cfg *FileSourceConfig
t *tail.Tail
} }
func (s *fileSource) Start(events chan map[string]interface{}) (chan struct{}, error) { func (s *fileSource) Start(events chan map[string]interface{}) (chan struct{}, error) {
@ -45,18 +46,29 @@ func (s *fileSource) Start(events chan map[string]interface{}) (chan struct{}, e
} }
func (s *fileSource) Stop() { func (s *fileSource) Stop() {
if err := s.t.Stop(); err != nil {
logrus.Error(err)
}
} }
func (s *fileSource) tail(events chan map[string]interface{}) { func (s *fileSource) tail(events chan map[string]interface{}) {
t, err := tail.TailFile(s.cfg.Path, tail.Config{Follow: true, ReOpen: true}) logrus.Infof("started")
defer logrus.Infof("stopped")
var err error
s.t, err = tail.TailFile(s.cfg.Path, tail.Config{
ReOpen: true,
Follow: true,
})
if err != nil { if err != nil {
logrus.Error(err) logrus.Error(err)
return return
} }
for line := range t.Lines { for line := range s.t.Lines {
event := make(map[string]interface{}) event := make(map[string]interface{})
if err := json.Unmarshal([]byte(line.Text), &event); err == nil { if err := json.Unmarshal([]byte(line.Text), &event); err == nil {
logrus.Infof("seekinfo: offset: %d", line.SeekInfo.Offset)
events <- event events <- event
} else { } else {
logrus.Errorf("error parsing line #%d: %v", line.Num, err) logrus.Errorf("error parsing line #%d: %v", line.Num, err)