mirror of
https://github.com/openziti/zrok.git
synced 2025-02-16 18:20:51 +01:00
look up share details from usage data (#128)
This commit is contained in:
parent
3a0c06f54d
commit
97f20acd87
@ -6,13 +6,22 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type MetricsAgent struct {
|
type MetricsAgent struct {
|
||||||
src Source
|
src Source
|
||||||
join chan struct{}
|
cache *shareCache
|
||||||
|
join chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func Run(cfg *Config) (*MetricsAgent, error) {
|
func Run(cfg *Config) (*MetricsAgent, error) {
|
||||||
logrus.Info("starting")
|
logrus.Info("starting")
|
||||||
|
|
||||||
|
if cfg.Store == nil {
|
||||||
|
return nil, errors.New("no 'store' configured; exiting")
|
||||||
|
}
|
||||||
|
cache, err := newShareCache(cfg.Store)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "error creating share cache")
|
||||||
|
}
|
||||||
|
|
||||||
if cfg.Source == nil {
|
if cfg.Source == nil {
|
||||||
return nil, errors.New("no 'source' configured; exiting")
|
return nil, errors.New("no 'source' configured; exiting")
|
||||||
}
|
}
|
||||||
@ -32,12 +41,17 @@ func Run(cfg *Config) (*MetricsAgent, error) {
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case event := <-events:
|
case event := <-events:
|
||||||
logrus.Info(Ingest(event))
|
usage := Ingest(event)
|
||||||
|
if shrToken, err := cache.getToken(usage.ZitiServiceId); err == nil {
|
||||||
|
logrus.Infof("share: %v, circuit: %v, rx: %d, tx: %d", shrToken, usage.ZitiCircuitId, usage.BackendRx, usage.BackendTx)
|
||||||
|
} else {
|
||||||
|
logrus.Error(err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return &MetricsAgent{src, join}, nil
|
return &MetricsAgent{src: src, join: join}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ma *MetricsAgent) Stop() {
|
func (ma *MetricsAgent) Stop() {
|
||||||
|
@ -2,12 +2,14 @@ package metrics
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/michaelquigley/cf"
|
"github.com/michaelquigley/cf"
|
||||||
|
"github.com/openziti/zrok/controller/store"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
Source interface{}
|
Source interface{}
|
||||||
Influx *InfluxConfig
|
Influx *InfluxConfig
|
||||||
|
Store *store.Config
|
||||||
}
|
}
|
||||||
|
|
||||||
type InfluxConfig struct {
|
type InfluxConfig struct {
|
||||||
|
@ -1 +1,21 @@
|
|||||||
package metrics
|
package metrics
|
||||||
|
|
||||||
|
import (
|
||||||
|
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
|
||||||
|
"github.com/influxdata/influxdb-client-go/v2/api"
|
||||||
|
)
|
||||||
|
|
||||||
|
type influxDb struct {
|
||||||
|
idb influxdb2.Client
|
||||||
|
writeApi api.WriteAPIBlocking
|
||||||
|
}
|
||||||
|
|
||||||
|
func openInfluxDb(cfg *InfluxConfig) *influxDb {
|
||||||
|
idb := influxdb2.NewClient(cfg.Url, cfg.Token)
|
||||||
|
wapi := idb.WriteAPIBlocking(cfg.Org, cfg.Bucket)
|
||||||
|
return &influxDb{idb, wapi}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *influxDb) Write(u *Usage) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
31
controller/metrics/shareCache.go
Normal file
31
controller/metrics/shareCache.go
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
package metrics
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/openziti/zrok/controller/store"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type shareCache struct {
|
||||||
|
str *store.Store
|
||||||
|
}
|
||||||
|
|
||||||
|
func newShareCache(cfg *store.Config) (*shareCache, error) {
|
||||||
|
str, err := store.Open(cfg)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "error opening store")
|
||||||
|
}
|
||||||
|
return &shareCache{str}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sc *shareCache) getToken(svcZId string) (string, error) {
|
||||||
|
tx, err := sc.str.Begin()
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
defer func() { _ = tx.Rollback() }()
|
||||||
|
shr, err := sc.str.FindShareWithZId(svcZId, tx)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
return shr.Token, nil
|
||||||
|
}
|
@ -62,6 +62,14 @@ func (self *Store) FindShareWithToken(shrToken string, tx *sqlx.Tx) (*Share, err
|
|||||||
return shr, nil
|
return shr, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (self *Store) FindShareWithZId(zId string, tx *sqlx.Tx) (*Share, error) {
|
||||||
|
shr := &Share{}
|
||||||
|
if err := tx.QueryRowx("select * from shares where z_id = $1", zId).StructScan(shr); err != nil {
|
||||||
|
return nil, errors.Wrap(err, "error selecting share by z_id")
|
||||||
|
}
|
||||||
|
return shr, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (self *Store) FindSharesForEnvironment(envId int, tx *sqlx.Tx) ([]*Share, error) {
|
func (self *Store) FindSharesForEnvironment(envId int, tx *sqlx.Tx) ([]*Share, error) {
|
||||||
rows, err := tx.Queryx("select shares.* from shares where environment_id = $1", envId)
|
rows, err := tx.Queryx("select shares.* from shares where environment_id = $1", envId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user