diff --git a/controller/metrics/agent.go b/controller/metrics/agent.go index 047ffdc9..f793b43a 100644 --- a/controller/metrics/agent.go +++ b/controller/metrics/agent.go @@ -25,13 +25,10 @@ func Run(cfg *Config) error { } go func() { - ingester := &UsageIngester{} for { select { case event := <-events: - if err := ingester.Ingest(event); err != nil { - logrus.Error(err) - } + logrus.Info(Ingest(event)) } } }() diff --git a/controller/metrics/model.go b/controller/metrics/model.go index 223f3a83..38830aa7 100644 --- a/controller/metrics/model.go +++ b/controller/metrics/model.go @@ -1,5 +1,34 @@ package metrics +import ( + "fmt" + "github.com/openziti/zrok/util" + "time" +) + +type Usage struct { + ProcessedStamp time.Time + IntervalStart time.Time + ZitiServiceId string + ZitiCircuitId string + FrontendTx int64 + FrontendRx int64 + BackendTx int64 + BackendRx int64 +} + +func (u Usage) String() string { + out := "Usage {" + out += fmt.Sprintf("processed '%v'", u.ProcessedStamp) + out += ", " + fmt.Sprintf("interval '%v'", u.IntervalStart) + out += ", " + fmt.Sprintf("service '%v'", u.ZitiServiceId) + out += ", " + fmt.Sprintf("circuit '%v'", u.ZitiCircuitId) + out += ", " + fmt.Sprintf("fe {rx %v, tx %v}", util.BytesToSize(u.FrontendRx), util.BytesToSize(u.FrontendTx)) + out += ", " + fmt.Sprintf("be {rx %v, tx %v}", util.BytesToSize(u.BackendRx), util.BytesToSize(u.BackendTx)) + out += "}" + return out +} + type Source interface { Start(chan map[string]interface{}) (chan struct{}, error) Stop() diff --git a/controller/metrics/usageIngester.go b/controller/metrics/usageIngester.go index 90293eea..80db3412 100644 --- a/controller/metrics/usageIngester.go +++ b/controller/metrics/usageIngester.go @@ -1,41 +1,28 @@ package metrics import ( - "github.com/openziti/zrok/util" "github.com/sirupsen/logrus" "reflect" + "time" ) -type UsageIngester struct{} - -func (i *UsageIngester) Ingest(event map[string]interface{}) error { +func Ingest(event map[string]interface{}) *Usage { + u := &Usage{ProcessedStamp: time.Now()} if ns, found := event["namespace"]; found && ns == "fabric.usage" { - start := float64(0) if v, found := event["interval_start_utc"]; found { if vFloat64, ok := v.(float64); ok { - start = vFloat64 + u.IntervalStart = time.Unix(int64(vFloat64), 0) } else { logrus.Error("unable to assert 'interval_start_utc'") } } else { logrus.Error("missing 'interval_start_utc'") } - clientId := "" - serviceId := "" if v, found := event["tags"]; found { if tags, ok := v.(map[string]interface{}); ok { - if v, found := tags["clientId"]; found { - if vStr, ok := v.(string); ok { - clientId = vStr - } else { - logrus.Error("unable to assert 'tags/clientId'") - } - } else { - logrus.Errorf("missing 'tags/clientId'") - } if v, found := tags["serviceId"]; found { if vStr, ok := v.(string); ok { - serviceId = vStr + u.ZitiServiceId = vStr } else { logrus.Error("unable to assert 'tags/serviceId'") } @@ -48,49 +35,61 @@ func (i *UsageIngester) Ingest(event map[string]interface{}) error { } else { logrus.Errorf("missing 'tags'") } - tx := int64(0) - rx := int64(0) if v, found := event["usage"]; found { if usage, ok := v.(map[string]interface{}); ok { + if v, found := usage["ingress.tx"]; found { + if vFloat64, ok := v.(float64); ok { + u.FrontendTx = int64(vFloat64) + } else { + logrus.Error("unable to assert 'usage/ingress.tx'") + } + } else { + logrus.Warn("missing 'usage/ingress.tx'") + } + if v, found := usage["ingress.rx"]; found { + if vFloat64, ok := v.(float64); ok { + u.FrontendRx = int64(vFloat64) + } else { + logrus.Error("unable to assert 'usage/ingress.rx") + } + } else { + logrus.Warn("missing 'usage/ingress.rx") + } if v, found := usage["egress.tx"]; found { if vFloat64, ok := v.(float64); ok { - tx = int64(vFloat64) + u.BackendTx = int64(vFloat64) } else { logrus.Error("unable to assert 'usage/egress.tx'") } } else { - logrus.Error("missing 'usage/egress.tx'") + logrus.Warn("missing 'usage/egress.tx'") } if v, found := usage["egress.rx"]; found { if vFloat64, ok := v.(float64); ok { - rx = int64(vFloat64) + u.BackendRx = int64(vFloat64) } else { logrus.Error("unable to assert 'usage/egress.rx'") } } else { - logrus.Error("missing 'usage/egress.rx'") + logrus.Warn("missing 'usage/egress.rx'") } } else { logrus.Errorf("unable to assert 'usage' (%v) %v", reflect.TypeOf(v), event) } } else { - logrus.Error("missing 'usage'") + logrus.Warnf("missing 'usage'") } - circuitId := "" if v, found := event["circuit_id"]; found { if vStr, ok := v.(string); ok { - circuitId = vStr + u.ZitiCircuitId = vStr } else { logrus.Error("unable to assert 'circuit_id'") } } else { - logrus.Error("missing 'circuit_id'") + logrus.Warn("missing 'circuit_id'") } - - logrus.Infof("usage: start '%d', serviceId '%v', clientId '%v', circuitId '%v' [rx: %v, tx: %v]", int64(start), serviceId, clientId, circuitId, util.BytesToSize(rx), util.BytesToSize(tx)) - } else { logrus.Errorf("not 'fabric.usage'") } - return nil + return u } diff --git a/controller/usageAgent.go b/controller/usageAgent.go index 1e2954b8..4f463a1d 100644 --- a/controller/usageAgent.go +++ b/controller/usageAgent.go @@ -132,10 +132,7 @@ func (ua *usageAgent) HandleReceive(msg *channel.Message, _ channel.Channel) { break } if err == nil { - ui := &metrics.UsageIngester{} - if err := ui.Ingest(event); err != nil { - logrus.Errorf("error ingesting '%v': %v", string(msg.Body), err) - } + logrus.Info(metrics.Ingest(event)) } else { logrus.Errorf("error parsing '%v': %v", string(msg.Body), err) }