From ec48d574fda7533eeebdcf278d381aa32c0464b0 Mon Sep 17 00:00:00 2001 From: Michael Quigley Date: Mon, 5 Jun 2023 16:01:04 -0400 Subject: [PATCH] allocate an edge management session when it's needed, rather than trying to be efficient and keep one around --- controller/limits/accountLimitAction.go | 16 ++++++++------ controller/limits/accountRelaxAction.go | 17 +++++++++------ controller/limits/accountWarningAction.go | 12 +++++------ controller/limits/agent.go | 21 +++++++------------ controller/limits/environmentLimitAction.go | 14 ++++++++----- controller/limits/environmentRelaxAction.go | 17 +++++++++------ controller/limits/environmentWarningAction.go | 10 ++++----- controller/limits/shareLimitAction.go | 14 ++++++++----- controller/limits/shareRelaxAction.go | 15 ++++++++----- controller/limits/shareWarningAction.go | 10 ++++----- 10 files changed, 81 insertions(+), 65 deletions(-) diff --git a/controller/limits/accountLimitAction.go b/controller/limits/accountLimitAction.go index 457c6cc5..fe3561bd 100644 --- a/controller/limits/accountLimitAction.go +++ b/controller/limits/accountLimitAction.go @@ -2,7 +2,6 @@ package limits import ( "github.com/jmoiron/sqlx" - "github.com/openziti/edge-api/rest_management_api_client" "github.com/openziti/zrok/controller/store" "github.com/openziti/zrok/controller/zrokEdgeSdk" "github.com/pkg/errors" @@ -11,14 +10,14 @@ import ( type accountLimitAction struct { str *store.Store - edge *rest_management_api_client.ZitiEdgeManagement + zCfg *zrokEdgeSdk.Config } -func newAccountLimitAction(str *store.Store, edge *rest_management_api_client.ZitiEdgeManagement) *accountLimitAction { - return &accountLimitAction{str, edge} +func newAccountLimitAction(str *store.Store, zCfg *zrokEdgeSdk.Config) *accountLimitAction { + return &accountLimitAction{str, zCfg} } -func (a *accountLimitAction) HandleAccount(acct *store.Account, rxBytes, txBytes int64, limit *BandwidthPerPeriod, trx *sqlx.Tx) error { +func (a *accountLimitAction) HandleAccount(acct *store.Account, _, _ int64, _ *BandwidthPerPeriod, trx *sqlx.Tx) error { logrus.Infof("limiting '%v'", acct.Email) envs, err := a.str.FindEnvironmentsForAccount(acct.Id, trx) @@ -26,6 +25,11 @@ func (a *accountLimitAction) HandleAccount(acct *store.Account, rxBytes, txBytes return errors.Wrapf(err, "error finding environments for account '%v'", acct.Email) } + edge, err := zrokEdgeSdk.Client(a.zCfg) + if err != nil { + return err + } + for _, env := range envs { shrs, err := a.str.FindSharesForEnvironment(env.Id, trx) if err != nil { @@ -33,7 +37,7 @@ func (a *accountLimitAction) HandleAccount(acct *store.Account, rxBytes, txBytes } for _, shr := range shrs { - if err := zrokEdgeSdk.DeleteServicePoliciesDial(env.ZId, shr.Token, a.edge); err != nil { + if err := zrokEdgeSdk.DeleteServicePoliciesDial(env.ZId, shr.Token, edge); err != nil { return errors.Wrapf(err, "error deleting dial service policy for '%v'", shr.Token) } logrus.Infof("removed dial service policy for share '%v' of environment '%v'", shr.Token, env.ZId) diff --git a/controller/limits/accountRelaxAction.go b/controller/limits/accountRelaxAction.go index ba961b32..82e66476 100644 --- a/controller/limits/accountRelaxAction.go +++ b/controller/limits/accountRelaxAction.go @@ -2,19 +2,19 @@ package limits import ( "github.com/jmoiron/sqlx" - "github.com/openziti/edge-api/rest_management_api_client" "github.com/openziti/zrok/controller/store" + "github.com/openziti/zrok/controller/zrokEdgeSdk" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) type accountRelaxAction struct { str *store.Store - edge *rest_management_api_client.ZitiEdgeManagement + zCfg *zrokEdgeSdk.Config } -func newAccountRelaxAction(str *store.Store, edge *rest_management_api_client.ZitiEdgeManagement) *accountRelaxAction { - return &accountRelaxAction{str, edge} +func newAccountRelaxAction(str *store.Store, zCfg *zrokEdgeSdk.Config) *accountRelaxAction { + return &accountRelaxAction{str, zCfg} } func (a *accountRelaxAction) HandleAccount(acct *store.Account, _, _ int64, _ *BandwidthPerPeriod, trx *sqlx.Tx) error { @@ -25,6 +25,11 @@ func (a *accountRelaxAction) HandleAccount(acct *store.Account, _, _ int64, _ *B return errors.Wrapf(err, "error finding environments for account '%v'", acct.Email) } + edge, err := zrokEdgeSdk.Client(a.zCfg) + if err != nil { + return err + } + for _, env := range envs { shrs, err := a.str.FindSharesForEnvironment(env.Id, trx) if err != nil { @@ -34,11 +39,11 @@ func (a *accountRelaxAction) HandleAccount(acct *store.Account, _, _ int64, _ *B for _, shr := range shrs { switch shr.ShareMode { case "public": - if err := relaxPublicShare(a.str, a.edge, shr, trx); err != nil { + if err := relaxPublicShare(a.str, edge, shr, trx); err != nil { return errors.Wrap(err, "error relaxing public share") } case "private": - if err := relaxPrivateShare(a.str, a.edge, shr, trx); err != nil { + if err := relaxPrivateShare(a.str, edge, shr, trx); err != nil { return errors.Wrap(err, "error relaxing private share") } } diff --git a/controller/limits/accountWarningAction.go b/controller/limits/accountWarningAction.go index 8700f810..4b2610bf 100644 --- a/controller/limits/accountWarningAction.go +++ b/controller/limits/accountWarningAction.go @@ -2,7 +2,6 @@ package limits import ( "github.com/jmoiron/sqlx" - "github.com/openziti/edge-api/rest_management_api_client" "github.com/openziti/zrok/controller/emailUi" "github.com/openziti/zrok/controller/store" "github.com/openziti/zrok/util" @@ -11,16 +10,15 @@ import ( ) type accountWarningAction struct { - str *store.Store - edge *rest_management_api_client.ZitiEdgeManagement - cfg *emailUi.Config + str *store.Store + cfg *emailUi.Config } -func newAccountWarningAction(cfg *emailUi.Config, str *store.Store, edge *rest_management_api_client.ZitiEdgeManagement) *accountWarningAction { - return &accountWarningAction{str, edge, cfg} +func newAccountWarningAction(cfg *emailUi.Config, str *store.Store) *accountWarningAction { + return &accountWarningAction{str, cfg} } -func (a *accountWarningAction) HandleAccount(acct *store.Account, rxBytes, txBytes int64, limit *BandwidthPerPeriod, trx *sqlx.Tx) error { +func (a *accountWarningAction) HandleAccount(acct *store.Account, rxBytes, txBytes int64, limit *BandwidthPerPeriod, _ *sqlx.Tx) error { logrus.Infof("warning '%v'", acct.Email) if a.cfg != nil { diff --git a/controller/limits/agent.go b/controller/limits/agent.go index 1f05fd18..a9c0c259 100644 --- a/controller/limits/agent.go +++ b/controller/limits/agent.go @@ -34,25 +34,20 @@ type Agent struct { } func NewAgent(cfg *Config, ifxCfg *metrics.InfluxConfig, zCfg *zrokEdgeSdk.Config, emailCfg *emailUi.Config, str *store.Store) (*Agent, error) { - edge, err := zrokEdgeSdk.Client(zCfg) - if err != nil { - return nil, err - } a := &Agent{ cfg: cfg, ifx: newInfluxReader(ifxCfg), zCfg: zCfg, str: str, queue: make(chan *metrics.Usage, 1024), - acctWarningActions: []AccountAction{newAccountWarningAction(emailCfg, str, edge)}, - acctLimitActions: []AccountAction{newAccountLimitAction(str, edge)}, - acctRelaxActions: []AccountAction{newAccountRelaxAction(str, edge)}, - envWarningActions: []EnvironmentAction{newEnvironmentWarningAction(emailCfg, str, edge)}, - envLimitActions: []EnvironmentAction{newEnvironmentLimitAction(str, edge)}, - envRelaxActions: []EnvironmentAction{newEnvironmentRelaxAction(str, edge)}, - shrWarningActions: []ShareAction{newShareWarningAction(emailCfg, str, edge)}, - shrLimitActions: []ShareAction{newShareLimitAction(str, edge)}, - shrRelaxActions: []ShareAction{newShareRelaxAction(str, edge)}, + acctWarningActions: []AccountAction{newAccountWarningAction(emailCfg, str)}, + acctRelaxActions: []AccountAction{newAccountRelaxAction(str, zCfg)}, + envWarningActions: []EnvironmentAction{newEnvironmentWarningAction(emailCfg, str)}, + envLimitActions: []EnvironmentAction{newEnvironmentLimitAction(str, zCfg)}, + envRelaxActions: []EnvironmentAction{newEnvironmentRelaxAction(str, zCfg)}, + shrWarningActions: []ShareAction{newShareWarningAction(emailCfg, str)}, + shrLimitActions: []ShareAction{newShareLimitAction(str, zCfg)}, + shrRelaxActions: []ShareAction{newShareRelaxAction(str, zCfg)}, close: make(chan struct{}), join: make(chan struct{}), } diff --git a/controller/limits/environmentLimitAction.go b/controller/limits/environmentLimitAction.go index f51da98b..31f10cec 100644 --- a/controller/limits/environmentLimitAction.go +++ b/controller/limits/environmentLimitAction.go @@ -2,7 +2,6 @@ package limits import ( "github.com/jmoiron/sqlx" - "github.com/openziti/edge-api/rest_management_api_client" "github.com/openziti/zrok/controller/store" "github.com/openziti/zrok/controller/zrokEdgeSdk" "github.com/pkg/errors" @@ -11,11 +10,11 @@ import ( type environmentLimitAction struct { str *store.Store - edge *rest_management_api_client.ZitiEdgeManagement + zCfg *zrokEdgeSdk.Config } -func newEnvironmentLimitAction(str *store.Store, edge *rest_management_api_client.ZitiEdgeManagement) *environmentLimitAction { - return &environmentLimitAction{str, edge} +func newEnvironmentLimitAction(str *store.Store, zCfg *zrokEdgeSdk.Config) *environmentLimitAction { + return &environmentLimitAction{str, zCfg} } func (a *environmentLimitAction) HandleEnvironment(env *store.Environment, _, _ int64, _ *BandwidthPerPeriod, trx *sqlx.Tx) error { @@ -26,8 +25,13 @@ func (a *environmentLimitAction) HandleEnvironment(env *store.Environment, _, _ return errors.Wrapf(err, "error finding shares for environment '%v'", env.ZId) } + edge, err := zrokEdgeSdk.Client(a.zCfg) + if err != nil { + return err + } + for _, shr := range shrs { - if err := zrokEdgeSdk.DeleteServicePoliciesDial(env.ZId, shr.Token, a.edge); err != nil { + if err := zrokEdgeSdk.DeleteServicePoliciesDial(env.ZId, shr.Token, edge); err != nil { return errors.Wrapf(err, "error deleting dial service policy for '%v'", shr.Token) } logrus.Infof("removed dial service policy for share '%v' of environment '%v'", shr.Token, env.ZId) diff --git a/controller/limits/environmentRelaxAction.go b/controller/limits/environmentRelaxAction.go index 31ae5cf9..19d8a115 100644 --- a/controller/limits/environmentRelaxAction.go +++ b/controller/limits/environmentRelaxAction.go @@ -2,19 +2,19 @@ package limits import ( "github.com/jmoiron/sqlx" - "github.com/openziti/edge-api/rest_management_api_client" "github.com/openziti/zrok/controller/store" + "github.com/openziti/zrok/controller/zrokEdgeSdk" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) type environmentRelaxAction struct { str *store.Store - edge *rest_management_api_client.ZitiEdgeManagement + zCfg *zrokEdgeSdk.Config } -func newEnvironmentRelaxAction(str *store.Store, edge *rest_management_api_client.ZitiEdgeManagement) *environmentRelaxAction { - return &environmentRelaxAction{str, edge} +func newEnvironmentRelaxAction(str *store.Store, zCfg *zrokEdgeSdk.Config) *environmentRelaxAction { + return &environmentRelaxAction{str, zCfg} } func (a *environmentRelaxAction) HandleEnvironment(env *store.Environment, rxBytes, txBytes int64, limit *BandwidthPerPeriod, trx *sqlx.Tx) error { @@ -25,15 +25,20 @@ func (a *environmentRelaxAction) HandleEnvironment(env *store.Environment, rxByt return errors.Wrapf(err, "error finding shares for environment '%v'", env.ZId) } + edge, err := zrokEdgeSdk.Client(a.zCfg) + if err != nil { + return err + } + for _, shr := range shrs { if !shr.Deleted { switch shr.ShareMode { case "public": - if err := relaxPublicShare(a.str, a.edge, shr, trx); err != nil { + if err := relaxPublicShare(a.str, edge, shr, trx); err != nil { return err } case "private": - if err := relaxPrivateShare(a.str, a.edge, shr, trx); err != nil { + if err := relaxPrivateShare(a.str, edge, shr, trx); err != nil { return err } } diff --git a/controller/limits/environmentWarningAction.go b/controller/limits/environmentWarningAction.go index 8a74e31d..2d5b04a1 100644 --- a/controller/limits/environmentWarningAction.go +++ b/controller/limits/environmentWarningAction.go @@ -2,7 +2,6 @@ package limits import ( "github.com/jmoiron/sqlx" - "github.com/openziti/edge-api/rest_management_api_client" "github.com/openziti/zrok/controller/emailUi" "github.com/openziti/zrok/controller/store" "github.com/openziti/zrok/util" @@ -11,13 +10,12 @@ import ( ) type environmentWarningAction struct { - str *store.Store - edge *rest_management_api_client.ZitiEdgeManagement - cfg *emailUi.Config + str *store.Store + cfg *emailUi.Config } -func newEnvironmentWarningAction(cfg *emailUi.Config, str *store.Store, edge *rest_management_api_client.ZitiEdgeManagement) *environmentWarningAction { - return &environmentWarningAction{str, edge, cfg} +func newEnvironmentWarningAction(cfg *emailUi.Config, str *store.Store) *environmentWarningAction { + return &environmentWarningAction{str, cfg} } func (a *environmentWarningAction) HandleEnvironment(env *store.Environment, rxBytes, txBytes int64, limit *BandwidthPerPeriod, trx *sqlx.Tx) error { diff --git a/controller/limits/shareLimitAction.go b/controller/limits/shareLimitAction.go index d068b48d..8a4fc8b9 100644 --- a/controller/limits/shareLimitAction.go +++ b/controller/limits/shareLimitAction.go @@ -2,7 +2,6 @@ package limits import ( "github.com/jmoiron/sqlx" - "github.com/openziti/edge-api/rest_management_api_client" "github.com/openziti/zrok/controller/store" "github.com/openziti/zrok/controller/zrokEdgeSdk" "github.com/sirupsen/logrus" @@ -10,11 +9,11 @@ import ( type shareLimitAction struct { str *store.Store - edge *rest_management_api_client.ZitiEdgeManagement + zCfg *zrokEdgeSdk.Config } -func newShareLimitAction(str *store.Store, edge *rest_management_api_client.ZitiEdgeManagement) *shareLimitAction { - return &shareLimitAction{str, edge} +func newShareLimitAction(str *store.Store, zCfg *zrokEdgeSdk.Config) *shareLimitAction { + return &shareLimitAction{str, zCfg} } func (a *shareLimitAction) HandleShare(shr *store.Share, _, _ int64, _ *BandwidthPerPeriod, trx *sqlx.Tx) error { @@ -25,7 +24,12 @@ func (a *shareLimitAction) HandleShare(shr *store.Share, _, _ int64, _ *Bandwidt return err } - if err := zrokEdgeSdk.DeleteServicePoliciesDial(env.ZId, shr.Token, a.edge); err != nil { + edge, err := zrokEdgeSdk.Client(a.zCfg) + if err != nil { + return err + } + + if err := zrokEdgeSdk.DeleteServicePoliciesDial(env.ZId, shr.Token, edge); err != nil { return err } logrus.Infof("removed dial service policy for '%v'", shr.Token) diff --git a/controller/limits/shareRelaxAction.go b/controller/limits/shareRelaxAction.go index 3f757fcf..55e0a2c9 100644 --- a/controller/limits/shareRelaxAction.go +++ b/controller/limits/shareRelaxAction.go @@ -11,24 +11,29 @@ import ( type shareRelaxAction struct { str *store.Store - edge *rest_management_api_client.ZitiEdgeManagement + zCfg *zrokEdgeSdk.Config } -func newShareRelaxAction(str *store.Store, edge *rest_management_api_client.ZitiEdgeManagement) *shareRelaxAction { - return &shareRelaxAction{str, edge} +func newShareRelaxAction(str *store.Store, zCfg *zrokEdgeSdk.Config) *shareRelaxAction { + return &shareRelaxAction{str, zCfg} } func (a *shareRelaxAction) HandleShare(shr *store.Share, _, _ int64, _ *BandwidthPerPeriod, trx *sqlx.Tx) error { logrus.Infof("relaxing '%v'", shr.Token) if !shr.Deleted { + edge, err := zrokEdgeSdk.Client(a.zCfg) + if err != nil { + return err + } + switch shr.ShareMode { case "public": - if err := relaxPublicShare(a.str, a.edge, shr, trx); err != nil { + if err := relaxPublicShare(a.str, edge, shr, trx); err != nil { return err } case "private": - if err := relaxPrivateShare(a.str, a.edge, shr, trx); err != nil { + if err := relaxPrivateShare(a.str, edge, shr, trx); err != nil { return err } } diff --git a/controller/limits/shareWarningAction.go b/controller/limits/shareWarningAction.go index 8a860805..764b6d3f 100644 --- a/controller/limits/shareWarningAction.go +++ b/controller/limits/shareWarningAction.go @@ -2,7 +2,6 @@ package limits import ( "github.com/jmoiron/sqlx" - "github.com/openziti/edge-api/rest_management_api_client" "github.com/openziti/zrok/controller/emailUi" "github.com/openziti/zrok/controller/store" "github.com/openziti/zrok/util" @@ -11,13 +10,12 @@ import ( ) type shareWarningAction struct { - str *store.Store - edge *rest_management_api_client.ZitiEdgeManagement - cfg *emailUi.Config + str *store.Store + cfg *emailUi.Config } -func newShareWarningAction(cfg *emailUi.Config, str *store.Store, edge *rest_management_api_client.ZitiEdgeManagement) *shareWarningAction { - return &shareWarningAction{str, edge, cfg} +func newShareWarningAction(cfg *emailUi.Config, str *store.Store) *shareWarningAction { + return &shareWarningAction{str, cfg} } func (a *shareWarningAction) HandleShare(shr *store.Share, rxBytes, txBytes int64, limit *BandwidthPerPeriod, trx *sqlx.Tx) error {