mirror of
https://github.com/openziti/zrok.git
synced 2025-01-22 22:09:03 +01:00
usage ingester skeleton (#128)
This commit is contained in:
parent
e6dc836cc6
commit
55523ae7ed
@ -6,5 +6,5 @@ type Source interface {
|
||||
}
|
||||
|
||||
type Ingester interface {
|
||||
Ingest(msg map[string]interface{})
|
||||
Ingest(msg map[string]interface{}) error
|
||||
}
|
||||
|
95
controller/metrics/usageIngester.go
Normal file
95
controller/metrics/usageIngester.go
Normal file
@ -0,0 +1,95 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"github.com/openziti/zrok/util"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type UsageIngester struct{}
|
||||
|
||||
func (i *UsageIngester) Ingest(event map[string]interface{}) error {
|
||||
if ns, found := event["namespace"]; found && ns == "fabric.usage" {
|
||||
start := float64(0)
|
||||
if v, found := event["interval_start_utc"]; found {
|
||||
if vFloat64, ok := v.(float64); ok {
|
||||
start = vFloat64
|
||||
} else {
|
||||
logrus.Error("unable to assert 'interval_start_utc'")
|
||||
}
|
||||
} else {
|
||||
logrus.Error("missing 'interval_start_utc'")
|
||||
}
|
||||
clientId := ""
|
||||
serviceId := ""
|
||||
if v, found := event["tags"]; found {
|
||||
if tags, ok := v.(map[string]interface{}); ok {
|
||||
if v, found := tags["clientId"]; found {
|
||||
if vStr, ok := v.(string); ok {
|
||||
clientId = vStr
|
||||
} else {
|
||||
logrus.Error("unable to assert 'tags/clientId'")
|
||||
}
|
||||
} else {
|
||||
logrus.Errorf("missing 'tags/clientId'")
|
||||
}
|
||||
if v, found := tags["serviceId"]; found {
|
||||
if vStr, ok := v.(string); ok {
|
||||
serviceId = vStr
|
||||
} else {
|
||||
logrus.Error("unable to assert 'tags/serviceId'")
|
||||
}
|
||||
} else {
|
||||
logrus.Error("missing 'tags/serviceId'")
|
||||
}
|
||||
} else {
|
||||
logrus.Errorf("unable to assert 'tags'")
|
||||
}
|
||||
} else {
|
||||
logrus.Errorf("missing 'tags'")
|
||||
}
|
||||
tx := int64(0)
|
||||
rx := int64(0)
|
||||
if v, found := event["usage"]; found {
|
||||
if usage, ok := v.(map[string]interface{}); ok {
|
||||
if v, found := usage["egress.tx"]; found {
|
||||
if vFloat64, ok := v.(float64); ok {
|
||||
tx = int64(vFloat64)
|
||||
} else {
|
||||
logrus.Error("unable to assert 'usage/egress.tx'")
|
||||
}
|
||||
} else {
|
||||
logrus.Error("missing 'usage/egress.tx'")
|
||||
}
|
||||
if v, found := usage["egress.rx"]; found {
|
||||
if vFloat64, ok := v.(float64); ok {
|
||||
rx = int64(vFloat64)
|
||||
} else {
|
||||
logrus.Error("unable to assert 'usage/egress.rx'")
|
||||
}
|
||||
} else {
|
||||
logrus.Error("missing 'usage/egress.rx'")
|
||||
}
|
||||
} else {
|
||||
logrus.Error("unabel to assert 'usage'")
|
||||
}
|
||||
} else {
|
||||
logrus.Error("missing 'usage'")
|
||||
}
|
||||
circuitId := ""
|
||||
if v, found := event["circuit_id"]; found {
|
||||
if vStr, ok := v.(string); ok {
|
||||
circuitId = vStr
|
||||
} else {
|
||||
logrus.Error("unable to assert 'circuit_id'")
|
||||
}
|
||||
} else {
|
||||
logrus.Error("missing 'circuit_id'")
|
||||
}
|
||||
|
||||
logrus.Infof("usage: start '%d', serviceId '%v', clientId '%v', circuitId '%v' [rx: %v, tx: %v]", int64(start), serviceId, clientId, circuitId, util.BytesToSize(rx), util.BytesToSize(tx))
|
||||
|
||||
} else {
|
||||
logrus.Errorf("not 'fabric.usage'")
|
||||
}
|
||||
return nil
|
||||
}
|
@ -13,7 +13,7 @@ import (
|
||||
"github.com/openziti/fabric/pb/mgmt_pb"
|
||||
"github.com/openziti/identity"
|
||||
"github.com/openziti/sdk-golang/ziti/constants"
|
||||
"github.com/openziti/zrok/util"
|
||||
"github.com/openziti/zrok/controller/metrics"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
"io"
|
||||
@ -124,7 +124,6 @@ func RunUsageAgent(cfg *Config) error {
|
||||
type usageAgent struct{}
|
||||
|
||||
func (ua *usageAgent) HandleReceive(msg *channel.Message, _ channel.Channel) {
|
||||
//logrus.Infof(string(msg.Body))
|
||||
decoder := json.NewDecoder(bytes.NewReader(msg.Body))
|
||||
for {
|
||||
event := make(map[string]interface{})
|
||||
@ -133,88 +132,9 @@ func (ua *usageAgent) HandleReceive(msg *channel.Message, _ channel.Channel) {
|
||||
break
|
||||
}
|
||||
if err == nil {
|
||||
if ns, found := event["namespace"]; found && ns == "fabric.usage" {
|
||||
start := float64(0)
|
||||
if v, found := event["interval_start_utc"]; found {
|
||||
if vFloat64, ok := v.(float64); ok {
|
||||
start = vFloat64
|
||||
} else {
|
||||
logrus.Error("unable to assert 'interval_start_utc'")
|
||||
}
|
||||
} else {
|
||||
logrus.Error("missing 'interval_start_utc'")
|
||||
}
|
||||
clientId := ""
|
||||
serviceId := ""
|
||||
if v, found := event["tags"]; found {
|
||||
if tags, ok := v.(map[string]interface{}); ok {
|
||||
if v, found := tags["clientId"]; found {
|
||||
if vStr, ok := v.(string); ok {
|
||||
clientId = vStr
|
||||
} else {
|
||||
logrus.Error("unable to assert 'tags/clientId'")
|
||||
}
|
||||
} else {
|
||||
logrus.Errorf("missing 'tags/clientId'")
|
||||
}
|
||||
if v, found := tags["serviceId"]; found {
|
||||
if vStr, ok := v.(string); ok {
|
||||
serviceId = vStr
|
||||
} else {
|
||||
logrus.Error("unable to assert 'tags/serviceId'")
|
||||
}
|
||||
} else {
|
||||
logrus.Error("missing 'tags/serviceId'")
|
||||
}
|
||||
} else {
|
||||
logrus.Errorf("unable to assert 'tags'")
|
||||
}
|
||||
} else {
|
||||
logrus.Errorf("missing 'tags'")
|
||||
}
|
||||
tx := int64(0)
|
||||
rx := int64(0)
|
||||
if v, found := event["usage"]; found {
|
||||
if usage, ok := v.(map[string]interface{}); ok {
|
||||
if v, found := usage["egress.tx"]; found {
|
||||
if vFloat64, ok := v.(float64); ok {
|
||||
tx = int64(vFloat64)
|
||||
} else {
|
||||
logrus.Error("unable to assert 'usage/egress.tx'")
|
||||
}
|
||||
} else {
|
||||
logrus.Error("missing 'usage/egress.tx'")
|
||||
}
|
||||
if v, found := usage["egress.rx"]; found {
|
||||
if vFloat64, ok := v.(float64); ok {
|
||||
rx = int64(vFloat64)
|
||||
} else {
|
||||
logrus.Error("unable to assert 'usage/egress.rx'")
|
||||
}
|
||||
} else {
|
||||
logrus.Error("missing 'usage/egress.rx'")
|
||||
}
|
||||
} else {
|
||||
logrus.Error("unabel to assert 'usage'")
|
||||
}
|
||||
} else {
|
||||
logrus.Error("missing 'usage'")
|
||||
}
|
||||
circuitId := ""
|
||||
if v, found := event["circuit_id"]; found {
|
||||
if vStr, ok := v.(string); ok {
|
||||
circuitId = vStr
|
||||
} else {
|
||||
logrus.Error("unable to assert 'circuit_id'")
|
||||
}
|
||||
} else {
|
||||
logrus.Error("missing 'circuit_id'")
|
||||
}
|
||||
|
||||
logrus.Infof("usage: start '%d', serviceId '%v', clientId '%v', circuitId '%v' [rx: %v, tx: %v]", int64(start), serviceId, clientId, circuitId, util.BytesToSize(rx), util.BytesToSize(tx))
|
||||
|
||||
} else {
|
||||
logrus.Errorf("not 'fabric.usage'")
|
||||
ui := &metrics.UsageIngester{}
|
||||
if err := ui.Ingest(event); err != nil {
|
||||
logrus.Errorf("error ingesting '%v': %v", string(msg.Body), err)
|
||||
}
|
||||
} else {
|
||||
logrus.Errorf("error parsing '%v': %v", string(msg.Body), err)
|
||||
|
Loading…
Reference in New Issue
Block a user