Merge pull request #261 from openziti/v0.4_metrics

v0.4.x New Metrics (#128)
This commit is contained in:
Michael Quigley 2023-03-09 14:28:53 -05:00 committed by GitHub
commit 3cb2e5b6e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
42 changed files with 826 additions and 674 deletions

2
.gitignore vendored
View File

@ -4,6 +4,7 @@
*.db
automated-release-build
etc/dev.yml
etc/dev-metrics.yml
# Dependencies
/node_modules/
@ -28,3 +29,4 @@ npm-debug.log*
yarn-debug.log*
yarn-error.log*
.obsidian

View File

@ -1,6 +1,6 @@
# v0.4.0
Work in progress.
FEATURE: New metrics infrastructure based on OpenZiti usage events (https://github.com/openziti/zrok/issues/128). See the [v0.4 Metrics Guide](docs/guides/v0.4_metrics.md) for more information.
# v0.3.4

57
cmd/zrok/metrics.go Normal file
View File

@ -0,0 +1,57 @@
package main
import (
"github.com/michaelquigley/cf"
"github.com/openziti/zrok/controller/metrics"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"os"
"os/signal"
"syscall"
"time"
)
func init() {
rootCmd.AddCommand(newMetricsCommand().cmd)
}
type metricsCommand struct {
cmd *cobra.Command
}
func newMetricsCommand() *metricsCommand {
cmd := &cobra.Command{
Use: "metrics <configPath>",
Short: "Start a zrok metrics agent",
Args: cobra.ExactArgs(1),
}
command := &metricsCommand{cmd}
cmd.Run = command.run
return command
}
func (cmd *metricsCommand) run(_ *cobra.Command, args []string) {
cfg, err := metrics.LoadConfig(args[0])
if err != nil {
panic(err)
}
logrus.Infof(cf.Dump(cfg, metrics.GetCfOptions()))
ma, err := metrics.Run(cfg)
if err != nil {
panic(err)
}
c := make(chan os.Signal)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c
ma.Stop()
ma.Join()
os.Exit(0)
}()
for {
time.Sleep(30 * time.Minute)
}
}

View File

@ -66,7 +66,7 @@ func (h *accessHandler) Handle(params share.AccessParams, principal *rest_model_
return share.NewAccessInternalServerError()
}
edge, err := edgeClient()
edge, err := zrokEdgeSdk.Client(cfg.Ziti)
if err != nil {
logrus.Error(err)
return share.NewAccessInternalServerError()

View File

@ -9,9 +9,6 @@ import (
"github.com/openziti/edge/rest_management_api_client/config"
"github.com/openziti/edge/rest_management_api_client/edge_router_policy"
"github.com/openziti/edge/rest_management_api_client/identity"
"github.com/openziti/edge/rest_management_api_client/service"
"github.com/openziti/edge/rest_management_api_client/service_edge_router_policy"
"github.com/openziti/edge/rest_management_api_client/service_policy"
"github.com/openziti/edge/rest_model"
rest_model_edge "github.com/openziti/edge/rest_model"
"github.com/openziti/sdk-golang/ziti"
@ -35,7 +32,7 @@ func Bootstrap(skipCtrl, skipFrontend bool, inCfg *Config) error {
}
logrus.Info("connecting to the ziti edge management api")
edge, err := edgeClient()
edge, err := zrokEdgeSdk.Client(cfg.Ziti)
if err != nil {
return errors.Wrap(err, "error connecting to the ziti edge management api")
}
@ -100,27 +97,6 @@ func Bootstrap(skipCtrl, skipFrontend bool, inCfg *Config) error {
return err
}
var metricsSvcZId string
if metricsSvcZId, err = assertMetricsService(cfg, edge); err != nil {
return err
}
if err := assertMetricsSerp(metricsSvcZId, cfg, edge); err != nil {
return err
}
if !skipCtrl {
if err := assertCtrlMetricsBind(ctrlZId, metricsSvcZId, edge); err != nil {
return err
}
}
if !skipFrontend {
if err := assertFrontendMetricsDial(frontendZId, metricsSvcZId, edge); err != nil {
return err
}
}
return nil
}
@ -243,105 +219,3 @@ func assertErpForIdentity(name, zId string, edge *rest_management_api_client.Zit
logrus.Infof("asserted erps for '%v' (%v)", name, zId)
return nil
}
func assertMetricsService(cfg *Config, edge *rest_management_api_client.ZitiEdgeManagement) (string, error) {
filter := fmt.Sprintf("name=\"%v\" and tags.zrok != null", cfg.Metrics.ServiceName)
limit := int64(0)
offset := int64(0)
listReq := &service.ListServicesParams{
Filter: &filter,
Limit: &limit,
Offset: &offset,
}
listReq.SetTimeout(30 * time.Second)
listResp, err := edge.Service.ListServices(listReq, nil)
if err != nil {
return "", errors.Wrapf(err, "error listing '%v' service", cfg.Metrics.ServiceName)
}
var svcZId string
if len(listResp.Payload.Data) != 1 {
logrus.Infof("creating '%v' service", cfg.Metrics.ServiceName)
svcZId, err = zrokEdgeSdk.CreateService("metrics", nil, nil, edge)
if err != nil {
return "", errors.Wrapf(err, "error creating '%v' service", cfg.Metrics.ServiceName)
}
} else {
svcZId = *listResp.Payload.Data[0].ID
}
logrus.Infof("asserted '%v' service (%v)", cfg.Metrics.ServiceName, svcZId)
return svcZId, nil
}
func assertMetricsSerp(metricsSvcZId string, cfg *Config, edge *rest_management_api_client.ZitiEdgeManagement) error {
filter := fmt.Sprintf("allOf(serviceRoles) = \"@%v\" and allOf(edgeRouterRoles) = \"#all\" and tags.zrok != null", metricsSvcZId)
limit := int64(0)
offset := int64(0)
listReq := &service_edge_router_policy.ListServiceEdgeRouterPoliciesParams{
Filter: &filter,
Limit: &limit,
Offset: &offset,
}
listReq.SetTimeout(30 * time.Second)
listResp, err := edge.ServiceEdgeRouterPolicy.ListServiceEdgeRouterPolicies(listReq, nil)
if err != nil {
return errors.Wrapf(err, "error listing '%v' serps", cfg.Metrics.ServiceName)
}
if len(listResp.Payload.Data) != 1 {
logrus.Infof("creating '%v' serp", cfg.Metrics.ServiceName)
_, err := zrokEdgeSdk.CreateServiceEdgeRouterPolicy(cfg.Metrics.ServiceName, metricsSvcZId, nil, edge)
if err != nil {
return errors.Wrapf(err, "error creating '%v' serp", cfg.Metrics.ServiceName)
}
}
logrus.Infof("asserted '%v' serp", cfg.Metrics.ServiceName)
return nil
}
func assertCtrlMetricsBind(ctrlZId, metricsSvcZId string, edge *rest_management_api_client.ZitiEdgeManagement) error {
filter := fmt.Sprintf("allOf(serviceRoles) = \"@%v\" and allOf(identityRoles) = \"@%v\" and type = 2 and tags.zrok != null", metricsSvcZId, ctrlZId)
limit := int64(0)
offset := int64(0)
listReq := &service_policy.ListServicePoliciesParams{
Filter: &filter,
Limit: &limit,
Offset: &offset,
}
listReq.SetTimeout(30 * time.Second)
listResp, err := edge.ServicePolicy.ListServicePolicies(listReq, nil)
if err != nil {
return errors.Wrapf(err, "error listing 'ctrl-metrics-bind' service policy")
}
if len(listResp.Payload.Data) != 1 {
logrus.Info("creating 'ctrl-metrics-bind' service policy")
if err = zrokEdgeSdk.CreateServicePolicyBind("ctrl-metrics-bind", metricsSvcZId, ctrlZId, nil, edge); err != nil {
return errors.Wrap(err, "error creating 'ctrl-metrics-bind' service policy")
}
}
logrus.Infof("asserted 'ctrl-metrics-bind' service policy")
return nil
}
func assertFrontendMetricsDial(frontendZId, metricsSvcZId string, edge *rest_management_api_client.ZitiEdgeManagement) error {
filter := fmt.Sprintf("allOf(serviceRoles) = \"@%v\" and allOf(identityRoles) = \"@%v\" and type = 1 and tags.zrok != null", metricsSvcZId, frontendZId)
limit := int64(0)
offset := int64(0)
listReq := &service_policy.ListServicePoliciesParams{
Filter: &filter,
Limit: &limit,
Offset: &offset,
}
listReq.SetTimeout(30 * time.Second)
listResp, err := edge.ServicePolicy.ListServicePolicies(listReq, nil)
if err != nil {
return errors.Wrapf(err, "error listing 'frontend-metrics-dial' service policy")
}
if len(listResp.Payload.Data) != 1 {
logrus.Info("creating 'frontend-metrics-dial' service policy")
if err = zrokEdgeSdk.CreateServicePolicyDial("frontend-metrics-dial", metricsSvcZId, []string{frontendZId}, nil, edge); err != nil {
return errors.Wrap(err, "error creating 'frontend-metrics-dial' service policy")
}
}
logrus.Infof("asserted 'frontend-metrics-dial' service policy")
return nil
}

View File

@ -1,6 +1,7 @@
package controller
import (
"github.com/openziti/zrok/controller/zrokEdgeSdk"
"time"
"github.com/michaelquigley/cf"
@ -18,11 +19,10 @@ type Config struct {
Influx *InfluxConfig
Limits *LimitsConfig
Maintenance *MaintenanceConfig
Metrics *MetricsConfig
Registration *RegistrationConfig
ResetPassword *ResetPasswordConfig
Store *store.Config
Ziti *ZitiConfig
Ziti *zrokEdgeSdk.ZitiConfig
}
type AdminConfig struct {
@ -52,16 +52,6 @@ type ResetPasswordConfig struct {
ResetUrlTemplate string
}
type ZitiConfig struct {
ApiEndpoint string
Username string
Password string `cf:"+secret"`
}
type MetricsConfig struct {
ServiceName string
}
type InfluxConfig struct {
Url string
Bucket string
@ -99,9 +89,6 @@ func DefaultConfig() *Config {
Environments: Unlimited,
Shares: Unlimited,
},
Metrics: &MetricsConfig{
ServiceName: "metrics",
},
Maintenance: &MaintenanceConfig{
ResetPassword: &ResetPasswordMaintenanceConfig{
ExpirationTimeout: time.Minute * 15,

View File

@ -15,7 +15,6 @@ import (
var cfg *Config
var str *store.Store
var mtr *metricsAgent
var idb influxdb2.Client
func Run(inCfg *Config) error {
@ -67,15 +66,6 @@ func Run(inCfg *Config) error {
idb = influxdb2.NewClient(cfg.Influx.Url, cfg.Influx.Token)
}
if cfg.Metrics != nil {
mtr = newMetricsAgent()
go mtr.run()
defer func() {
mtr.stop()
mtr.join()
}()
}
ctx, cancel := context.WithCancel(context.Background())
defer func() {
cancel()

View File

@ -25,7 +25,7 @@ func (h *createFrontendHandler) Handle(params admin.CreateFrontendParams, princi
return admin.NewCreateFrontendUnauthorized()
}
client, err := edgeClient()
client, err := zrokEdgeSdk.Client(cfg.Ziti)
if err != nil {
logrus.Errorf("error getting edge client: %v", err)
return admin.NewCreateFrontendInternalServerError()

View File

@ -3,15 +3,12 @@ package controller
import (
"bytes"
"encoding/json"
"fmt"
"github.com/go-openapi/runtime/middleware"
"github.com/openziti/edge/rest_management_api_client/service"
rest_model_edge "github.com/openziti/edge/rest_model"
"github.com/openziti/zrok/controller/zrokEdgeSdk"
"github.com/openziti/zrok/rest_model_zrok"
"github.com/openziti/zrok/rest_server_zrok/operations/admin"
"github.com/sirupsen/logrus"
"time"
)
type createIdentityHandler struct{}
@ -28,7 +25,7 @@ func (h *createIdentityHandler) Handle(params admin.CreateIdentityParams, princi
return admin.NewCreateIdentityUnauthorized()
}
edge, err := edgeClient()
edge, err := zrokEdgeSdk.Client(cfg.Ziti)
if err != nil {
logrus.Errorf("error getting edge client: %v", err)
return admin.NewCreateIdentityInternalServerError()
@ -52,32 +49,6 @@ func (h *createIdentityHandler) Handle(params admin.CreateIdentityParams, princi
return admin.NewCreateIdentityInternalServerError()
}
filter := fmt.Sprintf("name=\"%v\" and tags.zrok != null", cfg.Metrics.ServiceName)
limit := int64(0)
offset := int64(0)
listSvcReq := &service.ListServicesParams{
Filter: &filter,
Limit: &limit,
Offset: &offset,
}
listSvcReq.SetTimeout(30 * time.Second)
listSvcResp, err := edge.Service.ListServices(listSvcReq, nil)
if err != nil {
logrus.Errorf("error listing metrics service: %v", err)
return admin.NewCreateIdentityInternalServerError()
}
if len(listSvcResp.Payload.Data) != 1 {
logrus.Errorf("could not find metrics service")
return admin.NewCreateIdentityInternalServerError()
}
svcZId := *listSvcResp.Payload.Data[0].ID
spName := fmt.Sprintf("%v-%v-dial", name, cfg.Metrics.ServiceName)
if err := zrokEdgeSdk.CreateServicePolicyDial(spName, svcZId, []string{zId}, nil, edge); err != nil {
logrus.Errorf("error creating named dial service policy '%v': %v", spName, err)
return admin.NewCreateIdentityInternalServerError()
}
var out bytes.Buffer
enc := json.NewEncoder(&out)
enc.SetEscapeHTML(false)

View File

@ -36,7 +36,7 @@ func (h *disableHandler) Handle(params environment.DisableParams, principal *res
logrus.Errorf("error getting environment for user '%v': %v", principal.Email, err)
return environment.NewDisableInternalServerError()
}
edge, err := edgeClient()
edge, err := zrokEdgeSdk.Client(cfg.Ziti)
if err != nil {
logrus.Errorf("error getting edge client for user '%v': %v", principal.Email, err)
return environment.NewDisableInternalServerError()

View File

@ -35,7 +35,7 @@ func (h *enableHandler) Handle(params environment.EnableParams, principal *rest_
return environment.NewEnableUnauthorized()
}
client, err := edgeClient()
client, err := zrokEdgeSdk.Client(cfg.Ziti)
if err != nil {
logrus.Errorf("error getting edge client for user '%v': %v", principal.Email, err)
return environment.NewEnableInternalServerError()

View File

@ -28,7 +28,7 @@ func GC(inCfg *Config) error {
logrus.Errorf("error closing store: %v", err)
}
}()
edge, err := edgeClient()
edge, err := zrokEdgeSdk.Client(cfg.Ziti)
if err != nil {
return err
}

View File

@ -1,181 +0,0 @@
package controller
import (
"bytes"
"context"
"fmt"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api"
"github.com/influxdata/influxdb-client-go/v2/api/write"
"github.com/openziti/sdk-golang/ziti"
"github.com/openziti/sdk-golang/ziti/config"
"github.com/openziti/sdk-golang/ziti/edge"
"github.com/openziti/zrok/model"
"github.com/openziti/zrok/util"
"github.com/openziti/zrok/zrokdir"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"gopkg.in/mgo.v2/bson"
"net"
"time"
)
type metricsAgent struct {
writeApi api.WriteAPIBlocking
metricsQueue chan *model.Metrics
envCache map[string]*envCacheEntry
zCtx ziti.Context
zListener edge.Listener
shutdown chan struct{}
joined chan struct{}
}
type envCacheEntry struct {
env string
lastAccess time.Time
}
func newMetricsAgent() *metricsAgent {
ma := &metricsAgent{
metricsQueue: make(chan *model.Metrics, 1024),
envCache: make(map[string]*envCacheEntry),
shutdown: make(chan struct{}),
joined: make(chan struct{}),
}
if idb != nil {
ma.writeApi = idb.WriteAPIBlocking(cfg.Influx.Org, cfg.Influx.Bucket)
}
return ma
}
func (ma *metricsAgent) run() {
logrus.Info("starting")
defer logrus.Info("exiting")
defer close(ma.joined)
if err := ma.bindService(); err != nil {
logrus.Errorf("error binding metrics service: %v", err)
return
}
work:
for {
select {
case <-ma.shutdown:
break work
case m := <-ma.metricsQueue:
if err := ma.processMetrics(m); err != nil {
logrus.Errorf("error processing metrics: %v", err)
}
}
}
if err := ma.zListener.Close(); err != nil {
logrus.Errorf("error closing metrics service listener: %v", err)
}
}
func (ma *metricsAgent) bindService() error {
zif, err := zrokdir.ZitiIdentityFile("ctrl")
if err != nil {
return errors.Wrap(err, "error getting 'ctrl' identity")
}
zCfg, err := config.NewFromFile(zif)
if err != nil {
return errors.Wrap(err, "error loading 'ctrl' identity")
}
ma.zCtx = ziti.NewContextWithConfig(zCfg)
opts := &ziti.ListenOptions{
ConnectTimeout: 5 * time.Minute,
MaxConnections: 1024,
}
ma.zListener, err = ma.zCtx.ListenWithOptions(cfg.Metrics.ServiceName, opts)
if err != nil {
return errors.Wrapf(err, "error listening for metrics on '%v'", cfg.Metrics.ServiceName)
}
go ma.listen()
return nil
}
func (ma *metricsAgent) listen() {
logrus.Info("started")
defer logrus.Info("exited")
for {
conn, err := ma.zListener.Accept()
if err != nil {
logrus.Errorf("error accepting: %v", err)
return
}
logrus.Debugf("accepted metrics connetion from '%v'", conn.RemoteAddr())
go newMetricsHandler(conn, ma.metricsQueue).run()
}
}
func (ma *metricsAgent) processMetrics(m *model.Metrics) error {
var pts []*write.Point
if len(m.Sessions) > 0 {
out := "metrics = {\n"
for k, v := range m.Sessions {
if ma.writeApi != nil {
pt := influxdb2.NewPoint("xfer",
map[string]string{"namespace": m.Namespace, "share": k},
map[string]interface{}{"bytesRead": v.BytesRead, "bytesWritten": v.BytesWritten},
time.UnixMilli(v.LastUpdate))
pts = append(pts, pt)
}
out += fmt.Sprintf("\t[%v.%v]: %v/%v (%v)\n", m.Namespace, k, util.BytesToSize(v.BytesRead), util.BytesToSize(v.BytesWritten), time.Since(time.UnixMilli(v.LastUpdate)))
}
out += "}"
logrus.Info(out)
}
if len(pts) > 0 {
if err := ma.writeApi.WritePoint(context.Background(), pts...); err == nil {
logrus.Debugf("wrote metrics to influx")
} else {
return err
}
}
return nil
}
func (ma *metricsAgent) stop() {
close(ma.shutdown)
}
func (ma *metricsAgent) join() {
<-ma.joined
}
type metricsHandler struct {
conn net.Conn
metricsQueue chan *model.Metrics
}
func newMetricsHandler(conn net.Conn, metricsQueue chan *model.Metrics) *metricsHandler {
return &metricsHandler{conn, metricsQueue}
}
func (mh *metricsHandler) run() {
logrus.Debugf("handling metrics connection: %v", mh.conn.RemoteAddr())
var mtrBuf bytes.Buffer
buf := make([]byte, 4096)
for {
n, err := mh.conn.Read(buf)
if err != nil {
break
}
mtrBuf.Write(buf[:n])
}
if err := mh.conn.Close(); err != nil {
logrus.Errorf("error closing metrics connection")
}
m := &model.Metrics{}
if err := bson.Unmarshal(mtrBuf.Bytes(), &m); err == nil {
mh.metricsQueue <- m
} else {
logrus.Errorf("error unmarshaling metrics: %v", err)
}
}

View File

@ -0,0 +1,73 @@
package metrics
import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
type MetricsAgent struct {
src Source
cache *shareCache
join chan struct{}
}
func Run(cfg *Config) (*MetricsAgent, error) {
logrus.Info("starting")
if cfg.Store == nil {
return nil, errors.New("no 'store' configured; exiting")
}
cache, err := newShareCache(cfg.Store)
if err != nil {
return nil, errors.Wrap(err, "error creating share cache")
}
if cfg.Source == nil {
return nil, errors.New("no 'source' configured; exiting")
}
src, ok := cfg.Source.(Source)
if !ok {
return nil, errors.New("invalid 'source'; exiting")
}
if cfg.Influx == nil {
return nil, errors.New("no 'influx' configured; exiting")
}
idb := openInfluxDb(cfg.Influx)
events := make(chan map[string]interface{})
join, err := src.Start(events)
if err != nil {
return nil, errors.Wrap(err, "error starting source")
}
go func() {
for {
select {
case event := <-events:
usage := Ingest(event)
if shrToken, err := cache.getToken(usage.ZitiServiceId); err == nil {
usage.ShareToken = shrToken
if err := idb.Write(usage); err != nil {
logrus.Error(err)
}
} else {
logrus.Error(err)
}
}
}
}()
return &MetricsAgent{src: src, join: join}, nil
}
func (ma *MetricsAgent) Stop() {
logrus.Info("stopping")
ma.src.Stop()
}
func (ma *MetricsAgent) Join() {
<-ma.join
}

10
controller/metrics/cf.go Normal file
View File

@ -0,0 +1,10 @@
package metrics
import "github.com/michaelquigley/cf"
func GetCfOptions() *cf.Options {
opts := cf.DefaultOptions()
opts.AddFlexibleSetter("file", loadFileSourceConfig)
opts.AddFlexibleSetter("websocket", loadWebsocketSourceConfig)
return opts
}

View File

@ -0,0 +1,28 @@
package metrics
import (
"github.com/michaelquigley/cf"
"github.com/openziti/zrok/controller/store"
"github.com/pkg/errors"
)
type Config struct {
Source interface{}
Influx *InfluxConfig
Store *store.Config
}
type InfluxConfig struct {
Url string
Bucket string
Org string
Token string `cf:"+secret"`
}
func LoadConfig(path string) (*Config, error) {
cfg := &Config{}
if err := cf.BindYaml(cfg, path, GetCfOptions()); err != nil {
return nil, errors.Wrapf(err, "error loading config from '%v'", path)
}
return cfg, nil
}

View File

@ -0,0 +1,107 @@
package metrics
import (
"encoding/binary"
"encoding/json"
"github.com/michaelquigley/cf"
"github.com/nxadm/tail"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"os"
)
type FileSourceConfig struct {
Path string
IndexPath string
}
func loadFileSourceConfig(v interface{}, _ *cf.Options) (interface{}, error) {
if submap, ok := v.(map[string]interface{}); ok {
cfg := &FileSourceConfig{}
if err := cf.Bind(cfg, submap, cf.DefaultOptions()); err != nil {
return nil, err
}
return &fileSource{cfg: cfg}, nil
}
return nil, errors.New("invalid config structure for 'file' source")
}
type fileSource struct {
cfg *FileSourceConfig
t *tail.Tail
}
func (s *fileSource) Start(events chan map[string]interface{}) (join chan struct{}, err error) {
f, err := os.Open(s.cfg.Path)
if err != nil {
return nil, errors.Wrapf(err, "error opening '%v'", s.cfg.Path)
}
_ = f.Close()
idxF, err := os.OpenFile(s.indexPath(), os.O_CREATE|os.O_RDWR, os.ModePerm)
if err != nil {
return nil, errors.Wrapf(err, "error opening '%v'", s.indexPath())
}
pos := int64(0)
posBuf := make([]byte, 8)
if n, err := idxF.Read(posBuf); err == nil && n == 8 {
pos = int64(binary.LittleEndian.Uint64(posBuf))
logrus.Infof("recovered stored position: %d", pos)
}
join = make(chan struct{})
go func() {
s.tail(pos, events, idxF)
close(join)
}()
return join, nil
}
func (s *fileSource) Stop() {
if err := s.t.Stop(); err != nil {
logrus.Error(err)
}
}
func (s *fileSource) tail(pos int64, events chan map[string]interface{}, idxF *os.File) {
logrus.Infof("started")
defer logrus.Infof("stopped")
posBuf := make([]byte, 8)
var err error
s.t, err = tail.TailFile(s.cfg.Path, tail.Config{
ReOpen: true,
Follow: true,
Location: &tail.SeekInfo{Offset: pos},
})
if err != nil {
logrus.Error(err)
return
}
for line := range s.t.Lines {
event := make(map[string]interface{})
if err := json.Unmarshal([]byte(line.Text), &event); err == nil {
binary.LittleEndian.PutUint64(posBuf, uint64(line.SeekInfo.Offset))
if n, err := idxF.Seek(0, 0); err == nil && n == 0 {
if n, err := idxF.Write(posBuf); err != nil || n != 8 {
logrus.Errorf("error writing index (%d): %v", n, err)
}
}
events <- event
} else {
logrus.Errorf("error parsing line #%d: %v", line.Num, err)
}
}
}
func (s *fileSource) indexPath() string {
if s.cfg.IndexPath == "" {
return s.cfg.Path + ".idx"
} else {
return s.cfg.IndexPath
}
}

View File

@ -0,0 +1,32 @@
package metrics
import (
"context"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api"
"github.com/sirupsen/logrus"
)
type influxDb struct {
idb influxdb2.Client
writeApi api.WriteAPIBlocking
}
func openInfluxDb(cfg *InfluxConfig) *influxDb {
idb := influxdb2.NewClient(cfg.Url, cfg.Token)
wapi := idb.WriteAPIBlocking(cfg.Org, cfg.Bucket)
return &influxDb{idb, wapi}
}
func (i *influxDb) Write(u *Usage) error {
pt := influxdb2.NewPoint("xfer",
map[string]string{"namespace": "backend", "share": u.ShareToken},
map[string]interface{}{"bytesRead": u.BackendRx, "bytesWritten": u.BackendTx},
u.IntervalStart)
if err := i.writeApi.WritePoint(context.Background(), pt); err == nil {
logrus.Infof("share: %v, circuit: %v, rx: %d, tx: %d", u.ShareToken, u.ZitiCircuitId, u.BackendRx, u.BackendTx)
} else {
return err
}
return nil
}

View File

@ -0,0 +1,41 @@
package metrics
import (
"fmt"
"github.com/openziti/zrok/util"
"time"
)
type Usage struct {
ProcessedStamp time.Time
IntervalStart time.Time
ZitiServiceId string
ZitiCircuitId string
ShareToken string
FrontendTx int64
FrontendRx int64
BackendTx int64
BackendRx int64
}
func (u Usage) String() string {
out := "Usage {"
out += fmt.Sprintf("processed '%v'", u.ProcessedStamp)
out += ", " + fmt.Sprintf("interval '%v'", u.IntervalStart)
out += ", " + fmt.Sprintf("service '%v'", u.ZitiServiceId)
out += ", " + fmt.Sprintf("circuit '%v'", u.ZitiCircuitId)
out += ", " + fmt.Sprintf("share '%v'", u.ShareToken)
out += ", " + fmt.Sprintf("fe {rx %v, tx %v}", util.BytesToSize(u.FrontendRx), util.BytesToSize(u.FrontendTx))
out += ", " + fmt.Sprintf("be {rx %v, tx %v}", util.BytesToSize(u.BackendRx), util.BytesToSize(u.BackendTx))
out += "}"
return out
}
type Source interface {
Start(chan map[string]interface{}) (chan struct{}, error)
Stop()
}
type Ingester interface {
Ingest(msg map[string]interface{}) error
}

View File

@ -0,0 +1,31 @@
package metrics
import (
"github.com/openziti/zrok/controller/store"
"github.com/pkg/errors"
)
type shareCache struct {
str *store.Store
}
func newShareCache(cfg *store.Config) (*shareCache, error) {
str, err := store.Open(cfg)
if err != nil {
return nil, errors.Wrap(err, "error opening store")
}
return &shareCache{str}, nil
}
func (sc *shareCache) getToken(svcZId string) (string, error) {
tx, err := sc.str.Begin()
if err != nil {
return "", err
}
defer func() { _ = tx.Rollback() }()
shr, err := sc.str.FindShareWithZId(svcZId, tx)
if err != nil {
return "", err
}
return shr.Token, nil
}

View File

@ -0,0 +1,95 @@
package metrics
import (
"github.com/sirupsen/logrus"
"reflect"
"time"
)
func Ingest(event map[string]interface{}) *Usage {
u := &Usage{ProcessedStamp: time.Now()}
if ns, found := event["namespace"]; found && ns == "fabric.usage" {
if v, found := event["interval_start_utc"]; found {
if vFloat64, ok := v.(float64); ok {
u.IntervalStart = time.Unix(int64(vFloat64), 0)
} else {
logrus.Error("unable to assert 'interval_start_utc'")
}
} else {
logrus.Error("missing 'interval_start_utc'")
}
if v, found := event["tags"]; found {
if tags, ok := v.(map[string]interface{}); ok {
if v, found := tags["serviceId"]; found {
if vStr, ok := v.(string); ok {
u.ZitiServiceId = vStr
} else {
logrus.Error("unable to assert 'tags/serviceId'")
}
} else {
logrus.Error("missing 'tags/serviceId'")
}
} else {
logrus.Errorf("unable to assert 'tags'")
}
} else {
logrus.Errorf("missing 'tags'")
}
if v, found := event["usage"]; found {
if usage, ok := v.(map[string]interface{}); ok {
if v, found := usage["ingress.tx"]; found {
if vFloat64, ok := v.(float64); ok {
u.FrontendTx = int64(vFloat64)
} else {
logrus.Error("unable to assert 'usage/ingress.tx'")
}
} else {
logrus.Warn("missing 'usage/ingress.tx'")
}
if v, found := usage["ingress.rx"]; found {
if vFloat64, ok := v.(float64); ok {
u.FrontendRx = int64(vFloat64)
} else {
logrus.Error("unable to assert 'usage/ingress.rx")
}
} else {
logrus.Warn("missing 'usage/ingress.rx")
}
if v, found := usage["egress.tx"]; found {
if vFloat64, ok := v.(float64); ok {
u.BackendTx = int64(vFloat64)
} else {
logrus.Error("unable to assert 'usage/egress.tx'")
}
} else {
logrus.Warn("missing 'usage/egress.tx'")
}
if v, found := usage["egress.rx"]; found {
if vFloat64, ok := v.(float64); ok {
u.BackendRx = int64(vFloat64)
} else {
logrus.Error("unable to assert 'usage/egress.rx'")
}
} else {
logrus.Warn("missing 'usage/egress.rx'")
}
} else {
logrus.Errorf("unable to assert 'usage' (%v) %v", reflect.TypeOf(v), event)
}
} else {
logrus.Warnf("missing 'usage'")
}
if v, found := event["circuit_id"]; found {
if vStr, ok := v.(string); ok {
u.ZitiCircuitId = vStr
} else {
logrus.Error("unable to assert 'circuit_id'")
}
} else {
logrus.Warn("missing 'circuit_id'")
}
} else {
logrus.Errorf("not 'fabric.usage'")
}
return u
}

View File

@ -0,0 +1,162 @@
package metrics
import (
"bytes"
"crypto/tls"
"crypto/x509"
"encoding/json"
"github.com/gorilla/websocket"
"github.com/michaelquigley/cf"
"github.com/openziti/channel/v2"
"github.com/openziti/channel/v2/websockets"
"github.com/openziti/edge/rest_util"
"github.com/openziti/fabric/event"
"github.com/openziti/fabric/pb/mgmt_pb"
"github.com/openziti/identity"
"github.com/openziti/sdk-golang/ziti/constants"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"io"
"net/http"
"net/url"
"time"
)
type WebsocketSourceConfig struct {
WebsocketEndpoint string
ApiEndpoint string
Username string
Password string
}
func loadWebsocketSourceConfig(v interface{}, _ *cf.Options) (interface{}, error) {
if submap, ok := v.(map[string]interface{}); ok {
cfg := &WebsocketSourceConfig{}
if err := cf.Bind(cfg, submap, cf.DefaultOptions()); err != nil {
return nil, err
}
return &websocketSource{cfg: cfg}, nil
}
return nil, errors.New("invalid config structure for 'websocket' source")
}
type websocketSource struct {
cfg *WebsocketSourceConfig
ch channel.Channel
events chan map[string]interface{}
join chan struct{}
}
func (s *websocketSource) Start(events chan map[string]interface{}) (chan struct{}, error) {
caCerts, err := rest_util.GetControllerWellKnownCas(s.cfg.ApiEndpoint)
if err != nil {
return nil, err
}
caPool := x509.NewCertPool()
for _, ca := range caCerts {
caPool.AddCert(ca)
}
authenticator := rest_util.NewAuthenticatorUpdb(s.cfg.Username, s.cfg.Password)
authenticator.RootCas = caPool
apiEndpointUrl, err := url.Parse(s.cfg.ApiEndpoint)
if err != nil {
return nil, err
}
apiSession, err := authenticator.Authenticate(apiEndpointUrl)
if err != nil {
return nil, err
}
dialer := &websocket.Dialer{
TLSClientConfig: &tls.Config{
RootCAs: caPool,
},
HandshakeTimeout: 5 * time.Second,
}
conn, resp, err := dialer.Dial(s.cfg.WebsocketEndpoint, http.Header{constants.ZitiSession: []string{*apiSession.Token}})
if err != nil {
if resp != nil {
if body, rerr := io.ReadAll(resp.Body); rerr == nil {
logrus.Errorf("response body '%v': %v", string(body), err)
}
} else {
logrus.Errorf("no response from websocket dial: %v", err)
}
}
id := &identity.TokenId{Token: "mgmt"}
underlayFactory := websockets.NewUnderlayFactory(id, conn, nil)
s.join = make(chan struct{})
s.events = events
bindHandler := func(binding channel.Binding) error {
binding.AddReceiveHandler(int32(mgmt_pb.ContentType_StreamEventsEventType), s)
binding.AddCloseHandler(channel.CloseHandlerF(func(ch channel.Channel) {
close(s.join)
}))
return nil
}
s.ch, err = channel.NewChannel("mgmt", underlayFactory, channel.BindHandlerF(bindHandler), nil)
if err != nil {
return nil, err
}
streamEventsRequest := map[string]interface{}{}
streamEventsRequest["format"] = "json"
streamEventsRequest["subscriptions"] = []*event.Subscription{
{
Type: "fabric.usage",
Options: map[string]interface{}{
"version": uint8(3),
},
},
}
msgBytes, err := json.Marshal(streamEventsRequest)
if err != nil {
return nil, err
}
requestMsg := channel.NewMessage(int32(mgmt_pb.ContentType_StreamEventsRequestType), msgBytes)
responseMsg, err := requestMsg.WithTimeout(5 * time.Second).SendForReply(s.ch)
if err != nil {
return nil, err
}
if responseMsg.ContentType == channel.ContentTypeResultType {
result := channel.UnmarshalResult(responseMsg)
if result.Success {
logrus.Infof("event stream started: %v", result.Message)
} else {
return nil, errors.Wrap(err, "error starting event streaming")
}
} else {
return nil, errors.Errorf("unexpected response type %v", responseMsg.ContentType)
}
return s.join, nil
}
func (s *websocketSource) Stop() {
_ = s.ch.Close()
}
func (s *websocketSource) HandleReceive(msg *channel.Message, _ channel.Channel) {
decoder := json.NewDecoder(bytes.NewReader(msg.Body))
for {
ev := make(map[string]interface{})
err := decoder.Decode(&ev)
if err == io.EOF {
break
}
if err == nil {
s.events <- ev
} else {
logrus.Errorf("error parsing '%v': %v", string(msg.Body), err)
}
}
}

View File

@ -4,6 +4,7 @@ import (
"github.com/go-openapi/runtime/middleware"
"github.com/jmoiron/sqlx"
"github.com/openziti/zrok/controller/store"
"github.com/openziti/zrok/controller/zrokEdgeSdk"
"github.com/openziti/zrok/rest_model_zrok"
"github.com/openziti/zrok/rest_server_zrok/operations/share"
"github.com/pkg/errors"
@ -55,7 +56,7 @@ func (h *shareHandler) Handle(params share.ShareParams, principal *rest_model_zr
return share.NewShareUnauthorized()
}
edge, err := edgeClient()
edge, err := zrokEdgeSdk.Client(cfg.Ziti)
if err != nil {
logrus.Error(err)
return share.NewShareInternalServerError()

View File

@ -45,6 +45,7 @@ func (h *shareDetailHandler) Handle(params metadata.GetShareDetailParams, princi
var sparkData map[string][]int64
if cfg.Influx != nil {
sparkData, err = sparkDataForShares([]*store.Share{shr})
logrus.Info(sparkData)
if err != nil {
logrus.Errorf("error querying spark data for share: %v", err)
}

View File

@ -49,7 +49,7 @@ func sparkFluxQuery(shrs []*store.Share) string {
"|> range(start: -5m)" +
"|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")" +
"|> filter(fn: (r) => r[\"_field\"] == \"bytesRead\" or r[\"_field\"] == \"bytesWritten\")" +
"|> filter(fn: (r) => r[\"namespace\"] == \"frontend\")" +
"|> filter(fn: (r) => r[\"namespace\"] == \"backend\")" +
shrFilter +
"|> aggregateWindow(every: 5s, fn: sum, createEmpty: true)\n" +
"|> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")" +

View File

@ -5,6 +5,7 @@ import (
"fmt"
"github.com/openziti/edge/rest_management_api_client"
"github.com/openziti/edge/rest_management_api_client/config"
"github.com/openziti/zrok/controller/zrokEdgeSdk"
"github.com/openziti/zrok/model"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@ -23,7 +24,7 @@ func controllerStartup() error {
func inspectZiti() error {
logrus.Infof("inspecting ziti controller configuration")
edge, err := edgeClient()
edge, err := zrokEdgeSdk.Client(cfg.Ziti)
if err != nil {
return errors.Wrap(err, "error getting ziti edge client")
}

View File

@ -62,6 +62,14 @@ func (self *Store) FindShareWithToken(shrToken string, tx *sqlx.Tx) (*Share, err
return shr, nil
}
func (self *Store) FindShareWithZId(zId string, tx *sqlx.Tx) (*Share, error) {
shr := &Share{}
if err := tx.QueryRowx("select * from shares where z_id = $1", zId).StructScan(shr); err != nil {
return nil, errors.Wrap(err, "error selecting share by z_id")
}
return shr, nil
}
func (self *Store) FindSharesForEnvironment(envId int, tx *sqlx.Tx) ([]*Share, error) {
rows, err := tx.Queryx("select shares.* from shares where environment_id = $1", envId)
if err != nil {

View File

@ -29,7 +29,7 @@ func (h *unaccessHandler) Handle(params share.UnaccessParams, principal *rest_mo
}
defer func() { _ = tx.Rollback() }()
edge, err := edgeClient()
edge, err := zrokEdgeSdk.Client(cfg.Ziti)
if err != nil {
logrus.Error(err)
return share.NewUnaccessInternalServerError()

View File

@ -29,7 +29,7 @@ func (h *unshareHandler) Handle(params share.UnshareParams, principal *rest_mode
}
defer func() { _ = tx.Rollback() }()
edge, err := edgeClient()
edge, err := zrokEdgeSdk.Client(cfg.Ziti)
if err != nil {
logrus.Errorf("error getting edge client for '%v': %v", principal.Email, err)
return share.NewUnshareInternalServerError()

View File

@ -1,11 +1,8 @@
package controller
import (
"crypto/x509"
errors2 "github.com/go-openapi/errors"
"github.com/jaevor/go-nanoid"
"github.com/openziti/edge/rest_management_api_client"
"github.com/openziti/edge/rest_util"
"github.com/openziti/zrok/rest_model_zrok"
"github.com/sirupsen/logrus"
"net/http"
@ -56,18 +53,6 @@ func (za *zrokAuthenticator) authenticate(token string) (*rest_model_zrok.Princi
}
}
func edgeClient() (*rest_management_api_client.ZitiEdgeManagement, error) {
caCerts, err := rest_util.GetControllerWellKnownCas(cfg.Ziti.ApiEndpoint)
if err != nil {
return nil, err
}
caPool := x509.NewCertPool()
for _, ca := range caCerts {
caPool.AddCert(ca)
}
return rest_util.NewEdgeManagementClientWithUpdb(cfg.Ziti.Username, cfg.Ziti.Password, cfg.Ziti.ApiEndpoint, caPool)
}
func createShareToken() (string, error) {
gen, err := nanoid.CustomASCII("abcdefghijklmnopqrstuvwxyz0123456789", 12)
if err != nil {

View File

@ -0,0 +1,25 @@
package zrokEdgeSdk
import (
"crypto/x509"
"github.com/openziti/edge/rest_management_api_client"
"github.com/openziti/edge/rest_util"
)
type ZitiConfig struct {
ApiEndpoint string
Username string
Password string `cf:"+secret"`
}
func Client(cfg *ZitiConfig) (*rest_management_api_client.ZitiEdgeManagement, error) {
caCerts, err := rest_util.GetControllerWellKnownCas(cfg.ApiEndpoint)
if err != nil {
return nil, err
}
caPool := x509.NewCertPool()
for _, ca := range caCerts {
caPool.AddCert(ca)
}
return rest_util.NewEdgeManagementClientWithUpdb(cfg.Username, cfg.Password, cfg.ApiEndpoint, caPool)
}

View File

@ -2,6 +2,7 @@ package zrokEdgeSdk
import (
"context"
"fmt"
"github.com/openziti/edge/rest_management_api_client"
edge_service "github.com/openziti/edge/rest_management_api_client/service"
"github.com/openziti/edge/rest_model"
@ -10,6 +11,27 @@ import (
"time"
)
func FindShareService(svcZId string, edge *rest_management_api_client.ZitiEdgeManagement) (string, error) {
filter := fmt.Sprintf("id=\"%v\"", svcZId)
limit := int64(0)
offset := int64(0)
listReq := &edge_service.ListServicesParams{
Filter: &filter,
Limit: &limit,
Offset: &offset,
Context: context.Background(),
}
listReq.SetTimeout(30 * time.Second)
listResp, err := edge.Service.ListServices(listReq, nil)
if err != nil {
return "", errors.Wrapf(err, "error listing service '%v'", svcZId)
}
if len(listResp.Payload.Data) == 1 {
return *listResp.Payload.Data[0].Name, nil
}
return "", errors.Errorf("service with ziti id '%v' not found", svcZId)
}
func CreateShareService(envZId, shrToken, cfgZId string, edge *rest_management_api_client.ZitiEdgeManagement) (shrZId string, err error) {
shrZId, err = CreateService(shrToken, []string{cfgZId}, map[string]interface{}{"zrokShareToken": shrToken}, edge)
if err != nil {

View File

@ -0,0 +1,31 @@
`v0.4` includes a new metrics infrastructure based on OpenZiti usage, which provides `zrok` with telemetry used to power end-user intelligence about shares, and also to power usage-based limits.
# Configuration
This requires a version of OpenZiti with a `fabric` dependency of `v0.22.52` or newer.
## controller configuration:
```
network:
intervalAgeThreshold: 5s
metricsReportInterval: 5s
events:
jsonLogger:
subscriptions:
- type: fabric.usage
version: 3
handler:
type: file
format: json
path: /tmp/fabric-usage.log
```
## router configuration:
```
metrics:
reportInterval: 5s
intervalAgeThreshold: 5s
```

View File

@ -1,7 +1,6 @@
package proxyBackend
import (
"context"
"crypto/tls"
"fmt"
"github.com/openziti/sdk-golang/ziti"
@ -11,7 +10,6 @@ import (
"github.com/openziti/zrok/util"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"net"
"net/http"
"net/http/httputil"
"net/url"
@ -79,7 +77,6 @@ func newReverseProxy(cfg *Config) (*httputil.ReverseProxy, error) {
}
tpt := http.DefaultTransport.(*http.Transport).Clone()
tpt.DialContext = metricsDial
if cfg.Insecure {
tpt.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
}
@ -105,11 +102,3 @@ func newReverseProxy(cfg *Config) (*httputil.ReverseProxy, error) {
return proxy, nil
}
func metricsDial(_ context.Context, network string, addr string) (net.Conn, error) {
conn, err := net.Dial(network, addr)
if err != nil {
return conn, err
}
return newMetricsConn("backend", conn), nil
}

View File

@ -1,49 +0,0 @@
package proxyBackend
import (
"net"
"time"
)
type metricsConn struct {
id string
conn net.Conn
}
func newMetricsConn(id string, conn net.Conn) *metricsConn {
return &metricsConn{id, conn}
}
func (mc *metricsConn) Read(b []byte) (n int, err error) {
n, err = mc.conn.Read(b)
return n, err
}
func (mc *metricsConn) Write(b []byte) (n int, err error) {
n, err = mc.conn.Write(b)
return n, err
}
func (mc *metricsConn) Close() error {
return mc.conn.Close()
}
func (mc *metricsConn) LocalAddr() net.Addr {
return mc.conn.LocalAddr()
}
func (mc *metricsConn) RemoteAddr() net.Addr {
return mc.conn.RemoteAddr()
}
func (mc *metricsConn) SetDeadline(t time.Time) error {
return mc.conn.SetDeadline(t)
}
func (mc *metricsConn) SetReadDeadline(t time.Time) error {
return mc.conn.SetReadDeadline(t)
}
func (mc *metricsConn) SetWriteDeadline(t time.Time) error {
return mc.conn.SetWriteDeadline(t)
}

View File

@ -3,29 +3,18 @@ package publicFrontend
import (
"github.com/michaelquigley/cf"
"github.com/pkg/errors"
"time"
)
type Config struct {
Identity string
Metrics *MetricsConfig
Address string
HostMatch string
}
type MetricsConfig struct {
Service string
SendTimeout time.Duration
}
func DefaultConfig() *Config {
return &Config{
Identity: "frontend",
Metrics: &MetricsConfig{
Service: "metrics",
SendTimeout: 5 * time.Second,
},
Address: "0.0.0.0:8080",
Address: "0.0.0.0:8080",
}
}

View File

@ -24,16 +24,9 @@ type httpFrontend struct {
cfg *Config
zCtx ziti.Context
handler http.Handler
metrics *metricsAgent
}
func NewHTTP(cfg *Config) (*httpFrontend, error) {
ma, err := newMetricsAgent(cfg)
if err != nil {
return nil, err
}
go ma.run()
zCfgPath, err := zrokdir.ZitiIdentityFile(cfg.Identity)
if err != nil {
return nil, errors.Wrapf(err, "error getting ziti identity '%v' from zrokdir", cfg.Identity)
@ -44,7 +37,7 @@ func NewHTTP(cfg *Config) (*httpFrontend, error) {
}
zCfg.ConfigTypes = []string{model.ZrokProxyConfig}
zCtx := ziti.NewContextWithConfig(zCfg)
zDialCtx := zitiDialContext{ctx: zCtx, updates: ma.updates}
zDialCtx := zitiDialContext{ctx: zCtx}
zTransport := http.DefaultTransport.(*http.Transport).Clone()
zTransport.DialContext = zDialCtx.Dial
@ -59,7 +52,6 @@ func NewHTTP(cfg *Config) (*httpFrontend, error) {
cfg: cfg,
zCtx: zCtx,
handler: handler,
metrics: ma,
}, nil
}
@ -68,8 +60,7 @@ func (self *httpFrontend) Run() error {
}
type zitiDialContext struct {
ctx ziti.Context
updates chan metricsUpdate
ctx ziti.Context
}
func (self *zitiDialContext) Dial(_ context.Context, _ string, addr string) (net.Conn, error) {
@ -78,7 +69,7 @@ func (self *zitiDialContext) Dial(_ context.Context, _ string, addr string) (net
if err != nil {
return conn, err
}
return newMetricsConn(shrToken, conn, self.updates), nil
return conn, nil
}
func newServiceProxy(cfg *Config, ctx ziti.Context) (*httputil.ReverseProxy, error) {

View File

@ -1,109 +0,0 @@
package publicFrontend
import (
"github.com/openziti/sdk-golang/ziti"
"github.com/openziti/sdk-golang/ziti/config"
"github.com/openziti/zrok/model"
"github.com/openziti/zrok/zrokdir"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"gopkg.in/mgo.v2/bson"
"time"
)
type metricsAgent struct {
cfg *Config
accum map[string]model.SessionMetrics
updates chan metricsUpdate
lastSend time.Time
zCtx ziti.Context
}
type metricsUpdate struct {
id string
bytesRead int64
bytesWritten int64
}
func newMetricsAgent(cfg *Config) (*metricsAgent, error) {
zif, err := zrokdir.ZitiIdentityFile(cfg.Identity)
if err != nil {
return nil, errors.Wrapf(err, "error getting '%v' identity file", cfg.Identity)
}
zCfg, err := config.NewFromFile(zif)
if err != nil {
return nil, errors.Wrapf(err, "error loading '%v' identity", cfg.Identity)
}
logrus.Infof("loaded '%v' identity", cfg.Identity)
return &metricsAgent{
cfg: cfg,
accum: make(map[string]model.SessionMetrics),
updates: make(chan metricsUpdate, 10240),
lastSend: time.Now(),
zCtx: ziti.NewContextWithConfig(zCfg),
}, nil
}
func (ma *metricsAgent) run() {
for {
select {
case update := <-ma.updates:
ma.pushUpdate(update)
if time.Since(ma.lastSend) >= ma.cfg.Metrics.SendTimeout {
if err := ma.sendMetrics(); err != nil {
logrus.Errorf("error sending metrics: %v", err)
}
}
case <-time.After(5 * time.Second):
if err := ma.sendMetrics(); err != nil {
logrus.Errorf("error sending metrics: %v", err)
}
}
}
}
func (ma *metricsAgent) pushUpdate(mu metricsUpdate) {
if sm, found := ma.accum[mu.id]; found {
ma.accum[mu.id] = model.SessionMetrics{
BytesRead: sm.BytesRead + mu.bytesRead,
BytesWritten: sm.BytesWritten + mu.bytesWritten,
LastUpdate: time.Now().UnixMilli(),
}
} else {
ma.accum[mu.id] = model.SessionMetrics{
BytesRead: mu.bytesRead,
BytesWritten: mu.bytesWritten,
LastUpdate: time.Now().UnixMilli(),
}
}
}
func (ma *metricsAgent) sendMetrics() error {
if len(ma.accum) > 0 {
m := &model.Metrics{
Namespace: ma.cfg.Identity,
Sessions: ma.accum,
}
metricsJson, err := bson.Marshal(m)
if err != nil {
return errors.Wrap(err, "error marshaling metrics")
}
conn, err := ma.zCtx.Dial(ma.cfg.Metrics.Service)
if err != nil {
return errors.Wrap(err, "error connecting to metrics service")
}
n, err := conn.Write(metricsJson)
if err != nil {
return errors.Wrap(err, "error sending metrics")
}
defer func() { _ = conn.Close() }()
if n != len(metricsJson) {
return errors.Wrap(err, "short metrics write")
}
logrus.Infof("sent %d bytes of metrics data", n)
ma.accum = make(map[string]model.SessionMetrics)
ma.lastSend = time.Now()
}
return nil
}

View File

@ -1,52 +0,0 @@
package publicFrontend
import (
"net"
"time"
)
type metricsConn struct {
id string
conn net.Conn
updates chan metricsUpdate
}
func newMetricsConn(id string, conn net.Conn, updates chan metricsUpdate) *metricsConn {
return &metricsConn{id, conn, updates}
}
func (mc *metricsConn) Read(b []byte) (n int, err error) {
n, err = mc.conn.Read(b)
mc.updates <- metricsUpdate{mc.id, int64(n), 0}
return n, err
}
func (mc *metricsConn) Write(b []byte) (n int, err error) {
n, err = mc.conn.Write(b)
mc.updates <- metricsUpdate{mc.id, 0, int64(n)}
return n, err
}
func (mc *metricsConn) Close() error {
return mc.conn.Close()
}
func (mc *metricsConn) LocalAddr() net.Addr {
return mc.conn.LocalAddr()
}
func (mc *metricsConn) RemoteAddr() net.Addr {
return mc.conn.RemoteAddr()
}
func (mc *metricsConn) SetDeadline(t time.Time) error {
return mc.conn.SetDeadline(t)
}
func (mc *metricsConn) SetReadDeadline(t time.Time) error {
return mc.conn.SetReadDeadline(t)
}
func (mc *metricsConn) SetWriteDeadline(t time.Time) error {
return mc.conn.SetWriteDeadline(t)
}

20
etc/metrics.yml Normal file
View File

@ -0,0 +1,20 @@
# file source
#
source:
type: file
path: /tmp/fabric-usage.log
# websocket source
#
#source:
# type: websocket
# websocket_endpoint: wss://127.0.0.1:1280/fabric/v1/ws-api
# api_endpoint: https://127.0.0.1:1280
# username: admin
# password: ""
influx:
url: "http://127.0.0.1:8086"
bucket: zrok
org: zrok
token: ""

36
go.mod
View File

@ -6,13 +6,14 @@ require (
github.com/charmbracelet/bubbles v0.14.0
github.com/charmbracelet/bubbletea v0.23.1
github.com/charmbracelet/lipgloss v0.6.0
github.com/go-openapi/errors v0.20.2
github.com/go-openapi/loads v0.21.1
github.com/go-openapi/runtime v0.24.1
github.com/go-openapi/spec v0.20.6
github.com/go-openapi/errors v0.20.3
github.com/go-openapi/loads v0.21.2
github.com/go-openapi/runtime v0.25.0
github.com/go-openapi/spec v0.20.8
github.com/go-openapi/strfmt v0.21.3
github.com/go-openapi/swag v0.21.1
github.com/go-openapi/validate v0.22.0
github.com/go-openapi/swag v0.22.3
github.com/go-openapi/validate v0.22.1
github.com/gorilla/websocket v1.5.0
github.com/iancoleman/strcase v0.2.0
github.com/influxdata/influxdb-client-go/v2 v2.11.0
github.com/jaevor/go-nanoid v1.3.0
@ -24,7 +25,11 @@ require (
github.com/michaelquigley/cf v0.0.13
github.com/michaelquigley/pfxlog v0.6.9
github.com/muesli/reflow v0.3.0
github.com/nxadm/tail v1.4.8
github.com/openziti/channel/v2 v2.0.45
github.com/openziti/edge v0.22.39
github.com/openziti/fabric v0.22.59
github.com/openziti/identity v1.0.37
github.com/openziti/sdk-golang v0.18.61
github.com/pkg/errors v0.9.1
github.com/rubenv/sql-migrate v1.1.2
@ -34,7 +39,7 @@ require (
github.com/stretchr/testify v1.8.2
github.com/wneessen/go-mail v0.2.7
golang.org/x/crypto v0.6.0
golang.org/x/net v0.6.0
golang.org/x/net v0.8.0
golang.org/x/time v0.3.0
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22
nhooyr.io/websocket v1.8.7
@ -53,8 +58,10 @@ require (
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/fullsailor/pkcs7 v0.0.0-20190404230743-d7302db945fa // indirect
github.com/go-gorp/gorp/v3 v3.0.2 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-openapi/analysis v0.21.2 // indirect
github.com/go-openapi/analysis v0.21.4 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.20.0 // indirect
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
@ -82,9 +89,7 @@ require (
github.com/netfoundry/secretstream v0.1.4 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/openziti/channel/v2 v2.0.45 // indirect
github.com/openziti/foundation/v2 v2.0.15 // indirect
github.com/openziti/identity v1.0.37 // indirect
github.com/openziti/foundation/v2 v2.0.17 // indirect
github.com/openziti/metrics v1.2.10 // indirect
github.com/openziti/transport/v2 v2.0.63 // indirect
github.com/orcaman/concurrent-map/v2 v2.0.1 // indirect
@ -100,11 +105,14 @@ require (
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.mongodb.org/mongo-driver v1.10.0 // indirect
go.mozilla.org/pkcs7 v0.0.0-20200128120323-432b2356ecb1 // indirect
go.opentelemetry.io/otel v1.11.1 // indirect
go.opentelemetry.io/otel/trace v1.11.1 // indirect
golang.org/x/exp v0.0.0-20221031165847-c99f073a8326 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/term v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/term v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

64
go.sum
View File

@ -123,39 +123,47 @@ github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2
github.com/go-gorp/gorp/v3 v3.0.2 h1:ULqJXIekoqMx29FI5ekXXFoH1dT2Vc8UhnRzBg+Emz4=
github.com/go-gorp/gorp/v3 v3.0.2/go.mod h1:BJ3q1ejpV8cVALtcXvXaXyTOlMmJhWDxTmncaR6rwBY=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-openapi/analysis v0.21.2 h1:hXFrOYFHUAMQdu6zwAiKKJHJQ8kqZs1ux/ru1P1wLJU=
github.com/go-openapi/analysis v0.21.2/go.mod h1:HZwRk4RRisyG8vx2Oe6aqeSQcoxRp47Xkp3+K6q+LdY=
github.com/go-openapi/analysis v0.21.4 h1:ZDFLvSNxpDaomuCueM0BlSXxpANBlFYiBvr+GXrvIHc=
github.com/go-openapi/analysis v0.21.4/go.mod h1:4zQ35W4neeZTqh3ol0rv/O8JBbka9QyAgQRPp9y3pfo=
github.com/go-openapi/errors v0.19.8/go.mod h1:cM//ZKUKyO06HSwqAelJ5NsEMMcpa6VpXe8DOa1Mi1M=
github.com/go-openapi/errors v0.19.9/go.mod h1:cM//ZKUKyO06HSwqAelJ5NsEMMcpa6VpXe8DOa1Mi1M=
github.com/go-openapi/errors v0.20.2 h1:dxy7PGTqEh94zj2E3h1cUmQQWiM1+aeCROfAr02EmK8=
github.com/go-openapi/errors v0.20.2/go.mod h1:cM//ZKUKyO06HSwqAelJ5NsEMMcpa6VpXe8DOa1Mi1M=
github.com/go-openapi/errors v0.20.3 h1:rz6kiC84sqNQoqrtulzaL/VERgkoCyB6WdEkc2ujzUc=
github.com/go-openapi/errors v0.20.3/go.mod h1:Z3FlZ4I8jEGxjUK+bugx3on2mIAk4txuAOhlsB1FSgk=
github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
github.com/go-openapi/jsonpointer v0.19.5 h1:gZr+CIYByUqjcgeLXnQu2gHYQC9o73G2XUeOFYEICuY=
github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
github.com/go-openapi/jsonreference v0.19.6/go.mod h1:diGHMEHg2IqXZGKxqyvWdfWU/aim5Dprw5bqpKkTvns=
github.com/go-openapi/jsonreference v0.20.0 h1:MYlu0sBgChmCfJxxUKZ8g1cPWFOB37YSZqewK7OKeyA=
github.com/go-openapi/jsonreference v0.20.0/go.mod h1:Ag74Ico3lPc+zR+qjn4XBUmXymS4zJbYVCZmcgkasdo=
github.com/go-openapi/loads v0.21.1 h1:Wb3nVZpdEzDTcly8S4HMkey6fjARRzb7iEaySimlDW0=
github.com/go-openapi/loads v0.21.1/go.mod h1:/DtAMXXneXFjbQMGEtbamCZb+4x7eGwkvZCvBmwUG+g=
github.com/go-openapi/runtime v0.24.1 h1:Sml5cgQKGYQHF+M7yYSHaH1eOjvTykrddTE/KtQVjqo=
github.com/go-openapi/runtime v0.24.1/go.mod h1:AKurw9fNre+h3ELZfk6ILsfvPN+bvvlaU/M9q/r9hpk=
github.com/go-openapi/loads v0.21.2 h1:r2a/xFIYeZ4Qd2TnGpWDIQNcP80dIaZgf704za8enro=
github.com/go-openapi/loads v0.21.2/go.mod h1:Jq58Os6SSGz0rzh62ptiu8Z31I+OTHqmULx5e/gJbNw=
github.com/go-openapi/runtime v0.25.0 h1:7yQTCdRbWhX8vnIjdzU8S00tBYf7Sg71EBeorlPHvhc=
github.com/go-openapi/runtime v0.25.0/go.mod h1:Ux6fikcHXyyob6LNWxtE96hWwjBPYF0DXgVFuMTneOs=
github.com/go-openapi/spec v0.20.4/go.mod h1:faYFR1CvsJZ0mNsmsphTMSoRrNV3TEDoAM7FOEWeq8I=
github.com/go-openapi/spec v0.20.6 h1:ich1RQ3WDbfoeTqTAb+5EIxNmpKVJZWBNah9RAT0jIQ=
github.com/go-openapi/spec v0.20.6/go.mod h1:2OpW+JddWPrpXSCIX8eOx7lZ5iyuWj3RYR6VaaBKcWA=
github.com/go-openapi/spec v0.20.8 h1:ubHmXNY3FCIOinT8RNrrPfGc9t7I1qhPtdOGoG2AxRU=
github.com/go-openapi/spec v0.20.8/go.mod h1:2OpW+JddWPrpXSCIX8eOx7lZ5iyuWj3RYR6VaaBKcWA=
github.com/go-openapi/strfmt v0.21.0/go.mod h1:ZRQ409bWMj+SOgXofQAGTIo2Ebu72Gs+WaRADcS5iNg=
github.com/go-openapi/strfmt v0.21.1/go.mod h1:I/XVKeLc5+MM5oPNN7P6urMOpuLXEcNrCX/rPGuWb0k=
github.com/go-openapi/strfmt v0.21.2/go.mod h1:I/XVKeLc5+MM5oPNN7P6urMOpuLXEcNrCX/rPGuWb0k=
github.com/go-openapi/strfmt v0.21.3 h1:xwhj5X6CjXEZZHMWy1zKJxvW9AfHC9pkyUjLvHtKG7o=
github.com/go-openapi/strfmt v0.21.3/go.mod h1:k+RzNO0Da+k3FrrynSNN8F7n/peCmQQqbbXjtDfvmGg=
github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk=
github.com/go-openapi/swag v0.19.15/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ=
github.com/go-openapi/swag v0.21.1 h1:wm0rhTb5z7qpJRHBdPOMuY4QjVUMbF6/kwoYeRAOrKU=
github.com/go-openapi/swag v0.21.1/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ=
github.com/go-openapi/validate v0.21.0/go.mod h1:rjnrwK57VJ7A8xqfpAOEKRH8yQSGUriMu5/zuPSQ1hg=
github.com/go-openapi/validate v0.22.0 h1:b0QecH6VslW/TxtpKgzpO1SNG7GU2FsaqKdP1E2T50Y=
github.com/go-openapi/validate v0.22.0/go.mod h1:rjnrwK57VJ7A8xqfpAOEKRH8yQSGUriMu5/zuPSQ1hg=
github.com/go-openapi/swag v0.22.3 h1:yMBqmnQ0gyZvEb/+KzuWZOXgllrXT4SADYbvDaXHv/g=
github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14=
github.com/go-openapi/validate v0.22.1 h1:G+c2ub6q47kfX1sOBLwIQwzBVt8qmOAARyo/9Fqs9NU=
github.com/go-openapi/validate v0.22.1/go.mod h1:rjnrwK57VJ7A8xqfpAOEKRH8yQSGUriMu5/zuPSQ1hg=
github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q=
github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8=
@ -168,7 +176,6 @@ github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LB
github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE=
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-stack/stack v1.8.1/go.mod h1:dcoOX6HbPZSZptuspn9bctJ+N/CnF5gGygcUP3XYfe4=
github.com/gobuffalo/attrs v0.0.0-20190224210810-a9411de4debd/go.mod h1:4duuawTqi2wkkpB4ePgWMaai6/Kc6WEz83bhFwpHzj0=
github.com/gobuffalo/depgen v0.0.0-20190329151759-d478694a28d3/go.mod h1:3STtPUQYuzV0gBVOY3vy6CfMm/ljR4pABfrTeHNLHUY=
github.com/gobuffalo/depgen v0.1.0/go.mod h1:+ifsuy7fhi15RWncXQQKjWS9JPkdah5sZvtHc2RXGlg=
@ -285,6 +292,7 @@ github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORR
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q=
github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8=
@ -438,7 +446,6 @@ github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:F
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.3.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mitchellh/mapstructure v1.4.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
@ -466,6 +473,7 @@ github.com/netfoundry/secretstream v0.1.4/go.mod h1:4wlgjAsXXlEa0tNJ7XEYvoBh/Ev0
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
@ -483,8 +491,10 @@ github.com/openziti/channel/v2 v2.0.45 h1:d2tF0XYtGooyFcM/iS6U+6r882ii8KiPvjJlYb
github.com/openziti/channel/v2 v2.0.45/go.mod h1:wPGzSoQxBrHjomugsvFPHJDiwI868QmMCmWSqUxTxpw=
github.com/openziti/edge v0.22.39 h1:vxILPpjf4pk2Ha1ga3vlbvYrDFgoA71lc46z8fQIwGs=
github.com/openziti/edge v0.22.39/go.mod h1:0jSUt6SdibB9zYhYsCF8+NGpKyUwihm7N+tr05V7DPk=
github.com/openziti/foundation/v2 v2.0.15 h1:UoAzDy7mEAfrwJSLN9HeP7+S0FL37NWGHv9jd6fLdlU=
github.com/openziti/foundation/v2 v2.0.15/go.mod h1:gEr9J8XWjbCei7ckNiLBkBPB9btLEvRiRbsGjEuORxY=
github.com/openziti/fabric v0.22.59 h1:IAMezJjvYqp1LrIztyoImwutvoQRBO3zdJIvK/8q468=
github.com/openziti/fabric v0.22.59/go.mod h1:rpRBKmyJqYp6wXRc5WsNc2h+3b+rPHte35R8C6HAPZ8=
github.com/openziti/foundation/v2 v2.0.17 h1:VQzdZXXaNqJiPakZD5O7kEu0lbOoQBoziUhp4VTzKsE=
github.com/openziti/foundation/v2 v2.0.17/go.mod h1:lLIVb/foLtVQ1Gv0VnfVmnSg/vMKU1HslYMc6rEymf4=
github.com/openziti/identity v1.0.37 h1:StwVW0AIPOvC5x6GH2aVrDE5xF+GMMt+Ssr6L+YckEs=
github.com/openziti/identity v1.0.37/go.mod h1:N7LlrZD/nG0aUxkPfpcOnuBXl4RYxmlU/Vf/1FPzDcg=
github.com/openziti/metrics v1.2.10 h1:bef0MBIi8kot0im8u+twphXlyuW3bSDWdshW9Ceagh8=
@ -611,7 +621,6 @@ go.etcd.io/etcd/client/pkg/v3 v3.5.0/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3
go.etcd.io/etcd/client/v2 v2.305.0/go.mod h1:h9puh54ZTgAKtEbut2oe9P4L/oqKCVB6xsXlzd7alYQ=
go.mongodb.org/mongo-driver v1.7.3/go.mod h1:NqaYOwnXWr5Pm7AOpO5QFxKJ503nbMse/R79oO62zWg=
go.mongodb.org/mongo-driver v1.7.5/go.mod h1:VXEWRZ6URJIkUq2SCAyapmhH0ZLRBP+FT4xhp5Zvxng=
go.mongodb.org/mongo-driver v1.8.3/go.mod h1:0sQWfOeY63QTntERDJJ/0SuKK0T1uVSgKCuAROlKEPY=
go.mongodb.org/mongo-driver v1.10.0 h1:UtV6N5k14upNp4LTduX0QCufG124fSu25Wz9tu94GLg=
go.mongodb.org/mongo-driver v1.10.0/go.mod h1:wsihk0Kdgv8Kqu1Anit4sfK+22vSFbUrAVEYRhCXrA8=
go.mozilla.org/pkcs7 v0.0.0-20200128120323-432b2356ecb1 h1:A/5uWzF44DlIgdm/PQFwfMkW0JX+cIcQi/SwLAmZP5M=
@ -623,6 +632,11 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.opentelemetry.io/otel v1.11.1 h1:4WLLAmcfkmDk2ukNXJyq3/kiz/3UzCaYq6PskJsaou4=
go.opentelemetry.io/otel v1.11.1/go.mod h1:1nNhXBbWSD0nsL38H6btgnFN2k4i0sNLHNNMZMSbUGE=
go.opentelemetry.io/otel/sdk v1.11.1 h1:F7KmQgoHljhUuJyA+9BiU+EkJfyX5nVVF4wyzWZpKxs=
go.opentelemetry.io/otel/trace v1.11.1 h1:ofxdnzsNrGBYXbP7t7zpUK281+go5rF7dvdIZXF8gdQ=
go.opentelemetry.io/otel/trace v1.11.1/go.mod h1:f/Q9G7vzk5u91PhbmKbg1Qn0rzH1LJ4vbPHFGkTPtOk=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo=
@ -638,7 +652,6 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201216223049-8b5274cf687f/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
@ -723,9 +736,8 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b
golang.org/x/net v0.0.0-20210421230115-4e50805a0758/go.mod h1:72T/g9IO56b78aLF+1Kcs5dz7/ng1VjMUvfKvpfy+jM=
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.6.0 h1:L4ZwwTvKW9gr0ZMS1yrHD9GZhIuVjOBBnaKH+SPQK0Q=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@ -813,20 +825,20 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220204135822-1c1b9b1eba6a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0 h1:n2a8QNdAb0sZNpU9R1ALUXBbY+w51fCQDN+7EdxNBsY=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.6.0 h1:clScbb1cHjoCkyRbWwBEUZ5H/tIFu5TAXIqaZD0Gcjw=
golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@ -836,8 +848,8 @@ golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68=
golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=