diff --git a/controller/metrics2/amqpSource.go b/controller/metrics2/amqpSource.go index 772d1685..5e07f5d4 100644 --- a/controller/metrics2/amqpSource.go +++ b/controller/metrics2/amqpSource.go @@ -2,11 +2,16 @@ package metrics2 import ( "github.com/michaelquigley/cf" + "github.com/openziti/zrok/controller/env" "github.com/pkg/errors" amqp "github.com/rabbitmq/amqp091-go" "github.com/sirupsen/logrus" ) +func init() { + env.GetCfOptions().AddFlexibleSetter("amqpSource", loadAmqpSourceConfig) +} + type AmqpSourceConfig struct { Url string `cf:"+secret"` QueueName string diff --git a/controller/metrics2/model.go b/controller/metrics2/model.go index a53cb37f..e533c9ea 100644 --- a/controller/metrics2/model.go +++ b/controller/metrics2/model.go @@ -1,5 +1,34 @@ package metrics2 +type Usage struct { + ProcessedStamp time.Time + IntervalStart time.Time + ZitiServiceId string + ZitiCircuitId string + ShareToken string + EnvironmentId int64 + AccountId int64 + FrontendTx int64 + FrontendRx int64 + BackendTx int64 + BackendRx int64 +} + +func (u Usage) String() string { + out := "Usage {" + out += fmt.Sprintf("processed '%v'", u.ProcessedStamp) + out += ", " + fmt.Sprintf("interval '%v'", u.IntervalStart) + out += ", " + fmt.Sprintf("service '%v'", u.ZitiServiceId) + out += ", " + fmt.Sprintf("circuit '%v'", u.ZitiCircuitId) + out += ", " + fmt.Sprintf("share '%v'", u.ShareToken) + out += ", " + fmt.Sprintf("environment '%d'", u.EnvironmentId) + out += ", " + fmt.Sprintf("account '%v'", u.AccountId) + out += ", " + fmt.Sprintf("fe {rx %v, tx %v}", util.BytesToSize(u.FrontendRx), util.BytesToSize(u.FrontendTx)) + out += ", " + fmt.Sprintf("be {rx %v, tx %v}", util.BytesToSize(u.BackendRx), util.BytesToSize(u.BackendTx)) + out += "}" + return out +} + type ZitiEventJson string type ZitiEventJsonSource interface { diff --git a/controller/metrics2/usageIngest.go b/controller/metrics2/usageIngest.go new file mode 100644 index 00000000..99f50122 --- /dev/null +++ b/controller/metrics2/usageIngest.go @@ -0,0 +1,102 @@ +package metrics2 + +import ( + "encoding/json" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "reflect" + "time" +) + +func Ingest(event ZitiEventJson) (*Usage, error) { + eventMap := make(map[string]interface{}) + if err := json.Unmarshal([]byte(event), &eventMap); err == nil { + u := &Usage{ProcessedStamp: time.Now()} + if ns, found := eventMap["namespace"]; found && ns == "fabric.usage" { + if v, found := eventMap["interval_start_utc"]; found { + if vFloat64, ok := v.(float64); ok { + u.IntervalStart = time.Unix(int64(vFloat64), 0) + } else { + logrus.Error("unable to assert 'interval_start_utc'") + } + } else { + logrus.Error("missing 'interval_start_utc'") + } + if v, found := eventMap["tags"]; found { + if tags, ok := v.(map[string]interface{}); ok { + if v, found := tags["serviceId"]; found { + if vStr, ok := v.(string); ok { + u.ZitiServiceId = vStr + } else { + logrus.Error("unable to assert 'tags/serviceId'") + } + } else { + logrus.Error("missing 'tags/serviceId'") + } + } else { + logrus.Errorf("unable to assert 'tags'") + } + } else { + logrus.Errorf("missing 'tags'") + } + if v, found := eventMap["usage"]; found { + if usage, ok := v.(map[string]interface{}); ok { + if v, found := usage["ingress.tx"]; found { + if vFloat64, ok := v.(float64); ok { + u.FrontendTx = int64(vFloat64) + } else { + logrus.Error("unable to assert 'usage/ingress.tx'") + } + } else { + logrus.Warn("missing 'usage/ingress.tx'") + } + if v, found := usage["ingress.rx"]; found { + if vFloat64, ok := v.(float64); ok { + u.FrontendRx = int64(vFloat64) + } else { + logrus.Error("unable to assert 'usage/ingress.rx") + } + } else { + logrus.Warn("missing 'usage/ingress.rx") + } + if v, found := usage["egress.tx"]; found { + if vFloat64, ok := v.(float64); ok { + u.BackendTx = int64(vFloat64) + } else { + logrus.Error("unable to assert 'usage/egress.tx'") + } + } else { + logrus.Warn("missing 'usage/egress.tx'") + } + if v, found := usage["egress.rx"]; found { + if vFloat64, ok := v.(float64); ok { + u.BackendRx = int64(vFloat64) + } else { + logrus.Error("unable to assert 'usage/egress.rx'") + } + } else { + logrus.Warn("missing 'usage/egress.rx'") + } + } else { + logrus.Errorf("unable to assert 'usage' (%v) %v", reflect.TypeOf(v), event) + } + } else { + logrus.Warnf("missing 'usage'") + } + if v, found := eventMap["circuit_id"]; found { + if vStr, ok := v.(string); ok { + u.ZitiCircuitId = vStr + } else { + logrus.Error("unable to assert 'circuit_id'") + } + } else { + logrus.Warn("missing 'circuit_id'") + } + } else { + logrus.Errorf("not 'fabric.usage'") + } + return u, nil + } else { + return nil, errors.Wrap(err, "error unmarshaling") + } +}