record environment and account ids on metrics records (#235); 'zrok metrics' -> 'zrok ctrl metrics' (#269)

This commit is contained in:
Michael Quigley 2023-03-13 16:20:56 -04:00 committed by Kenneth Bingham
parent 69a232c059
commit 1d6ccfdeb6
No known key found for this signature in database
GPG Key ID: 31709281860130B6
6 changed files with 53 additions and 38 deletions

View File

@ -14,7 +14,7 @@ import (
)
func init() {
rootCmd.AddCommand(newMetricsCommand().cmd)
controllerCmd.cmd.AddCommand(newMetricsCommand().cmd)
}
type metricsCommand struct {

View File

@ -8,7 +8,7 @@ import (
type MetricsAgent struct {
src Source
cache *shareCache
cache *cache
join chan struct{}
}
@ -46,8 +46,7 @@ func Run(cfg *Config, strCfg *store.Config) (*MetricsAgent, error) {
select {
case event := <-events:
usage := Ingest(event)
if shrToken, err := cache.getToken(usage.ZitiServiceId); err == nil {
usage.ShareToken = shrToken
if err := cache.addZrokDetail(usage); err == nil {
if err := idb.Write(usage); err != nil {
logrus.Error(err)
}

View File

@ -0,0 +1,40 @@
package metrics
import (
"github.com/openziti/zrok/controller/store"
"github.com/pkg/errors"
)
type cache struct {
str *store.Store
}
func newShareCache(cfg *store.Config) (*cache, error) {
str, err := store.Open(cfg)
if err != nil {
return nil, errors.Wrap(err, "error opening store")
}
return &cache{str}, nil
}
func (sc *cache) addZrokDetail(u *Usage) error {
tx, err := sc.str.Begin()
if err != nil {
return err
}
defer func() { _ = tx.Rollback() }()
shr, err := sc.str.FindShareWithZIdAndDeleted(u.ZitiServiceId, tx)
if err != nil {
return err
}
u.ShareToken = shr.Token
env, err := sc.str.GetEnvironment(shr.EnvironmentId, tx)
if err != nil {
return err
}
u.EnvironmentId = int64(env.Id)
u.AccountId = int64(*env.AccountId)
return nil
}

View File

@ -24,16 +24,19 @@ func openInfluxDb(cfg *InfluxConfig) *influxDb {
func (i *influxDb) Write(u *Usage) error {
out := fmt.Sprintf("share: %v, circuit: %v", u.ShareToken, u.ZitiCircuitId)
envId := fmt.Sprintf("%d", u.EnvironmentId)
acctId := fmt.Sprintf("%d", u.AccountId)
var pts []*write.Point
circuitPt := influxdb2.NewPoint("circuits",
map[string]string{"share": u.ShareToken},
map[string]string{"share": u.ShareToken, "envId": envId, "acctId": acctId},
map[string]interface{}{"circuit": u.ZitiCircuitId},
u.IntervalStart)
pts = append(pts, circuitPt)
if u.BackendTx > 0 || u.BackendRx > 0 {
pt := influxdb2.NewPoint("xfer",
map[string]string{"namespace": "backend", "share": u.ShareToken},
map[string]string{"namespace": "backend", "share": u.ShareToken, "envId": envId, "acctId": acctId},
map[string]interface{}{"bytesRead": u.BackendRx, "bytesWritten": u.BackendTx},
u.IntervalStart)
pts = append(pts, pt)
@ -41,7 +44,7 @@ func (i *influxDb) Write(u *Usage) error {
}
if u.FrontendTx > 0 || u.FrontendRx > 0 {
pt := influxdb2.NewPoint("xfer",
map[string]string{"namespace": "frontend", "share": u.ShareToken},
map[string]string{"namespace": "frontend", "share": u.ShareToken, "envId": envId, "acctId": acctId},
map[string]interface{}{"bytesRead": u.FrontendRx, "bytesWritten": u.FrontendTx},
u.IntervalStart)
pts = append(pts, pt)

View File

@ -12,6 +12,8 @@ type Usage struct {
ZitiServiceId string
ZitiCircuitId string
ShareToken string
EnvironmentId int64
AccountId int64
FrontendTx int64
FrontendRx int64
BackendTx int64
@ -25,6 +27,8 @@ func (u Usage) String() string {
out += ", " + fmt.Sprintf("service '%v'", u.ZitiServiceId)
out += ", " + fmt.Sprintf("circuit '%v'", u.ZitiCircuitId)
out += ", " + fmt.Sprintf("share '%v'", u.ShareToken)
out += ", " + fmt.Sprintf("environment '%d'", u.EnvironmentId)
out += ", " + fmt.Sprintf("account '%v'", u.AccountId)
out += ", " + fmt.Sprintf("fe {rx %v, tx %v}", util.BytesToSize(u.FrontendRx), util.BytesToSize(u.FrontendTx))
out += ", " + fmt.Sprintf("be {rx %v, tx %v}", util.BytesToSize(u.BackendRx), util.BytesToSize(u.BackendTx))
out += "}"

View File

@ -1,31 +0,0 @@
package metrics
import (
"github.com/openziti/zrok/controller/store"
"github.com/pkg/errors"
)
type shareCache struct {
str *store.Store
}
func newShareCache(cfg *store.Config) (*shareCache, error) {
str, err := store.Open(cfg)
if err != nil {
return nil, errors.Wrap(err, "error opening store")
}
return &shareCache{str}, nil
}
func (sc *shareCache) getToken(svcZId string) (string, error) {
tx, err := sc.str.Begin()
if err != nil {
return "", err
}
defer func() { _ = tx.Rollback() }()
shr, err := sc.str.FindShareWithZIdAndDeleted(svcZId, tx)
if err != nil {
return "", err
}
return shr.Token, nil
}