metrics2 -> metrics (#270)

This commit is contained in:
Michael Quigley 2023-03-15 16:14:06 -04:00
parent 3f7db68ed7
commit 86126b3f53
No known key found for this signature in database
GPG Key ID: 9B60314A9DD20A62
23 changed files with 131 additions and 789 deletions

View File

@ -4,7 +4,7 @@ import (
"github.com/michaelquigley/cf"
"github.com/openziti/zrok/controller/config"
"github.com/openziti/zrok/controller/env"
"github.com/openziti/zrok/controller/metrics2"
"github.com/openziti/zrok/controller/metrics"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"os"
@ -39,7 +39,7 @@ func (cmd *bridgeCommand) run(_ *cobra.Command, args []string) {
}
logrus.Infof(cf.Dump(cfg, env.GetCfOptions()))
bridge, err := metrics2.NewBridge(cfg.Bridge)
bridge, err := metrics.NewBridge(cfg.Bridge)
if err != nil {
panic(err)
}

View File

@ -3,7 +3,7 @@ package config
import (
"github.com/openziti/zrok/controller/env"
"github.com/openziti/zrok/controller/limits"
"github.com/openziti/zrok/controller/metrics2"
"github.com/openziti/zrok/controller/metrics"
"github.com/openziti/zrok/controller/zrokEdgeSdk"
"time"
@ -17,12 +17,12 @@ const ConfigVersion = 2
type Config struct {
V int
Admin *AdminConfig
Bridge *metrics2.BridgeConfig
Bridge *metrics.BridgeConfig
Endpoint *EndpointConfig
Email *EmailConfig
Limits *limits.Config
Maintenance *MaintenanceConfig
Metrics *metrics2.Config
Metrics *metrics.Config
Registration *RegistrationConfig
ResetPassword *ResetPasswordConfig
Store *store.Config

View File

@ -3,7 +3,7 @@ package controller
import (
"context"
"github.com/openziti/zrok/controller/config"
"github.com/openziti/zrok/controller/metrics2"
"github.com/openziti/zrok/controller/metrics"
"github.com/sirupsen/logrus"
"github.com/go-openapi/loads"
@ -72,7 +72,7 @@ func Run(inCfg *config.Config) error {
}
if cfg.Metrics != nil && cfg.Metrics.Agent != nil && cfg.Metrics.Influx != nil {
ma, err := metrics2.NewAgent(cfg.Metrics.Agent, str, cfg.Metrics.Influx)
ma, err := metrics.NewAgent(cfg.Metrics.Agent, str, cfg.Metrics.Influx)
if err != nil {
return errors.Wrap(err, "error creating metrics agent")
}

View File

@ -6,48 +6,45 @@ import (
"github.com/sirupsen/logrus"
)
type MetricsAgent struct {
src Source
cache *cache
join chan struct{}
type Agent struct {
events chan ZitiEventJson
src ZitiEventJsonSource
srcJoin chan struct{}
cache *cache
snk UsageSink
}
func Run(cfg *Config, strCfg *store.Config) (*MetricsAgent, error) {
logrus.Info("starting")
func NewAgent(cfg *AgentConfig, str *store.Store, ifxCfg *InfluxConfig) (*Agent, error) {
a := &Agent{}
if v, ok := cfg.Source.(ZitiEventJsonSource); ok {
a.src = v
} else {
return nil, errors.New("invalid event json source")
}
a.cache = newShareCache(str)
a.snk = newInfluxWriter(ifxCfg)
return a, nil
}
cache, err := newShareCache(strCfg)
func (a *Agent) Start() error {
a.events = make(chan ZitiEventJson)
srcJoin, err := a.src.Start(a.events)
if err != nil {
return nil, errors.Wrap(err, "error creating share cache")
}
if cfg.Strategies == nil || cfg.Strategies.Source == nil {
return nil, errors.New("no 'strategies/source' configured; exiting")
}
src, ok := cfg.Strategies.Source.(Source)
if !ok {
return nil, errors.New("invalid 'strategies/source'; exiting")
}
if cfg.Influx == nil {
return nil, errors.New("no 'influx' configured; exiting")
}
idb := openInfluxDb(cfg.Influx)
events := make(chan map[string]interface{})
join, err := src.Start(events)
if err != nil {
return nil, errors.Wrap(err, "error starting source")
return err
}
a.srcJoin = srcJoin
go func() {
logrus.Info("started")
defer logrus.Info("stopped")
for {
select {
case event := <-events:
usage := Ingest(event)
if err := cache.addZrokDetail(usage); err == nil {
if err := idb.Write(usage); err != nil {
case event := <-a.events:
if usage, err := Ingest(event); err == nil {
if err := a.cache.addZrokDetail(usage); err != nil {
logrus.Error(err)
}
if err := a.snk.Handle(usage); err != nil {
logrus.Error(err)
}
} else {
@ -57,14 +54,10 @@ func Run(cfg *Config, strCfg *store.Config) (*MetricsAgent, error) {
}
}()
return &MetricsAgent{src: src, join: join}, nil
return nil
}
func (ma *MetricsAgent) Stop() {
logrus.Info("stopping")
ma.src.Stop()
}
func (ma *MetricsAgent) Join() {
<-ma.join
func (a *Agent) Stop() {
a.src.Stop()
close(a.events)
}

View File

@ -1,51 +0,0 @@
package metrics
import (
"context"
"github.com/pkg/errors"
amqp "github.com/rabbitmq/amqp091-go"
"time"
)
type AmqpSenderConfig struct {
Url string `cf:"+secret"`
Queue string
}
type AmqpSender struct {
conn *amqp.Connection
ch *amqp.Channel
queue amqp.Queue
}
func NewAmqpSender(cfg *AmqpSenderConfig) (*AmqpSender, error) {
conn, err := amqp.Dial(cfg.Url)
if err != nil {
return nil, errors.Wrap(err, "error dialing amqp broker")
}
ch, err := conn.Channel()
if err != nil {
return nil, errors.Wrap(err, "error getting channel from amqp connection")
}
queue, err := ch.QueueDeclare(cfg.Queue, true, false, false, false, nil)
if err != nil {
return nil, errors.Wrap(err, "error creating amqp queue")
}
return &AmqpSender{conn, ch, queue}, nil
}
func (s *AmqpSender) Send(json string) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err := s.ch.PublishWithContext(ctx, "", s.queue.Name, false, false, amqp.Publishing{
ContentType: "application/json",
Body: []byte(json),
})
if err != nil {
return errors.Wrap(err, "error sending")
}
return nil
}

View File

@ -1,4 +1,4 @@
package metrics2
package metrics
import (
"context"

View File

@ -1,4 +1,4 @@
package metrics2
package metrics
import (
"github.com/michaelquigley/cf"

View File

@ -1,4 +1,4 @@
package metrics2
package metrics
import (
"github.com/pkg/errors"

View File

@ -2,34 +2,29 @@ package metrics
import (
"github.com/openziti/zrok/controller/store"
"github.com/pkg/errors"
)
type cache struct {
str *store.Store
}
func newShareCache(cfg *store.Config) (*cache, error) {
str, err := store.Open(cfg)
if err != nil {
return nil, errors.Wrap(err, "error opening store")
}
return &cache{str}, nil
func newShareCache(str *store.Store) *cache {
return &cache{str}
}
func (sc *cache) addZrokDetail(u *Usage) error {
tx, err := sc.str.Begin()
func (c *cache) addZrokDetail(u *Usage) error {
tx, err := c.str.Begin()
if err != nil {
return err
}
defer func() { _ = tx.Rollback() }()
shr, err := sc.str.FindShareWithZIdAndDeleted(u.ZitiServiceId, tx)
shr, err := c.str.FindShareWithZIdAndDeleted(u.ZitiServiceId, tx)
if err != nil {
return err
}
u.ShareToken = shr.Token
env, err := sc.str.GetEnvironment(shr.EnvironmentId, tx)
env, err := c.str.GetEnvironment(shr.EnvironmentId, tx)
if err != nil {
return err
}

View File

@ -1,8 +1,12 @@
package metrics
type Config struct {
Influx *InfluxConfig
Strategies *StrategiesConfig
Influx *InfluxConfig
Agent *AgentConfig
}
type AgentConfig struct {
Source interface{}
}
type InfluxConfig struct {
@ -11,7 +15,3 @@ type InfluxConfig struct {
Org string
Token string `cf:"+secret"`
}
type StrategiesConfig struct {
Source interface{}
}

View File

@ -2,7 +2,6 @@ package metrics
import (
"encoding/binary"
"encoding/json"
"github.com/michaelquigley/cf"
"github.com/nxadm/tail"
"github.com/openziti/zrok/controller/env"
@ -12,12 +11,12 @@ import (
)
func init() {
env.GetCfOptions().AddFlexibleSetter("file", loadFileSourceConfig)
env.GetCfOptions().AddFlexibleSetter("fileSource", loadFileSourceConfig)
}
type FileSourceConfig struct {
Path string
IndexPath string
Path string
PointerPath string
}
func loadFileSourceConfig(v interface{}, _ *cf.Options) (interface{}, error) {
@ -28,36 +27,36 @@ func loadFileSourceConfig(v interface{}, _ *cf.Options) (interface{}, error) {
}
return &fileSource{cfg: cfg}, nil
}
return nil, errors.New("invalid config structure for 'file' source")
return nil, errors.New("invalid config structure for 'fileSource'")
}
type fileSource struct {
cfg *FileSourceConfig
t *tail.Tail
cfg *FileSourceConfig
ptrF *os.File
t *tail.Tail
}
func (s *fileSource) Start(events chan map[string]interface{}) (join chan struct{}, err error) {
func (s *fileSource) Start(events chan ZitiEventJson) (join chan struct{}, err error) {
f, err := os.Open(s.cfg.Path)
if err != nil {
return nil, errors.Wrapf(err, "error opening '%v'", s.cfg.Path)
}
_ = f.Close()
idxF, err := os.OpenFile(s.indexPath(), os.O_CREATE|os.O_RDWR, os.ModePerm)
s.ptrF, err = os.OpenFile(s.pointerPath(), os.O_CREATE|os.O_RDWR, os.ModePerm)
if err != nil {
return nil, errors.Wrapf(err, "error opening '%v'", s.indexPath())
return nil, errors.Wrapf(err, "error opening pointer '%v'", s.pointerPath())
}
pos := int64(0)
posBuf := make([]byte, 8)
if n, err := idxF.Read(posBuf); err == nil && n == 8 {
pos = int64(binary.LittleEndian.Uint64(posBuf))
logrus.Infof("recovered stored position: %d", pos)
ptr, err := s.readPtr()
if err != nil {
logrus.Errorf("error reading pointer: %v", err)
}
logrus.Infof("retrieved stored position pointer at '%d'", ptr)
join = make(chan struct{})
go func() {
s.tail(pos, events, idxF)
s.tail(ptr, events)
close(join)
}()
@ -70,43 +69,62 @@ func (s *fileSource) Stop() {
}
}
func (s *fileSource) tail(pos int64, events chan map[string]interface{}, idxF *os.File) {
logrus.Infof("started")
defer logrus.Infof("stopped")
posBuf := make([]byte, 8)
func (s *fileSource) tail(ptr int64, events chan ZitiEventJson) {
logrus.Info("started")
defer logrus.Info("stopped")
var err error
s.t, err = tail.TailFile(s.cfg.Path, tail.Config{
ReOpen: true,
Follow: true,
Location: &tail.SeekInfo{Offset: pos},
Location: &tail.SeekInfo{Offset: ptr},
})
if err != nil {
logrus.Error(err)
logrus.Errorf("error starting tail: %v", err)
return
}
for line := range s.t.Lines {
event := make(map[string]interface{})
if err := json.Unmarshal([]byte(line.Text), &event); err == nil {
binary.LittleEndian.PutUint64(posBuf, uint64(line.SeekInfo.Offset))
if n, err := idxF.Seek(0, 0); err == nil && n == 0 {
if n, err := idxF.Write(posBuf); err != nil || n != 8 {
logrus.Errorf("error writing index (%d): %v", n, err)
}
}
events <- event
} else {
logrus.Errorf("error parsing line #%d: %v", line.Num, err)
for event := range s.t.Lines {
events <- ZitiEventJson(event.Text)
if err := s.writePtr(event.SeekInfo.Offset); err != nil {
logrus.Error(err)
}
}
}
func (s *fileSource) indexPath() string {
if s.cfg.IndexPath == "" {
return s.cfg.Path + ".idx"
func (s *fileSource) pointerPath() string {
if s.cfg.PointerPath == "" {
return s.cfg.Path + ".ptr"
} else {
return s.cfg.IndexPath
return s.cfg.PointerPath
}
}
func (s *fileSource) readPtr() (int64, error) {
ptr := int64(0)
buf := make([]byte, 8)
if n, err := s.ptrF.Seek(0, 0); err == nil && n == 0 {
if n, err := s.ptrF.Read(buf); err == nil && n == 8 {
ptr = int64(binary.LittleEndian.Uint64(buf))
return ptr, nil
} else {
return 0, errors.Wrapf(err, "error reading pointer (%d): %v", n, err)
}
} else {
return 0, errors.Wrapf(err, "error seeking pointer (%d): %v", n, err)
}
}
func (s *fileSource) writePtr(ptr int64) error {
buf := make([]byte, 8)
binary.LittleEndian.PutUint64(buf, uint64(ptr))
if n, err := s.ptrF.Seek(0, 0); err == nil && n == 0 {
if n, err := s.ptrF.Write(buf); err != nil || n != 8 {
return errors.Wrapf(err, "error writing pointer (%d): %v", n, err)
}
} else {
return errors.Wrapf(err, "error seeking pointer (%d): %v", n, err)
}
return nil
}

View File

@ -1,61 +0,0 @@
package metrics
import (
"context"
"fmt"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api"
"github.com/influxdata/influxdb-client-go/v2/api/write"
"github.com/openziti/zrok/util"
"github.com/sirupsen/logrus"
)
type influxDb struct {
idb influxdb2.Client
writeApi api.WriteAPIBlocking
}
func openInfluxDb(cfg *InfluxConfig) *influxDb {
idb := influxdb2.NewClient(cfg.Url, cfg.Token)
wapi := idb.WriteAPIBlocking(cfg.Org, cfg.Bucket)
return &influxDb{idb, wapi}
}
func (i *influxDb) Write(u *Usage) error {
out := fmt.Sprintf("share: %v, circuit: %v", u.ShareToken, u.ZitiCircuitId)
envId := fmt.Sprintf("%d", u.EnvironmentId)
acctId := fmt.Sprintf("%d", u.AccountId)
var pts []*write.Point
circuitPt := influxdb2.NewPoint("circuits",
map[string]string{"share": u.ShareToken, "envId": envId, "acctId": acctId},
map[string]interface{}{"circuit": u.ZitiCircuitId},
u.IntervalStart)
pts = append(pts, circuitPt)
if u.BackendTx > 0 || u.BackendRx > 0 {
pt := influxdb2.NewPoint("xfer",
map[string]string{"namespace": "backend", "share": u.ShareToken, "envId": envId, "acctId": acctId},
map[string]interface{}{"bytesRead": u.BackendRx, "bytesWritten": u.BackendTx},
u.IntervalStart)
pts = append(pts, pt)
out += fmt.Sprintf(" backend {rx: %v, tx: %v}", util.BytesToSize(u.BackendRx), util.BytesToSize(u.BackendTx))
}
if u.FrontendTx > 0 || u.FrontendRx > 0 {
pt := influxdb2.NewPoint("xfer",
map[string]string{"namespace": "frontend", "share": u.ShareToken, "envId": envId, "acctId": acctId},
map[string]interface{}{"bytesRead": u.FrontendRx, "bytesWritten": u.FrontendTx},
u.IntervalStart)
pts = append(pts, pt)
out += fmt.Sprintf(" frontend {rx: %v, tx: %v}", util.BytesToSize(u.FrontendRx), util.BytesToSize(u.FrontendTx))
}
if err := i.writeApi.WritePoint(context.Background(), pts...); err == nil {
logrus.Info(out)
} else {
return err
}
return nil
}

View File

@ -1,4 +1,4 @@
package metrics2
package metrics
import (
"context"

View File

@ -35,11 +35,17 @@ func (u Usage) String() string {
return out
}
type Source interface {
Start(chan map[string]interface{}) (chan struct{}, error)
type UsageSink interface {
Handle(u *Usage) error
}
type ZitiEventJson string
type ZitiEventJsonSource interface {
Start(chan ZitiEventJson) (join chan struct{}, err error)
Stop()
}
type Ingester interface {
Ingest(msg map[string]interface{}) error
type ZitiEventJsonSink interface {
Handle(event ZitiEventJson) error
}

View File

@ -1,4 +1,4 @@
package metrics2
package metrics
import (
"encoding/json"

View File

@ -1,95 +0,0 @@
package metrics
import (
"github.com/sirupsen/logrus"
"reflect"
"time"
)
func Ingest(event map[string]interface{}) *Usage {
u := &Usage{ProcessedStamp: time.Now()}
if ns, found := event["namespace"]; found && ns == "fabric.usage" {
if v, found := event["interval_start_utc"]; found {
if vFloat64, ok := v.(float64); ok {
u.IntervalStart = time.Unix(int64(vFloat64), 0)
} else {
logrus.Error("unable to assert 'interval_start_utc'")
}
} else {
logrus.Error("missing 'interval_start_utc'")
}
if v, found := event["tags"]; found {
if tags, ok := v.(map[string]interface{}); ok {
if v, found := tags["serviceId"]; found {
if vStr, ok := v.(string); ok {
u.ZitiServiceId = vStr
} else {
logrus.Error("unable to assert 'tags/serviceId'")
}
} else {
logrus.Error("missing 'tags/serviceId'")
}
} else {
logrus.Errorf("unable to assert 'tags'")
}
} else {
logrus.Errorf("missing 'tags'")
}
if v, found := event["usage"]; found {
if usage, ok := v.(map[string]interface{}); ok {
if v, found := usage["ingress.tx"]; found {
if vFloat64, ok := v.(float64); ok {
u.FrontendTx = int64(vFloat64)
} else {
logrus.Error("unable to assert 'usage/ingress.tx'")
}
} else {
logrus.Warn("missing 'usage/ingress.tx'")
}
if v, found := usage["ingress.rx"]; found {
if vFloat64, ok := v.(float64); ok {
u.FrontendRx = int64(vFloat64)
} else {
logrus.Error("unable to assert 'usage/ingress.rx")
}
} else {
logrus.Warn("missing 'usage/ingress.rx")
}
if v, found := usage["egress.tx"]; found {
if vFloat64, ok := v.(float64); ok {
u.BackendTx = int64(vFloat64)
} else {
logrus.Error("unable to assert 'usage/egress.tx'")
}
} else {
logrus.Warn("missing 'usage/egress.tx'")
}
if v, found := usage["egress.rx"]; found {
if vFloat64, ok := v.(float64); ok {
u.BackendRx = int64(vFloat64)
} else {
logrus.Error("unable to assert 'usage/egress.rx'")
}
} else {
logrus.Warn("missing 'usage/egress.rx'")
}
} else {
logrus.Errorf("unable to assert 'usage' (%v) %v", reflect.TypeOf(v), event)
}
} else {
logrus.Warnf("missing 'usage'")
}
if v, found := event["circuit_id"]; found {
if vStr, ok := v.(string); ok {
u.ZitiCircuitId = vStr
} else {
logrus.Error("unable to assert 'circuit_id'")
}
} else {
logrus.Warn("missing 'circuit_id'")
}
} else {
logrus.Errorf("not 'fabric.usage'")
}
return u
}

View File

@ -1,7 +1,6 @@
package metrics
import (
"bytes"
"crypto/tls"
"crypto/x509"
"encoding/json"
@ -24,14 +23,14 @@ import (
)
func init() {
env.GetCfOptions().AddFlexibleSetter("websocket", loadWebsocketSourceConfig)
env.GetCfOptions().AddFlexibleSetter("websocketSource", loadWebsocketSourceConfig)
}
type WebsocketSourceConfig struct {
WebsocketEndpoint string
ApiEndpoint string
Username string
Password string
Password string `cf:"+secret"`
}
func loadWebsocketSourceConfig(v interface{}, _ *cf.Options) (interface{}, error) {
@ -42,17 +41,17 @@ func loadWebsocketSourceConfig(v interface{}, _ *cf.Options) (interface{}, error
}
return &websocketSource{cfg: cfg}, nil
}
return nil, errors.New("invalid config structure for 'websocket' source")
return nil, errors.New("invalid config struture for 'websocketSource'")
}
type websocketSource struct {
cfg *WebsocketSourceConfig
ch channel.Channel
events chan map[string]interface{}
events chan ZitiEventJson
join chan struct{}
}
func (s *websocketSource) Start(events chan map[string]interface{}) (chan struct{}, error) {
func (s *websocketSource) Start(events chan ZitiEventJson) (join chan struct{}, err error) {
caCerts, err := rest_util.GetControllerWellKnownCas(s.cfg.ApiEndpoint)
if err != nil {
return nil, err
@ -151,17 +150,5 @@ func (s *websocketSource) Stop() {
}
func (s *websocketSource) HandleReceive(msg *channel.Message, _ channel.Channel) {
decoder := json.NewDecoder(bytes.NewReader(msg.Body))
for {
ev := make(map[string]interface{})
err := decoder.Decode(&ev)
if err == io.EOF {
break
}
if err == nil {
s.events <- ev
} else {
logrus.Errorf("error parsing '%v': %v", string(msg.Body), err)
}
}
s.events <- ZitiEventJson(msg.Body)
}

View File

@ -1,63 +0,0 @@
package metrics2
import (
"github.com/openziti/zrok/controller/store"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
type Agent struct {
events chan ZitiEventJson
src ZitiEventJsonSource
srcJoin chan struct{}
cache *cache
snk UsageSink
}
func NewAgent(cfg *AgentConfig, str *store.Store, ifxCfg *InfluxConfig) (*Agent, error) {
a := &Agent{}
if v, ok := cfg.Source.(ZitiEventJsonSource); ok {
a.src = v
} else {
return nil, errors.New("invalid event json source")
}
a.cache = newShareCache(str)
a.snk = newInfluxWriter(ifxCfg)
return a, nil
}
func (a *Agent) Start() error {
a.events = make(chan ZitiEventJson)
srcJoin, err := a.src.Start(a.events)
if err != nil {
return err
}
a.srcJoin = srcJoin
go func() {
logrus.Info("started")
defer logrus.Info("stopped")
for {
select {
case event := <-a.events:
if usage, err := Ingest(event); err == nil {
if err := a.cache.addZrokDetail(usage); err != nil {
logrus.Error(err)
}
if err := a.snk.Handle(usage); err != nil {
logrus.Error(err)
}
} else {
logrus.Error(err)
}
}
}
}()
return nil
}
func (a *Agent) Stop() {
a.src.Stop()
close(a.events)
}

View File

@ -1,35 +0,0 @@
package metrics2
import (
"github.com/openziti/zrok/controller/store"
)
type cache struct {
str *store.Store
}
func newShareCache(str *store.Store) *cache {
return &cache{str}
}
func (c *cache) addZrokDetail(u *Usage) error {
tx, err := c.str.Begin()
if err != nil {
return err
}
defer func() { _ = tx.Rollback() }()
shr, err := c.str.FindShareWithZIdAndDeleted(u.ZitiServiceId, tx)
if err != nil {
return err
}
u.ShareToken = shr.Token
env, err := c.str.GetEnvironment(shr.EnvironmentId, tx)
if err != nil {
return err
}
u.EnvironmentId = int64(env.Id)
u.AccountId = int64(*env.AccountId)
return nil
}

View File

@ -1,17 +0,0 @@
package metrics2
type Config struct {
Influx *InfluxConfig
Agent *AgentConfig
}
type AgentConfig struct {
Source interface{}
}
type InfluxConfig struct {
Url string
Bucket string
Org string
Token string `cf:"+secret"`
}

View File

@ -1,130 +0,0 @@
package metrics2
import (
"encoding/binary"
"github.com/michaelquigley/cf"
"github.com/nxadm/tail"
"github.com/openziti/zrok/controller/env"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"os"
)
func init() {
env.GetCfOptions().AddFlexibleSetter("fileSource", loadFileSourceConfig)
}
type FileSourceConfig struct {
Path string
PointerPath string
}
func loadFileSourceConfig(v interface{}, _ *cf.Options) (interface{}, error) {
if submap, ok := v.(map[string]interface{}); ok {
cfg := &FileSourceConfig{}
if err := cf.Bind(cfg, submap, cf.DefaultOptions()); err != nil {
return nil, err
}
return &fileSource{cfg: cfg}, nil
}
return nil, errors.New("invalid config structure for 'fileSource'")
}
type fileSource struct {
cfg *FileSourceConfig
ptrF *os.File
t *tail.Tail
}
func (s *fileSource) Start(events chan ZitiEventJson) (join chan struct{}, err error) {
f, err := os.Open(s.cfg.Path)
if err != nil {
return nil, errors.Wrapf(err, "error opening '%v'", s.cfg.Path)
}
_ = f.Close()
s.ptrF, err = os.OpenFile(s.pointerPath(), os.O_CREATE|os.O_RDWR, os.ModePerm)
if err != nil {
return nil, errors.Wrapf(err, "error opening pointer '%v'", s.pointerPath())
}
ptr, err := s.readPtr()
if err != nil {
logrus.Errorf("error reading pointer: %v", err)
}
logrus.Infof("retrieved stored position pointer at '%d'", ptr)
join = make(chan struct{})
go func() {
s.tail(ptr, events)
close(join)
}()
return join, nil
}
func (s *fileSource) Stop() {
if err := s.t.Stop(); err != nil {
logrus.Error(err)
}
}
func (s *fileSource) tail(ptr int64, events chan ZitiEventJson) {
logrus.Info("started")
defer logrus.Info("stopped")
var err error
s.t, err = tail.TailFile(s.cfg.Path, tail.Config{
ReOpen: true,
Follow: true,
Location: &tail.SeekInfo{Offset: ptr},
})
if err != nil {
logrus.Errorf("error starting tail: %v", err)
return
}
for event := range s.t.Lines {
events <- ZitiEventJson(event.Text)
if err := s.writePtr(event.SeekInfo.Offset); err != nil {
logrus.Error(err)
}
}
}
func (s *fileSource) pointerPath() string {
if s.cfg.PointerPath == "" {
return s.cfg.Path + ".ptr"
} else {
return s.cfg.PointerPath
}
}
func (s *fileSource) readPtr() (int64, error) {
ptr := int64(0)
buf := make([]byte, 8)
if n, err := s.ptrF.Seek(0, 0); err == nil && n == 0 {
if n, err := s.ptrF.Read(buf); err == nil && n == 8 {
ptr = int64(binary.LittleEndian.Uint64(buf))
return ptr, nil
} else {
return 0, errors.Wrapf(err, "error reading pointer (%d): %v", n, err)
}
} else {
return 0, errors.Wrapf(err, "error seeking pointer (%d): %v", n, err)
}
}
func (s *fileSource) writePtr(ptr int64) error {
buf := make([]byte, 8)
binary.LittleEndian.PutUint64(buf, uint64(ptr))
if n, err := s.ptrF.Seek(0, 0); err == nil && n == 0 {
if n, err := s.ptrF.Write(buf); err != nil || n != 8 {
return errors.Wrapf(err, "error writing pointer (%d): %v", n, err)
}
} else {
return errors.Wrapf(err, "error seeking pointer (%d): %v", n, err)
}
return nil
}

View File

@ -1,51 +0,0 @@
package metrics2
import (
"fmt"
"github.com/openziti/zrok/util"
"time"
)
type Usage struct {
ProcessedStamp time.Time
IntervalStart time.Time
ZitiServiceId string
ZitiCircuitId string
ShareToken string
EnvironmentId int64
AccountId int64
FrontendTx int64
FrontendRx int64
BackendTx int64
BackendRx int64
}
func (u Usage) String() string {
out := "Usage {"
out += fmt.Sprintf("processed '%v'", u.ProcessedStamp)
out += ", " + fmt.Sprintf("interval '%v'", u.IntervalStart)
out += ", " + fmt.Sprintf("service '%v'", u.ZitiServiceId)
out += ", " + fmt.Sprintf("circuit '%v'", u.ZitiCircuitId)
out += ", " + fmt.Sprintf("share '%v'", u.ShareToken)
out += ", " + fmt.Sprintf("environment '%d'", u.EnvironmentId)
out += ", " + fmt.Sprintf("account '%v'", u.AccountId)
out += ", " + fmt.Sprintf("fe {rx %v, tx %v}", util.BytesToSize(u.FrontendRx), util.BytesToSize(u.FrontendTx))
out += ", " + fmt.Sprintf("be {rx %v, tx %v}", util.BytesToSize(u.BackendRx), util.BytesToSize(u.BackendTx))
out += "}"
return out
}
type UsageSink interface {
Handle(u *Usage) error
}
type ZitiEventJson string
type ZitiEventJsonSource interface {
Start(chan ZitiEventJson) (join chan struct{}, err error)
Stop()
}
type ZitiEventJsonSink interface {
Handle(event ZitiEventJson) error
}

View File

@ -1,154 +0,0 @@
package metrics2
import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"github.com/gorilla/websocket"
"github.com/michaelquigley/cf"
"github.com/openziti/channel/v2"
"github.com/openziti/channel/v2/websockets"
"github.com/openziti/edge/rest_util"
"github.com/openziti/fabric/event"
"github.com/openziti/fabric/pb/mgmt_pb"
"github.com/openziti/identity"
"github.com/openziti/sdk-golang/ziti/constants"
"github.com/openziti/zrok/controller/env"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"io"
"net/http"
"net/url"
"time"
)
func init() {
env.GetCfOptions().AddFlexibleSetter("websocketSource", loadWebsocketSourceConfig)
}
type WebsocketSourceConfig struct {
WebsocketEndpoint string
ApiEndpoint string
Username string
Password string `cf:"+secret"`
}
func loadWebsocketSourceConfig(v interface{}, _ *cf.Options) (interface{}, error) {
if submap, ok := v.(map[string]interface{}); ok {
cfg := &WebsocketSourceConfig{}
if err := cf.Bind(cfg, submap, cf.DefaultOptions()); err != nil {
return nil, err
}
return &websocketSource{cfg: cfg}, nil
}
return nil, errors.New("invalid config struture for 'websocketSource'")
}
type websocketSource struct {
cfg *WebsocketSourceConfig
ch channel.Channel
events chan ZitiEventJson
join chan struct{}
}
func (s *websocketSource) Start(events chan ZitiEventJson) (join chan struct{}, err error) {
caCerts, err := rest_util.GetControllerWellKnownCas(s.cfg.ApiEndpoint)
if err != nil {
return nil, err
}
caPool := x509.NewCertPool()
for _, ca := range caCerts {
caPool.AddCert(ca)
}
authenticator := rest_util.NewAuthenticatorUpdb(s.cfg.Username, s.cfg.Password)
authenticator.RootCas = caPool
apiEndpointUrl, err := url.Parse(s.cfg.ApiEndpoint)
if err != nil {
return nil, err
}
apiSession, err := authenticator.Authenticate(apiEndpointUrl)
if err != nil {
return nil, err
}
dialer := &websocket.Dialer{
TLSClientConfig: &tls.Config{
RootCAs: caPool,
},
HandshakeTimeout: 5 * time.Second,
}
conn, resp, err := dialer.Dial(s.cfg.WebsocketEndpoint, http.Header{constants.ZitiSession: []string{*apiSession.Token}})
if err != nil {
if resp != nil {
if body, rerr := io.ReadAll(resp.Body); rerr == nil {
logrus.Errorf("response body '%v': %v", string(body), err)
}
} else {
logrus.Errorf("no response from websocket dial: %v", err)
}
}
id := &identity.TokenId{Token: "mgmt"}
underlayFactory := websockets.NewUnderlayFactory(id, conn, nil)
s.join = make(chan struct{})
s.events = events
bindHandler := func(binding channel.Binding) error {
binding.AddReceiveHandler(int32(mgmt_pb.ContentType_StreamEventsEventType), s)
binding.AddCloseHandler(channel.CloseHandlerF(func(ch channel.Channel) {
close(s.join)
}))
return nil
}
s.ch, err = channel.NewChannel("mgmt", underlayFactory, channel.BindHandlerF(bindHandler), nil)
if err != nil {
return nil, err
}
streamEventsRequest := map[string]interface{}{}
streamEventsRequest["format"] = "json"
streamEventsRequest["subscriptions"] = []*event.Subscription{
{
Type: "fabric.usage",
Options: map[string]interface{}{
"version": uint8(3),
},
},
}
msgBytes, err := json.Marshal(streamEventsRequest)
if err != nil {
return nil, err
}
requestMsg := channel.NewMessage(int32(mgmt_pb.ContentType_StreamEventsRequestType), msgBytes)
responseMsg, err := requestMsg.WithTimeout(5 * time.Second).SendForReply(s.ch)
if err != nil {
return nil, err
}
if responseMsg.ContentType == channel.ContentTypeResultType {
result := channel.UnmarshalResult(responseMsg)
if result.Success {
logrus.Infof("event stream started: %v", result.Message)
} else {
return nil, errors.Wrap(err, "error starting event streaming")
}
} else {
return nil, errors.Errorf("unexpected response type %v", responseMsg.ContentType)
}
return s.join, nil
}
func (s *websocketSource) Stop() {
_ = s.ch.Close()
}
func (s *websocketSource) HandleReceive(msg *channel.Message, _ channel.Channel) {
s.events <- ZitiEventJson(msg.Body)
}