Separate lifecycle of handshake, ICE, and relay connections

- fix STUN/TURN address update thread-safety issue
- move conn worker logic into the peer package
Zoltán Papp 2024-06-17 17:52:22 +02:00
parent a7760bf0a7
commit e407fe02c5
11 changed files with 1139 additions and 980 deletions
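The thread-safety fix is the atomic.Value pattern visible in the engine diff below: handleSync builds a fresh combined slice and Stores it, while reCreateAgent Loads a snapshot when configuring the ICE agent. A minimal runnable sketch of that pattern, with strings standing in for []*stun.URI (the holder type and names here are illustrative, not from this commit):

package main

import (
	"fmt"
	"sync/atomic"
)

// stunTurnStore mimics Engine.StunTurn: writers Store a freshly built slice,
// readers Load an immutable snapshot. No mutex is needed as long as a stored
// slice is never mutated afterwards.
type stunTurnStore struct {
	uris atomic.Value // holds []string here; []*stun.URI in the engine
}

func (s *stunTurnStore) update(stuns, turns []string) {
	combined := make([]string, 0, len(stuns)+len(turns))
	combined = append(combined, stuns...)
	combined = append(combined, turns...)
	s.uris.Store(combined) // swap the whole snapshot atomically
}

func (s *stunTurnStore) snapshot() []string {
	v, ok := s.uris.Load().([]string)
	if !ok {
		return nil // nothing stored yet
	}
	return v
}

func main() {
	store := &stunTurnStore{}
	store.update([]string{"stun:stun.example.com:3478"}, []string{"turn:turn.example.com:443"})
	fmt.Println(store.snapshot())
}

Note that reCreateAgent further down asserts the stored type without a comma-ok check, so the engine is expected to Store a (possibly empty) slice before the first connection attempt.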

View File

@@ -12,6 +12,7 @@ import (
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/pion/ice/v3"
@@ -96,6 +97,7 @@ type EngineConfig struct {
type Engine struct {
// signal is a Signal Service client
signal signal.Client
signaler *Signaler
// mgmClient is a Management Service client
mgmClient mgm.Client
// peerConns is a map that holds all the peers that are known to this peer
@@ -117,6 +119,7 @@ type Engine struct {
STUNs []*stun.URI
// TURNs is a list of TURN servers used by ICE
TURNs []*stun.URI
StunTurn atomic.Value // combined list of STUN and TURN URIs ([]*stun.URI)
// clientRoutes is the most recent list of clientRoutes received from the Management Service
clientRoutes route.HAMap
@@ -154,8 +157,6 @@ type Engine struct {
relayProbe *Probe
wgProbe *Probe
wgConnWorker sync.WaitGroup
relayManager *relayClient.Manager
}
@@ -207,11 +208,11 @@ func NewEngineWithProbes(
relayProbe *Probe,
wgProbe *Probe,
) *Engine {
return &Engine{
clientCtx: clientCtx,
clientCancel: clientCancel,
signal: signalClient,
signaler: NewSignaler(signalClient, config.WgPrivateKey),
mgmClient: mgmClient,
relayManager: relayManager,
peerConns: make(map[string]*peer.Conn),
@@ -258,7 +259,6 @@ func (e *Engine) Stop() error {
time.Sleep(500 * time.Millisecond)
e.close()
e.wgConnWorker.Wait()
log.Infof("stopped Netbird Engine")
return nil
}
@@ -457,69 +457,8 @@ func (e *Engine) removePeer(peerKey string) error {
conn, exists := e.peerConns[peerKey]
if exists {
delete(e.peerConns, peerKey)
err := conn.Close()
if err != nil {
switch err.(type) {
case *peer.ConnectionAlreadyClosedError:
return nil
default:
return err
conn.Close()
}
}
}
return nil
}
func signalCandidate(candidate ice.Candidate, myKey wgtypes.Key, remoteKey wgtypes.Key, s signal.Client) error {
err := s.Send(&sProto.Message{
Key: myKey.PublicKey().String(),
RemoteKey: remoteKey.String(),
Body: &sProto.Body{
Type: sProto.Body_CANDIDATE,
Payload: candidate.Marshal(),
},
})
if err != nil {
return err
}
return nil
}
func sendSignal(message *sProto.Message, s signal.Client) error {
return s.Send(message)
}
// SignalOfferAnswer signals either an offer or an answer to remote peer
func SignalOfferAnswer(offerAnswer peer.OfferAnswer, myKey wgtypes.Key, remoteKey wgtypes.Key, s signal.Client,
isAnswer bool) error {
var t sProto.Body_Type
if isAnswer {
t = sProto.Body_ANSWER
} else {
t = sProto.Body_OFFER
}
msg, err := signal.MarshalCredential(
myKey,
offerAnswer.WgListenPort,
remoteKey, &signal.Credential{
UFrag: offerAnswer.IceCredentials.UFrag,
Pwd: offerAnswer.IceCredentials.Pwd,
},
t,
offerAnswer.RosenpassPubKey,
offerAnswer.RosenpassAddr,
offerAnswer.RelaySrvAddress)
if err != nil {
return err
}
err = s.Send(msg)
if err != nil {
return err
}
return nil
}
@@ -538,6 +477,11 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
return err
}
var stunTurn []*stun.URI
stunTurn = append(stunTurn, e.STUNs...)
stunTurn = append(stunTurn, e.TURNs...)
e.StunTurn.Store(stunTurn)
// todo update relay address in the relay manager
// todo update signal
@@ -893,57 +837,10 @@ func (e *Engine) addNewPeer(peerConfig *mgmProto.RemotePeerConfig) error {
if err != nil {
log.Warnf("error adding peer %s to status recorder, got error: %v", peerKey, err)
}
e.wgConnWorker.Add(1)
go e.connWorker(conn, peerKey)
}
return nil
}
func (e *Engine) connWorker(conn *peer.Conn, peerKey string) {
defer e.wgConnWorker.Done()
for {
// randomize starting time a bit
min := 500
max := 2000
duration := time.Duration(rand.Intn(max-min)+min) * time.Millisecond
select {
case <-e.ctx.Done():
return
case <-time.After(duration):
}
// if peer has been removed -> give up
if !e.peerExists(peerKey) {
log.Debugf("peer %s doesn't exist anymore, won't retry connection", peerKey)
return
}
if !e.signal.Ready() {
log.Infof("signal client isn't ready, skipping connection attempt %s", peerKey)
continue
}
// we might have received new STUN and TURN servers meanwhile, so update them
e.syncMsgMux.Lock()
conn.UpdateStunTurn(append(e.STUNs, e.TURNs...))
e.syncMsgMux.Unlock()
err := conn.Open(e.ctx)
if err != nil {
log.Debugf("connection to peer %s failed: %v", peerKey, err)
var connectionClosedError *peer.ConnectionClosedError
switch {
case errors.As(err, &connectionClosedError):
// conn has been forced to close, so we exit the loop
return
default:
}
}
}
}
func (e *Engine) peerExists(peerKey string) bool {
e.syncMsgMux.Lock()
defer e.syncMsgMux.Unlock()
@@ -953,9 +850,6 @@ func (e *Engine) peerExists(peerKey string) bool {
func (e *Engine) createPeerConn(pubKey string, allowedIPs string) (*peer.Conn, error) {
log.Debugf("creating peer connection %s", pubKey)
var stunTurn []*stun.URI
stunTurn = append(stunTurn, e.STUNs...)
stunTurn = append(stunTurn, e.TURNs...)
wgConfig := peer.WgConfig{
RemoteKey: pubKey,
@@ -990,50 +884,27 @@ func (e *Engine) createPeerConn(pubKey string, allowedIPs string) (*peer.Conn, e
config := peer.ConnConfig{
Key: pubKey,
LocalKey: e.config.WgPrivateKey.PublicKey().String(),
StunTurn: stunTurn,
InterfaceBlackList: e.config.IFaceBlackList,
DisableIPv6Discovery: e.config.DisableIPv6Discovery,
Timeout: timeout,
UDPMux: e.udpMux.UDPMuxDefault,
UDPMuxSrflx: e.udpMux,
WgConfig: wgConfig,
LocalWgPort: e.config.WgPort,
NATExternalIPs: e.parseNATExternalIPMappings(),
RosenpassPubKey: e.getRosenpassPubKey(),
RosenpassAddr: e.getRosenpassAddr(),
ICEConfig: peer.ICEConfig{
StunTurn: e.StunTurn,
InterfaceBlackList: e.config.IFaceBlackList,
DisableIPv6Discovery: e.config.DisableIPv6Discovery,
UDPMux: e.udpMux.UDPMuxDefault,
UDPMuxSrflx: e.udpMux,
NATExternalIPs: e.parseNATExternalIPMappings(),
},
}
peerConn, err := peer.NewConn(config, e.statusRecorder, e.wgProxyFactory, e.mobileDep.TunAdapter, e.mobileDep.IFaceDiscover, e.relayManager)
peerConn, err := peer.NewConn(e.ctx, config, e.statusRecorder, e.wgProxyFactory, e.signaler, e.mobileDep.IFaceDiscover, e.relayManager)
if err != nil {
return nil, err
}
wgPubKey, err := wgtypes.ParseKey(pubKey)
if err != nil {
return nil, err
}
signalOffer := func(offerAnswer peer.OfferAnswer) error {
return SignalOfferAnswer(offerAnswer, e.config.WgPrivateKey, wgPubKey, e.signal, false)
}
signalCandidate := func(candidate ice.Candidate) error {
return signalCandidate(candidate, e.config.WgPrivateKey, wgPubKey, e.signal)
}
signalAnswer := func(offerAnswer peer.OfferAnswer) error {
return SignalOfferAnswer(offerAnswer, e.config.WgPrivateKey, wgPubKey, e.signal, true)
}
peerConn.SetSignalCandidate(signalCandidate)
peerConn.SetSignalOffer(signalOffer)
peerConn.SetSignalAnswer(signalAnswer)
peerConn.SetSendSignalMessage(func(message *sProto.Message) error {
return sendSignal(message, e.signal)
})
if e.rpManager != nil {
peerConn.SetOnConnected(e.rpManager.OnConnected)
peerConn.SetOnDisconnected(e.rpManager.OnDisconnected)
}
@@ -1107,7 +978,7 @@ func (e *Engine) receiveSignalEvents() {
return err
}
conn.OnRemoteCandidate(candidate, e.GetClientRoutes())
go conn.OnRemoteCandidate(candidate, e.GetClientRoutes())
case sProto.Body_MODE:
}
@@ -1402,7 +1273,7 @@ func (e *Engine) receiveProbeEvents() {
for _, peer := range e.peerConns {
key := peer.GetKey()
wgStats, err := peer.GetConf().WgConfig.WgInterface.GetStats(key)
wgStats, err := peer.WgConfig().WgInterface.GetStats(key)
if err != nil {
log.Debugf("failed to get wg stats for peer %s: %s", key, err)
}

File diff suppressed because it is too large

View File

@@ -0,0 +1,436 @@
package peer
import (
"context"
"errors"
"fmt"
"math/rand"
"net"
"net/netip"
"runtime"
"sync/atomic"
"time"
"github.com/pion/ice/v3"
"github.com/pion/stun/v2"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/client/internal"
"github.com/netbirdio/netbird/client/internal/stdnet"
"github.com/netbirdio/netbird/iface"
"github.com/netbirdio/netbird/iface/bind"
"github.com/netbirdio/netbird/route"
)
const (
iceKeepAliveDefault = 4 * time.Second
iceDisconnectedTimeoutDefault = 6 * time.Second
// iceRelayAcceptanceMinWaitDefault is the same as in the Pion ICE package
iceRelayAcceptanceMinWaitDefault = 2 * time.Second
)
type ICEConfig struct {
// StunTurn is a list of STUN and TURN URLs
StunTurn atomic.Value // []*stun.URI
// InterfaceBlackList is a list of machine interfaces that should be filtered out by ICE Candidate gathering
// (e.g. if eth0 is in the list, host candidate of this interface won't be used)
InterfaceBlackList []string
DisableIPv6Discovery bool
UDPMux ice.UDPMux
UDPMuxSrflx ice.UniversalUDPMux
NATExternalIPs []string
}
type OnICEConnReadyCallback func(ConnPriority, ICEConnInfo)
type ICEConnInfo struct {
RemoteConn net.Conn
RosenpassPubKey []byte
RosenpassAddr string
LocalIceCandidateType string
RemoteIceCandidateType string
RemoteIceCandidateEndpoint string
LocalIceCandidateEndpoint string
Direct bool
Relayed bool
RelayedOnLocal bool
}
type ConnectorICE struct {
ctx context.Context
log *log.Entry
config ConnConfig
configICE ICEConfig
signaler *internal.Signaler
iFaceDiscover stdnet.ExternalIFaceDiscover
statusRecorder *Status
onICEConnReady OnICEConnReadyCallback
doHandshakeFn DoHandshake
connPriority ConnPriority
agent *ice.Agent
StunTurn []*stun.URI
sentExtraSrflx bool
}
func NewConnectorICE(ctx context.Context, log *log.Entry, config ConnConfig, configICE ICEConfig, signaler *internal.Signaler, ifaceDiscover stdnet.ExternalIFaceDiscover, statusRecorder *Status, onICEConnReady OnICEConnReadyCallback, doHandshakeFn DoHandshake) *ConnectorICE {
cice := &ConnectorICE{
ctx: ctx,
log: log,
config: config,
configICE: configICE,
signaler: signaler,
iFaceDiscover: ifaceDiscover,
statusRecorder: statusRecorder,
onICEConnReady: onICEConnReady,
doHandshakeFn: doHandshakeFn,
}
return cice
}
// SetupICEConnection sets up an ICE connection with the remote peer.
// If relay mode is supported, it attempts a P2P connection only.
// It retries the connection in a loop until the context is canceled.
// On a successful connection it calls the onICEConnReady callback.
func (conn *ConnectorICE) SetupICEConnection(relayMode bool) {
var preferredCandidateTypes []ice.CandidateType
if relayMode {
conn.connPriority = connPriorityICEP2P
preferredCandidateTypes = candidateTypesP2P()
} else {
conn.connPriority = connPriorityICETurn
preferredCandidateTypes = candidateTypes()
}
for {
if !conn.waitForReconnectTry() {
return
}
remoteOfferAnswer, err := conn.doHandshakeFn()
if err != nil {
if errors.Is(err, ErrSignalIsNotReady) {
conn.log.Infof("signal client isn't ready, skipping connection attempt")
}
continue
}
ctx, ctxCancel := context.WithCancel(conn.ctx)
agent, err := conn.reCreateAgent(ctxCancel, preferredCandidateTypes)
if err != nil {
ctxCancel()
continue
}
conn.agent = agent
err = conn.agent.GatherCandidates()
if err != nil {
ctxCancel()
continue
}
// will block until the connection succeeds,
// but it won't release if the ICE Agent goes into the Disconnected or Failed state,
// so we have to cancel it with the provided context once the agent detects a broken connection
remoteConn, err := conn.turnAgentDial(remoteOfferAnswer)
if err != nil {
ctxCancel()
continue
}
pair, err := conn.agent.GetSelectedCandidatePair()
if err != nil {
ctxCancel()
continue
}
if !isRelayCandidate(pair.Local) {
// dynamically set the remote WireGuard port if the other side specified one different from the default
remoteWgPort := iface.DefaultWgPort
if remoteOfferAnswer.WgListenPort != 0 {
remoteWgPort = remoteOfferAnswer.WgListenPort
}
// To support older versions with direct mode, we attempt to punch an additional hole using the remote WireGuard port
go conn.punchRemoteWGPort(pair, remoteWgPort)
}
ci := ICEConnInfo{
RemoteConn: remoteConn,
RosenpassPubKey: remoteOfferAnswer.RosenpassPubKey,
RosenpassAddr: remoteOfferAnswer.RosenpassAddr,
LocalIceCandidateType: pair.Local.Type().String(),
RemoteIceCandidateType: pair.Remote.Type().String(),
LocalIceCandidateEndpoint: fmt.Sprintf("%s:%d", pair.Local.Address(), pair.Local.Port()),
RemoteIceCandidateEndpoint: fmt.Sprintf("%s:%d", pair.Remote.Address(), pair.Remote.Port()),
Direct: !isRelayCandidate(pair.Local),
Relayed: isRelayed(pair),
RelayedOnLocal: isRelayCandidate(pair.Local),
}
go conn.onICEConnReady(conn.connPriority, ci)
<-ctx.Done()
ctxCancel()
_ = conn.agent.Close()
}
}
// OnRemoteCandidate handles an ICE connection candidate provided by the remote peer.
func (conn *ConnectorICE) OnRemoteCandidate(candidate ice.Candidate, haRoutes route.HAMap) {
conn.log.Debugf("OnRemoteCandidate from peer %s -> %s", conn.config.Key, candidate.String())
if conn.agent == nil {
return
}
if candidateViaRoutes(candidate, haRoutes) {
return
}
err := conn.agent.AddRemoteCandidate(candidate)
if err != nil {
conn.log.Errorf("error while handling remote candidate")
return
}
}
func (conn *ConnectorICE) GetLocalUserCredentials() (frag string, pwd string, err error) {
if conn.agent == nil {
return "", "", errors.New("ICE Agent is not initialized")
}
return conn.agent.GetLocalUserCredentials()
}
func (conn *ConnectorICE) reCreateAgent(ctxCancel context.CancelFunc, relaySupport []ice.CandidateType) (*ice.Agent, error) {
failedTimeout := 6 * time.Second
transportNet, err := conn.newStdNet()
if err != nil {
conn.log.Errorf("failed to create pion's stdnet: %s", err)
}
iceKeepAlive := iceKeepAlive()
iceDisconnectedTimeout := iceDisconnectedTimeout()
iceRelayAcceptanceMinWait := iceRelayAcceptanceMinWait()
agentConfig := &ice.AgentConfig{
MulticastDNSMode: ice.MulticastDNSModeDisabled,
NetworkTypes: []ice.NetworkType{ice.NetworkTypeUDP4, ice.NetworkTypeUDP6},
Urls: conn.configICE.StunTurn.Load().([]*stun.URI),
CandidateTypes: relaySupport,
FailedTimeout: &failedTimeout,
InterfaceFilter: stdnet.InterfaceFilter(conn.configICE.InterfaceBlackList),
UDPMux: conn.configICE.UDPMux,
UDPMuxSrflx: conn.configICE.UDPMuxSrflx,
NAT1To1IPs: conn.configICE.NATExternalIPs,
Net: transportNet,
DisconnectedTimeout: &iceDisconnectedTimeout,
KeepaliveInterval: &iceKeepAlive,
RelayAcceptanceMinWait: &iceRelayAcceptanceMinWait,
}
if conn.configICE.DisableIPv6Discovery {
agentConfig.NetworkTypes = []ice.NetworkType{ice.NetworkTypeUDP4}
}
conn.sentExtraSrflx = false
agent, err := ice.NewAgent(agentConfig)
if err != nil {
return nil, err
}
err = agent.OnCandidate(conn.onICECandidate)
if err != nil {
return nil, err
}
err = agent.OnConnectionStateChange(func(state ice.ConnectionState) {
conn.log.Debugf("ICE ConnectionState has changed to %s", state.String())
if state == ice.ConnectionStateFailed || state == ice.ConnectionStateDisconnected {
ctxCancel()
}
})
if err != nil {
return nil, err
}
err = agent.OnSelectedCandidatePairChange(conn.onICESelectedCandidatePair)
if err != nil {
return nil, err
}
err = agent.OnSuccessfulSelectedPairBindingResponse(func(p *ice.CandidatePair) {
err := conn.statusRecorder.UpdateLatency(conn.config.Key, p.Latency())
if err != nil {
conn.log.Debugf("failed to update latency for peer: %s", err)
return
}
})
if err != nil {
return nil, fmt.Errorf("failed setting binding response callback: %w", err)
}
return agent, nil
}
func (conn *ConnectorICE) punchRemoteWGPort(pair *ice.CandidatePair, remoteWgPort int) {
// wait for the local endpoint configuration
time.Sleep(time.Second)
addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", pair.Remote.Address(), remoteWgPort))
if err != nil {
conn.log.Warnf("got an error while resolving the udp address, err: %s", err)
return
}
mux, ok := conn.configICE.UDPMuxSrflx.(*bind.UniversalUDPMuxDefault)
if !ok {
conn.log.Warn("invalid udp mux conversion")
return
}
_, err = mux.GetSharedConn().WriteTo([]byte{0x6e, 0x62}, addr) // 0x6e 0x62 == "nb": minimal punch packet payload
if err != nil {
conn.log.Warnf("got an error while sending the punch packet, err: %s", err)
}
}
// onICECandidate is a callback attached to an ICE Agent to receive new local connection candidates
// and then signals them to the remote peer
func (conn *ConnectorICE) onICECandidate(candidate ice.Candidate) {
// nil means candidate gathering has ended
if candidate == nil {
return
}
// TODO: reported port is incorrect for CandidateTypeHost, makes understanding ICE use via logs confusing as port is ignored
conn.log.Debugf("discovered local candidate %s", candidate.String())
go func() {
err := conn.signaler.SignalICECandidate(candidate, conn.config.Key)
if err != nil {
conn.log.Errorf("failed signaling candidate to the remote peer %s %s", conn.config.Key, err)
}
}()
if !conn.shouldSendExtraSrflxCandidate(candidate) {
return
}
// sends an extra server reflexive candidate to the remote peer with our related port (usually the WireGuard port)
// this is useful when the network has an existing port forwarding rule for the WireGuard port of this peer
extraSrflx, err := extraSrflxCandidate(candidate)
if err != nil {
conn.log.Errorf("failed creating extra server reflexive candidate %s", err)
return
}
conn.sentExtraSrflx = true
go func() {
err = conn.signaler.SignalICECandidate(extraSrflx, conn.config.Key)
if err != nil {
conn.log.Errorf("failed signaling the extra server reflexive candidate: %s", err)
}
}()
}
func (conn *ConnectorICE) onICESelectedCandidatePair(c1 ice.Candidate, c2 ice.Candidate) {
conn.log.Debugf("selected candidate pair [local <-> remote] -> [%s <-> %s], peer %s", c1.String(), c2.String(),
conn.config.Key)
}
func (conn *ConnectorICE) shouldSendExtraSrflxCandidate(candidate ice.Candidate) bool {
return !conn.sentExtraSrflx && candidate.Type() == ice.CandidateTypeServerReflexive && candidate.Port() != candidate.RelatedAddress().Port
}
func (conn *ConnectorICE) turnAgentDial(remoteOfferAnswer *OfferAnswer) (*ice.Conn, error) {
isControlling := conn.config.LocalKey > conn.config.Key
if isControlling {
return conn.agent.Dial(conn.ctx, remoteOfferAnswer.IceCredentials.UFrag, remoteOfferAnswer.IceCredentials.Pwd)
} else {
return conn.agent.Accept(conn.ctx, remoteOfferAnswer.IceCredentials.UFrag, remoteOfferAnswer.IceCredentials.Pwd)
}
}
// waitForReconnectTry waits for a random duration before trying to reconnect
func (conn *ConnectorICE) waitForReconnectTry() bool {
minWait := 500
maxWait := 2000
duration := time.Duration(rand.Intn(maxWait-minWait)+minWait) * time.Millisecond
select {
case <-conn.ctx.Done():
return false
case <-time.After(duration):
return true
}
}
func extraSrflxCandidate(candidate ice.Candidate) (*ice.CandidateServerReflexive, error) {
relatedAdd := candidate.RelatedAddress()
return ice.NewCandidateServerReflexive(&ice.CandidateServerReflexiveConfig{
Network: candidate.NetworkType().String(),
Address: candidate.Address(),
Port: relatedAdd.Port,
Component: candidate.Component(),
RelAddr: relatedAdd.Address,
RelPort: relatedAdd.Port,
})
}
func candidateViaRoutes(candidate ice.Candidate, clientRoutes route.HAMap) bool {
var routePrefixes []netip.Prefix
for _, routes := range clientRoutes {
if len(routes) > 0 && routes[0] != nil {
routePrefixes = append(routePrefixes, routes[0].Network)
}
}
addr, err := netip.ParseAddr(candidate.Address())
if err != nil {
log.Errorf("Failed to parse IP address %s: %v", candidate.Address(), err)
return false
}
for _, prefix := range routePrefixes {
// skip the default route (prefix length 0); it would match every address
if prefix.Bits() == 0 {
continue
}
if prefix.Contains(addr) {
log.Debugf("Ignoring candidate [%s], its address is part of routed network %s", candidate.String(), prefix)
return true
}
}
return false
}
func candidateTypes() []ice.CandidateType {
if hasICEForceRelayConn() {
return []ice.CandidateType{ice.CandidateTypeRelay}
}
// TODO: remove this once we have refactored userspace proxy into the bind package
if runtime.GOOS == "ios" {
return []ice.CandidateType{ice.CandidateTypeHost, ice.CandidateTypeServerReflexive}
}
return []ice.CandidateType{ice.CandidateTypeHost, ice.CandidateTypeServerReflexive, ice.CandidateTypeRelay}
}
func candidateTypesP2P() []ice.CandidateType {
return []ice.CandidateType{ice.CandidateTypeHost, ice.CandidateTypeServerReflexive}
}
func isRelayCandidate(candidate ice.Candidate) bool {
return candidate.Type() == ice.CandidateTypeRelay
}
func isRelayed(pair *ice.CandidatePair) bool {
return pair.Local.Type() == ice.CandidateTypeRelay || pair.Remote.Type() == ice.CandidateTypeRelay
}
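candidateViaRoutes above drops remote candidates whose address falls inside a routed (non-default) network: such addresses are reachable through NetBird routes rather than direct ICE. A stdlib-only, runnable sketch of the same prefix check (the route set here is hypothetical):

package main

import (
	"fmt"
	"net/netip"
)

// shouldIgnoreCandidate mirrors candidateViaRoutes: skip the default route
// (prefix length 0) and ignore any candidate address contained in a routed
// network prefix.
func shouldIgnoreCandidate(candidateAddr string, routed []netip.Prefix) bool {
	addr, err := netip.ParseAddr(candidateAddr)
	if err != nil {
		return false
	}
	for _, prefix := range routed {
		if prefix.Bits() == 0 {
			continue // default route: not a reason to drop a candidate
		}
		if prefix.Contains(addr) {
			return true
		}
	}
	return false
}

func main() {
	routed := []netip.Prefix{netip.MustParsePrefix("10.10.0.0/16")}
	fmt.Println(shouldIgnoreCandidate("10.10.1.5", routed))   // true: inside routed network
	fmt.Println(shouldIgnoreCandidate("203.0.113.7", routed)) // false
}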

View File

@@ -0,0 +1,115 @@
package peer
import (
"context"
"errors"
"math/rand"
"net"
"time"
log "github.com/sirupsen/logrus"
relayClient "github.com/netbirdio/netbird/relay/client"
)
type OnRelayReadyCallback func(info RelayConnInfo)
type RelayConnInfo struct {
relayedConn net.Conn
rosenpassPubKey []byte
rosenpassAddr string
}
type ConnectorRelay struct {
ctx context.Context
log *log.Entry
relayManager *relayClient.Manager
config ConnConfig
onRelayConnReadyFN OnRelayReadyCallback
doHandshakeFn DoHandshake
}
func NewConnectorRelay(ctx context.Context, log *log.Entry, relayManager *relayClient.Manager, config ConnConfig, onRelayConnReadyFN OnRelayReadyCallback, doHandshakeFn DoHandshake) *ConnectorRelay {
return &ConnectorRelay{
ctx: ctx,
log: log,
relayManager: relayManager,
config: config,
onRelayConnReadyFN: onRelayConnReadyFN,
doHandshakeFn: doHandshakeFn,
}
}
// SetupRelayConnection todo: this function is not complete. It makes no sense to run it in a for loop because we are not waiting for any event
func (conn *ConnectorRelay) SetupRelayConnection() {
for {
if !conn.waitForReconnectTry() {
return
}
remoteOfferAnswer, err := conn.doHandshakeFn()
if err != nil {
if errors.Is(err, ErrSignalIsNotReady) {
conn.log.Infof("signal client isn't ready, skipping connection attempt")
continue
}
conn.log.Errorf("failed to do handshake: %v", err)
continue
}
if !conn.isRelaySupported(remoteOfferAnswer) {
// todo should we retry?
continue
}
// the relayManager will return an error if the connection to the relay server has been lost
currentRelayAddress, err := conn.relayManager.RelayAddress()
if err != nil {
continue
}
srv := conn.preferredRelayServer(currentRelayAddress.String(), remoteOfferAnswer.RelaySrvAddress)
relayedConn, err := conn.relayManager.OpenConn(srv, conn.config.Key)
if err != nil {
continue
}
go conn.onRelayConnReadyFN(RelayConnInfo{
relayedConn: relayedConn,
rosenpassPubKey: remoteOfferAnswer.RosenpassPubKey,
rosenpassAddr: remoteOfferAnswer.RosenpassAddr,
})
}
}
func (conn *ConnectorRelay) RelayAddress() (net.Addr, error) {
return conn.relayManager.RelayAddress()
}
// todo check my side too
func (conn *ConnectorRelay) isRelaySupported(answer *OfferAnswer) bool {
return answer.RelaySrvAddress != ""
}
func (conn *ConnectorRelay) preferredRelayServer(myRelayAddress, remoteRelayAddress string) string {
if conn.config.LocalKey > conn.config.Key {
return myRelayAddress
}
return remoteRelayAddress
}
func (conn *ConnectorRelay) RelayIsSupported() bool {
return conn.relayManager.IsSupported()
}
// waitForReconnectTry waits for a random duration before trying to reconnect
func (conn *ConnectorRelay) waitForReconnectTry() bool {
minWait := 500
maxWait := 2000
duration := time.Duration(rand.Intn(maxWait-minWait)+minWait) * time.Millisecond
select {
case <-conn.ctx.Done():
return false
case <-time.After(duration):
return true
}
}
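Both peers have to converge on one relay server without extra negotiation; the lexicographic key comparison in preferredRelayServer (the same tie-breaker turnAgentDial uses to pick the controlling ICE side) makes the choice deterministic. A small runnable illustration with hypothetical keys and addresses:

package main

import "fmt"

// pick mirrors ConnectorRelay.preferredRelayServer: the peer with the
// lexicographically greater key wins, so both sides compute the same server.
func pick(localKey, remoteKey, myAddr, remoteAddr string) string {
	if localKey > remoteKey {
		return myAddr
	}
	return remoteAddr
}

func main() {
	// Peer A (key "keyA") and peer B (key "keyB") each run the check locally:
	fmt.Println(pick("keyA", "keyB", "relay-a:443", "relay-b:443")) // relay-b:443
	fmt.Println(pick("keyB", "keyA", "relay-b:443", "relay-a:443")) // relay-b:443
}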

View File

@@ -40,7 +40,7 @@ func TestConn_GetKey(t *testing.T) {
defer func() {
_ = wgProxyFactory.Free()
}()
conn, err := NewConn(connConf, nil, wgProxyFactory, nil, nil)
conn, err := NewConn(context.Background(), connConf, nil, wgProxyFactory, nil, nil, nil)
if err != nil {
return
}
@@ -55,7 +55,7 @@ func TestConn_OnRemoteOffer(t *testing.T) {
defer func() {
_ = wgProxyFactory.Free()
}()
conn, err := NewConn(connConf, NewRecorder("https://mgm"), wgProxyFactory, nil, nil)
conn, err := NewConn(context.Background(), connConf, NewRecorder("https://mgm"), wgProxyFactory, nil, nil, nil)
if err != nil {
return
}
@@ -92,7 +92,7 @@ func TestConn_OnRemoteAnswer(t *testing.T) {
defer func() {
_ = wgProxyFactory.Free()
}()
conn, err := NewConn(connConf, NewRecorder("https://mgm"), wgProxyFactory, nil, nil)
conn, err := NewConn(context.Background(), connConf, NewRecorder("https://mgm"), wgProxyFactory, nil, nil, nil)
if err != nil {
return
}
@@ -128,7 +128,7 @@ func TestConn_Status(t *testing.T) {
defer func() {
_ = wgProxyFactory.Free()
}()
conn, err := NewConn(connConf, NewRecorder("https://mgm"), wgProxyFactory, nil, nil)
conn, err := NewConn(context.Background(), connConf, NewRecorder("https://mgm"), wgProxyFactory, nil, nil, nil)
if err != nil {
return
}
@@ -158,7 +158,7 @@ func TestConn_Close(t *testing.T) {
defer func() {
_ = wgProxyFactory.Free()
}()
conn, err := NewConn(connConf, NewRecorder("https://mgm"), wgProxyFactory, nil, nil)
conn, err := NewConn(context.Background(), connConf, NewRecorder("https://mgm"), wgProxyFactory, nil, nil, nil)
if err != nil {
return
}

View File

@@ -0,0 +1,210 @@
package peer
import (
"context"
"errors"
"sync"
"time"
log "github.com/sirupsen/logrus"
"golang.zx2c4.com/wireguard/conn"
"github.com/netbirdio/netbird/client/internal"
"github.com/netbirdio/netbird/version"
)
const (
handshakeCacheTimeout = 3 * time.Second
)
var (
ErrSignalIsNotReady = errors.New("signal is not ready")
)
type DoHandshake func() (*OfferAnswer, error)
// IceCredentials ICE protocol credentials struct
type IceCredentials struct {
UFrag string
Pwd string
}
// OfferAnswer represents a session establishment offer or answer
type OfferAnswer struct {
IceCredentials IceCredentials
// WgListenPort is a remote WireGuard listen port.
// This field is used when establishing a direct WireGuard connection without any proxy.
// We can set the remote peer's endpoint with this port.
WgListenPort int
// Version of NetBird Agent
Version string
// RosenpassPubKey is the Rosenpass public key of the remote peer when receiving this message
// This value is the local Rosenpass server public key when sending the message
RosenpassPubKey []byte
// RosenpassAddr is the Rosenpass server address (IP:port) of the remote peer when receiving this message
// This value is the local Rosenpass server address when sending the message
RosenpassAddr string
// relay server address
RelaySrvAddress string
}
type HandshakeArgs struct {
IceUFrag string
IcePwd string
RelayAddr string
}
type Handshaker struct {
mu sync.Mutex
ctx context.Context
config ConnConfig
signaler *internal.Signaler
// remoteOffersCh is a channel used to wait for remote credentials to proceed with the connection
remoteOffersCh chan OfferAnswer
// remoteAnswerCh is a channel used to wait for remote credentials answer (confirmation of our offer) to proceed with the connection
remoteAnswerCh chan OfferAnswer
remoteOfferAnswer *OfferAnswer
remoteOfferAnswerCreated time.Time
}
func NewHandshaker(ctx context.Context, config ConnConfig, signaler *internal.Signaler) *Handshaker {
return &Handshaker{
ctx: ctx,
config: config,
signaler: signaler,
remoteOffersCh: make(chan OfferAnswer),
remoteAnswerCh: make(chan OfferAnswer),
}
}
func (h *Handshaker) Handshake(args HandshakeArgs) (*OfferAnswer, error) {
h.mu.Lock()
defer h.mu.Unlock()
cachedOfferAnswer, ok := h.cachedHandshake()
if ok {
return cachedOfferAnswer, nil
}
err := h.sendOffer(args)
if err != nil {
return nil, err
}
// Only continue once we got a connection confirmation from the remote peer.
// The connection timeout could have happened before a confirmation received from the remote.
// The connection could have also been closed externally (e.g. when we received an update from the management that peer shouldn't be connected)
remoteOfferAnswer, err := h.waitForRemoteOfferConfirmation(args)
if err != nil {
return nil, err
}
h.storeRemoteOfferAnswer(remoteOfferAnswer)
log.Debugf("received connection confirmation from peer %s running version %s and with remote WireGuard listen port %d",
h.config.Key, remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort)
return remoteOfferAnswer, nil
}
// OnRemoteOffer handles an offer from the remote peer and returns true if the message was accepted, false otherwise
// doesn't block, discards the message if connection wasn't ready
func (h *Handshaker) OnRemoteOffer(offer OfferAnswer) bool {
select {
case h.remoteOffersCh <- offer:
return true
default:
log.Debugf("OnRemoteOffer skipping message from peer %s on status %s because is not ready", h.config.Key, conn.status.String())
// connection might not be ready yet to receive so we ignore the message
return false
}
}
// OnRemoteAnswer handles an answer from the remote peer and returns true if the message was accepted, false otherwise
// doesn't block, discards the message if connection wasn't ready
func (h *Handshaker) OnRemoteAnswer(answer OfferAnswer) bool {
select {
case h.remoteAnswerCh <- answer:
return true
default:
// connection might not be ready yet to receive so we ignore the message
log.Debugf("OnRemoteAnswer skipping message from peer %s on status %s because is not ready", h.config.Key, conn.status.String())
return false
}
}
// sendOffer prepares local user credentials and signals them to the remote peer
func (h *Handshaker) sendOffer(args HandshakeArgs) error {
offer := OfferAnswer{
IceCredentials: IceCredentials{args.IceUFrag, args.IcePwd},
WgListenPort: h.config.LocalWgPort,
Version: version.NetbirdVersion(),
RosenpassPubKey: h.config.RosenpassPubKey,
RosenpassAddr: h.config.RosenpassAddr,
RelaySrvAddress: args.RelayAddr,
}
return h.signaler.SignalOffer(offer, h.config.Key)
}
// sendAnswer replies with our local credentials. The local ICE credentials are
// taken from HandshakeArgs, since the Handshaker itself holds no reference to the ICE agent
func (h *Handshaker) sendAnswer(args HandshakeArgs) error {
log.Debugf("sending answer to %s", h.config.Key)
answer := OfferAnswer{
IceCredentials: IceCredentials{args.IceUFrag, args.IcePwd},
WgListenPort: h.config.LocalWgPort,
Version: version.NetbirdVersion(),
RosenpassPubKey: h.config.RosenpassPubKey,
RosenpassAddr: h.config.RosenpassAddr,
}
err = h.signaler.SignalAnswer(answer, h.config.Key)
if err != nil {
return err
}
return nil
}
func (h *Handshaker) waitForRemoteOfferConfirmation(args HandshakeArgs) (*OfferAnswer, error) {
select {
case remoteOfferAnswer := <-h.remoteOffersCh:
// received confirmation from the remote peer -> ready to proceed
err := h.sendAnswer(args)
if err != nil {
return nil, err
}
return &remoteOfferAnswer, nil
case remoteOfferAnswer := <-h.remoteAnswerCh:
return &remoteOfferAnswer, nil
case <-time.After(h.config.Timeout):
return nil, NewConnectionTimeoutError(h.config.Key, h.config.Timeout)
case <-h.ctx.Done():
// closed externally
return nil, NewConnectionClosedError(h.config.Key)
}
}
func (h *Handshaker) storeRemoteOfferAnswer(answer *OfferAnswer) {
h.remoteOfferAnswer = answer
h.remoteOfferAnswerCreated = time.Now()
}
func (h *Handshaker) cachedHandshake() (*OfferAnswer, bool) {
if h.remoteOfferAnswer == nil {
return nil, false
}
if time.Since(h.remoteOfferAnswerCreated) > handshakeCacheTimeout {
return nil, false
}
return h.remoteOfferAnswer, true
}
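waitForRemoteOfferConfirmation is a plain select race between a remote offer (answered immediately), a remote answer, the configured timeout, and external cancellation. A stripped-down runnable sketch of the same pattern, with strings standing in for OfferAnswer (names are illustrative):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitForConfirmation mirrors Handshaker.waitForRemoteOfferConfirmation:
// whichever event arrives first decides the outcome.
func waitForConfirmation(ctx context.Context, offers, answers <-chan string, timeout time.Duration) (string, error) {
	select {
	case o := <-offers:
		// the real Handshaker sends its own answer back before proceeding
		return o, nil
	case a := <-answers:
		return a, nil
	case <-time.After(timeout):
		return "", errors.New("handshake timed out")
	case <-ctx.Done():
		return "", errors.New("connection closed externally")
	}
}

func main() {
	offers := make(chan string, 1)
	answers := make(chan string)
	offers <- "remote-offer"
	got, err := waitForConfirmation(context.Background(), offers, answers, time.Second)
	fmt.Println(got, err)
}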

View File

@@ -6,6 +6,6 @@ import (
"github.com/netbirdio/netbird/client/internal/stdnet"
)
func (conn *Conn) newStdNet() (*stdnet.Net, error) {
return stdnet.NewNet(conn.config.InterfaceBlackList)
func (conn *ConnectorICE) newStdNet() (*stdnet.Net, error) {
return stdnet.NewNet(conn.configICE.InterfaceBlackList)
}

View File

@@ -2,6 +2,6 @@ package peer
import "github.com/netbirdio/netbird/client/internal/stdnet"
func (conn *Conn) newStdNet() (*stdnet.Net, error) {
return stdnet.NewNetWithDiscover(conn.iFaceDiscover, conn.config.InterfaceBlackList)
func (conn *ConnectorICE) newStdNet() (*stdnet.Net, error) {
return stdnet.NewNetWithDiscover(conn.iFaceDiscover, conn.configICE.InterfaceBlackList)
}

View File

@@ -0,0 +1,71 @@
package internal
import (
"github.com/pion/ice/v3"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"github.com/netbirdio/netbird/client/internal/peer"
signal "github.com/netbirdio/netbird/signal/client"
sProto "github.com/netbirdio/netbird/signal/proto"
)
type Signaler struct {
signal signal.Client
wgPrivateKey wgtypes.Key
}
func NewSignaler(signal signal.Client, wgPrivateKey wgtypes.Key) *Signaler {
return &Signaler{
signal: signal,
wgPrivateKey: wgPrivateKey,
}
}
func (s *Signaler) SignalOffer(offer peer.OfferAnswer, remoteKey string) error {
return s.signalOfferAnswer(offer, remoteKey, sProto.Body_OFFER)
}
func (s *Signaler) SignalAnswer(offer peer.OfferAnswer, remoteKey string) error {
return s.signalOfferAnswer(offer, remoteKey, sProto.Body_ANSWER)
}
func (s *Signaler) SignalICECandidate(candidate ice.Candidate, remoteKey string) error {
return s.signal.Send(&sProto.Message{
Key: s.wgPrivateKey.PublicKey().String(),
RemoteKey: remoteKey,
Body: &sProto.Body{
Type: sProto.Body_CANDIDATE,
Payload: candidate.Marshal(),
},
})
}
func (s *Signaler) Ready() bool {
return s.signal.Ready()
}
// signalOfferAnswer signals either an offer or an answer to the remote peer
func (s *Signaler) signalOfferAnswer(offerAnswer peer.OfferAnswer, remoteKey string, bodyType sProto.Body_Type) error {
msg, err := signal.MarshalCredential(
s.wgPrivateKey,
offerAnswer.WgListenPort,
remoteKey,
&signal.Credential{
UFrag: offerAnswer.IceCredentials.UFrag,
Pwd: offerAnswer.IceCredentials.Pwd,
},
bodyType,
offerAnswer.RosenpassPubKey,
offerAnswer.RosenpassAddr,
offerAnswer.RelaySrvAddress)
if err != nil {
return err
}
err = s.signal.Send(msg)
if err != nil {
return err
}
return nil
}
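signalOfferAnswer delegates wire formatting to signal.MarshalCredential, which (as the last hunk of this commit shows) packs the ICE credentials into a plain "UFrag:Pwd" payload. A stdlib-only round-trip sketch; the real UnMarshalCredential body is not part of this diff, so the inverse below is a hypothetical reconstruction:

package main

import (
	"fmt"
	"strings"
)

type credential struct{ UFrag, Pwd string }

// marshal mirrors the payload format used by MarshalCredential.
func marshal(c credential) string { return fmt.Sprintf("%s:%s", c.UFrag, c.Pwd) }

// unmarshal is a hypothetical inverse of the payload format above.
func unmarshal(payload string) (credential, error) {
	parts := strings.SplitN(payload, ":", 2)
	if len(parts) != 2 {
		return credential{}, fmt.Errorf("invalid credential payload %q", payload)
	}
	return credential{UFrag: parts[0], Pwd: parts[1]}, nil
}

func main() {
	p := marshal(credential{UFrag: "ufrag", Pwd: "pwd"})
	c, err := unmarshal(p)
	fmt.Println(p, c, err)
}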

View File

@@ -108,6 +108,10 @@ func (m *Manager) RelayAddress() (net.Addr, error) {
return m.relayClient.RelayRemoteAddress()
}
func (m *Manager) IsSupported() bool {
return m.srvAddress != ""
}
func (m *Manager) openConnVia(serverAddress, peerKey string) (net.Conn, error) {
// check if already has a connection to the desired relay server
m.relayClientsMutex.RLock()

View File

@@ -51,10 +51,10 @@ func UnMarshalCredential(msg *proto.Message) (*Credential, error) {
}
// MarshalCredential marshals a Credential instance and returns a Message object
func MarshalCredential(myKey wgtypes.Key, myPort int, remoteKey wgtypes.Key, credential *Credential, t proto.Body_Type, rosenpassPubKey []byte, rosenpassAddr string, relaySrvAddress string) (*proto.Message, error) {
func MarshalCredential(myKey wgtypes.Key, myPort int, remoteKey string, credential *Credential, t proto.Body_Type, rosenpassPubKey []byte, rosenpassAddr string, relaySrvAddress string) (*proto.Message, error) {
return &proto.Message{
Key: myKey.PublicKey().String(),
RemoteKey: remoteKey.String(),
RemoteKey: remoteKey,
Body: &proto.Body{
Type: t,
Payload: fmt.Sprintf("%s:%s", credential.UFrag, credential.Pwd),