simpler amqp sink approach (#351)

This commit is contained in:
Michael Quigley 2023-06-21 11:33:43 -04:00
parent aabf695bec
commit 5a2f6a1f72
No known key found for this signature in database
GPG Key ID: 9B60314A9DD20A62
3 changed files with 21 additions and 33 deletions

View File

@ -35,34 +35,35 @@ type amqpSink struct {
conn *amqp.Connection
ch *amqp.Channel
queue amqp.Queue
join chan struct{}
connected bool
}
func newAmqpSink(cfg *AmqpSinkConfig) (*amqpSink, error) {
return &amqpSink{
as := &amqpSink{
cfg: cfg,
join: make(chan struct{}),
}, nil
}
func (s *amqpSink) Start() (join chan struct{}, err error) {
logrus.Info("started")
return s.join, nil
}
return as, nil
}
func (s *amqpSink) Handle(event ZitiEventJson) error {
if !s.connected {
if err := s.connect(); err != nil {
return err
}
logrus.Infof("connected to '%v'", s.cfg.Url)
s.connected = true
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
logrus.Infof("pushing '%v'", event)
return s.ch.PublishWithContext(ctx, "", s.queue.Name, false, false, amqp.Publishing{
err := s.ch.PublishWithContext(ctx, "", s.queue.Name, false, false, amqp.Publishing{
ContentType: "application/json",
Body: []byte(event),
})
}
func (s *amqpSink) Stop() {
close(s.join)
logrus.Info("stopped")
if err != nil {
s.connected = false
}
return err
}
func (s *amqpSink) connect() (err error) {
@ -70,18 +71,13 @@ func (s *amqpSink) connect() (err error) {
if err != nil {
return errors.Wrap(err, "error dialing amqp broker")
}
s.ch, err = s.conn.Channel()
if err != nil {
return errors.Wrap(err, "error getting amqp channel")
}
s.queue, err = s.ch.QueueDeclare(s.cfg.QueueName, true, false, false, false, nil)
if err != nil {
return errors.Wrap(err, "error declaring queue")
}
logrus.Infof("connected to amqp broker at '%v'", s.cfg.Url)
return nil
}

View File

@ -14,7 +14,6 @@ type Bridge struct {
src ZitiEventJsonSource
srcJoin chan struct{}
snk ZitiEventJsonSink
snkJoin chan struct{}
events chan ZitiEventMsg
close chan struct{}
join chan struct{}
@ -40,9 +39,6 @@ func NewBridge(cfg *BridgeConfig) (*Bridge, error) {
}
func (b *Bridge) Start() (join chan struct{}, err error) {
if b.snkJoin, err = b.snk.Start(); err != nil {
return nil, err
}
if b.srcJoin, err = b.src.Start(b.events); err != nil {
return nil, err
}
@ -76,9 +72,7 @@ func (b *Bridge) Start() (join chan struct{}, err error) {
func (b *Bridge) Stop() {
b.src.Stop()
b.snk.Stop()
close(b.close)
<-b.srcJoin
<-b.snkJoin
<-b.join
}

View File

@ -79,7 +79,5 @@ type ZitiEventJsonSource interface {
}
type ZitiEventJsonSink interface {
Start() (join chan struct{}, err error)
Handle(event ZitiEventJson) error
Stop()
}