Add metrics for PeersUpdateManager (#1310)

With this change we should be able to collect and expose the following histograms:

* `management.updatechannel.create.duration.ms`  with `closed` boolean label
* `management.updatechannel.create.duration.micro` with `closed` boolean label
* `management.updatechannel.close.one.duration.ms`
* `management.updatechannel.close.one.duration.micro`
* `management.updatechannel.close.multiple.duration.ms`
* `management.updatechannel.close.multiple.duration.micro`
* `management.updatechannel.close.multiple.channels`
* `management.updatechannel.send.duration.ms` with `found` and `dropped` boolean labels
* `management.updatechannel.send.duration.micro` with `found` and `dropped` boolean labels
* `management.updatechannel.get.all.duration.ms`
* `management.updatechannel.get.all.duration.micro`
* `management.updatechannel.get.all.peers`
This commit is contained in:
Yury Gargay 2023-11-16 18:21:52 +01:00 committed by GitHub
parent 456aaf2868
commit b58094de0f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 266 additions and 41 deletions

View File

@ -73,7 +73,7 @@ func startManagement(t *testing.T, config *mgmt.Config) (*grpc.Server, net.Liste
t.Fatal(err) t.Fatal(err)
} }
peersUpdateManager := mgmt.NewPeersUpdateManager() peersUpdateManager := mgmt.NewPeersUpdateManager(nil)
eventStore := &activity.InMemoryEventStore{} eventStore := &activity.InMemoryEventStore{}
if err != nil { if err != nil {
return nil, nil return nil, nil

View File

@ -1044,7 +1044,7 @@ func startManagement(dataDir string) (*grpc.Server, string, error) {
return nil, "", err return nil, "", err
} }
peersUpdateManager := server.NewPeersUpdateManager() peersUpdateManager := server.NewPeersUpdateManager(nil)
eventStore := &activity.InMemoryEventStore{} eventStore := &activity.InMemoryEventStore{}
if err != nil { if err != nil {
return nil, "", err return nil, "", err

View File

@ -58,7 +58,7 @@ func startManagement(t *testing.T) (*grpc.Server, net.Listener) {
t.Fatal(err) t.Fatal(err)
} }
peersUpdateManager := mgmt.NewPeersUpdateManager() peersUpdateManager := mgmt.NewPeersUpdateManager(nil)
eventStore := &activity.InMemoryEventStore{} eventStore := &activity.InMemoryEventStore{}
accountManager, err := mgmt.BuildManager(store, peersUpdateManager, nil, "", "", accountManager, err := mgmt.BuildManager(store, peersUpdateManager, nil, "", "",
eventStore, false) eventStore, false)

View File

@ -130,7 +130,7 @@ var (
if err != nil { if err != nil {
return fmt.Errorf("failed creating Store: %s: %v", config.Datadir, err) return fmt.Errorf("failed creating Store: %s: %v", config.Datadir, err)
} }
peersUpdateManager := server.NewPeersUpdateManager() peersUpdateManager := server.NewPeersUpdateManager(appMetrics)
var idpManager idp.Manager var idpManager idp.Manager
if config.IdpManagerConfig != nil { if config.IdpManagerConfig != nil {

View File

@ -2047,7 +2047,7 @@ func createManager(t *testing.T) (*DefaultAccountManager, error) {
return nil, err return nil, err
} }
eventStore := &activity.InMemoryEventStore{} eventStore := &activity.InMemoryEventStore{}
return BuildManager(store, NewPeersUpdateManager(), nil, "", "netbird.cloud", eventStore, false) return BuildManager(store, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, false)
} }
func createStore(t *testing.T) (Store, error) { func createStore(t *testing.T) (Store, error) {

View File

@ -192,7 +192,7 @@ func createDNSManager(t *testing.T) (*DefaultAccountManager, error) {
return nil, err return nil, err
} }
eventStore := &activity.InMemoryEventStore{} eventStore := &activity.InMemoryEventStore{}
return BuildManager(store, NewPeersUpdateManager(), nil, "", "netbird.test", eventStore, false) return BuildManager(store, NewPeersUpdateManager(nil), nil, "", "netbird.test", eventStore, false)
} }
func createDNSStore(t *testing.T) (Store, error) { func createDNSStore(t *testing.T) (Store, error) {

View File

@ -410,7 +410,7 @@ func startManagement(t *testing.T, config *Config) (*grpc.Server, string, error)
if err != nil { if err != nil {
return nil, "", err return nil, "", err
} }
peersUpdateManager := NewPeersUpdateManager() peersUpdateManager := NewPeersUpdateManager(nil)
eventStore := &activity.InMemoryEventStore{} eventStore := &activity.InMemoryEventStore{}
accountManager, err := BuildManager(store, peersUpdateManager, nil, "", "", accountManager, err := BuildManager(store, peersUpdateManager, nil, "", "",
eventStore, false) eventStore, false)

View File

@ -501,7 +501,7 @@ func startServer(config *server.Config) (*grpc.Server, net.Listener) {
if err != nil { if err != nil {
log.Fatalf("failed creating a store: %s: %v", config.Datadir, err) log.Fatalf("failed creating a store: %s: %v", config.Datadir, err)
} }
peersUpdateManager := server.NewPeersUpdateManager() peersUpdateManager := server.NewPeersUpdateManager(nil)
eventStore := &activity.InMemoryEventStore{} eventStore := &activity.InMemoryEventStore{}
accountManager, err := server.BuildManager(store, peersUpdateManager, nil, "", "", accountManager, err := server.BuildManager(store, peersUpdateManager, nil, "", "",
eventStore, false) eventStore, false)

View File

@ -747,7 +747,7 @@ func createNSManager(t *testing.T) (*DefaultAccountManager, error) {
return nil, err return nil, err
} }
eventStore := &activity.InMemoryEventStore{} eventStore := &activity.InMemoryEventStore{}
return BuildManager(store, NewPeersUpdateManager(), nil, "", "", eventStore, false) return BuildManager(store, NewPeersUpdateManager(nil), nil, "", "", eventStore, false)
} }
func createNSStore(t *testing.T) (Store, error) { func createNSStore(t *testing.T) (Store, error) {

View File

@ -1013,7 +1013,7 @@ func createRouterManager(t *testing.T) (*DefaultAccountManager, error) {
return nil, err return nil, err
} }
eventStore := &activity.InMemoryEventStore{} eventStore := &activity.InMemoryEventStore{}
return BuildManager(store, NewPeersUpdateManager(), nil, "", "", eventStore, false) return BuildManager(store, NewPeersUpdateManager(nil), nil, "", "", eventStore, false)
} }
func createRouterStore(t *testing.T) (Store, error) { func createRouterStore(t *testing.T) (Store, error) {

View File

@ -20,13 +20,14 @@ const defaultEndpoint = "/metrics"
// MockAppMetrics mocks the AppMetrics interface // MockAppMetrics mocks the AppMetrics interface
type MockAppMetrics struct { type MockAppMetrics struct {
GetMeterFunc func() metric2.Meter GetMeterFunc func() metric2.Meter
CloseFunc func() error CloseFunc func() error
ExposeFunc func(port int, endpoint string) error ExposeFunc func(port int, endpoint string) error
IDPMetricsFunc func() *IDPMetrics IDPMetricsFunc func() *IDPMetrics
HTTPMiddlewareFunc func() *HTTPMiddleware HTTPMiddlewareFunc func() *HTTPMiddleware
GRPCMetricsFunc func() *GRPCMetrics GRPCMetricsFunc func() *GRPCMetrics
StoreMetricsFunc func() *StoreMetrics StoreMetricsFunc func() *StoreMetrics
UpdateChannelMetricsFunc func() *UpdateChannelMetrics
} }
// GetMeter mocks the GetMeter function of the AppMetrics interface // GetMeter mocks the GetMeter function of the AppMetrics interface
@ -85,6 +86,14 @@ func (mock *MockAppMetrics) StoreMetrics() *StoreMetrics {
return nil return nil
} }
// UpdateChannelMetrics mocks the MockAppMetrics function of the UpdateChannelMetrics interface
func (mock *MockAppMetrics) UpdateChannelMetrics() *UpdateChannelMetrics {
if mock.UpdateChannelMetricsFunc != nil {
return mock.UpdateChannelMetricsFunc()
}
return nil
}
// AppMetrics is metrics interface // AppMetrics is metrics interface
type AppMetrics interface { type AppMetrics interface {
GetMeter() metric2.Meter GetMeter() metric2.Meter
@ -94,18 +103,20 @@ type AppMetrics interface {
HTTPMiddleware() *HTTPMiddleware HTTPMiddleware() *HTTPMiddleware
GRPCMetrics() *GRPCMetrics GRPCMetrics() *GRPCMetrics
StoreMetrics() *StoreMetrics StoreMetrics() *StoreMetrics
UpdateChannelMetrics() *UpdateChannelMetrics
} }
// defaultAppMetrics are core application metrics based on OpenTelemetry https://opentelemetry.io/ // defaultAppMetrics are core application metrics based on OpenTelemetry https://opentelemetry.io/
type defaultAppMetrics struct { type defaultAppMetrics struct {
// Meter can be used by different application parts to create counters and measure things // Meter can be used by different application parts to create counters and measure things
Meter metric2.Meter Meter metric2.Meter
listener net.Listener listener net.Listener
ctx context.Context ctx context.Context
idpMetrics *IDPMetrics idpMetrics *IDPMetrics
httpMiddleware *HTTPMiddleware httpMiddleware *HTTPMiddleware
grpcMetrics *GRPCMetrics grpcMetrics *GRPCMetrics
storeMetrics *StoreMetrics storeMetrics *StoreMetrics
updateChannelMetrics *UpdateChannelMetrics
} }
// IDPMetrics returns metrics for the idp package // IDPMetrics returns metrics for the idp package
@ -128,6 +139,11 @@ func (appMetrics *defaultAppMetrics) StoreMetrics() *StoreMetrics {
return appMetrics.storeMetrics return appMetrics.storeMetrics
} }
// UpdateChannelMetrics returns metrics for the updatechannel
func (appMetrics *defaultAppMetrics) UpdateChannelMetrics() *UpdateChannelMetrics {
return appMetrics.updateChannelMetrics
}
// Close stop application metrics HTTP handler and closes listener. // Close stop application metrics HTTP handler and closes listener.
func (appMetrics *defaultAppMetrics) Close() error { func (appMetrics *defaultAppMetrics) Close() error {
if appMetrics.listener == nil { if appMetrics.listener == nil {
@ -199,6 +215,18 @@ func NewDefaultAppMetrics(ctx context.Context) (AppMetrics, error) {
return nil, err return nil, err
} }
return &defaultAppMetrics{Meter: meter, ctx: ctx, idpMetrics: idpMetrics, httpMiddleware: middleware, updateChannelMetrics, err := NewUpdateChannelMetrics(ctx, meter)
grpcMetrics: grpcMetrics, storeMetrics: storeMetrics}, nil if err != nil {
return nil, err
}
return &defaultAppMetrics{
Meter: meter,
ctx: ctx,
idpMetrics: idpMetrics,
httpMiddleware: middleware,
grpcMetrics: grpcMetrics,
storeMetrics: storeMetrics,
updateChannelMetrics: updateChannelMetrics,
}, nil
} }

View File

@ -9,7 +9,7 @@ import (
"go.opentelemetry.io/otel/metric/instrument/syncint64" "go.opentelemetry.io/otel/metric/instrument/syncint64"
) )
// StoreMetrics represents all metrics related to the FileStore // StoreMetrics represents all metrics related to the Store
type StoreMetrics struct { type StoreMetrics struct {
globalLockAcquisitionDurationMicro syncint64.Histogram globalLockAcquisitionDurationMicro syncint64.Histogram
globalLockAcquisitionDurationMs syncint64.Histogram globalLockAcquisitionDurationMs syncint64.Histogram

View File

@ -0,0 +1,141 @@
package telemetry
import (
"context"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/instrument/syncint64"
)
// UpdateChannelMetrics represents all metrics related to the UpdateChannel
type UpdateChannelMetrics struct {
createChannelDurationMs syncint64.Histogram
createChannelDurationMicro syncint64.Histogram
closeChannelDurationMs syncint64.Histogram
closeChannelDurationMicro syncint64.Histogram
closeChannelsDurationMs syncint64.Histogram
closeChannelsDurationMicro syncint64.Histogram
closeChannels syncint64.Histogram
sendUpdateDurationMs syncint64.Histogram
sendUpdateDurationMicro syncint64.Histogram
getAllConnectedPeersDurationMs syncint64.Histogram
getAllConnectedPeersDurationMicro syncint64.Histogram
getAllConnectedPeers syncint64.Histogram
ctx context.Context
}
// NewUpdateChannelMetrics creates an instance of UpdateChannel
func NewUpdateChannelMetrics(ctx context.Context, meter metric.Meter) (*UpdateChannelMetrics, error) {
createChannelDurationMs, err := meter.SyncInt64().Histogram("management.updatechannel.create.duration.ms")
if err != nil {
return nil, err
}
createChannelDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.create.duration.micro")
if err != nil {
return nil, err
}
closeChannelDurationMs, err := meter.SyncInt64().Histogram("management.updatechannel.close.one.duration.ms")
if err != nil {
return nil, err
}
closeChannelDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.close.one.duration.micro")
if err != nil {
return nil, err
}
closeChannelsDurationMs, err := meter.SyncInt64().Histogram("management.updatechannel.close.multiple.duration.ms")
if err != nil {
return nil, err
}
closeChannelsDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.close.multiple.duration.micro")
if err != nil {
return nil, err
}
closeChannels, err := meter.SyncInt64().Histogram("management.updatechannel.close.multiple.channels")
if err != nil {
return nil, err
}
sendUpdateDurationMs, err := meter.SyncInt64().Histogram("management.updatechannel.send.duration.ms")
if err != nil {
return nil, err
}
sendUpdateDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.send.duration.micro")
if err != nil {
return nil, err
}
getAllConnectedPeersDurationMs, err := meter.SyncInt64().Histogram("management.updatechannel.get.all.duration.ms")
if err != nil {
return nil, err
}
getAllConnectedPeersDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.get.all.duration.micro")
if err != nil {
return nil, err
}
getAllConnectedPeers, err := meter.SyncInt64().Histogram("management.updatechannel.get.all.peers")
if err != nil {
return nil, err
}
return &UpdateChannelMetrics{
createChannelDurationMs: createChannelDurationMs,
createChannelDurationMicro: createChannelDurationMicro,
closeChannelDurationMs: closeChannelDurationMs,
closeChannelDurationMicro: closeChannelDurationMicro,
closeChannelsDurationMs: closeChannelsDurationMs,
closeChannelsDurationMicro: closeChannelsDurationMicro,
closeChannels: closeChannels,
sendUpdateDurationMs: sendUpdateDurationMs,
sendUpdateDurationMicro: sendUpdateDurationMicro,
getAllConnectedPeersDurationMs: getAllConnectedPeersDurationMs,
getAllConnectedPeersDurationMicro: getAllConnectedPeersDurationMicro,
getAllConnectedPeers: getAllConnectedPeers,
ctx: ctx,
}, nil
}
// CountCreateChannelDuration counts the duration of the CreateChannel method,
// closed indicates if existing channel was closed before creation of a new one
func (metrics *UpdateChannelMetrics) CountCreateChannelDuration(duration time.Duration, closed bool) {
metrics.createChannelDurationMs.Record(metrics.ctx, duration.Milliseconds(), attribute.Bool("closed", closed))
metrics.createChannelDurationMicro.Record(metrics.ctx, duration.Microseconds(), attribute.Bool("closed", closed))
}
// CountCloseChannelDuration counts the duration of the CloseChannel method
func (metrics *UpdateChannelMetrics) CountCloseChannelDuration(duration time.Duration) {
metrics.closeChannelDurationMs.Record(metrics.ctx, duration.Milliseconds())
metrics.closeChannelDurationMicro.Record(metrics.ctx, duration.Microseconds())
}
// CountCloseChannelsDuration counts the duration of the CloseChannels method and the number of channels have been closed
func (metrics *UpdateChannelMetrics) CountCloseChannelsDuration(duration time.Duration, channels int) {
metrics.closeChannelsDurationMs.Record(metrics.ctx, duration.Milliseconds())
metrics.closeChannelsDurationMicro.Record(metrics.ctx, duration.Microseconds())
metrics.closeChannels.Record(metrics.ctx, int64(channels))
}
// CountSendUpdateDuration counts the duration of the SendUpdate method
// found indicates if peer had channel, dropped indicates if the message was dropped due channel buffer overload
func (metrics *UpdateChannelMetrics) CountSendUpdateDuration(duration time.Duration, found, dropped bool) {
attrs := []attribute.KeyValue{attribute.Bool("found", found), attribute.Bool("dropped", dropped)}
metrics.sendUpdateDurationMs.Record(metrics.ctx, duration.Milliseconds(), attrs...)
metrics.sendUpdateDurationMicro.Record(metrics.ctx, duration.Microseconds(), attrs...)
}
// CountGetAllConnectedPeersDuration counts the duration of the GetAllConnectedPeers method and the number of peers have been returned
func (metrics *UpdateChannelMetrics) CountGetAllConnectedPeersDuration(duration time.Duration, peers int) {
metrics.getAllConnectedPeersDurationMs.Record(metrics.ctx, duration.Milliseconds())
metrics.getAllConnectedPeersDurationMicro.Record(metrics.ctx, duration.Microseconds())
metrics.getAllConnectedPeers.Record(metrics.ctx, int64(peers))
}

View File

@ -20,7 +20,7 @@ var TurnTestHost = &Host{
func TestTimeBasedAuthSecretsManager_GenerateCredentials(t *testing.T) { func TestTimeBasedAuthSecretsManager_GenerateCredentials(t *testing.T) {
ttl := util.Duration{Duration: time.Hour} ttl := util.Duration{Duration: time.Hour}
secret := "some_secret" secret := "some_secret"
peersManager := NewPeersUpdateManager() peersManager := NewPeersUpdateManager(nil)
tested := NewTimeBasedAuthSecretsManager(peersManager, &TURNConfig{ tested := NewTimeBasedAuthSecretsManager(peersManager, &TURNConfig{
CredentialsTTL: ttl, CredentialsTTL: ttl,
@ -44,7 +44,7 @@ func TestTimeBasedAuthSecretsManager_GenerateCredentials(t *testing.T) {
func TestTimeBasedAuthSecretsManager_SetupRefresh(t *testing.T) { func TestTimeBasedAuthSecretsManager_SetupRefresh(t *testing.T) {
ttl := util.Duration{Duration: 2 * time.Second} ttl := util.Duration{Duration: 2 * time.Second}
secret := "some_secret" secret := "some_secret"
peersManager := NewPeersUpdateManager() peersManager := NewPeersUpdateManager(nil)
peer := "some_peer" peer := "some_peer"
updateChannel := peersManager.CreateChannel(peer) updateChannel := peersManager.CreateChannel(peer)
@ -93,7 +93,7 @@ loop:
func TestTimeBasedAuthSecretsManager_CancelRefresh(t *testing.T) { func TestTimeBasedAuthSecretsManager_CancelRefresh(t *testing.T) {
ttl := util.Duration{Duration: time.Hour} ttl := util.Duration{Duration: time.Hour}
secret := "some_secret" secret := "some_secret"
peersManager := NewPeersUpdateManager() peersManager := NewPeersUpdateManager(nil)
peer := "some_peer" peer := "some_peer"
tested := NewTimeBasedAuthSecretsManager(peersManager, &TURNConfig{ tested := NewTimeBasedAuthSecretsManager(peersManager, &TURNConfig{

View File

@ -2,10 +2,12 @@ package server
import ( import (
"sync" "sync"
"time"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/management/proto" "github.com/netbirdio/netbird/management/proto"
"github.com/netbirdio/netbird/management/server/telemetry"
) )
const channelBufferSize = 100 const channelBufferSize = 100
@ -17,26 +19,41 @@ type UpdateMessage struct {
type PeersUpdateManager struct { type PeersUpdateManager struct {
// peerChannels is an update channel indexed by Peer.ID // peerChannels is an update channel indexed by Peer.ID
peerChannels map[string]chan *UpdateMessage peerChannels map[string]chan *UpdateMessage
channelsMux *sync.Mutex // channelsMux keeps the mutex to access peerChannels
channelsMux *sync.Mutex
// metrics provides method to collect application metrics
metrics telemetry.AppMetrics
} }
// NewPeersUpdateManager returns a new instance of PeersUpdateManager // NewPeersUpdateManager returns a new instance of PeersUpdateManager
func NewPeersUpdateManager() *PeersUpdateManager { func NewPeersUpdateManager(metrics telemetry.AppMetrics) *PeersUpdateManager {
return &PeersUpdateManager{ return &PeersUpdateManager{
peerChannels: make(map[string]chan *UpdateMessage), peerChannels: make(map[string]chan *UpdateMessage),
channelsMux: &sync.Mutex{}, channelsMux: &sync.Mutex{},
metrics: metrics,
} }
} }
// SendUpdate sends update message to the peer's channel // SendUpdate sends update message to the peer's channel
func (p *PeersUpdateManager) SendUpdate(peerID string, update *UpdateMessage) { func (p *PeersUpdateManager) SendUpdate(peerID string, update *UpdateMessage) {
start := time.Now()
var found, dropped bool
p.channelsMux.Lock() p.channelsMux.Lock()
defer p.channelsMux.Unlock() defer func() {
p.channelsMux.Unlock()
if p.metrics != nil {
p.metrics.UpdateChannelMetrics().CountSendUpdateDuration(time.Since(start), found, dropped)
}
}()
if channel, ok := p.peerChannels[peerID]; ok { if channel, ok := p.peerChannels[peerID]; ok {
found = true
select { select {
case channel <- update: case channel <- update:
log.Debugf("update was sent to channel for peer %s", peerID) log.Debugf("update was sent to channel for peer %s", peerID)
default: default:
dropped = true
log.Warnf("channel for peer %s is %d full", peerID, len(channel)) log.Warnf("channel for peer %s is %d full", peerID, len(channel))
} }
} else { } else {
@ -46,10 +63,20 @@ func (p *PeersUpdateManager) SendUpdate(peerID string, update *UpdateMessage) {
// CreateChannel creates a go channel for a given peer used to deliver updates relevant to the peer. // CreateChannel creates a go channel for a given peer used to deliver updates relevant to the peer.
func (p *PeersUpdateManager) CreateChannel(peerID string) chan *UpdateMessage { func (p *PeersUpdateManager) CreateChannel(peerID string) chan *UpdateMessage {
start := time.Now()
closed := false
p.channelsMux.Lock() p.channelsMux.Lock()
defer p.channelsMux.Unlock() defer func() {
p.channelsMux.Unlock()
if p.metrics != nil {
p.metrics.UpdateChannelMetrics().CountCreateChannelDuration(time.Since(start), closed)
}
}()
if channel, ok := p.peerChannels[peerID]; ok { if channel, ok := p.peerChannels[peerID]; ok {
closed = true
delete(p.peerChannels, peerID) delete(p.peerChannels, peerID)
close(channel) close(channel)
} }
@ -58,6 +85,7 @@ func (p *PeersUpdateManager) CreateChannel(peerID string) chan *UpdateMessage {
p.peerChannels[peerID] = channel p.peerChannels[peerID] = channel
log.Debugf("opened updates channel for a peer %s", peerID) log.Debugf("opened updates channel for a peer %s", peerID)
return channel return channel
} }
@ -72,8 +100,16 @@ func (p *PeersUpdateManager) closeChannel(peerID string) {
// CloseChannels closes updates channel for each given peer // CloseChannels closes updates channel for each given peer
func (p *PeersUpdateManager) CloseChannels(peerIDs []string) { func (p *PeersUpdateManager) CloseChannels(peerIDs []string) {
start := time.Now()
p.channelsMux.Lock() p.channelsMux.Lock()
defer p.channelsMux.Unlock() defer func() {
p.channelsMux.Unlock()
if p.metrics != nil {
p.metrics.UpdateChannelMetrics().CountCloseChannelsDuration(time.Since(start), len(peerIDs))
}
}()
for _, id := range peerIDs { for _, id := range peerIDs {
p.closeChannel(id) p.closeChannel(id)
} }
@ -81,18 +117,37 @@ func (p *PeersUpdateManager) CloseChannels(peerIDs []string) {
// CloseChannel closes updates channel of a given peer // CloseChannel closes updates channel of a given peer
func (p *PeersUpdateManager) CloseChannel(peerID string) { func (p *PeersUpdateManager) CloseChannel(peerID string) {
start := time.Now()
p.channelsMux.Lock() p.channelsMux.Lock()
defer p.channelsMux.Unlock() defer func() {
p.channelsMux.Unlock()
if p.metrics != nil {
p.metrics.UpdateChannelMetrics().CountCloseChannelDuration(time.Since(start))
}
}()
p.closeChannel(peerID) p.closeChannel(peerID)
} }
// GetAllConnectedPeers returns a copy of the connected peers map // GetAllConnectedPeers returns a copy of the connected peers map
func (p *PeersUpdateManager) GetAllConnectedPeers() map[string]struct{} { func (p *PeersUpdateManager) GetAllConnectedPeers() map[string]struct{} {
start := time.Now()
p.channelsMux.Lock() p.channelsMux.Lock()
defer p.channelsMux.Unlock()
m := make(map[string]struct{}) m := make(map[string]struct{})
defer func() {
p.channelsMux.Unlock()
if p.metrics != nil {
p.metrics.UpdateChannelMetrics().CountGetAllConnectedPeersDuration(time.Since(start), len(m))
}
}()
for ID := range p.peerChannels { for ID := range p.peerChannels {
m[ID] = struct{}{} m[ID] = struct{}{}
} }
return m return m
} }

View File

@ -1,16 +1,17 @@
package server package server
import ( import (
"github.com/netbirdio/netbird/management/proto"
"testing" "testing"
"time" "time"
"github.com/netbirdio/netbird/management/proto"
) )
//var peersUpdater *PeersUpdateManager //var peersUpdater *PeersUpdateManager
func TestCreateChannel(t *testing.T) { func TestCreateChannel(t *testing.T) {
peer := "test-create" peer := "test-create"
peersUpdater := NewPeersUpdateManager() peersUpdater := NewPeersUpdateManager(nil)
defer peersUpdater.CloseChannel(peer) defer peersUpdater.CloseChannel(peer)
_ = peersUpdater.CreateChannel(peer) _ = peersUpdater.CreateChannel(peer)
@ -21,7 +22,7 @@ func TestCreateChannel(t *testing.T) {
func TestSendUpdate(t *testing.T) { func TestSendUpdate(t *testing.T) {
peer := "test-sendupdate" peer := "test-sendupdate"
peersUpdater := NewPeersUpdateManager() peersUpdater := NewPeersUpdateManager(nil)
update1 := &UpdateMessage{Update: &proto.SyncResponse{ update1 := &UpdateMessage{Update: &proto.SyncResponse{
NetworkMap: &proto.NetworkMap{ NetworkMap: &proto.NetworkMap{
Serial: 0, Serial: 0,
@ -65,7 +66,7 @@ func TestSendUpdate(t *testing.T) {
func TestCloseChannel(t *testing.T) { func TestCloseChannel(t *testing.T) {
peer := "test-close" peer := "test-close"
peersUpdater := NewPeersUpdateManager() peersUpdater := NewPeersUpdateManager(nil)
_ = peersUpdater.CreateChannel(peer) _ = peersUpdater.CreateChannel(peer)
if _, ok := peersUpdater.peerChannels[peer]; !ok { if _, ok := peersUpdater.peerChannels[peer]; !ok {
t.Error("Error creating the channel") t.Error("Error creating the channel")