mirror of
https://github.com/openziti/zrok.git
synced 2025-02-02 03:20:26 +01:00
basic global bandwidth enforcement testing tweaks (no clases) (#606)
This commit is contained in:
parent
3258c3ee73
commit
0f32c5e8a3
@ -337,14 +337,24 @@ func (a *Agent) enforce(u *metrics.Usage) error {
|
|||||||
if latest, err := a.str.FindLatestBandwidthLimitJournalForGlobal(int(u.AccountId), trx); err == nil {
|
if latest, err := a.str.FindLatestBandwidthLimitJournalForGlobal(int(u.AccountId), trx); err == nil {
|
||||||
enforced = latest.Action == exceededLc.GetLimitAction()
|
enforced = latest.Action == exceededLc.GetLimitAction()
|
||||||
enforcedAt = latest.UpdatedAt
|
enforcedAt = latest.UpdatedAt
|
||||||
|
logrus.Debugf("limit '%v' already applied (enforced: %t)", exceededLc, enforced)
|
||||||
|
} else {
|
||||||
|
logrus.Errorf("error getting latest global bandwidth journal entry: %v", err)
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
logrus.Debugf("no bandwidth limit journal entry for '%v'", exceededLc)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if empty, err := a.str.IsBandwidthLimitJournalEmptyForLimitClass(int(u.AccountId), exceededLc.GetLimitClassId(), trx); err == nil && !empty {
|
if empty, err := a.str.IsBandwidthLimitJournalEmptyForLimitClass(int(u.AccountId), exceededLc.GetLimitClassId(), trx); err == nil && !empty {
|
||||||
if latest, err := a.str.FindLatestBandwidthLimitJournalForLimitClass(int(u.AccountId), exceededLc.GetLimitClassId(), trx); err == nil {
|
if latest, err := a.str.FindLatestBandwidthLimitJournalForLimitClass(int(u.AccountId), exceededLc.GetLimitClassId(), trx); err == nil {
|
||||||
enforced = latest.Action == exceededLc.GetLimitAction()
|
enforced = latest.Action == exceededLc.GetLimitAction()
|
||||||
enforcedAt = latest.UpdatedAt
|
enforcedAt = latest.UpdatedAt
|
||||||
|
logrus.Debugf("limit '%v' already applied (enforced: %t)", exceededLc, enforced)
|
||||||
|
} else {
|
||||||
|
logrus.Errorf("error getting latest bandwidth limit journal entry for limit class '%d': %v", exceededLc.GetLimitClassId(), err)
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
logrus.Debugf("no bandwidth limit journal entry for '%v'", exceededLc)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -423,7 +433,7 @@ func (a *Agent) relax() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var bwc store.BandwidthClass
|
var bwc store.BandwidthClass
|
||||||
if bwje.LimitClassId != nil {
|
if bwje.LimitClassId == nil {
|
||||||
globalBwcs := newConfigBandwidthClasses(a.cfg.Bandwidth)
|
globalBwcs := newConfigBandwidthClasses(a.cfg.Bandwidth)
|
||||||
if bwje.Action == store.WarningLimitAction {
|
if bwje.Action == store.WarningLimitAction {
|
||||||
bwc = globalBwcs[0]
|
bwc = globalBwcs[0]
|
||||||
@ -455,27 +465,30 @@ func (a *Agent) relax() error {
|
|||||||
used := periodBw[bwc.GetPeriodMinutes()]
|
used := periodBw[bwc.GetPeriodMinutes()]
|
||||||
if !a.limitExceeded(used.rx, used.tx, bwc) {
|
if !a.limitExceeded(used.rx, used.tx, bwc) {
|
||||||
if bwc.GetLimitAction() == store.LimitLimitAction {
|
if bwc.GetLimitAction() == store.LimitLimitAction {
|
||||||
|
logrus.Infof("relaxing limit '%v' for '%v'", bwc.String(), accounts[bwje.AccountId].Email)
|
||||||
for _, action := range a.relaxActions {
|
for _, action := range a.relaxActions {
|
||||||
if err := action.HandleAccount(accounts[bwje.AccountId], used.rx, used.tx, bwc, trx); err != nil {
|
if err := action.HandleAccount(accounts[bwje.AccountId], used.rx, used.tx, bwc, trx); err != nil {
|
||||||
return errors.Wrapf(err, "%v", reflect.TypeOf(action).String())
|
return errors.Wrapf(err, "%v", reflect.TypeOf(action).String())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
logrus.Infof("relaxing warning for '%v'", accounts[bwje.AccountId].Email)
|
logrus.Infof("relaxing warning '%v' for '%v'", bwc.String(), accounts[bwje.AccountId].Email)
|
||||||
}
|
}
|
||||||
var lcId *int
|
if bwc.IsGlobal() {
|
||||||
if !bwc.IsGlobal() {
|
if err := a.str.DeleteBandwidthLimitJournalEntryForGlobal(bwje.AccountId, trx); err == nil {
|
||||||
newLcId := 0
|
commit = true
|
||||||
newLcId = bwc.GetLimitClassId()
|
} else {
|
||||||
lcId = &newLcId
|
logrus.Errorf("error deleting global bandwidth limit journal entry for '%v': %v", accounts[bwje.AccountId].Email, err)
|
||||||
}
|
}
|
||||||
if err := a.str.DeleteBandwidthLimitJournalEntryForLimitClass(bwje.AccountId, lcId, trx); err == nil {
|
|
||||||
commit = true
|
|
||||||
} else {
|
} else {
|
||||||
logrus.Errorf("error deleting bandwidth limit journal entry for '%v': %v", accounts[bwje.AccountId].Email, err)
|
if err := a.str.DeleteBandwidthLimitJournalEntryForLimitClass(bwje.AccountId, *bwje.LimitClassId, trx); err == nil {
|
||||||
|
commit = true
|
||||||
|
} else {
|
||||||
|
logrus.Errorf("error deleting bandwidth limit journal entry for '%v': %v", accounts[bwje.AccountId].Email, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
logrus.Infof("account '%v' still over limit: %v", accounts[bwje.AccountId].Email, bwc)
|
logrus.Infof("account '%v' still over limit: '%v' with rx: %d, tx: %d", accounts[bwje.AccountId].Email, bwc, used.rx, used.tx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -536,6 +549,9 @@ func (a *Agent) isOverLimitClass(u *metrics.Usage, alcs []*store.LimitClass) (st
|
|||||||
selectedLcPoints = points
|
selectedLcPoints = points
|
||||||
rxBytes = period.rx
|
rxBytes = period.rx
|
||||||
txBytes = period.tx
|
txBytes = period.tx
|
||||||
|
logrus.Debugf("exceeded limit '%v' with rx: %d, tx: %d", bwc.String(), period.rx, period.tx)
|
||||||
|
} else {
|
||||||
|
logrus.Debugf("limit '%v' ok with rx: %d, tx: %d", bwc.String(), period.rx, period.tx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -570,7 +586,7 @@ func (a *Agent) limitExceeded(rx, tx int64, bwc store.BandwidthClass) bool {
|
|||||||
if bwc.GetRxBytes() != Unlimited && rx >= bwc.GetRxBytes() {
|
if bwc.GetRxBytes() != Unlimited && rx >= bwc.GetRxBytes() {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
if bwc.GetTxBytes() != Unlimited && bwc.GetRxBytes() != Unlimited && tx+rx >= bwc.GetTxBytes()+bwc.GetRxBytes() {
|
if bwc.GetTotalBytes() != Unlimited && tx+rx >= bwc.GetTotalBytes() {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
|
@ -1,6 +1,8 @@
|
|||||||
package limits
|
package limits
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
"github.com/openziti/zrok/controller/store"
|
"github.com/openziti/zrok/controller/store"
|
||||||
"github.com/openziti/zrok/sdk/golang/sdk"
|
"github.com/openziti/zrok/sdk/golang/sdk"
|
||||||
)
|
)
|
||||||
@ -61,3 +63,10 @@ func (bc *configBandwidthClass) GetTotalBytes() int64 {
|
|||||||
func (bc *configBandwidthClass) GetLimitAction() store.LimitAction {
|
func (bc *configBandwidthClass) GetLimitAction() store.LimitAction {
|
||||||
return bc.limitAction
|
return bc.limitAction
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (bc *configBandwidthClass) String() string {
|
||||||
|
if out, err := json.Marshal(bc.bw); err == nil {
|
||||||
|
return fmt.Sprintf("Config<period: %d, %s, action: %s>", bc.periodInMinutes, string(out), bc.limitAction)
|
||||||
|
}
|
||||||
|
return "<<ERROR>>"
|
||||||
|
}
|
@ -52,7 +52,7 @@ func (str *Store) IsBandwidthLimitJournalEmptyForGlobal(acctId int, trx *sqlx.Tx
|
|||||||
|
|
||||||
func (str *Store) FindLatestBandwidthLimitJournalForGlobal(acctId int, trx *sqlx.Tx) (*BandwidthLimitJournalEntry, error) {
|
func (str *Store) FindLatestBandwidthLimitJournalForGlobal(acctId int, trx *sqlx.Tx) (*BandwidthLimitJournalEntry, error) {
|
||||||
j := &BandwidthLimitJournalEntry{}
|
j := &BandwidthLimitJournalEntry{}
|
||||||
if err := trx.QueryRowx("select * from bandwidth_limit_journal where account_id = $1 and limit_class_id is null order by id desc limit 1", acctId).Scan(&j); err != nil {
|
if err := trx.QueryRowx("select * from bandwidth_limit_journal where account_id = $1 and limit_class_id is null order by id desc limit 1", acctId).StructScan(j); err != nil {
|
||||||
return nil, errors.Wrap(err, "error finding bandwidth_limit_journal by account_id for global")
|
return nil, errors.Wrap(err, "error finding bandwidth_limit_journal by account_id for global")
|
||||||
}
|
}
|
||||||
return j, nil
|
return j, nil
|
||||||
@ -113,7 +113,14 @@ func (str *Store) DeleteBandwidthLimitJournal(acctId int, trx *sqlx.Tx) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (str *Store) DeleteBandwidthLimitJournalEntryForLimitClass(acctId int, lcId *int, trx *sqlx.Tx) error {
|
func (str *Store) DeleteBandwidthLimitJournalEntryForGlobal(acctId int, trx *sqlx.Tx) error {
|
||||||
|
if _, err := trx.Exec("delete from bandwidth_limit_journal where account_id = $1 and limit_class_id is null", acctId); err != nil {
|
||||||
|
return errors.Wrapf(err, "error deleting from bandwidth_limit_journal for account_id = %d and limit_class_id is null", acctId)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (str *Store) DeleteBandwidthLimitJournalEntryForLimitClass(acctId int, lcId int, trx *sqlx.Tx) error {
|
||||||
if _, err := trx.Exec("delete from bandwidth_limit_journal where account_id = $1 and limit_class_id = $2", acctId, lcId); err != nil {
|
if _, err := trx.Exec("delete from bandwidth_limit_journal where account_id = $1 and limit_class_id = $2", acctId, lcId); err != nil {
|
||||||
return errors.Wrapf(err, "error deleting from bandwidth_limit_journal for account_id = %d and limit_class_id = %d", acctId, lcId)
|
return errors.Wrapf(err, "error deleting from bandwidth_limit_journal for account_id = %d and limit_class_id = %d", acctId, lcId)
|
||||||
}
|
}
|
||||||
|
@ -17,6 +17,7 @@ type BandwidthClass interface {
|
|||||||
GetTxBytes() int64
|
GetTxBytes() int64
|
||||||
GetTotalBytes() int64
|
GetTotalBytes() int64
|
||||||
GetLimitAction() LimitAction
|
GetLimitAction() LimitAction
|
||||||
|
String() string
|
||||||
}
|
}
|
||||||
|
|
||||||
type LimitClass struct {
|
type LimitClass struct {
|
||||||
@ -71,12 +72,10 @@ func (lc LimitClass) GetLimitAction() LimitAction {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (lc LimitClass) String() string {
|
func (lc LimitClass) String() string {
|
||||||
out, err := json.MarshalIndent(&lc, "", " ")
|
if out, err := json.Marshal(&lc); err == nil {
|
||||||
if err != nil {
|
return "LimitClass<" + string(out) + ">"
|
||||||
return ""
|
|
||||||
|
|
||||||
}
|
}
|
||||||
return string(out)
|
return "<<ERROR>>"
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ BandwidthClass = (*LimitClass)(nil)
|
var _ BandwidthClass = (*LimitClass)(nil)
|
||||||
|
@ -13,7 +13,6 @@ func Middleware(handler http.Handler, healthCheck func(w http.ResponseWriter, r
|
|||||||
logrus.Infof("building")
|
logrus.Infof("building")
|
||||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
if strings.HasPrefix(r.URL.Path, "/api/v1") {
|
if strings.HasPrefix(r.URL.Path, "/api/v1") {
|
||||||
logrus.Debugf("directing '%v' to api handler", r.URL.Path)
|
|
||||||
handler.ServeHTTP(w, r)
|
handler.ServeHTTP(w, r)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -22,8 +21,6 @@ func Middleware(handler http.Handler, healthCheck func(w http.ResponseWriter, r
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
logrus.Debugf("directing '%v' to static handler", r.URL.Path)
|
|
||||||
|
|
||||||
staticPath := "build"
|
staticPath := "build"
|
||||||
indexPath := "index.html"
|
indexPath := "index.html"
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user