From acb3b663426030dc75c4df2051cc58efaf59ee8f Mon Sep 17 00:00:00 2001 From: Cam Otts Date: Mon, 1 May 2023 13:45:45 -0500 Subject: [PATCH] added ack for amqp messages --- controller/metrics/agent.go | 13 +++++++-- controller/metrics/amqpSource.go | 10 +++++-- controller/metrics/bridge.go | 9 +++--- controller/metrics/fileSource.go | 11 +++++--- controller/metrics/model.go | 40 +++++++++++++++++++++++++-- controller/metrics/websocketSource.go | 17 +++++++----- 6 files changed, 77 insertions(+), 23 deletions(-) diff --git a/controller/metrics/agent.go b/controller/metrics/agent.go index ce10c8fb..04a8eb93 100644 --- a/controller/metrics/agent.go +++ b/controller/metrics/agent.go @@ -7,7 +7,7 @@ import ( ) type Agent struct { - events chan ZitiEventJson + events chan ZitiEventMsg src ZitiEventJsonSource srcJoin chan struct{} cache *cache @@ -31,7 +31,7 @@ func (a *Agent) AddUsageSink(snk UsageSink) { } func (a *Agent) Start() error { - a.events = make(chan ZitiEventJson) + a.events = make(chan ZitiEventMsg) srcJoin, err := a.src.Start(a.events) if err != nil { return err @@ -44,15 +44,22 @@ func (a *Agent) Start() error { for { select { case event := <-a.events: - if usage, err := Ingest(event); err == nil { + if usage, err := Ingest(event.Data()); err == nil { if err := a.cache.addZrokDetail(usage); err != nil { logrus.Error(err) } + shouldAck := true for _, snk := range a.snks { if err := snk.Handle(usage); err != nil { logrus.Error(err) + if shouldAck { + shouldAck = false + } } } + if shouldAck { + event.Ack() + } } else { logrus.Error(err) } diff --git a/controller/metrics/amqpSource.go b/controller/metrics/amqpSource.go index 506a4984..e0815b82 100644 --- a/controller/metrics/amqpSource.go +++ b/controller/metrics/amqpSource.go @@ -52,7 +52,7 @@ func newAmqpSource(cfg *AmqpSourceConfig) (*amqpSource, error) { return nil, errors.Wrap(err, "error declaring queue") } - msgs, err := ch.Consume(cfg.QueueName, "zrok", true, false, false, false, nil) + msgs, err := ch.Consume(cfg.QueueName, "zrok", false, false, false, false, nil) if err != nil { return nil, errors.Wrap(err, "error consuming") } @@ -66,12 +66,16 @@ func newAmqpSource(cfg *AmqpSourceConfig) (*amqpSource, error) { }, nil } -func (s *amqpSource) Start(events chan ZitiEventJson) (join chan struct{}, err error) { +func (s *amqpSource) Start(events chan ZitiEventMsg) (join chan struct{}, err error) { go func() { logrus.Info("started") defer logrus.Info("stopped") for event := range s.msgs { - events <- ZitiEventJson(event.Body) + logrus.Info("Got event message") + events <- &ZitiEventAMQP{ + data: ZitiEventJson(event.Body), + msg: &event, + } } close(s.join) }() diff --git a/controller/metrics/bridge.go b/controller/metrics/bridge.go index 17e8d6e9..0348a8ef 100644 --- a/controller/metrics/bridge.go +++ b/controller/metrics/bridge.go @@ -14,14 +14,14 @@ type Bridge struct { src ZitiEventJsonSource srcJoin chan struct{} snk ZitiEventJsonSink - events chan ZitiEventJson + events chan ZitiEventMsg close chan struct{} join chan struct{} } func NewBridge(cfg *BridgeConfig) (*Bridge, error) { b := &Bridge{ - events: make(chan ZitiEventJson), + events: make(chan ZitiEventMsg), join: make(chan struct{}), close: make(chan struct{}), } @@ -53,11 +53,12 @@ func (b *Bridge) Start() (join chan struct{}, err error) { select { case eventJson := <-b.events: logrus.Info(eventJson) - if err := b.snk.Handle(eventJson); err == nil { - logrus.Infof("-> %v", eventJson) + if err := b.snk.Handle(eventJson.Data()); err == nil { + logrus.Infof("-> %v", eventJson.Data()) } else { logrus.Error(err) } + eventJson.Ack() case <-b.close: logrus.Info("received close signal") diff --git a/controller/metrics/fileSource.go b/controller/metrics/fileSource.go index ac7f86b8..402b2159 100644 --- a/controller/metrics/fileSource.go +++ b/controller/metrics/fileSource.go @@ -2,12 +2,13 @@ package metrics import ( "encoding/binary" + "os" + "github.com/michaelquigley/cf" "github.com/nxadm/tail" "github.com/openziti/zrok/controller/env" "github.com/pkg/errors" "github.com/sirupsen/logrus" - "os" ) func init() { @@ -36,7 +37,7 @@ type fileSource struct { t *tail.Tail } -func (s *fileSource) Start(events chan ZitiEventJson) (join chan struct{}, err error) { +func (s *fileSource) Start(events chan ZitiEventMsg) (join chan struct{}, err error) { f, err := os.Open(s.cfg.Path) if err != nil { return nil, errors.Wrapf(err, "error opening '%v'", s.cfg.Path) @@ -69,7 +70,7 @@ func (s *fileSource) Stop() { } } -func (s *fileSource) tail(ptr int64, events chan ZitiEventJson) { +func (s *fileSource) tail(ptr int64, events chan ZitiEventMsg) { logrus.Info("started") defer logrus.Info("stopped") @@ -85,7 +86,9 @@ func (s *fileSource) tail(ptr int64, events chan ZitiEventJson) { } for event := range s.t.Lines { - events <- ZitiEventJson(event.Text) + events <- &ZitiEventJsonMsg{ + data: ZitiEventJson(event.Text), + } if err := s.writePtr(event.SeekInfo.Offset); err != nil { logrus.Error(err) diff --git a/controller/metrics/model.go b/controller/metrics/model.go index bda74939..d32697c9 100644 --- a/controller/metrics/model.go +++ b/controller/metrics/model.go @@ -2,8 +2,11 @@ package metrics import ( "fmt" - "github.com/openziti/zrok/util" "time" + + "github.com/openziti/zrok/util" + "github.com/pkg/errors" + amqp "github.com/rabbitmq/amqp091-go" ) type Usage struct { @@ -41,8 +44,41 @@ type UsageSink interface { type ZitiEventJson string +type ZitiEventJsonMsg struct { + data ZitiEventJson +} + +func (e *ZitiEventJsonMsg) Data() ZitiEventJson { + return e.data +} + +func (e *ZitiEventJsonMsg) Ack() error { + return nil +} + +type ZitiEventAMQP struct { + data ZitiEventJson + msg *amqp.Delivery +} + +func (e *ZitiEventAMQP) Data() ZitiEventJson { + return e.data +} + +func (e *ZitiEventAMQP) Ack() error { + if e.msg != nil { + return errors.New("Nil delivery message") + } + return e.msg.Ack(false) +} + +type ZitiEventMsg interface { + Data() ZitiEventJson + Ack() error +} + type ZitiEventJsonSource interface { - Start(chan ZitiEventJson) (join chan struct{}, err error) + Start(chan ZitiEventMsg) (join chan struct{}, err error) Stop() } diff --git a/controller/metrics/websocketSource.go b/controller/metrics/websocketSource.go index 1eb7e4e5..c0838abf 100644 --- a/controller/metrics/websocketSource.go +++ b/controller/metrics/websocketSource.go @@ -4,6 +4,11 @@ import ( "crypto/tls" "crypto/x509" "encoding/json" + "io" + "net/http" + "net/url" + "time" + "github.com/gorilla/websocket" "github.com/michaelquigley/cf" "github.com/openziti/channel/v2" @@ -16,10 +21,6 @@ import ( "github.com/openziti/zrok/controller/env" "github.com/pkg/errors" "github.com/sirupsen/logrus" - "io" - "net/http" - "net/url" - "time" ) func init() { @@ -47,11 +48,11 @@ func loadWebsocketSourceConfig(v interface{}, _ *cf.Options) (interface{}, error type websocketSource struct { cfg *WebsocketSourceConfig ch channel.Channel - events chan ZitiEventJson + events chan ZitiEventMsg join chan struct{} } -func (s *websocketSource) Start(events chan ZitiEventJson) (join chan struct{}, err error) { +func (s *websocketSource) Start(events chan ZitiEventMsg) (join chan struct{}, err error) { caCerts, err := rest_util.GetControllerWellKnownCas(s.cfg.ApiEndpoint) if err != nil { return nil, err @@ -150,5 +151,7 @@ func (s *websocketSource) Stop() { } func (s *websocketSource) HandleReceive(msg *channel.Message, _ channel.Channel) { - s.events <- ZitiEventJson(msg.Body) + s.events <- &ZitiEventJsonMsg{ + data: ZitiEventJson(msg.Body), + } }