metrics2 -> metrics (#270)

This commit is contained in:
Michael Quigley
2023-03-15 16:14:06 -04:00
committed by Kenneth Bingham
parent e1c79e687e
commit e824c87d08
23 changed files with 131 additions and 789 deletions

View File

@ -4,7 +4,7 @@ import (
"github.com/michaelquigley/cf" "github.com/michaelquigley/cf"
"github.com/openziti/zrok/controller/config" "github.com/openziti/zrok/controller/config"
"github.com/openziti/zrok/controller/env" "github.com/openziti/zrok/controller/env"
"github.com/openziti/zrok/controller/metrics2" "github.com/openziti/zrok/controller/metrics"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"os" "os"
@ -39,7 +39,7 @@ func (cmd *bridgeCommand) run(_ *cobra.Command, args []string) {
} }
logrus.Infof(cf.Dump(cfg, env.GetCfOptions())) logrus.Infof(cf.Dump(cfg, env.GetCfOptions()))
bridge, err := metrics2.NewBridge(cfg.Bridge) bridge, err := metrics.NewBridge(cfg.Bridge)
if err != nil { if err != nil {
panic(err) panic(err)
} }

View File

@ -3,7 +3,7 @@ package config
import ( import (
"github.com/openziti/zrok/controller/env" "github.com/openziti/zrok/controller/env"
"github.com/openziti/zrok/controller/limits" "github.com/openziti/zrok/controller/limits"
"github.com/openziti/zrok/controller/metrics2" "github.com/openziti/zrok/controller/metrics"
"github.com/openziti/zrok/controller/zrokEdgeSdk" "github.com/openziti/zrok/controller/zrokEdgeSdk"
"time" "time"
@ -17,12 +17,12 @@ const ConfigVersion = 2
type Config struct { type Config struct {
V int V int
Admin *AdminConfig Admin *AdminConfig
Bridge *metrics2.BridgeConfig Bridge *metrics.BridgeConfig
Endpoint *EndpointConfig Endpoint *EndpointConfig
Email *EmailConfig Email *EmailConfig
Limits *limits.Config Limits *limits.Config
Maintenance *MaintenanceConfig Maintenance *MaintenanceConfig
Metrics *metrics2.Config Metrics *metrics.Config
Registration *RegistrationConfig Registration *RegistrationConfig
ResetPassword *ResetPasswordConfig ResetPassword *ResetPasswordConfig
Store *store.Config Store *store.Config

View File

@ -3,7 +3,7 @@ package controller
import ( import (
"context" "context"
"github.com/openziti/zrok/controller/config" "github.com/openziti/zrok/controller/config"
"github.com/openziti/zrok/controller/metrics2" "github.com/openziti/zrok/controller/metrics"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/go-openapi/loads" "github.com/go-openapi/loads"
@ -72,7 +72,7 @@ func Run(inCfg *config.Config) error {
} }
if cfg.Metrics != nil && cfg.Metrics.Agent != nil && cfg.Metrics.Influx != nil { if cfg.Metrics != nil && cfg.Metrics.Agent != nil && cfg.Metrics.Influx != nil {
ma, err := metrics2.NewAgent(cfg.Metrics.Agent, str, cfg.Metrics.Influx) ma, err := metrics.NewAgent(cfg.Metrics.Agent, str, cfg.Metrics.Influx)
if err != nil { if err != nil {
return errors.Wrap(err, "error creating metrics agent") return errors.Wrap(err, "error creating metrics agent")
} }

View File

@ -6,48 +6,45 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
type MetricsAgent struct { type Agent struct {
src Source events chan ZitiEventJson
src ZitiEventJsonSource
srcJoin chan struct{}
cache *cache cache *cache
join chan struct{} snk UsageSink
} }
func Run(cfg *Config, strCfg *store.Config) (*MetricsAgent, error) { func NewAgent(cfg *AgentConfig, str *store.Store, ifxCfg *InfluxConfig) (*Agent, error) {
logrus.Info("starting") a := &Agent{}
if v, ok := cfg.Source.(ZitiEventJsonSource); ok {
a.src = v
} else {
return nil, errors.New("invalid event json source")
}
a.cache = newShareCache(str)
a.snk = newInfluxWriter(ifxCfg)
return a, nil
}
cache, err := newShareCache(strCfg) func (a *Agent) Start() error {
a.events = make(chan ZitiEventJson)
srcJoin, err := a.src.Start(a.events)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "error creating share cache") return err
}
if cfg.Strategies == nil || cfg.Strategies.Source == nil {
return nil, errors.New("no 'strategies/source' configured; exiting")
}
src, ok := cfg.Strategies.Source.(Source)
if !ok {
return nil, errors.New("invalid 'strategies/source'; exiting")
}
if cfg.Influx == nil {
return nil, errors.New("no 'influx' configured; exiting")
}
idb := openInfluxDb(cfg.Influx)
events := make(chan map[string]interface{})
join, err := src.Start(events)
if err != nil {
return nil, errors.Wrap(err, "error starting source")
} }
a.srcJoin = srcJoin
go func() { go func() {
logrus.Info("started")
defer logrus.Info("stopped")
for { for {
select { select {
case event := <-events: case event := <-a.events:
usage := Ingest(event) if usage, err := Ingest(event); err == nil {
if err := cache.addZrokDetail(usage); err == nil { if err := a.cache.addZrokDetail(usage); err != nil {
if err := idb.Write(usage); err != nil { logrus.Error(err)
}
if err := a.snk.Handle(usage); err != nil {
logrus.Error(err) logrus.Error(err)
} }
} else { } else {
@ -57,14 +54,10 @@ func Run(cfg *Config, strCfg *store.Config) (*MetricsAgent, error) {
} }
}() }()
return &MetricsAgent{src: src, join: join}, nil return nil
} }
func (ma *MetricsAgent) Stop() { func (a *Agent) Stop() {
logrus.Info("stopping") a.src.Stop()
ma.src.Stop() close(a.events)
}
func (ma *MetricsAgent) Join() {
<-ma.join
} }

View File

@ -1,51 +0,0 @@
package metrics
import (
"context"
"github.com/pkg/errors"
amqp "github.com/rabbitmq/amqp091-go"
"time"
)
type AmqpSenderConfig struct {
Url string `cf:"+secret"`
Queue string
}
type AmqpSender struct {
conn *amqp.Connection
ch *amqp.Channel
queue amqp.Queue
}
func NewAmqpSender(cfg *AmqpSenderConfig) (*AmqpSender, error) {
conn, err := amqp.Dial(cfg.Url)
if err != nil {
return nil, errors.Wrap(err, "error dialing amqp broker")
}
ch, err := conn.Channel()
if err != nil {
return nil, errors.Wrap(err, "error getting channel from amqp connection")
}
queue, err := ch.QueueDeclare(cfg.Queue, true, false, false, false, nil)
if err != nil {
return nil, errors.Wrap(err, "error creating amqp queue")
}
return &AmqpSender{conn, ch, queue}, nil
}
func (s *AmqpSender) Send(json string) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err := s.ch.PublishWithContext(ctx, "", s.queue.Name, false, false, amqp.Publishing{
ContentType: "application/json",
Body: []byte(json),
})
if err != nil {
return errors.Wrap(err, "error sending")
}
return nil
}

View File

@ -1,4 +1,4 @@
package metrics2 package metrics
import ( import (
"context" "context"

View File

@ -1,4 +1,4 @@
package metrics2 package metrics
import ( import (
"github.com/michaelquigley/cf" "github.com/michaelquigley/cf"

View File

@ -1,4 +1,4 @@
package metrics2 package metrics
import ( import (
"github.com/pkg/errors" "github.com/pkg/errors"

View File

@ -2,34 +2,29 @@ package metrics
import ( import (
"github.com/openziti/zrok/controller/store" "github.com/openziti/zrok/controller/store"
"github.com/pkg/errors"
) )
type cache struct { type cache struct {
str *store.Store str *store.Store
} }
func newShareCache(cfg *store.Config) (*cache, error) { func newShareCache(str *store.Store) *cache {
str, err := store.Open(cfg) return &cache{str}
if err != nil {
return nil, errors.Wrap(err, "error opening store")
}
return &cache{str}, nil
} }
func (sc *cache) addZrokDetail(u *Usage) error { func (c *cache) addZrokDetail(u *Usage) error {
tx, err := sc.str.Begin() tx, err := c.str.Begin()
if err != nil { if err != nil {
return err return err
} }
defer func() { _ = tx.Rollback() }() defer func() { _ = tx.Rollback() }()
shr, err := sc.str.FindShareWithZIdAndDeleted(u.ZitiServiceId, tx) shr, err := c.str.FindShareWithZIdAndDeleted(u.ZitiServiceId, tx)
if err != nil { if err != nil {
return err return err
} }
u.ShareToken = shr.Token u.ShareToken = shr.Token
env, err := sc.str.GetEnvironment(shr.EnvironmentId, tx) env, err := c.str.GetEnvironment(shr.EnvironmentId, tx)
if err != nil { if err != nil {
return err return err
} }

View File

@ -2,7 +2,11 @@ package metrics
type Config struct { type Config struct {
Influx *InfluxConfig Influx *InfluxConfig
Strategies *StrategiesConfig Agent *AgentConfig
}
type AgentConfig struct {
Source interface{}
} }
type InfluxConfig struct { type InfluxConfig struct {
@ -11,7 +15,3 @@ type InfluxConfig struct {
Org string Org string
Token string `cf:"+secret"` Token string `cf:"+secret"`
} }
type StrategiesConfig struct {
Source interface{}
}

View File

@ -2,7 +2,6 @@ package metrics
import ( import (
"encoding/binary" "encoding/binary"
"encoding/json"
"github.com/michaelquigley/cf" "github.com/michaelquigley/cf"
"github.com/nxadm/tail" "github.com/nxadm/tail"
"github.com/openziti/zrok/controller/env" "github.com/openziti/zrok/controller/env"
@ -12,12 +11,12 @@ import (
) )
func init() { func init() {
env.GetCfOptions().AddFlexibleSetter("file", loadFileSourceConfig) env.GetCfOptions().AddFlexibleSetter("fileSource", loadFileSourceConfig)
} }
type FileSourceConfig struct { type FileSourceConfig struct {
Path string Path string
IndexPath string PointerPath string
} }
func loadFileSourceConfig(v interface{}, _ *cf.Options) (interface{}, error) { func loadFileSourceConfig(v interface{}, _ *cf.Options) (interface{}, error) {
@ -28,36 +27,36 @@ func loadFileSourceConfig(v interface{}, _ *cf.Options) (interface{}, error) {
} }
return &fileSource{cfg: cfg}, nil return &fileSource{cfg: cfg}, nil
} }
return nil, errors.New("invalid config structure for 'file' source") return nil, errors.New("invalid config structure for 'fileSource'")
} }
type fileSource struct { type fileSource struct {
cfg *FileSourceConfig cfg *FileSourceConfig
ptrF *os.File
t *tail.Tail t *tail.Tail
} }
func (s *fileSource) Start(events chan map[string]interface{}) (join chan struct{}, err error) { func (s *fileSource) Start(events chan ZitiEventJson) (join chan struct{}, err error) {
f, err := os.Open(s.cfg.Path) f, err := os.Open(s.cfg.Path)
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "error opening '%v'", s.cfg.Path) return nil, errors.Wrapf(err, "error opening '%v'", s.cfg.Path)
} }
_ = f.Close() _ = f.Close()
idxF, err := os.OpenFile(s.indexPath(), os.O_CREATE|os.O_RDWR, os.ModePerm) s.ptrF, err = os.OpenFile(s.pointerPath(), os.O_CREATE|os.O_RDWR, os.ModePerm)
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "error opening '%v'", s.indexPath()) return nil, errors.Wrapf(err, "error opening pointer '%v'", s.pointerPath())
} }
pos := int64(0) ptr, err := s.readPtr()
posBuf := make([]byte, 8) if err != nil {
if n, err := idxF.Read(posBuf); err == nil && n == 8 { logrus.Errorf("error reading pointer: %v", err)
pos = int64(binary.LittleEndian.Uint64(posBuf))
logrus.Infof("recovered stored position: %d", pos)
} }
logrus.Infof("retrieved stored position pointer at '%d'", ptr)
join = make(chan struct{}) join = make(chan struct{})
go func() { go func() {
s.tail(pos, events, idxF) s.tail(ptr, events)
close(join) close(join)
}() }()
@ -70,43 +69,62 @@ func (s *fileSource) Stop() {
} }
} }
func (s *fileSource) tail(pos int64, events chan map[string]interface{}, idxF *os.File) { func (s *fileSource) tail(ptr int64, events chan ZitiEventJson) {
logrus.Infof("started") logrus.Info("started")
defer logrus.Infof("stopped") defer logrus.Info("stopped")
posBuf := make([]byte, 8)
var err error var err error
s.t, err = tail.TailFile(s.cfg.Path, tail.Config{ s.t, err = tail.TailFile(s.cfg.Path, tail.Config{
ReOpen: true, ReOpen: true,
Follow: true, Follow: true,
Location: &tail.SeekInfo{Offset: pos}, Location: &tail.SeekInfo{Offset: ptr},
}) })
if err != nil { if err != nil {
logrus.Error(err) logrus.Errorf("error starting tail: %v", err)
return return
} }
for line := range s.t.Lines { for event := range s.t.Lines {
event := make(map[string]interface{}) events <- ZitiEventJson(event.Text)
if err := json.Unmarshal([]byte(line.Text), &event); err == nil {
binary.LittleEndian.PutUint64(posBuf, uint64(line.SeekInfo.Offset)) if err := s.writePtr(event.SeekInfo.Offset); err != nil {
if n, err := idxF.Seek(0, 0); err == nil && n == 0 { logrus.Error(err)
if n, err := idxF.Write(posBuf); err != nil || n != 8 {
logrus.Errorf("error writing index (%d): %v", n, err)
}
}
events <- event
} else {
logrus.Errorf("error parsing line #%d: %v", line.Num, err)
} }
} }
} }
func (s *fileSource) indexPath() string { func (s *fileSource) pointerPath() string {
if s.cfg.IndexPath == "" { if s.cfg.PointerPath == "" {
return s.cfg.Path + ".idx" return s.cfg.Path + ".ptr"
} else { } else {
return s.cfg.IndexPath return s.cfg.PointerPath
} }
} }
func (s *fileSource) readPtr() (int64, error) {
ptr := int64(0)
buf := make([]byte, 8)
if n, err := s.ptrF.Seek(0, 0); err == nil && n == 0 {
if n, err := s.ptrF.Read(buf); err == nil && n == 8 {
ptr = int64(binary.LittleEndian.Uint64(buf))
return ptr, nil
} else {
return 0, errors.Wrapf(err, "error reading pointer (%d): %v", n, err)
}
} else {
return 0, errors.Wrapf(err, "error seeking pointer (%d): %v", n, err)
}
}
func (s *fileSource) writePtr(ptr int64) error {
buf := make([]byte, 8)
binary.LittleEndian.PutUint64(buf, uint64(ptr))
if n, err := s.ptrF.Seek(0, 0); err == nil && n == 0 {
if n, err := s.ptrF.Write(buf); err != nil || n != 8 {
return errors.Wrapf(err, "error writing pointer (%d): %v", n, err)
}
} else {
return errors.Wrapf(err, "error seeking pointer (%d): %v", n, err)
}
return nil
}

View File

@ -1,61 +0,0 @@
package metrics
import (
"context"
"fmt"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api"
"github.com/influxdata/influxdb-client-go/v2/api/write"
"github.com/openziti/zrok/util"
"github.com/sirupsen/logrus"
)
type influxDb struct {
idb influxdb2.Client
writeApi api.WriteAPIBlocking
}
func openInfluxDb(cfg *InfluxConfig) *influxDb {
idb := influxdb2.NewClient(cfg.Url, cfg.Token)
wapi := idb.WriteAPIBlocking(cfg.Org, cfg.Bucket)
return &influxDb{idb, wapi}
}
func (i *influxDb) Write(u *Usage) error {
out := fmt.Sprintf("share: %v, circuit: %v", u.ShareToken, u.ZitiCircuitId)
envId := fmt.Sprintf("%d", u.EnvironmentId)
acctId := fmt.Sprintf("%d", u.AccountId)
var pts []*write.Point
circuitPt := influxdb2.NewPoint("circuits",
map[string]string{"share": u.ShareToken, "envId": envId, "acctId": acctId},
map[string]interface{}{"circuit": u.ZitiCircuitId},
u.IntervalStart)
pts = append(pts, circuitPt)
if u.BackendTx > 0 || u.BackendRx > 0 {
pt := influxdb2.NewPoint("xfer",
map[string]string{"namespace": "backend", "share": u.ShareToken, "envId": envId, "acctId": acctId},
map[string]interface{}{"bytesRead": u.BackendRx, "bytesWritten": u.BackendTx},
u.IntervalStart)
pts = append(pts, pt)
out += fmt.Sprintf(" backend {rx: %v, tx: %v}", util.BytesToSize(u.BackendRx), util.BytesToSize(u.BackendTx))
}
if u.FrontendTx > 0 || u.FrontendRx > 0 {
pt := influxdb2.NewPoint("xfer",
map[string]string{"namespace": "frontend", "share": u.ShareToken, "envId": envId, "acctId": acctId},
map[string]interface{}{"bytesRead": u.FrontendRx, "bytesWritten": u.FrontendTx},
u.IntervalStart)
pts = append(pts, pt)
out += fmt.Sprintf(" frontend {rx: %v, tx: %v}", util.BytesToSize(u.FrontendRx), util.BytesToSize(u.FrontendTx))
}
if err := i.writeApi.WritePoint(context.Background(), pts...); err == nil {
logrus.Info(out)
} else {
return err
}
return nil
}

View File

@ -1,4 +1,4 @@
package metrics2 package metrics
import ( import (
"context" "context"

View File

@ -35,11 +35,17 @@ func (u Usage) String() string {
return out return out
} }
type Source interface { type UsageSink interface {
Start(chan map[string]interface{}) (chan struct{}, error) Handle(u *Usage) error
}
type ZitiEventJson string
type ZitiEventJsonSource interface {
Start(chan ZitiEventJson) (join chan struct{}, err error)
Stop() Stop()
} }
type Ingester interface { type ZitiEventJsonSink interface {
Ingest(msg map[string]interface{}) error Handle(event ZitiEventJson) error
} }

View File

@ -1,4 +1,4 @@
package metrics2 package metrics
import ( import (
"encoding/json" "encoding/json"

View File

@ -1,95 +0,0 @@
package metrics
import (
"github.com/sirupsen/logrus"
"reflect"
"time"
)
func Ingest(event map[string]interface{}) *Usage {
u := &Usage{ProcessedStamp: time.Now()}
if ns, found := event["namespace"]; found && ns == "fabric.usage" {
if v, found := event["interval_start_utc"]; found {
if vFloat64, ok := v.(float64); ok {
u.IntervalStart = time.Unix(int64(vFloat64), 0)
} else {
logrus.Error("unable to assert 'interval_start_utc'")
}
} else {
logrus.Error("missing 'interval_start_utc'")
}
if v, found := event["tags"]; found {
if tags, ok := v.(map[string]interface{}); ok {
if v, found := tags["serviceId"]; found {
if vStr, ok := v.(string); ok {
u.ZitiServiceId = vStr
} else {
logrus.Error("unable to assert 'tags/serviceId'")
}
} else {
logrus.Error("missing 'tags/serviceId'")
}
} else {
logrus.Errorf("unable to assert 'tags'")
}
} else {
logrus.Errorf("missing 'tags'")
}
if v, found := event["usage"]; found {
if usage, ok := v.(map[string]interface{}); ok {
if v, found := usage["ingress.tx"]; found {
if vFloat64, ok := v.(float64); ok {
u.FrontendTx = int64(vFloat64)
} else {
logrus.Error("unable to assert 'usage/ingress.tx'")
}
} else {
logrus.Warn("missing 'usage/ingress.tx'")
}
if v, found := usage["ingress.rx"]; found {
if vFloat64, ok := v.(float64); ok {
u.FrontendRx = int64(vFloat64)
} else {
logrus.Error("unable to assert 'usage/ingress.rx")
}
} else {
logrus.Warn("missing 'usage/ingress.rx")
}
if v, found := usage["egress.tx"]; found {
if vFloat64, ok := v.(float64); ok {
u.BackendTx = int64(vFloat64)
} else {
logrus.Error("unable to assert 'usage/egress.tx'")
}
} else {
logrus.Warn("missing 'usage/egress.tx'")
}
if v, found := usage["egress.rx"]; found {
if vFloat64, ok := v.(float64); ok {
u.BackendRx = int64(vFloat64)
} else {
logrus.Error("unable to assert 'usage/egress.rx'")
}
} else {
logrus.Warn("missing 'usage/egress.rx'")
}
} else {
logrus.Errorf("unable to assert 'usage' (%v) %v", reflect.TypeOf(v), event)
}
} else {
logrus.Warnf("missing 'usage'")
}
if v, found := event["circuit_id"]; found {
if vStr, ok := v.(string); ok {
u.ZitiCircuitId = vStr
} else {
logrus.Error("unable to assert 'circuit_id'")
}
} else {
logrus.Warn("missing 'circuit_id'")
}
} else {
logrus.Errorf("not 'fabric.usage'")
}
return u
}

View File

@ -1,7 +1,6 @@
package metrics package metrics
import ( import (
"bytes"
"crypto/tls" "crypto/tls"
"crypto/x509" "crypto/x509"
"encoding/json" "encoding/json"
@ -24,14 +23,14 @@ import (
) )
func init() { func init() {
env.GetCfOptions().AddFlexibleSetter("websocket", loadWebsocketSourceConfig) env.GetCfOptions().AddFlexibleSetter("websocketSource", loadWebsocketSourceConfig)
} }
type WebsocketSourceConfig struct { type WebsocketSourceConfig struct {
WebsocketEndpoint string WebsocketEndpoint string
ApiEndpoint string ApiEndpoint string
Username string Username string
Password string Password string `cf:"+secret"`
} }
func loadWebsocketSourceConfig(v interface{}, _ *cf.Options) (interface{}, error) { func loadWebsocketSourceConfig(v interface{}, _ *cf.Options) (interface{}, error) {
@ -42,17 +41,17 @@ func loadWebsocketSourceConfig(v interface{}, _ *cf.Options) (interface{}, error
} }
return &websocketSource{cfg: cfg}, nil return &websocketSource{cfg: cfg}, nil
} }
return nil, errors.New("invalid config structure for 'websocket' source") return nil, errors.New("invalid config struture for 'websocketSource'")
} }
type websocketSource struct { type websocketSource struct {
cfg *WebsocketSourceConfig cfg *WebsocketSourceConfig
ch channel.Channel ch channel.Channel
events chan map[string]interface{} events chan ZitiEventJson
join chan struct{} join chan struct{}
} }
func (s *websocketSource) Start(events chan map[string]interface{}) (chan struct{}, error) { func (s *websocketSource) Start(events chan ZitiEventJson) (join chan struct{}, err error) {
caCerts, err := rest_util.GetControllerWellKnownCas(s.cfg.ApiEndpoint) caCerts, err := rest_util.GetControllerWellKnownCas(s.cfg.ApiEndpoint)
if err != nil { if err != nil {
return nil, err return nil, err
@ -151,17 +150,5 @@ func (s *websocketSource) Stop() {
} }
func (s *websocketSource) HandleReceive(msg *channel.Message, _ channel.Channel) { func (s *websocketSource) HandleReceive(msg *channel.Message, _ channel.Channel) {
decoder := json.NewDecoder(bytes.NewReader(msg.Body)) s.events <- ZitiEventJson(msg.Body)
for {
ev := make(map[string]interface{})
err := decoder.Decode(&ev)
if err == io.EOF {
break
}
if err == nil {
s.events <- ev
} else {
logrus.Errorf("error parsing '%v': %v", string(msg.Body), err)
}
}
} }

View File

@ -1,63 +0,0 @@
package metrics2
import (
"github.com/openziti/zrok/controller/store"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
type Agent struct {
events chan ZitiEventJson
src ZitiEventJsonSource
srcJoin chan struct{}
cache *cache
snk UsageSink
}
func NewAgent(cfg *AgentConfig, str *store.Store, ifxCfg *InfluxConfig) (*Agent, error) {
a := &Agent{}
if v, ok := cfg.Source.(ZitiEventJsonSource); ok {
a.src = v
} else {
return nil, errors.New("invalid event json source")
}
a.cache = newShareCache(str)
a.snk = newInfluxWriter(ifxCfg)
return a, nil
}
func (a *Agent) Start() error {
a.events = make(chan ZitiEventJson)
srcJoin, err := a.src.Start(a.events)
if err != nil {
return err
}
a.srcJoin = srcJoin
go func() {
logrus.Info("started")
defer logrus.Info("stopped")
for {
select {
case event := <-a.events:
if usage, err := Ingest(event); err == nil {
if err := a.cache.addZrokDetail(usage); err != nil {
logrus.Error(err)
}
if err := a.snk.Handle(usage); err != nil {
logrus.Error(err)
}
} else {
logrus.Error(err)
}
}
}
}()
return nil
}
func (a *Agent) Stop() {
a.src.Stop()
close(a.events)
}

View File

@ -1,35 +0,0 @@
package metrics2
import (
"github.com/openziti/zrok/controller/store"
)
type cache struct {
str *store.Store
}
func newShareCache(str *store.Store) *cache {
return &cache{str}
}
func (c *cache) addZrokDetail(u *Usage) error {
tx, err := c.str.Begin()
if err != nil {
return err
}
defer func() { _ = tx.Rollback() }()
shr, err := c.str.FindShareWithZIdAndDeleted(u.ZitiServiceId, tx)
if err != nil {
return err
}
u.ShareToken = shr.Token
env, err := c.str.GetEnvironment(shr.EnvironmentId, tx)
if err != nil {
return err
}
u.EnvironmentId = int64(env.Id)
u.AccountId = int64(*env.AccountId)
return nil
}

View File

@ -1,17 +0,0 @@
package metrics2
type Config struct {
Influx *InfluxConfig
Agent *AgentConfig
}
type AgentConfig struct {
Source interface{}
}
type InfluxConfig struct {
Url string
Bucket string
Org string
Token string `cf:"+secret"`
}

View File

@ -1,130 +0,0 @@
package metrics2
import (
"encoding/binary"
"github.com/michaelquigley/cf"
"github.com/nxadm/tail"
"github.com/openziti/zrok/controller/env"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"os"
)
func init() {
env.GetCfOptions().AddFlexibleSetter("fileSource", loadFileSourceConfig)
}
type FileSourceConfig struct {
Path string
PointerPath string
}
func loadFileSourceConfig(v interface{}, _ *cf.Options) (interface{}, error) {
if submap, ok := v.(map[string]interface{}); ok {
cfg := &FileSourceConfig{}
if err := cf.Bind(cfg, submap, cf.DefaultOptions()); err != nil {
return nil, err
}
return &fileSource{cfg: cfg}, nil
}
return nil, errors.New("invalid config structure for 'fileSource'")
}
type fileSource struct {
cfg *FileSourceConfig
ptrF *os.File
t *tail.Tail
}
func (s *fileSource) Start(events chan ZitiEventJson) (join chan struct{}, err error) {
f, err := os.Open(s.cfg.Path)
if err != nil {
return nil, errors.Wrapf(err, "error opening '%v'", s.cfg.Path)
}
_ = f.Close()
s.ptrF, err = os.OpenFile(s.pointerPath(), os.O_CREATE|os.O_RDWR, os.ModePerm)
if err != nil {
return nil, errors.Wrapf(err, "error opening pointer '%v'", s.pointerPath())
}
ptr, err := s.readPtr()
if err != nil {
logrus.Errorf("error reading pointer: %v", err)
}
logrus.Infof("retrieved stored position pointer at '%d'", ptr)
join = make(chan struct{})
go func() {
s.tail(ptr, events)
close(join)
}()
return join, nil
}
func (s *fileSource) Stop() {
if err := s.t.Stop(); err != nil {
logrus.Error(err)
}
}
func (s *fileSource) tail(ptr int64, events chan ZitiEventJson) {
logrus.Info("started")
defer logrus.Info("stopped")
var err error
s.t, err = tail.TailFile(s.cfg.Path, tail.Config{
ReOpen: true,
Follow: true,
Location: &tail.SeekInfo{Offset: ptr},
})
if err != nil {
logrus.Errorf("error starting tail: %v", err)
return
}
for event := range s.t.Lines {
events <- ZitiEventJson(event.Text)
if err := s.writePtr(event.SeekInfo.Offset); err != nil {
logrus.Error(err)
}
}
}
func (s *fileSource) pointerPath() string {
if s.cfg.PointerPath == "" {
return s.cfg.Path + ".ptr"
} else {
return s.cfg.PointerPath
}
}
func (s *fileSource) readPtr() (int64, error) {
ptr := int64(0)
buf := make([]byte, 8)
if n, err := s.ptrF.Seek(0, 0); err == nil && n == 0 {
if n, err := s.ptrF.Read(buf); err == nil && n == 8 {
ptr = int64(binary.LittleEndian.Uint64(buf))
return ptr, nil
} else {
return 0, errors.Wrapf(err, "error reading pointer (%d): %v", n, err)
}
} else {
return 0, errors.Wrapf(err, "error seeking pointer (%d): %v", n, err)
}
}
func (s *fileSource) writePtr(ptr int64) error {
buf := make([]byte, 8)
binary.LittleEndian.PutUint64(buf, uint64(ptr))
if n, err := s.ptrF.Seek(0, 0); err == nil && n == 0 {
if n, err := s.ptrF.Write(buf); err != nil || n != 8 {
return errors.Wrapf(err, "error writing pointer (%d): %v", n, err)
}
} else {
return errors.Wrapf(err, "error seeking pointer (%d): %v", n, err)
}
return nil
}

View File

@ -1,51 +0,0 @@
package metrics2
import (
"fmt"
"github.com/openziti/zrok/util"
"time"
)
type Usage struct {
ProcessedStamp time.Time
IntervalStart time.Time
ZitiServiceId string
ZitiCircuitId string
ShareToken string
EnvironmentId int64
AccountId int64
FrontendTx int64
FrontendRx int64
BackendTx int64
BackendRx int64
}
func (u Usage) String() string {
out := "Usage {"
out += fmt.Sprintf("processed '%v'", u.ProcessedStamp)
out += ", " + fmt.Sprintf("interval '%v'", u.IntervalStart)
out += ", " + fmt.Sprintf("service '%v'", u.ZitiServiceId)
out += ", " + fmt.Sprintf("circuit '%v'", u.ZitiCircuitId)
out += ", " + fmt.Sprintf("share '%v'", u.ShareToken)
out += ", " + fmt.Sprintf("environment '%d'", u.EnvironmentId)
out += ", " + fmt.Sprintf("account '%v'", u.AccountId)
out += ", " + fmt.Sprintf("fe {rx %v, tx %v}", util.BytesToSize(u.FrontendRx), util.BytesToSize(u.FrontendTx))
out += ", " + fmt.Sprintf("be {rx %v, tx %v}", util.BytesToSize(u.BackendRx), util.BytesToSize(u.BackendTx))
out += "}"
return out
}
type UsageSink interface {
Handle(u *Usage) error
}
type ZitiEventJson string
type ZitiEventJsonSource interface {
Start(chan ZitiEventJson) (join chan struct{}, err error)
Stop()
}
type ZitiEventJsonSink interface {
Handle(event ZitiEventJson) error
}

View File

@ -1,154 +0,0 @@
package metrics2
import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"github.com/gorilla/websocket"
"github.com/michaelquigley/cf"
"github.com/openziti/channel/v2"
"github.com/openziti/channel/v2/websockets"
"github.com/openziti/edge/rest_util"
"github.com/openziti/fabric/event"
"github.com/openziti/fabric/pb/mgmt_pb"
"github.com/openziti/identity"
"github.com/openziti/sdk-golang/ziti/constants"
"github.com/openziti/zrok/controller/env"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"io"
"net/http"
"net/url"
"time"
)
func init() {
env.GetCfOptions().AddFlexibleSetter("websocketSource", loadWebsocketSourceConfig)
}
type WebsocketSourceConfig struct {
WebsocketEndpoint string
ApiEndpoint string
Username string
Password string `cf:"+secret"`
}
func loadWebsocketSourceConfig(v interface{}, _ *cf.Options) (interface{}, error) {
if submap, ok := v.(map[string]interface{}); ok {
cfg := &WebsocketSourceConfig{}
if err := cf.Bind(cfg, submap, cf.DefaultOptions()); err != nil {
return nil, err
}
return &websocketSource{cfg: cfg}, nil
}
return nil, errors.New("invalid config struture for 'websocketSource'")
}
type websocketSource struct {
cfg *WebsocketSourceConfig
ch channel.Channel
events chan ZitiEventJson
join chan struct{}
}
func (s *websocketSource) Start(events chan ZitiEventJson) (join chan struct{}, err error) {
caCerts, err := rest_util.GetControllerWellKnownCas(s.cfg.ApiEndpoint)
if err != nil {
return nil, err
}
caPool := x509.NewCertPool()
for _, ca := range caCerts {
caPool.AddCert(ca)
}
authenticator := rest_util.NewAuthenticatorUpdb(s.cfg.Username, s.cfg.Password)
authenticator.RootCas = caPool
apiEndpointUrl, err := url.Parse(s.cfg.ApiEndpoint)
if err != nil {
return nil, err
}
apiSession, err := authenticator.Authenticate(apiEndpointUrl)
if err != nil {
return nil, err
}
dialer := &websocket.Dialer{
TLSClientConfig: &tls.Config{
RootCAs: caPool,
},
HandshakeTimeout: 5 * time.Second,
}
conn, resp, err := dialer.Dial(s.cfg.WebsocketEndpoint, http.Header{constants.ZitiSession: []string{*apiSession.Token}})
if err != nil {
if resp != nil {
if body, rerr := io.ReadAll(resp.Body); rerr == nil {
logrus.Errorf("response body '%v': %v", string(body), err)
}
} else {
logrus.Errorf("no response from websocket dial: %v", err)
}
}
id := &identity.TokenId{Token: "mgmt"}
underlayFactory := websockets.NewUnderlayFactory(id, conn, nil)
s.join = make(chan struct{})
s.events = events
bindHandler := func(binding channel.Binding) error {
binding.AddReceiveHandler(int32(mgmt_pb.ContentType_StreamEventsEventType), s)
binding.AddCloseHandler(channel.CloseHandlerF(func(ch channel.Channel) {
close(s.join)
}))
return nil
}
s.ch, err = channel.NewChannel("mgmt", underlayFactory, channel.BindHandlerF(bindHandler), nil)
if err != nil {
return nil, err
}
streamEventsRequest := map[string]interface{}{}
streamEventsRequest["format"] = "json"
streamEventsRequest["subscriptions"] = []*event.Subscription{
{
Type: "fabric.usage",
Options: map[string]interface{}{
"version": uint8(3),
},
},
}
msgBytes, err := json.Marshal(streamEventsRequest)
if err != nil {
return nil, err
}
requestMsg := channel.NewMessage(int32(mgmt_pb.ContentType_StreamEventsRequestType), msgBytes)
responseMsg, err := requestMsg.WithTimeout(5 * time.Second).SendForReply(s.ch)
if err != nil {
return nil, err
}
if responseMsg.ContentType == channel.ContentTypeResultType {
result := channel.UnmarshalResult(responseMsg)
if result.Success {
logrus.Infof("event stream started: %v", result.Message)
} else {
return nil, errors.Wrap(err, "error starting event streaming")
}
} else {
return nil, errors.Errorf("unexpected response type %v", responseMsg.ContentType)
}
return s.join, nil
}
func (s *websocketSource) Stop() {
_ = s.ch.Close()
}
func (s *websocketSource) HandleReceive(msg *channel.Message, _ channel.Channel) {
s.events <- ZitiEventJson(msg.Body)
}