diff --git a/controller/metrics.go b/controller/metrics.go index a975a2f7..bba68078 100644 --- a/controller/metrics.go +++ b/controller/metrics.go @@ -33,14 +33,15 @@ type InfluxConfig struct { } type metricsAgent struct { - cfg *MetricsConfig - influx influxdb2.Client - writeApi api.WriteAPIBlocking - envCache map[string]*envCacheEntry - zCtx ziti.Context - zListener edge.Listener - shutdown chan struct{} - joined chan struct{} + cfg *MetricsConfig + influx influxdb2.Client + writeApi api.WriteAPIBlocking + metricsQueue chan *model.Metrics + envCache map[string]*envCacheEntry + zCtx ziti.Context + zListener edge.Listener + shutdown chan struct{} + joined chan struct{} } type envCacheEntry struct { @@ -50,10 +51,11 @@ type envCacheEntry struct { func newMetricsAgent(cfg *MetricsConfig) *metricsAgent { ma := &metricsAgent{ - cfg: cfg, - envCache: make(map[string]*envCacheEntry), - shutdown: make(chan struct{}), - joined: make(chan struct{}), + cfg: cfg, + metricsQueue: make(chan *model.Metrics, 1024), + envCache: make(map[string]*envCacheEntry), + shutdown: make(chan struct{}), + joined: make(chan struct{}), } if cfg.Influx != nil { ma.influx = influxdb2.NewClient(cfg.Influx.Url, cfg.Influx.Token) @@ -62,24 +64,35 @@ func newMetricsAgent(cfg *MetricsConfig) *metricsAgent { return ma } -func (mtr *metricsAgent) run() { +func (ma *metricsAgent) run() { logrus.Info("starting") defer logrus.Info("exiting") - defer close(mtr.joined) + defer close(ma.joined) - if err := mtr.bindService(); err != nil { + if err := ma.bindService(); err != nil { logrus.Errorf("error binding metrics service: %v", err) return } - <-mtr.shutdown +work: + for { + select { + case <-ma.shutdown: + break work - if err := mtr.zListener.Close(); err != nil { + case m := <-ma.metricsQueue: + if err := ma.processMetrics(m); err != nil { + logrus.Errorf("error processing metrics: %v", err) + } + } + } + + if err := ma.zListener.Close(); err != nil { logrus.Errorf("error closing metrics service listener: %v", err) } } -func (mtr *metricsAgent) bindService() error { +func (ma *metricsAgent) bindService() error { zif, err := zrokdir.ZitiIdentityFile("ctrl") if err != nil { return errors.Wrap(err, "error getting 'ctrl' identity") @@ -88,48 +101,77 @@ func (mtr *metricsAgent) bindService() error { if err != nil { return errors.Wrap(err, "error loading 'ctrl' identity") } - mtr.zCtx = ziti.NewContextWithConfig(zCfg) + ma.zCtx = ziti.NewContextWithConfig(zCfg) opts := &ziti.ListenOptions{ ConnectTimeout: 5 * time.Minute, MaxConnections: 1024, } - mtr.zListener, err = mtr.zCtx.ListenWithOptions(mtr.cfg.ServiceName, opts) + ma.zListener, err = ma.zCtx.ListenWithOptions(ma.cfg.ServiceName, opts) if err != nil { - return errors.Wrapf(err, "error listening for metrics on '%v'", mtr.cfg.ServiceName) + return errors.Wrapf(err, "error listening for metrics on '%v'", ma.cfg.ServiceName) } - go mtr.listen() + go ma.listen() return nil } -func (mtr *metricsAgent) listen() { +func (ma *metricsAgent) listen() { logrus.Info("started") defer logrus.Info("exited") for { - conn, err := mtr.zListener.Accept() + conn, err := ma.zListener.Accept() if err != nil { logrus.Errorf("error accepting: %v", err) return } logrus.Debugf("accepted metrics connetion from '%v'", conn.RemoteAddr()) - go newMetricsHandler(conn, mtr.writeApi).run() + go newMetricsHandler(conn, ma.metricsQueue).run() } } -func (mtr *metricsAgent) stop() { - close(mtr.shutdown) +func (ma *metricsAgent) processMetrics(m *model.Metrics) error { + var pts []*write.Point + if len(m.Sessions) > 0 { + out := "metrics = {\n" + for k, v := range m.Sessions { + if ma.writeApi != nil { + pt := influxdb2.NewPoint("xfer", + map[string]string{"namespace": m.Namespace, "session": k}, + map[string]interface{}{"bytesRead": v.BytesRead, "bytesWritten": v.BytesWritten}, + time.UnixMilli(v.LastUpdate)) + pts = append(pts, pt) + } + out += fmt.Sprintf("\t[%v.%v]: %v/%v (%v)\n", m.Namespace, k, util.BytesToSize(v.BytesRead), util.BytesToSize(v.BytesWritten), time.Since(time.UnixMilli(v.LastUpdate))) + } + out += "}" + logrus.Info(out) + } + + if len(pts) > 0 { + if err := ma.writeApi.WritePoint(context.Background(), pts...); err == nil { + logrus.Debugf("wrote metrics to influx") + } else { + return err + } + } + + return nil } -func (mtr *metricsAgent) join() { - <-mtr.joined +func (ma *metricsAgent) stop() { + close(ma.shutdown) +} + +func (ma *metricsAgent) join() { + <-ma.joined } type metricsHandler struct { - conn net.Conn - writeApi api.WriteAPIBlocking + conn net.Conn + metricsQueue chan *model.Metrics } -func newMetricsHandler(conn net.Conn, writeApi api.WriteAPIBlocking) *metricsHandler { - return &metricsHandler{conn, writeApi} +func newMetricsHandler(conn net.Conn, metricsQueue chan *model.Metrics) *metricsHandler { + return &metricsHandler{conn, metricsQueue} } func (mh *metricsHandler) run() { @@ -146,33 +188,9 @@ func (mh *metricsHandler) run() { if err := mh.conn.Close(); err != nil { logrus.Errorf("error closing metrics connection") } - mtr := &model.Metrics{} - if err := bson.Unmarshal(mtrBuf.Bytes(), &mtr); err == nil { - var pts []*write.Point - if len(mtr.Sessions) > 0 { - out := "metrics = {\n" - for k, v := range mtr.Sessions { - if mh.writeApi != nil { - pt := influxdb2.NewPoint("xfer", - map[string]string{"namespace": mtr.Namespace, "session": k}, - map[string]interface{}{"bytesRead": v.BytesRead, "bytesWritten": v.BytesWritten}, - time.UnixMilli(v.LastUpdate)) - pts = append(pts, pt) - } - out += fmt.Sprintf("\t[%v.%v]: %v/%v (%v)\n", mtr.Namespace, k, util.BytesToSize(v.BytesRead), util.BytesToSize(v.BytesWritten), time.Since(time.UnixMilli(v.LastUpdate))) - } - out += "}" - logrus.Info(out) - } - - if len(pts) > 0 { - if err := mh.writeApi.WritePoint(context.Background(), pts...); err == nil { - logrus.Debugf("wrote metrics to influx") - } else { - logrus.Errorf("error writing points to influxdb: %v", err) - } - } - + m := &model.Metrics{} + if err := bson.Unmarshal(mtrBuf.Bytes(), &m); err == nil { + mh.metricsQueue <- m } else { logrus.Errorf("error unmarshaling metrics: %v", err) }