email subject (#279); record windowed transfer correctly in journals (#273); properly cycle the relax run when inbound usage is happenign (#276)

This commit is contained in:
Michael Quigley
2023-03-29 17:03:42 -04:00
parent 69990447c9
commit 6fc794ea50
2 changed files with 27 additions and 13 deletions

View File

@@ -149,6 +149,7 @@ func (a *Agent) run() {
logrus.Info("started")
defer logrus.Info("stopped")
lastCycle := time.Now()
mainLoop:
for {
select {
@@ -156,11 +157,18 @@ mainLoop:
if err := a.enforce(usage); err != nil {
logrus.Errorf("error running enforcement: %v", err)
}
if time.Since(lastCycle) > a.cfg.Cycle {
if err := a.relax(); err != nil {
logrus.Errorf("error running relax cycle: %v", err)
}
lastCycle = time.Now()
}
case <-time.After(a.cfg.Cycle):
if err := a.relax(); err != nil {
logrus.Errorf("error running relax cycle: %v", err)
}
lastCycle = time.Now()
case <-a.close:
close(a.join)
@@ -190,8 +198,8 @@ func (a *Agent) enforce(u *metrics.Usage) error {
if !enforced {
_, err := a.str.CreateAccountLimitJournal(&store.AccountLimitJournal{
AccountId: int(u.AccountId),
RxBytes: u.BackendRx,
TxBytes: u.BackendTx,
RxBytes: rxBytes,
TxBytes: txBytes,
Action: store.LimitAction,
}, trx)
if err != nil {
@@ -227,8 +235,8 @@ func (a *Agent) enforce(u *metrics.Usage) error {
if !warned {
_, err := a.str.CreateAccountLimitJournal(&store.AccountLimitJournal{
AccountId: int(u.AccountId),
RxBytes: u.BackendRx,
TxBytes: u.BackendTx,
RxBytes: rxBytes,
TxBytes: txBytes,
Action: store.WarningAction,
}, trx)
if err != nil {
@@ -266,8 +274,8 @@ func (a *Agent) enforce(u *metrics.Usage) error {
if !enforced {
_, err := a.str.CreateEnvironmentLimitJournal(&store.EnvironmentLimitJournal{
EnvironmentId: int(u.EnvironmentId),
RxBytes: u.BackendRx,
TxBytes: u.BackendTx,
RxBytes: rxBytes,
TxBytes: txBytes,
Action: store.LimitAction,
}, trx)
if err != nil {
@@ -303,8 +311,8 @@ func (a *Agent) enforce(u *metrics.Usage) error {
if !warned {
_, err := a.str.CreateEnvironmentLimitJournal(&store.EnvironmentLimitJournal{
EnvironmentId: int(u.EnvironmentId),
RxBytes: u.BackendRx,
TxBytes: u.BackendTx,
RxBytes: rxBytes,
TxBytes: txBytes,
Action: store.WarningAction,
}, trx)
if err != nil {
@@ -347,8 +355,8 @@ func (a *Agent) enforce(u *metrics.Usage) error {
if !enforced {
_, err := a.str.CreateShareLimitJournal(&store.ShareLimitJournal{
ShareId: shr.Id,
RxBytes: u.BackendRx,
TxBytes: u.BackendTx,
RxBytes: rxBytes,
TxBytes: txBytes,
Action: store.LimitAction,
}, trx)
if err != nil {
@@ -385,8 +393,8 @@ func (a *Agent) enforce(u *metrics.Usage) error {
if !warned {
_, err := a.str.CreateShareLimitJournal(&store.ShareLimitJournal{
ShareId: shr.Id,
RxBytes: u.BackendRx,
TxBytes: u.BackendTx,
RxBytes: rxBytes,
TxBytes: txBytes,
Action: store.WarningAction,
}, trx)
if err != nil {
@@ -444,6 +452,8 @@ func (a *Agent) relax() error {
return errors.Wrapf(err, "%v", reflect.TypeOf(action).String())
}
}
} else {
logrus.Infof("relaxing warning for '%v'", shr.Token)
}
if err := a.str.DeleteShareLimitJournalForShare(shr.Id, trx); err == nil {
commit = true
@@ -478,6 +488,8 @@ func (a *Agent) relax() error {
return errors.Wrapf(err, "%v", reflect.TypeOf(action).String())
}
}
} else {
logrus.Infof("relaxing warning for '%v'", env.ZId)
}
if err := a.str.DeleteEnvironmentLimitJournalForEnvironment(env.Id, trx); err == nil {
commit = true
@@ -512,6 +524,8 @@ func (a *Agent) relax() error {
return errors.Wrapf(err, "%v", reflect.TypeOf(action).String())
}
}
} else {
logrus.Infof("relaxing warning for '%v'", acct.Email)
}
if err := a.str.DeleteAccountLimitJournalForAccount(acct.Id, trx); err == nil {
commit = true