centralized metrics storage in metricsAgent, rather than the handler (#74, #76)

This commit is contained in:
Michael Quigley 2022-10-19 11:37:31 -04:00
parent 83c12e30cb
commit 13251aec17
No known key found for this signature in database
GPG Key ID: 9B60314A9DD20A62

View File

@ -36,6 +36,7 @@ type metricsAgent struct {
cfg *MetricsConfig cfg *MetricsConfig
influx influxdb2.Client influx influxdb2.Client
writeApi api.WriteAPIBlocking writeApi api.WriteAPIBlocking
metricsQueue chan *model.Metrics
envCache map[string]*envCacheEntry envCache map[string]*envCacheEntry
zCtx ziti.Context zCtx ziti.Context
zListener edge.Listener zListener edge.Listener
@ -51,6 +52,7 @@ type envCacheEntry struct {
func newMetricsAgent(cfg *MetricsConfig) *metricsAgent { func newMetricsAgent(cfg *MetricsConfig) *metricsAgent {
ma := &metricsAgent{ ma := &metricsAgent{
cfg: cfg, cfg: cfg,
metricsQueue: make(chan *model.Metrics, 1024),
envCache: make(map[string]*envCacheEntry), envCache: make(map[string]*envCacheEntry),
shutdown: make(chan struct{}), shutdown: make(chan struct{}),
joined: make(chan struct{}), joined: make(chan struct{}),
@ -62,24 +64,35 @@ func newMetricsAgent(cfg *MetricsConfig) *metricsAgent {
return ma return ma
} }
func (mtr *metricsAgent) run() { func (ma *metricsAgent) run() {
logrus.Info("starting") logrus.Info("starting")
defer logrus.Info("exiting") defer logrus.Info("exiting")
defer close(mtr.joined) defer close(ma.joined)
if err := mtr.bindService(); err != nil { if err := ma.bindService(); err != nil {
logrus.Errorf("error binding metrics service: %v", err) logrus.Errorf("error binding metrics service: %v", err)
return return
} }
<-mtr.shutdown work:
for {
select {
case <-ma.shutdown:
break work
if err := mtr.zListener.Close(); err != nil { case m := <-ma.metricsQueue:
if err := ma.processMetrics(m); err != nil {
logrus.Errorf("error processing metrics: %v", err)
}
}
}
if err := ma.zListener.Close(); err != nil {
logrus.Errorf("error closing metrics service listener: %v", err) logrus.Errorf("error closing metrics service listener: %v", err)
} }
} }
func (mtr *metricsAgent) bindService() error { func (ma *metricsAgent) bindService() error {
zif, err := zrokdir.ZitiIdentityFile("ctrl") zif, err := zrokdir.ZitiIdentityFile("ctrl")
if err != nil { if err != nil {
return errors.Wrap(err, "error getting 'ctrl' identity") return errors.Wrap(err, "error getting 'ctrl' identity")
@ -88,48 +101,77 @@ func (mtr *metricsAgent) bindService() error {
if err != nil { if err != nil {
return errors.Wrap(err, "error loading 'ctrl' identity") return errors.Wrap(err, "error loading 'ctrl' identity")
} }
mtr.zCtx = ziti.NewContextWithConfig(zCfg) ma.zCtx = ziti.NewContextWithConfig(zCfg)
opts := &ziti.ListenOptions{ opts := &ziti.ListenOptions{
ConnectTimeout: 5 * time.Minute, ConnectTimeout: 5 * time.Minute,
MaxConnections: 1024, MaxConnections: 1024,
} }
mtr.zListener, err = mtr.zCtx.ListenWithOptions(mtr.cfg.ServiceName, opts) ma.zListener, err = ma.zCtx.ListenWithOptions(ma.cfg.ServiceName, opts)
if err != nil { if err != nil {
return errors.Wrapf(err, "error listening for metrics on '%v'", mtr.cfg.ServiceName) return errors.Wrapf(err, "error listening for metrics on '%v'", ma.cfg.ServiceName)
} }
go mtr.listen() go ma.listen()
return nil return nil
} }
func (mtr *metricsAgent) listen() { func (ma *metricsAgent) listen() {
logrus.Info("started") logrus.Info("started")
defer logrus.Info("exited") defer logrus.Info("exited")
for { for {
conn, err := mtr.zListener.Accept() conn, err := ma.zListener.Accept()
if err != nil { if err != nil {
logrus.Errorf("error accepting: %v", err) logrus.Errorf("error accepting: %v", err)
return return
} }
logrus.Debugf("accepted metrics connetion from '%v'", conn.RemoteAddr()) logrus.Debugf("accepted metrics connetion from '%v'", conn.RemoteAddr())
go newMetricsHandler(conn, mtr.writeApi).run() go newMetricsHandler(conn, ma.metricsQueue).run()
} }
} }
func (mtr *metricsAgent) stop() { func (ma *metricsAgent) processMetrics(m *model.Metrics) error {
close(mtr.shutdown) var pts []*write.Point
if len(m.Sessions) > 0 {
out := "metrics = {\n"
for k, v := range m.Sessions {
if ma.writeApi != nil {
pt := influxdb2.NewPoint("xfer",
map[string]string{"namespace": m.Namespace, "session": k},
map[string]interface{}{"bytesRead": v.BytesRead, "bytesWritten": v.BytesWritten},
time.UnixMilli(v.LastUpdate))
pts = append(pts, pt)
}
out += fmt.Sprintf("\t[%v.%v]: %v/%v (%v)\n", m.Namespace, k, util.BytesToSize(v.BytesRead), util.BytesToSize(v.BytesWritten), time.Since(time.UnixMilli(v.LastUpdate)))
}
out += "}"
logrus.Info(out)
}
if len(pts) > 0 {
if err := ma.writeApi.WritePoint(context.Background(), pts...); err == nil {
logrus.Debugf("wrote metrics to influx")
} else {
return err
}
}
return nil
} }
func (mtr *metricsAgent) join() { func (ma *metricsAgent) stop() {
<-mtr.joined close(ma.shutdown)
}
func (ma *metricsAgent) join() {
<-ma.joined
} }
type metricsHandler struct { type metricsHandler struct {
conn net.Conn conn net.Conn
writeApi api.WriteAPIBlocking metricsQueue chan *model.Metrics
} }
func newMetricsHandler(conn net.Conn, writeApi api.WriteAPIBlocking) *metricsHandler { func newMetricsHandler(conn net.Conn, metricsQueue chan *model.Metrics) *metricsHandler {
return &metricsHandler{conn, writeApi} return &metricsHandler{conn, metricsQueue}
} }
func (mh *metricsHandler) run() { func (mh *metricsHandler) run() {
@ -146,33 +188,9 @@ func (mh *metricsHandler) run() {
if err := mh.conn.Close(); err != nil { if err := mh.conn.Close(); err != nil {
logrus.Errorf("error closing metrics connection") logrus.Errorf("error closing metrics connection")
} }
mtr := &model.Metrics{} m := &model.Metrics{}
if err := bson.Unmarshal(mtrBuf.Bytes(), &mtr); err == nil { if err := bson.Unmarshal(mtrBuf.Bytes(), &m); err == nil {
var pts []*write.Point mh.metricsQueue <- m
if len(mtr.Sessions) > 0 {
out := "metrics = {\n"
for k, v := range mtr.Sessions {
if mh.writeApi != nil {
pt := influxdb2.NewPoint("xfer",
map[string]string{"namespace": mtr.Namespace, "session": k},
map[string]interface{}{"bytesRead": v.BytesRead, "bytesWritten": v.BytesWritten},
time.UnixMilli(v.LastUpdate))
pts = append(pts, pt)
}
out += fmt.Sprintf("\t[%v.%v]: %v/%v (%v)\n", mtr.Namespace, k, util.BytesToSize(v.BytesRead), util.BytesToSize(v.BytesWritten), time.Since(time.UnixMilli(v.LastUpdate)))
}
out += "}"
logrus.Info(out)
}
if len(pts) > 0 {
if err := mh.writeApi.WritePoint(context.Background(), pts...); err == nil {
logrus.Debugf("wrote metrics to influx")
} else {
logrus.Errorf("error writing points to influxdb: %v", err)
}
}
} else { } else {
logrus.Errorf("error unmarshaling metrics: %v", err) logrus.Errorf("error unmarshaling metrics: %v", err)
} }