diff --git a/controller/limits/agent.go b/controller/limits/agent.go index 5d9508eb..2f9185c5 100644 --- a/controller/limits/agent.go +++ b/controller/limits/agent.go @@ -14,6 +14,7 @@ type Agent struct { ifx *influxReader zCfg *zrokEdgeSdk.Config str *store.Store + queue chan *metrics.Usage close chan struct{} join chan struct{} } @@ -24,6 +25,7 @@ func NewAgent(cfg *Config, ifxCfg *metrics.InfluxConfig, zCfg *zrokEdgeSdk.Confi ifx: newInfluxReader(ifxCfg), zCfg: zCfg, str: str, + queue: make(chan *metrics.Usage, 1024), close: make(chan struct{}), join: make(chan struct{}), }, nil @@ -39,25 +41,8 @@ func (a *Agent) Stop() { } func (a *Agent) Handle(u *metrics.Usage) error { - logrus.Infof("handling: %v", u) - acctRx, acctTx, err := a.ifx.totalRxTxForAccount(u.AccountId, 24*time.Hour) - if err != nil { - logrus.Error(err) - } - envRx, envTx, err := a.ifx.totalRxTxForEnvironment(u.EnvironmentId, 24*time.Hour) - if err != nil { - logrus.Error(err) - } - shareRx, shareTx, err := a.ifx.totalRxTxForShare(u.ShareToken, 24*time.Hour) - if err != nil { - logrus.Error(err) - } - logrus.Infof("'%v': acct:{rx: %v, tx: %v}, env:{rx: %v, tx: %v}, share:{rx: %v, tx: %v}", - u.ShareToken, - util.BytesToSize(acctRx), util.BytesToSize(acctTx), - util.BytesToSize(envRx), util.BytesToSize(envTx), - util.BytesToSize(shareRx), util.BytesToSize(shareTx), - ) + logrus.Debugf("handling: %v", u) + a.queue <- u return nil } @@ -68,6 +53,9 @@ func (a *Agent) run() { mainLoop: for { select { + case usage := <-a.queue: + a.enforce(usage) + case <-time.After(a.cfg.Cycle): logrus.Info("inspection cycle") @@ -77,3 +65,59 @@ mainLoop: } } } + +func (a *Agent) enforce(u *metrics.Usage) { + acctPeriod := 24 * time.Hour + acctLimit := DefaultBandwidthPerPeriod() + if a.cfg.Bandwidth != nil && a.cfg.Bandwidth.PerAccount != nil { + acctLimit = a.cfg.Bandwidth.PerAccount + } + if acctLimit.Period > 0 { + acctPeriod = acctLimit.Period + } + acctRx, acctTx, err := a.ifx.totalRxTxForAccount(u.AccountId, acctPeriod) + if err != nil { + logrus.Error(err) + } + if acctLimit.Warning.Rx != Unlimited && acctRx > acctLimit.Warning.Rx { + logrus.Warnf("'%v': account over rx warning limit '%v' at '%v'", u.ShareToken, util.BytesToSize(acctLimit.Warning.Rx), util.BytesToSize(acctRx)) + } + if acctLimit.Warning.Tx != Unlimited && acctTx > acctLimit.Warning.Tx { + logrus.Warnf("'%v': account over tx warning limit '%v' at '%v'", u.ShareToken, util.BytesToSize(acctLimit.Warning.Tx), util.BytesToSize(acctTx)) + } + if acctLimit.Warning.Total != Unlimited && acctTx+acctRx > acctLimit.Warning.Total { + logrus.Warnf("'%v': account over total warning limit '%v' at '%v'", u.ShareToken, util.BytesToSize(acctLimit.Warning.Total), util.BytesToSize(acctRx+acctTx)) + } + + envPeriod := 24 * time.Hour + envLimit := DefaultBandwidthPerPeriod() + if a.cfg.Bandwidth != nil && a.cfg.Bandwidth.PerEnvironment != nil { + envLimit = a.cfg.Bandwidth.PerEnvironment + } + if envLimit.Period > 0 { + envPeriod = envLimit.Period + } + envRx, envTx, err := a.ifx.totalRxTxForEnvironment(u.EnvironmentId, envPeriod) + if err != nil { + logrus.Error(err) + } + + sharePeriod := 24 * time.Hour + shareLimit := DefaultBandwidthPerPeriod() + if a.cfg.Bandwidth != nil && a.cfg.Bandwidth.PerShare != nil { + shareLimit = a.cfg.Bandwidth.PerShare + } + if shareLimit.Period > 0 { + sharePeriod = shareLimit.Period + } + shareRx, shareTx, err := a.ifx.totalRxTxForShare(u.ShareToken, sharePeriod) + if err != nil { + logrus.Error(err) + } + logrus.Infof("'%v': acct:{rx: %v, tx: %v}/%v, env:{rx: %v, tx: %v}/%v, share:{rx: %v, tx: %v}/%v", + u.ShareToken, + util.BytesToSize(acctRx), util.BytesToSize(acctTx), acctPeriod, + util.BytesToSize(envRx), util.BytesToSize(envTx), envPeriod, + util.BytesToSize(shareRx), util.BytesToSize(shareTx), sharePeriod, + ) +} diff --git a/controller/limits/config.go b/controller/limits/config.go index 7117f97e..4e6fbbe1 100644 --- a/controller/limits/config.go +++ b/controller/limits/config.go @@ -30,50 +30,30 @@ type Bandwidth struct { Total int64 } +func DefaultBandwidthPerPeriod() *BandwidthPerPeriod { + return &BandwidthPerPeriod{ + Period: 15 * (24 * time.Hour), + Warning: &Bandwidth{ + Rx: Unlimited, + Tx: Unlimited, + Total: Unlimited, + }, + Limit: &Bandwidth{ + Rx: Unlimited, + Tx: Unlimited, + Total: Unlimited, + }, + } +} + func DefaultConfig() *Config { return &Config{ Environments: Unlimited, Shares: Unlimited, Bandwidth: &BandwidthConfig{ - PerAccount: &BandwidthPerPeriod{ - Period: 365 * (24 * time.Hour), - Warning: &Bandwidth{ - Rx: Unlimited, - Tx: Unlimited, - Total: Unlimited, - }, - Limit: &Bandwidth{ - Rx: Unlimited, - Tx: Unlimited, - Total: Unlimited, - }, - }, - PerEnvironment: &BandwidthPerPeriod{ - Period: 365 * (24 * time.Hour), - Warning: &Bandwidth{ - Rx: Unlimited, - Tx: Unlimited, - Total: Unlimited, - }, - Limit: &Bandwidth{ - Rx: Unlimited, - Tx: Unlimited, - Total: Unlimited, - }, - }, - PerShare: &BandwidthPerPeriod{ - Period: 365 * (24 * time.Hour), - Warning: &Bandwidth{ - Rx: Unlimited, - Tx: Unlimited, - Total: Unlimited, - }, - Limit: &Bandwidth{ - Rx: Unlimited, - Tx: Unlimited, - Total: Unlimited, - }, - }, + PerAccount: DefaultBandwidthPerPeriod(), + PerEnvironment: DefaultBandwidthPerPeriod(), + PerShare: DefaultBandwidthPerPeriod(), }, Enforcing: false, Cycle: 15 * time.Minute,