Gracefully conn worker shutdown (#2022)

Because the connWorker are operating with the e.peerConns list we must ensure all workers exited before we modify the content of the e.peerConns list.
If we do not do that the engine will start new connWorkers for the exists ones, and they start connection for the same peers in parallel.
This commit is contained in:
Zoltan Papp 2024-05-22 11:15:29 +02:00 committed by GitHub
parent b8717b8956
commit 61034aaf4d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 19 additions and 2 deletions

View File

@ -150,6 +150,8 @@ type Engine struct {
signalProbe *Probe signalProbe *Probe
relayProbe *Probe relayProbe *Probe
wgProbe *Probe wgProbe *Probe
wgConnWorker sync.WaitGroup
} }
// Peer is an instance of the Connection Peer // Peer is an instance of the Connection Peer
@ -245,6 +247,7 @@ func (e *Engine) Stop() error {
time.Sleep(500 * time.Millisecond) time.Sleep(500 * time.Millisecond)
e.close() e.close()
e.wgConnWorker.Wait()
log.Infof("stopped Netbird Engine") log.Infof("stopped Netbird Engine")
return nil return nil
} }
@ -869,18 +872,25 @@ func (e *Engine) addNewPeer(peerConfig *mgmProto.RemotePeerConfig) error {
log.Warnf("error adding peer %s to status recorder, got error: %v", peerKey, err) log.Warnf("error adding peer %s to status recorder, got error: %v", peerKey, err)
} }
e.wgConnWorker.Add(1)
go e.connWorker(conn, peerKey) go e.connWorker(conn, peerKey)
} }
return nil return nil
} }
func (e *Engine) connWorker(conn *peer.Conn, peerKey string) { func (e *Engine) connWorker(conn *peer.Conn, peerKey string) {
defer e.wgConnWorker.Done()
for { for {
// randomize starting time a bit // randomize starting time a bit
min := 500 min := 500
max := 2000 max := 2000
time.Sleep(time.Duration(rand.Intn(max-min)+min) * time.Millisecond) duration := time.Duration(rand.Intn(max-min)+min) * time.Millisecond
select {
case <-e.ctx.Done():
return
case <-time.After(duration):
}
// if peer has been removed -> give up // if peer has been removed -> give up
if !e.peerExists(peerKey) { if !e.peerExists(peerKey) {

View File

@ -229,6 +229,7 @@ func TestEngine_UpdateNetworkMap(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
engine.udpMux = bind.NewUniversalUDPMuxDefault(bind.UniversalUDPMuxParams{UDPConn: conn}) engine.udpMux = bind.NewUniversalUDPMuxDefault(bind.UniversalUDPMuxParams{UDPConn: conn})
engine.ctx = ctx
type testCase struct { type testCase struct {
name string name string
@ -408,6 +409,7 @@ func TestEngine_Sync(t *testing.T) {
WgPrivateKey: key, WgPrivateKey: key,
WgPort: 33100, WgPort: 33100,
}, MobileDependency{}, peer.NewRecorder("https://mgm")) }, MobileDependency{}, peer.NewRecorder("https://mgm"))
engine.ctx = ctx
engine.dnsServer = &dns.MockServer{ engine.dnsServer = &dns.MockServer{
UpdateDNSServerFunc: func(serial uint64, update nbdns.Config) error { return nil }, UpdateDNSServerFunc: func(serial uint64, update nbdns.Config) error { return nil },
@ -566,6 +568,7 @@ func TestEngine_UpdateNetworkMapWithRoutes(t *testing.T) {
WgPrivateKey: key, WgPrivateKey: key,
WgPort: 33100, WgPort: 33100,
}, MobileDependency{}, peer.NewRecorder("https://mgm")) }, MobileDependency{}, peer.NewRecorder("https://mgm"))
engine.ctx = ctx
newNet, err := stdnet.NewNet() newNet, err := stdnet.NewNet()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -735,6 +738,8 @@ func TestEngine_UpdateNetworkMapWithDNSUpdate(t *testing.T) {
WgPrivateKey: key, WgPrivateKey: key,
WgPort: 33100, WgPort: 33100,
}, MobileDependency{}, peer.NewRecorder("https://mgm")) }, MobileDependency{}, peer.NewRecorder("https://mgm"))
engine.ctx = ctx
newNet, err := stdnet.NewNet() newNet, err := stdnet.NewNet()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -1003,7 +1008,9 @@ func createEngine(ctx context.Context, cancel context.CancelFunc, setupKey strin
WgPort: wgPort, WgPort: wgPort,
} }
return NewEngine(ctx, cancel, signalClient, mgmtClient, conf, MobileDependency{}, peer.NewRecorder("https://mgm")), nil e, err := NewEngine(ctx, cancel, signalClient, mgmtClient, conf, MobileDependency{}, peer.NewRecorder("https://mgm")), nil
e.ctx = ctx
return e, err
} }
func startSignal() (*grpc.Server, string, error) { func startSignal() (*grpc.Server, string, error) {