mirror of
https://github.com/openziti/zrok.git
synced 2025-06-23 11:11:48 +02:00
action execution logic (#276)
This commit is contained in:
parent
067d9901d6
commit
44cbb8491c
@ -138,7 +138,7 @@ func (a *Agent) enforce(u *metrics.Usage) error {
|
|||||||
}
|
}
|
||||||
defer func() { _ = trx.Rollback() }()
|
defer func() { _ = trx.Rollback() }()
|
||||||
|
|
||||||
if enforce, warning, err := a.checkAccountLimit(u.AccountId); err == nil {
|
if enforce, warning, rxBytes, txBytes, err := a.checkAccountLimit(u.AccountId); err == nil {
|
||||||
if enforce {
|
if enforce {
|
||||||
enforced := false
|
enforced := false
|
||||||
var enforcedAt time.Time
|
var enforcedAt time.Time
|
||||||
@ -159,9 +159,16 @@ func (a *Agent) enforce(u *metrics.Usage) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
acct, err := a.str.GetAccount(int(u.AccountId), trx)
|
||||||
logrus.Warnf("enforcing account limit for '#%d'", u.AccountId)
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// run account limit actions
|
||||||
|
for _, action := range a.acctLimitActions {
|
||||||
|
if err := action.HandleAccount(acct, rxBytes, txBytes, a.cfg.Bandwidth.PerAccount); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
if err := trx.Commit(); err != nil {
|
if err := trx.Commit(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -189,9 +196,16 @@ func (a *Agent) enforce(u *metrics.Usage) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
acct, err := a.str.GetAccount(int(u.AccountId), trx)
|
||||||
logrus.Warnf("warning account '#%d'", u.AccountId)
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// run account warning actions
|
||||||
|
for _, action := range a.acctWarningActions {
|
||||||
|
if err := action.HandleAccount(acct, rxBytes, txBytes, a.cfg.Bandwidth.PerAccount); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
if err := trx.Commit(); err != nil {
|
if err := trx.Commit(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -200,7 +214,7 @@ func (a *Agent) enforce(u *metrics.Usage) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
if enforce, warning, err := a.checkEnvironmentLimit(u.EnvironmentId); err == nil {
|
if enforce, warning, rxBytes, txBytes, err := a.checkEnvironmentLimit(u.EnvironmentId); err == nil {
|
||||||
if enforce {
|
if enforce {
|
||||||
enforced := false
|
enforced := false
|
||||||
var enforcedAt time.Time
|
var enforcedAt time.Time
|
||||||
@ -221,9 +235,16 @@ func (a *Agent) enforce(u *metrics.Usage) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
env, err := a.str.GetEnvironment(int(u.EnvironmentId), trx)
|
||||||
logrus.Warnf("enforcing environment limit for environment '#%d'", u.EnvironmentId)
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// run environment limit actions
|
||||||
|
for _, action := range a.envLimitActions {
|
||||||
|
if err := action.HandleEnvironment(env, rxBytes, txBytes, a.cfg.Bandwidth.PerEnvironment); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
if err := trx.Commit(); err != nil {
|
if err := trx.Commit(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -251,9 +272,16 @@ func (a *Agent) enforce(u *metrics.Usage) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
env, err := a.str.GetEnvironment(int(u.EnvironmentId), trx)
|
||||||
logrus.Warnf("warning environment '#%d'", u.EnvironmentId)
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// run environment warning actions
|
||||||
|
for _, action := range a.envWarningActions {
|
||||||
|
if err := action.HandleEnvironment(env, rxBytes, txBytes, a.cfg.Bandwidth.PerEnvironment); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
if err := trx.Commit(); err != nil {
|
if err := trx.Commit(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -262,7 +290,7 @@ func (a *Agent) enforce(u *metrics.Usage) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
if enforce, warning, err := a.checkShareLimit(u.ShareToken); err == nil {
|
if enforce, warning, rxBytes, txBytes, err := a.checkShareLimit(u.ShareToken); err == nil {
|
||||||
if enforce {
|
if enforce {
|
||||||
shr, err := a.str.FindShareWithToken(u.ShareToken, trx)
|
shr, err := a.str.FindShareWithToken(u.ShareToken, trx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -288,9 +316,12 @@ func (a *Agent) enforce(u *metrics.Usage) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
// run share limit actions
|
||||||
logrus.Warnf("enforcing share limit for share '%v'", shr.Token)
|
for _, action := range a.shrLimitActions {
|
||||||
|
if err := action.HandleShare(shr, rxBytes, txBytes, a.cfg.Bandwidth.PerShare); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
if err := trx.Commit(); err != nil {
|
if err := trx.Commit(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -323,9 +354,12 @@ func (a *Agent) enforce(u *metrics.Usage) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
// run share warning actions
|
||||||
logrus.Warnf("warning share '%v'", shr.Token)
|
for _, action := range a.shrWarningActions {
|
||||||
|
if err := action.HandleShare(shr, rxBytes, txBytes, a.cfg.Bandwidth.PerShare); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
if err := trx.Commit(); err != nil {
|
if err := trx.Commit(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -362,12 +396,15 @@ func (a *Agent) relax() error {
|
|||||||
if sljs, err := a.str.FindAllLatestShareLimitJournal(trx); err == nil {
|
if sljs, err := a.str.FindAllLatestShareLimitJournal(trx); err == nil {
|
||||||
for _, slj := range sljs {
|
for _, slj := range sljs {
|
||||||
if shr, err := a.str.GetShare(slj.ShareId, trx); err == nil {
|
if shr, err := a.str.GetShare(slj.ShareId, trx); err == nil {
|
||||||
switch slj.Action {
|
if slj.Action == store.WarningAction || slj.Action == store.LimitAction {
|
||||||
case store.WarningAction:
|
if enforce, warning, rxBytes, txBytes, err := a.checkShareLimit(shr.Token); err == nil {
|
||||||
if enforce, warning, err := a.checkShareLimit(shr.Token); err == nil {
|
|
||||||
if !enforce && !warning {
|
if !enforce && !warning {
|
||||||
logrus.Infof("relaxing warning for share '%v'", shr.Token)
|
// run relax actions for share
|
||||||
|
for _, action := range a.shrRelaxActions {
|
||||||
|
if err := action.HandleShare(shr, rxBytes, txBytes, a.cfg.Bandwidth.PerShare); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
if err := a.str.DeleteShareLimitJournalForShare(shr.Id, trx); err == nil {
|
if err := a.str.DeleteShareLimitJournalForShare(shr.Id, trx); err == nil {
|
||||||
commit = true
|
commit = true
|
||||||
} else {
|
} else {
|
||||||
@ -379,23 +416,6 @@ func (a *Agent) relax() error {
|
|||||||
} else {
|
} else {
|
||||||
logrus.Errorf("error checking share limit for '%v': %v", shr.Token, err)
|
logrus.Errorf("error checking share limit for '%v': %v", shr.Token, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
case store.LimitAction:
|
|
||||||
if enforce, warning, err := a.checkShareLimit(shr.Token); err == nil {
|
|
||||||
if !enforce && !warning {
|
|
||||||
logrus.Infof("relaxing limit for share '%v'", shr.Token)
|
|
||||||
|
|
||||||
if err := a.str.DeleteShareLimitJournalForShare(shr.Id, trx); err == nil {
|
|
||||||
commit = true
|
|
||||||
} else {
|
|
||||||
logrus.Errorf("error deleting share_limit_journal for '%v': %v", shr.Token, err)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
logrus.Infof("share '%v' still over limit", shr.Token)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
logrus.Errorf("error checking share limit for '%v': %v", shr.Token, err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
logrus.Errorf("error getting share for '#%d': %v", slj.ShareId, err)
|
logrus.Errorf("error getting share for '#%d': %v", slj.ShareId, err)
|
||||||
@ -408,29 +428,15 @@ func (a *Agent) relax() error {
|
|||||||
if eljs, err := a.str.FindAllLatestEnvironmentLimitJournal(trx); err == nil {
|
if eljs, err := a.str.FindAllLatestEnvironmentLimitJournal(trx); err == nil {
|
||||||
for _, elj := range eljs {
|
for _, elj := range eljs {
|
||||||
if env, err := a.str.GetEnvironment(elj.EnvironmentId, trx); err == nil {
|
if env, err := a.str.GetEnvironment(elj.EnvironmentId, trx); err == nil {
|
||||||
switch elj.Action {
|
if elj.Action == store.WarningAction || elj.Action == store.LimitAction {
|
||||||
case store.WarningAction:
|
if enforce, warning, rxBytes, txBytes, err := a.checkEnvironmentLimit(int64(elj.EnvironmentId)); err == nil {
|
||||||
if enforce, warning, err := a.checkEnvironmentLimit(int64(elj.EnvironmentId)); err == nil {
|
|
||||||
if !enforce && !warning {
|
if !enforce && !warning {
|
||||||
logrus.Infof("relaxing warning for environment '%v'", env.ZId)
|
// run relax actions for environment
|
||||||
|
for _, action := range a.envRelaxActions {
|
||||||
if err := a.str.DeleteEnvironmentLimitJournalForEnvironment(env.Id, trx); err == nil {
|
if err := action.HandleEnvironment(env, rxBytes, txBytes, a.cfg.Bandwidth.PerEnvironment); err != nil {
|
||||||
commit = true
|
return err
|
||||||
} else {
|
}
|
||||||
logrus.Errorf("error deleteing environment_limit_journal for '%v': %v", env.ZId, err)
|
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
logrus.Infof("environment '%v' still over limit", env.ZId)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
logrus.Errorf("error checking environment limit for '%v': %v", env.ZId, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
case store.LimitAction:
|
|
||||||
if enforce, warning, err := a.checkEnvironmentLimit(int64(elj.EnvironmentId)); err == nil {
|
|
||||||
if !enforce && !warning {
|
|
||||||
logrus.Infof("relaxing limit for environment '%v'", env.ZId)
|
|
||||||
|
|
||||||
if err := a.str.DeleteEnvironmentLimitJournalForEnvironment(env.Id, trx); err == nil {
|
if err := a.str.DeleteEnvironmentLimitJournalForEnvironment(env.Id, trx); err == nil {
|
||||||
commit = true
|
commit = true
|
||||||
} else {
|
} else {
|
||||||
@ -454,29 +460,15 @@ func (a *Agent) relax() error {
|
|||||||
if aljs, err := a.str.FindAllLatestAccountLimitJournal(trx); err == nil {
|
if aljs, err := a.str.FindAllLatestAccountLimitJournal(trx); err == nil {
|
||||||
for _, alj := range aljs {
|
for _, alj := range aljs {
|
||||||
if acct, err := a.str.GetAccount(alj.AccountId, trx); err == nil {
|
if acct, err := a.str.GetAccount(alj.AccountId, trx); err == nil {
|
||||||
switch alj.Action {
|
if alj.Action == store.WarningAction || alj.Action == store.LimitAction {
|
||||||
case store.WarningAction:
|
if enforce, warning, rxBytes, txBytes, err := a.checkAccountLimit(int64(alj.AccountId)); err == nil {
|
||||||
if enforce, warning, err := a.checkAccountLimit(int64(alj.AccountId)); err == nil {
|
|
||||||
if !enforce && !warning {
|
if !enforce && !warning {
|
||||||
logrus.Infof("relaxing warning for account '%v'", acct.Email)
|
// run relax actions for account
|
||||||
|
for _, action := range a.acctRelaxActions {
|
||||||
if err := a.str.DeleteAccountLimitJournalForAccount(acct.Id, trx); err == nil {
|
if err := action.HandleAccount(acct, rxBytes, txBytes, a.cfg.Bandwidth.PerAccount); err != nil {
|
||||||
commit = true
|
return err
|
||||||
} else {
|
}
|
||||||
logrus.Errorf("error deleting account_limit_journal for '%v': %v", acct.Email, err)
|
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
logrus.Infof("account '%v' still over limit", acct.Email)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
logrus.Errorf("error checking account limit for '%v': %v", acct.Email, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
case store.LimitAction:
|
|
||||||
if enforce, warning, err := a.checkAccountLimit(int64(alj.AccountId)); err == nil {
|
|
||||||
if !enforce && !warning {
|
|
||||||
logrus.Infof("relaxing limit for account '%v'", acct.Email)
|
|
||||||
|
|
||||||
if err := a.str.DeleteAccountLimitJournalForAccount(acct.Id, trx); err == nil {
|
if err := a.str.DeleteAccountLimitJournalForAccount(acct.Id, trx); err == nil {
|
||||||
commit = true
|
commit = true
|
||||||
} else {
|
} else {
|
||||||
@ -506,7 +498,7 @@ func (a *Agent) relax() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Agent) checkAccountLimit(acctId int64) (enforce, warning bool, err error) {
|
func (a *Agent) checkAccountLimit(acctId int64) (enforce, warning bool, rxBytes, txBytes int64, err error) {
|
||||||
period := 24 * time.Hour
|
period := 24 * time.Hour
|
||||||
limit := DefaultBandwidthPerPeriod()
|
limit := DefaultBandwidthPerPeriod()
|
||||||
if a.cfg.Bandwidth != nil && a.cfg.Bandwidth.PerAccount != nil {
|
if a.cfg.Bandwidth != nil && a.cfg.Bandwidth.PerAccount != nil {
|
||||||
@ -521,10 +513,10 @@ func (a *Agent) checkAccountLimit(acctId int64) (enforce, warning bool, err erro
|
|||||||
}
|
}
|
||||||
|
|
||||||
enforce, warning = a.checkLimit(limit, rx, tx)
|
enforce, warning = a.checkLimit(limit, rx, tx)
|
||||||
return enforce, warning, nil
|
return enforce, warning, rx, tx, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Agent) checkEnvironmentLimit(envId int64) (enforce, warning bool, err error) {
|
func (a *Agent) checkEnvironmentLimit(envId int64) (enforce, warning bool, rxBytes, txBytes int64, err error) {
|
||||||
period := 24 * time.Hour
|
period := 24 * time.Hour
|
||||||
limit := DefaultBandwidthPerPeriod()
|
limit := DefaultBandwidthPerPeriod()
|
||||||
if a.cfg.Bandwidth != nil && a.cfg.Bandwidth.PerEnvironment != nil {
|
if a.cfg.Bandwidth != nil && a.cfg.Bandwidth.PerEnvironment != nil {
|
||||||
@ -539,10 +531,10 @@ func (a *Agent) checkEnvironmentLimit(envId int64) (enforce, warning bool, err e
|
|||||||
}
|
}
|
||||||
|
|
||||||
enforce, warning = a.checkLimit(limit, rx, tx)
|
enforce, warning = a.checkLimit(limit, rx, tx)
|
||||||
return enforce, warning, nil
|
return enforce, warning, rx, tx, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Agent) checkShareLimit(shrToken string) (enforce, warning bool, err error) {
|
func (a *Agent) checkShareLimit(shrToken string) (enforce, warning bool, rxBytes, txBytes int64, err error) {
|
||||||
period := 24 * time.Hour
|
period := 24 * time.Hour
|
||||||
limit := DefaultBandwidthPerPeriod()
|
limit := DefaultBandwidthPerPeriod()
|
||||||
if a.cfg.Bandwidth != nil && a.cfg.Bandwidth.PerShare != nil {
|
if a.cfg.Bandwidth != nil && a.cfg.Bandwidth.PerShare != nil {
|
||||||
@ -561,7 +553,7 @@ func (a *Agent) checkShareLimit(shrToken string) (enforce, warning bool, err err
|
|||||||
logrus.Debugf("'%v': %v", shrToken, a.describeLimit(limit, rx, tx))
|
logrus.Debugf("'%v': %v", shrToken, a.describeLimit(limit, rx, tx))
|
||||||
}
|
}
|
||||||
|
|
||||||
return enforce, warning, nil
|
return enforce, warning, rx, tx, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Agent) checkLimit(cfg *BandwidthPerPeriod, rx, tx int64) (enforce, warning bool) {
|
func (a *Agent) checkLimit(cfg *BandwidthPerPeriod, rx, tx int64) (enforce, warning bool) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user