mirror of
https://github.com/openziti/zrok.git
synced 2024-11-23 00:23:48 +01:00
basic 'fabric.usage' parsing (#128)
This commit is contained in:
parent
ac4b0bf6c9
commit
6dfbaedcda
@ -1,10 +1,10 @@
|
|||||||
package controller
|
package controller
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"crypto/x509"
|
"crypto/x509"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
|
||||||
"github.com/gorilla/websocket"
|
"github.com/gorilla/websocket"
|
||||||
"github.com/openziti/channel/v2"
|
"github.com/openziti/channel/v2"
|
||||||
"github.com/openziti/channel/v2/websockets"
|
"github.com/openziti/channel/v2/websockets"
|
||||||
@ -13,6 +13,7 @@ import (
|
|||||||
"github.com/openziti/fabric/pb/mgmt_pb"
|
"github.com/openziti/fabric/pb/mgmt_pb"
|
||||||
"github.com/openziti/identity"
|
"github.com/openziti/identity"
|
||||||
"github.com/openziti/sdk-golang/ziti/constants"
|
"github.com/openziti/sdk-golang/ziti/constants"
|
||||||
|
"github.com/openziti/zrok/util"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
"io"
|
"io"
|
||||||
@ -21,10 +22,14 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type usageAgent struct{}
|
|
||||||
|
|
||||||
func RunUsageAgent(cfg *Config) error {
|
func RunUsageAgent(cfg *Config) error {
|
||||||
wsUrl := "wss://127.0.0.1:1280/fabric/v1/ws-api"
|
zitiApiEndpointUrl, err := url.Parse(cfg.Ziti.ApiEndpoint)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
wsUrl := "wss://" + zitiApiEndpointUrl.Host + "/fabric/v1/ws-api"
|
||||||
|
logrus.Infof("wsUrl: %v", wsUrl)
|
||||||
|
|
||||||
caCerts, err := rest_util.GetControllerWellKnownCas(cfg.Ziti.ApiEndpoint)
|
caCerts, err := rest_util.GetControllerWellKnownCas(cfg.Ziti.ApiEndpoint)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -37,11 +42,8 @@ func RunUsageAgent(cfg *Config) error {
|
|||||||
|
|
||||||
authenticator := rest_util.NewAuthenticatorUpdb(cfg.Ziti.Username, cfg.Ziti.Password)
|
authenticator := rest_util.NewAuthenticatorUpdb(cfg.Ziti.Username, cfg.Ziti.Password)
|
||||||
authenticator.RootCas = caPool
|
authenticator.RootCas = caPool
|
||||||
ctrlUrl, err := url.Parse(cfg.Ziti.ApiEndpoint)
|
|
||||||
if err != nil {
|
apiSession, err := authenticator.Authenticate(zitiApiEndpointUrl)
|
||||||
return err
|
|
||||||
}
|
|
||||||
apiSession, err := authenticator.Authenticate(ctrlUrl)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -85,7 +87,7 @@ func RunUsageAgent(cfg *Config) error {
|
|||||||
streamEventsRequest := map[string]interface{}{}
|
streamEventsRequest := map[string]interface{}{}
|
||||||
streamEventsRequest["format"] = "json"
|
streamEventsRequest["format"] = "json"
|
||||||
streamEventsRequest["subscriptions"] = []*event.Subscription{
|
streamEventsRequest["subscriptions"] = []*event.Subscription{
|
||||||
&event.Subscription{
|
{
|
||||||
Type: "fabric.usage",
|
Type: "fabric.usage",
|
||||||
Options: map[string]interface{}{
|
Options: map[string]interface{}{
|
||||||
"version": uint8(3),
|
"version": uint8(3),
|
||||||
@ -119,6 +121,103 @@ func RunUsageAgent(cfg *Config) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type usageAgent struct{}
|
||||||
|
|
||||||
func (ua *usageAgent) HandleReceive(msg *channel.Message, _ channel.Channel) {
|
func (ua *usageAgent) HandleReceive(msg *channel.Message, _ channel.Channel) {
|
||||||
fmt.Println(string(msg.Body))
|
//logrus.Infof(string(msg.Body))
|
||||||
|
decoder := json.NewDecoder(bytes.NewReader(msg.Body))
|
||||||
|
for {
|
||||||
|
event := make(map[string]interface{})
|
||||||
|
err := decoder.Decode(&event)
|
||||||
|
if err == io.EOF {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if err == nil {
|
||||||
|
if ns, found := event["namespace"]; found && ns == "fabric.usage" {
|
||||||
|
start := float64(0)
|
||||||
|
if v, found := event["interval_start_utc"]; found {
|
||||||
|
if vFloat64, ok := v.(float64); ok {
|
||||||
|
start = vFloat64
|
||||||
|
} else {
|
||||||
|
logrus.Error("unable to assert 'interval_start_utc'")
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logrus.Error("missing 'interval_start_utc'")
|
||||||
|
}
|
||||||
|
clientId := ""
|
||||||
|
serviceId := ""
|
||||||
|
if v, found := event["tags"]; found {
|
||||||
|
if tags, ok := v.(map[string]interface{}); ok {
|
||||||
|
if v, found := tags["clientId"]; found {
|
||||||
|
if vStr, ok := v.(string); ok {
|
||||||
|
clientId = vStr
|
||||||
|
} else {
|
||||||
|
logrus.Error("unable to assert 'tags/clientId'")
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logrus.Errorf("missing 'tags/clientId'")
|
||||||
|
}
|
||||||
|
if v, found := tags["serviceId"]; found {
|
||||||
|
if vStr, ok := v.(string); ok {
|
||||||
|
serviceId = vStr
|
||||||
|
} else {
|
||||||
|
logrus.Error("unable to assert 'tags/serviceId'")
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logrus.Error("missing 'tags/serviceId'")
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logrus.Errorf("unable to assert 'tags'")
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logrus.Errorf("missing 'tags'")
|
||||||
|
}
|
||||||
|
tx := int64(0)
|
||||||
|
rx := int64(0)
|
||||||
|
if v, found := event["usage"]; found {
|
||||||
|
if usage, ok := v.(map[string]interface{}); ok {
|
||||||
|
if v, found := usage["egress.tx"]; found {
|
||||||
|
if vFloat64, ok := v.(float64); ok {
|
||||||
|
tx = int64(vFloat64)
|
||||||
|
} else {
|
||||||
|
logrus.Error("unable to assert 'usage/egress.tx'")
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logrus.Error("missing 'usage/egress.tx'")
|
||||||
|
}
|
||||||
|
if v, found := usage["egress.rx"]; found {
|
||||||
|
if vFloat64, ok := v.(float64); ok {
|
||||||
|
rx = int64(vFloat64)
|
||||||
|
} else {
|
||||||
|
logrus.Error("unable to assert 'usage/egress.rx'")
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logrus.Error("missing 'usage/egress.rx'")
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logrus.Error("unabel to assert 'usage'")
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logrus.Error("missing 'usage'")
|
||||||
|
}
|
||||||
|
circuitId := ""
|
||||||
|
if v, found := event["circuit_id"]; found {
|
||||||
|
if vStr, ok := v.(string); ok {
|
||||||
|
circuitId = vStr
|
||||||
|
} else {
|
||||||
|
logrus.Error("unable to assert 'circuit_id'")
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logrus.Error("missing 'circuit_id'")
|
||||||
|
}
|
||||||
|
|
||||||
|
logrus.Infof("usage: start '%d', serviceId '%v', clientId '%v', circuitId '%v' [rx: %v, tx: %v]", int64(start), serviceId, clientId, circuitId, util.BytesToSize(rx), util.BytesToSize(tx))
|
||||||
|
|
||||||
|
} else {
|
||||||
|
logrus.Errorf("not 'fabric.usage'")
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logrus.Errorf("error parsing '%v': %v", string(msg.Body), err)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user