added ack for amqp messages

This commit is contained in:
Cam Otts
2023-05-01 13:45:45 -05:00
parent 0be4bbc513
commit acb3b66342
6 changed files with 77 additions and 23 deletions

View File

@ -4,6 +4,11 @@ import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"io"
"net/http"
"net/url"
"time"
"github.com/gorilla/websocket"
"github.com/michaelquigley/cf"
"github.com/openziti/channel/v2"
@ -16,10 +21,6 @@ import (
"github.com/openziti/zrok/controller/env"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"io"
"net/http"
"net/url"
"time"
)
func init() {
@ -47,11 +48,11 @@ func loadWebsocketSourceConfig(v interface{}, _ *cf.Options) (interface{}, error
type websocketSource struct {
cfg *WebsocketSourceConfig
ch channel.Channel
events chan ZitiEventJson
events chan ZitiEventMsg
join chan struct{}
}
func (s *websocketSource) Start(events chan ZitiEventJson) (join chan struct{}, err error) {
func (s *websocketSource) Start(events chan ZitiEventMsg) (join chan struct{}, err error) {
caCerts, err := rest_util.GetControllerWellKnownCas(s.cfg.ApiEndpoint)
if err != nil {
return nil, err
@ -150,5 +151,7 @@ func (s *websocketSource) Stop() {
}
func (s *websocketSource) HandleReceive(msg *channel.Message, _ channel.Channel) {
s.events <- ZitiEventJson(msg.Body)
s.events <- &ZitiEventJsonMsg{
data: ZitiEventJson(msg.Body),
}
}