Code cleaning

This commit is contained in:
Zoltán Papp 2024-06-19 11:52:40 +02:00
parent 2f32e0d8cf
commit 4d2a25b728
3 changed files with 74 additions and 50 deletions

View File

@ -63,6 +63,15 @@ type ConnConfig struct {
type BeforeAddPeerHookFunc func(connID nbnet.ConnectionID, IP net.IP) error type BeforeAddPeerHookFunc func(connID nbnet.ConnectionID, IP net.IP) error
type AfterRemovePeerHookFunc func(connID nbnet.ConnectionID) error type AfterRemovePeerHookFunc func(connID nbnet.ConnectionID) error
type WorkerCallbacks struct {
OnRelayReadyCallback func(info RelayConnInfo)
OnRelayStatusChanged func(ConnStatus)
OnICEConnReadyCallback func(ConnPriority, ICEConnInfo)
OnICEStatusChanged func(ConnStatus)
DoHandshake func(*OfferAnswer, error)
}
type Conn struct { type Conn struct {
log *log.Entry log *log.Entry
mu sync.Mutex mu sync.Mutex
@ -116,8 +125,21 @@ func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Statu
statusRelay: StatusDisconnected, statusRelay: StatusDisconnected,
statusICE: StatusDisconnected, statusICE: StatusDisconnected,
} }
conn.workerICE = NewWorkerICE(ctx, conn.log, config, config.ICEConfig, signaler, iFaceDiscover, statusRecorder, conn.iCEConnectionIsReady, conn.onWorkerICEStateChanged, conn.doHandshake)
conn.workerRelay = NewWorkerRelay(ctx, conn.log, relayManager, config, conn.relayConnectionIsReady, conn.onWorkerRelayStateChanged, conn.doHandshake) rFns := WorkerRelayCallbacks{
OnConnReady: conn.relayConnectionIsReady,
OnStatusChanged: conn.onWorkerRelayStateChanged,
DoHandshake: conn.doHandshake,
}
wFns := WorkerICECallbacks{
OnConnReady: conn.iCEConnectionIsReady,
OnStatusChanged: conn.onWorkerICEStateChanged,
DoHandshake: conn.doHandshake,
}
conn.workerRelay = NewWorkerRelay(ctx, conn.log, relayManager, config, rFns)
conn.workerICE = NewWorkerICE(ctx, conn.log, config, config.ICEConfig, signaler, iFaceDiscover, statusRecorder, wFns)
return conn, nil return conn, nil
} }

View File

@ -53,8 +53,6 @@ type ICEConfig struct {
NATExternalIPs []string NATExternalIPs []string
} }
type OnICEConnReadyCallback func(ConnPriority, ICEConnInfo)
type ICEConnInfo struct { type ICEConnInfo struct {
RemoteConn net.Conn RemoteConn net.Conn
RosenpassPubKey []byte RosenpassPubKey []byte
@ -68,6 +66,12 @@ type ICEConnInfo struct {
RelayedOnLocal bool RelayedOnLocal bool
} }
type WorkerICECallbacks struct {
OnConnReady func(ConnPriority, ICEConnInfo)
OnStatusChanged func(ConnStatus)
DoHandshake func() (*OfferAnswer, error)
}
type WorkerICE struct { type WorkerICE struct {
ctx context.Context ctx context.Context
log *log.Entry log *log.Entry
@ -76,9 +80,7 @@ type WorkerICE struct {
signaler *Signaler signaler *Signaler
iFaceDiscover stdnet.ExternalIFaceDiscover iFaceDiscover stdnet.ExternalIFaceDiscover
statusRecorder *Status statusRecorder *Status
onICEConnReady OnICEConnReadyCallback conn WorkerICECallbacks
onStatusChanged func(ConnStatus)
doHandshakeFn DoHandshake
selectedPriority ConnPriority selectedPriority ConnPriority
@ -92,7 +94,7 @@ type WorkerICE struct {
localPwd string localPwd string
} }
func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, configICE ICEConfig, signaler *Signaler, ifaceDiscover stdnet.ExternalIFaceDiscover, statusRecorder *Status, onICEConnReady OnICEConnReadyCallback, onStatusChanged func(ConnStatus), doHandshakeFn DoHandshake) *WorkerICE { func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, configICE ICEConfig, signaler *Signaler, ifaceDiscover stdnet.ExternalIFaceDiscover, statusRecorder *Status, callBacks WorkerICECallbacks) *WorkerICE {
cice := &WorkerICE{ cice := &WorkerICE{
ctx: ctx, ctx: ctx,
log: log, log: log,
@ -101,9 +103,7 @@ func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, config
signaler: signaler, signaler: signaler,
iFaceDiscover: ifaceDiscover, iFaceDiscover: ifaceDiscover,
statusRecorder: statusRecorder, statusRecorder: statusRecorder,
onICEConnReady: onICEConnReady, conn: callBacks,
onStatusChanged: onStatusChanged,
doHandshakeFn: doHandshakeFn,
} }
return cice return cice
} }
@ -118,9 +118,9 @@ func (w *WorkerICE) SetupICEConnection(hasRelayOnLocally bool) {
return return
} }
w.onStatusChanged(StatusConnecting) w.conn.OnStatusChanged(StatusConnecting)
remoteOfferAnswer, err := w.doHandshakeFn() remoteOfferAnswer, err := w.conn.DoHandshake()
if err != nil { if err != nil {
if errors.Is(err, ErrSignalIsNotReady) { if errors.Is(err, ErrSignalIsNotReady) {
w.log.Infof("signal client isn't ready, skipping connection attempt") w.log.Infof("signal client isn't ready, skipping connection attempt")
@ -200,12 +200,12 @@ func (w *WorkerICE) SetupICEConnection(hasRelayOnLocally bool) {
Relayed: isRelayed(pair), Relayed: isRelayed(pair),
RelayedOnLocal: isRelayCandidate(pair.Local), RelayedOnLocal: isRelayCandidate(pair.Local),
} }
go w.onICEConnReady(w.selectedPriority, ci) go w.conn.OnConnReady(w.selectedPriority, ci)
<-ctx.Done() <-ctx.Done()
ctxCancel() ctxCancel()
_ = w.agent.Close() _ = w.agent.Close()
w.onStatusChanged(StatusDisconnected) w.conn.OnStatusChanged(StatusDisconnected)
} }
} }

