diff --git a/client/internal/conn_mgr.go b/client/internal/conn_mgr.go index 88b47c511..aac312dc3 100644 --- a/client/internal/conn_mgr.go +++ b/client/internal/conn_mgr.go @@ -34,9 +34,9 @@ type ConnMgr struct { lazyConnMgr *manager.Manager - wg sync.WaitGroup - ctx context.Context - ctxCancel context.CancelFunc + wg sync.WaitGroup + lazyCtx context.Context + lazyCtxCancel context.CancelFunc } func NewConnMgr(engineConfig *EngineConfig, statusRecorder *peer.Status, peerStore *peerstore.Store, iface lazyconn.WGIface, dispatcher *dispatcher.ConnectionDispatcher) *ConnMgr { @@ -86,7 +86,7 @@ func (e *ConnMgr) UpdatedRemoteFeatureFlag(ctx context.Context, enabled bool) er log.Infof("lazy connection manager is enabled by management feature flag") e.initLazyManager(ctx) e.statusRecorder.UpdateLazyConnection(true) - return e.addPeersToLazyConnManager(ctx) + return e.addPeersToLazyConnManager() } else { if e.lazyConnMgr == nil { return nil @@ -109,7 +109,7 @@ func (e *ConnMgr) UpdateRouteHAMap(haMap route.HAMap) { } // SetExcludeList sets the list of peer IDs that should always have permanent connections. -func (e *ConnMgr) SetExcludeList(peerIDs map[string]bool) { +func (e *ConnMgr) SetExcludeList(ctx context.Context, peerIDs map[string]bool) { if e.lazyConnMgr == nil { return } @@ -133,7 +133,7 @@ func (e *ConnMgr) SetExcludeList(peerIDs map[string]bool) { excludedPeers = append(excludedPeers, lazyPeerCfg) } - added := e.lazyConnMgr.ExcludePeer(e.ctx, excludedPeers) + added := e.lazyConnMgr.ExcludePeer(e.lazyCtx, excludedPeers) for _, peerID := range added { var peerConn *peer.Conn var exists bool @@ -143,7 +143,7 @@ func (e *ConnMgr) SetExcludeList(peerIDs map[string]bool) { } peerConn.Log.Infof("peer has been added to lazy connection exclude list, opening permanent connection") - if err := peerConn.Open(e.ctx); err != nil { + if err := peerConn.Open(ctx); err != nil { peerConn.Log.Errorf("failed to open connection: %v", err) } } @@ -221,9 +221,9 @@ func (e *ConnMgr) OnSignalMsg(ctx context.Context, peerKey string) (*peer.Conn, return conn, true } - if found := e.lazyConnMgr.ActivatePeer(ctx, peerKey); found { + if found := e.lazyConnMgr.ActivatePeer(e.lazyCtx, peerKey); found { conn.Log.Infof("activated peer from inactive state") - if err := conn.Open(e.ctx); err != nil { + if err := conn.Open(ctx); err != nil { conn.Log.Errorf("failed to open connection: %v", err) } } @@ -235,29 +235,27 @@ func (e *ConnMgr) Close() { return } - e.ctxCancel() + e.lazyCtxCancel() e.wg.Wait() e.lazyConnMgr = nil } -func (e *ConnMgr) initLazyManager(parentCtx context.Context) { +func (e *ConnMgr) initLazyManager(engineCtx context.Context) { cfg := manager.Config{ InactivityThreshold: inactivityThresholdEnv(), } - e.lazyConnMgr = manager.NewManager(cfg, e.peerStore, e.iface, e.dispatcher) + e.lazyConnMgr = manager.NewManager(cfg, engineCtx, e.peerStore, e.iface, e.dispatcher) - ctx, cancel := context.WithCancel(parentCtx) - e.ctx = ctx - e.ctxCancel = cancel + e.lazyCtx, e.lazyCtxCancel = context.WithCancel(engineCtx) e.wg.Add(1) go func() { defer e.wg.Done() - e.lazyConnMgr.Start(ctx) + e.lazyConnMgr.Start(e.lazyCtx) }() } -func (e *ConnMgr) addPeersToLazyConnManager(ctx context.Context) error { +func (e *ConnMgr) addPeersToLazyConnManager() error { peers := e.peerStore.PeersPubKey() lazyPeerCfgs := make([]lazyconn.PeerConfig, 0, len(peers)) for _, peerID := range peers { @@ -277,7 +275,7 @@ func (e *ConnMgr) addPeersToLazyConnManager(ctx context.Context) error { lazyPeerCfgs = append(lazyPeerCfgs, lazyPeerCfg) } - return e.lazyConnMgr.AddActivePeers(ctx, lazyPeerCfgs) + return e.lazyConnMgr.AddActivePeers(e.lazyCtx, lazyPeerCfgs) } func (e *ConnMgr) closeManager(ctx context.Context) { @@ -285,7 +283,7 @@ func (e *ConnMgr) closeManager(ctx context.Context) { return } - e.ctxCancel() + e.lazyCtxCancel() e.wg.Wait() e.lazyConnMgr = nil @@ -295,7 +293,7 @@ func (e *ConnMgr) closeManager(ctx context.Context) { } func (e *ConnMgr) isStartedWithLazyMgr() bool { - return e.lazyConnMgr != nil && e.ctxCancel != nil + return e.lazyConnMgr != nil && e.lazyCtxCancel != nil } func inactivityThresholdEnv() *time.Duration { diff --git a/client/internal/engine.go b/client/internal/engine.go index b3b7d1062..253ecb2a6 100644 --- a/client/internal/engine.go +++ b/client/internal/engine.go @@ -1082,7 +1082,7 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error { // must set the exclude list after the peers are added. Without it the manager can not figure out the peers parameters from the store excludedLazyPeers := e.toExcludedLazyPeers(forwardingRules, networkMap.GetRemotePeers()) - e.connMgr.SetExcludeList(excludedLazyPeers) + e.connMgr.SetExcludeList(e.ctx, excludedLazyPeers) e.networkSerial = serial diff --git a/client/internal/lazyconn/manager/manager.go b/client/internal/lazyconn/manager/manager.go index 15979d553..718bdbddf 100644 --- a/client/internal/lazyconn/manager/manager.go +++ b/client/internal/lazyconn/manager/manager.go @@ -41,6 +41,7 @@ type Config struct { // - Handling connection establishment based on peer signaling // - Managing route HA groups and activating all peers in a group when one peer is activated type Manager struct { + engineCtx context.Context peerStore *peerstore.Store connStateDispatcher *dispatcher.ConnectionDispatcher inactivityThreshold time.Duration @@ -59,13 +60,15 @@ type Manager struct { haGroupToPeers map[route.HAUniqueID][]string // HA group -> peer IDs in the group routesMu sync.RWMutex // protects route mappings - cancel context.CancelFunc onInactive chan peerid.ConnID } -func NewManager(config Config, peerStore *peerstore.Store, wgIface lazyconn.WGIface, connStateDispatcher *dispatcher.ConnectionDispatcher) *Manager { +// NewManager creates a new lazy connection manager +// engineCtx is the context for creating peer Connection +func NewManager(config Config, engineCtx context.Context, peerStore *peerstore.Store, wgIface lazyconn.WGIface, connStateDispatcher *dispatcher.ConnectionDispatcher) *Manager { log.Infof("setup lazy connection service") m := &Manager{ + engineCtx: engineCtx, peerStore: peerStore, connStateDispatcher: connStateDispatcher, inactivityThreshold: inactivity.DefaultInactivityThreshold, @@ -136,7 +139,6 @@ func (m *Manager) UpdateRouteHAMap(haMap route.HAMap) { func (m *Manager) Start(ctx context.Context) { defer m.close() - ctx, m.cancel = context.WithCancel(ctx) for { select { case <-ctx.Done(): @@ -341,7 +343,7 @@ func (m *Manager) activateHAGroupPeers(ctx context.Context, triggerPeerID string if m.activateSinglePeer(ctx, cfg, mp) { activatedCount++ cfg.Log.Infof("activated peer as part of HA group %s (triggered by %s)", haGroup, triggerPeerID) - m.peerStore.PeerConnOpen(ctx, cfg.PublicKey) + m.peerStore.PeerConnOpen(m.engineCtx, cfg.PublicKey) } } } @@ -395,8 +397,6 @@ func (m *Manager) close() { m.managedPeersMu.Lock() defer m.managedPeersMu.Unlock() - m.cancel() - m.connStateDispatcher.RemoveListener(m.connStateListener) m.activityManager.Close() for _, iw := range m.inactivityMonitors { @@ -438,7 +438,7 @@ func (m *Manager) onPeerActivity(ctx context.Context, peerConnID peerid.ConnID) m.activateHAGroupPeers(ctx, mp.peerCfg.PublicKey) - m.peerStore.PeerConnOpen(ctx, mp.peerCfg.PublicKey) + m.peerStore.PeerConnOpen(m.engineCtx, mp.peerCfg.PublicKey) } func (m *Manager) onPeerInactivityTimedOut(peerConnID peerid.ConnID) {