diff --git a/controller/limits/agent.go b/controller/limits/agent.go index e335228d..df42b700 100644 --- a/controller/limits/agent.go +++ b/controller/limits/agent.go @@ -40,27 +40,15 @@ func (a *Agent) Stop() { func (a *Agent) Handle(u *metrics.Usage) error { logrus.Infof("handling: %v", u) - acctRx, err := a.ifx.totalRxForAccount(u.AccountId, 24*time.Hour) + acctRx, acctTx, err := a.ifx.totalForAccount(u.AccountId, 24*time.Hour) if err != nil { logrus.Error(err) } - acctTx, err := a.ifx.totalTxForAccount(u.AccountId, 24*time.Hour) + envRx, envTx, err := a.ifx.totalForEnvironment(u.EnvironmentId, 24*time.Hour) if err != nil { logrus.Error(err) } - envRx, err := a.ifx.totalRxForEnvironment(u.EnvironmentId, 24*time.Hour) - if err != nil { - logrus.Error(err) - } - envTx, err := a.ifx.totalTxForEnvironment(u.EnvironmentId, 24*time.Hour) - if err != nil { - logrus.Error(err) - } - shareRx, err := a.ifx.totalRxForShare(u.ShareToken, 24*time.Hour) - if err != nil { - logrus.Error(err) - } - shareTx, err := a.ifx.totalTxForShare(u.ShareToken, 24*time.Hour) + shareRx, shareTx, err := a.ifx.totalForShare(u.ShareToken, 24*time.Hour) if err != nil { logrus.Error(err) } @@ -81,7 +69,7 @@ mainLoop: for { select { case <-time.After(a.cfg.Cycle): - logrus.Info("insepection cycle") + logrus.Info("inspection cycle") case <-a.close: close(a.join) diff --git a/controller/limits/influxReader.go b/controller/limits/influxReader.go index 99e3aca0..e2a38333 100644 --- a/controller/limits/influxReader.go +++ b/controller/limits/influxReader.go @@ -24,90 +24,65 @@ func newInfluxReader(cfg *metrics.InfluxConfig) *influxReader { return &influxReader{cfg, idb, queryApi} } -func (r *influxReader) totalRxForAccount(acctId int64, duration time.Duration) (int64, error) { +func (r *influxReader) totalForAccount(acctId int64, duration time.Duration) (int64, int64, error) { query := fmt.Sprintf("from(bucket: \"%v\")\n", r.cfg.Bucket) + fmt.Sprintf("|> range(start: -%v)\n", duration) + "|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")\n" + - "|> filter(fn: (r) => r[\"_field\"] == \"bytesRead\")\n" + + "|> filter(fn: (r) => r[\"_field\"] == \"rx\" or r[\"_field\"] == \"tx\")\n" + "|> filter(fn: (r) => r[\"namespace\"] == \"backend\")\n" + fmt.Sprintf("|> filter(fn: (r) => r[\"acctId\"] == \"%d\")\n", acctId) + - "|> set(key: \"share\", value: \"*\")\n" + + "|> drop(columns: [\"share\", \"envId\"])\n" + "|> sum()" return r.runQueryForSum(query) } -func (r *influxReader) totalTxForAccount(acctId int64, duration time.Duration) (int64, error) { +func (r *influxReader) totalForEnvironment(envId int64, duration time.Duration) (int64, int64, error) { query := fmt.Sprintf("from(bucket: \"%v\")\n", r.cfg.Bucket) + fmt.Sprintf("|> range(start: -%v)\n", duration) + "|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")\n" + - "|> filter(fn: (r) => r[\"_field\"] == \"bytesWritten\")\n" + - "|> filter(fn: (r) => r[\"namespace\"] == \"backend\")\n" + - fmt.Sprintf("|> filter(fn: (r) => r[\"acctId\"] == \"%d\")\n", acctId) + - "|> set(key: \"share\", value: \"*\")\n" + - "|> sum()" - return r.runQueryForSum(query) -} - -func (r *influxReader) totalRxForEnvironment(envId int64, duration time.Duration) (int64, error) { - query := fmt.Sprintf("from(bucket: \"%v\")\n", r.cfg.Bucket) + - fmt.Sprintf("|> range(start: -%v)\n", duration) + - "|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")\n" + - "|> filter(fn: (r) => r[\"_field\"] == \"bytesRead\")\n" + + "|> filter(fn: (r) => r[\"_field\"] == \"rx\" or r[\"_field\"] == \"tx\")\n" + "|> filter(fn: (r) => r[\"namespace\"] == \"backend\")\n" + fmt.Sprintf("|> filter(fn: (r) => r[\"envId\"] == \"%d\")\n", envId) + - "|> set(key: \"share\", value: \"*\")\n" + + "|> drop(columns: [\"share\", \"acctId\"])\n" + "|> sum()" return r.runQueryForSum(query) } -func (r *influxReader) totalTxForEnvironment(envId int64, duration time.Duration) (int64, error) { +func (r *influxReader) totalForShare(shrToken string, duration time.Duration) (int64, int64, error) { query := fmt.Sprintf("from(bucket: \"%v\")\n", r.cfg.Bucket) + fmt.Sprintf("|> range(start: -%v)\n", duration) + "|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")\n" + - "|> filter(fn: (r) => r[\"_field\"] == \"bytesWritten\")\n" + - "|> filter(fn: (r) => r[\"namespace\"] == \"backend\")\n" + - fmt.Sprintf("|> filter(fn: (r) => r[\"envId\"] == \"%d\")\n", envId) + - "|> set(key: \"share\", value: \"*\")\n" + - "|> sum()" - return r.runQueryForSum(query) -} - -func (r *influxReader) totalRxForShare(shrToken string, duration time.Duration) (int64, error) { - query := fmt.Sprintf("from(bucket: \"%v\")\n", r.cfg.Bucket) + - fmt.Sprintf("|> range(start: -%v)\n", duration) + - "|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")\n" + - "|> filter(fn: (r) => r[\"_field\"] == \"bytesRead\")\n" + + "|> filter(fn: (r) => r[\"_field\"] == \"rx\" or r[\"_field\"] == \"tx\")\n" + "|> filter(fn: (r) => r[\"namespace\"] == \"backend\")\n" + fmt.Sprintf("|> filter(fn: (r) => r[\"share\"] == \"%v\")\n", shrToken) + "|> sum()" return r.runQueryForSum(query) } -func (r *influxReader) totalTxForShare(shrToken string, duration time.Duration) (int64, error) { - query := fmt.Sprintf("from(bucket: \"%v\")\n", r.cfg.Bucket) + - fmt.Sprintf("|> range(start: -%v)\n", duration) + - "|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")\n" + - "|> filter(fn: (r) => r[\"_field\"] == \"bytesWritten\")\n" + - "|> filter(fn: (r) => r[\"namespace\"] == \"backend\")\n" + - fmt.Sprintf("|> filter(fn: (r) => r[\"share\"] == \"%v\")\n", shrToken) + - "|> sum()" - return r.runQueryForSum(query) -} - -func (r *influxReader) runQueryForSum(query string) (int64, error) { +func (r *influxReader) runQueryForSum(query string) (rx int64, tx int64, err error) { result, err := r.queryApi.Query(context.Background(), query) if err != nil { - return -1, err + return -1, -1, err } - if result.Next() { + count := 0 + for result.Next() { if v, ok := result.Record().Value().(int64); ok { - return v, nil + switch result.Record().Field() { + case "tx": + tx = v + case "rx": + rx = v + default: + logrus.Warnf("field '%v'?", result.Record().Field()) + } } else { - return -1, errors.New("error asserting result type") + return -1, -1, errors.New("error asserting value type") } + count++ } - - logrus.Warnf("empty read result set for '%v'", strings.ReplaceAll(query, "\n", "")) - return 0, nil + if count != 2 { + return -1, -1, errors.Errorf("expected 2 results; got '%d' (%v)", count, strings.ReplaceAll(query, "\n", "")) + } + return rx, tx, nil } diff --git a/controller/metrics/influxWriter.go b/controller/metrics/influxWriter.go index 12c09693..154f3e08 100644 --- a/controller/metrics/influxWriter.go +++ b/controller/metrics/influxWriter.go @@ -37,7 +37,7 @@ func (w *influxWriter) Handle(u *Usage) error { if u.BackendTx > 0 || u.BackendRx > 0 { pt := influxdb2.NewPoint("xfer", map[string]string{"namespace": "backend", "share": u.ShareToken, "envId": envId, "acctId": acctId}, - map[string]interface{}{"bytesRead": u.BackendRx, "bytesWritten": u.BackendTx}, + map[string]interface{}{"rx": u.BackendRx, "tx": u.BackendTx}, u.IntervalStart) pts = append(pts, pt) out += fmt.Sprintf(" backend {rx: %v, tx: %v}", util.BytesToSize(u.BackendRx), util.BytesToSize(u.BackendTx)) @@ -45,7 +45,7 @@ func (w *influxWriter) Handle(u *Usage) error { if u.FrontendTx > 0 || u.FrontendRx > 0 { pt := influxdb2.NewPoint("xfer", map[string]string{"namespace": "frontend", "share": u.ShareToken, "envId": envId, "acctId": acctId}, - map[string]interface{}{"bytesRead": u.FrontendRx, "bytesWritten": u.FrontendTx}, + map[string]interface{}{"rx": u.FrontendRx, "tx": u.FrontendTx}, u.IntervalStart) pts = append(pts, pt) out += fmt.Sprintf(" frontend {rx: %v, tx: %v}", util.BytesToSize(u.FrontendRx), util.BytesToSize(u.FrontendTx)) diff --git a/controller/sparkData.go b/controller/sparkData.go index bc793660..02b8b35d 100644 --- a/controller/sparkData.go +++ b/controller/sparkData.go @@ -19,11 +19,11 @@ func sparkDataForShares(shrs []*store.Share) (map[string][]int64, error) { for result.Next() { combinedRate := int64(0) - readRate := result.Record().ValueByKey("bytesRead") + readRate := result.Record().ValueByKey("tx") if readRate != nil { combinedRate += readRate.(int64) } - writeRate := result.Record().ValueByKey("bytesWritten") + writeRate := result.Record().ValueByKey("tx") if writeRate != nil { combinedRate += writeRate.(int64) } @@ -48,7 +48,7 @@ func sparkFluxQuery(shrs []*store.Share) string { query := "read = from(bucket: \"zrok\")" + "|> range(start: -5m)" + "|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")" + - "|> filter(fn: (r) => r[\"_field\"] == \"bytesRead\" or r[\"_field\"] == \"bytesWritten\")" + + "|> filter(fn: (r) => r[\"_field\"] == \"rx\" or r[\"_field\"] == \"tx\")" + "|> filter(fn: (r) => r[\"namespace\"] == \"backend\")" + shrFilter + "|> aggregateWindow(every: 5s, fn: sum, createEmpty: true)\n" +