View File

@ -12,33 +12,33 @@ import (
relayClient "github.com/netbirdio/netbird/relay/client" relayClient "github.com/netbirdio/netbird/relay/client"
) )
type OnRelayReadyCallback func(info RelayConnInfo)
type RelayConnInfo struct { type RelayConnInfo struct {
relayedConn net.Conn relayedConn net.Conn
rosenpassPubKey []byte rosenpassPubKey []byte
rosenpassAddr string rosenpassAddr string
} }
type WorkerRelayCallbacks struct {
OnConnReady func(RelayConnInfo)
OnStatusChanged func(ConnStatus)
DoHandshake func() (*OfferAnswer, error)
}
type WorkerRelay struct { type WorkerRelay struct {
ctx context.Context ctx context.Context
log *log.Entry log *log.Entry
relayManager *relayClient.Manager relayManager *relayClient.Manager
config ConnConfig config ConnConfig
onRelayConnReadyFN OnRelayReadyCallback conn WorkerRelayCallbacks
onStatusChanged func(ConnStatus)
doHandshakeFn DoHandshake
} }
func NewWorkerRelay(ctx context.Context, log *log.Entry, relayManager *relayClient.Manager, config ConnConfig, onRelayConnReadyFN OnRelayReadyCallback, onStatusChanged func(ConnStatus), doHandshakeFn DoHandshake) *WorkerRelay { func NewWorkerRelay(ctx context.Context, log *log.Entry, relayManager *relayClient.Manager, config ConnConfig, callbacks WorkerRelayCallbacks) *WorkerRelay {
return &WorkerRelay{ return &WorkerRelay{
ctx: ctx, ctx: ctx,
log: log, log: log,
relayManager: relayManager, relayManager: relayManager,
config: config, config: config,
onRelayConnReadyFN: onRelayConnReadyFN, conn: callbacks,
onStatusChanged: onStatusChanged,
doHandshakeFn: doHandshakeFn,
} }
} }
@ -49,7 +49,7 @@ func (w *WorkerRelay) SetupRelayConnection() {
return return
} }
remoteOfferAnswer, err := w.doHandshakeFn() remoteOfferAnswer, err := w.conn.DoHandshake()
if err != nil { if err != nil {
if errors.Is(err, ErrSignalIsNotReady) { if errors.Is(err, ErrSignalIsNotReady) {
w.log.Infof("signal client isn't ready, skipping connection attempt") w.log.Infof("signal client isn't ready, skipping connection attempt")
@ -75,7 +75,7 @@ func (w *WorkerRelay) SetupRelayConnection() {
continue continue
} }
go w.onRelayConnReadyFN(RelayConnInfo{ go w.conn.OnConnReady(RelayConnInfo{
relayedConn: relayedConn, relayedConn: relayedConn,
rosenpassPubKey: remoteOfferAnswer.RosenpassPubKey, rosenpassPubKey: remoteOfferAnswer.RosenpassPubKey,
rosenpassAddr: remoteOfferAnswer.RosenpassAddr, rosenpassAddr: remoteOfferAnswer.RosenpassAddr,
@ -89,8 +89,10 @@ func (w *WorkerRelay) RelayAddress() (net.Addr, error) {
return w.relayManager.RelayAddress() return w.relayManager.RelayAddress()
} }
// todo check my side too
func (w *WorkerRelay) isRelaySupported(answer *OfferAnswer) bool { func (w *WorkerRelay) isRelaySupported(answer *OfferAnswer) bool {
if !w.relayManager.HasRelayAddress() {
return false
}
return answer.RelaySrvAddress != "" return answer.RelaySrvAddress != ""
} }