third attempt at amqp reconnect in amqpSource (#344)

This commit is contained in:
Michael Quigley 2023-06-15 12:51:39 -04:00
parent 2f604d3e71
commit dc3648f73d
No known key found for this signature in database
GPG Key ID: 9B60314A9DD20A62

View File

@ -36,11 +36,17 @@ type amqpSource struct {
queue amqp.Queue queue amqp.Queue
msgs <-chan amqp.Delivery msgs <-chan amqp.Delivery
errs chan *amqp.Error errs chan *amqp.Error
events chan ZitiEventMsg
close chan struct{}
join chan struct{} join chan struct{}
} }
func newAmqpSource(cfg *AmqpSourceConfig) (*amqpSource, error) { func newAmqpSource(cfg *AmqpSourceConfig) (*amqpSource, error) {
as := &amqpSource{cfg: cfg, join: make(chan struct{})} as := &amqpSource{
cfg: cfg,
close: make(chan struct{}),
join: make(chan struct{}),
}
if err := as.connect(); err != nil { if err := as.connect(); err != nil {
return nil, err return nil, err
} }
@ -48,45 +54,57 @@ func newAmqpSource(cfg *AmqpSourceConfig) (*amqpSource, error) {
} }
func (s *amqpSource) Start(events chan ZitiEventMsg) (join chan struct{}, err error) { func (s *amqpSource) Start(events chan ZitiEventMsg) (join chan struct{}, err error) {
go func() { s.events = events
logrus.Info("started") go s.run()
defer logrus.Info("stopped")
reconnect := false
for {
if reconnect || s.errs == nil {
if err := s.reconnect(); err != nil {
logrus.Errorf("error reconnecting: %v", err)
continue
}
reconnect = false
}
select {
case event := <-s.msgs:
if event.Body != nil {
events <- &ZitiEventAMQP{
data: ZitiEventJson(event.Body),
msg: event,
}
}
case err, ok := <-s.errs:
if err != nil || !ok {
logrus.Error(err)
reconnect = true
}
}
}
}()
return s.join, nil return s.join, nil
} }
func (s *amqpSource) Stop() { func (s *amqpSource) Stop() {
if s.ch != nil { close(s.close)
if err := s.ch.Close(); err != nil { <-s.join
}
func (s *amqpSource) run() {
logrus.Info("started")
defer logrus.Info("stopped")
defer close(s.join)
mainLoop:
for {
logrus.Infof("connecting to '%v'", s.cfg.Url)
if err := s.connect(); err != nil {
logrus.Errorf("error connecting to '%v': %v", s.cfg.Url, err)
select {
case <-time.After(10 * time.Second):
continue mainLoop
case <-s.close:
break mainLoop
}
}
logrus.Infof("connected to '%v'", s.cfg.Url)
msgLoop:
for {
select {
case event := <-s.msgs:
if event.Body != nil {
s.events <- &ZitiEventAMQP{
data: ZitiEventJson(event.Body),
msg: event,
}
}
case err, ok := <-s.errs:
if err != nil || !ok {
logrus.Error(err) logrus.Error(err)
break msgLoop
}
case <-s.close:
break mainLoop
}
} }
} }
close(s.join)
} }
func (s *amqpSource) connect() error { func (s *amqpSource) connect() error {
@ -117,18 +135,5 @@ func (s *amqpSource) connect() error {
s.queue = queue s.queue = queue
s.msgs = msgs s.msgs = msgs
logrus.Infof("connected to '%v'", s.cfg.Url)
return nil return nil
} }
func (s *amqpSource) reconnect() error {
s.conn = nil
s.ch = nil
s.msgs = nil
s.errs = nil
logrus.Infof("reconnecting; delay for reconnect")
time.Sleep(10 * time.Second)
return s.connect()
}