From 29f38de5466b079ee48f89bf485343b0278d23f7 Mon Sep 17 00:00:00 2001 From: Michael Quigley Date: Thu, 16 Mar 2023 15:05:39 -0400 Subject: [PATCH] wire in the new 'limits.Agent' infrastructure; extend the 'metrics.Agent' to support additional 'metrics.UsageSink' instances (#271) --- controller/controller.go | 13 +++++++++++++ controller/limits/agent.go | 27 +++++++++++++++++++++++++++ controller/limits/config.go | 2 +- controller/metrics/agent.go | 14 ++++++++++---- 4 files changed, 51 insertions(+), 5 deletions(-) create mode 100644 controller/limits/agent.go diff --git a/controller/controller.go b/controller/controller.go index 670bd863..ab91ab97 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -3,6 +3,7 @@ package controller import ( "context" "github.com/openziti/zrok/controller/config" + "github.com/openziti/zrok/controller/limits" "github.com/openziti/zrok/controller/metrics" "github.com/sirupsen/logrus" @@ -80,6 +81,18 @@ func Run(inCfg *config.Config) error { return errors.Wrap(err, "error starting metrics agent") } defer func() { ma.Stop() }() + + if cfg.Limits != nil && cfg.Limits.Enforcing { + la, err := limits.NewAgent(cfg.Limits, cfg.Metrics.Influx, cfg.Ziti, str) + if err != nil { + return errors.Wrap(err, "error creating limits agent") + } + ma.AddUsageSink(la) + if err := la.Start(); err != nil { + return errors.Wrap(err, "error starting limits agent") + } + defer func() { la.Stop() }() + } } ctx, cancel := context.WithCancel(context.Background()) diff --git a/controller/limits/agent.go b/controller/limits/agent.go new file mode 100644 index 00000000..d69193d6 --- /dev/null +++ b/controller/limits/agent.go @@ -0,0 +1,27 @@ +package limits + +import ( + "github.com/openziti/zrok/controller/metrics" + "github.com/openziti/zrok/controller/store" + "github.com/openziti/zrok/controller/zrokEdgeSdk" + "github.com/sirupsen/logrus" +) + +type Agent struct { +} + +func NewAgent(cfg *Config, ifxCfg *metrics.InfluxConfig, zCfg *zrokEdgeSdk.Config, str *store.Store) (*Agent, error) { + return &Agent{}, nil +} + +func (a *Agent) Start() error { + return nil +} + +func (a *Agent) Stop() { +} + +func (a *Agent) Handle(u *metrics.Usage) error { + logrus.Infof("handling: %v", u) + return nil +} diff --git a/controller/limits/config.go b/controller/limits/config.go index 4c044950..e3d10cff 100644 --- a/controller/limits/config.go +++ b/controller/limits/config.go @@ -8,7 +8,7 @@ type Config struct { Environments int Shares int Bandwidth *BandwidthConfig - Cycle time.Duration + Enforcing bool } type BandwidthConfig struct { diff --git a/controller/metrics/agent.go b/controller/metrics/agent.go index 89be79e5..ce10c8fb 100644 --- a/controller/metrics/agent.go +++ b/controller/metrics/agent.go @@ -11,7 +11,7 @@ type Agent struct { src ZitiEventJsonSource srcJoin chan struct{} cache *cache - snk UsageSink + snks []UsageSink } func NewAgent(cfg *AgentConfig, str *store.Store, ifxCfg *InfluxConfig) (*Agent, error) { @@ -22,10 +22,14 @@ func NewAgent(cfg *AgentConfig, str *store.Store, ifxCfg *InfluxConfig) (*Agent, return nil, errors.New("invalid event json source") } a.cache = newShareCache(str) - a.snk = newInfluxWriter(ifxCfg) + a.snks = append(a.snks, newInfluxWriter(ifxCfg)) return a, nil } +func (a *Agent) AddUsageSink(snk UsageSink) { + a.snks = append(a.snks, snk) +} + func (a *Agent) Start() error { a.events = make(chan ZitiEventJson) srcJoin, err := a.src.Start(a.events) @@ -44,8 +48,10 @@ func (a *Agent) Start() error { if err := a.cache.addZrokDetail(usage); err != nil { logrus.Error(err) } - if err := a.snk.Handle(usage); err != nil { - logrus.Error(err) + for _, snk := range a.snks { + if err := snk.Handle(usage); err != nil { + logrus.Error(err) + } } } else { logrus.Error(err)