From 022084ec880d5dd64263b618bdd5eab1a48990f7 Mon Sep 17 00:00:00 2001 From: Michael Quigley Date: Fri, 24 Feb 2023 11:01:17 -0500 Subject: [PATCH] basic 'fabric.usage' parsing (#128) --- controller/usageAgent.go | 121 +++++++++++++++++++++++++++++++++++---- 1 file changed, 110 insertions(+), 11 deletions(-) diff --git a/controller/usageAgent.go b/controller/usageAgent.go index d469313d..8847d9f5 100644 --- a/controller/usageAgent.go +++ b/controller/usageAgent.go @@ -1,10 +1,10 @@ package controller import ( + "bytes" "crypto/tls" "crypto/x509" "encoding/json" - "fmt" "github.com/gorilla/websocket" "github.com/openziti/channel/v2" "github.com/openziti/channel/v2/websockets" @@ -13,6 +13,7 @@ import ( "github.com/openziti/fabric/pb/mgmt_pb" "github.com/openziti/identity" "github.com/openziti/sdk-golang/ziti/constants" + "github.com/openziti/zrok/util" "github.com/pkg/errors" "github.com/sirupsen/logrus" "io" @@ -21,10 +22,14 @@ import ( "time" ) -type usageAgent struct{} - func RunUsageAgent(cfg *Config) error { - wsUrl := "wss://127.0.0.1:1280/fabric/v1/ws-api" + zitiApiEndpointUrl, err := url.Parse(cfg.Ziti.ApiEndpoint) + if err != nil { + return err + } + + wsUrl := "wss://" + zitiApiEndpointUrl.Host + "/fabric/v1/ws-api" + logrus.Infof("wsUrl: %v", wsUrl) caCerts, err := rest_util.GetControllerWellKnownCas(cfg.Ziti.ApiEndpoint) if err != nil { @@ -37,11 +42,8 @@ func RunUsageAgent(cfg *Config) error { authenticator := rest_util.NewAuthenticatorUpdb(cfg.Ziti.Username, cfg.Ziti.Password) authenticator.RootCas = caPool - ctrlUrl, err := url.Parse(cfg.Ziti.ApiEndpoint) - if err != nil { - return err - } - apiSession, err := authenticator.Authenticate(ctrlUrl) + + apiSession, err := authenticator.Authenticate(zitiApiEndpointUrl) if err != nil { return err } @@ -85,7 +87,7 @@ func RunUsageAgent(cfg *Config) error { streamEventsRequest := map[string]interface{}{} streamEventsRequest["format"] = "json" streamEventsRequest["subscriptions"] = []*event.Subscription{ - &event.Subscription{ + { Type: "fabric.usage", Options: map[string]interface{}{ "version": uint8(3), @@ -119,6 +121,103 @@ func RunUsageAgent(cfg *Config) error { return nil } +type usageAgent struct{} + func (ua *usageAgent) HandleReceive(msg *channel.Message, _ channel.Channel) { - fmt.Println(string(msg.Body)) + //logrus.Infof(string(msg.Body)) + decoder := json.NewDecoder(bytes.NewReader(msg.Body)) + for { + event := make(map[string]interface{}) + err := decoder.Decode(&event) + if err == io.EOF { + break + } + if err == nil { + if ns, found := event["namespace"]; found && ns == "fabric.usage" { + start := float64(0) + if v, found := event["interval_start_utc"]; found { + if vFloat64, ok := v.(float64); ok { + start = vFloat64 + } else { + logrus.Error("unable to assert 'interval_start_utc'") + } + } else { + logrus.Error("missing 'interval_start_utc'") + } + clientId := "" + serviceId := "" + if v, found := event["tags"]; found { + if tags, ok := v.(map[string]interface{}); ok { + if v, found := tags["clientId"]; found { + if vStr, ok := v.(string); ok { + clientId = vStr + } else { + logrus.Error("unable to assert 'tags/clientId'") + } + } else { + logrus.Errorf("missing 'tags/clientId'") + } + if v, found := tags["serviceId"]; found { + if vStr, ok := v.(string); ok { + serviceId = vStr + } else { + logrus.Error("unable to assert 'tags/serviceId'") + } + } else { + logrus.Error("missing 'tags/serviceId'") + } + } else { + logrus.Errorf("unable to assert 'tags'") + } + } else { + logrus.Errorf("missing 'tags'") + } + tx := int64(0) + rx := int64(0) + if v, found := event["usage"]; found { + if usage, ok := v.(map[string]interface{}); ok { + if v, found := usage["egress.tx"]; found { + if vFloat64, ok := v.(float64); ok { + tx = int64(vFloat64) + } else { + logrus.Error("unable to assert 'usage/egress.tx'") + } + } else { + logrus.Error("missing 'usage/egress.tx'") + } + if v, found := usage["egress.rx"]; found { + if vFloat64, ok := v.(float64); ok { + rx = int64(vFloat64) + } else { + logrus.Error("unable to assert 'usage/egress.rx'") + } + } else { + logrus.Error("missing 'usage/egress.rx'") + } + } else { + logrus.Error("unabel to assert 'usage'") + } + } else { + logrus.Error("missing 'usage'") + } + circuitId := "" + if v, found := event["circuit_id"]; found { + if vStr, ok := v.(string); ok { + circuitId = vStr + } else { + logrus.Error("unable to assert 'circuit_id'") + } + } else { + logrus.Error("missing 'circuit_id'") + } + + logrus.Infof("usage: start '%d', serviceId '%v', clientId '%v', circuitId '%v' [rx: %v, tx: %v]", int64(start), serviceId, clientId, circuitId, util.BytesToSize(rx), util.BytesToSize(tx)) + + } else { + logrus.Errorf("not 'fabric.usage'") + } + } else { + logrus.Errorf("error parsing '%v': %v", string(msg.Body), err) + } + } }