From 459e0e60a1ddb49ebe475400055210221c83c95d Mon Sep 17 00:00:00 2001 From: Michael Quigley Date: Fri, 9 Jun 2023 11:56:58 -0400 Subject: [PATCH 1/2] rudimentary reconnection support in metrics.amqpSource (#344) --- controller/metrics/amqpSource.go | 107 ++++++++++++++++++++++--------- 1 file changed, 75 insertions(+), 32 deletions(-) diff --git a/controller/metrics/amqpSource.go b/controller/metrics/amqpSource.go index 62b4b317..b2e86a45 100644 --- a/controller/metrics/amqpSource.go +++ b/controller/metrics/amqpSource.go @@ -6,6 +6,7 @@ import ( "github.com/pkg/errors" amqp "github.com/rabbitmq/amqp091-go" "github.com/sirupsen/logrus" + "time" ) func init() { @@ -29,54 +30,52 @@ func loadAmqpSourceConfig(v interface{}, _ *cf.Options) (interface{}, error) { } type amqpSource struct { + cfg *AmqpSourceConfig conn *amqp.Connection ch *amqp.Channel queue amqp.Queue msgs <-chan amqp.Delivery + errs chan *amqp.Error join chan struct{} } func newAmqpSource(cfg *AmqpSourceConfig) (*amqpSource, error) { - conn, err := amqp.Dial(cfg.Url) - if err != nil { - return nil, errors.Wrap(err, "error dialing amqp broker") + as := &amqpSource{cfg: cfg, join: make(chan struct{})} + if err := as.connect(); err != nil { + return nil, err } - - ch, err := conn.Channel() - if err != nil { - return nil, errors.Wrap(err, "error getting amqp channel") - } - - queue, err := ch.QueueDeclare(cfg.QueueName, true, false, false, false, nil) - if err != nil { - return nil, errors.Wrap(err, "error declaring queue") - } - - msgs, err := ch.Consume(cfg.QueueName, "zrok", false, false, false, false, nil) - if err != nil { - return nil, errors.Wrap(err, "error consuming") - } - - return &amqpSource{ - conn, - ch, - queue, - msgs, - make(chan struct{}), - }, nil + return as, nil } func (s *amqpSource) Start(events chan ZitiEventMsg) (join chan struct{}, err error) { go func() { logrus.Info("started") defer logrus.Info("stopped") - for event := range s.msgs { - events <- &ZitiEventAMQP{ - data: ZitiEventJson(event.Body), - msg: event, + + reconnect := false + for { + if reconnect { + if err := s.reconnect(); err != nil { + logrus.Errorf("error reconnecting: %v", err) + continue + } + reconnect = false + } + select { + case event := <-s.msgs: + if event.Body != nil { + events <- &ZitiEventAMQP{ + data: ZitiEventJson(event.Body), + msg: event, + } + } + case err := <-s.errs: + if err != nil { + logrus.Error(err) + reconnect = true + } } } - close(s.join) }() return s.join, nil } @@ -85,5 +84,49 @@ func (s *amqpSource) Stop() { if err := s.ch.Close(); err != nil { logrus.Error(err) } - <-s.join + close(s.join) +} + +func (s *amqpSource) connect() error { + conn, err := amqp.Dial(s.cfg.Url) + if err != nil { + return errors.Wrap(err, "error dialing amqp broker") + } + + ch, err := conn.Channel() + if err != nil { + return errors.Wrap(err, "error getting amqp channel") + } + + queue, err := ch.QueueDeclare(s.cfg.QueueName, true, false, false, false, nil) + if err != nil { + return errors.Wrap(err, "error declaring queue") + } + + msgs, err := ch.Consume(s.cfg.QueueName, "zrok", false, false, false, false, nil) + if err != nil { + return errors.Wrap(err, "error consuming") + } + + s.errs = make(chan *amqp.Error) + conn.NotifyClose(s.errs) + s.conn = conn + s.ch = ch + s.queue = queue + s.msgs = msgs + + logrus.Infof("connected to '%v'", s.cfg.Url) + + return nil +} + +func (s *amqpSource) reconnect() error { + s.conn = nil + s.ch = nil + s.msgs = nil + s.errs = nil + + logrus.Infof("reconnecting; delay for reconnect") + time.Sleep(10 * time.Second) + return s.connect() } From 368ddf5f01a43c82370c8c2250a4ae6822868027 Mon Sep 17 00:00:00 2001 From: Michael Quigley Date: Fri, 9 Jun 2023 12:05:02 -0400 Subject: [PATCH 2/2] fix for controller shutdown when amqpSource is in reconnecting state (#344) --- controller/metrics/amqpSource.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/controller/metrics/amqpSource.go b/controller/metrics/amqpSource.go index b2e86a45..54681602 100644 --- a/controller/metrics/amqpSource.go +++ b/controller/metrics/amqpSource.go @@ -81,8 +81,10 @@ func (s *amqpSource) Start(events chan ZitiEventMsg) (join chan struct{}, err er } func (s *amqpSource) Stop() { - if err := s.ch.Close(); err != nil { - logrus.Error(err) + if s.ch != nil { + if err := s.ch.Close(); err != nil { + logrus.Error(err) + } } close(s.join) }