From 9ca7dfb102e5408a3b47dcc95ed19a4d930b37b8 Mon Sep 17 00:00:00 2001 From: Michael Quigley Date: Tue, 14 Mar 2023 16:23:34 -0400 Subject: [PATCH] amqp sender (#270) --- controller/metrics/amqpSender.go | 51 ++++++++++++++++++++++++++++++++ go.mod | 2 +- go.sum | 7 +++-- 3 files changed, 57 insertions(+), 3 deletions(-) create mode 100644 controller/metrics/amqpSender.go diff --git a/controller/metrics/amqpSender.go b/controller/metrics/amqpSender.go new file mode 100644 index 00000000..332d5328 --- /dev/null +++ b/controller/metrics/amqpSender.go @@ -0,0 +1,51 @@ +package metrics + +import ( + "context" + "github.com/pkg/errors" + amqp "github.com/rabbitmq/amqp091-go" + "time" +) + +type AmqpSenderConfig struct { + Url string `cf:"+secret"` + Queue string +} + +type AmqpSender struct { + conn *amqp.Connection + ch *amqp.Channel + queue amqp.Queue +} + +func NewAmqpSender(cfg *AmqpSenderConfig) (*AmqpSender, error) { + conn, err := amqp.Dial(cfg.Url) + if err != nil { + return nil, errors.Wrap(err, "error dialing amqp broker") + } + + ch, err := conn.Channel() + if err != nil { + return nil, errors.Wrap(err, "error getting channel from amqp connection") + } + + queue, err := ch.QueueDeclare(cfg.Queue, true, false, false, false, nil) + if err != nil { + return nil, errors.Wrap(err, "error creating amqp queue") + } + + return &AmqpSender{conn, ch, queue}, nil +} + +func (s *AmqpSender) Send(json string) error { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + err := s.ch.PublishWithContext(ctx, "", s.queue.Name, false, false, amqp.Publishing{ + ContentType: "application/json", + Body: []byte(json), + }) + if err != nil { + return errors.Wrap(err, "error sending") + } + return nil +} diff --git a/go.mod b/go.mod index f1a3ed36..7a4cf38c 100644 --- a/go.mod +++ b/go.mod @@ -32,6 +32,7 @@ require ( github.com/openziti/identity v1.0.37 github.com/openziti/sdk-golang v0.18.61 github.com/pkg/errors v0.9.1 + github.com/rabbitmq/amqp091-go v1.7.0 github.com/rubenv/sql-migrate v1.1.2 github.com/shirou/gopsutil/v3 v3.23.2 github.com/sirupsen/logrus v1.9.0 @@ -41,7 +42,6 @@ require ( golang.org/x/crypto v0.6.0 golang.org/x/net v0.8.0 golang.org/x/time v0.3.0 - gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 nhooyr.io/websocket v1.8.7 ) diff --git a/go.sum b/go.sum index 47153b47..3878353a 100644 --- a/go.sum +++ b/go.sum @@ -525,6 +525,8 @@ github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:Om github.com/poy/onpar v0.0.0-20190519213022-ee068f8ea4d1 h1:oL4IBbcqwhhNWh31bjOX8C/OCy0zs9906d/VUru+bqg= github.com/poy/onpar v0.0.0-20190519213022-ee068f8ea4d1/go.mod h1:nSbFQvMj97ZyhFRSJYtut+msi4sOY6zJDGCdSc+/rZU= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/rabbitmq/amqp091-go v1.7.0 h1:V5CF5qPem5OGSnEo8BoSbsDGwejg6VUJsKEdneaoTUo= +github.com/rabbitmq/amqp091-go v1.7.0/go.mod h1:wfClAtY0C7bOHxd3GjmF26jEHn+rR/0B3+YV+Vn9/NI= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= @@ -638,6 +640,8 @@ go.opentelemetry.io/otel/sdk v1.11.1 h1:F7KmQgoHljhUuJyA+9BiU+EkJfyX5nVVF4wyzWZp go.opentelemetry.io/otel/trace v1.11.1 h1:ofxdnzsNrGBYXbP7t7zpUK281+go5rF7dvdIZXF8gdQ= go.opentelemetry.io/otel/trace v1.11.1/go.mod h1:f/Q9G7vzk5u91PhbmKbg1Qn0rzH1LJ4vbPHFGkTPtOk= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= +go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= @@ -913,6 +917,7 @@ golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -1031,8 +1036,6 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EV gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/ini.v1 v1.62.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= -gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 h1:VpOs+IwYnYBaFnrNAeB8UUWtL3vEUnzSCL1nVjPhqrw= -gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=