diff --git a/cmd/zrok/controllerMetricsBridge.go b/cmd/zrok/controllerMetricsBridge.go index b67ef129..8d9fa28e 100644 --- a/cmd/zrok/controllerMetricsBridge.go +++ b/cmd/zrok/controllerMetricsBridge.go @@ -4,7 +4,7 @@ import ( "github.com/michaelquigley/cf" "github.com/openziti/zrok/controller/config" "github.com/openziti/zrok/controller/env" - "github.com/openziti/zrok/controller/metrics2" + "github.com/openziti/zrok/controller/metrics" "github.com/sirupsen/logrus" "github.com/spf13/cobra" "os" @@ -39,7 +39,7 @@ func (cmd *bridgeCommand) run(_ *cobra.Command, args []string) { } logrus.Infof(cf.Dump(cfg, env.GetCfOptions())) - bridge, err := metrics2.NewBridge(cfg.Bridge) + bridge, err := metrics.NewBridge(cfg.Bridge) if err != nil { panic(err) } diff --git a/controller/config/config.go b/controller/config/config.go index 06413296..d2ac3eb6 100644 --- a/controller/config/config.go +++ b/controller/config/config.go @@ -3,7 +3,7 @@ package config import ( "github.com/openziti/zrok/controller/env" "github.com/openziti/zrok/controller/limits" - "github.com/openziti/zrok/controller/metrics2" + "github.com/openziti/zrok/controller/metrics" "github.com/openziti/zrok/controller/zrokEdgeSdk" "time" @@ -17,12 +17,12 @@ const ConfigVersion = 2 type Config struct { V int Admin *AdminConfig - Bridge *metrics2.BridgeConfig + Bridge *metrics.BridgeConfig Endpoint *EndpointConfig Email *EmailConfig Limits *limits.Config Maintenance *MaintenanceConfig - Metrics *metrics2.Config + Metrics *metrics.Config Registration *RegistrationConfig ResetPassword *ResetPasswordConfig Store *store.Config diff --git a/controller/controller.go b/controller/controller.go index e9e52f24..670bd863 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -3,7 +3,7 @@ package controller import ( "context" "github.com/openziti/zrok/controller/config" - "github.com/openziti/zrok/controller/metrics2" + "github.com/openziti/zrok/controller/metrics" "github.com/sirupsen/logrus" "github.com/go-openapi/loads" @@ -72,7 +72,7 @@ func Run(inCfg *config.Config) error { } if cfg.Metrics != nil && cfg.Metrics.Agent != nil && cfg.Metrics.Influx != nil { - ma, err := metrics2.NewAgent(cfg.Metrics.Agent, str, cfg.Metrics.Influx) + ma, err := metrics.NewAgent(cfg.Metrics.Agent, str, cfg.Metrics.Influx) if err != nil { return errors.Wrap(err, "error creating metrics agent") } diff --git a/controller/metrics/agent.go b/controller/metrics/agent.go index c06ab246..89be79e5 100644 --- a/controller/metrics/agent.go +++ b/controller/metrics/agent.go @@ -6,48 +6,45 @@ import ( "github.com/sirupsen/logrus" ) -type MetricsAgent struct { - src Source - cache *cache - join chan struct{} +type Agent struct { + events chan ZitiEventJson + src ZitiEventJsonSource + srcJoin chan struct{} + cache *cache + snk UsageSink } -func Run(cfg *Config, strCfg *store.Config) (*MetricsAgent, error) { - logrus.Info("starting") +func NewAgent(cfg *AgentConfig, str *store.Store, ifxCfg *InfluxConfig) (*Agent, error) { + a := &Agent{} + if v, ok := cfg.Source.(ZitiEventJsonSource); ok { + a.src = v + } else { + return nil, errors.New("invalid event json source") + } + a.cache = newShareCache(str) + a.snk = newInfluxWriter(ifxCfg) + return a, nil +} - cache, err := newShareCache(strCfg) +func (a *Agent) Start() error { + a.events = make(chan ZitiEventJson) + srcJoin, err := a.src.Start(a.events) if err != nil { - return nil, errors.Wrap(err, "error creating share cache") - } - - if cfg.Strategies == nil || cfg.Strategies.Source == nil { - return nil, errors.New("no 'strategies/source' configured; exiting") - } - - src, ok := cfg.Strategies.Source.(Source) - if !ok { - return nil, errors.New("invalid 'strategies/source'; exiting") - } - - if cfg.Influx == nil { - return nil, errors.New("no 'influx' configured; exiting") - } - - idb := openInfluxDb(cfg.Influx) - - events := make(chan map[string]interface{}) - join, err := src.Start(events) - if err != nil { - return nil, errors.Wrap(err, "error starting source") + return err } + a.srcJoin = srcJoin go func() { + logrus.Info("started") + defer logrus.Info("stopped") for { select { - case event := <-events: - usage := Ingest(event) - if err := cache.addZrokDetail(usage); err == nil { - if err := idb.Write(usage); err != nil { + case event := <-a.events: + if usage, err := Ingest(event); err == nil { + if err := a.cache.addZrokDetail(usage); err != nil { + logrus.Error(err) + } + if err := a.snk.Handle(usage); err != nil { logrus.Error(err) } } else { @@ -57,14 +54,10 @@ func Run(cfg *Config, strCfg *store.Config) (*MetricsAgent, error) { } }() - return &MetricsAgent{src: src, join: join}, nil + return nil } -func (ma *MetricsAgent) Stop() { - logrus.Info("stopping") - ma.src.Stop() -} - -func (ma *MetricsAgent) Join() { - <-ma.join +func (a *Agent) Stop() { + a.src.Stop() + close(a.events) } diff --git a/controller/metrics/amqpSender.go b/controller/metrics/amqpSender.go deleted file mode 100644 index 332d5328..00000000 --- a/controller/metrics/amqpSender.go +++ /dev/null @@ -1,51 +0,0 @@ -package metrics - -import ( - "context" - "github.com/pkg/errors" - amqp "github.com/rabbitmq/amqp091-go" - "time" -) - -type AmqpSenderConfig struct { - Url string `cf:"+secret"` - Queue string -} - -type AmqpSender struct { - conn *amqp.Connection - ch *amqp.Channel - queue amqp.Queue -} - -func NewAmqpSender(cfg *AmqpSenderConfig) (*AmqpSender, error) { - conn, err := amqp.Dial(cfg.Url) - if err != nil { - return nil, errors.Wrap(err, "error dialing amqp broker") - } - - ch, err := conn.Channel() - if err != nil { - return nil, errors.Wrap(err, "error getting channel from amqp connection") - } - - queue, err := ch.QueueDeclare(cfg.Queue, true, false, false, false, nil) - if err != nil { - return nil, errors.Wrap(err, "error creating amqp queue") - } - - return &AmqpSender{conn, ch, queue}, nil -} - -func (s *AmqpSender) Send(json string) error { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - err := s.ch.PublishWithContext(ctx, "", s.queue.Name, false, false, amqp.Publishing{ - ContentType: "application/json", - Body: []byte(json), - }) - if err != nil { - return errors.Wrap(err, "error sending") - } - return nil -} diff --git a/controller/metrics2/amqpSink.go b/controller/metrics/amqpSink.go similarity index 98% rename from controller/metrics2/amqpSink.go rename to controller/metrics/amqpSink.go index 545f3433..451bdb7b 100644 --- a/controller/metrics2/amqpSink.go +++ b/controller/metrics/amqpSink.go @@ -1,4 +1,4 @@ -package metrics2 +package metrics import ( "context" diff --git a/controller/metrics2/amqpSource.go b/controller/metrics/amqpSource.go similarity index 99% rename from controller/metrics2/amqpSource.go rename to controller/metrics/amqpSource.go index 5e07f5d4..506a4984 100644 --- a/controller/metrics2/amqpSource.go +++ b/controller/metrics/amqpSource.go @@ -1,4 +1,4 @@ -package metrics2 +package metrics import ( "github.com/michaelquigley/cf" diff --git a/controller/metrics2/bridge.go b/controller/metrics/bridge.go similarity index 98% rename from controller/metrics2/bridge.go rename to controller/metrics/bridge.go index f91c4219..17e8d6e9 100644 --- a/controller/metrics2/bridge.go +++ b/controller/metrics/bridge.go @@ -1,4 +1,4 @@ -package metrics2 +package metrics import ( "github.com/pkg/errors" diff --git a/controller/metrics/cache.go b/controller/metrics/cache.go index 6cb2b59e..e61866b6 100644 --- a/controller/metrics/cache.go +++ b/controller/metrics/cache.go @@ -2,34 +2,29 @@ package metrics import ( "github.com/openziti/zrok/controller/store" - "github.com/pkg/errors" ) type cache struct { str *store.Store } -func newShareCache(cfg *store.Config) (*cache, error) { - str, err := store.Open(cfg) - if err != nil { - return nil, errors.Wrap(err, "error opening store") - } - return &cache{str}, nil +func newShareCache(str *store.Store) *cache { + return &cache{str} } -func (sc *cache) addZrokDetail(u *Usage) error { - tx, err := sc.str.Begin() +func (c *cache) addZrokDetail(u *Usage) error { + tx, err := c.str.Begin() if err != nil { return err } defer func() { _ = tx.Rollback() }() - shr, err := sc.str.FindShareWithZIdAndDeleted(u.ZitiServiceId, tx) + shr, err := c.str.FindShareWithZIdAndDeleted(u.ZitiServiceId, tx) if err != nil { return err } u.ShareToken = shr.Token - env, err := sc.str.GetEnvironment(shr.EnvironmentId, tx) + env, err := c.str.GetEnvironment(shr.EnvironmentId, tx) if err != nil { return err } diff --git a/controller/metrics/config.go b/controller/metrics/config.go index 4cdafcbb..03e4da83 100644 --- a/controller/metrics/config.go +++ b/controller/metrics/config.go @@ -1,8 +1,12 @@ package metrics type Config struct { - Influx *InfluxConfig - Strategies *StrategiesConfig + Influx *InfluxConfig + Agent *AgentConfig +} + +type AgentConfig struct { + Source interface{} } type InfluxConfig struct { @@ -11,7 +15,3 @@ type InfluxConfig struct { Org string Token string `cf:"+secret"` } - -type StrategiesConfig struct { - Source interface{} -} diff --git a/controller/metrics/fileSource.go b/controller/metrics/fileSource.go index 67007552..ac7f86b8 100644 --- a/controller/metrics/fileSource.go +++ b/controller/metrics/fileSource.go @@ -2,7 +2,6 @@ package metrics import ( "encoding/binary" - "encoding/json" "github.com/michaelquigley/cf" "github.com/nxadm/tail" "github.com/openziti/zrok/controller/env" @@ -12,12 +11,12 @@ import ( ) func init() { - env.GetCfOptions().AddFlexibleSetter("file", loadFileSourceConfig) + env.GetCfOptions().AddFlexibleSetter("fileSource", loadFileSourceConfig) } type FileSourceConfig struct { - Path string - IndexPath string + Path string + PointerPath string } func loadFileSourceConfig(v interface{}, _ *cf.Options) (interface{}, error) { @@ -28,36 +27,36 @@ func loadFileSourceConfig(v interface{}, _ *cf.Options) (interface{}, error) { } return &fileSource{cfg: cfg}, nil } - return nil, errors.New("invalid config structure for 'file' source") + return nil, errors.New("invalid config structure for 'fileSource'") } type fileSource struct { - cfg *FileSourceConfig - t *tail.Tail + cfg *FileSourceConfig + ptrF *os.File + t *tail.Tail } -func (s *fileSource) Start(events chan map[string]interface{}) (join chan struct{}, err error) { +func (s *fileSource) Start(events chan ZitiEventJson) (join chan struct{}, err error) { f, err := os.Open(s.cfg.Path) if err != nil { return nil, errors.Wrapf(err, "error opening '%v'", s.cfg.Path) } _ = f.Close() - idxF, err := os.OpenFile(s.indexPath(), os.O_CREATE|os.O_RDWR, os.ModePerm) + s.ptrF, err = os.OpenFile(s.pointerPath(), os.O_CREATE|os.O_RDWR, os.ModePerm) if err != nil { - return nil, errors.Wrapf(err, "error opening '%v'", s.indexPath()) + return nil, errors.Wrapf(err, "error opening pointer '%v'", s.pointerPath()) } - pos := int64(0) - posBuf := make([]byte, 8) - if n, err := idxF.Read(posBuf); err == nil && n == 8 { - pos = int64(binary.LittleEndian.Uint64(posBuf)) - logrus.Infof("recovered stored position: %d", pos) + ptr, err := s.readPtr() + if err != nil { + logrus.Errorf("error reading pointer: %v", err) } + logrus.Infof("retrieved stored position pointer at '%d'", ptr) join = make(chan struct{}) go func() { - s.tail(pos, events, idxF) + s.tail(ptr, events) close(join) }() @@ -70,43 +69,62 @@ func (s *fileSource) Stop() { } } -func (s *fileSource) tail(pos int64, events chan map[string]interface{}, idxF *os.File) { - logrus.Infof("started") - defer logrus.Infof("stopped") - - posBuf := make([]byte, 8) +func (s *fileSource) tail(ptr int64, events chan ZitiEventJson) { + logrus.Info("started") + defer logrus.Info("stopped") var err error s.t, err = tail.TailFile(s.cfg.Path, tail.Config{ ReOpen: true, Follow: true, - Location: &tail.SeekInfo{Offset: pos}, + Location: &tail.SeekInfo{Offset: ptr}, }) if err != nil { - logrus.Error(err) + logrus.Errorf("error starting tail: %v", err) return } - for line := range s.t.Lines { - event := make(map[string]interface{}) - if err := json.Unmarshal([]byte(line.Text), &event); err == nil { - binary.LittleEndian.PutUint64(posBuf, uint64(line.SeekInfo.Offset)) - if n, err := idxF.Seek(0, 0); err == nil && n == 0 { - if n, err := idxF.Write(posBuf); err != nil || n != 8 { - logrus.Errorf("error writing index (%d): %v", n, err) - } - } - events <- event - } else { - logrus.Errorf("error parsing line #%d: %v", line.Num, err) + for event := range s.t.Lines { + events <- ZitiEventJson(event.Text) + + if err := s.writePtr(event.SeekInfo.Offset); err != nil { + logrus.Error(err) } } } -func (s *fileSource) indexPath() string { - if s.cfg.IndexPath == "" { - return s.cfg.Path + ".idx" +func (s *fileSource) pointerPath() string { + if s.cfg.PointerPath == "" { + return s.cfg.Path + ".ptr" } else { - return s.cfg.IndexPath + return s.cfg.PointerPath } } + +func (s *fileSource) readPtr() (int64, error) { + ptr := int64(0) + buf := make([]byte, 8) + if n, err := s.ptrF.Seek(0, 0); err == nil && n == 0 { + if n, err := s.ptrF.Read(buf); err == nil && n == 8 { + ptr = int64(binary.LittleEndian.Uint64(buf)) + return ptr, nil + } else { + return 0, errors.Wrapf(err, "error reading pointer (%d): %v", n, err) + } + } else { + return 0, errors.Wrapf(err, "error seeking pointer (%d): %v", n, err) + } +} + +func (s *fileSource) writePtr(ptr int64) error { + buf := make([]byte, 8) + binary.LittleEndian.PutUint64(buf, uint64(ptr)) + if n, err := s.ptrF.Seek(0, 0); err == nil && n == 0 { + if n, err := s.ptrF.Write(buf); err != nil || n != 8 { + return errors.Wrapf(err, "error writing pointer (%d): %v", n, err) + } + } else { + return errors.Wrapf(err, "error seeking pointer (%d): %v", n, err) + } + return nil +} diff --git a/controller/metrics/influx.go b/controller/metrics/influx.go deleted file mode 100644 index 214ed8e5..00000000 --- a/controller/metrics/influx.go +++ /dev/null @@ -1,61 +0,0 @@ -package metrics - -import ( - "context" - "fmt" - influxdb2 "github.com/influxdata/influxdb-client-go/v2" - "github.com/influxdata/influxdb-client-go/v2/api" - "github.com/influxdata/influxdb-client-go/v2/api/write" - "github.com/openziti/zrok/util" - "github.com/sirupsen/logrus" -) - -type influxDb struct { - idb influxdb2.Client - writeApi api.WriteAPIBlocking -} - -func openInfluxDb(cfg *InfluxConfig) *influxDb { - idb := influxdb2.NewClient(cfg.Url, cfg.Token) - wapi := idb.WriteAPIBlocking(cfg.Org, cfg.Bucket) - return &influxDb{idb, wapi} -} - -func (i *influxDb) Write(u *Usage) error { - out := fmt.Sprintf("share: %v, circuit: %v", u.ShareToken, u.ZitiCircuitId) - - envId := fmt.Sprintf("%d", u.EnvironmentId) - acctId := fmt.Sprintf("%d", u.AccountId) - - var pts []*write.Point - circuitPt := influxdb2.NewPoint("circuits", - map[string]string{"share": u.ShareToken, "envId": envId, "acctId": acctId}, - map[string]interface{}{"circuit": u.ZitiCircuitId}, - u.IntervalStart) - pts = append(pts, circuitPt) - - if u.BackendTx > 0 || u.BackendRx > 0 { - pt := influxdb2.NewPoint("xfer", - map[string]string{"namespace": "backend", "share": u.ShareToken, "envId": envId, "acctId": acctId}, - map[string]interface{}{"bytesRead": u.BackendRx, "bytesWritten": u.BackendTx}, - u.IntervalStart) - pts = append(pts, pt) - out += fmt.Sprintf(" backend {rx: %v, tx: %v}", util.BytesToSize(u.BackendRx), util.BytesToSize(u.BackendTx)) - } - if u.FrontendTx > 0 || u.FrontendRx > 0 { - pt := influxdb2.NewPoint("xfer", - map[string]string{"namespace": "frontend", "share": u.ShareToken, "envId": envId, "acctId": acctId}, - map[string]interface{}{"bytesRead": u.FrontendRx, "bytesWritten": u.FrontendTx}, - u.IntervalStart) - pts = append(pts, pt) - out += fmt.Sprintf(" frontend {rx: %v, tx: %v}", util.BytesToSize(u.FrontendRx), util.BytesToSize(u.FrontendTx)) - } - - if err := i.writeApi.WritePoint(context.Background(), pts...); err == nil { - logrus.Info(out) - } else { - return err - } - - return nil -} diff --git a/controller/metrics2/influxWriter.go b/controller/metrics/influxWriter.go similarity index 99% rename from controller/metrics2/influxWriter.go rename to controller/metrics/influxWriter.go index a2652302..12c09693 100644 --- a/controller/metrics2/influxWriter.go +++ b/controller/metrics/influxWriter.go @@ -1,4 +1,4 @@ -package metrics2 +package metrics import ( "context" diff --git a/controller/metrics/model.go b/controller/metrics/model.go index 632e8404..bda74939 100644 --- a/controller/metrics/model.go +++ b/controller/metrics/model.go @@ -35,11 +35,17 @@ func (u Usage) String() string { return out } -type Source interface { - Start(chan map[string]interface{}) (chan struct{}, error) +type UsageSink interface { + Handle(u *Usage) error +} + +type ZitiEventJson string + +type ZitiEventJsonSource interface { + Start(chan ZitiEventJson) (join chan struct{}, err error) Stop() } -type Ingester interface { - Ingest(msg map[string]interface{}) error +type ZitiEventJsonSink interface { + Handle(event ZitiEventJson) error } diff --git a/controller/metrics2/usageIngest.go b/controller/metrics/usageIngest.go similarity index 99% rename from controller/metrics2/usageIngest.go rename to controller/metrics/usageIngest.go index 99f50122..d22a3a57 100644 --- a/controller/metrics2/usageIngest.go +++ b/controller/metrics/usageIngest.go @@ -1,4 +1,4 @@ -package metrics2 +package metrics import ( "encoding/json" diff --git a/controller/metrics/usageIngester.go b/controller/metrics/usageIngester.go deleted file mode 100644 index 80db3412..00000000 --- a/controller/metrics/usageIngester.go +++ /dev/null @@ -1,95 +0,0 @@ -package metrics - -import ( - "github.com/sirupsen/logrus" - "reflect" - "time" -) - -func Ingest(event map[string]interface{}) *Usage { - u := &Usage{ProcessedStamp: time.Now()} - if ns, found := event["namespace"]; found && ns == "fabric.usage" { - if v, found := event["interval_start_utc"]; found { - if vFloat64, ok := v.(float64); ok { - u.IntervalStart = time.Unix(int64(vFloat64), 0) - } else { - logrus.Error("unable to assert 'interval_start_utc'") - } - } else { - logrus.Error("missing 'interval_start_utc'") - } - if v, found := event["tags"]; found { - if tags, ok := v.(map[string]interface{}); ok { - if v, found := tags["serviceId"]; found { - if vStr, ok := v.(string); ok { - u.ZitiServiceId = vStr - } else { - logrus.Error("unable to assert 'tags/serviceId'") - } - } else { - logrus.Error("missing 'tags/serviceId'") - } - } else { - logrus.Errorf("unable to assert 'tags'") - } - } else { - logrus.Errorf("missing 'tags'") - } - if v, found := event["usage"]; found { - if usage, ok := v.(map[string]interface{}); ok { - if v, found := usage["ingress.tx"]; found { - if vFloat64, ok := v.(float64); ok { - u.FrontendTx = int64(vFloat64) - } else { - logrus.Error("unable to assert 'usage/ingress.tx'") - } - } else { - logrus.Warn("missing 'usage/ingress.tx'") - } - if v, found := usage["ingress.rx"]; found { - if vFloat64, ok := v.(float64); ok { - u.FrontendRx = int64(vFloat64) - } else { - logrus.Error("unable to assert 'usage/ingress.rx") - } - } else { - logrus.Warn("missing 'usage/ingress.rx") - } - if v, found := usage["egress.tx"]; found { - if vFloat64, ok := v.(float64); ok { - u.BackendTx = int64(vFloat64) - } else { - logrus.Error("unable to assert 'usage/egress.tx'") - } - } else { - logrus.Warn("missing 'usage/egress.tx'") - } - if v, found := usage["egress.rx"]; found { - if vFloat64, ok := v.(float64); ok { - u.BackendRx = int64(vFloat64) - } else { - logrus.Error("unable to assert 'usage/egress.rx'") - } - } else { - logrus.Warn("missing 'usage/egress.rx'") - } - } else { - logrus.Errorf("unable to assert 'usage' (%v) %v", reflect.TypeOf(v), event) - } - } else { - logrus.Warnf("missing 'usage'") - } - if v, found := event["circuit_id"]; found { - if vStr, ok := v.(string); ok { - u.ZitiCircuitId = vStr - } else { - logrus.Error("unable to assert 'circuit_id'") - } - } else { - logrus.Warn("missing 'circuit_id'") - } - } else { - logrus.Errorf("not 'fabric.usage'") - } - return u -} diff --git a/controller/metrics/websocketSource.go b/controller/metrics/websocketSource.go index 3b1b2898..e6b66ac6 100644 --- a/controller/metrics/websocketSource.go +++ b/controller/metrics/websocketSource.go @@ -1,7 +1,6 @@ package metrics import ( - "bytes" "crypto/tls" "crypto/x509" "encoding/json" @@ -24,14 +23,14 @@ import ( ) func init() { - env.GetCfOptions().AddFlexibleSetter("websocket", loadWebsocketSourceConfig) + env.GetCfOptions().AddFlexibleSetter("websocketSource", loadWebsocketSourceConfig) } type WebsocketSourceConfig struct { WebsocketEndpoint string ApiEndpoint string Username string - Password string + Password string `cf:"+secret"` } func loadWebsocketSourceConfig(v interface{}, _ *cf.Options) (interface{}, error) { @@ -42,17 +41,17 @@ func loadWebsocketSourceConfig(v interface{}, _ *cf.Options) (interface{}, error } return &websocketSource{cfg: cfg}, nil } - return nil, errors.New("invalid config structure for 'websocket' source") + return nil, errors.New("invalid config struture for 'websocketSource'") } type websocketSource struct { cfg *WebsocketSourceConfig ch channel.Channel - events chan map[string]interface{} + events chan ZitiEventJson join chan struct{} } -func (s *websocketSource) Start(events chan map[string]interface{}) (chan struct{}, error) { +func (s *websocketSource) Start(events chan ZitiEventJson) (join chan struct{}, err error) { caCerts, err := rest_util.GetControllerWellKnownCas(s.cfg.ApiEndpoint) if err != nil { return nil, err @@ -151,17 +150,5 @@ func (s *websocketSource) Stop() { } func (s *websocketSource) HandleReceive(msg *channel.Message, _ channel.Channel) { - decoder := json.NewDecoder(bytes.NewReader(msg.Body)) - for { - ev := make(map[string]interface{}) - err := decoder.Decode(&ev) - if err == io.EOF { - break - } - if err == nil { - s.events <- ev - } else { - logrus.Errorf("error parsing '%v': %v", string(msg.Body), err) - } - } + s.events <- ZitiEventJson(msg.Body) } diff --git a/controller/metrics2/agent.go b/controller/metrics2/agent.go deleted file mode 100644 index 0e299379..00000000 --- a/controller/metrics2/agent.go +++ /dev/null @@ -1,63 +0,0 @@ -package metrics2 - -import ( - "github.com/openziti/zrok/controller/store" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" -) - -type Agent struct { - events chan ZitiEventJson - src ZitiEventJsonSource - srcJoin chan struct{} - cache *cache - snk UsageSink -} - -func NewAgent(cfg *AgentConfig, str *store.Store, ifxCfg *InfluxConfig) (*Agent, error) { - a := &Agent{} - if v, ok := cfg.Source.(ZitiEventJsonSource); ok { - a.src = v - } else { - return nil, errors.New("invalid event json source") - } - a.cache = newShareCache(str) - a.snk = newInfluxWriter(ifxCfg) - return a, nil -} - -func (a *Agent) Start() error { - a.events = make(chan ZitiEventJson) - srcJoin, err := a.src.Start(a.events) - if err != nil { - return err - } - a.srcJoin = srcJoin - - go func() { - logrus.Info("started") - defer logrus.Info("stopped") - for { - select { - case event := <-a.events: - if usage, err := Ingest(event); err == nil { - if err := a.cache.addZrokDetail(usage); err != nil { - logrus.Error(err) - } - if err := a.snk.Handle(usage); err != nil { - logrus.Error(err) - } - } else { - logrus.Error(err) - } - } - } - }() - - return nil -} - -func (a *Agent) Stop() { - a.src.Stop() - close(a.events) -} diff --git a/controller/metrics2/cache.go b/controller/metrics2/cache.go deleted file mode 100644 index 32acd1c3..00000000 --- a/controller/metrics2/cache.go +++ /dev/null @@ -1,35 +0,0 @@ -package metrics2 - -import ( - "github.com/openziti/zrok/controller/store" -) - -type cache struct { - str *store.Store -} - -func newShareCache(str *store.Store) *cache { - return &cache{str} -} - -func (c *cache) addZrokDetail(u *Usage) error { - tx, err := c.str.Begin() - if err != nil { - return err - } - defer func() { _ = tx.Rollback() }() - - shr, err := c.str.FindShareWithZIdAndDeleted(u.ZitiServiceId, tx) - if err != nil { - return err - } - u.ShareToken = shr.Token - env, err := c.str.GetEnvironment(shr.EnvironmentId, tx) - if err != nil { - return err - } - u.EnvironmentId = int64(env.Id) - u.AccountId = int64(*env.AccountId) - - return nil -} diff --git a/controller/metrics2/config.go b/controller/metrics2/config.go deleted file mode 100644 index de593f3a..00000000 --- a/controller/metrics2/config.go +++ /dev/null @@ -1,17 +0,0 @@ -package metrics2 - -type Config struct { - Influx *InfluxConfig - Agent *AgentConfig -} - -type AgentConfig struct { - Source interface{} -} - -type InfluxConfig struct { - Url string - Bucket string - Org string - Token string `cf:"+secret"` -} diff --git a/controller/metrics2/fileSource.go b/controller/metrics2/fileSource.go deleted file mode 100644 index 9ca527a5..00000000 --- a/controller/metrics2/fileSource.go +++ /dev/null @@ -1,130 +0,0 @@ -package metrics2 - -import ( - "encoding/binary" - "github.com/michaelquigley/cf" - "github.com/nxadm/tail" - "github.com/openziti/zrok/controller/env" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" - "os" -) - -func init() { - env.GetCfOptions().AddFlexibleSetter("fileSource", loadFileSourceConfig) -} - -type FileSourceConfig struct { - Path string - PointerPath string -} - -func loadFileSourceConfig(v interface{}, _ *cf.Options) (interface{}, error) { - if submap, ok := v.(map[string]interface{}); ok { - cfg := &FileSourceConfig{} - if err := cf.Bind(cfg, submap, cf.DefaultOptions()); err != nil { - return nil, err - } - return &fileSource{cfg: cfg}, nil - } - return nil, errors.New("invalid config structure for 'fileSource'") -} - -type fileSource struct { - cfg *FileSourceConfig - ptrF *os.File - t *tail.Tail -} - -func (s *fileSource) Start(events chan ZitiEventJson) (join chan struct{}, err error) { - f, err := os.Open(s.cfg.Path) - if err != nil { - return nil, errors.Wrapf(err, "error opening '%v'", s.cfg.Path) - } - _ = f.Close() - - s.ptrF, err = os.OpenFile(s.pointerPath(), os.O_CREATE|os.O_RDWR, os.ModePerm) - if err != nil { - return nil, errors.Wrapf(err, "error opening pointer '%v'", s.pointerPath()) - } - - ptr, err := s.readPtr() - if err != nil { - logrus.Errorf("error reading pointer: %v", err) - } - logrus.Infof("retrieved stored position pointer at '%d'", ptr) - - join = make(chan struct{}) - go func() { - s.tail(ptr, events) - close(join) - }() - - return join, nil -} - -func (s *fileSource) Stop() { - if err := s.t.Stop(); err != nil { - logrus.Error(err) - } -} - -func (s *fileSource) tail(ptr int64, events chan ZitiEventJson) { - logrus.Info("started") - defer logrus.Info("stopped") - - var err error - s.t, err = tail.TailFile(s.cfg.Path, tail.Config{ - ReOpen: true, - Follow: true, - Location: &tail.SeekInfo{Offset: ptr}, - }) - if err != nil { - logrus.Errorf("error starting tail: %v", err) - return - } - - for event := range s.t.Lines { - events <- ZitiEventJson(event.Text) - - if err := s.writePtr(event.SeekInfo.Offset); err != nil { - logrus.Error(err) - } - } -} - -func (s *fileSource) pointerPath() string { - if s.cfg.PointerPath == "" { - return s.cfg.Path + ".ptr" - } else { - return s.cfg.PointerPath - } -} - -func (s *fileSource) readPtr() (int64, error) { - ptr := int64(0) - buf := make([]byte, 8) - if n, err := s.ptrF.Seek(0, 0); err == nil && n == 0 { - if n, err := s.ptrF.Read(buf); err == nil && n == 8 { - ptr = int64(binary.LittleEndian.Uint64(buf)) - return ptr, nil - } else { - return 0, errors.Wrapf(err, "error reading pointer (%d): %v", n, err) - } - } else { - return 0, errors.Wrapf(err, "error seeking pointer (%d): %v", n, err) - } -} - -func (s *fileSource) writePtr(ptr int64) error { - buf := make([]byte, 8) - binary.LittleEndian.PutUint64(buf, uint64(ptr)) - if n, err := s.ptrF.Seek(0, 0); err == nil && n == 0 { - if n, err := s.ptrF.Write(buf); err != nil || n != 8 { - return errors.Wrapf(err, "error writing pointer (%d): %v", n, err) - } - } else { - return errors.Wrapf(err, "error seeking pointer (%d): %v", n, err) - } - return nil -} diff --git a/controller/metrics2/model.go b/controller/metrics2/model.go deleted file mode 100644 index 939cb09f..00000000 --- a/controller/metrics2/model.go +++ /dev/null @@ -1,51 +0,0 @@ -package metrics2 - -import ( - "fmt" - "github.com/openziti/zrok/util" - "time" -) - -type Usage struct { - ProcessedStamp time.Time - IntervalStart time.Time - ZitiServiceId string - ZitiCircuitId string - ShareToken string - EnvironmentId int64 - AccountId int64 - FrontendTx int64 - FrontendRx int64 - BackendTx int64 - BackendRx int64 -} - -func (u Usage) String() string { - out := "Usage {" - out += fmt.Sprintf("processed '%v'", u.ProcessedStamp) - out += ", " + fmt.Sprintf("interval '%v'", u.IntervalStart) - out += ", " + fmt.Sprintf("service '%v'", u.ZitiServiceId) - out += ", " + fmt.Sprintf("circuit '%v'", u.ZitiCircuitId) - out += ", " + fmt.Sprintf("share '%v'", u.ShareToken) - out += ", " + fmt.Sprintf("environment '%d'", u.EnvironmentId) - out += ", " + fmt.Sprintf("account '%v'", u.AccountId) - out += ", " + fmt.Sprintf("fe {rx %v, tx %v}", util.BytesToSize(u.FrontendRx), util.BytesToSize(u.FrontendTx)) - out += ", " + fmt.Sprintf("be {rx %v, tx %v}", util.BytesToSize(u.BackendRx), util.BytesToSize(u.BackendTx)) - out += "}" - return out -} - -type UsageSink interface { - Handle(u *Usage) error -} - -type ZitiEventJson string - -type ZitiEventJsonSource interface { - Start(chan ZitiEventJson) (join chan struct{}, err error) - Stop() -} - -type ZitiEventJsonSink interface { - Handle(event ZitiEventJson) error -} diff --git a/controller/metrics2/websocketSource.go b/controller/metrics2/websocketSource.go deleted file mode 100644 index 10a4d973..00000000 --- a/controller/metrics2/websocketSource.go +++ /dev/null @@ -1,154 +0,0 @@ -package metrics2 - -import ( - "crypto/tls" - "crypto/x509" - "encoding/json" - "github.com/gorilla/websocket" - "github.com/michaelquigley/cf" - "github.com/openziti/channel/v2" - "github.com/openziti/channel/v2/websockets" - "github.com/openziti/edge/rest_util" - "github.com/openziti/fabric/event" - "github.com/openziti/fabric/pb/mgmt_pb" - "github.com/openziti/identity" - "github.com/openziti/sdk-golang/ziti/constants" - "github.com/openziti/zrok/controller/env" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" - "io" - "net/http" - "net/url" - "time" -) - -func init() { - env.GetCfOptions().AddFlexibleSetter("websocketSource", loadWebsocketSourceConfig) -} - -type WebsocketSourceConfig struct { - WebsocketEndpoint string - ApiEndpoint string - Username string - Password string `cf:"+secret"` -} - -func loadWebsocketSourceConfig(v interface{}, _ *cf.Options) (interface{}, error) { - if submap, ok := v.(map[string]interface{}); ok { - cfg := &WebsocketSourceConfig{} - if err := cf.Bind(cfg, submap, cf.DefaultOptions()); err != nil { - return nil, err - } - return &websocketSource{cfg: cfg}, nil - } - return nil, errors.New("invalid config struture for 'websocketSource'") -} - -type websocketSource struct { - cfg *WebsocketSourceConfig - ch channel.Channel - events chan ZitiEventJson - join chan struct{} -} - -func (s *websocketSource) Start(events chan ZitiEventJson) (join chan struct{}, err error) { - caCerts, err := rest_util.GetControllerWellKnownCas(s.cfg.ApiEndpoint) - if err != nil { - return nil, err - } - caPool := x509.NewCertPool() - for _, ca := range caCerts { - caPool.AddCert(ca) - } - - authenticator := rest_util.NewAuthenticatorUpdb(s.cfg.Username, s.cfg.Password) - authenticator.RootCas = caPool - - apiEndpointUrl, err := url.Parse(s.cfg.ApiEndpoint) - if err != nil { - return nil, err - } - apiSession, err := authenticator.Authenticate(apiEndpointUrl) - if err != nil { - return nil, err - } - - dialer := &websocket.Dialer{ - TLSClientConfig: &tls.Config{ - RootCAs: caPool, - }, - HandshakeTimeout: 5 * time.Second, - } - - conn, resp, err := dialer.Dial(s.cfg.WebsocketEndpoint, http.Header{constants.ZitiSession: []string{*apiSession.Token}}) - if err != nil { - if resp != nil { - if body, rerr := io.ReadAll(resp.Body); rerr == nil { - logrus.Errorf("response body '%v': %v", string(body), err) - } - } else { - logrus.Errorf("no response from websocket dial: %v", err) - } - } - - id := &identity.TokenId{Token: "mgmt"} - underlayFactory := websockets.NewUnderlayFactory(id, conn, nil) - - s.join = make(chan struct{}) - s.events = events - bindHandler := func(binding channel.Binding) error { - binding.AddReceiveHandler(int32(mgmt_pb.ContentType_StreamEventsEventType), s) - binding.AddCloseHandler(channel.CloseHandlerF(func(ch channel.Channel) { - close(s.join) - })) - return nil - } - - s.ch, err = channel.NewChannel("mgmt", underlayFactory, channel.BindHandlerF(bindHandler), nil) - if err != nil { - return nil, err - } - - streamEventsRequest := map[string]interface{}{} - streamEventsRequest["format"] = "json" - streamEventsRequest["subscriptions"] = []*event.Subscription{ - { - Type: "fabric.usage", - Options: map[string]interface{}{ - "version": uint8(3), - }, - }, - } - - msgBytes, err := json.Marshal(streamEventsRequest) - if err != nil { - return nil, err - } - - requestMsg := channel.NewMessage(int32(mgmt_pb.ContentType_StreamEventsRequestType), msgBytes) - responseMsg, err := requestMsg.WithTimeout(5 * time.Second).SendForReply(s.ch) - if err != nil { - return nil, err - } - - if responseMsg.ContentType == channel.ContentTypeResultType { - result := channel.UnmarshalResult(responseMsg) - if result.Success { - logrus.Infof("event stream started: %v", result.Message) - } else { - return nil, errors.Wrap(err, "error starting event streaming") - } - } else { - return nil, errors.Errorf("unexpected response type %v", responseMsg.ContentType) - } - - return s.join, nil -} - -func (s *websocketSource) Stop() { - _ = s.ch.Close() -} - -func (s *websocketSource) HandleReceive(msg *channel.Message, _ channel.Channel) { - s.events <- ZitiEventJson(msg.Body) -}