From fbb1b55beb95d3106e7a279931a69bac34f0af1f Mon Sep 17 00:00:00 2001 From: Zoltan Papp Date: Fri, 4 Jul 2025 19:52:27 +0200 Subject: [PATCH] [client] refactor lazy detection (#4050) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR introduces a new inactivity package responsible for monitoring peer activity and notifying when peers become inactive. Introduces a new Signal message type to close the peer connection after the idle timeout is reached. Periodically checks the last activity of registered peers via a Bind interface. Notifies via a channel when peers exceed a configurable inactivity threshold. Default settings DefaultInactivityThreshold is set to 15 minutes, with a minimum allowed threshold of 1 minute. Limitations This inactivity check does not support kernel WireGuard integration. In kernel–user space communication, the user space side will always be responsible for closing the connection. --- client/iface/bind/activity.go | 94 +++++ client/iface/bind/activity_test.go | 27 ++ client/iface/bind/ice_bind.go | 57 ++- client/iface/configurer/kernel_unix.go | 4 + client/iface/configurer/usp.go | 36 +- client/iface/device/device_android.go | 2 +- client/iface/device/device_darwin.go | 2 +- client/iface/device/device_ios.go | 2 +- client/iface/device/device_netstack.go | 2 +- client/iface/device/device_usp_unix.go | 2 +- client/iface/device/device_windows.go | 2 +- client/iface/device/interface.go | 1 + client/iface/iface.go | 8 + client/internal/conn_mgr.go | 66 ++-- client/internal/engine.go | 32 +- client/internal/engine_test.go | 15 +- client/internal/iface_common.go | 1 + client/internal/lazyconn/activity/listener.go | 4 +- client/internal/lazyconn/activity/manager.go | 13 +- .../lazyconn/inactivity/inactivity.go | 75 ---- .../lazyconn/inactivity/inactivity_test.go | 156 --------- .../internal/lazyconn/inactivity/manager.go | 152 ++++++++ .../lazyconn/inactivity/manager_test.go | 113 ++++++ client/internal/lazyconn/inactivity/ticker.go | 24 ++ client/internal/lazyconn/manager/manager.go | 326 ++++++++---------- client/internal/lazyconn/wgiface.go | 2 + client/internal/peer/conn.go | 49 ++- client/internal/peer/signaler.go | 10 + client/internal/peerstore/store.go | 13 +- monotime/time.go | 29 ++ monotime/time_test.go | 20 ++ signal/proto/signalexchange.pb.go | 58 ++-- signal/proto/signalexchange.proto | 3 +- 33 files changed, 857 insertions(+), 543 deletions(-) create mode 100644 client/iface/bind/activity.go create mode 100644 client/iface/bind/activity_test.go delete mode 100644 client/internal/lazyconn/inactivity/inactivity.go delete mode 100644 client/internal/lazyconn/inactivity/inactivity_test.go create mode 100644 client/internal/lazyconn/inactivity/manager.go create mode 100644 client/internal/lazyconn/inactivity/manager_test.go create mode 100644 client/internal/lazyconn/inactivity/ticker.go create mode 100644 monotime/time.go create mode 100644 monotime/time_test.go diff --git a/client/iface/bind/activity.go b/client/iface/bind/activity.go new file mode 100644 index 000000000..d3b406bcd --- /dev/null +++ b/client/iface/bind/activity.go @@ -0,0 +1,94 @@ +package bind + +import ( + "net/netip" + "sync" + "sync/atomic" + "time" + + log "github.com/sirupsen/logrus" + + "github.com/netbirdio/netbird/monotime" +) + +const ( + saveFrequency = int64(5 * time.Second) +) + +type PeerRecord struct { + Address netip.AddrPort + LastActivity atomic.Int64 // UnixNano timestamp +} + +type ActivityRecorder struct { + mu sync.RWMutex + peers map[string]*PeerRecord // publicKey to PeerRecord map + addrToPeer map[netip.AddrPort]*PeerRecord // address to PeerRecord map +} + +func NewActivityRecorder() *ActivityRecorder { + return &ActivityRecorder{ + peers: make(map[string]*PeerRecord), + addrToPeer: make(map[netip.AddrPort]*PeerRecord), + } +} + +// GetLastActivities returns a snapshot of peer last activity +func (r *ActivityRecorder) GetLastActivities() map[string]time.Time { + r.mu.RLock() + defer r.mu.RUnlock() + + activities := make(map[string]time.Time, len(r.peers)) + for key, record := range r.peers { + unixNano := record.LastActivity.Load() + activities[key] = time.Unix(0, unixNano) + } + return activities +} + +// UpsertAddress adds or updates the address for a publicKey +func (r *ActivityRecorder) UpsertAddress(publicKey string, address netip.AddrPort) { + r.mu.Lock() + defer r.mu.Unlock() + + if pr, exists := r.peers[publicKey]; exists { + delete(r.addrToPeer, pr.Address) + pr.Address = address + } else { + record := &PeerRecord{ + Address: address, + } + record.LastActivity.Store(monotime.Now()) + r.peers[publicKey] = record + } + + r.addrToPeer[address] = r.peers[publicKey] +} + +func (r *ActivityRecorder) Remove(publicKey string) { + r.mu.Lock() + defer r.mu.Unlock() + if record, exists := r.peers[publicKey]; exists { + delete(r.addrToPeer, record.Address) + delete(r.peers, publicKey) + } +} + +// record updates LastActivity for the given address using atomic store +func (r *ActivityRecorder) record(address netip.AddrPort) { + r.mu.RLock() + record, ok := r.addrToPeer[address] + r.mu.RUnlock() + if !ok { + log.Warnf("could not find record for address %s", address) + return + } + + now := monotime.Now() + last := record.LastActivity.Load() + if now-last < saveFrequency { + return + } + + _ = record.LastActivity.CompareAndSwap(last, now) +} diff --git a/client/iface/bind/activity_test.go b/client/iface/bind/activity_test.go new file mode 100644 index 000000000..598607b95 --- /dev/null +++ b/client/iface/bind/activity_test.go @@ -0,0 +1,27 @@ +package bind + +import ( + "net/netip" + "testing" + "time" +) + +func TestActivityRecorder_GetLastActivities(t *testing.T) { + peer := "peer1" + ar := NewActivityRecorder() + ar.UpsertAddress("peer1", netip.MustParseAddrPort("192.168.0.5:51820")) + activities := ar.GetLastActivities() + + p, ok := activities[peer] + if !ok { + t.Fatalf("Expected activity for peer %s, but got none", peer) + } + + if p.IsZero() { + t.Fatalf("Expected activity for peer %s, but got zero", peer) + } + + if p.Before(time.Now().Add(-2 * time.Minute)) { + t.Fatalf("Expected activity for peer %s to be recent, but got %v", peer, p) + } +} diff --git a/client/iface/bind/ice_bind.go b/client/iface/bind/ice_bind.go index 66ec6a00d..bb7a27279 100644 --- a/client/iface/bind/ice_bind.go +++ b/client/iface/bind/ice_bind.go @@ -1,6 +1,7 @@ package bind import ( + "encoding/binary" "fmt" "net" "net/netip" @@ -51,22 +52,24 @@ type ICEBind struct { closedChanMu sync.RWMutex // protect the closeChan recreation from reading from it. closed bool - muUDPMux sync.Mutex - udpMux *UniversalUDPMuxDefault - address wgaddr.Address + muUDPMux sync.Mutex + udpMux *UniversalUDPMuxDefault + address wgaddr.Address + activityRecorder *ActivityRecorder } func NewICEBind(transportNet transport.Net, filterFn FilterFn, address wgaddr.Address) *ICEBind { b, _ := wgConn.NewStdNetBind().(*wgConn.StdNetBind) ib := &ICEBind{ - StdNetBind: b, - RecvChan: make(chan RecvMessage, 1), - transportNet: transportNet, - filterFn: filterFn, - endpoints: make(map[netip.Addr]net.Conn), - closedChan: make(chan struct{}), - closed: true, - address: address, + StdNetBind: b, + RecvChan: make(chan RecvMessage, 1), + transportNet: transportNet, + filterFn: filterFn, + endpoints: make(map[netip.Addr]net.Conn), + closedChan: make(chan struct{}), + closed: true, + address: address, + activityRecorder: NewActivityRecorder(), } rc := receiverCreator{ @@ -100,6 +103,10 @@ func (s *ICEBind) Close() error { return s.StdNetBind.Close() } +func (s *ICEBind) ActivityRecorder() *ActivityRecorder { + return s.activityRecorder +} + // GetICEMux returns the ICE UDPMux that was created and used by ICEBind func (s *ICEBind) GetICEMux() (*UniversalUDPMuxDefault, error) { s.muUDPMux.Lock() @@ -199,6 +206,11 @@ func (s *ICEBind) createIPv4ReceiverFn(pc *ipv4.PacketConn, conn *net.UDPConn, r continue } addrPort := msg.Addr.(*net.UDPAddr).AddrPort() + + if isTransportPkg(msg.Buffers, msg.N) { + s.activityRecorder.record(addrPort) + } + ep := &wgConn.StdNetEndpoint{AddrPort: addrPort} // TODO: remove allocation wgConn.GetSrcFromControl(msg.OOB[:msg.NN], ep) eps[i] = ep @@ -257,6 +269,13 @@ func (c *ICEBind) receiveRelayed(buffs [][]byte, sizes []int, eps []wgConn.Endpo copy(buffs[0], msg.Buffer) sizes[0] = len(msg.Buffer) eps[0] = wgConn.Endpoint(msg.Endpoint) + + if isTransportPkg(buffs, sizes[0]) { + if ep, ok := eps[0].(*Endpoint); ok { + c.activityRecorder.record(ep.AddrPort) + } + } + return 1, nil } } @@ -272,3 +291,19 @@ func putMessages(msgs *[]ipv6.Message, msgsPool *sync.Pool) { } msgsPool.Put(msgs) } + +func isTransportPkg(buffers [][]byte, n int) bool { + // The first buffer should contain at least 4 bytes for type + if len(buffers[0]) < 4 { + return true + } + + // WireGuard packet type is a little-endian uint32 at start + packetType := binary.LittleEndian.Uint32(buffers[0][:4]) + + // Check if packetType matches known WireGuard message types + if packetType == 4 && n > 32 { + return true + } + return false +} diff --git a/client/iface/configurer/kernel_unix.go b/client/iface/configurer/kernel_unix.go index 4922a54fc..e2ea19144 100644 --- a/client/iface/configurer/kernel_unix.go +++ b/client/iface/configurer/kernel_unix.go @@ -276,3 +276,7 @@ func (c *KernelConfigurer) GetStats() (map[string]WGStats, error) { } return stats, nil } + +func (c *KernelConfigurer) LastActivities() map[string]time.Time { + return nil +} diff --git a/client/iface/configurer/usp.go b/client/iface/configurer/usp.go index 79ce91eea..6ead716f1 100644 --- a/client/iface/configurer/usp.go +++ b/client/iface/configurer/usp.go @@ -16,6 +16,7 @@ import ( "golang.zx2c4.com/wireguard/device" "golang.zx2c4.com/wireguard/wgctrl/wgtypes" + "github.com/netbirdio/netbird/client/iface/bind" nbnet "github.com/netbirdio/netbird/util/net" ) @@ -36,16 +37,18 @@ const ( var ErrAllowedIPNotFound = fmt.Errorf("allowed IP not found") type WGUSPConfigurer struct { - device *device.Device - deviceName string + device *device.Device + deviceName string + activityRecorder *bind.ActivityRecorder uapiListener net.Listener } -func NewUSPConfigurer(device *device.Device, deviceName string) *WGUSPConfigurer { +func NewUSPConfigurer(device *device.Device, deviceName string, activityRecorder *bind.ActivityRecorder) *WGUSPConfigurer { wgCfg := &WGUSPConfigurer{ - device: device, - deviceName: deviceName, + device: device, + deviceName: deviceName, + activityRecorder: activityRecorder, } wgCfg.startUAPI() return wgCfg @@ -87,7 +90,19 @@ func (c *WGUSPConfigurer) UpdatePeer(peerKey string, allowedIps []netip.Prefix, Peers: []wgtypes.PeerConfig{peer}, } - return c.device.IpcSet(toWgUserspaceString(config)) + if ipcErr := c.device.IpcSet(toWgUserspaceString(config)); ipcErr != nil { + return ipcErr + } + + if endpoint != nil { + addr, err := netip.ParseAddr(endpoint.IP.String()) + if err != nil { + return fmt.Errorf("failed to parse endpoint address: %w", err) + } + addrPort := netip.AddrPortFrom(addr, uint16(endpoint.Port)) + c.activityRecorder.UpsertAddress(peerKey, addrPort) + } + return nil } func (c *WGUSPConfigurer) RemovePeer(peerKey string) error { @@ -104,7 +119,10 @@ func (c *WGUSPConfigurer) RemovePeer(peerKey string) error { config := wgtypes.Config{ Peers: []wgtypes.PeerConfig{peer}, } - return c.device.IpcSet(toWgUserspaceString(config)) + ipcErr := c.device.IpcSet(toWgUserspaceString(config)) + + c.activityRecorder.Remove(peerKey) + return ipcErr } func (c *WGUSPConfigurer) AddAllowedIP(peerKey string, allowedIP netip.Prefix) error { @@ -205,6 +223,10 @@ func (c *WGUSPConfigurer) FullStats() (*Stats, error) { return parseStatus(c.deviceName, ipcStr) } +func (c *WGUSPConfigurer) LastActivities() map[string]time.Time { + return c.activityRecorder.GetLastActivities() +} + // startUAPI starts the UAPI listener for managing the WireGuard interface via external tool func (t *WGUSPConfigurer) startUAPI() { var err error diff --git a/client/iface/device/device_android.go b/client/iface/device/device_android.go index ae9e29bd1..4fe6e466b 100644 --- a/client/iface/device/device_android.go +++ b/client/iface/device/device_android.go @@ -79,7 +79,7 @@ func (t *WGTunDevice) Create(routes []string, dns string, searchDomains []string // this helps with support for the older NetBird clients that had a hardcoded direct mode // t.device.DisableSomeRoamingForBrokenMobileSemantics() - t.configurer = configurer.NewUSPConfigurer(t.device, t.name) + t.configurer = configurer.NewUSPConfigurer(t.device, t.name, t.iceBind.ActivityRecorder()) err = t.configurer.ConfigureInterface(t.key, t.port) if err != nil { t.device.Close() diff --git a/client/iface/device/device_darwin.go b/client/iface/device/device_darwin.go index 01bfbf381..81de0e360 100644 --- a/client/iface/device/device_darwin.go +++ b/client/iface/device/device_darwin.go @@ -61,7 +61,7 @@ func (t *TunDevice) Create() (WGConfigurer, error) { return nil, fmt.Errorf("error assigning ip: %s", err) } - t.configurer = configurer.NewUSPConfigurer(t.device, t.name) + t.configurer = configurer.NewUSPConfigurer(t.device, t.name, t.iceBind.ActivityRecorder()) err = t.configurer.ConfigureInterface(t.key, t.port) if err != nil { t.device.Close() diff --git a/client/iface/device/device_ios.go b/client/iface/device/device_ios.go index 56d44d68e..4613762c3 100644 --- a/client/iface/device/device_ios.go +++ b/client/iface/device/device_ios.go @@ -71,7 +71,7 @@ func (t *TunDevice) Create() (WGConfigurer, error) { // this helps with support for the older NetBird clients that had a hardcoded direct mode // t.device.DisableSomeRoamingForBrokenMobileSemantics() - t.configurer = configurer.NewUSPConfigurer(t.device, t.name) + t.configurer = configurer.NewUSPConfigurer(t.device, t.name, t.iceBind.ActivityRecorder()) err = t.configurer.ConfigureInterface(t.key, t.port) if err != nil { t.device.Close() diff --git a/client/iface/device/device_netstack.go b/client/iface/device/device_netstack.go index d2f2c87a1..fc3cb0215 100644 --- a/client/iface/device/device_netstack.go +++ b/client/iface/device/device_netstack.go @@ -72,7 +72,7 @@ func (t *TunNetstackDevice) Create() (WGConfigurer, error) { device.NewLogger(wgLogLevel(), "[netbird] "), ) - t.configurer = configurer.NewUSPConfigurer(t.device, t.name) + t.configurer = configurer.NewUSPConfigurer(t.device, t.name, t.iceBind.ActivityRecorder()) err = t.configurer.ConfigureInterface(t.key, t.port) if err != nil { _ = tunIface.Close() diff --git a/client/iface/device/device_usp_unix.go b/client/iface/device/device_usp_unix.go index c45ae9676..e781f6004 100644 --- a/client/iface/device/device_usp_unix.go +++ b/client/iface/device/device_usp_unix.go @@ -64,7 +64,7 @@ func (t *USPDevice) Create() (WGConfigurer, error) { return nil, fmt.Errorf("error assigning ip: %s", err) } - t.configurer = configurer.NewUSPConfigurer(t.device, t.name) + t.configurer = configurer.NewUSPConfigurer(t.device, t.name, t.iceBind.ActivityRecorder()) err = t.configurer.ConfigureInterface(t.key, t.port) if err != nil { t.device.Close() diff --git a/client/iface/device/device_windows.go b/client/iface/device/device_windows.go index 41e615bc2..0316c4b8d 100644 --- a/client/iface/device/device_windows.go +++ b/client/iface/device/device_windows.go @@ -94,7 +94,7 @@ func (t *TunDevice) Create() (WGConfigurer, error) { return nil, fmt.Errorf("error assigning ip: %s", err) } - t.configurer = configurer.NewUSPConfigurer(t.device, t.name) + t.configurer = configurer.NewUSPConfigurer(t.device, t.name, t.iceBind.ActivityRecorder()) err = t.configurer.ConfigureInterface(t.key, t.port) if err != nil { t.device.Close() diff --git a/client/iface/device/interface.go b/client/iface/device/interface.go index 296eb7dda..d68e6bf90 100644 --- a/client/iface/device/interface.go +++ b/client/iface/device/interface.go @@ -19,4 +19,5 @@ type WGConfigurer interface { Close() GetStats() (map[string]configurer.WGStats, error) FullStats() (*configurer.Stats, error) + LastActivities() map[string]time.Time } diff --git a/client/iface/iface.go b/client/iface/iface.go index 006dfe4e7..1b9055e6c 100644 --- a/client/iface/iface.go +++ b/client/iface/iface.go @@ -217,6 +217,14 @@ func (w *WGIface) GetStats() (map[string]configurer.WGStats, error) { return w.configurer.GetStats() } +func (w *WGIface) LastActivities() map[string]time.Time { + w.mu.Lock() + defer w.mu.Unlock() + + return w.configurer.LastActivities() + +} + func (w *WGIface) FullStats() (*configurer.Stats, error) { return w.configurer.FullStats() } diff --git a/client/internal/conn_mgr.go b/client/internal/conn_mgr.go index c630d3052..c76b0a99f 100644 --- a/client/internal/conn_mgr.go +++ b/client/internal/conn_mgr.go @@ -12,7 +12,6 @@ import ( "github.com/netbirdio/netbird/client/internal/lazyconn" "github.com/netbirdio/netbird/client/internal/lazyconn/manager" "github.com/netbirdio/netbird/client/internal/peer" - "github.com/netbirdio/netbird/client/internal/peer/dispatcher" "github.com/netbirdio/netbird/client/internal/peerstore" "github.com/netbirdio/netbird/route" ) @@ -26,11 +25,11 @@ import ( // // The implementation is not thread-safe; it is protected by engine.syncMsgMux. type ConnMgr struct { - peerStore *peerstore.Store - statusRecorder *peer.Status - iface lazyconn.WGIface - dispatcher *dispatcher.ConnectionDispatcher - enabledLocally bool + peerStore *peerstore.Store + statusRecorder *peer.Status + iface lazyconn.WGIface + enabledLocally bool + rosenpassEnabled bool lazyConnMgr *manager.Manager @@ -39,12 +38,12 @@ type ConnMgr struct { lazyCtxCancel context.CancelFunc } -func NewConnMgr(engineConfig *EngineConfig, statusRecorder *peer.Status, peerStore *peerstore.Store, iface lazyconn.WGIface, dispatcher *dispatcher.ConnectionDispatcher) *ConnMgr { +func NewConnMgr(engineConfig *EngineConfig, statusRecorder *peer.Status, peerStore *peerstore.Store, iface lazyconn.WGIface) *ConnMgr { e := &ConnMgr{ - peerStore: peerStore, - statusRecorder: statusRecorder, - iface: iface, - dispatcher: dispatcher, + peerStore: peerStore, + statusRecorder: statusRecorder, + iface: iface, + rosenpassEnabled: engineConfig.RosenpassEnabled, } if engineConfig.LazyConnectionEnabled || lazyconn.IsLazyConnEnabledByEnv() { e.enabledLocally = true @@ -64,6 +63,11 @@ func (e *ConnMgr) Start(ctx context.Context) { return } + if e.rosenpassEnabled { + log.Warnf("rosenpass connection manager is enabled, lazy connection manager will not be started") + return + } + e.initLazyManager(ctx) e.statusRecorder.UpdateLazyConnection(true) } @@ -83,7 +87,12 @@ func (e *ConnMgr) UpdatedRemoteFeatureFlag(ctx context.Context, enabled bool) er return nil } - log.Infof("lazy connection manager is enabled by management feature flag") + if e.rosenpassEnabled { + log.Infof("rosenpass connection manager is enabled, lazy connection manager will not be started") + return nil + } + + log.Warnf("lazy connection manager is enabled by management feature flag") e.initLazyManager(ctx) e.statusRecorder.UpdateLazyConnection(true) return e.addPeersToLazyConnManager() @@ -133,7 +142,7 @@ func (e *ConnMgr) SetExcludeList(ctx context.Context, peerIDs map[string]bool) { excludedPeers = append(excludedPeers, lazyPeerCfg) } - added := e.lazyConnMgr.ExcludePeer(e.lazyCtx, excludedPeers) + added := e.lazyConnMgr.ExcludePeer(excludedPeers) for _, peerID := range added { var peerConn *peer.Conn var exists bool @@ -175,7 +184,7 @@ func (e *ConnMgr) AddPeerConn(ctx context.Context, peerKey string, conn *peer.Co PeerConnID: conn.ConnID(), Log: conn.Log, } - excluded, err := e.lazyConnMgr.AddPeer(e.lazyCtx, lazyPeerCfg) + excluded, err := e.lazyConnMgr.AddPeer(lazyPeerCfg) if err != nil { conn.Log.Errorf("failed to add peer to lazyconn manager: %v", err) if err := conn.Open(ctx); err != nil { @@ -201,7 +210,7 @@ func (e *ConnMgr) RemovePeerConn(peerKey string) { if !ok { return } - defer conn.Close() + defer conn.Close(false) if !e.isStartedWithLazyMgr() { return @@ -211,23 +220,28 @@ func (e *ConnMgr) RemovePeerConn(peerKey string) { conn.Log.Infof("removed peer from lazy conn manager") } -func (e *ConnMgr) OnSignalMsg(ctx context.Context, peerKey string) (*peer.Conn, bool) { - conn, ok := e.peerStore.PeerConn(peerKey) - if !ok { - return nil, false - } - +func (e *ConnMgr) ActivatePeer(ctx context.Context, conn *peer.Conn) { if !e.isStartedWithLazyMgr() { - return conn, true + return } - if found := e.lazyConnMgr.ActivatePeer(e.lazyCtx, peerKey); found { + if found := e.lazyConnMgr.ActivatePeer(conn.GetKey()); found { conn.Log.Infof("activated peer from inactive state") if err := conn.Open(ctx); err != nil { conn.Log.Errorf("failed to open connection: %v", err) } } - return conn, true +} + +// DeactivatePeer deactivates a peer connection in the lazy connection manager. +// If locally the lazy connection is disabled, we force the peer connection open. +func (e *ConnMgr) DeactivatePeer(conn *peer.Conn) { + if !e.isStartedWithLazyMgr() { + return + } + + conn.Log.Infof("closing peer connection: remote peer initiated inactive, idle lazy state and sent GOAWAY") + e.lazyConnMgr.DeactivatePeer(conn.ConnID()) } func (e *ConnMgr) Close() { @@ -244,7 +258,7 @@ func (e *ConnMgr) initLazyManager(engineCtx context.Context) { cfg := manager.Config{ InactivityThreshold: inactivityThresholdEnv(), } - e.lazyConnMgr = manager.NewManager(cfg, engineCtx, e.peerStore, e.iface, e.dispatcher) + e.lazyConnMgr = manager.NewManager(cfg, engineCtx, e.peerStore, e.iface) e.lazyCtx, e.lazyCtxCancel = context.WithCancel(engineCtx) @@ -275,7 +289,7 @@ func (e *ConnMgr) addPeersToLazyConnManager() error { lazyPeerCfgs = append(lazyPeerCfgs, lazyPeerCfg) } - return e.lazyConnMgr.AddActivePeers(e.lazyCtx, lazyPeerCfgs) + return e.lazyConnMgr.AddActivePeers(lazyPeerCfgs) } func (e *ConnMgr) closeManager(ctx context.Context) { diff --git a/client/internal/engine.go b/client/internal/engine.go index 74d84569a..e9772b359 100644 --- a/client/internal/engine.go +++ b/client/internal/engine.go @@ -38,7 +38,6 @@ import ( nftypes "github.com/netbirdio/netbird/client/internal/netflow/types" "github.com/netbirdio/netbird/client/internal/networkmonitor" "github.com/netbirdio/netbird/client/internal/peer" - "github.com/netbirdio/netbird/client/internal/peer/dispatcher" "github.com/netbirdio/netbird/client/internal/peer/guard" icemaker "github.com/netbirdio/netbird/client/internal/peer/ice" "github.com/netbirdio/netbird/client/internal/peerstore" @@ -175,8 +174,7 @@ type Engine struct { sshServerFunc func(hostKeyPEM []byte, addr string) (nbssh.Server, error) sshServer nbssh.Server - statusRecorder *peer.Status - peerConnDispatcher *dispatcher.ConnectionDispatcher + statusRecorder *peer.Status firewall firewallManager.Manager routeManager routemanager.Manager @@ -458,9 +456,7 @@ func (e *Engine) Start() error { NATExternalIPs: e.parseNATExternalIPMappings(), } - e.peerConnDispatcher = dispatcher.NewConnectionDispatcher() - - e.connMgr = NewConnMgr(e.config, e.statusRecorder, e.peerStore, wgIface, e.peerConnDispatcher) + e.connMgr = NewConnMgr(e.config, e.statusRecorder, e.peerStore, wgIface) e.connMgr.Start(e.ctx) e.srWatcher = guard.NewSRWatcher(e.signal, e.relayManager, e.mobileDep.IFaceDiscover, iceCfg) @@ -1261,7 +1257,7 @@ func (e *Engine) addNewPeer(peerConfig *mgmProto.RemotePeerConfig) error { } if exists := e.connMgr.AddPeerConn(e.ctx, peerKey, conn); exists { - conn.Close() + conn.Close(false) return fmt.Errorf("peer already exists: %s", peerKey) } @@ -1308,13 +1304,12 @@ func (e *Engine) createPeerConn(pubKey string, allowedIPs []netip.Prefix, agentV } serviceDependencies := peer.ServiceDependencies{ - StatusRecorder: e.statusRecorder, - Signaler: e.signaler, - IFaceDiscover: e.mobileDep.IFaceDiscover, - RelayManager: e.relayManager, - SrWatcher: e.srWatcher, - Semaphore: e.connSemaphore, - PeerConnDispatcher: e.peerConnDispatcher, + StatusRecorder: e.statusRecorder, + Signaler: e.signaler, + IFaceDiscover: e.mobileDep.IFaceDiscover, + RelayManager: e.relayManager, + SrWatcher: e.srWatcher, + Semaphore: e.connSemaphore, } peerConn, err := peer.NewConn(config, serviceDependencies) if err != nil { @@ -1337,11 +1332,16 @@ func (e *Engine) receiveSignalEvents() { e.syncMsgMux.Lock() defer e.syncMsgMux.Unlock() - conn, ok := e.connMgr.OnSignalMsg(e.ctx, msg.Key) + conn, ok := e.peerStore.PeerConn(msg.Key) if !ok { return fmt.Errorf("wrongly addressed message %s", msg.Key) } + msgType := msg.GetBody().GetType() + if msgType != sProto.Body_GO_IDLE { + e.connMgr.ActivatePeer(e.ctx, conn) + } + switch msg.GetBody().Type { case sProto.Body_OFFER: remoteCred, err := signal.UnMarshalCredential(msg) @@ -1398,6 +1398,8 @@ func (e *Engine) receiveSignalEvents() { go conn.OnRemoteCandidate(candidate, e.routeManager.GetClientRoutes()) case sProto.Body_MODE: + case sProto.Body_GO_IDLE: + e.connMgr.DeactivatePeer(conn) } return nil diff --git a/client/internal/engine_test.go b/client/internal/engine_test.go index d9c9881da..f4ed8f1c0 100644 --- a/client/internal/engine_test.go +++ b/client/internal/engine_test.go @@ -36,7 +36,6 @@ import ( "github.com/netbirdio/netbird/client/iface/wgproxy" "github.com/netbirdio/netbird/client/internal/dns" "github.com/netbirdio/netbird/client/internal/peer" - "github.com/netbirdio/netbird/client/internal/peer/dispatcher" "github.com/netbirdio/netbird/client/internal/peer/guard" icemaker "github.com/netbirdio/netbird/client/internal/peer/ice" "github.com/netbirdio/netbird/client/internal/routemanager" @@ -97,6 +96,7 @@ type MockWGIface struct { GetInterfaceGUIDStringFunc func() (string, error) GetProxyFunc func() wgproxy.Proxy GetNetFunc func() *netstack.Net + LastActivitiesFunc func() map[string]time.Time } func (m *MockWGIface) FullStats() (*configurer.Stats, error) { @@ -187,6 +187,13 @@ func (m *MockWGIface) GetNet() *netstack.Net { return m.GetNetFunc() } +func (m *MockWGIface) LastActivities() map[string]time.Time { + if m.LastActivitiesFunc != nil { + return m.LastActivitiesFunc() + } + return nil +} + func TestMain(m *testing.M) { _ = util.InitLog("debug", "console") code := m.Run() @@ -404,7 +411,7 @@ func TestEngine_UpdateNetworkMap(t *testing.T) { engine.udpMux = bind.NewUniversalUDPMuxDefault(bind.UniversalUDPMuxParams{UDPConn: conn}) engine.ctx = ctx engine.srWatcher = guard.NewSRWatcher(nil, nil, nil, icemaker.Config{}) - engine.connMgr = NewConnMgr(engine.config, engine.statusRecorder, engine.peerStore, wgIface, dispatcher.NewConnectionDispatcher()) + engine.connMgr = NewConnMgr(engine.config, engine.statusRecorder, engine.peerStore, wgIface) engine.connMgr.Start(ctx) type testCase struct { @@ -793,7 +800,7 @@ func TestEngine_UpdateNetworkMapWithRoutes(t *testing.T) { engine.routeManager = mockRouteManager engine.dnsServer = &dns.MockServer{} - engine.connMgr = NewConnMgr(engine.config, engine.statusRecorder, engine.peerStore, engine.wgInterface, dispatcher.NewConnectionDispatcher()) + engine.connMgr = NewConnMgr(engine.config, engine.statusRecorder, engine.peerStore, engine.wgInterface) engine.connMgr.Start(ctx) defer func() { @@ -991,7 +998,7 @@ func TestEngine_UpdateNetworkMapWithDNSUpdate(t *testing.T) { } engine.dnsServer = mockDNSServer - engine.connMgr = NewConnMgr(engine.config, engine.statusRecorder, engine.peerStore, engine.wgInterface, dispatcher.NewConnectionDispatcher()) + engine.connMgr = NewConnMgr(engine.config, engine.statusRecorder, engine.peerStore, engine.wgInterface) engine.connMgr.Start(ctx) defer func() { diff --git a/client/internal/iface_common.go b/client/internal/iface_common.go index 999472411..38fb3561e 100644 --- a/client/internal/iface_common.go +++ b/client/internal/iface_common.go @@ -38,4 +38,5 @@ type wgIfaceBase interface { GetStats() (map[string]configurer.WGStats, error) GetNet() *netstack.Net FullStats() (*configurer.Stats, error) + LastActivities() map[string]time.Time } diff --git a/client/internal/lazyconn/activity/listener.go b/client/internal/lazyconn/activity/listener.go index 1ef48416a..81b5da17b 100644 --- a/client/internal/lazyconn/activity/listener.go +++ b/client/internal/lazyconn/activity/listener.go @@ -13,7 +13,7 @@ import ( // Listener it is not a thread safe implementation, do not call Close before ReadPackets. It will cause blocking type Listener struct { - wgIface lazyconn.WGIface + wgIface WgInterface peerCfg lazyconn.PeerConfig conn *net.UDPConn endpoint *net.UDPAddr @@ -22,7 +22,7 @@ type Listener struct { isClosed atomic.Bool // use to avoid error log when closing the listener } -func NewListener(wgIface lazyconn.WGIface, cfg lazyconn.PeerConfig) (*Listener, error) { +func NewListener(wgIface WgInterface, cfg lazyconn.PeerConfig) (*Listener, error) { d := &Listener{ wgIface: wgIface, peerCfg: cfg, diff --git a/client/internal/lazyconn/activity/manager.go b/client/internal/lazyconn/activity/manager.go index e18b96465..915fb9cb8 100644 --- a/client/internal/lazyconn/activity/manager.go +++ b/client/internal/lazyconn/activity/manager.go @@ -1,18 +1,27 @@ package activity import ( + "net" + "net/netip" "sync" + "time" log "github.com/sirupsen/logrus" + "golang.zx2c4.com/wireguard/wgctrl/wgtypes" "github.com/netbirdio/netbird/client/internal/lazyconn" peerid "github.com/netbirdio/netbird/client/internal/peer/id" ) +type WgInterface interface { + RemovePeer(peerKey string) error + UpdatePeer(peerKey string, allowedIps []netip.Prefix, keepAlive time.Duration, endpoint *net.UDPAddr, preSharedKey *wgtypes.Key) error +} + type Manager struct { OnActivityChan chan peerid.ConnID - wgIface lazyconn.WGIface + wgIface WgInterface peers map[peerid.ConnID]*Listener done chan struct{} @@ -20,7 +29,7 @@ type Manager struct { mu sync.Mutex } -func NewManager(wgIface lazyconn.WGIface) *Manager { +func NewManager(wgIface WgInterface) *Manager { m := &Manager{ OnActivityChan: make(chan peerid.ConnID, 1), wgIface: wgIface, diff --git a/client/internal/lazyconn/inactivity/inactivity.go b/client/internal/lazyconn/inactivity/inactivity.go deleted file mode 100644 index 9b7c8511b..000000000 --- a/client/internal/lazyconn/inactivity/inactivity.go +++ /dev/null @@ -1,75 +0,0 @@ -package inactivity - -import ( - "context" - "time" - - peer "github.com/netbirdio/netbird/client/internal/peer/id" -) - -const ( - DefaultInactivityThreshold = 60 * time.Minute // idle after 1 hour inactivity - MinimumInactivityThreshold = 3 * time.Minute -) - -type Monitor struct { - id peer.ConnID - timer *time.Timer - cancel context.CancelFunc - inactivityThreshold time.Duration -} - -func NewInactivityMonitor(peerID peer.ConnID, threshold time.Duration) *Monitor { - i := &Monitor{ - id: peerID, - timer: time.NewTimer(0), - inactivityThreshold: threshold, - } - i.timer.Stop() - return i -} - -func (i *Monitor) Start(ctx context.Context, timeoutChan chan peer.ConnID) { - i.timer.Reset(i.inactivityThreshold) - defer i.timer.Stop() - - ctx, i.cancel = context.WithCancel(ctx) - defer func() { - defer i.cancel() - select { - case <-i.timer.C: - default: - } - }() - - select { - case <-i.timer.C: - select { - case timeoutChan <- i.id: - case <-ctx.Done(): - return - } - case <-ctx.Done(): - return - } -} - -func (i *Monitor) Stop() { - if i.cancel == nil { - return - } - i.cancel() -} - -func (i *Monitor) PauseTimer() { - i.timer.Stop() -} - -func (i *Monitor) ResetTimer() { - i.timer.Reset(i.inactivityThreshold) -} - -func (i *Monitor) ResetMonitor(ctx context.Context, timeoutChan chan peer.ConnID) { - i.Stop() - go i.Start(ctx, timeoutChan) -} diff --git a/client/internal/lazyconn/inactivity/inactivity_test.go b/client/internal/lazyconn/inactivity/inactivity_test.go deleted file mode 100644 index 944512985..000000000 --- a/client/internal/lazyconn/inactivity/inactivity_test.go +++ /dev/null @@ -1,156 +0,0 @@ -package inactivity - -import ( - "context" - "testing" - "time" - - peerid "github.com/netbirdio/netbird/client/internal/peer/id" -) - -type MocPeer struct { -} - -func (m *MocPeer) ConnID() peerid.ConnID { - return peerid.ConnID(m) -} - -func TestInactivityMonitor(t *testing.T) { - tCtx, testTimeoutCancel := context.WithTimeout(context.Background(), time.Second*5) - defer testTimeoutCancel() - - p := &MocPeer{} - im := NewInactivityMonitor(p.ConnID(), time.Second*2) - - timeoutChan := make(chan peerid.ConnID) - - exitChan := make(chan struct{}) - - go func() { - defer close(exitChan) - im.Start(tCtx, timeoutChan) - }() - - select { - case <-timeoutChan: - case <-tCtx.Done(): - t.Fatal("timeout") - } - - select { - case <-exitChan: - case <-tCtx.Done(): - t.Fatal("timeout") - } -} - -func TestReuseInactivityMonitor(t *testing.T) { - p := &MocPeer{} - im := NewInactivityMonitor(p.ConnID(), time.Second*2) - - timeoutChan := make(chan peerid.ConnID) - - for i := 2; i > 0; i-- { - exitChan := make(chan struct{}) - - testTimeoutCtx, testTimeoutCancel := context.WithTimeout(context.Background(), time.Second*5) - - go func() { - defer close(exitChan) - im.Start(testTimeoutCtx, timeoutChan) - }() - - select { - case <-timeoutChan: - case <-testTimeoutCtx.Done(): - t.Fatal("timeout") - } - - select { - case <-exitChan: - case <-testTimeoutCtx.Done(): - t.Fatal("timeout") - } - testTimeoutCancel() - } -} - -func TestStopInactivityMonitor(t *testing.T) { - tCtx, testTimeoutCancel := context.WithTimeout(context.Background(), time.Second*5) - defer testTimeoutCancel() - - p := &MocPeer{} - im := NewInactivityMonitor(p.ConnID(), DefaultInactivityThreshold) - - timeoutChan := make(chan peerid.ConnID) - - exitChan := make(chan struct{}) - - go func() { - defer close(exitChan) - im.Start(tCtx, timeoutChan) - }() - - go func() { - time.Sleep(3 * time.Second) - im.Stop() - }() - - select { - case <-timeoutChan: - t.Fatal("unexpected timeout") - case <-exitChan: - case <-tCtx.Done(): - t.Fatal("timeout") - } -} - -func TestPauseInactivityMonitor(t *testing.T) { - tCtx, testTimeoutCancel := context.WithTimeout(context.Background(), time.Second*10) - defer testTimeoutCancel() - - p := &MocPeer{} - trashHold := time.Second * 3 - im := NewInactivityMonitor(p.ConnID(), trashHold) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - timeoutChan := make(chan peerid.ConnID) - - exitChan := make(chan struct{}) - - go func() { - defer close(exitChan) - im.Start(ctx, timeoutChan) - }() - - time.Sleep(1 * time.Second) // grant time to start the monitor - im.PauseTimer() - - // check to do not receive timeout - thresholdCtx, thresholdCancel := context.WithTimeout(context.Background(), trashHold+time.Second) - defer thresholdCancel() - select { - case <-exitChan: - t.Fatal("unexpected exit") - case <-timeoutChan: - t.Fatal("unexpected timeout") - case <-thresholdCtx.Done(): - // test ok - case <-tCtx.Done(): - t.Fatal("test timed out") - } - - // test reset timer - im.ResetTimer() - - select { - case <-tCtx.Done(): - t.Fatal("test timed out") - case <-exitChan: - t.Fatal("unexpected exit") - case <-timeoutChan: - // expected timeout - } -} diff --git a/client/internal/lazyconn/inactivity/manager.go b/client/internal/lazyconn/inactivity/manager.go new file mode 100644 index 000000000..854951729 --- /dev/null +++ b/client/internal/lazyconn/inactivity/manager.go @@ -0,0 +1,152 @@ +package inactivity + +import ( + "context" + "fmt" + "time" + + log "github.com/sirupsen/logrus" + + "github.com/netbirdio/netbird/client/internal/lazyconn" +) + +const ( + checkInterval = 1 * time.Minute + + DefaultInactivityThreshold = 15 * time.Minute + MinimumInactivityThreshold = 1 * time.Minute +) + +type WgInterface interface { + LastActivities() map[string]time.Time +} + +type Manager struct { + inactivePeersChan chan map[string]struct{} + + iface WgInterface + interestedPeers map[string]*lazyconn.PeerConfig + inactivityThreshold time.Duration +} + +func NewManager(iface WgInterface, configuredThreshold *time.Duration) *Manager { + inactivityThreshold, err := validateInactivityThreshold(configuredThreshold) + if err != nil { + inactivityThreshold = DefaultInactivityThreshold + log.Warnf("invalid inactivity threshold configured: %v, using default: %v", err, DefaultInactivityThreshold) + } + + log.Infof("inactivity threshold configured: %v", inactivityThreshold) + return &Manager{ + inactivePeersChan: make(chan map[string]struct{}, 1), + iface: iface, + interestedPeers: make(map[string]*lazyconn.PeerConfig), + inactivityThreshold: inactivityThreshold, + } +} + +func (m *Manager) InactivePeersChan() chan map[string]struct{} { + if m == nil { + // return a nil channel that blocks forever + return nil + } + + return m.inactivePeersChan +} + +func (m *Manager) AddPeer(peerCfg *lazyconn.PeerConfig) { + if m == nil { + return + } + + if _, exists := m.interestedPeers[peerCfg.PublicKey]; exists { + return + } + + peerCfg.Log.Infof("adding peer to inactivity manager") + m.interestedPeers[peerCfg.PublicKey] = peerCfg +} + +func (m *Manager) RemovePeer(peer string) { + if m == nil { + return + } + + pi, ok := m.interestedPeers[peer] + if !ok { + return + } + + pi.Log.Debugf("remove peer from inactivity manager") + delete(m.interestedPeers, peer) +} + +func (m *Manager) Start(ctx context.Context) { + if m == nil { + return + } + + ticker := newTicker(checkInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C(): + idlePeers, err := m.checkStats() + if err != nil { + log.Errorf("error checking stats: %v", err) + return + } + + if len(idlePeers) == 0 { + continue + } + + m.notifyInactivePeers(ctx, idlePeers) + } + } +} + +func (m *Manager) notifyInactivePeers(ctx context.Context, inactivePeers map[string]struct{}) { + select { + case m.inactivePeersChan <- inactivePeers: + case <-ctx.Done(): + return + default: + return + } +} + +func (m *Manager) checkStats() (map[string]struct{}, error) { + lastActivities := m.iface.LastActivities() + + idlePeers := make(map[string]struct{}) + + for peerID, peerCfg := range m.interestedPeers { + lastActive, ok := lastActivities[peerID] + if !ok { + // when peer is in connecting state + peerCfg.Log.Warnf("peer not found in wg stats") + continue + } + + if time.Since(lastActive) > m.inactivityThreshold { + peerCfg.Log.Infof("peer is inactive since: %v", lastActive) + idlePeers[peerID] = struct{}{} + } + } + + return idlePeers, nil +} + +func validateInactivityThreshold(configuredThreshold *time.Duration) (time.Duration, error) { + if configuredThreshold == nil { + return DefaultInactivityThreshold, nil + } + if *configuredThreshold < MinimumInactivityThreshold { + return 0, fmt.Errorf("configured inactivity threshold %v is too low, using %v", *configuredThreshold, MinimumInactivityThreshold) + } + return *configuredThreshold, nil +} diff --git a/client/internal/lazyconn/inactivity/manager_test.go b/client/internal/lazyconn/inactivity/manager_test.go new file mode 100644 index 000000000..d012b41a2 --- /dev/null +++ b/client/internal/lazyconn/inactivity/manager_test.go @@ -0,0 +1,113 @@ +package inactivity + +import ( + "context" + "testing" + "time" + + log "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + + "github.com/netbirdio/netbird/client/internal/lazyconn" +) + +type mockWgInterface struct { + lastActivities map[string]time.Time +} + +func (m *mockWgInterface) LastActivities() map[string]time.Time { + return m.lastActivities +} + +func TestPeerTriggersInactivity(t *testing.T) { + peerID := "peer1" + + wgMock := &mockWgInterface{ + lastActivities: map[string]time.Time{ + peerID: time.Now().Add(-20 * time.Minute), + }, + } + + fakeTick := make(chan time.Time, 1) + newTicker = func(d time.Duration) Ticker { + return &fakeTickerMock{CChan: fakeTick} + } + + peerLog := log.WithField("peer", peerID) + peerCfg := &lazyconn.PeerConfig{ + PublicKey: peerID, + Log: peerLog, + } + + manager := NewManager(wgMock, nil) + manager.AddPeer(peerCfg) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Start the manager in a goroutine + go manager.Start(ctx) + + // Send a tick to simulate time passage + fakeTick <- time.Now() + + // Check if peer appears on inactivePeersChan + select { + case inactivePeers := <-manager.inactivePeersChan: + assert.Contains(t, inactivePeers, peerID, "expected peer to be marked inactive") + case <-time.After(1 * time.Second): + t.Fatal("expected inactivity event, but none received") + } +} + +func TestPeerTriggersActivity(t *testing.T) { + peerID := "peer1" + + wgMock := &mockWgInterface{ + lastActivities: map[string]time.Time{ + peerID: time.Now().Add(-5 * time.Minute), + }, + } + + fakeTick := make(chan time.Time, 1) + newTicker = func(d time.Duration) Ticker { + return &fakeTickerMock{CChan: fakeTick} + } + + peerLog := log.WithField("peer", peerID) + peerCfg := &lazyconn.PeerConfig{ + PublicKey: peerID, + Log: peerLog, + } + + manager := NewManager(wgMock, nil) + manager.AddPeer(peerCfg) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Start the manager in a goroutine + go manager.Start(ctx) + + // Send a tick to simulate time passage + fakeTick <- time.Now() + + // Check if peer appears on inactivePeersChan + select { + case <-manager.inactivePeersChan: + t.Fatal("expected inactive peer to be marked inactive") + case <-time.After(1 * time.Second): + // No inactivity event should be received + } +} + +// fakeTickerMock implements Ticker interface for testing +type fakeTickerMock struct { + CChan chan time.Time +} + +func (f *fakeTickerMock) C() <-chan time.Time { + return f.CChan +} + +func (f *fakeTickerMock) Stop() {} diff --git a/client/internal/lazyconn/inactivity/ticker.go b/client/internal/lazyconn/inactivity/ticker.go new file mode 100644 index 000000000..12b64bd5f --- /dev/null +++ b/client/internal/lazyconn/inactivity/ticker.go @@ -0,0 +1,24 @@ +package inactivity + +import "time" + +var newTicker = func(d time.Duration) Ticker { + return &realTicker{t: time.NewTicker(d)} +} + +type Ticker interface { + C() <-chan time.Time + Stop() +} + +type realTicker struct { + t *time.Ticker +} + +func (r *realTicker) C() <-chan time.Time { + return r.t.C +} + +func (r *realTicker) Stop() { + r.t.Stop() +} diff --git a/client/internal/lazyconn/manager/manager.go b/client/internal/lazyconn/manager/manager.go index 74ede50a7..b45b39221 100644 --- a/client/internal/lazyconn/manager/manager.go +++ b/client/internal/lazyconn/manager/manager.go @@ -52,51 +52,39 @@ type Manager struct { excludes map[string]lazyconn.PeerConfig managedPeersMu sync.Mutex - activityManager *activity.Manager - inactivityMonitors map[peerid.ConnID]*inactivity.Monitor + activityManager *activity.Manager + inactivityManager *inactivity.Manager // Route HA group management + // If any peer in the same HA group is active, all peers in that group should prevent going idle peerToHAGroups map[string][]route.HAUniqueID // peer ID -> HA groups they belong to haGroupToPeers map[route.HAUniqueID][]string // HA group -> peer IDs in the group routesMu sync.RWMutex - - onInactive chan peerid.ConnID } // NewManager creates a new lazy connection manager // engineCtx is the context for creating peer Connection -func NewManager(config Config, engineCtx context.Context, peerStore *peerstore.Store, wgIface lazyconn.WGIface, connStateDispatcher *dispatcher.ConnectionDispatcher) *Manager { +func NewManager(config Config, engineCtx context.Context, peerStore *peerstore.Store, wgIface lazyconn.WGIface) *Manager { log.Infof("setup lazy connection service") + m := &Manager{ engineCtx: engineCtx, peerStore: peerStore, - connStateDispatcher: connStateDispatcher, inactivityThreshold: inactivity.DefaultInactivityThreshold, managedPeers: make(map[string]*lazyconn.PeerConfig), managedPeersByConnID: make(map[peerid.ConnID]*managedPeer), excludes: make(map[string]lazyconn.PeerConfig), activityManager: activity.NewManager(wgIface), - inactivityMonitors: make(map[peerid.ConnID]*inactivity.Monitor), peerToHAGroups: make(map[string][]route.HAUniqueID), haGroupToPeers: make(map[route.HAUniqueID][]string), - onInactive: make(chan peerid.ConnID), } - if config.InactivityThreshold != nil { - if *config.InactivityThreshold >= inactivity.MinimumInactivityThreshold { - m.inactivityThreshold = *config.InactivityThreshold - } else { - log.Warnf("inactivity threshold is too low, using %v", m.inactivityThreshold) - } + if wgIface.IsUserspaceBind() { + m.inactivityManager = inactivity.NewManager(wgIface, config.InactivityThreshold) + } else { + log.Warnf("inactivity manager not supported for kernel mode, wait for remote peer to close the connection") } - m.connStateListener = &dispatcher.ConnectionListener{ - OnConnected: m.onPeerConnected, - OnDisconnected: m.onPeerDisconnected, - } - - connStateDispatcher.AddListener(m.connStateListener) - return m } @@ -131,24 +119,28 @@ func (m *Manager) UpdateRouteHAMap(haMap route.HAMap) { } } - log.Debugf("updated route HA mappings: %d HA groups, %d peers with routes", - len(m.haGroupToPeers), len(m.peerToHAGroups)) + log.Debugf("updated route HA mappings: %d HA groups, %d peers with routes", len(m.haGroupToPeers), len(m.peerToHAGroups)) } // Start starts the manager and listens for peer activity and inactivity events func (m *Manager) Start(ctx context.Context) { defer m.close() + if m.inactivityManager != nil { + go m.inactivityManager.Start(ctx) + } + for { select { case <-ctx.Done(): return case peerConnID := <-m.activityManager.OnActivityChan: - m.onPeerActivity(ctx, peerConnID) - case peerConnID := <-m.onInactive: - m.onPeerInactivityTimedOut(ctx, peerConnID) + m.onPeerActivity(peerConnID) + case peerIDs := <-m.inactivityManager.InactivePeersChan(): + m.onPeerInactivityTimedOut(peerIDs) } } + } // ExcludePeer marks peers for a permanent connection @@ -156,7 +148,7 @@ func (m *Manager) Start(ctx context.Context) { // Adds them back to the managed list and start the inactivity listener if they are removed from the exclude list. In // this case, we suppose that the connection status is connected or connecting. // If the peer is not exists yet in the managed list then the responsibility is the upper layer to call the AddPeer function -func (m *Manager) ExcludePeer(ctx context.Context, peerConfigs []lazyconn.PeerConfig) []string { +func (m *Manager) ExcludePeer(peerConfigs []lazyconn.PeerConfig) []string { m.managedPeersMu.Lock() defer m.managedPeersMu.Unlock() @@ -187,7 +179,7 @@ func (m *Manager) ExcludePeer(ctx context.Context, peerConfigs []lazyconn.PeerCo peerCfg.Log.Infof("peer removed from lazy connection exclude list") - if err := m.addActivePeer(ctx, peerCfg); err != nil { + if err := m.addActivePeer(&peerCfg); err != nil { log.Errorf("failed to add peer to lazy connection manager: %s", err) continue } @@ -197,7 +189,7 @@ func (m *Manager) ExcludePeer(ctx context.Context, peerConfigs []lazyconn.PeerCo return added } -func (m *Manager) AddPeer(ctx context.Context, peerCfg lazyconn.PeerConfig) (bool, error) { +func (m *Manager) AddPeer(peerCfg lazyconn.PeerConfig) (bool, error) { m.managedPeersMu.Lock() defer m.managedPeersMu.Unlock() @@ -217,9 +209,6 @@ func (m *Manager) AddPeer(ctx context.Context, peerCfg lazyconn.PeerConfig) (boo return false, err } - im := inactivity.NewInactivityMonitor(peerCfg.PeerConnID, m.inactivityThreshold) - m.inactivityMonitors[peerCfg.PeerConnID] = im - m.managedPeers[peerCfg.PublicKey] = &peerCfg m.managedPeersByConnID[peerCfg.PeerConnID] = &managedPeer{ peerCfg: &peerCfg, @@ -229,7 +218,7 @@ func (m *Manager) AddPeer(ctx context.Context, peerCfg lazyconn.PeerConfig) (boo // Check if this peer should be activated because its HA group peers are active if group, ok := m.shouldActivateNewPeer(peerCfg.PublicKey); ok { peerCfg.Log.Debugf("peer belongs to active HA group %s, will activate immediately", group) - m.activateNewPeerInActiveGroup(ctx, peerCfg) + m.activateNewPeerInActiveGroup(peerCfg) } return false, nil @@ -237,7 +226,7 @@ func (m *Manager) AddPeer(ctx context.Context, peerCfg lazyconn.PeerConfig) (boo // AddActivePeers adds a list of peers to the lazy connection manager // suppose these peers was in connected or in connecting states -func (m *Manager) AddActivePeers(ctx context.Context, peerCfg []lazyconn.PeerConfig) error { +func (m *Manager) AddActivePeers(peerCfg []lazyconn.PeerConfig) error { m.managedPeersMu.Lock() defer m.managedPeersMu.Unlock() @@ -247,7 +236,7 @@ func (m *Manager) AddActivePeers(ctx context.Context, peerCfg []lazyconn.PeerCon continue } - if err := m.addActivePeer(ctx, cfg); err != nil { + if err := m.addActivePeer(&cfg); err != nil { cfg.Log.Errorf("failed to add peer to lazy connection manager: %v", err) return err } @@ -264,7 +253,7 @@ func (m *Manager) RemovePeer(peerID string) { // ActivatePeer activates a peer connection when a signal message is received // Also activates all peers in the same HA groups as this peer -func (m *Manager) ActivatePeer(ctx context.Context, peerID string) (found bool) { +func (m *Manager) ActivatePeer(peerID string) (found bool) { m.managedPeersMu.Lock() defer m.managedPeersMu.Unlock() cfg, mp := m.getPeerForActivation(peerID) @@ -272,15 +261,42 @@ func (m *Manager) ActivatePeer(ctx context.Context, peerID string) (found bool) return false } - if !m.activateSinglePeer(ctx, cfg, mp) { + if !m.activateSinglePeer(cfg, mp) { return false } - m.activateHAGroupPeers(ctx, peerID) + m.activateHAGroupPeers(cfg) return true } +func (m *Manager) DeactivatePeer(peerID peerid.ConnID) { + m.managedPeersMu.Lock() + defer m.managedPeersMu.Unlock() + + mp, ok := m.managedPeersByConnID[peerID] + if !ok { + return + } + + if mp.expectedWatcher != watcherInactivity { + return + } + + m.peerStore.PeerConnClose(mp.peerCfg.PublicKey) + + mp.peerCfg.Log.Infof("start activity monitor") + + mp.expectedWatcher = watcherActivity + + m.inactivityManager.RemovePeer(mp.peerCfg.PublicKey) + + if err := m.activityManager.MonitorPeerActivity(*mp.peerCfg); err != nil { + mp.peerCfg.Log.Errorf("failed to create activity monitor: %v", err) + return + } +} + // getPeerForActivation checks if a peer can be activated and returns the necessary structs // Returns nil values if the peer should be skipped func (m *Manager) getPeerForActivation(peerID string) (*lazyconn.PeerConfig, *managedPeer) { @@ -302,41 +318,36 @@ func (m *Manager) getPeerForActivation(peerID string) (*lazyconn.PeerConfig, *ma return cfg, mp } -// activateSinglePeer activates a single peer (internal method) -func (m *Manager) activateSinglePeer(ctx context.Context, cfg *lazyconn.PeerConfig, mp *managedPeer) bool { - mp.expectedWatcher = watcherInactivity - - m.activityManager.RemovePeer(cfg.Log, cfg.PeerConnID) - - im, ok := m.inactivityMonitors[cfg.PeerConnID] - if !ok { - cfg.Log.Errorf("inactivity monitor not found for peer") +// activateSinglePeer activates a single peer +// return true if the peer was activated, false if it was already active +func (m *Manager) activateSinglePeer(cfg *lazyconn.PeerConfig, mp *managedPeer) bool { + if mp.expectedWatcher == watcherInactivity { return false } - cfg.Log.Infof("starting inactivity monitor") - go im.Start(ctx, m.onInactive) - + mp.expectedWatcher = watcherInactivity + m.activityManager.RemovePeer(cfg.Log, cfg.PeerConnID) + m.inactivityManager.AddPeer(cfg) return true } // activateHAGroupPeers activates all peers in HA groups that the given peer belongs to -func (m *Manager) activateHAGroupPeers(ctx context.Context, triggerPeerID string) { +func (m *Manager) activateHAGroupPeers(triggeredPeerCfg *lazyconn.PeerConfig) { var peersToActivate []string m.routesMu.RLock() - haGroups := m.peerToHAGroups[triggerPeerID] + haGroups := m.peerToHAGroups[triggeredPeerCfg.PublicKey] if len(haGroups) == 0 { m.routesMu.RUnlock() - log.Debugf("peer %s is not part of any HA groups", triggerPeerID) + triggeredPeerCfg.Log.Debugf("peer is not part of any HA groups") return } for _, haGroup := range haGroups { peers := m.haGroupToPeers[haGroup] for _, peerID := range peers { - if peerID != triggerPeerID { + if peerID != triggeredPeerCfg.PublicKey { peersToActivate = append(peersToActivate, peerID) } } @@ -350,16 +361,16 @@ func (m *Manager) activateHAGroupPeers(ctx context.Context, triggerPeerID string continue } - if m.activateSinglePeer(ctx, cfg, mp) { + if m.activateSinglePeer(cfg, mp) { activatedCount++ - cfg.Log.Infof("activated peer as part of HA group (triggered by %s)", triggerPeerID) + cfg.Log.Infof("activated peer as part of HA group (triggered by %s)", triggeredPeerCfg.PublicKey) m.peerStore.PeerConnOpen(m.engineCtx, cfg.PublicKey) } } if activatedCount > 0 { log.Infof("activated %d additional peers in HA groups for peer %s (groups: %v)", - activatedCount, triggerPeerID, haGroups) + activatedCount, triggeredPeerCfg.PublicKey, haGroups) } } @@ -394,13 +405,13 @@ func (m *Manager) shouldActivateNewPeer(peerID string) (route.HAUniqueID, bool) } // activateNewPeerInActiveGroup activates a newly added peer that should be active due to HA group -func (m *Manager) activateNewPeerInActiveGroup(ctx context.Context, peerCfg lazyconn.PeerConfig) { +func (m *Manager) activateNewPeerInActiveGroup(peerCfg lazyconn.PeerConfig) { mp, ok := m.managedPeersByConnID[peerCfg.PeerConnID] if !ok { return } - if !m.activateSinglePeer(ctx, &peerCfg, mp) { + if !m.activateSinglePeer(&peerCfg, mp) { return } @@ -408,23 +419,19 @@ func (m *Manager) activateNewPeerInActiveGroup(ctx context.Context, peerCfg lazy m.peerStore.PeerConnOpen(m.engineCtx, peerCfg.PublicKey) } -func (m *Manager) addActivePeer(ctx context.Context, peerCfg lazyconn.PeerConfig) error { +func (m *Manager) addActivePeer(peerCfg *lazyconn.PeerConfig) error { if _, ok := m.managedPeers[peerCfg.PublicKey]; ok { peerCfg.Log.Warnf("peer already managed") return nil } - im := inactivity.NewInactivityMonitor(peerCfg.PeerConnID, m.inactivityThreshold) - m.inactivityMonitors[peerCfg.PeerConnID] = im - - m.managedPeers[peerCfg.PublicKey] = &peerCfg + m.managedPeers[peerCfg.PublicKey] = peerCfg m.managedPeersByConnID[peerCfg.PeerConnID] = &managedPeer{ - peerCfg: &peerCfg, + peerCfg: peerCfg, expectedWatcher: watcherInactivity, } - peerCfg.Log.Infof("starting inactivity monitor on peer that has been removed from exclude list") - go im.Start(ctx, m.onInactive) + m.inactivityManager.AddPeer(peerCfg) return nil } @@ -436,12 +443,7 @@ func (m *Manager) removePeer(peerID string) { cfg.Log.Infof("removing lazy peer") - if im, ok := m.inactivityMonitors[cfg.PeerConnID]; ok { - im.Stop() - delete(m.inactivityMonitors, cfg.PeerConnID) - cfg.Log.Debugf("inactivity monitor stopped") - } - + m.inactivityManager.RemovePeer(cfg.PublicKey) m.activityManager.RemovePeer(cfg.Log, cfg.PeerConnID) delete(m.managedPeers, peerID) delete(m.managedPeersByConnID, cfg.PeerConnID) @@ -453,10 +455,7 @@ func (m *Manager) close() { m.connStateDispatcher.RemoveListener(m.connStateListener) m.activityManager.Close() - for _, iw := range m.inactivityMonitors { - iw.Stop() - } - m.inactivityMonitors = make(map[peerid.ConnID]*inactivity.Monitor) + m.managedPeers = make(map[string]*lazyconn.PeerConfig) m.managedPeersByConnID = make(map[peerid.ConnID]*managedPeer) @@ -470,7 +469,7 @@ func (m *Manager) close() { } // shouldDeferIdleForHA checks if peer should stay connected due to HA group requirements -func (m *Manager) shouldDeferIdleForHA(peerID string) bool { +func (m *Manager) shouldDeferIdleForHA(inactivePeers map[string]struct{}, peerID string) bool { m.routesMu.RLock() defer m.routesMu.RUnlock() @@ -480,38 +479,45 @@ func (m *Manager) shouldDeferIdleForHA(peerID string) bool { } for _, haGroup := range haGroups { - groupPeers := m.haGroupToPeers[haGroup] - - for _, groupPeerID := range groupPeers { - if groupPeerID == peerID { - continue - } - - cfg, ok := m.managedPeers[groupPeerID] - if !ok { - continue - } - - groupMp, ok := m.managedPeersByConnID[cfg.PeerConnID] - if !ok { - continue - } - - if groupMp.expectedWatcher != watcherInactivity { - continue - } - - // Other member is still connected, defer idle - if peer, ok := m.peerStore.PeerConn(groupPeerID); ok && peer.IsConnected() { - return true - } + if active := m.checkHaGroupActivity(haGroup, peerID, inactivePeers); active { + return true } } return false } -func (m *Manager) onPeerActivity(ctx context.Context, peerConnID peerid.ConnID) { +func (m *Manager) checkHaGroupActivity(haGroup route.HAUniqueID, peerID string, inactivePeers map[string]struct{}) bool { + groupPeers := m.haGroupToPeers[haGroup] + for _, groupPeerID := range groupPeers { + + if groupPeerID == peerID { + continue + } + + cfg, ok := m.managedPeers[groupPeerID] + if !ok { + continue + } + + groupMp, ok := m.managedPeersByConnID[cfg.PeerConnID] + if !ok { + continue + } + + if groupMp.expectedWatcher != watcherInactivity { + continue + } + + // If any peer in the group is active, do defer idle + if _, isInactive := inactivePeers[groupPeerID]; !isInactive { + return true + } + } + return false +} + +func (m *Manager) onPeerActivity(peerConnID peerid.ConnID) { m.managedPeersMu.Lock() defer m.managedPeersMu.Unlock() @@ -528,100 +534,56 @@ func (m *Manager) onPeerActivity(ctx context.Context, peerConnID peerid.ConnID) mp.peerCfg.Log.Infof("detected peer activity") - if !m.activateSinglePeer(ctx, mp.peerCfg, mp) { + if !m.activateSinglePeer(mp.peerCfg, mp) { return } - m.activateHAGroupPeers(ctx, mp.peerCfg.PublicKey) + m.activateHAGroupPeers(mp.peerCfg) m.peerStore.PeerConnOpen(m.engineCtx, mp.peerCfg.PublicKey) } -func (m *Manager) onPeerInactivityTimedOut(ctx context.Context, peerConnID peerid.ConnID) { +func (m *Manager) onPeerInactivityTimedOut(peerIDs map[string]struct{}) { m.managedPeersMu.Lock() defer m.managedPeersMu.Unlock() - mp, ok := m.managedPeersByConnID[peerConnID] - if !ok { - log.Errorf("peer not found by id: %v", peerConnID) - return - } - - if mp.expectedWatcher != watcherInactivity { - mp.peerCfg.Log.Warnf("ignore inactivity event") - return - } - - if m.shouldDeferIdleForHA(mp.peerCfg.PublicKey) { - iw, ok := m.inactivityMonitors[peerConnID] - if ok { - mp.peerCfg.Log.Debugf("resetting inactivity timer due to HA group requirements") - iw.ResetMonitor(ctx, m.onInactive) - } else { - mp.peerCfg.Log.Errorf("inactivity monitor not found for HA defer reset") + for peerID := range peerIDs { + peerCfg, ok := m.managedPeers[peerID] + if !ok { + log.Errorf("peer not found by peerId: %v", peerID) + continue } - return - } - mp.peerCfg.Log.Infof("connection timed out") + mp, ok := m.managedPeersByConnID[peerCfg.PeerConnID] + if !ok { + log.Errorf("peer not found by conn id: %v", peerCfg.PeerConnID) + continue + } - // this is blocking operation, potentially can be optimized - m.peerStore.PeerConnClose(mp.peerCfg.PublicKey) + if mp.expectedWatcher != watcherInactivity { + mp.peerCfg.Log.Warnf("ignore inactivity event") + continue + } - mp.peerCfg.Log.Infof("start activity monitor") + if m.shouldDeferIdleForHA(peerIDs, mp.peerCfg.PublicKey) { + mp.peerCfg.Log.Infof("defer inactivity due to active HA group peers") + continue + } - mp.expectedWatcher = watcherActivity + mp.peerCfg.Log.Infof("connection timed out") - // just in case free up - m.inactivityMonitors[peerConnID].PauseTimer() + // this is blocking operation, potentially can be optimized + m.peerStore.PeerConnIdle(mp.peerCfg.PublicKey) - if err := m.activityManager.MonitorPeerActivity(*mp.peerCfg); err != nil { - mp.peerCfg.Log.Errorf("failed to create activity monitor: %v", err) - return + mp.peerCfg.Log.Infof("start activity monitor") + + mp.expectedWatcher = watcherActivity + + m.inactivityManager.RemovePeer(mp.peerCfg.PublicKey) + + if err := m.activityManager.MonitorPeerActivity(*mp.peerCfg); err != nil { + mp.peerCfg.Log.Errorf("failed to create activity monitor: %v", err) + continue + } } } - -func (m *Manager) onPeerConnected(peerConnID peerid.ConnID) { - m.managedPeersMu.Lock() - defer m.managedPeersMu.Unlock() - - mp, ok := m.managedPeersByConnID[peerConnID] - if !ok { - return - } - - if mp.expectedWatcher != watcherInactivity { - return - } - - iw, ok := m.inactivityMonitors[mp.peerCfg.PeerConnID] - if !ok { - mp.peerCfg.Log.Warnf("inactivity monitor not found for peer") - return - } - - mp.peerCfg.Log.Infof("peer connected, pausing inactivity monitor while connection is not disconnected") - iw.PauseTimer() -} - -func (m *Manager) onPeerDisconnected(peerConnID peerid.ConnID) { - m.managedPeersMu.Lock() - defer m.managedPeersMu.Unlock() - - mp, ok := m.managedPeersByConnID[peerConnID] - if !ok { - return - } - - if mp.expectedWatcher != watcherInactivity { - return - } - - iw, ok := m.inactivityMonitors[mp.peerCfg.PeerConnID] - if !ok { - return - } - - mp.peerCfg.Log.Infof("reset inactivity monitor timer") - iw.ResetTimer() -} diff --git a/client/internal/lazyconn/wgiface.go b/client/internal/lazyconn/wgiface.go index 090a9319c..d55ff9670 100644 --- a/client/internal/lazyconn/wgiface.go +++ b/client/internal/lazyconn/wgiface.go @@ -11,4 +11,6 @@ import ( type WGIface interface { RemovePeer(peerKey string) error UpdatePeer(peerKey string, allowedIps []netip.Prefix, keepAlive time.Duration, endpoint *net.UDPAddr, preSharedKey *wgtypes.Key) error + IsUserspaceBind() bool + LastActivities() map[string]time.Time } diff --git a/client/internal/peer/conn.go b/client/internal/peer/conn.go index c3f44cc7f..1f0ec164e 100644 --- a/client/internal/peer/conn.go +++ b/client/internal/peer/conn.go @@ -117,10 +117,9 @@ type Conn struct { wgProxyRelay wgproxy.Proxy handshaker *Handshaker - guard *guard.Guard - semaphore *semaphoregroup.SemaphoreGroup - peerConnDispatcher *dispatcher.ConnectionDispatcher - wg sync.WaitGroup + guard *guard.Guard + semaphore *semaphoregroup.SemaphoreGroup + wg sync.WaitGroup // debug purpose dumpState *stateDump @@ -136,18 +135,17 @@ func NewConn(config ConnConfig, services ServiceDependencies) (*Conn, error) { connLog := log.WithField("peer", config.Key) var conn = &Conn{ - Log: connLog, - config: config, - statusRecorder: services.StatusRecorder, - signaler: services.Signaler, - iFaceDiscover: services.IFaceDiscover, - relayManager: services.RelayManager, - srWatcher: services.SrWatcher, - semaphore: services.Semaphore, - peerConnDispatcher: services.PeerConnDispatcher, - statusRelay: worker.NewAtomicStatus(), - statusICE: worker.NewAtomicStatus(), - dumpState: newStateDump(config.Key, connLog, services.StatusRecorder), + Log: connLog, + config: config, + statusRecorder: services.StatusRecorder, + signaler: services.Signaler, + iFaceDiscover: services.IFaceDiscover, + relayManager: services.RelayManager, + srWatcher: services.SrWatcher, + semaphore: services.Semaphore, + statusRelay: worker.NewAtomicStatus(), + statusICE: worker.NewAtomicStatus(), + dumpState: newStateDump(config.Key, connLog, services.StatusRecorder), } return conn, nil @@ -226,7 +224,7 @@ func (conn *Conn) Open(engineCtx context.Context) error { } // Close closes this peer Conn issuing a close event to the Conn closeCh -func (conn *Conn) Close() { +func (conn *Conn) Close(signalToRemote bool) { conn.mu.Lock() defer conn.wgWatcherWg.Wait() defer conn.mu.Unlock() @@ -236,6 +234,12 @@ func (conn *Conn) Close() { return } + if signalToRemote { + if err := conn.signaler.SignalIdle(conn.config.Key); err != nil { + conn.Log.Errorf("failed to signal idle state to peer: %v", err) + } + } + conn.Log.Infof("close peer connection") conn.ctxCancel() @@ -404,15 +408,10 @@ func (conn *Conn) onICEConnectionIsReady(priority conntype.ConnPriority, iceConn } wgConfigWorkaround() - oldState := conn.currentConnPriority conn.currentConnPriority = priority conn.statusICE.SetConnected() conn.updateIceState(iceConnInfo) conn.doOnConnected(iceConnInfo.RosenpassPubKey, iceConnInfo.RosenpassAddr) - - if oldState == conntype.None { - conn.peerConnDispatcher.NotifyConnected(conn.ConnID()) - } } func (conn *Conn) onICEStateDisconnected() { @@ -450,7 +449,6 @@ func (conn *Conn) onICEStateDisconnected() { } else { conn.Log.Infof("ICE disconnected, do not switch to Relay. Reset priority to: %s", conntype.None.String()) conn.currentConnPriority = conntype.None - conn.peerConnDispatcher.NotifyDisconnected(conn.ConnID()) } changed := conn.statusICE.Get() != worker.StatusDisconnected @@ -530,7 +528,6 @@ func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) { conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey) conn.Log.Infof("start to communicate with peer via relay") conn.doOnConnected(rci.rosenpassPubKey, rci.rosenpassAddr) - conn.peerConnDispatcher.NotifyConnected(conn.ConnID()) } func (conn *Conn) onRelayDisconnected() { @@ -545,11 +542,7 @@ func (conn *Conn) onRelayDisconnected() { if conn.currentConnPriority == conntype.Relay { conn.Log.Debugf("clean up WireGuard config") - if err := conn.removeWgPeer(); err != nil { - conn.Log.Errorf("failed to remove wg endpoint: %v", err) - } conn.currentConnPriority = conntype.None - conn.peerConnDispatcher.NotifyDisconnected(conn.ConnID()) } if conn.wgProxyRelay != nil { diff --git a/client/internal/peer/signaler.go b/client/internal/peer/signaler.go index 713123e5d..9022e0299 100644 --- a/client/internal/peer/signaler.go +++ b/client/internal/peer/signaler.go @@ -68,3 +68,13 @@ func (s *Signaler) signalOfferAnswer(offerAnswer OfferAnswer, remoteKey string, return nil } + +func (s *Signaler) SignalIdle(remoteKey string) error { + return s.signal.Send(&sProto.Message{ + Key: s.wgPrivateKey.PublicKey().String(), + RemoteKey: remoteKey, + Body: &sProto.Body{ + Type: sProto.Body_GO_IDLE, + }, + }) +} diff --git a/client/internal/peerstore/store.go b/client/internal/peerstore/store.go index 81ac7a5b6..099fe4528 100644 --- a/client/internal/peerstore/store.go +++ b/client/internal/peerstore/store.go @@ -95,6 +95,17 @@ func (s *Store) PeerConnOpen(ctx context.Context, pubKey string) { } +func (s *Store) PeerConnIdle(pubKey string) { + s.peerConnsMu.RLock() + defer s.peerConnsMu.RUnlock() + + p, ok := s.peerConns[pubKey] + if !ok { + return + } + p.Close(true) +} + func (s *Store) PeerConnClose(pubKey string) { s.peerConnsMu.RLock() defer s.peerConnsMu.RUnlock() @@ -103,7 +114,7 @@ func (s *Store) PeerConnClose(pubKey string) { if !ok { return } - p.Close() + p.Close(false) } func (s *Store) PeersPubKey() []string { diff --git a/monotime/time.go b/monotime/time.go new file mode 100644 index 000000000..6032fb60b --- /dev/null +++ b/monotime/time.go @@ -0,0 +1,29 @@ +package monotime + +import ( + "time" +) + +var ( + baseWallTime time.Time + baseWallNano int64 +) + +func init() { + baseWallTime = time.Now() + baseWallNano = baseWallTime.UnixNano() +} + +// Now returns the current time as Unix nanoseconds (int64). +// It uses monotonic time measurement from the base time to ensure +// the returned value increases monotonically and is not affected +// by system clock adjustments. +// +// Performance optimization: By capturing the base wall time once at startup +// and using time.Since() for elapsed calculation, this avoids repeated +// time.Now() calls and leverages Go's internal monotonic clock for +// efficient duration measurement. +func Now() int64 { + elapsed := time.Since(baseWallTime) + return baseWallNano + int64(elapsed) +} diff --git a/monotime/time_test.go b/monotime/time_test.go new file mode 100644 index 000000000..ac837b226 --- /dev/null +++ b/monotime/time_test.go @@ -0,0 +1,20 @@ +package monotime + +import ( + "testing" + "time" +) + +func BenchmarkMonotimeNow(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + _ = Now() + } +} + +func BenchmarkTimeNow(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + _ = time.Now() + } +} diff --git a/signal/proto/signalexchange.pb.go b/signal/proto/signalexchange.pb.go index 30f704c6f..3d45dea69 100644 --- a/signal/proto/signalexchange.pb.go +++ b/signal/proto/signalexchange.pb.go @@ -29,6 +29,7 @@ const ( Body_ANSWER Body_Type = 1 Body_CANDIDATE Body_Type = 2 Body_MODE Body_Type = 4 + Body_GO_IDLE Body_Type = 5 ) // Enum value maps for Body_Type. @@ -38,12 +39,14 @@ var ( 1: "ANSWER", 2: "CANDIDATE", 4: "MODE", + 5: "GO_IDLE", } Body_Type_value = map[string]int32{ "OFFER": 0, "ANSWER": 1, "CANDIDATE": 2, "MODE": 4, + "GO_IDLE": 5, } ) @@ -225,7 +228,7 @@ type Body struct { FeaturesSupported []uint32 `protobuf:"varint,6,rep,packed,name=featuresSupported,proto3" json:"featuresSupported,omitempty"` // RosenpassConfig is a Rosenpass config of the remote peer our peer tries to connect to RosenpassConfig *RosenpassConfig `protobuf:"bytes,7,opt,name=rosenpassConfig,proto3" json:"rosenpassConfig,omitempty"` - // relayServerAddress is an IP:port of the relay server + // relayServerAddress is url of the relay server RelayServerAddress string `protobuf:"bytes,8,opt,name=relayServerAddress,proto3" json:"relayServerAddress,omitempty"` } @@ -440,7 +443,7 @@ var file_signalexchange_proto_rawDesc = []byte{ 0x52, 0x09, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x12, 0x28, 0x0a, 0x04, 0x62, 0x6f, 0x64, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x42, 0x6f, 0x64, 0x79, 0x52, - 0x04, 0x62, 0x6f, 0x64, 0x79, 0x22, 0xa6, 0x03, 0x0a, 0x04, 0x42, 0x6f, 0x64, 0x79, 0x12, 0x2d, + 0x04, 0x62, 0x6f, 0x64, 0x79, 0x22, 0xb3, 0x03, 0x0a, 0x04, 0x42, 0x6f, 0x64, 0x79, 0x12, 0x2d, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x19, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x42, 0x6f, 0x64, 0x79, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, @@ -463,33 +466,34 @@ var file_signalexchange_proto_rawDesc = []byte{ 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x2e, 0x0a, 0x12, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x08, 0x20, 0x01, 0x28, 0x09, 0x52, 0x12, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, - 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x22, 0x36, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x09, + 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x22, 0x43, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x09, 0x0a, 0x05, 0x4f, 0x46, 0x46, 0x45, 0x52, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x41, 0x4e, 0x53, 0x57, 0x45, 0x52, 0x10, 0x01, 0x12, 0x0d, 0x0a, 0x09, 0x43, 0x41, 0x4e, 0x44, 0x49, 0x44, 0x41, - 0x54, 0x45, 0x10, 0x02, 0x12, 0x08, 0x0a, 0x04, 0x4d, 0x4f, 0x44, 0x45, 0x10, 0x04, 0x22, 0x2e, - 0x0a, 0x04, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x1b, 0x0a, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x48, 0x00, 0x52, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, - 0x88, 0x01, 0x01, 0x42, 0x09, 0x0a, 0x07, 0x5f, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x22, 0x6d, - 0x0a, 0x0f, 0x52, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, - 0x67, 0x12, 0x28, 0x0a, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x50, 0x75, - 0x62, 0x4b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0f, 0x72, 0x6f, 0x73, 0x65, - 0x6e, 0x70, 0x61, 0x73, 0x73, 0x50, 0x75, 0x62, 0x4b, 0x65, 0x79, 0x12, 0x30, 0x0a, 0x13, 0x72, - 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, - 0x64, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x13, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, - 0x61, 0x73, 0x73, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x32, 0xb9, 0x01, - 0x0a, 0x0e, 0x53, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x45, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, - 0x12, 0x4c, 0x0a, 0x04, 0x53, 0x65, 0x6e, 0x64, 0x12, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, - 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, - 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x20, 0x2e, 0x73, 0x69, 0x67, - 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, - 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x00, 0x12, 0x59, - 0x0a, 0x0d, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, - 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, - 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x65, 0x1a, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, - 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, - 0x61, 0x67, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x08, 0x5a, 0x06, 0x2f, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x54, 0x45, 0x10, 0x02, 0x12, 0x08, 0x0a, 0x04, 0x4d, 0x4f, 0x44, 0x45, 0x10, 0x04, 0x12, 0x0b, + 0x0a, 0x07, 0x47, 0x4f, 0x5f, 0x49, 0x44, 0x4c, 0x45, 0x10, 0x05, 0x22, 0x2e, 0x0a, 0x04, 0x4d, + 0x6f, 0x64, 0x65, 0x12, 0x1b, 0x0a, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x08, 0x48, 0x00, 0x52, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x88, 0x01, 0x01, + 0x42, 0x09, 0x0a, 0x07, 0x5f, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x22, 0x6d, 0x0a, 0x0f, 0x52, + 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x28, + 0x0a, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x50, 0x75, 0x62, 0x4b, 0x65, + 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, + 0x73, 0x73, 0x50, 0x75, 0x62, 0x4b, 0x65, 0x79, 0x12, 0x30, 0x0a, 0x13, 0x72, 0x6f, 0x73, 0x65, + 0x6e, 0x70, 0x61, 0x73, 0x73, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x13, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, + 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x32, 0xb9, 0x01, 0x0a, 0x0e, 0x53, + 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x45, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x4c, 0x0a, + 0x04, 0x53, 0x65, 0x6e, 0x64, 0x12, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, + 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, + 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, + 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, + 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x00, 0x12, 0x59, 0x0a, 0x0d, 0x43, + 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x20, 0x2e, 0x73, + 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, + 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x20, + 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, + 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x08, 0x5a, 0x06, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/signal/proto/signalexchange.proto b/signal/proto/signalexchange.proto index 4431edd7c..b04d6ef28 100644 --- a/signal/proto/signalexchange.proto +++ b/signal/proto/signalexchange.proto @@ -47,6 +47,7 @@ message Body { ANSWER = 1; CANDIDATE = 2; MODE = 4; + GO_IDLE = 5; } Type type = 1; string payload = 2; @@ -74,4 +75,4 @@ message RosenpassConfig { bytes rosenpassPubKey = 1; // rosenpassServerAddr is an IP:port of the rosenpass service string rosenpassServerAddr = 2; -} \ No newline at end of file +}