Handle on offer listener in handshaker

This commit is contained in:
Zoltán Papp 2024-06-21 15:35:15 +02:00
parent 4d67d72785
commit 7581bbd925
3 changed files with 31 additions and 35 deletions

View File

@ -140,13 +140,18 @@ func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Statu
DoHandshake: conn.doHandshake, DoHandshake: conn.doHandshake,
} }
conn.handshaker = NewHandshaker(ctx, connLog, config, signaler, conn.onNewOffer) conn.handshaker = NewHandshaker(ctx, connLog, config, signaler)
go conn.handshaker.Listen() conn.workerRelay = NewWorkerRelay(ctx, connLog, config, relayManager, rFns)
conn.workerRelay = NewWorkerRelay(ctx, connLog, relayManager, config, rFns)
conn.workerICE, err = NewWorkerICE(ctx, connLog, config, config.ICEConfig, signaler, iFaceDiscover, statusRecorder, wFns) conn.workerICE, err = NewWorkerICE(ctx, connLog, config, config.ICEConfig, signaler, iFaceDiscover, statusRecorder, wFns)
if err != nil { if err != nil {
return nil, err return nil, err
} }
conn.handshaker.AddOnNewOfferListener(conn.workerRelay.OnNewOffer)
conn.handshaker.AddOnNewOfferListener(conn.workerICE.OnNewOffer)
go conn.handshaker.Listen()
return conn, nil return conn, nil
} }
@ -169,9 +174,6 @@ func (conn *Conn) Open() {
} }
relayIsSupportedLocally := conn.workerRelay.RelayIsSupportedLocally() relayIsSupportedLocally := conn.workerRelay.RelayIsSupportedLocally()
if relayIsSupportedLocally {
go conn.workerRelay.SetupRelayConnection()
}
go conn.workerICE.SetupICEConnection(relayIsSupportedLocally) go conn.workerICE.SetupICEConnection(relayIsSupportedLocally)
} }
@ -557,12 +559,6 @@ func (conn *Conn) evalStatus() ConnStatus {
return StatusDisconnected return StatusDisconnected
} }
func (conn *Conn) onNewOffer(answer *OfferAnswer) {
// todo move to this callback into handshaker
go conn.workerRelay.OnNewOffer(answer)
go conn.workerICE.OnNewOffer(answer)
}
func isRosenpassEnabled(remoteRosenpassPubKey []byte) bool { func isRosenpassEnabled(remoteRosenpassPubKey []byte) bool {
return remoteRosenpassPubKey != nil return remoteRosenpassPubKey != nil
} }

View File

@ -53,12 +53,12 @@ type HandshakeArgs struct {
} }
type Handshaker struct { type Handshaker struct {
mu sync.Mutex mu sync.Mutex
ctx context.Context ctx context.Context
log *log.Entry log *log.Entry
config ConnConfig config ConnConfig
signaler *Signaler signaler *Signaler
onNewOfferListener func(*OfferAnswer) onNewOfferListeners []func(*OfferAnswer)
// remoteOffersCh is a channel used to wait for remote credentials to proceed with the connection // remoteOffersCh is a channel used to wait for remote credentials to proceed with the connection
remoteOffersCh chan OfferAnswer remoteOffersCh chan OfferAnswer
@ -71,18 +71,21 @@ type Handshaker struct {
lastOfferArgs HandshakeArgs lastOfferArgs HandshakeArgs
} }
func NewHandshaker(ctx context.Context, log *log.Entry, config ConnConfig, signaler *Signaler, onNewOfferListener func(*OfferAnswer)) *Handshaker { func NewHandshaker(ctx context.Context, log *log.Entry, config ConnConfig, signaler *Signaler) *Handshaker {
return &Handshaker{ return &Handshaker{
ctx: ctx, ctx: ctx,
log: log, log: log,
config: config, config: config,
signaler: signaler, signaler: signaler,
remoteOffersCh: make(chan OfferAnswer), remoteOffersCh: make(chan OfferAnswer),
remoteAnswerCh: make(chan OfferAnswer), remoteAnswerCh: make(chan OfferAnswer),
onNewOfferListener: onNewOfferListener,
} }
} }
func (h *Handshaker) AddOnNewOfferListener(offer func(remoteOfferAnswer *OfferAnswer)) {
h.onNewOfferListeners = append(h.onNewOfferListeners, offer)
}
func (h *Handshaker) Listen() { func (h *Handshaker) Listen() {
for { for {
remoteOfferAnswer, err := h.waitForRemoteOfferConfirmation() remoteOfferAnswer, err := h.waitForRemoteOfferConfirmation()
@ -96,7 +99,9 @@ func (h *Handshaker) Listen() {
} }
h.log.Debugf("received connection confirmation, running version %s and with remote WireGuard listen port %d", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort) h.log.Debugf("received connection confirmation, running version %s and with remote WireGuard listen port %d", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort)
go h.onNewOfferListener(remoteOfferAnswer) for _, listener := range h.onNewOfferListeners {
go listener(remoteOfferAnswer)
}
} }
} }

View File

@ -23,26 +23,21 @@ type WorkerRelayCallbacks struct {
type WorkerRelay struct { type WorkerRelay struct {
ctx context.Context ctx context.Context
log *log.Entry log *log.Entry
relayManager *relayClient.Manager
config ConnConfig config ConnConfig
relayManager *relayClient.Manager
conn WorkerRelayCallbacks conn WorkerRelayCallbacks
} }
func NewWorkerRelay(ctx context.Context, log *log.Entry, relayManager *relayClient.Manager, config ConnConfig, callbacks WorkerRelayCallbacks) *WorkerRelay { func NewWorkerRelay(ctx context.Context, log *log.Entry, config ConnConfig, relayManager *relayClient.Manager, callbacks WorkerRelayCallbacks) *WorkerRelay {
return &WorkerRelay{ return &WorkerRelay{
ctx: ctx, ctx: ctx,
log: log, log: log,
relayManager: relayManager,
config: config, config: config,
relayManager: relayManager,
conn: callbacks, conn: callbacks,
} }
} }
// SetupRelayConnection todo: this function is not completed. Make no sense to put it in a for loop because we are not waiting for any event
func (w *WorkerRelay) SetupRelayConnection() {
}
func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) { func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
if !w.isRelaySupported(remoteOfferAnswer) { if !w.isRelaySupported(remoteOfferAnswer) {
w.log.Infof("Relay is not supported by remote peer") w.log.Infof("Relay is not supported by remote peer")