better support for scoped/unscoped bandwidth limit coexistence (#606)

This commit is contained in:
Michael Quigley 2024-06-06 13:49:36 -04:00
parent cb4afd4a0f
commit bee5356e3c
No known key found for this signature in database
GPG Key ID: 9B60314A9DD20A62
8 changed files with 84 additions and 45 deletions

View File

@ -379,14 +379,14 @@ func (a *Agent) enforce(u *metrics.Usage) error {
switch exceededLc.GetLimitAction() {
case store.LimitLimitAction:
for _, limitAction := range a.limitActions {
if err := limitAction.HandleAccount(acct, rxBytes, txBytes, exceededLc, trx); err != nil {
if err := limitAction.HandleAccount(acct, rxBytes, txBytes, exceededLc, ul, trx); err != nil {
return errors.Wrapf(err, "%v", reflect.TypeOf(limitAction).String())
}
}
case store.WarningLimitAction:
for _, warningAction := range a.warningActions {
if err := warningAction.HandleAccount(acct, rxBytes, txBytes, exceededLc, trx); err != nil {
if err := warningAction.HandleAccount(acct, rxBytes, txBytes, exceededLc, ul, trx); err != nil {
return errors.Wrapf(err, "%v", reflect.TypeOf(warningAction).String())
}
}
@ -420,11 +420,18 @@ func (a *Agent) relax() error {
})
accounts := make(map[int]*store.Account)
uls := make(map[int]*userLimits)
for _, bwje := range bwjes {
if _, found := accounts[bwje.AccountId]; !found {
if acct, err := a.str.GetAccount(bwje.AccountId, trx); err == nil {
accounts[bwje.AccountId] = acct
ul, err := a.getUserLimits(acct.Id, trx)
if err != nil {
return errors.Wrapf(err, "error getting user limits for '%v'", acct.Email)
}
uls[bwje.AccountId] = ul
} else {
return err
}
@ -465,7 +472,7 @@ func (a *Agent) relax() error {
if bwc.GetLimitAction() == store.LimitLimitAction {
logrus.Infof("relaxing limit '%v' for '%v'", bwc.String(), accounts[bwje.AccountId].Email)
for _, action := range a.relaxActions {
if err := action.HandleAccount(accounts[bwje.AccountId], used.rx, used.tx, bwc, trx); err != nil {
if err := action.HandleAccount(accounts[bwje.AccountId], used.rx, used.tx, bwc, uls[bwje.AccountId], trx); err != nil {
return errors.Wrapf(err, "%v", reflect.TypeOf(action).String())
}
}
@ -486,7 +493,7 @@ func (a *Agent) relax() error {
}
}
} else {
logrus.Infof("account '%v' still over limit: '%v' with rx: %d, tx: %d", accounts[bwje.AccountId].Email, bwc, used.rx, used.tx)
logrus.Infof("account '%v' still over limit: '%v' with rx: %d, tx: %d, total: %d", accounts[bwje.AccountId].Email, bwc, used.rx, used.tx, used.rx+used.tx)
}
}
} else {

View File

@ -1,7 +1,6 @@
package limits
import (
"encoding/json"
"fmt"
"github.com/openziti/zrok/controller/store"
"github.com/openziti/zrok/sdk/golang/sdk"
@ -32,6 +31,10 @@ func (bc *configBandwidthClass) IsGlobal() bool {
return true
}
func (bc *configBandwidthClass) IsScoped() bool {
return false
}
func (bc *configBandwidthClass) GetLimitClassId() int {
return -1
}
@ -65,8 +68,16 @@ func (bc *configBandwidthClass) GetLimitAction() store.LimitAction {
}
func (bc *configBandwidthClass) String() string {
if out, err := json.Marshal(bc.bw); err == nil {
return fmt.Sprintf("Config<period: %d, %s, action: %s>", bc.periodInMinutes, string(out), bc.limitAction)
out := fmt.Sprintf("ConfigClass<periodMinutes: %d", bc.periodInMinutes)
if bc.bw.Rx > store.Unlimited {
out += fmt.Sprintf(", rxBytes: %d", bc.bw.Rx)
}
return "<<ERROR>>"
if bc.bw.Tx > store.Unlimited {
out += fmt.Sprintf(", txBytes: %d", bc.bw.Tx)
}
if bc.bw.Total > store.Unlimited {
out += fmt.Sprintf(", totalBytes: %d", bc.bw.Total)
}
out += fmt.Sprintf(", limitAction: %s>", bc.limitAction)
return out
}

View File

@ -4,6 +4,7 @@ import (
"github.com/jmoiron/sqlx"
"github.com/openziti/zrok/controller/store"
"github.com/openziti/zrok/controller/zrokEdgeSdk"
"github.com/openziti/zrok/sdk/golang/sdk"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
@ -17,7 +18,7 @@ func newLimitAction(str *store.Store, zCfg *zrokEdgeSdk.Config) *limitAction {
return &limitAction{str, zCfg}
}
func (a *limitAction) HandleAccount(acct *store.Account, _, _ int64, _ store.BandwidthClass, trx *sqlx.Tx) error {
func (a *limitAction) HandleAccount(acct *store.Account, _, _ int64, bwc store.BandwidthClass, ul *userLimits, trx *sqlx.Tx) error {
logrus.Infof("limiting '%v'", acct.Email)
envs, err := a.str.FindEnvironmentsForAccount(acct.Id, trx)
@ -30,6 +31,7 @@ func (a *limitAction) HandleAccount(acct *store.Account, _, _ int64, _ store.Ban
return err
}
ignoreBackends := ul.ignoreBackends(bwc)
for _, env := range envs {
shrs, err := a.str.FindSharesForEnvironment(env.Id, trx)
if err != nil {
@ -37,10 +39,14 @@ func (a *limitAction) HandleAccount(acct *store.Account, _, _ int64, _ store.Ban
}
for _, shr := range shrs {
if err := zrokEdgeSdk.DeleteServicePoliciesDial(env.ZId, shr.Token, edge); err != nil {
return errors.Wrapf(err, "error deleting dial service policy for '%v'", shr.Token)
if _, ignore := ignoreBackends[sdk.BackendMode(shr.BackendMode)]; !ignore {
if err := zrokEdgeSdk.DeleteServicePoliciesDial(env.ZId, shr.Token, edge); err != nil {
return errors.Wrapf(err, "error deleting dial service policy for '%v'", shr.Token)
}
logrus.Infof("removed dial service policy for share '%v' of environment '%v'", shr.Token, env.ZId)
} else {
logrus.Debugf("ignoring share '%v' for '%v' with backend mode '%v'", shr.Token, acct.Email, shr.BackendMode)
}
logrus.Infof("removed dial service policy for share '%v' of environment '%v'", shr.Token, env.ZId)
}
}

View File

@ -6,13 +6,5 @@ import (
)
type AccountAction interface {
HandleAccount(a *store.Account, rxBytes, txBytes int64, limit store.BandwidthClass, trx *sqlx.Tx) error
}
type EnvironmentAction interface {
HandleEnvironment(e *store.Environment, rxBytes, txBytes int64, limit store.BandwidthClass, trx *sqlx.Tx) error
}
type ShareAction interface {
HandleShare(s *store.Share, rxBytes, txBytes int64, limit store.BandwidthClass, trx *sqlx.Tx) error
HandleAccount(a *store.Account, rxBytes, txBytes int64, bwc store.BandwidthClass, ul *userLimits, trx *sqlx.Tx) error
}

View File

@ -19,7 +19,7 @@ func newRelaxAction(str *store.Store, zCfg *zrokEdgeSdk.Config) *relaxAction {
return &relaxAction{str, zCfg}
}
func (a *relaxAction) HandleAccount(acct *store.Account, _, _ int64, _ store.BandwidthClass, trx *sqlx.Tx) error {
func (a *relaxAction) HandleAccount(acct *store.Account, _, _ int64, bwc store.BandwidthClass, _ *userLimits, trx *sqlx.Tx) error {
logrus.Infof("relaxing '%v'", acct.Email)
envs, err := a.str.FindEnvironmentsForAccount(acct.Id, trx)
@ -39,14 +39,17 @@ func (a *relaxAction) HandleAccount(acct *store.Account, _, _ int64, _ store.Ban
}
for _, shr := range shrs {
switch shr.ShareMode {
case string(sdk.PublicShareMode):
if err := relaxPublicShare(a.str, edge, shr, trx); err != nil {
return errors.Wrap(err, "error relaxing public share")
}
case string(sdk.PrivateShareMode):
if err := relaxPrivateShare(a.str, edge, shr, trx); err != nil {
return errors.Wrap(err, "error relaxing private share")
// TODO: when relaxing unscoped classes; need to not relax other scoped limits
if !bwc.IsScoped() || bwc.GetBackendMode() == sdk.BackendMode(shr.BackendMode) {
switch shr.ShareMode {
case string(sdk.PublicShareMode):
if err := relaxPublicShare(a.str, edge, shr, trx); err != nil {
return errors.Wrap(err, "error relaxing public share")
}
case string(sdk.PrivateShareMode):
if err := relaxPrivateShare(a.str, edge, shr, trx); err != nil {
return errors.Wrap(err, "error relaxing private share")
}
}
}
}

View File

@ -26,6 +26,19 @@ func (ul *userLimits) toBandwidthArray(backendMode sdk.BackendMode) []store.Band
return ul.bandwidth
}
func (ul *userLimits) ignoreBackends(bwc store.BandwidthClass) map[sdk.BackendMode]bool {
if bwc.IsScoped() {
ignoreBackends := make(map[sdk.BackendMode]bool)
for backendMode := range ul.scopes {
if backendMode != bwc.GetBackendMode() {
ignoreBackends[backendMode] = true
}
}
return ignoreBackends
}
return nil
}
func (a *Agent) getUserLimits(acctId int, trx *sqlx.Tx) (*userLimits, error) {
resource := newConfigResourceCountClass(a.cfg)
cfgBwcs := newConfigBandwidthClasses(a.cfg.Bandwidth)

View File

@ -19,27 +19,27 @@ func newWarningAction(cfg *emailUi.Config, str *store.Store) *warningAction {
return &warningAction{str, cfg}
}
func (a *warningAction) HandleAccount(acct *store.Account, rxBytes, txBytes int64, limit store.BandwidthClass, _ *sqlx.Tx) error {
func (a *warningAction) HandleAccount(acct *store.Account, rxBytes, txBytes int64, bwc store.BandwidthClass, _ *userLimits, _ *sqlx.Tx) error {
logrus.Infof("warning '%v'", acct.Email)
if a.cfg != nil {
rxLimit := "(store.Unlimited bytes)"
if limit.GetRxBytes() != store.Unlimited {
rxLimit = util.BytesToSize(limit.GetRxBytes())
if bwc.GetRxBytes() != store.Unlimited {
rxLimit = util.BytesToSize(bwc.GetRxBytes())
}
txLimit := "(store.Unlimited bytes)"
if limit.GetTxBytes() != store.Unlimited {
txLimit = util.BytesToSize(limit.GetTxBytes())
if bwc.GetTxBytes() != store.Unlimited {
txLimit = util.BytesToSize(bwc.GetTxBytes())
}
totalLimit := "(store.Unlimited bytes)"
if limit.GetTotalBytes() != store.Unlimited {
totalLimit = util.BytesToSize(limit.GetTotalBytes())
if bwc.GetTotalBytes() != store.Unlimited {
totalLimit = util.BytesToSize(bwc.GetTotalBytes())
}
detail := newDetailMessage()
detail = detail.append("Your account has received %v and sent %v (for a total of %v), which has triggered a transfer limit warning.", util.BytesToSize(rxBytes), util.BytesToSize(txBytes), util.BytesToSize(rxBytes+txBytes))
detail = detail.append("This zrok instance only allows an account to receive %v, send %v, totalling not more than %v for each %v.", rxLimit, txLimit, totalLimit, time.Duration(limit.GetPeriodMinutes())*time.Minute)
detail = detail.append("If you exceed the transfer limit, access to your shares will be temporarily disabled (until the last %v falls below the transfer limit)", time.Duration(limit.GetPeriodMinutes())*time.Minute)
detail = detail.append("This zrok instance only allows an account to receive %v, send %v, totalling not more than %v for each %v.", rxLimit, txLimit, totalLimit, time.Duration(bwc.GetPeriodMinutes())*time.Minute)
detail = detail.append("If you exceed the transfer limit, access to your shares will be temporarily disabled (until the last %v falls below the transfer limit)", time.Duration(bwc.GetPeriodMinutes())*time.Minute)
if err := sendLimitWarningEmail(a.cfg, acct.Email, detail); err != nil {
return errors.Wrapf(err, "error sending limit warning email to '%v'", acct.Email)

View File

@ -9,26 +9,29 @@ import (
const Unlimited = -1
type ResourceCountClass interface {
type BaseLimitClass interface {
IsGlobal() bool
GetLimitClassId() int
String() string
}
type ResourceCountClass interface {
BaseLimitClass
GetEnvironments() int
GetShares() int
GetReservedShares() int
GetUniqueNames() int
String() string
}
type BandwidthClass interface {
IsGlobal() bool
GetLimitClassId() int
BaseLimitClass
IsScoped() bool
GetBackendMode() sdk.BackendMode
GetPeriodMinutes() int
GetRxBytes() int64
GetTxBytes() int64
GetTotalBytes() int64
GetLimitAction() LimitAction
String() string
}
type LimitClass struct {
@ -50,6 +53,10 @@ func (lc LimitClass) IsGlobal() bool {
return false
}
func (lc LimitClass) IsScoped() bool {
return lc.BackendMode != nil
}
func (lc LimitClass) GetLimitClassId() int {
return lc.Id
}
@ -105,7 +112,7 @@ func (lc LimitClass) GetLimitAction() LimitAction {
}
func (lc LimitClass) String() string {
out := fmt.Sprintf("LimitClass<%d", lc.Id)
out := fmt.Sprintf("LimitClass<id: %d", lc.Id)
if lc.ShareMode != nil {
out += fmt.Sprintf(", shareMode: '%s'", *lc.ShareMode)
}