wire in the new 'limits.Agent' infrastructure; extend the 'metrics.Agent' to support additional 'metrics.UsageSink' instances (#271)

This commit is contained in:
Michael Quigley
2023-03-16 15:05:39 -04:00
committed by Kenneth Bingham
parent e824c87d08
commit 29f38de546
4 changed files with 51 additions and 5 deletions

View File

@@ -11,7 +11,7 @@ type Agent struct {
src ZitiEventJsonSource
srcJoin chan struct{}
cache *cache
snk UsageSink
snks []UsageSink
}
func NewAgent(cfg *AgentConfig, str *store.Store, ifxCfg *InfluxConfig) (*Agent, error) {
@@ -22,10 +22,14 @@ func NewAgent(cfg *AgentConfig, str *store.Store, ifxCfg *InfluxConfig) (*Agent,
return nil, errors.New("invalid event json source")
}
a.cache = newShareCache(str)
a.snk = newInfluxWriter(ifxCfg)
a.snks = append(a.snks, newInfluxWriter(ifxCfg))
return a, nil
}
func (a *Agent) AddUsageSink(snk UsageSink) {
a.snks = append(a.snks, snk)
}
func (a *Agent) Start() error {
a.events = make(chan ZitiEventJson)
srcJoin, err := a.src.Start(a.events)
@@ -44,8 +48,10 @@ func (a *Agent) Start() error {
if err := a.cache.addZrokDetail(usage); err != nil {
logrus.Error(err)
}
if err := a.snk.Handle(usage); err != nil {
logrus.Error(err)
for _, snk := range a.snks {
if err := snk.Handle(usage); err != nil {
logrus.Error(err)
}
}
} else {
logrus.Error(err)