flux query influx for duration totals (#271)

This commit is contained in:
Michael Quigley 2023-03-17 13:51:12 -04:00
parent 9418195150
commit df56e49fab
No known key found for this signature in database
GPG Key ID: 9B60314A9DD20A62
2 changed files with 58 additions and 1 deletions

View File

@ -4,6 +4,7 @@ import (
"github.com/openziti/zrok/controller/metrics"
"github.com/openziti/zrok/controller/store"
"github.com/openziti/zrok/controller/zrokEdgeSdk"
"github.com/openziti/zrok/util"
"github.com/sirupsen/logrus"
"time"
)
@ -39,6 +40,15 @@ func (a *Agent) Stop() {
func (a *Agent) Handle(u *metrics.Usage) error {
logrus.Infof("handling: %v", u)
rxTotal, err := a.ifx.totalRxForShare(u.ShareToken, 24*time.Hour)
if err != nil {
logrus.Error(err)
}
txTotal, err := a.ifx.totalTxForShare(u.ShareToken, 24*time.Hour)
if err != nil {
logrus.Error(err)
}
logrus.Infof("'%v': {rx: %v, tx: %v}", u.ShareToken, util.BytesToSize(rxTotal), util.BytesToSize(txTotal))
return nil
}

View File

@ -1,12 +1,19 @@
package limits
import (
"context"
"fmt"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api"
"github.com/openziti/zrok/controller/metrics"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"strings"
"time"
)
type influxReader struct {
cfg *metrics.InfluxConfig
idb influxdb2.Client
queryApi api.QueryAPI
}
@ -14,5 +21,45 @@ type influxReader struct {
func newInfluxReader(cfg *metrics.InfluxConfig) *influxReader {
idb := influxdb2.NewClient(cfg.Url, cfg.Token)
queryApi := idb.QueryAPI(cfg.Org)
return &influxReader{idb, queryApi}
return &influxReader{cfg, idb, queryApi}
}
func (r *influxReader) totalRxForShare(shrToken string, duration time.Duration) (int64, error) {
query := fmt.Sprintf("from(bucket: \"%v\")\n", r.cfg.Bucket) +
fmt.Sprintf("|> range(start: -%v)\n", duration) +
"|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")\n" +
"|> filter(fn: (r) => r[\"_field\"] == \"bytesRead\")\n" +
"|> filter(fn: (r) => r[\"namespace\"] == \"backend\")\n" +
fmt.Sprintf("|> filter(fn: (r) => r[\"share\"] == \"%v\")\n", shrToken) +
"|> sum()"
return r.runQueryForSum(query)
}
func (r *influxReader) totalTxForShare(shrToken string, duration time.Duration) (int64, error) {
query := fmt.Sprintf("from(bucket: \"%v\")\n", r.cfg.Bucket) +
fmt.Sprintf("|> range(start: -%v)\n", duration) +
"|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")\n" +
"|> filter(fn: (r) => r[\"_field\"] == \"bytesWritten\")\n" +
"|> filter(fn: (r) => r[\"namespace\"] == \"backend\")\n" +
fmt.Sprintf("|> filter(fn: (r) => r[\"share\"] == \"%v\")\n", shrToken) +
"|> sum()"
return r.runQueryForSum(query)
}
func (r *influxReader) runQueryForSum(query string) (int64, error) {
result, err := r.queryApi.Query(context.Background(), query)
if err != nil {
return -1, err
}
if result.Next() {
if v, ok := result.Record().Value().(int64); ok {
return v, nil
} else {
return -1, errors.New("error asserting result type")
}
}
logrus.Warnf("empty read result set for '%v'", strings.ReplaceAll(query, "\n", ""))
return 0, nil
}