[client] Feature/lazy connection (#3379)
With the lazy connection feature, the peer connects to target peers on-demand. The trigger can be any IP traffic. This feature can be enabled with the NB_ENABLE_EXPERIMENTAL_LAZY_CONN environment variable.

When the engine receives a network map, it binds a free UDP port for every remote peer and configures the WireGuard endpoints to point at these ports. When traffic appears on one of these UDP sockets, the system removes the listener and starts the peer connection procedure immediately.

Key changes:
- Fix the slow "netbird status -d" command
- Move the peer connection related code from engine.go to conn_mgr.go
- Refactor the iface interface usage and move the interface file next to the engine code
- Add a new command line flag and UI option to enable the feature
- Make the peer.Conn struct reusable after it has been closed
- Change the connection states

Connection states:
- Idle: The peer is not attempting to establish a connection. This typically means it is in a lazy state or the remote peer is expired.
- Connecting: The peer is actively trying to establish a connection. This occurs when the peer has entered an active state and is continuously attempting to reach the remote peer.
- Connected: A successful peer-to-peer connection has been established and communication is active.
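To make the trigger mechanism concrete, here is a minimal, self-contained Go sketch of the idea: bind a free UDP port for an idle peer, point the local WireGuard endpoint at it, and treat the first packet that arrives as the signal to tear down the listener and start the real connection procedure. This is an illustration only, not the NetBird implementation; the names activityListener, newActivityListener, and onTraffic are invented for this sketch.

// Hypothetical sketch of the lazy-connection trigger; not the actual NetBird code.
package main

import (
	"log"
	"net"
)

// activityListener owns one free UDP port for a single remote peer. The
// peer's WireGuard endpoint is pointed at this port, so the first outbound
// packet for the peer lands here instead of going out to the network.
type activityListener struct {
	conn    *net.UDPConn
	peerKey string
}

func newActivityListener(peerKey string) (*activityListener, error) {
	// Port 0 lets the kernel pick any free UDP port.
	conn, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0})
	if err != nil {
		return nil, err
	}
	return &activityListener{conn: conn, peerKey: peerKey}, nil
}

// port reports the bound port, i.e. what would be configured as the
// peer's WireGuard endpoint while the peer is idle.
func (l *activityListener) port() int {
	return l.conn.LocalAddr().(*net.UDPAddr).Port
}

// wait blocks until any traffic arrives, then removes the listener and
// hands the peer key over to the connection procedure.
func (l *activityListener) wait(onTraffic func(peerKey string)) {
	buf := make([]byte, 1)
	if _, _, err := l.conn.ReadFromUDP(buf); err != nil {
		return // listener was closed before any traffic appeared
	}
	_ = l.conn.Close()
	onTraffic(l.peerKey)
}

func main() {
	l, err := newActivityListener("examplePeerPubKey")
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("peer is idle, WireGuard endpoint set to 127.0.0.1:%d", l.port())

	done := make(chan struct{})
	go l.wait(func(peerKey string) {
		log.Printf("traffic detected, starting connection to %s", peerKey)
		close(done)
	})

	// Simulate the first packet WireGuard would emit towards the endpoint.
	c, err := net.Dial("udp", l.conn.LocalAddr().String())
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()
	if _, err := c.Write([]byte{0x0}); err != nil {
		log.Fatal(err)
	}
	<-done
}

In terms of the states listed above, the first packet is what moves a peer from Idle to Connecting; Connected follows once the real connection procedure succeeds.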
@@ -38,6 +38,7 @@ import (
 	nftypes "github.com/netbirdio/netbird/client/internal/netflow/types"
 	"github.com/netbirdio/netbird/client/internal/networkmonitor"
 	"github.com/netbirdio/netbird/client/internal/peer"
+	"github.com/netbirdio/netbird/client/internal/peer/dispatcher"
 	"github.com/netbirdio/netbird/client/internal/peer/guard"
 	icemaker "github.com/netbirdio/netbird/client/internal/peer/ice"
 	"github.com/netbirdio/netbird/client/internal/peerstore"
@@ -122,6 +123,8 @@ type EngineConfig struct {
 	DisableFirewall bool
 
 	BlockLANAccess bool
+
+	LazyConnectionEnabled bool
 }
 
 // Engine is a mechanism responsible for reacting on Signal and Management stream events and managing connections to the remote peers.
@@ -134,6 +137,8 @@ type Engine struct {
 	// peerConns is a map that holds all the peers that are known to this peer
 	peerStore *peerstore.Store
 
+	connMgr *ConnMgr
+
 	beforePeerHook nbnet.AddHookFunc
 	afterPeerHook  nbnet.RemoveHookFunc
 
@@ -170,7 +175,8 @@ type Engine struct {
 	sshServerFunc func(hostKeyPEM []byte, addr string) (nbssh.Server, error)
 	sshServer     nbssh.Server
 
-	statusRecorder *peer.Status
+	statusRecorder     *peer.Status
+	peerConnDispatcher *dispatcher.ConnectionDispatcher
 
 	firewall     firewallManager.Manager
 	routeManager routemanager.Manager
@@ -262,6 +268,10 @@ func (e *Engine) Stop() error {
 	e.syncMsgMux.Lock()
 	defer e.syncMsgMux.Unlock()
 
+	if e.connMgr != nil {
+		e.connMgr.Close()
+	}
+
 	// stopping network monitor first to avoid starting the engine again
 	if e.networkMonitor != nil {
 		e.networkMonitor.Stop()
@@ -297,8 +307,7 @@ func (e *Engine) Stop() error {
 	e.statusRecorder.UpdateDNSStates([]peer.NSGroupState{})
 	e.statusRecorder.UpdateRelayStates([]relay.ProbeResult{})
 
-	err := e.removeAllPeers()
-	if err != nil {
+	if err := e.removeAllPeers(); err != nil {
 		return fmt.Errorf("failed to remove all peers: %s", err)
 	}
 
@@ -405,8 +414,7 @@ func (e *Engine) Start() error {
 
 	e.routeManager.SetRouteChangeListener(e.mobileDep.NetworkChangeListener)
 
-	err = e.wgInterfaceCreate()
-	if err != nil {
+	if err = e.wgInterfaceCreate(); err != nil {
 		log.Errorf("failed creating tunnel interface %s: [%s]", e.config.WgIfaceName, err.Error())
 		e.close()
 		return fmt.Errorf("create wg interface: %w", err)
@@ -442,6 +450,11 @@ func (e *Engine) Start() error {
 		NATExternalIPs: e.parseNATExternalIPMappings(),
 	}
 
+	e.peerConnDispatcher = dispatcher.NewConnectionDispatcher()
+
+	e.connMgr = NewConnMgr(e.config, e.statusRecorder, e.peerStore, wgIface, e.peerConnDispatcher)
+	e.connMgr.Start(e.ctx)
+
 	e.srWatcher = guard.NewSRWatcher(e.signal, e.relayManager, e.mobileDep.IFaceDiscover, iceCfg)
 	e.srWatcher.Start()
 
@@ -450,7 +463,6 @@ func (e *Engine) Start() error {
 
 	// starting network monitor at the very last to avoid disruptions
 	e.startNetworkMonitor()
-
 	return nil
 }
 
@@ -550,6 +562,16 @@ func (e *Engine) modifyPeers(peersUpdate []*mgmProto.RemotePeerConfig) error {
 	var modified []*mgmProto.RemotePeerConfig
 	for _, p := range peersUpdate {
 		peerPubKey := p.GetWgPubKey()
+		currentPeer, ok := e.peerStore.PeerConn(peerPubKey)
+		if !ok {
+			continue
+		}
+
+		if currentPeer.AgentVersionString() != p.AgentVersion {
+			modified = append(modified, p)
+			continue
+		}
+
 		allowedIPs, ok := e.peerStore.AllowedIPs(peerPubKey)
 		if !ok {
 			continue
@@ -559,8 +581,7 @@ func (e *Engine) modifyPeers(peersUpdate []*mgmProto.RemotePeerConfig) error {
 			continue
 		}
 
-		err := e.statusRecorder.UpdatePeerFQDN(peerPubKey, p.GetFqdn())
-		if err != nil {
+		if err := e.statusRecorder.UpdatePeerFQDN(peerPubKey, p.GetFqdn()); err != nil {
 			log.Warnf("error updating peer's %s fqdn in the status recorder, got error: %v", peerPubKey, err)
 		}
 	}
@@ -621,16 +642,11 @@ func (e *Engine) removePeer(peerKey string) error {
 		e.sshServer.RemoveAuthorizedKey(peerKey)
 	}
 
+	defer func() {
+		err := e.statusRecorder.RemovePeer(peerKey)
+		if err != nil {
+			log.Warnf("received error when removing peer %s from status recorder: %v", peerKey, err)
+		}
+	}()
+	e.connMgr.RemovePeerConn(peerKey)
 
-	conn, exists := e.peerStore.Remove(peerKey)
-	if exists {
-		conn.Close()
-		err := e.statusRecorder.RemovePeer(peerKey)
-		if err != nil {
-			log.Warnf("received error when removing peer %s from status recorder: %v", peerKey, err)
-		}
-		return nil
-	}
@@ -952,6 +968,10 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
 		return nil
 	}
 
+	if err := e.connMgr.UpdatedRemoteFeatureFlag(e.ctx, networkMap.GetPeerConfig().GetLazyConnectionEnabled()); err != nil {
+		log.Errorf("failed to update lazy connection feature flag: %v", err)
+	}
+
 	if e.firewall != nil {
 		if localipfw, ok := e.firewall.(localIpUpdater); ok {
 			if err := localipfw.UpdateLocalIPs(); err != nil {
@@ -976,7 +996,8 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
 	e.updateDNSForwarder(dnsRouteFeatureFlag, fwdEntries)
 
 	// Ingress forward rules
-	if err := e.updateForwardRules(networkMap.GetForwardingRules()); err != nil {
+	forwardingRules, err := e.updateForwardRules(networkMap.GetForwardingRules())
+	if err != nil {
 		log.Errorf("failed to update forward rules, err: %v", err)
 	}
 
@@ -1022,6 +1043,10 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
 		}
 	}
 
+	// must set the exclude list after the peers are added. Without it the manager can not figure out the peers parameters from the store
+	excludedLazyPeers := e.toExcludedLazyPeers(routes, forwardingRules, networkMap.GetRemotePeers())
+	e.connMgr.SetExcludeList(excludedLazyPeers)
+
 	protoDNSConfig := networkMap.GetDNSConfig()
 	if protoDNSConfig == nil {
 		protoDNSConfig = &mgmProto.DNSConfig{}
@@ -1155,7 +1180,7 @@ func (e *Engine) updateOfflinePeers(offlinePeers []*mgmProto.RemotePeerConfig) {
 			IP:               strings.Join(offlinePeer.GetAllowedIps(), ","),
 			PubKey:           offlinePeer.GetWgPubKey(),
 			FQDN:             offlinePeer.GetFqdn(),
-			ConnStatus:       peer.StatusDisconnected,
+			ConnStatus:       peer.StatusIdle,
 			ConnStatusUpdate: time.Now(),
 			Mux:              new(sync.RWMutex),
 		}
@@ -1191,12 +1216,17 @@ func (e *Engine) addNewPeer(peerConfig *mgmProto.RemotePeerConfig) error {
 		peerIPs = append(peerIPs, allowedNetIP)
 	}
 
-	conn, err := e.createPeerConn(peerKey, peerIPs)
+	conn, err := e.createPeerConn(peerKey, peerIPs, peerConfig.AgentVersion)
 	if err != nil {
 		return fmt.Errorf("create peer connection: %w", err)
 	}
 
-	if ok := e.peerStore.AddPeerConn(peerKey, conn); !ok {
+	err = e.statusRecorder.AddPeer(peerKey, peerConfig.Fqdn, peerIPs[0].Addr().String())
+	if err != nil {
+		log.Warnf("error adding peer %s to status recorder, got error: %v", peerKey, err)
+	}
+
+	if exists := e.connMgr.AddPeerConn(e.ctx, peerKey, conn); exists {
 		conn.Close()
 		return fmt.Errorf("peer already exists: %s", peerKey)
 	}
@@ -1205,17 +1235,10 @@ func (e *Engine) addNewPeer(peerConfig *mgmProto.RemotePeerConfig) error {
 		conn.AddBeforeAddPeerHook(e.beforePeerHook)
 		conn.AddAfterRemovePeerHook(e.afterPeerHook)
 	}
 
-	err = e.statusRecorder.AddPeer(peerKey, peerConfig.Fqdn)
-	if err != nil {
-		log.Warnf("error adding peer %s to status recorder, got error: %v", peerKey, err)
-	}
-
-	conn.Open()
 	return nil
 }
 
-func (e *Engine) createPeerConn(pubKey string, allowedIPs []netip.Prefix) (*peer.Conn, error) {
+func (e *Engine) createPeerConn(pubKey string, allowedIPs []netip.Prefix, agentVersion string) (*peer.Conn, error) {
 	log.Debugf("creating peer connection %s", pubKey)
 
 	wgConfig := peer.WgConfig{
@@ -1229,11 +1252,12 @@ func (e *Engine) createPeerConn(pubKey string, allowedIPs []netip.Prefix) (*peer
 	// randomize connection timeout
 	timeout := time.Duration(rand.Intn(PeerConnectionTimeoutMax-PeerConnectionTimeoutMin)+PeerConnectionTimeoutMin) * time.Millisecond
 	config := peer.ConnConfig{
-		Key:         pubKey,
-		LocalKey:    e.config.WgPrivateKey.PublicKey().String(),
-		Timeout:     timeout,
-		WgConfig:    wgConfig,
-		LocalWgPort: e.config.WgPort,
+		Key:          pubKey,
+		LocalKey:     e.config.WgPrivateKey.PublicKey().String(),
+		AgentVersion: agentVersion,
+		Timeout:      timeout,
+		WgConfig:     wgConfig,
+		LocalWgPort:  e.config.WgPort,
 		RosenpassConfig: peer.RosenpassConfig{
 			PubKey: e.getRosenpassPubKey(),
 			Addr:   e.getRosenpassAddr(),
@@ -1249,7 +1273,16 @@ func (e *Engine) createPeerConn(pubKey string, allowedIPs []netip.Prefix) (*peer
 		},
 	}
 
-	peerConn, err := peer.NewConn(e.ctx, config, e.statusRecorder, e.signaler, e.mobileDep.IFaceDiscover, e.relayManager, e.srWatcher, e.connSemaphore)
+	serviceDependencies := peer.ServiceDependencies{
+		StatusRecorder:     e.statusRecorder,
+		Signaler:           e.signaler,
+		IFaceDiscover:      e.mobileDep.IFaceDiscover,
+		RelayManager:       e.relayManager,
+		SrWatcher:          e.srWatcher,
+		Semaphore:          e.connSemaphore,
+		PeerConnDispatcher: e.peerConnDispatcher,
+	}
+	peerConn, err := peer.NewConn(config, serviceDependencies)
 	if err != nil {
 		return nil, err
 	}
@@ -1270,7 +1303,7 @@ func (e *Engine) receiveSignalEvents() {
 			e.syncMsgMux.Lock()
 			defer e.syncMsgMux.Unlock()
 
-			conn, ok := e.peerStore.PeerConn(msg.Key)
+			conn, ok := e.connMgr.OnSignalMsg(e.ctx, msg.Key)
 			if !ok {
 				return fmt.Errorf("wrongly addressed message %s", msg.Key)
 			}
@@ -1578,13 +1611,39 @@ func (e *Engine) getRosenpassAddr() string {
+// RunHealthProbes executes health checks for Signal, Management, Relay and WireGuard services
+// and updates the status recorder with the latest states.
 func (e *Engine) RunHealthProbes() bool {
 	e.syncMsgMux.Lock()
 
 	signalHealthy := e.signal.IsHealthy()
 	log.Debugf("signal health check: healthy=%t", signalHealthy)
 
 	managementHealthy := e.mgmClient.IsHealthy()
 	log.Debugf("management health check: healthy=%t", managementHealthy)
 
-	results := append(e.probeSTUNs(), e.probeTURNs()...)
+	stuns := slices.Clone(e.STUNs)
+	turns := slices.Clone(e.TURNs)
+
+	if e.wgInterface != nil {
+		stats, err := e.wgInterface.GetStats()
+		if err != nil {
+			log.Warnf("failed to get wireguard stats: %v", err)
+			e.syncMsgMux.Unlock()
+			return false
+		}
+		for _, key := range e.peerStore.PeersPubKey() {
+			// wgStats could be zero value, in which case we just reset the stats
+			wgStats, ok := stats[key]
+			if !ok {
+				continue
+			}
+			if err := e.statusRecorder.UpdateWireGuardPeerState(key, wgStats); err != nil {
+				log.Debugf("failed to update wg stats for peer %s: %s", key, err)
+			}
+		}
+	}
+
+	e.syncMsgMux.Unlock()
+
+	results := e.probeICE(stuns, turns)
 	e.statusRecorder.UpdateRelayStates(results)
 
 	relayHealthy := true
@@ -1596,37 +1655,16 @@ func (e *Engine) RunHealthProbes() bool {
 	}
 	log.Debugf("relay health check: healthy=%t", relayHealthy)
 
-	for _, key := range e.peerStore.PeersPubKey() {
-		wgStats, err := e.wgInterface.GetStats(key)
-		if err != nil {
-			log.Debugf("failed to get wg stats for peer %s: %s", key, err)
-			continue
-		}
-		// wgStats could be zero value, in which case we just reset the stats
-		if err := e.statusRecorder.UpdateWireGuardPeerState(key, wgStats); err != nil {
-			log.Debugf("failed to update wg stats for peer %s: %s", key, err)
-		}
-	}
-
 	allHealthy := signalHealthy && managementHealthy && relayHealthy
 	log.Debugf("all health checks completed: healthy=%t", allHealthy)
 	return allHealthy
 }
 
-func (e *Engine) probeSTUNs() []relay.ProbeResult {
-	e.syncMsgMux.Lock()
-	stuns := slices.Clone(e.STUNs)
-	e.syncMsgMux.Unlock()
-
-	return relay.ProbeAll(e.ctx, relay.ProbeSTUN, stuns)
-}
-
-func (e *Engine) probeTURNs() []relay.ProbeResult {
-	e.syncMsgMux.Lock()
-	turns := slices.Clone(e.TURNs)
-	e.syncMsgMux.Unlock()
-
-	return relay.ProbeAll(e.ctx, relay.ProbeTURN, turns)
+func (e *Engine) probeICE(stuns, turns []*stun.URI) []relay.ProbeResult {
+	return append(
+		relay.ProbeAll(e.ctx, relay.ProbeSTUN, stuns),
+		relay.ProbeAll(e.ctx, relay.ProbeTURN, turns)...,
+	)
 }
 
 // restartEngine restarts the engine by cancelling the client context
@@ -1813,21 +1851,21 @@ func (e *Engine) Address() (netip.Addr, error) {
 	return ip.Unmap(), nil
 }
 
-func (e *Engine) updateForwardRules(rules []*mgmProto.ForwardingRule) error {
+func (e *Engine) updateForwardRules(rules []*mgmProto.ForwardingRule) ([]firewallManager.ForwardRule, error) {
 	if e.firewall == nil {
 		log.Warn("firewall is disabled, not updating forwarding rules")
-		return nil
+		return nil, nil
 	}
 
 	if len(rules) == 0 {
 		if e.ingressGatewayMgr == nil {
-			return nil
+			return nil, nil
 		}
 
 		err := e.ingressGatewayMgr.Close()
 		e.ingressGatewayMgr = nil
 		e.statusRecorder.SetIngressGwMgr(nil)
-		return err
+		return nil, err
 	}
 
 	if e.ingressGatewayMgr == nil {
@@ -1878,7 +1916,33 @@ func (e *Engine) updateForwardRules(rules []*mgmProto.ForwardingRule) error {
 		log.Errorf("failed to update forwarding rules: %v", err)
 	}
 
-	return nberrors.FormatErrorOrNil(merr)
+	return forwardingRules, nberrors.FormatErrorOrNil(merr)
+}
+
+func (e *Engine) toExcludedLazyPeers(routes []*route.Route, rules []firewallManager.ForwardRule, peers []*mgmProto.RemotePeerConfig) []string {
+	excludedPeers := make([]string, 0)
+	for _, r := range routes {
+		if r.Peer == "" {
+			continue
+		}
+		log.Infof("exclude router peer from lazy connection: %s", r.Peer)
+		excludedPeers = append(excludedPeers, r.Peer)
+	}
+
+	for _, r := range rules {
+		ip := r.TranslatedAddress
+		for _, p := range peers {
+			for _, allowedIP := range p.GetAllowedIps() {
+				if allowedIP != ip.String() {
+					continue
+				}
+				log.Infof("exclude forwarder peer from lazy connection: %s", p.GetWgPubKey())
+				excludedPeers = append(excludedPeers, p.GetWgPubKey())
+			}
+		}
+	}
+
+	return excludedPeers
 }
 
 // isChecksEqual checks if two slices of checks are equal.