influx for canary metrics, working (#948)

This commit is contained in:
Michael Quigley 2025-04-22 14:12:16 -04:00
parent 825e5da3d6
commit ce67253b75
No known key found for this signature in database
GPG Key ID: 9B60314A9DD20A62
5 changed files with 115 additions and 11 deletions

1
.gitignore vendored
View File

@ -4,6 +4,7 @@
*.db
/automated-release-build/
etc/dev.yml
etc/dev-canary.yml
etc/dev-frontend.yml
# Dependencies

31
canary/config.go Normal file
View File

@ -0,0 +1,31 @@
package canary
import (
"github.com/michaelquigley/cf"
"github.com/openziti/zrok/controller/metrics"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
const ConfigVersion = 1
type Config struct {
V int
Influx *metrics.InfluxConfig
}
func DefaultConfig() *Config {
return &Config{}
}
func LoadConfig(path string) (*Config, error) {
cfg := DefaultConfig()
if err := cf.BindYaml(cfg, path, cf.DefaultOptions()); err != nil {
return nil, errors.Wrapf(err, "error loading canary configuration '%v'", path)
}
if cfg.V != ConfigVersion {
return nil, errors.Errorf("expecting canary configuration version '%v', got '%v'", ConfigVersion, cfg.V)
}
logrus.Info(cf.Dump(cfg, cf.DefaultOptions()))
return cfg, nil
}

View File

@ -20,6 +20,7 @@ type EnablerOptions struct {
type Enabler struct {
Id uint
Done chan struct{}
opt *EnablerOptions
root env_core.Root
Environments chan *sdk.Environment
@ -28,6 +29,7 @@ type Enabler struct {
func NewEnabler(id uint, opt *EnablerOptions, root env_core.Root) *Enabler {
return &Enabler{
Id: id,
Done: make(chan struct{}),
opt: opt,
root: root,
Environments: make(chan *sdk.Environment, opt.Iterations),
@ -36,6 +38,7 @@ func NewEnabler(id uint, opt *EnablerOptions, root env_core.Root) *Enabler {
func (e *Enabler) Run() {
defer close(e.Environments)
defer close(e.Done)
defer logrus.Infof("#%d stopping", e.Id)
e.dwell()
e.iterate()
@ -51,6 +54,7 @@ func (e *Enabler) dwell() {
}
func (e *Enabler) iterate() {
defer logrus.Info("done")
for i := uint(0); i < e.opt.Iterations; i++ {
snapshot := NewSnapshot("enable", e.Id, uint64(i))

View File

@ -3,6 +3,7 @@ package canary
import (
"context"
"fmt"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/openziti/zrok/util"
"github.com/sirupsen/logrus"
"slices"
@ -37,14 +38,16 @@ type SnapshotCollector struct {
InputQueue chan *Snapshot
Closed chan struct{}
ctx context.Context
cfg *Config
snapshots map[string][]*Snapshot
}
func NewSnapshotCollector(ctx context.Context) *SnapshotCollector {
func NewSnapshotCollector(ctx context.Context, cfg *Config) *SnapshotCollector {
return &SnapshotCollector{
InputQueue: make(chan *Snapshot),
Closed: make(chan struct{}),
ctx: ctx,
cfg: cfg,
snapshots: make(map[string][]*Snapshot),
}
}
@ -69,3 +72,31 @@ func (sc *SnapshotCollector) Run() {
}
}
}
func (sc *SnapshotCollector) Store() error {
idb := influxdb2.NewClient(sc.cfg.Influx.Url, sc.cfg.Influx.Token)
writeApi := idb.WriteAPIBlocking(sc.cfg.Influx.Org, sc.cfg.Influx.Bucket)
for key, arr := range sc.snapshots {
for _, snapshot := range arr {
tags := map[string]string{
"instance": fmt.Sprintf("%d", snapshot.Instance),
"iteration": fmt.Sprintf("%d", snapshot.Iteration),
"ok": fmt.Sprintf("%t", snapshot.Ok),
}
if snapshot.Error != nil {
tags["error"] = snapshot.Error.Error()
}
pt := influxdb2.NewPoint(snapshot.Operation, tags, map[string]interface{}{
"duration": snapshot.Completed.Sub(snapshot.Started),
"size": snapshot.Size,
}, snapshot.Started)
if err := writeApi.WritePoint(context.Background(), pt); err != nil {
return err
}
}
logrus.Infof("wrote '%v' points for '%v'", len(arr), key)
}
idb.Close()
logrus.Infof("complete")
return nil
}

View File

@ -1,6 +1,7 @@
package main
import (
"context"
"github.com/openziti/zrok/canary"
"github.com/openziti/zrok/environment"
"github.com/sirupsen/logrus"
@ -14,16 +15,17 @@ func init() {
}
type testCanaryEnabler struct {
cmd *cobra.Command
enablers uint
iterations uint
minPreDelay time.Duration
maxPreDelay time.Duration
minDwell time.Duration
maxDwell time.Duration
minPacing time.Duration
maxPacing time.Duration
skipDisable bool
cmd *cobra.Command
enablers uint
iterations uint
minPreDelay time.Duration
maxPreDelay time.Duration
minDwell time.Duration
maxDwell time.Duration
minPacing time.Duration
maxPacing time.Duration
skipDisable bool
canaryConfig string
}
func newTestCanaryEnabler() *testCanaryEnabler {
@ -41,6 +43,7 @@ func newTestCanaryEnabler() *testCanaryEnabler {
cmd.Flags().DurationVar(&command.minPacing, "min-pacing", 0, "Minimum pacing time")
cmd.Flags().DurationVar(&command.maxPacing, "max-pacing", 0, "Maximum pacing time")
cmd.Flags().BoolVar(&command.skipDisable, "skip-disable", false, "Disable (clean up) enabled environments")
cmd.Flags().StringVar(&command.canaryConfig, "canary-config", "", "Path to canary configuration file")
return command
}
@ -54,6 +57,19 @@ func (cmd *testCanaryEnabler) run(_ *cobra.Command, _ []string) {
panic(err)
}
var sc *canary.SnapshotCollector
var scCtx context.Context
var scCancel context.CancelFunc
if cmd.canaryConfig != "" {
cfg, err := canary.LoadConfig(cmd.canaryConfig)
if err != nil {
panic(err)
}
scCtx, scCancel = context.WithCancel(context.Background())
sc = canary.NewSnapshotCollector(scCtx, cfg)
go sc.Run()
}
var enablers []*canary.Enabler
for i := uint(0); i < cmd.enablers; i++ {
preDelay := cmd.maxPreDelay.Milliseconds()
@ -70,6 +86,9 @@ func (cmd *testCanaryEnabler) run(_ *cobra.Command, _ []string) {
MinPacing: cmd.minPacing,
MaxPacing: cmd.maxPacing,
}
if sc != nil {
enablerOpts.SnapshotQueue = sc.InputQueue
}
enabler := canary.NewEnabler(i, enablerOpts, root)
enablers = append(enablers, enabler)
go enabler.Run()
@ -81,11 +100,15 @@ func (cmd *testCanaryEnabler) run(_ *cobra.Command, _ []string) {
disablerOpts := &canary.DisablerOptions{
Environments: enablers[i].Environments,
}
if sc != nil {
disablerOpts.SnapshotQueue = sc.InputQueue
}
disabler := canary.NewDisabler(i, disablerOpts, root)
disablers = append(disablers, disabler)
go disabler.Run()
}
for _, disabler := range disablers {
logrus.Infof("waiting for disabler #%d", disabler.Id)
<-disabler.Done
}
@ -103,4 +126,18 @@ func (cmd *testCanaryEnabler) run(_ *cobra.Command, _ []string) {
}
}
}
for _, enabler := range enablers {
<-enabler.Done
}
if sc != nil {
scCancel()
<-sc.Closed
if err := sc.Store(); err != nil {
panic(err)
}
}
logrus.Infof("complete")
}