From 3d35d6fe0928f0bb73a4765965b79bbb909ccd27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Mon, 4 Nov 2024 17:01:22 +0100 Subject: [PATCH] Wait to finish onConnReady callback --- client/internal/peer/conn.go | 52 ++++++++++------------- client/internal/peer/worker_ice.go | 63 +++++++++++++++++++++------- client/internal/peer/worker_relay.go | 48 ++++++++++++++++++--- 3 files changed, 112 insertions(+), 51 deletions(-) diff --git a/client/internal/peer/conn.go b/client/internal/peer/conn.go index 84a8c221f..964870614 100644 --- a/client/internal/peer/conn.go +++ b/client/internal/peer/conn.go @@ -65,14 +65,6 @@ type ConnConfig struct { ICEConfig icemaker.Config } -type WorkerCallbacks struct { - OnRelayReadyCallback func(info RelayConnInfo) - OnRelayStatusChanged func(ConnStatus) - - OnICEConnReadyCallback func(ConnPriority, ICEConnInfo) - OnICEStatusChanged func(ConnStatus) -} - type Conn struct { log *log.Entry mu sync.Mutex @@ -134,24 +126,18 @@ func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Statu statusICE: NewAtomicConnStatus(), } - rFns := WorkerRelayCallbacks{ - OnConnReady: conn.relayConnectionIsReady, - OnDisconnected: conn.onWorkerRelayStateDisconnected, - } - - wFns := WorkerICECallbacks{ - OnConnReady: conn.iCEConnectionIsReady, - OnStatusChanged: conn.onWorkerICEStateDisconnected, - } - ctrl := isController(config) - conn.workerRelay = NewWorkerRelay(connLog, ctrl, config, relayManager, rFns) + conn.workerRelay = NewWorkerRelay(connLog, ctrl, config, relayManager) + conn.workerRelay.SetOnConnReady(conn.relayConnectionIsReady) + conn.workerRelay.SetOnDisconnected(conn.onWorkerRelayStateDisconnected) relayIsSupportedLocally := conn.workerRelay.RelayIsSupportedLocally() - conn.workerICE, err = NewWorkerICE(ctx, connLog, config, signaler, iFaceDiscover, statusRecorder, relayIsSupportedLocally, wFns) + conn.workerICE, err = NewWorkerICE(ctx, connLog, config, signaler, iFaceDiscover, statusRecorder, relayIsSupportedLocally) if err != nil { return nil, err } + conn.workerICE.SetOnConnReady(conn.iCEConnectionIsReady) + conn.workerICE.SetOnDisconnected(conn.onWorkerICEStateDisconnected) conn.handshaker = NewHandshaker(ctx, connLog, config, signaler, conn.workerICE, conn.workerRelay) @@ -301,7 +287,7 @@ func (conn *Conn) GetKey() string { } // configureConnection starts proxying traffic from/to local Wireguard and sets connection status to StatusConnected -func (conn *Conn) iCEConnectionIsReady(priority ConnPriority, iceConnInfo ICEConnInfo) { +func (conn *Conn) iCEConnectionIsReady(iceConnInfo ICEConnInfo) { conn.mu.Lock() defer conn.mu.Unlock() @@ -309,14 +295,18 @@ func (conn *Conn) iCEConnectionIsReady(priority ConnPriority, iceConnInfo ICECon return } - if remoteConnNil(conn.log, iceConnInfo.RemoteConn) { - conn.log.Errorf("remote ICE connection is nil") - return - } + /* + // temporarily disabled the check + if remoteConnNil(conn.log, iceConnInfo.RemoteConn) { + conn.log.Errorf("remote ICE connection is nil") + return + } + + */ conn.log.Debugf("ICE connection is ready") - if conn.currentConnPriority > priority { + if conn.currentConnPriority > iceConnInfo.Priority { conn.statusICE.Set(StatusConnected) conn.updateIceState(iceConnInfo) return @@ -366,14 +356,14 @@ func (conn *Conn) iCEConnectionIsReady(priority ConnPriority, iceConnInfo ICECon return } wgConfigWorkaround() - conn.currentConnPriority = priority + conn.currentConnPriority = iceConnInfo.Priority conn.statusICE.Set(StatusConnected) conn.updateIceState(iceConnInfo) conn.doOnConnected(iceConnInfo.RosenpassPubKey, iceConnInfo.RosenpassAddr) } // todo review to make sense to handle connecting and disconnected status also? -func (conn *Conn) onWorkerICEStateDisconnected(newState ConnStatus) { +func (conn *Conn) onWorkerICEStateDisconnected() { conn.mu.Lock() defer conn.mu.Unlock() @@ -381,7 +371,7 @@ func (conn *Conn) onWorkerICEStateDisconnected(newState ConnStatus) { return } - conn.log.Tracef("ICE connection state changed to %s", newState) + conn.log.Tracef("ICE connection state changed to disconnected") if conn.wgProxyICE != nil { if err := conn.wgProxyICE.CloseConn(); err != nil { @@ -401,8 +391,8 @@ func (conn *Conn) onWorkerICEStateDisconnected(newState ConnStatus) { conn.currentConnPriority = connPriorityRelay } - changed := conn.statusICE.Get() != newState && newState != StatusConnecting - conn.statusICE.Set(newState) + changed := conn.statusICE.Get() != StatusDisconnected + conn.statusICE.Set(StatusDisconnected) conn.guard.SetICEConnDisconnected(changed) diff --git a/client/internal/peer/worker_ice.go b/client/internal/peer/worker_ice.go index 55894218d..22d87abd3 100644 --- a/client/internal/peer/worker_ice.go +++ b/client/internal/peer/worker_ice.go @@ -29,11 +29,7 @@ type ICEConnInfo struct { LocalIceCandidateEndpoint string Relayed bool RelayedOnLocal bool -} - -type WorkerICECallbacks struct { - OnConnReady func(ConnPriority, ICEConnInfo) - OnStatusChanged func(ConnStatus) + Priority ConnPriority } type WorkerICE struct { @@ -44,9 +40,10 @@ type WorkerICE struct { iFaceDiscover stdnet.ExternalIFaceDiscover statusRecorder *Status hasRelayOnLocally bool - conn WorkerICECallbacks - selectedPriority ConnPriority + onConnReady func(ICEConnInfo) + onDisconnected func() + callBackMu sync.Mutex agent *ice.Agent muxAgent sync.Mutex @@ -59,7 +56,7 @@ type WorkerICE struct { localPwd string } -func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, signaler *Signaler, ifaceDiscover stdnet.ExternalIFaceDiscover, statusRecorder *Status, hasRelayOnLocally bool, callBacks WorkerICECallbacks) (*WorkerICE, error) { +func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, signaler *Signaler, ifaceDiscover stdnet.ExternalIFaceDiscover, statusRecorder *Status, hasRelayOnLocally bool) (*WorkerICE, error) { w := &WorkerICE{ ctx: ctx, log: log, @@ -68,7 +65,6 @@ func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, signal iFaceDiscover: ifaceDiscover, statusRecorder: statusRecorder, hasRelayOnLocally: hasRelayOnLocally, - conn: callBacks, } localUfrag, localPwd, err := icemaker.GenerateICECredentials() @@ -90,12 +86,16 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) { return } - var preferredCandidateTypes []ice.CandidateType + var ( + preferredCandidateTypes []ice.CandidateType + selectedPriority ConnPriority + ) + if w.hasRelayOnLocally && remoteOfferAnswer.RelaySrvAddress != "" { - w.selectedPriority = connPriorityICEP2P + selectedPriority = connPriorityICEP2P preferredCandidateTypes = icemaker.CandidateTypesP2P() } else { - w.selectedPriority = connPriorityICETurn + selectedPriority = connPriorityICETurn preferredCandidateTypes = icemaker.CandidateTypes() } @@ -154,9 +154,10 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) { RemoteIceCandidateEndpoint: fmt.Sprintf("%s:%d", pair.Remote.Address(), pair.Remote.Port()), Relayed: isRelayed(pair), RelayedOnLocal: isRelayCandidate(pair.Local), + Priority: selectedPriority, } w.log.Debugf("on ICE conn read to use ready") - go w.conn.OnConnReady(w.selectedPriority, ci) + w.notifyOnReady(ci) } // OnRemoteCandidate Handles ICE connection Candidate provided by the remote peer. @@ -180,6 +181,20 @@ func (w *WorkerICE) OnRemoteCandidate(candidate ice.Candidate, haRoutes route.HA } } +func (w *WorkerICE) SetOnConnReady(f func(ICEConnInfo)) { + w.callBackMu.Lock() + defer w.callBackMu.Unlock() + + w.onConnReady = f +} + +func (w *WorkerICE) SetOnDisconnected(f func()) { + w.callBackMu.Lock() + defer w.callBackMu.Unlock() + + w.onDisconnected = f +} + func (w *WorkerICE) GetLocalUserCredentials() (frag string, pwd string) { w.muxAgent.Lock() defer w.muxAgent.Unlock() @@ -216,7 +231,7 @@ func (w *WorkerICE) reCreateAgent(agentCancel context.CancelFunc, candidates []i err = agent.OnConnectionStateChange(func(state ice.ConnectionState) { w.log.Debugf("ICE ConnectionState has changed to %s", state.String()) if state == ice.ConnectionStateFailed || state == ice.ConnectionStateDisconnected { - w.conn.OnStatusChanged(StatusDisconnected) + w.notifyDisconnected() w.muxAgent.Lock() agentCancel() @@ -328,6 +343,26 @@ func (w *WorkerICE) turnAgentDial(ctx context.Context, remoteOfferAnswer *OfferA } } +func (w *WorkerICE) notifyDisconnected() { + w.callBackMu.Lock() + defer w.callBackMu.Unlock() + + if w.onDisconnected == nil { + return + } + w.onDisconnected() +} + +func (w *WorkerICE) notifyOnReady(ci ICEConnInfo) { + w.callBackMu.Lock() + defer w.callBackMu.Unlock() + + if w.onConnReady == nil { + return + } + w.onConnReady(ci) +} + func extraSrflxCandidate(candidate ice.Candidate) (*ice.CandidateServerReflexive, error) { relatedAdd := candidate.RelatedAddress() return ice.NewCandidateServerReflexive(&ice.CandidateServerReflexiveConfig{ diff --git a/client/internal/peer/worker_relay.go b/client/internal/peer/worker_relay.go index c22dcdeda..708534940 100644 --- a/client/internal/peer/worker_relay.go +++ b/client/internal/peer/worker_relay.go @@ -34,7 +34,10 @@ type WorkerRelay struct { isController bool config ConnConfig relayManager relayClient.ManagerService - callBacks WorkerRelayCallbacks + + onConnReady func(info RelayConnInfo) + onDisconnected func() + callBackMu sync.Mutex relayedConn net.Conn relayLock sync.Mutex @@ -45,13 +48,12 @@ type WorkerRelay struct { relaySupportedOnRemotePeer atomic.Bool } -func NewWorkerRelay(log *log.Entry, ctrl bool, config ConnConfig, relayManager relayClient.ManagerService, callbacks WorkerRelayCallbacks) *WorkerRelay { +func NewWorkerRelay(log *log.Entry, ctrl bool, config ConnConfig, relayManager relayClient.ManagerService) *WorkerRelay { r := &WorkerRelay{ log: log, isController: ctrl, config: config, relayManager: relayManager, - callBacks: callbacks, } return r } @@ -95,7 +97,7 @@ func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) { } w.log.Debugf("peer conn opened via Relay: %s", srv) - go w.callBacks.OnConnReady(RelayConnInfo{ + go w.notifyOnReady(RelayConnInfo{ relayedConn: relayedConn, rosenpassPubKey: remoteOfferAnswer.RosenpassPubKey, rosenpassAddr: remoteOfferAnswer.RosenpassAddr, @@ -131,6 +133,20 @@ func (w *WorkerRelay) DisableWgWatcher() { w.ctxCancelWgWatch() } +func (w *WorkerRelay) SetOnConnReady(f func(info RelayConnInfo)) { + w.callBackMu.Lock() + defer w.callBackMu.Unlock() + + w.onConnReady = f +} + +func (w *WorkerRelay) SetOnDisconnected(f func()) { + w.callBackMu.Lock() + defer w.callBackMu.Unlock() + + w.onDisconnected = f +} + func (w *WorkerRelay) RelayInstanceAddress() (string, error) { return w.relayManager.RelayInstanceAddress() } @@ -187,7 +203,7 @@ func (w *WorkerRelay) wgStateCheck(ctx context.Context, ctxCancel context.Cancel w.relayLock.Lock() _ = w.relayedConn.Close() w.relayLock.Unlock() - w.callBacks.OnDisconnected() + go w.notifyDisconnected() return } @@ -232,5 +248,25 @@ func (w *WorkerRelay) onRelayMGDisconnected() { if w.ctxCancelWgWatch != nil { w.ctxCancelWgWatch() } - go w.callBacks.OnDisconnected() + go w.notifyDisconnected() +} + +func (w *WorkerRelay) notifyDisconnected() { + w.callBackMu.Lock() + defer w.callBackMu.Unlock() + + if w.onDisconnected == nil { + return + } + w.onDisconnected() +} + +func (w *WorkerRelay) notifyOnReady(ci RelayConnInfo) { + w.callBackMu.Lock() + defer w.callBackMu.Unlock() + + if w.onConnReady == nil { + return + } + w.onConnReady(ci) }