Wait to finish onConnReady callback

This commit is contained in:
Zoltán Papp 2024-11-04 17:01:22 +01:00
parent a9d06b883f
commit 3d35d6fe09
3 changed files with 112 additions and 51 deletions

View File

@ -65,14 +65,6 @@ type ConnConfig struct {
ICEConfig icemaker.Config ICEConfig icemaker.Config
} }
type WorkerCallbacks struct {
OnRelayReadyCallback func(info RelayConnInfo)
OnRelayStatusChanged func(ConnStatus)
OnICEConnReadyCallback func(ConnPriority, ICEConnInfo)
OnICEStatusChanged func(ConnStatus)
}
type Conn struct { type Conn struct {
log *log.Entry log *log.Entry
mu sync.Mutex mu sync.Mutex
@ -134,24 +126,18 @@ func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Statu
statusICE: NewAtomicConnStatus(), statusICE: NewAtomicConnStatus(),
} }
rFns := WorkerRelayCallbacks{
OnConnReady: conn.relayConnectionIsReady,
OnDisconnected: conn.onWorkerRelayStateDisconnected,
}
wFns := WorkerICECallbacks{
OnConnReady: conn.iCEConnectionIsReady,
OnStatusChanged: conn.onWorkerICEStateDisconnected,
}
ctrl := isController(config) ctrl := isController(config)
conn.workerRelay = NewWorkerRelay(connLog, ctrl, config, relayManager, rFns) conn.workerRelay = NewWorkerRelay(connLog, ctrl, config, relayManager)
conn.workerRelay.SetOnConnReady(conn.relayConnectionIsReady)
conn.workerRelay.SetOnDisconnected(conn.onWorkerRelayStateDisconnected)
relayIsSupportedLocally := conn.workerRelay.RelayIsSupportedLocally() relayIsSupportedLocally := conn.workerRelay.RelayIsSupportedLocally()
conn.workerICE, err = NewWorkerICE(ctx, connLog, config, signaler, iFaceDiscover, statusRecorder, relayIsSupportedLocally, wFns) conn.workerICE, err = NewWorkerICE(ctx, connLog, config, signaler, iFaceDiscover, statusRecorder, relayIsSupportedLocally)
if err != nil { if err != nil {
return nil, err return nil, err
} }
conn.workerICE.SetOnConnReady(conn.iCEConnectionIsReady)
conn.workerICE.SetOnDisconnected(conn.onWorkerICEStateDisconnected)
conn.handshaker = NewHandshaker(ctx, connLog, config, signaler, conn.workerICE, conn.workerRelay) conn.handshaker = NewHandshaker(ctx, connLog, config, signaler, conn.workerICE, conn.workerRelay)
@ -301,7 +287,7 @@ func (conn *Conn) GetKey() string {
} }
// configureConnection starts proxying traffic from/to local Wireguard and sets connection status to StatusConnected // configureConnection starts proxying traffic from/to local Wireguard and sets connection status to StatusConnected
func (conn *Conn) iCEConnectionIsReady(priority ConnPriority, iceConnInfo ICEConnInfo) { func (conn *Conn) iCEConnectionIsReady(iceConnInfo ICEConnInfo) {
conn.mu.Lock() conn.mu.Lock()
defer conn.mu.Unlock() defer conn.mu.Unlock()
@ -309,14 +295,18 @@ func (conn *Conn) iCEConnectionIsReady(priority ConnPriority, iceConnInfo ICECon
return return
} }
if remoteConnNil(conn.log, iceConnInfo.RemoteConn) { /*
conn.log.Errorf("remote ICE connection is nil") // temporarily disabled the check
return if remoteConnNil(conn.log, iceConnInfo.RemoteConn) {
} conn.log.Errorf("remote ICE connection is nil")
return
}
*/
conn.log.Debugf("ICE connection is ready") conn.log.Debugf("ICE connection is ready")
if conn.currentConnPriority > priority { if conn.currentConnPriority > iceConnInfo.Priority {
conn.statusICE.Set(StatusConnected) conn.statusICE.Set(StatusConnected)
conn.updateIceState(iceConnInfo) conn.updateIceState(iceConnInfo)
return return
@ -366,14 +356,14 @@ func (conn *Conn) iCEConnectionIsReady(priority ConnPriority, iceConnInfo ICECon
return return
} }
wgConfigWorkaround() wgConfigWorkaround()
conn.currentConnPriority = priority conn.currentConnPriority = iceConnInfo.Priority
conn.statusICE.Set(StatusConnected) conn.statusICE.Set(StatusConnected)
conn.updateIceState(iceConnInfo) conn.updateIceState(iceConnInfo)
conn.doOnConnected(iceConnInfo.RosenpassPubKey, iceConnInfo.RosenpassAddr) conn.doOnConnected(iceConnInfo.RosenpassPubKey, iceConnInfo.RosenpassAddr)
} }
// todo review to make sense to handle connecting and disconnected status also? // todo review to make sense to handle connecting and disconnected status also?
func (conn *Conn) onWorkerICEStateDisconnected(newState ConnStatus) { func (conn *Conn) onWorkerICEStateDisconnected() {
conn.mu.Lock() conn.mu.Lock()
defer conn.mu.Unlock() defer conn.mu.Unlock()
@ -381,7 +371,7 @@ func (conn *Conn) onWorkerICEStateDisconnected(newState ConnStatus) {
return return
} }
conn.log.Tracef("ICE connection state changed to %s", newState) conn.log.Tracef("ICE connection state changed to disconnected")
if conn.wgProxyICE != nil { if conn.wgProxyICE != nil {
if err := conn.wgProxyICE.CloseConn(); err != nil { if err := conn.wgProxyICE.CloseConn(); err != nil {
@ -401,8 +391,8 @@ func (conn *Conn) onWorkerICEStateDisconnected(newState ConnStatus) {
conn.currentConnPriority = connPriorityRelay conn.currentConnPriority = connPriorityRelay
} }
changed := conn.statusICE.Get() != newState && newState != StatusConnecting changed := conn.statusICE.Get() != StatusDisconnected
conn.statusICE.Set(newState) conn.statusICE.Set(StatusDisconnected)
conn.guard.SetICEConnDisconnected(changed) conn.guard.SetICEConnDisconnected(changed)

View File

@ -29,11 +29,7 @@ type ICEConnInfo struct {
LocalIceCandidateEndpoint string LocalIceCandidateEndpoint string
Relayed bool Relayed bool
RelayedOnLocal bool RelayedOnLocal bool
} Priority ConnPriority
type WorkerICECallbacks struct {
OnConnReady func(ConnPriority, ICEConnInfo)
OnStatusChanged func(ConnStatus)
} }
type WorkerICE struct { type WorkerICE struct {
@ -44,9 +40,10 @@ type WorkerICE struct {
iFaceDiscover stdnet.ExternalIFaceDiscover iFaceDiscover stdnet.ExternalIFaceDiscover
statusRecorder *Status statusRecorder *Status
hasRelayOnLocally bool hasRelayOnLocally bool
conn WorkerICECallbacks
selectedPriority ConnPriority onConnReady func(ICEConnInfo)
onDisconnected func()
callBackMu sync.Mutex
agent *ice.Agent agent *ice.Agent
muxAgent sync.Mutex muxAgent sync.Mutex
@ -59,7 +56,7 @@ type WorkerICE struct {
localPwd string localPwd string
} }
func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, signaler *Signaler, ifaceDiscover stdnet.ExternalIFaceDiscover, statusRecorder *Status, hasRelayOnLocally bool, callBacks WorkerICECallbacks) (*WorkerICE, error) { func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, signaler *Signaler, ifaceDiscover stdnet.ExternalIFaceDiscover, statusRecorder *Status, hasRelayOnLocally bool) (*WorkerICE, error) {
w := &WorkerICE{ w := &WorkerICE{
ctx: ctx, ctx: ctx,
log: log, log: log,
@ -68,7 +65,6 @@ func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, signal
iFaceDiscover: ifaceDiscover, iFaceDiscover: ifaceDiscover,
statusRecorder: statusRecorder, statusRecorder: statusRecorder,
hasRelayOnLocally: hasRelayOnLocally, hasRelayOnLocally: hasRelayOnLocally,
conn: callBacks,
} }
localUfrag, localPwd, err := icemaker.GenerateICECredentials() localUfrag, localPwd, err := icemaker.GenerateICECredentials()
@ -90,12 +86,16 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
return return
} }
var preferredCandidateTypes []ice.CandidateType var (
preferredCandidateTypes []ice.CandidateType
selectedPriority ConnPriority
)
if w.hasRelayOnLocally && remoteOfferAnswer.RelaySrvAddress != "" { if w.hasRelayOnLocally && remoteOfferAnswer.RelaySrvAddress != "" {
w.selectedPriority = connPriorityICEP2P selectedPriority = connPriorityICEP2P
preferredCandidateTypes = icemaker.CandidateTypesP2P() preferredCandidateTypes = icemaker.CandidateTypesP2P()
} else { } else {
w.selectedPriority = connPriorityICETurn selectedPriority = connPriorityICETurn
preferredCandidateTypes = icemaker.CandidateTypes() preferredCandidateTypes = icemaker.CandidateTypes()
} }
@ -154,9 +154,10 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
RemoteIceCandidateEndpoint: fmt.Sprintf("%s:%d", pair.Remote.Address(), pair.Remote.Port()), RemoteIceCandidateEndpoint: fmt.Sprintf("%s:%d", pair.Remote.Address(), pair.Remote.Port()),
Relayed: isRelayed(pair), Relayed: isRelayed(pair),
RelayedOnLocal: isRelayCandidate(pair.Local), RelayedOnLocal: isRelayCandidate(pair.Local),
Priority: selectedPriority,
} }
w.log.Debugf("on ICE conn read to use ready") w.log.Debugf("on ICE conn read to use ready")
go w.conn.OnConnReady(w.selectedPriority, ci) w.notifyOnReady(ci)
} }
// OnRemoteCandidate Handles ICE connection Candidate provided by the remote peer. // OnRemoteCandidate Handles ICE connection Candidate provided by the remote peer.
@ -180,6 +181,20 @@ func (w *WorkerICE) OnRemoteCandidate(candidate ice.Candidate, haRoutes route.HA
} }
} }
func (w *WorkerICE) SetOnConnReady(f func(ICEConnInfo)) {
w.callBackMu.Lock()
defer w.callBackMu.Unlock()
w.onConnReady = f
}
func (w *WorkerICE) SetOnDisconnected(f func()) {
w.callBackMu.Lock()
defer w.callBackMu.Unlock()
w.onDisconnected = f
}
func (w *WorkerICE) GetLocalUserCredentials() (frag string, pwd string) { func (w *WorkerICE) GetLocalUserCredentials() (frag string, pwd string) {
w.muxAgent.Lock() w.muxAgent.Lock()
defer w.muxAgent.Unlock() defer w.muxAgent.Unlock()
@ -216,7 +231,7 @@ func (w *WorkerICE) reCreateAgent(agentCancel context.CancelFunc, candidates []i
err = agent.OnConnectionStateChange(func(state ice.ConnectionState) { err = agent.OnConnectionStateChange(func(state ice.ConnectionState) {
w.log.Debugf("ICE ConnectionState has changed to %s", state.String()) w.log.Debugf("ICE ConnectionState has changed to %s", state.String())
if state == ice.ConnectionStateFailed || state == ice.ConnectionStateDisconnected { if state == ice.ConnectionStateFailed || state == ice.ConnectionStateDisconnected {
w.conn.OnStatusChanged(StatusDisconnected) w.notifyDisconnected()
w.muxAgent.Lock() w.muxAgent.Lock()
agentCancel() agentCancel()
@ -328,6 +343,26 @@ func (w *WorkerICE) turnAgentDial(ctx context.Context, remoteOfferAnswer *OfferA
} }
} }
func (w *WorkerICE) notifyDisconnected() {
w.callBackMu.Lock()
defer w.callBackMu.Unlock()
if w.onDisconnected == nil {
return
}
w.onDisconnected()
}
func (w *WorkerICE) notifyOnReady(ci ICEConnInfo) {
w.callBackMu.Lock()
defer w.callBackMu.Unlock()
if w.onConnReady == nil {
return
}
w.onConnReady(ci)
}
func extraSrflxCandidate(candidate ice.Candidate) (*ice.CandidateServerReflexive, error) { func extraSrflxCandidate(candidate ice.Candidate) (*ice.CandidateServerReflexive, error) {
relatedAdd := candidate.RelatedAddress() relatedAdd := candidate.RelatedAddress()
return ice.NewCandidateServerReflexive(&ice.CandidateServerReflexiveConfig{ return ice.NewCandidateServerReflexive(&ice.CandidateServerReflexiveConfig{

View File

@ -34,7 +34,10 @@ type WorkerRelay struct {
isController bool isController bool
config ConnConfig config ConnConfig
relayManager relayClient.ManagerService relayManager relayClient.ManagerService
callBacks WorkerRelayCallbacks
onConnReady func(info RelayConnInfo)
onDisconnected func()
callBackMu sync.Mutex
relayedConn net.Conn relayedConn net.Conn
relayLock sync.Mutex relayLock sync.Mutex
@ -45,13 +48,12 @@ type WorkerRelay struct {
relaySupportedOnRemotePeer atomic.Bool relaySupportedOnRemotePeer atomic.Bool
} }
func NewWorkerRelay(log *log.Entry, ctrl bool, config ConnConfig, relayManager relayClient.ManagerService, callbacks WorkerRelayCallbacks) *WorkerRelay { func NewWorkerRelay(log *log.Entry, ctrl bool, config ConnConfig, relayManager relayClient.ManagerService) *WorkerRelay {
r := &WorkerRelay{ r := &WorkerRelay{
log: log, log: log,
isController: ctrl, isController: ctrl,
config: config, config: config,
relayManager: relayManager, relayManager: relayManager,
callBacks: callbacks,
} }
return r return r
} }
@ -95,7 +97,7 @@ func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
} }
w.log.Debugf("peer conn opened via Relay: %s", srv) w.log.Debugf("peer conn opened via Relay: %s", srv)
go w.callBacks.OnConnReady(RelayConnInfo{ go w.notifyOnReady(RelayConnInfo{
relayedConn: relayedConn, relayedConn: relayedConn,
rosenpassPubKey: remoteOfferAnswer.RosenpassPubKey, rosenpassPubKey: remoteOfferAnswer.RosenpassPubKey,
rosenpassAddr: remoteOfferAnswer.RosenpassAddr, rosenpassAddr: remoteOfferAnswer.RosenpassAddr,
@ -131,6 +133,20 @@ func (w *WorkerRelay) DisableWgWatcher() {
w.ctxCancelWgWatch() w.ctxCancelWgWatch()
} }
func (w *WorkerRelay) SetOnConnReady(f func(info RelayConnInfo)) {
w.callBackMu.Lock()
defer w.callBackMu.Unlock()
w.onConnReady = f
}
func (w *WorkerRelay) SetOnDisconnected(f func()) {
w.callBackMu.Lock()
defer w.callBackMu.Unlock()
w.onDisconnected = f
}
func (w *WorkerRelay) RelayInstanceAddress() (string, error) { func (w *WorkerRelay) RelayInstanceAddress() (string, error) {
return w.relayManager.RelayInstanceAddress() return w.relayManager.RelayInstanceAddress()
} }
@ -187,7 +203,7 @@ func (w *WorkerRelay) wgStateCheck(ctx context.Context, ctxCancel context.Cancel
w.relayLock.Lock() w.relayLock.Lock()
_ = w.relayedConn.Close() _ = w.relayedConn.Close()
w.relayLock.Unlock() w.relayLock.Unlock()
w.callBacks.OnDisconnected() go w.notifyDisconnected()
return return
} }
@ -232,5 +248,25 @@ func (w *WorkerRelay) onRelayMGDisconnected() {
if w.ctxCancelWgWatch != nil { if w.ctxCancelWgWatch != nil {
w.ctxCancelWgWatch() w.ctxCancelWgWatch()
} }
go w.callBacks.OnDisconnected() go w.notifyDisconnected()
}
func (w *WorkerRelay) notifyDisconnected() {
w.callBackMu.Lock()
defer w.callBackMu.Unlock()
if w.onDisconnected == nil {
return
}
w.onDisconnected()
}
func (w *WorkerRelay) notifyOnReady(ci RelayConnInfo) {
w.callBackMu.Lock()
defer w.callBackMu.Unlock()
if w.onConnReady == nil {
return
}
w.onConnReady(ci)
} }