diff --git a/management/server/grpcserver.go b/management/server/grpcserver.go index 32b553f9b..383cb0d1f 100644 --- a/management/server/grpcserver.go +++ b/management/server/grpcserver.go @@ -159,6 +159,11 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi select { // condition when there are some updates case update, open := <-updates: + + if s.appMetrics != nil { + s.appMetrics.GRPCMetrics().UpdateChannelQueueLength(len(updates) + 1) + } + if !open { log.Debugf("updates channel for peer %s was closed", peerKey.String()) s.cancelPeerRoutines(peer) diff --git a/management/server/telemetry/grpc_metrics.go b/management/server/telemetry/grpc_metrics.go index 4ca592179..25789f5c7 100644 --- a/management/server/telemetry/grpc_metrics.go +++ b/management/server/telemetry/grpc_metrics.go @@ -19,6 +19,7 @@ type GRPCMetrics struct { activeStreamsGauge asyncint64.Gauge syncRequestDuration syncint64.Histogram loginRequestDuration syncint64.Histogram + channelQueueLength syncint64.Histogram ctx context.Context } @@ -52,6 +53,18 @@ func NewGRPCMetrics(ctx context.Context, meter metric.Meter) (*GRPCMetrics, erro return nil, err } + // We use a histogram here as we have multiple channels at the same time and we want to see a slice at any given time + // Then we should be able to extract min, max, mean and the percentiles. 
+ // TODO(yury): This needs custom bucketing as we are interested in the values from 0 to server.channelBufferSize (100) + channelQueue, err := meter.SyncInt64().Histogram( + "management.grpc.updatechannel.queue", + instrument.WithDescription("Number of update messages in the channel queue"), + instrument.WithUnit("length"), + ) + if err != nil { + return nil, err + } + return &GRPCMetrics{ meter: meter, syncRequestsCounter: syncRequestsCounter, @@ -60,6 +73,7 @@ func NewGRPCMetrics(ctx context.Context, meter metric.Meter) (*GRPCMetrics, erro activeStreamsGauge: activeStreamsGauge, syncRequestDuration: syncRequestDuration, loginRequestDuration: loginRequestDuration, + channelQueueLength: channelQueue, ctx: ctx, }, err } @@ -100,3 +114,8 @@ func (grpcMetrics *GRPCMetrics) RegisterConnectedStreams(producer func() int64) }, ) } + +// UpdateChannelQueueLength updates the histogram that keeps the distribution of the update messages channel queue length +func (metrics *GRPCMetrics) UpdateChannelQueueLength(len int) { + metrics.channelQueueLength.Record(metrics.ctx, int64(len)) +}