From 1d6ccfdeb6ea2c9a2b2208b5e41b3b1314b6cfc1 Mon Sep 17 00:00:00 2001 From: Michael Quigley Date: Mon, 13 Mar 2023 16:20:56 -0400 Subject: [PATCH] record environment and account ids on metrics records (#235); 'zrok metrics' -> 'zrok ctrl metrics' (#269) --- cmd/zrok/{metrics.go => controllerMetrics.go} | 2 +- controller/metrics/agent.go | 5 +-- controller/metrics/cache.go | 40 +++++++++++++++++++ controller/metrics/influx.go | 9 +++-- controller/metrics/model.go | 4 ++ controller/metrics/shareCache.go | 31 -------------- 6 files changed, 53 insertions(+), 38 deletions(-) rename cmd/zrok/{metrics.go => controllerMetrics.go} (95%) create mode 100644 controller/metrics/cache.go delete mode 100644 controller/metrics/shareCache.go diff --git a/cmd/zrok/metrics.go b/cmd/zrok/controllerMetrics.go similarity index 95% rename from cmd/zrok/metrics.go rename to cmd/zrok/controllerMetrics.go index fb482276..f29d75c9 100644 --- a/cmd/zrok/metrics.go +++ b/cmd/zrok/controllerMetrics.go @@ -14,7 +14,7 @@ import ( ) func init() { - rootCmd.AddCommand(newMetricsCommand().cmd) + controllerCmd.cmd.AddCommand(newMetricsCommand().cmd) } type metricsCommand struct { diff --git a/controller/metrics/agent.go b/controller/metrics/agent.go index b01ddc44..c06ab246 100644 --- a/controller/metrics/agent.go +++ b/controller/metrics/agent.go @@ -8,7 +8,7 @@ import ( type MetricsAgent struct { src Source - cache *shareCache + cache *cache join chan struct{} } @@ -46,8 +46,7 @@ func Run(cfg *Config, strCfg *store.Config) (*MetricsAgent, error) { select { case event := <-events: usage := Ingest(event) - if shrToken, err := cache.getToken(usage.ZitiServiceId); err == nil { - usage.ShareToken = shrToken + if err := cache.addZrokDetail(usage); err == nil { if err := idb.Write(usage); err != nil { logrus.Error(err) } diff --git a/controller/metrics/cache.go b/controller/metrics/cache.go new file mode 100644 index 00000000..6cb2b59e --- /dev/null +++ b/controller/metrics/cache.go @@ -0,0 +1,40 @@ +package metrics + +import ( + "github.com/openziti/zrok/controller/store" + "github.com/pkg/errors" +) + +type cache struct { + str *store.Store +} + +func newShareCache(cfg *store.Config) (*cache, error) { + str, err := store.Open(cfg) + if err != nil { + return nil, errors.Wrap(err, "error opening store") + } + return &cache{str}, nil +} + +func (sc *cache) addZrokDetail(u *Usage) error { + tx, err := sc.str.Begin() + if err != nil { + return err + } + defer func() { _ = tx.Rollback() }() + + shr, err := sc.str.FindShareWithZIdAndDeleted(u.ZitiServiceId, tx) + if err != nil { + return err + } + u.ShareToken = shr.Token + env, err := sc.str.GetEnvironment(shr.EnvironmentId, tx) + if err != nil { + return err + } + u.EnvironmentId = int64(env.Id) + u.AccountId = int64(*env.AccountId) + + return nil +} diff --git a/controller/metrics/influx.go b/controller/metrics/influx.go index a2059ff9..214ed8e5 100644 --- a/controller/metrics/influx.go +++ b/controller/metrics/influx.go @@ -24,16 +24,19 @@ func openInfluxDb(cfg *InfluxConfig) *influxDb { func (i *influxDb) Write(u *Usage) error { out := fmt.Sprintf("share: %v, circuit: %v", u.ShareToken, u.ZitiCircuitId) + envId := fmt.Sprintf("%d", u.EnvironmentId) + acctId := fmt.Sprintf("%d", u.AccountId) + var pts []*write.Point circuitPt := influxdb2.NewPoint("circuits", - map[string]string{"share": u.ShareToken}, + map[string]string{"share": u.ShareToken, "envId": envId, "acctId": acctId}, map[string]interface{}{"circuit": u.ZitiCircuitId}, u.IntervalStart) pts = append(pts, circuitPt) if u.BackendTx > 0 || u.BackendRx > 0 { pt := influxdb2.NewPoint("xfer", - map[string]string{"namespace": "backend", "share": u.ShareToken}, + map[string]string{"namespace": "backend", "share": u.ShareToken, "envId": envId, "acctId": acctId}, map[string]interface{}{"bytesRead": u.BackendRx, "bytesWritten": u.BackendTx}, u.IntervalStart) pts = append(pts, pt) @@ -41,7 +44,7 @@ func (i *influxDb) Write(u *Usage) error { } if u.FrontendTx > 0 || u.FrontendRx > 0 { pt := influxdb2.NewPoint("xfer", - map[string]string{"namespace": "frontend", "share": u.ShareToken}, + map[string]string{"namespace": "frontend", "share": u.ShareToken, "envId": envId, "acctId": acctId}, map[string]interface{}{"bytesRead": u.FrontendRx, "bytesWritten": u.FrontendTx}, u.IntervalStart) pts = append(pts, pt) diff --git a/controller/metrics/model.go b/controller/metrics/model.go index 0dcf13eb..632e8404 100644 --- a/controller/metrics/model.go +++ b/controller/metrics/model.go @@ -12,6 +12,8 @@ type Usage struct { ZitiServiceId string ZitiCircuitId string ShareToken string + EnvironmentId int64 + AccountId int64 FrontendTx int64 FrontendRx int64 BackendTx int64 @@ -25,6 +27,8 @@ func (u Usage) String() string { out += ", " + fmt.Sprintf("service '%v'", u.ZitiServiceId) out += ", " + fmt.Sprintf("circuit '%v'", u.ZitiCircuitId) out += ", " + fmt.Sprintf("share '%v'", u.ShareToken) + out += ", " + fmt.Sprintf("environment '%d'", u.EnvironmentId) + out += ", " + fmt.Sprintf("account '%v'", u.AccountId) out += ", " + fmt.Sprintf("fe {rx %v, tx %v}", util.BytesToSize(u.FrontendRx), util.BytesToSize(u.FrontendTx)) out += ", " + fmt.Sprintf("be {rx %v, tx %v}", util.BytesToSize(u.BackendRx), util.BytesToSize(u.BackendTx)) out += "}" diff --git a/controller/metrics/shareCache.go b/controller/metrics/shareCache.go deleted file mode 100644 index 2789953d..00000000 --- a/controller/metrics/shareCache.go +++ /dev/null @@ -1,31 +0,0 @@ -package metrics - -import ( - "github.com/openziti/zrok/controller/store" - "github.com/pkg/errors" -) - -type shareCache struct { - str *store.Store -} - -func newShareCache(cfg *store.Config) (*shareCache, error) { - str, err := store.Open(cfg) - if err != nil { - return nil, errors.Wrap(err, "error opening store") - } - return &shareCache{str}, nil -} - -func (sc *shareCache) getToken(svcZId string) (string, error) { - tx, err := sc.str.Begin() - if err != nil { - return "", err - } - defer func() { _ = tx.Rollback() }() - shr, err := sc.str.FindShareWithZIdAndDeleted(svcZId, tx) - if err != nil { - return "", err - } - return shr.Token, nil -}