allocate an edge management session when it's needed, rather than trying to be efficient and keep one around

This commit is contained in:
Michael Quigley 2023-06-05 16:01:04 -04:00
parent a8c652aef8
commit ec48d574fd
No known key found for this signature in database
GPG Key ID: 9B60314A9DD20A62
10 changed files with 81 additions and 65 deletions

View File

@ -2,7 +2,6 @@ package limits
import (
"github.com/jmoiron/sqlx"
"github.com/openziti/edge-api/rest_management_api_client"
"github.com/openziti/zrok/controller/store"
"github.com/openziti/zrok/controller/zrokEdgeSdk"
"github.com/pkg/errors"
@ -11,14 +10,14 @@ import (
type accountLimitAction struct {
str *store.Store
edge *rest_management_api_client.ZitiEdgeManagement
zCfg *zrokEdgeSdk.Config
}
func newAccountLimitAction(str *store.Store, edge *rest_management_api_client.ZitiEdgeManagement) *accountLimitAction {
return &accountLimitAction{str, edge}
func newAccountLimitAction(str *store.Store, zCfg *zrokEdgeSdk.Config) *accountLimitAction {
return &accountLimitAction{str, zCfg}
}
func (a *accountLimitAction) HandleAccount(acct *store.Account, rxBytes, txBytes int64, limit *BandwidthPerPeriod, trx *sqlx.Tx) error {
func (a *accountLimitAction) HandleAccount(acct *store.Account, _, _ int64, _ *BandwidthPerPeriod, trx *sqlx.Tx) error {
logrus.Infof("limiting '%v'", acct.Email)
envs, err := a.str.FindEnvironmentsForAccount(acct.Id, trx)
@ -26,6 +25,11 @@ func (a *accountLimitAction) HandleAccount(acct *store.Account, rxBytes, txBytes
return errors.Wrapf(err, "error finding environments for account '%v'", acct.Email)
}
edge, err := zrokEdgeSdk.Client(a.zCfg)
if err != nil {
return err
}
for _, env := range envs {
shrs, err := a.str.FindSharesForEnvironment(env.Id, trx)
if err != nil {
@ -33,7 +37,7 @@ func (a *accountLimitAction) HandleAccount(acct *store.Account, rxBytes, txBytes
}
for _, shr := range shrs {
if err := zrokEdgeSdk.DeleteServicePoliciesDial(env.ZId, shr.Token, a.edge); err != nil {
if err := zrokEdgeSdk.DeleteServicePoliciesDial(env.ZId, shr.Token, edge); err != nil {
return errors.Wrapf(err, "error deleting dial service policy for '%v'", shr.Token)
}
logrus.Infof("removed dial service policy for share '%v' of environment '%v'", shr.Token, env.ZId)

View File

@ -2,19 +2,19 @@ package limits
import (
"github.com/jmoiron/sqlx"
"github.com/openziti/edge-api/rest_management_api_client"
"github.com/openziti/zrok/controller/store"
"github.com/openziti/zrok/controller/zrokEdgeSdk"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
type accountRelaxAction struct {
str *store.Store
edge *rest_management_api_client.ZitiEdgeManagement
zCfg *zrokEdgeSdk.Config
}
func newAccountRelaxAction(str *store.Store, edge *rest_management_api_client.ZitiEdgeManagement) *accountRelaxAction {
return &accountRelaxAction{str, edge}
func newAccountRelaxAction(str *store.Store, zCfg *zrokEdgeSdk.Config) *accountRelaxAction {
return &accountRelaxAction{str, zCfg}
}
func (a *accountRelaxAction) HandleAccount(acct *store.Account, _, _ int64, _ *BandwidthPerPeriod, trx *sqlx.Tx) error {
@ -25,6 +25,11 @@ func (a *accountRelaxAction) HandleAccount(acct *store.Account, _, _ int64, _ *B
return errors.Wrapf(err, "error finding environments for account '%v'", acct.Email)
}
edge, err := zrokEdgeSdk.Client(a.zCfg)
if err != nil {
return err
}
for _, env := range envs {
shrs, err := a.str.FindSharesForEnvironment(env.Id, trx)
if err != nil {
@ -34,11 +39,11 @@ func (a *accountRelaxAction) HandleAccount(acct *store.Account, _, _ int64, _ *B
for _, shr := range shrs {
switch shr.ShareMode {
case "public":
if err := relaxPublicShare(a.str, a.edge, shr, trx); err != nil {
if err := relaxPublicShare(a.str, edge, shr, trx); err != nil {
return errors.Wrap(err, "error relaxing public share")
}
case "private":
if err := relaxPrivateShare(a.str, a.edge, shr, trx); err != nil {
if err := relaxPrivateShare(a.str, edge, shr, trx); err != nil {
return errors.Wrap(err, "error relaxing private share")
}
}

View File

@ -2,7 +2,6 @@ package limits
import (
"github.com/jmoiron/sqlx"
"github.com/openziti/edge-api/rest_management_api_client"
"github.com/openziti/zrok/controller/emailUi"
"github.com/openziti/zrok/controller/store"
"github.com/openziti/zrok/util"
@ -11,16 +10,15 @@ import (
)
type accountWarningAction struct {
str *store.Store
edge *rest_management_api_client.ZitiEdgeManagement
cfg *emailUi.Config
str *store.Store
cfg *emailUi.Config
}
func newAccountWarningAction(cfg *emailUi.Config, str *store.Store, edge *rest_management_api_client.ZitiEdgeManagement) *accountWarningAction {
return &accountWarningAction{str, edge, cfg}
func newAccountWarningAction(cfg *emailUi.Config, str *store.Store) *accountWarningAction {
return &accountWarningAction{str, cfg}
}
func (a *accountWarningAction) HandleAccount(acct *store.Account, rxBytes, txBytes int64, limit *BandwidthPerPeriod, trx *sqlx.Tx) error {
func (a *accountWarningAction) HandleAccount(acct *store.Account, rxBytes, txBytes int64, limit *BandwidthPerPeriod, _ *sqlx.Tx) error {
logrus.Infof("warning '%v'", acct.Email)
if a.cfg != nil {

View File

@ -34,25 +34,20 @@ type Agent struct {
}
func NewAgent(cfg *Config, ifxCfg *metrics.InfluxConfig, zCfg *zrokEdgeSdk.Config, emailCfg *emailUi.Config, str *store.Store) (*Agent, error) {
edge, err := zrokEdgeSdk.Client(zCfg)
if err != nil {
return nil, err
}
a := &Agent{
cfg: cfg,
ifx: newInfluxReader(ifxCfg),
zCfg: zCfg,
str: str,
queue: make(chan *metrics.Usage, 1024),
acctWarningActions: []AccountAction{newAccountWarningAction(emailCfg, str, edge)},
acctLimitActions: []AccountAction{newAccountLimitAction(str, edge)},
acctRelaxActions: []AccountAction{newAccountRelaxAction(str, edge)},
envWarningActions: []EnvironmentAction{newEnvironmentWarningAction(emailCfg, str, edge)},
envLimitActions: []EnvironmentAction{newEnvironmentLimitAction(str, edge)},
envRelaxActions: []EnvironmentAction{newEnvironmentRelaxAction(str, edge)},
shrWarningActions: []ShareAction{newShareWarningAction(emailCfg, str, edge)},
shrLimitActions: []ShareAction{newShareLimitAction(str, edge)},
shrRelaxActions: []ShareAction{newShareRelaxAction(str, edge)},
acctWarningActions: []AccountAction{newAccountWarningAction(emailCfg, str)},
acctRelaxActions: []AccountAction{newAccountRelaxAction(str, zCfg)},
envWarningActions: []EnvironmentAction{newEnvironmentWarningAction(emailCfg, str)},
envLimitActions: []EnvironmentAction{newEnvironmentLimitAction(str, zCfg)},
envRelaxActions: []EnvironmentAction{newEnvironmentRelaxAction(str, zCfg)},
shrWarningActions: []ShareAction{newShareWarningAction(emailCfg, str)},
shrLimitActions: []ShareAction{newShareLimitAction(str, zCfg)},
shrRelaxActions: []ShareAction{newShareRelaxAction(str, zCfg)},
close: make(chan struct{}),
join: make(chan struct{}),
}

View File

@ -2,7 +2,6 @@ package limits
import (
"github.com/jmoiron/sqlx"
"github.com/openziti/edge-api/rest_management_api_client"
"github.com/openziti/zrok/controller/store"
"github.com/openziti/zrok/controller/zrokEdgeSdk"
"github.com/pkg/errors"
@ -11,11 +10,11 @@ import (
type environmentLimitAction struct {
str *store.Store
edge *rest_management_api_client.ZitiEdgeManagement
zCfg *zrokEdgeSdk.Config
}
func newEnvironmentLimitAction(str *store.Store, edge *rest_management_api_client.ZitiEdgeManagement) *environmentLimitAction {
return &environmentLimitAction{str, edge}
func newEnvironmentLimitAction(str *store.Store, zCfg *zrokEdgeSdk.Config) *environmentLimitAction {
return &environmentLimitAction{str, zCfg}
}
func (a *environmentLimitAction) HandleEnvironment(env *store.Environment, _, _ int64, _ *BandwidthPerPeriod, trx *sqlx.Tx) error {
@ -26,8 +25,13 @@ func (a *environmentLimitAction) HandleEnvironment(env *store.Environment, _, _
return errors.Wrapf(err, "error finding shares for environment '%v'", env.ZId)
}
edge, err := zrokEdgeSdk.Client(a.zCfg)
if err != nil {
return err
}
for _, shr := range shrs {
if err := zrokEdgeSdk.DeleteServicePoliciesDial(env.ZId, shr.Token, a.edge); err != nil {
if err := zrokEdgeSdk.DeleteServicePoliciesDial(env.ZId, shr.Token, edge); err != nil {
return errors.Wrapf(err, "error deleting dial service policy for '%v'", shr.Token)
}
logrus.Infof("removed dial service policy for share '%v' of environment '%v'", shr.Token, env.ZId)

View File

@ -2,19 +2,19 @@ package limits
import (
"github.com/jmoiron/sqlx"
"github.com/openziti/edge-api/rest_management_api_client"
"github.com/openziti/zrok/controller/store"
"github.com/openziti/zrok/controller/zrokEdgeSdk"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
type environmentRelaxAction struct {
str *store.Store
edge *rest_management_api_client.ZitiEdgeManagement
zCfg *zrokEdgeSdk.Config
}
func newEnvironmentRelaxAction(str *store.Store, edge *rest_management_api_client.ZitiEdgeManagement) *environmentRelaxAction {
return &environmentRelaxAction{str, edge}
func newEnvironmentRelaxAction(str *store.Store, zCfg *zrokEdgeSdk.Config) *environmentRelaxAction {
return &environmentRelaxAction{str, zCfg}
}
func (a *environmentRelaxAction) HandleEnvironment(env *store.Environment, rxBytes, txBytes int64, limit *BandwidthPerPeriod, trx *sqlx.Tx) error {
@ -25,15 +25,20 @@ func (a *environmentRelaxAction) HandleEnvironment(env *store.Environment, rxByt
return errors.Wrapf(err, "error finding shares for environment '%v'", env.ZId)
}
edge, err := zrokEdgeSdk.Client(a.zCfg)
if err != nil {
return err
}
for _, shr := range shrs {
if !shr.Deleted {
switch shr.ShareMode {
case "public":
if err := relaxPublicShare(a.str, a.edge, shr, trx); err != nil {
if err := relaxPublicShare(a.str, edge, shr, trx); err != nil {
return err
}
case "private":
if err := relaxPrivateShare(a.str, a.edge, shr, trx); err != nil {
if err := relaxPrivateShare(a.str, edge, shr, trx); err != nil {
return err
}
}

View File

@ -2,7 +2,6 @@ package limits
import (
"github.com/jmoiron/sqlx"
"github.com/openziti/edge-api/rest_management_api_client"
"github.com/openziti/zrok/controller/emailUi"
"github.com/openziti/zrok/controller/store"
"github.com/openziti/zrok/util"
@ -11,13 +10,12 @@ import (
)
type environmentWarningAction struct {
str *store.Store
edge *rest_management_api_client.ZitiEdgeManagement
cfg *emailUi.Config
str *store.Store
cfg *emailUi.Config
}
func newEnvironmentWarningAction(cfg *emailUi.Config, str *store.Store, edge *rest_management_api_client.ZitiEdgeManagement) *environmentWarningAction {
return &environmentWarningAction{str, edge, cfg}
func newEnvironmentWarningAction(cfg *emailUi.Config, str *store.Store) *environmentWarningAction {
return &environmentWarningAction{str, cfg}
}
func (a *environmentWarningAction) HandleEnvironment(env *store.Environment, rxBytes, txBytes int64, limit *BandwidthPerPeriod, trx *sqlx.Tx) error {

View File

@ -2,7 +2,6 @@ package limits
import (
"github.com/jmoiron/sqlx"
"github.com/openziti/edge-api/rest_management_api_client"
"github.com/openziti/zrok/controller/store"
"github.com/openziti/zrok/controller/zrokEdgeSdk"
"github.com/sirupsen/logrus"
@ -10,11 +9,11 @@ import (
type shareLimitAction struct {
str *store.Store
edge *rest_management_api_client.ZitiEdgeManagement
zCfg *zrokEdgeSdk.Config
}
func newShareLimitAction(str *store.Store, edge *rest_management_api_client.ZitiEdgeManagement) *shareLimitAction {
return &shareLimitAction{str, edge}
func newShareLimitAction(str *store.Store, zCfg *zrokEdgeSdk.Config) *shareLimitAction {
return &shareLimitAction{str, zCfg}
}
func (a *shareLimitAction) HandleShare(shr *store.Share, _, _ int64, _ *BandwidthPerPeriod, trx *sqlx.Tx) error {
@ -25,7 +24,12 @@ func (a *shareLimitAction) HandleShare(shr *store.Share, _, _ int64, _ *Bandwidt
return err
}
if err := zrokEdgeSdk.DeleteServicePoliciesDial(env.ZId, shr.Token, a.edge); err != nil {
edge, err := zrokEdgeSdk.Client(a.zCfg)
if err != nil {
return err
}
if err := zrokEdgeSdk.DeleteServicePoliciesDial(env.ZId, shr.Token, edge); err != nil {
return err
}
logrus.Infof("removed dial service policy for '%v'", shr.Token)

View File

@ -11,24 +11,29 @@ import (
type shareRelaxAction struct {
str *store.Store
edge *rest_management_api_client.ZitiEdgeManagement
zCfg *zrokEdgeSdk.Config
}
func newShareRelaxAction(str *store.Store, edge *rest_management_api_client.ZitiEdgeManagement) *shareRelaxAction {
return &shareRelaxAction{str, edge}
func newShareRelaxAction(str *store.Store, zCfg *zrokEdgeSdk.Config) *shareRelaxAction {
return &shareRelaxAction{str, zCfg}
}
func (a *shareRelaxAction) HandleShare(shr *store.Share, _, _ int64, _ *BandwidthPerPeriod, trx *sqlx.Tx) error {
logrus.Infof("relaxing '%v'", shr.Token)
if !shr.Deleted {
edge, err := zrokEdgeSdk.Client(a.zCfg)
if err != nil {
return err
}
switch shr.ShareMode {
case "public":
if err := relaxPublicShare(a.str, a.edge, shr, trx); err != nil {
if err := relaxPublicShare(a.str, edge, shr, trx); err != nil {
return err
}
case "private":
if err := relaxPrivateShare(a.str, a.edge, shr, trx); err != nil {
if err := relaxPrivateShare(a.str, edge, shr, trx); err != nil {
return err
}
}

View File

@ -2,7 +2,6 @@ package limits
import (
"github.com/jmoiron/sqlx"
"github.com/openziti/edge-api/rest_management_api_client"
"github.com/openziti/zrok/controller/emailUi"
"github.com/openziti/zrok/controller/store"
"github.com/openziti/zrok/util"
@ -11,13 +10,12 @@ import (
)
type shareWarningAction struct {
str *store.Store
edge *rest_management_api_client.ZitiEdgeManagement
cfg *emailUi.Config
str *store.Store
cfg *emailUi.Config
}
func newShareWarningAction(cfg *emailUi.Config, str *store.Store, edge *rest_management_api_client.ZitiEdgeManagement) *shareWarningAction {
return &shareWarningAction{str, edge, cfg}
func newShareWarningAction(cfg *emailUi.Config, str *store.Store) *shareWarningAction {
return &shareWarningAction{str, cfg}
}
func (a *shareWarningAction) HandleShare(shr *store.Share, rxBytes, txBytes int64, limit *BandwidthPerPeriod, trx *sqlx.Tx) error {