mirror of
https://github.com/openziti/zrok.git
synced 2025-08-14 10:08:26 +02:00
74 lines
1.8 KiB
Go
74 lines
1.8 KiB
Go
package canary
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
|
|
"github.com/influxdata/influxdb-client-go/v2/api"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
type SnapshotStreamer struct {
|
|
InputQueue chan *Snapshot
|
|
Closed chan struct{}
|
|
ctx context.Context
|
|
cfg *Config
|
|
ifxClient influxdb2.Client
|
|
ifxWriteApi api.WriteAPIBlocking
|
|
}
|
|
|
|
func NewSnapshotStreamer(ctx context.Context, cfg *Config) (*SnapshotStreamer, error) {
|
|
out := &SnapshotStreamer{
|
|
InputQueue: make(chan *Snapshot),
|
|
Closed: make(chan struct{}),
|
|
ctx: ctx,
|
|
cfg: cfg,
|
|
}
|
|
if cfg.Influx != nil {
|
|
out.ifxClient = influxdb2.NewClient(cfg.Influx.Url, cfg.Influx.Token)
|
|
out.ifxWriteApi = out.ifxClient.WriteAPIBlocking(cfg.Influx.Org, cfg.Influx.Bucket)
|
|
} else {
|
|
return nil, errors.New("missing influx configuration")
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
func (ss *SnapshotStreamer) Run() {
|
|
defer close(ss.Closed)
|
|
defer ss.ifxClient.Close()
|
|
defer logrus.Info("stoping")
|
|
logrus.Info("starting")
|
|
|
|
for {
|
|
select {
|
|
case <-ss.ctx.Done():
|
|
return
|
|
|
|
case snapshot := <-ss.InputQueue:
|
|
if err := ss.store(snapshot); err != nil {
|
|
logrus.Errorf("error storing snapshot: %v", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (ss *SnapshotStreamer) store(snapshot *Snapshot) error {
|
|
tags := map[string]string{
|
|
"instance": fmt.Sprintf("%d", snapshot.Instance),
|
|
"iteration": fmt.Sprintf("%d", snapshot.Iteration),
|
|
"ok": fmt.Sprintf("%t", snapshot.Ok),
|
|
}
|
|
if snapshot.Error != nil {
|
|
tags["error"] = snapshot.Error.Error()
|
|
}
|
|
pt := influxdb2.NewPoint(snapshot.Operation, tags, map[string]interface{}{
|
|
"duration": snapshot.Completed.Sub(snapshot.Started).Milliseconds(),
|
|
"size": snapshot.Size,
|
|
}, snapshot.Started)
|
|
if err := ss.ifxWriteApi.WritePoint(context.Background(), pt); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|