code cleaning

This commit is contained in:
Zoltan Papp 2025-01-18 01:11:31 +01:00
parent 3d35d6fe09
commit e5d9e3fb13
4 changed files with 20 additions and 100 deletions

View File

@ -4,7 +4,6 @@ import (
"context" "context"
"math/rand" "math/rand"
"net" "net"
"os"
"runtime" "runtime"
"strings" "strings"
"sync" "sync"
@ -127,25 +126,15 @@ func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Statu
} }
ctrl := isController(config) ctrl := isController(config)
conn.workerRelay = NewWorkerRelay(connLog, ctrl, config, relayManager)
conn.workerRelay.SetOnConnReady(conn.relayConnectionIsReady)
conn.workerRelay.SetOnDisconnected(conn.onWorkerRelayStateDisconnected)
relayIsSupportedLocally := conn.workerRelay.RelayIsSupportedLocally() conn.workerRelay = NewWorkerRelay(connLog, ctrl, config, conn, relayManager)
conn.workerICE, err = NewWorkerICE(ctx, connLog, config, signaler, iFaceDiscover, statusRecorder, relayIsSupportedLocally) conn.workerICE, err = NewWorkerICE(ctx, connLog, config, conn, signaler, iFaceDiscover, statusRecorder, conn.workerRelay.RelayIsSupportedLocally())
if err != nil { if err != nil {
return nil, err return nil, err
} }
conn.workerICE.SetOnConnReady(conn.iCEConnectionIsReady)
conn.workerICE.SetOnDisconnected(conn.onWorkerICEStateDisconnected)
conn.handshaker = NewHandshaker(ctx, connLog, config, signaler, conn.workerICE, conn.workerRelay) conn.handshaker = NewHandshaker(ctx, connLog, config, signaler, conn.workerICE, conn.workerRelay)
conn.handshaker.AddOnNewOfferListener(conn.workerRelay.OnNewOffer)
if os.Getenv("NB_FORCE_RELAY") != "true" {
conn.handshaker.AddOnNewOfferListener(conn.workerICE.OnNewOffer)
}
conn.guard = guard.NewGuard(connLog, ctrl, conn.isConnectedOnAllWay, config.Timeout, srWatcher) conn.guard = guard.NewGuard(connLog, ctrl, conn.isConnectedOnAllWay, config.Timeout, srWatcher)
go conn.handshaker.Listen() go conn.handshaker.Listen()

View File

