From e407fe02c5730d09939f838f5ec3ae32b737e65b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= <zoltan.pmail@gmail.com> Date: Mon, 17 Jun 2024 17:52:22 +0200 Subject: [PATCH] Separate lifecircle of handshake, ice, relay connections - fix Stun, Turn address update thread safety issue - move conn worker login into peer package --- client/internal/engine.go | 189 +---- client/internal/peer/conn.go | 1072 ++++++------------------ client/internal/peer/conn_ice.go | 436 ++++++++++ client/internal/peer/conn_relay.go | 115 +++ client/internal/peer/conn_test.go | 10 +- client/internal/peer/handshaker.go | 210 +++++ client/internal/peer/stdnet.go | 4 +- client/internal/peer/stdnet_android.go | 4 +- client/internal/signaler.go | 71 ++ relay/client/manager.go | 4 + signal/client/client.go | 4 +- 11 files changed, 1139 insertions(+), 980 deletions(-) create mode 100644 client/internal/peer/conn_ice.go create mode 100644 client/internal/peer/conn_relay.go create mode 100644 client/internal/peer/handshaker.go create mode 100644 client/internal/signaler.go diff --git a/client/internal/engine.go b/client/internal/engine.go index a2f395ea0..d80d1f235 100644 --- a/client/internal/engine.go +++ b/client/internal/engine.go @@ -12,6 +12,7 @@ import ( "runtime" "strings" "sync" + "sync/atomic" "time" "github.com/pion/ice/v3" @@ -95,7 +96,8 @@ type EngineConfig struct { // Engine is a mechanism responsible for reacting on Signal and Management stream events and managing connections to the remote peers. type Engine struct { // signal is a Signal Service client - signal signal.Client + signal signal.Client + signaler *Signaler // mgmClient is a Management Service client mgmClient mgm.Client // peerConns is a map that holds all the peers that are known to this peer @@ -116,7 +118,8 @@ type Engine struct { // STUNs is a list of STUN servers used by ICE STUNs []*stun.URI // TURNs is a list of STUN servers used by ICE - TURNs []*stun.URI + TURNs []*stun.URI + StunTurn atomic.Value // clientRoutes is the most recent list of clientRoutes received from the Management Service clientRoutes route.HAMap @@ -154,8 +157,6 @@ type Engine struct { relayProbe *Probe wgProbe *Probe - wgConnWorker sync.WaitGroup - relayManager *relayClient.Manager } @@ -207,11 +208,11 @@ func NewEngineWithProbes( relayProbe *Probe, wgProbe *Probe, ) *Engine { - return &Engine{ clientCtx: clientCtx, clientCancel: clientCancel, signal: signalClient, + signaler: NewSignaler(signalClient, config.WgPrivateKey), mgmClient: mgmClient, relayManager: relayManager, peerConns: make(map[string]*peer.Conn), @@ -258,7 +259,6 @@ func (e *Engine) Stop() error { time.Sleep(500 * time.Millisecond) e.close() - e.wgConnWorker.Wait() log.Infof("stopped Netbird Engine") return nil } @@ -457,72 +457,11 @@ func (e *Engine) removePeer(peerKey string) error { conn, exists := e.peerConns[peerKey] if exists { delete(e.peerConns, peerKey) - err := conn.Close() - if err != nil { - switch err.(type) { - case *peer.ConnectionAlreadyClosedError: - return nil - default: - return err - } - } + conn.Close() } return nil } -func signalCandidate(candidate ice.Candidate, myKey wgtypes.Key, remoteKey wgtypes.Key, s signal.Client) error { - err := s.Send(&sProto.Message{ - Key: myKey.PublicKey().String(), - RemoteKey: remoteKey.String(), - Body: &sProto.Body{ - Type: sProto.Body_CANDIDATE, - Payload: candidate.Marshal(), - }, - }) - if err != nil { - return err - } - - return nil -} - -func sendSignal(message *sProto.Message, s signal.Client) error { - return s.Send(message) -} - -// SignalOfferAnswer signals either an offer or an answer to remote peer -func SignalOfferAnswer(offerAnswer peer.OfferAnswer, myKey wgtypes.Key, remoteKey wgtypes.Key, s signal.Client, - isAnswer bool) error { - var t sProto.Body_Type - if isAnswer { - t = sProto.Body_ANSWER - } else { - t = sProto.Body_OFFER - } - - msg, err := signal.MarshalCredential( - myKey, - offerAnswer.WgListenPort, - remoteKey, &signal.Credential{ - UFrag: offerAnswer.IceCredentials.UFrag, - Pwd: offerAnswer.IceCredentials.Pwd, - }, - t, - offerAnswer.RosenpassPubKey, - offerAnswer.RosenpassAddr, - offerAnswer.RelaySrvAddress) - if err != nil { - return err - } - - err = s.Send(msg) - if err != nil { - return err - } - - return nil -} - func (e *Engine) handleSync(update *mgmProto.SyncResponse) error { e.syncMsgMux.Lock() defer e.syncMsgMux.Unlock() @@ -538,6 +477,11 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error { return err } + var stunTurn []*stun.URI + stunTurn = append(stunTurn, e.STUNs...) + stunTurn = append(stunTurn, e.TURNs...) + e.StunTurn.Store(stunTurn) + // todo update relay address in the relay manager // todo update signal @@ -893,57 +837,10 @@ func (e *Engine) addNewPeer(peerConfig *mgmProto.RemotePeerConfig) error { if err != nil { log.Warnf("error adding peer %s to status recorder, got error: %v", peerKey, err) } - - e.wgConnWorker.Add(1) - go e.connWorker(conn, peerKey) } return nil } -func (e *Engine) connWorker(conn *peer.Conn, peerKey string) { - defer e.wgConnWorker.Done() - for { - - // randomize starting time a bit - min := 500 - max := 2000 - duration := time.Duration(rand.Intn(max-min)+min) * time.Millisecond - select { - case <-e.ctx.Done(): - return - case <-time.After(duration): - } - - // if peer has been removed -> give up - if !e.peerExists(peerKey) { - log.Debugf("peer %s doesn't exist anymore, won't retry connection", peerKey) - return - } - - if !e.signal.Ready() { - log.Infof("signal client isn't ready, skipping connection attempt %s", peerKey) - continue - } - - // we might have received new STUN and TURN servers meanwhile, so update them - e.syncMsgMux.Lock() - conn.UpdateStunTurn(append(e.STUNs, e.TURNs...)) - e.syncMsgMux.Unlock() - - err := conn.Open(e.ctx) - if err != nil { - log.Debugf("connection to peer %s failed: %v", peerKey, err) - var connectionClosedError *peer.ConnectionClosedError - switch { - case errors.As(err, &connectionClosedError): - // conn has been forced to close, so we exit the loop - return - default: - } - } - } -} - func (e *Engine) peerExists(peerKey string) bool { e.syncMsgMux.Lock() defer e.syncMsgMux.Unlock() @@ -953,9 +850,6 @@ func (e *Engine) peerExists(peerKey string) bool { func (e *Engine) createPeerConn(pubKey string, allowedIPs string) (*peer.Conn, error) { log.Debugf("creating peer connection %s", pubKey) - var stunTurn []*stun.URI - stunTurn = append(stunTurn, e.STUNs...) - stunTurn = append(stunTurn, e.TURNs...) wgConfig := peer.WgConfig{ RemoteKey: pubKey, @@ -988,52 +882,29 @@ func (e *Engine) createPeerConn(pubKey string, allowedIPs string) (*peer.Conn, e // randomize connection timeout timeout := time.Duration(rand.Intn(PeerConnectionTimeoutMax-PeerConnectionTimeoutMin)+PeerConnectionTimeoutMin) * time.Millisecond config := peer.ConnConfig{ - Key: pubKey, - LocalKey: e.config.WgPrivateKey.PublicKey().String(), - StunTurn: stunTurn, - InterfaceBlackList: e.config.IFaceBlackList, - DisableIPv6Discovery: e.config.DisableIPv6Discovery, - Timeout: timeout, - UDPMux: e.udpMux.UDPMuxDefault, - UDPMuxSrflx: e.udpMux, - WgConfig: wgConfig, - LocalWgPort: e.config.WgPort, - NATExternalIPs: e.parseNATExternalIPMappings(), - RosenpassPubKey: e.getRosenpassPubKey(), - RosenpassAddr: e.getRosenpassAddr(), + Key: pubKey, + LocalKey: e.config.WgPrivateKey.PublicKey().String(), + Timeout: timeout, + WgConfig: wgConfig, + LocalWgPort: e.config.WgPort, + RosenpassPubKey: e.getRosenpassPubKey(), + RosenpassAddr: e.getRosenpassAddr(), + ICEConfig: peer.ICEConfig{ + StunTurn: e.StunTurn, + InterfaceBlackList: e.config.IFaceBlackList, + DisableIPv6Discovery: e.config.DisableIPv6Discovery, + UDPMux: e.udpMux.UDPMuxDefault, + UDPMuxSrflx: e.udpMux, + NATExternalIPs: e.parseNATExternalIPMappings(), + }, } - peerConn, err := peer.NewConn(config, e.statusRecorder, e.wgProxyFactory, e.mobileDep.TunAdapter, e.mobileDep.IFaceDiscover, e.relayManager) + peerConn, err := peer.NewConn(e.ctx, config, e.statusRecorder, e.wgProxyFactory, e.signaler, e.mobileDep.IFaceDiscover, e.relayManager) if err != nil { return nil, err } - wgPubKey, err := wgtypes.ParseKey(pubKey) - if err != nil { - return nil, err - } - - signalOffer := func(offerAnswer peer.OfferAnswer) error { - return SignalOfferAnswer(offerAnswer, e.config.WgPrivateKey, wgPubKey, e.signal, false) - } - - signalCandidate := func(candidate ice.Candidate) error { - return signalCandidate(candidate, e.config.WgPrivateKey, wgPubKey, e.signal) - } - - signalAnswer := func(offerAnswer peer.OfferAnswer) error { - return SignalOfferAnswer(offerAnswer, e.config.WgPrivateKey, wgPubKey, e.signal, true) - } - - peerConn.SetSignalCandidate(signalCandidate) - peerConn.SetSignalOffer(signalOffer) - peerConn.SetSignalAnswer(signalAnswer) - peerConn.SetSendSignalMessage(func(message *sProto.Message) error { - return sendSignal(message, e.signal) - }) - if e.rpManager != nil { - peerConn.SetOnConnected(e.rpManager.OnConnected) peerConn.SetOnDisconnected(e.rpManager.OnDisconnected) } @@ -1107,7 +978,7 @@ func (e *Engine) receiveSignalEvents() { return err } - conn.OnRemoteCandidate(candidate, e.GetClientRoutes()) + go conn.OnRemoteCandidate(candidate, e.GetClientRoutes()) case sProto.Body_MODE: } @@ -1402,7 +1273,7 @@ func (e *Engine) receiveProbeEvents() { for _, peer := range e.peerConns { key := peer.GetKey() - wgStats, err := peer.GetConf().WgConfig.WgInterface.GetStats(key) + wgStats, err := peer.WgConfig().WgInterface.GetStats(key) if err != nil { log.Debugf("failed to get wg stats for peer %s: %s", key, err) } diff --git a/client/internal/peer/conn.go b/client/internal/peer/conn.go index b2fb58855..16daf492e 100644 --- a/client/internal/peer/conn.go +++ b/client/internal/peer/conn.go @@ -2,37 +2,33 @@ package peer import ( "context" - "fmt" "net" - "net/netip" "runtime" "strings" "sync" "time" "github.com/pion/ice/v3" - "github.com/pion/stun/v2" log "github.com/sirupsen/logrus" "golang.zx2c4.com/wireguard/wgctrl/wgtypes" + "github.com/netbirdio/netbird/client/internal" "github.com/netbirdio/netbird/client/internal/stdnet" "github.com/netbirdio/netbird/client/internal/wgproxy" "github.com/netbirdio/netbird/iface" - "github.com/netbirdio/netbird/iface/bind" relayClient "github.com/netbirdio/netbird/relay/client" "github.com/netbirdio/netbird/route" - sProto "github.com/netbirdio/netbird/signal/proto" nbnet "github.com/netbirdio/netbird/util/net" - "github.com/netbirdio/netbird/version" ) -const ( - iceKeepAliveDefault = 4 * time.Second - iceDisconnectedTimeoutDefault = 6 * time.Second - // iceRelayAcceptanceMinWaitDefault is the same as in the Pion ICE package - iceRelayAcceptanceMinWaitDefault = 2 * time.Second +type ConnPriority int +const ( defaultWgKeepAlive = 25 * time.Second + + connPriorityRelay ConnPriority = 1 + connPriorityICETurn = 1 + connPriorityICEP2P = 2 ) type WgConfig struct { @@ -45,637 +41,151 @@ type WgConfig struct { // ConnConfig is a peer Connection configuration type ConnConfig struct { - // Key is a public key of a remote peer Key string // LocalKey is a public key of a local peer LocalKey string - // StunTurn is a list of STUN and TURN URLs - StunTurn []*stun.URI - - // InterfaceBlackList is a list of machine interfaces that should be filtered out by ICE Candidate gathering - // (e.g. if eth0 is in the list, host candidate of this interface won't be used) - InterfaceBlackList []string - DisableIPv6Discovery bool - Timeout time.Duration WgConfig WgConfig - UDPMux ice.UDPMux - UDPMuxSrflx ice.UniversalUDPMux - LocalWgPort int - NATExternalIPs []string - // RosenpassPubKey is this peer's Rosenpass public key RosenpassPubKey []byte // RosenpassPubKey is this peer's RosenpassAddr server address (IP:port) RosenpassAddr string -} -// OfferAnswer represents a session establishment offer or answer -type OfferAnswer struct { - IceCredentials IceCredentials - // WgListenPort is a remote WireGuard listen port. - // This field is used when establishing a direct WireGuard connection without any proxy. - // We can set the remote peer's endpoint with this port. - WgListenPort int - - // Version of NetBird Agent - Version string - // RosenpassPubKey is the Rosenpass public key of the remote peer when receiving this message - // This value is the local Rosenpass server public key when sending the message - RosenpassPubKey []byte - // RosenpassAddr is the Rosenpass server address (IP:port) of the remote peer when receiving this message - // This value is the local Rosenpass server address when sending the message - RosenpassAddr string - - // relay server address - RelaySrvAddress string -} - -// IceCredentials ICE protocol credentials struct -type IceCredentials struct { - UFrag string - Pwd string + // ICEConfig ICE protocol configuration + ICEConfig ICEConfig } type BeforeAddPeerHookFunc func(connID nbnet.ConnectionID, IP net.IP) error type AfterRemovePeerHookFunc func(connID nbnet.ConnectionID) error type Conn struct { - config ConnConfig - mu sync.Mutex - - // signalCandidate is a handler function to signal remote peer about local connection candidate - signalCandidate func(candidate ice.Candidate) error - // signalOffer is a handler function to signal remote peer our connection offer (credentials) - signalOffer func(OfferAnswer) error - signalAnswer func(OfferAnswer) error - sendSignalMessage func(message *sProto.Message) error - onConnected func(remoteWireGuardKey string, remoteRosenpassPubKey []byte, wireGuardIP string, remoteRosenpassAddr string) - onDisconnected func(remotePeer string, wgIP string) - - // remoteOffersCh is a channel used to wait for remote credentials to proceed with the connection - remoteOffersCh chan OfferAnswer - // remoteAnswerCh is a channel used to wait for remote credentials answer (confirmation of our offer) to proceed with the connection - remoteAnswerCh chan OfferAnswer - closeCh chan struct{} - ctx context.Context - notifyDisconnected context.CancelFunc - - agent *ice.Agent - status ConnStatus - + log *log.Entry + mu sync.Mutex + ctx context.Context + ctxCancel context.CancelFunc + config ConnConfig statusRecorder *Status - wgProxyFactory *wgproxy.Factory wgProxy wgproxy.Proxy + signaler *internal.Signaler + allowedIPsIP string + handshaker *Handshaker + closeCh chan struct{} - adapter iface.TunAdapter - iFaceDiscover stdnet.ExternalIFaceDiscover - sentExtraSrflx bool + onConnected func(remoteWireGuardKey string, remoteRosenpassPubKey []byte, wireGuardIP string, remoteRosenpassAddr string) + onDisconnected func(remotePeer string, wgIP string) + + status ConnStatus + + connectorICE *ConnectorICE + connectorRelay *ConnectorRelay connID nbnet.ConnectionID beforeAddPeerHooks []BeforeAddPeerHookFunc afterRemovePeerHooks []AfterRemovePeerHookFunc - relayManager *relayClient.Manager + currentConnType ConnPriority } // NewConn creates a new not opened Conn to the remote peer. // To establish a connection run Conn.Open -func NewConn(config ConnConfig, statusRecorder *Status, wgProxyFactory *wgproxy.Factory, adapter iface.TunAdapter, iFaceDiscover stdnet.ExternalIFaceDiscover, relayManager *relayClient.Manager) (*Conn, error) { - return &Conn{ +func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Status, wgProxyFactory *wgproxy.Factory, signaler *internal.Signaler, iFaceDiscover stdnet.ExternalIFaceDiscover, relayManager *relayClient.Manager) (*Conn, error) { + _, allowedIPsIP, err := net.ParseCIDR(config.WgConfig.AllowedIps) + if err != nil { + log.Errorf("failed to parse allowedIPS: %v", err) + return nil, err + } + + ctx, ctxCancel := context.WithCancel(engineCtx) + + var conn = &Conn{ + log: log.WithField("peer", config.Key), + ctx: ctx, + ctxCancel: ctxCancel, config: config, - mu: sync.Mutex{}, - status: StatusDisconnected, - closeCh: make(chan struct{}), - remoteOffersCh: make(chan OfferAnswer), - remoteAnswerCh: make(chan OfferAnswer), statusRecorder: statusRecorder, wgProxyFactory: wgProxyFactory, - adapter: adapter, - iFaceDiscover: iFaceDiscover, - relayManager: relayManager, - }, nil + signaler: signaler, + allowedIPsIP: allowedIPsIP.String(), + handshaker: NewHandshaker(ctx, config, signaler), + status: StatusDisconnected, + closeCh: make(chan struct{}), + } + conn.connectorICE = NewConnectorICE(ctx, conn.log, config, config.ICEConfig, signaler, iFaceDiscover, statusRecorder, conn.iCEConnectionIsReady, conn.doHandshake) + conn.connectorRelay = NewConnectorRelay(ctx, conn.log, relayManager, config, conn.relayConnectionIsReady, conn.doHandshake) + return conn, nil } -// Open opens connection to the remote peer starting ICE candidate gathering process. -// Blocks until connection has been closed or connection timeout. -// ConnStatus will be set accordingly -func (conn *Conn) Open(ctx context.Context) error { - log.Debugf("trying to connect to peer %s", conn.config.Key) +// Open opens connection to the remote peer +// It will try to establish a connection using ICE and in parallel with relay. The higher priority connection type will +// be used. +// todo implement on disconnected event from ICE and relay too. +func (conn *Conn) Open() { + conn.log.Debugf("trying to connect to peer") peerState := State{ PubKey: conn.config.Key, IP: strings.Split(conn.config.WgConfig.AllowedIps, "/")[0], ConnStatusUpdate: time.Now(), - ConnStatus: conn.status, + ConnStatus: StatusDisconnected, Mux: new(sync.RWMutex), } err := conn.statusRecorder.UpdatePeerState(peerState) if err != nil { - log.Warnf("error while updating the state of peer %s,err: %v", conn.config.Key, err) + conn.log.Warnf("error while updating the state err: %v", err) } - defer func() { - err := conn.cleanup() + /* + peerState = State{ + PubKey: conn.config.Key, + ConnStatus: StatusConnecting, + ConnStatusUpdate: time.Now(), + Mux: new(sync.RWMutex), + } + err = conn.statusRecorder.UpdatePeerState(peerState) if err != nil { - log.Warnf("error while cleaning up peer connection %s: %v", conn.config.Key, err) - return + log.Warnf("error while updating the state of peer %s,err: %v", conn.config.Key, err) } - }() - - err = conn.sendOffer() - if err != nil { - return err + */ + relayIsSupportedLocally := conn.connectorRelay.RelayIsSupported() + if relayIsSupportedLocally { + go conn.connectorRelay.SetupRelayConnection() } - - log.Debugf("connection offer sent to peer %s, waiting for the confirmation", conn.config.Key) - - // Only continue once we got a connection confirmation from the remote peer. - // The connection timeout could have happened before a confirmation received from the remote. - // The connection could have also been closed externally (e.g. when we received an update from the management that peer shouldn't be connected) - remoteOfferAnswer, err := conn.waitForRemoteOfferConfirmation() - if err != nil { - return err - } - - log.Debugf("received connection confirmation from peer %s running version %s and with remote WireGuard listen port %d", - conn.config.Key, remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort) - - // at this point we received offer/answer and we are ready to gather candidates - conn.mu.Lock() - conn.status = StatusConnecting - conn.ctx, conn.notifyDisconnected = context.WithCancel(ctx) - defer conn.notifyDisconnected() - conn.mu.Unlock() - - peerState = State{ - PubKey: conn.config.Key, - ConnStatus: conn.status, - ConnStatusUpdate: time.Now(), - Mux: new(sync.RWMutex), - } - err = conn.statusRecorder.UpdatePeerState(peerState) - if err != nil { - log.Warnf("error while updating the state of peer %s,err: %v", conn.config.Key, err) - } - - // in edge case this function can block while the manager set up a new relay server connection - relayOperate := conn.setupRelayConnection(remoteOfferAnswer) - - err = conn.setupICEConnection(remoteOfferAnswer, relayOperate) - if err != nil { - log.Errorf("failed to setup ICE connection: %s", err) - if !relayOperate { - return err - } - } - - // wait until connection disconnected or has been closed externally (upper layer, e.g. engine) - err = conn.waitForDisconnection() - return err -} - -func (conn *Conn) AddBeforeAddPeerHook(hook BeforeAddPeerHookFunc) { - conn.beforeAddPeerHooks = append(conn.beforeAddPeerHooks, hook) -} - -func (conn *Conn) AddAfterRemovePeerHook(hook AfterRemovePeerHookFunc) { - conn.afterRemovePeerHooks = append(conn.afterRemovePeerHooks, hook) -} - -// GetConf returns the connection config -func (conn *Conn) GetConf() ConnConfig { - return conn.config -} - -// WgConfig returns the WireGuard config -func (conn *Conn) WgConfig() WgConfig { - return conn.config.WgConfig -} - -// UpdateStunTurn update the turn and stun addresses -func (conn *Conn) UpdateStunTurn(turnStun []*stun.URI) { - conn.config.StunTurn = turnStun -} - -// SetSignalOffer sets a handler function to be triggered by Conn when a new connection offer has to be signalled to the remote peer -func (conn *Conn) SetSignalOffer(handler func(offer OfferAnswer) error) { - conn.signalOffer = handler -} - -// SetOnConnected sets a handler function to be triggered by Conn when a new connection to a remote peer established -func (conn *Conn) SetOnConnected(handler func(remoteWireGuardKey string, remoteRosenpassPubKey []byte, wireGuardIP string, remoteRosenpassAddr string)) { - conn.onConnected = handler -} - -// SetOnDisconnected sets a handler function to be triggered by Conn when a connection to a remote disconnected -func (conn *Conn) SetOnDisconnected(handler func(remotePeer string, wgIP string)) { - conn.onDisconnected = handler -} - -// SetSignalAnswer sets a handler function to be triggered by Conn when a new connection answer has to be signalled to the remote peer -func (conn *Conn) SetSignalAnswer(handler func(answer OfferAnswer) error) { - conn.signalAnswer = handler -} - -// SetSignalCandidate sets a handler function to be triggered by Conn when a new ICE local connection candidate has to be signalled to the remote peer -func (conn *Conn) SetSignalCandidate(handler func(candidate ice.Candidate) error) { - conn.signalCandidate = handler -} - -// SetSendSignalMessage sets a handler function to be triggered by Conn when there is new message to send via signal -func (conn *Conn) SetSendSignalMessage(handler func(message *sProto.Message) error) { - conn.sendSignalMessage = handler + go conn.connectorICE.SetupICEConnection(relayIsSupportedLocally) } // Close closes this peer Conn issuing a close event to the Conn closeCh -func (conn *Conn) Close() error { +func (conn *Conn) Close() { conn.mu.Lock() - defer conn.mu.Unlock() - select { - case conn.closeCh <- struct{}{}: - return nil - default: - // probably could happen when peer has been added and removed right after not even starting to connect - // todo further investigate - // this really happens due to unordered messages coming from management - // more importantly it causes inconsistency -> 2 Conn objects for the same peer - // e.g. this flow: - // update from management has peers: [1,2,3,4] - // engine creates a Conn for peers: [1,2,3,4] and schedules Open in ~1sec - // before conn.Open() another update from management arrives with peers: [1,2,3] - // engine removes peer 4 and calls conn.Close() which does nothing (this default clause) - // before conn.Open() another update from management arrives with peers: [1,2,3,4,5] - // engine adds a new Conn for 4 and 5 - // therefore peer 4 has 2 Conn objects - log.Warnf("Connection has been already closed or attempted closing not started connection %s", conn.config.Key) - return NewConnectionAlreadyClosed(conn.config.Key) - } -} - -// Status returns current status of the Conn -func (conn *Conn) Status() ConnStatus { - conn.mu.Lock() - defer conn.mu.Unlock() - return conn.status -} - -// OnRemoteOffer handles an offer from the remote peer and returns true if the message was accepted, false otherwise -// doesn't block, discards the message if connection wasn't ready -func (conn *Conn) OnRemoteOffer(offer OfferAnswer) bool { - log.Debugf("OnRemoteOffer from peer %s on status %s", conn.config.Key, conn.status.String()) - - select { - case conn.remoteOffersCh <- offer: - return true - default: - log.Debugf("OnRemoteOffer skipping message from peer %s on status %s because is not ready", conn.config.Key, conn.status.String()) - // connection might not be ready yet to receive so we ignore the message - return false - } -} - -// OnRemoteAnswer handles an offer from the remote peer and returns true if the message was accepted, false otherwise -// doesn't block, discards the message if connection wasn't ready -func (conn *Conn) OnRemoteAnswer(answer OfferAnswer) bool { - log.Debugf("OnRemoteAnswer from peer %s on status %s", conn.config.Key, conn.status.String()) - - select { - case conn.remoteAnswerCh <- answer: - return true - default: - // connection might not be ready yet to receive so we ignore the message - log.Debugf("OnRemoteAnswer skipping message from peer %s on status %s because is not ready", conn.config.Key, conn.status.String()) - return false - } -} - -// OnRemoteCandidate Handles ICE connection Candidate provided by the remote peer. -func (conn *Conn) OnRemoteCandidate(candidate ice.Candidate, haRoutes route.HAMap) { - log.Debugf("OnRemoteCandidate from peer %s -> %s", conn.config.Key, candidate.String()) - go func() { - conn.mu.Lock() - defer conn.mu.Unlock() - - if conn.agent == nil { - return - } - - if candidateViaRoutes(candidate, haRoutes) { - return - } - - err := conn.agent.AddRemoteCandidate(candidate) - if err != nil { - log.Errorf("error while handling remote candidate from peer %s", conn.config.Key) - return - } - }() -} - -func (conn *Conn) GetKey() string { - return conn.config.Key -} - -func (conn *Conn) reCreateAgent(relaySupport []ice.CandidateType) error { - conn.mu.Lock() - defer conn.mu.Unlock() - - failedTimeout := 6 * time.Second - - var err error - transportNet, err := conn.newStdNet() - if err != nil { - log.Errorf("failed to create pion's stdnet: %s", err) - } - - iceKeepAlive := iceKeepAlive() - iceDisconnectedTimeout := iceDisconnectedTimeout() - iceRelayAcceptanceMinWait := iceRelayAcceptanceMinWait() - - agentConfig := &ice.AgentConfig{ - MulticastDNSMode: ice.MulticastDNSModeDisabled, - NetworkTypes: []ice.NetworkType{ice.NetworkTypeUDP4, ice.NetworkTypeUDP6}, - Urls: conn.config.StunTurn, - CandidateTypes: candidateTypes(), - FailedTimeout: &failedTimeout, - InterfaceFilter: stdnet.InterfaceFilter(conn.config.InterfaceBlackList), - UDPMux: conn.config.UDPMux, - UDPMuxSrflx: conn.config.UDPMuxSrflx, - NAT1To1IPs: conn.config.NATExternalIPs, - Net: transportNet, - DisconnectedTimeout: &iceDisconnectedTimeout, - KeepaliveInterval: &iceKeepAlive, - RelayAcceptanceMinWait: &iceRelayAcceptanceMinWait, - } - - if conn.config.DisableIPv6Discovery { - agentConfig.NetworkTypes = []ice.NetworkType{ice.NetworkTypeUDP4} - } - - conn.agent, err = ice.NewAgent(agentConfig) - if err != nil { - return err - } - - err = conn.agent.OnCandidate(conn.onICECandidate) - if err != nil { - return err - } - - err = conn.agent.OnConnectionStateChange(conn.onICEConnectionStateChange) - if err != nil { - return err - } - - err = conn.agent.OnSelectedCandidatePairChange(conn.onICESelectedCandidatePair) - if err != nil { - return err - } - - err = conn.agent.OnSuccessfulSelectedPairBindingResponse(func(p *ice.CandidatePair) { - err := conn.statusRecorder.UpdateLatency(conn.config.Key, p.Latency()) - if err != nil { - log.Debugf("failed to update latency for peer %s: %s", conn.config.Key, err) - return - } - }) - if err != nil { - return fmt.Errorf("failed setting binding response callback: %w", err) - } - - return nil -} - -func (conn *Conn) configureWgConnectionForRelay(remoteConn net.Conn, remoteRosenpassPubKey []byte, remoteRosenpassAddr string) error { - conn.mu.Lock() - defer conn.mu.Unlock() - - conn.wgProxy = conn.wgProxyFactory.GetProxy(conn.ctx) - endpoint, err := conn.wgProxy.AddTurnConn(remoteConn) - if err != nil { - return err - } - - endpointUdpAddr, _ := net.ResolveUDPAddr(endpoint.Network(), endpoint.String()) - log.Debugf("Conn resolved IP for %s: %s", endpoint, endpointUdpAddr.IP) - - conn.connID = nbnet.GenerateConnID() - for _, hook := range conn.beforeAddPeerHooks { - if err := hook(conn.connID, endpointUdpAddr.IP); err != nil { - log.Errorf("Before add peer hook failed: %v", err) - } - } - - err = conn.config.WgConfig.WgInterface.UpdatePeer(conn.config.WgConfig.RemoteKey, conn.config.WgConfig.AllowedIps, defaultWgKeepAlive, endpointUdpAddr, conn.config.WgConfig.PreSharedKey) - if err != nil { - if conn.wgProxy != nil { - if err := conn.wgProxy.CloseConn(); err != nil { - log.Warnf("Failed to close relay connection: %v", err) - } - } - // todo: is this nil correct? - return nil - } - - conn.status = StatusConnected - - peerState := State{ - PubKey: conn.config.Key, - ConnStatus: StatusConnected, - ConnStatusUpdate: time.Now(), - LocalIceCandidateType: "", - RemoteIceCandidateType: "", - LocalIceCandidateEndpoint: "", - RemoteIceCandidateEndpoint: "", - Direct: false, - RosenpassEnabled: isRosenpassEnabled(remoteRosenpassPubKey), - Mux: new(sync.RWMutex), - Relayed: true, - } - - err = conn.statusRecorder.UpdatePeerState(peerState) - if err != nil { - log.Warnf("unable to save peer's state, got error: %v", err) - } - - _, ipNet, err := net.ParseCIDR(conn.config.WgConfig.AllowedIps) - if err != nil { - return nil - } - - if runtime.GOOS == "ios" { - runtime.GC() - } - - if conn.onConnected != nil { - conn.onConnected(conn.config.Key, remoteRosenpassPubKey, ipNet.IP.String(), remoteRosenpassAddr) - } - - return nil -} - -// configureConnection starts proxying traffic from/to local Wireguard and sets connection status to StatusConnected -func (conn *Conn) configureConnection(remoteConn net.Conn, remoteWgPort int, remoteRosenpassPubKey []byte, remoteRosenpassAddr string) (net.Addr, error) { - conn.mu.Lock() - defer conn.mu.Unlock() - - pair, err := conn.agent.GetSelectedCandidatePair() - if err != nil { - return nil, err - } - - var endpoint net.Addr - if isRelayCandidate(pair.Local) { - log.Debugf("setup relay connection") - conn.wgProxy = conn.wgProxyFactory.GetProxy(conn.ctx) - endpoint, err = conn.wgProxy.AddTurnConn(remoteConn) - if err != nil { - return nil, err - } - } else { - // To support old version's with direct mode we attempt to punch an additional role with the remote WireGuard port - go conn.punchRemoteWGPort(pair, remoteWgPort) - endpoint = remoteConn.RemoteAddr() - } - - endpointUdpAddr, _ := net.ResolveUDPAddr(endpoint.Network(), endpoint.String()) - log.Debugf("Conn resolved IP for %s: %s", endpoint, endpointUdpAddr.IP) - - conn.connID = nbnet.GenerateConnID() - for _, hook := range conn.beforeAddPeerHooks { - if err := hook(conn.connID, endpointUdpAddr.IP); err != nil { - log.Errorf("Before add peer hook failed: %v", err) - } - } - - err = conn.config.WgConfig.WgInterface.UpdatePeer(conn.config.WgConfig.RemoteKey, conn.config.WgConfig.AllowedIps, defaultWgKeepAlive, endpointUdpAddr, conn.config.WgConfig.PreSharedKey) - if err != nil { - if conn.wgProxy != nil { - if err := conn.wgProxy.CloseConn(); err != nil { - log.Warnf("Failed to close turn connection: %v", err) - } - } - return nil, fmt.Errorf("update peer: %w", err) - } - - conn.status = StatusConnected - rosenpassEnabled := false - if remoteRosenpassPubKey != nil { - rosenpassEnabled = true - } - - peerState := State{ - PubKey: conn.config.Key, - ConnStatus: conn.status, - ConnStatusUpdate: time.Now(), - LocalIceCandidateType: pair.Local.Type().String(), - RemoteIceCandidateType: pair.Remote.Type().String(), - LocalIceCandidateEndpoint: fmt.Sprintf("%s:%d", pair.Local.Address(), pair.Local.Port()), - RemoteIceCandidateEndpoint: fmt.Sprintf("%s:%d", pair.Remote.Address(), pair.Remote.Port()), - Direct: !isRelayCandidate(pair.Local), - RosenpassEnabled: rosenpassEnabled, - Mux: new(sync.RWMutex), - } - if pair.Local.Type() == ice.CandidateTypeRelay || pair.Remote.Type() == ice.CandidateTypeRelay { - peerState.Relayed = true - } - - err = conn.statusRecorder.UpdatePeerState(peerState) - if err != nil { - log.Warnf("unable to save peer's state, got error: %v", err) - } - - _, ipNet, err := net.ParseCIDR(conn.config.WgConfig.AllowedIps) - if err != nil { - return nil, err - } - - if runtime.GOOS == "ios" { - runtime.GC() - } - - if conn.onConnected != nil { - conn.onConnected(conn.config.Key, remoteRosenpassPubKey, ipNet.IP.String(), remoteRosenpassAddr) - } - - return endpoint, nil -} - -func (conn *Conn) punchRemoteWGPort(pair *ice.CandidatePair, remoteWgPort int) { - // wait local endpoint configuration - time.Sleep(time.Second) - addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", pair.Remote.Address(), remoteWgPort)) - if err != nil { - log.Warnf("got an error while resolving the udp address, err: %s", err) - return - } - - mux, ok := conn.config.UDPMuxSrflx.(*bind.UniversalUDPMuxDefault) - if !ok { - log.Warn("invalid udp mux conversion") - return - } - _, err = mux.GetSharedConn().WriteTo([]byte{0x6e, 0x62}, addr) - if err != nil { - log.Warnf("got an error while sending the punch packet, err: %s", err) - } -} - -func (conn *Conn) waitForDisconnection() error { - select { - case <-conn.closeCh: - // closed externally - return NewConnectionClosedError(conn.config.Key) - case <-conn.ctx.Done(): - // disconnected from the remote peer - return NewConnectionDisconnectedError(conn.config.Key) - } -} - -// cleanup closes all open resources and sets status to StatusDisconnected -func (conn *Conn) cleanup() error { - log.Debugf("trying to cleanup %s", conn.config.Key) - conn.mu.Lock() - defer conn.mu.Unlock() - - conn.sentExtraSrflx = false - - var err1, err2, err3 error - if conn.agent != nil { - err1 = conn.agent.Close() - if err1 == nil { - conn.agent = nil - } - } + conn.ctxCancel() if conn.wgProxy != nil { - err2 = conn.wgProxy.CloseConn() + err := conn.wgProxy.CloseConn() + if err != nil { + conn.log.Errorf("failed to close wg proxy: %v", err) + } conn.wgProxy = nil } // todo: is it problem if we try to remove a peer what is never existed? - err3 = conn.config.WgConfig.WgInterface.RemovePeer(conn.config.WgConfig.RemoteKey) + err := conn.config.WgConfig.WgInterface.RemovePeer(conn.config.WgConfig.RemoteKey) + if err != nil { + conn.log.Errorf("failed to remove wg endpoint: %v", err) + } if conn.connID != "" { for _, hook := range conn.afterRemovePeerHooks { if err := hook(conn.connID); err != nil { - log.Errorf("After remove peer hook failed: %v", err) + conn.log.Errorf("After remove peer hook failed: %v", err) } } - } - conn.connID = "" - - if conn.notifyDisconnected != nil { - conn.notifyDisconnected() - conn.notifyDisconnected = nil + conn.connID = "" } if conn.status == StatusConnected && conn.onDisconnected != nil { @@ -690,298 +200,240 @@ func (conn *Conn) cleanup() error { ConnStatusUpdate: time.Now(), Mux: new(sync.RWMutex), } - err := conn.statusRecorder.UpdatePeerState(peerState) + err = conn.statusRecorder.UpdatePeerState(peerState) if err != nil { // pretty common error because by that time Engine can already remove the peer and status won't be available. // todo rethink status updates - log.Debugf("error while updating peer's %s state, err: %v", conn.config.Key, err) + conn.log.Debugf("error while updating peer's state, err: %v", err) } if err := conn.statusRecorder.UpdateWireGuardPeerState(conn.config.Key, iface.WGStats{}); err != nil { - log.Debugf("failed to reset wireguard stats for peer %s: %s", conn.config.Key, err) + conn.log.Debugf("failed to reset wireguard stats for peer: %s", err) } - log.Debugf("cleaned up connection to peer %s", conn.config.Key) - if err1 != nil { - return err1 - } - if err2 != nil { - return err2 - } - return err3 + conn.mu.Unlock() } -// onICECandidate is a callback attached to an ICE Agent to receive new local connection candidates -// and then signals them to the remote peer -func (conn *Conn) onICECandidate(candidate ice.Candidate) { - // nil means candidate gathering has been ended - if candidate == nil { - return - } - - // TODO: reported port is incorrect for CandidateTypeHost, makes understanding ICE use via logs confusing as port is ignored - log.Debugf("discovered local candidate %s", candidate.String()) - go func() { - err := conn.signalCandidate(candidate) - if err != nil { - log.Errorf("failed signaling candidate to the remote peer %s %s", conn.config.Key, err) - } - }() - - if !conn.shouldSendExtraSrflxCandidate(candidate) { - return - } - - // sends an extra server reflexive candidate to the remote peer with our related port (usually the wireguard port) - // this is useful when network has an existing port forwarding rule for the wireguard port and this peer - extraSrflx, err := extraSrflxCandidate(candidate) - if err != nil { - log.Errorf("failed creating extra server reflexive candidate %s", err) - return - } - conn.sentExtraSrflx = true - - go func() { - err = conn.signalCandidate(extraSrflx) - if err != nil { - log.Errorf("failed signaling the extra server reflexive candidate to the remote peer %s: %s", conn.config.Key, err) - } - }() +// OnRemoteAnswer handles an offer from the remote peer and returns true if the message was accepted, false otherwise +// doesn't block, discards the message if connection wasn't ready +func (conn *Conn) OnRemoteAnswer(answer OfferAnswer) bool { + conn.log.Debugf("OnRemoteAnswer, status %s", conn.status.String()) + return conn.handshaker.OnRemoteAnswer(answer) } -func (conn *Conn) onICESelectedCandidatePair(c1 ice.Candidate, c2 ice.Candidate) { - log.Debugf("selected candidate pair [local <-> remote] -> [%s <-> %s], peer %s", c1.String(), c2.String(), - conn.config.Key) +// OnRemoteCandidate Handles ICE connection Candidate provided by the remote peer. +func (conn *Conn) OnRemoteCandidate(candidate ice.Candidate, haRoutes route.HAMap) { + conn.connectorICE.OnRemoteCandidate(candidate, haRoutes) } -// onICEConnectionStateChange registers callback of an ICE Agent to track connection state -func (conn *Conn) onICEConnectionStateChange(state ice.ConnectionState) { - log.Debugf("peer %s ICE ConnectionState has changed to %s", conn.config.Key, state.String()) - if state == ice.ConnectionStateFailed || state == ice.ConnectionStateDisconnected { - conn.notifyDisconnected() - } +func (conn *Conn) AddBeforeAddPeerHook(hook BeforeAddPeerHookFunc) { + conn.beforeAddPeerHooks = append(conn.beforeAddPeerHooks, hook) } -func (conn *Conn) sendAnswer() error { +func (conn *Conn) AddAfterRemovePeerHook(hook AfterRemovePeerHookFunc) { + conn.afterRemovePeerHooks = append(conn.afterRemovePeerHooks, hook) +} + +// SetOnConnected sets a handler function to be triggered by Conn when a new connection to a remote peer established +func (conn *Conn) SetOnConnected(handler func(remoteWireGuardKey string, remoteRosenpassPubKey []byte, wireGuardIP string, remoteRosenpassAddr string)) { + conn.onConnected = handler +} + +// SetOnDisconnected sets a handler function to be triggered by Conn when a connection to a remote disconnected +func (conn *Conn) SetOnDisconnected(handler func(remotePeer string, wgIP string)) { + conn.onDisconnected = handler +} + +func (conn *Conn) OnRemoteOffer(offer OfferAnswer) bool { + conn.log.Debugf("OnRemoteOffer, on status %s", conn.status.String()) + return conn.handshaker.OnRemoteOffer(offer) +} + +// WgConfig returns the WireGuard config +func (conn *Conn) WgConfig() WgConfig { + return conn.config.WgConfig +} + +// Status returns current status of the Conn +func (conn *Conn) Status() ConnStatus { + conn.mu.Lock() + defer conn.mu.Unlock() + return conn.status +} + +func (conn *Conn) GetKey() string { + return conn.config.Key +} + +func (conn *Conn) relayConnectionIsReady(rci RelayConnInfo) { conn.mu.Lock() defer conn.mu.Unlock() - localUFrag, localPwd, err := conn.agent.GetLocalUserCredentials() - if err != nil { - return err + if conn.ctx.Err() != nil { + return } - log.Debugf("sending answer to %s", conn.config.Key) - err = conn.signalAnswer(OfferAnswer{ - IceCredentials: IceCredentials{localUFrag, localPwd}, - WgListenPort: conn.config.LocalWgPort, - Version: version.NetbirdVersion(), - RosenpassPubKey: conn.config.RosenpassPubKey, - RosenpassAddr: conn.config.RosenpassAddr, - }) - if err != nil { - return err + if conn.currentConnType > connPriorityRelay { + return } - return nil + wgProxy := conn.wgProxyFactory.GetProxy(conn.ctx) + endpoint, err := wgProxy.AddTurnConn(rci.relayedConn) + if err != nil { + conn.log.Errorf("failed to add relayed net.Conn to local proxy: %v", err) + return + } + + endpointUdpAddr, _ := net.ResolveUDPAddr(endpoint.Network(), endpoint.String()) + conn.log.Debugf("conn resolved IP for %s: %s", endpoint, endpointUdpAddr.IP) + + conn.connID = nbnet.GenerateConnID() + for _, hook := range conn.beforeAddPeerHooks { + if err := hook(conn.connID, endpointUdpAddr.IP); err != nil { + conn.log.Errorf("Before add peer hook failed: %v", err) + } + } + + err = conn.config.WgConfig.WgInterface.UpdatePeer(conn.config.WgConfig.RemoteKey, conn.config.WgConfig.AllowedIps, defaultWgKeepAlive, endpointUdpAddr, conn.config.WgConfig.PreSharedKey) + if err != nil { + if err := wgProxy.CloseConn(); err != nil { + conn.log.Warnf("Failed to close relay connection: %v", err) + } + conn.log.Errorf("Failed to update wg peer configuration: %v", err) + return + } + + if conn.wgProxy != nil { + if err := conn.wgProxy.CloseConn(); err != nil { + conn.log.Warnf("failed to close depracated wg proxy conn: %v", err) + } + } + conn.wgProxy = wgProxy + + conn.currentConnType = connPriorityRelay + + peerState := State{ + Direct: false, + Relayed: true, + } + + conn.updateStatus(peerState, rci.rosenpassPubKey, rci.rosenpassAddr) } -// sendOffer prepares local user credentials and signals them to the remote peer -func (conn *Conn) sendOffer() error { +// configureConnection starts proxying traffic from/to local Wireguard and sets connection status to StatusConnected +func (conn *Conn) iCEConnectionIsReady(priority ConnPriority, iceConnInfo ICEConnInfo) { conn.mu.Lock() defer conn.mu.Unlock() - localUFrag, localPwd, err := conn.agent.GetLocalUserCredentials() - if err != nil { - return err - } - oa := OfferAnswer{ - IceCredentials: IceCredentials{localUFrag, localPwd}, - WgListenPort: conn.config.LocalWgPort, - Version: version.NetbirdVersion(), - RosenpassPubKey: conn.config.RosenpassPubKey, - RosenpassAddr: conn.config.RosenpassAddr, + if conn.ctx.Err() != nil { + return } - relayIPAddress, err := conn.relayManager.RelayAddress() - if err == nil { - oa.RelaySrvAddress = relayIPAddress.String() + if conn.currentConnType > priority { + return } - return conn.signalOffer(oa) -} - -func (conn *Conn) shouldSendExtraSrflxCandidate(candidate ice.Candidate) bool { - if !conn.sentExtraSrflx && candidate.Type() == ice.CandidateTypeServerReflexive && candidate.Port() != candidate.RelatedAddress().Port { - return true - } - return false -} - -func (conn *Conn) waitForRemoteOfferConfirmation() (*OfferAnswer, error) { - var remoteOfferAnswer OfferAnswer - select { - case remoteOfferAnswer = <-conn.remoteOffersCh: - // received confirmation from the remote peer -> ready to proceed - err := conn.sendAnswer() + var ( + endpoint net.Addr + wgProxy wgproxy.Proxy + ) + if iceConnInfo.RelayedOnLocal { + conn.log.Debugf("setup ice turn connection") + wgProxy = conn.wgProxyFactory.GetProxy(conn.ctx) + ep, err := conn.wgProxy.AddTurnConn(iceConnInfo.RemoteConn) if err != nil { - return nil, err + conn.log.Errorf("failed to add turn net.Conn to local proxy: %v", err) + return } - case remoteOfferAnswer = <-conn.remoteAnswerCh: - case <-time.After(conn.config.Timeout): - return nil, NewConnectionTimeoutError(conn.config.Key, conn.config.Timeout) - case <-conn.closeCh: - // closed externally - return nil, NewConnectionClosedError(conn.config.Key) - } - - return &remoteOfferAnswer, nil -} - -func (conn *Conn) turnAgentDial(remoteOfferAnswer *OfferAnswer) (*ice.Conn, error) { - isControlling := conn.config.LocalKey > conn.config.Key - if isControlling { - return conn.agent.Dial(conn.ctx, remoteOfferAnswer.IceCredentials.UFrag, remoteOfferAnswer.IceCredentials.Pwd) + endpoint = ep } else { - return conn.agent.Accept(conn.ctx, remoteOfferAnswer.IceCredentials.UFrag, remoteOfferAnswer.IceCredentials.Pwd) - } -} - -func (conn *Conn) setupRelayConnection(remoteOfferAnswer *OfferAnswer) bool { - if !isRelaySupported(remoteOfferAnswer) { - return false + endpoint = iceConnInfo.RemoteConn.RemoteAddr() } - currentRelayAddress, err := conn.relayManager.RelayAddress() - if err != nil { - return false - } + endpointUdpAddr, _ := net.ResolveUDPAddr(endpoint.Network(), endpoint.String()) + conn.log.Debugf("Conn resolved IP for %s: %s", endpoint, endpointUdpAddr.IP) - conn.preferredRelayServer(currentRelayAddress.String(), remoteOfferAnswer.RelaySrvAddress) - relayConn, err := conn.relayManager.OpenConn(remoteOfferAnswer.RelaySrvAddress, conn.config.Key) - if err != nil { - return false - } - - err = conn.configureWgConnectionForRelay(relayConn, remoteOfferAnswer.RosenpassPubKey, remoteOfferAnswer.RosenpassAddr) - if err != nil { - log.Errorf("failed to configure WireGuard connection for relay: %s", err) - return false - } - return true -} - -func (conn *Conn) preferredRelayServer(myRelayAddress, remoteRelayAddress string) string { - if conn.config.LocalKey > conn.config.Key { - return myRelayAddress - } - return remoteRelayAddress -} - -func (conn *Conn) setupICEConnection(remoteOfferAnswer *OfferAnswer, relayOperate bool) error { - var preferredCandidateTypes []ice.CandidateType - if relayOperate { - preferredCandidateTypes = candidateTypesP2P() - } else { - preferredCandidateTypes = candidateTypes() - } - - err := conn.reCreateAgent(preferredCandidateTypes) - if err != nil { - return err - } - - err = conn.agent.GatherCandidates() - if err != nil { - return fmt.Errorf("gather candidates: %v", err) - } - - // will block until connection succeeded - // but it won't release if ICE Agent went into Disconnected or Failed state, - // so we have to cancel it with the provided context once agent detected a broken connection - remoteConn, err := conn.turnAgentDial(remoteOfferAnswer) - if err != nil { - return err - } - - // dynamically set remote WireGuard port if other side specified a different one from the default one - remoteWgPort := iface.DefaultWgPort - if remoteOfferAnswer.WgListenPort != 0 { - remoteWgPort = remoteOfferAnswer.WgListenPort - } - - // the ice connection has been established successfully so we are ready to start the proxy - remoteAddr, err := conn.configureConnection(remoteConn, remoteWgPort, remoteOfferAnswer.RosenpassPubKey, - remoteOfferAnswer.RosenpassAddr) - if err != nil { - return err - } - - log.Infof("connected to peer %s, endpoint address: %s", conn.config.Key, remoteAddr.String()) - return nil -} - -func extraSrflxCandidate(candidate ice.Candidate) (*ice.CandidateServerReflexive, error) { - relatedAdd := candidate.RelatedAddress() - return ice.NewCandidateServerReflexive(&ice.CandidateServerReflexiveConfig{ - Network: candidate.NetworkType().String(), - Address: candidate.Address(), - Port: relatedAdd.Port, - Component: candidate.Component(), - RelAddr: relatedAdd.Address, - RelPort: relatedAdd.Port, - }) -} - -func candidateViaRoutes(candidate ice.Candidate, clientRoutes route.HAMap) bool { - var routePrefixes []netip.Prefix - for _, routes := range clientRoutes { - if len(routes) > 0 && routes[0] != nil { - routePrefixes = append(routePrefixes, routes[0].Network) + conn.connID = nbnet.GenerateConnID() + for _, hook := range conn.beforeAddPeerHooks { + if err := hook(conn.connID, endpointUdpAddr.IP); err != nil { + conn.log.Errorf("Before add peer hook failed: %v", err) } } - addr, err := netip.ParseAddr(candidate.Address()) + err := conn.config.WgConfig.WgInterface.UpdatePeer(conn.config.WgConfig.RemoteKey, conn.config.WgConfig.AllowedIps, defaultWgKeepAlive, endpointUdpAddr, conn.config.WgConfig.PreSharedKey) if err != nil { - log.Errorf("Failed to parse IP address %s: %v", candidate.Address(), err) - return false + if wgProxy != nil { + if err := wgProxy.CloseConn(); err != nil { + conn.log.Warnf("Failed to close turn connection: %v", err) + } + } + conn.log.Warnf("Failed to update wg peer configuration: %v", err) + return } - for _, prefix := range routePrefixes { - // default route is - if prefix.Bits() == 0 { - continue - } - - if prefix.Contains(addr) { - log.Debugf("Ignoring candidate [%s], its address is part of routed network %s", candidate.String(), prefix) - return true + if conn.wgProxy != nil { + if err := conn.wgProxy.CloseConn(); err != nil { + conn.log.Warnf("failed to close depracated wg proxy conn: %v", err) } } - return false + conn.wgProxy = wgProxy + + conn.currentConnType = priority + + peerState := State{ + LocalIceCandidateType: iceConnInfo.LocalIceCandidateType, + RemoteIceCandidateType: iceConnInfo.RemoteIceCandidateType, + LocalIceCandidateEndpoint: iceConnInfo.LocalIceCandidateEndpoint, + RemoteIceCandidateEndpoint: iceConnInfo.RemoteIceCandidateEndpoint, + Direct: iceConnInfo.Direct, + Relayed: iceConnInfo.Relayed, + } + + conn.updateStatus(peerState, iceConnInfo.RosenpassPubKey, iceConnInfo.RosenpassAddr) } -func candidateTypes() []ice.CandidateType { - if hasICEForceRelayConn() { - return []ice.CandidateType{ice.CandidateTypeRelay} +func (conn *Conn) updateStatus(peerState State, remoteRosenpassPubKey []byte, remoteRosenpassAddr string) { + conn.status = StatusConnected + + peerState.PubKey = conn.config.Key + peerState.ConnStatus = StatusConnected + peerState.ConnStatusUpdate = time.Now() + peerState.RosenpassEnabled = isRosenpassEnabled(remoteRosenpassPubKey) + peerState.Mux = new(sync.RWMutex) + + err := conn.statusRecorder.UpdatePeerState(peerState) + if err != nil { + conn.log.Warnf("unable to save peer's state, got error: %v", err) } - // TODO: remove this once we have refactored userspace proxy into the bind package + if runtime.GOOS == "ios" { - return []ice.CandidateType{ice.CandidateTypeHost, ice.CandidateTypeServerReflexive} + runtime.GC() } - return []ice.CandidateType{ice.CandidateTypeHost, ice.CandidateTypeServerReflexive, ice.CandidateTypeRelay} + + if conn.onConnected != nil { + conn.onConnected(conn.config.Key, remoteRosenpassPubKey, conn.allowedIPsIP, remoteRosenpassAddr) + } + return } -func candidateTypesP2P() []ice.CandidateType { - return []ice.CandidateType{ice.CandidateTypeHost, ice.CandidateTypeServerReflexive} -} +func (conn *Conn) doHandshake() (*OfferAnswer, error) { + if !conn.signaler.Ready() { + return nil, ErrSignalIsNotReady + } -func isRelayCandidate(candidate ice.Candidate) bool { - return candidate.Type() == ice.CandidateTypeRelay -} + uFreg, pwd, err := conn.connectorICE.GetLocalUserCredentials() + if err != nil { + conn.log.Errorf("failed to get local user credentials: %v", err) + } -// todo check my side too -func isRelaySupported(answer *OfferAnswer) bool { - return answer.RelaySrvAddress != "" + addr, err := conn.connectorRelay.RelayAddress() + if err != nil { + conn.log.Errorf("failed to get local relay address: %v", err) + } + return conn.handshaker.Handshake(HandshakeArgs{ + uFreg, + pwd, + addr.String(), + }) } func isRosenpassEnabled(remoteRosenpassPubKey []byte) bool { diff --git a/client/internal/peer/conn_ice.go b/client/internal/peer/conn_ice.go new file mode 100644 index 000000000..e3d33a4d7 --- /dev/null +++ b/client/internal/peer/conn_ice.go @@ -0,0 +1,436 @@ +package peer + +import ( + "context" + "errors" + "fmt" + "math/rand" + "net" + "net/netip" + "runtime" + "sync/atomic" + "time" + + "github.com/pion/ice/v3" + "github.com/pion/stun/v2" + log "github.com/sirupsen/logrus" + + "github.com/netbirdio/netbird/client/internal" + "github.com/netbirdio/netbird/client/internal/stdnet" + "github.com/netbirdio/netbird/iface" + "github.com/netbirdio/netbird/iface/bind" + "github.com/netbirdio/netbird/route" +) + +const ( + iceKeepAliveDefault = 4 * time.Second + iceDisconnectedTimeoutDefault = 6 * time.Second + // iceRelayAcceptanceMinWaitDefault is the same as in the Pion ICE package + iceRelayAcceptanceMinWaitDefault = 2 * time.Second +) + +type ICEConfig struct { + // StunTurn is a list of STUN and TURN URLs + StunTurn atomic.Value // []*stun.URI + + // InterfaceBlackList is a list of machine interfaces that should be filtered out by ICE Candidate gathering + // (e.g. if eth0 is in the list, host candidate of this interface won't be used) + InterfaceBlackList []string + DisableIPv6Discovery bool + + UDPMux ice.UDPMux + UDPMuxSrflx ice.UniversalUDPMux + + NATExternalIPs []string +} + +type OnICEConnReadyCallback func(ConnPriority, ICEConnInfo) + +type ICEConnInfo struct { + RemoteConn net.Conn + RosenpassPubKey []byte + RosenpassAddr string + LocalIceCandidateType string + RemoteIceCandidateType string + RemoteIceCandidateEndpoint string + LocalIceCandidateEndpoint string + Direct bool + Relayed bool + RelayedOnLocal bool +} + +type ConnectorICE struct { + ctx context.Context + log *log.Entry + config ConnConfig + configICE ICEConfig + signaler *internal.Signaler + iFaceDiscover stdnet.ExternalIFaceDiscover + statusRecorder *Status + onICEConnReady OnICEConnReadyCallback + doHandshakeFn DoHandshake + + connPriority ConnPriority + + agent *ice.Agent + + StunTurn []*stun.URI + + sentExtraSrflx bool +} + +func NewConnectorICE(ctx context.Context, log *log.Entry, config ConnConfig, configICE ICEConfig, signaler *internal.Signaler, ifaceDiscover stdnet.ExternalIFaceDiscover, statusRecorder *Status, onICEConnReady OnICEConnReadyCallback, doHandshakeFn DoHandshake) *ConnectorICE { + cice := &ConnectorICE{ + ctx: ctx, + log: log, + config: config, + configICE: configICE, + signaler: signaler, + iFaceDiscover: ifaceDiscover, + statusRecorder: statusRecorder, + onICEConnReady: onICEConnReady, + doHandshakeFn: doHandshakeFn, + } + return cice +} + +// SetupICEConnection sets up an ICE connection with the remote peer. +// If the relay mode is supported then try to connect in p2p way only. +// It is trying to reconnection in a loop until the context is canceled. +// In case of success connection it will call the onICEConnReady callback. +func (conn *ConnectorICE) SetupICEConnection(relayMode bool) { + var preferredCandidateTypes []ice.CandidateType + if relayMode { + conn.connPriority = connPriorityICEP2P + preferredCandidateTypes = candidateTypesP2P() + } else { + conn.connPriority = connPriorityICETurn + preferredCandidateTypes = candidateTypes() + } + + for { + if !conn.waitForReconnectTry() { + return + } + + remoteOfferAnswer, err := conn.doHandshakeFn() + if err != nil { + if errors.Is(err, ErrSignalIsNotReady) { + conn.log.Infof("signal client isn't ready, skipping connection attempt") + } + continue + } + + ctx, ctxCancel := context.WithCancel(conn.ctx) + agent, err := conn.reCreateAgent(ctxCancel, preferredCandidateTypes) + if err != nil { + ctxCancel() + continue + } + conn.agent = agent + + err = conn.agent.GatherCandidates() + if err != nil { + ctxCancel() + continue + } + + // will block until connection succeeded + // but it won't release if ICE Agent went into Disconnected or Failed state, + // so we have to cancel it with the provided context once agent detected a broken connection + remoteConn, err := conn.turnAgentDial(remoteOfferAnswer) + if err != nil { + ctxCancel() + continue + } + + pair, err := conn.agent.GetSelectedCandidatePair() + if err != nil { + ctxCancel() + continue + } + + if !isRelayCandidate(pair.Local) { + // dynamically set remote WireGuard port if other side specified a different one from the default one + remoteWgPort := iface.DefaultWgPort + if remoteOfferAnswer.WgListenPort != 0 { + remoteWgPort = remoteOfferAnswer.WgListenPort + } + + // To support old version's with direct mode we attempt to punch an additional role with the remote WireGuard port + go conn.punchRemoteWGPort(pair, remoteWgPort) + } + + ci := ICEConnInfo{ + RemoteConn: remoteConn, + RosenpassPubKey: remoteOfferAnswer.RosenpassPubKey, + RosenpassAddr: remoteOfferAnswer.RosenpassAddr, + LocalIceCandidateType: pair.Local.Type().String(), + RemoteIceCandidateType: pair.Remote.Type().String(), + LocalIceCandidateEndpoint: fmt.Sprintf("%s:%d", pair.Local.Address(), pair.Local.Port()), + RemoteIceCandidateEndpoint: fmt.Sprintf("%s:%d", pair.Remote.Address(), pair.Remote.Port()), + Direct: !isRelayCandidate(pair.Local), + Relayed: isRelayed(pair), + RelayedOnLocal: isRelayCandidate(pair.Local), + } + go conn.onICEConnReady(conn.connPriority, ci) + + <-ctx.Done() + ctxCancel() + _ = conn.agent.Close() + } +} + +// OnRemoteCandidate Handles ICE connection Candidate provided by the remote peer. +func (conn *ConnectorICE) OnRemoteCandidate(candidate ice.Candidate, haRoutes route.HAMap) { + conn.log.Debugf("OnRemoteCandidate from peer %s -> %s", conn.config.Key, candidate.String()) + if conn.agent == nil { + return + } + + if candidateViaRoutes(candidate, haRoutes) { + return + } + + err := conn.agent.AddRemoteCandidate(candidate) + if err != nil { + conn.log.Errorf("error while handling remote candidate") + return + } +} + +func (conn *ConnectorICE) GetLocalUserCredentials() (frag string, pwd string, err error) { + if conn.agent == nil { + return "", "", errors.New("ICE Agent is not initialized") + } + return conn.agent.GetLocalUserCredentials() +} + +func (conn *ConnectorICE) reCreateAgent(ctxCancel context.CancelFunc, relaySupport []ice.CandidateType) (*ice.Agent, error) { + failedTimeout := 6 * time.Second + transportNet, err := conn.newStdNet() + if err != nil { + conn.log.Errorf("failed to create pion's stdnet: %s", err) + } + + iceKeepAlive := iceKeepAlive() + iceDisconnectedTimeout := iceDisconnectedTimeout() + iceRelayAcceptanceMinWait := iceRelayAcceptanceMinWait() + + agentConfig := &ice.AgentConfig{ + MulticastDNSMode: ice.MulticastDNSModeDisabled, + NetworkTypes: []ice.NetworkType{ice.NetworkTypeUDP4, ice.NetworkTypeUDP6}, + Urls: conn.configICE.StunTurn.Load().([]*stun.URI), + CandidateTypes: relaySupport, + FailedTimeout: &failedTimeout, + InterfaceFilter: stdnet.InterfaceFilter(conn.configICE.InterfaceBlackList), + UDPMux: conn.configICE.UDPMux, + UDPMuxSrflx: conn.configICE.UDPMuxSrflx, + NAT1To1IPs: conn.configICE.NATExternalIPs, + Net: transportNet, + DisconnectedTimeout: &iceDisconnectedTimeout, + KeepaliveInterval: &iceKeepAlive, + RelayAcceptanceMinWait: &iceRelayAcceptanceMinWait, + } + + if conn.configICE.DisableIPv6Discovery { + agentConfig.NetworkTypes = []ice.NetworkType{ice.NetworkTypeUDP4} + } + + conn.sentExtraSrflx = false + agent, err := ice.NewAgent(agentConfig) + if err != nil { + return nil, err + } + + err = agent.OnCandidate(conn.onICECandidate) + if err != nil { + return nil, err + } + + err = agent.OnConnectionStateChange(func(state ice.ConnectionState) { + conn.log.Debugf("ICE ConnectionState has changed to %s", state.String()) + if state == ice.ConnectionStateFailed || state == ice.ConnectionStateDisconnected { + ctxCancel() + } + }) + if err != nil { + return nil, err + } + + err = agent.OnSelectedCandidatePairChange(conn.onICESelectedCandidatePair) + if err != nil { + return nil, err + } + + err = agent.OnSuccessfulSelectedPairBindingResponse(func(p *ice.CandidatePair) { + err := conn.statusRecorder.UpdateLatency(conn.config.Key, p.Latency()) + if err != nil { + conn.log.Debugf("failed to update latency for peer: %s", err) + return + } + }) + if err != nil { + return nil, fmt.Errorf("failed setting binding response callback: %w", err) + } + + return agent, nil +} + +func (conn *ConnectorICE) punchRemoteWGPort(pair *ice.CandidatePair, remoteWgPort int) { + // wait local endpoint configuration + time.Sleep(time.Second) + addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", pair.Remote.Address(), remoteWgPort)) + if err != nil { + conn.log.Warnf("got an error while resolving the udp address, err: %s", err) + return + } + + mux, ok := conn.configICE.UDPMuxSrflx.(*bind.UniversalUDPMuxDefault) + if !ok { + conn.log.Warn("invalid udp mux conversion") + return + } + _, err = mux.GetSharedConn().WriteTo([]byte{0x6e, 0x62}, addr) + if err != nil { + conn.log.Warnf("got an error while sending the punch packet, err: %s", err) + } +} + +// onICECandidate is a callback attached to an ICE Agent to receive new local connection candidates +// and then signals them to the remote peer +func (conn *ConnectorICE) onICECandidate(candidate ice.Candidate) { + // nil means candidate gathering has been ended + if candidate == nil { + return + } + + // TODO: reported port is incorrect for CandidateTypeHost, makes understanding ICE use via logs confusing as port is ignored + conn.log.Debugf("discovered local candidate %s", candidate.String()) + go func() { + err := conn.signaler.SignalICECandidate(candidate, conn.config.Key) + if err != nil { + conn.log.Errorf("failed signaling candidate to the remote peer %s %s", conn.config.Key, err) + } + }() + + if !conn.shouldSendExtraSrflxCandidate(candidate) { + return + } + + // sends an extra server reflexive candidate to the remote peer with our related port (usually the wireguard port) + // this is useful when network has an existing port forwarding rule for the wireguard port and this peer + extraSrflx, err := extraSrflxCandidate(candidate) + if err != nil { + conn.log.Errorf("failed creating extra server reflexive candidate %s", err) + return + } + conn.sentExtraSrflx = true + + go func() { + err = conn.signaler.SignalICECandidate(extraSrflx, conn.config.Key) + if err != nil { + conn.log.Errorf("failed signaling the extra server reflexive candidate: %s", err) + } + }() +} + +func (conn *ConnectorICE) onICESelectedCandidatePair(c1 ice.Candidate, c2 ice.Candidate) { + conn.log.Debugf("selected candidate pair [local <-> remote] -> [%s <-> %s], peer %s", c1.String(), c2.String(), + conn.config.Key) +} + +func (conn *ConnectorICE) shouldSendExtraSrflxCandidate(candidate ice.Candidate) bool { + if !conn.sentExtraSrflx && candidate.Type() == ice.CandidateTypeServerReflexive && candidate.Port() != candidate.RelatedAddress().Port { + return true + } + return false +} + +func (conn *ConnectorICE) turnAgentDial(remoteOfferAnswer *OfferAnswer) (*ice.Conn, error) { + isControlling := conn.config.LocalKey > conn.config.Key + if isControlling { + return conn.agent.Dial(conn.ctx, remoteOfferAnswer.IceCredentials.UFrag, remoteOfferAnswer.IceCredentials.Pwd) + } else { + return conn.agent.Accept(conn.ctx, remoteOfferAnswer.IceCredentials.UFrag, remoteOfferAnswer.IceCredentials.Pwd) + } +} + +// waitForReconnectTry waits for a random duration before trying to reconnect +func (conn *ConnectorICE) waitForReconnectTry() bool { + minWait := 500 + maxWait := 2000 + duration := time.Duration(rand.Intn(maxWait-minWait)+minWait) * time.Millisecond + select { + case <-conn.ctx.Done(): + return false + case <-time.After(duration): + return true + } +} + +func extraSrflxCandidate(candidate ice.Candidate) (*ice.CandidateServerReflexive, error) { + relatedAdd := candidate.RelatedAddress() + return ice.NewCandidateServerReflexive(&ice.CandidateServerReflexiveConfig{ + Network: candidate.NetworkType().String(), + Address: candidate.Address(), + Port: relatedAdd.Port, + Component: candidate.Component(), + RelAddr: relatedAdd.Address, + RelPort: relatedAdd.Port, + }) +} + +func candidateViaRoutes(candidate ice.Candidate, clientRoutes route.HAMap) bool { + var routePrefixes []netip.Prefix + for _, routes := range clientRoutes { + if len(routes) > 0 && routes[0] != nil { + routePrefixes = append(routePrefixes, routes[0].Network) + } + } + + addr, err := netip.ParseAddr(candidate.Address()) + if err != nil { + log.Errorf("Failed to parse IP address %s: %v", candidate.Address(), err) + return false + } + + for _, prefix := range routePrefixes { + // default route is + if prefix.Bits() == 0 { + continue + } + + if prefix.Contains(addr) { + log.Debugf("Ignoring candidate [%s], its address is part of routed network %s", candidate.String(), prefix) + return true + } + } + return false +} + +func candidateTypes() []ice.CandidateType { + if hasICEForceRelayConn() { + return []ice.CandidateType{ice.CandidateTypeRelay} + } + // TODO: remove this once we have refactored userspace proxy into the bind package + if runtime.GOOS == "ios" { + return []ice.CandidateType{ice.CandidateTypeHost, ice.CandidateTypeServerReflexive} + } + return []ice.CandidateType{ice.CandidateTypeHost, ice.CandidateTypeServerReflexive, ice.CandidateTypeRelay} +} + +func candidateTypesP2P() []ice.CandidateType { + return []ice.CandidateType{ice.CandidateTypeHost, ice.CandidateTypeServerReflexive} +} + +func isRelayCandidate(candidate ice.Candidate) bool { + return candidate.Type() == ice.CandidateTypeRelay +} + +func isRelayed(pair *ice.CandidatePair) bool { + if pair.Local.Type() == ice.CandidateTypeRelay || pair.Remote.Type() == ice.CandidateTypeRelay { + return true + } + return false +} diff --git a/client/internal/peer/conn_relay.go b/client/internal/peer/conn_relay.go new file mode 100644 index 000000000..23bc47263 --- /dev/null +++ b/client/internal/peer/conn_relay.go @@ -0,0 +1,115 @@ +package peer + +import ( + "context" + "errors" + "math/rand" + "net" + "time" + + log "github.com/sirupsen/logrus" + + relayClient "github.com/netbirdio/netbird/relay/client" +) + +type OnRelayReadyCallback func(info RelayConnInfo) + +type RelayConnInfo struct { + relayedConn net.Conn + rosenpassPubKey []byte + rosenpassAddr string +} + +type ConnectorRelay struct { + ctx context.Context + log *log.Entry + relayManager *relayClient.Manager + config ConnConfig + onRelayConnReadyFN OnRelayReadyCallback + doHandshakeFn DoHandshake +} + +func NewConnectorRelay(ctx context.Context, log *log.Entry, relayManager *relayClient.Manager, config ConnConfig, onRelayConnReadyFN OnRelayReadyCallback, doHandshakeFn DoHandshake) *ConnectorRelay { + return &ConnectorRelay{ + ctx: ctx, + log: log, + relayManager: relayManager, + config: config, + onRelayConnReadyFN: onRelayConnReadyFN, + doHandshakeFn: doHandshakeFn, + } +} + +// SetupRelayConnection todo: this function is not completed. Make no sense to put it in a for loop because we are not waiting for any event +func (conn *ConnectorRelay) SetupRelayConnection() { + for { + if !conn.waitForReconnectTry() { + return + } + + remoteOfferAnswer, err := conn.doHandshakeFn() + if err != nil { + if errors.Is(err, ErrSignalIsNotReady) { + conn.log.Infof("signal client isn't ready, skipping connection attempt") + } + conn.log.Errorf("failed to do handshake: %v", err) + continue + } + + if !conn.isRelaySupported(remoteOfferAnswer) { + // todo should we retry? + continue + } + + // the relayManager will return with error in case if the connection has lost with relay server + currentRelayAddress, err := conn.relayManager.RelayAddress() + if err != nil { + continue + } + + srv := conn.preferredRelayServer(currentRelayAddress.String(), remoteOfferAnswer.RelaySrvAddress) + relayedConn, err := conn.relayManager.OpenConn(srv, conn.config.Key) + if err != nil { + continue + } + + go conn.onRelayConnReadyFN(RelayConnInfo{ + relayedConn: relayedConn, + rosenpassPubKey: remoteOfferAnswer.RosenpassPubKey, + rosenpassAddr: remoteOfferAnswer.RosenpassAddr, + }) + } +} + +func (conn *ConnectorRelay) RelayAddress() (net.Addr, error) { + return conn.relayManager.RelayAddress() +} + +// todo check my side too +func (conn *ConnectorRelay) isRelaySupported(answer *OfferAnswer) bool { + return answer.RelaySrvAddress != "" +} + +func (conn *ConnectorRelay) preferredRelayServer(myRelayAddress, remoteRelayAddress string) string { + if conn.config.LocalKey > conn.config.Key { + return myRelayAddress + } + return remoteRelayAddress +} + +func (conn *ConnectorRelay) RelayIsSupported() bool { + return conn.relayManager.IsSupported() +} + +// waitForReconnectTry waits for a random duration before trying to reconnect +func (conn *ConnectorRelay) waitForReconnectTry() bool { + minWait := 500 + maxWait := 2000 + duration := time.Duration(rand.Intn(maxWait-minWait)+minWait) * time.Millisecond + select { + case <-conn.ctx.Done(): + return false + case <-time.After(duration): + return true + } +} diff --git a/client/internal/peer/conn_test.go b/client/internal/peer/conn_test.go index c16134808..10f1ac5ef 100644 --- a/client/internal/peer/conn_test.go +++ b/client/internal/peer/conn_test.go @@ -40,7 +40,7 @@ func TestConn_GetKey(t *testing.T) { defer func() { _ = wgProxyFactory.Free() }() - conn, err := NewConn(connConf, nil, wgProxyFactory, nil, nil) + conn, err := NewConn(connConf, nil, wgProxyFactory, nil, nil, nil) if err != nil { return } @@ -55,7 +55,7 @@ func TestConn_OnRemoteOffer(t *testing.T) { defer func() { _ = wgProxyFactory.Free() }() - conn, err := NewConn(connConf, NewRecorder("https://mgm"), wgProxyFactory, nil, nil) + conn, err := NewConn(connConf, NewRecorder("https://mgm"), wgProxyFactory, nil, nil, nil) if err != nil { return } @@ -92,7 +92,7 @@ func TestConn_OnRemoteAnswer(t *testing.T) { defer func() { _ = wgProxyFactory.Free() }() - conn, err := NewConn(connConf, NewRecorder("https://mgm"), wgProxyFactory, nil, nil) + conn, err := NewConn(connConf, NewRecorder("https://mgm"), wgProxyFactory, nil, nil, nil) if err != nil { return } @@ -128,7 +128,7 @@ func TestConn_Status(t *testing.T) { defer func() { _ = wgProxyFactory.Free() }() - conn, err := NewConn(connConf, NewRecorder("https://mgm"), wgProxyFactory, nil, nil) + conn, err := NewConn(connConf, NewRecorder("https://mgm"), wgProxyFactory, nil, nil, nil) if err != nil { return } @@ -158,7 +158,7 @@ func TestConn_Close(t *testing.T) { defer func() { _ = wgProxyFactory.Free() }() - conn, err := NewConn(connConf, NewRecorder("https://mgm"), wgProxyFactory, nil, nil) + conn, err := NewConn(connConf, NewRecorder("https://mgm"), wgProxyFactory, nil, nil, nil) if err != nil { return } diff --git a/client/internal/peer/handshaker.go b/client/internal/peer/handshaker.go new file mode 100644 index 000000000..6562c12e7 --- /dev/null +++ b/client/internal/peer/handshaker.go @@ -0,0 +1,210 @@ +package peer + +import ( + "context" + "errors" + "sync" + "time" + + log "github.com/sirupsen/logrus" + "golang.zx2c4.com/wireguard/conn" + + "github.com/netbirdio/netbird/client/internal" + "github.com/netbirdio/netbird/version" +) + +const ( + handshakeCacheTimeout = 3 * time.Second +) + +var ( + ErrSignalIsNotReady = errors.New("signal is not ready") +) + +type DoHandshake func() (*OfferAnswer, error) + +// IceCredentials ICE protocol credentials struct +type IceCredentials struct { + UFrag string + Pwd string +} + +// OfferAnswer represents a session establishment offer or answer +type OfferAnswer struct { + IceCredentials IceCredentials + // WgListenPort is a remote WireGuard listen port. + // This field is used when establishing a direct WireGuard connection without any proxy. + // We can set the remote peer's endpoint with this port. + WgListenPort int + + // Version of NetBird Agent + Version string + // RosenpassPubKey is the Rosenpass public key of the remote peer when receiving this message + // This value is the local Rosenpass server public key when sending the message + RosenpassPubKey []byte + // RosenpassAddr is the Rosenpass server address (IP:port) of the remote peer when receiving this message + // This value is the local Rosenpass server address when sending the message + RosenpassAddr string + + // relay server address + RelaySrvAddress string +} + +type HandshakeArgs struct { + IceUFrag string + IcePwd string + RelayAddr string +} + +type Handshaker struct { + mu sync.Mutex + ctx context.Context + config ConnConfig + signaler *internal.Signaler + + // remoteOffersCh is a channel used to wait for remote credentials to proceed with the connection + remoteOffersCh chan OfferAnswer + // remoteAnswerCh is a channel used to wait for remote credentials answer (confirmation of our offer) to proceed with the connection + remoteAnswerCh chan OfferAnswer + + remoteOfferAnswer *OfferAnswer + remoteOfferAnswerCreated time.Time +} + +func NewHandshaker(ctx context.Context, config ConnConfig, signaler *internal.Signaler) *Handshaker { + return &Handshaker{ + ctx: ctx, + config: config, + signaler: signaler, + remoteOffersCh: make(chan OfferAnswer), + remoteAnswerCh: make(chan OfferAnswer), + } +} + +func (h *Handshaker) Handshake(args HandshakeArgs) (*OfferAnswer, error) { + h.mu.Lock() + defer h.mu.Unlock() + + cachedOfferAnswer, ok := h.cachedHandshake() + if ok { + return cachedOfferAnswer, nil + } + + err := h.sendOffer(args) + if err != nil { + return nil, err + } + + // Only continue once we got a connection confirmation from the remote peer. + // The connection timeout could have happened before a confirmation received from the remote. + // The connection could have also been closed externally (e.g. when we received an update from the management that peer shouldn't be connected) + remoteOfferAnswer, err := h.waitForRemoteOfferConfirmation() + if err != nil { + return nil, err + } + h.storeRemoteOfferAnswer(remoteOfferAnswer) + + log.Debugf("received connection confirmation from peer %s running version %s and with remote WireGuard listen port %d", + h.config.Key, remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort) + + return remoteOfferAnswer, nil +} + +// OnRemoteOffer handles an offer from the remote peer and returns true if the message was accepted, false otherwise +// doesn't block, discards the message if connection wasn't ready +func (h *Handshaker) OnRemoteOffer(offer OfferAnswer) bool { + + select { + case h.remoteOffersCh <- offer: + return true + default: + log.Debugf("OnRemoteOffer skipping message from peer %s on status %s because is not ready", h.config.Key, conn.status.String()) + // connection might not be ready yet to receive so we ignore the message + return false + } +} + +// OnRemoteAnswer handles an offer from the remote peer and returns true if the message was accepted, false otherwise +// doesn't block, discards the message if connection wasn't ready +func (h *Handshaker) OnRemoteAnswer(answer OfferAnswer) bool { + select { + case h.remoteAnswerCh <- answer: + return true + default: + // connection might not be ready yet to receive so we ignore the message + log.Debugf("OnRemoteAnswer skipping message from peer %s on status %s because is not ready", h.config.Key, conn.status.String()) + return false + } +} + +// sendOffer prepares local user credentials and signals them to the remote peer +func (h *Handshaker) sendOffer(args HandshakeArgs) error { + offer := OfferAnswer{ + IceCredentials: IceCredentials{args.IceUFrag, args.IcePwd}, + WgListenPort: h.config.LocalWgPort, + Version: version.NetbirdVersion(), + RosenpassPubKey: h.config.RosenpassPubKey, + RosenpassAddr: h.config.RosenpassAddr, + RelaySrvAddress: args.RelayAddr, + } + + return h.signaler.SignalOffer(offer, h.config.Key) +} + +func (h *Handshaker) sendAnswer() error { + localUFrag, localPwd, err := conn.connectorICE.GetLocalUserCredentials() + if err != nil { + return err + } + + log.Debugf("sending answer to %s", h.config.Key) + answer := OfferAnswer{ + IceCredentials: IceCredentials{localUFrag, localPwd}, + WgListenPort: h.config.LocalWgPort, + Version: version.NetbirdVersion(), + RosenpassPubKey: h.config.RosenpassPubKey, + RosenpassAddr: h.config.RosenpassAddr, + } + err = h.signaler.SignalAnswer(answer, h.config.Key) + if err != nil { + return err + } + + return nil +} + +func (h *Handshaker) waitForRemoteOfferConfirmation() (*OfferAnswer, error) { + select { + case remoteOfferAnswer := <-h.remoteOffersCh: + // received confirmation from the remote peer -> ready to proceed + err := h.sendAnswer() + if err != nil { + return nil, err + } + return &remoteOfferAnswer, nil + case remoteOfferAnswer := <-h.remoteAnswerCh: + return &remoteOfferAnswer, nil + case <-time.After(h.config.Timeout): + return nil, NewConnectionTimeoutError(h.config.Key, h.config.Timeout) + case <-h.ctx.Done(): + // closed externally + return nil, NewConnectionClosedError(h.config.Key) + } +} + +func (h *Handshaker) storeRemoteOfferAnswer(answer *OfferAnswer) { + h.remoteOfferAnswer = answer + h.remoteOfferAnswerCreated = time.Now() +} + +func (h *Handshaker) cachedHandshake() (*OfferAnswer, bool) { + if h.remoteOfferAnswer == nil { + return nil, false + } + + if time.Since(h.remoteOfferAnswerCreated) > handshakeCacheTimeout { + return nil, false + } + + return h.remoteOfferAnswer, true +} diff --git a/client/internal/peer/stdnet.go b/client/internal/peer/stdnet.go index 13f5886f5..06b484010 100644 --- a/client/internal/peer/stdnet.go +++ b/client/internal/peer/stdnet.go @@ -6,6 +6,6 @@ import ( "github.com/netbirdio/netbird/client/internal/stdnet" ) -func (conn *Conn) newStdNet() (*stdnet.Net, error) { - return stdnet.NewNet(conn.config.InterfaceBlackList) +func (conn *ConnectorICE) newStdNet() (*stdnet.Net, error) { + return stdnet.NewNet(conn.configICE.InterfaceBlackList) } diff --git a/client/internal/peer/stdnet_android.go b/client/internal/peer/stdnet_android.go index 8a2454371..dba94375f 100644 --- a/client/internal/peer/stdnet_android.go +++ b/client/internal/peer/stdnet_android.go @@ -2,6 +2,6 @@ package peer import "github.com/netbirdio/netbird/client/internal/stdnet" -func (conn *Conn) newStdNet() (*stdnet.Net, error) { - return stdnet.NewNetWithDiscover(conn.iFaceDiscover, conn.config.InterfaceBlackList) +func (conn *ConnectorICE) newStdNet() (*stdnet.Net, error) { + return stdnet.NewNetWithDiscover(conn.iFaceDiscover, conn.configICE.InterfaceBlackList) } diff --git a/client/internal/signaler.go b/client/internal/signaler.go new file mode 100644 index 000000000..cd466784d --- /dev/null +++ b/client/internal/signaler.go @@ -0,0 +1,71 @@ +package internal + +import ( + "github.com/pion/ice/v3" + "golang.zx2c4.com/wireguard/wgctrl/wgtypes" + + "github.com/netbirdio/netbird/client/internal/peer" + signal "github.com/netbirdio/netbird/signal/client" + sProto "github.com/netbirdio/netbird/signal/proto" +) + +type Signaler struct { + signal signal.Client + wgPrivateKey wgtypes.Key +} + +func NewSignaler(signal signal.Client, wgPrivateKey wgtypes.Key) *Signaler { + return &Signaler{ + signal: signal, + wgPrivateKey: wgPrivateKey, + } +} + +func (s *Signaler) SignalOffer(offer peer.OfferAnswer, remoteKey string) error { + return s.signalOfferAnswer(offer, remoteKey, sProto.Body_OFFER) +} + +func (s *Signaler) SignalAnswer(offer peer.OfferAnswer, remoteKey string) error { + return s.signalOfferAnswer(offer, remoteKey, sProto.Body_ANSWER) +} + +func (s *Signaler) SignalICECandidate(candidate ice.Candidate, remoteKey string) error { + return s.signal.Send(&sProto.Message{ + Key: s.wgPrivateKey.PublicKey().String(), + RemoteKey: remoteKey, + Body: &sProto.Body{ + Type: sProto.Body_CANDIDATE, + Payload: candidate.Marshal(), + }, + }) +} + +func (s *Signaler) Ready() bool { + return s.signal.Ready() +} + +// SignalOfferAnswer signals either an offer or an answer to remote peer +func (s *Signaler) signalOfferAnswer(offerAnswer peer.OfferAnswer, remoteKey string, bodyType sProto.Body_Type) error { + msg, err := signal.MarshalCredential( + s.wgPrivateKey, + offerAnswer.WgListenPort, + remoteKey, + &signal.Credential{ + UFrag: offerAnswer.IceCredentials.UFrag, + Pwd: offerAnswer.IceCredentials.Pwd, + }, + bodyType, + offerAnswer.RosenpassPubKey, + offerAnswer.RosenpassAddr, + offerAnswer.RelaySrvAddress) + if err != nil { + return err + } + + err = s.signal.Send(msg) + if err != nil { + return err + } + + return nil +} diff --git a/relay/client/manager.go b/relay/client/manager.go index 6b816c4fb..97a791936 100644 --- a/relay/client/manager.go +++ b/relay/client/manager.go @@ -108,6 +108,10 @@ func (m *Manager) RelayAddress() (net.Addr, error) { return m.relayClient.RelayRemoteAddress() } +func (m *Manager) IsSupported() bool { + return m.srvAddress != "" +} + func (m *Manager) openConnVia(serverAddress, peerKey string) (net.Conn, error) { // check if already has a connection to the desired relay server m.relayClientsMutex.RLock() diff --git a/signal/client/client.go b/signal/client/client.go index 7c54178e2..ced3fb7d0 100644 --- a/signal/client/client.go +++ b/signal/client/client.go @@ -51,10 +51,10 @@ func UnMarshalCredential(msg *proto.Message) (*Credential, error) { } // MarshalCredential marshal a Credential instance and returns a Message object -func MarshalCredential(myKey wgtypes.Key, myPort int, remoteKey wgtypes.Key, credential *Credential, t proto.Body_Type, rosenpassPubKey []byte, rosenpassAddr string, relaySrvAddress string) (*proto.Message, error) { +func MarshalCredential(myKey wgtypes.Key, myPort int, remoteKey string, credential *Credential, t proto.Body_Type, rosenpassPubKey []byte, rosenpassAddr string, relaySrvAddress string) (*proto.Message, error) { return &proto.Message{ Key: myKey.PublicKey().String(), - RemoteKey: remoteKey.String(), + RemoteKey: remoteKey, Body: &proto.Body{ Type: t, Payload: fmt.Sprintf("%s:%s", credential.UFrag, credential.Pwd),