remove legacy v0.3 metrics infrastructure (#128)

This commit is contained in:
Michael Quigley 2023-03-07 12:57:35 -05:00
parent 84bd7d391a
commit df4c52aae5
No known key found for this signature in database
GPG Key ID: 9B60314A9DD20A62
15 changed files with 25 additions and 600 deletions

1
.gitignore vendored
View File

@ -4,6 +4,7 @@
*.db *.db
automated-release-build automated-release-build
etc/dev.yml etc/dev.yml
etc/dev-metrics.yml
# Dependencies # Dependencies
/node_modules/ /node_modules/

View File

@ -9,9 +9,6 @@ import (
"github.com/openziti/edge/rest_management_api_client/config" "github.com/openziti/edge/rest_management_api_client/config"
"github.com/openziti/edge/rest_management_api_client/edge_router_policy" "github.com/openziti/edge/rest_management_api_client/edge_router_policy"
"github.com/openziti/edge/rest_management_api_client/identity" "github.com/openziti/edge/rest_management_api_client/identity"
"github.com/openziti/edge/rest_management_api_client/service"
"github.com/openziti/edge/rest_management_api_client/service_edge_router_policy"
"github.com/openziti/edge/rest_management_api_client/service_policy"
"github.com/openziti/edge/rest_model" "github.com/openziti/edge/rest_model"
rest_model_edge "github.com/openziti/edge/rest_model" rest_model_edge "github.com/openziti/edge/rest_model"
"github.com/openziti/sdk-golang/ziti" "github.com/openziti/sdk-golang/ziti"
@ -100,27 +97,6 @@ func Bootstrap(skipCtrl, skipFrontend bool, inCfg *Config) error {
return err return err
} }
var metricsSvcZId string
if metricsSvcZId, err = assertMetricsService(cfg, edge); err != nil {
return err
}
if err := assertMetricsSerp(metricsSvcZId, cfg, edge); err != nil {
return err
}
if !skipCtrl {
if err := assertCtrlMetricsBind(ctrlZId, metricsSvcZId, edge); err != nil {
return err
}
}
if !skipFrontend {
if err := assertFrontendMetricsDial(frontendZId, metricsSvcZId, edge); err != nil {
return err
}
}
return nil return nil
} }
@ -243,105 +219,3 @@ func assertErpForIdentity(name, zId string, edge *rest_management_api_client.Zit
logrus.Infof("asserted erps for '%v' (%v)", name, zId) logrus.Infof("asserted erps for '%v' (%v)", name, zId)
return nil return nil
} }
func assertMetricsService(cfg *Config, edge *rest_management_api_client.ZitiEdgeManagement) (string, error) {
filter := fmt.Sprintf("name=\"%v\" and tags.zrok != null", cfg.Metrics.ServiceName)
limit := int64(0)
offset := int64(0)
listReq := &service.ListServicesParams{
Filter: &filter,
Limit: &limit,
Offset: &offset,
}
listReq.SetTimeout(30 * time.Second)
listResp, err := edge.Service.ListServices(listReq, nil)
if err != nil {
return "", errors.Wrapf(err, "error listing '%v' service", cfg.Metrics.ServiceName)
}
var svcZId string
if len(listResp.Payload.Data) != 1 {
logrus.Infof("creating '%v' service", cfg.Metrics.ServiceName)
svcZId, err = zrokEdgeSdk.CreateService("metrics", nil, nil, edge)
if err != nil {
return "", errors.Wrapf(err, "error creating '%v' service", cfg.Metrics.ServiceName)
}
} else {
svcZId = *listResp.Payload.Data[0].ID
}
logrus.Infof("asserted '%v' service (%v)", cfg.Metrics.ServiceName, svcZId)
return svcZId, nil
}
func assertMetricsSerp(metricsSvcZId string, cfg *Config, edge *rest_management_api_client.ZitiEdgeManagement) error {
filter := fmt.Sprintf("allOf(serviceRoles) = \"@%v\" and allOf(edgeRouterRoles) = \"#all\" and tags.zrok != null", metricsSvcZId)
limit := int64(0)
offset := int64(0)
listReq := &service_edge_router_policy.ListServiceEdgeRouterPoliciesParams{
Filter: &filter,
Limit: &limit,
Offset: &offset,
}
listReq.SetTimeout(30 * time.Second)
listResp, err := edge.ServiceEdgeRouterPolicy.ListServiceEdgeRouterPolicies(listReq, nil)
if err != nil {
return errors.Wrapf(err, "error listing '%v' serps", cfg.Metrics.ServiceName)
}
if len(listResp.Payload.Data) != 1 {
logrus.Infof("creating '%v' serp", cfg.Metrics.ServiceName)
_, err := zrokEdgeSdk.CreateServiceEdgeRouterPolicy(cfg.Metrics.ServiceName, metricsSvcZId, nil, edge)
if err != nil {
return errors.Wrapf(err, "error creating '%v' serp", cfg.Metrics.ServiceName)
}
}
logrus.Infof("asserted '%v' serp", cfg.Metrics.ServiceName)
return nil
}
func assertCtrlMetricsBind(ctrlZId, metricsSvcZId string, edge *rest_management_api_client.ZitiEdgeManagement) error {
filter := fmt.Sprintf("allOf(serviceRoles) = \"@%v\" and allOf(identityRoles) = \"@%v\" and type = 2 and tags.zrok != null", metricsSvcZId, ctrlZId)
limit := int64(0)
offset := int64(0)
listReq := &service_policy.ListServicePoliciesParams{
Filter: &filter,
Limit: &limit,
Offset: &offset,
}
listReq.SetTimeout(30 * time.Second)
listResp, err := edge.ServicePolicy.ListServicePolicies(listReq, nil)
if err != nil {
return errors.Wrapf(err, "error listing 'ctrl-metrics-bind' service policy")
}
if len(listResp.Payload.Data) != 1 {
logrus.Info("creating 'ctrl-metrics-bind' service policy")
if err = zrokEdgeSdk.CreateServicePolicyBind("ctrl-metrics-bind", metricsSvcZId, ctrlZId, nil, edge); err != nil {
return errors.Wrap(err, "error creating 'ctrl-metrics-bind' service policy")
}
}
logrus.Infof("asserted 'ctrl-metrics-bind' service policy")
return nil
}
func assertFrontendMetricsDial(frontendZId, metricsSvcZId string, edge *rest_management_api_client.ZitiEdgeManagement) error {
filter := fmt.Sprintf("allOf(serviceRoles) = \"@%v\" and allOf(identityRoles) = \"@%v\" and type = 1 and tags.zrok != null", metricsSvcZId, frontendZId)
limit := int64(0)
offset := int64(0)
listReq := &service_policy.ListServicePoliciesParams{
Filter: &filter,
Limit: &limit,
Offset: &offset,
}
listReq.SetTimeout(30 * time.Second)
listResp, err := edge.ServicePolicy.ListServicePolicies(listReq, nil)
if err != nil {
return errors.Wrapf(err, "error listing 'frontend-metrics-dial' service policy")
}
if len(listResp.Payload.Data) != 1 {
logrus.Info("creating 'frontend-metrics-dial' service policy")
if err = zrokEdgeSdk.CreateServicePolicyDial("frontend-metrics-dial", metricsSvcZId, []string{frontendZId}, nil, edge); err != nil {
return errors.Wrap(err, "error creating 'frontend-metrics-dial' service policy")
}
}
logrus.Infof("asserted 'frontend-metrics-dial' service policy")
return nil
}

