mirror of
https://github.com/openziti/zrok.git
synced 2024-11-07 08:44:14 +01:00
refinements and refactoring of limit classes to re-align with updated bandwidth journal, etc. (#606)
This commit is contained in:
parent
6256055a99
commit
cea7ff6474
@ -14,30 +14,30 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type Agent struct {
|
type Agent struct {
|
||||||
cfg *Config
|
cfg *Config
|
||||||
ifx *influxReader
|
ifx *influxReader
|
||||||
zCfg *zrokEdgeSdk.Config
|
zCfg *zrokEdgeSdk.Config
|
||||||
str *store.Store
|
str *store.Store
|
||||||
queue chan *metrics.Usage
|
queue chan *metrics.Usage
|
||||||
acctWarningActions []AccountAction
|
warningActions []AccountAction
|
||||||
acctLimitActions []AccountAction
|
limitActions []AccountAction
|
||||||
acctRelaxActions []AccountAction
|
relaxActions []AccountAction
|
||||||
close chan struct{}
|
close chan struct{}
|
||||||
join chan struct{}
|
join chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewAgent(cfg *Config, ifxCfg *metrics.InfluxConfig, zCfg *zrokEdgeSdk.Config, emailCfg *emailUi.Config, str *store.Store) (*Agent, error) {
|
func NewAgent(cfg *Config, ifxCfg *metrics.InfluxConfig, zCfg *zrokEdgeSdk.Config, emailCfg *emailUi.Config, str *store.Store) (*Agent, error) {
|
||||||
a := &Agent{
|
a := &Agent{
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
ifx: newInfluxReader(ifxCfg),
|
ifx: newInfluxReader(ifxCfg),
|
||||||
zCfg: zCfg,
|
zCfg: zCfg,
|
||||||
str: str,
|
str: str,
|
||||||
queue: make(chan *metrics.Usage, 1024),
|
queue: make(chan *metrics.Usage, 1024),
|
||||||
acctWarningActions: []AccountAction{newAccountWarningAction(emailCfg, str)},
|
warningActions: []AccountAction{newAccountWarningAction(emailCfg, str)},
|
||||||
acctLimitActions: []AccountAction{newAccountLimitAction(str, zCfg)},
|
limitActions: []AccountAction{newAccountLimitAction(str, zCfg)},
|
||||||
acctRelaxActions: []AccountAction{newAccountRelaxAction(str, zCfg)},
|
relaxActions: []AccountAction{newAccountRelaxAction(str, zCfg)},
|
||||||
close: make(chan struct{}),
|
close: make(chan struct{}),
|
||||||
join: make(chan struct{}),
|
join: make(chan struct{}),
|
||||||
}
|
}
|
||||||
return a, nil
|
return a, nil
|
||||||
}
|
}
|
||||||
@ -258,8 +258,7 @@ func (a *Agent) enforce(u *metrics.Usage) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// run account limit actions
|
for _, action := range a.limitActions {
|
||||||
for _, action := range a.acctLimitActions {
|
|
||||||
if err := action.HandleAccount(acct, rxBytes, txBytes, a.cfg.Bandwidth, trx); err != nil {
|
if err := action.HandleAccount(acct, rxBytes, txBytes, a.cfg.Bandwidth, trx); err != nil {
|
||||||
return errors.Wrapf(err, "%v", reflect.TypeOf(action).String())
|
return errors.Wrapf(err, "%v", reflect.TypeOf(action).String())
|
||||||
}
|
}
|
||||||
@ -295,8 +294,7 @@ func (a *Agent) enforce(u *metrics.Usage) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// run account warning actions
|
for _, action := range a.warningActions {
|
||||||
for _, action := range a.acctWarningActions {
|
|
||||||
if err := action.HandleAccount(acct, rxBytes, txBytes, a.cfg.Bandwidth, trx); err != nil {
|
if err := action.HandleAccount(acct, rxBytes, txBytes, a.cfg.Bandwidth, trx); err != nil {
|
||||||
return errors.Wrapf(err, "%v", reflect.TypeOf(action).String())
|
return errors.Wrapf(err, "%v", reflect.TypeOf(action).String())
|
||||||
}
|
}
|
||||||
@ -334,7 +332,7 @@ func (a *Agent) relax() error {
|
|||||||
if !enforce && !warning {
|
if !enforce && !warning {
|
||||||
if alj.Action == store.LimitLimitAction {
|
if alj.Action == store.LimitLimitAction {
|
||||||
// run relax actions for account
|
// run relax actions for account
|
||||||
for _, action := range a.acctRelaxActions {
|
for _, action := range a.relaxActions {
|
||||||
if err := action.HandleAccount(acct, rxBytes, txBytes, a.cfg.Bandwidth, trx); err != nil {
|
if err := action.HandleAccount(acct, rxBytes, txBytes, a.cfg.Bandwidth, trx); err != nil {
|
||||||
return errors.Wrapf(err, "%v", reflect.TypeOf(action).String())
|
return errors.Wrapf(err, "%v", reflect.TypeOf(action).String())
|
||||||
}
|
}
|
||||||
|
@ -7,25 +7,10 @@ import (
|
|||||||
|
|
||||||
func sortLimitClasses(lcs []*store.LimitClass) {
|
func sortLimitClasses(lcs []*store.LimitClass) {
|
||||||
sort.Slice(lcs, func(i, j int) bool {
|
sort.Slice(lcs, func(i, j int) bool {
|
||||||
ipoints := limitScopePoints(lcs[i]) + modePoints(lcs[i])
|
return modePoints(lcs[i]) > modePoints(lcs[j])
|
||||||
jpoints := limitScopePoints(lcs[j]) + modePoints(lcs[j])
|
|
||||||
return ipoints > jpoints
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func limitScopePoints(lc *store.LimitClass) int {
|
|
||||||
points := 0
|
|
||||||
switch lc.LimitScope {
|
|
||||||
case store.AccountLimitScope:
|
|
||||||
points += 1000
|
|
||||||
case store.EnvironmentLimitScope:
|
|
||||||
points += 100
|
|
||||||
case store.ShareLimitScope:
|
|
||||||
points += 10
|
|
||||||
}
|
|
||||||
return points
|
|
||||||
}
|
|
||||||
|
|
||||||
func modePoints(lc *store.LimitClass) int {
|
func modePoints(lc *store.LimitClass) int {
|
||||||
points := 0
|
points := 0
|
||||||
if lc.BackendMode != "" {
|
if lc.BackendMode != "" {
|
||||||
|
@ -38,14 +38,13 @@ func TestBandwidthLimitJournal(t *testing.T) {
|
|||||||
assert.Equal(t, int64(2048), latestJe.TxBytes)
|
assert.Equal(t, int64(2048), latestJe.TxBytes)
|
||||||
|
|
||||||
lcId, err := str.CreateLimitClass(&LimitClass{
|
lcId, err := str.CreateLimitClass(&LimitClass{
|
||||||
LimitScope: AccountLimitScope,
|
|
||||||
LimitAction: LimitLimitAction,
|
|
||||||
ShareMode: sdk.PrivateShareMode,
|
ShareMode: sdk.PrivateShareMode,
|
||||||
BackendMode: sdk.VpnBackendMode,
|
BackendMode: sdk.VpnBackendMode,
|
||||||
PeriodMinutes: 60,
|
PeriodMinutes: 60,
|
||||||
RxBytes: 4096,
|
RxBytes: 4096,
|
||||||
TxBytes: 8192,
|
TxBytes: 8192,
|
||||||
TotalBytes: 10240,
|
TotalBytes: 10240,
|
||||||
|
LimitAction: LimitLimitAction,
|
||||||
}, trx)
|
}, trx)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
@ -9,10 +9,9 @@ import (
|
|||||||
|
|
||||||
type LimitClass struct {
|
type LimitClass struct {
|
||||||
Model
|
Model
|
||||||
LimitScope LimitScope
|
|
||||||
LimitAction LimitAction
|
|
||||||
ShareMode sdk.ShareMode
|
ShareMode sdk.ShareMode
|
||||||
BackendMode sdk.BackendMode
|
BackendMode sdk.BackendMode
|
||||||
|
Environments int
|
||||||
Shares int
|
Shares int
|
||||||
ReservedShares int
|
ReservedShares int
|
||||||
UniqueNames int
|
UniqueNames int
|
||||||
@ -20,6 +19,7 @@ type LimitClass struct {
|
|||||||
RxBytes int64
|
RxBytes int64
|
||||||
TxBytes int64
|
TxBytes int64
|
||||||
TotalBytes int64
|
TotalBytes int64
|
||||||
|
LimitAction LimitAction
|
||||||
}
|
}
|
||||||
|
|
||||||
func (lc LimitClass) String() string {
|
func (lc LimitClass) String() string {
|
||||||
@ -32,12 +32,12 @@ func (lc LimitClass) String() string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (str *Store) CreateLimitClass(lc *LimitClass, trx *sqlx.Tx) (int, error) {
|
func (str *Store) CreateLimitClass(lc *LimitClass, trx *sqlx.Tx) (int, error) {
|
||||||
stmt, err := trx.Prepare("insert into limit_classes (limit_scope, limit_action, share_mode, backend_mode, period_minutes, rx_bytes, tx_bytes, total_bytes) values ($1, $2, $3, $4, $5, $6, $7, $8) returning id")
|
stmt, err := trx.Prepare("insert into limit_classes (share_mode, backend_mode, environments, shares, reserved_shares, unique_names, period_minutes, rx_bytes, tx_bytes, total_bytes, limit_action) values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) returning id")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, errors.Wrap(err, "error preparing limit_classes insert statement")
|
return 0, errors.Wrap(err, "error preparing limit_classes insert statement")
|
||||||
}
|
}
|
||||||
var id int
|
var id int
|
||||||
if err := stmt.QueryRow(lc.LimitScope, lc.LimitAction, lc.ShareMode, lc.BackendMode, lc.PeriodMinutes, lc.RxBytes, lc.TxBytes, lc.TotalBytes).Scan(&id); err != nil {
|
if err := stmt.QueryRow(lc.ShareMode, lc.BackendMode, lc.Environments, lc.Shares, lc.ReservedShares, lc.UniqueNames, lc.PeriodMinutes, lc.RxBytes, lc.TxBytes, lc.TotalBytes, lc.LimitAction).Scan(&id); err != nil {
|
||||||
return 0, errors.Wrap(err, "error executing limit_classes insert statement")
|
return 0, errors.Wrap(err, "error executing limit_classes insert statement")
|
||||||
}
|
}
|
||||||
return id, nil
|
return id, nil
|
||||||
|
@ -1,13 +1,10 @@
|
|||||||
-- +migrate Up
|
-- +migrate Up
|
||||||
|
|
||||||
create type limit_scope as enum ('account', 'environment', 'share');
|
|
||||||
create type limit_action as enum ('warning', 'limit');
|
create type limit_action as enum ('warning', 'limit');
|
||||||
|
|
||||||
create table limit_classes (
|
create table limit_classes (
|
||||||
id serial primary key,
|
id serial primary key,
|
||||||
|
|
||||||
limit_scope limit_scope not null default ('account'),
|
|
||||||
limit_action limit_action not null default ('limit'),
|
|
||||||
share_mode share_mode,
|
share_mode share_mode,
|
||||||
backend_mode backend_mode,
|
backend_mode backend_mode,
|
||||||
|
|
||||||
@ -20,6 +17,8 @@ create table limit_classes (
|
|||||||
tx_bytes bigint not null default (-1),
|
tx_bytes bigint not null default (-1),
|
||||||
total_bytes bigint not null default (-1),
|
total_bytes bigint not null default (-1),
|
||||||
|
|
||||||
|
limit_action limit_action not null default ('limit'),
|
||||||
|
|
||||||
created_at timestamptz not null default(current_timestamp),
|
created_at timestamptz not null default(current_timestamp),
|
||||||
updated_at timestamptz not null default(current_timestamp),
|
updated_at timestamptz not null default(current_timestamp),
|
||||||
deleted boolean not null default(false)
|
deleted boolean not null default(false)
|
@ -3,8 +3,6 @@
|
|||||||
create table limit_classes (
|
create table limit_classes (
|
||||||
id integer primary key,
|
id integer primary key,
|
||||||
|
|
||||||
limit_scope string not null default ('account'),
|
|
||||||
limit_action string not null default ('limit'),
|
|
||||||
share_mode string,
|
share_mode string,
|
||||||
backend_mode string,
|
backend_mode string,
|
||||||
|
|
||||||
@ -17,13 +15,15 @@ create table limit_classes (
|
|||||||
tx_bytes bigint not null default (-1),
|
tx_bytes bigint not null default (-1),
|
||||||
total_bytes bigint not null default (-1),
|
total_bytes bigint not null default (-1),
|
||||||
|
|
||||||
|
limit_action string not null default ('limit'),
|
||||||
|
|
||||||
created_at datetime not null default(strftime('%Y-%m-%d %H:%M:%f', 'now')),
|
created_at datetime not null default(strftime('%Y-%m-%d %H:%M:%f', 'now')),
|
||||||
updated_at datetime not null default(strftime('%Y-%m-%d %H:%M:%f', 'now')),
|
updated_at datetime not null default(strftime('%Y-%m-%d %H:%M:%f', 'now')),
|
||||||
deleted boolean not null default(false)
|
deleted boolean not null default(false)
|
||||||
);
|
);
|
||||||
|
|
||||||
create table applied_limit_classes (
|
create table applied_limit_classes (
|
||||||
id integer primary key,
|
id integer primary key,
|
||||||
account_id integer not null references accounts (id),
|
account_id integer not null references accounts (id),
|
||||||
limit_class_id integer not null references limit_classes (id),
|
limit_class_id integer not null references limit_classes (id),
|
||||||
created_at datetime not null default(strftime('%Y-%m-%d %H:%M:%f', 'now')),
|
created_at datetime not null default(strftime('%Y-%m-%d %H:%M:%f', 'now')),
|
Loading…
Reference in New Issue
Block a user