Merge pull request #333 from openziti/v0.4.0_amqp_fix

V0.4.0 amqp fix
This commit is contained in:
Michael Quigley 2023-05-22 16:32:18 -04:00 committed by GitHub
commit d34e024b66
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 5 additions and 7 deletions

View File

@ -58,7 +58,9 @@ func (a *Agent) Start() error {
} }
} }
if shouldAck { if shouldAck {
event.Ack() if err := event.Ack(); err != nil {
logrus.Error("unable to Ack message", err)
}
} }
} else { } else {
logrus.Error(err) logrus.Error(err)

View File

@ -73,7 +73,7 @@ func (s *amqpSource) Start(events chan ZitiEventMsg) (join chan struct{}, err er
for event := range s.msgs { for event := range s.msgs {
events <- &ZitiEventAMQP{ events <- &ZitiEventAMQP{
data: ZitiEventJson(event.Body), data: ZitiEventJson(event.Body),
msg: &event, msg: event,
} }
} }
close(s.join) close(s.join)

View File

@ -5,7 +5,6 @@ import (
"time" "time"
"github.com/openziti/zrok/util" "github.com/openziti/zrok/util"
"github.com/pkg/errors"
amqp "github.com/rabbitmq/amqp091-go" amqp "github.com/rabbitmq/amqp091-go"
) )
@ -58,7 +57,7 @@ func (e *ZitiEventJsonMsg) Ack() error {
type ZitiEventAMQP struct { type ZitiEventAMQP struct {
data ZitiEventJson data ZitiEventJson
msg *amqp.Delivery msg amqp.Delivery
} }
func (e *ZitiEventAMQP) Data() ZitiEventJson { func (e *ZitiEventAMQP) Data() ZitiEventJson {
@ -66,9 +65,6 @@ func (e *ZitiEventAMQP) Data() ZitiEventJson {
} }
func (e *ZitiEventAMQP) Ack() error { func (e *ZitiEventAMQP) Ack() error {
if e.msg != nil {
return errors.New("Nil delivery message")
}
return e.msg.Ack(false) return e.msg.Ack(false)
} }