mirror of
https://github.com/openziti/zrok.git
synced 2024-11-26 18:13:52 +01:00
websocket source (#128)
This commit is contained in:
parent
7a1d491b99
commit
755315e598
@ -15,7 +15,7 @@ type FileSourceConfig struct {
|
|||||||
IndexPath string
|
IndexPath string
|
||||||
}
|
}
|
||||||
|
|
||||||
func loadFileSourceConfig(v interface{}, opts *cf.Options) (interface{}, error) {
|
func loadFileSourceConfig(v interface{}, _ *cf.Options) (interface{}, error) {
|
||||||
if submap, ok := v.(map[string]interface{}); ok {
|
if submap, ok := v.(map[string]interface{}); ok {
|
||||||
cfg := &FileSourceConfig{}
|
cfg := &FileSourceConfig{}
|
||||||
if err := cf.Bind(cfg, submap, cf.DefaultOptions()); err != nil {
|
if err := cf.Bind(cfg, submap, cf.DefaultOptions()); err != nil {
|
||||||
|
@ -1,20 +1,162 @@
|
|||||||
package metrics
|
package metrics
|
||||||
|
|
||||||
import "github.com/michaelquigley/cf"
|
import (
|
||||||
|
"bytes"
|
||||||
|
"crypto/tls"
|
||||||
|
"crypto/x509"
|
||||||
|
"encoding/json"
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
|
"github.com/michaelquigley/cf"
|
||||||
|
"github.com/openziti/channel/v2"
|
||||||
|
"github.com/openziti/channel/v2/websockets"
|
||||||
|
"github.com/openziti/edge/rest_util"
|
||||||
|
"github.com/openziti/fabric/event"
|
||||||
|
"github.com/openziti/fabric/pb/mgmt_pb"
|
||||||
|
"github.com/openziti/identity"
|
||||||
|
"github.com/openziti/sdk-golang/ziti/constants"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
type WebsocketSourceConfig struct {
|
type WebsocketSourceConfig struct {
|
||||||
WebsocketEndpoint string
|
WebsocketEndpoint string
|
||||||
|
ApiEndpoint string
|
||||||
|
Username string
|
||||||
|
Password string
|
||||||
}
|
}
|
||||||
|
|
||||||
func loadWebsocketSourceConfig(v interface{}, opts *cf.Options) (interface{}, error) {
|
func loadWebsocketSourceConfig(v interface{}, _ *cf.Options) (interface{}, error) {
|
||||||
return nil, nil
|
if submap, ok := v.(map[string]interface{}); ok {
|
||||||
|
cfg := &WebsocketSourceConfig{}
|
||||||
|
if err := cf.Bind(cfg, submap, cf.DefaultOptions()); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &websocketSource{cfg: cfg}, nil
|
||||||
|
}
|
||||||
|
return nil, errors.New("invalid config structure for 'websocket' source")
|
||||||
}
|
}
|
||||||
|
|
||||||
type websocketSource struct{}
|
type websocketSource struct {
|
||||||
|
cfg *WebsocketSourceConfig
|
||||||
|
ch channel.Channel
|
||||||
|
events chan map[string]interface{}
|
||||||
|
join chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
func (s *websocketSource) Start(events chan map[string]interface{}) (chan struct{}, error) {
|
func (s *websocketSource) Start(events chan map[string]interface{}) (chan struct{}, error) {
|
||||||
return nil, nil
|
caCerts, err := rest_util.GetControllerWellKnownCas(s.cfg.ApiEndpoint)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
caPool := x509.NewCertPool()
|
||||||
|
for _, ca := range caCerts {
|
||||||
|
caPool.AddCert(ca)
|
||||||
|
}
|
||||||
|
|
||||||
|
authenticator := rest_util.NewAuthenticatorUpdb(s.cfg.Username, s.cfg.Password)
|
||||||
|
authenticator.RootCas = caPool
|
||||||
|
|
||||||
|
apiEndpointUrl, err := url.Parse(s.cfg.ApiEndpoint)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
apiSession, err := authenticator.Authenticate(apiEndpointUrl)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
dialer := &websocket.Dialer{
|
||||||
|
TLSClientConfig: &tls.Config{
|
||||||
|
RootCAs: caPool,
|
||||||
|
},
|
||||||
|
HandshakeTimeout: 5 * time.Second,
|
||||||
|
}
|
||||||
|
|
||||||
|
conn, resp, err := dialer.Dial(s.cfg.WebsocketEndpoint, http.Header{constants.ZitiSession: []string{*apiSession.Token}})
|
||||||
|
if err != nil {
|
||||||
|
if resp != nil {
|
||||||
|
if body, rerr := io.ReadAll(resp.Body); rerr == nil {
|
||||||
|
logrus.Errorf("response body '%v': %v", string(body), err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logrus.Errorf("no response from websocket dial: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
id := &identity.TokenId{Token: "mgmt"}
|
||||||
|
underlayFactory := websockets.NewUnderlayFactory(id, conn, nil)
|
||||||
|
|
||||||
|
s.join = make(chan struct{})
|
||||||
|
s.events = events
|
||||||
|
bindHandler := func(binding channel.Binding) error {
|
||||||
|
binding.AddReceiveHandler(int32(mgmt_pb.ContentType_StreamEventsEventType), s)
|
||||||
|
binding.AddCloseHandler(channel.CloseHandlerF(func(ch channel.Channel) {
|
||||||
|
close(s.join)
|
||||||
|
}))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
s.ch, err = channel.NewChannel("mgmt", underlayFactory, channel.BindHandlerF(bindHandler), nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
streamEventsRequest := map[string]interface{}{}
|
||||||
|
streamEventsRequest["format"] = "json"
|
||||||
|
streamEventsRequest["subscriptions"] = []*event.Subscription{
|
||||||
|
{
|
||||||
|
Type: "fabric.usage",
|
||||||
|
Options: map[string]interface{}{
|
||||||
|
"version": uint8(3),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
msgBytes, err := json.Marshal(streamEventsRequest)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
requestMsg := channel.NewMessage(int32(mgmt_pb.ContentType_StreamEventsRequestType), msgBytes)
|
||||||
|
responseMsg, err := requestMsg.WithTimeout(5 * time.Second).SendForReply(s.ch)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if responseMsg.ContentType == channel.ContentTypeResultType {
|
||||||
|
result := channel.UnmarshalResult(responseMsg)
|
||||||
|
if result.Success {
|
||||||
|
logrus.Infof("event stream started: %v", result.Message)
|
||||||
|
} else {
|
||||||
|
return nil, errors.Wrap(err, "error starting event streaming")
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return nil, errors.Errorf("unexpected response type %v", responseMsg.ContentType)
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.join, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *websocketSource) Stop() {
|
func (s *websocketSource) Stop() {
|
||||||
|
_ = s.ch.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *websocketSource) HandleReceive(msg *channel.Message, _ channel.Channel) {
|
||||||
|
decoder := json.NewDecoder(bytes.NewReader(msg.Body))
|
||||||
|
for {
|
||||||
|
ev := make(map[string]interface{})
|
||||||
|
err := decoder.Decode(&ev)
|
||||||
|
if err == io.EOF {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if err == nil {
|
||||||
|
s.events <- ev
|
||||||
|
} else {
|
||||||
|
logrus.Errorf("error parsing '%v': %v", string(msg.Body), err)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -2,3 +2,9 @@ source:
|
|||||||
type: file
|
type: file
|
||||||
path: /tmp/fabric-usage.log
|
path: /tmp/fabric-usage.log
|
||||||
|
|
||||||
|
#source:
|
||||||
|
# type: websocket
|
||||||
|
# websocket_endpoint: wss://127.0.0.1:1280/fabric/v1/ws-api
|
||||||
|
# api_endpoint: https://127.0.0.1:1280
|
||||||
|
# username: admin
|
||||||
|
# password: ""
|
Loading…
Reference in New Issue
Block a user