Add gRPC metrics (#522)

This commit is contained in:
Misha Bragin 2022-10-22 15:06:54 +02:00 committed by GitHub
parent d2cde4a040
commit b41f36fccd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 131 additions and 12 deletions

View File

@ -73,7 +73,7 @@ func startManagement(t *testing.T, config *mgmt.Config) (*grpc.Server, net.Liste
t.Fatal(err)
}
turnManager := mgmt.NewTimeBasedAuthSecretsManager(peersUpdateManager, config.TURNConfig)
mgmtServer, err := mgmt.NewServer(config, accountManager, peersUpdateManager, turnManager)
mgmtServer, err := mgmt.NewServer(config, accountManager, peersUpdateManager, turnManager, nil)
if err != nil {
t.Fatal(err)
}

View File

@ -766,7 +766,7 @@ func startManagement(port int, dataDir string) (*grpc.Server, error) {
return nil, err
}
turnManager := server.NewTimeBasedAuthSecretsManager(peersUpdateManager, config.TURNConfig)
mgmtServer, err := server.NewServer(config, accountManager, peersUpdateManager, turnManager)
mgmtServer, err := server.NewServer(config, accountManager, peersUpdateManager, turnManager, nil)
if err != nil {
return nil, err
}

View File

@ -60,7 +60,7 @@ func startManagement(t *testing.T) (*grpc.Server, net.Listener) {
t.Fatal(err)
}
turnManager := mgmt.NewTimeBasedAuthSecretsManager(peersUpdateManager, config.TURNConfig)
mgmtServer, err := mgmt.NewServer(config, accountManager, peersUpdateManager, turnManager)
mgmtServer, err := mgmt.NewServer(config, accountManager, peersUpdateManager, turnManager, nil)
if err != nil {
t.Fatal(err)
}

View File

@ -173,7 +173,7 @@ var (
}
gRPCAPIHandler := grpc.NewServer(gRPCOpts...)
srv, err := server.NewServer(config, accountManager, peersUpdateManager, turnManager)
srv, err := server.NewServer(config, accountManager, peersUpdateManager, turnManager, appMetrics)
if err != nil {
return fmt.Errorf("failed creating gRPC API handler: %v", err)
}

View File

@ -3,6 +3,7 @@ package server
import (
"context"
"fmt"
"github.com/netbirdio/netbird/management/server/telemetry"
"github.com/netbirdio/netbird/route"
gPeer "google.golang.org/grpc/peer"
"strings"
@ -30,10 +31,12 @@ type GRPCServer struct {
config *Config
turnCredentialsManager TURNCredentialsManager
jwtMiddleware *middleware.JWTMiddleware
appMetrics telemetry.AppMetrics
}
// NewServer creates a new Management server
func NewServer(config *Config, accountManager AccountManager, peersUpdateManager *PeersUpdateManager, turnCredentialsManager TURNCredentialsManager) (*GRPCServer, error) {
func NewServer(config *Config, accountManager AccountManager, peersUpdateManager *PeersUpdateManager,
turnCredentialsManager TURNCredentialsManager, appMetrics telemetry.AppMetrics) (*GRPCServer, error) {
key, err := wgtypes.GeneratePrivateKey()
if err != nil {
return nil, err
@ -53,6 +56,16 @@ func NewServer(config *Config, accountManager AccountManager, peersUpdateManager
log.Debug("unable to use http config to create new jwt middleware")
}
if appMetrics != nil {
// update gauge based on number of connected peers which is equal to open gRPC streams
err = appMetrics.GRPCMetrics().RegisterConnectedStreams(func() int64 {
return int64(len(peersUpdateManager.peerChannels))
})
if err != nil {
return nil, err
}
}
return &GRPCServer{
wgKey: key,
// peerKey -> event channel
@ -61,11 +74,15 @@ func NewServer(config *Config, accountManager AccountManager, peersUpdateManager
config: config,
turnCredentialsManager: turnCredentialsManager,
jwtMiddleware: jwtMiddleware,
appMetrics: appMetrics,
}, nil
}
func (s *GRPCServer) GetServerKey(ctx context.Context, req *proto.Empty) (*proto.ServerKeyResponse, error) {
// todo introduce something more meaningful with the key expiration/rotation
if s.appMetrics != nil {
s.appMetrics.GRPCMetrics().CountGetKeyRequest()
}
now := time.Now().Add(24 * time.Hour)
secs := int64(now.Second())
nanos := int32(now.Nanosecond())
@ -80,6 +97,9 @@ func (s *GRPCServer) GetServerKey(ctx context.Context, req *proto.Empty) (*proto
// Sync validates the existence of a connecting peer, sends an initial state (all available for the connecting peers) and
// notifies the connected peer of any updates (e.g. new peers under the same account)
func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_SyncServer) error {
if s.appMetrics != nil {
s.appMetrics.GRPCMetrics().CountSyncRequest()
}
p, ok := gRPCPeer.FromContext(srv.Context())
if ok {
log.Debugf("Sync request from peer [%s] [%s]", req.WgPubKey, p.Addr.String())
@ -259,6 +279,9 @@ func (s *GRPCServer) registerPeer(peerKey wgtypes.Key, req *proto.LoginRequest)
// In case it isn't, the endpoint checks whether setup key is provided within the request and tries to register a peer.
// In case of the successful registration login is also successful
func (s *GRPCServer) Login(ctx context.Context, req *proto.EncryptedMessage) (*proto.EncryptedMessage, error) {
if s.appMetrics != nil {
s.appMetrics.GRPCMetrics().CountLoginRequest()
}
p, ok := gRPCPeer.FromContext(ctx)
if ok {
log.Debugf("Login request from peer [%s] [%s]", req.WgPubKey, p.Addr.String())

View File

@ -408,7 +408,7 @@ func startManagement(t *testing.T, port int, config *Config) (*grpc.Server, erro
return nil, err
}
turnManager := NewTimeBasedAuthSecretsManager(peersUpdateManager, config.TURNConfig)
mgmtServer, err := NewServer(config, accountManager, peersUpdateManager, turnManager)
mgmtServer, err := NewServer(config, accountManager, peersUpdateManager, turnManager, nil)
if err != nil {
return nil, err
}

View File

@ -498,7 +498,7 @@ func startServer(config *server.Config) (*grpc.Server, net.Listener) {
log.Fatalf("failed creating a manager: %v", err)
}
turnManager := server.NewTimeBasedAuthSecretsManager(peersUpdateManager, config.TURNConfig)
mgmtServer, err := server.NewServer(config, accountManager, peersUpdateManager, turnManager)
mgmtServer, err := server.NewServer(config, accountManager, peersUpdateManager, turnManager, nil)
Expect(err).NotTo(HaveOccurred())
mgmtProto.RegisterManagementServiceServer(s, mgmtServer)
go func() {

View File

@ -24,6 +24,7 @@ type MockAppMetrics struct {
ExposeFunc func(port int, endpoint string) error
IDPMetricsFunc func() *IDPMetrics
HTTPMiddlewareFunc func() *HTTPMiddleware
GRPCMetricsFunc func() *GRPCMetrics
}
// GetMeter mocks the GetMeter function of the AppMetrics interface
@ -66,6 +67,14 @@ func (mock *MockAppMetrics) HTTPMiddleware() *HTTPMiddleware {
return nil
}
// GRPCMetrics mocks the GRPCMetrics function of the IDPMetrics interface
func (mock *MockAppMetrics) GRPCMetrics() *GRPCMetrics {
if mock.GRPCMetricsFunc != nil {
return mock.GRPCMetricsFunc()
}
return nil
}
// AppMetrics is metrics interface
type AppMetrics interface {
GetMeter() metric2.Meter
@ -73,6 +82,7 @@ type AppMetrics interface {
Expose(port int, endpoint string) error
IDPMetrics() *IDPMetrics
HTTPMiddleware() *HTTPMiddleware
GRPCMetrics() *GRPCMetrics
}
// defaultAppMetrics are core application metrics based on OpenTelemetry https://opentelemetry.io/
@ -83,6 +93,7 @@ type defaultAppMetrics struct {
ctx context.Context
idpMetrics *IDPMetrics
httpMiddleware *HTTPMiddleware
grpcMetrics *GRPCMetrics
}
// IDPMetrics returns metrics for the idp package
@ -95,6 +106,11 @@ func (appMetrics *defaultAppMetrics) HTTPMiddleware() *HTTPMiddleware {
return appMetrics.httpMiddleware
}
// GRPCMetrics returns metrics for the gRPC api
func (appMetrics *defaultAppMetrics) GRPCMetrics() *GRPCMetrics {
return appMetrics.grpcMetrics
}
// Close stop application metrics HTTP handler and closes listener.
func (appMetrics *defaultAppMetrics) Close() error {
if appMetrics.listener == nil {
@ -155,6 +171,11 @@ func NewDefaultAppMetrics(ctx context.Context) (AppMetrics, error) {
if err != nil {
return nil, err
}
grpcMetrics, err := NewGRPCMetrics(ctx, meter)
if err != nil {
return nil, err
}
return &defaultAppMetrics{Meter: meter, ctx: ctx, idpMetrics: idpMetrics, httpMiddleware: middleware}, nil
return &defaultAppMetrics{Meter: meter, ctx: ctx, idpMetrics: idpMetrics, httpMiddleware: middleware,
grpcMetrics: grpcMetrics}, nil
}

View File

@ -0,0 +1,76 @@
package telemetry
import (
"context"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/instrument/asyncint64"
"go.opentelemetry.io/otel/metric/instrument/syncint64"
)
// GRPCMetrics are gRPC server metrics
type GRPCMetrics struct {
meter metric.Meter
syncRequestsCounter syncint64.Counter
loginRequestsCounter syncint64.Counter
getKeyRequestsCounter syncint64.Counter
activeStreamsGauge asyncint64.Gauge
ctx context.Context
}
// NewGRPCMetrics creates new GRPCMetrics struct and registers common metrics of the gRPC server
func NewGRPCMetrics(ctx context.Context, meter metric.Meter) (*GRPCMetrics, error) {
syncRequestsCounter, err := meter.SyncInt64().Counter("management.grpc.sync.request.counter", instrument.WithUnit("1"))
if err != nil {
return nil, err
}
loginRequestsCounter, err := meter.SyncInt64().Counter("management.grpc.login.request.counter", instrument.WithUnit("1"))
if err != nil {
return nil, err
}
getKeyRequestsCounter, err := meter.SyncInt64().Counter("management.grpc.key.request.counter", instrument.WithUnit("1"))
if err != nil {
return nil, err
}
activeStreamsGauge, err := meter.AsyncInt64().Gauge("management.grpc.connected.streams", instrument.WithUnit("1"))
if err != nil {
return nil, err
}
return &GRPCMetrics{
meter: meter,
syncRequestsCounter: syncRequestsCounter,
loginRequestsCounter: loginRequestsCounter,
getKeyRequestsCounter: getKeyRequestsCounter,
activeStreamsGauge: activeStreamsGauge,
ctx: ctx,
}, err
}
// CountSyncRequest counts the number of gRPC sync requests coming to the gRPC API
func (grpcMetrics *GRPCMetrics) CountSyncRequest() {
grpcMetrics.syncRequestsCounter.Add(grpcMetrics.ctx, 1)
}
// CountGetKeyRequest counts the number of gRPC get server key requests coming to the gRPC API
func (grpcMetrics *GRPCMetrics) CountGetKeyRequest() {
grpcMetrics.getKeyRequestsCounter.Add(grpcMetrics.ctx, 1)
}
// CountLoginRequest counts the number of gRPC login requests coming to the gRPC API
func (grpcMetrics *GRPCMetrics) CountLoginRequest() {
grpcMetrics.loginRequestsCounter.Add(grpcMetrics.ctx, 1)
}
// RegisterConnectedStreams registers a function that collects number of active streams and feeds it to the metrics gauge.
func (grpcMetrics *GRPCMetrics) RegisterConnectedStreams(producer func() int64) error {
return grpcMetrics.meter.RegisterCallback(
[]instrument.Asynchronous{
grpcMetrics.activeStreamsGauge,
},
func(ctx context.Context) {
grpcMetrics.activeStreamsGauge.Observe(ctx, producer())
},
)
}

View File

@ -3,7 +3,6 @@ package util
import (
"encoding/json"
"io"
"io/ioutil"
"os"
"path/filepath"
)
@ -24,7 +23,7 @@ func WriteJson(file string, obj interface{}) error {
return err
}
tempFile, err := ioutil.TempFile(configDir, ".*"+configFileName)
tempFile, err := os.CreateTemp(configDir, ".*"+configFileName)
if err != nil {
return err
}
@ -43,7 +42,7 @@ func WriteJson(file string, obj interface{}) error {
}
}()
err = ioutil.WriteFile(tempFileName, bs, 0600)
err = os.WriteFile(tempFileName, bs, 0600)
if err != nil {
return err
}
@ -65,7 +64,7 @@ func ReadJson(file string, res interface{}) (interface{}, error) {
}
defer f.Close()
bs, err := ioutil.ReadAll(f)
bs, err := io.ReadAll(f)
if err != nil {
return nil, err
}