From 2a51609436f78544dd24fd416072325ce665bf15 Mon Sep 17 00:00:00 2001
From: Viktor Liu <17948409+lixmal@users.noreply.github.com>
Date: Fri, 20 Jun 2025 18:07:19 +0200
Subject: [PATCH] [client] Handle lazy routing peers that are part of HA groups
 (#3943)

* Activate new lazy routing peers if the HA group is active

* Prevent lazy peers going to idle if HA group members are active (#3948)
---
 client/internal/conn_mgr.go                 |   2 +-
 .../lazyconn/inactivity/inactivity.go       |   5 +
 client/internal/lazyconn/manager/manager.go | 149 +++++++++++++++---
 client/internal/peer/conn.go                |   6 +-
 4 files changed, 137 insertions(+), 25 deletions(-)

diff --git a/client/internal/conn_mgr.go b/client/internal/conn_mgr.go
index aac312dc3..c630d3052 100644
--- a/client/internal/conn_mgr.go
+++ b/client/internal/conn_mgr.go
@@ -175,7 +175,7 @@ func (e *ConnMgr) AddPeerConn(ctx context.Context, peerKey string, conn *peer.Co
 		PeerConnID: conn.ConnID(),
 		Log:        conn.Log,
 	}
-	excluded, err := e.lazyConnMgr.AddPeer(lazyPeerCfg)
+	excluded, err := e.lazyConnMgr.AddPeer(e.lazyCtx, lazyPeerCfg)
 	if err != nil {
 		conn.Log.Errorf("failed to add peer to lazyconn manager: %v", err)
 		if err := conn.Open(ctx); err != nil {
diff --git a/client/internal/lazyconn/inactivity/inactivity.go b/client/internal/lazyconn/inactivity/inactivity.go
index a30c1846d..9b7c8511b 100644
--- a/client/internal/lazyconn/inactivity/inactivity.go
+++ b/client/internal/lazyconn/inactivity/inactivity.go
@@ -68,3 +68,8 @@ func (i *Monitor) PauseTimer() {
 func (i *Monitor) ResetTimer() {
 	i.timer.Reset(i.inactivityThreshold)
 }
+
+func (i *Monitor) ResetMonitor(ctx context.Context, timeoutChan chan peer.ConnID) {
+	i.Stop()
+	go i.Start(ctx, timeoutChan)
+}
diff --git a/client/internal/lazyconn/manager/manager.go b/client/internal/lazyconn/manager/manager.go
index 718bdbddf..74ede50a7 100644
--- a/client/internal/lazyconn/manager/manager.go
+++ b/client/internal/lazyconn/manager/manager.go
@@ -58,7 +58,7 @@ type Manager struct {
 	// Route HA group management
 	peerToHAGroups map[string][]route.HAUniqueID // peer ID -> HA groups they belong to
 	haGroupToPeers map[route.HAUniqueID][]string // HA group -> peer IDs in the group
-	routesMu       sync.RWMutex                  // protects route mappings
+	routesMu       sync.RWMutex

 	onInactive chan peerid.ConnID
 }
@@ -146,7 +146,7 @@ func (m *Manager) Start(ctx context.Context) {
 		case peerConnID := <-m.activityManager.OnActivityChan:
 			m.onPeerActivity(ctx, peerConnID)
 		case peerConnID := <-m.onInactive:
-			m.onPeerInactivityTimedOut(peerConnID)
+			m.onPeerInactivityTimedOut(ctx, peerConnID)
 		}
 	}
 }
@@ -197,7 +197,7 @@ func (m *Manager) ExcludePeer(ctx context.Context, peerConfigs []lazyconn.PeerCo
 	return added
 }

-func (m *Manager) AddPeer(peerCfg lazyconn.PeerConfig) (bool, error) {
+func (m *Manager) AddPeer(ctx context.Context, peerCfg lazyconn.PeerConfig) (bool, error) {
 	m.managedPeersMu.Lock()
 	defer m.managedPeersMu.Unlock()

@@ -225,6 +225,13 @@ func (m *Manager) AddPeer(peerCfg lazyconn.PeerConfig) (bool, error) {
 		peerCfg:         &peerCfg,
 		expectedWatcher: watcherActivity,
 	}
+
+	// Check if this peer should be activated because its HA group peers are active
+	if group, ok := m.shouldActivateNewPeer(peerCfg.PublicKey); ok {
+		peerCfg.Log.Debugf("peer belongs to active HA group %s, will activate immediately", group)
+		m.activateNewPeerInActiveGroup(ctx, peerCfg)
+	}
+
 	return false, nil
 }

@@ -315,36 +322,38 @@ func (m *Manager) activateSinglePeer(ctx context.Context, cfg *lazyconn.PeerConf

 // activateHAGroupPeers activates all peers in HA groups that the given peer belongs to
 func (m *Manager) activateHAGroupPeers(ctx context.Context, triggerPeerID string) {
+	var peersToActivate []string
+
 	m.routesMu.RLock()
 	haGroups := m.peerToHAGroups[triggerPeerID]
-	m.routesMu.RUnlock()

 	if len(haGroups) == 0 {
+		m.routesMu.RUnlock()
 		log.Debugf("peer %s is not part of any HA groups", triggerPeerID)
 		return
 	}

-	activatedCount := 0
 	for _, haGroup := range haGroups {
-		m.routesMu.RLock()
 		peers := m.haGroupToPeers[haGroup]
-		m.routesMu.RUnlock()
-
 		for _, peerID := range peers {
-			if peerID == triggerPeerID {
-				continue
+			if peerID != triggerPeerID {
+				peersToActivate = append(peersToActivate, peerID)
 			}
+		}
+	}
+	m.routesMu.RUnlock()

-			cfg, mp := m.getPeerForActivation(peerID)
-			if cfg == nil {
-				continue
-			}
+	activatedCount := 0
+	for _, peerID := range peersToActivate {
+		cfg, mp := m.getPeerForActivation(peerID)
+		if cfg == nil {
+			continue
+		}

-			if m.activateSinglePeer(ctx, cfg, mp) {
-				activatedCount++
-				cfg.Log.Infof("activated peer as part of HA group %s (triggered by %s)", haGroup, triggerPeerID)
-				m.peerStore.PeerConnOpen(m.engineCtx, cfg.PublicKey)
-			}
+		if m.activateSinglePeer(ctx, cfg, mp) {
+			activatedCount++
+			cfg.Log.Infof("activated peer as part of HA group (triggered by %s)", triggerPeerID)
+			m.peerStore.PeerConnOpen(m.engineCtx, cfg.PublicKey)
 		}
 	}

@@ -354,6 +363,51 @@ func (m *Manager) activateHAGroupPeers(ctx context.Context, triggerPeerID string
 	}
 }

+// shouldActivateNewPeer checks if a newly added peer should be activated
+// because other peers in its HA groups are already active
+func (m *Manager) shouldActivateNewPeer(peerID string) (route.HAUniqueID, bool) {
+	m.routesMu.RLock()
+	defer m.routesMu.RUnlock()
+
+	haGroups := m.peerToHAGroups[peerID]
+	if len(haGroups) == 0 {
+		return "", false
+	}
+
+	for _, haGroup := range haGroups {
+		peers := m.haGroupToPeers[haGroup]
+		for _, groupPeerID := range peers {
+			if groupPeerID == peerID {
+				continue
+			}
+
+			cfg, ok := m.managedPeers[groupPeerID]
+			if !ok {
+				continue
+			}
+			if mp, ok := m.managedPeersByConnID[cfg.PeerConnID]; ok && mp.expectedWatcher == watcherInactivity {
+				return haGroup, true
+			}
+		}
+	}
+	return "", false
+}
+
+// activateNewPeerInActiveGroup activates a newly added peer that should be active due to HA group
+func (m *Manager) activateNewPeerInActiveGroup(ctx context.Context, peerCfg lazyconn.PeerConfig) {
+	mp, ok := m.managedPeersByConnID[peerCfg.PeerConnID]
+	if !ok {
+		return
+	}
+
+	if !m.activateSinglePeer(ctx, &peerCfg, mp) {
+		return
+	}
+
+	peerCfg.Log.Infof("activated newly added peer due to active HA group peers")
+	m.peerStore.PeerConnOpen(m.engineCtx, peerCfg.PublicKey)
+}
+
 func (m *Manager) addActivePeer(ctx context.Context, peerCfg lazyconn.PeerConfig) error {
 	if _, ok := m.managedPeers[peerCfg.PublicKey]; ok {
 		peerCfg.Log.Warnf("peer already managed")
@@ -415,6 +469,48 @@ func (m *Manager) close() {
 	log.Infof("lazy connection manager closed")
 }

+// shouldDeferIdleForHA checks if peer should stay connected due to HA group requirements
+func (m *Manager) shouldDeferIdleForHA(peerID string) bool {
+	m.routesMu.RLock()
+	defer m.routesMu.RUnlock()
+
+	haGroups := m.peerToHAGroups[peerID]
+	if len(haGroups) == 0 {
+		return false
+	}
+
+	for _, haGroup := range haGroups {
+		groupPeers := m.haGroupToPeers[haGroup]
+
+		for _, groupPeerID := range groupPeers {
+			if groupPeerID == peerID {
+				continue
+			}
+
+			cfg, ok := m.managedPeers[groupPeerID]
+			if !ok {
+				continue
+			}
+
+			groupMp, ok := m.managedPeersByConnID[cfg.PeerConnID]
+			if !ok {
+				continue
+			}
+
+			if groupMp.expectedWatcher != watcherInactivity {
+				continue
+			}
+
+			// Other member is still connected, defer idle
+			if peer, ok := m.peerStore.PeerConn(groupPeerID); ok && peer.IsConnected() {
+				return true
+			}
+		}
+	}
+
+	return false
+}
+
 func (m *Manager) onPeerActivity(ctx context.Context, peerConnID peerid.ConnID) {
 	m.managedPeersMu.Lock()
 	defer m.managedPeersMu.Unlock()
@@ -441,7 +537,7 @@ func (m *Manager) onPeerActivity(ctx context.Context, peerConnID peerid.ConnID)
 	m.peerStore.PeerConnOpen(m.engineCtx, mp.peerCfg.PublicKey)
 }

-func (m *Manager) onPeerInactivityTimedOut(peerConnID peerid.ConnID) {
+func (m *Manager) onPeerInactivityTimedOut(ctx context.Context, peerConnID peerid.ConnID) {
 	m.managedPeersMu.Lock()
 	defer m.managedPeersMu.Unlock()

@@ -456,6 +552,17 @@ func (m *Manager) onPeerInactivityTimedOut(peerConnID peerid.ConnID) {
 		return
 	}

+	if m.shouldDeferIdleForHA(mp.peerCfg.PublicKey) {
+		iw, ok := m.inactivityMonitors[peerConnID]
+		if ok {
+			mp.peerCfg.Log.Debugf("resetting inactivity timer due to HA group requirements")
+			iw.ResetMonitor(ctx, m.onInactive)
+		} else {
+			mp.peerCfg.Log.Errorf("inactivity monitor not found for HA defer reset")
+		}
+		return
+	}
+
 	mp.peerCfg.Log.Infof("connection timed out")

 	// this is blocking operation, potentially can be optimized
@@ -489,7 +596,7 @@ func (m *Manager) onPeerConnected(peerConnID peerid.ConnID) {

 	iw, ok := m.inactivityMonitors[mp.peerCfg.PeerConnID]
 	if !ok {
-		mp.peerCfg.Log.Errorf("inactivity monitor not found for peer")
+		mp.peerCfg.Log.Warnf("inactivity monitor not found for peer")
 		return
 	}

diff --git a/client/internal/peer/conn.go b/client/internal/peer/conn.go
index b33023873..c3f44cc7f 100644
--- a/client/internal/peer/conn.go
+++ b/client/internal/peer/conn.go
@@ -317,12 +317,12 @@ func (conn *Conn) WgConfig() WgConfig {
 	return conn.config.WgConfig
 }

-// IsConnected unit tests only
-// refactor unit test to use status recorder use refactor status recorded to manage connection status in peer.Conn
+// IsConnected returns true if the peer is connected
 func (conn *Conn) IsConnected() bool {
 	conn.mu.Lock()
 	defer conn.mu.Unlock()
-	return conn.currentConnPriority != conntype.None
+
+	return conn.evalStatus() == StatusConnected
 }

 func (conn *Conn) GetKey() string {
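
Reviewer note: the sketch below is a minimal, self-contained approximation of the two-map lookup (peerToHAGroups / haGroupToPeers) that shouldActivateNewPeer and shouldDeferIdleForHA walk in the patch above. The haIndex type, its field names, and the isActive callback are hypothetical stand-ins for the real Manager state and peer store; they are not part of this change.

package main

import "fmt"

// haIndex is a simplified stand-in for the Manager's HA bookkeeping:
// peerToHAGroups maps a peer's public key to the HA groups it participates in,
// haGroupToPeers maps an HA group back to all of its member peer keys.
type haIndex struct {
	peerToHAGroups map[string][]string
	haGroupToPeers map[string][]string
}

// shouldActivate reports whether the given peer has any HA-group member that
// is already active (per the supplied isActive callback), mirroring the walk
// done by shouldActivateNewPeer in the patch.
func (idx *haIndex) shouldActivate(peerID string, isActive func(string) bool) (string, bool) {
	for _, group := range idx.peerToHAGroups[peerID] {
		for _, member := range idx.haGroupToPeers[group] {
			if member == peerID {
				continue // skip the peer itself
			}
			if isActive(member) {
				return group, true // one active member keeps the whole group active
			}
		}
	}
	return "", false
}

func main() {
	idx := &haIndex{
		peerToHAGroups: map[string][]string{"peerB": {"exit-node-ha"}},
		haGroupToPeers: map[string][]string{"exit-node-ha": {"peerA", "peerB"}},
	}
	active := map[string]bool{"peerA": true} // peerA already has an open connection

	group, ok := idx.shouldActivate("peerB", func(id string) bool { return active[id] })
	fmt.Println(group, ok) // prints "exit-node-ha true": peerB should be opened immediately
}

The same walk, with the result used in the opposite direction, is what keeps an idle candidate alive in shouldDeferIdleForHA: as long as any other member of one of its HA groups is still connected, the peer's inactivity monitor is reset instead of the connection being torn down.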