zrok/controller/metrics.go

182 lines
4.4 KiB
Go
Raw Normal View History

2022-10-12 18:42:05 +02:00
package controller
2022-10-13 20:23:52 +02:00
import (
2022-10-14 21:49:59 +02:00
"bytes"
2022-10-17 21:54:22 +02:00
"context"
2022-10-14 22:13:21 +02:00
"fmt"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
2022-10-17 21:54:22 +02:00
"github.com/influxdata/influxdb-client-go/v2/api"
"github.com/influxdata/influxdb-client-go/v2/api/write"
2022-10-13 22:18:18 +02:00
"github.com/openziti/sdk-golang/ziti"
"github.com/openziti/sdk-golang/ziti/config"
"github.com/openziti/sdk-golang/ziti/edge"
"github.com/openziti/zrok/model"
"github.com/openziti/zrok/util"
"github.com/openziti/zrok/zrokdir"
2022-10-13 22:18:18 +02:00
"github.com/pkg/errors"
2022-10-13 20:23:52 +02:00
"github.com/sirupsen/logrus"
2022-10-14 22:13:21 +02:00
"gopkg.in/mgo.v2/bson"
2022-10-13 22:18:18 +02:00
"net"
"time"
2022-10-13 20:23:52 +02:00
)
type metricsAgent struct {
writeApi api.WriteAPIBlocking
metricsQueue chan *model.Metrics
envCache map[string]*envCacheEntry
zCtx ziti.Context
zListener edge.Listener
shutdown chan struct{}
joined chan struct{}
2022-10-13 20:23:52 +02:00
}
// envCacheEntry is the value type held (by pointer) in metricsAgent.envCache.
// NOTE(review): nothing in the visible part of this file creates or reads
// these entries, so the field descriptions below are assumptions to confirm.
type envCacheEntry struct {
// env is presumably the cached environment identifier — TODO confirm.
env string
// lastAccess presumably records when the entry was last used — TODO confirm.
lastAccess time.Time
}
func newMetricsAgent() *metricsAgent {
ma := &metricsAgent{
metricsQueue: make(chan *model.Metrics, 1024),
envCache: make(map[string]*envCacheEntry),
shutdown: make(chan struct{}),
joined: make(chan struct{}),
}
if idb != nil {
ma.writeApi = idb.WriteAPIBlocking(cfg.Influx.Org, cfg.Influx.Bucket)
2022-10-17 21:54:22 +02:00
}
return ma
2022-10-13 20:23:52 +02:00
}
func (ma *metricsAgent) run() {
2022-10-13 20:23:52 +02:00
logrus.Info("starting")
defer logrus.Info("exiting")
defer close(ma.joined)
2022-10-13 20:23:52 +02:00
if err := ma.bindService(); err != nil {
2022-10-13 22:18:18 +02:00
logrus.Errorf("error binding metrics service: %v", err)
return
}
work:
for {
select {
case <-ma.shutdown:
break work
case m := <-ma.metricsQueue:
if err := ma.processMetrics(m); err != nil {
logrus.Errorf("error processing metrics: %v", err)
}
}
}
2022-10-13 22:18:18 +02:00
if err := ma.zListener.Close(); err != nil {
2022-10-13 22:18:18 +02:00
logrus.Errorf("error closing metrics service listener: %v", err)
}
}
func (ma *metricsAgent) bindService() error {
2022-10-13 22:18:18 +02:00
zif, err := zrokdir.ZitiIdentityFile("ctrl")
if err != nil {
return errors.Wrap(err, "error getting 'ctrl' identity")
}
zCfg, err := config.NewFromFile(zif)
if err != nil {
return errors.Wrap(err, "error loading 'ctrl' identity")
}
ma.zCtx = ziti.NewContextWithConfig(zCfg)
2022-10-13 22:18:18 +02:00
opts := &ziti.ListenOptions{
ConnectTimeout: 5 * time.Minute,
MaxConnections: 1024,
}
ma.zListener, err = ma.zCtx.ListenWithOptions(cfg.Metrics.ServiceName, opts)
2022-10-13 22:18:18 +02:00
if err != nil {
return errors.Wrapf(err, "error listening for metrics on '%v'", cfg.Metrics.ServiceName)
2022-10-13 22:18:18 +02:00
}
go ma.listen()
2022-10-13 22:18:18 +02:00
return nil
}
func (ma *metricsAgent) listen() {
2022-10-14 21:49:59 +02:00
logrus.Info("started")
defer logrus.Info("exited")
2022-10-13 22:18:18 +02:00
for {
conn, err := ma.zListener.Accept()
2022-10-13 22:18:18 +02:00
if err != nil {
logrus.Errorf("error accepting: %v", err)
return
}
2022-10-14 22:13:21 +02:00
logrus.Debugf("accepted metrics connetion from '%v'", conn.RemoteAddr())
go newMetricsHandler(conn, ma.metricsQueue).run()
2022-10-13 22:18:18 +02:00
}
}
// processMetrics logs a summary of every session in m and, when an influx
// write API is configured, writes one "xfer" point per session (tagged with
// the namespace and share, carrying bytesRead/bytesWritten, timestamped with
// the session's LastUpdate in milliseconds).
//
// A report without sessions is a no-op. The returned error is non-nil only
// when the influx write fails.
func (ma *metricsAgent) processMetrics(m *model.Metrics) error {
	if len(m.Sessions) == 0 {
		return nil
	}

	// Points are only collected when there is somewhere to write them.
	var pts []*write.Point
	if ma.writeApi != nil {
		pts = make([]*write.Point, 0, len(m.Sessions))
	}

	// Build the log summary in a buffer instead of repeated string
	// concatenation, which re-copies the whole string on every session.
	var out bytes.Buffer
	out.WriteString("metrics = {\n")
	for k, v := range m.Sessions {
		if ma.writeApi != nil {
			pt := influxdb2.NewPoint("xfer",
				map[string]string{"namespace": m.Namespace, "share": k},
				map[string]interface{}{"bytesRead": v.BytesRead, "bytesWritten": v.BytesWritten},
				time.UnixMilli(v.LastUpdate))
			pts = append(pts, pt)
		}
		fmt.Fprintf(&out, "\t[%v.%v]: %v/%v (%v)\n", m.Namespace, k, util.BytesToSize(v.BytesRead), util.BytesToSize(v.BytesWritten), time.Since(time.UnixMilli(v.LastUpdate)))
	}
	out.WriteString("}")
	logrus.Info(out.String())

	if len(pts) == 0 {
		return nil
	}
	if err := ma.writeApi.WritePoint(context.Background(), pts...); err != nil {
		return errors.Wrap(err, "error writing metrics to influx")
	}
	logrus.Debugf("wrote metrics to influx")
	return nil
}
// stop asks the agent to shut down by closing the shutdown channel, which the
// run loop selects on. It does not wait for run to finish; call join for that.
// NOTE(review): closing an already-closed channel panics, so stop must be
// called at most once per agent.
func (ma *metricsAgent) stop() {
close(ma.shutdown)
}
func (ma *metricsAgent) join() {
<-ma.joined
2022-10-13 20:23:52 +02:00
}
2022-10-13 22:18:18 +02:00
type metricsHandler struct {
conn net.Conn
metricsQueue chan *model.Metrics
2022-10-13 22:18:18 +02:00
}
func newMetricsHandler(conn net.Conn, metricsQueue chan *model.Metrics) *metricsHandler {
return &metricsHandler{conn, metricsQueue}
2022-10-13 22:18:18 +02:00
}
func (mh *metricsHandler) run() {
2022-10-14 22:13:21 +02:00
logrus.Debugf("handling metrics connection: %v", mh.conn.RemoteAddr())
2022-10-14 21:49:59 +02:00
var mtrBuf bytes.Buffer
buf := make([]byte, 4096)
for {
n, err := mh.conn.Read(buf)
if err != nil {
break
}
mtrBuf.Write(buf[:n])
}
2022-10-13 22:18:18 +02:00
if err := mh.conn.Close(); err != nil {
logrus.Errorf("error closing metrics connection")
}
m := &model.Metrics{}
if err := bson.Unmarshal(mtrBuf.Bytes(), &m); err == nil {
mh.metricsQueue <- m
2022-10-14 21:49:59 +02:00
} else {
logrus.Errorf("error unmarshaling metrics: %v", err)
}
2022-10-13 22:18:18 +02:00
}