roughed in environment sparklines backend (#327)

This commit is contained in:
Michael Quigley
2023-05-12 11:57:34 -04:00
parent 6259b62a8a
commit 2655eaefc0
16 changed files with 908 additions and 51 deletions


@@ -6,13 +6,79 @@ import (
    "github.com/openziti/zrok/controller/store"
)
func sparkDataForEnvironments(envs []*store.Environment) (rx, tx map[int][]int64, err error) {
    rx = make(map[int][]int64)
    tx = make(map[int][]int64)
    if len(envs) > 0 {
        qapi := idb.QueryAPI(cfg.Metrics.Influx.Org)

        // build a single "or" filter matching every requested environment id
        envFilter := "|> filter(fn: (r) =>"
        for i, env := range envs {
            if i > 0 {
                envFilter += " or"
            }
            envFilter += fmt.Sprintf(" r[\"envId\"] == \"%d\"", env.Id)
        }
        envFilter += ")"

        // last 5 minutes of backend transfer metrics, summed into 10s windows
        query := fmt.Sprintf("from(bucket: \"%v\")\n", cfg.Metrics.Influx.Bucket) +
            "|> range(start: -5m)\n" +
            "|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")\n" +
            "|> filter(fn: (r) => r[\"_field\"] == \"rx\" or r[\"_field\"] == \"tx\")\n" +
            "|> filter(fn: (r) => r[\"namespace\"] == \"backend\")\n" +
            envFilter +
            "|> aggregateWindow(every: 10s, fn: sum, createEmpty: true)\n"

        result, err := qapi.Query(context.Background(), query)
        if err != nil {
            return nil, nil, err
        }

        // append each windowed value to the per-environment rx or tx series
        for result.Next() {
            envId := result.Record().ValueByKey("envId").(int64)
            switch result.Record().Field() {
            case "rx":
                rxV := int64(0)
                if v, ok := result.Record().Value().(int64); ok {
                    rxV = v
                }
                rx[int(envId)] = append(rx[int(envId)], rxV)
            case "tx":
                txV := int64(0)
                if v, ok := result.Record().Value().(int64); ok {
                    txV = v
                }
                tx[int(envId)] = append(tx[int(envId)], txV)
            }
        }
    }
    return rx, tx, nil
}
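For illustration, with two environments whose ids happen to be 1 and 2 and a bucket named "zrok" (the ids and bucket name are assumptions, not values from this commit), the assembled Flux query comes out roughly as:

from(bucket: "zrok")
|> range(start: -5m)
|> filter(fn: (r) => r["_measurement"] == "xfer")
|> filter(fn: (r) => r["_field"] == "rx" or r["_field"] == "tx")
|> filter(fn: (r) => r["namespace"] == "backend")
|> filter(fn: (r) => r["envId"] == "1" or r["envId"] == "2")
|> aggregateWindow(every: 10s, fn: sum, createEmpty: true)

createEmpty: true pads quiet 10-second windows with empty rows, which the loop above records as zeros, so every environment still ends up with a full-length series to plot.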
func sparkDataForShares(shrs []*store.Share) (rx, tx map[string][]int64, err error) {
    rx = make(map[string][]int64)
    tx = make(map[string][]int64)
    if len(shrs) > 0 {
        qapi := idb.QueryAPI(cfg.Metrics.Influx.Org)

        // build a single "or" filter matching every requested share token
        shrFilter := "|> filter(fn: (r) =>"
        for i, shr := range shrs {
            if i > 0 {
                shrFilter += " or"
            }
            shrFilter += fmt.Sprintf(" r[\"share\"] == \"%v\"", shr.Token)
        }
        shrFilter += ")"

        // last 5 minutes of backend transfer metrics, summed into 10s windows
        query := fmt.Sprintf("from(bucket: \"%v\")\n", cfg.Metrics.Influx.Bucket) +
            "|> range(start: -5m)\n" +
            "|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")\n" +
            "|> filter(fn: (r) => r[\"_field\"] == \"rx\" or r[\"_field\"] == \"tx\")\n" +
            "|> filter(fn: (r) => r[\"namespace\"] == \"backend\")\n" +
            shrFilter +
            "|> aggregateWindow(every: 10s, fn: sum, createEmpty: true)\n"

        result, err := qapi.Query(context.Background(), query)
        if err != nil {
            return nil, nil, err
@@ -41,22 +107,3 @@ func sparkDataForShares(shrs []*store.Share) (rx, tx map[string][]int64, err error) {
    }
    return rx, tx, nil
}
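The two maps carry parallel rx and tx series per share (and per environment above), one sample per 10-second window. Below is a minimal sketch of how a caller might collapse them into a single activity series for a sparkline; mergeSparkSeries is a hypothetical helper for illustration, not part of this commit:

// mergeSparkSeries sums rx and tx position-by-position, treating a missing
// sample as zero, producing one combined value per 10s window.
// Hypothetical helper; not part of this commit.
func mergeSparkSeries(rx, tx []int64) []int64 {
    n := len(rx)
    if len(tx) > n {
        n = len(tx)
    }
    merged := make([]int64, n)
    for i := 0; i < n; i++ {
        if i < len(rx) {
            merged[i] += rx[i]
        }
        if i < len(tx) {
            merged[i] += tx[i]
        }
    }
    return merged
}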
func sparkFluxQuery(shrs []*store.Share, bucket string) string {
    shrFilter := "|> filter(fn: (r) =>"
    for i, shr := range shrs {
        if i > 0 {
            shrFilter += " or"
        }
        shrFilter += fmt.Sprintf(" r[\"share\"] == \"%v\"", shr.Token)
    }
    shrFilter += ")"
    query := fmt.Sprintf("from(bucket: \"%v\")\n", bucket) +
        "|> range(start: -5m)\n" +
        "|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")\n" +
        "|> filter(fn: (r) => r[\"_field\"] == \"rx\" or r[\"_field\"] == \"tx\")\n" +
        "|> filter(fn: (r) => r[\"namespace\"] == \"backend\")" +
        shrFilter +
        "|> aggregateWindow(every: 10s, fn: sum, createEmpty: true)\n"
    return query
}
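With sparkFluxQuery removed, the environment and share paths now build nearly identical queries inline, differing only in the tag they filter on. A possible follow-up, sketched here as an assumption rather than anything in this commit, would factor the or-filter construction into one helper (reusing the file's existing fmt import):

// orTagFilter builds a Flux filter matching any of the given values for a tag,
// e.g. |> filter(fn: (r) => r["envId"] == "1" or r["envId"] == "2").
// Hypothetical helper; not part of this commit.
func orTagFilter(tag string, values []string) string {
    filter := "|> filter(fn: (r) =>"
    for i, v := range values {
        if i > 0 {
            filter += " or"
        }
        filter += fmt.Sprintf(" r[%q] == %q", tag, v)
    }
    return filter + ")"
}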