2022-12-22 22:06:23 +01:00
|
|
|
package controller
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"fmt"
|
2023-01-13 21:01:34 +01:00
|
|
|
"github.com/openziti/zrok/controller/store"
|
2022-12-22 22:06:23 +01:00
|
|
|
)
|
|
|
|
|
2023-05-11 21:21:10 +02:00
|
|
|
func sparkDataForShares(shrs []*store.Share) (rx, tx map[string][]int64, err error) {
|
|
|
|
rx = make(map[string][]int64)
|
|
|
|
tx = make(map[string][]int64)
|
2023-01-04 19:13:50 +01:00
|
|
|
if len(shrs) > 0 {
|
2023-03-13 19:19:38 +01:00
|
|
|
qapi := idb.QueryAPI(cfg.Metrics.Influx.Org)
|
2022-12-22 22:06:23 +01:00
|
|
|
|
2023-05-11 21:21:10 +02:00
|
|
|
query := sparkFluxQuery(shrs, cfg.Metrics.Influx.Bucket)
|
|
|
|
result, err := qapi.Query(context.Background(), query)
|
2022-12-22 22:06:23 +01:00
|
|
|
if err != nil {
|
2023-05-11 21:21:10 +02:00
|
|
|
return nil, nil, err
|
2022-12-22 22:06:23 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
for result.Next() {
|
2023-01-04 19:13:50 +01:00
|
|
|
shrToken := result.Record().ValueByKey("share").(string)
|
2023-05-11 21:21:10 +02:00
|
|
|
switch result.Record().Field() {
|
|
|
|
case "rx":
|
|
|
|
rxV := int64(0)
|
|
|
|
if v, ok := result.Record().Value().(int64); ok {
|
|
|
|
rxV = v
|
|
|
|
}
|
|
|
|
rxData := append(rx[shrToken], rxV)
|
|
|
|
rx[shrToken] = rxData
|
|
|
|
|
|
|
|
case "tx":
|
|
|
|
txV := int64(0)
|
|
|
|
if v, ok := result.Record().Value().(int64); ok {
|
|
|
|
txV = v
|
|
|
|
}
|
|
|
|
txData := append(tx[shrToken], txV)
|
|
|
|
tx[shrToken] = txData
|
|
|
|
}
|
2022-12-22 22:06:23 +01:00
|
|
|
}
|
|
|
|
}
|
2023-05-11 21:21:10 +02:00
|
|
|
return rx, tx, nil
|
2022-12-22 22:06:23 +01:00
|
|
|
}
|
|
|
|
|
2023-05-11 21:21:10 +02:00
|
|
|
func sparkFluxQuery(shrs []*store.Share, bucket string) string {
|
2023-01-04 20:21:23 +01:00
|
|
|
shrFilter := "|> filter(fn: (r) =>"
|
2023-01-04 19:13:50 +01:00
|
|
|
for i, shr := range shrs {
|
2022-12-22 22:06:23 +01:00
|
|
|
if i > 0 {
|
2023-01-04 20:21:23 +01:00
|
|
|
shrFilter += " or"
|
2022-12-22 22:06:23 +01:00
|
|
|
}
|
2023-01-04 20:21:23 +01:00
|
|
|
shrFilter += fmt.Sprintf(" r[\"share\"] == \"%v\"", shr.Token)
|
2022-12-22 22:06:23 +01:00
|
|
|
}
|
2023-01-04 20:21:23 +01:00
|
|
|
shrFilter += ")"
|
2023-05-11 21:21:10 +02:00
|
|
|
query := fmt.Sprintf("from(bucket: \"%v\")\n", bucket) +
|
|
|
|
"|> range(start: -5m)\n" +
|
|
|
|
"|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")\n" +
|
|
|
|
"|> filter(fn: (r) => r[\"_field\"] == \"rx\" or r[\"_field\"] == \"tx\")\n" +
|
2023-03-07 22:29:39 +01:00
|
|
|
"|> filter(fn: (r) => r[\"namespace\"] == \"backend\")" +
|
2023-01-04 20:21:23 +01:00
|
|
|
shrFilter +
|
2023-05-11 21:21:10 +02:00
|
|
|
"|> aggregateWindow(every: 10s, fn: sum, createEmpty: true)\n"
|
2022-12-22 22:06:23 +01:00
|
|
|
return query
|
|
|
|
}
|