diff --git a/client/internal/lazyconn/inactivity/manager.go b/client/internal/lazyconn/inactivity/manager.go index 34340cd7f..34d12e25e 100644 --- a/client/internal/lazyconn/inactivity/manager.go +++ b/client/internal/lazyconn/inactivity/manager.go @@ -1,7 +1,12 @@ package inactivity import ( + "container/list" "context" + "fmt" + "math" + "os" + "strconv" "time" log "github.com/sirupsen/logrus" @@ -24,40 +29,86 @@ const ( handshakeRespBytes = 92 handshakeMaxInterval = 3 * time.Minute - checkInterval = keepAliveInterval // todo: 5 * time.Second - keepAliveCheckPeriod = keepAliveInterval + checkInterval = 1 * time.Minute - inactivityThreshold = 3 // number of checks to consider peer inactive -) - -const ( - // todo make it configurable - DefaultInactivityThreshold = 60 * time.Minute // idle after 1 hour inactivity + DefaultInactivityThreshold = 5 * time.Minute MinimumInactivityThreshold = 3 * time.Minute + + recorderEnv = "NB_LAZYCONN_RECORDER_ENABLED" ) type WgInterface interface { GetStats() (map[string]configurer.WGStats, error) } -type peerInfo struct { - lastRxBytesAtLastIdleCheck int64 - lastIdleCheckAt time.Time - inActivityInRow int - log *log.Entry +type peerHistory struct { + lastRxBytes int64 // last received bytes + bytesHistory *list.List // linked list of int64 + historySize int + summarizedBytes int64 + log *log.Entry +} + +func newPeerHistory(log *log.Entry, historySize int) *peerHistory { + return &peerHistory{ + bytesHistory: list.New(), + historySize: historySize, + log: log, + } +} + +func (pi *peerHistory) appendRxBytes(rxBytes int64) { + // If at capacity, remove the oldest element (front) + if pi.bytesHistory.Len() == pi.historySize { + pi.summarizedBytes -= pi.bytesHistory.Front().Value.(int64) + pi.bytesHistory.Remove(pi.bytesHistory.Front()) + } + + // Add the new rxBytes at the back + pi.bytesHistory.PushBack(rxBytes) + pi.summarizedBytes += rxBytes +} + +func (pi *peerHistory) historyString() string { + var history []string + for e := 
pi.bytesHistory.Front(); e != nil; e = e.Next() { + history = append(history, fmt.Sprintf("%d", e.Value.(int64))) + } + return fmt.Sprintf("%s", history) +} + +func (pi *peerHistory) reset() { + for e := pi.bytesHistory.Front(); e != nil; e = e.Next() { + e.Value = int64(0) + } + pi.summarizedBytes = 0 } type Manager struct { InactivePeersChan chan []string iface WgInterface - interestedPeers map[string]*peerInfo + interestedPeers map[string]*peerHistory + + maxBytesPerPeriod int64 + historySize int // Size of the history buffer for each peer, used to track received bytes over time + recorder *Recorder } -func NewManager(iface WgInterface) *Manager { +func NewManager(iface WgInterface, configuredThreshold *time.Duration) *Manager { + inactivityThreshold, err := validateInactivityThreshold(configuredThreshold) + if err != nil { + inactivityThreshold = DefaultInactivityThreshold + log.Warnf("invalid inactivity threshold configured: %v, using default: %v", err, DefaultInactivityThreshold) + } + + expectedMaxBytes := calculateExpectedMaxBytes(inactivityThreshold) + log.Infof("receive less than %d bytes per %v, will be considered inactive", expectedMaxBytes, inactivityThreshold) return &Manager{ InactivePeersChan: make(chan []string, 1), iface: iface, - interestedPeers: make(map[string]*peerInfo), + interestedPeers: make(map[string]*peerHistory), + historySize: calculateHistorySize(inactivityThreshold), + maxBytesPerPeriod: expectedMaxBytes, } } @@ -67,9 +118,7 @@ func (m *Manager) AddPeer(peerCfg *lazyconn.PeerConfig) { } peerCfg.Log.Debugf("adding peer to inactivity manager") - m.interestedPeers[peerCfg.PublicKey] = &peerInfo{ - log: peerCfg.Log, - } + m.interestedPeers[peerCfg.PublicKey] = newPeerHistory(peerCfg.Log, m.historySize) } func (m *Manager) RemovePeer(peer string) { @@ -83,6 +132,12 @@ func (m *Manager) RemovePeer(peer string) { } func (m *Manager) Start(ctx context.Context) { + enabled, err := strconv.ParseBool(os.Getenv(recorderEnv)) + if err == nil && 
enabled { + m.recorder = NewRecorder() + defer m.recorder.Close() + } + ticker := newTicker(checkInterval) defer ticker.Stop() @@ -124,76 +179,69 @@ func (m *Manager) checkStats(now time.Time) ([]string, error) { var idlePeers []string - for peer, info := range m.interestedPeers { + for peer, history := range m.interestedPeers { stat, found := stats[peer] if !found { // when peer is in connecting state - info.log.Warnf("peer not found in wg stats") + history.log.Warnf("peer not found in wg stats") } - // First measurement: initialize - if info.lastIdleCheckAt.IsZero() { - info.lastIdleCheckAt = now - info.lastRxBytesAtLastIdleCheck = stat.RxBytes - info.log.Infof("initializing RxBytes: %v, %v", now, stat.RxBytes) + deltaRx := stat.RxBytes - history.lastRxBytes + if deltaRx < 0 { + deltaRx = 0 // reset to zero if negative + history.reset() + } + + m.recorder.ReceivedBytes(peer, now, deltaRx) + + history.lastRxBytes = stat.RxBytes + history.appendRxBytes(deltaRx) + + // not enough history to determine inactivity + if history.bytesHistory.Len() < m.historySize { + history.log.Tracef("not enough history to determine inactivity, current history size: %d, required: %d", history.bytesHistory.Len(), m.historySize) continue } - // check only every idleCheckDuration - if shouldSkipIdleCheck(now, info.lastIdleCheckAt) { - continue - } - - // sometimes we measure false inactivity, so we need to check if we have inactivity in a row - if isInactive(stat, info) { - info.inActivityInRow++ - } else { - info.inActivityInRow = 0 - } - - info.log.Infof("peer inactivity counter: %d", info.inActivityInRow) - if info.inActivityInRow >= inactivityThreshold { - info.log.Infof("peer is inactive for %d checks, marking as inactive", info.inActivityInRow) + history.log.Tracef("summarized Bytes: %d", history.summarizedBytes) + if history.summarizedBytes <= m.maxBytesPerPeriod { idlePeers = append(idlePeers, peer) - info.inActivityInRow = 0 + history.log.Tracef("peer is inactive, 
summarizedBytes: %d, maxBytesPerPeriod: %d, %v", history.summarizedBytes, m.maxBytesPerPeriod, history.historyString()) + } else { + history.log.Tracef("peer is active, summarizedBytes: %d, maxBytesPerPeriod: %d, %v", history.summarizedBytes, m.maxBytesPerPeriod, history.historyString()) } - info.lastIdleCheckAt = now - info.lastRxBytesAtLastIdleCheck = stat.RxBytes } return idlePeers, nil } -func isInactive(stat configurer.WGStats, info *peerInfo) bool { - rxSyncPrevPeriod := stat.RxBytes - info.lastRxBytesAtLastIdleCheck - - // when the peer reconnected we lose the rx bytes from between the reset and the last check. - // We will suppose the peer was active - if rxSyncPrevPeriod < 0 { - info.log.Debugf("rxBytes decreased, resetting last rxBytes at last idle check") - return false +func validateInactivityThreshold(configuredThreshold *time.Duration) (time.Duration, error) { + if configuredThreshold == nil { + return DefaultInactivityThreshold, nil } - - switch rxSyncPrevPeriod { - case 0: - info.log.Debugf("peer inactive, received 0 bytes") - return true - case keepAliveBytes: - info.log.Debugf("peer inactive, only keep alive received, current RxBytes: %d", rxSyncPrevPeriod) - return true - case handshakeInitBytes + keepAliveBytes: - info.log.Debugf("peer inactive, only handshakeInitBytes + keepAliveBytes, current RxBytes: %d", rxSyncPrevPeriod) - return true - case handshakeRespBytes + keepAliveBytes: - info.log.Debugf("peer inactive, only handshakeRespBytes + keepAliveBytes, current RxBytes: %d", rxSyncPrevPeriod) - return true - default: - info.log.Infof("active, RxBytes: %d", rxSyncPrevPeriod) - return false + if *configuredThreshold < MinimumInactivityThreshold { + return 0, fmt.Errorf("configured inactivity threshold %v is below the allowed minimum %v", *configuredThreshold, MinimumInactivityThreshold) } + return *configuredThreshold, nil } -func shouldSkipIdleCheck(now, lastIdleCheckAt time.Time) bool { - minDuration := keepAliveCheckPeriod - (checkInterval / 2) -
return now.Sub(lastIdleCheckAt) < minDuration +// calculateHistorySize calculates the number of history entries needed based on the inactivity threshold. +func calculateHistorySize(inactivityThreshold time.Duration) int { + return int(math.Ceil(inactivityThreshold.Minutes() / checkInterval.Minutes())) +} + +func calculateExpectedMaxBytes(duration time.Duration) int64 { + // Calculate number of keep-alive packets expected + keepAliveCount := int64(duration.Seconds() / keepAliveInterval.Seconds()) + totalKeepAliveBytes := keepAliveCount * keepAliveBytes + + // Calculate potential handshake packets (conservative estimate) + handshakeCount := int64(duration.Minutes() / handshakeMaxInterval.Minutes()) + if handshakeCount == 0 && duration >= handshakeMaxInterval { + handshakeCount = 1 + } + handshakeBytes := handshakeCount * (handshakeInitBytes + totalKeepAliveBytes) // handshake + extra bytes + + // todo: fine tune this value, add some overhead for unexpected lag + return totalKeepAliveBytes + handshakeBytes } diff --git a/client/internal/lazyconn/inactivity/manager_test.go b/client/internal/lazyconn/inactivity/manager_test.go index 6aeb0b226..e19b6ef39 100644 --- a/client/internal/lazyconn/inactivity/manager_test.go +++ b/client/internal/lazyconn/inactivity/manager_test.go @@ -25,7 +25,7 @@ func TestNewManager(t *testing.T) { t.Run(fmt.Sprintf("Scenario %d", i), func(t *testing.T) { mock := newMockWgInterface("peer1", sc.Data, timer) - manager := NewManager(mock) + manager := NewManager(mock, nil) peerCfg := &lazyconn.PeerConfig{ PublicKey: "peer1", Log: log.WithField("peer", "peer1"), diff --git a/client/internal/lazyconn/inactivity/recorder.go b/client/internal/lazyconn/inactivity/recorder.go new file mode 100644 index 000000000..d4c049f83 --- /dev/null +++ b/client/internal/lazyconn/inactivity/recorder.go @@ -0,0 +1,58 @@ +package inactivity + +import ( + "fmt" + "os" + "sync" + "time" + + log "github.com/sirupsen/logrus" +) + +type Recorder struct { + mu sync.Mutex + file
*os.File + filename string +} + +func NewRecorder() *Recorder { + timestamp := time.Now().Format("20060102_150405") + filename := fmt.Sprintf("inactivity_log_%s.txt", timestamp) + file, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) + if err != nil { + log.Errorf("error opening file: %v", err) + return nil + } + return &Recorder{ + file: file, + filename: filename, + } +} + +func (r *Recorder) ReceivedBytes(peer string, now time.Time, bytes int64) { + if r == nil { + return + } + + r.mu.Lock() + defer r.mu.Unlock() + + entry := fmt.Sprintf("%s; %s; %d\n", now.Format(time.RFC3339), peer, bytes) + _, err := r.file.WriteString(entry) + if err != nil { + log.Errorf("error writing to file: %v", err) + } +} + +func (r *Recorder) Close() { + if r == nil { + return + } + + r.mu.Lock() + defer r.mu.Unlock() + + if err := r.file.Close(); err != nil { + log.Errorf("error closing file: %v", err) + } +} diff --git a/client/internal/lazyconn/inactivity/scenarios_test.go b/client/internal/lazyconn/inactivity/scenarios_test.go index b22a96663..b8c7257e3 100644 --- a/client/internal/lazyconn/inactivity/scenarios_test.go +++ b/client/internal/lazyconn/inactivity/scenarios_test.go @@ -52,52 +52,60 @@ var scenarios = []scenario{ { ExpectedInactive: true, Data: []rxHistory{ + //96 {when: 0 * time.Second, RxBytes: 32}, {when: 25 * time.Second, RxBytes: 32}, {when: 50 * time.Second, RxBytes: 32}, + + //212 {when: 75 * time.Second, RxBytes: 32}, {when: 100 * time.Second, RxBytes: 32}, {when: 100 * time.Second, RxBytes: 148}, + + //96 {when: 125 * time.Second, RxBytes: 32}, {when: 150 * time.Second, RxBytes: 32}, {when: 175 * time.Second, RxBytes: 32}, + + //212 {when: 200 * time.Second, RxBytes: 32}, {when: 225 * time.Second, RxBytes: 32}, {when: 225 * time.Second, RxBytes: 148}, + + //96 {when: 250 * time.Second, RxBytes: 32}, {when: 275 * time.Second, RxBytes: 32}, {when: 300 * time.Second, RxBytes: 32}, + {when: 325 * time.Second, RxBytes: 32}, {when: 350 * time.Second,
RxBytes: 32}, {when: 350 * time.Second, RxBytes: 148}, + {when: 375 * time.Second, RxBytes: 32}, {when: 400 * time.Second, RxBytes: 32}, + {when: 425 * time.Second, RxBytes: 32}, {when: 450 * time.Second, RxBytes: 32}, {when: 475 * time.Second, RxBytes: 32}, {when: 475 * time.Second, RxBytes: 148}, + {when: 500 * time.Second, RxBytes: 32}, {when: 525 * time.Second, RxBytes: 32}, + {when: 550 * time.Second, RxBytes: 32}, {when: 575 * time.Second, RxBytes: 32}, {when: 600 * time.Second, RxBytes: 32}, {when: 600 * time.Second, RxBytes: 148}, + {when: 625 * time.Second, RxBytes: 32}, {when: 650 * time.Second, RxBytes: 32}, + {when: 675 * time.Second, RxBytes: 32}, {when: 700 * time.Second, RxBytes: 32}, + {when: 725 * time.Second, RxBytes: 32}, {when: 725 * time.Second, RxBytes: 148}, {when: 750 * time.Second, RxBytes: 32}, }, }, - { - ExpectedInactive: false, - Data: []rxHistory{ - {when: 0 * time.Second, RxBytes: 32}, - {when: 25 * time.Second, RxBytes: 32}, - {when: 50 * time.Second, RxBytes: 100}, - {when: 75 * time.Second, RxBytes: 32}, - }, - }, } diff --git a/client/internal/lazyconn/manager/manager.go b/client/internal/lazyconn/manager/manager.go index 338b9865f..09885f7ff 100644 --- a/client/internal/lazyconn/manager/manager.go +++ b/client/internal/lazyconn/manager/manager.go @@ -67,6 +67,7 @@ type Manager struct { // engineCtx is the context for creating peer Connection func NewManager(config Config, engineCtx context.Context, peerStore *peerstore.Store, wgIface lazyconn.WGIface, connStateDispatcher *dispatcher.ConnectionDispatcher) *Manager { log.Infof("setup lazy connection service") + m := &Manager{ engineCtx: engineCtx, peerStore: peerStore, @@ -76,19 +77,11 @@ func NewManager(config Config, engineCtx context.Context, peerStore *peerstore.S managedPeersByConnID: make(map[peerid.ConnID]*managedPeer), excludes: make(map[string]lazyconn.PeerConfig), activityManager: activity.NewManager(wgIface), - inactivityManager: inactivity.NewManager(wgIface), + 
inactivityManager: inactivity.NewManager(wgIface, config.InactivityThreshold), peerToHAGroups: make(map[string][]route.HAUniqueID), haGroupToPeers: make(map[route.HAUniqueID][]string), } - if config.InactivityThreshold != nil { - if *config.InactivityThreshold >= inactivity.MinimumInactivityThreshold { - m.inactivityThreshold = *config.InactivityThreshold - } else { - log.Warnf("inactivity threshold is too low, using %v", m.inactivityThreshold) - } - } - m.connStateListener = &dispatcher.ConnectionListener{ OnConnected: m.onPeerConnected, OnDisconnected: m.onPeerDisconnected, @@ -620,5 +613,5 @@ func (m *Manager) onPeerDisconnected(peerConnID peerid.ConnID) { } // todo reset inactivity monitor - mp.peerCfg.Log.Infof("--- peer disconnected, stopping inactivity monitor?") + mp.peerCfg.Log.Warnf("--- peer disconnected, stopping inactivity monitor?") }