From 141cb424e0dce16e419e37042785071045d43fa4 Mon Sep 17 00:00:00 2001 From: Michael Quigley Date: Thu, 30 May 2024 13:46:15 -0400 Subject: [PATCH] simplifying limit enforcement structures (#606) --- controller/limits/accountRelaxAction.go | 46 +++ controller/limits/agent.go | 296 +----------------- controller/limits/config.go | 18 +- controller/limits/environmentLimitAction.go | 41 --- controller/limits/environmentRelaxAction.go | 50 --- controller/limits/environmentWarningAction.go | 58 ---- controller/limits/shareLimitAction.go | 38 --- controller/limits/shareRelaxAction.go | 89 ------ controller/limits/shareWarningAction.go | 63 ---- 9 files changed, 55 insertions(+), 644 deletions(-) delete mode 100644 controller/limits/environmentLimitAction.go delete mode 100644 controller/limits/environmentRelaxAction.go delete mode 100644 controller/limits/environmentWarningAction.go delete mode 100644 controller/limits/shareLimitAction.go delete mode 100644 controller/limits/shareRelaxAction.go delete mode 100644 controller/limits/shareWarningAction.go diff --git a/controller/limits/accountRelaxAction.go b/controller/limits/accountRelaxAction.go index 8ad28354..d2b32380 100644 --- a/controller/limits/accountRelaxAction.go +++ b/controller/limits/accountRelaxAction.go @@ -2,6 +2,7 @@ package limits import ( "github.com/jmoiron/sqlx" + "github.com/openziti/edge-api/rest_management_api_client" "github.com/openziti/zrok/controller/store" "github.com/openziti/zrok/controller/zrokEdgeSdk" "github.com/openziti/zrok/sdk/golang/sdk" @@ -53,3 +54,48 @@ func (a *accountRelaxAction) HandleAccount(acct *store.Account, _, _ int64, _ *B return nil } + +func relaxPublicShare(str *store.Store, edge *rest_management_api_client.ZitiEdgeManagement, shr *store.Share, trx *sqlx.Tx) error { + env, err := str.GetEnvironment(shr.EnvironmentId, trx) + if err != nil { + return errors.Wrap(err, "error finding environment") + } + + fe, err := str.FindFrontendPubliclyNamed(*shr.FrontendSelection, trx) + if err != nil { + return errors.Wrapf(err, "error finding frontend name '%v' for '%v'", *shr.FrontendSelection, shr.Token) + } + + if err := zrokEdgeSdk.CreateServicePolicyDial(env.ZId+"-"+shr.ZId+"-dial", shr.ZId, []string{fe.ZId}, zrokEdgeSdk.ZrokShareTags(shr.Token).SubTags, edge); err != nil { + return errors.Wrapf(err, "error creating dial service policy for '%v'", shr.Token) + } + logrus.Infof("added dial service policy for '%v'", shr.Token) + return nil +} + +func relaxPrivateShare(str *store.Store, edge *rest_management_api_client.ZitiEdgeManagement, shr *store.Share, trx *sqlx.Tx) error { + fes, err := str.FindFrontendsForPrivateShare(shr.Id, trx) + if err != nil { + return errors.Wrapf(err, "error finding frontends for share '%v'", shr.Token) + } + for _, fe := range fes { + if fe.EnvironmentId != nil { + env, err := str.GetEnvironment(*fe.EnvironmentId, trx) + if err != nil { + return errors.Wrapf(err, "error getting environment for frontend '%v'", fe.Token) + } + + addlTags := map[string]interface{}{ + "zrokEnvironmentZId": env.ZId, + "zrokFrontendToken": fe.Token, + "zrokShareToken": shr.Token, + } + if err := zrokEdgeSdk.CreateServicePolicyDial(fe.Token+"-"+env.ZId+"-"+shr.ZId+"-dial", shr.ZId, []string{env.ZId}, addlTags, edge); err != nil { + return errors.Wrapf(err, "unable to create dial policy for frontend '%v'", fe.Token) + } + + logrus.Infof("added dial service policy for share '%v' to private frontend '%v'", shr.Token, fe.Token) + } + } + return nil +} diff --git a/controller/limits/agent.go b/controller/limits/agent.go index 13321369..6fddc103 100644 --- a/controller/limits/agent.go +++ b/controller/limits/agent.go @@ -24,12 +24,6 @@ type Agent struct { acctWarningActions []AccountAction acctLimitActions []AccountAction acctRelaxActions []AccountAction - envWarningActions []EnvironmentAction - envLimitActions []EnvironmentAction - envRelaxActions []EnvironmentAction - shrWarningActions []ShareAction - shrLimitActions []ShareAction - shrRelaxActions []ShareAction close chan struct{} join chan struct{} } @@ -44,12 +38,6 @@ func NewAgent(cfg *Config, ifxCfg *metrics.InfluxConfig, zCfg *zrokEdgeSdk.Confi acctWarningActions: []AccountAction{newAccountWarningAction(emailCfg, str)}, acctLimitActions: []AccountAction{newAccountLimitAction(str, zCfg)}, acctRelaxActions: []AccountAction{newAccountRelaxAction(str, zCfg)}, - envWarningActions: []EnvironmentAction{newEnvironmentWarningAction(emailCfg, str)}, - envLimitActions: []EnvironmentAction{newEnvironmentLimitAction(str, zCfg)}, - envRelaxActions: []EnvironmentAction{newEnvironmentRelaxAction(str, zCfg)}, - shrWarningActions: []ShareAction{newShareWarningAction(emailCfg, str)}, - shrLimitActions: []ShareAction{newShareLimitAction(str, zCfg)}, - shrRelaxActions: []ShareAction{newShareRelaxAction(str, zCfg)}, close: make(chan struct{}), join: make(chan struct{}), } @@ -314,7 +302,7 @@ func (a *Agent) enforce(u *metrics.Usage) error { } // run account limit actions for _, action := range a.acctLimitActions { - if err := action.HandleAccount(acct, rxBytes, txBytes, a.cfg.Bandwidth.PerAccount, trx); err != nil { + if err := action.HandleAccount(acct, rxBytes, txBytes, a.cfg.Bandwidth, trx); err != nil { return errors.Wrapf(err, "%v", reflect.TypeOf(action).String()) } } @@ -351,7 +339,7 @@ func (a *Agent) enforce(u *metrics.Usage) error { } // run account warning actions for _, action := range a.acctWarningActions { - if err := action.HandleAccount(acct, rxBytes, txBytes, a.cfg.Bandwidth.PerAccount, trx); err != nil { + if err := action.HandleAccount(acct, rxBytes, txBytes, a.cfg.Bandwidth, trx); err != nil { return errors.Wrapf(err, "%v", reflect.TypeOf(action).String()) } } @@ -361,168 +349,6 @@ func (a *Agent) enforce(u *metrics.Usage) error { } else { logrus.Debugf("already warned account '#%d' at %v", u.AccountId, warnedAt) } - - } else { - if enforce, warning, rxBytes, txBytes, err := a.checkEnvironmentLimit(u.EnvironmentId); err == nil { - if enforce { - enforced := false - var enforcedAt time.Time - if empty, err := a.str.IsEnvironmentLimitJournalEmpty(int(u.EnvironmentId), trx); err == nil && !empty { - if latest, err := a.str.FindLatestEnvironmentLimitJournal(int(u.EnvironmentId), trx); err == nil { - enforced = latest.Action == store.LimitLimitAction - enforcedAt = latest.UpdatedAt - } - } - - if !enforced { - _, err := a.str.CreateEnvironmentLimitJournal(&store.EnvironmentLimitJournal{ - EnvironmentId: int(u.EnvironmentId), - RxBytes: rxBytes, - TxBytes: txBytes, - Action: store.LimitLimitAction, - }, trx) - if err != nil { - return err - } - env, err := a.str.GetEnvironment(int(u.EnvironmentId), trx) - if err != nil { - return err - } - // run environment limit actions - for _, action := range a.envLimitActions { - if err := action.HandleEnvironment(env, rxBytes, txBytes, a.cfg.Bandwidth.PerEnvironment, trx); err != nil { - return errors.Wrapf(err, "%v", reflect.TypeOf(action).String()) - } - } - if err := trx.Commit(); err != nil { - return err - } - } else { - logrus.Debugf("already enforced limit for environment '#%d' at %v", u.EnvironmentId, enforcedAt) - } - - } else if warning { - warned := false - var warnedAt time.Time - if empty, err := a.str.IsEnvironmentLimitJournalEmpty(int(u.EnvironmentId), trx); err == nil && !empty { - if latest, err := a.str.FindLatestEnvironmentLimitJournal(int(u.EnvironmentId), trx); err == nil { - warned = latest.Action == store.WarningLimitAction || latest.Action == store.LimitLimitAction - warnedAt = latest.UpdatedAt - } - } - - if !warned { - _, err := a.str.CreateEnvironmentLimitJournal(&store.EnvironmentLimitJournal{ - EnvironmentId: int(u.EnvironmentId), - RxBytes: rxBytes, - TxBytes: txBytes, - Action: store.WarningLimitAction, - }, trx) - if err != nil { - return err - } - env, err := a.str.GetEnvironment(int(u.EnvironmentId), trx) - if err != nil { - return err - } - // run environment warning actions - for _, action := range a.envWarningActions { - if err := action.HandleEnvironment(env, rxBytes, txBytes, a.cfg.Bandwidth.PerEnvironment, trx); err != nil { - return errors.Wrapf(err, "%v", reflect.TypeOf(action).String()) - } - } - if err := trx.Commit(); err != nil { - return err - } - } else { - logrus.Debugf("already warned environment '#%d' at %v", u.EnvironmentId, warnedAt) - } - - } else { - if enforce, warning, rxBytes, txBytes, err := a.checkShareLimit(u.ShareToken); err == nil { - if enforce { - shr, err := a.str.FindShareWithToken(u.ShareToken, trx) - if err != nil { - return err - } - - enforced := false - var enforcedAt time.Time - if empty, err := a.str.IsShareLimitJournalEmpty(shr.Id, trx); err == nil && !empty { - if latest, err := a.str.FindLatestShareLimitJournal(shr.Id, trx); err == nil { - enforced = latest.Action == store.LimitLimitAction - enforcedAt = latest.UpdatedAt - } - } - - if !enforced { - _, err := a.str.CreateShareLimitJournal(&store.ShareLimitJournal{ - ShareId: shr.Id, - RxBytes: rxBytes, - TxBytes: txBytes, - Action: store.LimitLimitAction, - }, trx) - if err != nil { - return err - } - // run share limit actions - for _, action := range a.shrLimitActions { - if err := action.HandleShare(shr, rxBytes, txBytes, a.cfg.Bandwidth.PerShare, trx); err != nil { - return errors.Wrapf(err, "%v", reflect.TypeOf(action).String()) - } - } - if err := trx.Commit(); err != nil { - return err - } - } else { - logrus.Debugf("already enforced limit for share '%v' at %v", shr.Token, enforcedAt) - } - - } else if warning { - shr, err := a.str.FindShareWithToken(u.ShareToken, trx) - if err != nil { - return err - } - - warned := false - var warnedAt time.Time - if empty, err := a.str.IsShareLimitJournalEmpty(shr.Id, trx); err == nil && !empty { - if latest, err := a.str.FindLatestShareLimitJournal(shr.Id, trx); err == nil { - warned = latest.Action == store.WarningLimitAction || latest.Action == store.LimitLimitAction - warnedAt = latest.UpdatedAt - } - } - - if !warned { - _, err := a.str.CreateShareLimitJournal(&store.ShareLimitJournal{ - ShareId: shr.Id, - RxBytes: rxBytes, - TxBytes: txBytes, - Action: store.WarningLimitAction, - }, trx) - if err != nil { - return err - } - // run share warning actions - for _, action := range a.shrWarningActions { - if err := action.HandleShare(shr, rxBytes, txBytes, a.cfg.Bandwidth.PerShare, trx); err != nil { - return errors.Wrapf(err, "%v", reflect.TypeOf(action).String()) - } - } - if err := trx.Commit(); err != nil { - return err - } - } else { - logrus.Debugf("already warned share '%v' at %v", shr.Token, warnedAt) - } - } - } else { - logrus.Error(err) - } - } - } else { - logrus.Error(err) - } } } else { logrus.Error(err) @@ -542,78 +368,6 @@ func (a *Agent) relax() error { commit := false - if sljs, err := a.str.FindAllLatestShareLimitJournal(trx); err == nil { - for _, slj := range sljs { - if shr, err := a.str.GetShare(slj.ShareId, trx); err == nil { - if slj.Action == store.WarningLimitAction || slj.Action == store.LimitLimitAction { - if enforce, warning, rxBytes, txBytes, err := a.checkShareLimit(shr.Token); err == nil { - if !enforce && !warning { - if slj.Action == store.LimitLimitAction { - // run relax actions for share - for _, action := range a.shrRelaxActions { - if err := action.HandleShare(shr, rxBytes, txBytes, a.cfg.Bandwidth.PerShare, trx); err != nil { - return errors.Wrapf(err, "%v", reflect.TypeOf(action).String()) - } - } - } else { - logrus.Infof("relaxing warning for '%v'", shr.Token) - } - if err := a.str.DeleteShareLimitJournalForShare(shr.Id, trx); err == nil { - commit = true - } else { - logrus.Errorf("error deleting share_limit_journal for '%v'", shr.Token) - } - } else { - logrus.Infof("share '%v' still over limit", shr.Token) - } - } else { - logrus.Errorf("error checking share limit for '%v': %v", shr.Token, err) - } - } - } else { - logrus.Errorf("error getting share for '#%d': %v", slj.ShareId, err) - } - } - } else { - return err - } - - if eljs, err := a.str.FindAllLatestEnvironmentLimitJournal(trx); err == nil { - for _, elj := range eljs { - if env, err := a.str.GetEnvironment(elj.EnvironmentId, trx); err == nil { - if elj.Action == store.WarningLimitAction || elj.Action == store.LimitLimitAction { - if enforce, warning, rxBytes, txBytes, err := a.checkEnvironmentLimit(int64(elj.EnvironmentId)); err == nil { - if !enforce && !warning { - if elj.Action == store.LimitLimitAction { - // run relax actions for environment - for _, action := range a.envRelaxActions { - if err := action.HandleEnvironment(env, rxBytes, txBytes, a.cfg.Bandwidth.PerEnvironment, trx); err != nil { - return errors.Wrapf(err, "%v", reflect.TypeOf(action).String()) - } - } - } else { - logrus.Infof("relaxing warning for '%v'", env.ZId) - } - if err := a.str.DeleteEnvironmentLimitJournalForEnvironment(env.Id, trx); err == nil { - commit = true - } else { - logrus.Errorf("error deleteing environment_limit_journal for '%v': %v", env.ZId, err) - } - } else { - logrus.Infof("environment '%v' still over limit", env.ZId) - } - } else { - logrus.Errorf("error checking environment limit for '%v': %v", env.ZId, err) - } - } - } else { - logrus.Errorf("error getting environment for '#%d': %v", elj.EnvironmentId, err) - } - } - } else { - return err - } - if aljs, err := a.str.FindAllLatestAccountLimitJournal(trx); err == nil { for _, alj := range aljs { if acct, err := a.str.GetAccount(alj.AccountId, trx); err == nil { @@ -623,7 +377,7 @@ func (a *Agent) relax() error { if alj.Action == store.LimitLimitAction { // run relax actions for account for _, action := range a.acctRelaxActions { - if err := action.HandleAccount(acct, rxBytes, txBytes, a.cfg.Bandwidth.PerAccount, trx); err != nil { + if err := action.HandleAccount(acct, rxBytes, txBytes, a.cfg.Bandwidth, trx); err != nil { return errors.Wrapf(err, "%v", reflect.TypeOf(action).String()) } } @@ -662,8 +416,8 @@ func (a *Agent) relax() error { func (a *Agent) checkAccountLimit(acctId int64) (enforce, warning bool, rxBytes, txBytes int64, err error) { period := 24 * time.Hour limit := DefaultBandwidthPerPeriod() - if a.cfg.Bandwidth != nil && a.cfg.Bandwidth.PerAccount != nil { - limit = a.cfg.Bandwidth.PerAccount + if a.cfg.Bandwidth != nil && a.cfg.Bandwidth != nil { + limit = a.cfg.Bandwidth } if limit.Period > 0 { period = limit.Period @@ -677,46 +431,6 @@ func (a *Agent) checkAccountLimit(acctId int64) (enforce, warning bool, rxBytes, return enforce, warning, rx, tx, nil } -func (a *Agent) checkEnvironmentLimit(envId int64) (enforce, warning bool, rxBytes, txBytes int64, err error) { - period := 24 * time.Hour - limit := DefaultBandwidthPerPeriod() - if a.cfg.Bandwidth != nil && a.cfg.Bandwidth.PerEnvironment != nil { - limit = a.cfg.Bandwidth.PerEnvironment - } - if limit.Period > 0 { - period = limit.Period - } - rx, tx, err := a.ifx.totalRxTxForEnvironment(envId, period) - if err != nil { - logrus.Error(err) - } - - enforce, warning = a.checkLimit(limit, rx, tx) - return enforce, warning, rx, tx, nil -} - -func (a *Agent) checkShareLimit(shrToken string) (enforce, warning bool, rxBytes, txBytes int64, err error) { - period := 24 * time.Hour - limit := DefaultBandwidthPerPeriod() - if a.cfg.Bandwidth != nil && a.cfg.Bandwidth.PerShare != nil { - limit = a.cfg.Bandwidth.PerShare - } - if limit.Period > 0 { - period = limit.Period - } - rx, tx, err := a.ifx.totalRxTxForShare(shrToken, period) - if err != nil { - logrus.Error(err) - } - - enforce, warning = a.checkLimit(limit, rx, tx) - if enforce || warning { - logrus.Debugf("'%v': %v", shrToken, describeLimit(limit, rx, tx)) - } - - return enforce, warning, rx, tx, nil -} - func (a *Agent) checkLimit(cfg *BandwidthPerPeriod, rx, tx int64) (enforce, warning bool) { if cfg.Limit.Rx != Unlimited && rx > cfg.Limit.Rx { return true, false diff --git a/controller/limits/config.go b/controller/limits/config.go index 412fa35a..bd2c28d2 100644 --- a/controller/limits/config.go +++ b/controller/limits/config.go @@ -9,17 +9,11 @@ type Config struct { Shares int ReservedShares int UniqueNames int - Bandwidth *BandwidthConfig + Bandwidth *BandwidthPerPeriod Cycle time.Duration Enforcing bool } -type BandwidthConfig struct { - PerAccount *BandwidthPerPeriod - PerEnvironment *BandwidthPerPeriod - PerShare *BandwidthPerPeriod -} - type BandwidthPerPeriod struct { Period time.Duration Warning *Bandwidth @@ -54,12 +48,8 @@ func DefaultConfig() *Config { Shares: Unlimited, ReservedShares: Unlimited, UniqueNames: Unlimited, - Bandwidth: &BandwidthConfig{ - PerAccount: DefaultBandwidthPerPeriod(), - PerEnvironment: DefaultBandwidthPerPeriod(), - PerShare: DefaultBandwidthPerPeriod(), - }, - Enforcing: false, - Cycle: 15 * time.Minute, + Bandwidth: DefaultBandwidthPerPeriod(), + Enforcing: false, + Cycle: 15 * time.Minute, } } diff --git a/controller/limits/environmentLimitAction.go b/controller/limits/environmentLimitAction.go deleted file mode 100644 index 31f10cec..00000000 --- a/controller/limits/environmentLimitAction.go +++ /dev/null @@ -1,41 +0,0 @@ -package limits - -import ( - "github.com/jmoiron/sqlx" - "github.com/openziti/zrok/controller/store" - "github.com/openziti/zrok/controller/zrokEdgeSdk" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" -) - -type environmentLimitAction struct { - str *store.Store - zCfg *zrokEdgeSdk.Config -} - -func newEnvironmentLimitAction(str *store.Store, zCfg *zrokEdgeSdk.Config) *environmentLimitAction { - return &environmentLimitAction{str, zCfg} -} - -func (a *environmentLimitAction) HandleEnvironment(env *store.Environment, _, _ int64, _ *BandwidthPerPeriod, trx *sqlx.Tx) error { - logrus.Infof("limiting '%v'", env.ZId) - - shrs, err := a.str.FindSharesForEnvironment(env.Id, trx) - if err != nil { - return errors.Wrapf(err, "error finding shares for environment '%v'", env.ZId) - } - - edge, err := zrokEdgeSdk.Client(a.zCfg) - if err != nil { - return err - } - - for _, shr := range shrs { - if err := zrokEdgeSdk.DeleteServicePoliciesDial(env.ZId, shr.Token, edge); err != nil { - return errors.Wrapf(err, "error deleting dial service policy for '%v'", shr.Token) - } - logrus.Infof("removed dial service policy for share '%v' of environment '%v'", shr.Token, env.ZId) - } - - return nil -} diff --git a/controller/limits/environmentRelaxAction.go b/controller/limits/environmentRelaxAction.go deleted file mode 100644 index aae04185..00000000 --- a/controller/limits/environmentRelaxAction.go +++ /dev/null @@ -1,50 +0,0 @@ -package limits - -import ( - "github.com/jmoiron/sqlx" - "github.com/openziti/zrok/controller/store" - "github.com/openziti/zrok/controller/zrokEdgeSdk" - "github.com/openziti/zrok/sdk/golang/sdk" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" -) - -type environmentRelaxAction struct { - str *store.Store - zCfg *zrokEdgeSdk.Config -} - -func newEnvironmentRelaxAction(str *store.Store, zCfg *zrokEdgeSdk.Config) *environmentRelaxAction { - return &environmentRelaxAction{str, zCfg} -} - -func (a *environmentRelaxAction) HandleEnvironment(env *store.Environment, rxBytes, txBytes int64, limit *BandwidthPerPeriod, trx *sqlx.Tx) error { - logrus.Infof("relaxing '%v'", env.ZId) - - shrs, err := a.str.FindSharesForEnvironment(env.Id, trx) - if err != nil { - return errors.Wrapf(err, "error finding shares for environment '%v'", env.ZId) - } - - edge, err := zrokEdgeSdk.Client(a.zCfg) - if err != nil { - return err - } - - for _, shr := range shrs { - if !shr.Deleted { - switch shr.ShareMode { - case string(sdk.PublicShareMode): - if err := relaxPublicShare(a.str, edge, shr, trx); err != nil { - return err - } - case string(sdk.PrivateShareMode): - if err := relaxPrivateShare(a.str, edge, shr, trx); err != nil { - return err - } - } - } - } - - return nil -} diff --git a/controller/limits/environmentWarningAction.go b/controller/limits/environmentWarningAction.go deleted file mode 100644 index 2d5b04a1..00000000 --- a/controller/limits/environmentWarningAction.go +++ /dev/null @@ -1,58 +0,0 @@ -package limits - -import ( - "github.com/jmoiron/sqlx" - "github.com/openziti/zrok/controller/emailUi" - "github.com/openziti/zrok/controller/store" - "github.com/openziti/zrok/util" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" -) - -type environmentWarningAction struct { - str *store.Store - cfg *emailUi.Config -} - -func newEnvironmentWarningAction(cfg *emailUi.Config, str *store.Store) *environmentWarningAction { - return &environmentWarningAction{str, cfg} -} - -func (a *environmentWarningAction) HandleEnvironment(env *store.Environment, rxBytes, txBytes int64, limit *BandwidthPerPeriod, trx *sqlx.Tx) error { - logrus.Infof("warning '%v'", env.ZId) - - if a.cfg != nil { - if env.AccountId != nil { - acct, err := a.str.GetAccount(*env.AccountId, trx) - if err != nil { - return err - } - - rxLimit := "unlimited bytes" - if limit.Limit.Rx != Unlimited { - rxLimit = util.BytesToSize(limit.Limit.Rx) - } - txLimit := "unlimited bytes" - if limit.Limit.Tx != Unlimited { - txLimit = util.BytesToSize(limit.Limit.Tx) - } - totalLimit := "unlimited bytes" - if limit.Limit.Total != Unlimited { - totalLimit = util.BytesToSize(limit.Limit.Total) - } - - detail := newDetailMessage() - detail = detail.append("Your environment '%v' has received %v and sent %v (for a total of %v), which has triggered a transfer limit warning.", env.Description, util.BytesToSize(rxBytes), util.BytesToSize(txBytes), util.BytesToSize(rxBytes+txBytes)) - detail = detail.append("This zrok instance only allows a share to receive %v, send %v, totalling not more than %v for each %v.", rxLimit, txLimit, totalLimit, limit.Period) - detail = detail.append("If you exceed the transfer limit, access to your shares will be temporarily disabled (until the last %v falls below the transfer limit).", limit.Period) - - if err := sendLimitWarningEmail(a.cfg, acct.Email, detail); err != nil { - return errors.Wrapf(err, "error sending limit warning email to '%v'", acct.Email) - } - } - } else { - logrus.Warnf("skipping warning email for environment limit; no email configuration specified") - } - - return nil -} diff --git a/controller/limits/shareLimitAction.go b/controller/limits/shareLimitAction.go deleted file mode 100644 index 8a4fc8b9..00000000 --- a/controller/limits/shareLimitAction.go +++ /dev/null @@ -1,38 +0,0 @@ -package limits - -import ( - "github.com/jmoiron/sqlx" - "github.com/openziti/zrok/controller/store" - "github.com/openziti/zrok/controller/zrokEdgeSdk" - "github.com/sirupsen/logrus" -) - -type shareLimitAction struct { - str *store.Store - zCfg *zrokEdgeSdk.Config -} - -func newShareLimitAction(str *store.Store, zCfg *zrokEdgeSdk.Config) *shareLimitAction { - return &shareLimitAction{str, zCfg} -} - -func (a *shareLimitAction) HandleShare(shr *store.Share, _, _ int64, _ *BandwidthPerPeriod, trx *sqlx.Tx) error { - logrus.Infof("limiting '%v'", shr.Token) - - env, err := a.str.GetEnvironment(shr.EnvironmentId, trx) - if err != nil { - return err - } - - edge, err := zrokEdgeSdk.Client(a.zCfg) - if err != nil { - return err - } - - if err := zrokEdgeSdk.DeleteServicePoliciesDial(env.ZId, shr.Token, edge); err != nil { - return err - } - logrus.Infof("removed dial service policy for '%v'", shr.Token) - - return nil -} diff --git a/controller/limits/shareRelaxAction.go b/controller/limits/shareRelaxAction.go deleted file mode 100644 index 5f05496d..00000000 --- a/controller/limits/shareRelaxAction.go +++ /dev/null @@ -1,89 +0,0 @@ -package limits - -import ( - "github.com/jmoiron/sqlx" - "github.com/openziti/edge-api/rest_management_api_client" - "github.com/openziti/zrok/controller/store" - "github.com/openziti/zrok/controller/zrokEdgeSdk" - "github.com/openziti/zrok/sdk/golang/sdk" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" -) - -type shareRelaxAction struct { - str *store.Store - zCfg *zrokEdgeSdk.Config -} - -func newShareRelaxAction(str *store.Store, zCfg *zrokEdgeSdk.Config) *shareRelaxAction { - return &shareRelaxAction{str, zCfg} -} - -func (a *shareRelaxAction) HandleShare(shr *store.Share, _, _ int64, _ *BandwidthPerPeriod, trx *sqlx.Tx) error { - logrus.Infof("relaxing '%v'", shr.Token) - - if !shr.Deleted { - edge, err := zrokEdgeSdk.Client(a.zCfg) - if err != nil { - return err - } - - switch shr.ShareMode { - case string(sdk.PublicShareMode): - if err := relaxPublicShare(a.str, edge, shr, trx); err != nil { - return err - } - case string(sdk.PrivateShareMode): - if err := relaxPrivateShare(a.str, edge, shr, trx); err != nil { - return err - } - } - } - - return nil -} - -func relaxPublicShare(str *store.Store, edge *rest_management_api_client.ZitiEdgeManagement, shr *store.Share, trx *sqlx.Tx) error { - env, err := str.GetEnvironment(shr.EnvironmentId, trx) - if err != nil { - return errors.Wrap(err, "error finding environment") - } - - fe, err := str.FindFrontendPubliclyNamed(*shr.FrontendSelection, trx) - if err != nil { - return errors.Wrapf(err, "error finding frontend name '%v' for '%v'", *shr.FrontendSelection, shr.Token) - } - - if err := zrokEdgeSdk.CreateServicePolicyDial(env.ZId+"-"+shr.ZId+"-dial", shr.ZId, []string{fe.ZId}, zrokEdgeSdk.ZrokShareTags(shr.Token).SubTags, edge); err != nil { - return errors.Wrapf(err, "error creating dial service policy for '%v'", shr.Token) - } - logrus.Infof("added dial service policy for '%v'", shr.Token) - return nil -} - -func relaxPrivateShare(str *store.Store, edge *rest_management_api_client.ZitiEdgeManagement, shr *store.Share, trx *sqlx.Tx) error { - fes, err := str.FindFrontendsForPrivateShare(shr.Id, trx) - if err != nil { - return errors.Wrapf(err, "error finding frontends for share '%v'", shr.Token) - } - for _, fe := range fes { - if fe.EnvironmentId != nil { - env, err := str.GetEnvironment(*fe.EnvironmentId, trx) - if err != nil { - return errors.Wrapf(err, "error getting environment for frontend '%v'", fe.Token) - } - - addlTags := map[string]interface{}{ - "zrokEnvironmentZId": env.ZId, - "zrokFrontendToken": fe.Token, - "zrokShareToken": shr.Token, - } - if err := zrokEdgeSdk.CreateServicePolicyDial(fe.Token+"-"+env.ZId+"-"+shr.ZId+"-dial", shr.ZId, []string{env.ZId}, addlTags, edge); err != nil { - return errors.Wrapf(err, "unable to create dial policy for frontend '%v'", fe.Token) - } - - logrus.Infof("added dial service policy for share '%v' to private frontend '%v'", shr.Token, fe.Token) - } - } - return nil -} diff --git a/controller/limits/shareWarningAction.go b/controller/limits/shareWarningAction.go deleted file mode 100644 index 764b6d3f..00000000 --- a/controller/limits/shareWarningAction.go +++ /dev/null @@ -1,63 +0,0 @@ -package limits - -import ( - "github.com/jmoiron/sqlx" - "github.com/openziti/zrok/controller/emailUi" - "github.com/openziti/zrok/controller/store" - "github.com/openziti/zrok/util" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" -) - -type shareWarningAction struct { - str *store.Store - cfg *emailUi.Config -} - -func newShareWarningAction(cfg *emailUi.Config, str *store.Store) *shareWarningAction { - return &shareWarningAction{str, cfg} -} - -func (a *shareWarningAction) HandleShare(shr *store.Share, rxBytes, txBytes int64, limit *BandwidthPerPeriod, trx *sqlx.Tx) error { - logrus.Infof("warning '%v'", shr.Token) - - if a.cfg != nil { - env, err := a.str.GetEnvironment(shr.EnvironmentId, trx) - if err != nil { - return err - } - - if env.AccountId != nil { - acct, err := a.str.GetAccount(*env.AccountId, trx) - if err != nil { - return err - } - - rxLimit := "unlimited bytes" - if limit.Limit.Rx != Unlimited { - rxLimit = util.BytesToSize(limit.Limit.Rx) - } - txLimit := "unlimited bytes" - if limit.Limit.Tx != Unlimited { - txLimit = util.BytesToSize(limit.Limit.Tx) - } - totalLimit := "unlimited bytes" - if limit.Limit.Total != Unlimited { - totalLimit = util.BytesToSize(limit.Limit.Total) - } - - detail := newDetailMessage() - detail = detail.append("Your share '%v' has received %v and sent %v (for a total of %v), which has triggered a transfer limit warning.", shr.Token, util.BytesToSize(rxBytes), util.BytesToSize(txBytes), util.BytesToSize(rxBytes+txBytes)) - detail = detail.append("This zrok instance only allows a share to receive %v, send %v, totalling not more than %v for each %v.", rxLimit, txLimit, totalLimit, limit.Period) - detail = detail.append("If you exceed the transfer limit, access to your shares will be temporarily disabled (until the last %v falls below the transfer limit).", limit.Period) - - if err := sendLimitWarningEmail(a.cfg, acct.Email, detail); err != nil { - return errors.Wrapf(err, "error sending limit warning email to '%v'", acct.Email) - } - } - } else { - logrus.Warnf("skipping warning email for share limit; no email configuration specified") - } - - return nil -}