zrok/endpoints/frontend/metrics.go
2022-10-18 11:07:18 -04:00

94 lines
2.4 KiB
Go

package frontend
import (
"github.com/openziti-test-kitchen/zrok/model"
"github.com/openziti-test-kitchen/zrok/zrokdir"
"github.com/openziti/sdk-golang/ziti"
"github.com/openziti/sdk-golang/ziti/config"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"gopkg.in/mgo.v2/bson"
"time"
)
type metricsAgent struct {
cfg *Config
metrics *model.Metrics
updates chan metricsUpdate
zCtx ziti.Context
}
type metricsUpdate struct {
id string
bytesRead int64
bytesWritten int64
}
func newMetricsAgent(cfg *Config) (*metricsAgent, error) {
zif, err := zrokdir.ZitiIdentityFile(cfg.Identity)
if err != nil {
return nil, errors.Wrapf(err, "error getting '%v' identity file", cfg.Identity)
}
zCfg, err := config.NewFromFile(zif)
if err != nil {
return nil, errors.Wrapf(err, "error loading '%v' identity", cfg.Identity)
}
logrus.Infof("loaded '%v' identity", cfg.Identity)
return &metricsAgent{
cfg: cfg,
metrics: &model.Metrics{Namespace: cfg.Identity},
updates: make(chan metricsUpdate, 10240),
zCtx: ziti.NewContextWithConfig(zCfg),
}, nil
}
func (ma *metricsAgent) run() {
for {
select {
case update := <-ma.updates:
ma.metrics.PushSession(update.id, model.SessionMetrics{
BytesRead: update.bytesRead,
BytesWritten: update.bytesWritten,
LastUpdate: time.Now().UnixMilli(),
})
case <-time.After(5 * time.Second):
if err := ma.sendMetrics(); err != nil {
logrus.Errorf("error sending metrics: %v", err)
}
var dropouts []string
for k, v := range ma.metrics.Sessions {
if time.Now().Sub(time.UnixMilli(v.LastUpdate)) > ma.cfg.Metrics.DropoutTimeout {
dropouts = append(dropouts, k)
}
}
for _, dropout := range dropouts {
delete(ma.metrics.Sessions, dropout)
logrus.Infof("dropout: %v", dropout)
}
}
}
}
func (ma *metricsAgent) sendMetrics() error {
ma.metrics.LocalNow = time.Now().UnixMilli()
metricsJson, err := bson.Marshal(ma.metrics)
if err != nil {
return errors.Wrap(err, "error marshaling metrics")
}
conn, err := ma.zCtx.Dial(ma.cfg.Metrics.Service)
if err != nil {
return errors.Wrap(err, "error connecting to metrics service")
}
n, err := conn.Write(metricsJson)
if err != nil {
return errors.Wrap(err, "error sending metrics")
}
defer func() { _ = conn.Close() }()
if n != len(metricsJson) {
return errors.Wrap(err, "short metrics write")
}
logrus.Infof("sent %d bytes of metrics data", n)
return nil
}