From c2720823a67961b60d08734e16e0cfc9be9a811e Mon Sep 17 00:00:00 2001 From: Michael Quigley Date: Thu, 1 May 2025 15:17:57 -0400 Subject: [PATCH] waiting for ziti@v1.6.0 to resolve channel/v3 indirect dependency --- CHANGELOG.md | 2 - controller/metrics/websocketSource.go | 157 ++++++++++++++++++++++++++ go.mod | 14 ++- go.sum | 16 ++- 4 files changed, 181 insertions(+), 8 deletions(-) create mode 100644 controller/metrics/websocketSource.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 791ce4bf..c11efc45 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,8 +2,6 @@ ## v1.0.4 -REMOVED: The `websocket`-based OpenZiti metrics source has been removed. The preferred metrics sources are AMQP or file-based ingestion - CHANGE: `github.com/openziti/sdk-golang` has been updated to `v1.1.0`. Related dependencies and indirects also updated CHANGE: Updated to `golang` `v1.24` as the official build toolchain diff --git a/controller/metrics/websocketSource.go b/controller/metrics/websocketSource.go new file mode 100644 index 00000000..a4361c54 --- /dev/null +++ b/controller/metrics/websocketSource.go @@ -0,0 +1,157 @@ +package metrics + +import ( + "crypto/tls" + "crypto/x509" + "encoding/json" + "github.com/gorilla/websocket" + "github.com/michaelquigley/cf" + "github.com/openziti/channel/v4" + "github.com/openziti/channel/v4/websockets" + "github.com/openziti/edge-api/rest_util" + "github.com/openziti/identity" + "github.com/openziti/ziti/common/pb/mgmt_pb" + "github.com/openziti/ziti/controller/event" + "github.com/openziti/zrok/controller/env" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "io" + "net/http" + "net/url" + "time" +) + +const ZitiSession = "zt-session" + +func init() { + env.GetCfOptions().AddFlexibleSetter("websocketSource", loadWebsocketSourceConfig) +} + +type WebsocketSourceConfig struct { + WebsocketEndpoint string // wss://127.0.0.1:1280/fabric/v1/ws-api + ApiEndpoint string // https://127.0.0.1:1280 + Username string + Password string `cf:"+secret"` +} + +func loadWebsocketSourceConfig(v interface{}, _ *cf.Options) (interface{}, error) { + if submap, ok := v.(map[string]interface{}); ok { + cfg := &WebsocketSourceConfig{} + if err := cf.Bind(cfg, submap, cf.DefaultOptions()); err != nil { + return nil, err + } + return &websocketSource{cfg: cfg}, nil + } + return nil, errors.New("invalid config structure for 'websocketSource'") +} + +type websocketSource struct { + cfg *WebsocketSourceConfig + ch channel.Channel + events chan ZitiEventMsg + join chan struct{} +} + +func (s *websocketSource) Start(events chan ZitiEventMsg) (join chan struct{}, err error) { + caCerts, err := rest_util.GetControllerWellKnownCas(s.cfg.ApiEndpoint) + if err != nil { + return nil, err + } + caPool := x509.NewCertPool() + for _, ca := range caCerts { + caPool.AddCert(ca) + } + + authenticator := rest_util.NewAuthenticatorUpdb(s.cfg.Username, s.cfg.Password) + authenticator.RootCas = caPool + + apiEndpointUrl, err := url.Parse(s.cfg.ApiEndpoint) + if err != nil { + return nil, err + } + apiSession, err := authenticator.Authenticate(apiEndpointUrl) + if err != nil { + return nil, err + } + + dialer := &websocket.Dialer{ + TLSClientConfig: &tls.Config{ + RootCAs: caPool, + }, + HandshakeTimeout: 5 * time.Second, + } + + conn, resp, err := dialer.Dial(s.cfg.WebsocketEndpoint, http.Header{ZitiSession: []string{*apiSession.Token}}) + if err != nil { + if resp != nil { + if body, rerr := io.ReadAll(resp.Body); rerr == nil { + logrus.Errorf("response body '%v': %v", string(body), err) + } + } else { + logrus.Errorf("no response from websocket dial: %v", err) + } + } + + id := &identity.TokenId{Token: "mgmt"} + underlayFactory := websockets.NewUnderlayFactory(id, conn, nil) + + s.join = make(chan struct{}) + s.events = events + bindHandler := func(binding channel.Binding) error { + binding.AddReceiveHandler(int32(mgmt_pb.ContentType_StreamEventsEventType), s) + binding.AddCloseHandler(channel.CloseHandlerF(func(ch channel.Channel) { + close(s.join) + })) + return nil + } + + s.ch, err = channel.NewChannel("mgmt", underlayFactory, channel.BindHandlerF(bindHandler), nil) + if err != nil { + return nil, err + } + + streamEventsRequest := map[string]interface{}{} + streamEventsRequest["format"] = "json" + streamEventsRequest["subscriptions"] = []*event.Subscription{ + { + Type: "fabric.usage", + Options: map[string]interface{}{ + "version": uint8(3), + }, + }, + } + + msgBytes, err := json.Marshal(streamEventsRequest) + if err != nil { + return nil, err + } + + requestMsg := channel.NewMessage(int32(mgmt_pb.ContentType_StreamEventsRequestType), msgBytes) + responseMsg, err := requestMsg.WithTimeout(5 * time.Second).SendForReply(s.ch) + if err != nil { + return nil, err + } + + if responseMsg.ContentType == channel.ContentTypeResultType { + result := channel.UnmarshalResult(responseMsg) + if result.Success { + logrus.Infof("event stream started: %v", result.Message) + } else { + return nil, errors.Wrap(err, "error starting event streaming") + } + } else { + return nil, errors.Errorf("unexpected response type %v", responseMsg.ContentType) + } + + return s.join, nil +} + +func (s *websocketSource) Stop() { + _ = s.ch.Close() +} + +func (s *websocketSource) HandleReceive(msg *channel.Message, _ channel.Channel) { + s.events <- &ZitiEventJsonMsg{ + data: ZitiEventJson(msg.Body), + } +} diff --git a/go.mod b/go.mod index b48ab2a9..2c71fd60 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,7 @@ require ( github.com/golang-jwt/jwt/v5 v5.2.2 github.com/google/go-cmp v0.7.0 github.com/google/uuid v1.6.0 + github.com/gorilla/websocket v1.5.3 github.com/greenpau/caddy-security v1.1.29 github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.0 github.com/iancoleman/strcase v0.3.0 @@ -38,10 +39,13 @@ require ( github.com/net-byte/vtun v1.7.0 github.com/net-byte/water v0.0.7 github.com/nxadm/tail v1.4.8 + github.com/openziti/channel/v4 v4.0.6 github.com/openziti/cobra-to-md v1.0.1 - github.com/openziti/edge-api v0.26.42 + github.com/openziti/edge-api v0.26.43 + github.com/openziti/identity v1.0.101 github.com/openziti/sdk-golang v1.1.0 github.com/openziti/transport/v2 v2.0.168 + github.com/openziti/ziti v1.5.4 github.com/orcaman/concurrent-map/v2 v2.0.1 github.com/pkg/errors v0.9.1 github.com/rabbitmq/amqp091-go v1.10.0 @@ -85,6 +89,7 @@ require ( github.com/aymanbagabas/go-osc52 v1.0.3 // indirect github.com/beevik/etree v1.3.0 // indirect github.com/beorn7/perks v1.0.1 // indirect + github.com/biogo/store v0.0.0-20200525035639-8c94ae1e7c9c // indirect github.com/caddyserver/certmagic v0.21.3 // indirect github.com/caddyserver/zerossl v0.1.3 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect @@ -143,7 +148,6 @@ require ( github.com/gorilla/mux v1.8.1 // indirect github.com/gorilla/schema v1.4.1 // indirect github.com/gorilla/securecookie v1.1.2 // indirect - github.com/gorilla/websocket v1.5.3 // indirect github.com/greenpau/go-authcrunch v1.1.4 // indirect github.com/greenpau/versioned v1.0.30 // indirect github.com/huandu/xstrings v1.4.0 // indirect @@ -190,11 +194,13 @@ require ( github.com/oklog/ulid v1.3.1 // indirect github.com/onsi/ginkgo/v2 v2.13.2 // indirect github.com/opentracing/opentracing-go v1.2.1-0.20220228012449-10b1cf09e00b // indirect - github.com/openziti/channel/v4 v4.0.6 // indirect + github.com/openziti/channel/v3 v3.0.39 // indirect + github.com/openziti/foundation v0.17.22 // indirect github.com/openziti/foundation/v2 v2.0.60 // indirect - github.com/openziti/identity v1.0.101 // indirect github.com/openziti/metrics v1.4.1 // indirect github.com/openziti/secretstream v0.1.32 // indirect + github.com/openziti/storage v0.4.7 // indirect + github.com/openziti/transport v0.1.5 // indirect github.com/parallaxsecond/parsec-client-go v0.0.0-20221025095442-f0a77d263cf9 // indirect github.com/pires/go-proxyproto v0.7.0 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect diff --git a/go.sum b/go.sum index f611aa3c..cd687b6a 100644 --- a/go.sum +++ b/go.sum @@ -136,6 +136,8 @@ github.com/beevik/etree v1.3.0/go.mod h1:aiPf89g/1k3AShMVAzriilpcE4R/Vuor90y83zV github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= +github.com/biogo/store v0.0.0-20200525035639-8c94ae1e7c9c h1:qq7IeRYvCmew9ThhcfslD5XD25P/UsBxsTozk6ywF+A= +github.com/biogo/store v0.0.0-20200525035639-8c94ae1e7c9c/go.mod h1:wdbXg77soR6ESRprAMEwAQDFtLT6EAGF5o1GRy0cB5k= github.com/bketelsen/crypt v0.0.4/go.mod h1:aI6NrJ0pMGgvZKL1iVgXLnfIFJtfV+bKCoqOes/6LfM= github.com/caddyserver/caddy/v2 v2.8.4 h1:q3pe0wpBj1OcHFZ3n/1nl4V4bxBrYoSoab7rL9BMYNk= github.com/caddyserver/caddy/v2 v2.8.4/go.mod h1:vmDAHp3d05JIvuhc24LmnxVlsZmWnUwbP5WMjzcMPWw= @@ -735,12 +737,16 @@ github.com/onsi/gomega v1.29.0 h1:KIA/t2t5UBzoirT4H9tsML45GEbo3ouUnBHsCfD2tVg= github.com/onsi/gomega v1.29.0/go.mod h1:9sxs+SwGrKI0+PWe4Fxa9tFQQBG5xSsSbMXOI8PPpoQ= github.com/opentracing/opentracing-go v1.2.1-0.20220228012449-10b1cf09e00b h1:FfH+VrHHk6Lxt9HdVS0PXzSXFyS2NbZKXv33FYPol0A= github.com/opentracing/opentracing-go v1.2.1-0.20220228012449-10b1cf09e00b/go.mod h1:AC62GU6hc0BrNm+9RK9VSiwa/EUe1bkIeFORAMcHvJU= +github.com/openziti/channel/v3 v3.0.39 h1:UM0iY0tbz4EbOVT3tX4mfN1wSAXxkkWIrKmQ7RhE/Hg= +github.com/openziti/channel/v3 v3.0.39/go.mod h1:7k3mQhtWlgX0HaQBkllDTOH5WAf7DcyyMLqJXrL+/fI= github.com/openziti/channel/v4 v4.0.6 h1:pcIn9xNnQXnenHoaOt347nG0dmnsisomnEIPq4F5YHI= github.com/openziti/channel/v4 v4.0.6/go.mod h1:sU3S/MToQHES1V22O+xtQbX0QSteoMo6RzQiEMfir+w= github.com/openziti/cobra-to-md v1.0.1 h1:WRinNoIRmwWUSJm+pSNXMjOrtU48oxXDZgeCYQfVXxE= github.com/openziti/cobra-to-md v1.0.1/go.mod h1:FjCpk/yzHF7/r28oSTNr5P57yN5VolpdAtS/g7KNi2c= -github.com/openziti/edge-api v0.26.42 h1:Wi/BUttSUvedT9XGht7vi/zI/TNGc3ApvjkAviWhauA= -github.com/openziti/edge-api v0.26.42/go.mod h1:sYHVpm26Jr1u7VooNJzTb2b2nGSlmCHMnbGC8XfWSng= +github.com/openziti/edge-api v0.26.43 h1:z5QxsaSr4DohfgsJ0fF7Ey94ZBKsYfFN1IsigA+Ev2w= +github.com/openziti/edge-api v0.26.43/go.mod h1:sYHVpm26Jr1u7VooNJzTb2b2nGSlmCHMnbGC8XfWSng= +github.com/openziti/foundation v0.17.22 h1:zFfgNHdLocEcoBqtognpJnYsVQaTm28EwqMNjBw8avk= +github.com/openziti/foundation v0.17.22/go.mod h1:Nnr0vsqovHJimkdBHf2s4ZJAgvNhg6UdVqP+s0nY3P4= github.com/openziti/foundation/v2 v2.0.60 h1:YWXbtqptZwSwATF6NQ76phpzVHzRycuAf+B5Fhch/Wg= github.com/openziti/foundation/v2 v2.0.60/go.mod h1:zhNVpu5e295IRDSTblhpagoJHIwyL1y00V4wyxinNjw= github.com/openziti/identity v1.0.101 h1:XujPT6eCv3Gqsodh9846EoWfQZwDAFrjPLxm1cipveY= @@ -751,8 +757,14 @@ github.com/openziti/sdk-golang v1.1.0 h1:g92rQhxwkAo7HGqB/4CMiGy50HzPKQ+DWk1wRUV github.com/openziti/sdk-golang v1.1.0/go.mod h1:XANR3SLMl1rmtHvjxvO1ojyJSd0ExB+gnjaqLoOxmCU= github.com/openziti/secretstream v0.1.32 h1:89/ZVcwIQjdVmWDfVRfMEChJJXTLXJ59AYBw5j646M4= github.com/openziti/secretstream v0.1.32/go.mod h1:8YaIbjyMwBeKQ7eOYcoVPKHT10u+4OVPXpnZAeDzC6o= +github.com/openziti/storage v0.4.7 h1:3phONUaSrcH2m6D6eQT4DG6b7eZEW9IuewfG2RaUCtA= +github.com/openziti/storage v0.4.7/go.mod h1:2iqAlZPvGhXErIgGwvpmWC2f9EdVKcdtX622Ht8c2vw= +github.com/openziti/transport v0.1.5 h1:+MgLyUDOsDdixm7GT9TgHd2pBIREf2AHsPa2D6mMNhY= +github.com/openziti/transport v0.1.5/go.mod h1:NK1wWR16q1IBgvItzruGtvDc2C7AaJpRQqryC2tGIgA= github.com/openziti/transport/v2 v2.0.168 h1:1Anf7X+4xmSKQ12GdPJFhoMZi04QxgD4MJu3agFc1R4= github.com/openziti/transport/v2 v2.0.168/go.mod h1:vE9FGxPB6I89SWun5mOz3Tuz2QDctwNfB4oqDIdzPoM= +github.com/openziti/ziti v1.5.4 h1:5pUW1MCqSQNIjgynFa0JIyweCR29lWHawJLvjamc2hM= +github.com/openziti/ziti v1.5.4/go.mod h1:zUXQWI1jx4ScbTqSupRNx7+daIIT2zKHA/yzIKYJN7k= github.com/orcaman/concurrent-map/v2 v2.0.1 h1:jOJ5Pg2w1oeB6PeDurIYf6k9PQ+aTITr/6lP/L/zp6c= github.com/orcaman/concurrent-map/v2 v2.0.1/go.mod h1:9Eq3TG2oBe5FirmYWQfYO5iH1q0Jv47PLaNK++uCdOM= github.com/parallaxsecond/parsec-client-go v0.0.0-20221025095442-f0a77d263cf9 h1:mOvehYivJ4Aqu2CPe3D3lv8jhqOI9/1o0THxJHBE0qw=