interim step for enhacing ZitiEventJsonSink to support reconnection; bridge and model adjustments (#351)

This commit is contained in:
Michael Quigley 2023-06-21 10:35:10 -04:00
parent af746b2382
commit aabf695bec
No known key found for this signature in database
GPG Key ID: 9B60314A9DD20A62
3 changed files with 44 additions and 15 deletions

View File

@ -31,28 +31,23 @@ func loadAmqpSinkConfig(v interface{}, _ *cf.Options) (interface{}, error) {
} }
type amqpSink struct { type amqpSink struct {
cfg *AmqpSinkConfig
conn *amqp.Connection conn *amqp.Connection
ch *amqp.Channel ch *amqp.Channel
queue amqp.Queue queue amqp.Queue
join chan struct{}
} }
func newAmqpSink(cfg *AmqpSinkConfig) (*amqpSink, error) { func newAmqpSink(cfg *AmqpSinkConfig) (*amqpSink, error) {
conn, err := amqp.Dial(cfg.Url) return &amqpSink{
if err != nil { cfg: cfg,
return nil, errors.Wrap(err, "error dialing amqp broker") join: make(chan struct{}),
} }, nil
}
ch, err := conn.Channel() func (s *amqpSink) Start() (join chan struct{}, err error) {
if err != nil { logrus.Info("started")
return nil, errors.Wrap(err, "error getting amqp channel") return s.join, nil
}
queue, err := ch.QueueDeclare(cfg.QueueName, true, false, false, false, nil)
if err != nil {
return nil, errors.Wrap(err, "error declaring queue")
}
return &amqpSink{conn, ch, queue}, nil
} }
func (s *amqpSink) Handle(event ZitiEventJson) error { func (s *amqpSink) Handle(event ZitiEventJson) error {
@ -64,3 +59,29 @@ func (s *amqpSink) Handle(event ZitiEventJson) error {
Body: []byte(event), Body: []byte(event),
}) })
} }
func (s *amqpSink) Stop() {
close(s.join)
logrus.Info("stopped")
}
func (s *amqpSink) connect() (err error) {
s.conn, err = amqp.Dial(s.cfg.Url)
if err != nil {
return errors.Wrap(err, "error dialing amqp broker")
}
s.ch, err = s.conn.Channel()
if err != nil {
return errors.Wrap(err, "error getting amqp channel")
}
s.queue, err = s.ch.QueueDeclare(s.cfg.QueueName, true, false, false, false, nil)
if err != nil {
return errors.Wrap(err, "error declaring queue")
}
logrus.Infof("connected to amqp broker at '%v'", s.cfg.Url)
return nil
}

View File

@ -14,6 +14,7 @@ type Bridge struct {
src ZitiEventJsonSource src ZitiEventJsonSource
srcJoin chan struct{} srcJoin chan struct{}
snk ZitiEventJsonSink snk ZitiEventJsonSink
snkJoin chan struct{}
events chan ZitiEventMsg events chan ZitiEventMsg
close chan struct{} close chan struct{}
join chan struct{} join chan struct{}
@ -39,6 +40,9 @@ func NewBridge(cfg *BridgeConfig) (*Bridge, error) {
} }
func (b *Bridge) Start() (join chan struct{}, err error) { func (b *Bridge) Start() (join chan struct{}, err error) {
if b.snkJoin, err = b.snk.Start(); err != nil {
return nil, err
}
if b.srcJoin, err = b.src.Start(b.events); err != nil { if b.srcJoin, err = b.src.Start(b.events); err != nil {
return nil, err return nil, err
} }
@ -72,7 +76,9 @@ func (b *Bridge) Start() (join chan struct{}, err error) {
func (b *Bridge) Stop() { func (b *Bridge) Stop() {
b.src.Stop() b.src.Stop()
b.snk.Stop()
close(b.close) close(b.close)
<-b.srcJoin <-b.srcJoin
<-b.snkJoin
<-b.join <-b.join
} }

View File

@ -79,5 +79,7 @@ type ZitiEventJsonSource interface {
} }
type ZitiEventJsonSink interface { type ZitiEventJsonSink interface {
Start() (join chan struct{}, err error)
Handle(event ZitiEventJson) error Handle(event ZitiEventJson) error
Stop()
} }