diff --git a/management/client/client_test.go b/management/client/client_test.go index c163d1833..1847af73e 100644 --- a/management/client/client_test.go +++ b/management/client/client_test.go @@ -87,6 +87,12 @@ func startManagement(t *testing.T) (*grpc.Server, net.Listener) { ). Return(&types.Settings{}, nil). AnyTimes() + settingsMockManager. + EXPECT(). + GetExtraSettings(gomock.Any(), gomock.Any()). + Return(&types.ExtraSettings{}, nil). + AnyTimes() + permissionsManagerMock := permissions.NewMockManager(ctrl) permissionsManagerMock. EXPECT(). diff --git a/management/server/account/manager.go b/management/server/account/manager.go index ed17fa5ec..f8aa2756a 100644 --- a/management/server/account/manager.go +++ b/management/server/account/manager.go @@ -112,6 +112,7 @@ type Manager interface { GetAccountSettings(ctx context.Context, accountID string, userID string) (*types.Settings, error) DeleteSetupKey(ctx context.Context, accountID, userID, keyID string) error UpdateAccountPeers(ctx context.Context, accountID string) + BufferUpdateAccountPeers(ctx context.Context, accountID string) BuildUserInfosForAccount(ctx context.Context, accountID, initiatorUserID string, accountUsers []*types.User) (map[string]*types.UserInfo, error) SyncUserJWTGroups(ctx context.Context, userAuth nbcontext.UserAuth) error GetStore() store.Store diff --git a/management/server/dns_test.go b/management/server/dns_test.go index 02bb042d7..31c944a25 100644 --- a/management/server/dns_test.go +++ b/management/server/dns_test.go @@ -216,6 +216,8 @@ func createDNSManager(t *testing.T) (*DefaultAccountManager, error) { t.Cleanup(ctrl.Finish) settingsMockManager := settings.NewMockManager(ctrl) + // return empty extra settings for expected calls to UpdateAccountPeers + settingsMockManager.EXPECT().GetExtraSettings(gomock.Any(), gomock.Any()).Return(&types.ExtraSettings{}, nil).AnyTimes() permissionsManager := permissions.NewManager(store) return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.test", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false) } diff --git a/management/server/ephemeral.go b/management/server/ephemeral.go index 3cb9b7536..9f4348ebb 100644 --- a/management/server/ephemeral.go +++ b/management/server/ephemeral.go @@ -15,6 +15,8 @@ import ( const ( ephemeralLifeTime = 10 * time.Minute + // cleanupWindow is the time window to wait after nearest peer deadline to start the cleanup procedure. + cleanupWindow = 1 * time.Minute ) var ( @@ -41,6 +43,9 @@ type EphemeralManager struct { tailPeer *ephemeralPeer peersLock sync.Mutex timer *time.Timer + + lifeTime time.Duration + cleanupWindow time.Duration } // NewEphemeralManager instantiate new EphemeralManager @@ -48,6 +53,9 @@ func NewEphemeralManager(store store.Store, accountManager nbAccount.Manager) *E return &EphemeralManager{ store: store, accountManager: accountManager, + + lifeTime: ephemeralLifeTime, + cleanupWindow: cleanupWindow, } } @@ -60,7 +68,7 @@ func (e *EphemeralManager) LoadInitialPeers(ctx context.Context) { e.loadEphemeralPeers(ctx) if e.headPeer != nil { - e.timer = time.AfterFunc(ephemeralLifeTime, func() { + e.timer = time.AfterFunc(e.lifeTime, func() { e.cleanup(ctx) }) } @@ -113,9 +121,13 @@ func (e *EphemeralManager) OnPeerDisconnected(ctx context.Context, peer *nbpeer. return } - e.addPeer(peer.AccountID, peer.ID, newDeadLine()) + e.addPeer(peer.AccountID, peer.ID, e.newDeadLine()) if e.timer == nil { - e.timer = time.AfterFunc(e.headPeer.deadline.Sub(timeNow()), func() { + delay := e.headPeer.deadline.Sub(timeNow()) + e.cleanupWindow + if delay < 0 { + delay = 0 + } + e.timer = time.AfterFunc(delay, func() { e.cleanup(ctx) }) } @@ -128,7 +140,7 @@ func (e *EphemeralManager) loadEphemeralPeers(ctx context.Context) { return } - t := newDeadLine() + t := e.newDeadLine() for _, p := range peers { e.addPeer(p.AccountID, p.ID, t) } @@ -155,7 +167,11 @@ func (e *EphemeralManager) cleanup(ctx context.Context) { } if e.headPeer != nil { - e.timer = time.AfterFunc(e.headPeer.deadline.Sub(timeNow()), func() { + delay := e.headPeer.deadline.Sub(timeNow()) + e.cleanupWindow + if delay < 0 { + delay = 0 + } + e.timer = time.AfterFunc(delay, func() { e.cleanup(ctx) }) } else { @@ -164,13 +180,20 @@ func (e *EphemeralManager) cleanup(ctx context.Context) { e.peersLock.Unlock() + bufferAccountCall := make(map[string]struct{}) + for id, p := range deletePeers { log.WithContext(ctx).Debugf("delete ephemeral peer: %s", id) err := e.accountManager.DeletePeer(ctx, p.accountID, id, activity.SystemInitiator) if err != nil { log.WithContext(ctx).Errorf("failed to delete ephemeral peer: %s", err) + } else { + bufferAccountCall[p.accountID] = struct{}{} } } + for accountID := range bufferAccountCall { + e.accountManager.BufferUpdateAccountPeers(ctx, accountID) + } } func (e *EphemeralManager) addPeer(accountID string, peerID string, deadline time.Time) { @@ -223,6 +246,6 @@ func (e *EphemeralManager) isPeerOnList(id string) bool { return false } -func newDeadLine() time.Time { - return timeNow().Add(ephemeralLifeTime) +func (e *EphemeralManager) newDeadLine() time.Time { + return timeNow().Add(e.lifeTime) } diff --git a/management/server/ephemeral_test.go b/management/server/ephemeral_test.go index 3cf6ae7f3..f71d48c58 100644 --- a/management/server/ephemeral_test.go +++ b/management/server/ephemeral_test.go @@ -3,9 +3,12 @@ package server import ( "context" "fmt" + "sync" "testing" "time" + "github.com/stretchr/testify/assert" + nbAccount "github.com/netbirdio/netbird/management/server/account" nbpeer "github.com/netbirdio/netbird/management/server/peer" "github.com/netbirdio/netbird/management/server/store" @@ -27,28 +30,65 @@ func (s *MockStore) GetAllEphemeralPeers(_ context.Context, _ store.LockingStren return peers, nil } -type MocAccountManager struct { +type MockAccountManager struct { + mu sync.Mutex nbAccount.Manager - store *MockStore + store *MockStore + deletePeerCalls int + bufferUpdateCalls map[string]int + wg *sync.WaitGroup } -func (a MocAccountManager) DeletePeer(_ context.Context, accountID, peerID, userID string) error { +func (a *MockAccountManager) DeletePeer(_ context.Context, accountID, peerID, userID string) error { + a.mu.Lock() + defer a.mu.Unlock() + a.deletePeerCalls++ + if a.wg != nil { + a.wg.Done() + } delete(a.store.account.Peers, peerID) - return nil //nolint:nil + return nil } -func (a MocAccountManager) GetStore() store.Store { +func (a *MockAccountManager) GetDeletePeerCalls() int { + a.mu.Lock() + defer a.mu.Unlock() + return a.deletePeerCalls +} + +func (a *MockAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) { + a.mu.Lock() + defer a.mu.Unlock() + if a.bufferUpdateCalls == nil { + a.bufferUpdateCalls = make(map[string]int) + } + a.bufferUpdateCalls[accountID]++ +} + +func (a *MockAccountManager) GetBufferUpdateCalls(accountID string) int { + a.mu.Lock() + defer a.mu.Unlock() + if a.bufferUpdateCalls == nil { + return 0 + } + return a.bufferUpdateCalls[accountID] +} + +func (a *MockAccountManager) GetStore() store.Store { return a.store } func TestNewManager(t *testing.T) { + t.Cleanup(func() { + timeNow = time.Now + }) startTime := time.Now() timeNow = func() time.Time { return startTime } store := &MockStore{} - am := MocAccountManager{ + am := MockAccountManager{ store: store, } @@ -56,7 +96,7 @@ func TestNewManager(t *testing.T) { numberOfEphemeralPeers := 3 seedPeers(store, numberOfPeers, numberOfEphemeralPeers) - mgr := NewEphemeralManager(store, am) + mgr := NewEphemeralManager(store, &am) mgr.loadEphemeralPeers(context.Background()) startTime = startTime.Add(ephemeralLifeTime + 1) mgr.cleanup(context.Background()) @@ -67,13 +107,16 @@ func TestNewManager(t *testing.T) { } func TestNewManagerPeerConnected(t *testing.T) { + t.Cleanup(func() { + timeNow = time.Now + }) startTime := time.Now() timeNow = func() time.Time { return startTime } store := &MockStore{} - am := MocAccountManager{ + am := MockAccountManager{ store: store, } @@ -81,7 +124,7 @@ func TestNewManagerPeerConnected(t *testing.T) { numberOfEphemeralPeers := 3 seedPeers(store, numberOfPeers, numberOfEphemeralPeers) - mgr := NewEphemeralManager(store, am) + mgr := NewEphemeralManager(store, &am) mgr.loadEphemeralPeers(context.Background()) mgr.OnPeerConnected(context.Background(), store.account.Peers["ephemeral_peer_0"]) @@ -95,13 +138,16 @@ func TestNewManagerPeerConnected(t *testing.T) { } func TestNewManagerPeerDisconnected(t *testing.T) { + t.Cleanup(func() { + timeNow = time.Now + }) startTime := time.Now() timeNow = func() time.Time { return startTime } store := &MockStore{} - am := MocAccountManager{ + am := MockAccountManager{ store: store, } @@ -109,7 +155,7 @@ func TestNewManagerPeerDisconnected(t *testing.T) { numberOfEphemeralPeers := 3 seedPeers(store, numberOfPeers, numberOfEphemeralPeers) - mgr := NewEphemeralManager(store, am) + mgr := NewEphemeralManager(store, &am) mgr.loadEphemeralPeers(context.Background()) for _, v := range store.account.Peers { mgr.OnPeerConnected(context.Background(), v) @@ -126,6 +172,36 @@ func TestNewManagerPeerDisconnected(t *testing.T) { } } +func TestCleanupSchedulingBehaviorIsBatched(t *testing.T) { + const ( + ephemeralPeers = 10 + testLifeTime = 1 * time.Second + testCleanupWindow = 100 * time.Millisecond + ) + mockStore := &MockStore{} + mockAM := &MockAccountManager{ + store: mockStore, + } + mockAM.wg = &sync.WaitGroup{} + mockAM.wg.Add(ephemeralPeers) + mgr := NewEphemeralManager(mockStore, mockAM) + mgr.lifeTime = testLifeTime + mgr.cleanupWindow = testCleanupWindow + + account := newAccountWithId(context.Background(), "account", "", "", false) + mockStore.account = account + for i := range ephemeralPeers { + p := &nbpeer.Peer{ID: fmt.Sprintf("peer-%d", i), AccountID: account.Id, Ephemeral: true} + mockStore.account.Peers[p.ID] = p + time.Sleep(testCleanupWindow / ephemeralPeers) + mgr.OnPeerDisconnected(context.Background(), p) + } + mockAM.wg.Wait() + assert.Len(t, mockStore.account.Peers, 0, "all ephemeral peers should be cleaned up after the lifetime") + assert.Equal(t, 1, mockAM.GetBufferUpdateCalls(account.Id), "buffer update should be called once") + assert.Equal(t, ephemeralPeers, mockAM.GetDeletePeerCalls(), "should have deleted all peers") +} + func seedPeers(store *MockStore, numberOfPeers int, numberOfEphemeralPeers int) { store.account = newAccountWithId(context.Background(), "my account", "", "", false) diff --git a/management/server/management_proto_test.go b/management/server/management_proto_test.go index 337890ef9..57c00ed9f 100644 --- a/management/server/management_proto_test.go +++ b/management/server/management_proto_test.go @@ -440,7 +440,11 @@ func startManagementForTest(t *testing.T, testFile string, config *types.Config) GetSettings(gomock.Any(), gomock.Any(), gomock.Any()). AnyTimes(). Return(&types.Settings{}, nil) - + settingsMockManager. + EXPECT(). + GetExtraSettings(gomock.Any(), gomock.Any()). + Return(&types.ExtraSettings{}, nil). + AnyTimes() permissionsManager := permissions.NewManager(store) accountManager, err := BuildManager(ctx, store, peersUpdateManager, nil, "", "netbird.selfhosted", diff --git a/management/server/mock_server/account_mock.go b/management/server/mock_server/account_mock.go index 8837f9f50..4004f1b57 100644 --- a/management/server/mock_server/account_mock.go +++ b/management/server/mock_server/account_mock.go @@ -126,6 +126,10 @@ func (am *MockAccountManager) UpdateAccountPeers(ctx context.Context, accountID // do nothing } +func (am *MockAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) { + // do nothing +} + func (am *MockAccountManager) DeleteSetupKey(ctx context.Context, accountID, userID, keyID string) error { if am.DeleteSetupKeyFunc != nil { return am.DeleteSetupKeyFunc(ctx, accountID, userID, keyID) diff --git a/management/server/nameserver_test.go b/management/server/nameserver_test.go index 75d1e7972..8fada742c 100644 --- a/management/server/nameserver_test.go +++ b/management/server/nameserver_test.go @@ -778,6 +778,12 @@ func createNSManager(t *testing.T) (*DefaultAccountManager, error) { ctrl := gomock.NewController(t) t.Cleanup(ctrl.Finish) settingsMockManager := settings.NewMockManager(ctrl) + settingsMockManager. + EXPECT(). + GetExtraSettings(gomock.Any(), gomock.Any()). + Return(&types.ExtraSettings{}, nil). + AnyTimes() + permissionsManager := permissions.NewManager(store) return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.selfhosted", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false) } diff --git a/management/server/peer.go b/management/server/peer.go index 44156e534..a60513b38 100644 --- a/management/server/peer.go +++ b/management/server/peer.go @@ -375,7 +375,7 @@ func (am *DefaultAccountManager) DeletePeer(ctx context.Context, accountID, peer storeEvent() } - if updateAccountPeers { + if updateAccountPeers && userID != activity.SystemInitiator { am.BufferUpdateAccountPeers(ctx, accountID) } @@ -1177,6 +1177,19 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account globalStart := time.Now() + hasPeersConnected := false + for _, peer := range account.Peers { + if am.peersUpdateManager.HasChannel(peer.ID) { + hasPeersConnected = true + break + } + + } + + if !hasPeersConnected { + return + } + approvedPeersMap, err := am.integratedPeerValidator.GetValidatedPeers(account.Id, maps.Values(account.Groups), maps.Values(account.Peers), account.Settings.Extra) if err != nil { log.WithContext(ctx).Errorf("failed to send out updates to peers, failed to get validate peers: %v", err) @@ -1198,6 +1211,12 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account return } + extraSetting, err := am.settingsManager.GetExtraSettings(ctx, accountID) + if err != nil { + log.WithContext(ctx).Errorf("failed to get flow enabled status: %v", err) + return + } + for _, peer := range account.Peers { if !am.peersUpdateManager.HasChannel(peer.ID) { log.WithContext(ctx).Tracef("peer %s doesn't have a channel, skipping network map update", peer.ID) @@ -1232,12 +1251,6 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account } am.metrics.UpdateChannelMetrics().CountMergeNetworkMapDuration(time.Since(start)) - extraSetting, err := am.settingsManager.GetExtraSettings(ctx, accountID) - if err != nil { - log.WithContext(ctx).Errorf("failed to get flow enabled status: %v", err) - return - } - start = time.Now() update := toSyncResponse(ctx, nil, p, nil, nil, remotePeerNetworkMap, dnsDomain, postureChecks, dnsCache, account.Settings, extraSetting) am.metrics.UpdateChannelMetrics().CountToSyncResponseDuration(time.Since(start)) diff --git a/management/server/peer_test.go b/management/server/peer_test.go index 31439d670..07ec5037b 100644 --- a/management/server/peer_test.go +++ b/management/server/peer_test.go @@ -1344,6 +1344,11 @@ func Test_RegisterPeerBySetupKey(t *testing.T) { ctrl := gomock.NewController(t) t.Cleanup(ctrl.Finish) settingsMockManager := settings.NewMockManager(ctrl) + settingsMockManager. + EXPECT(). + GetExtraSettings(gomock.Any(), gomock.Any()). + Return(&types.ExtraSettings{}, nil). + AnyTimes() permissionsManager := permissions.NewManager(s) am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false) @@ -1556,6 +1561,11 @@ func Test_LoginPeer(t *testing.T) { ctrl := gomock.NewController(t) t.Cleanup(ctrl.Finish) settingsMockManager := settings.NewMockManager(ctrl) + settingsMockManager. + EXPECT(). + GetExtraSettings(gomock.Any(), gomock.Any()). + Return(&types.ExtraSettings{}, nil). + AnyTimes() permissionsManager := permissions.NewManager(s) am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)