better metrics accumulator (#74, #76)

This commit is contained in:
Michael Quigley 2022-10-18 11:57:32 -04:00
parent 8c214fcdd8
commit 51d78526fe
No known key found for this signature in database
GPG Key ID: 9B60314A9DD20A62
4 changed files with 44 additions and 45 deletions

View File

@ -151,7 +151,7 @@ func (mh *metricsHandler) run() {
time.UnixMilli(v.LastUpdate)) time.UnixMilli(v.LastUpdate))
pts = append(pts, pt) pts = append(pts, pt)
} }
out += fmt.Sprintf("\t[%v.%v]: %v/%v (%v)\n", mtr.Namespace, k, util.BytesToSize(v.BytesRead), util.BytesToSize(v.BytesWritten), time.Duration(mtr.LocalNow-v.LastUpdate)*time.Millisecond) out += fmt.Sprintf("\t[%v.%v]: %v/%v\n", mtr.Namespace, k, util.BytesToSize(v.BytesRead), util.BytesToSize(v.BytesWritten))
} }
out += "}" out += "}"
logrus.Info(out) logrus.Info(out)

View File

@ -14,16 +14,16 @@ type Config struct {
} }
type MetricsConfig struct { type MetricsConfig struct {
Service string Service string
DropoutTimeout time.Duration SendTimeout time.Duration
} }
func DefaultConfig() *Config { func DefaultConfig() *Config {
return &Config{ return &Config{
Identity: "frontend", Identity: "frontend",
Metrics: &MetricsConfig{ Metrics: &MetricsConfig{
Service: "metrics", Service: "metrics",
DropoutTimeout: 30 * time.Second, SendTimeout: 5 * time.Second,
}, },
Address: "0.0.0.0:8080", Address: "0.0.0.0:8080",
} }

View File

@ -12,10 +12,11 @@ import (
) )
type metricsAgent struct { type metricsAgent struct {
cfg *Config cfg *Config
metrics *model.Metrics accum map[string]model.SessionMetrics
updates chan metricsUpdate updates chan metricsUpdate
zCtx ziti.Context lastSend time.Time
zCtx ziti.Context
} }
type metricsUpdate struct { type metricsUpdate struct {
@ -35,10 +36,11 @@ func newMetricsAgent(cfg *Config) (*metricsAgent, error) {
} }
logrus.Infof("loaded '%v' identity", cfg.Identity) logrus.Infof("loaded '%v' identity", cfg.Identity)
return &metricsAgent{ return &metricsAgent{
cfg: cfg, cfg: cfg,
metrics: &model.Metrics{Namespace: cfg.Identity}, accum: make(map[string]model.SessionMetrics),
updates: make(chan metricsUpdate, 10240), updates: make(chan metricsUpdate, 10240),
zCtx: ziti.NewContextWithConfig(zCfg), lastSend: time.Now(),
zCtx: ziti.NewContextWithConfig(zCfg),
}, nil }, nil
} }
@ -46,33 +48,43 @@ func (ma *metricsAgent) run() {
for { for {
select { select {
case update := <-ma.updates: case update := <-ma.updates:
ma.metrics.PushSession(update.id, model.SessionMetrics{ ma.pushUpdate(update)
BytesRead: update.bytesRead, if time.Since(ma.lastSend) >= ma.cfg.Metrics.SendTimeout {
BytesWritten: update.bytesWritten, if err := ma.sendMetrics(); err != nil {
LastUpdate: time.Now().UnixMilli(), logrus.Errorf("error sending metrics: %v", err)
}) }
}
case <-time.After(5 * time.Second): case <-time.After(5 * time.Second):
if err := ma.sendMetrics(); err != nil { if err := ma.sendMetrics(); err != nil {
logrus.Errorf("error sending metrics: %v", err) logrus.Errorf("error sending metrics: %v", err)
} }
var dropouts []string }
for k, v := range ma.metrics.Sessions { }
if time.Now().Sub(time.UnixMilli(v.LastUpdate)) > ma.cfg.Metrics.DropoutTimeout { }
dropouts = append(dropouts, k)
} func (ma *metricsAgent) pushUpdate(mu metricsUpdate) {
} if sm, found := ma.accum[mu.id]; found {
for _, dropout := range dropouts { ma.accum[mu.id] = model.SessionMetrics{
delete(ma.metrics.Sessions, dropout) BytesRead: sm.BytesRead + mu.bytesRead,
logrus.Infof("dropout: %v", dropout) BytesWritten: sm.BytesWritten + mu.bytesWritten,
} LastUpdate: time.Now().UnixMilli(),
}
} else {
ma.accum[mu.id] = model.SessionMetrics{
BytesRead: mu.bytesRead,
BytesWritten: mu.bytesWritten,
LastUpdate: time.Now().UnixMilli(),
} }
} }
} }
func (ma *metricsAgent) sendMetrics() error { func (ma *metricsAgent) sendMetrics() error {
ma.metrics.LocalNow = time.Now().UnixMilli() m := &model.Metrics{
metricsJson, err := bson.Marshal(ma.metrics) Namespace: ma.cfg.Identity,
Sessions: ma.accum,
}
metricsJson, err := bson.Marshal(m)
if err != nil { if err != nil {
return errors.Wrap(err, "error marshaling metrics") return errors.Wrap(err, "error marshaling metrics")
} }
@ -89,5 +101,7 @@ func (ma *metricsAgent) sendMetrics() error {
return errors.Wrap(err, "short metrics write") return errors.Wrap(err, "short metrics write")
} }
logrus.Infof("sent %d bytes of metrics data", n) logrus.Infof("sent %d bytes of metrics data", n)
ma.accum = make(map[string]model.SessionMetrics)
ma.lastSend = time.Now()
return nil return nil
} }

View File

@ -1,25 +1,10 @@
package model package model
type Metrics struct { type Metrics struct {
LocalNow int64
Namespace string Namespace string
Sessions map[string]SessionMetrics Sessions map[string]SessionMetrics
} }
func (m *Metrics) PushSession(svcName string, sm SessionMetrics) {
if m.Sessions == nil {
m.Sessions = make(map[string]SessionMetrics)
}
if prev, found := m.Sessions[svcName]; found {
prev.BytesRead += sm.BytesRead
prev.BytesWritten += sm.BytesWritten
prev.LastUpdate = sm.LastUpdate
m.Sessions[svcName] = prev
} else {
m.Sessions[svcName] = sm
}
}
type SessionMetrics struct { type SessionMetrics struct {
BytesRead int64 BytesRead int64
BytesWritten int64 BytesWritten int64