mirror of
https://github.com/netbirdio/netbird.git
synced 2025-07-21 16:28:16 +02:00
304 lines
8.0 KiB
Go
304 lines
8.0 KiB
Go
package internal
|
|
|
|
import (
|
|
"context"
|
|
"os"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
log "github.com/sirupsen/logrus"
|
|
|
|
"github.com/netbirdio/netbird/client/internal/lazyconn"
|
|
"github.com/netbirdio/netbird/client/internal/lazyconn/manager"
|
|
"github.com/netbirdio/netbird/client/internal/peer"
|
|
"github.com/netbirdio/netbird/client/internal/peer/dispatcher"
|
|
"github.com/netbirdio/netbird/client/internal/peerstore"
|
|
)
|
|
|
|
// ConnMgr coordinates both lazy connections (established on-demand) and permanent peer connections.
|
|
//
|
|
// The connection manager is responsible for:
|
|
// - Managing lazy connections via the lazyConnManager
|
|
// - Maintaining a list of excluded peers that should always have permanent connections
|
|
// - Handling connection establishment based on peer signaling
|
|
//
|
|
// The implementation is not thread-safe; it is protected by engine.syncMsgMux.
|
|
type ConnMgr struct {
|
|
peerStore *peerstore.Store
|
|
statusRecorder *peer.Status
|
|
iface lazyconn.WGIface
|
|
dispatcher *dispatcher.ConnectionDispatcher
|
|
enabledLocally bool
|
|
|
|
lazyConnMgr *manager.Manager
|
|
|
|
wg sync.WaitGroup
|
|
ctx context.Context
|
|
ctxCancel context.CancelFunc
|
|
}
|
|
|
|
func NewConnMgr(engineConfig *EngineConfig, statusRecorder *peer.Status, peerStore *peerstore.Store, iface lazyconn.WGIface, dispatcher *dispatcher.ConnectionDispatcher) *ConnMgr {
|
|
e := &ConnMgr{
|
|
peerStore: peerStore,
|
|
statusRecorder: statusRecorder,
|
|
iface: iface,
|
|
dispatcher: dispatcher,
|
|
}
|
|
if engineConfig.LazyConnectionEnabled || lazyconn.IsLazyConnEnabledByEnv() {
|
|
e.enabledLocally = true
|
|
}
|
|
return e
|
|
}
|
|
|
|
// Start initializes the connection manager and starts the lazy connection manager if enabled by env var or cmd line option.
|
|
func (e *ConnMgr) Start(ctx context.Context) {
|
|
if e.lazyConnMgr != nil {
|
|
log.Errorf("lazy connection manager is already started")
|
|
return
|
|
}
|
|
|
|
if !e.enabledLocally {
|
|
log.Infof("lazy connection manager is disabled")
|
|
return
|
|
}
|
|
|
|
e.initLazyManager(ctx)
|
|
e.statusRecorder.UpdateLazyConnection(true)
|
|
}
|
|
|
|
// UpdatedRemoteFeatureFlag is called when the remote feature flag is updated.
|
|
// If enabled, it initializes the lazy connection manager and start it. Do not need to call Start() again.
|
|
// If disabled, then it closes the lazy connection manager and open the connections to all peers.
|
|
func (e *ConnMgr) UpdatedRemoteFeatureFlag(ctx context.Context, enabled bool) error {
|
|
// do not disable lazy connection manager if it was enabled by env var
|
|
if e.enabledLocally {
|
|
return nil
|
|
}
|
|
|
|
if enabled {
|
|
// if the lazy connection manager is already started, do not start it again
|
|
if e.lazyConnMgr != nil {
|
|
return nil
|
|
}
|
|
|
|
log.Infof("lazy connection manager is enabled by management feature flag")
|
|
e.initLazyManager(ctx)
|
|
e.statusRecorder.UpdateLazyConnection(true)
|
|
return e.addPeersToLazyConnManager(ctx)
|
|
} else {
|
|
if e.lazyConnMgr == nil {
|
|
return nil
|
|
}
|
|
log.Infof("lazy connection manager is disabled by management feature flag")
|
|
e.closeManager(ctx)
|
|
e.statusRecorder.UpdateLazyConnection(false)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// SetExcludeList sets the list of peer IDs that should always have permanent connections.
|
|
func (e *ConnMgr) SetExcludeList(peerIDs map[string]bool) {
|
|
if e.lazyConnMgr == nil {
|
|
return
|
|
}
|
|
|
|
excludedPeers := make([]lazyconn.PeerConfig, 0, len(peerIDs))
|
|
|
|
for peerID := range peerIDs {
|
|
var peerConn *peer.Conn
|
|
var exists bool
|
|
if peerConn, exists = e.peerStore.PeerConn(peerID); !exists {
|
|
log.Warnf("failed to find peer conn for peerID: %s", peerID)
|
|
continue
|
|
}
|
|
|
|
lazyPeerCfg := lazyconn.PeerConfig{
|
|
PublicKey: peerID,
|
|
AllowedIPs: peerConn.WgConfig().AllowedIps,
|
|
PeerConnID: peerConn.ConnID(),
|
|
Log: peerConn.Log,
|
|
}
|
|
excludedPeers = append(excludedPeers, lazyPeerCfg)
|
|
}
|
|
|
|
added := e.lazyConnMgr.ExcludePeer(e.ctx, excludedPeers)
|
|
for _, peerID := range added {
|
|
var peerConn *peer.Conn
|
|
var exists bool
|
|
if peerConn, exists = e.peerStore.PeerConn(peerID); !exists {
|
|
// if the peer not exist in the store, it means that the engine will call the AddPeerConn in next step
|
|
continue
|
|
}
|
|
|
|
peerConn.Log.Infof("peer has been added to lazy connection exclude list, opening permanent connection")
|
|
if err := peerConn.Open(e.ctx); err != nil {
|
|
peerConn.Log.Errorf("failed to open connection: %v", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (e *ConnMgr) AddPeerConn(ctx context.Context, peerKey string, conn *peer.Conn) (exists bool) {
|
|
if success := e.peerStore.AddPeerConn(peerKey, conn); !success {
|
|
return true
|
|
}
|
|
|
|
if !e.isStartedWithLazyMgr() {
|
|
if err := conn.Open(ctx); err != nil {
|
|
conn.Log.Errorf("failed to open connection: %v", err)
|
|
}
|
|
return
|
|
}
|
|
|
|
if !lazyconn.IsSupported(conn.AgentVersionString()) {
|
|
conn.Log.Warnf("peer does not support lazy connection (%s), open permanent connection", conn.AgentVersionString())
|
|
if err := conn.Open(ctx); err != nil {
|
|
conn.Log.Errorf("failed to open connection: %v", err)
|
|
}
|
|
return
|
|
}
|
|
|
|
lazyPeerCfg := lazyconn.PeerConfig{
|
|
PublicKey: peerKey,
|
|
AllowedIPs: conn.WgConfig().AllowedIps,
|
|
PeerConnID: conn.ConnID(),
|
|
Log: conn.Log,
|
|
}
|
|
excluded, err := e.lazyConnMgr.AddPeer(lazyPeerCfg)
|
|
if err != nil {
|
|
conn.Log.Errorf("failed to add peer to lazyconn manager: %v", err)
|
|
if err := conn.Open(ctx); err != nil {
|
|
conn.Log.Errorf("failed to open connection: %v", err)
|
|
}
|
|
return
|
|
}
|
|
|
|
if excluded {
|
|
conn.Log.Infof("peer is on lazy conn manager exclude list, opening connection")
|
|
if err := conn.Open(ctx); err != nil {
|
|
conn.Log.Errorf("failed to open connection: %v", err)
|
|
}
|
|
return
|
|
}
|
|
|
|
conn.Log.Infof("peer added to lazy conn manager")
|
|
return
|
|
}
|
|
|
|
func (e *ConnMgr) RemovePeerConn(peerKey string) {
|
|
conn, ok := e.peerStore.Remove(peerKey)
|
|
if !ok {
|
|
return
|
|
}
|
|
defer conn.Close()
|
|
|
|
if !e.isStartedWithLazyMgr() {
|
|
return
|
|
}
|
|
|
|
e.lazyConnMgr.RemovePeer(peerKey)
|
|
conn.Log.Infof("removed peer from lazy conn manager")
|
|
}
|
|
|
|
func (e *ConnMgr) OnSignalMsg(ctx context.Context, peerKey string) (*peer.Conn, bool) {
|
|
conn, ok := e.peerStore.PeerConn(peerKey)
|
|
if !ok {
|
|
return nil, false
|
|
}
|
|
|
|
if !e.isStartedWithLazyMgr() {
|
|
return conn, true
|
|
}
|
|
|
|
if found := e.lazyConnMgr.ActivatePeer(ctx, peerKey); found {
|
|
conn.Log.Infof("activated peer from inactive state")
|
|
if err := conn.Open(e.ctx); err != nil {
|
|
conn.Log.Errorf("failed to open connection: %v", err)
|
|
}
|
|
}
|
|
return conn, true
|
|
}
|
|
|
|
func (e *ConnMgr) Close() {
|
|
if !e.isStartedWithLazyMgr() {
|
|
return
|
|
}
|
|
|
|
e.ctxCancel()
|
|
e.wg.Wait()
|
|
e.lazyConnMgr = nil
|
|
}
|
|
|
|
func (e *ConnMgr) initLazyManager(parentCtx context.Context) {
|
|
cfg := manager.Config{
|
|
InactivityThreshold: inactivityThresholdEnv(),
|
|
}
|
|
e.lazyConnMgr = manager.NewManager(cfg, e.peerStore, e.iface, e.dispatcher)
|
|
|
|
ctx, cancel := context.WithCancel(parentCtx)
|
|
e.ctx = ctx
|
|
e.ctxCancel = cancel
|
|
|
|
e.wg.Add(1)
|
|
go func() {
|
|
defer e.wg.Done()
|
|
e.lazyConnMgr.Start(ctx)
|
|
}()
|
|
}
|
|
|
|
func (e *ConnMgr) addPeersToLazyConnManager(ctx context.Context) error {
|
|
peers := e.peerStore.PeersPubKey()
|
|
lazyPeerCfgs := make([]lazyconn.PeerConfig, 0, len(peers))
|
|
for _, peerID := range peers {
|
|
var peerConn *peer.Conn
|
|
var exists bool
|
|
if peerConn, exists = e.peerStore.PeerConn(peerID); !exists {
|
|
log.Warnf("failed to find peer conn for peerID: %s", peerID)
|
|
continue
|
|
}
|
|
|
|
lazyPeerCfg := lazyconn.PeerConfig{
|
|
PublicKey: peerID,
|
|
AllowedIPs: peerConn.WgConfig().AllowedIps,
|
|
PeerConnID: peerConn.ConnID(),
|
|
Log: peerConn.Log,
|
|
}
|
|
lazyPeerCfgs = append(lazyPeerCfgs, lazyPeerCfg)
|
|
}
|
|
|
|
return e.lazyConnMgr.AddActivePeers(ctx, lazyPeerCfgs)
|
|
}
|
|
|
|
func (e *ConnMgr) closeManager(ctx context.Context) {
|
|
if e.lazyConnMgr == nil {
|
|
return
|
|
}
|
|
|
|
e.ctxCancel()
|
|
e.wg.Wait()
|
|
e.lazyConnMgr = nil
|
|
|
|
for _, peerID := range e.peerStore.PeersPubKey() {
|
|
e.peerStore.PeerConnOpen(ctx, peerID)
|
|
}
|
|
}
|
|
|
|
func (e *ConnMgr) isStartedWithLazyMgr() bool {
|
|
return e.lazyConnMgr != nil && e.ctxCancel != nil
|
|
}
|
|
|
|
func inactivityThresholdEnv() *time.Duration {
|
|
envValue := os.Getenv(lazyconn.EnvInactivityThreshold)
|
|
if envValue == "" {
|
|
return nil
|
|
}
|
|
|
|
parsedMinutes, err := strconv.Atoi(envValue)
|
|
if err != nil || parsedMinutes <= 0 {
|
|
return nil
|
|
}
|
|
|
|
d := time.Duration(parsedMinutes) * time.Minute
|
|
return &d
|
|
}
|