diff --git a/controller/limits/agent.go b/controller/limits/agent.go index e2420b53..226adb90 100644 --- a/controller/limits/agent.go +++ b/controller/limits/agent.go @@ -138,7 +138,7 @@ func (a *Agent) enforce(u *metrics.Usage) error { } defer func() { _ = trx.Rollback() }() - if enforce, warning, err := a.checkAccountLimit(u.AccountId); err == nil { + if enforce, warning, rxBytes, txBytes, err := a.checkAccountLimit(u.AccountId); err == nil { if enforce { enforced := false var enforcedAt time.Time @@ -159,9 +159,16 @@ func (a *Agent) enforce(u *metrics.Usage) error { if err != nil { return err } - - logrus.Warnf("enforcing account limit for '#%d'", u.AccountId) - + acct, err := a.str.GetAccount(int(u.AccountId), trx) + if err != nil { + return err + } + // run account limit actions + for _, action := range a.acctLimitActions { + if err := action.HandleAccount(acct, rxBytes, txBytes, a.cfg.Bandwidth.PerAccount); err != nil { + return err + } + } if err := trx.Commit(); err != nil { return err } @@ -189,9 +196,16 @@ func (a *Agent) enforce(u *metrics.Usage) error { if err != nil { return err } - - logrus.Warnf("warning account '#%d'", u.AccountId) - + acct, err := a.str.GetAccount(int(u.AccountId), trx) + if err != nil { + return err + } + // run account warning actions + for _, action := range a.acctWarningActions { + if err := action.HandleAccount(acct, rxBytes, txBytes, a.cfg.Bandwidth.PerAccount); err != nil { + return err + } + } if err := trx.Commit(); err != nil { return err } @@ -200,7 +214,7 @@ func (a *Agent) enforce(u *metrics.Usage) error { } } else { - if enforce, warning, err := a.checkEnvironmentLimit(u.EnvironmentId); err == nil { + if enforce, warning, rxBytes, txBytes, err := a.checkEnvironmentLimit(u.EnvironmentId); err == nil { if enforce { enforced := false var enforcedAt time.Time @@ -221,9 +235,16 @@ func (a *Agent) enforce(u *metrics.Usage) error { if err != nil { return err } - - logrus.Warnf("enforcing environment limit for environment '#%d'", u.EnvironmentId) - + env, err := a.str.GetEnvironment(int(u.EnvironmentId), trx) + if err != nil { + return err + } + // run environment limit actions + for _, action := range a.envLimitActions { + if err := action.HandleEnvironment(env, rxBytes, txBytes, a.cfg.Bandwidth.PerEnvironment); err != nil { + return err + } + } if err := trx.Commit(); err != nil { return err } @@ -251,9 +272,16 @@ func (a *Agent) enforce(u *metrics.Usage) error { if err != nil { return err } - - logrus.Warnf("warning environment '#%d'", u.EnvironmentId) - + env, err := a.str.GetEnvironment(int(u.EnvironmentId), trx) + if err != nil { + return err + } + // run environment warning actions + for _, action := range a.envWarningActions { + if err := action.HandleEnvironment(env, rxBytes, txBytes, a.cfg.Bandwidth.PerEnvironment); err != nil { + return err + } + } if err := trx.Commit(); err != nil { return err } @@ -262,7 +290,7 @@ func (a *Agent) enforce(u *metrics.Usage) error { } } else { - if enforce, warning, err := a.checkShareLimit(u.ShareToken); err == nil { + if enforce, warning, rxBytes, txBytes, err := a.checkShareLimit(u.ShareToken); err == nil { if enforce { shr, err := a.str.FindShareWithToken(u.ShareToken, trx) if err != nil { @@ -288,9 +316,12 @@ func (a *Agent) enforce(u *metrics.Usage) error { if err != nil { return err } - - logrus.Warnf("enforcing share limit for share '%v'", shr.Token) - + // run share limit actions + for _, action := range a.shrLimitActions { + if err := action.HandleShare(shr, rxBytes, txBytes, a.cfg.Bandwidth.PerShare); err != nil { + return err + } + } if err := trx.Commit(); err != nil { return err } @@ -323,9 +354,12 @@ func (a *Agent) enforce(u *metrics.Usage) error { if err != nil { return err } - - logrus.Warnf("warning share '%v'", shr.Token) - + // run share warning actions + for _, action := range a.shrWarningActions { + if err := action.HandleShare(shr, rxBytes, txBytes, a.cfg.Bandwidth.PerShare); err != nil { + return err + } + } if err := trx.Commit(); err != nil { return err } @@ -362,12 +396,15 @@ func (a *Agent) relax() error { if sljs, err := a.str.FindAllLatestShareLimitJournal(trx); err == nil { for _, slj := range sljs { if shr, err := a.str.GetShare(slj.ShareId, trx); err == nil { - switch slj.Action { - case store.WarningAction: - if enforce, warning, err := a.checkShareLimit(shr.Token); err == nil { + if slj.Action == store.WarningAction || slj.Action == store.LimitAction { + if enforce, warning, rxBytes, txBytes, err := a.checkShareLimit(shr.Token); err == nil { if !enforce && !warning { - logrus.Infof("relaxing warning for share '%v'", shr.Token) - + // run relax actions for share + for _, action := range a.shrRelaxActions { + if err := action.HandleShare(shr, rxBytes, txBytes, a.cfg.Bandwidth.PerShare); err != nil { + return err + } + } if err := a.str.DeleteShareLimitJournalForShare(shr.Id, trx); err == nil { commit = true } else { @@ -379,23 +416,6 @@ func (a *Agent) relax() error { } else { logrus.Errorf("error checking share limit for '%v': %v", shr.Token, err) } - - case store.LimitAction: - if enforce, warning, err := a.checkShareLimit(shr.Token); err == nil { - if !enforce && !warning { - logrus.Infof("relaxing limit for share '%v'", shr.Token) - - if err := a.str.DeleteShareLimitJournalForShare(shr.Id, trx); err == nil { - commit = true - } else { - logrus.Errorf("error deleting share_limit_journal for '%v': %v", shr.Token, err) - } - } else { - logrus.Infof("share '%v' still over limit", shr.Token) - } - } else { - logrus.Errorf("error checking share limit for '%v': %v", shr.Token, err) - } } } else { logrus.Errorf("error getting share for '#%d': %v", slj.ShareId, err) @@ -408,29 +428,15 @@ func (a *Agent) relax() error { if eljs, err := a.str.FindAllLatestEnvironmentLimitJournal(trx); err == nil { for _, elj := range eljs { if env, err := a.str.GetEnvironment(elj.EnvironmentId, trx); err == nil { - switch elj.Action { - case store.WarningAction: - if enforce, warning, err := a.checkEnvironmentLimit(int64(elj.EnvironmentId)); err == nil { + if elj.Action == store.WarningAction || elj.Action == store.LimitAction { + if enforce, warning, rxBytes, txBytes, err := a.checkEnvironmentLimit(int64(elj.EnvironmentId)); err == nil { if !enforce && !warning { - logrus.Infof("relaxing warning for environment '%v'", env.ZId) - - if err := a.str.DeleteEnvironmentLimitJournalForEnvironment(env.Id, trx); err == nil { - commit = true - } else { - logrus.Errorf("error deleteing environment_limit_journal for '%v': %v", env.ZId, err) + // run relax actions for environment + for _, action := range a.envRelaxActions { + if err := action.HandleEnvironment(env, rxBytes, txBytes, a.cfg.Bandwidth.PerEnvironment); err != nil { + return err + } } - } else { - logrus.Infof("environment '%v' still over limit", env.ZId) - } - } else { - logrus.Errorf("error checking environment limit for '%v': %v", env.ZId, err) - } - - case store.LimitAction: - if enforce, warning, err := a.checkEnvironmentLimit(int64(elj.EnvironmentId)); err == nil { - if !enforce && !warning { - logrus.Infof("relaxing limit for environment '%v'", env.ZId) - if err := a.str.DeleteEnvironmentLimitJournalForEnvironment(env.Id, trx); err == nil { commit = true } else { @@ -454,29 +460,15 @@ func (a *Agent) relax() error { if aljs, err := a.str.FindAllLatestAccountLimitJournal(trx); err == nil { for _, alj := range aljs { if acct, err := a.str.GetAccount(alj.AccountId, trx); err == nil { - switch alj.Action { - case store.WarningAction: - if enforce, warning, err := a.checkAccountLimit(int64(alj.AccountId)); err == nil { + if alj.Action == store.WarningAction || alj.Action == store.LimitAction { + if enforce, warning, rxBytes, txBytes, err := a.checkAccountLimit(int64(alj.AccountId)); err == nil { if !enforce && !warning { - logrus.Infof("relaxing warning for account '%v'", acct.Email) - - if err := a.str.DeleteAccountLimitJournalForAccount(acct.Id, trx); err == nil { - commit = true - } else { - logrus.Errorf("error deleting account_limit_journal for '%v': %v", acct.Email, err) + // run relax actions for account + for _, action := range a.acctRelaxActions { + if err := action.HandleAccount(acct, rxBytes, txBytes, a.cfg.Bandwidth.PerAccount); err != nil { + return err + } } - } else { - logrus.Infof("account '%v' still over limit", acct.Email) - } - } else { - logrus.Errorf("error checking account limit for '%v': %v", acct.Email, err) - } - - case store.LimitAction: - if enforce, warning, err := a.checkAccountLimit(int64(alj.AccountId)); err == nil { - if !enforce && !warning { - logrus.Infof("relaxing limit for account '%v'", acct.Email) - if err := a.str.DeleteAccountLimitJournalForAccount(acct.Id, trx); err == nil { commit = true } else { @@ -506,7 +498,7 @@ func (a *Agent) relax() error { return nil } -func (a *Agent) checkAccountLimit(acctId int64) (enforce, warning bool, err error) { +func (a *Agent) checkAccountLimit(acctId int64) (enforce, warning bool, rxBytes, txBytes int64, err error) { period := 24 * time.Hour limit := DefaultBandwidthPerPeriod() if a.cfg.Bandwidth != nil && a.cfg.Bandwidth.PerAccount != nil { @@ -521,10 +513,10 @@ func (a *Agent) checkAccountLimit(acctId int64) (enforce, warning bool, err erro } enforce, warning = a.checkLimit(limit, rx, tx) - return enforce, warning, nil + return enforce, warning, rx, tx, nil } -func (a *Agent) checkEnvironmentLimit(envId int64) (enforce, warning bool, err error) { +func (a *Agent) checkEnvironmentLimit(envId int64) (enforce, warning bool, rxBytes, txBytes int64, err error) { period := 24 * time.Hour limit := DefaultBandwidthPerPeriod() if a.cfg.Bandwidth != nil && a.cfg.Bandwidth.PerEnvironment != nil { @@ -539,10 +531,10 @@ func (a *Agent) checkEnvironmentLimit(envId int64) (enforce, warning bool, err e } enforce, warning = a.checkLimit(limit, rx, tx) - return enforce, warning, nil + return enforce, warning, rx, tx, nil } -func (a *Agent) checkShareLimit(shrToken string) (enforce, warning bool, err error) { +func (a *Agent) checkShareLimit(shrToken string) (enforce, warning bool, rxBytes, txBytes int64, err error) { period := 24 * time.Hour limit := DefaultBandwidthPerPeriod() if a.cfg.Bandwidth != nil && a.cfg.Bandwidth.PerShare != nil { @@ -561,7 +553,7 @@ func (a *Agent) checkShareLimit(shrToken string) (enforce, warning bool, err err logrus.Debugf("'%v': %v", shrToken, a.describeLimit(limit, rx, tx)) } - return enforce, warning, nil + return enforce, warning, rx, tx, nil } func (a *Agent) checkLimit(cfg *BandwidthPerPeriod, rx, tx int64) (enforce, warning bool) {