diff --git a/controller/limits/agent.go b/controller/limits/agent.go index 936a9f9c..0237ca99 100644 --- a/controller/limits/agent.go +++ b/controller/limits/agent.go @@ -92,10 +92,14 @@ mainLoop: for { select { case usage := <-a.queue: - a.enforce(usage) + if err := a.enforce(usage); err != nil { + logrus.Errorf("error running enforcement: %v", err) + } case <-time.After(a.cfg.Cycle): - logrus.Info("inspection cycle") + if err := a.relax(); err != nil { + logrus.Errorf("error running relax cycle: %v", err) + } case <-a.close: close(a.join) @@ -111,7 +115,7 @@ func (a *Agent) enforce(u *metrics.Usage) error { } defer func() { _ = trx.Rollback() }() - if enforce, warning, err := a.checkAccountLimits(u, trx); err == nil { + if enforce, warning, err := a.checkAccountLimit(u.AccountId); err == nil { if enforce { enforced := false var enforcedAt time.Time @@ -173,7 +177,7 @@ func (a *Agent) enforce(u *metrics.Usage) error { } } else { - if enforce, warning, err := a.checkEnvironmentLimit(u, trx); err == nil { + if enforce, warning, err := a.checkEnvironmentLimit(u.EnvironmentId); err == nil { if enforce { enforced := false var enforcedAt time.Time @@ -235,7 +239,7 @@ func (a *Agent) enforce(u *metrics.Usage) error { } } else { - if enforce, warning, err := a.checkShareLimit(u); err == nil { + if enforce, warning, err := a.checkShareLimit(u.ShareToken); err == nil { if enforce { shr, err := a.str.FindShareWithToken(u.ShareToken, trx) if err != nil { @@ -321,12 +325,165 @@ func (a *Agent) enforce(u *metrics.Usage) error { return nil } -func (a *Agent) checkAccountLimits(u *metrics.Usage, trx *sqlx.Tx) (enforce, warning bool, err error) { - acct, err := a.str.GetAccount(int(u.AccountId), trx) +func (a *Agent) relax() error { + logrus.Info("relaxing") + + trx, err := a.str.Begin() if err != nil { - return false, false, errors.Wrapf(err, "error getting account '%d'", u.AccountId) + return errors.Wrap(err, "error starting transaction") + } + defer func() { _ = trx.Rollback() }() + + commit := false + + if sljs, err := a.str.FindAllLatestShareLimitJournal(trx); err == nil { + for _, slj := range sljs { + if shr, err := a.str.GetShare(slj.ShareId, trx); err == nil { + switch slj.Action { + case store.WarningAction: + if enforce, warning, err := a.checkShareLimit(shr.Token); err == nil { + if !enforce && !warning { + logrus.Infof("relaxing warning for share '%v'", shr.Token) + + if err := a.str.DeleteShareLimitJournalForShare(shr.Id, trx); err == nil { + commit = true + } else { + logrus.Errorf("error deleting share_limit_journal for '%v'", shr.Token) + } + } else { + logrus.Infof("share '%v' still over limit", shr.Token) + } + } else { + logrus.Errorf("error checking share limit for '%v': %v", shr.Token, err) + } + + case store.LimitAction: + if enforce, warning, err := a.checkShareLimit(shr.Token); err == nil { + if !enforce && !warning { + logrus.Infof("relaxing limit for share '%v'", shr.Token) + + if err := a.str.DeleteShareLimitJournalForShare(shr.Id, trx); err == nil { + commit = true + } else { + logrus.Errorf("error deleting share_limit_journal for '%v': %v", shr.Token, err) + } + } else { + logrus.Infof("share '%v' still over limit", shr.Token) + } + } else { + logrus.Errorf("error checking share limit for '%v': %v", shr.Token, err) + } + } + } else { + logrus.Errorf("error getting share for '#%d': %v", slj.ShareId, err) + } + } + } else { + return err } + if eljs, err := a.str.FindAllLatestEnvironmentLimitJournal(trx); err == nil { + for _, elj := range eljs { + if env, err := a.str.GetEnvironment(elj.EnvironmentId, trx); err == nil { + switch elj.Action { + case store.WarningAction: + if enforce, warning, err := a.checkEnvironmentLimit(int64(elj.EnvironmentId)); err == nil { + if !enforce && !warning { + logrus.Infof("relaxing warning for environment '%v'", env.ZId) + + if err := a.str.DeleteEnvironmentLimitJournalForEnvironment(env.Id, trx); err == nil { + commit = true + } else { + logrus.Errorf("error deleteing environment_limit_journal for '%v': %v", env.ZId, err) + } + } else { + logrus.Infof("environment '%v' still over limit", env.ZId) + } + } else { + logrus.Errorf("error checking environment limit for '%v': %v", env.ZId, err) + } + + case store.LimitAction: + if enforce, warning, err := a.checkEnvironmentLimit(int64(elj.EnvironmentId)); err == nil { + if !enforce && !warning { + logrus.Infof("relaxing limit for environment '%v'", env.ZId) + + if err := a.str.DeleteEnvironmentLimitJournalForEnvironment(env.Id, trx); err == nil { + commit = true + } else { + logrus.Errorf("error deleteing environment_limit_journal for '%v': %v", env.ZId, err) + } + } else { + logrus.Infof("environment '%v' still over limit", env.ZId) + } + } else { + logrus.Errorf("error checking environment limit for '%v': %v", env.ZId, err) + } + } + } else { + logrus.Errorf("error getting environment for '#%d': %v", elj.EnvironmentId, err) + } + } + } else { + return err + } + + if aljs, err := a.str.FindAllLatestAccountLimitJournal(trx); err == nil { + for _, alj := range aljs { + if acct, err := a.str.GetAccount(alj.AccountId, trx); err == nil { + switch alj.Action { + case store.WarningAction: + if enforce, warning, err := a.checkAccountLimit(int64(alj.AccountId)); err == nil { + if !enforce && !warning { + logrus.Infof("relaxing warning for account '%v'", acct.Email) + + if err := a.str.DeleteAccountLimitJournalForAccount(acct.Id, trx); err == nil { + commit = true + } else { + logrus.Errorf("error deleting account_limit_journal for '%v': %v", acct.Email, err) + } + } else { + logrus.Infof("account '%v' still over limit", acct.Email) + } + } else { + logrus.Errorf("error checking account limit for '%v': %v", acct.Email, err) + } + + case store.LimitAction: + if enforce, warning, err := a.checkAccountLimit(int64(alj.AccountId)); err == nil { + if !enforce && !warning { + logrus.Infof("relaxing limit for account '%v'", acct.Email) + + if err := a.str.DeleteAccountLimitJournalForAccount(acct.Id, trx); err == nil { + commit = true + } else { + logrus.Errorf("error deleting account_limit_journal for '%v': %v", acct.Email, err) + } + } else { + logrus.Infof("account '%v' still over limit", acct.Email) + } + } else { + logrus.Errorf("error checking account limit for '%v': %v", acct.Email, err) + } + } + } else { + logrus.Errorf("error getting account for '#%d': %v", alj.AccountId, err) + } + } + } else { + return err + } + + if commit { + if err := trx.Commit(); err != nil { + return err + } + } + + return nil +} + +func (a *Agent) checkAccountLimit(acctId int64) (enforce, warning bool, err error) { period := 24 * time.Hour limit := DefaultBandwidthPerPeriod() if a.cfg.Bandwidth != nil && a.cfg.Bandwidth.PerAccount != nil { @@ -335,25 +492,16 @@ func (a *Agent) checkAccountLimits(u *metrics.Usage, trx *sqlx.Tx) (enforce, war if limit.Period > 0 { period = limit.Period } - rx, tx, err := a.ifx.totalRxTxForAccount(u.AccountId, period) + rx, tx, err := a.ifx.totalRxTxForAccount(acctId, period) if err != nil { logrus.Error(err) } enforce, warning = a.checkLimit(limit, rx, tx) - if enforce || warning { - logrus.Debugf("'%v': %v", acct.Email, a.describeLimit(limit, rx, tx)) - } - return enforce, warning, nil } -func (a *Agent) checkEnvironmentLimit(u *metrics.Usage, trx *sqlx.Tx) (enforce, warning bool, err error) { - env, err := a.str.GetEnvironment(int(u.EnvironmentId), trx) - if err != nil { - return false, false, errors.Wrapf(err, "error getting account '%d'", u.EnvironmentId) - } - +func (a *Agent) checkEnvironmentLimit(envId int64) (enforce, warning bool, err error) { period := 24 * time.Hour limit := DefaultBandwidthPerPeriod() if a.cfg.Bandwidth != nil && a.cfg.Bandwidth.PerEnvironment != nil { @@ -362,20 +510,16 @@ func (a *Agent) checkEnvironmentLimit(u *metrics.Usage, trx *sqlx.Tx) (enforce, if limit.Period > 0 { period = limit.Period } - rx, tx, err := a.ifx.totalRxTxForEnvironment(u.EnvironmentId, period) + rx, tx, err := a.ifx.totalRxTxForEnvironment(envId, period) if err != nil { logrus.Error(err) } enforce, warning = a.checkLimit(limit, rx, tx) - if enforce || warning { - logrus.Debugf("'%v': %v", env.ZId, a.describeLimit(limit, rx, tx)) - } - return enforce, warning, nil } -func (a *Agent) checkShareLimit(u *metrics.Usage) (enforce, warning bool, err error) { +func (a *Agent) checkShareLimit(shrToken string) (enforce, warning bool, err error) { period := 24 * time.Hour limit := DefaultBandwidthPerPeriod() if a.cfg.Bandwidth != nil && a.cfg.Bandwidth.PerShare != nil { @@ -384,14 +528,14 @@ func (a *Agent) checkShareLimit(u *metrics.Usage) (enforce, warning bool, err er if limit.Period > 0 { period = limit.Period } - rx, tx, err := a.ifx.totalRxTxForShare(u.ShareToken, period) + rx, tx, err := a.ifx.totalRxTxForShare(shrToken, period) if err != nil { logrus.Error(err) } enforce, warning = a.checkLimit(limit, rx, tx) if enforce || warning { - logrus.Debugf("'%v': %v", u.ShareToken, a.describeLimit(limit, rx, tx)) + logrus.Debugf("'%v': %v", shrToken, a.describeLimit(limit, rx, tx)) } return enforce, warning, nil diff --git a/controller/store/accountLimitJournal.go b/controller/store/accountLimitJournal.go index 66308bbc..80194efa 100644 --- a/controller/store/accountLimitJournal.go +++ b/controller/store/accountLimitJournal.go @@ -44,15 +44,22 @@ func (str *Store) FindLatestAccountLimitJournal(acctId int, trx *sqlx.Tx) (*Acco func (str *Store) FindAllLatestAccountLimitJournal(trx *sqlx.Tx) ([]*AccountLimitJournal, error) { rows, err := trx.Queryx("select id, account_id, rx_bytes, tx_bytes, action, created_at, updated_at from account_limit_journal where id in (select max(id) as id from account_limit_journal group by account_id)") if err != nil { - return nil, errors.Wrap(err, "error selecting distinct account_limit_jounal") + return nil, errors.Wrap(err, "error selecting all latest account_limit_journal") } - var is []*AccountLimitJournal + var aljs []*AccountLimitJournal for rows.Next() { - i := &AccountLimitJournal{} - if err := rows.StructScan(i); err != nil { + alj := &AccountLimitJournal{} + if err := rows.StructScan(alj); err != nil { return nil, errors.Wrap(err, "error scanning account_limit_journal") } - is = append(is, i) + aljs = append(aljs, alj) } - return is, nil + return aljs, nil +} + +func (str *Store) DeleteAccountLimitJournalForAccount(acctId int, trx *sqlx.Tx) error { + if _, err := trx.Exec("delete from account_limit_journal where account_id = $1", acctId); err != nil { + return errors.Wrapf(err, "error deleting account_limit journal for '#%d'", acctId) + } + return nil } diff --git a/controller/store/environmentLimitJournal.go b/controller/store/environmentLimitJournal.go index 75794185..5a7a2963 100644 --- a/controller/store/environmentLimitJournal.go +++ b/controller/store/environmentLimitJournal.go @@ -40,3 +40,26 @@ func (str *Store) FindLatestEnvironmentLimitJournal(envId int, trx *sqlx.Tx) (*E } return j, nil } + +func (str *Store) FindAllLatestEnvironmentLimitJournal(trx *sqlx.Tx) ([]*EnvironmentLimitJournal, error) { + rows, err := trx.Queryx("select id, environment_id, rx_bytes, tx_bytes, action, created_at, updated_at from environment_limit_journal where id in (select max(id) as id from environment_limit_journal group by environment_id)") + if err != nil { + return nil, errors.Wrap(err, "error selecting all latest environment_limit_journal") + } + var eljs []*EnvironmentLimitJournal + for rows.Next() { + elj := &EnvironmentLimitJournal{} + if err := rows.StructScan(elj); err != nil { + return nil, errors.Wrap(err, "error scanning environment_limit_journal") + } + eljs = append(eljs, elj) + } + return eljs, nil +} + +func (str *Store) DeleteEnvironmentLimitJournalForEnvironment(envId int, trx *sqlx.Tx) error { + if _, err := trx.Exec("delete from environment_limit_journal where environment_id = $1", envId); err != nil { + return errors.Wrapf(err, "error deleteing environment_limit_journal for '#%d'", envId) + } + return nil +} diff --git a/controller/store/shareLimitJournal.go b/controller/store/shareLimitJournal.go index 59891206..7dcc351f 100644 --- a/controller/store/shareLimitJournal.go +++ b/controller/store/shareLimitJournal.go @@ -13,8 +13,8 @@ type ShareLimitJournal struct { Action LimitJournalAction } -func (str *Store) CreateShareLimitJournal(j *ShareLimitJournal, tx *sqlx.Tx) (int, error) { - stmt, err := tx.Prepare("insert into share_limit_journal (share_id, rx_bytes, tx_bytes, action) values ($1, $2, $3, $4) returning id") +func (str *Store) CreateShareLimitJournal(j *ShareLimitJournal, trx *sqlx.Tx) (int, error) { + stmt, err := trx.Prepare("insert into share_limit_journal (share_id, rx_bytes, tx_bytes, action) values ($1, $2, $3, $4) returning id") if err != nil { return 0, errors.Wrap(err, "error preparing share_limit_journal insert statement") } @@ -33,10 +33,33 @@ func (str *Store) IsShareLimitJournalEmpty(shrId int, trx *sqlx.Tx) (bool, error return count == 0, nil } -func (str *Store) FindLatestShareLimitJournal(shrId int, tx *sqlx.Tx) (*ShareLimitJournal, error) { +func (str *Store) FindLatestShareLimitJournal(shrId int, trx *sqlx.Tx) (*ShareLimitJournal, error) { j := &ShareLimitJournal{} - if err := tx.QueryRowx("select * from share_limit_journal where share_id = $1 order by created_at desc limit 1", shrId).StructScan(j); err != nil { + if err := trx.QueryRowx("select * from share_limit_journal where share_id = $1 order by created_at desc limit 1", shrId).StructScan(j); err != nil { return nil, errors.Wrap(err, "error finding share_limit_journal by share_id") } return j, nil } + +func (str *Store) FindAllLatestShareLimitJournal(trx *sqlx.Tx) ([]*ShareLimitJournal, error) { + rows, err := trx.Queryx("select id, share_id, rx_bytes, tx_bytes, action, created_at, updated_at from share_limit_journal where id in (select max(id) as id from share_limit_journal group by share_id)") + if err != nil { + return nil, errors.Wrap(err, "error selecting all latest share_limit_journal") + } + var sljs []*ShareLimitJournal + for rows.Next() { + slj := &ShareLimitJournal{} + if err := rows.StructScan(slj); err != nil { + return nil, errors.Wrap(err, "error scanning share_limit_journal") + } + sljs = append(sljs, slj) + } + return sljs, nil +} + +func (str *Store) DeleteShareLimitJournalForShare(shrId int, trx *sqlx.Tx) error { + if _, err := trx.Exec("delete from share_limit_journal where share_id = $1", shrId); err != nil { + return errors.Wrapf(err, "error deleting share_limit_journal for '#%d'", shrId) + } + return nil +}