mirror of
https://github.com/openziti/zrok.git
synced 2025-01-11 00:18:43 +01:00
vestigial websocket stuff (#128)
This commit is contained in:
parent
deeeaaa1c3
commit
396f892625
@ -1,39 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"github.com/michaelquigley/cf"
|
||||
"github.com/openziti/zrok/controller"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
func init() {
|
||||
rootCmd.AddCommand(newUsageCommand().cmd)
|
||||
}
|
||||
|
||||
type usageCommand struct {
|
||||
cmd *cobra.Command
|
||||
}
|
||||
|
||||
func newUsageCommand() *usageCommand {
|
||||
cmd := &cobra.Command{
|
||||
Use: "usage <configPath>",
|
||||
Short: "Start a zrok metrics agent",
|
||||
Args: cobra.ExactArgs(1),
|
||||
}
|
||||
command := &usageCommand{cmd: cmd}
|
||||
cmd.Run = command.run
|
||||
return command
|
||||
}
|
||||
|
||||
func (cmd *usageCommand) run(_ *cobra.Command, args []string) {
|
||||
cfg, err := controller.LoadConfig(args[0])
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
logrus.Infof(cf.Dump(cfg, cf.DefaultOptions()))
|
||||
|
||||
if err := controller.RunUsageAgent(cfg); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
@ -1,140 +0,0 @@
|
||||
package controller
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"encoding/json"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/openziti/channel/v2"
|
||||
"github.com/openziti/channel/v2/websockets"
|
||||
"github.com/openziti/edge/rest_util"
|
||||
"github.com/openziti/fabric/event"
|
||||
"github.com/openziti/fabric/pb/mgmt_pb"
|
||||
"github.com/openziti/identity"
|
||||
"github.com/openziti/sdk-golang/ziti/constants"
|
||||
"github.com/openziti/zrok/controller/metrics"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"time"
|
||||
)
|
||||
|
||||
func RunUsageAgent(cfg *Config) error {
|
||||
zitiApiEndpointUrl, err := url.Parse(cfg.Ziti.ApiEndpoint)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
wsUrl := "wss://" + zitiApiEndpointUrl.Host + "/fabric/v1/ws-api"
|
||||
logrus.Infof("wsUrl: %v", wsUrl)
|
||||
|
||||
caCerts, err := rest_util.GetControllerWellKnownCas(cfg.Ziti.ApiEndpoint)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
caPool := x509.NewCertPool()
|
||||
for _, ca := range caCerts {
|
||||
caPool.AddCert(ca)
|
||||
}
|
||||
|
||||
authenticator := rest_util.NewAuthenticatorUpdb(cfg.Ziti.Username, cfg.Ziti.Password)
|
||||
authenticator.RootCas = caPool
|
||||
|
||||
apiSession, err := authenticator.Authenticate(zitiApiEndpointUrl)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
dialer := &websocket.Dialer{
|
||||
TLSClientConfig: &tls.Config{
|
||||
RootCAs: caPool,
|
||||
},
|
||||
HandshakeTimeout: 5 * time.Second,
|
||||
}
|
||||
|
||||
conn, resp, err := dialer.Dial(wsUrl, http.Header{constants.ZitiSession: []string{*apiSession.Token}})
|
||||
if err != nil {
|
||||
if resp != nil {
|
||||
if body, rerr := io.ReadAll(resp.Body); rerr == nil {
|
||||
logrus.Errorf("response body '%v': %v", string(body), err)
|
||||
}
|
||||
} else {
|
||||
logrus.Errorf("no response from websocket dial: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
id := &identity.TokenId{Token: "mgmt"}
|
||||
underlayFactory := websockets.NewUnderlayFactory(id, conn, nil)
|
||||
|
||||
closeNotify := make(chan struct{})
|
||||
|
||||
bindHandler := func(binding channel.Binding) error {
|
||||
binding.AddReceiveHandler(int32(mgmt_pb.ContentType_StreamEventsEventType), &usageAgent{})
|
||||
binding.AddCloseHandler(channel.CloseHandlerF(func(ch channel.Channel) {
|
||||
close(closeNotify)
|
||||
}))
|
||||
return nil
|
||||
}
|
||||
|
||||
ch, err := channel.NewChannel("mgmt", underlayFactory, channel.BindHandlerF(bindHandler), nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
streamEventsRequest := map[string]interface{}{}
|
||||
streamEventsRequest["format"] = "json"
|
||||
streamEventsRequest["subscriptions"] = []*event.Subscription{
|
||||
{
|
||||
Type: "fabric.usage",
|
||||
Options: map[string]interface{}{
|
||||
"version": uint8(3),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
msgBytes, err := json.Marshal(streamEventsRequest)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
requestMsg := channel.NewMessage(int32(mgmt_pb.ContentType_StreamEventsRequestType), msgBytes)
|
||||
responseMsg, err := requestMsg.WithTimeout(5 * time.Second).SendForReply(ch)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if responseMsg.ContentType == channel.ContentTypeResultType {
|
||||
result := channel.UnmarshalResult(responseMsg)
|
||||
if result.Success {
|
||||
logrus.Infof("event stream started: %v", result.Message)
|
||||
} else {
|
||||
return errors.Wrap(err, "error starting event streaming")
|
||||
}
|
||||
} else {
|
||||
return errors.Errorf("unexpected response type %v", responseMsg.ContentType)
|
||||
}
|
||||
|
||||
<-closeNotify
|
||||
return nil
|
||||
}
|
||||
|
||||
type usageAgent struct{}
|
||||
|
||||
func (ua *usageAgent) HandleReceive(msg *channel.Message, _ channel.Channel) {
|
||||
decoder := json.NewDecoder(bytes.NewReader(msg.Body))
|
||||
for {
|
||||
event := make(map[string]interface{})
|
||||
err := decoder.Decode(&event)
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
if err == nil {
|
||||
logrus.Info(metrics.Ingest(event))
|
||||
} else {
|
||||
logrus.Errorf("error parsing '%v': %v", string(msg.Body), err)
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user