mirror of
https://github.com/openziti/zrok.git
synced 2025-04-10 18:38:18 +02:00
parent
4559036ad8
commit
1542e0131f
@ -2,8 +2,11 @@ package controller
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
|
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
|
||||||
|
"github.com/influxdata/influxdb-client-go/v2/api"
|
||||||
|
"github.com/influxdata/influxdb-client-go/v2/api/write"
|
||||||
"github.com/openziti-test-kitchen/zrok/model"
|
"github.com/openziti-test-kitchen/zrok/model"
|
||||||
"github.com/openziti-test-kitchen/zrok/util"
|
"github.com/openziti-test-kitchen/zrok/util"
|
||||||
"github.com/openziti-test-kitchen/zrok/zrokdir"
|
"github.com/openziti-test-kitchen/zrok/zrokdir"
|
||||||
@ -32,6 +35,7 @@ type InfluxConfig struct {
|
|||||||
type metricsAgent struct {
|
type metricsAgent struct {
|
||||||
cfg *MetricsConfig
|
cfg *MetricsConfig
|
||||||
influx influxdb2.Client
|
influx influxdb2.Client
|
||||||
|
writeApi api.WriteAPIBlocking
|
||||||
zCtx ziti.Context
|
zCtx ziti.Context
|
||||||
zListener edge.Listener
|
zListener edge.Listener
|
||||||
shutdown chan struct{}
|
shutdown chan struct{}
|
||||||
@ -44,7 +48,10 @@ func newMetricsAgent(cfg *MetricsConfig) *metricsAgent {
|
|||||||
shutdown: make(chan struct{}),
|
shutdown: make(chan struct{}),
|
||||||
joined: make(chan struct{}),
|
joined: make(chan struct{}),
|
||||||
}
|
}
|
||||||
ma.influx = influxdb2.NewClient(cfg.Influx.Url, cfg.Influx.Token)
|
if cfg.Influx != nil {
|
||||||
|
ma.influx = influxdb2.NewClient(cfg.Influx.Url, cfg.Influx.Token)
|
||||||
|
ma.writeApi = ma.influx.WriteAPIBlocking(cfg.Influx.Org, cfg.Influx.Bucket)
|
||||||
|
}
|
||||||
return ma
|
return ma
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -97,7 +104,7 @@ func (mtr *metricsAgent) listen() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
logrus.Debugf("accepted metrics connetion from '%v'", conn.RemoteAddr())
|
logrus.Debugf("accepted metrics connetion from '%v'", conn.RemoteAddr())
|
||||||
go newMetricsHandler(conn).run()
|
go newMetricsHandler(conn, mtr.writeApi).run()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -110,11 +117,12 @@ func (mtr *metricsAgent) join() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type metricsHandler struct {
|
type metricsHandler struct {
|
||||||
conn net.Conn
|
conn net.Conn
|
||||||
|
writeApi api.WriteAPIBlocking
|
||||||
}
|
}
|
||||||
|
|
||||||
func newMetricsHandler(conn net.Conn) *metricsHandler {
|
func newMetricsHandler(conn net.Conn, writeApi api.WriteAPIBlocking) *metricsHandler {
|
||||||
return &metricsHandler{conn}
|
return &metricsHandler{conn, writeApi}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mh *metricsHandler) run() {
|
func (mh *metricsHandler) run() {
|
||||||
@ -134,11 +142,28 @@ func (mh *metricsHandler) run() {
|
|||||||
mtr := &model.Metrics{}
|
mtr := &model.Metrics{}
|
||||||
if err := bson.Unmarshal(mtrBuf.Bytes(), &mtr); err == nil {
|
if err := bson.Unmarshal(mtrBuf.Bytes(), &mtr); err == nil {
|
||||||
out := "metrics = {\n"
|
out := "metrics = {\n"
|
||||||
|
var pts []*write.Point
|
||||||
for k, v := range mtr.Sessions {
|
for k, v := range mtr.Sessions {
|
||||||
|
if mh.writeApi != nil {
|
||||||
|
pt := influxdb2.NewPoint("xfer",
|
||||||
|
map[string]string{"namespace": mtr.Namespace, "session": k},
|
||||||
|
map[string]interface{}{"bytesRead": v.BytesRead, "bytesWritten": v.BytesWritten},
|
||||||
|
time.UnixMilli(v.LastUpdate))
|
||||||
|
pts = append(pts, pt)
|
||||||
|
}
|
||||||
out += fmt.Sprintf("\t[%v.%v]: %v/%v (%v)\n", mtr.Namespace, k, util.BytesToSize(v.BytesRead), util.BytesToSize(v.BytesWritten), time.Duration(mtr.LocalNow-v.LastUpdate)*time.Millisecond)
|
out += fmt.Sprintf("\t[%v.%v]: %v/%v (%v)\n", mtr.Namespace, k, util.BytesToSize(v.BytesRead), util.BytesToSize(v.BytesWritten), time.Duration(mtr.LocalNow-v.LastUpdate)*time.Millisecond)
|
||||||
}
|
}
|
||||||
out += "}"
|
out += "}"
|
||||||
logrus.Info(out)
|
logrus.Info(out)
|
||||||
|
|
||||||
|
if len(pts) > 0 {
|
||||||
|
if err := mh.writeApi.WritePoint(context.Background(), pts...); err == nil {
|
||||||
|
logrus.Debugf("wrote metrics to influx")
|
||||||
|
} else {
|
||||||
|
logrus.Errorf("error writing points to influxdb: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
logrus.Errorf("error unmarshaling metrics: %v", err)
|
logrus.Errorf("error unmarshaling metrics: %v", err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user