diff --git a/controller/metrics.go b/controller/metrics.go index 7ac8ab05..ce36bfbc 100644 --- a/controller/metrics.go +++ b/controller/metrics.go @@ -151,7 +151,7 @@ func (mh *metricsHandler) run() { time.UnixMilli(v.LastUpdate)) pts = append(pts, pt) } - out += fmt.Sprintf("\t[%v.%v]: %v/%v (%v)\n", mtr.Namespace, k, util.BytesToSize(v.BytesRead), util.BytesToSize(v.BytesWritten), time.Duration(mtr.LocalNow-v.LastUpdate)*time.Millisecond) + out += fmt.Sprintf("\t[%v.%v]: %v/%v\n", mtr.Namespace, k, util.BytesToSize(v.BytesRead), util.BytesToSize(v.BytesWritten)) } out += "}" logrus.Info(out) diff --git a/endpoints/frontend/config.go b/endpoints/frontend/config.go index 1d991de0..cce61855 100644 --- a/endpoints/frontend/config.go +++ b/endpoints/frontend/config.go @@ -14,16 +14,16 @@ type Config struct { } type MetricsConfig struct { - Service string - DropoutTimeout time.Duration + Service string + SendTimeout time.Duration } func DefaultConfig() *Config { return &Config{ Identity: "frontend", Metrics: &MetricsConfig{ - Service: "metrics", - DropoutTimeout: 30 * time.Second, + Service: "metrics", + SendTimeout: 5 * time.Second, }, Address: "0.0.0.0:8080", } diff --git a/endpoints/frontend/metrics.go b/endpoints/frontend/metrics.go index 74a721b1..88b65db7 100644 --- a/endpoints/frontend/metrics.go +++ b/endpoints/frontend/metrics.go @@ -12,10 +12,11 @@ import ( ) type metricsAgent struct { - cfg *Config - metrics *model.Metrics - updates chan metricsUpdate - zCtx ziti.Context + cfg *Config + accum map[string]model.SessionMetrics + updates chan metricsUpdate + lastSend time.Time + zCtx ziti.Context } type metricsUpdate struct { @@ -35,10 +36,11 @@ func newMetricsAgent(cfg *Config) (*metricsAgent, error) { } logrus.Infof("loaded '%v' identity", cfg.Identity) return &metricsAgent{ - cfg: cfg, - metrics: &model.Metrics{Namespace: cfg.Identity}, - updates: make(chan metricsUpdate, 10240), - zCtx: ziti.NewContextWithConfig(zCfg), + cfg: cfg, + accum: make(map[string]model.SessionMetrics), + updates: make(chan metricsUpdate, 10240), + lastSend: time.Now(), + zCtx: ziti.NewContextWithConfig(zCfg), }, nil } @@ -46,33 +48,43 @@ func (ma *metricsAgent) run() { for { select { case update := <-ma.updates: - ma.metrics.PushSession(update.id, model.SessionMetrics{ - BytesRead: update.bytesRead, - BytesWritten: update.bytesWritten, - LastUpdate: time.Now().UnixMilli(), - }) + ma.pushUpdate(update) + if time.Since(ma.lastSend) >= ma.cfg.Metrics.SendTimeout { + if err := ma.sendMetrics(); err != nil { + logrus.Errorf("error sending metrics: %v", err) + } + } case <-time.After(5 * time.Second): if err := ma.sendMetrics(); err != nil { logrus.Errorf("error sending metrics: %v", err) } - var dropouts []string - for k, v := range ma.metrics.Sessions { - if time.Now().Sub(time.UnixMilli(v.LastUpdate)) > ma.cfg.Metrics.DropoutTimeout { - dropouts = append(dropouts, k) - } - } - for _, dropout := range dropouts { - delete(ma.metrics.Sessions, dropout) - logrus.Infof("dropout: %v", dropout) - } + } + } +} + +func (ma *metricsAgent) pushUpdate(mu metricsUpdate) { + if sm, found := ma.accum[mu.id]; found { + ma.accum[mu.id] = model.SessionMetrics{ + BytesRead: sm.BytesRead + mu.bytesRead, + BytesWritten: sm.BytesWritten + mu.bytesWritten, + LastUpdate: time.Now().UnixMilli(), + } + } else { + ma.accum[mu.id] = model.SessionMetrics{ + BytesRead: mu.bytesRead, + BytesWritten: mu.bytesWritten, + LastUpdate: time.Now().UnixMilli(), } } } func (ma *metricsAgent) sendMetrics() error { - ma.metrics.LocalNow = time.Now().UnixMilli() - metricsJson, err := bson.Marshal(ma.metrics) + m := &model.Metrics{ + Namespace: ma.cfg.Identity, + Sessions: ma.accum, + } + metricsJson, err := bson.Marshal(m) if err != nil { return errors.Wrap(err, "error marshaling metrics") } @@ -89,5 +101,7 @@ func (ma *metricsAgent) sendMetrics() error { return errors.Wrap(err, "short metrics write") } logrus.Infof("sent %d bytes of metrics data", n) + ma.accum = make(map[string]model.SessionMetrics) + ma.lastSend = time.Now() return nil } diff --git a/model/metrics.go b/model/metrics.go index 8b37ff6e..33f9ca21 100644 --- a/model/metrics.go +++ b/model/metrics.go @@ -1,25 +1,10 @@ package model type Metrics struct { - LocalNow int64 Namespace string Sessions map[string]SessionMetrics } -func (m *Metrics) PushSession(svcName string, sm SessionMetrics) { - if m.Sessions == nil { - m.Sessions = make(map[string]SessionMetrics) - } - if prev, found := m.Sessions[svcName]; found { - prev.BytesRead += sm.BytesRead - prev.BytesWritten += sm.BytesWritten - prev.LastUpdate = sm.LastUpdate - m.Sessions[svcName] = prev - } else { - m.Sessions[svcName] = sm - } -} - type SessionMetrics struct { BytesRead int64 BytesWritten int64