better account and environment total efficiency and accuracy; 'bytesRead'/'bytesWritten' -> 'rx'/'tx' (#271)

This commit is contained in:
Michael Quigley
2023-03-20 15:37:21 -04:00
committed by Kenneth Bingham
parent ecfae7975b
commit a6d6cfbe65
4 changed files with 35 additions and 72 deletions

View File

@ -24,90 +24,65 @@ func newInfluxReader(cfg *metrics.InfluxConfig) *influxReader {
return &influxReader{cfg, idb, queryApi}
}
func (r *influxReader) totalRxForAccount(acctId int64, duration time.Duration) (int64, error) {
func (r *influxReader) totalForAccount(acctId int64, duration time.Duration) (int64, int64, error) {
query := fmt.Sprintf("from(bucket: \"%v\")\n", r.cfg.Bucket) +
fmt.Sprintf("|> range(start: -%v)\n", duration) +
"|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")\n" +
"|> filter(fn: (r) => r[\"_field\"] == \"bytesRead\")\n" +
"|> filter(fn: (r) => r[\"_field\"] == \"rx\" or r[\"_field\"] == \"tx\")\n" +
"|> filter(fn: (r) => r[\"namespace\"] == \"backend\")\n" +
fmt.Sprintf("|> filter(fn: (r) => r[\"acctId\"] == \"%d\")\n", acctId) +
"|> set(key: \"share\", value: \"*\")\n" +
"|> drop(columns: [\"share\", \"envId\"])\n" +
"|> sum()"
return r.runQueryForSum(query)
}
func (r *influxReader) totalTxForAccount(acctId int64, duration time.Duration) (int64, error) {
func (r *influxReader) totalForEnvironment(envId int64, duration time.Duration) (int64, int64, error) {
query := fmt.Sprintf("from(bucket: \"%v\")\n", r.cfg.Bucket) +
fmt.Sprintf("|> range(start: -%v)\n", duration) +
"|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")\n" +
"|> filter(fn: (r) => r[\"_field\"] == \"bytesWritten\")\n" +
"|> filter(fn: (r) => r[\"namespace\"] == \"backend\")\n" +
fmt.Sprintf("|> filter(fn: (r) => r[\"acctId\"] == \"%d\")\n", acctId) +
"|> set(key: \"share\", value: \"*\")\n" +
"|> sum()"
return r.runQueryForSum(query)
}
func (r *influxReader) totalRxForEnvironment(envId int64, duration time.Duration) (int64, error) {
query := fmt.Sprintf("from(bucket: \"%v\")\n", r.cfg.Bucket) +
fmt.Sprintf("|> range(start: -%v)\n", duration) +
"|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")\n" +
"|> filter(fn: (r) => r[\"_field\"] == \"bytesRead\")\n" +
"|> filter(fn: (r) => r[\"_field\"] == \"rx\" or r[\"_field\"] == \"tx\")\n" +
"|> filter(fn: (r) => r[\"namespace\"] == \"backend\")\n" +
fmt.Sprintf("|> filter(fn: (r) => r[\"envId\"] == \"%d\")\n", envId) +
"|> set(key: \"share\", value: \"*\")\n" +
"|> drop(columns: [\"share\", \"acctId\"])\n" +
"|> sum()"
return r.runQueryForSum(query)
}
func (r *influxReader) totalTxForEnvironment(envId int64, duration time.Duration) (int64, error) {
func (r *influxReader) totalForShare(shrToken string, duration time.Duration) (int64, int64, error) {
query := fmt.Sprintf("from(bucket: \"%v\")\n", r.cfg.Bucket) +
fmt.Sprintf("|> range(start: -%v)\n", duration) +
"|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")\n" +
"|> filter(fn: (r) => r[\"_field\"] == \"bytesWritten\")\n" +
"|> filter(fn: (r) => r[\"namespace\"] == \"backend\")\n" +
fmt.Sprintf("|> filter(fn: (r) => r[\"envId\"] == \"%d\")\n", envId) +
"|> set(key: \"share\", value: \"*\")\n" +
"|> sum()"
return r.runQueryForSum(query)
}
func (r *influxReader) totalRxForShare(shrToken string, duration time.Duration) (int64, error) {
query := fmt.Sprintf("from(bucket: \"%v\")\n", r.cfg.Bucket) +
fmt.Sprintf("|> range(start: -%v)\n", duration) +
"|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")\n" +
"|> filter(fn: (r) => r[\"_field\"] == \"bytesRead\")\n" +
"|> filter(fn: (r) => r[\"_field\"] == \"rx\" or r[\"_field\"] == \"tx\")\n" +
"|> filter(fn: (r) => r[\"namespace\"] == \"backend\")\n" +
fmt.Sprintf("|> filter(fn: (r) => r[\"share\"] == \"%v\")\n", shrToken) +
"|> sum()"
return r.runQueryForSum(query)
}
func (r *influxReader) totalTxForShare(shrToken string, duration time.Duration) (int64, error) {
query := fmt.Sprintf("from(bucket: \"%v\")\n", r.cfg.Bucket) +
fmt.Sprintf("|> range(start: -%v)\n", duration) +
"|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")\n" +
"|> filter(fn: (r) => r[\"_field\"] == \"bytesWritten\")\n" +
"|> filter(fn: (r) => r[\"namespace\"] == \"backend\")\n" +
fmt.Sprintf("|> filter(fn: (r) => r[\"share\"] == \"%v\")\n", shrToken) +
"|> sum()"
return r.runQueryForSum(query)
}
func (r *influxReader) runQueryForSum(query string) (int64, error) {
func (r *influxReader) runQueryForSum(query string) (rx int64, tx int64, err error) {
result, err := r.queryApi.Query(context.Background(), query)
if err != nil {
return -1, err
return -1, -1, err
}
if result.Next() {
count := 0
for result.Next() {
if v, ok := result.Record().Value().(int64); ok {
return v, nil
switch result.Record().Field() {
case "tx":
tx = v
case "rx":
rx = v
default:
logrus.Warnf("field '%v'?", result.Record().Field())
}
} else {
return -1, errors.New("error asserting result type")
return -1, -1, errors.New("error asserting value type")
}
count++
}
logrus.Warnf("empty read result set for '%v'", strings.ReplaceAll(query, "\n", ""))
return 0, nil
if count != 2 {
return -1, -1, errors.Errorf("expected 2 results; got '%d' (%v)", count, strings.ReplaceAll(query, "\n", ""))
}
return rx, tx, nil
}