more metrics infrastructure (#74)

This commit is contained in:
Michael Quigley 2022-10-10 16:56:01 -04:00
parent a60449080b
commit 87122024da
No known key found for this signature in database
GPG Key ID: 9B60314A9DD20A62
4 changed files with 84 additions and 42 deletions

View File

@ -1,7 +1,6 @@
package backend package backend
import ( import (
"github.com/sirupsen/logrus"
"net" "net"
"time" "time"
) )
@ -17,13 +16,11 @@ func newMetricsConn(id string, conn net.Conn) *metricsConn {
func (mc *metricsConn) Read(b []byte) (n int, err error) { func (mc *metricsConn) Read(b []byte) (n int, err error) {
n, err = mc.conn.Read(b) n, err = mc.conn.Read(b)
logrus.Infof("[%v] => %d", mc.id, n)
return n, err return n, err
} }
func (mc *metricsConn) Write(b []byte) (n int, err error) { func (mc *metricsConn) Write(b []byte) (n int, err error) {
n, err = mc.conn.Write(b) n, err = mc.conn.Write(b)
logrus.Infof("[%v] <= %d", mc.id, n)
return n, err return n, err
} }

View File

@ -24,9 +24,13 @@ type httpListen struct {
cfg *Config cfg *Config
zCtx ziti.Context zCtx ziti.Context
handler http.Handler handler http.Handler
metrics *metricsAgent
} }
func NewHTTP(cfg *Config) (*httpListen, error) { func NewHTTP(cfg *Config) (*httpListen, error) {
ma := newMetricsAgent()
go ma.run()
zCfgPath, err := zrokdir.ZitiIdentityFile(cfg.Identity) zCfgPath, err := zrokdir.ZitiIdentityFile(cfg.Identity)
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "error getting ziti identity '%v' from zrokdir", cfg.Identity) return nil, errors.Wrapf(err, "error getting ziti identity '%v' from zrokdir", cfg.Identity)
@ -37,7 +41,7 @@ func NewHTTP(cfg *Config) (*httpListen, error) {
} }
zCfg.ConfigTypes = []string{model.ZrokProxyConfig} zCfg.ConfigTypes = []string{model.ZrokProxyConfig}
zCtx := ziti.NewContextWithConfig(zCfg) zCtx := ziti.NewContextWithConfig(zCfg)
zDialCtx := zitiDialContext{ctx: zCtx} zDialCtx := zitiDialContext{ctx: zCtx, updates: ma.updates}
zTransport := http.DefaultTransport.(*http.Transport).Clone() zTransport := http.DefaultTransport.(*http.Transport).Clone()
zTransport.DialContext = zDialCtx.Dial zTransport.DialContext = zDialCtx.Dial
@ -52,6 +56,7 @@ func NewHTTP(cfg *Config) (*httpListen, error) {
cfg: cfg, cfg: cfg,
zCtx: zCtx, zCtx: zCtx,
handler: handler, handler: handler,
metrics: ma,
}, nil }, nil
} }
@ -61,6 +66,7 @@ func (self *httpListen) Run() error {
type zitiDialContext struct { type zitiDialContext struct {
ctx ziti.Context ctx ziti.Context
updates chan metricsUpdate
} }
func (self *zitiDialContext) Dial(_ context.Context, _ string, addr string) (net.Conn, error) { func (self *zitiDialContext) Dial(_ context.Context, _ string, addr string) (net.Conn, error) {
@ -69,7 +75,7 @@ func (self *zitiDialContext) Dial(_ context.Context, _ string, addr string) (net
if err != nil { if err != nil {
return conn, err return conn, err
} }
return newMetricsConn(svcName, conn), nil return newMetricsConn(svcName, conn, self.updates), nil
} }
func newServiceProxy(cfg *Config, ctx ziti.Context) (*httputil.ReverseProxy, error) { func newServiceProxy(cfg *Config, ctx ziti.Context) (*httputil.ReverseProxy, error) {

View File

@ -2,51 +2,38 @@ package frontend
import ( import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"net"
"time" "time"
) )
type metricsConn struct { type metricsAgent struct {
metrics map[string]sessionMetrics
updates chan metricsUpdate
}
type sessionMetrics struct {
bytesRead int64
bytesWritten int64
lastUpdate time.Time
}
type metricsUpdate struct {
id string id string
conn net.Conn bytesRead int64
bytesWritten int64
} }
func newMetricsConn(id string, conn net.Conn) *metricsConn { func newMetricsAgent() *metricsAgent {
return &metricsConn{id, conn} return &metricsAgent{
metrics: make(map[string]sessionMetrics),
updates: make(chan metricsUpdate, 10240),
}
} }
func (mc *metricsConn) Read(b []byte) (n int, err error) { func (ma *metricsAgent) run() {
n, err = mc.conn.Read(b) for {
logrus.Infof("[%v] => %d", mc.id, n) select {
return n, err case update := <-ma.updates:
logrus.Infof("update: [%v] read: %d, written: %d", update.id, update.bytesRead, update.bytesWritten)
} }
func (mc *metricsConn) Write(b []byte) (n int, err error) {
n, err = mc.conn.Write(b)
logrus.Infof("[%v] <= %d", mc.id, n)
return n, err
} }
func (mc *metricsConn) Close() error {
return mc.conn.Close()
}
func (mc *metricsConn) LocalAddr() net.Addr {
return mc.conn.LocalAddr()
}
func (mc *metricsConn) RemoteAddr() net.Addr {
return mc.conn.RemoteAddr()
}
func (mc *metricsConn) SetDeadline(t time.Time) error {
return mc.conn.SetDeadline(t)
}
func (mc *metricsConn) SetReadDeadline(t time.Time) error {
return mc.conn.SetReadDeadline(t)
}
func (mc *metricsConn) SetWriteDeadline(t time.Time) error {
return mc.conn.SetWriteDeadline(t)
} }

View File

@ -0,0 +1,52 @@
package frontend
import (
"net"
"time"
)
type metricsConn struct {
id string
conn net.Conn
updates chan metricsUpdate
}
func newMetricsConn(id string, conn net.Conn, updates chan metricsUpdate) *metricsConn {
return &metricsConn{id, conn, updates}
}
func (mc *metricsConn) Read(b []byte) (n int, err error) {
n, err = mc.conn.Read(b)
mc.updates <- metricsUpdate{mc.id, int64(n), 0}
return n, err
}
func (mc *metricsConn) Write(b []byte) (n int, err error) {
n, err = mc.conn.Write(b)
mc.updates <- metricsUpdate{mc.id, 0, int64(n)}
return n, err
}
func (mc *metricsConn) Close() error {
return mc.conn.Close()
}
func (mc *metricsConn) LocalAddr() net.Addr {
return mc.conn.LocalAddr()
}
func (mc *metricsConn) RemoteAddr() net.Addr {
return mc.conn.RemoteAddr()
}
func (mc *metricsConn) SetDeadline(t time.Time) error {
return mc.conn.SetDeadline(t)
}
func (mc *metricsConn) SetReadDeadline(t time.Time) error {
return mc.conn.SetReadDeadline(t)
}
func (mc *metricsConn) SetWriteDeadline(t time.Time) error {
return mc.conn.SetWriteDeadline(t)
}