Add history-based algorithm

Zoltán Papp 2025-06-21 19:38:44 +02:00
parent 9691e197df
commit 9307e7e0ea
5 changed files with 199 additions and 92 deletions

View File

@@ -1,7 +1,12 @@
package inactivity
import (
"container/list"
"context"
"fmt"
"math"
"os"
"strconv"
"time"
log "github.com/sirupsen/logrus"
@@ -24,40 +29,86 @@ const (
handshakeRespBytes = 92
handshakeMaxInterval = 3 * time.Minute
checkInterval = keepAliveInterval // todo: 5 * time.Second
keepAliveCheckPeriod = keepAliveInterval
checkInterval = 1 * time.Minute
inactivityThreshold = 3 // number of checks to consider peer inactive
)
const (
// todo make it configurable
DefaultInactivityThreshold = 60 * time.Minute // idle after 1 hour inactivity
DefaultInactivityThreshold = 5 * time.Minute
MinimumInactivityThreshold = 3 * time.Minute
recorderEnv = "NB_LAZYCONN_RECORDER_ENABLED"
)
type WgInterface interface {
GetStats() (map[string]configurer.WGStats, error)
}
type peerInfo struct {
lastRxBytesAtLastIdleCheck int64
lastIdleCheckAt time.Time
inActivityInRow int
log *log.Entry
type peerHistory struct {
lastRxBytes int64 // last received bytes
bytesHistory *list.List // linked list of int64
historySize int
summarizedBytes int64
log *log.Entry
}
func newPeerHistory(log *log.Entry, historySize int) *peerHistory {
return &peerHistory{
bytesHistory: list.New(),
historySize: historySize,
log: log,
}
}
func (pi *peerHistory) appendRxBytes(rxBytes int64) {
// If at capacity, remove the oldest element (front)
if pi.bytesHistory.Len() == pi.historySize {
pi.summarizedBytes -= pi.bytesHistory.Front().Value.(int64)
pi.bytesHistory.Remove(pi.bytesHistory.Front())
}
// Add the new rxBytes at the back
pi.bytesHistory.PushBack(rxBytes)
pi.summarizedBytes += rxBytes
}
func (pi *peerHistory) historyString() string {
var history []string
for e := pi.bytesHistory.Front(); e != nil; e = e.Next() {
history = append(history, fmt.Sprintf("%d", e.Value.(int64)))
}
return fmt.Sprintf("%s", history)
}
func (pi *peerHistory) reset() {
for e := pi.bytesHistory.Front(); e != nil; e = e.Next() {
e.Value = int64(0)
}
pi.summarizedBytes = 0
}
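The buffer is a fixed-size sliding window: once historySize deltas have been stored, each new sample evicts the oldest one, and summarizedBytes always equals the sum of the retained window. A minimal illustration, assuming a capacity of 3 and made-up byte counts (logEntry stands for any *logrus.Entry):
// h := newPeerHistory(logEntry, 3)
// h.appendRxBytes(10) // window [10]          summarizedBytes = 10
// h.appendRxBytes(20) // window [10 20]       summarizedBytes = 30
// h.appendRxBytes(30) // window [10 20 30]    summarizedBytes = 60
// h.appendRxBytes(40) // 10 evicted -> [20 30 40], summarizedBytes = 90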
type Manager struct {
InactivePeersChan chan []string
iface WgInterface
interestedPeers map[string]*peerInfo
interestedPeers map[string]*peerHistory
maxBytesPerPeriod int64
historySize int // Size of the history buffer for each peer, used to track received bytes over time
recorder *Recorder
}
func NewManager(iface WgInterface) *Manager {
func NewManager(iface WgInterface, configuredThreshold *time.Duration) *Manager {
inactivityThreshold, err := validateInactivityThreshold(configuredThreshold)
if err != nil {
inactivityThreshold = DefaultInactivityThreshold
log.Warnf("invalid inactivity threshold configured: %v, using default: %v", err, DefaultInactivityThreshold)
}
expectedMaxBytes := calculateExpectedMaxBytes(inactivityThreshold)
log.Infof("receive less than %d bytes per %v, will be considered inactive", expectedMaxBytes, inactivityThreshold)
return &Manager{
InactivePeersChan: make(chan []string, 1),
iface: iface,
interestedPeers: make(map[string]*peerInfo),
interestedPeers: make(map[string]*peerHistory),
historySize: calculateHistorySize(inactivityThreshold),
maxBytesPerPeriod: expectedMaxBytes,
}
}
@@ -67,9 +118,7 @@ func (m *Manager) AddPeer(peerCfg *lazyconn.PeerConfig) {
}
peerCfg.Log.Debugf("adding peer to inactivity manager")
m.interestedPeers[peerCfg.PublicKey] = &peerInfo{
log: peerCfg.Log,
}
m.interestedPeers[peerCfg.PublicKey] = newPeerHistory(peerCfg.Log, m.historySize)
}
func (m *Manager) RemovePeer(peer string) {
@@ -83,6 +132,12 @@ func (m *Manager) RemovePeer(peer string) {
}
func (m *Manager) Start(ctx context.Context) {
enabled, err := strconv.ParseBool(os.Getenv(recorderEnv))
if err == nil && enabled {
m.recorder = NewRecorder()
defer m.recorder.Close()
}
ticker := newTicker(checkInterval)
defer ticker.Stop()
@@ -124,76 +179,69 @@ func (m *Manager) checkStats(now time.Time) ([]string, error) {
var idlePeers []string
for peer, info := range m.interestedPeers {
for peer, history := range m.interestedPeers {
stat, found := stats[peer]
if !found {
// when peer is in connecting state
info.log.Warnf("peer not found in wg stats")
history.log.Warnf("peer not found in wg stats")
}
// First measurement: initialize
if info.lastIdleCheckAt.IsZero() {
info.lastIdleCheckAt = now
info.lastRxBytesAtLastIdleCheck = stat.RxBytes
info.log.Infof("initializing RxBytes: %v, %v", now, stat.RxBytes)
deltaRx := stat.RxBytes - history.lastRxBytes
if deltaRx < 0 {
deltaRx = 0 // reset to zero if negative
history.reset()
}
m.recorder.ReceivedBytes(peer, now, deltaRx)
history.lastRxBytes = stat.RxBytes
history.appendRxBytes(deltaRx)
// not enough history to determine inactivity
if history.bytesHistory.Len() < m.historySize {
history.log.Tracef("not enough history to determine inactivity, current history size: %d, required: %d", history.bytesHistory.Len(), m.historySize)
continue
}
// check only every idleCheckDuration
if shouldSkipIdleCheck(now, info.lastIdleCheckAt) {
continue
}
// sometimes we measure false inactivity, so we need to check if we have inactivity in a row
if isInactive(stat, info) {
info.inActivityInRow++
} else {
info.inActivityInRow = 0
}
info.log.Infof("peer inactivity counter: %d", info.inActivityInRow)
if info.inActivityInRow >= inactivityThreshold {
info.log.Infof("peer is inactive for %d checks, marking as inactive", info.inActivityInRow)
history.log.Tracef("summarized Bytes: %d", history.summarizedBytes)
if history.summarizedBytes <= m.maxBytesPerPeriod {
idlePeers = append(idlePeers, peer)
info.inActivityInRow = 0
history.log.Tracef("peer is inactive, summarizedBytes: %d, maxBytesPerPeriod: %d, %v", history.summarizedBytes, m.maxBytesPerPeriod, history.historyString())
} else {
history.log.Tracef("peer is active, summarizedBytes: %d, maxBytesPerPeriod: %d, %v", history.summarizedBytes, m.maxBytesPerPeriod, history.historyString())
}
info.lastIdleCheckAt = now
info.lastRxBytesAtLastIdleCheck = stat.RxBytes
}
return idlePeers, nil
}
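Put differently, a peer is reported idle only after a full window of per-check deltas has been collected and their sum stays within the keep-alive/handshake budget. Two sketched windows, using the 32-byte keep-alive and 148-byte values from the test data and the hypothetical 916-byte budget worked out after calculateExpectedMaxBytes below:
// window [32 32 32 32 148]  -> summarizedBytes = 276  <= 916 -> reported inactive
// window [32 32 5000 32 32] -> summarizedBytes = 5128 >  916 -> still active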
func isInactive(stat configurer.WGStats, info *peerInfo) bool {
rxSyncPrevPeriod := stat.RxBytes - info.lastRxBytesAtLastIdleCheck
// when the peer reconnected we lose the rx bytes from between the reset and the last check.
// We will suppose the peer was active
if rxSyncPrevPeriod < 0 {
info.log.Debugf("rxBytes decreased, resetting last rxBytes at last idle check")
return false
func validateInactivityThreshold(configuredThreshold *time.Duration) (time.Duration, error) {
if configuredThreshold == nil {
return DefaultInactivityThreshold, nil
}
switch rxSyncPrevPeriod {
case 0:
info.log.Debugf("peer inactive, received 0 bytes")
return true
case keepAliveBytes:
info.log.Debugf("peer inactive, only keep alive received, current RxBytes: %d", rxSyncPrevPeriod)
return true
case handshakeInitBytes + keepAliveBytes:
info.log.Debugf("peer inactive, only handshakeInitBytes + keepAliveBytes, current RxBytes: %d", rxSyncPrevPeriod)
return true
case handshakeRespBytes + keepAliveBytes:
info.log.Debugf("peer inactive, only handshakeRespBytes + keepAliveBytes, current RxBytes: %d", rxSyncPrevPeriod)
return true
default:
info.log.Infof("active, RxBytes: %d", rxSyncPrevPeriod)
return false
if *configuredThreshold < MinimumInactivityThreshold {
return 0, fmt.Errorf("configured inactivity threshold %v is below the minimum %v", *configuredThreshold, MinimumInactivityThreshold)
}
return *configuredThreshold, nil
}
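Illustrative outcomes, where custom and tooLow are hypothetical time.Duration variables holding 10 and 1 minutes:
// validateInactivityThreshold(nil)     -> DefaultInactivityThreshold (5m), nil
// validateInactivityThreshold(&custom) -> 10m, nil
// validateInactivityThreshold(&tooLow) -> 0, error; NewManager then warns and falls back to the 5m default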
func shouldSkipIdleCheck(now, lastIdleCheckAt time.Time) bool {
minDuration := keepAliveCheckPeriod - (checkInterval / 2)
return now.Sub(lastIdleCheckAt) < minDuration
// calculateHistorySize calculates the number of history entries needed based on the inactivity threshold.
func calculateHistorySize(inactivityThreshold time.Duration) int {
return int(math.Ceil(inactivityThreshold.Minutes() / checkInterval.Minutes()))
}
func calculateExpectedMaxBytes(duration time.Duration) int64 {
// Calculate number of keep-alive packets expected
keepAliveCount := int64(duration.Seconds() / keepAliveInterval.Seconds())
keepAliveBytes := keepAliveCount * keepAliveBytes
// Calculate potential handshake packets (conservative estimate)
handshakeCount := int64(duration.Minutes() / handshakeMaxInterval.Minutes())
if handshakeCount == 0 && duration >= handshakeMaxInterval {
handshakeCount = 1
}
handshakeBytes := handshakeCount * (handshakeInitBytes + keepAliveBytes) // handshake + extra bytes
// todo: fine tune this value, add some overhead for unexpected lag
return keepAliveBytes + handshakeBytes
}
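A worked pass through the two helpers for the 5-minute default, assuming keepAliveInterval is 25 seconds and handshakeInitBytes is 148 bytes (the WireGuard handshake-initiation size); neither constant is visible in this hunk, so the figures are illustrative. Note that the local keepAliveBytes shadows the 32-byte constant, so the handshake term reuses the keep-alive total as extra headroom:
// calculateHistorySize(5m):      ceil(5m / 1m checkInterval) = 5 window entries
// calculateExpectedMaxBytes(5m):
//   keepAliveCount   = 300s / 25s        = 12
//   keepAliveBytes   = 12 * 32           = 384 (local, shadows the constant)
//   handshakeCount   = int64(5.0 / 3.0)  = 1
//   handshakeBytes   = 1 * (148 + 384)   = 532
//   expectedMaxBytes = 384 + 532         = 916 bytes per 5-minute window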

View File

@@ -25,7 +25,7 @@ func TestNewManager(t *testing.T) {
t.Run(fmt.Sprintf("Scenario %d", i), func(t *testing.T) {
mock := newMockWgInterface("peer1", sc.Data, timer)
manager := NewManager(mock)
manager := NewManager(mock, nil)
peerCfg := &lazyconn.PeerConfig{
PublicKey: "peer1",
Log: log.WithField("peer", "peer1"),

View File

@@ -0,0 +1,58 @@
package inactivity
import (
"fmt"
"os"
"sync"
"time"
log "github.com/sirupsen/logrus"
)
type Recorder struct {
mu sync.Mutex
file *os.File
filename string
}
func NewRecorder() *Recorder {
timestamp := time.Now().Format("20060102_150405")
filename := fmt.Sprintf("inactivity_log_%s.txt", timestamp)
file, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
log.Errorf("error opening file: %v", err)
}
return &Recorder{
file: file,
filename: filename,
}
}
func (r *Recorder) ReceivedBytes(peer string, now time.Time, bytes int64) {
if r == nil {
return
}
r.mu.Lock()
defer r.mu.Unlock()
entry := fmt.Sprintf("%s; %s; %d\n", now.Format(time.RFC3339), peer, bytes)
_, err := r.file.WriteString(entry)
if err != nil {
log.Errorf("error writing to file: %v", err)
}
}
func (r *Recorder) Close() {
if r == nil {
return
}
r.mu.Lock()
defer r.mu.Unlock()
if err := r.file.Close(); err != nil {
log.Errorf("error closing file: %v", err)
}
}
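The recorder is opt-in: Start creates it only when the NB_LAZYCONN_RECORDER_ENABLED environment variable parses as true, and each check then appends one semicolon-separated line per peer to inactivity_log_<timestamp>.txt in the working directory. A sample line with illustrative values:
// 2025-06-21T19:38:44+02:00; peer1; 32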

View File

@@ -52,52 +52,60 @@ var scenarios = []scenario{
{
ExpectedInactive: true,
Data: []rxHistory{
//96
{when: 0 * time.Second, RxBytes: 32},
{when: 25 * time.Second, RxBytes: 32},
{when: 50 * time.Second, RxBytes: 32},
//212
{when: 75 * time.Second, RxBytes: 32},
{when: 100 * time.Second, RxBytes: 32},
{when: 100 * time.Second, RxBytes: 148},
//96
{when: 125 * time.Second, RxBytes: 32},
{when: 150 * time.Second, RxBytes: 32},
{when: 175 * time.Second, RxBytes: 32},
//212
{when: 200 * time.Second, RxBytes: 32},
{when: 225 * time.Second, RxBytes: 32},
{when: 225 * time.Second, RxBytes: 148},
//96
{when: 250 * time.Second, RxBytes: 32},
{when: 275 * time.Second, RxBytes: 32},
{when: 300 * time.Second, RxBytes: 32},
{when: 325 * time.Second, RxBytes: 32},
{when: 350 * time.Second, RxBytes: 32},
{when: 350 * time.Second, RxBytes: 148},
{when: 375 * time.Second, RxBytes: 32},
{when: 400 * time.Second, RxBytes: 32},
{when: 425 * time.Second, RxBytes: 32},
{when: 450 * time.Second, RxBytes: 32},
{when: 475 * time.Second, RxBytes: 32},
{when: 475 * time.Second, RxBytes: 148},
{when: 500 * time.Second, RxBytes: 32},
{when: 525 * time.Second, RxBytes: 32},
{when: 550 * time.Second, RxBytes: 32},
{when: 575 * time.Second, RxBytes: 32},
{when: 600 * time.Second, RxBytes: 32},
{when: 600 * time.Second, RxBytes: 148},
{when: 625 * time.Second, RxBytes: 32},
{when: 650 * time.Second, RxBytes: 32},
{when: 675 * time.Second, RxBytes: 32},
{when: 700 * time.Second, RxBytes: 32},
{when: 725 * time.Second, RxBytes: 32},
{when: 725 * time.Second, RxBytes: 148},
{when: 750 * time.Second, RxBytes: 32},
},
},
{
ExpectedInactive: false,
Data: []rxHistory{
{when: 0 * time.Second, RxBytes: 32},
{when: 25 * time.Second, RxBytes: 32},
{when: 50 * time.Second, RxBytes: 100},
{when: 75 * time.Second, RxBytes: 32},
},
},
}

View File

@@ -67,6 +67,7 @@ type Manager struct {
// engineCtx is the context for creating peer Connection
func NewManager(config Config, engineCtx context.Context, peerStore *peerstore.Store, wgIface lazyconn.WGIface, connStateDispatcher *dispatcher.ConnectionDispatcher) *Manager {
log.Infof("setup lazy connection service")
m := &Manager{
engineCtx: engineCtx,
peerStore: peerStore,
@@ -76,19 +77,11 @@ func NewManager(config Config, engineCtx context.Context, peerStore *peerstore.S
managedPeersByConnID: make(map[peerid.ConnID]*managedPeer),
excludes: make(map[string]lazyconn.PeerConfig),
activityManager: activity.NewManager(wgIface),
inactivityManager: inactivity.NewManager(wgIface),
inactivityManager: inactivity.NewManager(wgIface, config.InactivityThreshold),
peerToHAGroups: make(map[string][]route.HAUniqueID),
haGroupToPeers: make(map[route.HAUniqueID][]string),
}
if config.InactivityThreshold != nil {
if *config.InactivityThreshold >= inactivity.MinimumInactivityThreshold {
m.inactivityThreshold = *config.InactivityThreshold
} else {
log.Warnf("inactivity threshold is too low, using %v", m.inactivityThreshold)
}
}
m.connStateListener = &dispatcher.ConnectionListener{
OnConnected: m.onPeerConnected,
OnDisconnected: m.onPeerDisconnected,
@@ -620,5 +613,5 @@ func (m *Manager) onPeerDisconnected(peerConnID peerid.ConnID) {
}
// todo reset inactivity monitor
mp.peerCfg.Log.Infof("--- peer disconnected, stopping inactivity monitor?")
mp.peerCfg.Log.Warnf("--- peer disconnected, stopping inactivity monitor?")
}