diff --git a/controller/overview.go b/controller/overview.go index c4673b35..8206fbce 100644 --- a/controller/overview.go +++ b/controller/overview.go @@ -1,10 +1,7 @@ package controller import ( - "context" - "fmt" "github.com/go-openapi/runtime/middleware" - "github.com/openziti-test-kitchen/zrok/controller/store" "github.com/openziti-test-kitchen/zrok/rest_model_zrok" "github.com/openziti-test-kitchen/zrok/rest_server_zrok/operations/metadata" "github.com/sirupsen/logrus" @@ -40,15 +37,6 @@ func overviewHandler(_ metadata.OverviewParams, principal *rest_model_zrok.Princ }, } - var sparkData map[string][]int64 - if cfg.Influx != nil { - sparkData, err = sparkDataForServices(svcs) - if err != nil { - logrus.Errorf("error querying spark data for services: %v", err) - return metadata.NewOverviewInternalServerError() - } - } - for _, svc := range svcs { feEndpoint := "" if svc.FrontendEndpoint != nil { @@ -71,7 +59,6 @@ func overviewHandler(_ metadata.OverviewParams, principal *rest_model_zrok.Princ FrontendEndpoint: feEndpoint, BackendProxyEndpoint: beProxyEndpoint, Reserved: svc.Reserved, - Metrics: sparkData[svc.Token], CreatedAt: svc.CreatedAt.UnixMilli(), UpdatedAt: svc.UpdatedAt.UnixMilli(), }) @@ -80,54 +67,3 @@ func overviewHandler(_ metadata.OverviewParams, principal *rest_model_zrok.Princ } return metadata.NewOverviewOK().WithPayload(out) } - -func sparkDataForServices(svcs []*store.Service) (map[string][]int64, error) { - out := make(map[string][]int64) - - if len(svcs) > 0 { - qapi := idb.QueryAPI(cfg.Influx.Org) - - result, err := qapi.Query(context.Background(), sparkFluxQuery(svcs)) - if err != nil { - return nil, err - } - - for result.Next() { - combinedRate := int64(0) - readRate := result.Record().ValueByKey("bytesRead") - if readRate != nil { - combinedRate += readRate.(int64) - } - writeRate := result.Record().ValueByKey("bytesWritten") - if writeRate != nil { - combinedRate += writeRate.(int64) - } - svcToken := result.Record().ValueByKey("service").(string) - svcMetrics := out[svcToken] - svcMetrics = append(svcMetrics, combinedRate) - out[svcToken] = svcMetrics - } - } - return out, nil -} - -func sparkFluxQuery(svcs []*store.Service) string { - svcFilter := "|> filter(fn: (r) =>" - for i, svc := range svcs { - if i > 0 { - svcFilter += " or" - } - svcFilter += fmt.Sprintf(" r[\"service\"] == \"%v\"", svc.Token) - } - svcFilter += ")" - query := "read = from(bucket: \"zrok\")" + - "|> range(start: -5m)" + - "|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")" + - "|> filter(fn: (r) => r[\"_field\"] == \"bytesRead\" or r[\"_field\"] == \"bytesWritten\")" + - "|> filter(fn: (r) => r[\"namespace\"] == \"frontend\")" + - svcFilter + - "|> aggregateWindow(every: 5s, fn: sum, createEmpty: true)\n" + - "|> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")" + - "|> yield(name: \"last\")" - return query -} diff --git a/controller/sparkData.go b/controller/sparkData.go new file mode 100644 index 00000000..0ce56d29 --- /dev/null +++ b/controller/sparkData.go @@ -0,0 +1,58 @@ +package controller + +import ( + "context" + "fmt" + "github.com/openziti-test-kitchen/zrok/controller/store" +) + +func sparkDataForServices(svcs []*store.Service) (map[string][]int64, error) { + out := make(map[string][]int64) + + if len(svcs) > 0 { + qapi := idb.QueryAPI(cfg.Influx.Org) + + result, err := qapi.Query(context.Background(), sparkFluxQuery(svcs)) + if err != nil { + return nil, err + } + + for result.Next() { + combinedRate := int64(0) + readRate := result.Record().ValueByKey("bytesRead") + if readRate != nil { + combinedRate += readRate.(int64) + } + writeRate := result.Record().ValueByKey("bytesWritten") + if writeRate != nil { + combinedRate += writeRate.(int64) + } + svcToken := result.Record().ValueByKey("service").(string) + svcMetrics := out[svcToken] + svcMetrics = append(svcMetrics, combinedRate) + out[svcToken] = svcMetrics + } + } + return out, nil +} + +func sparkFluxQuery(svcs []*store.Service) string { + svcFilter := "|> filter(fn: (r) =>" + for i, svc := range svcs { + if i > 0 { + svcFilter += " or" + } + svcFilter += fmt.Sprintf(" r[\"service\"] == \"%v\"", svc.Token) + } + svcFilter += ")" + query := "read = from(bucket: \"zrok\")" + + "|> range(start: -5m)" + + "|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")" + + "|> filter(fn: (r) => r[\"_field\"] == \"bytesRead\" or r[\"_field\"] == \"bytesWritten\")" + + "|> filter(fn: (r) => r[\"namespace\"] == \"frontend\")" + + svcFilter + + "|> aggregateWindow(every: 5s, fn: sum, createEmpty: true)\n" + + "|> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")" + + "|> yield(name: \"last\")" + return query +}