From 2147bf75eb25a4d63ea428484a1dbfac5e2fbf82 Mon Sep 17 00:00:00 2001 From: Maycon Santos Date: Mon, 9 Dec 2024 17:10:31 +0100 Subject: [PATCH 1/2] [client] Add peer conn init limit (#3001) Limit the peer connection initialization to 200 peers at the same time --- client/internal/engine.go | 6 +- client/internal/peer/conn.go | 9 ++- client/internal/peer/conn_test.go | 9 +-- util/semaphore-group/semaphore_group.go | 48 ++++++++++++++ util/semaphore-group/semaphore_group_test.go | 66 ++++++++++++++++++++ 5 files changed, 131 insertions(+), 7 deletions(-) create mode 100644 util/semaphore-group/semaphore_group.go create mode 100644 util/semaphore-group/semaphore_group_test.go diff --git a/client/internal/engine.go b/client/internal/engine.go index 63caec02a..34219def1 100644 --- a/client/internal/engine.go +++ b/client/internal/engine.go @@ -39,6 +39,7 @@ import ( "github.com/netbirdio/netbird/client/internal/routemanager" "github.com/netbirdio/netbird/client/internal/routemanager/systemops" "github.com/netbirdio/netbird/client/internal/statemanager" + semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group" nbssh "github.com/netbirdio/netbird/client/ssh" "github.com/netbirdio/netbird/client/system" @@ -62,6 +63,7 @@ import ( const ( PeerConnectionTimeoutMax = 45000 // ms PeerConnectionTimeoutMin = 30000 // ms + connInitLimit = 200 ) var ErrResetConnection = fmt.Errorf("reset connection") @@ -177,6 +179,7 @@ type Engine struct { // Network map persistence persistNetworkMap bool latestNetworkMap *mgmProto.NetworkMap + connSemaphore *semaphoregroup.SemaphoreGroup } // Peer is an instance of the Connection Peer @@ -242,6 +245,7 @@ func NewEngineWithProbes( statusRecorder: statusRecorder, probes: probes, checks: checks, + connSemaphore: semaphoregroup.NewSemaphoreGroup(connInitLimit), } if runtime.GOOS == "ios" { if !fileExists(mobileDep.StateFilePath) { @@ -1051,7 +1055,7 @@ func (e *Engine) createPeerConn(pubKey string, allowedIPs string) (*peer.Conn, e }, } - peerConn, err := peer.NewConn(e.ctx, config, e.statusRecorder, e.signaler, e.mobileDep.IFaceDiscover, e.relayManager, e.srWatcher) + peerConn, err := peer.NewConn(e.ctx, config, e.statusRecorder, e.signaler, e.mobileDep.IFaceDiscover, e.relayManager, e.srWatcher, e.connSemaphore) if err != nil { return nil, err } diff --git a/client/internal/peer/conn.go b/client/internal/peer/conn.go index 3a698a82a..5c2e2cb60 100644 --- a/client/internal/peer/conn.go +++ b/client/internal/peer/conn.go @@ -23,6 +23,7 @@ import ( relayClient "github.com/netbirdio/netbird/relay/client" "github.com/netbirdio/netbird/route" nbnet "github.com/netbirdio/netbird/util/net" + semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group" ) type ConnPriority int @@ -104,12 +105,13 @@ type Conn struct { wgProxyICE wgproxy.Proxy wgProxyRelay wgproxy.Proxy - guard *guard.Guard + guard *guard.Guard + semaphore *semaphoregroup.SemaphoreGroup } // NewConn creates a new not opened Conn to the remote peer. // To establish a connection run Conn.Open -func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Status, signaler *Signaler, iFaceDiscover stdnet.ExternalIFaceDiscover, relayManager *relayClient.Manager, srWatcher *guard.SRWatcher) (*Conn, error) { +func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Status, signaler *Signaler, iFaceDiscover stdnet.ExternalIFaceDiscover, relayManager *relayClient.Manager, srWatcher *guard.SRWatcher, semaphore *semaphoregroup.SemaphoreGroup) (*Conn, error) { allowedIP, _, err := net.ParseCIDR(config.WgConfig.AllowedIps) if err != nil { log.Errorf("failed to parse allowedIPS: %v", err) @@ -130,6 +132,7 @@ func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Statu allowedIP: allowedIP, statusRelay: NewAtomicConnStatus(), statusICE: NewAtomicConnStatus(), + semaphore: semaphore, } rFns := WorkerRelayCallbacks{ @@ -169,6 +172,7 @@ func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Statu // It will try to establish a connection using ICE and in parallel with relay. The higher priority connection type will // be used. func (conn *Conn) Open() { + conn.semaphore.Add(conn.ctx) conn.log.Debugf("open connection to peer") conn.mu.Lock() @@ -191,6 +195,7 @@ func (conn *Conn) Open() { } func (conn *Conn) startHandshakeAndReconnect(ctx context.Context) { + defer conn.semaphore.Done(conn.ctx) conn.waitInitialRandomSleepTime(ctx) err := conn.handshaker.sendOffer() diff --git a/client/internal/peer/conn_test.go b/client/internal/peer/conn_test.go index 039952588..b3e9d5b60 100644 --- a/client/internal/peer/conn_test.go +++ b/client/internal/peer/conn_test.go @@ -14,6 +14,7 @@ import ( "github.com/netbirdio/netbird/client/internal/peer/ice" "github.com/netbirdio/netbird/client/internal/stdnet" "github.com/netbirdio/netbird/util" + semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group" ) var connConf = ConnConfig{ @@ -46,7 +47,7 @@ func TestNewConn_interfaceFilter(t *testing.T) { func TestConn_GetKey(t *testing.T) { swWatcher := guard.NewSRWatcher(nil, nil, nil, connConf.ICEConfig) - conn, err := NewConn(context.Background(), connConf, nil, nil, nil, nil, swWatcher) + conn, err := NewConn(context.Background(), connConf, nil, nil, nil, nil, swWatcher, semaphoregroup.NewSemaphoreGroup(1)) if err != nil { return } @@ -58,7 +59,7 @@ func TestConn_GetKey(t *testing.T) { func TestConn_OnRemoteOffer(t *testing.T) { swWatcher := guard.NewSRWatcher(nil, nil, nil, connConf.ICEConfig) - conn, err := NewConn(context.Background(), connConf, NewRecorder("https://mgm"), nil, nil, nil, swWatcher) + conn, err := NewConn(context.Background(), connConf, NewRecorder("https://mgm"), nil, nil, nil, swWatcher, semaphoregroup.NewSemaphoreGroup(1)) if err != nil { return } @@ -92,7 +93,7 @@ func TestConn_OnRemoteOffer(t *testing.T) { func TestConn_OnRemoteAnswer(t *testing.T) { swWatcher := guard.NewSRWatcher(nil, nil, nil, connConf.ICEConfig) - conn, err := NewConn(context.Background(), connConf, NewRecorder("https://mgm"), nil, nil, nil, swWatcher) + conn, err := NewConn(context.Background(), connConf, NewRecorder("https://mgm"), nil, nil, nil, swWatcher, semaphoregroup.NewSemaphoreGroup(1)) if err != nil { return } @@ -125,7 +126,7 @@ func TestConn_OnRemoteAnswer(t *testing.T) { } func TestConn_Status(t *testing.T) { swWatcher := guard.NewSRWatcher(nil, nil, nil, connConf.ICEConfig) - conn, err := NewConn(context.Background(), connConf, NewRecorder("https://mgm"), nil, nil, nil, swWatcher) + conn, err := NewConn(context.Background(), connConf, NewRecorder("https://mgm"), nil, nil, nil, swWatcher, semaphoregroup.NewSemaphoreGroup(1)) if err != nil { return } diff --git a/util/semaphore-group/semaphore_group.go b/util/semaphore-group/semaphore_group.go new file mode 100644 index 000000000..ad74e1bfc --- /dev/null +++ b/util/semaphore-group/semaphore_group.go @@ -0,0 +1,48 @@ +package semaphoregroup + +import ( + "context" + "sync" +) + +// SemaphoreGroup is a custom type that combines sync.WaitGroup and a semaphore. +type SemaphoreGroup struct { + waitGroup sync.WaitGroup + semaphore chan struct{} +} + +// NewSemaphoreGroup creates a new SemaphoreGroup with the specified semaphore limit. +func NewSemaphoreGroup(limit int) *SemaphoreGroup { + return &SemaphoreGroup{ + semaphore: make(chan struct{}, limit), + } +} + +// Add increments the internal WaitGroup counter and acquires a semaphore slot. +func (sg *SemaphoreGroup) Add(ctx context.Context) { + sg.waitGroup.Add(1) + + // Acquire semaphore slot + select { + case <-ctx.Done(): + return + case sg.semaphore <- struct{}{}: + } +} + +// Done decrements the internal WaitGroup counter and releases a semaphore slot. +func (sg *SemaphoreGroup) Done(ctx context.Context) { + sg.waitGroup.Done() + + // Release semaphore slot + select { + case <-ctx.Done(): + return + case <-sg.semaphore: + } +} + +// Wait waits until the internal WaitGroup counter is zero. +func (sg *SemaphoreGroup) Wait() { + sg.waitGroup.Wait() +} diff --git a/util/semaphore-group/semaphore_group_test.go b/util/semaphore-group/semaphore_group_test.go new file mode 100644 index 000000000..d4491cf77 --- /dev/null +++ b/util/semaphore-group/semaphore_group_test.go @@ -0,0 +1,66 @@ +package semaphoregroup + +import ( + "context" + "testing" + "time" +) + +func TestSemaphoreGroup(t *testing.T) { + semGroup := NewSemaphoreGroup(2) + + for i := 0; i < 5; i++ { + semGroup.Add(context.Background()) + go func(id int) { + defer semGroup.Done(context.Background()) + + got := len(semGroup.semaphore) + if got == 0 { + t.Errorf("Expected semaphore length > 0 , got 0") + } + + time.Sleep(time.Millisecond) + t.Logf("Goroutine %d is running\n", id) + }(i) + } + + semGroup.Wait() + + want := 0 + got := len(semGroup.semaphore) + if got != want { + t.Errorf("Expected semaphore length %d, got %d", want, got) + } +} + +func TestSemaphoreGroupContext(t *testing.T) { + semGroup := NewSemaphoreGroup(1) + semGroup.Add(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + t.Cleanup(cancel) + rChan := make(chan struct{}) + + go func() { + semGroup.Add(ctx) + rChan <- struct{}{} + }() + select { + case <-rChan: + case <-time.NewTimer(2 * time.Second).C: + t.Error("Adding to semaphore group should not block when context is not done") + } + + semGroup.Done(context.Background()) + + ctxDone, cancelDone := context.WithTimeout(context.Background(), 1*time.Second) + t.Cleanup(cancelDone) + go func() { + semGroup.Done(ctxDone) + rChan <- struct{}{} + }() + select { + case <-rChan: + case <-time.NewTimer(2 * time.Second).C: + t.Error("Releasing from semaphore group should not block when context is not done") + } +} From 97bb74f824786c20f78f3131bb8ed3e9ece26782 Mon Sep 17 00:00:00 2001 From: Bethuel Mmbaga Date: Mon, 9 Dec 2024 18:40:06 +0100 Subject: [PATCH 2/2] Remove peer login log (#3005) Signed-off-by: bcmmbaga --- management/server/peer.go | 1 - 1 file changed, 1 deletion(-) diff --git a/management/server/peer.go b/management/server/peer.go index 761aa39a2..ba211be96 100644 --- a/management/server/peer.go +++ b/management/server/peer.go @@ -740,7 +740,6 @@ func (am *DefaultAccountManager) LoginPeer(ctx context.Context, login PeerLogin) // it means that the client has already checked if it needs login and had been through the SSO flow // so, we can skip this check and directly proceed with the login if login.UserID == "" { - log.Info("Peer needs login") err = am.checkIFPeerNeedsLoginWithoutLock(ctx, accountID, login) if err != nil { return nil, nil, nil, err