diff --git a/cmd/zrok/controllerMetrics.go b/cmd/zrok/controllerMetrics.go index f29d75c9..7de987c8 100644 --- a/cmd/zrok/controllerMetrics.go +++ b/cmd/zrok/controllerMetrics.go @@ -13,8 +13,11 @@ import ( "time" ) +var metricsCmd *cobra.Command + func init() { - controllerCmd.cmd.AddCommand(newMetricsCommand().cmd) + metricsCmd = newMetricsCommand().cmd + controllerCmd.cmd.AddCommand(metricsCmd) } type metricsCommand struct { diff --git a/cmd/zrok/controllerMetricsBridge.go b/cmd/zrok/controllerMetricsBridge.go new file mode 100644 index 00000000..b67ef129 --- /dev/null +++ b/cmd/zrok/controllerMetricsBridge.go @@ -0,0 +1,61 @@ +package main + +import ( + "github.com/michaelquigley/cf" + "github.com/openziti/zrok/controller/config" + "github.com/openziti/zrok/controller/env" + "github.com/openziti/zrok/controller/metrics2" + "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "os" + "os/signal" + "syscall" + "time" +) + +func init() { + metricsCmd.AddCommand(newBridgeCommand().cmd) +} + +type bridgeCommand struct { + cmd *cobra.Command +} + +func newBridgeCommand() *bridgeCommand { + cmd := &cobra.Command{ + Use: "bridge ", + Short: "Start a zrok metrics bridge", + Args: cobra.ExactArgs(1), + } + command := &bridgeCommand{cmd} + cmd.Run = command.run + return command +} + +func (cmd *bridgeCommand) run(_ *cobra.Command, args []string) { + cfg, err := config.LoadConfig(args[0]) + if err != nil { + panic(err) + } + logrus.Infof(cf.Dump(cfg, env.GetCfOptions())) + + bridge, err := metrics2.NewBridge(cfg.Bridge) + if err != nil { + panic(err) + } + if _, err = bridge.Start(); err != nil { + panic(err) + } + + c := make(chan os.Signal) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + go func() { + <-c + bridge.Stop() + os.Exit(0) + }() + + for { + time.Sleep(24 * 60 * time.Minute) + } +} diff --git a/controller/config/config.go b/controller/config/config.go index cdc7c2b7..19ec8833 100644 --- a/controller/config/config.go +++ b/controller/config/config.go @@ -4,6 +4,7 @@ import ( "github.com/openziti/zrok/controller/env" "github.com/openziti/zrok/controller/limits" "github.com/openziti/zrok/controller/metrics" + "github.com/openziti/zrok/controller/metrics2" "github.com/openziti/zrok/controller/zrokEdgeSdk" "time" @@ -17,6 +18,7 @@ const ConfigVersion = 2 type Config struct { V int Admin *AdminConfig + Bridge *metrics2.BridgeConfig Endpoint *EndpointConfig Email *EmailConfig Limits *limits.Config diff --git a/controller/metrics2/bridge.go b/controller/metrics2/bridge.go new file mode 100644 index 00000000..be51cbf2 --- /dev/null +++ b/controller/metrics2/bridge.go @@ -0,0 +1,77 @@ +package metrics2 + +import ( + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +type BridgeConfig struct { + Source interface{} + Sink interface{} +} + +type Bridge struct { + src ZitiEventJsonSource + srcJoin chan struct{} + snk ZitiEventJsonSink + events chan ZitiEventJson + close chan struct{} + join chan struct{} +} + +func NewBridge(cfg *BridgeConfig) (*Bridge, error) { + b := &Bridge{ + events: make(chan ZitiEventJson), + join: make(chan struct{}), + close: make(chan struct{}), + } + if v, ok := cfg.Source.(ZitiEventJsonSource); ok { + b.src = v + } else { + return nil, errors.New("invalid source type") + } + if v, ok := cfg.Sink.(ZitiEventJsonSink); ok { + b.snk = v + } else { + return nil, errors.New("invalid sink type") + } + return b, nil +} + +func (b *Bridge) Start() (join chan struct{}, err error) { + if b.srcJoin, err = b.src.Start(b.events); err != nil { + return nil, err + } + + go func() { + logrus.Info("started") + defer logrus.Info("stopped") + defer close(b.join) + + eventLoop: + for { + select { + case eventJson := <-b.events: + logrus.Info(eventJson) + if err := b.snk.Handle(eventJson); err == nil { + logrus.Info("-> %v", eventJson) + } else { + logrus.Error(err) + } + + case <-b.close: + logrus.Info("received close signal") + break eventLoop + } + } + }() + + return b.join, nil +} + +func (b *Bridge) Stop() { + b.src.Stop() + close(b.close) + <-b.srcJoin + <-b.join +} diff --git a/controller/metrics2/fileSource.go b/controller/metrics2/fileSource.go index 71a02939..9ca527a5 100644 --- a/controller/metrics2/fileSource.go +++ b/controller/metrics2/fileSource.go @@ -50,7 +50,7 @@ func (s *fileSource) Start(events chan ZitiEventJson) (join chan struct{}, err e ptr, err := s.readPtr() if err != nil { - return nil, errors.Wrap(err, "error reading pointer") + logrus.Errorf("error reading pointer: %v", err) } logrus.Infof("retrieved stored position pointer at '%d'", ptr) @@ -80,7 +80,7 @@ func (s *fileSource) tail(ptr int64, events chan ZitiEventJson) { Location: &tail.SeekInfo{Offset: ptr}, }) if err != nil { - logrus.Error(err) + logrus.Errorf("error starting tail: %v", err) return } @@ -109,10 +109,10 @@ func (s *fileSource) readPtr() (int64, error) { ptr = int64(binary.LittleEndian.Uint64(buf)) return ptr, nil } else { - return -1, errors.Wrapf(err, "error reading pointer (%d): %v", n, err) + return 0, errors.Wrapf(err, "error reading pointer (%d): %v", n, err) } } else { - return -1, errors.Wrapf(err, "error seeking pointer (%d): %v", n, err) + return 0, errors.Wrapf(err, "error seeking pointer (%d): %v", n, err) } }