View File

@ -18,7 +18,6 @@ type Config struct {
Influx *InfluxConfig Influx *InfluxConfig
Limits *LimitsConfig Limits *LimitsConfig
Maintenance *MaintenanceConfig Maintenance *MaintenanceConfig
Metrics *MetricsConfig
Registration *RegistrationConfig Registration *RegistrationConfig
ResetPassword *ResetPasswordConfig ResetPassword *ResetPasswordConfig
Store *store.Config Store *store.Config
@ -58,10 +57,6 @@ type ZitiConfig struct {
Password string `cf:"+secret"` Password string `cf:"+secret"`
} }
type MetricsConfig struct {
ServiceName string
}
type InfluxConfig struct { type InfluxConfig struct {
Url string Url string
Bucket string Bucket string
@ -99,9 +94,6 @@ func DefaultConfig() *Config {
Environments: Unlimited, Environments: Unlimited,
Shares: Unlimited, Shares: Unlimited,
}, },
Metrics: &MetricsConfig{
ServiceName: "metrics",
},
Maintenance: &MaintenanceConfig{ Maintenance: &MaintenanceConfig{
ResetPassword: &ResetPasswordMaintenanceConfig{ ResetPassword: &ResetPasswordMaintenanceConfig{
ExpirationTimeout: time.Minute * 15, ExpirationTimeout: time.Minute * 15,

View File

@ -15,7 +15,6 @@ import (
var cfg *Config var cfg *Config
var str *store.Store var str *store.Store
var mtr *metricsAgent
var idb influxdb2.Client var idb influxdb2.Client
func Run(inCfg *Config) error { func Run(inCfg *Config) error {
@ -67,15 +66,6 @@ func Run(inCfg *Config) error {
idb = influxdb2.NewClient(cfg.Influx.Url, cfg.Influx.Token) idb = influxdb2.NewClient(cfg.Influx.Url, cfg.Influx.Token)
} }
if cfg.Metrics != nil {
mtr = newMetricsAgent()
go mtr.run()
defer func() {
mtr.stop()
mtr.join()
}()
}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer func() { defer func() {
cancel() cancel()

View File

@ -3,15 +3,12 @@ package controller
import ( import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"fmt"
"github.com/go-openapi/runtime/middleware" "github.com/go-openapi/runtime/middleware"
"github.com/openziti/edge/rest_management_api_client/service"
rest_model_edge "github.com/openziti/edge/rest_model" rest_model_edge "github.com/openziti/edge/rest_model"
"github.com/openziti/zrok/controller/zrokEdgeSdk" "github.com/openziti/zrok/controller/zrokEdgeSdk"
"github.com/openziti/zrok/rest_model_zrok" "github.com/openziti/zrok/rest_model_zrok"
"github.com/openziti/zrok/rest_server_zrok/operations/admin" "github.com/openziti/zrok/rest_server_zrok/operations/admin"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"time"
) )
type createIdentityHandler struct{} type createIdentityHandler struct{}
@ -52,32 +49,6 @@ func (h *createIdentityHandler) Handle(params admin.CreateIdentityParams, princi
return admin.NewCreateIdentityInternalServerError() return admin.NewCreateIdentityInternalServerError()
} }
filter := fmt.Sprintf("name=\"%v\" and tags.zrok != null", cfg.Metrics.ServiceName)
limit := int64(0)
offset := int64(0)
listSvcReq := &service.ListServicesParams{
Filter: &filter,
Limit: &limit,
Offset: &offset,
}
listSvcReq.SetTimeout(30 * time.Second)
listSvcResp, err := edge.Service.ListServices(listSvcReq, nil)
if err != nil {
logrus.Errorf("error listing metrics service: %v", err)
return admin.NewCreateIdentityInternalServerError()
}
if len(listSvcResp.Payload.Data) != 1 {
logrus.Errorf("could not find metrics service")
return admin.NewCreateIdentityInternalServerError()
}
svcZId := *listSvcResp.Payload.Data[0].ID
spName := fmt.Sprintf("%v-%v-dial", name, cfg.Metrics.ServiceName)
if err := zrokEdgeSdk.CreateServicePolicyDial(spName, svcZId, []string{zId}, nil, edge); err != nil {
logrus.Errorf("error creating named dial service policy '%v': %v", spName, err)
return admin.NewCreateIdentityInternalServerError()
}
var out bytes.Buffer var out bytes.Buffer
enc := json.NewEncoder(&out) enc := json.NewEncoder(&out)
enc.SetEscapeHTML(false) enc.SetEscapeHTML(false)

View File

@ -1,181 +0,0 @@
package controller
import (
"bytes"
"context"
"fmt"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api"
"github.com/influxdata/influxdb-client-go/v2/api/write"
"github.com/openziti/sdk-golang/ziti"
"github.com/openziti/sdk-golang/ziti/config"
"github.com/openziti/sdk-golang/ziti/edge"
"github.com/openziti/zrok/model"
"github.com/openziti/zrok/util"
"github.com/openziti/zrok/zrokdir"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"gopkg.in/mgo.v2/bson"
"net"
"time"
)
type metricsAgent struct {
writeApi api.WriteAPIBlocking
metricsQueue chan *model.Metrics
envCache map[string]*envCacheEntry
zCtx ziti.Context
zListener edge.Listener
shutdown chan struct{}
joined chan struct{}
}
type envCacheEntry struct {
env string
lastAccess time.Time
}
func newMetricsAgent() *metricsAgent {
ma := &metricsAgent{
metricsQueue: make(chan *model.Metrics, 1024),
envCache: make(map[string]*envCacheEntry),
shutdown: make(chan struct{}),
joined: make(chan struct{}),
}
if idb != nil {
ma.writeApi = idb.WriteAPIBlocking(cfg.Influx.Org, cfg.Influx.Bucket)
}
return ma
}
func (ma *metricsAgent) run() {
logrus.Info("starting")
defer logrus.Info("exiting")
defer close(ma.joined)
if err := ma.bindService(); err != nil {
logrus.Errorf("error binding metrics service: %v", err)
return
}
work:
for {
select {
case <-ma.shutdown:
break work
case m := <-ma.metricsQueue:
if err := ma.processMetrics(m); err != nil {
logrus.Errorf("error processing metrics: %v", err)
}
}
}
if err := ma.zListener.Close(); err != nil {
logrus.Errorf("error closing metrics service listener: %v", err)
}
}
func (ma *metricsAgent) bindService() error {
zif, err := zrokdir.ZitiIdentityFile("ctrl")
if err != nil {
return errors.Wrap(err, "error getting 'ctrl' identity")
}
zCfg, err := config.NewFromFile(zif)
if err != nil {
return errors.Wrap(err, "error loading 'ctrl' identity")
}
ma.zCtx = ziti.NewContextWithConfig(zCfg)
opts := &ziti.ListenOptions{
ConnectTimeout: 5 * time.Minute,
MaxConnections: 1024,
}
ma.zListener, err = ma.zCtx.ListenWithOptions(cfg.Metrics.ServiceName, opts)
if err != nil {
return errors.Wrapf(err, "error listening for metrics on '%v'", cfg.Metrics.ServiceName)
}
go ma.listen()
return nil
}
func (ma *metricsAgent) listen() {
logrus.Info("started")
defer logrus.Info("exited")
for {
conn, err := ma.zListener.Accept()
if err != nil {
logrus.Errorf("error accepting: %v", err)
return
}
logrus.Debugf("accepted metrics connetion from '%v'", conn.RemoteAddr())
go newMetricsHandler(conn, ma.metricsQueue).run()
}
}
func (ma *metricsAgent) processMetrics(m *model.Metrics) error {
var pts []*write.Point
if len(m.Sessions) > 0 {
out := "metrics = {\n"
for k, v := range m.Sessions {
if ma.writeApi != nil {
pt := influxdb2.NewPoint("xfer",
map[string]string{"namespace": m.Namespace, "share": k},
map[string]interface{}{"bytesRead": v.BytesRead, "bytesWritten": v.BytesWritten},
time.UnixMilli(v.LastUpdate))
pts = append(pts, pt)
}
out += fmt.Sprintf("\t[%v.%v]: %v/%v (%v)\n", m.Namespace, k, util.BytesToSize(v.BytesRead), util.BytesToSize(v.BytesWritten), time.Since(time.UnixMilli(v.LastUpdate)))
}
out += "}"
logrus.Info(out)
}
if len(pts) > 0 {
if err := ma.writeApi.WritePoint(context.Background(), pts...); err == nil {
logrus.Debugf("wrote metrics to influx")
} else {
return err
}
}
return nil
}
func (ma *metricsAgent) stop() {
close(ma.shutdown)
}
func (ma *metricsAgent) join() {
<-ma.joined
}
type metricsHandler struct {
conn net.Conn
metricsQueue chan *model.Metrics
}
func newMetricsHandler(conn net.Conn, metricsQueue chan *model.Metrics) *metricsHandler {
return &metricsHandler{conn, metricsQueue}
}
func (mh *metricsHandler) run() {
logrus.Debugf("handling metrics connection: %v", mh.conn.RemoteAddr())
var mtrBuf bytes.Buffer
buf := make([]byte, 4096)
for {
n, err := mh.conn.Read(buf)
if err != nil {
break
}
mtrBuf.Write(buf[:n])
}
if err := mh.conn.Close(); err != nil {
logrus.Errorf("error closing metrics connection")
}
m := &model.Metrics{}
if err := bson.Unmarshal(mtrBuf.Bytes(), &m); err == nil {
mh.metricsQueue <- m
} else {
logrus.Errorf("error unmarshaling metrics: %v", err)
}
}

View File

@ -7,6 +7,14 @@ import (
type Config struct { type Config struct {
Source interface{} Source interface{}
Influx *InfluxConfig
}
type InfluxConfig struct {
Url string
Bucket string
Org string
Token string `cf:"+secret"`
} }
func LoadConfig(path string) (*Config, error) { func LoadConfig(path string) (*Config, error) {

View File

@ -0,0 +1 @@
package metrics

View File

@ -1,7 +1,6 @@
package proxyBackend package proxyBackend
import ( import (
"context"
"crypto/tls" "crypto/tls"
"fmt" "fmt"
"github.com/openziti/sdk-golang/ziti" "github.com/openziti/sdk-golang/ziti"
@ -11,7 +10,6 @@ import (
"github.com/openziti/zrok/util" "github.com/openziti/zrok/util"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"net"
"net/http" "net/http"
"net/http/httputil" "net/http/httputil"
"net/url" "net/url"
@ -79,7 +77,6 @@ func newReverseProxy(cfg *Config) (*httputil.ReverseProxy, error) {
} }
tpt := http.DefaultTransport.(*http.Transport).Clone() tpt := http.DefaultTransport.(*http.Transport).Clone()
tpt.DialContext = metricsDial
if cfg.Insecure { if cfg.Insecure {
tpt.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} tpt.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
} }
@ -105,11 +102,3 @@ func newReverseProxy(cfg *Config) (*httputil.ReverseProxy, error) {
return proxy, nil return proxy, nil
} }
func metricsDial(_ context.Context, network string, addr string) (net.Conn, error) {
conn, err := net.Dial(network, addr)
if err != nil {
return conn, err
}
return newMetricsConn("backend", conn), nil
}

View File

@ -1,49 +0,0 @@
package proxyBackend
import (
"net"
"time"
)
type metricsConn struct {
id string
conn net.Conn
}
func newMetricsConn(id string, conn net.Conn) *metricsConn {
return &metricsConn{id, conn}
}
func (mc *metricsConn) Read(b []byte) (n int, err error) {
n, err = mc.conn.Read(b)
return n, err
}
func (mc *metricsConn) Write(b []byte) (n int, err error) {
n, err = mc.conn.Write(b)
return n, err
}
func (mc *metricsConn) Close() error {
return mc.conn.Close()
}
func (mc *metricsConn) LocalAddr() net.Addr {
return mc.conn.LocalAddr()
}
func (mc *metricsConn) RemoteAddr() net.Addr {
return mc.conn.RemoteAddr()
}
func (mc *metricsConn) SetDeadline(t time.Time) error {
return mc.conn.SetDeadline(t)
}
func (mc *metricsConn) SetReadDeadline(t time.Time) error {
return mc.conn.SetReadDeadline(t)
}
func (mc *metricsConn) SetWriteDeadline(t time.Time) error {
return mc.conn.SetWriteDeadline(t)
}

View File

@ -3,29 +3,18 @@ package publicFrontend
import ( import (
"github.com/michaelquigley/cf" "github.com/michaelquigley/cf"
"github.com/pkg/errors" "github.com/pkg/errors"
"time"
) )
type Config struct { type Config struct {
Identity string Identity string
Metrics *MetricsConfig
Address string Address string
HostMatch string HostMatch string
} }
type MetricsConfig struct {
Service string
SendTimeout time.Duration
}
func DefaultConfig() *Config { func DefaultConfig() *Config {
return &Config{ return &Config{
Identity: "frontend", Identity: "frontend",
Metrics: &MetricsConfig{ Address: "0.0.0.0:8080",
Service: "metrics",
SendTimeout: 5 * time.Second,
},
Address: "0.0.0.0:8080",
} }
} }

View File

@ -24,16 +24,9 @@ type httpFrontend struct {
cfg *Config cfg *Config
zCtx ziti.Context zCtx ziti.Context
handler http.Handler handler http.Handler
metrics *metricsAgent
} }
func NewHTTP(cfg *Config) (*httpFrontend, error) { func NewHTTP(cfg *Config) (*httpFrontend, error) {
ma, err := newMetricsAgent(cfg)
if err != nil {
return nil, err
}
go ma.run()
zCfgPath, err := zrokdir.ZitiIdentityFile(cfg.Identity) zCfgPath, err := zrokdir.ZitiIdentityFile(cfg.Identity)
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "error getting ziti identity '%v' from zrokdir", cfg.Identity) return nil, errors.Wrapf(err, "error getting ziti identity '%v' from zrokdir", cfg.Identity)
@ -44,7 +37,7 @@ func NewHTTP(cfg *Config) (*httpFrontend, error) {
} }
zCfg.ConfigTypes = []string{model.ZrokProxyConfig} zCfg.ConfigTypes = []string{model.ZrokProxyConfig}
zCtx := ziti.NewContextWithConfig(zCfg) zCtx := ziti.NewContextWithConfig(zCfg)
zDialCtx := zitiDialContext{ctx: zCtx, updates: ma.updates} zDialCtx := zitiDialContext{ctx: zCtx}
zTransport := http.DefaultTransport.(*http.Transport).Clone() zTransport := http.DefaultTransport.(*http.Transport).Clone()
zTransport.DialContext = zDialCtx.Dial zTransport.DialContext = zDialCtx.Dial
@ -59,7 +52,6 @@ func NewHTTP(cfg *Config) (*httpFrontend, error) {
cfg: cfg, cfg: cfg,
zCtx: zCtx, zCtx: zCtx,
handler: handler, handler: handler,
metrics: ma,
}, nil }, nil
} }
@ -68,8 +60,7 @@ func (self *httpFrontend) Run() error {
} }
type zitiDialContext struct { type zitiDialContext struct {
ctx ziti.Context ctx ziti.Context
updates chan metricsUpdate
} }
func (self *zitiDialContext) Dial(_ context.Context, _ string, addr string) (net.Conn, error) { func (self *zitiDialContext) Dial(_ context.Context, _ string, addr string) (net.Conn, error) {
@ -78,7 +69,7 @@ func (self *zitiDialContext) Dial(_ context.Context, _ string, addr string) (net
if err != nil { if err != nil {
return conn, err return conn, err
} }
return newMetricsConn(shrToken, conn, self.updates), nil return conn, nil
} }
func newServiceProxy(cfg *Config, ctx ziti.Context) (*httputil.ReverseProxy, error) { func newServiceProxy(cfg *Config, ctx ziti.Context) (*httputil.ReverseProxy, error) {

View File

@ -1,109 +0,0 @@
package publicFrontend
import (
"github.com/openziti/sdk-golang/ziti"
"github.com/openziti/sdk-golang/ziti/config"
"github.com/openziti/zrok/model"
"github.com/openziti/zrok/zrokdir"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"gopkg.in/mgo.v2/bson"
"time"
)
type metricsAgent struct {
cfg *Config
accum map[string]model.SessionMetrics
updates chan metricsUpdate
lastSend time.Time
zCtx ziti.Context
}
type metricsUpdate struct {
id string
bytesRead int64
bytesWritten int64
}
func newMetricsAgent(cfg *Config) (*metricsAgent, error) {
zif, err := zrokdir.ZitiIdentityFile(cfg.Identity)
if err != nil {
return nil, errors.Wrapf(err, "error getting '%v' identity file", cfg.Identity)
}
zCfg, err := config.NewFromFile(zif)
if err != nil {
return nil, errors.Wrapf(err, "error loading '%v' identity", cfg.Identity)
}
logrus.Infof("loaded '%v' identity", cfg.Identity)
return &metricsAgent{
cfg: cfg,
accum: make(map[string]model.SessionMetrics),
updates: make(chan metricsUpdate, 10240),
lastSend: time.Now(),
zCtx: ziti.NewContextWithConfig(zCfg),
}, nil
}
func (ma *metricsAgent) run() {
for {
select {
case update := <-ma.updates:
ma.pushUpdate(update)
if time.Since(ma.lastSend) >= ma.cfg.Metrics.SendTimeout {
if err := ma.sendMetrics(); err != nil {
logrus.Errorf("error sending metrics: %v", err)
}
}
case <-time.After(5 * time.Second):
if err := ma.sendMetrics(); err != nil {
logrus.Errorf("error sending metrics: %v", err)
}
}
}
}
func (ma *metricsAgent) pushUpdate(mu metricsUpdate) {
if sm, found := ma.accum[mu.id]; found {
ma.accum[mu.id] = model.SessionMetrics{
BytesRead: sm.BytesRead + mu.bytesRead,
BytesWritten: sm.BytesWritten + mu.bytesWritten,
LastUpdate: time.Now().UnixMilli(),
}
} else {
ma.accum[mu.id] = model.SessionMetrics{
BytesRead: mu.bytesRead,
BytesWritten: mu.bytesWritten,
LastUpdate: time.Now().UnixMilli(),
}
}
}
func (ma *metricsAgent) sendMetrics() error {
if len(ma.accum) > 0 {
m := &model.Metrics{
Namespace: ma.cfg.Identity,
Sessions: ma.accum,
}
metricsJson, err := bson.Marshal(m)
if err != nil {
return errors.Wrap(err, "error marshaling metrics")
}
conn, err := ma.zCtx.Dial(ma.cfg.Metrics.Service)
if err != nil {
return errors.Wrap(err, "error connecting to metrics service")
}
n, err := conn.Write(metricsJson)
if err != nil {
return errors.Wrap(err, "error sending metrics")
}
defer func() { _ = conn.Close() }()
if n != len(metricsJson) {
return errors.Wrap(err, "short metrics write")
}
logrus.Infof("sent %d bytes of metrics data", n)
ma.accum = make(map[string]model.SessionMetrics)
ma.lastSend = time.Now()
}
return nil
}

View File

@ -1,52 +0,0 @@
package publicFrontend
import (
"net"
"time"
)
type metricsConn struct {
id string
conn net.Conn
updates chan metricsUpdate
}
func newMetricsConn(id string, conn net.Conn, updates chan metricsUpdate) *metricsConn {
return &metricsConn{id, conn, updates}
}
func (mc *metricsConn) Read(b []byte) (n int, err error) {
n, err = mc.conn.Read(b)
mc.updates <- metricsUpdate{mc.id, int64(n), 0}
return n, err
}
func (mc *metricsConn) Write(b []byte) (n int, err error) {
n, err = mc.conn.Write(b)
mc.updates <- metricsUpdate{mc.id, 0, int64(n)}
return n, err
}
func (mc *metricsConn) Close() error {
return mc.conn.Close()
}
func (mc *metricsConn) LocalAddr() net.Addr {
return mc.conn.LocalAddr()
}
func (mc *metricsConn) RemoteAddr() net.Addr {
return mc.conn.RemoteAddr()
}
func (mc *metricsConn) SetDeadline(t time.Time) error {
return mc.conn.SetDeadline(t)
}
func (mc *metricsConn) SetReadDeadline(t time.Time) error {
return mc.conn.SetReadDeadline(t)
}
func (mc *metricsConn) SetWriteDeadline(t time.Time) error {
return mc.conn.SetWriteDeadline(t)
}

View File

@ -1,10 +1,20 @@
# file source
#
source: source:
type: file type: file
path: /tmp/fabric-usage.log path: /tmp/fabric-usage.log
# websocket source
#
#source: #source:
# type: websocket # type: websocket
# websocket_endpoint: wss://127.0.0.1:1280/fabric/v1/ws-api # websocket_endpoint: wss://127.0.0.1:1280/fabric/v1/ws-api
# api_endpoint: https://127.0.0.1:1280 # api_endpoint: https://127.0.0.1:1280
# username: admin # username: admin
# password: "" # password: ""
influx:
url: "http://127.0.0.1:8086"
bucket: zrok
org: zrok
token: ""