Add basic signal metrics (#2107)

This commit is contained in:
Viktor Liu 2024-06-13 01:20:46 +02:00 committed by GitHub
parent 94e505480b
commit 85b8f36ec1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 443 additions and 39 deletions

View File

@ -7,6 +7,9 @@ import (
"testing"
"time"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
"github.com/netbirdio/netbird/management/server/activity"
"github.com/netbirdio/netbird/util"
@ -53,7 +56,10 @@ func startSignal(t *testing.T) (*grpc.Server, net.Listener) {
t.Fatal(err)
}
s := grpc.NewServer()
sigProto.RegisterSignalExchangeServer(s, sig.NewServer())
srv, err := sig.NewServer(otel.Meter(""))
require.NoError(t, err)
sigProto.RegisterSignalExchangeServer(s, srv)
go func() {
if err := s.Serve(lis); err != nil {
panic(err)

View File

@ -17,6 +17,7 @@ import (
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
@ -810,7 +811,7 @@ func TestEngine_MultiplePeers(t *testing.T) {
ctx, cancel := context.WithCancel(CtxInitState(context.Background()))
defer cancel()
sigServer, signalAddr, err := startSignal()
sigServer, signalAddr, err := startSignal(t)
if err != nil {
t.Fatal(err)
return
@ -1013,7 +1014,9 @@ func createEngine(ctx context.Context, cancel context.CancelFunc, setupKey strin
return e, err
}
func startSignal() (*grpc.Server, string, error) {
func startSignal(t *testing.T) (*grpc.Server, string, error) {
t.Helper()
s := grpc.NewServer(grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp))
lis, err := net.Listen("tcp", "localhost:0")
@ -1021,7 +1024,9 @@ func startSignal() (*grpc.Server, string, error) {
log.Fatalf("failed to listen: %v", err)
}
proto.RegisterSignalExchangeServer(s, signalServer.NewServer())
srv, err := signalServer.NewServer(otel.Meter(""))
require.NoError(t, err)
proto.RegisterSignalExchangeServer(s, srv)
go func() {
if err = s.Serve(lis); err != nil {

View File

@ -7,6 +7,8 @@ import (
"time"
"github.com/netbirdio/management-integrations/integrations"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
@ -39,7 +41,7 @@ var (
// we will use a management server started via to simulate the server and capture the number of retries
func TestConnectWithRetryRuns(t *testing.T) {
// start the signal server
_, signalAddr, err := startSignal()
_, signalAddr, err := startSignal(t)
if err != nil {
t.Fatalf("failed to start signal server: %v", err)
}
@ -141,7 +143,9 @@ func startManagement(t *testing.T, signalAddr string, counter *int) (*grpc.Serve
return s, lis.Addr().String(), nil
}
func startSignal() (*grpc.Server, string, error) {
func startSignal(t *testing.T) (*grpc.Server, string, error) {
t.Helper()
s := grpc.NewServer(grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp))
lis, err := net.Listen("tcp", "localhost:0")
@ -149,7 +153,9 @@ func startSignal() (*grpc.Server, string, error) {
log.Fatalf("failed to listen: %v", err)
}
proto.RegisterSignalExchangeServer(s, signalServer.NewServer())
srv, err := signalServer.NewServer(otel.Meter(""))
require.NoError(t, err)
proto.RegisterSignalExchangeServer(s, srv)
go func() {
if err = s.Serve(lis); err != nil {

1
go.mod
View File

@ -75,6 +75,7 @@ require (
github.com/things-go/go-socks5 v0.0.4
github.com/yusufpapurcu/wmi v1.2.4
github.com/zcalusic/sysinfo v1.0.2
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0
go.opentelemetry.io/otel v1.26.0
go.opentelemetry.io/otel/exporters/prometheus v0.48.0
go.opentelemetry.io/otel/metric v1.26.0

2
go.sum
View File

@ -502,6 +502,8 @@ github.com/zcalusic/sysinfo v1.0.2 h1:nwTTo2a+WQ0NXwo0BGRojOJvJ/5XKvQih+2RrtWqfx
github.com/zcalusic/sysinfo v1.0.2/go.mod h1:kluzTYflRWo6/tXVMJPdEjShsbPpsFRyy+p1mBQPC30=
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 h1:4Pp6oUg3+e/6M4C0A/3kJ2VYa++dsWVTtGgLVj5xtHg=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0/go.mod h1:Mjt1i1INqiaoZOMGR1RIUJN+i3ChKoFRqzrRQhlkbs0=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.51.0 h1:Xs2Ncz0gNihqu9iosIZ5SkBbWo5T8JhhLJFMQL1qmLI=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.51.0/go.mod h1:vy+2G/6NvVMpwGX/NyLqcC41fxepnuKHk16E6IZUcJc=
go.opentelemetry.io/otel v1.26.0 h1:LQwgL5s/1W7YiiRwxf03QGnWLb2HW4pLiAhaA5cZXBs=

View File

@ -1,9 +1,12 @@
# netbird Signal Server
This is a netbird signal-exchange server and client library to exchange connection information between netbird peers
This is a netbird signal-exchange server and client library to exchange
connection information between netbird peers
## Command Options
The CLI accepts the command **management** with the following options:
The CLI accepts the following options:
```shell
start Netbird Signal Server daemon
@ -20,24 +23,38 @@ Global Flags:
--log-file string sets Netbird log path. If console is specified the the log will be output to stdout (default "/var/log/netbird/signal.log")
--log-level string (default "info")
```
## Running the Signal service (Docker)
We have packed the Signal server into docker image. You can pull the image from Docker Hub and execute it with the following commands:
We have packaged the Signal server into a Docker image. You can pull the image from
Docker Hub and run it with the
following commands:
````shell
docker pull netbirdio/signal:latest
docker run -d --name netbird-signal -p 10000:10000 netbirdio/signal:latest
````
The default log-level is set to INFO, if you need you can change it using by updating the docker cmd as followed:
The default log-level is set to INFO. If needed, you can change it by
updating the docker command as follows:
````shell
docker run -d --name netbird-signal -p 10000:10000 netbirdio/signal:latest --log-level DEBUG
````
### Run with TLS (Let's Encrypt).
By specifying the **--letsencrypt-domain** the daemon will handle SSL certificate request and configuration.
In the following example ```10000``` is the signal service **default** port, and ```443``` will be used as port for Let's Encrypt challenge and HTTP API.
> The server where you are running a container has to have a public IP (for Let's Encrypt certificate challenge).
By specifying the **--letsencrypt-domain** the daemon will handle SSL
certificate request and configuration.
Replace <YOUR-DOMAIN> with your server's public domain (e.g. mydomain.com or subdomain sub.mydomain.com).
In the following example ```10000``` is the signal service **default** port,
and ```443``` will be used as port for
Let's Encrypt challenge and HTTP API.
> The server where you are running a container has to have a public IP (for
> Let's Encrypt certificate challenge).
Replace `<YOUR-DOMAIN>` with your server's public domain (e.g. mydomain.com or
subdomain sub.mydomain.com).
```bash
# create a volume
@ -50,14 +67,57 @@ docker run -d --name netbird-signal \
netbirdio/signal:latest \
--letsencrypt-domain <YOUR-DOMAIN>
```
## Metrics
The Signal Server exposes the following metrics in Prometheus format:
### Application Metrics
- **active_peers**: A Gauge metric that tracks the number of active peers
connected to the server.
- **peer_connection_duration_seconds**: A Histogram metric that measures the
duration a peer was connected in seconds.
- **registrations_total**: A Counter metric that counts the total number of peer
registrations.
- **deregistrations_total**: A Counter metric that counts the total number of
peer deregistrations.
- **registration_failures_total**: A Counter metric that counts the total number
of failed peer registrations. Possible
labels:
- `error`: The type of error that caused the registration failure (
e.g., `missing_id`, `missing_meta`, `failed_header`).
- **registration_delay_milliseconds**: A Histogram metric that measures the time
it took to register a peer in
milliseconds.
- **messages_forwarded_total**: A Counter metric that counts the total number of
messages forwarded between peers.
- **message_forward_failures_total**: A Counter metric that counts the total
number of failed message forwards between
peers. Possible labels:
- `type`: The type of failure (
e.g., `error`, `not_connected`, `not_registered`).
- **message_forward_latency_milliseconds**: A Histogram metric that measures the
latency of message forwarding between
peers in milliseconds.
### Endpoint
The metrics are exposed in Prometheus format on the `/metrics` endpoint. By
default, the server listens on port `9090`,
so the full endpoint would be:
> http://<server_ip>:9090/metrics
## For development purposes:
The project uses gRpc library and defines service in protobuf file located in:
```proto/signalexchange.proto```
```proto/signalexchange.proto```
To build the project you have to do the following things.
Install golang gRpc tools:
```bash
#!/bin/bash
go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.26

View File

@ -9,6 +9,7 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
@ -198,7 +199,11 @@ func startSignal() (*grpc.Server, net.Listener) {
panic(err)
}
s := grpc.NewServer()
sigProto.RegisterSignalExchangeServer(s, server.NewServer())
srv, err := server.NewServer(otel.Meter(""))
if err != nil {
panic(err)
}
sigProto.RegisterSignalExchangeServer(s, srv)
go func() {
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)

View File

@ -1,6 +1,7 @@
package cmd
import (
"context"
"errors"
"flag"
"fmt"
@ -13,8 +14,11 @@ import (
"strings"
"time"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"golang.org/x/crypto/acme/autocert"
"github.com/netbirdio/netbird/signal/metrics"
"github.com/netbirdio/netbird/encryption"
"github.com/netbirdio/netbird/signal/proto"
"github.com/netbirdio/netbird/signal/server"
@ -28,6 +32,10 @@ import (
"google.golang.org/grpc/keepalive"
)
const (
metricsPort = 9090
)
var (
signalPort int
signalLetsencryptDomain string
@ -95,9 +103,26 @@ var (
opts = append(opts, grpc.Creds(transportCredentials))
}
opts = append(opts, signalKaep, signalKasp)
metricsServer := metrics.NewServer(metricsPort, "")
if err != nil {
return fmt.Errorf("setup metrics: %v", err)
}
opts = append(opts, signalKaep, signalKasp, grpc.StatsHandler(otelgrpc.NewServerHandler()))
grpcServer := grpc.NewServer(opts...)
proto.RegisterSignalExchangeServer(grpcServer, server.NewServer())
go func() {
log.Infof("running metrics server: %s%s", metricsServer.Addr, metricsServer.Endpoint)
if err := metricsServer.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
log.Fatalf("Failed to start metrics server: %v", err)
}
}()
srv, err := server.NewServer(metricsServer.Meter)
if err != nil {
return fmt.Errorf("creating signal server: %v", err)
}
proto.RegisterSignalExchangeServer(grpcServer, srv)
var compatListener net.Listener
if signalPort != 10000 {
@ -150,6 +175,14 @@ var (
_ = compatListener.Close()
log.Infof("stopped gRPC backward compatibility server")
}
ctx, cancel := context.WithTimeout(cmd.Context(), 5*time.Second)
defer cancel()
if err := metricsServer.Shutdown(ctx); err != nil {
log.Errorf("Failed to stop metrics server: %v", err)
}
log.Infof("stopped metrics server")
log.Infof("stopped Signal Service")
return nil

124
signal/metrics/app.go Normal file
View File

@ -0,0 +1,124 @@
package metrics
import (
"go.opentelemetry.io/otel/metric"
)
// AppMetrics bundles the meter with every application-level instrument the
// signal server records on. Instances are built by NewAppMetrics.
type AppMetrics struct {
	// Meter is the meter the instruments below were created from.
	metric.Meter

	// Peer lifecycle instruments.

	// ActivePeers is the "active_peers" up/down counter.
	ActivePeers metric.Int64UpDownCounter
	// PeerConnectionDuration is the "peer_connection_duration_seconds" histogram.
	PeerConnectionDuration metric.Int64Histogram

	// Registration instruments.

	// Registrations is the "registrations_total" counter.
	Registrations metric.Int64Counter
	// Deregistrations is the "deregistrations_total" counter.
	Deregistrations metric.Int64Counter
	// RegistrationFailures is the "registration_failures_total" counter.
	RegistrationFailures metric.Int64Counter
	// RegistrationDelay is the "registration_delay_milliseconds" histogram.
	RegistrationDelay metric.Float64Histogram

	// Message forwarding instruments.

	// MessagesForwarded is the "messages_forwarded_total" counter.
	MessagesForwarded metric.Int64Counter
	// MessageForwardFailures is the "message_forward_failures_total" counter.
	MessageForwardFailures metric.Int64Counter
	// MessageForwardLatency is the "message_forward_latency_milliseconds" histogram.
	MessageForwardLatency metric.Float64Histogram
}
// NewAppMetrics creates every application instrument on the given meter and
// returns them bundled in an AppMetrics.
//
// Instruments are created in declaration order; the first creation error is
// returned as-is together with a nil *AppMetrics.
func NewAppMetrics(meter metric.Meter) (*AppMetrics, error) {
	var err error
	m := &AppMetrics{Meter: meter}

	if m.ActivePeers, err = meter.Int64UpDownCounter("active_peers"); err != nil {
		return nil, err
	}

	// Connection durations are recorded in whole seconds and use their own,
	// much wider bucket layout.
	m.PeerConnectionDuration, err = meter.Int64Histogram(
		"peer_connection_duration_seconds",
		metric.WithExplicitBucketBoundaries(getPeerConnectionDurationBucketBoundaries()...),
	)
	if err != nil {
		return nil, err
	}

	if m.Registrations, err = meter.Int64Counter("registrations_total"); err != nil {
		return nil, err
	}

	if m.Deregistrations, err = meter.Int64Counter("deregistrations_total"); err != nil {
		return nil, err
	}

	if m.RegistrationFailures, err = meter.Int64Counter("registration_failures_total"); err != nil {
		return nil, err
	}

	m.RegistrationDelay, err = meter.Float64Histogram(
		"registration_delay_milliseconds",
		metric.WithExplicitBucketBoundaries(getStandardBucketBoundaries()...),
	)
	if err != nil {
		return nil, err
	}

	if m.MessagesForwarded, err = meter.Int64Counter("messages_forwarded_total"); err != nil {
		return nil, err
	}

	if m.MessageForwardFailures, err = meter.Int64Counter("message_forward_failures_total"); err != nil {
		return nil, err
	}

	m.MessageForwardLatency, err = meter.Float64Histogram(
		"message_forward_latency_milliseconds",
		metric.WithExplicitBucketBoundaries(getStandardBucketBoundaries()...),
	)
	if err != nil {
		return nil, err
	}

	return m, nil
}
// getStandardBucketBoundaries returns the explicit bucket boundaries shared by
// the millisecond histograms (registration delay and message forward latency).
// The boundaries alternate 1x and 5x steps per decade, from 0.1 to 10000.
func getStandardBucketBoundaries() []float64 {
	return []float64{
		0.1, 0.5,
		1, 5,
		10, 50,
		100, 500,
		1000, 5000,
		10000,
	}
}
// getPeerConnectionDurationBucketBoundaries returns the explicit bucket
// boundaries, in seconds, for the peer connection duration histogram. They
// range from one second up to 48 hours.
func getPeerConnectionDurationBucketBoundaries() []float64 {
	const (
		minute = 60
		hour   = 60 * minute
	)

	return []float64{
		1,
		minute,
		10 * minute,
		hour,
		2 * hour,
		6 * hour,
		12 * hour,
		24 * hour,
		48 * hour,
	}
}

74
signal/metrics/metrics.go Normal file
View File

@ -0,0 +1,74 @@
package metrics
import (
"context"
"fmt"
"net/http"
"reflect"
prometheus2 "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/prometheus"
api "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/metric"
)
const defaultEndpoint = "/metrics"
// Metrics couples an OpenTelemetry meter (and the provider that owns it) with
// the HTTP server that exposes the collected metrics.
type Metrics struct {
	// Meter is the meter obtained from provider; instruments are created on it.
	Meter api.Meter
	// provider is the SDK meter provider; it is shut down by Shutdown.
	provider *metric.MeterProvider
	// Endpoint is the HTTP path the metrics handler is registered on.
	Endpoint string
	// Server is the embedded HTTP server that serves Endpoint.
	*http.Server
}
// NewServer initializes and returns a new Metrics instance.
//
// It creates an OpenTelemetry Prometheus exporter, builds a meter provider that
// reads from it, and installs that provider as the global OpenTelemetry meter
// provider. It also prepares, but does not start, an HTTP server listening on
// the given port that serves the default Prometheus gatherer at endpoint. An
// empty endpoint falls back to "/metrics".
//
// NewServer returns nil when the Prometheus exporter cannot be created, so
// callers must check the result before using it.
func NewServer(port int, endpoint string) *Metrics {
	exporter, err := prometheus.New()
	if err != nil {
		return nil
	}

	provider := metric.NewMeterProvider(metric.WithReader(exporter))
	otel.SetMeterProvider(provider)

	// Name the meter after this package. The reflected type must be one defined
	// in this package: a predeclared type such as string reports an empty
	// PkgPath, which would leave the meter unnamed.
	pkg := reflect.TypeOf(Metrics{}).PkgPath()
	meter := provider.Meter(pkg)

	if endpoint == "" {
		endpoint = defaultEndpoint
	}

	router := http.NewServeMux()
	router.Handle(endpoint, promhttp.HandlerFor(
		prometheus2.DefaultGatherer,
		promhttp.HandlerOpts{EnableOpenMetrics: true}))

	server := &http.Server{
		Addr:    fmt.Sprintf(":%d", port),
		Handler: router,
	}

	return &Metrics{
		Meter:    meter,
		provider: provider,
		Endpoint: endpoint,
		Server:   server,
	}
}
// Shutdown stops the metrics HTTP server and then the meter provider.
//
// Both components are always shut down, even if stopping the HTTP server
// fails, so that the meter provider is never left running. The returned error
// wraps every failure that occurred, or is nil when both stopped cleanly.
func (m *Metrics) Shutdown(ctx context.Context) error {
	serverErr := m.Server.Shutdown(ctx)
	providerErr := m.provider.Shutdown(ctx)

	switch {
	case serverErr != nil && providerErr != nil:
		return fmt.Errorf("http server: %w; meter provider: %w", serverErr, providerErr)
	case serverErr != nil:
		return fmt.Errorf("http server: %w", serverErr)
	case providerErr != nil:
		return fmt.Errorf("meter provider: %w", providerErr)
	}

	return nil
}

View File

@ -1,10 +1,14 @@
package peer
import (
"github.com/netbirdio/netbird/signal/proto"
log "github.com/sirupsen/logrus"
"context"
"sync"
"time"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/signal/metrics"
"github.com/netbirdio/netbird/signal/proto"
)
// Peer representation of a connected Peer
@ -33,12 +37,14 @@ type Registry struct {
Peers sync.Map
// regMutex ensures that registration and de-registrations are safe
regMutex sync.Mutex
metrics *metrics.AppMetrics
}
// NewRegistry creates a new connected Peer registry
func NewRegistry() *Registry {
func NewRegistry(metrics *metrics.AppMetrics) *Registry {
return &Registry{
regMutex: sync.Mutex{},
metrics: metrics,
}
}
@ -60,6 +66,8 @@ func (registry *Registry) IsPeerRegistered(peerId string) bool {
// Register registers peer in the registry
func (registry *Registry) Register(peer *Peer) {
start := time.Now()
registry.regMutex.Lock()
defer registry.regMutex.Unlock()
@ -72,6 +80,11 @@ func (registry *Registry) Register(peer *Peer) {
registry.Peers.Store(peer.Id, peer)
}
log.Debugf("peer registered [%s]", peer.Id)
// record time as milliseconds
registry.metrics.RegistrationDelay.Record(context.Background(), float64(time.Since(start).Nanoseconds())/1e6)
registry.metrics.Registrations.Add(context.Background(), 1)
}
// Deregister Peer from the Registry (usually once it disconnects)
@ -90,4 +103,6 @@ func (registry *Registry) Deregister(peer *Peer) {
}
}
log.Debugf("peer deregistered [%s]", peer.Id)
registry.metrics.Deregistrations.Add(context.Background(), 1)
}

View File

@ -1,13 +1,21 @@
package peer
import (
"github.com/stretchr/testify/assert"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
"github.com/netbirdio/netbird/signal/metrics"
)
func TestRegistry_ShouldNotDeregisterWhenHasNewerStreamRegistered(t *testing.T) {
r := NewRegistry()
metrics, err := metrics.NewAppMetrics(otel.Meter(""))
require.NoError(t, err)
r := NewRegistry(metrics)
peerID := "peer"
@ -30,7 +38,10 @@ func TestRegistry_ShouldNotDeregisterWhenHasNewerStreamRegistered(t *testing.T)
}
func TestRegistry_GetNonExistentPeer(t *testing.T) {
r := NewRegistry()
metrics, err := metrics.NewAppMetrics(otel.Meter(""))
require.NoError(t, err)
r := NewRegistry(metrics)
peer, ok := r.Get("non_existent_peer")
@ -44,7 +55,10 @@ func TestRegistry_GetNonExistentPeer(t *testing.T) {
}
func TestRegistry_Register(t *testing.T) {
r := NewRegistry()
metrics, err := metrics.NewAppMetrics(otel.Meter(""))
require.NoError(t, err)
r := NewRegistry(metrics)
peer1 := NewPeer("test_peer_1", nil)
peer2 := NewPeer("test_peer_2", nil)
r.Register(peer1)
@ -60,7 +74,10 @@ func TestRegistry_Register(t *testing.T) {
}
func TestRegistry_Deregister(t *testing.T) {
r := NewRegistry()
metrics, err := metrics.NewAppMetrics(otel.Meter(""))
require.NoError(t, err)
r := NewRegistry(metrics)
peer1 := NewPeer("test_peer_1", nil)
peer2 := NewPeer("test_peer_2", nil)
r.Register(peer1)

View File

@ -3,72 +3,114 @@ package server
import (
"context"
"fmt"
"github.com/netbirdio/netbird/signal/peer"
"github.com/netbirdio/netbird/signal/proto"
"io"
"time"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"io"
"github.com/netbirdio/netbird/signal/metrics"
"github.com/netbirdio/netbird/signal/peer"
"github.com/netbirdio/netbird/signal/proto"
)
const (
labelType = "type"
labelTypeError = "error"
labelTypeNotConnected = "not_connected"
labelTypeNotRegistered = "not_registered"
labelError = "error"
labelErrorMissingId = "missing_id"
labelErrorMissingMeta = "missing_meta"
labelErrorFailedHeader = "failed_header"
)
// Server an instance of a Signal server
type Server struct {
registry *peer.Registry
proto.UnimplementedSignalExchangeServer
metrics *metrics.AppMetrics
}
// NewServer creates a new Signal server
func NewServer() *Server {
return &Server{
registry: peer.NewRegistry(),
func NewServer(meter metric.Meter) (*Server, error) {
appMetrics, err := metrics.NewAppMetrics(meter)
if err != nil {
return nil, fmt.Errorf("creating app metrics: %v", err)
}
s := &Server{
registry: peer.NewRegistry(appMetrics),
metrics: appMetrics,
}
return s, nil
}
// Send forwards a message to the signal peer
func (s *Server) Send(ctx context.Context, msg *proto.EncryptedMessage) (*proto.EncryptedMessage, error) {
if !s.registry.IsPeerRegistered(msg.Key) {
s.metrics.MessageForwardFailures.Add(ctx, 1, metric.WithAttributes(attribute.String(labelType, labelTypeNotRegistered)))
return nil, fmt.Errorf("peer %s is not registered", msg.Key)
}
if dstPeer, found := s.registry.Get(msg.RemoteKey); found {
//forward the message to the target peer
err := dstPeer.Stream.Send(msg)
if err != nil {
if err := dstPeer.Stream.Send(msg); err != nil {
log.Errorf("error while forwarding message from peer [%s] to peer [%s] %v", msg.Key, msg.RemoteKey, err)
//todo respond to the sender?
s.metrics.MessageForwardFailures.Add(ctx, 1, metric.WithAttributes(attribute.String(labelType, labelTypeError)))
} else {
s.metrics.MessagesForwarded.Add(context.Background(), 1)
}
} else {
log.Debugf("message from peer [%s] can't be forwarded to peer [%s] because destination peer is not connected", msg.Key, msg.RemoteKey)
//todo respond to the sender?
s.metrics.MessageForwardFailures.Add(ctx, 1, metric.WithAttributes(attribute.String(labelType, labelTypeNotConnected)))
}
return &proto.EncryptedMessage{}, nil
}
// ConnectStream connects to the exchange stream
func (s *Server) ConnectStream(stream proto.SignalExchange_ConnectStreamServer) error {
p, err := s.connectPeer(stream)
if err != nil {
return err
}
startRegister := time.Now()
s.metrics.ActivePeers.Add(stream.Context(), 1)
defer func() {
log.Infof("peer disconnected [%s] [streamID %d] ", p.Id, p.StreamID)
s.registry.Deregister(p)
s.metrics.PeerConnectionDuration.Record(stream.Context(), int64(time.Since(startRegister).Seconds()))
s.metrics.ActivePeers.Add(context.Background(), -1)
}()
//needed to confirm that the peer has been registered so that the client can proceed
header := metadata.Pairs(proto.HeaderRegistered, "1")
err = stream.SendHeader(header)
if err != nil {
s.metrics.RegistrationFailures.Add(stream.Context(), 1, metric.WithAttributes(attribute.String(labelError, labelErrorFailedHeader)))
return err
}
log.Infof("peer connected [%s] [streamID %d] ", p.Id, p.StreamID)
for {
//read incoming messages
msg, err := stream.Recv()
if err == io.EOF {
@ -76,18 +118,28 @@ func (s *Server) ConnectStream(stream proto.SignalExchange_ConnectStreamServer)
} else if err != nil {
return err
}
start := time.Now()
log.Debugf("received a new message from peer [%s] to peer [%s]", p.Id, msg.RemoteKey)
// lookup the target peer where the message is going to
if dstPeer, found := s.registry.Get(msg.RemoteKey); found {
//forward the message to the target peer
err := dstPeer.Stream.Send(msg)
if err != nil {
if err := dstPeer.Stream.Send(msg); err != nil {
log.Errorf("error while forwarding message from peer [%s] to peer [%s] %v", p.Id, msg.RemoteKey, err)
//todo respond to the sender?
// in milliseconds
s.metrics.MessageForwardLatency.Record(stream.Context(), float64(time.Since(start).Nanoseconds())/1e6)
s.metrics.MessagesForwarded.Add(stream.Context(), 1)
} else {
s.metrics.MessageForwardFailures.Add(stream.Context(), 1, metric.WithAttributes(attribute.String(labelType, labelTypeError)))
}
} else {
log.Debugf("message from peer [%s] can't be forwarded to peer [%s] because destination peer is not connected", p.Id, msg.RemoteKey)
//todo respond to the sender?
s.metrics.MessageForwardFailures.Add(stream.Context(), 1, metric.WithAttributes(attribute.String(labelType, labelTypeNotConnected)))
}
}
<-stream.Context().Done()
@ -101,12 +153,16 @@ func (s Server) connectPeer(stream proto.SignalExchange_ConnectStreamServer) (*p
if meta, hasMeta := metadata.FromIncomingContext(stream.Context()); hasMeta {
if id, found := meta[proto.HeaderId]; found {
p := peer.NewPeer(id[0], stream)
s.registry.Register(p)
return p, nil
} else {
s.metrics.RegistrationFailures.Add(stream.Context(), 1, metric.WithAttributes(attribute.String(labelError, labelErrorMissingId)))
return nil, status.Errorf(codes.FailedPrecondition, "missing connection header: "+proto.HeaderId)
}
} else {
s.metrics.RegistrationFailures.Add(stream.Context(), 1, metric.WithAttributes(attribute.String(labelError, labelErrorMissingMeta)))
return nil, status.Errorf(codes.FailedPrecondition, "missing connection stream meta")
}
}