mirror of
https://github.com/netbirdio/netbird.git
synced 2025-08-09 07:15:15 +02:00
[management] sync calls to UpdateAccountPeers from BufferUpdateAccountPeers (#4137)
--------- Co-authored-by: Maycon Santos <mlsmaycon@gmail.com> Co-authored-by: Pedro Costa <550684+pnmcosta@users.noreply.github.com>
This commit is contained in:
@ -120,14 +120,20 @@ type MockAccountManager struct {
|
|||||||
GetAccountOnboardingFunc func(ctx context.Context, accountID, userID string) (*types.AccountOnboarding, error)
|
GetAccountOnboardingFunc func(ctx context.Context, accountID, userID string) (*types.AccountOnboarding, error)
|
||||||
UpdateAccountOnboardingFunc func(ctx context.Context, accountID, userID string, onboarding *types.AccountOnboarding) (*types.AccountOnboarding, error)
|
UpdateAccountOnboardingFunc func(ctx context.Context, accountID, userID string, onboarding *types.AccountOnboarding) (*types.AccountOnboarding, error)
|
||||||
GetOrCreateAccountByPrivateDomainFunc func(ctx context.Context, initiatorId, domain string) (*types.Account, bool, error)
|
GetOrCreateAccountByPrivateDomainFunc func(ctx context.Context, initiatorId, domain string) (*types.Account, bool, error)
|
||||||
|
UpdateAccountPeersFunc func(ctx context.Context, accountID string)
|
||||||
|
BufferUpdateAccountPeersFunc func(ctx context.Context, accountID string)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (am *MockAccountManager) UpdateAccountPeers(ctx context.Context, accountID string) {
|
func (am *MockAccountManager) UpdateAccountPeers(ctx context.Context, accountID string) {
|
||||||
// do nothing
|
if am.UpdateAccountPeersFunc != nil {
|
||||||
|
am.UpdateAccountPeersFunc(ctx, accountID)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (am *MockAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) {
|
func (am *MockAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) {
|
||||||
// do nothing
|
if am.BufferUpdateAccountPeersFunc != nil {
|
||||||
|
am.BufferUpdateAccountPeersFunc(ctx, accountID)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (am *MockAccountManager) DeleteSetupKey(ctx context.Context, accountID, userID, keyID string) error {
|
func (am *MockAccountManager) DeleteSetupKey(ctx context.Context, accountID, userID, keyID string) error {
|
||||||
|
@ -9,6 +9,7 @@ import (
|
|||||||
"slices"
|
"slices"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/rs/xid"
|
"github.com/rs/xid"
|
||||||
@ -1280,18 +1281,39 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (am *DefaultAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) {
|
type bufferUpdate struct {
|
||||||
mu, _ := am.accountUpdateLocks.LoadOrStore(accountID, &sync.Mutex{})
|
mu sync.Mutex
|
||||||
lock := mu.(*sync.Mutex)
|
next *time.Timer
|
||||||
|
update atomic.Bool
|
||||||
|
}
|
||||||
|
|
||||||
if !lock.TryLock() {
|
func (am *DefaultAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) {
|
||||||
|
bufUpd, _ := am.accountUpdateLocks.LoadOrStore(accountID, &bufferUpdate{})
|
||||||
|
b := bufUpd.(*bufferUpdate)
|
||||||
|
|
||||||
|
if !b.mu.TryLock() {
|
||||||
|
b.update.Store(true)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if b.next != nil {
|
||||||
|
b.next.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
time.Sleep(time.Duration(am.updateAccountPeersBufferInterval.Load()))
|
defer b.mu.Unlock()
|
||||||
lock.Unlock()
|
|
||||||
am.UpdateAccountPeers(ctx, accountID)
|
am.UpdateAccountPeers(ctx, accountID)
|
||||||
|
if !b.update.Load() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
b.update.Store(false)
|
||||||
|
if b.next == nil {
|
||||||
|
b.next = time.AfterFunc(time.Duration(am.updateAccountPeersBufferInterval.Load()), func() {
|
||||||
|
am.UpdateAccountPeers(ctx, accountID)
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
b.next.Reset(time.Duration(am.updateAccountPeersBufferInterval.Load()))
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -13,6 +13,7 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -25,6 +26,7 @@ import (
|
|||||||
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||||
|
|
||||||
"github.com/netbirdio/netbird/management/server/integrations/port_forwarding"
|
"github.com/netbirdio/netbird/management/server/integrations/port_forwarding"
|
||||||
|
"github.com/netbirdio/netbird/management/server/mock_server"
|
||||||
"github.com/netbirdio/netbird/management/server/permissions"
|
"github.com/netbirdio/netbird/management/server/permissions"
|
||||||
"github.com/netbirdio/netbird/management/server/settings"
|
"github.com/netbirdio/netbird/management/server/settings"
|
||||||
"github.com/netbirdio/netbird/management/server/status"
|
"github.com/netbirdio/netbird/management/server/status"
|
||||||
@ -2251,3 +2253,131 @@ func Test_AddPeer(t *testing.T) {
|
|||||||
assert.Equal(t, totalPeers, maps.Values(account.SetupKeys)[0].UsedTimes)
|
assert.Equal(t, totalPeers, maps.Values(account.SetupKeys)[0].UsedTimes)
|
||||||
assert.Equal(t, uint64(totalPeers), account.Network.Serial)
|
assert.Equal(t, uint64(totalPeers), account.Network.Serial)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestBufferUpdateAccountPeers(t *testing.T) {
|
||||||
|
const (
|
||||||
|
peersCount = 1000
|
||||||
|
updateAccountInterval = 50 * time.Millisecond
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
deletedPeers, updatePeersDeleted, updatePeersRuns atomic.Int32
|
||||||
|
uapLastRun, dpLastRun atomic.Int64
|
||||||
|
|
||||||
|
totalNewRuns, totalOldRuns int
|
||||||
|
)
|
||||||
|
|
||||||
|
uap := func(ctx context.Context, accountID string) {
|
||||||
|
updatePeersDeleted.Store(deletedPeers.Load())
|
||||||
|
updatePeersRuns.Add(1)
|
||||||
|
uapLastRun.Store(time.Now().UnixMilli())
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("new approach", func(t *testing.T) {
|
||||||
|
updatePeersRuns.Store(0)
|
||||||
|
updatePeersDeleted.Store(0)
|
||||||
|
deletedPeers.Store(0)
|
||||||
|
|
||||||
|
var mustore sync.Map
|
||||||
|
bufupd := func(ctx context.Context, accountID string) {
|
||||||
|
mu, _ := mustore.LoadOrStore(accountID, &bufferUpdate{})
|
||||||
|
b := mu.(*bufferUpdate)
|
||||||
|
|
||||||
|
if !b.mu.TryLock() {
|
||||||
|
b.update.Store(true)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if b.next != nil {
|
||||||
|
b.next.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer b.mu.Unlock()
|
||||||
|
uap(ctx, accountID)
|
||||||
|
if !b.update.Load() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
b.update.Store(false)
|
||||||
|
b.next = time.AfterFunc(updateAccountInterval, func() {
|
||||||
|
uap(ctx, accountID)
|
||||||
|
})
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
dp := func(ctx context.Context, accountID, peerID, userID string) error {
|
||||||
|
deletedPeers.Add(1)
|
||||||
|
dpLastRun.Store(time.Now().UnixMilli())
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
bufupd(ctx, accountID)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
am := mock_server.MockAccountManager{
|
||||||
|
UpdateAccountPeersFunc: uap,
|
||||||
|
BufferUpdateAccountPeersFunc: bufupd,
|
||||||
|
DeletePeerFunc: dp,
|
||||||
|
}
|
||||||
|
empty := ""
|
||||||
|
for range peersCount {
|
||||||
|
//nolint
|
||||||
|
am.DeletePeer(context.Background(), empty, empty, empty)
|
||||||
|
}
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
|
assert.Equal(t, peersCount, int(deletedPeers.Load()), "Expected all peers to be deleted")
|
||||||
|
assert.Equal(t, peersCount, int(updatePeersDeleted.Load()), "Expected all peers to be updated in the buffer")
|
||||||
|
assert.GreaterOrEqual(t, uapLastRun.Load(), dpLastRun.Load(), "Expected update account peers to run after delete peer")
|
||||||
|
|
||||||
|
totalNewRuns = int(updatePeersRuns.Load())
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("old approach", func(t *testing.T) {
|
||||||
|
updatePeersRuns.Store(0)
|
||||||
|
updatePeersDeleted.Store(0)
|
||||||
|
deletedPeers.Store(0)
|
||||||
|
|
||||||
|
var mustore sync.Map
|
||||||
|
bufupd := func(ctx context.Context, accountID string) {
|
||||||
|
mu, _ := mustore.LoadOrStore(accountID, &sync.Mutex{})
|
||||||
|
b := mu.(*sync.Mutex)
|
||||||
|
|
||||||
|
if !b.TryLock() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
time.Sleep(updateAccountInterval)
|
||||||
|
b.Unlock()
|
||||||
|
uap(ctx, accountID)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
dp := func(ctx context.Context, accountID, peerID, userID string) error {
|
||||||
|
deletedPeers.Add(1)
|
||||||
|
dpLastRun.Store(time.Now().UnixMilli())
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
bufupd(ctx, accountID)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
am := mock_server.MockAccountManager{
|
||||||
|
UpdateAccountPeersFunc: uap,
|
||||||
|
BufferUpdateAccountPeersFunc: bufupd,
|
||||||
|
DeletePeerFunc: dp,
|
||||||
|
}
|
||||||
|
empty := ""
|
||||||
|
for range peersCount {
|
||||||
|
//nolint
|
||||||
|
am.DeletePeer(context.Background(), empty, empty, empty)
|
||||||
|
}
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
|
assert.Equal(t, peersCount, int(deletedPeers.Load()), "Expected all peers to be deleted")
|
||||||
|
assert.Equal(t, peersCount, int(updatePeersDeleted.Load()), "Expected all peers to be updated in the buffer")
|
||||||
|
assert.GreaterOrEqual(t, uapLastRun.Load(), dpLastRun.Load(), "Expected update account peers to run after delete peer")
|
||||||
|
|
||||||
|
totalOldRuns = int(updatePeersRuns.Load())
|
||||||
|
})
|
||||||
|
assert.Less(t, totalNewRuns, totalOldRuns, "Expected new approach to run less than old approach. New runs: %d, Old runs: %d", totalNewRuns, totalOldRuns)
|
||||||
|
t.Logf("New runs: %d, Old runs: %d", totalNewRuns, totalOldRuns)
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user