simplifying limit enforcement structures (#606)

This commit is contained in:
Michael Quigley 2024-05-30 13:46:15 -04:00
parent 3077c6462d
commit 141cb424e0
No known key found for this signature in database
GPG Key ID: 9B60314A9DD20A62
9 changed files with 55 additions and 644 deletions

View File

@ -2,6 +2,7 @@ package limits
import ( import (
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
"github.com/openziti/edge-api/rest_management_api_client"
"github.com/openziti/zrok/controller/store" "github.com/openziti/zrok/controller/store"
"github.com/openziti/zrok/controller/zrokEdgeSdk" "github.com/openziti/zrok/controller/zrokEdgeSdk"
"github.com/openziti/zrok/sdk/golang/sdk" "github.com/openziti/zrok/sdk/golang/sdk"
@ -53,3 +54,48 @@ func (a *accountRelaxAction) HandleAccount(acct *store.Account, _, _ int64, _ *B
return nil return nil
} }
func relaxPublicShare(str *store.Store, edge *rest_management_api_client.ZitiEdgeManagement, shr *store.Share, trx *sqlx.Tx) error {
env, err := str.GetEnvironment(shr.EnvironmentId, trx)
if err != nil {
return errors.Wrap(err, "error finding environment")
}
fe, err := str.FindFrontendPubliclyNamed(*shr.FrontendSelection, trx)
if err != nil {
return errors.Wrapf(err, "error finding frontend name '%v' for '%v'", *shr.FrontendSelection, shr.Token)
}
if err := zrokEdgeSdk.CreateServicePolicyDial(env.ZId+"-"+shr.ZId+"-dial", shr.ZId, []string{fe.ZId}, zrokEdgeSdk.ZrokShareTags(shr.Token).SubTags, edge); err != nil {
return errors.Wrapf(err, "error creating dial service policy for '%v'", shr.Token)
}
logrus.Infof("added dial service policy for '%v'", shr.Token)
return nil
}
func relaxPrivateShare(str *store.Store, edge *rest_management_api_client.ZitiEdgeManagement, shr *store.Share, trx *sqlx.Tx) error {
fes, err := str.FindFrontendsForPrivateShare(shr.Id, trx)
if err != nil {
return errors.Wrapf(err, "error finding frontends for share '%v'", shr.Token)
}
for _, fe := range fes {
if fe.EnvironmentId != nil {
env, err := str.GetEnvironment(*fe.EnvironmentId, trx)
if err != nil {
return errors.Wrapf(err, "error getting environment for frontend '%v'", fe.Token)
}
addlTags := map[string]interface{}{
"zrokEnvironmentZId": env.ZId,
"zrokFrontendToken": fe.Token,
"zrokShareToken": shr.Token,
}
if err := zrokEdgeSdk.CreateServicePolicyDial(fe.Token+"-"+env.ZId+"-"+shr.ZId+"-dial", shr.ZId, []string{env.ZId}, addlTags, edge); err != nil {
return errors.Wrapf(err, "unable to create dial policy for frontend '%v'", fe.Token)
}
logrus.Infof("added dial service policy for share '%v' to private frontend '%v'", shr.Token, fe.Token)
}
}
return nil
}

View File

@ -24,12 +24,6 @@ type Agent struct {
acctWarningActions []AccountAction acctWarningActions []AccountAction
acctLimitActions []AccountAction acctLimitActions []AccountAction
acctRelaxActions []AccountAction acctRelaxActions []AccountAction
envWarningActions []EnvironmentAction
envLimitActions []EnvironmentAction
envRelaxActions []EnvironmentAction
shrWarningActions []ShareAction
shrLimitActions []ShareAction
shrRelaxActions []ShareAction
close chan struct{} close chan struct{}
join chan struct{} join chan struct{}
} }
@ -44,12 +38,6 @@ func NewAgent(cfg *Config, ifxCfg *metrics.InfluxConfig, zCfg *zrokEdgeSdk.Confi
acctWarningActions: []AccountAction{newAccountWarningAction(emailCfg, str)}, acctWarningActions: []AccountAction{newAccountWarningAction(emailCfg, str)},
acctLimitActions: []AccountAction{newAccountLimitAction(str, zCfg)}, acctLimitActions: []AccountAction{newAccountLimitAction(str, zCfg)},
acctRelaxActions: []AccountAction{newAccountRelaxAction(str, zCfg)}, acctRelaxActions: []AccountAction{newAccountRelaxAction(str, zCfg)},
envWarningActions: []EnvironmentAction{newEnvironmentWarningAction(emailCfg, str)},
envLimitActions: []EnvironmentAction{newEnvironmentLimitAction(str, zCfg)},
envRelaxActions: []EnvironmentAction{newEnvironmentRelaxAction(str, zCfg)},
shrWarningActions: []ShareAction{newShareWarningAction(emailCfg, str)},
shrLimitActions: []ShareAction{newShareLimitAction(str, zCfg)},
shrRelaxActions: []ShareAction{newShareRelaxAction(str, zCfg)},
close: make(chan struct{}), close: make(chan struct{}),
join: make(chan struct{}), join: make(chan struct{}),
} }
@ -314,7 +302,7 @@ func (a *Agent) enforce(u *metrics.Usage) error {
} }
// run account limit actions // run account limit actions
for _, action := range a.acctLimitActions { for _, action := range a.acctLimitActions {
if err := action.HandleAccount(acct, rxBytes, txBytes, a.cfg.Bandwidth.PerAccount, trx); err != nil { if err := action.HandleAccount(acct, rxBytes, txBytes, a.cfg.Bandwidth, trx); err != nil {
return errors.Wrapf(err, "%v", reflect.TypeOf(action).String()) return errors.Wrapf(err, "%v", reflect.TypeOf(action).String())
} }
} }
@ -351,7 +339,7 @@ func (a *Agent) enforce(u *metrics.Usage) error {
} }
// run account warning actions // run account warning actions
for _, action := range a.acctWarningActions { for _, action := range a.acctWarningActions {
if err := action.HandleAccount(acct, rxBytes, txBytes, a.cfg.Bandwidth.PerAccount, trx); err != nil { if err := action.HandleAccount(acct, rxBytes, txBytes, a.cfg.Bandwidth, trx); err != nil {
return errors.Wrapf(err, "%v", reflect.TypeOf(action).String()) return errors.Wrapf(err, "%v", reflect.TypeOf(action).String())
} }
} }
@ -361,168 +349,6 @@ func (a *Agent) enforce(u *metrics.Usage) error {
} else { } else {
logrus.Debugf("already warned account '#%d' at %v", u.AccountId, warnedAt) logrus.Debugf("already warned account '#%d' at %v", u.AccountId, warnedAt)
} }
} else {
if enforce, warning, rxBytes, txBytes, err := a.checkEnvironmentLimit(u.EnvironmentId); err == nil {
if enforce {
enforced := false
var enforcedAt time.Time
if empty, err := a.str.IsEnvironmentLimitJournalEmpty(int(u.EnvironmentId), trx); err == nil && !empty {
if latest, err := a.str.FindLatestEnvironmentLimitJournal(int(u.EnvironmentId), trx); err == nil {
enforced = latest.Action == store.LimitLimitAction
enforcedAt = latest.UpdatedAt
}
}
if !enforced {
_, err := a.str.CreateEnvironmentLimitJournal(&store.EnvironmentLimitJournal{
EnvironmentId: int(u.EnvironmentId),
RxBytes: rxBytes,
TxBytes: txBytes,
Action: store.LimitLimitAction,
}, trx)
if err != nil {
return err
}
env, err := a.str.GetEnvironment(int(u.EnvironmentId), trx)
if err != nil {
return err
}
// run environment limit actions
for _, action := range a.envLimitActions {
if err := action.HandleEnvironment(env, rxBytes, txBytes, a.cfg.Bandwidth.PerEnvironment, trx); err != nil {
return errors.Wrapf(err, "%v", reflect.TypeOf(action).String())
}
}
if err := trx.Commit(); err != nil {
return err
}
} else {
logrus.Debugf("already enforced limit for environment '#%d' at %v", u.EnvironmentId, enforcedAt)
}
} else if warning {
warned := false
var warnedAt time.Time
if empty, err := a.str.IsEnvironmentLimitJournalEmpty(int(u.EnvironmentId), trx); err == nil && !empty {
if latest, err := a.str.FindLatestEnvironmentLimitJournal(int(u.EnvironmentId), trx); err == nil {
warned = latest.Action == store.WarningLimitAction || latest.Action == store.LimitLimitAction
warnedAt = latest.UpdatedAt
}
}
if !warned {
_, err := a.str.CreateEnvironmentLimitJournal(&store.EnvironmentLimitJournal{
EnvironmentId: int(u.EnvironmentId),
RxBytes: rxBytes,
TxBytes: txBytes,
Action: store.WarningLimitAction,
}, trx)
if err != nil {
return err
}
env, err := a.str.GetEnvironment(int(u.EnvironmentId), trx)
if err != nil {
return err
}
// run environment warning actions
for _, action := range a.envWarningActions {
if err := action.HandleEnvironment(env, rxBytes, txBytes, a.cfg.Bandwidth.PerEnvironment, trx); err != nil {
return errors.Wrapf(err, "%v", reflect.TypeOf(action).String())
}
}
if err := trx.Commit(); err != nil {
return err
}
} else {
logrus.Debugf("already warned environment '#%d' at %v", u.EnvironmentId, warnedAt)
}
} else {
if enforce, warning, rxBytes, txBytes, err := a.checkShareLimit(u.ShareToken); err == nil {
if enforce {
shr, err := a.str.FindShareWithToken(u.ShareToken, trx)
if err != nil {
return err
}
enforced := false
var enforcedAt time.Time
if empty, err := a.str.IsShareLimitJournalEmpty(shr.Id, trx); err == nil && !empty {
if latest, err := a.str.FindLatestShareLimitJournal(shr.Id, trx); err == nil {
enforced = latest.Action == store.LimitLimitAction
enforcedAt = latest.UpdatedAt
}
}
if !enforced {
_, err := a.str.CreateShareLimitJournal(&store.ShareLimitJournal{
ShareId: shr.Id,
RxBytes: rxBytes,
TxBytes: txBytes,
Action: store.LimitLimitAction,
}, trx)
if err != nil {
return err
}
// run share limit actions
for _, action := range a.shrLimitActions {
if err := action.HandleShare(shr, rxBytes, txBytes, a.cfg.Bandwidth.PerShare, trx); err != nil {
return errors.Wrapf(err, "%v", reflect.TypeOf(action).String())
}
}
if err := trx.Commit(); err != nil {
return err
}
} else {
logrus.Debugf("already enforced limit for share '%v' at %v", shr.Token, enforcedAt)
}
} else if warning {
shr, err := a.str.FindShareWithToken(u.ShareToken, trx)
if err != nil {
return err
}
warned := false
var warnedAt time.Time
if empty, err := a.str.IsShareLimitJournalEmpty(shr.Id, trx); err == nil && !empty {
if latest, err := a.str.FindLatestShareLimitJournal(shr.Id, trx); err == nil {
warned = latest.Action == store.WarningLimitAction || latest.Action == store.LimitLimitAction
warnedAt = latest.UpdatedAt
}
}
if !warned {
_, err := a.str.CreateShareLimitJournal(&store.ShareLimitJournal{
ShareId: shr.Id,
RxBytes: rxBytes,
TxBytes: txBytes,
Action: store.WarningLimitAction,
}, trx)
if err != nil {
return err
}
// run share warning actions
for _, action := range a.shrWarningActions {
if err := action.HandleShare(shr, rxBytes, txBytes, a.cfg.Bandwidth.PerShare, trx); err != nil {
return errors.Wrapf(err, "%v", reflect.TypeOf(action).String())
}
}
if err := trx.Commit(); err != nil {
return err
}
} else {
logrus.Debugf("already warned share '%v' at %v", shr.Token, warnedAt)
}
}
} else {
logrus.Error(err)
}
}
} else {
logrus.Error(err)
}
} }
} else { } else {
logrus.Error(err) logrus.Error(err)
@ -542,78 +368,6 @@ func (a *Agent) relax() error {
commit := false commit := false
if sljs, err := a.str.FindAllLatestShareLimitJournal(trx); err == nil {
for _, slj := range sljs {
if shr, err := a.str.GetShare(slj.ShareId, trx); err == nil {
if slj.Action == store.WarningLimitAction || slj.Action == store.LimitLimitAction {
if enforce, warning, rxBytes, txBytes, err := a.checkShareLimit(shr.Token); err == nil {
if !enforce && !warning {
if slj.Action == store.LimitLimitAction {
// run relax actions for share
for _, action := range a.shrRelaxActions {
if err := action.HandleShare(shr, rxBytes, txBytes, a.cfg.Bandwidth.PerShare, trx); err != nil {
return errors.Wrapf(err, "%v", reflect.TypeOf(action).String())
}
}
} else {
logrus.Infof("relaxing warning for '%v'", shr.Token)
}
if err := a.str.DeleteShareLimitJournalForShare(shr.Id, trx); err == nil {
commit = true
} else {
logrus.Errorf("error deleting share_limit_journal for '%v'", shr.Token)
}
} else {
logrus.Infof("share '%v' still over limit", shr.Token)
}
} else {
logrus.Errorf("error checking share limit for '%v': %v", shr.Token, err)
}
}
} else {
logrus.Errorf("error getting share for '#%d': %v", slj.ShareId, err)
}
}
} else {
return err
}
if eljs, err := a.str.FindAllLatestEnvironmentLimitJournal(trx); err == nil {
for _, elj := range eljs {
if env, err := a.str.GetEnvironment(elj.EnvironmentId, trx); err == nil {
if elj.Action == store.WarningLimitAction || elj.Action == store.LimitLimitAction {
if enforce, warning, rxBytes, txBytes, err := a.checkEnvironmentLimit(int64(elj.EnvironmentId)); err == nil {
if !enforce && !warning {
if elj.Action == store.LimitLimitAction {
// run relax actions for environment
for _, action := range a.envRelaxActions {
if err := action.HandleEnvironment(env, rxBytes, txBytes, a.cfg.Bandwidth.PerEnvironment, trx); err != nil {
return errors.Wrapf(err, "%v", reflect.TypeOf(action).String())
}
}
} else {
logrus.Infof("relaxing warning for '%v'", env.ZId)
}
if err := a.str.DeleteEnvironmentLimitJournalForEnvironment(env.Id, trx); err == nil {
commit = true
} else {
logrus.Errorf("error deleteing environment_limit_journal for '%v': %v", env.ZId, err)
}
} else {
logrus.Infof("environment '%v' still over limit", env.ZId)
}
} else {
logrus.Errorf("error checking environment limit for '%v': %v", env.ZId, err)
}
}
} else {
logrus.Errorf("error getting environment for '#%d': %v", elj.EnvironmentId, err)
}
}
} else {
return err
}
if aljs, err := a.str.FindAllLatestAccountLimitJournal(trx); err == nil { if aljs, err := a.str.FindAllLatestAccountLimitJournal(trx); err == nil {
for _, alj := range aljs { for _, alj := range aljs {
if acct, err := a.str.GetAccount(alj.AccountId, trx); err == nil { if acct, err := a.str.GetAccount(alj.AccountId, trx); err == nil {
@ -623,7 +377,7 @@ func (a *Agent) relax() error {
if alj.Action == store.LimitLimitAction { if alj.Action == store.LimitLimitAction {
// run relax actions for account // run relax actions for account
for _, action := range a.acctRelaxActions { for _, action := range a.acctRelaxActions {
if err := action.HandleAccount(acct, rxBytes, txBytes, a.cfg.Bandwidth.PerAccount, trx); err != nil { if err := action.HandleAccount(acct, rxBytes, txBytes, a.cfg.Bandwidth, trx); err != nil {
return errors.Wrapf(err, "%v", reflect.TypeOf(action).String()) return errors.Wrapf(err, "%v", reflect.TypeOf(action).String())
} }
} }
@ -662,8 +416,8 @@ func (a *Agent) relax() error {
func (a *Agent) checkAccountLimit(acctId int64) (enforce, warning bool, rxBytes, txBytes int64, err error) { func (a *Agent) checkAccountLimit(acctId int64) (enforce, warning bool, rxBytes, txBytes int64, err error) {
period := 24 * time.Hour period := 24 * time.Hour
limit := DefaultBandwidthPerPeriod() limit := DefaultBandwidthPerPeriod()
if a.cfg.Bandwidth != nil && a.cfg.Bandwidth.PerAccount != nil { if a.cfg.Bandwidth != nil && a.cfg.Bandwidth != nil {
limit = a.cfg.Bandwidth.PerAccount limit = a.cfg.Bandwidth
} }
if limit.Period > 0 { if limit.Period > 0 {
period = limit.Period period = limit.Period
@ -677,46 +431,6 @@ func (a *Agent) checkAccountLimit(acctId int64) (enforce, warning bool, rxBytes,
return enforce, warning, rx, tx, nil return enforce, warning, rx, tx, nil
} }
func (a *Agent) checkEnvironmentLimit(envId int64) (enforce, warning bool, rxBytes, txBytes int64, err error) {
period := 24 * time.Hour
limit := DefaultBandwidthPerPeriod()
if a.cfg.Bandwidth != nil && a.cfg.Bandwidth.PerEnvironment != nil {
limit = a.cfg.Bandwidth.PerEnvironment
}
if limit.Period > 0 {
period = limit.Period
}
rx, tx, err := a.ifx.totalRxTxForEnvironment(envId, period)
if err != nil {
logrus.Error(err)
}
enforce, warning = a.checkLimit(limit, rx, tx)
return enforce, warning, rx, tx, nil
}
func (a *Agent) checkShareLimit(shrToken string) (enforce, warning bool, rxBytes, txBytes int64, err error) {
period := 24 * time.Hour
limit := DefaultBandwidthPerPeriod()
if a.cfg.Bandwidth != nil && a.cfg.Bandwidth.PerShare != nil {
limit = a.cfg.Bandwidth.PerShare
}
if limit.Period > 0 {
period = limit.Period
}
rx, tx, err := a.ifx.totalRxTxForShare(shrToken, period)
if err != nil {
logrus.Error(err)
}
enforce, warning = a.checkLimit(limit, rx, tx)
if enforce || warning {
logrus.Debugf("'%v': %v", shrToken, describeLimit(limit, rx, tx))
}
return enforce, warning, rx, tx, nil
}
func (a *Agent) checkLimit(cfg *BandwidthPerPeriod, rx, tx int64) (enforce, warning bool) { func (a *Agent) checkLimit(cfg *BandwidthPerPeriod, rx, tx int64) (enforce, warning bool) {
if cfg.Limit.Rx != Unlimited && rx > cfg.Limit.Rx { if cfg.Limit.Rx != Unlimited && rx > cfg.Limit.Rx {
return true, false return true, false

View File

@ -9,17 +9,11 @@ type Config struct {
Shares int Shares int
ReservedShares int ReservedShares int
UniqueNames int UniqueNames int
Bandwidth *BandwidthConfig Bandwidth *BandwidthPerPeriod
Cycle time.Duration Cycle time.Duration
Enforcing bool Enforcing bool
} }
type BandwidthConfig struct {
PerAccount *BandwidthPerPeriod
PerEnvironment *BandwidthPerPeriod
PerShare *BandwidthPerPeriod
}
type BandwidthPerPeriod struct { type BandwidthPerPeriod struct {
Period time.Duration Period time.Duration
Warning *Bandwidth Warning *Bandwidth
@ -54,12 +48,8 @@ func DefaultConfig() *Config {
Shares: Unlimited, Shares: Unlimited,
ReservedShares: Unlimited, ReservedShares: Unlimited,
UniqueNames: Unlimited, UniqueNames: Unlimited,
Bandwidth: &BandwidthConfig{ Bandwidth: DefaultBandwidthPerPeriod(),
PerAccount: DefaultBandwidthPerPeriod(), Enforcing: false,
PerEnvironment: DefaultBandwidthPerPeriod(), Cycle: 15 * time.Minute,
PerShare: DefaultBandwidthPerPeriod(),
},
Enforcing: false,
Cycle: 15 * time.Minute,
} }
} }

View File

@ -1,41 +0,0 @@
package limits
import (
"github.com/jmoiron/sqlx"
"github.com/openziti/zrok/controller/store"
"github.com/openziti/zrok/controller/zrokEdgeSdk"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
type environmentLimitAction struct {
str *store.Store
zCfg *zrokEdgeSdk.Config
}
func newEnvironmentLimitAction(str *store.Store, zCfg *zrokEdgeSdk.Config) *environmentLimitAction {
return &environmentLimitAction{str, zCfg}
}
func (a *environmentLimitAction) HandleEnvironment(env *store.Environment, _, _ int64, _ *BandwidthPerPeriod, trx *sqlx.Tx) error {
logrus.Infof("limiting '%v'", env.ZId)
shrs, err := a.str.FindSharesForEnvironment(env.Id, trx)
if err != nil {
return errors.Wrapf(err, "error finding shares for environment '%v'", env.ZId)
}
edge, err := zrokEdgeSdk.Client(a.zCfg)
if err != nil {
return err
}
for _, shr := range shrs {
if err := zrokEdgeSdk.DeleteServicePoliciesDial(env.ZId, shr.Token, edge); err != nil {
return errors.Wrapf(err, "error deleting dial service policy for '%v'", shr.Token)
}
logrus.Infof("removed dial service policy for share '%v' of environment '%v'", shr.Token, env.ZId)
}
return nil
}

View File

@ -1,50 +0,0 @@
package limits
import (
"github.com/jmoiron/sqlx"
"github.com/openziti/zrok/controller/store"
"github.com/openziti/zrok/controller/zrokEdgeSdk"
"github.com/openziti/zrok/sdk/golang/sdk"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
type environmentRelaxAction struct {
str *store.Store
zCfg *zrokEdgeSdk.Config
}
func newEnvironmentRelaxAction(str *store.Store, zCfg *zrokEdgeSdk.Config) *environmentRelaxAction {
return &environmentRelaxAction{str, zCfg}
}
func (a *environmentRelaxAction) HandleEnvironment(env *store.Environment, rxBytes, txBytes int64, limit *BandwidthPerPeriod, trx *sqlx.Tx) error {
logrus.Infof("relaxing '%v'", env.ZId)
shrs, err := a.str.FindSharesForEnvironment(env.Id, trx)
if err != nil {
return errors.Wrapf(err, "error finding shares for environment '%v'", env.ZId)
}
edge, err := zrokEdgeSdk.Client(a.zCfg)
if err != nil {
return err
}
for _, shr := range shrs {
if !shr.Deleted {
switch shr.ShareMode {
case string(sdk.PublicShareMode):
if err := relaxPublicShare(a.str, edge, shr, trx); err != nil {
return err
}
case string(sdk.PrivateShareMode):
if err := relaxPrivateShare(a.str, edge, shr, trx); err != nil {
return err
}
}
}
}
return nil
}

View File

@ -1,58 +0,0 @@
package limits
import (
"github.com/jmoiron/sqlx"
"github.com/openziti/zrok/controller/emailUi"
"github.com/openziti/zrok/controller/store"
"github.com/openziti/zrok/util"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
type environmentWarningAction struct {
str *store.Store
cfg *emailUi.Config
}
func newEnvironmentWarningAction(cfg *emailUi.Config, str *store.Store) *environmentWarningAction {
return &environmentWarningAction{str, cfg}
}
func (a *environmentWarningAction) HandleEnvironment(env *store.Environment, rxBytes, txBytes int64, limit *BandwidthPerPeriod, trx *sqlx.Tx) error {
logrus.Infof("warning '%v'", env.ZId)
if a.cfg != nil {
if env.AccountId != nil {
acct, err := a.str.GetAccount(*env.AccountId, trx)
if err != nil {
return err
}
rxLimit := "unlimited bytes"
if limit.Limit.Rx != Unlimited {
rxLimit = util.BytesToSize(limit.Limit.Rx)
}
txLimit := "unlimited bytes"
if limit.Limit.Tx != Unlimited {
txLimit = util.BytesToSize(limit.Limit.Tx)
}
totalLimit := "unlimited bytes"
if limit.Limit.Total != Unlimited {
totalLimit = util.BytesToSize(limit.Limit.Total)
}
detail := newDetailMessage()
detail = detail.append("Your environment '%v' has received %v and sent %v (for a total of %v), which has triggered a transfer limit warning.", env.Description, util.BytesToSize(rxBytes), util.BytesToSize(txBytes), util.BytesToSize(rxBytes+txBytes))
detail = detail.append("This zrok instance only allows a share to receive %v, send %v, totalling not more than %v for each %v.", rxLimit, txLimit, totalLimit, limit.Period)
detail = detail.append("If you exceed the transfer limit, access to your shares will be temporarily disabled (until the last %v falls below the transfer limit).", limit.Period)
if err := sendLimitWarningEmail(a.cfg, acct.Email, detail); err != nil {
return errors.Wrapf(err, "error sending limit warning email to '%v'", acct.Email)
}
}
} else {
logrus.Warnf("skipping warning email for environment limit; no email configuration specified")
}
return nil
}

View File

@ -1,38 +0,0 @@
package limits
import (
"github.com/jmoiron/sqlx"
"github.com/openziti/zrok/controller/store"
"github.com/openziti/zrok/controller/zrokEdgeSdk"
"github.com/sirupsen/logrus"
)
type shareLimitAction struct {
str *store.Store
zCfg *zrokEdgeSdk.Config
}
func newShareLimitAction(str *store.Store, zCfg *zrokEdgeSdk.Config) *shareLimitAction {
return &shareLimitAction{str, zCfg}
}
func (a *shareLimitAction) HandleShare(shr *store.Share, _, _ int64, _ *BandwidthPerPeriod, trx *sqlx.Tx) error {
logrus.Infof("limiting '%v'", shr.Token)
env, err := a.str.GetEnvironment(shr.EnvironmentId, trx)
if err != nil {
return err
}
edge, err := zrokEdgeSdk.Client(a.zCfg)
if err != nil {
return err
}
if err := zrokEdgeSdk.DeleteServicePoliciesDial(env.ZId, shr.Token, edge); err != nil {
return err
}
logrus.Infof("removed dial service policy for '%v'", shr.Token)
return nil
}

View File

@ -1,89 +0,0 @@
package limits
import (
"github.com/jmoiron/sqlx"
"github.com/openziti/edge-api/rest_management_api_client"
"github.com/openziti/zrok/controller/store"
"github.com/openziti/zrok/controller/zrokEdgeSdk"
"github.com/openziti/zrok/sdk/golang/sdk"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
type shareRelaxAction struct {
str *store.Store
zCfg *zrokEdgeSdk.Config
}
func newShareRelaxAction(str *store.Store, zCfg *zrokEdgeSdk.Config) *shareRelaxAction {
return &shareRelaxAction{str, zCfg}
}
func (a *shareRelaxAction) HandleShare(shr *store.Share, _, _ int64, _ *BandwidthPerPeriod, trx *sqlx.Tx) error {
logrus.Infof("relaxing '%v'", shr.Token)
if !shr.Deleted {
edge, err := zrokEdgeSdk.Client(a.zCfg)
if err != nil {
return err
}
switch shr.ShareMode {
case string(sdk.PublicShareMode):
if err := relaxPublicShare(a.str, edge, shr, trx); err != nil {
return err
}
case string(sdk.PrivateShareMode):
if err := relaxPrivateShare(a.str, edge, shr, trx); err != nil {
return err
}
}
}
return nil
}
func relaxPublicShare(str *store.Store, edge *rest_management_api_client.ZitiEdgeManagement, shr *store.Share, trx *sqlx.Tx) error {
env, err := str.GetEnvironment(shr.EnvironmentId, trx)
if err != nil {
return errors.Wrap(err, "error finding environment")
}
fe, err := str.FindFrontendPubliclyNamed(*shr.FrontendSelection, trx)
if err != nil {
return errors.Wrapf(err, "error finding frontend name '%v' for '%v'", *shr.FrontendSelection, shr.Token)
}
if err := zrokEdgeSdk.CreateServicePolicyDial(env.ZId+"-"+shr.ZId+"-dial", shr.ZId, []string{fe.ZId}, zrokEdgeSdk.ZrokShareTags(shr.Token).SubTags, edge); err != nil {
return errors.Wrapf(err, "error creating dial service policy for '%v'", shr.Token)
}
logrus.Infof("added dial service policy for '%v'", shr.Token)
return nil
}
func relaxPrivateShare(str *store.Store, edge *rest_management_api_client.ZitiEdgeManagement, shr *store.Share, trx *sqlx.Tx) error {
fes, err := str.FindFrontendsForPrivateShare(shr.Id, trx)
if err != nil {
return errors.Wrapf(err, "error finding frontends for share '%v'", shr.Token)
}
for _, fe := range fes {
if fe.EnvironmentId != nil {
env, err := str.GetEnvironment(*fe.EnvironmentId, trx)
if err != nil {
return errors.Wrapf(err, "error getting environment for frontend '%v'", fe.Token)
}
addlTags := map[string]interface{}{
"zrokEnvironmentZId": env.ZId,
"zrokFrontendToken": fe.Token,
"zrokShareToken": shr.Token,
}
if err := zrokEdgeSdk.CreateServicePolicyDial(fe.Token+"-"+env.ZId+"-"+shr.ZId+"-dial", shr.ZId, []string{env.ZId}, addlTags, edge); err != nil {
return errors.Wrapf(err, "unable to create dial policy for frontend '%v'", fe.Token)
}
logrus.Infof("added dial service policy for share '%v' to private frontend '%v'", shr.Token, fe.Token)
}
}
return nil
}

View File

@ -1,63 +0,0 @@
package limits
import (
"github.com/jmoiron/sqlx"
"github.com/openziti/zrok/controller/emailUi"
"github.com/openziti/zrok/controller/store"
"github.com/openziti/zrok/util"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
type shareWarningAction struct {
str *store.Store
cfg *emailUi.Config
}
func newShareWarningAction(cfg *emailUi.Config, str *store.Store) *shareWarningAction {
return &shareWarningAction{str, cfg}
}
func (a *shareWarningAction) HandleShare(shr *store.Share, rxBytes, txBytes int64, limit *BandwidthPerPeriod, trx *sqlx.Tx) error {
logrus.Infof("warning '%v'", shr.Token)
if a.cfg != nil {
env, err := a.str.GetEnvironment(shr.EnvironmentId, trx)
if err != nil {
return err
}
if env.AccountId != nil {
acct, err := a.str.GetAccount(*env.AccountId, trx)
if err != nil {
return err
}
rxLimit := "unlimited bytes"
if limit.Limit.Rx != Unlimited {
rxLimit = util.BytesToSize(limit.Limit.Rx)
}
txLimit := "unlimited bytes"
if limit.Limit.Tx != Unlimited {
txLimit = util.BytesToSize(limit.Limit.Tx)
}
totalLimit := "unlimited bytes"
if limit.Limit.Total != Unlimited {
totalLimit = util.BytesToSize(limit.Limit.Total)
}
detail := newDetailMessage()
detail = detail.append("Your share '%v' has received %v and sent %v (for a total of %v), which has triggered a transfer limit warning.", shr.Token, util.BytesToSize(rxBytes), util.BytesToSize(txBytes), util.BytesToSize(rxBytes+txBytes))
detail = detail.append("This zrok instance only allows a share to receive %v, send %v, totalling not more than %v for each %v.", rxLimit, txLimit, totalLimit, limit.Period)
detail = detail.append("If you exceed the transfer limit, access to your shares will be temporarily disabled (until the last %v falls below the transfer limit).", limit.Period)
if err := sendLimitWarningEmail(a.cfg, acct.Email, detail); err != nil {
return errors.Wrapf(err, "error sending limit warning email to '%v'", acct.Email)
}
}
} else {
logrus.Warnf("skipping warning email for share limit; no email configuration specified")
}
return nil
}