From df4c52aae57bf1eab964efdf03a9754279f6e511 Mon Sep 17 00:00:00 2001 From: Michael Quigley Date: Tue, 7 Mar 2023 12:57:35 -0500 Subject: [PATCH] remove legacy v0.3 metrics infrastructure (#128) --- .gitignore | 1 + controller/bootstrap.go | 126 ----------------- controller/config.go | 8 -- controller/controller.go | 10 -- controller/createIdentity.go | 29 ---- controller/metrics.go | 181 ------------------------ controller/metrics/config.go | 8 ++ controller/metrics/influx.go | 1 + endpoints/proxyBackend/http.go | 11 -- endpoints/proxyBackend/metricsConn.go | 49 ------- endpoints/publicFrontend/config.go | 13 +- endpoints/publicFrontend/http.go | 15 +- endpoints/publicFrontend/metrics.go | 109 -------------- endpoints/publicFrontend/metricsConn.go | 52 ------- etc/metrics.yml | 12 +- 15 files changed, 25 insertions(+), 600 deletions(-) delete mode 100644 controller/metrics.go create mode 100644 controller/metrics/influx.go delete mode 100644 endpoints/proxyBackend/metricsConn.go delete mode 100644 endpoints/publicFrontend/metrics.go delete mode 100644 endpoints/publicFrontend/metricsConn.go diff --git a/.gitignore b/.gitignore index 337fee57..9b877e3c 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ *.db automated-release-build etc/dev.yml +etc/dev-metrics.yml # Dependencies /node_modules/ diff --git a/controller/bootstrap.go b/controller/bootstrap.go index f85dd3ca..f3f79c6d 100644 --- a/controller/bootstrap.go +++ b/controller/bootstrap.go @@ -9,9 +9,6 @@ import ( "github.com/openziti/edge/rest_management_api_client/config" "github.com/openziti/edge/rest_management_api_client/edge_router_policy" "github.com/openziti/edge/rest_management_api_client/identity" - "github.com/openziti/edge/rest_management_api_client/service" - "github.com/openziti/edge/rest_management_api_client/service_edge_router_policy" - "github.com/openziti/edge/rest_management_api_client/service_policy" "github.com/openziti/edge/rest_model" rest_model_edge "github.com/openziti/edge/rest_model" "github.com/openziti/sdk-golang/ziti" @@ -100,27 +97,6 @@ func Bootstrap(skipCtrl, skipFrontend bool, inCfg *Config) error { return err } - var metricsSvcZId string - if metricsSvcZId, err = assertMetricsService(cfg, edge); err != nil { - return err - } - - if err := assertMetricsSerp(metricsSvcZId, cfg, edge); err != nil { - return err - } - - if !skipCtrl { - if err := assertCtrlMetricsBind(ctrlZId, metricsSvcZId, edge); err != nil { - return err - } - } - - if !skipFrontend { - if err := assertFrontendMetricsDial(frontendZId, metricsSvcZId, edge); err != nil { - return err - } - } - return nil } @@ -243,105 +219,3 @@ func assertErpForIdentity(name, zId string, edge *rest_management_api_client.Zit logrus.Infof("asserted erps for '%v' (%v)", name, zId) return nil } - -func assertMetricsService(cfg *Config, edge *rest_management_api_client.ZitiEdgeManagement) (string, error) { - filter := fmt.Sprintf("name=\"%v\" and tags.zrok != null", cfg.Metrics.ServiceName) - limit := int64(0) - offset := int64(0) - listReq := &service.ListServicesParams{ - Filter: &filter, - Limit: &limit, - Offset: &offset, - } - listReq.SetTimeout(30 * time.Second) - listResp, err := edge.Service.ListServices(listReq, nil) - if err != nil { - return "", errors.Wrapf(err, "error listing '%v' service", cfg.Metrics.ServiceName) - } - var svcZId string - if len(listResp.Payload.Data) != 1 { - logrus.Infof("creating '%v' service", cfg.Metrics.ServiceName) - svcZId, err = zrokEdgeSdk.CreateService("metrics", nil, nil, edge) - if err != nil { - return "", errors.Wrapf(err, "error creating '%v' service", cfg.Metrics.ServiceName) - } - } else { - svcZId = *listResp.Payload.Data[0].ID - } - - logrus.Infof("asserted '%v' service (%v)", cfg.Metrics.ServiceName, svcZId) - return svcZId, nil -} - -func assertMetricsSerp(metricsSvcZId string, cfg *Config, edge *rest_management_api_client.ZitiEdgeManagement) error { - filter := fmt.Sprintf("allOf(serviceRoles) = \"@%v\" and allOf(edgeRouterRoles) = \"#all\" and tags.zrok != null", metricsSvcZId) - limit := int64(0) - offset := int64(0) - listReq := &service_edge_router_policy.ListServiceEdgeRouterPoliciesParams{ - Filter: &filter, - Limit: &limit, - Offset: &offset, - } - listReq.SetTimeout(30 * time.Second) - listResp, err := edge.ServiceEdgeRouterPolicy.ListServiceEdgeRouterPolicies(listReq, nil) - if err != nil { - return errors.Wrapf(err, "error listing '%v' serps", cfg.Metrics.ServiceName) - } - if len(listResp.Payload.Data) != 1 { - logrus.Infof("creating '%v' serp", cfg.Metrics.ServiceName) - _, err := zrokEdgeSdk.CreateServiceEdgeRouterPolicy(cfg.Metrics.ServiceName, metricsSvcZId, nil, edge) - if err != nil { - return errors.Wrapf(err, "error creating '%v' serp", cfg.Metrics.ServiceName) - } - } - logrus.Infof("asserted '%v' serp", cfg.Metrics.ServiceName) - return nil -} - -func assertCtrlMetricsBind(ctrlZId, metricsSvcZId string, edge *rest_management_api_client.ZitiEdgeManagement) error { - filter := fmt.Sprintf("allOf(serviceRoles) = \"@%v\" and allOf(identityRoles) = \"@%v\" and type = 2 and tags.zrok != null", metricsSvcZId, ctrlZId) - limit := int64(0) - offset := int64(0) - listReq := &service_policy.ListServicePoliciesParams{ - Filter: &filter, - Limit: &limit, - Offset: &offset, - } - listReq.SetTimeout(30 * time.Second) - listResp, err := edge.ServicePolicy.ListServicePolicies(listReq, nil) - if err != nil { - return errors.Wrapf(err, "error listing 'ctrl-metrics-bind' service policy") - } - if len(listResp.Payload.Data) != 1 { - logrus.Info("creating 'ctrl-metrics-bind' service policy") - if err = zrokEdgeSdk.CreateServicePolicyBind("ctrl-metrics-bind", metricsSvcZId, ctrlZId, nil, edge); err != nil { - return errors.Wrap(err, "error creating 'ctrl-metrics-bind' service policy") - } - } - logrus.Infof("asserted 'ctrl-metrics-bind' service policy") - return nil -} - -func assertFrontendMetricsDial(frontendZId, metricsSvcZId string, edge *rest_management_api_client.ZitiEdgeManagement) error { - filter := fmt.Sprintf("allOf(serviceRoles) = \"@%v\" and allOf(identityRoles) = \"@%v\" and type = 1 and tags.zrok != null", metricsSvcZId, frontendZId) - limit := int64(0) - offset := int64(0) - listReq := &service_policy.ListServicePoliciesParams{ - Filter: &filter, - Limit: &limit, - Offset: &offset, - } - listReq.SetTimeout(30 * time.Second) - listResp, err := edge.ServicePolicy.ListServicePolicies(listReq, nil) - if err != nil { - return errors.Wrapf(err, "error listing 'frontend-metrics-dial' service policy") - } - if len(listResp.Payload.Data) != 1 { - logrus.Info("creating 'frontend-metrics-dial' service policy") - if err = zrokEdgeSdk.CreateServicePolicyDial("frontend-metrics-dial", metricsSvcZId, []string{frontendZId}, nil, edge); err != nil { - return errors.Wrap(err, "error creating 'frontend-metrics-dial' service policy") - } - } - logrus.Infof("asserted 'frontend-metrics-dial' service policy") - return nil -} diff --git a/controller/config.go b/controller/config.go index 80aab2ef..e45b7a70 100644 --- a/controller/config.go +++ b/controller/config.go @@ -18,7 +18,6 @@ type Config struct { Influx *InfluxConfig Limits *LimitsConfig Maintenance *MaintenanceConfig - Metrics *MetricsConfig Registration *RegistrationConfig ResetPassword *ResetPasswordConfig Store *store.Config @@ -58,10 +57,6 @@ type ZitiConfig struct { Password string `cf:"+secret"` } -type MetricsConfig struct { - ServiceName string -} - type InfluxConfig struct { Url string Bucket string @@ -99,9 +94,6 @@ func DefaultConfig() *Config { Environments: Unlimited, Shares: Unlimited, }, - Metrics: &MetricsConfig{ - ServiceName: "metrics", - }, Maintenance: &MaintenanceConfig{ ResetPassword: &ResetPasswordMaintenanceConfig{ ExpirationTimeout: time.Minute * 15, diff --git a/controller/controller.go b/controller/controller.go index 509f80b8..6d5db1a3 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -15,7 +15,6 @@ import ( var cfg *Config var str *store.Store -var mtr *metricsAgent var idb influxdb2.Client func Run(inCfg *Config) error { @@ -67,15 +66,6 @@ func Run(inCfg *Config) error { idb = influxdb2.NewClient(cfg.Influx.Url, cfg.Influx.Token) } - if cfg.Metrics != nil { - mtr = newMetricsAgent() - go mtr.run() - defer func() { - mtr.stop() - mtr.join() - }() - } - ctx, cancel := context.WithCancel(context.Background()) defer func() { cancel() diff --git a/controller/createIdentity.go b/controller/createIdentity.go index af2e01cf..811bd45b 100644 --- a/controller/createIdentity.go +++ b/controller/createIdentity.go @@ -3,15 +3,12 @@ package controller import ( "bytes" "encoding/json" - "fmt" "github.com/go-openapi/runtime/middleware" - "github.com/openziti/edge/rest_management_api_client/service" rest_model_edge "github.com/openziti/edge/rest_model" "github.com/openziti/zrok/controller/zrokEdgeSdk" "github.com/openziti/zrok/rest_model_zrok" "github.com/openziti/zrok/rest_server_zrok/operations/admin" "github.com/sirupsen/logrus" - "time" ) type createIdentityHandler struct{} @@ -52,32 +49,6 @@ func (h *createIdentityHandler) Handle(params admin.CreateIdentityParams, princi return admin.NewCreateIdentityInternalServerError() } - filter := fmt.Sprintf("name=\"%v\" and tags.zrok != null", cfg.Metrics.ServiceName) - limit := int64(0) - offset := int64(0) - listSvcReq := &service.ListServicesParams{ - Filter: &filter, - Limit: &limit, - Offset: &offset, - } - listSvcReq.SetTimeout(30 * time.Second) - listSvcResp, err := edge.Service.ListServices(listSvcReq, nil) - if err != nil { - logrus.Errorf("error listing metrics service: %v", err) - return admin.NewCreateIdentityInternalServerError() - } - if len(listSvcResp.Payload.Data) != 1 { - logrus.Errorf("could not find metrics service") - return admin.NewCreateIdentityInternalServerError() - } - svcZId := *listSvcResp.Payload.Data[0].ID - - spName := fmt.Sprintf("%v-%v-dial", name, cfg.Metrics.ServiceName) - if err := zrokEdgeSdk.CreateServicePolicyDial(spName, svcZId, []string{zId}, nil, edge); err != nil { - logrus.Errorf("error creating named dial service policy '%v': %v", spName, err) - return admin.NewCreateIdentityInternalServerError() - } - var out bytes.Buffer enc := json.NewEncoder(&out) enc.SetEscapeHTML(false) diff --git a/controller/metrics.go b/controller/metrics.go deleted file mode 100644 index d92784c6..00000000 --- a/controller/metrics.go +++ /dev/null @@ -1,181 +0,0 @@ -package controller - -import ( - "bytes" - "context" - "fmt" - influxdb2 "github.com/influxdata/influxdb-client-go/v2" - "github.com/influxdata/influxdb-client-go/v2/api" - "github.com/influxdata/influxdb-client-go/v2/api/write" - "github.com/openziti/sdk-golang/ziti" - "github.com/openziti/sdk-golang/ziti/config" - "github.com/openziti/sdk-golang/ziti/edge" - "github.com/openziti/zrok/model" - "github.com/openziti/zrok/util" - "github.com/openziti/zrok/zrokdir" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" - "gopkg.in/mgo.v2/bson" - "net" - "time" -) - -type metricsAgent struct { - writeApi api.WriteAPIBlocking - metricsQueue chan *model.Metrics - envCache map[string]*envCacheEntry - zCtx ziti.Context - zListener edge.Listener - shutdown chan struct{} - joined chan struct{} -} - -type envCacheEntry struct { - env string - lastAccess time.Time -} - -func newMetricsAgent() *metricsAgent { - ma := &metricsAgent{ - metricsQueue: make(chan *model.Metrics, 1024), - envCache: make(map[string]*envCacheEntry), - shutdown: make(chan struct{}), - joined: make(chan struct{}), - } - if idb != nil { - ma.writeApi = idb.WriteAPIBlocking(cfg.Influx.Org, cfg.Influx.Bucket) - } - return ma -} - -func (ma *metricsAgent) run() { - logrus.Info("starting") - defer logrus.Info("exiting") - defer close(ma.joined) - - if err := ma.bindService(); err != nil { - logrus.Errorf("error binding metrics service: %v", err) - return - } - -work: - for { - select { - case <-ma.shutdown: - break work - - case m := <-ma.metricsQueue: - if err := ma.processMetrics(m); err != nil { - logrus.Errorf("error processing metrics: %v", err) - } - } - } - - if err := ma.zListener.Close(); err != nil { - logrus.Errorf("error closing metrics service listener: %v", err) - } -} - -func (ma *metricsAgent) bindService() error { - zif, err := zrokdir.ZitiIdentityFile("ctrl") - if err != nil { - return errors.Wrap(err, "error getting 'ctrl' identity") - } - zCfg, err := config.NewFromFile(zif) - if err != nil { - return errors.Wrap(err, "error loading 'ctrl' identity") - } - ma.zCtx = ziti.NewContextWithConfig(zCfg) - opts := &ziti.ListenOptions{ - ConnectTimeout: 5 * time.Minute, - MaxConnections: 1024, - } - ma.zListener, err = ma.zCtx.ListenWithOptions(cfg.Metrics.ServiceName, opts) - if err != nil { - return errors.Wrapf(err, "error listening for metrics on '%v'", cfg.Metrics.ServiceName) - } - go ma.listen() - return nil -} - -func (ma *metricsAgent) listen() { - logrus.Info("started") - defer logrus.Info("exited") - for { - conn, err := ma.zListener.Accept() - if err != nil { - logrus.Errorf("error accepting: %v", err) - return - } - logrus.Debugf("accepted metrics connetion from '%v'", conn.RemoteAddr()) - go newMetricsHandler(conn, ma.metricsQueue).run() - } -} - -func (ma *metricsAgent) processMetrics(m *model.Metrics) error { - var pts []*write.Point - if len(m.Sessions) > 0 { - out := "metrics = {\n" - for k, v := range m.Sessions { - if ma.writeApi != nil { - pt := influxdb2.NewPoint("xfer", - map[string]string{"namespace": m.Namespace, "share": k}, - map[string]interface{}{"bytesRead": v.BytesRead, "bytesWritten": v.BytesWritten}, - time.UnixMilli(v.LastUpdate)) - pts = append(pts, pt) - } - out += fmt.Sprintf("\t[%v.%v]: %v/%v (%v)\n", m.Namespace, k, util.BytesToSize(v.BytesRead), util.BytesToSize(v.BytesWritten), time.Since(time.UnixMilli(v.LastUpdate))) - } - out += "}" - logrus.Info(out) - } - - if len(pts) > 0 { - if err := ma.writeApi.WritePoint(context.Background(), pts...); err == nil { - logrus.Debugf("wrote metrics to influx") - } else { - return err - } - } - - return nil -} - -func (ma *metricsAgent) stop() { - close(ma.shutdown) -} - -func (ma *metricsAgent) join() { - <-ma.joined -} - -type metricsHandler struct { - conn net.Conn - metricsQueue chan *model.Metrics -} - -func newMetricsHandler(conn net.Conn, metricsQueue chan *model.Metrics) *metricsHandler { - return &metricsHandler{conn, metricsQueue} -} - -func (mh *metricsHandler) run() { - logrus.Debugf("handling metrics connection: %v", mh.conn.RemoteAddr()) - var mtrBuf bytes.Buffer - buf := make([]byte, 4096) - for { - n, err := mh.conn.Read(buf) - if err != nil { - break - } - mtrBuf.Write(buf[:n]) - } - if err := mh.conn.Close(); err != nil { - logrus.Errorf("error closing metrics connection") - } - m := &model.Metrics{} - if err := bson.Unmarshal(mtrBuf.Bytes(), &m); err == nil { - mh.metricsQueue <- m - } else { - logrus.Errorf("error unmarshaling metrics: %v", err) - } -} diff --git a/controller/metrics/config.go b/controller/metrics/config.go index ab38f941..1fa0d72d 100644 --- a/controller/metrics/config.go +++ b/controller/metrics/config.go @@ -7,6 +7,14 @@ import ( type Config struct { Source interface{} + Influx *InfluxConfig +} + +type InfluxConfig struct { + Url string + Bucket string + Org string + Token string `cf:"+secret"` } func LoadConfig(path string) (*Config, error) { diff --git a/controller/metrics/influx.go b/controller/metrics/influx.go new file mode 100644 index 00000000..1abe097a --- /dev/null +++ b/controller/metrics/influx.go @@ -0,0 +1 @@ +package metrics diff --git a/endpoints/proxyBackend/http.go b/endpoints/proxyBackend/http.go index 5992809b..b2b73669 100644 --- a/endpoints/proxyBackend/http.go +++ b/endpoints/proxyBackend/http.go @@ -1,7 +1,6 @@ package proxyBackend import ( - "context" "crypto/tls" "fmt" "github.com/openziti/sdk-golang/ziti" @@ -11,7 +10,6 @@ import ( "github.com/openziti/zrok/util" "github.com/pkg/errors" "github.com/sirupsen/logrus" - "net" "net/http" "net/http/httputil" "net/url" @@ -79,7 +77,6 @@ func newReverseProxy(cfg *Config) (*httputil.ReverseProxy, error) { } tpt := http.DefaultTransport.(*http.Transport).Clone() - tpt.DialContext = metricsDial if cfg.Insecure { tpt.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} } @@ -105,11 +102,3 @@ func newReverseProxy(cfg *Config) (*httputil.ReverseProxy, error) { return proxy, nil } - -func metricsDial(_ context.Context, network string, addr string) (net.Conn, error) { - conn, err := net.Dial(network, addr) - if err != nil { - return conn, err - } - return newMetricsConn("backend", conn), nil -} diff --git a/endpoints/proxyBackend/metricsConn.go b/endpoints/proxyBackend/metricsConn.go deleted file mode 100644 index ba63b2d1..00000000 --- a/endpoints/proxyBackend/metricsConn.go +++ /dev/null @@ -1,49 +0,0 @@ -package proxyBackend - -import ( - "net" - "time" -) - -type metricsConn struct { - id string - conn net.Conn -} - -func newMetricsConn(id string, conn net.Conn) *metricsConn { - return &metricsConn{id, conn} -} - -func (mc *metricsConn) Read(b []byte) (n int, err error) { - n, err = mc.conn.Read(b) - return n, err -} - -func (mc *metricsConn) Write(b []byte) (n int, err error) { - n, err = mc.conn.Write(b) - return n, err -} - -func (mc *metricsConn) Close() error { - return mc.conn.Close() -} - -func (mc *metricsConn) LocalAddr() net.Addr { - return mc.conn.LocalAddr() -} - -func (mc *metricsConn) RemoteAddr() net.Addr { - return mc.conn.RemoteAddr() -} - -func (mc *metricsConn) SetDeadline(t time.Time) error { - return mc.conn.SetDeadline(t) -} - -func (mc *metricsConn) SetReadDeadline(t time.Time) error { - return mc.conn.SetReadDeadline(t) -} - -func (mc *metricsConn) SetWriteDeadline(t time.Time) error { - return mc.conn.SetWriteDeadline(t) -} diff --git a/endpoints/publicFrontend/config.go b/endpoints/publicFrontend/config.go index 674a2674..080a1714 100644 --- a/endpoints/publicFrontend/config.go +++ b/endpoints/publicFrontend/config.go @@ -3,29 +3,18 @@ package publicFrontend import ( "github.com/michaelquigley/cf" "github.com/pkg/errors" - "time" ) type Config struct { Identity string - Metrics *MetricsConfig Address string HostMatch string } -type MetricsConfig struct { - Service string - SendTimeout time.Duration -} - func DefaultConfig() *Config { return &Config{ Identity: "frontend", - Metrics: &MetricsConfig{ - Service: "metrics", - SendTimeout: 5 * time.Second, - }, - Address: "0.0.0.0:8080", + Address: "0.0.0.0:8080", } } diff --git a/endpoints/publicFrontend/http.go b/endpoints/publicFrontend/http.go index 884a734b..9e9ddbb7 100644 --- a/endpoints/publicFrontend/http.go +++ b/endpoints/publicFrontend/http.go @@ -24,16 +24,9 @@ type httpFrontend struct { cfg *Config zCtx ziti.Context handler http.Handler - metrics *metricsAgent } func NewHTTP(cfg *Config) (*httpFrontend, error) { - ma, err := newMetricsAgent(cfg) - if err != nil { - return nil, err - } - go ma.run() - zCfgPath, err := zrokdir.ZitiIdentityFile(cfg.Identity) if err != nil { return nil, errors.Wrapf(err, "error getting ziti identity '%v' from zrokdir", cfg.Identity) @@ -44,7 +37,7 @@ func NewHTTP(cfg *Config) (*httpFrontend, error) { } zCfg.ConfigTypes = []string{model.ZrokProxyConfig} zCtx := ziti.NewContextWithConfig(zCfg) - zDialCtx := zitiDialContext{ctx: zCtx, updates: ma.updates} + zDialCtx := zitiDialContext{ctx: zCtx} zTransport := http.DefaultTransport.(*http.Transport).Clone() zTransport.DialContext = zDialCtx.Dial @@ -59,7 +52,6 @@ func NewHTTP(cfg *Config) (*httpFrontend, error) { cfg: cfg, zCtx: zCtx, handler: handler, - metrics: ma, }, nil } @@ -68,8 +60,7 @@ func (self *httpFrontend) Run() error { } type zitiDialContext struct { - ctx ziti.Context - updates chan metricsUpdate + ctx ziti.Context } func (self *zitiDialContext) Dial(_ context.Context, _ string, addr string) (net.Conn, error) { @@ -78,7 +69,7 @@ func (self *zitiDialContext) Dial(_ context.Context, _ string, addr string) (net if err != nil { return conn, err } - return newMetricsConn(shrToken, conn, self.updates), nil + return conn, nil } func newServiceProxy(cfg *Config, ctx ziti.Context) (*httputil.ReverseProxy, error) { diff --git a/endpoints/publicFrontend/metrics.go b/endpoints/publicFrontend/metrics.go deleted file mode 100644 index 4c6a512f..00000000 --- a/endpoints/publicFrontend/metrics.go +++ /dev/null @@ -1,109 +0,0 @@ -package publicFrontend - -import ( - "github.com/openziti/sdk-golang/ziti" - "github.com/openziti/sdk-golang/ziti/config" - "github.com/openziti/zrok/model" - "github.com/openziti/zrok/zrokdir" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" - "gopkg.in/mgo.v2/bson" - "time" -) - -type metricsAgent struct { - cfg *Config - accum map[string]model.SessionMetrics - updates chan metricsUpdate - lastSend time.Time - zCtx ziti.Context -} - -type metricsUpdate struct { - id string - bytesRead int64 - bytesWritten int64 -} - -func newMetricsAgent(cfg *Config) (*metricsAgent, error) { - zif, err := zrokdir.ZitiIdentityFile(cfg.Identity) - if err != nil { - return nil, errors.Wrapf(err, "error getting '%v' identity file", cfg.Identity) - } - zCfg, err := config.NewFromFile(zif) - if err != nil { - return nil, errors.Wrapf(err, "error loading '%v' identity", cfg.Identity) - } - logrus.Infof("loaded '%v' identity", cfg.Identity) - return &metricsAgent{ - cfg: cfg, - accum: make(map[string]model.SessionMetrics), - updates: make(chan metricsUpdate, 10240), - lastSend: time.Now(), - zCtx: ziti.NewContextWithConfig(zCfg), - }, nil -} - -func (ma *metricsAgent) run() { - for { - select { - case update := <-ma.updates: - ma.pushUpdate(update) - if time.Since(ma.lastSend) >= ma.cfg.Metrics.SendTimeout { - if err := ma.sendMetrics(); err != nil { - logrus.Errorf("error sending metrics: %v", err) - } - } - - case <-time.After(5 * time.Second): - if err := ma.sendMetrics(); err != nil { - logrus.Errorf("error sending metrics: %v", err) - } - } - } -} - -func (ma *metricsAgent) pushUpdate(mu metricsUpdate) { - if sm, found := ma.accum[mu.id]; found { - ma.accum[mu.id] = model.SessionMetrics{ - BytesRead: sm.BytesRead + mu.bytesRead, - BytesWritten: sm.BytesWritten + mu.bytesWritten, - LastUpdate: time.Now().UnixMilli(), - } - } else { - ma.accum[mu.id] = model.SessionMetrics{ - BytesRead: mu.bytesRead, - BytesWritten: mu.bytesWritten, - LastUpdate: time.Now().UnixMilli(), - } - } -} - -func (ma *metricsAgent) sendMetrics() error { - if len(ma.accum) > 0 { - m := &model.Metrics{ - Namespace: ma.cfg.Identity, - Sessions: ma.accum, - } - metricsJson, err := bson.Marshal(m) - if err != nil { - return errors.Wrap(err, "error marshaling metrics") - } - conn, err := ma.zCtx.Dial(ma.cfg.Metrics.Service) - if err != nil { - return errors.Wrap(err, "error connecting to metrics service") - } - n, err := conn.Write(metricsJson) - if err != nil { - return errors.Wrap(err, "error sending metrics") - } - defer func() { _ = conn.Close() }() - if n != len(metricsJson) { - return errors.Wrap(err, "short metrics write") - } - logrus.Infof("sent %d bytes of metrics data", n) - ma.accum = make(map[string]model.SessionMetrics) - ma.lastSend = time.Now() - } - return nil -} diff --git a/endpoints/publicFrontend/metricsConn.go b/endpoints/publicFrontend/metricsConn.go deleted file mode 100644 index 92fe5114..00000000 --- a/endpoints/publicFrontend/metricsConn.go +++ /dev/null @@ -1,52 +0,0 @@ -package publicFrontend - -import ( - "net" - "time" -) - -type metricsConn struct { - id string - conn net.Conn - updates chan metricsUpdate -} - -func newMetricsConn(id string, conn net.Conn, updates chan metricsUpdate) *metricsConn { - return &metricsConn{id, conn, updates} -} - -func (mc *metricsConn) Read(b []byte) (n int, err error) { - n, err = mc.conn.Read(b) - mc.updates <- metricsUpdate{mc.id, int64(n), 0} - return n, err -} - -func (mc *metricsConn) Write(b []byte) (n int, err error) { - n, err = mc.conn.Write(b) - mc.updates <- metricsUpdate{mc.id, 0, int64(n)} - return n, err -} - -func (mc *metricsConn) Close() error { - return mc.conn.Close() -} - -func (mc *metricsConn) LocalAddr() net.Addr { - return mc.conn.LocalAddr() -} - -func (mc *metricsConn) RemoteAddr() net.Addr { - return mc.conn.RemoteAddr() -} - -func (mc *metricsConn) SetDeadline(t time.Time) error { - return mc.conn.SetDeadline(t) -} - -func (mc *metricsConn) SetReadDeadline(t time.Time) error { - return mc.conn.SetReadDeadline(t) -} - -func (mc *metricsConn) SetWriteDeadline(t time.Time) error { - return mc.conn.SetWriteDeadline(t) -} diff --git a/etc/metrics.yml b/etc/metrics.yml index 0293f52d..9b28e9bc 100644 --- a/etc/metrics.yml +++ b/etc/metrics.yml @@ -1,10 +1,20 @@ +# file source +# source: type: file path: /tmp/fabric-usage.log +# websocket source +# #source: # type: websocket # websocket_endpoint: wss://127.0.0.1:1280/fabric/v1/ws-api # api_endpoint: https://127.0.0.1:1280 # username: admin -# password: "" \ No newline at end of file +# password: "" + +influx: + url: "http://127.0.0.1:8086" + bucket: zrok + org: zrok + token: "" \ No newline at end of file