From ce67253b750646975469be8eb856d35ea423fecc Mon Sep 17 00:00:00 2001
From: Michael Quigley
Date: Tue, 22 Apr 2025 14:12:16 -0400
Subject: [PATCH] influx for canary metrics, working (#948)

---
Review amendments relative to the change as originally submitted:
  * SnapshotCollector.Store() closes the Influx client with defer, so the
    client is released when a write fails part-way through.
  * SnapshotCollector.Store() returns an error instead of dereferencing a nil
    pointer when the collector has no Influx configuration.
  * Doc comments added to the exported names introduced by this patch.
Hunk line counts and the diffstat below account for these amendments; the
blob hashes on the "index" lines are those of the original submission.

 .gitignore                    |  1 +
 canary/config.go              | 37 +++++++++++++++++++++++
 canary/enabler.go             |  4 +++
 canary/metrics.go             | 41 ++++++++++++++++++++++++-
 cmd/zrok/testCanaryEnabler.go | 57 +++++++++++++++++++++++++++++------
 5 files changed, 129 insertions(+), 11 deletions(-)
 create mode 100644 canary/config.go

diff --git a/.gitignore b/.gitignore
index 46267f3f..8a3caa3d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,6 +4,7 @@
 *.db
 /automated-release-build/
 etc/dev.yml
+etc/dev-canary.yml
 etc/dev-frontend.yml
 
 # Dependencies
diff --git a/canary/config.go b/canary/config.go
new file mode 100644
index 00000000..36a24e61
--- /dev/null
+++ b/canary/config.go
@@ -0,0 +1,37 @@
+package canary
+
+import (
+	"github.com/michaelquigley/cf"
+	"github.com/openziti/zrok/controller/metrics"
+	"github.com/pkg/errors"
+	"github.com/sirupsen/logrus"
+)
+
+// ConfigVersion is the value LoadConfig requires in a configuration's V field.
+const ConfigVersion = 1
+
+// Config is the canary configuration that LoadConfig binds from a YAML file.
+type Config struct {
+	V      int
+	Influx *metrics.InfluxConfig
+}
+
+// DefaultConfig returns an empty Config for LoadConfig to bind onto.
+func DefaultConfig() *Config {
+	return &Config{}
+}
+
+// LoadConfig binds the YAML file at path onto a DefaultConfig and logs a dump
+// of the result. It returns an error when binding fails or when the
+// configuration's V field does not equal ConfigVersion.
+func LoadConfig(path string) (*Config, error) {
+	cfg := DefaultConfig()
+	if err := cf.BindYaml(cfg, path, cf.DefaultOptions()); err != nil {
+		return nil, errors.Wrapf(err, "error loading canary configuration '%v'", path)
+	}
+	if cfg.V != ConfigVersion {
+		return nil, errors.Errorf("expecting canary configuration version '%v', got '%v'", ConfigVersion, cfg.V)
+	}
+	logrus.Info(cf.Dump(cfg, cf.DefaultOptions()))
+	return cfg, nil
+}
diff --git a/canary/enabler.go b/canary/enabler.go
index 43dbe1fd..1841570c 100644
--- a/canary/enabler.go
+++ b/canary/enabler.go
@@ -20,6 +20,7 @@ type EnablerOptions struct {
 
 type Enabler struct {
 	Id           uint
+	Done         chan struct{}
 	opt          *EnablerOptions
 	root         env_core.Root
 	Environments chan *sdk.Environment
@@ -28,6 +29,7 @@ type Enabler struct {
 func NewEnabler(id uint, opt *EnablerOptions, root env_core.Root) *Enabler {
 	return &Enabler{
 		Id:           id,
+		Done:         make(chan struct{}),
 		opt:          opt,
 		root:         root,
 		Environments: make(chan *sdk.Environment, opt.Iterations),
@@ -36,6 +38,7 @@ func NewEnabler(id uint, opt *EnablerOptions, root env_core.Root) *Enabler {
 
 func (e *Enabler) Run() {
 	defer close(e.Environments)
+	defer close(e.Done)
 	defer logrus.Infof("#%d stopping", e.Id)
 	e.dwell()
 	e.iterate()
@@ -51,6 +54,7 @@ func (e *Enabler) dwell() {
 }
 
 func (e *Enabler) iterate() {
+	defer logrus.Info("done")
 	for i := uint(0); i < e.opt.Iterations; i++ {
 		snapshot := NewSnapshot("enable", e.Id, uint64(i))
 
diff --git a/canary/metrics.go b/canary/metrics.go
index 9d1f89b4..5589efd4 100644
--- a/canary/metrics.go
+++ b/canary/metrics.go
@@ -3,6 +3,7 @@ package canary
 import (
 	"context"
 	"fmt"
+	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
 	"github.com/openziti/zrok/util"
 	"github.com/sirupsen/logrus"
 	"slices"
@@ -37,14 +38,16 @@ type SnapshotCollector struct {
 	InputQueue chan *Snapshot
 	Closed     chan struct{}
 	ctx        context.Context
+	cfg        *Config
 	snapshots  map[string][]*Snapshot
 }
 
-func NewSnapshotCollector(ctx context.Context) *SnapshotCollector {
+func NewSnapshotCollector(ctx context.Context, cfg *Config) *SnapshotCollector {
 	return &SnapshotCollector{
 		InputQueue: make(chan *Snapshot),
 		Closed:     make(chan struct{}),
 		ctx:        ctx,
+		cfg:        cfg,
 		snapshots:  make(map[string][]*Snapshot),
 	}
 }
@@ -69,3 +72,39 @@ func (sc *SnapshotCollector) Run() {
 		}
 	}
 }
+
+// Store writes every snapshot held by the collector to InfluxDB through the
+// blocking write API, one point per snapshot, with the snapshot's operation as
+// the measurement name. It returns the first write error, or an error when the
+// collector has no Influx configuration.
+func (sc *SnapshotCollector) Store() error {
+	if sc.cfg == nil || sc.cfg.Influx == nil {
+		return fmt.Errorf("no influx configuration; unable to store snapshots")
+	}
+	idb := influxdb2.NewClient(sc.cfg.Influx.Url, sc.cfg.Influx.Token)
+	// deferred so the client is also released when a write fails
+	defer idb.Close()
+	writeApi := idb.WriteAPIBlocking(sc.cfg.Influx.Org, sc.cfg.Influx.Bucket)
+	for key, arr := range sc.snapshots {
+		for _, snapshot := range arr {
+			tags := map[string]string{
+				"instance":  fmt.Sprintf("%d", snapshot.Instance),
+				"iteration": fmt.Sprintf("%d", snapshot.Iteration),
+				"ok":        fmt.Sprintf("%t", snapshot.Ok),
+			}
+			if snapshot.Error != nil {
+				tags["error"] = snapshot.Error.Error()
+			}
+			pt := influxdb2.NewPoint(snapshot.Operation, tags, map[string]interface{}{
+				"duration": snapshot.Completed.Sub(snapshot.Started),
+				"size":     snapshot.Size,
+			}, snapshot.Started)
+			if err := writeApi.WritePoint(context.Background(), pt); err != nil {
+				return err
+			}
+		}
+		logrus.Infof("wrote '%v' points for '%v'", len(arr), key)
+	}
+	logrus.Infof("complete")
+	return nil
+}
diff --git a/cmd/zrok/testCanaryEnabler.go b/cmd/zrok/testCanaryEnabler.go
index 8fa6cda0..0a8111cf 100644
--- a/cmd/zrok/testCanaryEnabler.go
+++ b/cmd/zrok/testCanaryEnabler.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"context"
 	"github.com/openziti/zrok/canary"
 	"github.com/openziti/zrok/environment"
 	"github.com/sirupsen/logrus"
@@ -14,16 +15,17 @@ func init() {
 }
 
 type testCanaryEnabler struct {
-	cmd         *cobra.Command
-	enablers    uint
-	iterations  uint
-	minPreDelay time.Duration
-	maxPreDelay time.Duration
-	minDwell    time.Duration
-	maxDwell    time.Duration
-	minPacing   time.Duration
-	maxPacing   time.Duration
-	skipDisable bool
+	cmd          *cobra.Command
+	enablers     uint
+	iterations   uint
+	minPreDelay  time.Duration
+	maxPreDelay  time.Duration
+	minDwell     time.Duration
+	maxDwell     time.Duration
+	minPacing    time.Duration
+	maxPacing    time.Duration
+	skipDisable  bool
+	canaryConfig string
 }
 
 func newTestCanaryEnabler() *testCanaryEnabler {
@@ -41,6 +43,7 @@ func newTestCanaryEnabler() *testCanaryEnabler {
 	cmd.Flags().DurationVar(&command.minPacing, "min-pacing", 0, "Minimum pacing time")
 	cmd.Flags().DurationVar(&command.maxPacing, "max-pacing", 0, "Maximum pacing time")
 	cmd.Flags().BoolVar(&command.skipDisable, "skip-disable", false, "Disable (clean up) enabled environments")
+	cmd.Flags().StringVar(&command.canaryConfig, "canary-config", "", "Path to canary configuration file")
 	return command
 }
 
@@ -54,6 +57,19 @@ func (cmd *testCanaryEnabler) run(_ *cobra.Command, _ []string) {
 		panic(err)
 	}
 
+	var sc *canary.SnapshotCollector
+	var scCtx context.Context
+	var scCancel context.CancelFunc
+	if cmd.canaryConfig != "" {
+		cfg, err := canary.LoadConfig(cmd.canaryConfig)
+		if err != nil {
+			panic(err)
+		}
+		scCtx, scCancel = context.WithCancel(context.Background())
+		sc = canary.NewSnapshotCollector(scCtx, cfg)
+		go sc.Run()
+	}
+
 	var enablers []*canary.Enabler
 	for i := uint(0); i < cmd.enablers; i++ {
 		preDelay := cmd.maxPreDelay.Milliseconds()
@@ -70,6 +86,9 @@ func (cmd *testCanaryEnabler) run(_ *cobra.Command, _ []string) {
 			MinPacing:  cmd.minPacing,
 			MaxPacing:  cmd.maxPacing,
 		}
+		if sc != nil {
+			enablerOpts.SnapshotQueue = sc.InputQueue
+		}
 		enabler := canary.NewEnabler(i, enablerOpts, root)
 		enablers = append(enablers, enabler)
 		go enabler.Run()
@@ -81,11 +100,15 @@ func (cmd *testCanaryEnabler) run(_ *cobra.Command, _ []string) {
 			disablerOpts := &canary.DisablerOptions{
 				Environments: enablers[i].Environments,
 			}
+			if sc != nil {
+				disablerOpts.SnapshotQueue = sc.InputQueue
+			}
 			disabler := canary.NewDisabler(i, disablerOpts, root)
 			disablers = append(disablers, disabler)
 			go disabler.Run()
 		}
 		for _, disabler := range disablers {
+			logrus.Infof("waiting for disabler #%d", disabler.Id)
 			<-disabler.Done
 		}
 
@@ -103,4 +126,18 @@ func (cmd *testCanaryEnabler) run(_ *cobra.Command, _ []string) {
 			}
 		}
 	}
+
+	for _, enabler := range enablers {
+		<-enabler.Done
+	}
+
+	if sc != nil {
+		scCancel()
+		<-sc.Closed
+		if err := sc.Store(); err != nil {
+			panic(err)
+		}
+	}
+
+	logrus.Infof("complete")
 }