basic 'fabric.usage' parsing (#128)

This commit is contained in:
Michael Quigley 2023-02-24 11:01:17 -05:00
parent 217a0caec3
commit 022084ec88
No known key found for this signature in database
GPG Key ID: 9B60314A9DD20A62

View File

@ -1,10 +1,10 @@
package controller package controller
import ( import (
"bytes"
"crypto/tls" "crypto/tls"
"crypto/x509" "crypto/x509"
"encoding/json" "encoding/json"
"fmt"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"github.com/openziti/channel/v2" "github.com/openziti/channel/v2"
"github.com/openziti/channel/v2/websockets" "github.com/openziti/channel/v2/websockets"
@ -13,6 +13,7 @@ import (
"github.com/openziti/fabric/pb/mgmt_pb" "github.com/openziti/fabric/pb/mgmt_pb"
"github.com/openziti/identity" "github.com/openziti/identity"
"github.com/openziti/sdk-golang/ziti/constants" "github.com/openziti/sdk-golang/ziti/constants"
"github.com/openziti/zrok/util"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"io" "io"
@ -21,10 +22,14 @@ import (
"time" "time"
) )
type usageAgent struct{}
func RunUsageAgent(cfg *Config) error { func RunUsageAgent(cfg *Config) error {
wsUrl := "wss://127.0.0.1:1280/fabric/v1/ws-api" zitiApiEndpointUrl, err := url.Parse(cfg.Ziti.ApiEndpoint)
if err != nil {
return err
}
wsUrl := "wss://" + zitiApiEndpointUrl.Host + "/fabric/v1/ws-api"
logrus.Infof("wsUrl: %v", wsUrl)
caCerts, err := rest_util.GetControllerWellKnownCas(cfg.Ziti.ApiEndpoint) caCerts, err := rest_util.GetControllerWellKnownCas(cfg.Ziti.ApiEndpoint)
if err != nil { if err != nil {
@ -37,11 +42,8 @@ func RunUsageAgent(cfg *Config) error {
authenticator := rest_util.NewAuthenticatorUpdb(cfg.Ziti.Username, cfg.Ziti.Password) authenticator := rest_util.NewAuthenticatorUpdb(cfg.Ziti.Username, cfg.Ziti.Password)
authenticator.RootCas = caPool authenticator.RootCas = caPool
ctrlUrl, err := url.Parse(cfg.Ziti.ApiEndpoint)
if err != nil { apiSession, err := authenticator.Authenticate(zitiApiEndpointUrl)
return err
}
apiSession, err := authenticator.Authenticate(ctrlUrl)
if err != nil { if err != nil {
return err return err
} }
@ -85,7 +87,7 @@ func RunUsageAgent(cfg *Config) error {
streamEventsRequest := map[string]interface{}{} streamEventsRequest := map[string]interface{}{}
streamEventsRequest["format"] = "json" streamEventsRequest["format"] = "json"
streamEventsRequest["subscriptions"] = []*event.Subscription{ streamEventsRequest["subscriptions"] = []*event.Subscription{
&event.Subscription{ {
Type: "fabric.usage", Type: "fabric.usage",
Options: map[string]interface{}{ Options: map[string]interface{}{
"version": uint8(3), "version": uint8(3),
@ -119,6 +121,103 @@ func RunUsageAgent(cfg *Config) error {
return nil return nil
} }
type usageAgent struct{}
func (ua *usageAgent) HandleReceive(msg *channel.Message, _ channel.Channel) { func (ua *usageAgent) HandleReceive(msg *channel.Message, _ channel.Channel) {
fmt.Println(string(msg.Body)) //logrus.Infof(string(msg.Body))
decoder := json.NewDecoder(bytes.NewReader(msg.Body))
for {
event := make(map[string]interface{})
err := decoder.Decode(&event)
if err == io.EOF {
break
}
if err == nil {
if ns, found := event["namespace"]; found && ns == "fabric.usage" {
start := float64(0)
if v, found := event["interval_start_utc"]; found {
if vFloat64, ok := v.(float64); ok {
start = vFloat64
} else {
logrus.Error("unable to assert 'interval_start_utc'")
}
} else {
logrus.Error("missing 'interval_start_utc'")
}
clientId := ""
serviceId := ""
if v, found := event["tags"]; found {
if tags, ok := v.(map[string]interface{}); ok {
if v, found := tags["clientId"]; found {
if vStr, ok := v.(string); ok {
clientId = vStr
} else {
logrus.Error("unable to assert 'tags/clientId'")
}
} else {
logrus.Errorf("missing 'tags/clientId'")
}
if v, found := tags["serviceId"]; found {
if vStr, ok := v.(string); ok {
serviceId = vStr
} else {
logrus.Error("unable to assert 'tags/serviceId'")
}
} else {
logrus.Error("missing 'tags/serviceId'")
}
} else {
logrus.Errorf("unable to assert 'tags'")
}
} else {
logrus.Errorf("missing 'tags'")
}
tx := int64(0)
rx := int64(0)
if v, found := event["usage"]; found {
if usage, ok := v.(map[string]interface{}); ok {
if v, found := usage["egress.tx"]; found {
if vFloat64, ok := v.(float64); ok {
tx = int64(vFloat64)
} else {
logrus.Error("unable to assert 'usage/egress.tx'")
}
} else {
logrus.Error("missing 'usage/egress.tx'")
}
if v, found := usage["egress.rx"]; found {
if vFloat64, ok := v.(float64); ok {
rx = int64(vFloat64)
} else {
logrus.Error("unable to assert 'usage/egress.rx'")
}
} else {
logrus.Error("missing 'usage/egress.rx'")
}
} else {
logrus.Error("unabel to assert 'usage'")
}
} else {
logrus.Error("missing 'usage'")
}
circuitId := ""
if v, found := event["circuit_id"]; found {
if vStr, ok := v.(string); ok {
circuitId = vStr
} else {
logrus.Error("unable to assert 'circuit_id'")
}
} else {
logrus.Error("missing 'circuit_id'")
}
logrus.Infof("usage: start '%d', serviceId '%v', clientId '%v', circuitId '%v' [rx: %v, tx: %v]", int64(start), serviceId, clientId, circuitId, util.BytesToSize(rx), util.BytesToSize(tx))
} else {
logrus.Errorf("not 'fabric.usage'")
}
} else {
logrus.Errorf("error parsing '%v': %v", string(msg.Body), err)
}
}
} }