mirror of
https://github.com/openziti/zrok.git
synced 2025-04-15 14:58:17 +02:00
usage model (#128)
This commit is contained in:
parent
f5846681e7
commit
1b1ecd91e1
@ -25,13 +25,10 @@ func Run(cfg *Config) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
ingester := &UsageIngester{}
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case event := <-events:
|
case event := <-events:
|
||||||
if err := ingester.Ingest(event); err != nil {
|
logrus.Info(Ingest(event))
|
||||||
logrus.Error(err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
@ -1,5 +1,34 @@
|
|||||||
package metrics
|
package metrics
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"github.com/openziti/zrok/util"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Usage struct {
|
||||||
|
ProcessedStamp time.Time
|
||||||
|
IntervalStart time.Time
|
||||||
|
ZitiServiceId string
|
||||||
|
ZitiCircuitId string
|
||||||
|
FrontendTx int64
|
||||||
|
FrontendRx int64
|
||||||
|
BackendTx int64
|
||||||
|
BackendRx int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (u Usage) String() string {
|
||||||
|
out := "Usage {"
|
||||||
|
out += fmt.Sprintf("processed '%v'", u.ProcessedStamp)
|
||||||
|
out += ", " + fmt.Sprintf("interval '%v'", u.IntervalStart)
|
||||||
|
out += ", " + fmt.Sprintf("service '%v'", u.ZitiServiceId)
|
||||||
|
out += ", " + fmt.Sprintf("circuit '%v'", u.ZitiCircuitId)
|
||||||
|
out += ", " + fmt.Sprintf("fe {rx %v, tx %v}", util.BytesToSize(u.FrontendRx), util.BytesToSize(u.FrontendTx))
|
||||||
|
out += ", " + fmt.Sprintf("be {rx %v, tx %v}", util.BytesToSize(u.BackendRx), util.BytesToSize(u.BackendTx))
|
||||||
|
out += "}"
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
type Source interface {
|
type Source interface {
|
||||||
Start(chan map[string]interface{}) (chan struct{}, error)
|
Start(chan map[string]interface{}) (chan struct{}, error)
|
||||||
Stop()
|
Stop()
|
||||||
|
@ -1,41 +1,28 @@
|
|||||||
package metrics
|
package metrics
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/openziti/zrok/util"
|
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type UsageIngester struct{}
|
func Ingest(event map[string]interface{}) *Usage {
|
||||||
|
u := &Usage{ProcessedStamp: time.Now()}
|
||||||
func (i *UsageIngester) Ingest(event map[string]interface{}) error {
|
|
||||||
if ns, found := event["namespace"]; found && ns == "fabric.usage" {
|
if ns, found := event["namespace"]; found && ns == "fabric.usage" {
|
||||||
start := float64(0)
|
|
||||||
if v, found := event["interval_start_utc"]; found {
|
if v, found := event["interval_start_utc"]; found {
|
||||||
if vFloat64, ok := v.(float64); ok {
|
if vFloat64, ok := v.(float64); ok {
|
||||||
start = vFloat64
|
u.IntervalStart = time.Unix(int64(vFloat64), 0)
|
||||||
} else {
|
} else {
|
||||||
logrus.Error("unable to assert 'interval_start_utc'")
|
logrus.Error("unable to assert 'interval_start_utc'")
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
logrus.Error("missing 'interval_start_utc'")
|
logrus.Error("missing 'interval_start_utc'")
|
||||||
}
|
}
|
||||||
clientId := ""
|
|
||||||
serviceId := ""
|
|
||||||
if v, found := event["tags"]; found {
|
if v, found := event["tags"]; found {
|
||||||
if tags, ok := v.(map[string]interface{}); ok {
|
if tags, ok := v.(map[string]interface{}); ok {
|
||||||
if v, found := tags["clientId"]; found {
|
|
||||||
if vStr, ok := v.(string); ok {
|
|
||||||
clientId = vStr
|
|
||||||
} else {
|
|
||||||
logrus.Error("unable to assert 'tags/clientId'")
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
logrus.Errorf("missing 'tags/clientId'")
|
|
||||||
}
|
|
||||||
if v, found := tags["serviceId"]; found {
|
if v, found := tags["serviceId"]; found {
|
||||||
if vStr, ok := v.(string); ok {
|
if vStr, ok := v.(string); ok {
|
||||||
serviceId = vStr
|
u.ZitiServiceId = vStr
|
||||||
} else {
|
} else {
|
||||||
logrus.Error("unable to assert 'tags/serviceId'")
|
logrus.Error("unable to assert 'tags/serviceId'")
|
||||||
}
|
}
|
||||||
@ -48,49 +35,61 @@ func (i *UsageIngester) Ingest(event map[string]interface{}) error {
|
|||||||
} else {
|
} else {
|
||||||
logrus.Errorf("missing 'tags'")
|
logrus.Errorf("missing 'tags'")
|
||||||
}
|
}
|
||||||
tx := int64(0)
|
|
||||||
rx := int64(0)
|
|
||||||
if v, found := event["usage"]; found {
|
if v, found := event["usage"]; found {
|
||||||
if usage, ok := v.(map[string]interface{}); ok {
|
if usage, ok := v.(map[string]interface{}); ok {
|
||||||
|
if v, found := usage["ingress.tx"]; found {
|
||||||
|
if vFloat64, ok := v.(float64); ok {
|
||||||
|
u.FrontendTx = int64(vFloat64)
|
||||||
|
} else {
|
||||||
|
logrus.Error("unable to assert 'usage/ingress.tx'")
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logrus.Warn("missing 'usage/ingress.tx'")
|
||||||
|
}
|
||||||
|
if v, found := usage["ingress.rx"]; found {
|
||||||
|
if vFloat64, ok := v.(float64); ok {
|
||||||
|
u.FrontendRx = int64(vFloat64)
|
||||||
|
} else {
|
||||||
|
logrus.Error("unable to assert 'usage/ingress.rx")
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logrus.Warn("missing 'usage/ingress.rx")
|
||||||
|
}
|
||||||
if v, found := usage["egress.tx"]; found {
|
if v, found := usage["egress.tx"]; found {
|
||||||
if vFloat64, ok := v.(float64); ok {
|
if vFloat64, ok := v.(float64); ok {
|
||||||
tx = int64(vFloat64)
|
u.BackendTx = int64(vFloat64)
|
||||||
} else {
|
} else {
|
||||||
logrus.Error("unable to assert 'usage/egress.tx'")
|
logrus.Error("unable to assert 'usage/egress.tx'")
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
logrus.Error("missing 'usage/egress.tx'")
|
logrus.Warn("missing 'usage/egress.tx'")
|
||||||
}
|
}
|
||||||
if v, found := usage["egress.rx"]; found {
|
if v, found := usage["egress.rx"]; found {
|
||||||
if vFloat64, ok := v.(float64); ok {
|
if vFloat64, ok := v.(float64); ok {
|
||||||
rx = int64(vFloat64)
|
u.BackendRx = int64(vFloat64)
|
||||||
} else {
|
} else {
|
||||||
logrus.Error("unable to assert 'usage/egress.rx'")
|
logrus.Error("unable to assert 'usage/egress.rx'")
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
logrus.Error("missing 'usage/egress.rx'")
|
logrus.Warn("missing 'usage/egress.rx'")
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
logrus.Errorf("unable to assert 'usage' (%v) %v", reflect.TypeOf(v), event)
|
logrus.Errorf("unable to assert 'usage' (%v) %v", reflect.TypeOf(v), event)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
logrus.Error("missing 'usage'")
|
logrus.Warnf("missing 'usage'")
|
||||||
}
|
}
|
||||||
circuitId := ""
|
|
||||||
if v, found := event["circuit_id"]; found {
|
if v, found := event["circuit_id"]; found {
|
||||||
if vStr, ok := v.(string); ok {
|
if vStr, ok := v.(string); ok {
|
||||||
circuitId = vStr
|
u.ZitiCircuitId = vStr
|
||||||
} else {
|
} else {
|
||||||
logrus.Error("unable to assert 'circuit_id'")
|
logrus.Error("unable to assert 'circuit_id'")
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
logrus.Error("missing 'circuit_id'")
|
logrus.Warn("missing 'circuit_id'")
|
||||||
}
|
}
|
||||||
|
|
||||||
logrus.Infof("usage: start '%d', serviceId '%v', clientId '%v', circuitId '%v' [rx: %v, tx: %v]", int64(start), serviceId, clientId, circuitId, util.BytesToSize(rx), util.BytesToSize(tx))
|
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
logrus.Errorf("not 'fabric.usage'")
|
logrus.Errorf("not 'fabric.usage'")
|
||||||
}
|
}
|
||||||
return nil
|
return u
|
||||||
}
|
}
|
||||||
|
@ -132,10 +132,7 @@ func (ua *usageAgent) HandleReceive(msg *channel.Message, _ channel.Channel) {
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
if err == nil {
|
if err == nil {
|
||||||
ui := &metrics.UsageIngester{}
|
logrus.Info(metrics.Ingest(event))
|
||||||
if err := ui.Ingest(event); err != nil {
|
|
||||||
logrus.Errorf("error ingesting '%v': %v", string(msg.Body), err)
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
logrus.Errorf("error parsing '%v': %v", string(msg.Body), err)
|
logrus.Errorf("error parsing '%v': %v", string(msg.Body), err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user