From b41f36fccd6f1e1f0901c8550add2daa5f69a692 Mon Sep 17 00:00:00 2001 From: Misha Bragin Date: Sat, 22 Oct 2022 15:06:54 +0200 Subject: [PATCH] Add gRPC metrics (#522) --- client/cmd/testutil.go | 2 +- client/internal/engine_test.go | 2 +- management/client/client_test.go | 2 +- management/cmd/management.go | 2 +- management/server/grpcserver.go | 25 ++++++- management/server/management_proto_test.go | 2 +- management/server/management_test.go | 2 +- management/server/telemetry/app_metrics.go | 23 ++++++- management/server/telemetry/grpc_metrics.go | 76 +++++++++++++++++++++ util/file.go | 7 +- 10 files changed, 131 insertions(+), 12 deletions(-) create mode 100644 management/server/telemetry/grpc_metrics.go diff --git a/client/cmd/testutil.go b/client/cmd/testutil.go index b3e4a1610..2ded1cac0 100644 --- a/client/cmd/testutil.go +++ b/client/cmd/testutil.go @@ -73,7 +73,7 @@ func startManagement(t *testing.T, config *mgmt.Config) (*grpc.Server, net.Liste t.Fatal(err) } turnManager := mgmt.NewTimeBasedAuthSecretsManager(peersUpdateManager, config.TURNConfig) - mgmtServer, err := mgmt.NewServer(config, accountManager, peersUpdateManager, turnManager) + mgmtServer, err := mgmt.NewServer(config, accountManager, peersUpdateManager, turnManager, nil) if err != nil { t.Fatal(err) } diff --git a/client/internal/engine_test.go b/client/internal/engine_test.go index d111af76e..279d9c14f 100644 --- a/client/internal/engine_test.go +++ b/client/internal/engine_test.go @@ -766,7 +766,7 @@ func startManagement(port int, dataDir string) (*grpc.Server, error) { return nil, err } turnManager := server.NewTimeBasedAuthSecretsManager(peersUpdateManager, config.TURNConfig) - mgmtServer, err := server.NewServer(config, accountManager, peersUpdateManager, turnManager) + mgmtServer, err := server.NewServer(config, accountManager, peersUpdateManager, turnManager, nil) if err != nil { return nil, err } diff --git a/management/client/client_test.go b/management/client/client_test.go index 8c0af487b..48e1f2be8 100644 --- a/management/client/client_test.go +++ b/management/client/client_test.go @@ -60,7 +60,7 @@ func startManagement(t *testing.T) (*grpc.Server, net.Listener) { t.Fatal(err) } turnManager := mgmt.NewTimeBasedAuthSecretsManager(peersUpdateManager, config.TURNConfig) - mgmtServer, err := mgmt.NewServer(config, accountManager, peersUpdateManager, turnManager) + mgmtServer, err := mgmt.NewServer(config, accountManager, peersUpdateManager, turnManager, nil) if err != nil { t.Fatal(err) } diff --git a/management/cmd/management.go b/management/cmd/management.go index ea8e73c88..0aacfe576 100644 --- a/management/cmd/management.go +++ b/management/cmd/management.go @@ -173,7 +173,7 @@ var ( } gRPCAPIHandler := grpc.NewServer(gRPCOpts...) - srv, err := server.NewServer(config, accountManager, peersUpdateManager, turnManager) + srv, err := server.NewServer(config, accountManager, peersUpdateManager, turnManager, appMetrics) if err != nil { return fmt.Errorf("failed creating gRPC API handler: %v", err) } diff --git a/management/server/grpcserver.go b/management/server/grpcserver.go index e0e4ea3f9..d7418fd49 100644 --- a/management/server/grpcserver.go +++ b/management/server/grpcserver.go @@ -3,6 +3,7 @@ package server import ( "context" "fmt" + "github.com/netbirdio/netbird/management/server/telemetry" "github.com/netbirdio/netbird/route" gPeer "google.golang.org/grpc/peer" "strings" @@ -30,10 +31,12 @@ type GRPCServer struct { config *Config turnCredentialsManager TURNCredentialsManager jwtMiddleware *middleware.JWTMiddleware + appMetrics telemetry.AppMetrics } // NewServer creates a new Management server -func NewServer(config *Config, accountManager AccountManager, peersUpdateManager *PeersUpdateManager, turnCredentialsManager TURNCredentialsManager) (*GRPCServer, error) { +func NewServer(config *Config, accountManager AccountManager, peersUpdateManager *PeersUpdateManager, + turnCredentialsManager TURNCredentialsManager, appMetrics telemetry.AppMetrics) (*GRPCServer, error) { key, err := wgtypes.GeneratePrivateKey() if err != nil { return nil, err @@ -53,6 +56,16 @@ func NewServer(config *Config, accountManager AccountManager, peersUpdateManager log.Debug("unable to use http config to create new jwt middleware") } + if appMetrics != nil { + // update gauge based on number of connected peers which is equal to open gRPC streams + err = appMetrics.GRPCMetrics().RegisterConnectedStreams(func() int64 { + return int64(len(peersUpdateManager.peerChannels)) + }) + if err != nil { + return nil, err + } + } + return &GRPCServer{ wgKey: key, // peerKey -> event channel @@ -61,11 +74,15 @@ func NewServer(config *Config, accountManager AccountManager, peersUpdateManager config: config, turnCredentialsManager: turnCredentialsManager, jwtMiddleware: jwtMiddleware, + appMetrics: appMetrics, }, nil } func (s *GRPCServer) GetServerKey(ctx context.Context, req *proto.Empty) (*proto.ServerKeyResponse, error) { // todo introduce something more meaningful with the key expiration/rotation + if s.appMetrics != nil { + s.appMetrics.GRPCMetrics().CountGetKeyRequest() + } now := time.Now().Add(24 * time.Hour) secs := int64(now.Second()) nanos := int32(now.Nanosecond()) @@ -80,6 +97,9 @@ func (s *GRPCServer) GetServerKey(ctx context.Context, req *proto.Empty) (*proto // Sync validates the existence of a connecting peer, sends an initial state (all available for the connecting peers) and // notifies the connected peer of any updates (e.g. new peers under the same account) func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_SyncServer) error { + if s.appMetrics != nil { + s.appMetrics.GRPCMetrics().CountSyncRequest() + } p, ok := gRPCPeer.FromContext(srv.Context()) if ok { log.Debugf("Sync request from peer [%s] [%s]", req.WgPubKey, p.Addr.String()) @@ -259,6 +279,9 @@ func (s *GRPCServer) registerPeer(peerKey wgtypes.Key, req *proto.LoginRequest) // In case it isn't, the endpoint checks whether setup key is provided within the request and tries to register a peer. // In case of the successful registration login is also successful func (s *GRPCServer) Login(ctx context.Context, req *proto.EncryptedMessage) (*proto.EncryptedMessage, error) { + if s.appMetrics != nil { + s.appMetrics.GRPCMetrics().CountLoginRequest() + } p, ok := gRPCPeer.FromContext(ctx) if ok { log.Debugf("Login request from peer [%s] [%s]", req.WgPubKey, p.Addr.String()) diff --git a/management/server/management_proto_test.go b/management/server/management_proto_test.go index ae26eb486..74fb0181b 100644 --- a/management/server/management_proto_test.go +++ b/management/server/management_proto_test.go @@ -408,7 +408,7 @@ func startManagement(t *testing.T, port int, config *Config) (*grpc.Server, erro return nil, err } turnManager := NewTimeBasedAuthSecretsManager(peersUpdateManager, config.TURNConfig) - mgmtServer, err := NewServer(config, accountManager, peersUpdateManager, turnManager) + mgmtServer, err := NewServer(config, accountManager, peersUpdateManager, turnManager, nil) if err != nil { return nil, err } diff --git a/management/server/management_test.go b/management/server/management_test.go index 8a2d0a1c6..c71dc16ca 100644 --- a/management/server/management_test.go +++ b/management/server/management_test.go @@ -498,7 +498,7 @@ func startServer(config *server.Config) (*grpc.Server, net.Listener) { log.Fatalf("failed creating a manager: %v", err) } turnManager := server.NewTimeBasedAuthSecretsManager(peersUpdateManager, config.TURNConfig) - mgmtServer, err := server.NewServer(config, accountManager, peersUpdateManager, turnManager) + mgmtServer, err := server.NewServer(config, accountManager, peersUpdateManager, turnManager, nil) Expect(err).NotTo(HaveOccurred()) mgmtProto.RegisterManagementServiceServer(s, mgmtServer) go func() { diff --git a/management/server/telemetry/app_metrics.go b/management/server/telemetry/app_metrics.go index 54bde57cb..8686a0b3d 100644 --- a/management/server/telemetry/app_metrics.go +++ b/management/server/telemetry/app_metrics.go @@ -24,6 +24,7 @@ type MockAppMetrics struct { ExposeFunc func(port int, endpoint string) error IDPMetricsFunc func() *IDPMetrics HTTPMiddlewareFunc func() *HTTPMiddleware + GRPCMetricsFunc func() *GRPCMetrics } // GetMeter mocks the GetMeter function of the AppMetrics interface @@ -66,6 +67,14 @@ func (mock *MockAppMetrics) HTTPMiddleware() *HTTPMiddleware { return nil } +// GRPCMetrics mocks the GRPCMetrics function of the IDPMetrics interface +func (mock *MockAppMetrics) GRPCMetrics() *GRPCMetrics { + if mock.GRPCMetricsFunc != nil { + return mock.GRPCMetricsFunc() + } + return nil +} + // AppMetrics is metrics interface type AppMetrics interface { GetMeter() metric2.Meter @@ -73,6 +82,7 @@ type AppMetrics interface { Expose(port int, endpoint string) error IDPMetrics() *IDPMetrics HTTPMiddleware() *HTTPMiddleware + GRPCMetrics() *GRPCMetrics } // defaultAppMetrics are core application metrics based on OpenTelemetry https://opentelemetry.io/ @@ -83,6 +93,7 @@ type defaultAppMetrics struct { ctx context.Context idpMetrics *IDPMetrics httpMiddleware *HTTPMiddleware + grpcMetrics *GRPCMetrics } // IDPMetrics returns metrics for the idp package @@ -95,6 +106,11 @@ func (appMetrics *defaultAppMetrics) HTTPMiddleware() *HTTPMiddleware { return appMetrics.httpMiddleware } +// GRPCMetrics returns metrics for the gRPC api +func (appMetrics *defaultAppMetrics) GRPCMetrics() *GRPCMetrics { + return appMetrics.grpcMetrics +} + // Close stop application metrics HTTP handler and closes listener. func (appMetrics *defaultAppMetrics) Close() error { if appMetrics.listener == nil { @@ -155,6 +171,11 @@ func NewDefaultAppMetrics(ctx context.Context) (AppMetrics, error) { if err != nil { return nil, err } + grpcMetrics, err := NewGRPCMetrics(ctx, meter) + if err != nil { + return nil, err + } - return &defaultAppMetrics{Meter: meter, ctx: ctx, idpMetrics: idpMetrics, httpMiddleware: middleware}, nil + return &defaultAppMetrics{Meter: meter, ctx: ctx, idpMetrics: idpMetrics, httpMiddleware: middleware, + grpcMetrics: grpcMetrics}, nil } diff --git a/management/server/telemetry/grpc_metrics.go b/management/server/telemetry/grpc_metrics.go new file mode 100644 index 000000000..f4495ce4e --- /dev/null +++ b/management/server/telemetry/grpc_metrics.go @@ -0,0 +1,76 @@ +package telemetry + +import ( + "context" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/instrument" + "go.opentelemetry.io/otel/metric/instrument/asyncint64" + "go.opentelemetry.io/otel/metric/instrument/syncint64" +) + +// GRPCMetrics are gRPC server metrics +type GRPCMetrics struct { + meter metric.Meter + syncRequestsCounter syncint64.Counter + loginRequestsCounter syncint64.Counter + getKeyRequestsCounter syncint64.Counter + activeStreamsGauge asyncint64.Gauge + ctx context.Context +} + +// NewGRPCMetrics creates new GRPCMetrics struct and registers common metrics of the gRPC server +func NewGRPCMetrics(ctx context.Context, meter metric.Meter) (*GRPCMetrics, error) { + syncRequestsCounter, err := meter.SyncInt64().Counter("management.grpc.sync.request.counter", instrument.WithUnit("1")) + if err != nil { + return nil, err + } + loginRequestsCounter, err := meter.SyncInt64().Counter("management.grpc.login.request.counter", instrument.WithUnit("1")) + if err != nil { + return nil, err + } + getKeyRequestsCounter, err := meter.SyncInt64().Counter("management.grpc.key.request.counter", instrument.WithUnit("1")) + if err != nil { + return nil, err + } + + activeStreamsGauge, err := meter.AsyncInt64().Gauge("management.grpc.connected.streams", instrument.WithUnit("1")) + if err != nil { + return nil, err + } + + return &GRPCMetrics{ + meter: meter, + syncRequestsCounter: syncRequestsCounter, + loginRequestsCounter: loginRequestsCounter, + getKeyRequestsCounter: getKeyRequestsCounter, + activeStreamsGauge: activeStreamsGauge, + ctx: ctx, + }, err +} + +// CountSyncRequest counts the number of gRPC sync requests coming to the gRPC API +func (grpcMetrics *GRPCMetrics) CountSyncRequest() { + grpcMetrics.syncRequestsCounter.Add(grpcMetrics.ctx, 1) +} + +// CountGetKeyRequest counts the number of gRPC get server key requests coming to the gRPC API +func (grpcMetrics *GRPCMetrics) CountGetKeyRequest() { + grpcMetrics.getKeyRequestsCounter.Add(grpcMetrics.ctx, 1) +} + +// CountLoginRequest counts the number of gRPC login requests coming to the gRPC API +func (grpcMetrics *GRPCMetrics) CountLoginRequest() { + grpcMetrics.loginRequestsCounter.Add(grpcMetrics.ctx, 1) +} + +// RegisterConnectedStreams registers a function that collects number of active streams and feeds it to the metrics gauge. +func (grpcMetrics *GRPCMetrics) RegisterConnectedStreams(producer func() int64) error { + return grpcMetrics.meter.RegisterCallback( + []instrument.Asynchronous{ + grpcMetrics.activeStreamsGauge, + }, + func(ctx context.Context) { + grpcMetrics.activeStreamsGauge.Observe(ctx, producer()) + }, + ) +} diff --git a/util/file.go b/util/file.go index 0d5461f4f..ff9a989b1 100644 --- a/util/file.go +++ b/util/file.go @@ -3,7 +3,6 @@ package util import ( "encoding/json" "io" - "io/ioutil" "os" "path/filepath" ) @@ -24,7 +23,7 @@ func WriteJson(file string, obj interface{}) error { return err } - tempFile, err := ioutil.TempFile(configDir, ".*"+configFileName) + tempFile, err := os.CreateTemp(configDir, ".*"+configFileName) if err != nil { return err } @@ -43,7 +42,7 @@ func WriteJson(file string, obj interface{}) error { } }() - err = ioutil.WriteFile(tempFileName, bs, 0600) + err = os.WriteFile(tempFileName, bs, 0600) if err != nil { return err } @@ -65,7 +64,7 @@ func ReadJson(file string, res interface{}) (interface{}, error) { } defer f.Close() - bs, err := ioutil.ReadAll(f) + bs, err := io.ReadAll(f) if err != nil { return nil, err }