replacing the old account/environment/share limit journals with the new bandwidth limit journal (#606)

This commit is contained in:
Michael Quigley 2024-05-31 14:26:29 -04:00
parent ea5670b4ae
commit 481cc7f7ad
No known key found for this signature in database
GPG Key ID: 9B60314A9DD20A62
8 changed files with 89 additions and 385 deletions

View File

@ -56,8 +56,8 @@ func (a *Agent) CanCreateEnvironment(acctId int, trx *sqlx.Tx) (bool, error) {
if err := a.str.LimitCheckLock(acctId, trx); err != nil {
return false, err
}
if empty, err := a.str.IsAccountLimitJournalEmpty(acctId, trx); err == nil && !empty {
alj, err := a.str.FindLatestAccountLimitJournal(acctId, trx)
if empty, err := a.str.IsBandwidthLimitJournalEmpty(acctId, trx); err == nil && !empty {
alj, err := a.str.FindLatestBandwidthLimitJournal(acctId, trx)
if err != nil {
return false, err
}
@ -86,8 +86,8 @@ func (a *Agent) CanCreateShare(acctId, envId int, reserved, uniqueName bool, _ s
if err := a.str.LimitCheckLock(acctId, trx); err != nil {
return false, err
}
if empty, err := a.str.IsAccountLimitJournalEmpty(acctId, trx); err == nil && !empty {
alj, err := a.str.FindLatestAccountLimitJournal(acctId, trx)
if empty, err := a.str.IsBandwidthLimitJournalEmpty(acctId, trx); err == nil && !empty {
alj, err := a.str.FindLatestBandwidthLimitJournal(acctId, trx)
if err != nil {
return false, err
}
@ -98,18 +98,6 @@ func (a *Agent) CanCreateShare(acctId, envId int, reserved, uniqueName bool, _ s
return false, err
}
if empty, err := a.str.IsEnvironmentLimitJournalEmpty(envId, trx); err == nil && !empty {
elj, err := a.str.FindLatestEnvironmentLimitJournal(envId, trx)
if err != nil {
return false, err
}
if elj.Action == store.LimitLimitAction {
return false, nil
}
} else if err != nil {
return false, err
}
alc, err := a.str.FindLimitClassesForAccount(acctId, trx)
if err != nil {
logrus.Errorf("error finding limit classes for account with id '%d': %v", acctId, err)
@ -167,8 +155,8 @@ func (a *Agent) CanAccessShare(shrId int, trx *sqlx.Tx) (bool, error) {
if err != nil {
return false, err
}
if empty, err := a.str.IsShareLimitJournalEmpty(shr.Id, trx); err == nil && !empty {
slj, err := a.str.FindLatestShareLimitJournal(shr.Id, trx)
if empty, err := a.str.IsBandwidthLimitJournalEmpty(shr.Id, trx); err == nil && !empty {
slj, err := a.str.FindLatestBandwidthLimitJournal(shr.Id, trx)
if err != nil {
return false, err
}
@ -178,40 +166,6 @@ func (a *Agent) CanAccessShare(shrId int, trx *sqlx.Tx) (bool, error) {
} else if err != nil {
return false, err
}
env, err := a.str.GetEnvironment(shr.EnvironmentId, trx)
if err != nil {
return false, err
}
if empty, err := a.str.IsEnvironmentLimitJournalEmpty(env.Id, trx); err == nil && !empty {
elj, err := a.str.FindLatestEnvironmentLimitJournal(env.Id, trx)
if err != nil {
return false, err
}
if elj.Action == store.LimitLimitAction {
return false, nil
}
} else if err != nil {
return false, err
}
if env.AccountId != nil {
acct, err := a.str.GetAccount(*env.AccountId, trx)
if err != nil {
return false, err
}
if empty, err := a.str.IsAccountLimitJournalEmpty(acct.Id, trx); err == nil && !empty {
alj, err := a.str.FindLatestAccountLimitJournal(acct.Id, trx)
if err != nil {
return false, err
}
if alj.Action == store.LimitLimitAction {
return false, nil
}
} else if err != nil {
return false, err
}
}
}
return true, nil
}
@ -277,15 +231,15 @@ func (a *Agent) enforce(u *metrics.Usage) error {
if enforce {
enforced := false
var enforcedAt time.Time
if empty, err := a.str.IsAccountLimitJournalEmpty(int(u.AccountId), trx); err == nil && !empty {
if latest, err := a.str.FindLatestAccountLimitJournal(int(u.AccountId), trx); err == nil {
if empty, err := a.str.IsBandwidthLimitJournalEmpty(int(u.AccountId), trx); err == nil && !empty {
if latest, err := a.str.FindLatestBandwidthLimitJournal(int(u.AccountId), trx); err == nil {
enforced = latest.Action == store.LimitLimitAction
enforcedAt = latest.UpdatedAt
}
}
if !enforced {
_, err := a.str.CreateAccountLimitJournal(&store.AccountLimitJournal{
_, err := a.str.CreateBandwidthLimitJournalEntry(&store.BandwidthLimitJournalEntry{
AccountId: int(u.AccountId),
RxBytes: rxBytes,
TxBytes: txBytes,
@ -314,15 +268,15 @@ func (a *Agent) enforce(u *metrics.Usage) error {
} else if warning {
warned := false
var warnedAt time.Time
if empty, err := a.str.IsAccountLimitJournalEmpty(int(u.AccountId), trx); err == nil && !empty {
if latest, err := a.str.FindLatestAccountLimitJournal(int(u.AccountId), trx); err == nil {
if empty, err := a.str.IsBandwidthLimitJournalEmpty(int(u.AccountId), trx); err == nil && !empty {
if latest, err := a.str.FindLatestBandwidthLimitJournal(int(u.AccountId), trx); err == nil {
warned = latest.Action == store.WarningLimitAction || latest.Action == store.LimitLimitAction
warnedAt = latest.UpdatedAt
}
}
if !warned {
_, err := a.str.CreateAccountLimitJournal(&store.AccountLimitJournal{
_, err := a.str.CreateBandwidthLimitJournalEntry(&store.BandwidthLimitJournalEntry{
AccountId: int(u.AccountId),
RxBytes: rxBytes,
TxBytes: txBytes,
@ -366,7 +320,7 @@ func (a *Agent) relax() error {
commit := false
if aljs, err := a.str.FindAllLatestAccountLimitJournal(trx); err == nil {
if aljs, err := a.str.FindAllLatestBandwidthLimitJournal(trx); err == nil {
for _, alj := range aljs {
if acct, err := a.str.GetAccount(alj.AccountId, trx); err == nil {
if alj.Action == store.WarningLimitAction || alj.Action == store.LimitLimitAction {
@ -382,7 +336,7 @@ func (a *Agent) relax() error {
} else {
logrus.Infof("relaxing warning for '%v'", acct.Email)
}
if err := a.str.DeleteAccountLimitJournalForAccount(acct.Id, trx); err == nil {
if err := a.str.DeleteBandwidthLimitJournal(acct.Id, trx); err == nil {
commit = true
} else {
logrus.Errorf("error deleting account_limit_journal for '%v': %v", acct.Email, err)

View File

@ -27,11 +27,6 @@ func (h *overviewHandler) Handle(_ metadata.OverviewParams, principal *rest_mode
logrus.Errorf("error finding environments for '%v': %v", principal.Email, err)
return metadata.NewOverviewInternalServerError()
}
elm, err := newEnvironmentsLimitedMap(envs, trx)
if err != nil {
logrus.Errorf("error finding limited environments for '%v': %v", principal.Email, err)
return metadata.NewOverviewInternalServerError()
}
accountLimited, err := h.isAccountLimited(principal, trx)
if err != nil {
logrus.Errorf("error checking account limited for '%v': %v", principal.Email, err)
@ -44,7 +39,6 @@ func (h *overviewHandler) Handle(_ metadata.OverviewParams, principal *rest_mode
Description: env.Description,
Host: env.Host,
ZID: env.ZId,
Limited: elm.isLimited(env),
CreatedAt: env.CreatedAt.UnixMilli(),
UpdatedAt: env.UpdatedAt.UnixMilli(),
},
@ -54,11 +48,6 @@ func (h *overviewHandler) Handle(_ metadata.OverviewParams, principal *rest_mode
logrus.Errorf("error finding shares for environment '%v': %v", env.ZId, err)
return metadata.NewOverviewInternalServerError()
}
slm, err := newSharesLimitedMap(shrs, trx)
if err != nil {
logrus.Errorf("error finding limited shares for environment '%v': %v", env.ZId, err)
return metadata.NewOverviewInternalServerError()
}
for _, shr := range shrs {
feEndpoint := ""
if shr.FrontendEndpoint != nil {
@ -81,7 +70,6 @@ func (h *overviewHandler) Handle(_ metadata.OverviewParams, principal *rest_mode
FrontendEndpoint: feEndpoint,
BackendProxyEndpoint: beProxyEndpoint,
Reserved: shr.Reserved,
Limited: slm.isLimited(shr),
CreatedAt: shr.CreatedAt.UnixMilli(),
UpdatedAt: shr.UpdatedAt.UnixMilli(),
}
@ -116,70 +104,16 @@ func (h *overviewHandler) Handle(_ metadata.OverviewParams, principal *rest_mode
}
func (h *overviewHandler) isAccountLimited(principal *rest_model_zrok.Principal, trx *sqlx.Tx) (bool, error) {
var alj *store.AccountLimitJournal
aljEmpty, err := str.IsAccountLimitJournalEmpty(int(principal.ID), trx)
var je *store.BandwidthLimitJournalEntry
jEmpty, err := str.IsBandwidthLimitJournalEmpty(int(principal.ID), trx)
if err != nil {
return false, err
}
if !aljEmpty {
alj, err = str.FindLatestAccountLimitJournal(int(principal.ID), trx)
if !jEmpty {
je, err = str.FindLatestBandwidthLimitJournal(int(principal.ID), trx)
if err != nil {
return false, err
}
}
return alj != nil && alj.Action == store.LimitLimitAction, nil
}
type sharesLimitedMap struct {
v map[int]struct{}
}
func newSharesLimitedMap(shrs []*store.Share, trx *sqlx.Tx) (*sharesLimitedMap, error) {
var shrIds []int
for i := range shrs {
shrIds = append(shrIds, shrs[i].Id)
}
shrsLimited, err := str.FindSelectedLatestShareLimitjournal(shrIds, trx)
if err != nil {
return nil, err
}
slm := &sharesLimitedMap{v: make(map[int]struct{})}
for i := range shrsLimited {
if shrsLimited[i].Action == store.LimitLimitAction {
slm.v[shrsLimited[i].ShareId] = struct{}{}
}
}
return slm, nil
}
func (m *sharesLimitedMap) isLimited(shr *store.Share) bool {
_, limited := m.v[shr.Id]
return limited
}
type environmentsLimitedMap struct {
v map[int]struct{}
}
func newEnvironmentsLimitedMap(envs []*store.Environment, trx *sqlx.Tx) (*environmentsLimitedMap, error) {
var envIds []int
for i := range envs {
envIds = append(envIds, envs[i].Id)
}
envsLimited, err := str.FindSelectedLatestEnvironmentLimitJournal(envIds, trx)
if err != nil {
return nil, err
}
elm := &environmentsLimitedMap{v: make(map[int]struct{})}
for i := range envsLimited {
if envsLimited[i].Action == store.LimitLimitAction {
elm.v[envsLimited[i].EnvironmentId] = struct{}{}
}
}
return elm, nil
}
func (m *environmentsLimitedMap) isLimited(env *store.Environment) bool {
_, limited := m.v[env.Id]
return limited
return je != nil && je.Action == store.LimitLimitAction, nil
}

View File

@ -1,65 +0,0 @@
package store
import (
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
)
type AccountLimitJournal struct {
Model
AccountId int
RxBytes int64
TxBytes int64
Action LimitAction
}
func (str *Store) CreateAccountLimitJournal(j *AccountLimitJournal, trx *sqlx.Tx) (int, error) {
stmt, err := trx.Prepare("insert into account_limit_journal (account_id, rx_bytes, tx_bytes, action) values ($1, $2, $3, $4) returning id")
if err != nil {
return 0, errors.Wrap(err, "error preparing account_limit_journal insert statement")
}
var id int
if err := stmt.QueryRow(j.AccountId, j.RxBytes, j.TxBytes, j.Action).Scan(&id); err != nil {
return 0, errors.Wrap(err, "error executing account_limit_journal insert statement")
}
return id, nil
}
func (str *Store) IsAccountLimitJournalEmpty(acctId int, trx *sqlx.Tx) (bool, error) {
count := 0
if err := trx.QueryRowx("select count(0) from account_limit_journal where account_id = $1", acctId).Scan(&count); err != nil {
return false, err
}
return count == 0, nil
}
func (str *Store) FindLatestAccountLimitJournal(acctId int, trx *sqlx.Tx) (*AccountLimitJournal, error) {
j := &AccountLimitJournal{}
if err := trx.QueryRowx("select * from account_limit_journal where account_id = $1 order by id desc limit 1", acctId).StructScan(j); err != nil {
return nil, errors.Wrap(err, "error finding account_limit_journal by account_id")
}
return j, nil
}
func (str *Store) FindAllLatestAccountLimitJournal(trx *sqlx.Tx) ([]*AccountLimitJournal, error) {
rows, err := trx.Queryx("select id, account_id, rx_bytes, tx_bytes, action, created_at, updated_at from account_limit_journal where id in (select max(id) as id from account_limit_journal group by account_id)")
if err != nil {
return nil, errors.Wrap(err, "error selecting all latest account_limit_journal")
}
var aljs []*AccountLimitJournal
for rows.Next() {
alj := &AccountLimitJournal{}
if err := rows.StructScan(alj); err != nil {
return nil, errors.Wrap(err, "error scanning account_limit_journal")
}
aljs = append(aljs, alj)
}
return aljs, nil
}
func (str *Store) DeleteAccountLimitJournalForAccount(acctId int, trx *sqlx.Tx) error {
if _, err := trx.Exec("delete from account_limit_journal where account_id = $1", acctId); err != nil {
return errors.Wrapf(err, "error deleting account_limit journal for '#%d'", acctId)
}
return nil
}

View File

@ -0,0 +1,66 @@
package store
import (
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
)
type BandwidthLimitJournalEntry struct {
Model
AccountId int
LimitClassId *int
Action LimitAction
RxBytes int64
TxBytes int64
}
func (str *Store) CreateBandwidthLimitJournalEntry(j *BandwidthLimitJournalEntry, trx *sqlx.Tx) (int, error) {
stmt, err := trx.Prepare("insert into bandwidth_limit_journal (account_id, limit_class_id, action, rx_bytes, tx_bytes) values ($1, $2, $3, $4, $5) returning id")
if err != nil {
return 0, errors.Wrap(err, "error preparing bandwidth_limit_journal insert statement")
}
var id int
if err := stmt.QueryRow(j.AccountId, j.LimitClassId, j.Action, j.RxBytes, j.TxBytes).Scan(&id); err != nil {
return 0, errors.Wrap(err, "error executing bandwidth_limit_journal insert statement")
}
return id, nil
}
func (str *Store) IsBandwidthLimitJournalEmpty(acctId int, trx *sqlx.Tx) (bool, error) {
count := 0
if err := trx.QueryRowx("select count(0) from bandwidth_limit_journal where account_id = $1", acctId).Scan(&count); err != nil {
return false, err
}
return count == 0, nil
}
func (str *Store) FindLatestBandwidthLimitJournal(acctId int, trx *sqlx.Tx) (*BandwidthLimitJournalEntry, error) {
j := &BandwidthLimitJournalEntry{}
if err := trx.QueryRowx("select * from bandwidth_limit_journal where account_id = $1 order by id desc limit 1", acctId).StructScan(j); err != nil {
return nil, errors.Wrap(err, "error finding bandwidth_limit_journal by account_id")
}
return j, nil
}
func (str *Store) FindAllLatestBandwidthLimitJournal(trx *sqlx.Tx) ([]*BandwidthLimitJournalEntry, error) {
rows, err := trx.Queryx("select id, account_id, limit_class_id, action, rx_bytes, tx_bytes, created_at, updated_at from bandwidth_limit_journal where id in (select max(id) as id from bandwidth_limit_journal group by account_id)")
if err != nil {
return nil, errors.Wrap(err, "error finding all latest bandwidth_limit_journal")
}
var jes []*BandwidthLimitJournalEntry
for rows.Next() {
je := &BandwidthLimitJournalEntry{}
if err := rows.StructScan(je); err != nil {
return nil, errors.Wrap(err, "error scanning bandwidth_limit_journal")
}
jes = append(jes, je)
}
return jes, nil
}
func (str *Store) DeleteBandwidthLimitJournal(acctId int, trx *sqlx.Tx) error {
if _, err := trx.Exec("delete from bandwidth_limit_journal where account_id = $1", acctId); err != nil {
return errors.Wrapf(err, "error deleting from bandwidth_limit_journal for account_id = %d", acctId)
}
return nil
}

View File

@ -1,93 +0,0 @@
package store
import (
"fmt"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
)
type EnvironmentLimitJournal struct {
Model
EnvironmentId int
RxBytes int64
TxBytes int64
Action LimitAction
}
func (str *Store) CreateEnvironmentLimitJournal(j *EnvironmentLimitJournal, trx *sqlx.Tx) (int, error) {
stmt, err := trx.Prepare("insert into environment_limit_journal (environment_id, rx_bytes, tx_bytes, action) values ($1, $2, $3, $4) returning id")
if err != nil {
return 0, errors.Wrap(err, "error preparing environment_limit_journal insert statement")
}
var id int
if err := stmt.QueryRow(j.EnvironmentId, j.RxBytes, j.TxBytes, j.Action).Scan(&id); err != nil {
return 0, errors.Wrap(err, "error executing environment_limit_journal insert statement")
}
return id, nil
}
func (str *Store) IsEnvironmentLimitJournalEmpty(envId int, trx *sqlx.Tx) (bool, error) {
count := 0
if err := trx.QueryRowx("select count(0) from environment_limit_journal where environment_id = $1", envId).Scan(&count); err != nil {
return false, err
}
return count == 0, nil
}
func (str *Store) FindLatestEnvironmentLimitJournal(envId int, trx *sqlx.Tx) (*EnvironmentLimitJournal, error) {
j := &EnvironmentLimitJournal{}
if err := trx.QueryRowx("select * from environment_limit_journal where environment_id = $1 order by created_at desc limit 1", envId).StructScan(j); err != nil {
return nil, errors.Wrap(err, "error finding environment_limit_journal by environment_id")
}
return j, nil
}
func (str *Store) FindSelectedLatestEnvironmentLimitJournal(envIds []int, trx *sqlx.Tx) ([]*EnvironmentLimitJournal, error) {
if len(envIds) < 1 {
return nil, nil
}
in := "("
for i := range envIds {
if i > 0 {
in += ", "
}
in += fmt.Sprintf("%d", envIds[i])
}
in += ")"
rows, err := trx.Queryx("select id, environment_id, rx_bytes, tx_bytes, action, created_at, updated_at from environment_limit_journal where id in (select max(id) as id from environment_limit_journal group by environment_id) and environment_id in " + in)
if err != nil {
return nil, errors.Wrap(err, "error selecting all latest environment_limit_journal")
}
var eljs []*EnvironmentLimitJournal
for rows.Next() {
elj := &EnvironmentLimitJournal{}
if err := rows.StructScan(elj); err != nil {
return nil, errors.Wrap(err, "error scanning environment_limit_journal")
}
eljs = append(eljs, elj)
}
return eljs, nil
}
func (str *Store) FindAllLatestEnvironmentLimitJournal(trx *sqlx.Tx) ([]*EnvironmentLimitJournal, error) {
rows, err := trx.Queryx("select id, environment_id, rx_bytes, tx_bytes, action, created_at, updated_at from environment_limit_journal where id in (select max(id) as id from environment_limit_journal group by environment_id)")
if err != nil {
return nil, errors.Wrap(err, "error selecting all latest environment_limit_journal")
}
var eljs []*EnvironmentLimitJournal
for rows.Next() {
elj := &EnvironmentLimitJournal{}
if err := rows.StructScan(elj); err != nil {
return nil, errors.Wrap(err, "error scanning environment_limit_journal")
}
eljs = append(eljs, elj)
}
return eljs, nil
}
func (str *Store) DeleteEnvironmentLimitJournalForEnvironment(envId int, trx *sqlx.Tx) error {
if _, err := trx.Exec("delete from environment_limit_journal where environment_id = $1", envId); err != nil {
return errors.Wrapf(err, "error deleteing environment_limit_journal for '#%d'", envId)
}
return nil
}

View File

@ -5,7 +5,6 @@ type LimitAction string
const (
LimitLimitAction LimitAction = "limit"
WarningLimitAction LimitAction = "warning"
ClearLimitAction LimitAction = "clear"
)
type LimitScope string

View File

@ -1,93 +0,0 @@
package store
import (
"fmt"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
)
type ShareLimitJournal struct {
Model
ShareId int
RxBytes int64
TxBytes int64
Action LimitAction
}
func (str *Store) CreateShareLimitJournal(j *ShareLimitJournal, trx *sqlx.Tx) (int, error) {
stmt, err := trx.Prepare("insert into share_limit_journal (share_id, rx_bytes, tx_bytes, action) values ($1, $2, $3, $4) returning id")
if err != nil {
return 0, errors.Wrap(err, "error preparing share_limit_journal insert statement")
}
var id int
if err := stmt.QueryRow(j.ShareId, j.RxBytes, j.TxBytes, j.Action).Scan(&id); err != nil {
return 0, errors.Wrap(err, "error executing share_limit_journal insert statement")
}
return id, nil
}
func (str *Store) IsShareLimitJournalEmpty(shrId int, trx *sqlx.Tx) (bool, error) {
count := 0
if err := trx.QueryRowx("select count(0) from share_limit_journal where share_id = $1", shrId).Scan(&count); err != nil {
return false, err
}
return count == 0, nil
}
func (str *Store) FindLatestShareLimitJournal(shrId int, trx *sqlx.Tx) (*ShareLimitJournal, error) {
j := &ShareLimitJournal{}
if err := trx.QueryRowx("select * from share_limit_journal where share_id = $1 order by created_at desc limit 1", shrId).StructScan(j); err != nil {
return nil, errors.Wrap(err, "error finding share_limit_journal by share_id")
}
return j, nil
}
func (str *Store) FindSelectedLatestShareLimitjournal(shrIds []int, trx *sqlx.Tx) ([]*ShareLimitJournal, error) {
if len(shrIds) < 1 {
return nil, nil
}
in := "("
for i := range shrIds {
if i > 0 {
in += ", "
}
in += fmt.Sprintf("%d", shrIds[i])
}
in += ")"
rows, err := trx.Queryx("select id, share_id, rx_bytes, tx_bytes, action, created_at, updated_at from share_limit_journal where id in (select max(id) as id from share_limit_journal group by share_id) and share_id in " + in)
if err != nil {
return nil, errors.Wrap(err, "error selecting all latest share_limit_journal")
}
var sljs []*ShareLimitJournal
for rows.Next() {
slj := &ShareLimitJournal{}
if err := rows.StructScan(slj); err != nil {
return nil, errors.Wrap(err, "error scanning share_limit_journal")
}
sljs = append(sljs, slj)
}
return sljs, nil
}
func (str *Store) FindAllLatestShareLimitJournal(trx *sqlx.Tx) ([]*ShareLimitJournal, error) {
rows, err := trx.Queryx("select id, share_id, rx_bytes, tx_bytes, action, created_at, updated_at from share_limit_journal where id in (select max(id) as id from share_limit_journal group by share_id)")
if err != nil {
return nil, errors.Wrap(err, "error selecting all latest share_limit_journal")
}
var sljs []*ShareLimitJournal
for rows.Next() {
slj := &ShareLimitJournal{}
if err := rows.StructScan(slj); err != nil {
return nil, errors.Wrap(err, "error scanning share_limit_journal")
}
sljs = append(sljs, slj)
}
return sljs, nil
}
func (str *Store) DeleteShareLimitJournalForShare(shrId int, trx *sqlx.Tx) error {
if _, err := trx.Exec("delete from share_limit_journal where share_id = $1", shrId); err != nil {
return errors.Wrapf(err, "error deleting share_limit_journal for '#%d'", shrId)
}
return nil
}

View File

@ -10,10 +10,12 @@ create type limit_action_type as enum ('warning', 'limit');
create table bandwidth_limit_journal (
id serial primary key,
account_id integer references accounts (id) not null,
limit_class integer references limit_classes,
limit_class_id integer references limit_classes (id),
action limit_action_type not null,
rx_bytes bigint not null,
tx_bytes bigint not null,
created_at timestamptz not null default(current_timestamp),
updated_at timestamptz not null default(current_timestamp)
);
);
create index bandwidth_limit_journal_account_id_idx on bandwidth_limit_journal (account_id);