Refactor handshaker loop

This commit is contained in:
Zoltán Papp 2024-06-21 12:35:28 +02:00
parent bfe60c01ba
commit 4a08f1a1e9
4 changed files with 283 additions and 272 deletions

View File

@ -125,7 +125,6 @@ func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Statu
wgProxyFactory: wgProxyFactory, wgProxyFactory: wgProxyFactory,
signaler: signaler, signaler: signaler,
allowedIPsIP: allowedIPsIP.String(), allowedIPsIP: allowedIPsIP.String(),
handshaker: NewHandshaker(ctx, connLog, config, signaler),
statusRelay: StatusDisconnected, statusRelay: StatusDisconnected,
statusICE: StatusDisconnected, statusICE: StatusDisconnected,
} }
@ -133,7 +132,6 @@ func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Statu
rFns := WorkerRelayCallbacks{ rFns := WorkerRelayCallbacks{
OnConnReady: conn.relayConnectionIsReady, OnConnReady: conn.relayConnectionIsReady,
OnStatusChanged: conn.onWorkerRelayStateChanged, OnStatusChanged: conn.onWorkerRelayStateChanged,
DoHandshake: conn.doHandshake,
} }
wFns := WorkerICECallbacks{ wFns := WorkerICECallbacks{
@ -142,8 +140,13 @@ func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Statu
DoHandshake: conn.doHandshake, DoHandshake: conn.doHandshake,
} }
conn.handshaker = NewHandshaker(ctx, connLog, config, signaler, conn.onNewOffer)
go conn.handshaker.Listen()
conn.workerRelay = NewWorkerRelay(ctx, connLog, relayManager, config, rFns) conn.workerRelay = NewWorkerRelay(ctx, connLog, relayManager, config, rFns)
conn.workerICE = NewWorkerICE(ctx, connLog, config, config.ICEConfig, signaler, iFaceDiscover, statusRecorder, wFns) conn.workerICE, err = NewWorkerICE(ctx, connLog, config, config.ICEConfig, signaler, iFaceDiscover, statusRecorder, wFns)
if err != nil {
return nil, err
}
return conn, nil return conn, nil
} }
@ -525,25 +528,21 @@ func (conn *Conn) updateStatus(peerState State, remoteRosenpassPubKey []byte, re
return return
} }
func (conn *Conn) doHandshake() (*OfferAnswer, error) { func (conn *Conn) doHandshake() error {
if !conn.signaler.Ready() { if !conn.signaler.Ready() {
return nil, ErrSignalIsNotReady return ErrSignalIsNotReady
} }
var ( var (
ha HandshakeArgs ha HandshakeArgs
err error err error
) )
ha.IceUFrag, ha.IcePwd, err = conn.workerICE.GetLocalUserCredentials() ha.IceUFrag, ha.IcePwd = conn.workerICE.GetLocalUserCredentials()
if err != nil {
conn.log.Errorf("failed to get local user credentials: %v", err)
}
addr, err := conn.workerRelay.RelayAddress() addr, err := conn.workerRelay.RelayAddress()
if err == nil { if err == nil {
ha.RelayAddr = addr.String() ha.RelayAddr = addr.String()
} }
return conn.handshaker.Handshake(ha) return conn.handshaker.SendOffer(ha)
} }
func (conn *Conn) evalStatus() ConnStatus { func (conn *Conn) evalStatus() ConnStatus {
@ -558,6 +557,12 @@ func (conn *Conn) evalStatus() ConnStatus {
return StatusDisconnected return StatusDisconnected
} }
func (conn *Conn) onNewOffer(answer *OfferAnswer) {
// todo move to this callback into handshaker
go conn.workerRelay.OnNewOffer(answer)
go conn.workerICE.OnNewOffer(answer)
}
func isRosenpassEnabled(remoteRosenpassPubKey []byte) bool { func isRosenpassEnabled(remoteRosenpassPubKey []byte) bool {
return remoteRosenpassPubKey != nil return remoteRosenpassPubKey != nil
} }

View File

@ -54,12 +54,27 @@ type HandshakeArgs struct {
RelayAddr string RelayAddr string
} }
func (a HandshakeArgs) Equal(args HandshakeArgs) bool {
if a.IceUFrag != args.IceUFrag {
return false
}
if a.IcePwd != args.IcePwd {
return false
}
if a.RelayAddr != args.RelayAddr {
return false
}
return true
}
type Handshaker struct { type Handshaker struct {
mu sync.Mutex mu sync.Mutex
ctx context.Context ctx context.Context
log *log.Entry log *log.Entry
config ConnConfig config ConnConfig
signaler *Signaler signaler *Signaler
onNewOfferListener func(*OfferAnswer)
// remoteOffersCh is a channel used to wait for remote credentials to proceed with the connection // remoteOffersCh is a channel used to wait for remote credentials to proceed with the connection
remoteOffersCh chan OfferAnswer remoteOffersCh chan OfferAnswer
@ -69,10 +84,11 @@ type Handshaker struct {
remoteOfferAnswer *OfferAnswer remoteOfferAnswer *OfferAnswer
remoteOfferAnswerCreated time.Time remoteOfferAnswerCreated time.Time
handshakeArgs HandshakeArgs lastSentOffer time.Time
lastOfferArgs HandshakeArgs
} }
func NewHandshaker(ctx context.Context, log *log.Entry, config ConnConfig, signaler *Signaler) *Handshaker { func NewHandshaker(ctx context.Context, log *log.Entry, config ConnConfig, signaler *Signaler, onNewOfferListener func(*OfferAnswer)) *Handshaker {
return &Handshaker{ return &Handshaker{
ctx: ctx, ctx: ctx,
log: log, log: log,
@ -80,39 +96,42 @@ func NewHandshaker(ctx context.Context, log *log.Entry, config ConnConfig, signa
signaler: signaler, signaler: signaler,
remoteOffersCh: make(chan OfferAnswer), remoteOffersCh: make(chan OfferAnswer),
remoteAnswerCh: make(chan OfferAnswer), remoteAnswerCh: make(chan OfferAnswer),
onNewOfferListener: onNewOfferListener,
} }
} }
func (h *Handshaker) Handshake(args HandshakeArgs) (*OfferAnswer, error) { func (h *Handshaker) Listen() {
for {
remoteOfferAnswer, err := h.waitForRemoteOfferConfirmation()
if err != nil {
if _, ok := err.(*ConnectionClosedError); ok {
return
}
log.Errorf("failed to received remote offer confirmation: %s", err)
continue
}
h.log.Debugf("received connection confirmation, running version %s and with remote WireGuard listen port %d", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort)
go h.onNewOfferListener(remoteOfferAnswer)
}
}
func (h *Handshaker) SendOffer(args HandshakeArgs) error {
h.mu.Lock() h.mu.Lock()
defer h.mu.Unlock() defer h.mu.Unlock()
h.log.Infof("start handshake with remote peer") if h.lastOfferArgs.Equal(args) && h.lastSentOffer.After(time.Now().Add(-time.Second)) {
h.handshakeArgs = args return nil
cachedOfferAnswer, ok := h.cachedHandshake()
if ok {
return cachedOfferAnswer, nil
} }
err := h.sendOffer(args) err := h.sendOffer(args)
if err != nil { if err != nil {
return nil, err return err
} }
// Only continue once we got a connection confirmation from the remote peer. h.lastOfferArgs = args
// The connection timeout could have happened before a confirmation received from the remote. h.lastSentOffer = time.Now()
// The connection could have also been closed externally (e.g. when we received an update from the management that peer shouldn't be connected) return nil
remoteOfferAnswer, err := h.waitForRemoteOfferConfirmation()
if err != nil {
return nil, err
}
h.storeRemoteOfferAnswer(remoteOfferAnswer)
h.log.Debugf("received connection confirmation, running version %s and with remote WireGuard listen port %d",
remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort)
return remoteOfferAnswer, nil
} }
// OnRemoteOffer handles an offer from the remote peer and returns true if the message was accepted, false otherwise // OnRemoteOffer handles an offer from the remote peer and returns true if the message was accepted, false otherwise
@ -149,6 +168,23 @@ func (h *Handshaker) OnRemoteAnswer(answer OfferAnswer) bool {
} }
} }
func (h *Handshaker) waitForRemoteOfferConfirmation() (*OfferAnswer, error) {
select {
case remoteOfferAnswer := <-h.remoteOffersCh:
// received confirmation from the remote peer -> ready to proceed
err := h.sendAnswer()
if err != nil {
return nil, err
}
return &remoteOfferAnswer, nil
case remoteOfferAnswer := <-h.remoteAnswerCh:
return &remoteOfferAnswer, nil
case <-h.ctx.Done():
// closed externally
return nil, NewConnectionClosedError(h.config.Key)
}
}
// sendOffer prepares local user credentials and signals them to the remote peer // sendOffer prepares local user credentials and signals them to the remote peer
func (h *Handshaker) sendOffer(args HandshakeArgs) error { func (h *Handshaker) sendOffer(args HandshakeArgs) error {
offer := OfferAnswer{ offer := OfferAnswer{
@ -166,12 +202,12 @@ func (h *Handshaker) sendOffer(args HandshakeArgs) error {
func (h *Handshaker) sendAnswer() error { func (h *Handshaker) sendAnswer() error {
h.log.Debugf("sending answer") h.log.Debugf("sending answer")
answer := OfferAnswer{ answer := OfferAnswer{
IceCredentials: IceCredentials{h.handshakeArgs.IceUFrag, h.handshakeArgs.IcePwd}, IceCredentials: IceCredentials{h.lastOfferArgs.IceUFrag, h.lastOfferArgs.IcePwd},
WgListenPort: h.config.LocalWgPort, WgListenPort: h.config.LocalWgPort,
Version: version.NetbirdVersion(), Version: version.NetbirdVersion(),
RosenpassPubKey: h.config.RosenpassPubKey, RosenpassPubKey: h.config.RosenpassPubKey,
RosenpassAddr: h.config.RosenpassAddr, RosenpassAddr: h.config.RosenpassAddr,
RelaySrvAddress: h.handshakeArgs.RelayAddr, RelaySrvAddress: h.lastOfferArgs.RelayAddr,
} }
err := h.signaler.SignalAnswer(answer, h.config.Key) err := h.signaler.SignalAnswer(answer, h.config.Key)
if err != nil { if err != nil {
@ -180,43 +216,3 @@ func (h *Handshaker) sendAnswer() error {
return nil return nil
} }
func (h *Handshaker) waitForRemoteOfferConfirmation() (*OfferAnswer, error) {
timeout := time.NewTimer(h.config.Timeout)
defer timeout.Stop()
select {
case remoteOfferAnswer := <-h.remoteOffersCh:
// received confirmation from the remote peer -> ready to proceed
err := h.sendAnswer()
if err != nil {
return nil, err
}
return &remoteOfferAnswer, nil
case remoteOfferAnswer := <-h.remoteAnswerCh:
return &remoteOfferAnswer, nil
case <-timeout.C:
h.log.Debugf("handshake timeout")
return nil, NewConnectionTimeoutError(h.config.Key, h.config.Timeout)
case <-h.ctx.Done():
// closed externally
return nil, NewConnectionClosedError(h.config.Key)
}
}
func (h *Handshaker) storeRemoteOfferAnswer(answer *OfferAnswer) {
h.remoteOfferAnswer = answer
h.remoteOfferAnswerCreated = time.Now()
}
func (h *Handshaker) cachedHandshake() (*OfferAnswer, bool) {
if h.remoteOfferAnswer == nil {
return nil, false
}
if time.Since(h.remoteOfferAnswerCreated) > handshakeCacheTimeout {
return nil, false
}
return h.remoteOfferAnswer, true
}

View File

@ -2,7 +2,6 @@ package peer
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"math/rand" "math/rand"
"net" "net"
@ -69,7 +68,7 @@ type ICEConnInfo struct {
type WorkerICECallbacks struct { type WorkerICECallbacks struct {
OnConnReady func(ConnPriority, ICEConnInfo) OnConnReady func(ConnPriority, ICEConnInfo)
OnStatusChanged func(ConnStatus) OnStatusChanged func(ConnStatus)
DoHandshake func() (*OfferAnswer, error) DoHandshake func() error
} }
type WorkerICE struct { type WorkerICE struct {
@ -90,12 +89,19 @@ type WorkerICE struct {
StunTurn []*stun.URI StunTurn []*stun.URI
sentExtraSrflx bool sentExtraSrflx bool
localUfrag string localUfrag string
localPwd string localPwd string
creadantialHasUsed bool
hasRelayOnLocally bool
onDisconnected context.CancelFunc
onOfferReceived context.CancelFunc
tickerCancel context.CancelFunc
ticker *time.Ticker
} }
func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, configICE ICEConfig, signaler *Signaler, ifaceDiscover stdnet.ExternalIFaceDiscover, statusRecorder *Status, callBacks WorkerICECallbacks) *WorkerICE { func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, configICE ICEConfig, signaler *Signaler, ifaceDiscover stdnet.ExternalIFaceDiscover, statusRecorder *Status, callBacks WorkerICECallbacks) (*WorkerICE, error) {
cice := &WorkerICE{ w := &WorkerICE{
ctx: ctx, ctx: ctx,
log: log, log: log,
config: config, config: config,
@ -105,34 +111,74 @@ func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, config
statusRecorder: statusRecorder, statusRecorder: statusRecorder,
conn: callBacks, conn: callBacks,
} }
return cice
localUfrag, localPwd, err := generateICECredentials()
if err != nil {
return nil, err
}
w.localUfrag = localUfrag
w.localPwd = localPwd
return w, nil
} }
// SetupICEConnection sets up an ICE connection with the remote peer.
// If the relay mode is supported then try to connect in p2p way only.
// It is trying to reconnection in a loop until the context is canceled.
// In case of success connection it will call the onICEConnReady callback.
func (w *WorkerICE) SetupICEConnection(hasRelayOnLocally bool) { func (w *WorkerICE) SetupICEConnection(hasRelayOnLocally bool) {
time.Sleep(20 * time.Second) w.muxAgent.Lock()
for { defer w.muxAgent.Unlock()
if !w.waitForReconnectTry() { if w.agent != nil {
return return
} }
w.hasRelayOnLocally = hasRelayOnLocally
go w.sendOffer()
}
func (w *WorkerICE) sendOffer() {
w.ticker = time.NewTicker(w.config.Timeout)
defer w.ticker.Stop()
tickerCtx, tickerCancel := context.WithCancel(w.ctx)
w.tickerCancel = tickerCancel
w.conn.OnStatusChanged(StatusConnecting) w.conn.OnStatusChanged(StatusConnecting)
w.log.Debugf("trying to establish ICE connection with peer %s", w.config.Key) w.log.Debugf("ICE trigger a new handshake")
err := w.conn.DoHandshake()
remoteOfferAnswer, err := w.conn.DoHandshake()
if err != nil { if err != nil {
if errors.Is(err, ErrSignalIsNotReady) { w.log.Errorf("%s", err)
w.log.Infof("signal client isn't ready, skipping connection attempt")
} }
continue
for {
w.log.Debugf("ICE trigger new reconnect handshake")
select {
case <-w.ticker.C:
err := w.conn.DoHandshake()
if err != nil {
w.log.Errorf("%s", err)
} }
case <-tickerCtx.Done():
w.log.Debugf("left reconnect loop")
return
case <-w.ctx.Done():
return
}
}
}
func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
log.Debugf("OnNewOffer for ICE")
w.muxAgent.Lock()
if w.agent != nil {
log.Debugf("agent already exists, skipping the offer")
w.muxAgent.Unlock()
return
}
// cancel reconnection loop
w.log.Debugf("canceling reconnection loop")
w.tickerCancel()
var preferredCandidateTypes []ice.CandidateType var preferredCandidateTypes []ice.CandidateType
if hasRelayOnLocally && remoteOfferAnswer.RelaySrvAddress != "" { if w.hasRelayOnLocally && remoteOfferAnswer.RelaySrvAddress != "" {
w.selectedPriority = connPriorityICEP2P w.selectedPriority = connPriorityICEP2P
preferredCandidateTypes = candidateTypesP2P() preferredCandidateTypes = candidateTypesP2P()
} else { } else {
@ -140,44 +186,37 @@ func (w *WorkerICE) SetupICEConnection(hasRelayOnLocally bool) {
preferredCandidateTypes = candidateTypes() preferredCandidateTypes = candidateTypes()
} }
ctx, ctxCancel := context.WithCancel(w.ctx) w.log.Debugf("recreate agent")
w.muxAgent.Lock() agentCtx, agentCancel := context.WithCancel(w.ctx)
agent, err := w.reCreateAgent(ctxCancel, preferredCandidateTypes) agent, err := w.reCreateAgent(agentCancel, preferredCandidateTypes)
if err != nil { if err != nil {
ctxCancel() w.log.Errorf("failed to recreate ICE Agent: %s", err)
w.muxAgent.Unlock() return
continue
} }
w.agent = agent w.agent = agent
// generate credentials for the next loop. Important the credentials are generated before handshake, because
// the handshake could provide a cached offer-answer
w.localUfrag, w.localPwd, err = generateICECredentials()
if err != nil {
ctxCancel()
w.muxAgent.Unlock()
continue
}
w.muxAgent.Unlock() w.muxAgent.Unlock()
w.log.Debugf("gather candidates")
err = w.agent.GatherCandidates() err = w.agent.GatherCandidates()
if err != nil { if err != nil {
ctxCancel() w.log.Debugf("failed to gather candidates: %s", err)
continue return
} }
// will block until connection succeeded // will block until connection succeeded
// but it won't release if ICE Agent went into Disconnected or Failed state, // but it won't release if ICE Agent went into Disconnected or Failed state,
// so we have to cancel it with the provided context once agent detected a broken connection // so we have to cancel it with the provided context once agent detected a broken connection
remoteConn, err := w.turnAgentDial(remoteOfferAnswer) w.log.Debugf("turnAgentDial")
remoteConn, err := w.turnAgentDial(agentCtx, remoteOfferAnswer)
if err != nil { if err != nil {
ctxCancel() w.log.Debugf("failed to dial the remote peer: %s", err)
continue return
} }
w.log.Debugf("GetSelectedCandidatePair")
pair, err := w.agent.GetSelectedCandidatePair() pair, err := w.agent.GetSelectedCandidatePair()
if err != nil { if err != nil {
ctxCancel() return
continue
} }
if !isRelayCandidate(pair.Local) { if !isRelayCandidate(pair.Local) {
@ -203,13 +242,8 @@ func (w *WorkerICE) SetupICEConnection(hasRelayOnLocally bool) {
Relayed: isRelayed(pair), Relayed: isRelayed(pair),
RelayedOnLocal: isRelayCandidate(pair.Local), RelayedOnLocal: isRelayCandidate(pair.Local),
} }
w.log.Debugf("on conn ready")
go w.conn.OnConnReady(w.selectedPriority, ci) go w.conn.OnConnReady(w.selectedPriority, ci)
<-ctx.Done()
ctxCancel()
_ = w.agent.Close()
w.conn.OnStatusChanged(StatusDisconnected)
}
} }
// OnRemoteCandidate Handles ICE connection Candidate provided by the remote peer. // OnRemoteCandidate Handles ICE connection Candidate provided by the remote peer.
@ -233,19 +267,14 @@ func (w *WorkerICE) OnRemoteCandidate(candidate ice.Candidate, haRoutes route.HA
} }
} }
func (w *WorkerICE) GetLocalUserCredentials() (frag string, pwd string, err error) { func (w *WorkerICE) GetLocalUserCredentials() (frag string, pwd string) {
w.muxAgent.Lock() w.muxAgent.Lock()
defer w.muxAgent.Unlock() defer w.muxAgent.Unlock()
return w.localUfrag, w.localPwd
if w.localUfrag != "" && w.localPwd != "" {
return w.localUfrag, w.localPwd, nil
} }
w.localUfrag, w.localPwd, err = generateICECredentials() func (w *WorkerICE) reCreateAgent(agentCancel context.CancelFunc, relaySupport []ice.CandidateType) (*ice.Agent, error) {
return w.localUfrag, w.localPwd, err log.Debugf("--RECREATE AGENT-----")
}
func (w *WorkerICE) reCreateAgent(ctxCancel context.CancelFunc, relaySupport []ice.CandidateType) (*ice.Agent, error) {
transportNet, err := w.newStdNet() transportNet, err := w.newStdNet()
if err != nil { if err != nil {
w.log.Errorf("failed to create pion's stdnet: %s", err) w.log.Errorf("failed to create pion's stdnet: %s", err)
@ -258,7 +287,7 @@ func (w *WorkerICE) reCreateAgent(ctxCancel context.CancelFunc, relaySupport []i
agentConfig := &ice.AgentConfig{ agentConfig := &ice.AgentConfig{
MulticastDNSMode: ice.MulticastDNSModeDisabled, MulticastDNSMode: ice.MulticastDNSModeDisabled,
NetworkTypes: []ice.NetworkType{ice.NetworkTypeUDP4, ice.NetworkTypeUDP6}, NetworkTypes: []ice.NetworkType{ice.NetworkTypeUDP4, ice.NetworkTypeUDP6},
Urls: w.configICE.StunTurn.Load().([]*stun.URI), //Urls: w.configICE.StunTurn.Load().([]*stun.URI),
CandidateTypes: relaySupport, CandidateTypes: relaySupport,
InterfaceFilter: stdnet.InterfaceFilter(w.configICE.InterfaceBlackList), InterfaceFilter: stdnet.InterfaceFilter(w.configICE.InterfaceBlackList),
UDPMux: w.configICE.UDPMux, UDPMux: w.configICE.UDPMux,
@ -291,7 +320,23 @@ func (w *WorkerICE) reCreateAgent(ctxCancel context.CancelFunc, relaySupport []i
err = agent.OnConnectionStateChange(func(state ice.ConnectionState) { err = agent.OnConnectionStateChange(func(state ice.ConnectionState) {
w.log.Debugf("ICE ConnectionState has changed to %s", state.String()) w.log.Debugf("ICE ConnectionState has changed to %s", state.String())
if state == ice.ConnectionStateFailed || state == ice.ConnectionStateDisconnected { if state == ice.ConnectionStateFailed || state == ice.ConnectionStateDisconnected {
ctxCancel() w.conn.OnStatusChanged(StatusDisconnected)
w.muxAgent.Lock()
agentCancel()
_ = agent.Close()
w.agent = nil
// generate credentials for the next agent creation loop
localUfrag, localPwd, err := generateICECredentials()
if err != nil {
log.Errorf("failed to generate new ICE credentials: %s", err)
}
w.localUfrag = localUfrag
w.localPwd = localPwd
w.muxAgent.Unlock()
go w.sendOffer()
} }
}) })
if err != nil { if err != nil {
@ -387,12 +432,12 @@ func (w *WorkerICE) shouldSendExtraSrflxCandidate(candidate ice.Candidate) bool
return false return false
} }
func (w *WorkerICE) turnAgentDial(remoteOfferAnswer *OfferAnswer) (*ice.Conn, error) { func (w *WorkerICE) turnAgentDial(ctx context.Context, remoteOfferAnswer *OfferAnswer) (*ice.Conn, error) {
isControlling := w.config.LocalKey > w.config.Key isControlling := w.config.LocalKey > w.config.Key
if isControlling { if isControlling {
return w.agent.Dial(w.ctx, remoteOfferAnswer.IceCredentials.UFrag, remoteOfferAnswer.IceCredentials.Pwd) return w.agent.Dial(ctx, remoteOfferAnswer.IceCredentials.UFrag, remoteOfferAnswer.IceCredentials.Pwd)
} else { } else {
return w.agent.Accept(w.ctx, remoteOfferAnswer.IceCredentials.UFrag, remoteOfferAnswer.IceCredentials.Pwd) return w.agent.Accept(ctx, remoteOfferAnswer.IceCredentials.UFrag, remoteOfferAnswer.IceCredentials.Pwd)
} }
} }
@ -465,7 +510,7 @@ func candidateTypes() []ice.CandidateType {
} }
func candidateTypesP2P() []ice.CandidateType { func candidateTypesP2P() []ice.CandidateType {
return []ice.CandidateType{ice.CandidateTypeHost, ice.CandidateTypeServerReflexive} return []ice.CandidateType{ice.CandidateTypeHost}
} }
func isRelayCandidate(candidate ice.Candidate) bool { func isRelayCandidate(candidate ice.Candidate) bool {
@ -480,6 +525,7 @@ func isRelayed(pair *ice.CandidatePair) bool {
} }
func generateICECredentials() (string, string, error) { func generateICECredentials() (string, string, error) {
log.Debugf("-----GENERATE CREDENTIALS------")
ufrag, err := randutil.GenerateCryptoRandomString(lenUFrag, runesAlpha) ufrag, err := randutil.GenerateCryptoRandomString(lenUFrag, runesAlpha)
if err != nil { if err != nil {
return "", "", err return "", "", err

View File

@ -2,10 +2,7 @@ package peer
import ( import (
"context" "context"
"errors"
"math/rand"
"net" "net"
"time"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -21,7 +18,6 @@ type RelayConnInfo struct {
type WorkerRelayCallbacks struct { type WorkerRelayCallbacks struct {
OnConnReady func(RelayConnInfo) OnConnReady func(RelayConnInfo)
OnStatusChanged func(ConnStatus) OnStatusChanged func(ConnStatus)
DoHandshake func() (*OfferAnswer, error)
} }
type WorkerRelay struct { type WorkerRelay struct {
@ -44,42 +40,30 @@ func NewWorkerRelay(ctx context.Context, log *log.Entry, relayManager *relayClie
// SetupRelayConnection todo: this function is not completed. Make no sense to put it in a for loop because we are not waiting for any event // SetupRelayConnection todo: this function is not completed. Make no sense to put it in a for loop because we are not waiting for any event
func (w *WorkerRelay) SetupRelayConnection() { func (w *WorkerRelay) SetupRelayConnection() {
for {
if !w.waitForReconnectTry() {
return
}
w.log.Debugf("trying to establish Relay connection with peer %s", w.config.Key)
remoteOfferAnswer, err := w.conn.DoHandshake()
if err != nil {
if errors.Is(err, ErrSignalIsNotReady) {
w.log.Infof("signal client isn't ready, skipping connection attempt")
}
w.log.Errorf("%s", err)
continue
} }
func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
if !w.isRelaySupported(remoteOfferAnswer) { if !w.isRelaySupported(remoteOfferAnswer) {
w.log.Infof("Relay is not supported by remote peer") w.log.Infof("Relay is not supported by remote peer")
// todo should we retry? // todo should we retry?
// if the remote peer doesn't support relay make no sense to retry infinity // if the remote peer doesn't support relay make no sense to retry infinity
// but if the remote peer supports relay just the connection is lost we should retry // but if the remote peer supports relay just the connection is lost we should retry
continue return
} }
// the relayManager will return with error in case if the connection has lost with relay server // the relayManager will return with error in case if the connection has lost with relay server
currentRelayAddress, err := w.relayManager.RelayAddress() currentRelayAddress, err := w.relayManager.RelayAddress()
if err != nil { if err != nil {
w.log.Infof("local Relay connection is lost, skipping connection attempt") w.log.Infof("local Relay connection is lost, skipping connection attempt")
continue return
} }
srv := w.preferredRelayServer(currentRelayAddress.String(), remoteOfferAnswer.RelaySrvAddress) srv := w.preferredRelayServer(currentRelayAddress.String(), remoteOfferAnswer.RelaySrvAddress)
relayedConn, err := w.relayManager.OpenConn(srv, w.config.Key) relayedConn, err := w.relayManager.OpenConn(srv, w.config.Key)
if err != nil { if err != nil {
w.log.Infof("failed to open relay connection: %s", err) w.log.Infof("do not need to reopen relay connection: %s", err)
continue return
} }
go w.conn.OnConnReady(RelayConnInfo{ go w.conn.OnConnReady(RelayConnInfo{
@ -87,9 +71,6 @@ func (w *WorkerRelay) SetupRelayConnection() {
rosenpassPubKey: remoteOfferAnswer.RosenpassPubKey, rosenpassPubKey: remoteOfferAnswer.RosenpassPubKey,
rosenpassAddr: remoteOfferAnswer.RosenpassAddr, rosenpassAddr: remoteOfferAnswer.RosenpassAddr,
}) })
<-w.ctx.Done()
}
} }
func (w *WorkerRelay) RelayAddress() (net.Addr, error) { func (w *WorkerRelay) RelayAddress() (net.Addr, error) {
@ -113,20 +94,3 @@ func (w *WorkerRelay) preferredRelayServer(myRelayAddress, remoteRelayAddress st
func (w *WorkerRelay) RelayIsSupportedLocally() bool { func (w *WorkerRelay) RelayIsSupportedLocally() bool {
return w.relayManager.HasRelayAddress() return w.relayManager.HasRelayAddress()
} }
// waitForReconnectTry waits for a random duration before trying to reconnect
func (w *WorkerRelay) waitForReconnectTry() bool {
minWait := 500
maxWait := 2000
duration := time.Duration(rand.Intn(maxWait-minWait)+minWait) * time.Millisecond
timeout := time.NewTimer(duration)
defer timeout.Stop()
select {
case <-w.ctx.Done():
return false
case <-timeout.C:
return true
}
}