diff --git a/controller/limits/agent.go b/controller/limits/agent.go index df42b700..5d9508eb 100644 --- a/controller/limits/agent.go +++ b/controller/limits/agent.go @@ -40,15 +40,15 @@ func (a *Agent) Stop() { func (a *Agent) Handle(u *metrics.Usage) error { logrus.Infof("handling: %v", u) - acctRx, acctTx, err := a.ifx.totalForAccount(u.AccountId, 24*time.Hour) + acctRx, acctTx, err := a.ifx.totalRxTxForAccount(u.AccountId, 24*time.Hour) if err != nil { logrus.Error(err) } - envRx, envTx, err := a.ifx.totalForEnvironment(u.EnvironmentId, 24*time.Hour) + envRx, envTx, err := a.ifx.totalRxTxForEnvironment(u.EnvironmentId, 24*time.Hour) if err != nil { logrus.Error(err) } - shareRx, shareTx, err := a.ifx.totalForShare(u.ShareToken, 24*time.Hour) + shareRx, shareTx, err := a.ifx.totalRxTxForShare(u.ShareToken, 24*time.Hour) if err != nil { logrus.Error(err) } diff --git a/controller/limits/influxReader.go b/controller/limits/influxReader.go index e2a38333..22b8c4d1 100644 --- a/controller/limits/influxReader.go +++ b/controller/limits/influxReader.go @@ -24,7 +24,7 @@ func newInfluxReader(cfg *metrics.InfluxConfig) *influxReader { return &influxReader{cfg, idb, queryApi} } -func (r *influxReader) totalForAccount(acctId int64, duration time.Duration) (int64, int64, error) { +func (r *influxReader) totalRxTxForAccount(acctId int64, duration time.Duration) (int64, int64, error) { query := fmt.Sprintf("from(bucket: \"%v\")\n", r.cfg.Bucket) + fmt.Sprintf("|> range(start: -%v)\n", duration) + "|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")\n" + @@ -33,10 +33,10 @@ func (r *influxReader) totalForAccount(acctId int64, duration time.Duration) (in fmt.Sprintf("|> filter(fn: (r) => r[\"acctId\"] == \"%d\")\n", acctId) + "|> drop(columns: [\"share\", \"envId\"])\n" + "|> sum()" - return r.runQueryForSum(query) + return r.runQueryForRxTx(query) } -func (r *influxReader) totalForEnvironment(envId int64, duration time.Duration) (int64, int64, error) { +func (r *influxReader) totalRxTxForEnvironment(envId int64, duration time.Duration) (int64, int64, error) { query := fmt.Sprintf("from(bucket: \"%v\")\n", r.cfg.Bucket) + fmt.Sprintf("|> range(start: -%v)\n", duration) + "|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")\n" + @@ -45,10 +45,10 @@ func (r *influxReader) totalForEnvironment(envId int64, duration time.Duration) fmt.Sprintf("|> filter(fn: (r) => r[\"envId\"] == \"%d\")\n", envId) + "|> drop(columns: [\"share\", \"acctId\"])\n" + "|> sum()" - return r.runQueryForSum(query) + return r.runQueryForRxTx(query) } -func (r *influxReader) totalForShare(shrToken string, duration time.Duration) (int64, int64, error) { +func (r *influxReader) totalRxTxForShare(shrToken string, duration time.Duration) (int64, int64, error) { query := fmt.Sprintf("from(bucket: \"%v\")\n", r.cfg.Bucket) + fmt.Sprintf("|> range(start: -%v)\n", duration) + "|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")\n" + @@ -56,10 +56,10 @@ func (r *influxReader) totalForShare(shrToken string, duration time.Duration) (i "|> filter(fn: (r) => r[\"namespace\"] == \"backend\")\n" + fmt.Sprintf("|> filter(fn: (r) => r[\"share\"] == \"%v\")\n", shrToken) + "|> sum()" - return r.runQueryForSum(query) + return r.runQueryForRxTx(query) } -func (r *influxReader) runQueryForSum(query string) (rx int64, tx int64, err error) { +func (r *influxReader) runQueryForRxTx(query string) (rx int64, tx int64, err error) { result, err := r.queryApi.Query(context.Background(), query) if err != nil { return -1, -1, err