better flux query for overview metrics (#74, #80)

This commit is contained in:
Michael Quigley 2022-10-19 17:37:29 -04:00
parent e5e683d694
commit 005db24691
No known key found for this signature in database
GPG Key ID: 9B60314A9DD20A62

View File

@ -73,15 +73,15 @@ func sparkDataForServices(svcs []*store.Service) (map[string][]int64, error) {
for result.Next() { for result.Next() {
combinedRate := int64(0) combinedRate := int64(0)
readRate := result.Record().ValueByKey("_value_t1") readRate := result.Record().ValueByKey("bytesRead")
if readRate != nil { if readRate != nil {
combinedRate += int64(readRate.(float64)) combinedRate += readRate.(int64)
} }
writeRate := result.Record().ValueByKey("_value_t2") writeRate := result.Record().ValueByKey("bytesWritten")
if writeRate != nil { if writeRate != nil {
combinedRate += int64(writeRate.(float64)) combinedRate += writeRate.(int64)
} }
svcName := result.Record().ValueByKey("service_t1").(string) svcName := result.Record().ValueByKey("service").(string)
svcMetrics := out[svcName] svcMetrics := out[svcName]
svcMetrics = append(svcMetrics, combinedRate) svcMetrics = append(svcMetrics, combinedRate)
out[svcName] = svcMetrics out[svcName] = svcMetrics
@ -102,17 +102,11 @@ func sparkFluxQuery(svcs []*store.Service) string {
query := "read = from(bucket: \"zrok\")" + query := "read = from(bucket: \"zrok\")" +
"|> range(start: -5m)" + "|> range(start: -5m)" +
"|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")" + "|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")" +
"|> filter(fn: (r) => r[\"_field\"] == \"bytesRead\")" + "|> filter(fn: (r) => r[\"_field\"] == \"bytesRead\" or r[\"_field\"] == \"bytesWritten\")" +
"|> filter(fn: (r) => r[\"namespace\"] == \"frontend\")" + "|> filter(fn: (r) => r[\"namespace\"] == \"frontend\")" +
svcFilter + svcFilter +
"|> aggregateWindow(every: 5s, fn: mean, createEmpty: true)\n\n" + "|> aggregateWindow(every: 5s, fn: sum, createEmpty: true)\n" +
"written = from(bucket: \"zrok\")" + "|> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")" +
"|> range(start: -5m)" + "|> yield(name: \"last\")"
"|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")" +
"|> filter(fn: (r) => r[\"_field\"] == \"bytesWritten\")" +
"|> filter(fn: (r) => r[\"namespace\"] == \"frontend\")" +
svcFilter +
"|> aggregateWindow(every: 5s, fn: mean, createEmpty: true)\n\n" +
"join(tables: {t1: read, t2: written}, on: [\"_time\"])"
return query return query
} }