diff --git a/controller/metrics.go b/controller/metrics.go index bc039a24..7ac8ab05 100644 --- a/controller/metrics.go +++ b/controller/metrics.go @@ -2,8 +2,11 @@ package controller import ( "bytes" + "context" "fmt" influxdb2 "github.com/influxdata/influxdb-client-go/v2" + "github.com/influxdata/influxdb-client-go/v2/api" + "github.com/influxdata/influxdb-client-go/v2/api/write" "github.com/openziti-test-kitchen/zrok/model" "github.com/openziti-test-kitchen/zrok/util" "github.com/openziti-test-kitchen/zrok/zrokdir" @@ -32,6 +35,7 @@ type InfluxConfig struct { type metricsAgent struct { cfg *MetricsConfig influx influxdb2.Client + writeApi api.WriteAPIBlocking zCtx ziti.Context zListener edge.Listener shutdown chan struct{} @@ -44,7 +48,10 @@ func newMetricsAgent(cfg *MetricsConfig) *metricsAgent { shutdown: make(chan struct{}), joined: make(chan struct{}), } - ma.influx = influxdb2.NewClient(cfg.Influx.Url, cfg.Influx.Token) + if cfg.Influx != nil { + ma.influx = influxdb2.NewClient(cfg.Influx.Url, cfg.Influx.Token) + ma.writeApi = ma.influx.WriteAPIBlocking(cfg.Influx.Org, cfg.Influx.Bucket) + } return ma } @@ -97,7 +104,7 @@ func (mtr *metricsAgent) listen() { return } logrus.Debugf("accepted metrics connetion from '%v'", conn.RemoteAddr()) - go newMetricsHandler(conn).run() + go newMetricsHandler(conn, mtr.writeApi).run() } } @@ -110,11 +117,12 @@ func (mtr *metricsAgent) join() { } type metricsHandler struct { - conn net.Conn + conn net.Conn + writeApi api.WriteAPIBlocking } -func newMetricsHandler(conn net.Conn) *metricsHandler { - return &metricsHandler{conn} +func newMetricsHandler(conn net.Conn, writeApi api.WriteAPIBlocking) *metricsHandler { + return &metricsHandler{conn, writeApi} } func (mh *metricsHandler) run() { @@ -134,11 +142,28 @@ func (mh *metricsHandler) run() { mtr := &model.Metrics{} if err := bson.Unmarshal(mtrBuf.Bytes(), &mtr); err == nil { out := "metrics = {\n" + var pts []*write.Point for k, v := range mtr.Sessions { + if mh.writeApi != nil { + pt := influxdb2.NewPoint("xfer", + map[string]string{"namespace": mtr.Namespace, "session": k}, + map[string]interface{}{"bytesRead": v.BytesRead, "bytesWritten": v.BytesWritten}, + time.UnixMilli(v.LastUpdate)) + pts = append(pts, pt) + } out += fmt.Sprintf("\t[%v.%v]: %v/%v (%v)\n", mtr.Namespace, k, util.BytesToSize(v.BytesRead), util.BytesToSize(v.BytesWritten), time.Duration(mtr.LocalNow-v.LastUpdate)*time.Millisecond) } out += "}" logrus.Info(out) + + if len(pts) > 0 { + if err := mh.writeApi.WritePoint(context.Background(), pts...); err == nil { + logrus.Debugf("wrote metrics to influx") + } else { + logrus.Errorf("error writing points to influxdb: %v", err) + } + } + } else { logrus.Errorf("error unmarshaling metrics: %v", err) }