From 87122024da29cfa1a03c202d2dd460cc09f4aee6 Mon Sep 17 00:00:00 2001 From: Michael Quigley Date: Mon, 10 Oct 2022 16:56:01 -0400 Subject: [PATCH] more metrics infrastructure (#74) --- .../backend/{metrics.go => metricsconn.go} | 3 - endpoints/frontend/http.go | 12 +++- endpoints/frontend/metrics.go | 59 ++++++++----------- endpoints/frontend/metricsconn.go | 52 ++++++++++++++++ 4 files changed, 84 insertions(+), 42 deletions(-) rename endpoints/backend/{metrics.go => metricsconn.go} (89%) create mode 100644 endpoints/frontend/metricsconn.go diff --git a/endpoints/backend/metrics.go b/endpoints/backend/metricsconn.go similarity index 89% rename from endpoints/backend/metrics.go rename to endpoints/backend/metricsconn.go index 049227ab..66a3f661 100644 --- a/endpoints/backend/metrics.go +++ b/endpoints/backend/metricsconn.go @@ -1,7 +1,6 @@ package backend import ( - "github.com/sirupsen/logrus" "net" "time" ) @@ -17,13 +16,11 @@ func newMetricsConn(id string, conn net.Conn) *metricsConn { func (mc *metricsConn) Read(b []byte) (n int, err error) { n, err = mc.conn.Read(b) - logrus.Infof("[%v] => %d", mc.id, n) return n, err } func (mc *metricsConn) Write(b []byte) (n int, err error) { n, err = mc.conn.Write(b) - logrus.Infof("[%v] <= %d", mc.id, n) return n, err } diff --git a/endpoints/frontend/http.go b/endpoints/frontend/http.go index 173a3d30..c4a6b2be 100644 --- a/endpoints/frontend/http.go +++ b/endpoints/frontend/http.go @@ -24,9 +24,13 @@ type httpListen struct { cfg *Config zCtx ziti.Context handler http.Handler + metrics *metricsAgent } func NewHTTP(cfg *Config) (*httpListen, error) { + ma := newMetricsAgent() + go ma.run() + zCfgPath, err := zrokdir.ZitiIdentityFile(cfg.Identity) if err != nil { return nil, errors.Wrapf(err, "error getting ziti identity '%v' from zrokdir", cfg.Identity) @@ -37,7 +41,7 @@ func NewHTTP(cfg *Config) (*httpListen, error) { } zCfg.ConfigTypes = []string{model.ZrokProxyConfig} zCtx := ziti.NewContextWithConfig(zCfg) - zDialCtx := zitiDialContext{ctx: zCtx} + zDialCtx := zitiDialContext{ctx: zCtx, updates: ma.updates} zTransport := http.DefaultTransport.(*http.Transport).Clone() zTransport.DialContext = zDialCtx.Dial @@ -52,6 +56,7 @@ func NewHTTP(cfg *Config) (*httpListen, error) { cfg: cfg, zCtx: zCtx, handler: handler, + metrics: ma, }, nil } @@ -60,7 +65,8 @@ func (self *httpListen) Run() error { } type zitiDialContext struct { - ctx ziti.Context + ctx ziti.Context + updates chan metricsUpdate } func (self *zitiDialContext) Dial(_ context.Context, _ string, addr string) (net.Conn, error) { @@ -69,7 +75,7 @@ func (self *zitiDialContext) Dial(_ context.Context, _ string, addr string) (net if err != nil { return conn, err } - return newMetricsConn(svcName, conn), nil + return newMetricsConn(svcName, conn, self.updates), nil } func newServiceProxy(cfg *Config, ctx ziti.Context) (*httputil.ReverseProxy, error) { diff --git a/endpoints/frontend/metrics.go b/endpoints/frontend/metrics.go index 4a38fcc8..ac1fbb8b 100644 --- a/endpoints/frontend/metrics.go +++ b/endpoints/frontend/metrics.go @@ -2,51 +2,38 @@ package frontend import ( "github.com/sirupsen/logrus" - "net" "time" ) -type metricsConn struct { - id string - conn net.Conn +type metricsAgent struct { + metrics map[string]sessionMetrics + updates chan metricsUpdate } -func newMetricsConn(id string, conn net.Conn) *metricsConn { - return &metricsConn{id, conn} +type sessionMetrics struct { + bytesRead int64 + bytesWritten int64 + lastUpdate time.Time } -func (mc *metricsConn) Read(b []byte) (n int, err error) { - n, err = mc.conn.Read(b) - logrus.Infof("[%v] => %d", mc.id, n) - return n, err +type metricsUpdate struct { + id string + bytesRead int64 + bytesWritten int64 } -func (mc *metricsConn) Write(b []byte) (n int, err error) { - n, err = mc.conn.Write(b) - logrus.Infof("[%v] <= %d", mc.id, n) - return n, err +func newMetricsAgent() *metricsAgent { + return &metricsAgent{ + metrics: make(map[string]sessionMetrics), + updates: make(chan metricsUpdate, 10240), + } } -func (mc *metricsConn) Close() error { - return mc.conn.Close() -} - -func (mc *metricsConn) LocalAddr() net.Addr { - return mc.conn.LocalAddr() -} - -func (mc *metricsConn) RemoteAddr() net.Addr { - return mc.conn.RemoteAddr() -} - -func (mc *metricsConn) SetDeadline(t time.Time) error { - return mc.conn.SetDeadline(t) -} - -func (mc *metricsConn) SetReadDeadline(t time.Time) error { - return mc.conn.SetReadDeadline(t) -} - -func (mc *metricsConn) SetWriteDeadline(t time.Time) error { - return mc.conn.SetWriteDeadline(t) +func (ma *metricsAgent) run() { + for { + select { + case update := <-ma.updates: + logrus.Infof("update: [%v] read: %d, written: %d", update.id, update.bytesRead, update.bytesWritten) + } + } } diff --git a/endpoints/frontend/metricsconn.go b/endpoints/frontend/metricsconn.go new file mode 100644 index 00000000..5664fc78 --- /dev/null +++ b/endpoints/frontend/metricsconn.go @@ -0,0 +1,52 @@ +package frontend + +import ( + "net" + "time" +) + +type metricsConn struct { + id string + conn net.Conn + updates chan metricsUpdate +} + +func newMetricsConn(id string, conn net.Conn, updates chan metricsUpdate) *metricsConn { + return &metricsConn{id, conn, updates} +} + +func (mc *metricsConn) Read(b []byte) (n int, err error) { + n, err = mc.conn.Read(b) + mc.updates <- metricsUpdate{mc.id, int64(n), 0} + return n, err +} + +func (mc *metricsConn) Write(b []byte) (n int, err error) { + n, err = mc.conn.Write(b) + mc.updates <- metricsUpdate{mc.id, 0, int64(n)} + return n, err +} + +func (mc *metricsConn) Close() error { + return mc.conn.Close() +} + +func (mc *metricsConn) LocalAddr() net.Addr { + return mc.conn.LocalAddr() +} + +func (mc *metricsConn) RemoteAddr() net.Addr { + return mc.conn.RemoteAddr() +} + +func (mc *metricsConn) SetDeadline(t time.Time) error { + return mc.conn.SetDeadline(t) +} + +func (mc *metricsConn) SetReadDeadline(t time.Time) error { + return mc.conn.SetReadDeadline(t) +} + +func (mc *metricsConn) SetWriteDeadline(t time.Time) error { + return mc.conn.SetWriteDeadline(t) +}