From 75feb0da8b06c5fa23595efc4bf867fc6fcc9ae3 Mon Sep 17 00:00:00 2001 From: Zoltan Papp Date: Wed, 11 Jun 2025 11:04:44 +0200 Subject: [PATCH] [client] Refactor context management in ConnMgr for clarity and consistency (#3951) In the conn_mgr we must distinguish two contexts. One is relevant for lazy-manager, and one (engine context) is relevant for peer creation. If we use the incorrect context, then when we disable the lazy connection feature, we cancel the peer connections too, instead of just the lazy manager. --- client/internal/conn_mgr.go | 38 ++++++++++----------- client/internal/engine.go | 2 +- client/internal/lazyconn/manager/manager.go | 14 ++++---- 3 files changed, 26 insertions(+), 28 deletions(-) diff --git a/client/internal/conn_mgr.go b/client/internal/conn_mgr.go index 88b47c511..aac312dc3 100644 --- a/client/internal/conn_mgr.go +++ b/client/internal/conn_mgr.go @@ -34,9 +34,9 @@ type ConnMgr struct { lazyConnMgr *manager.Manager - wg sync.WaitGroup - ctx context.Context - ctxCancel context.CancelFunc + wg sync.WaitGroup + lazyCtx context.Context + lazyCtxCancel context.CancelFunc } func NewConnMgr(engineConfig *EngineConfig, statusRecorder *peer.Status, peerStore *peerstore.Store, iface lazyconn.WGIface, dispatcher *dispatcher.ConnectionDispatcher) *ConnMgr { @@ -86,7 +86,7 @@ func (e *ConnMgr) UpdatedRemoteFeatureFlag(ctx context.Context, enabled bool) er log.Infof("lazy connection manager is enabled by management feature flag") e.initLazyManager(ctx) e.statusRecorder.UpdateLazyConnection(true) - return e.addPeersToLazyConnManager(ctx) + return e.addPeersToLazyConnManager() } else { if e.lazyConnMgr == nil { return nil @@ -109,7 +109,7 @@ func (e *ConnMgr) UpdateRouteHAMap(haMap route.HAMap) { } // SetExcludeList sets the list of peer IDs that should always have permanent connections. -func (e *ConnMgr) SetExcludeList(peerIDs map[string]bool) { +func (e *ConnMgr) SetExcludeList(ctx context.Context, peerIDs map[string]bool) { if e.lazyConnMgr == nil { return } @@ -133,7 +133,7 @@ func (e *ConnMgr) SetExcludeList(peerIDs map[string]bool) { excludedPeers = append(excludedPeers, lazyPeerCfg) } - added := e.lazyConnMgr.ExcludePeer(e.ctx, excludedPeers) + added := e.lazyConnMgr.ExcludePeer(e.lazyCtx, excludedPeers) for _, peerID := range added { var peerConn *peer.Conn var exists bool @@ -143,7 +143,7 @@ func (e *ConnMgr) SetExcludeList(peerIDs map[string]bool) { } peerConn.Log.Infof("peer has been added to lazy connection exclude list, opening permanent connection") - if err := peerConn.Open(e.ctx); err != nil { + if err := peerConn.Open(ctx); err != nil { peerConn.Log.Errorf("failed to open connection: %v", err) } } @@ -221,9 +221,9 @@ func (e *ConnMgr) OnSignalMsg(ctx context.Context, peerKey string) (*peer.Conn, return conn, true } - if found := e.lazyConnMgr.ActivatePeer(ctx, peerKey); found { + if found := e.lazyConnMgr.ActivatePeer(e.lazyCtx, peerKey); found { conn.Log.Infof("activated peer from inactive state") - if err := conn.Open(e.ctx); err != nil { + if err := conn.Open(ctx); err != nil { conn.Log.Errorf("failed to open connection: %v", err) } } @@ -235,29 +235,27 @@ func (e *ConnMgr) Close() { return } - e.ctxCancel() + e.lazyCtxCancel() e.wg.Wait() e.lazyConnMgr = nil } -func (e *ConnMgr) initLazyManager(parentCtx context.Context) { +func (e *ConnMgr) initLazyManager(engineCtx context.Context) { cfg := manager.Config{ InactivityThreshold: inactivityThresholdEnv(), } - e.lazyConnMgr = manager.NewManager(cfg, e.peerStore, e.iface, e.dispatcher) + e.lazyConnMgr = manager.NewManager(cfg, engineCtx, e.peerStore, e.iface, e.dispatcher) - ctx, cancel := context.WithCancel(parentCtx) - e.ctx = ctx - e.ctxCancel = cancel + e.lazyCtx, e.lazyCtxCancel = context.WithCancel(engineCtx) e.wg.Add(1) go func() { defer e.wg.Done() - e.lazyConnMgr.Start(ctx) + e.lazyConnMgr.Start(e.lazyCtx) }() } -func (e *ConnMgr) addPeersToLazyConnManager(ctx context.Context) error { +func (e *ConnMgr) addPeersToLazyConnManager() error { peers := e.peerStore.PeersPubKey() lazyPeerCfgs := make([]lazyconn.PeerConfig, 0, len(peers)) for _, peerID := range peers { @@ -277,7 +275,7 @@ func (e *ConnMgr) addPeersToLazyConnManager(ctx context.Context) error { lazyPeerCfgs = append(lazyPeerCfgs, lazyPeerCfg) } - return e.lazyConnMgr.AddActivePeers(ctx, lazyPeerCfgs) + return e.lazyConnMgr.AddActivePeers(e.lazyCtx, lazyPeerCfgs) } func (e *ConnMgr) closeManager(ctx context.Context) { @@ -285,7 +283,7 @@ func (e *ConnMgr) closeManager(ctx context.Context) { return } - e.ctxCancel() + e.lazyCtxCancel() e.wg.Wait() e.lazyConnMgr = nil @@ -295,7 +293,7 @@ func (e *ConnMgr) closeManager(ctx context.Context) { } func (e *ConnMgr) isStartedWithLazyMgr() bool { - return e.lazyConnMgr != nil && e.ctxCancel != nil + return e.lazyConnMgr != nil && e.lazyCtxCancel != nil } func inactivityThresholdEnv() *time.Duration { diff --git a/client/internal/engine.go b/client/internal/engine.go index b3b7d1062..253ecb2a6 100644 --- a/client/internal/engine.go +++ b/client/internal/engine.go @@ -1082,7 +1082,7 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error { // must set the exclude list after the peers are added. Without it the manager can not figure out the peers parameters from the store excludedLazyPeers := e.toExcludedLazyPeers(forwardingRules, networkMap.GetRemotePeers()) - e.connMgr.SetExcludeList(excludedLazyPeers) + e.connMgr.SetExcludeList(e.ctx, excludedLazyPeers) e.networkSerial = serial diff --git a/client/internal/lazyconn/manager/manager.go b/client/internal/lazyconn/manager/manager.go index 15979d553..718bdbddf 100644 --- a/client/internal/lazyconn/manager/manager.go +++ b/client/internal/lazyconn/manager/manager.go @@ -41,6 +41,7 @@ type Config struct { // - Handling connection establishment based on peer signaling // - Managing route HA groups and activating all peers in a group when one peer is activated type Manager struct { + engineCtx context.Context peerStore *peerstore.Store connStateDispatcher *dispatcher.ConnectionDispatcher inactivityThreshold time.Duration @@ -59,13 +60,15 @@ type Manager struct { haGroupToPeers map[route.HAUniqueID][]string // HA group -> peer IDs in the group routesMu sync.RWMutex // protects route mappings - cancel context.CancelFunc onInactive chan peerid.ConnID } -func NewManager(config Config, peerStore *peerstore.Store, wgIface lazyconn.WGIface, connStateDispatcher *dispatcher.ConnectionDispatcher) *Manager { +// NewManager creates a new lazy connection manager +// engineCtx is the context for creating peer Connection +func NewManager(config Config, engineCtx context.Context, peerStore *peerstore.Store, wgIface lazyconn.WGIface, connStateDispatcher *dispatcher.ConnectionDispatcher) *Manager { log.Infof("setup lazy connection service") m := &Manager{ + engineCtx: engineCtx, peerStore: peerStore, connStateDispatcher: connStateDispatcher, inactivityThreshold: inactivity.DefaultInactivityThreshold, @@ -136,7 +139,6 @@ func (m *Manager) UpdateRouteHAMap(haMap route.HAMap) { func (m *Manager) Start(ctx context.Context) { defer m.close() - ctx, m.cancel = context.WithCancel(ctx) for { select { case <-ctx.Done(): @@ -341,7 +343,7 @@ func (m *Manager) activateHAGroupPeers(ctx context.Context, triggerPeerID string if m.activateSinglePeer(ctx, cfg, mp) { activatedCount++ cfg.Log.Infof("activated peer as part of HA group %s (triggered by %s)", haGroup, triggerPeerID) - m.peerStore.PeerConnOpen(ctx, cfg.PublicKey) + m.peerStore.PeerConnOpen(m.engineCtx, cfg.PublicKey) } } } @@ -395,8 +397,6 @@ func (m *Manager) close() { m.managedPeersMu.Lock() defer m.managedPeersMu.Unlock() - m.cancel() - m.connStateDispatcher.RemoveListener(m.connStateListener) m.activityManager.Close() for _, iw := range m.inactivityMonitors { @@ -438,7 +438,7 @@ func (m *Manager) onPeerActivity(ctx context.Context, peerConnID peerid.ConnID) m.activateHAGroupPeers(ctx, mp.peerCfg.PublicKey) - m.peerStore.PeerConnOpen(ctx, mp.peerCfg.PublicKey) + m.peerStore.PeerConnOpen(m.engineCtx, mp.peerCfg.PublicKey) } func (m *Manager) onPeerInactivityTimedOut(peerConnID peerid.ConnID) {