From 61034aaf4df8f7a1cc114d59c4a4373bb664dd62 Mon Sep 17 00:00:00 2001
From: Zoltan Papp
Date: Wed, 22 May 2024 11:15:29 +0200
Subject: [PATCH] Gracefully conn worker shutdown (#2022)

Because the connWorkers operate on the e.peerConns list, we must ensure
that all workers have exited before we modify the contents of
e.peerConns. If we do not, the engine will start new connWorkers for
peers that already have one, and they will open connections to the same
peers in parallel.
---
 client/internal/engine.go      | 12 +++++++++++-
 client/internal/engine_test.go |  9 ++++++++-
 2 files changed, 19 insertions(+), 2 deletions(-)

diff --git a/client/internal/engine.go b/client/internal/engine.go
index 351b21b2e..9c96dc50d 100644
--- a/client/internal/engine.go
+++ b/client/internal/engine.go
@@ -150,6 +150,8 @@ type Engine struct {
 	signalProbe *Probe
 	relayProbe  *Probe
 	wgProbe     *Probe
+
+	wgConnWorker sync.WaitGroup
 }
 
 // Peer is an instance of the Connection Peer
@@ -245,6 +247,7 @@ func (e *Engine) Stop() error {
 	time.Sleep(500 * time.Millisecond)
 
 	e.close()
+	e.wgConnWorker.Wait()
 	log.Infof("stopped Netbird Engine")
 	return nil
 }
@@ -869,18 +872,25 @@ func (e *Engine) addNewPeer(peerConfig *mgmProto.RemotePeerConfig) error {
 			log.Warnf("error adding peer %s to status recorder, got error: %v", peerKey, err)
 		}
 
+		e.wgConnWorker.Add(1)
 		go e.connWorker(conn, peerKey)
 	}
 	return nil
 }
 
 func (e *Engine) connWorker(conn *peer.Conn, peerKey string) {
+	defer e.wgConnWorker.Done()
 	for {
 
 		// randomize starting time a bit
 		min := 500
 		max := 2000
-		time.Sleep(time.Duration(rand.Intn(max-min)+min) * time.Millisecond)
+		duration := time.Duration(rand.Intn(max-min)+min) * time.Millisecond
+		select {
+		case <-e.ctx.Done():
+			return
+		case <-time.After(duration):
+		}
 
 		// if peer has been removed -> give up
 		if !e.peerExists(peerKey) {
diff --git a/client/internal/engine_test.go b/client/internal/engine_test.go
index 0239ae58a..f5a98cb7f 100644
--- a/client/internal/engine_test.go
+++ b/client/internal/engine_test.go
@@ -229,6 +229,7 @@ func TestEngine_UpdateNetworkMap(t *testing.T) {
 		t.Fatal(err)
 	}
 	engine.udpMux = bind.NewUniversalUDPMuxDefault(bind.UniversalUDPMuxParams{UDPConn: conn})
+	engine.ctx = ctx
 
 	type testCase struct {
 		name string
@@ -408,6 +409,7 @@ func TestEngine_Sync(t *testing.T) {
 		WgPrivateKey: key,
 		WgPort:       33100,
 	}, MobileDependency{}, peer.NewRecorder("https://mgm"))
+	engine.ctx = ctx
 
 	engine.dnsServer = &dns.MockServer{
 		UpdateDNSServerFunc: func(serial uint64, update nbdns.Config) error { return nil },
@@ -566,6 +568,7 @@ func TestEngine_UpdateNetworkMapWithRoutes(t *testing.T) {
 		WgPrivateKey: key,
 		WgPort:       33100,
 	}, MobileDependency{}, peer.NewRecorder("https://mgm"))
+	engine.ctx = ctx
 	newNet, err := stdnet.NewNet()
 	if err != nil {
 		t.Fatal(err)
@@ -735,6 +738,8 @@ func TestEngine_UpdateNetworkMapWithDNSUpdate(t *testing.T) {
 		WgPrivateKey: key,
 		WgPort:       33100,
 	}, MobileDependency{}, peer.NewRecorder("https://mgm"))
+	engine.ctx = ctx
+
 	newNet, err := stdnet.NewNet()
 	if err != nil {
 		t.Fatal(err)
@@ -1003,7 +1008,9 @@ func createEngine(ctx context.Context, cancel context.CancelFunc, setupKey strin
 		WgPort:       wgPort,
 	}
 
-	return NewEngine(ctx, cancel, signalClient, mgmtClient, conf, MobileDependency{}, peer.NewRecorder("https://mgm")), nil
+	e, err := NewEngine(ctx, cancel, signalClient, mgmtClient, conf, MobileDependency{}, peer.NewRecorder("https://mgm")), nil
+	e.ctx = ctx
+	return e, err
 }
 
 func startSignal() (*grpc.Server, string, error) {
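
Note (editor's illustration, not part of the patch): the change relies on a standard Go pattern rather than anything NetBird-specific: pair a cancellable context with a sync.WaitGroup so workers can be interrupted mid-sleep and then awaited before shared state is mutated. The following is a minimal, self-contained sketch of that pattern under those assumptions; the pool, addPeer, connWorker, and stop names are invented for the example and are not part of the engine's API.

// Minimal sketch of the shutdown pattern this patch introduces: workers are
// tracked with a sync.WaitGroup, their randomized sleep is interruptible via
// the context, and stop() waits for every worker to exit before the shared
// peer map is touched. All names here are illustrative.
package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

type pool struct {
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
	mu     sync.Mutex
	peers  map[string]struct{}
}

func newPool() *pool {
	ctx, cancel := context.WithCancel(context.Background())
	return &pool{ctx: ctx, cancel: cancel, peers: map[string]struct{}{}}
}

func (p *pool) addPeer(key string) {
	p.mu.Lock()
	p.peers[key] = struct{}{}
	p.mu.Unlock()

	p.wg.Add(1) // register the worker before it starts, as the patch does
	go p.connWorker(key)
}

func (p *pool) connWorker(key string) {
	defer p.wg.Done()
	for {
		// Randomized back-off that can be cut short by cancellation,
		// mirroring the select on e.ctx.Done() in connWorker.
		delay := time.Duration(rand.Intn(1500)+500) * time.Millisecond
		select {
		case <-p.ctx.Done():
			return
		case <-time.After(delay):
		}
		fmt.Println("would (re)connect to", key)
	}
}

func (p *pool) stop() {
	p.cancel()  // unblock every worker's select
	p.wg.Wait() // no worker may still read the peer map after this point
	p.mu.Lock()
	p.peers = map[string]struct{}{} // now safe to mutate
	p.mu.Unlock()
}

func main() {
	p := newPool()
	p.addPeer("peer-A")
	p.addPeer("peer-B")
	time.Sleep(2 * time.Second)
	p.stop()
}

The ordering in stop() is the point of the patch: cancel first so every worker's select unblocks immediately, then Wait(), and only then touch the peer map. Engine.Stop() enforces the same ordering by calling e.wgConnWorker.Wait() before anything rebuilds the peer list.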