netbird/client/internal/peer/worker_ice.go

440 lines
13 KiB
Go
Raw Normal View History

package peer
import (
"context"
"errors"
"fmt"
"math/rand"
"net"
"net/netip"
"runtime"
"sync/atomic"
"time"
"github.com/pion/ice/v3"
"github.com/pion/stun/v2"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/client/internal/stdnet"
"github.com/netbirdio/netbird/iface"
"github.com/netbirdio/netbird/iface/bind"
"github.com/netbirdio/netbird/route"
)
const (
iceKeepAliveDefault = 4 * time.Second
iceDisconnectedTimeoutDefault = 6 * time.Second
// iceRelayAcceptanceMinWaitDefault is the same as in the Pion ICE package
iceRelayAcceptanceMinWaitDefault = 2 * time.Second
)
type ICEConfig struct {
// StunTurn is a list of STUN and TURN URLs
StunTurn atomic.Value // []*stun.URI
// InterfaceBlackList is a list of machine interfaces that should be filtered out by ICE Candidate gathering
// (e.g. if eth0 is in the list, host candidate of this interface won't be used)
InterfaceBlackList []string
DisableIPv6Discovery bool
UDPMux ice.UDPMux
UDPMuxSrflx ice.UniversalUDPMux
NATExternalIPs []string
}
type OnICEConnReadyCallback func(ConnPriority, ICEConnInfo)
type ICEConnInfo struct {
RemoteConn net.Conn
RosenpassPubKey []byte
RosenpassAddr string
LocalIceCandidateType string
RemoteIceCandidateType string
RemoteIceCandidateEndpoint string
LocalIceCandidateEndpoint string
Direct bool
Relayed bool
RelayedOnLocal bool
}
2024-06-18 11:22:40 +02:00
type WorkerICE struct {
ctx context.Context
log *log.Entry
config ConnConfig
configICE ICEConfig
2024-06-17 18:02:52 +02:00
signaler *Signaler
iFaceDiscover stdnet.ExternalIFaceDiscover
statusRecorder *Status
onICEConnReady OnICEConnReadyCallback
doHandshakeFn DoHandshake
connPriority ConnPriority
agent *ice.Agent
StunTurn []*stun.URI
sentExtraSrflx bool
}
2024-06-18 11:22:40 +02:00
func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, configICE ICEConfig, signaler *Signaler, ifaceDiscover stdnet.ExternalIFaceDiscover, statusRecorder *Status, onICEConnReady OnICEConnReadyCallback, doHandshakeFn DoHandshake) *WorkerICE {
cice := &WorkerICE{
ctx: ctx,
log: log,
config: config,
configICE: configICE,
signaler: signaler,
iFaceDiscover: ifaceDiscover,
statusRecorder: statusRecorder,
onICEConnReady: onICEConnReady,
doHandshakeFn: doHandshakeFn,
}
return cice
}
// SetupICEConnection sets up an ICE connection with the remote peer.
// If the relay mode is supported then try to connect in p2p way only.
// It is trying to reconnection in a loop until the context is canceled.
// In case of success connection it will call the onICEConnReady callback.
2024-06-18 11:22:40 +02:00
func (w *WorkerICE) SetupICEConnection(hasRelayOnLocally bool) {
for {
2024-06-18 11:22:40 +02:00
if !w.waitForReconnectTry() {
return
}
2024-06-18 11:22:40 +02:00
remoteOfferAnswer, err := w.doHandshakeFn()
if err != nil {
if errors.Is(err, ErrSignalIsNotReady) {
2024-06-18 11:22:40 +02:00
w.log.Infof("signal client isn't ready, skipping connection attempt")
}
continue
}
2024-06-18 11:10:17 +02:00
var preferredCandidateTypes []ice.CandidateType
if hasRelayOnLocally && remoteOfferAnswer.RelaySrvAddress != "" {
2024-06-18 11:22:40 +02:00
w.connPriority = connPriorityICEP2P
2024-06-18 11:10:17 +02:00
preferredCandidateTypes = candidateTypesP2P()
} else {
2024-06-18 11:22:40 +02:00
w.connPriority = connPriorityICETurn
2024-06-18 11:10:17 +02:00
preferredCandidateTypes = candidateTypes()
}
2024-06-18 11:22:40 +02:00
ctx, ctxCancel := context.WithCancel(w.ctx)
agent, err := w.reCreateAgent(ctxCancel, preferredCandidateTypes)
if err != nil {
ctxCancel()
continue
}
2024-06-18 11:22:40 +02:00
w.agent = agent
2024-06-18 11:22:40 +02:00
err = w.agent.GatherCandidates()
if err != nil {
ctxCancel()
continue
}
// will block until connection succeeded
// but it won't release if ICE Agent went into Disconnected or Failed state,
// so we have to cancel it with the provided context once agent detected a broken connection
2024-06-18 11:22:40 +02:00
remoteConn, err := w.turnAgentDial(remoteOfferAnswer)
if err != nil {
ctxCancel()
continue
}
2024-06-18 11:22:40 +02:00
pair, err := w.agent.GetSelectedCandidatePair()
if err != nil {
ctxCancel()
continue
}
if !isRelayCandidate(pair.Local) {
// dynamically set remote WireGuard port if other side specified a different one from the default one
remoteWgPort := iface.DefaultWgPort
if remoteOfferAnswer.WgListenPort != 0 {
remoteWgPort = remoteOfferAnswer.WgListenPort
}
// To support old version's with direct mode we attempt to punch an additional role with the remote WireGuard port
2024-06-18 11:22:40 +02:00
go w.punchRemoteWGPort(pair, remoteWgPort)
}
ci := ICEConnInfo{
RemoteConn: remoteConn,
RosenpassPubKey: remoteOfferAnswer.RosenpassPubKey,
RosenpassAddr: remoteOfferAnswer.RosenpassAddr,
LocalIceCandidateType: pair.Local.Type().String(),
RemoteIceCandidateType: pair.Remote.Type().String(),
LocalIceCandidateEndpoint: fmt.Sprintf("%s:%d", pair.Local.Address(), pair.Local.Port()),
RemoteIceCandidateEndpoint: fmt.Sprintf("%s:%d", pair.Remote.Address(), pair.Remote.Port()),
Direct: !isRelayCandidate(pair.Local),
Relayed: isRelayed(pair),
RelayedOnLocal: isRelayCandidate(pair.Local),
}
2024-06-18 11:22:40 +02:00
go w.onICEConnReady(w.connPriority, ci)
<-ctx.Done()
ctxCancel()
2024-06-18 11:22:40 +02:00
_ = w.agent.Close()
}
}
// OnRemoteCandidate Handles ICE connection Candidate provided by the remote peer.
2024-06-18 11:22:40 +02:00
func (w *WorkerICE) OnRemoteCandidate(candidate ice.Candidate, haRoutes route.HAMap) {
w.log.Debugf("OnRemoteCandidate from peer %s -> %s", w.config.Key, candidate.String())
if w.agent == nil {
return
}
if candidateViaRoutes(candidate, haRoutes) {
return
}
2024-06-18 11:22:40 +02:00
err := w.agent.AddRemoteCandidate(candidate)
if err != nil {
2024-06-18 11:22:40 +02:00
w.log.Errorf("error while handling remote candidate")
return
}
}
2024-06-18 11:22:40 +02:00
func (w *WorkerICE) GetLocalUserCredentials() (frag string, pwd string, err error) {
if w.agent == nil {
return "", "", errors.New("ICE Agent is not initialized")
}
2024-06-18 11:22:40 +02:00
return w.agent.GetLocalUserCredentials()
}
2024-06-18 11:22:40 +02:00
func (w *WorkerICE) reCreateAgent(ctxCancel context.CancelFunc, relaySupport []ice.CandidateType) (*ice.Agent, error) {
failedTimeout := 6 * time.Second
2024-06-18 11:22:40 +02:00
transportNet, err := w.newStdNet()
if err != nil {
2024-06-18 11:22:40 +02:00
w.log.Errorf("failed to create pion's stdnet: %s", err)
}
iceKeepAlive := iceKeepAlive()
iceDisconnectedTimeout := iceDisconnectedTimeout()
iceRelayAcceptanceMinWait := iceRelayAcceptanceMinWait()
agentConfig := &ice.AgentConfig{
MulticastDNSMode: ice.MulticastDNSModeDisabled,
NetworkTypes: []ice.NetworkType{ice.NetworkTypeUDP4, ice.NetworkTypeUDP6},
2024-06-18 11:22:40 +02:00
Urls: w.configICE.StunTurn.Load().([]*stun.URI),
CandidateTypes: relaySupport,
FailedTimeout: &failedTimeout,
2024-06-18 11:22:40 +02:00
InterfaceFilter: stdnet.InterfaceFilter(w.configICE.InterfaceBlackList),
UDPMux: w.configICE.UDPMux,
UDPMuxSrflx: w.configICE.UDPMuxSrflx,
NAT1To1IPs: w.configICE.NATExternalIPs,
Net: transportNet,
DisconnectedTimeout: &iceDisconnectedTimeout,
KeepaliveInterval: &iceKeepAlive,
RelayAcceptanceMinWait: &iceRelayAcceptanceMinWait,
}
2024-06-18 11:22:40 +02:00
if w.configICE.DisableIPv6Discovery {
agentConfig.NetworkTypes = []ice.NetworkType{ice.NetworkTypeUDP4}
}
2024-06-18 11:22:40 +02:00
w.sentExtraSrflx = false
agent, err := ice.NewAgent(agentConfig)
if err != nil {
return nil, err
}
2024-06-18 11:22:40 +02:00
err = agent.OnCandidate(w.onICECandidate)
if err != nil {
return nil, err
}
err = agent.OnConnectionStateChange(func(state ice.ConnectionState) {
2024-06-18 11:22:40 +02:00
w.log.Debugf("ICE ConnectionState has changed to %s", state.String())
if state == ice.ConnectionStateFailed || state == ice.ConnectionStateDisconnected {
ctxCancel()
}
})
if err != nil {
return nil, err
}
2024-06-18 11:22:40 +02:00
err = agent.OnSelectedCandidatePairChange(w.onICESelectedCandidatePair)
if err != nil {
return nil, err
}
err = agent.OnSuccessfulSelectedPairBindingResponse(func(p *ice.CandidatePair) {
2024-06-18 11:22:40 +02:00
err := w.statusRecorder.UpdateLatency(w.config.Key, p.Latency())
if err != nil {
2024-06-18 11:22:40 +02:00
w.log.Debugf("failed to update latency for peer: %s", err)
return
}
})
if err != nil {
return nil, fmt.Errorf("failed setting binding response callback: %w", err)
}
return agent, nil
}
2024-06-18 11:22:40 +02:00
func (w *WorkerICE) punchRemoteWGPort(pair *ice.CandidatePair, remoteWgPort int) {
// wait local endpoint configuration
time.Sleep(time.Second)
addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", pair.Remote.Address(), remoteWgPort))
if err != nil {
2024-06-18 11:22:40 +02:00
w.log.Warnf("got an error while resolving the udp address, err: %s", err)
return
}
2024-06-18 11:22:40 +02:00
mux, ok := w.configICE.UDPMuxSrflx.(*bind.UniversalUDPMuxDefault)
if !ok {
2024-06-18 11:22:40 +02:00
w.log.Warn("invalid udp mux conversion")
return
}
_, err = mux.GetSharedConn().WriteTo([]byte{0x6e, 0x62}, addr)
if err != nil {
2024-06-18 11:22:40 +02:00
w.log.Warnf("got an error while sending the punch packet, err: %s", err)
}
}
// onICECandidate is a callback attached to an ICE Agent to receive new local connection candidates
// and then signals them to the remote peer
2024-06-18 11:22:40 +02:00
func (w *WorkerICE) onICECandidate(candidate ice.Candidate) {
// nil means candidate gathering has been ended
if candidate == nil {
return
}
// TODO: reported port is incorrect for CandidateTypeHost, makes understanding ICE use via logs confusing as port is ignored
2024-06-18 11:22:40 +02:00
w.log.Debugf("discovered local candidate %s", candidate.String())
go func() {
2024-06-18 11:22:40 +02:00
err := w.signaler.SignalICECandidate(candidate, w.config.Key)
if err != nil {
2024-06-18 11:22:40 +02:00
w.log.Errorf("failed signaling candidate to the remote peer %s %s", w.config.Key, err)
}
}()
2024-06-18 11:22:40 +02:00
if !w.shouldSendExtraSrflxCandidate(candidate) {
return
}
// sends an extra server reflexive candidate to the remote peer with our related port (usually the wireguard port)
// this is useful when network has an existing port forwarding rule for the wireguard port and this peer
extraSrflx, err := extraSrflxCandidate(candidate)
if err != nil {
2024-06-18 11:22:40 +02:00
w.log.Errorf("failed creating extra server reflexive candidate %s", err)
return
}
2024-06-18 11:22:40 +02:00
w.sentExtraSrflx = true
go func() {
2024-06-18 11:22:40 +02:00
err = w.signaler.SignalICECandidate(extraSrflx, w.config.Key)
if err != nil {
2024-06-18 11:22:40 +02:00
w.log.Errorf("failed signaling the extra server reflexive candidate: %s", err)
}
}()
}
2024-06-18 11:22:40 +02:00
func (w *WorkerICE) onICESelectedCandidatePair(c1 ice.Candidate, c2 ice.Candidate) {
w.log.Debugf("selected candidate pair [local <-> remote] -> [%s <-> %s], peer %s", c1.String(), c2.String(),
w.config.Key)
}
2024-06-18 11:22:40 +02:00
func (w *WorkerICE) shouldSendExtraSrflxCandidate(candidate ice.Candidate) bool {
if !w.sentExtraSrflx && candidate.Type() == ice.CandidateTypeServerReflexive && candidate.Port() != candidate.RelatedAddress().Port {
return true
}
return false
}
2024-06-18 11:22:40 +02:00
func (w *WorkerICE) turnAgentDial(remoteOfferAnswer *OfferAnswer) (*ice.Conn, error) {
isControlling := w.config.LocalKey > w.config.Key
if isControlling {
2024-06-18 11:22:40 +02:00
return w.agent.Dial(w.ctx, remoteOfferAnswer.IceCredentials.UFrag, remoteOfferAnswer.IceCredentials.Pwd)
} else {
2024-06-18 11:22:40 +02:00
return w.agent.Accept(w.ctx, remoteOfferAnswer.IceCredentials.UFrag, remoteOfferAnswer.IceCredentials.Pwd)
}
}
// waitForReconnectTry waits for a random duration before trying to reconnect
2024-06-18 11:22:40 +02:00
func (w *WorkerICE) waitForReconnectTry() bool {
minWait := 500
maxWait := 2000
duration := time.Duration(rand.Intn(maxWait-minWait)+minWait) * time.Millisecond
2024-06-18 11:20:01 +02:00
timeout := time.NewTimer(duration)
defer timeout.Stop()
select {
2024-06-18 11:22:40 +02:00
case <-w.ctx.Done():
return false
2024-06-18 11:20:01 +02:00
case <-timeout.C:
return true
}
}
func extraSrflxCandidate(candidate ice.Candidate) (*ice.CandidateServerReflexive, error) {
relatedAdd := candidate.RelatedAddress()
return ice.NewCandidateServerReflexive(&ice.CandidateServerReflexiveConfig{
Network: candidate.NetworkType().String(),
Address: candidate.Address(),
Port: relatedAdd.Port,
Component: candidate.Component(),
RelAddr: relatedAdd.Address,
RelPort: relatedAdd.Port,
})
}
func candidateViaRoutes(candidate ice.Candidate, clientRoutes route.HAMap) bool {
var routePrefixes []netip.Prefix
for _, routes := range clientRoutes {
if len(routes) > 0 && routes[0] != nil {
routePrefixes = append(routePrefixes, routes[0].Network)
}
}
addr, err := netip.ParseAddr(candidate.Address())
if err != nil {
log.Errorf("Failed to parse IP address %s: %v", candidate.Address(), err)
return false
}
for _, prefix := range routePrefixes {
// default route is
if prefix.Bits() == 0 {
continue
}
if prefix.Contains(addr) {
log.Debugf("Ignoring candidate [%s], its address is part of routed network %s", candidate.String(), prefix)
return true
}
}
return false
}
func candidateTypes() []ice.CandidateType {
if hasICEForceRelayConn() {
return []ice.CandidateType{ice.CandidateTypeRelay}
}
// TODO: remove this once we have refactored userspace proxy into the bind package
if runtime.GOOS == "ios" {
return []ice.CandidateType{ice.CandidateTypeHost, ice.CandidateTypeServerReflexive}
}
return []ice.CandidateType{ice.CandidateTypeHost, ice.CandidateTypeServerReflexive, ice.CandidateTypeRelay}
}
func candidateTypesP2P() []ice.CandidateType {
return []ice.CandidateType{ice.CandidateTypeHost, ice.CandidateTypeServerReflexive}
}
func isRelayCandidate(candidate ice.Candidate) bool {
return candidate.Type() == ice.CandidateTypeRelay
}
func isRelayed(pair *ice.CandidatePair) bool {
if pair.Local.Type() == ice.CandidateTypeRelay || pair.Remote.Type() == ice.CandidateTypeRelay {
return true
}
return false
}