queue-based internal design for limits.Agent (#271)

This commit is contained in:
Michael Quigley 2023-03-21 13:05:22 -04:00
parent 7360598df5
commit 73ef98d7a6
No known key found for this signature in database
GPG Key ID: 9B60314A9DD20A62
2 changed files with 82 additions and 58 deletions

View File

@ -14,6 +14,7 @@ type Agent struct {
ifx *influxReader
zCfg *zrokEdgeSdk.Config
str *store.Store
queue chan *metrics.Usage
close chan struct{}
join chan struct{}
}
@ -24,6 +25,7 @@ func NewAgent(cfg *Config, ifxCfg *metrics.InfluxConfig, zCfg *zrokEdgeSdk.Confi
ifx: newInfluxReader(ifxCfg),
zCfg: zCfg,
str: str,
queue: make(chan *metrics.Usage, 1024),
close: make(chan struct{}),
join: make(chan struct{}),
}, nil
@ -39,25 +41,8 @@ func (a *Agent) Stop() {
}
func (a *Agent) Handle(u *metrics.Usage) error {
logrus.Infof("handling: %v", u)
acctRx, acctTx, err := a.ifx.totalRxTxForAccount(u.AccountId, 24*time.Hour)
if err != nil {
logrus.Error(err)
}
envRx, envTx, err := a.ifx.totalRxTxForEnvironment(u.EnvironmentId, 24*time.Hour)
if err != nil {
logrus.Error(err)
}
shareRx, shareTx, err := a.ifx.totalRxTxForShare(u.ShareToken, 24*time.Hour)
if err != nil {
logrus.Error(err)
}
logrus.Infof("'%v': acct:{rx: %v, tx: %v}, env:{rx: %v, tx: %v}, share:{rx: %v, tx: %v}",
u.ShareToken,
util.BytesToSize(acctRx), util.BytesToSize(acctTx),
util.BytesToSize(envRx), util.BytesToSize(envTx),
util.BytesToSize(shareRx), util.BytesToSize(shareTx),
)
logrus.Debugf("handling: %v", u)
a.queue <- u
return nil
}
@ -68,6 +53,9 @@ func (a *Agent) run() {
mainLoop:
for {
select {
case usage := <-a.queue:
a.enforce(usage)
case <-time.After(a.cfg.Cycle):
logrus.Info("inspection cycle")
@ -77,3 +65,59 @@ mainLoop:
}
}
}
func (a *Agent) enforce(u *metrics.Usage) {
acctPeriod := 24 * time.Hour
acctLimit := DefaultBandwidthPerPeriod()
if a.cfg.Bandwidth != nil && a.cfg.Bandwidth.PerAccount != nil {
acctLimit = a.cfg.Bandwidth.PerAccount
}
if acctLimit.Period > 0 {
acctPeriod = acctLimit.Period
}
acctRx, acctTx, err := a.ifx.totalRxTxForAccount(u.AccountId, acctPeriod)
if err != nil {
logrus.Error(err)
}
if acctLimit.Warning.Rx != Unlimited && acctRx > acctLimit.Warning.Rx {
logrus.Warnf("'%v': account over rx warning limit '%v' at '%v'", u.ShareToken, util.BytesToSize(acctLimit.Warning.Rx), util.BytesToSize(acctRx))
}
if acctLimit.Warning.Tx != Unlimited && acctTx > acctLimit.Warning.Tx {
logrus.Warnf("'%v': account over tx warning limit '%v' at '%v'", u.ShareToken, util.BytesToSize(acctLimit.Warning.Tx), util.BytesToSize(acctTx))
}
if acctLimit.Warning.Total != Unlimited && acctTx+acctRx > acctLimit.Warning.Total {
logrus.Warnf("'%v': account over total warning limit '%v' at '%v'", u.ShareToken, util.BytesToSize(acctLimit.Warning.Total), util.BytesToSize(acctRx+acctTx))
}
envPeriod := 24 * time.Hour
envLimit := DefaultBandwidthPerPeriod()
if a.cfg.Bandwidth != nil && a.cfg.Bandwidth.PerEnvironment != nil {
envLimit = a.cfg.Bandwidth.PerEnvironment
}
if envLimit.Period > 0 {
envPeriod = envLimit.Period
}
envRx, envTx, err := a.ifx.totalRxTxForEnvironment(u.EnvironmentId, envPeriod)
if err != nil {
logrus.Error(err)
}
sharePeriod := 24 * time.Hour
shareLimit := DefaultBandwidthPerPeriod()
if a.cfg.Bandwidth != nil && a.cfg.Bandwidth.PerShare != nil {
shareLimit = a.cfg.Bandwidth.PerShare
}
if shareLimit.Period > 0 {
sharePeriod = shareLimit.Period
}
shareRx, shareTx, err := a.ifx.totalRxTxForShare(u.ShareToken, sharePeriod)
if err != nil {
logrus.Error(err)
}
logrus.Infof("'%v': acct:{rx: %v, tx: %v}/%v, env:{rx: %v, tx: %v}/%v, share:{rx: %v, tx: %v}/%v",
u.ShareToken,
util.BytesToSize(acctRx), util.BytesToSize(acctTx), acctPeriod,
util.BytesToSize(envRx), util.BytesToSize(envTx), envPeriod,
util.BytesToSize(shareRx), util.BytesToSize(shareTx), sharePeriod,
)
}

View File

@ -30,50 +30,30 @@ type Bandwidth struct {
Total int64
}
func DefaultBandwidthPerPeriod() *BandwidthPerPeriod {
return &BandwidthPerPeriod{
Period: 15 * (24 * time.Hour),
Warning: &Bandwidth{
Rx: Unlimited,
Tx: Unlimited,
Total: Unlimited,
},
Limit: &Bandwidth{
Rx: Unlimited,
Tx: Unlimited,
Total: Unlimited,
},
}
}
func DefaultConfig() *Config {
return &Config{
Environments: Unlimited,
Shares: Unlimited,
Bandwidth: &BandwidthConfig{
PerAccount: &BandwidthPerPeriod{
Period: 365 * (24 * time.Hour),
Warning: &Bandwidth{
Rx: Unlimited,
Tx: Unlimited,
Total: Unlimited,
},
Limit: &Bandwidth{
Rx: Unlimited,
Tx: Unlimited,
Total: Unlimited,
},
},
PerEnvironment: &BandwidthPerPeriod{
Period: 365 * (24 * time.Hour),
Warning: &Bandwidth{
Rx: Unlimited,
Tx: Unlimited,
Total: Unlimited,
},
Limit: &Bandwidth{
Rx: Unlimited,
Tx: Unlimited,
Total: Unlimited,
},
},
PerShare: &BandwidthPerPeriod{
Period: 365 * (24 * time.Hour),
Warning: &Bandwidth{
Rx: Unlimited,
Tx: Unlimited,
Total: Unlimited,
},
Limit: &Bandwidth{
Rx: Unlimited,
Tx: Unlimited,
Total: Unlimited,
},
},
PerAccount: DefaultBandwidthPerPeriod(),
PerEnvironment: DefaultBandwidthPerPeriod(),
PerShare: DefaultBandwidthPerPeriod(),
},
Enforcing: false,
Cycle: 15 * time.Minute,