diff --git a/controller/metrics/amqpSink.go b/controller/metrics/amqpSink.go index 520e86b7..336d85ed 100644 --- a/controller/metrics/amqpSink.go +++ b/controller/metrics/amqpSink.go @@ -31,36 +31,51 @@ func loadAmqpSinkConfig(v interface{}, _ *cf.Options) (interface{}, error) { } type amqpSink struct { - conn *amqp.Connection - ch *amqp.Channel - queue amqp.Queue + cfg *AmqpSinkConfig + conn *amqp.Connection + ch *amqp.Channel + queue amqp.Queue + connected bool } func newAmqpSink(cfg *AmqpSinkConfig) (*amqpSink, error) { - conn, err := amqp.Dial(cfg.Url) - if err != nil { - return nil, errors.Wrap(err, "error dialing amqp broker") - } - - ch, err := conn.Channel() - if err != nil { - return nil, errors.Wrap(err, "error getting amqp channel") - } - - queue, err := ch.QueueDeclare(cfg.QueueName, true, false, false, false, nil) - if err != nil { - return nil, errors.Wrap(err, "error declaring queue") - } - - return &amqpSink{conn, ch, queue}, nil + as := &amqpSink{cfg: cfg} + return as, nil } func (s *amqpSink) Handle(event ZitiEventJson) error { + if !s.connected { + if err := s.connect(); err != nil { + return err + } + logrus.Infof("connected to '%v'", s.cfg.Url) + s.connected = true + } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() logrus.Infof("pushing '%v'", event) - return s.ch.PublishWithContext(ctx, "", s.queue.Name, false, false, amqp.Publishing{ + err := s.ch.PublishWithContext(ctx, "", s.queue.Name, false, false, amqp.Publishing{ ContentType: "application/json", Body: []byte(event), }) + if err != nil { + s.connected = false + } + return err +} + +func (s *amqpSink) connect() (err error) { + s.conn, err = amqp.Dial(s.cfg.Url) + if err != nil { + return errors.Wrapf(err, "error dialing '%v'", s.cfg.Url) + } + s.ch, err = s.conn.Channel() + if err != nil { + return errors.Wrapf(err, "error getting amqp channel from '%v'", s.cfg.Url) + } + s.queue, err = s.ch.QueueDeclare(s.cfg.QueueName, true, false, false, false, nil) + if err != nil { + return errors.Wrapf(err, "error declaring queue '%v' with '%v'", s.cfg.QueueName, s.cfg.Url) + } + return nil }