more limits.Agent elaboration (#271)

This commit is contained in:
Michael Quigley 2023-03-17 13:13:33 -04:00
parent b69237e9cc
commit 9418195150
No known key found for this signature in database
GPG Key ID: 9B60314A9DD20A62
4 changed files with 58 additions and 6 deletions

View File

@ -88,9 +88,7 @@ func Run(inCfg *config.Config) error {
return errors.Wrap(err, "error creating limits agent")
}
ma.AddUsageSink(la)
if err := la.Start(); err != nil {
return errors.Wrap(err, "error starting limits agent")
}
la.Start()
defer func() { la.Stop() }()
}
}

View File

@ -5,23 +5,56 @@ import (
"github.com/openziti/zrok/controller/store"
"github.com/openziti/zrok/controller/zrokEdgeSdk"
"github.com/sirupsen/logrus"
"time"
)
type Agent struct {
cfg *Config
ifx *influxReader
zCfg *zrokEdgeSdk.Config
str *store.Store
close chan struct{}
join chan struct{}
}
func NewAgent(cfg *Config, ifxCfg *metrics.InfluxConfig, zCfg *zrokEdgeSdk.Config, str *store.Store) (*Agent, error) {
return &Agent{}, nil
return &Agent{
cfg: cfg,
ifx: newInfluxReader(ifxCfg),
zCfg: zCfg,
str: str,
close: make(chan struct{}),
join: make(chan struct{}),
}, nil
}
func (a *Agent) Start() error {
return nil
func (a *Agent) Start() {
go a.run()
}
func (a *Agent) Stop() {
close(a.close)
<-a.join
}
func (a *Agent) Handle(u *metrics.Usage) error {
logrus.Infof("handling: %v", u)
return nil
}
func (a *Agent) run() {
logrus.Info("started")
defer logrus.Info("stopped")
mainLoop:
for {
select {
case <-time.After(a.cfg.Cycle):
logrus.Info("insepection cycle")
case <-a.close:
close(a.join)
break mainLoop
}
}
}

View File

@ -8,6 +8,7 @@ type Config struct {
Environments int
Shares int
Bandwidth *BandwidthConfig
Cycle time.Duration
Enforcing bool
}
@ -74,5 +75,7 @@ func DefaultConfig() *Config {
},
},
},
Enforcing: false,
Cycle: 15 * time.Minute,
}
}

View File

@ -0,0 +1,18 @@
package limits
import (
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api"
"github.com/openziti/zrok/controller/metrics"
)
type influxReader struct {
idb influxdb2.Client
queryApi api.QueryAPI
}
func newInfluxReader(cfg *metrics.InfluxConfig) *influxReader {
idb := influxdb2.NewClient(cfg.Url, cfg.Token)
queryApi := idb.QueryAPI(cfg.Org)
return &influxReader{idb, queryApi}
}