mirror of
https://github.com/openziti/zrok.git
synced 2025-06-25 04:02:15 +02:00
Merge pull request #354 from openziti/v0.4.0_amqp_sink_reconnect
ampqSink Reconnection (#351)
This commit is contained in:
commit
688c9d74c2
@ -31,36 +31,51 @@ func loadAmqpSinkConfig(v interface{}, _ *cf.Options) (interface{}, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type amqpSink struct {
|
type amqpSink struct {
|
||||||
|
cfg *AmqpSinkConfig
|
||||||
conn *amqp.Connection
|
conn *amqp.Connection
|
||||||
ch *amqp.Channel
|
ch *amqp.Channel
|
||||||
queue amqp.Queue
|
queue amqp.Queue
|
||||||
|
connected bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func newAmqpSink(cfg *AmqpSinkConfig) (*amqpSink, error) {
|
func newAmqpSink(cfg *AmqpSinkConfig) (*amqpSink, error) {
|
||||||
conn, err := amqp.Dial(cfg.Url)
|
as := &amqpSink{cfg: cfg}
|
||||||
if err != nil {
|
return as, nil
|
||||||
return nil, errors.Wrap(err, "error dialing amqp broker")
|
|
||||||
}
|
|
||||||
|
|
||||||
ch, err := conn.Channel()
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrap(err, "error getting amqp channel")
|
|
||||||
}
|
|
||||||
|
|
||||||
queue, err := ch.QueueDeclare(cfg.QueueName, true, false, false, false, nil)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrap(err, "error declaring queue")
|
|
||||||
}
|
|
||||||
|
|
||||||
return &amqpSink{conn, ch, queue}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *amqpSink) Handle(event ZitiEventJson) error {
|
func (s *amqpSink) Handle(event ZitiEventJson) error {
|
||||||
|
if !s.connected {
|
||||||
|
if err := s.connect(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
logrus.Infof("connected to '%v'", s.cfg.Url)
|
||||||
|
s.connected = true
|
||||||
|
}
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
logrus.Infof("pushing '%v'", event)
|
logrus.Infof("pushing '%v'", event)
|
||||||
return s.ch.PublishWithContext(ctx, "", s.queue.Name, false, false, amqp.Publishing{
|
err := s.ch.PublishWithContext(ctx, "", s.queue.Name, false, false, amqp.Publishing{
|
||||||
ContentType: "application/json",
|
ContentType: "application/json",
|
||||||
Body: []byte(event),
|
Body: []byte(event),
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
s.connected = false
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *amqpSink) connect() (err error) {
|
||||||
|
s.conn, err = amqp.Dial(s.cfg.Url)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrapf(err, "error dialing '%v'", s.cfg.Url)
|
||||||
|
}
|
||||||
|
s.ch, err = s.conn.Channel()
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrapf(err, "error getting amqp channel from '%v'", s.cfg.Url)
|
||||||
|
}
|
||||||
|
s.queue, err = s.ch.QueueDeclare(s.cfg.QueueName, true, false, false, false, nil)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrapf(err, "error declaring queue '%v' with '%v'", s.cfg.QueueName, s.cfg.Url)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user