From 85b178ab82c9e4c792157b6c444da00433d27b10 Mon Sep 17 00:00:00 2001 From: Michael Quigley Date: Wed, 30 Apr 2025 17:04:06 -0400 Subject: [PATCH] no more collector (#954) --- canary/snapshotCollector.go | 77 ------------------------------------- 1 file changed, 77 deletions(-) delete mode 100644 canary/snapshotCollector.go diff --git a/canary/snapshotCollector.go b/canary/snapshotCollector.go deleted file mode 100644 index 8fe7071e..00000000 --- a/canary/snapshotCollector.go +++ /dev/null @@ -1,77 +0,0 @@ -package canary - -import ( - "context" - "fmt" - influxdb2 "github.com/influxdata/influxdb-client-go/v2" - "github.com/sirupsen/logrus" - "slices" - "sort" -) - -type SnapshotCollector struct { - InputQueue chan *Snapshot - Closed chan struct{} - ctx context.Context - cfg *Config - snapshots map[string][]*Snapshot -} - -func NewSnapshotCollector(ctx context.Context, cfg *Config) *SnapshotCollector { - return &SnapshotCollector{ - InputQueue: make(chan *Snapshot), - Closed: make(chan struct{}), - ctx: ctx, - cfg: cfg, - snapshots: make(map[string][]*Snapshot), - } -} - -func (sc *SnapshotCollector) Run() { - defer close(sc.Closed) - defer logrus.Info("stopping") - logrus.Info("starting") - for { - select { - case <-sc.ctx.Done(): - return - - case snapshot := <-sc.InputQueue: - var snapshots []*Snapshot - if v, ok := sc.snapshots[snapshot.Operation]; ok { - snapshots = v - } - i := sort.Search(len(snapshots), func(i int) bool { return snapshots[i].Completed.After(snapshot.Started) }) - snapshots = slices.Insert(snapshots, i, snapshot) - sc.snapshots[snapshot.Operation] = snapshots - } - } -} - -func (sc *SnapshotCollector) Store() error { - idb := influxdb2.NewClient(sc.cfg.Influx.Url, sc.cfg.Influx.Token) - writeApi := idb.WriteAPIBlocking(sc.cfg.Influx.Org, sc.cfg.Influx.Bucket) - for key, arr := range sc.snapshots { - for _, snapshot := range arr { - tags := map[string]string{ - "instance": fmt.Sprintf("%d", snapshot.Instance), - "iteration": fmt.Sprintf("%d", snapshot.Iteration), - "ok": fmt.Sprintf("%t", snapshot.Ok), - } - if snapshot.Error != nil { - tags["error"] = snapshot.Error.Error() - } - pt := influxdb2.NewPoint(snapshot.Operation, tags, map[string]interface{}{ - "duration": snapshot.Completed.Sub(snapshot.Started).Milliseconds(), - "size": snapshot.Size, - }, snapshot.Started) - if err := writeApi.WritePoint(context.Background(), pt); err != nil { - return err - } - } - logrus.Infof("wrote '%v' points for '%v'", len(arr), key) - } - idb.Close() - logrus.Infof("complete") - return nil -}