From aabf695beca2178c62a43c5b202f32971487f7dd Mon Sep 17 00:00:00 2001 From: Michael Quigley Date: Wed, 21 Jun 2023 10:35:10 -0400 Subject: [PATCH 1/4] interim step for enhacing ZitiEventJsonSink to support reconnection; bridge and model adjustments (#351) --- controller/metrics/amqpSink.go | 51 ++++++++++++++++++++++++---------- controller/metrics/bridge.go | 6 ++++ controller/metrics/model.go | 2 ++ 3 files changed, 44 insertions(+), 15 deletions(-) diff --git a/controller/metrics/amqpSink.go b/controller/metrics/amqpSink.go index 520e86b7..c53f3174 100644 --- a/controller/metrics/amqpSink.go +++ b/controller/metrics/amqpSink.go @@ -31,28 +31,23 @@ func loadAmqpSinkConfig(v interface{}, _ *cf.Options) (interface{}, error) { } type amqpSink struct { + cfg *AmqpSinkConfig conn *amqp.Connection ch *amqp.Channel queue amqp.Queue + join chan struct{} } func newAmqpSink(cfg *AmqpSinkConfig) (*amqpSink, error) { - conn, err := amqp.Dial(cfg.Url) - if err != nil { - return nil, errors.Wrap(err, "error dialing amqp broker") - } + return &amqpSink{ + cfg: cfg, + join: make(chan struct{}), + }, nil +} - ch, err := conn.Channel() - if err != nil { - return nil, errors.Wrap(err, "error getting amqp channel") - } - - queue, err := ch.QueueDeclare(cfg.QueueName, true, false, false, false, nil) - if err != nil { - return nil, errors.Wrap(err, "error declaring queue") - } - - return &amqpSink{conn, ch, queue}, nil +func (s *amqpSink) Start() (join chan struct{}, err error) { + logrus.Info("started") + return s.join, nil } func (s *amqpSink) Handle(event ZitiEventJson) error { @@ -64,3 +59,29 @@ func (s *amqpSink) Handle(event ZitiEventJson) error { Body: []byte(event), }) } + +func (s *amqpSink) Stop() { + close(s.join) + logrus.Info("stopped") +} + +func (s *amqpSink) connect() (err error) { + s.conn, err = amqp.Dial(s.cfg.Url) + if err != nil { + return errors.Wrap(err, "error dialing amqp broker") + } + + s.ch, err = s.conn.Channel() + if err != nil { + return errors.Wrap(err, "error getting amqp channel") + } + + s.queue, err = s.ch.QueueDeclare(s.cfg.QueueName, true, false, false, false, nil) + if err != nil { + return errors.Wrap(err, "error declaring queue") + } + + logrus.Infof("connected to amqp broker at '%v'", s.cfg.Url) + + return nil +} diff --git a/controller/metrics/bridge.go b/controller/metrics/bridge.go index 0348a8ef..047cba45 100644 --- a/controller/metrics/bridge.go +++ b/controller/metrics/bridge.go @@ -14,6 +14,7 @@ type Bridge struct { src ZitiEventJsonSource srcJoin chan struct{} snk ZitiEventJsonSink + snkJoin chan struct{} events chan ZitiEventMsg close chan struct{} join chan struct{} @@ -39,6 +40,9 @@ func NewBridge(cfg *BridgeConfig) (*Bridge, error) { } func (b *Bridge) Start() (join chan struct{}, err error) { + if b.snkJoin, err = b.snk.Start(); err != nil { + return nil, err + } if b.srcJoin, err = b.src.Start(b.events); err != nil { return nil, err } @@ -72,7 +76,9 @@ func (b *Bridge) Start() (join chan struct{}, err error) { func (b *Bridge) Stop() { b.src.Stop() + b.snk.Stop() close(b.close) <-b.srcJoin + <-b.snkJoin <-b.join } diff --git a/controller/metrics/model.go b/controller/metrics/model.go index 4e62776a..3113d621 100644 --- a/controller/metrics/model.go +++ b/controller/metrics/model.go @@ -79,5 +79,7 @@ type ZitiEventJsonSource interface { } type ZitiEventJsonSink interface { + Start() (join chan struct{}, err error) Handle(event ZitiEventJson) error + Stop() } From 5a2f6a1f7243992adeeffdfebe95fbb2a40b2d5b Mon Sep 17 00:00:00 2001 From: Michael Quigley Date: Wed, 21 Jun 2023 11:33:43 -0400 Subject: [PATCH 2/4] simpler amqp sink approach (#351) --- controller/metrics/amqpSink.go | 46 ++++++++++++++++------------------ controller/metrics/bridge.go | 6 ----- controller/metrics/model.go | 2 -- 3 files changed, 21 insertions(+), 33 deletions(-) diff --git a/controller/metrics/amqpSink.go b/controller/metrics/amqpSink.go index c53f3174..419b9ff7 100644 --- a/controller/metrics/amqpSink.go +++ b/controller/metrics/amqpSink.go @@ -31,38 +31,39 @@ func loadAmqpSinkConfig(v interface{}, _ *cf.Options) (interface{}, error) { } type amqpSink struct { - cfg *AmqpSinkConfig - conn *amqp.Connection - ch *amqp.Channel - queue amqp.Queue - join chan struct{} + cfg *AmqpSinkConfig + conn *amqp.Connection + ch *amqp.Channel + queue amqp.Queue + connected bool } func newAmqpSink(cfg *AmqpSinkConfig) (*amqpSink, error) { - return &amqpSink{ - cfg: cfg, - join: make(chan struct{}), - }, nil -} - -func (s *amqpSink) Start() (join chan struct{}, err error) { - logrus.Info("started") - return s.join, nil + as := &amqpSink{ + cfg: cfg, + } + return as, nil } func (s *amqpSink) Handle(event ZitiEventJson) error { + if !s.connected { + if err := s.connect(); err != nil { + return err + } + logrus.Infof("connected to '%v'", s.cfg.Url) + s.connected = true + } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() logrus.Infof("pushing '%v'", event) - return s.ch.PublishWithContext(ctx, "", s.queue.Name, false, false, amqp.Publishing{ + err := s.ch.PublishWithContext(ctx, "", s.queue.Name, false, false, amqp.Publishing{ ContentType: "application/json", Body: []byte(event), }) -} - -func (s *amqpSink) Stop() { - close(s.join) - logrus.Info("stopped") + if err != nil { + s.connected = false + } + return err } func (s *amqpSink) connect() (err error) { @@ -70,18 +71,13 @@ func (s *amqpSink) connect() (err error) { if err != nil { return errors.Wrap(err, "error dialing amqp broker") } - s.ch, err = s.conn.Channel() if err != nil { return errors.Wrap(err, "error getting amqp channel") } - s.queue, err = s.ch.QueueDeclare(s.cfg.QueueName, true, false, false, false, nil) if err != nil { return errors.Wrap(err, "error declaring queue") } - - logrus.Infof("connected to amqp broker at '%v'", s.cfg.Url) - return nil } diff --git a/controller/metrics/bridge.go b/controller/metrics/bridge.go index 047cba45..0348a8ef 100644 --- a/controller/metrics/bridge.go +++ b/controller/metrics/bridge.go @@ -14,7 +14,6 @@ type Bridge struct { src ZitiEventJsonSource srcJoin chan struct{} snk ZitiEventJsonSink - snkJoin chan struct{} events chan ZitiEventMsg close chan struct{} join chan struct{} @@ -40,9 +39,6 @@ func NewBridge(cfg *BridgeConfig) (*Bridge, error) { } func (b *Bridge) Start() (join chan struct{}, err error) { - if b.snkJoin, err = b.snk.Start(); err != nil { - return nil, err - } if b.srcJoin, err = b.src.Start(b.events); err != nil { return nil, err } @@ -76,9 +72,7 @@ func (b *Bridge) Start() (join chan struct{}, err error) { func (b *Bridge) Stop() { b.src.Stop() - b.snk.Stop() close(b.close) <-b.srcJoin - <-b.snkJoin <-b.join } diff --git a/controller/metrics/model.go b/controller/metrics/model.go index 3113d621..4e62776a 100644 --- a/controller/metrics/model.go +++ b/controller/metrics/model.go @@ -79,7 +79,5 @@ type ZitiEventJsonSource interface { } type ZitiEventJsonSink interface { - Start() (join chan struct{}, err error) Handle(event ZitiEventJson) error - Stop() } From c2021980fb42654a2ac93e4548a21b0ad56f48a1 Mon Sep 17 00:00:00 2001 From: Michael Quigley Date: Wed, 21 Jun 2023 11:41:13 -0400 Subject: [PATCH 3/4] lint (#351) --- controller/metrics/amqpSink.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/controller/metrics/amqpSink.go b/controller/metrics/amqpSink.go index 419b9ff7..a903b154 100644 --- a/controller/metrics/amqpSink.go +++ b/controller/metrics/amqpSink.go @@ -39,9 +39,7 @@ type amqpSink struct { } func newAmqpSink(cfg *AmqpSinkConfig) (*amqpSink, error) { - as := &amqpSink{ - cfg: cfg, - } + as := &amqpSink{cfg: cfg} return as, nil } From d09140b14e5694e6a7f02e8d85dc6776e522d6fd Mon Sep 17 00:00:00 2001 From: Michael Quigley Date: Wed, 21 Jun 2023 11:43:14 -0400 Subject: [PATCH 4/4] error message improvements in amqpSink (#351) --- controller/metrics/amqpSink.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/controller/metrics/amqpSink.go b/controller/metrics/amqpSink.go index a903b154..336d85ed 100644 --- a/controller/metrics/amqpSink.go +++ b/controller/metrics/amqpSink.go @@ -67,15 +67,15 @@ func (s *amqpSink) Handle(event ZitiEventJson) error { func (s *amqpSink) connect() (err error) { s.conn, err = amqp.Dial(s.cfg.Url) if err != nil { - return errors.Wrap(err, "error dialing amqp broker") + return errors.Wrapf(err, "error dialing '%v'", s.cfg.Url) } s.ch, err = s.conn.Channel() if err != nil { - return errors.Wrap(err, "error getting amqp channel") + return errors.Wrapf(err, "error getting amqp channel from '%v'", s.cfg.Url) } s.queue, err = s.ch.QueueDeclare(s.cfg.QueueName, true, false, false, false, nil) if err != nil { - return errors.Wrap(err, "error declaring queue") + return errors.Wrapf(err, "error declaring queue '%v' with '%v'", s.cfg.QueueName, s.cfg.Url) } return nil }