This commit is contained in:
Cam Otts 2023-05-22 15:23:55 -05:00
parent de02cea5f3
commit 6a572c9977
No known key found for this signature in database
GPG Key ID: 367B7C7EBD84A8BD
3 changed files with 5 additions and 7 deletions

View File

@ -58,7 +58,9 @@ func (a *Agent) Start() error {
}
}
if shouldAck {
event.Ack()
if err := event.Ack(); err != nil {
logrus.Error("unable to Ack message", err)
}
}
} else {
logrus.Error(err)

View File

@ -73,7 +73,7 @@ func (s *amqpSource) Start(events chan ZitiEventMsg) (join chan struct{}, err er
for event := range s.msgs {
events <- &ZitiEventAMQP{
data: ZitiEventJson(event.Body),
msg: &event,
msg: event,
}
}
close(s.join)

View File

@ -5,7 +5,6 @@ import (
"time"
"github.com/openziti/zrok/util"
"github.com/pkg/errors"
amqp "github.com/rabbitmq/amqp091-go"
)
@ -58,7 +57,7 @@ func (e *ZitiEventJsonMsg) Ack() error {
type ZitiEventAMQP struct {
data ZitiEventJson
msg *amqp.Delivery
msg amqp.Delivery
}
func (e *ZitiEventAMQP) Data() ZitiEventJson {
@ -66,9 +65,6 @@ func (e *ZitiEventAMQP) Data() ZitiEventJson {
}
func (e *ZitiEventAMQP) Ack() error {
if e.msg != nil {
return errors.New("Nil delivery message")
}
return e.msg.Ack(false)
}