mirror of
https://github.com/openziti/zrok.git
synced 2025-08-17 19:31:12 +02:00
new sparkline implementation (#325)
This commit is contained in:
@ -6,37 +6,43 @@ import (
|
||||
"github.com/openziti/zrok/controller/store"
|
||||
)
|
||||
|
||||
func sparkDataForShares(shrs []*store.Share) (map[string][]int64, error) {
|
||||
out := make(map[string][]int64)
|
||||
|
||||
func sparkDataForShares(shrs []*store.Share) (rx, tx map[string][]int64, err error) {
|
||||
rx = make(map[string][]int64)
|
||||
tx = make(map[string][]int64)
|
||||
if len(shrs) > 0 {
|
||||
qapi := idb.QueryAPI(cfg.Metrics.Influx.Org)
|
||||
|
||||
result, err := qapi.Query(context.Background(), sparkFluxQuery(shrs))
|
||||
query := sparkFluxQuery(shrs, cfg.Metrics.Influx.Bucket)
|
||||
result, err := qapi.Query(context.Background(), query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
for result.Next() {
|
||||
combinedRate := int64(0)
|
||||
readRate := result.Record().ValueByKey("tx")
|
||||
if readRate != nil {
|
||||
combinedRate += readRate.(int64)
|
||||
}
|
||||
writeRate := result.Record().ValueByKey("tx")
|
||||
if writeRate != nil {
|
||||
combinedRate += writeRate.(int64)
|
||||
}
|
||||
shrToken := result.Record().ValueByKey("share").(string)
|
||||
shrMetrics := out[shrToken]
|
||||
shrMetrics = append(shrMetrics, combinedRate)
|
||||
out[shrToken] = shrMetrics
|
||||
switch result.Record().Field() {
|
||||
case "rx":
|
||||
rxV := int64(0)
|
||||
if v, ok := result.Record().Value().(int64); ok {
|
||||
rxV = v
|
||||
}
|
||||
rxData := append(rx[shrToken], rxV)
|
||||
rx[shrToken] = rxData
|
||||
|
||||
case "tx":
|
||||
txV := int64(0)
|
||||
if v, ok := result.Record().Value().(int64); ok {
|
||||
txV = v
|
||||
}
|
||||
txData := append(tx[shrToken], txV)
|
||||
tx[shrToken] = txData
|
||||
}
|
||||
}
|
||||
}
|
||||
return out, nil
|
||||
return rx, tx, nil
|
||||
}
|
||||
|
||||
func sparkFluxQuery(shrs []*store.Share) string {
|
||||
func sparkFluxQuery(shrs []*store.Share, bucket string) string {
|
||||
shrFilter := "|> filter(fn: (r) =>"
|
||||
for i, shr := range shrs {
|
||||
if i > 0 {
|
||||
@ -45,14 +51,12 @@ func sparkFluxQuery(shrs []*store.Share) string {
|
||||
shrFilter += fmt.Sprintf(" r[\"share\"] == \"%v\"", shr.Token)
|
||||
}
|
||||
shrFilter += ")"
|
||||
query := "read = from(bucket: \"zrok\")" +
|
||||
"|> range(start: -5m)" +
|
||||
"|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")" +
|
||||
"|> filter(fn: (r) => r[\"_field\"] == \"rx\" or r[\"_field\"] == \"tx\")" +
|
||||
query := fmt.Sprintf("from(bucket: \"%v\")\n", bucket) +
|
||||
"|> range(start: -5m)\n" +
|
||||
"|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")\n" +
|
||||
"|> filter(fn: (r) => r[\"_field\"] == \"rx\" or r[\"_field\"] == \"tx\")\n" +
|
||||
"|> filter(fn: (r) => r[\"namespace\"] == \"backend\")" +
|
||||
shrFilter +
|
||||
"|> aggregateWindow(every: 5s, fn: sum, createEmpty: true)\n" +
|
||||
"|> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")" +
|
||||
"|> yield(name: \"last\")"
|
||||
"|> aggregateWindow(every: 10s, fn: sum, createEmpty: true)\n"
|
||||
return query
|
||||
}
|
||||
|
Reference in New Issue
Block a user