package internal import ( "context" "fmt" nbssh "github.com/netbirdio/netbird/client/ssh" nbstatus "github.com/netbirdio/netbird/client/status" "math/rand" "net" "reflect" "runtime" "strings" "sync" "time" "github.com/netbirdio/netbird/client/internal/peer" "github.com/netbirdio/netbird/client/internal/proxy" "github.com/netbirdio/netbird/iface" mgm "github.com/netbirdio/netbird/management/client" mgmProto "github.com/netbirdio/netbird/management/proto" signal "github.com/netbirdio/netbird/signal/client" sProto "github.com/netbirdio/netbird/signal/proto" "github.com/netbirdio/netbird/util" "github.com/pion/ice/v2" log "github.com/sirupsen/logrus" "golang.zx2c4.com/wireguard/wgctrl/wgtypes" ) // PeerConnectionTimeoutMax is a timeout of an initial connection attempt to a remote peer. // E.g. this peer will wait PeerConnectionTimeoutMax for the remote peer to respond, // if not successful then it will retry the connection attempt. // Todo pass timeout at EnginConfig const ( PeerConnectionTimeoutMax = 45000 // ms PeerConnectionTimeoutMin = 30000 // ms ) var ErrResetConnection = fmt.Errorf("reset connection") // EngineConfig is a config for the Engine type EngineConfig struct { WgPort int WgIfaceName string // WgAddr is a Wireguard local address (Netbird Network IP) WgAddr string // WgPrivateKey is a Wireguard private key of our peer (it MUST never leave the machine) WgPrivateKey wgtypes.Key // IFaceBlackList is a list of network interfaces to ignore when discovering connection candidates (ICE related) IFaceBlackList []string PreSharedKey *wgtypes.Key // UDPMuxPort default value 0 - the system will pick an available port UDPMuxPort int // UDPMuxSrflxPort default value 0 - the system will pick an available port UDPMuxSrflxPort int // SSHKey is a private SSH key in a PEM format SSHKey []byte } // Engine is a mechanism responsible for reacting on Signal and Management stream events and managing connections to the remote peers. type Engine struct { // signal is a Signal Service client signal signal.Client // mgmClient is a Management Service client mgmClient mgm.Client // peerConns is a map that holds all the peers that are known to this peer peerConns map[string]*peer.Conn // syncMsgMux is used to guarantee sequential Management Service message processing syncMsgMux *sync.Mutex config *EngineConfig // STUNs is a list of STUN servers used by ICE STUNs []*ice.URL // TURNs is a list of STUN servers used by ICE TURNs []*ice.URL cancel context.CancelFunc ctx context.Context wgInterface *iface.WGIface udpMux ice.UDPMux udpMuxSrflx ice.UniversalUDPMux udpMuxConn *net.UDPConn udpMuxConnSrflx *net.UDPConn // networkSerial is the latest CurrentSerial (state ID) of the network sent by the Management service networkSerial uint64 sshServerFunc func(hostKeyPEM []byte, addr string) (nbssh.Server, error) sshServer nbssh.Server statusRecorder *nbstatus.Status } // Peer is an instance of the Connection Peer type Peer struct { WgPubKey string WgAllowedIps string } // NewEngine creates a new Connection Engine func NewEngine( ctx context.Context, cancel context.CancelFunc, signalClient signal.Client, mgmClient mgm.Client, config *EngineConfig, statusRecorder *nbstatus.Status, ) *Engine { return &Engine{ ctx: ctx, cancel: cancel, signal: signalClient, mgmClient: mgmClient, peerConns: map[string]*peer.Conn{}, syncMsgMux: &sync.Mutex{}, config: config, STUNs: []*ice.URL{}, TURNs: []*ice.URL{}, networkSerial: 0, sshServerFunc: nbssh.DefaultSSHServer, statusRecorder: statusRecorder, } } func (e *Engine) Stop() error { e.syncMsgMux.Lock() defer e.syncMsgMux.Unlock() err := e.removeAllPeers() if err != nil { return err } // very ugly but we want to remove peers from the WireGuard interface first before removing interface. // Removing peers happens in the conn.CLose() asynchronously time.Sleep(500 * time.Millisecond) log.Debugf("removing Netbird interface %s", e.config.WgIfaceName) if e.wgInterface.Interface != nil { err = e.wgInterface.Close() if err != nil { log.Errorf("failed closing Netbird interface %s %v", e.config.WgIfaceName, err) return err } } if e.udpMux != nil { if err := e.udpMux.Close(); err != nil { log.Debugf("close udp mux: %v", err) } } if e.udpMuxSrflx != nil { if err := e.udpMuxSrflx.Close(); err != nil { log.Debugf("close server reflexive udp mux: %v", err) } } if e.udpMuxConn != nil { if err := e.udpMuxConn.Close(); err != nil { log.Debugf("close udp mux connection: %v", err) } } if e.udpMuxConnSrflx != nil { if err := e.udpMuxConnSrflx.Close(); err != nil { log.Debugf("close server reflexive udp mux connection: %v", err) } } if !isNil(e.sshServer) { err := e.sshServer.Stop() if err != nil { log.Warnf("failed stopping the SSH server: %v", err) } } log.Infof("stopped Netbird Engine") return nil } // Start creates a new Wireguard tunnel interface and listens to events from Signal and Management services // Connections to remote peers are not established here. // However, they will be established once an event with a list of peers to connect to will be received from Management Service func (e *Engine) Start() error { e.syncMsgMux.Lock() defer e.syncMsgMux.Unlock() wgIfaceName := e.config.WgIfaceName wgAddr := e.config.WgAddr myPrivateKey := e.config.WgPrivateKey var err error e.wgInterface, err = iface.NewWGIFace(wgIfaceName, wgAddr, iface.DefaultMTU) if err != nil { log.Errorf("failed creating wireguard interface instance %s: [%s]", wgIfaceName, err.Error()) return err } e.udpMuxConn, err = net.ListenUDP("udp4", &net.UDPAddr{Port: e.config.UDPMuxPort}) if err != nil { log.Errorf("failed listening on UDP port %d: [%s]", e.config.UDPMuxPort, err.Error()) return err } e.udpMuxConnSrflx, err = net.ListenUDP("udp4", &net.UDPAddr{Port: e.config.UDPMuxSrflxPort}) if err != nil { log.Errorf("failed listening on UDP port %d: [%s]", e.config.UDPMuxSrflxPort, err.Error()) return err } e.udpMux = ice.NewUDPMuxDefault(ice.UDPMuxParams{UDPConn: e.udpMuxConn}) e.udpMuxSrflx = ice.NewUniversalUDPMuxDefault(ice.UniversalUDPMuxParams{UDPConn: e.udpMuxConnSrflx}) err = e.wgInterface.Create() if err != nil { log.Errorf("failed creating tunnel interface %s: [%s]", wgIfaceName, err.Error()) return err } err = e.wgInterface.Configure(myPrivateKey.String(), e.config.WgPort) if err != nil { log.Errorf("failed configuring Wireguard interface [%s]: %s", wgIfaceName, err.Error()) return err } e.receiveSignalEvents() e.receiveManagementEvents() return nil } // modifyPeers updates peers that have been modified (e.g. IP address has been changed). // It closes the existing connection, removes it from the peerConns map, and creates a new one. func (e *Engine) modifyPeers(peersUpdate []*mgmProto.RemotePeerConfig) error { // first, check if peers have been modified var modified []*mgmProto.RemotePeerConfig for _, p := range peersUpdate { if peerConn, ok := e.peerConns[p.GetWgPubKey()]; ok { if peerConn.GetConf().ProxyConfig.AllowedIps != strings.Join(p.AllowedIps, ",") { modified = append(modified, p) } } } // second, close all modified connections and remove them from the state map for _, p := range modified { err := e.removePeer(p.GetWgPubKey()) if err != nil { return err } } // third, add the peer connections again for _, p := range modified { err := e.addNewPeer(p) if err != nil { return err } } return nil } // removePeers finds and removes peers that do not exist anymore in the network map received from the Management Service. // It also removes peers that have been modified (e.g. change of IP address). They will be added again in addPeers method. func (e *Engine) removePeers(peersUpdate []*mgmProto.RemotePeerConfig) error { currentPeers := make([]string, 0, len(e.peerConns)) for p := range e.peerConns { currentPeers = append(currentPeers, p) } newPeers := make([]string, 0, len(peersUpdate)) for _, p := range peersUpdate { newPeers = append(newPeers, p.GetWgPubKey()) } toRemove := util.SliceDiff(currentPeers, newPeers) for _, p := range toRemove { err := e.removePeer(p) if err != nil { return err } log.Infof("removed peer %s", p) } return nil } func (e *Engine) removeAllPeers() error { log.Debugf("removing all peer connections") for p := range e.peerConns { err := e.removePeer(p) if err != nil { return err } } return nil } // removePeer closes an existing peer connection, removes a peer, and clears authorized key of the SSH server func (e *Engine) removePeer(peerKey string) error { log.Debugf("removing peer from engine %s", peerKey) if !isNil(e.sshServer) { e.sshServer.RemoveAuthorizedKey(peerKey) } defer func() { err := e.statusRecorder.RemovePeer(peerKey) if err != nil { log.Warnf("received error when removing peer %s from status recorder: %v", peerKey, err) } }() conn, exists := e.peerConns[peerKey] if exists { delete(e.peerConns, peerKey) err := conn.Close() if err != nil { switch err.(type) { case *peer.ConnectionAlreadyClosedError: return nil default: return err } } } return nil } // GetPeerConnectionStatus returns a connection Status or nil if peer connection wasn't found func (e *Engine) GetPeerConnectionStatus(peerKey string) peer.ConnStatus { conn, exists := e.peerConns[peerKey] if exists && conn != nil { return conn.Status() } return -1 } func (e *Engine) GetPeers() []string { e.syncMsgMux.Lock() defer e.syncMsgMux.Unlock() peers := []string{} for s := range e.peerConns { peers = append(peers, s) } return peers } // GetConnectedPeers returns a connection Status or nil if peer connection wasn't found func (e *Engine) GetConnectedPeers() []string { e.syncMsgMux.Lock() defer e.syncMsgMux.Unlock() peers := []string{} for s, conn := range e.peerConns { if conn.Status() == peer.StatusConnected { peers = append(peers, s) } } return peers } func signalCandidate(candidate ice.Candidate, myKey wgtypes.Key, remoteKey wgtypes.Key, s signal.Client) error { err := s.Send(&sProto.Message{ Key: myKey.PublicKey().String(), RemoteKey: remoteKey.String(), Body: &sProto.Body{ Type: sProto.Body_CANDIDATE, Payload: candidate.Marshal(), }, }) if err != nil { log.Errorf("failed signaling candidate to the remote peer %s %s", remoteKey.String(), err) // todo ?? return err } return nil } func signalAuth(uFrag string, pwd string, myKey wgtypes.Key, remoteKey wgtypes.Key, s signal.Client, isAnswer bool) error { var t sProto.Body_Type if isAnswer { t = sProto.Body_ANSWER } else { t = sProto.Body_OFFER } msg, err := signal.MarshalCredential(myKey, remoteKey, &signal.Credential{ UFrag: uFrag, Pwd: pwd, }, t) if err != nil { return err } err = s.Send(msg) if err != nil { return err } return nil } func (e *Engine) handleSync(update *mgmProto.SyncResponse) error { e.syncMsgMux.Lock() defer e.syncMsgMux.Unlock() if update.GetWiretrusteeConfig() != nil { err := e.updateTURNs(update.GetWiretrusteeConfig().GetTurns()) if err != nil { return err } err = e.updateSTUNs(update.GetWiretrusteeConfig().GetStuns()) if err != nil { return err } // todo update signal } if update.GetNetworkMap() != nil { // only apply new changes and ignore old ones err := e.updateNetworkMap(update.GetNetworkMap()) if err != nil { return err } } return nil } func isNil(server nbssh.Server) bool { return server == nil || reflect.ValueOf(server).IsNil() } func (e *Engine) updateSSH(sshConf *mgmProto.SSHConfig) error { if sshConf.GetSshEnabled() { if runtime.GOOS == "windows" { log.Warnf("running SSH server on Windows is not supported") return nil } // start SSH server if it wasn't running if isNil(e.sshServer) { //nil sshServer means it has not yet been started var err error e.sshServer, err = e.sshServerFunc(e.config.SSHKey, fmt.Sprintf("%s:%d", e.wgInterface.Address.IP.String(), nbssh.DefaultSSHPort)) if err != nil { return err } go func() { // blocking err = e.sshServer.Start() if err != nil { // will throw error when we stop it even if it is a graceful stop log.Debugf("stopped SSH server with error %v", err) } e.syncMsgMux.Lock() defer e.syncMsgMux.Unlock() e.sshServer = nil log.Infof("stopped SSH server") }() } else { log.Debugf("SSH server is already running") } } else { // Disable SSH server request, so stop it if it was running if !isNil(e.sshServer) { err := e.sshServer.Stop() if err != nil { log.Warnf("failed to stop SSH server %v", err) } e.sshServer = nil } } return nil } func (e *Engine) updateConfig(conf *mgmProto.PeerConfig) error { if e.wgInterface.Address.String() != conf.Address { oldAddr := e.wgInterface.Address.String() log.Debugf("updating peer address from %s to %s", oldAddr, conf.Address) err := e.wgInterface.UpdateAddr(conf.Address) if err != nil { return err } e.config.WgAddr = conf.Address log.Infof("updated peer address from %s to %s", oldAddr, conf.Address) } if conf.GetSshConfig() != nil { err := e.updateSSH(conf.GetSshConfig()) if err != nil { log.Warnf("failed handling SSH server setup %v", e) } } return nil } // receiveManagementEvents connects to the Management Service event stream to receive updates from the management service // E.g. when a new peer has been registered and we are allowed to connect to it. func (e *Engine) receiveManagementEvents() { go func() { err := e.mgmClient.Sync(func(update *mgmProto.SyncResponse) error { return e.handleSync(update) }) if err != nil { // happens if management is unavailable for a long time. // We want to cancel the operation of the whole client _ = CtxGetState(e.ctx).Wrap(ErrResetConnection) e.cancel() return } log.Debugf("stopped receiving updates from Management Service") }() log.Debugf("connecting to Management Service updates stream") } func (e *Engine) updateSTUNs(stuns []*mgmProto.HostConfig) error { if len(stuns) == 0 { return nil } var newSTUNs []*ice.URL log.Debugf("got STUNs update from Management Service, updating") for _, stun := range stuns { url, err := ice.ParseURL(stun.Uri) if err != nil { return err } newSTUNs = append(newSTUNs, url) } e.STUNs = newSTUNs return nil } func (e *Engine) updateTURNs(turns []*mgmProto.ProtectedHostConfig) error { if len(turns) == 0 { return nil } var newTURNs []*ice.URL log.Debugf("got TURNs update from Management Service, updating") for _, turn := range turns { url, err := ice.ParseURL(turn.HostConfig.Uri) if err != nil { return err } url.Username = turn.User url.Password = turn.Password newTURNs = append(newTURNs, url) } e.TURNs = newTURNs return nil } func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error { // intentionally leave it before checking serial because for now it can happen that peer IP changed but serial didn't if networkMap.GetPeerConfig() != nil { err := e.updateConfig(networkMap.GetPeerConfig()) if err != nil { return err } } serial := networkMap.GetSerial() if e.networkSerial > serial { log.Debugf("received outdated NetworkMap with serial %d, ignoring", serial) return nil } log.Debugf("got peers update from Management Service, total peers to connect to = %d", len(networkMap.GetRemotePeers())) // cleanup request, most likely our peer has been deleted if networkMap.GetRemotePeersIsEmpty() { err := e.removeAllPeers() if err != nil { return err } } else { err := e.removePeers(networkMap.GetRemotePeers()) if err != nil { return err } err = e.modifyPeers(networkMap.GetRemotePeers()) if err != nil { return err } err = e.addNewPeers(networkMap.GetRemotePeers()) if err != nil { return err } // update SSHServer by adding remote peer SSH keys if !isNil(e.sshServer) { for _, config := range networkMap.GetRemotePeers() { if config.GetSshConfig() != nil && config.GetSshConfig().GetSshPubKey() != nil { err := e.sshServer.AddAuthorizedKey(config.WgPubKey, string(config.GetSshConfig().GetSshPubKey())) if err != nil { log.Warnf("failed adding authroized key to SSH DefaultServer %v", err) } } } } } e.networkSerial = serial return nil } // addNewPeers adds peers that were not know before but arrived from the Management service with the update func (e *Engine) addNewPeers(peersUpdate []*mgmProto.RemotePeerConfig) error { for _, p := range peersUpdate { err := e.addNewPeer(p) if err != nil { return err } } return nil } // addNewPeer add peer if connection doesn't exist func (e *Engine) addNewPeer(peerConfig *mgmProto.RemotePeerConfig) error { peerKey := peerConfig.GetWgPubKey() peerIPs := peerConfig.GetAllowedIps() if _, ok := e.peerConns[peerKey]; !ok { conn, err := e.createPeerConn(peerKey, strings.Join(peerIPs, ",")) if err != nil { return err } e.peerConns[peerKey] = conn err = e.statusRecorder.AddPeer(peerKey) if err != nil { log.Warnf("error adding peer %s to status recorder, got error: %v", peerKey, err) } go e.connWorker(conn, peerKey) } return nil } func (e *Engine) connWorker(conn *peer.Conn, peerKey string) { for { // randomize starting time a bit min := 500 max := 2000 time.Sleep(time.Duration(rand.Intn(max-min)+min) * time.Millisecond) // if peer has been removed -> give up if !e.peerExists(peerKey) { log.Debugf("peer %s doesn't exist anymore, won't retry connection", peerKey) return } if !e.signal.Ready() { log.Infof("signal client isn't ready, skipping connection attempt %s", peerKey) continue } // we might have received new STUN and TURN servers meanwhile, so update them e.syncMsgMux.Lock() conf := conn.GetConf() conf.StunTurn = append(e.STUNs, e.TURNs...) conn.UpdateConf(conf) e.syncMsgMux.Unlock() err := conn.Open() if err != nil { log.Debugf("connection to peer %s failed: %v", peerKey, err) switch err.(type) { case *peer.ConnectionClosedError: // conn has been forced to close, so we exit the loop return default: } } } } func (e Engine) peerExists(peerKey string) bool { e.syncMsgMux.Lock() defer e.syncMsgMux.Unlock() _, ok := e.peerConns[peerKey] return ok } func (e Engine) createPeerConn(pubKey string, allowedIPs string) (*peer.Conn, error) { var stunTurn []*ice.URL stunTurn = append(stunTurn, e.STUNs...) stunTurn = append(stunTurn, e.TURNs...) proxyConfig := proxy.Config{ RemoteKey: pubKey, WgListenAddr: fmt.Sprintf("127.0.0.1:%d", e.config.WgPort), WgInterface: e.wgInterface, AllowedIps: allowedIPs, PreSharedKey: e.config.PreSharedKey, } // randomize connection timeout timeout := time.Duration(rand.Intn(PeerConnectionTimeoutMax-PeerConnectionTimeoutMin)+PeerConnectionTimeoutMin) * time.Millisecond config := peer.ConnConfig{ Key: pubKey, LocalKey: e.config.WgPrivateKey.PublicKey().String(), StunTurn: stunTurn, InterfaceBlackList: e.config.IFaceBlackList, Timeout: timeout, UDPMux: e.udpMux, UDPMuxSrflx: e.udpMuxSrflx, ProxyConfig: proxyConfig, } peerConn, err := peer.NewConn(config, e.statusRecorder) if err != nil { return nil, err } wgPubKey, err := wgtypes.ParseKey(pubKey) if err != nil { return nil, err } signalOffer := func(uFrag string, pwd string) error { return signalAuth(uFrag, pwd, e.config.WgPrivateKey, wgPubKey, e.signal, false) } signalCandidate := func(candidate ice.Candidate) error { return signalCandidate(candidate, e.config.WgPrivateKey, wgPubKey, e.signal) } signalAnswer := func(uFrag string, pwd string) error { return signalAuth(uFrag, pwd, e.config.WgPrivateKey, wgPubKey, e.signal, true) } peerConn.SetSignalCandidate(signalCandidate) peerConn.SetSignalOffer(signalOffer) peerConn.SetSignalAnswer(signalAnswer) return peerConn, nil } // receiveSignalEvents connects to the Signal Service event stream to negotiate connection with remote peers func (e *Engine) receiveSignalEvents() { go func() { // connect to a stream of messages coming from the signal server err := e.signal.Receive(func(msg *sProto.Message) error { e.syncMsgMux.Lock() defer e.syncMsgMux.Unlock() conn := e.peerConns[msg.Key] if conn == nil { return fmt.Errorf("wrongly addressed message %s", msg.Key) } switch msg.GetBody().Type { case sProto.Body_OFFER: remoteCred, err := signal.UnMarshalCredential(msg) if err != nil { return err } conn.OnRemoteOffer(peer.IceCredentials{ UFrag: remoteCred.UFrag, Pwd: remoteCred.Pwd, }) case sProto.Body_ANSWER: remoteCred, err := signal.UnMarshalCredential(msg) if err != nil { return err } conn.OnRemoteAnswer(peer.IceCredentials{ UFrag: remoteCred.UFrag, Pwd: remoteCred.Pwd, }) case sProto.Body_CANDIDATE: candidate, err := ice.UnmarshalCandidate(msg.GetBody().Payload) if err != nil { log.Errorf("failed on parsing remote candidate %s -> %s", candidate, err) return err } conn.OnRemoteCandidate(candidate) } return nil }) if err != nil { // happens if signal is unavailable for a long time. // We want to cancel the operation of the whole client _ = CtxGetState(e.ctx).Wrap(ErrResetConnection) e.cancel() return } }() e.signal.WaitStreamConnected() }