From 85b8f36ec1755af0eda56402ba0b7d11b2ab2560 Mon Sep 17 00:00:00 2001 From: Viktor Liu <17948409+lixmal@users.noreply.github.com> Date: Thu, 13 Jun 2024 01:20:46 +0200 Subject: [PATCH] Add basic signal metrics (#2107) --- client/cmd/testutil.go | 8 ++- client/internal/engine_test.go | 11 ++- client/server/server_test.go | 12 +++- go.mod | 1 + go.sum | 2 + signal/README.md | 78 ++++++++++++++++++--- signal/client/client_test.go | 7 +- signal/cmd/run.go | 37 +++++++++- signal/metrics/app.go | 124 +++++++++++++++++++++++++++++++++ signal/metrics/metrics.go | 74 ++++++++++++++++++++ signal/peer/peer.go | 21 +++++- signal/peer/peer_test.go | 27 +++++-- signal/server/signal.go | 80 +++++++++++++++++---- 13 files changed, 443 insertions(+), 39 deletions(-) create mode 100644 signal/metrics/app.go create mode 100644 signal/metrics/metrics.go diff --git a/client/cmd/testutil.go b/client/cmd/testutil.go index 35fd7c537..1fc36b373 100644 --- a/client/cmd/testutil.go +++ b/client/cmd/testutil.go @@ -7,6 +7,9 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + "github.com/netbirdio/netbird/management/server/activity" "github.com/netbirdio/netbird/util" @@ -53,7 +56,10 @@ func startSignal(t *testing.T) (*grpc.Server, net.Listener) { t.Fatal(err) } s := grpc.NewServer() - sigProto.RegisterSignalExchangeServer(s, sig.NewServer()) + srv, err := sig.NewServer(otel.Meter("")) + require.NoError(t, err) + + sigProto.RegisterSignalExchangeServer(s, srv) go func() { if err := s.Serve(lis); err != nil { panic(err) diff --git a/client/internal/engine_test.go b/client/internal/engine_test.go index f5a98cb7f..a6461e24b 100644 --- a/client/internal/engine_test.go +++ b/client/internal/engine_test.go @@ -17,6 +17,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" "golang.zx2c4.com/wireguard/wgctrl/wgtypes" "google.golang.org/grpc" 
"google.golang.org/grpc/keepalive" @@ -810,7 +811,7 @@ func TestEngine_MultiplePeers(t *testing.T) { ctx, cancel := context.WithCancel(CtxInitState(context.Background())) defer cancel() - sigServer, signalAddr, err := startSignal() + sigServer, signalAddr, err := startSignal(t) if err != nil { t.Fatal(err) return @@ -1013,7 +1014,9 @@ func createEngine(ctx context.Context, cancel context.CancelFunc, setupKey strin return e, err } -func startSignal() (*grpc.Server, string, error) { +func startSignal(t *testing.T) (*grpc.Server, string, error) { + t.Helper() + s := grpc.NewServer(grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp)) lis, err := net.Listen("tcp", "localhost:0") @@ -1021,7 +1024,9 @@ func startSignal() (*grpc.Server, string, error) { log.Fatalf("failed to listen: %v", err) } - proto.RegisterSignalExchangeServer(s, signalServer.NewServer()) + srv, err := signalServer.NewServer(otel.Meter("")) + require.NoError(t, err) + proto.RegisterSignalExchangeServer(s, srv) go func() { if err = s.Serve(lis); err != nil { diff --git a/client/server/server_test.go b/client/server/server_test.go index a9f23ce7c..46fc9fa8e 100644 --- a/client/server/server_test.go +++ b/client/server/server_test.go @@ -7,6 +7,8 @@ import ( "time" "github.com/netbirdio/management-integrations/integrations" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" log "github.com/sirupsen/logrus" "google.golang.org/grpc" @@ -39,7 +41,7 @@ var ( // we will use a management server started via to simulate the server and capture the number of retries func TestConnectWithRetryRuns(t *testing.T) { // start the signal server - _, signalAddr, err := startSignal() + _, signalAddr, err := startSignal(t) if err != nil { t.Fatalf("failed to start signal server: %v", err) } @@ -141,7 +143,9 @@ func startManagement(t *testing.T, signalAddr string, counter *int) (*grpc.Serve return s, lis.Addr().String(), nil } -func startSignal() (*grpc.Server, string, error) { +func 
startSignal(t *testing.T) (*grpc.Server, string, error) { + t.Helper() + s := grpc.NewServer(grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp)) lis, err := net.Listen("tcp", "localhost:0") @@ -149,7 +153,9 @@ func startSignal() (*grpc.Server, string, error) { log.Fatalf("failed to listen: %v", err) } - proto.RegisterSignalExchangeServer(s, signalServer.NewServer()) + srv, err := signalServer.NewServer(otel.Meter("")) + require.NoError(t, err) + proto.RegisterSignalExchangeServer(s, srv) go func() { if err = s.Serve(lis); err != nil { diff --git a/go.mod b/go.mod index 44e0ee559..a300eb1e5 100644 --- a/go.mod +++ b/go.mod @@ -75,6 +75,7 @@ require ( github.com/things-go/go-socks5 v0.0.4 github.com/yusufpapurcu/wmi v1.2.4 github.com/zcalusic/sysinfo v1.0.2 + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 go.opentelemetry.io/otel v1.26.0 go.opentelemetry.io/otel/exporters/prometheus v0.48.0 go.opentelemetry.io/otel/metric v1.26.0 diff --git a/go.sum b/go.sum index 5a1197763..e46b0b744 100644 --- a/go.sum +++ b/go.sum @@ -502,6 +502,8 @@ github.com/zcalusic/sysinfo v1.0.2 h1:nwTTo2a+WQ0NXwo0BGRojOJvJ/5XKvQih+2RrtWqfx github.com/zcalusic/sysinfo v1.0.2/go.mod h1:kluzTYflRWo6/tXVMJPdEjShsbPpsFRyy+p1mBQPC30= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 h1:4Pp6oUg3+e/6M4C0A/3kJ2VYa++dsWVTtGgLVj5xtHg= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0/go.mod h1:Mjt1i1INqiaoZOMGR1RIUJN+i3ChKoFRqzrRQhlkbs0= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.51.0 h1:Xs2Ncz0gNihqu9iosIZ5SkBbWo5T8JhhLJFMQL1qmLI= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.51.0/go.mod h1:vy+2G/6NvVMpwGX/NyLqcC41fxepnuKHk16E6IZUcJc= go.opentelemetry.io/otel v1.26.0 
h1:LQwgL5s/1W7YiiRwxf03QGnWLb2HW4pLiAhaA5cZXBs= diff --git a/signal/README.md b/signal/README.md index 96c80a490..dd2d761ad 100644 --- a/signal/README.md +++ b/signal/README.md @@ -1,9 +1,12 @@ # netbird Signal Server -This is a netbird signal-exchange server and client library to exchange connection information between netbird peers +This is a netbird signal-exchange server and client library to exchange +connection information between netbird peers ## Command Options -The CLI accepts the command **management** with the following options: + +The CLI accepts the following options: + ```shell start Netbird Signal Server daemon @@ -20,24 +23,38 @@ Global Flags: --log-file string sets Netbird log path. If console is specified the the log will be output to stdout (default "/var/log/netbird/signal.log") --log-level string (default "info") ``` + ## Running the Signal service (Docker) -We have packed the Signal server into docker image. You can pull the image from Docker Hub and execute it with the following commands: +We have packed the Signal server into a Docker image. You can pull the image from +Docker Hub and execute it with the +following commands: + ````shell docker pull netbirdio/signal:latest docker run -d --name netbird-signal -p 10000:10000 netbirdio/signal:latest ```` -The default log-level is set to INFO, if you need you can change it using by updating the docker cmd as followed: + +The default log level is INFO; if needed, you can change it by +updating the docker command as follows: + ````shell docker run -d --name netbird-signal -p 10000:10000 netbirdio/signal:latest --log-level DEBUG ```` + ### Run with TLS (Let's Encrypt). -By specifying the **--letsencrypt-domain** the daemon will handle SSL certificate request and configuration. -In the following example ```10000``` is the signal service **default** port, and ```443``` will be used as port for Let's Encrypt challenge and HTTP API. 
-> The server where you are running a container has to have a public IP (for Let's Encrypt certificate challenge). +By specifying the **--letsencrypt-domain** the daemon will handle SSL +certificate request and configuration. -Replace with your server's public domain (e.g. mydomain.com or subdomain sub.mydomain.com). +In the following example ```10000``` is the signal service **default** port, +and ```443``` will be used as port for +Let's Encrypt challenge and HTTP API. +> The server where you are running a container has to have a public IP (for +> Let's Encrypt certificate challenge). + +Replace `` with your server's public domain (e.g. mydomain.com or +subdomain sub.mydomain.com). ```bash # create a volume @@ -50,14 +67,57 @@ docker run -d --name netbird-signal \ netbirdio/signal:latest \ --letsencrypt-domain ``` + +## Metrics + +The Signal Server exposes the following metrics in Prometheus format: + +### Application Metrics + +- **active_peers**: A Gauge metric that tracks the number of active peers + connected to the server. +- **peer_connection_duration_seconds**: A Histogram metric that measures the + duration a peer was connected in seconds. +- **registrations_total**: A Counter metric that counts the total number of peer + registrations. +- **deregistrations_total**: A Counter metric that counts the total number of + peer deregistrations. +- **registration_failures_total**: A Counter metric that counts the total number + of failed peer registrations. Possible + labels: + - `error`: The type of error that caused the registration failure ( + e.g., `missing_id`, `missing_meta`, `failed_header`). +- **registration_delay_milliseconds**: A Histogram metric that measures the time + it took to register a peer in + milliseconds. +- **messages_forwarded_total**: A Counter metric that counts the total number of + messages forwarded between peers. 
+- **message_forward_failures_total**: A Counter metric that counts the total + number of failed message forwards between + peers. Possible labels: + - `type`: The type of failure ( + e.g., `error`, `not_connected`, `not_registered`). +- **message_forward_latency_milliseconds**: A Histogram metric that measures the + latency of message forwarding between + peers in milliseconds. + +### Endpoint + +The metrics are exposed in Prometheus format on the `/metrics` endpoint. By +default, the server listens on port `9090`, +so the full endpoint would be: + +> http://:9090/metrics + ## For development purposes: The project uses gRpc library and defines service in protobuf file located in: - ```proto/signalexchange.proto``` +```proto/signalexchange.proto``` To build the project you have to do the following things. Install golang gRpc tools: + ```bash #!/bin/bash go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.26 diff --git a/signal/client/client_test.go b/signal/client/client_test.go index 3ad348b6f..2525493b4 100644 --- a/signal/client/client_test.go +++ b/signal/client/client_test.go @@ -9,6 +9,7 @@ import ( . "github.com/onsi/ginkgo" . 
"github.com/onsi/gomega" log "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel" "golang.zx2c4.com/wireguard/wgctrl/wgtypes" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -198,7 +199,11 @@ func startSignal() (*grpc.Server, net.Listener) { panic(err) } s := grpc.NewServer() - sigProto.RegisterSignalExchangeServer(s, server.NewServer()) + srv, err := server.NewServer(otel.Meter("")) + if err != nil { + panic(err) + } + sigProto.RegisterSignalExchangeServer(s, srv) go func() { if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) diff --git a/signal/cmd/run.go b/signal/cmd/run.go index 10a2da636..4b0dc583e 100644 --- a/signal/cmd/run.go +++ b/signal/cmd/run.go @@ -1,6 +1,7 @@ package cmd import ( + "context" "errors" "flag" "fmt" @@ -13,8 +14,11 @@ import ( "strings" "time" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "golang.org/x/crypto/acme/autocert" + "github.com/netbirdio/netbird/signal/metrics" + "github.com/netbirdio/netbird/encryption" "github.com/netbirdio/netbird/signal/proto" "github.com/netbirdio/netbird/signal/server" @@ -28,6 +32,10 @@ import ( "google.golang.org/grpc/keepalive" ) +const ( + metricsPort = 9090 +) + var ( signalPort int signalLetsencryptDomain string @@ -95,9 +103,26 @@ var ( opts = append(opts, grpc.Creds(transportCredentials)) } - opts = append(opts, signalKaep, signalKasp) + metricsServer := metrics.NewServer(metricsPort, "") + if metricsServer == nil { // NewServer reports failure by returning nil; it does not set err + return errors.New("setup metrics: failed to create metrics server") + } + + opts = append(opts, signalKaep, signalKasp, grpc.StatsHandler(otelgrpc.NewServerHandler())) grpcServer := grpc.NewServer(opts...) 
- proto.RegisterSignalExchangeServer(grpcServer, server.NewServer()) + + go func() { + log.Infof("running metrics server: %s%s", metricsServer.Addr, metricsServer.Endpoint) + if err := metricsServer.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) { + log.Fatalf("Failed to start metrics server: %v", err) + } + }() + + srv, err := server.NewServer(metricsServer.Meter) + if err != nil { + return fmt.Errorf("creating signal server: %v", err) + } + proto.RegisterSignalExchangeServer(grpcServer, srv) var compatListener net.Listener if signalPort != 10000 { @@ -150,6 +175,14 @@ var ( _ = compatListener.Close() log.Infof("stopped gRPC backward compatibility server") } + + ctx, cancel := context.WithTimeout(cmd.Context(), 5*time.Second) + defer cancel() + if err := metricsServer.Shutdown(ctx); err != nil { + log.Errorf("Failed to stop metrics server: %v", err) + } + log.Infof("stopped metrics server") + log.Infof("stopped Signal Service") return nil diff --git a/signal/metrics/app.go b/signal/metrics/app.go new file mode 100644 index 000000000..fb882a5d4 --- /dev/null +++ b/signal/metrics/app.go @@ -0,0 +1,124 @@ +package metrics + +import ( + "go.opentelemetry.io/otel/metric" +) + +// AppMetrics holds all the application metrics +type AppMetrics struct { + metric.Meter + + ActivePeers metric.Int64UpDownCounter + PeerConnectionDuration metric.Int64Histogram + + Registrations metric.Int64Counter + Deregistrations metric.Int64Counter + RegistrationFailures metric.Int64Counter + RegistrationDelay metric.Float64Histogram + + MessagesForwarded metric.Int64Counter + MessageForwardFailures metric.Int64Counter + MessageForwardLatency metric.Float64Histogram +} + +func NewAppMetrics(meter metric.Meter) (*AppMetrics, error) { + activePeers, err := meter.Int64UpDownCounter("active_peers") + if err != nil { + return nil, err + } + + peerConnectionDuration, err := meter.Int64Histogram("peer_connection_duration_seconds", + 
metric.WithExplicitBucketBoundaries(getPeerConnectionDurationBucketBoundaries()...)) + if err != nil { + return nil, err + } + + registrations, err := meter.Int64Counter("registrations_total") + if err != nil { + return nil, err + } + + deregistrations, err := meter.Int64Counter("deregistrations_total") + if err != nil { + return nil, err + } + + registrationFailures, err := meter.Int64Counter("registration_failures_total") + if err != nil { + return nil, err + } + + registrationDelay, err := meter.Float64Histogram("registration_delay_milliseconds", + metric.WithExplicitBucketBoundaries(getStandardBucketBoundaries()...)) + if err != nil { + return nil, err + } + + messagesForwarded, err := meter.Int64Counter("messages_forwarded_total") + if err != nil { + return nil, err + } + + messageForwardFailures, err := meter.Int64Counter("message_forward_failures_total") + if err != nil { + return nil, err + } + + messageForwardLatency, err := meter.Float64Histogram("message_forward_latency_milliseconds", + metric.WithExplicitBucketBoundaries(getStandardBucketBoundaries()...)) + if err != nil { + return nil, err + } + + return &AppMetrics{ + Meter: meter, + + ActivePeers: activePeers, + PeerConnectionDuration: peerConnectionDuration, + + Registrations: registrations, + Deregistrations: deregistrations, + RegistrationFailures: registrationFailures, + RegistrationDelay: registrationDelay, + + MessagesForwarded: messagesForwarded, + MessageForwardFailures: messageForwardFailures, + MessageForwardLatency: messageForwardLatency, + }, nil +} + +func getStandardBucketBoundaries() []float64 { + return []float64{ + 0.1, + 0.5, + 1, + 5, + 10, + 50, + 100, + 500, + 1000, + 5000, + 10000, + } +} +func getPeerConnectionDurationBucketBoundaries() []float64 { + return []float64{ + 1, + 60, + // 10m + 600, + // 1h + 3600, + // 2h, + 7200, + // 6h, + 21600, + // 12h, + 43200, + // 24h, + 86400, + // 48h, + 172800, + } +} diff --git a/signal/metrics/metrics.go b/signal/metrics/metrics.go new 
file mode 100644 index 000000000..30db1600a --- /dev/null +++ b/signal/metrics/metrics.go @@ -0,0 +1,74 @@ +package metrics + +import ( + "context" + "fmt" + "net/http" + "reflect" + + prometheus2 "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/prometheus" + api "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/sdk/metric" +) + +const defaultEndpoint = "/metrics" + +// Metrics holds the metrics information and exposes it +type Metrics struct { + Meter api.Meter + provider *metric.MeterProvider + Endpoint string + + *http.Server +} + +// NewServer creates a Prometheus-backed meter provider (also set as the global one) and an HTTP server serving it on port at endpoint ("/metrics" when empty); it returns nil if the exporter cannot be created +func NewServer(port int, endpoint string) *Metrics { + exporter, err := prometheus.New() + if err != nil { + return nil + } + + provider := metric.NewMeterProvider(metric.WithReader(exporter)) + otel.SetMeterProvider(provider) + + pkg := reflect.TypeOf(Metrics{}).PkgPath() // use a package-local type: PkgPath is empty for predeclared types such as string + meter := provider.Meter(pkg) + + if endpoint == "" { + endpoint = defaultEndpoint + } + + router := http.NewServeMux() + router.Handle(endpoint, promhttp.HandlerFor( + prometheus2.DefaultGatherer, + promhttp.HandlerOpts{EnableOpenMetrics: true})) + + server := &http.Server{ + Addr: fmt.Sprintf(":%d", port), + Handler: router, + } + + return &Metrics{ + Meter: meter, + provider: provider, + Endpoint: endpoint, + Server: server, + } +} + +// Shutdown stops the HTTP server and then the meter provider; the provider is left running if stopping the HTTP server fails +func (m *Metrics) Shutdown(ctx context.Context) error { + if err := m.Server.Shutdown(ctx); err != nil { + return fmt.Errorf("http server: %w", err) + } + + if err := m.provider.Shutdown(ctx); err != nil { + return fmt.Errorf("meter provider: %w", err) + } + + return nil +} diff --git a/signal/peer/peer.go b/signal/peer/peer.go index 612e250a5..3149526b2 100644 --- a/signal/peer/peer.go +++ b/signal/peer/peer.go @@ -1,10 +1,14 @@ package peer import ( - 
"github.com/netbirdio/netbird/signal/proto" - log "github.com/sirupsen/logrus" + "context" "sync" "time" + + log "github.com/sirupsen/logrus" + + "github.com/netbirdio/netbird/signal/metrics" + "github.com/netbirdio/netbird/signal/proto" ) // Peer representation of a connected Peer @@ -33,12 +37,14 @@ type Registry struct { Peers sync.Map // regMutex ensures that registration and de-registrations are safe regMutex sync.Mutex + metrics *metrics.AppMetrics } // NewRegistry creates a new connected Peer registry -func NewRegistry() *Registry { +func NewRegistry(metrics *metrics.AppMetrics) *Registry { return &Registry{ regMutex: sync.Mutex{}, + metrics: metrics, } } @@ -60,6 +66,8 @@ func (registry *Registry) IsPeerRegistered(peerId string) bool { // Register registers peer in the registry func (registry *Registry) Register(peer *Peer) { + start := time.Now() + registry.regMutex.Lock() defer registry.regMutex.Unlock() @@ -72,6 +80,11 @@ func (registry *Registry) Register(peer *Peer) { registry.Peers.Store(peer.Id, peer) } log.Debugf("peer registered [%s]", peer.Id) + + // record time as milliseconds + registry.metrics.RegistrationDelay.Record(context.Background(), float64(time.Since(start).Nanoseconds())/1e6) + + registry.metrics.Registrations.Add(context.Background(), 1) } // Deregister Peer from the Registry (usually once it disconnects) @@ -90,4 +103,6 @@ func (registry *Registry) Deregister(peer *Peer) { } } log.Debugf("peer deregistered [%s]", peer.Id) + + registry.metrics.Deregistrations.Add(context.Background(), 1) } diff --git a/signal/peer/peer_test.go b/signal/peer/peer_test.go index bf3dc706a..fb85fedda 100644 --- a/signal/peer/peer_test.go +++ b/signal/peer/peer_test.go @@ -1,13 +1,21 @@ package peer import ( - "github.com/stretchr/testify/assert" "testing" "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + + "github.com/netbirdio/netbird/signal/metrics" ) func 
TestRegistry_ShouldNotDeregisterWhenHasNewerStreamRegistered(t *testing.T) { - r := NewRegistry() + metrics, err := metrics.NewAppMetrics(otel.Meter("")) + require.NoError(t, err) + + r := NewRegistry(metrics) peerID := "peer" @@ -30,7 +38,10 @@ func TestRegistry_ShouldNotDeregisterWhenHasNewerStreamRegistered(t *testing.T) } func TestRegistry_GetNonExistentPeer(t *testing.T) { - r := NewRegistry() + metrics, err := metrics.NewAppMetrics(otel.Meter("")) + require.NoError(t, err) + + r := NewRegistry(metrics) peer, ok := r.Get("non_existent_peer") @@ -44,7 +55,10 @@ func TestRegistry_GetNonExistentPeer(t *testing.T) { } func TestRegistry_Register(t *testing.T) { - r := NewRegistry() + metrics, err := metrics.NewAppMetrics(otel.Meter("")) + require.NoError(t, err) + + r := NewRegistry(metrics) peer1 := NewPeer("test_peer_1", nil) peer2 := NewPeer("test_peer_2", nil) r.Register(peer1) @@ -60,7 +74,10 @@ func TestRegistry_Register(t *testing.T) { } func TestRegistry_Deregister(t *testing.T) { - r := NewRegistry() + metrics, err := metrics.NewAppMetrics(otel.Meter("")) + require.NoError(t, err) + + r := NewRegistry(metrics) peer1 := NewPeer("test_peer_1", nil) peer2 := NewPeer("test_peer_2", nil) r.Register(peer1) diff --git a/signal/server/signal.go b/signal/server/signal.go index 84045e800..fc9c19efd 100644 --- a/signal/server/signal.go +++ b/signal/server/signal.go @@ -3,72 +3,114 @@ package server import ( "context" "fmt" - "github.com/netbirdio/netbird/signal/peer" - "github.com/netbirdio/netbird/signal/proto" + "io" + "time" + log "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" - "io" + + "github.com/netbirdio/netbird/signal/metrics" + "github.com/netbirdio/netbird/signal/peer" + "github.com/netbirdio/netbird/signal/proto" +) + +const ( + labelType = "type" + labelTypeError = "error" + labelTypeNotConnected = 
"not_connected" + labelTypeNotRegistered = "not_registered" + + labelError = "error" + labelErrorMissingId = "missing_id" + labelErrorMissingMeta = "missing_meta" + labelErrorFailedHeader = "failed_header" ) // Server an instance of a Signal server type Server struct { registry *peer.Registry proto.UnimplementedSignalExchangeServer + + metrics *metrics.AppMetrics } // NewServer creates a new Signal server -func NewServer() *Server { - return &Server{ - registry: peer.NewRegistry(), +func NewServer(meter metric.Meter) (*Server, error) { + appMetrics, err := metrics.NewAppMetrics(meter) + if err != nil { + return nil, fmt.Errorf("creating app metrics: %v", err) } + + s := &Server{ + registry: peer.NewRegistry(appMetrics), + metrics: appMetrics, + } + + return s, nil } // Send forwards a message to the signal peer func (s *Server) Send(ctx context.Context, msg *proto.EncryptedMessage) (*proto.EncryptedMessage, error) { - if !s.registry.IsPeerRegistered(msg.Key) { + s.metrics.MessageForwardFailures.Add(ctx, 1, metric.WithAttributes(attribute.String(labelType, labelTypeNotRegistered))) + return nil, fmt.Errorf("peer %s is not registered", msg.Key) } if dstPeer, found := s.registry.Get(msg.RemoteKey); found { //forward the message to the target peer - err := dstPeer.Stream.Send(msg) - if err != nil { + if err := dstPeer.Stream.Send(msg); err != nil { log.Errorf("error while forwarding message from peer [%s] to peer [%s] %v", msg.Key, msg.RemoteKey, err) //todo respond to the sender? + + s.metrics.MessageForwardFailures.Add(ctx, 1, metric.WithAttributes(attribute.String(labelType, labelTypeError))) + } else { + s.metrics.MessagesForwarded.Add(context.Background(), 1) } } else { log.Debugf("message from peer [%s] can't be forwarded to peer [%s] because destination peer is not connected", msg.Key, msg.RemoteKey) //todo respond to the sender? 
+ + s.metrics.MessageForwardFailures.Add(ctx, 1, metric.WithAttributes(attribute.String(labelType, labelTypeNotConnected))) } return &proto.EncryptedMessage{}, nil } // ConnectStream connects to the exchange stream func (s *Server) ConnectStream(stream proto.SignalExchange_ConnectStreamServer) error { - p, err := s.connectPeer(stream) if err != nil { return err } + startRegister := time.Now() + + s.metrics.ActivePeers.Add(stream.Context(), 1) + defer func() { log.Infof("peer disconnected [%s] [streamID %d] ", p.Id, p.StreamID) s.registry.Deregister(p) + + s.metrics.PeerConnectionDuration.Record(stream.Context(), int64(time.Since(startRegister).Seconds())) + s.metrics.ActivePeers.Add(context.Background(), -1) }() //needed to confirm that the peer has been registered so that the client can proceed header := metadata.Pairs(proto.HeaderRegistered, "1") err = stream.SendHeader(header) if err != nil { + s.metrics.RegistrationFailures.Add(stream.Context(), 1, metric.WithAttributes(attribute.String(labelError, labelErrorFailedHeader))) return err } log.Infof("peer connected [%s] [streamID %d] ", p.Id, p.StreamID) for { + //read incoming messages msg, err := stream.Recv() if err == io.EOF { @@ -76,18 +118,28 @@ func (s *Server) ConnectStream(stream proto.SignalExchange_ConnectStreamServer) } else if err != nil { return err } + start := time.Now() + log.Debugf("received a new message from peer [%s] to peer [%s]", p.Id, msg.RemoteKey) + // lookup the target peer where the message is going to if dstPeer, found := s.registry.Get(msg.RemoteKey); found { //forward the message to the target peer - err := dstPeer.Stream.Send(msg) - if err != nil { + if err := dstPeer.Stream.Send(msg); err != nil { log.Errorf("error while forwarding message from peer [%s] to peer [%s] %v", p.Id, msg.RemoteKey, err) //todo respond to the sender? 
+ + s.metrics.MessageForwardFailures.Add(stream.Context(), 1, metric.WithAttributes(attribute.String(labelType, labelTypeError))) + } else { + // latency in milliseconds, recorded only for messages that were actually delivered + s.metrics.MessageForwardLatency.Record(stream.Context(), float64(time.Since(start).Nanoseconds())/1e6) + s.metrics.MessagesForwarded.Add(stream.Context(), 1) } } else { log.Debugf("message from peer [%s] can't be forwarded to peer [%s] because destination peer is not connected", p.Id, msg.RemoteKey) //todo respond to the sender? + + s.metrics.MessageForwardFailures.Add(stream.Context(), 1, metric.WithAttributes(attribute.String(labelType, labelTypeNotConnected))) } } <-stream.Context().Done() @@ -101,12 +153,16 @@ func (s Server) connectPeer(stream proto.SignalExchange_ConnectStreamServer) (*p if meta, hasMeta := metadata.FromIncomingContext(stream.Context()); hasMeta { if id, found := meta[proto.HeaderId]; found { p := peer.NewPeer(id[0], stream) + s.registry.Register(p) + return p, nil } else { + s.metrics.RegistrationFailures.Add(stream.Context(), 1, metric.WithAttributes(attribute.String(labelError, labelErrorMissingId))) return nil, status.Errorf(codes.FailedPrecondition, "missing connection header: "+proto.HeaderId) } } else { + s.metrics.RegistrationFailures.Add(stream.Context(), 1, metric.WithAttributes(attribute.String(labelError, labelErrorMissingMeta))) return nil, status.Errorf(codes.FailedPrecondition, "missing connection stream meta") } }