[client] Refactor context management in ConnMgr for clarity and consistency (#3951)

In the ConnMgr we must distinguish between two contexts: one is relevant to the lazy connection manager, and the other (the engine context) is relevant to peer connection creation. If the wrong context is used, then disabling the lazy connection feature cancels the peer connections as well, instead of only the lazy manager.
Author: Zoltan Papp
Date: 2025-06-11 11:04:44 +02:00
Committed by: GitHub
Parent: 87376afd13
Commit: 75feb0da8b
3 changed files with 26 additions and 28 deletions
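For illustration only, here is a minimal, self-contained sketch of the distinction the message describes (the type and field names below are simplified stand-ins, not the actual NetBird code): the lazy manager runs under its own cancellable context derived from the engine context, while peer connections are bound to the engine context, so cancelling the lazy context stops only the lazy manager.

package main

import (
	"context"
	"fmt"
)

// connMgr is a hypothetical, simplified stand-in for ConnMgr. It keeps the
// engine context for opening peer connections and derives a separate,
// cancellable context that governs only the lazy connection manager.
type connMgr struct {
	engineCtx     context.Context
	lazyCtx       context.Context
	lazyCtxCancel context.CancelFunc
}

func newConnMgr(engineCtx context.Context) *connMgr {
	lazyCtx, cancel := context.WithCancel(engineCtx)
	return &connMgr{engineCtx: engineCtx, lazyCtx: lazyCtx, lazyCtxCancel: cancel}
}

// disableLazy cancels only the lazy context; anything bound to engineCtx,
// such as peer connections, keeps running.
func (c *connMgr) disableLazy() {
	c.lazyCtxCancel()
}

func main() {
	engineCtx, engineCancel := context.WithCancel(context.Background())
	defer engineCancel()

	c := newConnMgr(engineCtx)
	c.disableLazy()

	fmt.Println("lazy ctx:", c.lazyCtx.Err())     // context canceled
	fmt.Println("engine ctx:", c.engineCtx.Err()) // <nil>: peer connections are unaffected
}

Cancelling the engine context still tears everything down, since the lazy context is derived from it; this mirrors the context.WithCancel(engineCtx) call in initLazyManager in the diff below.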


@@ -34,9 +34,9 @@ type ConnMgr struct {
lazyConnMgr *manager.Manager
-wg sync.WaitGroup
-ctx context.Context
-ctxCancel context.CancelFunc
+wg sync.WaitGroup
+lazyCtx context.Context
+lazyCtxCancel context.CancelFunc
}
func NewConnMgr(engineConfig *EngineConfig, statusRecorder *peer.Status, peerStore *peerstore.Store, iface lazyconn.WGIface, dispatcher *dispatcher.ConnectionDispatcher) *ConnMgr {
@@ -86,7 +86,7 @@ func (e *ConnMgr) UpdatedRemoteFeatureFlag(ctx context.Context, enabled bool) er
log.Infof("lazy connection manager is enabled by management feature flag")
e.initLazyManager(ctx)
e.statusRecorder.UpdateLazyConnection(true)
-return e.addPeersToLazyConnManager(ctx)
+return e.addPeersToLazyConnManager()
} else {
if e.lazyConnMgr == nil {
return nil
@@ -109,7 +109,7 @@ func (e *ConnMgr) UpdateRouteHAMap(haMap route.HAMap) {
}
// SetExcludeList sets the list of peer IDs that should always have permanent connections.
-func (e *ConnMgr) SetExcludeList(peerIDs map[string]bool) {
+func (e *ConnMgr) SetExcludeList(ctx context.Context, peerIDs map[string]bool) {
if e.lazyConnMgr == nil {
return
}
@@ -133,7 +133,7 @@ func (e *ConnMgr) SetExcludeList(peerIDs map[string]bool) {
excludedPeers = append(excludedPeers, lazyPeerCfg)
}
-added := e.lazyConnMgr.ExcludePeer(e.ctx, excludedPeers)
+added := e.lazyConnMgr.ExcludePeer(e.lazyCtx, excludedPeers)
for _, peerID := range added {
var peerConn *peer.Conn
var exists bool
@@ -143,7 +143,7 @@ func (e *ConnMgr) SetExcludeList(peerIDs map[string]bool) {
}
peerConn.Log.Infof("peer has been added to lazy connection exclude list, opening permanent connection")
-if err := peerConn.Open(e.ctx); err != nil {
+if err := peerConn.Open(ctx); err != nil {
peerConn.Log.Errorf("failed to open connection: %v", err)
}
}
@@ -221,9 +221,9 @@ func (e *ConnMgr) OnSignalMsg(ctx context.Context, peerKey string) (*peer.Conn,
return conn, true
}
-if found := e.lazyConnMgr.ActivatePeer(ctx, peerKey); found {
+if found := e.lazyConnMgr.ActivatePeer(e.lazyCtx, peerKey); found {
conn.Log.Infof("activated peer from inactive state")
-if err := conn.Open(e.ctx); err != nil {
+if err := conn.Open(ctx); err != nil {
conn.Log.Errorf("failed to open connection: %v", err)
}
}
@@ -235,29 +235,27 @@ func (e *ConnMgr) Close() {
return
}
-e.ctxCancel()
+e.lazyCtxCancel()
e.wg.Wait()
e.lazyConnMgr = nil
}
-func (e *ConnMgr) initLazyManager(parentCtx context.Context) {
+func (e *ConnMgr) initLazyManager(engineCtx context.Context) {
cfg := manager.Config{
InactivityThreshold: inactivityThresholdEnv(),
}
-e.lazyConnMgr = manager.NewManager(cfg, e.peerStore, e.iface, e.dispatcher)
+e.lazyConnMgr = manager.NewManager(cfg, engineCtx, e.peerStore, e.iface, e.dispatcher)
-ctx, cancel := context.WithCancel(parentCtx)
-e.ctx = ctx
-e.ctxCancel = cancel
+e.lazyCtx, e.lazyCtxCancel = context.WithCancel(engineCtx)
e.wg.Add(1)
go func() {
defer e.wg.Done()
-e.lazyConnMgr.Start(ctx)
+e.lazyConnMgr.Start(e.lazyCtx)
}()
}
-func (e *ConnMgr) addPeersToLazyConnManager(ctx context.Context) error {
+func (e *ConnMgr) addPeersToLazyConnManager() error {
peers := e.peerStore.PeersPubKey()
lazyPeerCfgs := make([]lazyconn.PeerConfig, 0, len(peers))
for _, peerID := range peers {
@@ -277,7 +275,7 @@ func (e *ConnMgr) addPeersToLazyConnManager(ctx context.Context) error {
lazyPeerCfgs = append(lazyPeerCfgs, lazyPeerCfg)
}
-return e.lazyConnMgr.AddActivePeers(ctx, lazyPeerCfgs)
+return e.lazyConnMgr.AddActivePeers(e.lazyCtx, lazyPeerCfgs)
}
func (e *ConnMgr) closeManager(ctx context.Context) {
@@ -285,7 +283,7 @@ func (e *ConnMgr) closeManager(ctx context.Context) {
return
}
-e.ctxCancel()
+e.lazyCtxCancel()
e.wg.Wait()
e.lazyConnMgr = nil
@@ -295,7 +293,7 @@ func (e *ConnMgr) closeManager(ctx context.Context) {
}
func (e *ConnMgr) isStartedWithLazyMgr() bool {
-return e.lazyConnMgr != nil && e.ctxCancel != nil
+return e.lazyConnMgr != nil && e.lazyCtxCancel != nil
}
func inactivityThresholdEnv() *time.Duration {


@@ -1082,7 +1082,7 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
// must set the exclude list after the peers are added. Without it the manager can not figure out the peers parameters from the store
excludedLazyPeers := e.toExcludedLazyPeers(forwardingRules, networkMap.GetRemotePeers())
-e.connMgr.SetExcludeList(excludedLazyPeers)
+e.connMgr.SetExcludeList(e.ctx, excludedLazyPeers)
e.networkSerial = serial


@@ -41,6 +41,7 @@ type Config struct {
// - Handling connection establishment based on peer signaling
// - Managing route HA groups and activating all peers in a group when one peer is activated
type Manager struct {
+engineCtx context.Context
peerStore *peerstore.Store
connStateDispatcher *dispatcher.ConnectionDispatcher
inactivityThreshold time.Duration
@@ -59,13 +60,15 @@ type Manager struct {
haGroupToPeers map[route.HAUniqueID][]string // HA group -> peer IDs in the group
routesMu sync.RWMutex // protects route mappings
-cancel context.CancelFunc
onInactive chan peerid.ConnID
}
-func NewManager(config Config, peerStore *peerstore.Store, wgIface lazyconn.WGIface, connStateDispatcher *dispatcher.ConnectionDispatcher) *Manager {
+// NewManager creates a new lazy connection manager
+// engineCtx is the context for creating peer Connection
+func NewManager(config Config, engineCtx context.Context, peerStore *peerstore.Store, wgIface lazyconn.WGIface, connStateDispatcher *dispatcher.ConnectionDispatcher) *Manager {
log.Infof("setup lazy connection service")
m := &Manager{
+engineCtx: engineCtx,
peerStore: peerStore,
connStateDispatcher: connStateDispatcher,
inactivityThreshold: inactivity.DefaultInactivityThreshold,
@@ -136,7 +139,6 @@ func (m *Manager) UpdateRouteHAMap(haMap route.HAMap) {
func (m *Manager) Start(ctx context.Context) {
defer m.close()
-ctx, m.cancel = context.WithCancel(ctx)
for {
select {
case <-ctx.Done():
@@ -341,7 +343,7 @@ func (m *Manager) activateHAGroupPeers(ctx context.Context, triggerPeerID string
if m.activateSinglePeer(ctx, cfg, mp) {
activatedCount++
cfg.Log.Infof("activated peer as part of HA group %s (triggered by %s)", haGroup, triggerPeerID)
-m.peerStore.PeerConnOpen(ctx, cfg.PublicKey)
+m.peerStore.PeerConnOpen(m.engineCtx, cfg.PublicKey)
}
}
}
@@ -395,8 +397,6 @@ func (m *Manager) close() {
m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock()
-m.cancel()
m.connStateDispatcher.RemoveListener(m.connStateListener)
m.activityManager.Close()
for _, iw := range m.inactivityMonitors {
@@ -438,7 +438,7 @@ func (m *Manager) onPeerActivity(ctx context.Context, peerConnID peerid.ConnID)
m.activateHAGroupPeers(ctx, mp.peerCfg.PublicKey)
-m.peerStore.PeerConnOpen(ctx, mp.peerCfg.PublicKey)
+m.peerStore.PeerConnOpen(m.engineCtx, mp.peerCfg.PublicKey)
}
func (m *Manager) onPeerInactivityTimedOut(peerConnID peerid.ConnID) {