diff --git a/controller/metrics/amqpSource.go b/controller/metrics/amqpSource.go index 0d9a0ab6..c7cd66ac 100644 --- a/controller/metrics/amqpSource.go +++ b/controller/metrics/amqpSource.go @@ -30,17 +30,23 @@ func loadAmqpSourceConfig(v interface{}, _ *cf.Options) (interface{}, error) { } type amqpSource struct { - cfg *AmqpSourceConfig - conn *amqp.Connection - ch *amqp.Channel - queue amqp.Queue - msgs <-chan amqp.Delivery - errs chan *amqp.Error - join chan struct{} + cfg *AmqpSourceConfig + conn *amqp.Connection + ch *amqp.Channel + queue amqp.Queue + msgs <-chan amqp.Delivery + errs chan *amqp.Error + events chan ZitiEventMsg + close chan struct{} + join chan struct{} } func newAmqpSource(cfg *AmqpSourceConfig) (*amqpSource, error) { - as := &amqpSource{cfg: cfg, join: make(chan struct{})} + as := &amqpSource{ + cfg: cfg, + close: make(chan struct{}), + join: make(chan struct{}), + } if err := as.connect(); err != nil { return nil, err } @@ -48,45 +54,57 @@ func newAmqpSource(cfg *AmqpSourceConfig) (*amqpSource, error) { } func (s *amqpSource) Start(events chan ZitiEventMsg) (join chan struct{}, err error) { - go func() { - logrus.Info("started") - defer logrus.Info("stopped") - - reconnect := false - for { - if reconnect || s.errs == nil { - if err := s.reconnect(); err != nil { - logrus.Errorf("error reconnecting: %v", err) - continue - } - reconnect = false - } - select { - case event := <-s.msgs: - if event.Body != nil { - events <- &ZitiEventAMQP{ - data: ZitiEventJson(event.Body), - msg: event, - } - } - case err, ok := <-s.errs: - if err != nil || !ok { - logrus.Error(err) - reconnect = true - } - } - } - }() + s.events = events + go s.run() return s.join, nil } func (s *amqpSource) Stop() { - if s.ch != nil { - if err := s.ch.Close(); err != nil { - logrus.Error(err) + close(s.close) + <-s.join +} + +func (s *amqpSource) run() { + logrus.Info("started") + defer logrus.Info("stopped") + defer close(s.join) + +mainLoop: + for { + logrus.Infof("connecting to '%v'", s.cfg.Url) + if err := s.connect(); err != nil { + logrus.Errorf("error connecting to '%v': %v", s.cfg.Url, err) + select { + case <-time.After(10 * time.Second): + continue mainLoop + case <-s.close: + break mainLoop + } + } + logrus.Infof("connected to '%v'", s.cfg.Url) + + msgLoop: + for { + select { + case event := <-s.msgs: + if event.Body != nil { + s.events <- &ZitiEventAMQP{ + data: ZitiEventJson(event.Body), + msg: event, + } + } + + case err, ok := <-s.errs: + if err != nil || !ok { + logrus.Error(err) + break msgLoop + } + + case <-s.close: + break mainLoop + } } } - close(s.join) } func (s *amqpSource) connect() error { @@ -117,18 +135,5 @@ func (s *amqpSource) connect() error { s.queue = queue s.msgs = msgs - logrus.Infof("connected to '%v'", s.cfg.Url) - return nil } - -func (s *amqpSource) reconnect() error { - s.conn = nil - s.ch = nil - s.msgs = nil - s.errs = nil - - logrus.Infof("reconnecting; delay for reconnect") - time.Sleep(10 * time.Second) - return s.connect() -}