diff --git a/controller/metrics/fileSource.go b/controller/metrics/fileSource.go index 3112a106..b85bd770 100644 --- a/controller/metrics/fileSource.go +++ b/controller/metrics/fileSource.go @@ -15,7 +15,7 @@ type FileSourceConfig struct { IndexPath string } -func loadFileSourceConfig(v interface{}, opts *cf.Options) (interface{}, error) { +func loadFileSourceConfig(v interface{}, _ *cf.Options) (interface{}, error) { if submap, ok := v.(map[string]interface{}); ok { cfg := &FileSourceConfig{} if err := cf.Bind(cfg, submap, cf.DefaultOptions()); err != nil { diff --git a/controller/metrics/websocketSource.go b/controller/metrics/websocketSource.go index c0c0afe3..4e757c3b 100644 --- a/controller/metrics/websocketSource.go +++ b/controller/metrics/websocketSource.go @@ -1,20 +1,162 @@ package metrics -import "github.com/michaelquigley/cf" +import ( + "bytes" + "crypto/tls" + "crypto/x509" + "encoding/json" + "github.com/gorilla/websocket" + "github.com/michaelquigley/cf" + "github.com/openziti/channel/v2" + "github.com/openziti/channel/v2/websockets" + "github.com/openziti/edge/rest_util" + "github.com/openziti/fabric/event" + "github.com/openziti/fabric/pb/mgmt_pb" + "github.com/openziti/identity" + "github.com/openziti/sdk-golang/ziti/constants" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "io" + "net/http" + "net/url" + "time" +) type WebsocketSourceConfig struct { WebsocketEndpoint string + ApiEndpoint string + Username string + Password string } -func loadWebsocketSourceConfig(v interface{}, opts *cf.Options) (interface{}, error) { - return nil, nil +func loadWebsocketSourceConfig(v interface{}, _ *cf.Options) (interface{}, error) { + if submap, ok := v.(map[string]interface{}); ok { + cfg := &WebsocketSourceConfig{} + if err := cf.Bind(cfg, submap, cf.DefaultOptions()); err != nil { + return nil, err + } + return &websocketSource{cfg: cfg}, nil + } + return nil, errors.New("invalid config structure for 'websocket' source") } -type websocketSource struct{} +type websocketSource struct { + cfg *WebsocketSourceConfig + ch channel.Channel + events chan map[string]interface{} + join chan struct{} +} func (s *websocketSource) Start(events chan map[string]interface{}) (chan struct{}, error) { - return nil, nil + caCerts, err := rest_util.GetControllerWellKnownCas(s.cfg.ApiEndpoint) + if err != nil { + return nil, err + } + caPool := x509.NewCertPool() + for _, ca := range caCerts { + caPool.AddCert(ca) + } + + authenticator := rest_util.NewAuthenticatorUpdb(s.cfg.Username, s.cfg.Password) + authenticator.RootCas = caPool + + apiEndpointUrl, err := url.Parse(s.cfg.ApiEndpoint) + if err != nil { + return nil, err + } + apiSession, err := authenticator.Authenticate(apiEndpointUrl) + if err != nil { + return nil, err + } + + dialer := &websocket.Dialer{ + TLSClientConfig: &tls.Config{ + RootCAs: caPool, + }, + HandshakeTimeout: 5 * time.Second, + } + + conn, resp, err := dialer.Dial(s.cfg.WebsocketEndpoint, http.Header{constants.ZitiSession: []string{*apiSession.Token}}) + if err != nil { + if resp != nil { + if body, rerr := io.ReadAll(resp.Body); rerr == nil { + logrus.Errorf("response body '%v': %v", string(body), err) + } + } else { + logrus.Errorf("no response from websocket dial: %v", err) + } + } + + id := &identity.TokenId{Token: "mgmt"} + underlayFactory := websockets.NewUnderlayFactory(id, conn, nil) + + s.join = make(chan struct{}) + s.events = events + bindHandler := func(binding channel.Binding) error { + binding.AddReceiveHandler(int32(mgmt_pb.ContentType_StreamEventsEventType), s) + binding.AddCloseHandler(channel.CloseHandlerF(func(ch channel.Channel) { + close(s.join) + })) + return nil + } + + s.ch, err = channel.NewChannel("mgmt", underlayFactory, channel.BindHandlerF(bindHandler), nil) + if err != nil { + return nil, err + } + + streamEventsRequest := map[string]interface{}{} + streamEventsRequest["format"] = "json" + streamEventsRequest["subscriptions"] = []*event.Subscription{ + { + Type: "fabric.usage", + Options: map[string]interface{}{ + "version": uint8(3), + }, + }, + } + + msgBytes, err := json.Marshal(streamEventsRequest) + if err != nil { + return nil, err + } + + requestMsg := channel.NewMessage(int32(mgmt_pb.ContentType_StreamEventsRequestType), msgBytes) + responseMsg, err := requestMsg.WithTimeout(5 * time.Second).SendForReply(s.ch) + if err != nil { + return nil, err + } + + if responseMsg.ContentType == channel.ContentTypeResultType { + result := channel.UnmarshalResult(responseMsg) + if result.Success { + logrus.Infof("event stream started: %v", result.Message) + } else { + return nil, errors.Wrap(err, "error starting event streaming") + } + } else { + return nil, errors.Errorf("unexpected response type %v", responseMsg.ContentType) + } + + return s.join, nil } func (s *websocketSource) Stop() { + _ = s.ch.Close() +} + +func (s *websocketSource) HandleReceive(msg *channel.Message, _ channel.Channel) { + decoder := json.NewDecoder(bytes.NewReader(msg.Body)) + for { + ev := make(map[string]interface{}) + err := decoder.Decode(&ev) + if err == io.EOF { + break + } + if err == nil { + s.events <- ev + } else { + logrus.Errorf("error parsing '%v': %v", string(msg.Body), err) + } + } } diff --git a/etc/metrics.yml b/etc/metrics.yml index b0fc6308..0293f52d 100644 --- a/etc/metrics.yml +++ b/etc/metrics.yml @@ -1,4 +1,10 @@ source: type: file path: /tmp/fabric-usage.log - \ No newline at end of file + +#source: +# type: websocket +# websocket_endpoint: wss://127.0.0.1:1280/fabric/v1/ws-api +# api_endpoint: https://127.0.0.1:1280 +# username: admin +# password: "" \ No newline at end of file