mirror of
https://github.com/openziti/zrok.git
synced 2024-11-29 03:24:15 +01:00
Merge pull request #318 from openziti/amqp_ack
added ack for amqp messages
This commit is contained in:
commit
04a541df8f
@ -7,7 +7,7 @@ import (
|
||||
)
|
||||
|
||||
type Agent struct {
|
||||
events chan ZitiEventJson
|
||||
events chan ZitiEventMsg
|
||||
src ZitiEventJsonSource
|
||||
srcJoin chan struct{}
|
||||
cache *cache
|
||||
@ -31,7 +31,7 @@ func (a *Agent) AddUsageSink(snk UsageSink) {
|
||||
}
|
||||
|
||||
func (a *Agent) Start() error {
|
||||
a.events = make(chan ZitiEventJson)
|
||||
a.events = make(chan ZitiEventMsg)
|
||||
srcJoin, err := a.src.Start(a.events)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -44,15 +44,22 @@ func (a *Agent) Start() error {
|
||||
for {
|
||||
select {
|
||||
case event := <-a.events:
|
||||
if usage, err := Ingest(event); err == nil {
|
||||
if usage, err := Ingest(event.Data()); err == nil {
|
||||
if err := a.cache.addZrokDetail(usage); err != nil {
|
||||
logrus.Error(err)
|
||||
}
|
||||
shouldAck := true
|
||||
for _, snk := range a.snks {
|
||||
if err := snk.Handle(usage); err != nil {
|
||||
logrus.Error(err)
|
||||
if shouldAck {
|
||||
shouldAck = false
|
||||
}
|
||||
}
|
||||
}
|
||||
if shouldAck {
|
||||
event.Ack()
|
||||
}
|
||||
} else {
|
||||
logrus.Error(err)
|
||||
}
|
||||
|
@ -52,7 +52,7 @@ func newAmqpSource(cfg *AmqpSourceConfig) (*amqpSource, error) {
|
||||
return nil, errors.Wrap(err, "error declaring queue")
|
||||
}
|
||||
|
||||
msgs, err := ch.Consume(cfg.QueueName, "zrok", true, false, false, false, nil)
|
||||
msgs, err := ch.Consume(cfg.QueueName, "zrok", false, false, false, false, nil)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error consuming")
|
||||
}
|
||||
@ -66,12 +66,15 @@ func newAmqpSource(cfg *AmqpSourceConfig) (*amqpSource, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *amqpSource) Start(events chan ZitiEventJson) (join chan struct{}, err error) {
|
||||
func (s *amqpSource) Start(events chan ZitiEventMsg) (join chan struct{}, err error) {
|
||||
go func() {
|
||||
logrus.Info("started")
|
||||
defer logrus.Info("stopped")
|
||||
for event := range s.msgs {
|
||||
events <- ZitiEventJson(event.Body)
|
||||
events <- &ZitiEventAMQP{
|
||||
data: ZitiEventJson(event.Body),
|
||||
msg: &event,
|
||||
}
|
||||
}
|
||||
close(s.join)
|
||||
}()
|
||||
|
@ -14,14 +14,14 @@ type Bridge struct {
|
||||
src ZitiEventJsonSource
|
||||
srcJoin chan struct{}
|
||||
snk ZitiEventJsonSink
|
||||
events chan ZitiEventJson
|
||||
events chan ZitiEventMsg
|
||||
close chan struct{}
|
||||
join chan struct{}
|
||||
}
|
||||
|
||||
func NewBridge(cfg *BridgeConfig) (*Bridge, error) {
|
||||
b := &Bridge{
|
||||
events: make(chan ZitiEventJson),
|
||||
events: make(chan ZitiEventMsg),
|
||||
join: make(chan struct{}),
|
||||
close: make(chan struct{}),
|
||||
}
|
||||
@ -53,11 +53,12 @@ func (b *Bridge) Start() (join chan struct{}, err error) {
|
||||
select {
|
||||
case eventJson := <-b.events:
|
||||
logrus.Info(eventJson)
|
||||
if err := b.snk.Handle(eventJson); err == nil {
|
||||
logrus.Infof("-> %v", eventJson)
|
||||
if err := b.snk.Handle(eventJson.Data()); err == nil {
|
||||
logrus.Infof("-> %v", eventJson.Data())
|
||||
} else {
|
||||
logrus.Error(err)
|
||||
}
|
||||
eventJson.Ack()
|
||||
|
||||
case <-b.close:
|
||||
logrus.Info("received close signal")
|
||||
|
@ -2,12 +2,13 @@ package metrics
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"os"
|
||||
|
||||
"github.com/michaelquigley/cf"
|
||||
"github.com/nxadm/tail"
|
||||
"github.com/openziti/zrok/controller/env"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
"os"
|
||||
)
|
||||
|
||||
func init() {
|
||||
@ -36,7 +37,7 @@ type fileSource struct {
|
||||
t *tail.Tail
|
||||
}
|
||||
|
||||
func (s *fileSource) Start(events chan ZitiEventJson) (join chan struct{}, err error) {
|
||||
func (s *fileSource) Start(events chan ZitiEventMsg) (join chan struct{}, err error) {
|
||||
f, err := os.Open(s.cfg.Path)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "error opening '%v'", s.cfg.Path)
|
||||
@ -69,7 +70,7 @@ func (s *fileSource) Stop() {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *fileSource) tail(ptr int64, events chan ZitiEventJson) {
|
||||
func (s *fileSource) tail(ptr int64, events chan ZitiEventMsg) {
|
||||
logrus.Info("started")
|
||||
defer logrus.Info("stopped")
|
||||
|
||||
@ -85,7 +86,9 @@ func (s *fileSource) tail(ptr int64, events chan ZitiEventJson) {
|
||||
}
|
||||
|
||||
for event := range s.t.Lines {
|
||||
events <- ZitiEventJson(event.Text)
|
||||
events <- &ZitiEventJsonMsg{
|
||||
data: ZitiEventJson(event.Text),
|
||||
}
|
||||
|
||||
if err := s.writePtr(event.SeekInfo.Offset); err != nil {
|
||||
logrus.Error(err)
|
||||
|
@ -2,8 +2,11 @@ package metrics
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/openziti/zrok/util"
|
||||
"time"
|
||||
|
||||
"github.com/openziti/zrok/util"
|
||||
"github.com/pkg/errors"
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
)
|
||||
|
||||
type Usage struct {
|
||||
@ -41,8 +44,41 @@ type UsageSink interface {
|
||||
|
||||
type ZitiEventJson string
|
||||
|
||||
type ZitiEventJsonMsg struct {
|
||||
data ZitiEventJson
|
||||
}
|
||||
|
||||
func (e *ZitiEventJsonMsg) Data() ZitiEventJson {
|
||||
return e.data
|
||||
}
|
||||
|
||||
func (e *ZitiEventJsonMsg) Ack() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type ZitiEventAMQP struct {
|
||||
data ZitiEventJson
|
||||
msg *amqp.Delivery
|
||||
}
|
||||
|
||||
func (e *ZitiEventAMQP) Data() ZitiEventJson {
|
||||
return e.data
|
||||
}
|
||||
|
||||
func (e *ZitiEventAMQP) Ack() error {
|
||||
if e.msg != nil {
|
||||
return errors.New("Nil delivery message")
|
||||
}
|
||||
return e.msg.Ack(false)
|
||||
}
|
||||
|
||||
type ZitiEventMsg interface {
|
||||
Data() ZitiEventJson
|
||||
Ack() error
|
||||
}
|
||||
|
||||
type ZitiEventJsonSource interface {
|
||||
Start(chan ZitiEventJson) (join chan struct{}, err error)
|
||||
Start(chan ZitiEventMsg) (join chan struct{}, err error)
|
||||
Stop()
|
||||
}
|
||||
|
||||
|
@ -4,6 +4,11 @@ import (
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/michaelquigley/cf"
|
||||
"github.com/openziti/channel/v2"
|
||||
@ -16,10 +21,6 @@ import (
|
||||
"github.com/openziti/zrok/controller/env"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"time"
|
||||
)
|
||||
|
||||
func init() {
|
||||
@ -47,11 +48,11 @@ func loadWebsocketSourceConfig(v interface{}, _ *cf.Options) (interface{}, error
|
||||
type websocketSource struct {
|
||||
cfg *WebsocketSourceConfig
|
||||
ch channel.Channel
|
||||
events chan ZitiEventJson
|
||||
events chan ZitiEventMsg
|
||||
join chan struct{}
|
||||
}
|
||||
|
||||
func (s *websocketSource) Start(events chan ZitiEventJson) (join chan struct{}, err error) {
|
||||
func (s *websocketSource) Start(events chan ZitiEventMsg) (join chan struct{}, err error) {
|
||||
caCerts, err := rest_util.GetControllerWellKnownCas(s.cfg.ApiEndpoint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -150,5 +151,7 @@ func (s *websocketSource) Stop() {
|
||||
}
|
||||
|
||||
func (s *websocketSource) HandleReceive(msg *channel.Message, _ channel.Channel) {
|
||||
s.events <- ZitiEventJson(msg.Body)
|
||||
s.events <- &ZitiEventJsonMsg{
|
||||
data: ZitiEventJson(msg.Body),
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user