From 97f20acd874d552bbe84c15e392d11a868709562 Mon Sep 17 00:00:00 2001 From: Michael Quigley Date: Tue, 7 Mar 2023 15:27:39 -0500 Subject: [PATCH] look up share details from usage data (#128) --- controller/metrics/agent.go | 22 ++++++++++++++++++---- controller/metrics/config.go | 2 ++ controller/metrics/influx.go | 20 ++++++++++++++++++++ controller/metrics/shareCache.go | 31 +++++++++++++++++++++++++++++++ controller/store/share.go | 8 ++++++++ 5 files changed, 79 insertions(+), 4 deletions(-) create mode 100644 controller/metrics/shareCache.go diff --git a/controller/metrics/agent.go b/controller/metrics/agent.go index a0f0f175..4233c027 100644 --- a/controller/metrics/agent.go +++ b/controller/metrics/agent.go @@ -6,13 +6,22 @@ import ( ) type MetricsAgent struct { - src Source - join chan struct{} + src Source + cache *shareCache + join chan struct{} } func Run(cfg *Config) (*MetricsAgent, error) { logrus.Info("starting") + if cfg.Store == nil { + return nil, errors.New("no 'store' configured; exiting") + } + cache, err := newShareCache(cfg.Store) + if err != nil { + return nil, errors.Wrap(err, "error creating share cache") + } + if cfg.Source == nil { return nil, errors.New("no 'source' configured; exiting") } @@ -32,12 +41,17 @@ func Run(cfg *Config) (*MetricsAgent, error) { for { select { case event := <-events: - logrus.Info(Ingest(event)) + usage := Ingest(event) + if shrToken, err := cache.getToken(usage.ZitiServiceId); err == nil { + logrus.Infof("share: %v, circuit: %v, rx: %d, tx: %d", shrToken, usage.ZitiCircuitId, usage.BackendRx, usage.BackendTx) + } else { + logrus.Error(err) + } } } }() - return &MetricsAgent{src, join}, nil + return &MetricsAgent{src: src, join: join}, nil } func (ma *MetricsAgent) Stop() { diff --git a/controller/metrics/config.go b/controller/metrics/config.go index 1fa0d72d..4f08f5a1 100644 --- a/controller/metrics/config.go +++ b/controller/metrics/config.go @@ -2,12 +2,14 @@ package metrics import ( "github.com/michaelquigley/cf" + "github.com/openziti/zrok/controller/store" "github.com/pkg/errors" ) type Config struct { Source interface{} Influx *InfluxConfig + Store *store.Config } type InfluxConfig struct { diff --git a/controller/metrics/influx.go b/controller/metrics/influx.go index 1abe097a..18a79e84 100644 --- a/controller/metrics/influx.go +++ b/controller/metrics/influx.go @@ -1 +1,21 @@ package metrics + +import ( + influxdb2 "github.com/influxdata/influxdb-client-go/v2" + "github.com/influxdata/influxdb-client-go/v2/api" +) + +type influxDb struct { + idb influxdb2.Client + writeApi api.WriteAPIBlocking +} + +func openInfluxDb(cfg *InfluxConfig) *influxDb { + idb := influxdb2.NewClient(cfg.Url, cfg.Token) + wapi := idb.WriteAPIBlocking(cfg.Org, cfg.Bucket) + return &influxDb{idb, wapi} +} + +func (i *influxDb) Write(u *Usage) error { + return nil +} diff --git a/controller/metrics/shareCache.go b/controller/metrics/shareCache.go new file mode 100644 index 00000000..69ce4c88 --- /dev/null +++ b/controller/metrics/shareCache.go @@ -0,0 +1,31 @@ +package metrics + +import ( + "github.com/openziti/zrok/controller/store" + "github.com/pkg/errors" +) + +type shareCache struct { + str *store.Store +} + +func newShareCache(cfg *store.Config) (*shareCache, error) { + str, err := store.Open(cfg) + if err != nil { + return nil, errors.Wrap(err, "error opening store") + } + return &shareCache{str}, nil +} + +func (sc *shareCache) getToken(svcZId string) (string, error) { + tx, err := sc.str.Begin() + if err != nil { + return "", err + } + defer func() { _ = tx.Rollback() }() + shr, err := sc.str.FindShareWithZId(svcZId, tx) + if err != nil { + return "", err + } + return shr.Token, nil +} diff --git a/controller/store/share.go b/controller/store/share.go index 5ed92b5b..5800a36f 100644 --- a/controller/store/share.go +++ b/controller/store/share.go @@ -62,6 +62,14 @@ func (self *Store) FindShareWithToken(shrToken string, tx *sqlx.Tx) (*Share, err return shr, nil } +func (self *Store) FindShareWithZId(zId string, tx *sqlx.Tx) (*Share, error) { + shr := &Share{} + if err := tx.QueryRowx("select * from shares where z_id = $1", zId).StructScan(shr); err != nil { + return nil, errors.Wrap(err, "error selecting share by z_id") + } + return shr, nil +} + func (self *Store) FindSharesForEnvironment(envId int, tx *sqlx.Tx) ([]*Share, error) { rows, err := tx.Queryx("select shares.* from shares where environment_id = $1", envId) if err != nil {