zrok/controller/metrics/fileSource.go

108 lines
2.4 KiB
Go
Raw Normal View History

2023-03-03 19:31:57 +01:00
package metrics
2023-03-03 19:51:10 +01:00
import (
"encoding/binary"
"encoding/json"
2023-03-03 19:51:10 +01:00
"github.com/michaelquigley/cf"
"github.com/nxadm/tail"
2023-03-03 19:51:10 +01:00
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
2023-03-03 19:51:10 +01:00
"os"
)
2023-03-03 19:31:57 +01:00
// FileSourceConfig holds the settings for the file-backed metrics source.
type FileSourceConfig struct {
	// Path is the file that is opened and tailed for events.
	Path string
	// IndexPath is the file in which the last processed offset is persisted.
	// When empty, Path with an ".idx" suffix is used instead.
	IndexPath string
}
2023-03-06 21:51:23 +01:00
func loadFileSourceConfig(v interface{}, _ *cf.Options) (interface{}, error) {
2023-03-03 19:51:10 +01:00
if submap, ok := v.(map[string]interface{}); ok {
cfg := &FileSourceConfig{}
if err := cf.Bind(cfg, submap, cf.DefaultOptions()); err != nil {
return nil, err
}
return &fileSource{cfg: cfg}, nil
2023-03-03 19:51:10 +01:00
}
return nil, errors.New("invalid config structure for 'file' source")
2023-03-03 19:31:57 +01:00
}
2023-03-03 19:51:10 +01:00
type fileSource struct {
cfg *FileSourceConfig
t *tail.Tail
2023-03-03 19:51:10 +01:00
}
2023-03-03 19:31:57 +01:00
func (s *fileSource) Start(events chan map[string]interface{}) (join chan struct{}, err error) {
2023-03-03 19:51:10 +01:00
f, err := os.Open(s.cfg.Path)
if err != nil {
return nil, errors.Wrapf(err, "error opening '%v'", s.cfg.Path)
}
_ = f.Close()
idxF, err := os.OpenFile(s.indexPath(), os.O_CREATE|os.O_RDWR, os.ModePerm)
if err != nil {
return nil, errors.Wrapf(err, "error opening '%v'", s.indexPath())
}
pos := int64(0)
posBuf := make([]byte, 8)
if n, err := idxF.Read(posBuf); err == nil && n == 8 {
pos = int64(binary.LittleEndian.Uint64(posBuf))
logrus.Infof("recovered stored position: %d", pos)
}
join = make(chan struct{})
2023-03-03 19:51:10 +01:00
go func() {
s.tail(pos, events, idxF)
close(join)
2023-03-03 19:51:10 +01:00
}()
return join, nil
2023-03-03 19:51:10 +01:00
}
func (s *fileSource) Stop() {
if err := s.t.Stop(); err != nil {
logrus.Error(err)
}
2023-03-03 19:31:57 +01:00
}
// tail follows the configured file starting at byte offset pos, decodes every
// line as a JSON object and sends it on events. Before an event is sent, the
// offset reported with its line is written to the start of idxF as a
// little-endian uint64 so that a later Start can resume from there. Lines that
// are not valid JSON are logged and skipped without touching the index.
//
// The method returns when the tailer's Lines channel is closed, or right away
// when the tailer cannot be created.
func (s *fileSource) tail(pos int64, events chan map[string]interface{}, idxF *os.File) {
	logrus.Infof("started")
	defer logrus.Infof("stopped")

	var err error
	s.t, err = tail.TailFile(s.cfg.Path, tail.Config{
		ReOpen:   true,
		Follow:   true,
		Location: &tail.SeekInfo{Offset: pos},
	})
	if err != nil {
		logrus.Error(err)
		return
	}

	// A single 8-byte scratch buffer is reused for every index write.
	posBuf := make([]byte, 8)
	for line := range s.t.Lines {
		event := make(map[string]interface{})
		if err := json.Unmarshal([]byte(line.Text), &event); err != nil {
			logrus.Errorf("error parsing line #%d: %v", line.Num, err)
			continue
		}

		binary.LittleEndian.PutUint64(posBuf, uint64(line.SeekInfo.Offset))
		// WriteAt overwrites the index in place at offset 0 in one call, so
		// there is no separate seek whose failure could go unreported.
		if n, err := idxF.WriteAt(posBuf, 0); err != nil || n != 8 {
			logrus.Errorf("error writing index (%d): %v", n, err)
		}
		events <- event
	}
}
// indexPath reports where the read offset is persisted: the configured
// IndexPath when one is set, otherwise the source path with ".idx" appended.
func (s *fileSource) indexPath() string {
	if explicit := s.cfg.IndexPath; explicit != "" {
		return explicit
	}
	return s.cfg.Path + ".idx"
}