From 546538f570fd87d594ed1cc0cc6da8d9cf45a4ce Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Zolt=C3=A1n=20Papp?=
Date: Mon, 23 Jun 2025 19:51:41 +0200
Subject: [PATCH] Check 5 min intervals

---
 .../internal/lazyconn/inactivity/manager.go   | 62 ++++++------
 .../lazyconn/inactivity/scenarios_test.go     | 98 +++++++++++++++++++
 2 files changed, 130 insertions(+), 30 deletions(-)

diff --git a/client/internal/lazyconn/inactivity/manager.go b/client/internal/lazyconn/inactivity/manager.go
index 34d12e25e..862afc6c5 100644
--- a/client/internal/lazyconn/inactivity/manager.go
+++ b/client/internal/lazyconn/inactivity/manager.go
@@ -30,9 +30,10 @@ const (
 	handshakeMaxInterval = 3 * time.Minute
 
 	checkInterval = 1 * time.Minute
+	historySize   = 5 * time.Minute
 
-	DefaultInactivityThreshold = 5 * time.Minute
-	MinimumInactivityThreshold = 3 * time.Minute
+	DefaultInactivityThreshold = 15 * time.Minute
+	MinimumInactivityThreshold = 5 * time.Minute
 
 	recorderEnv = "NB_LAZYCONN_RECORDER_ENABLED"
 )
@@ -42,11 +43,12 @@ type WgInterface interface {
 }
 
 type peerHistory struct {
-	lastRxBytes     int64      // last received bytes
-	bytesHistory    *list.List // linked list of int64
-	historySize     int
-	summarizedBytes int64
-	log             *log.Entry
+	lastRxBytes       int64      // last received bytes
+	bytesHistory      *list.List // linked list of int64
+	historySize       int
+	summarizedBytes   int64
+	inactivityCounter int // counter to track inactivity
+	log               *log.Entry
 }
 
 func newPeerHistory(log *log.Entry, historySize int) *peerHistory {
@@ -92,6 +94,8 @@ type Manager struct {
 	maxBytesPerPeriod int64
 	historySize       int // Size of the history buffer for each peer, used to track received bytes over time
 	recorder          *Recorder
+
+	thresholdOfInactivity int // Number of consecutive checks with low activity to consider a peer inactive
 }
 
 func NewManager(iface WgInterface, configuredThreshold *time.Duration) *Manager {
@@ -101,14 +105,15 @@ func NewManager(iface WgInterface, configuredThreshold *time.Duration) *Manager
 		log.Warnf("invalid inactivity threshold configured: %v, using default: %v", err, DefaultInactivityThreshold)
 	}
 
-	expectedMaxBytes := calculateExpectedMaxBytes(inactivityThreshold)
+	expectedMaxBytes := calculateExpectedMaxBytes()
 	log.Infof("receive less than %d bytes per %v, will be considered inactive", expectedMaxBytes, inactivityThreshold)
 	return &Manager{
-		InactivePeersChan: make(chan []string, 1),
-		iface:             iface,
-		interestedPeers:   make(map[string]*peerHistory),
-		historySize:       calculateHistorySize(inactivityThreshold),
-		maxBytesPerPeriod: expectedMaxBytes,
+		InactivePeersChan:     make(chan []string, 1),
+		iface:                 iface,
+		interestedPeers:       make(map[string]*peerHistory),
+		historySize:           int(historySize.Minutes()),
+		maxBytesPerPeriod:     expectedMaxBytes,
+		thresholdOfInactivity: int(math.Ceil(inactivityThreshold.Minutes() / checkInterval.Minutes())),
 	}
 }
 
@@ -199,16 +204,22 @@ func (m *Manager) checkStats(now time.Time) ([]string, error) {
 
 		// not enough history to determine inactivity
 		if history.bytesHistory.Len() < m.historySize {
-			history.log.Tracef("not enough history to determine inactivity, current history size: %d, required: %d", history.bytesHistory.Len(), m.historySize)
+			history.log.Debugf("not enough history to determine inactivity, current history size: %d, required: %d", history.bytesHistory.Len(), m.historySize)
 			continue
 		}
 
-		history.log.Tracef("summarized Bytes: %d", history.summarizedBytes)
 		if history.summarizedBytes <= m.maxBytesPerPeriod {
-			idlePeers = append(idlePeers, peer)
-			history.log.Tracef("peer is inactive, summarizedBytes: %d, maxBytesPerPeriod: %d, %v", history.summarizedBytes, m.maxBytesPerPeriod, history.historyString())
+			history.inactivityCounter++
+			history.log.Debugf("peer is inactive, summarizedBytes: %d, maxBytesPerPeriod: %d, %v", history.summarizedBytes, m.maxBytesPerPeriod, history.historyString())
 		} else {
-			history.log.Tracef("peer is active, summarizedBytes: %d, maxBytesPerPeriod: %d, %v", history.summarizedBytes, m.maxBytesPerPeriod, history.historyString())
+			history.inactivityCounter = 0 // reset inactivity counter if activity is detected
+			history.log.Debugf("peer is active, summarizedBytes: %d, maxBytesPerPeriod: %d, %v", history.summarizedBytes, m.maxBytesPerPeriod, history.historyString())
+		}
+
+		if history.inactivityCounter >= m.thresholdOfInactivity {
+			history.log.Infof("peer is inactive for %d consecutive checks, marking as idle (limit %d) ", history.inactivityCounter, m.thresholdOfInactivity)
+			idlePeers = append(idlePeers, peer)
+			history.inactivityCounter = 0 // reset inactivity counter after marking as idle
 		}
 	}
 
@@ -225,23 +236,14 @@ func validateInactivityThreshold(configuredThreshold *time.Duration) (time.Durat
 	return *configuredThreshold, nil
 }
 
-// calculateHistorySize calculates the number of history entries needed based on the inactivity threshold.
-func calculateHistorySize(inactivityThreshold time.Duration) int {
-	return int(math.Ceil(inactivityThreshold.Minutes() / checkInterval.Minutes()))
-}
-
-func calculateExpectedMaxBytes(duration time.Duration) int64 {
+func calculateExpectedMaxBytes() int64 {
 	// Calculate number of keep-alive packets expected
-	keepAliveCount := int64(duration.Seconds() / keepAliveInterval.Seconds())
+	keepAliveCount := int64(historySize.Seconds() / keepAliveInterval.Seconds())
 	keepAliveBytes := keepAliveCount * keepAliveBytes
 
 	// Calculate potential handshake packets (conservative estimate)
-	handshakeCount := int64(duration.Minutes() / handshakeMaxInterval.Minutes())
-	if handshakeCount == 0 && duration >= handshakeMaxInterval {
-		handshakeCount = 1
-	}
+	handshakeCount := int64(historySize.Minutes() / handshakeMaxInterval.Minutes())
 	handshakeBytes := handshakeCount * (handshakeInitBytes + keepAliveBytes) // handshake + extra bytes
 
-	// todo: fine tune this value, add some overhead for unexpected lag
 	return keepAliveBytes + handshakeBytes
 }
diff --git a/client/internal/lazyconn/inactivity/scenarios_test.go b/client/internal/lazyconn/inactivity/scenarios_test.go
index b8c7257e3..58c9786ed 100644
--- a/client/internal/lazyconn/inactivity/scenarios_test.go
+++ b/client/internal/lazyconn/inactivity/scenarios_test.go
@@ -108,4 +108,102 @@ var scenarios = []scenario{
 			{when: 750 * time.Second, RxBytes: 32},
 		},
 	},
+	{
+		// Rosenpass
+		ExpectedInactive: true,
+		Data: []rxHistory{
+			{when: 0 * time.Second, RxBytes: 1200},
+			{when: 0 * time.Second, RxBytes: 1200},
+			{when: 0 * time.Second, RxBytes: 128},
+			{when: 0 * time.Second, RxBytes: 1200},
+			{when: 0 * time.Second, RxBytes: 128},
+			{when: 0 * time.Second, RxBytes: 2},
+			{when: 35 * time.Second, RxBytes: 32},
+			{when: 60 * time.Second, RxBytes: 32},
+			{when: 85 * time.Second, RxBytes: 32},
+			{when: 110 * time.Second, RxBytes: 32},
+			{when: 120 * time.Second, RxBytes: 1152},
+			{when: 120 * time.Second, RxBytes: 92},
+			{when: 120 * time.Second, RxBytes: 240},
+			{when: 130 * time.Second, RxBytes: 1200},
+			{when: 130 * time.Second, RxBytes: 32},
+			{when: 130 * time.Second, RxBytes: 1200},
+			{when: 130 * time.Second, RxBytes: 128},
+			{when: 165 * time.Second, RxBytes: 32},
+			{when: 190 * time.Second, RxBytes: 32},
+			{when: 215 * time.Second, RxBytes: 32},
+			{when: 240 * time.Second, RxBytes: 92},
+			{when: 240 * time.Second, RxBytes: 1200},
+			{when: 240 * time.Second, RxBytes: 128},
+			{when: 260 * time.Second, RxBytes: 1200},
+			{when: 260 * time.Second, RxBytes: 1200},
+			{when: 260 * time.Second, RxBytes: 128},
+			{when: 320 * time.Second, RxBytes: 32},
+			{when: 345 * time.Second, RxBytes: 32},
+			{when: 370 * time.Second, RxBytes: 92},
+			{when: 370 * time.Second, RxBytes: 1200},
+			{when: 370 * time.Second, RxBytes: 128},
+			{when: 390 * time.Second, RxBytes: 1200},
+			{when: 390 * time.Second, RxBytes: 128},
+			{when: 450 * time.Second, RxBytes: 32},
+			{when: 475 * time.Second, RxBytes: 32},
+			{when: 500 * time.Second, RxBytes: 92},
+			{when: 500 * time.Second, RxBytes: 1200},
+			{when: 500 * time.Second, RxBytes: 128},
+			{when: 520 * time.Second, RxBytes: 1200},
+			{when: 520 * time.Second, RxBytes: 128},
+		},
+	},
+	{
+		ExpectedInactive: true,
+		Data: []rxHistory{
+			{when: 0 * time.Second, RxBytes: 1152},
+			{when: 0 * time.Second, RxBytes: 1152},
+			{when: 0 * time.Second, RxBytes: 240},
+			{when: 0 * time.Second, RxBytes: 1152},
+			{when: 1 * time.Second, RxBytes: 240},
+			{when: 1 * time.Second, RxBytes: 2},
+			{when: 11 * time.Second, RxBytes: 32},
+			{when: 121 * time.Second, RxBytes: 1200},
+			{when: 121 * time.Second, RxBytes: 148},
+			{when: 121 * time.Second, RxBytes: 32},
+			{when: 121 * time.Second, RxBytes: 128},
+			{when: 131 * time.Second, RxBytes: 1152},
+			{when: 131 * time.Second, RxBytes: 1152},
+			{when: 131 * time.Second, RxBytes: 240},
+			{when: 141 * time.Second, RxBytes: 32},
+			{when: 191 * time.Second, RxBytes: 32},
+			{when: 241 * time.Second, RxBytes: 1152},
+			{when: 241 * time.Second, RxBytes: 148},
+			{when: 241 * time.Second, RxBytes: 32},
+			{when: 241 * time.Second, RxBytes: 240},
+			{when: 251 * time.Second, RxBytes: 32},
+			{when: 261 * time.Second, RxBytes: 1152},
+			{when: 261 * time.Second, RxBytes: 1152},
+			{when: 261 * time.Second, RxBytes: 240},
+			{when: 271 * time.Second, RxBytes: 32},
+			{when: 296 * time.Second, RxBytes: 32},
+			{when: 371 * time.Second, RxBytes: 1152},
+			{when: 371 * time.Second, RxBytes: 148},
+			{when: 371 * time.Second, RxBytes: 32},
+			{when: 371 * time.Second, RxBytes: 240},
+			{when: 381 * time.Second, RxBytes: 32},
+			{when: 391 * time.Second, RxBytes: 1152},
+			{when: 391 * time.Second, RxBytes: 240},
+			{when: 401 * time.Second, RxBytes: 32},
+			{when: 426 * time.Second, RxBytes: 32},
+			{when: 501 * time.Second, RxBytes: 1152},
+			{when: 501 * time.Second, RxBytes: 148},
+			{when: 501 * time.Second, RxBytes: 32},
+			{when: 501 * time.Second, RxBytes: 240},
+			{when: 511 * time.Second, RxBytes: 32},
+			{when: 521 * time.Second, RxBytes: 1152},
+			{when: 521 * time.Second, RxBytes: 240},
+			{when: 531 * time.Second, RxBytes: 32},
+			{when: 631 * time.Second, RxBytes: 1152},
+			{when: 631 * time.Second, RxBytes: 148},
+			{when: 631 * time.Second, RxBytes: 32},
+			{when: 631 * time.Second, RxBytes: 240},
+		},
+	},
 }
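Illustrative sketch (not part of the patch): with this change a peer keeps a fixed 5-minute byte history, and it is only reported idle after its summarized bytes stay at or below the keep-alive budget for ceil(threshold / checkInterval) consecutive 1-minute checks, i.e. 15 checks under the new 15-minute default. The standalone program below mirrors that arithmetic; the keepAliveInterval, keepAliveBytes, and handshakeInitBytes values are assumptions for illustration and may differ from the constants defined elsewhere in the package.

// Sketch only: assumed values for constants that live elsewhere in the inactivity package.
package main

import (
	"fmt"
	"math"
	"time"
)

const (
	keepAliveInterval    = 25 * time.Second // assumption: WireGuard persistent keepalive period
	keepAliveBytes       = 32               // assumption: size of one keep-alive packet
	handshakeInitBytes   = 148              // assumption: size of a handshake initiation
	handshakeMaxInterval = 3 * time.Minute
	checkInterval        = 1 * time.Minute
	historySize          = 5 * time.Minute

	DefaultInactivityThreshold = 15 * time.Minute
)

// expectedMaxBytes mirrors the patched calculateExpectedMaxBytes: the budget now
// covers the fixed 5-minute history window instead of the whole inactivity threshold.
func expectedMaxBytes() int64 {
	keepAliveCount := int64(historySize.Seconds() / keepAliveInterval.Seconds())
	kaTotal := keepAliveCount * keepAliveBytes

	handshakeCount := int64(historySize.Minutes() / handshakeMaxInterval.Minutes())
	hsTotal := handshakeCount * (handshakeInitBytes + keepAliveBytes)

	return kaTotal + hsTotal
}

func main() {
	// Consecutive low-activity checks required before a peer is reported idle,
	// as computed in NewManager: ceil(threshold / checkInterval).
	limit := int(math.Ceil(DefaultInactivityThreshold.Minutes() / checkInterval.Minutes()))
	fmt.Printf("budget per 5-minute window: %d bytes, idle after %d consecutive checks\n",
		expectedMaxBytes(), limit)
}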