diff --git a/controller/metrics/amqpSink.go b/controller/metrics/amqpSink.go index 520e86b7..c53f3174 100644 --- a/controller/metrics/amqpSink.go +++ b/controller/metrics/amqpSink.go @@ -31,28 +31,23 @@ func loadAmqpSinkConfig(v interface{}, _ *cf.Options) (interface{}, error) { } type amqpSink struct { + cfg *AmqpSinkConfig conn *amqp.Connection ch *amqp.Channel queue amqp.Queue + join chan struct{} } func newAmqpSink(cfg *AmqpSinkConfig) (*amqpSink, error) { - conn, err := amqp.Dial(cfg.Url) - if err != nil { - return nil, errors.Wrap(err, "error dialing amqp broker") - } + return &amqpSink{ + cfg: cfg, + join: make(chan struct{}), + }, nil +} - ch, err := conn.Channel() - if err != nil { - return nil, errors.Wrap(err, "error getting amqp channel") - } - - queue, err := ch.QueueDeclare(cfg.QueueName, true, false, false, false, nil) - if err != nil { - return nil, errors.Wrap(err, "error declaring queue") - } - - return &amqpSink{conn, ch, queue}, nil +func (s *amqpSink) Start() (join chan struct{}, err error) { + logrus.Info("started") + return s.join, nil } func (s *amqpSink) Handle(event ZitiEventJson) error { @@ -64,3 +59,29 @@ func (s *amqpSink) Handle(event ZitiEventJson) error { Body: []byte(event), }) } + +func (s *amqpSink) Stop() { + close(s.join) + logrus.Info("stopped") +} + +func (s *amqpSink) connect() (err error) { + s.conn, err = amqp.Dial(s.cfg.Url) + if err != nil { + return errors.Wrap(err, "error dialing amqp broker") + } + + s.ch, err = s.conn.Channel() + if err != nil { + return errors.Wrap(err, "error getting amqp channel") + } + + s.queue, err = s.ch.QueueDeclare(s.cfg.QueueName, true, false, false, false, nil) + if err != nil { + return errors.Wrap(err, "error declaring queue") + } + + logrus.Infof("connected to amqp broker at '%v'", s.cfg.Url) + + return nil +} diff --git a/controller/metrics/bridge.go b/controller/metrics/bridge.go index 0348a8ef..047cba45 100644 --- a/controller/metrics/bridge.go +++ b/controller/metrics/bridge.go @@ -14,6 +14,7 @@ type Bridge struct { src ZitiEventJsonSource srcJoin chan struct{} snk ZitiEventJsonSink + snkJoin chan struct{} events chan ZitiEventMsg close chan struct{} join chan struct{} @@ -39,6 +40,9 @@ func NewBridge(cfg *BridgeConfig) (*Bridge, error) { } func (b *Bridge) Start() (join chan struct{}, err error) { + if b.snkJoin, err = b.snk.Start(); err != nil { + return nil, err + } if b.srcJoin, err = b.src.Start(b.events); err != nil { return nil, err } @@ -72,7 +76,9 @@ func (b *Bridge) Start() (join chan struct{}, err error) { func (b *Bridge) Stop() { b.src.Stop() + b.snk.Stop() close(b.close) <-b.srcJoin + <-b.snkJoin <-b.join } diff --git a/controller/metrics/model.go b/controller/metrics/model.go index 4e62776a..3113d621 100644 --- a/controller/metrics/model.go +++ b/controller/metrics/model.go @@ -79,5 +79,7 @@ type ZitiEventJsonSource interface { } type ZitiEventJsonSink interface { + Start() (join chan struct{}, err error) Handle(event ZitiEventJson) error + Stop() }