@ -3,6 +3,7 @@ package peer
import ( import (
"context" "context"
"errors" "errors"
"os"
"sync" "sync"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -58,7 +59,7 @@ type Handshaker struct {
} }
func NewHandshaker(ctx context.Context, log *log.Entry, config ConnConfig, signaler *Signaler, ice *WorkerICE, relay *WorkerRelay) *Handshaker { func NewHandshaker(ctx context.Context, log *log.Entry, config ConnConfig, signaler *Signaler, ice *WorkerICE, relay *WorkerRelay) *Handshaker {
return &Handshaker{ hs := &Handshaker{
ctx: ctx, ctx: ctx,
log: log, log: log,
config: config, config: config,
@ -68,10 +69,12 @@ func NewHandshaker(ctx context.Context, log *log.Entry, config ConnConfig, signa
remoteOffersCh: make(chan OfferAnswer), remoteOffersCh: make(chan OfferAnswer),
remoteAnswerCh: make(chan OfferAnswer), remoteAnswerCh: make(chan OfferAnswer),
} }
}
func (h *Handshaker) AddOnNewOfferListener(offer func(remoteOfferAnswer *OfferAnswer)) { hs.onNewOfferListeners = append(hs.onNewOfferListeners, hs.relay.OnNewOffer)
h.onNewOfferListeners = append(h.onNewOfferListeners, offer) if os.Getenv("NB_FORCE_RELAY") != "true" {
hs.onNewOfferListeners = append(hs.onNewOfferListeners, hs.ice.OnNewOffer)
}
return hs
} }
func (h *Handshaker) Listen() { func (h *Handshaker) Listen() {

View File

@ -36,15 +36,12 @@ type WorkerICE struct {
ctx context.Context ctx context.Context
log *log.Entry log *log.Entry
config ConnConfig config ConnConfig
conn *Conn
signaler *Signaler signaler *Signaler
iFaceDiscover stdnet.ExternalIFaceDiscover iFaceDiscover stdnet.ExternalIFaceDiscover
statusRecorder *Status statusRecorder *Status
hasRelayOnLocally bool hasRelayOnLocally bool
onConnReady func(ICEConnInfo)
onDisconnected func()
callBackMu sync.Mutex
agent *ice.Agent agent *ice.Agent
muxAgent sync.Mutex muxAgent sync.Mutex
@ -56,11 +53,12 @@ type WorkerICE struct {
localPwd string localPwd string
} }
func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, signaler *Signaler, ifaceDiscover stdnet.ExternalIFaceDiscover, statusRecorder *Status, hasRelayOnLocally bool) (*WorkerICE, error) { func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, conn *Conn, signaler *Signaler, ifaceDiscover stdnet.ExternalIFaceDiscover, statusRecorder *Status, hasRelayOnLocally bool) (*WorkerICE, error) {
w := &WorkerICE{ w := &WorkerICE{
ctx: ctx, ctx: ctx,
log: log, log: log,
config: config, config: config,
conn: conn,
signaler: signaler, signaler: signaler,
iFaceDiscover: ifaceDiscover, iFaceDiscover: ifaceDiscover,
statusRecorder: statusRecorder, statusRecorder: statusRecorder,
@ -157,7 +155,7 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
Priority: selectedPriority, Priority: selectedPriority,
} }
w.log.Debugf("on ICE conn read to use ready") w.log.Debugf("on ICE conn read to use ready")
w.notifyOnReady(ci) w.conn.iCEConnectionIsReady(ci)
} }
// OnRemoteCandidate Handles ICE connection Candidate provided by the remote peer. // OnRemoteCandidate Handles ICE connection Candidate provided by the remote peer.
@ -181,20 +179,6 @@ func (w *WorkerICE) OnRemoteCandidate(candidate ice.Candidate, haRoutes route.HA
} }
} }
func (w *WorkerICE) SetOnConnReady(f func(ICEConnInfo)) {
w.callBackMu.Lock()
defer w.callBackMu.Unlock()
w.onConnReady = f
}
func (w *WorkerICE) SetOnDisconnected(f func()) {
w.callBackMu.Lock()
defer w.callBackMu.Unlock()
w.onDisconnected = f
}
func (w *WorkerICE) GetLocalUserCredentials() (frag string, pwd string) { func (w *WorkerICE) GetLocalUserCredentials() (frag string, pwd string) {
w.muxAgent.Lock() w.muxAgent.Lock()
defer w.muxAgent.Unlock() defer w.muxAgent.Unlock()
@ -231,7 +215,7 @@ func (w *WorkerICE) reCreateAgent(agentCancel context.CancelFunc, candidates []i
err = agent.OnConnectionStateChange(func(state ice.ConnectionState) { err = agent.OnConnectionStateChange(func(state ice.ConnectionState) {
w.log.Debugf("ICE ConnectionState has changed to %s", state.String()) w.log.Debugf("ICE ConnectionState has changed to %s", state.String())
if state == ice.ConnectionStateFailed || state == ice.ConnectionStateDisconnected { if state == ice.ConnectionStateFailed || state == ice.ConnectionStateDisconnected {
w.notifyDisconnected() w.conn.onWorkerICEStateDisconnected()
w.muxAgent.Lock() w.muxAgent.Lock()
agentCancel() agentCancel()
@ -343,26 +327,6 @@ func (w *WorkerICE) turnAgentDial(ctx context.Context, remoteOfferAnswer *OfferA
} }
} }
func (w *WorkerICE) notifyDisconnected() {
w.callBackMu.Lock()
defer w.callBackMu.Unlock()
if w.onDisconnected == nil {
return
}
w.onDisconnected()
}
func (w *WorkerICE) notifyOnReady(ci ICEConnInfo) {
w.callBackMu.Lock()
defer w.callBackMu.Unlock()
if w.onConnReady == nil {
return
}
w.onConnReady(ci)
}
func extraSrflxCandidate(candidate ice.Candidate) (*ice.CandidateServerReflexive, error) { func extraSrflxCandidate(candidate ice.Candidate) (*ice.CandidateServerReflexive, error) {
relatedAdd := candidate.RelatedAddress() relatedAdd := candidate.RelatedAddress()
return ice.NewCandidateServerReflexive(&ice.CandidateServerReflexiveConfig{ return ice.NewCandidateServerReflexive(&ice.CandidateServerReflexiveConfig{

View File

@ -34,10 +34,7 @@ type WorkerRelay struct {
isController bool isController bool
config ConnConfig config ConnConfig
relayManager relayClient.ManagerService relayManager relayClient.ManagerService
conn *Conn
onConnReady func(info RelayConnInfo)
onDisconnected func()
callBackMu sync.Mutex
relayedConn net.Conn relayedConn net.Conn
relayLock sync.Mutex relayLock sync.Mutex
@ -48,12 +45,13 @@ type WorkerRelay struct {
relaySupportedOnRemotePeer atomic.Bool relaySupportedOnRemotePeer atomic.Bool
} }
func NewWorkerRelay(log *log.Entry, ctrl bool, config ConnConfig, relayManager relayClient.ManagerService) *WorkerRelay { func NewWorkerRelay(log *log.Entry, ctrl bool, config ConnConfig, conn *Conn, relayManager *relayClient.Manager) *WorkerRelay {
r := &WorkerRelay{ r := &WorkerRelay{
log: log, log: log,
isController: ctrl, isController: ctrl,
config: config, config: config,
relayManager: relayManager, relayManager: relayManager,
conn: conn,
} }
return r return r
} }
@ -97,7 +95,7 @@ func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
} }
w.log.Debugf("peer conn opened via Relay: %s", srv) w.log.Debugf("peer conn opened via Relay: %s", srv)
go w.notifyOnReady(RelayConnInfo{ go w.conn.relayConnectionIsReady(RelayConnInfo{
relayedConn: relayedConn, relayedConn: relayedConn,
rosenpassPubKey: remoteOfferAnswer.RosenpassPubKey, rosenpassPubKey: remoteOfferAnswer.RosenpassPubKey,
rosenpassAddr: remoteOfferAnswer.RosenpassAddr, rosenpassAddr: remoteOfferAnswer.RosenpassAddr,
@ -133,20 +131,6 @@ func (w *WorkerRelay) DisableWgWatcher() {
w.ctxCancelWgWatch() w.ctxCancelWgWatch()
} }
func (w *WorkerRelay) SetOnConnReady(f func(info RelayConnInfo)) {
w.callBackMu.Lock()
defer w.callBackMu.Unlock()
w.onConnReady = f
}
func (w *WorkerRelay) SetOnDisconnected(f func()) {
w.callBackMu.Lock()
defer w.callBackMu.Unlock()
w.onDisconnected = f
}
func (w *WorkerRelay) RelayInstanceAddress() (string, error) { func (w *WorkerRelay) RelayInstanceAddress() (string, error) {
return w.relayManager.RelayInstanceAddress() return w.relayManager.RelayInstanceAddress()
} }
@ -203,7 +187,7 @@ func (w *WorkerRelay) wgStateCheck(ctx context.Context, ctxCancel context.Cancel
w.relayLock.Lock() w.relayLock.Lock()
_ = w.relayedConn.Close() _ = w.relayedConn.Close()
w.relayLock.Unlock() w.relayLock.Unlock()
go w.notifyDisconnected() go w.conn.onWorkerRelayStateDisconnected()
return return
} }
@ -248,25 +232,5 @@ func (w *WorkerRelay) onRelayMGDisconnected() {
if w.ctxCancelWgWatch != nil { if w.ctxCancelWgWatch != nil {
w.ctxCancelWgWatch() w.ctxCancelWgWatch()
} }
go w.notifyDisconnected() go w.conn.onWorkerRelayStateDisconnected()
}
func (w *WorkerRelay) notifyDisconnected() {
w.callBackMu.Lock()
defer w.callBackMu.Unlock()
if w.onDisconnected == nil {
return
}
w.onDisconnected()
}
func (w *WorkerRelay) notifyOnReady(ci RelayConnInfo) {
w.callBackMu.Lock()
defer w.callBackMu.Unlock()
if w.onConnReady == nil {
return
}
w.onConnReady(ci)
} }