package controller

import (
	"context"
	"fmt"
	"github.com/openziti/zrok/controller/store"
)
func sparkDataForShares(shrs []*store.Share) (map[string][]int64, error) {
2022-12-22 22:06:23 +01:00
out := make(map[string][]int64)
if len(shrs) > 0 {
2022-12-22 22:06:23 +01:00
qapi := idb.QueryAPI(cfg.Influx.Org)
result, err := qapi.Query(context.Background(), sparkFluxQuery(shrs))
2022-12-22 22:06:23 +01:00
if err != nil {
return nil, err
}
for result.Next() {
combinedRate := int64(0)
readRate := result.Record().ValueByKey("bytesRead")
if readRate != nil {
combinedRate += readRate.(int64)
}
writeRate := result.Record().ValueByKey("bytesWritten")
if writeRate != nil {
combinedRate += writeRate.(int64)
}
shrToken := result.Record().ValueByKey("share").(string)
shrMetrics := out[shrToken]
shrMetrics = append(shrMetrics, combinedRate)
out[shrToken] = shrMetrics
2022-12-22 22:06:23 +01:00
}
}
return out, nil
}
func sparkFluxQuery(shrs []*store.Share) string {
2023-01-04 20:21:23 +01:00
shrFilter := "|> filter(fn: (r) =>"
for i, shr := range shrs {
2022-12-22 22:06:23 +01:00
if i > 0 {
2023-01-04 20:21:23 +01:00
shrFilter += " or"
2022-12-22 22:06:23 +01:00
}
2023-01-04 20:21:23 +01:00
shrFilter += fmt.Sprintf(" r[\"share\"] == \"%v\"", shr.Token)
2022-12-22 22:06:23 +01:00
}
2023-01-04 20:21:23 +01:00
shrFilter += ")"
2022-12-22 22:06:23 +01:00
query := "read = from(bucket: \"zrok\")" +
"|> range(start: -5m)" +
"|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")" +
"|> filter(fn: (r) => r[\"_field\"] == \"bytesRead\" or r[\"_field\"] == \"bytesWritten\")" +
"|> filter(fn: (r) => r[\"namespace\"] == \"frontend\")" +
2023-01-04 20:21:23 +01:00
shrFilter +
2022-12-22 22:06:23 +01:00
"|> aggregateWindow(every: 5s, fn: sum, createEmpty: true)\n" +
"|> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")" +
"|> yield(name: \"last\")"
return query
}