Fix loop close

This commit is contained in:
Zoltán Papp 2024-07-23 23:04:38 +02:00
parent e9e3b8ba10
commit 20eb1f50e3
3 changed files with 16 additions and 7 deletions

View File

@ -337,7 +337,7 @@ func (conn *Conn) reconnectLoopWithRetry() {
} }
// checks if there is peer connection is established via relay or ice // checks if there is peer connection is established via relay or ice
conn.log.Infof("ticker timedout, relay state: %s, ice state: %s", conn.statusRelay, conn.statusICE) conn.log.Tracef("ticker timedout, relay state: %s, ice state: %s", conn.statusRelay, conn.statusICE)
if conn.isConnected() { if conn.isConnected() {
continue continue
} }

View File

@ -29,17 +29,20 @@ type WorkerRelayCallbacks struct {
} }
type WorkerRelay struct { type WorkerRelay struct {
ctx context.Context parentCtx context.Context
log *log.Entry log *log.Entry
config ConnConfig config ConnConfig
wgInterface iface.IWGIface wgInterface iface.IWGIface
relayManager relayClient.ManagerService relayManager relayClient.ManagerService
conn WorkerRelayCallbacks conn WorkerRelayCallbacks
ctx context.Context
ctxCancel context.CancelFunc
} }
func NewWorkerRelay(ctx context.Context, log *log.Entry, config ConnConfig, relayManager relayClient.ManagerService, callbacks WorkerRelayCallbacks) *WorkerRelay { func NewWorkerRelay(ctx context.Context, log *log.Entry, config ConnConfig, relayManager relayClient.ManagerService, callbacks WorkerRelayCallbacks) *WorkerRelay {
return &WorkerRelay{ return &WorkerRelay{
ctx: ctx, parentCtx: ctx,
log: log, log: log,
config: config, config: config,
relayManager: relayManager, relayManager: relayManager,
@ -62,8 +65,10 @@ func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
srv := w.preferredRelayServer(currentRelayAddress, remoteOfferAnswer.RelaySrvAddress) srv := w.preferredRelayServer(currentRelayAddress, remoteOfferAnswer.RelaySrvAddress)
relayedConn, err := w.relayManager.OpenConn(srv, w.config.Key, w.conn.OnDisconnected) w.ctx, w.ctxCancel = context.WithCancel(w.parentCtx)
relayedConn, err := w.relayManager.OpenConn(srv, w.config.Key, w.disconnected)
if err != nil { if err != nil {
w.ctxCancel()
// todo handle all type errors // todo handle all type errors
if errors.Is(err, relayClient.ErrConnAlreadyExists) { if errors.Is(err, relayClient.ErrConnAlreadyExists) {
w.log.Infof("do not need to reopen relay connection") w.log.Infof("do not need to reopen relay connection")
@ -99,7 +104,6 @@ func (w *WorkerRelay) RelayIsSupportedLocally() bool {
func (w *WorkerRelay) wgStateCheck(conn net.Conn) { func (w *WorkerRelay) wgStateCheck(conn net.Conn) {
timer := time.NewTimer(wgHandshakeOvertime) timer := time.NewTimer(wgHandshakeOvertime)
defer timer.Stop() defer timer.Stop()
for { for {
select { select {
case <-timer.C: case <-timer.C:
@ -108,7 +112,7 @@ func (w *WorkerRelay) wgStateCheck(conn net.Conn) {
w.log.Errorf("failed to read wg stats: %v", err) w.log.Errorf("failed to read wg stats: %v", err)
continue continue
} }
log.Infof("last handshake: %v", lastHandshake) w.log.Tracef("last handshake: %v", lastHandshake)
if time.Since(lastHandshake) > wgHandshakePeriod { if time.Since(lastHandshake) > wgHandshakePeriod {
w.log.Infof("Wireguard handshake timed out, closing relay connection") w.log.Infof("Wireguard handshake timed out, closing relay connection")
@ -145,3 +149,8 @@ func (w *WorkerRelay) wgState() (time.Time, error) {
} }
return wgState.LastHandshake, nil return wgState.LastHandshake, nil
} }
func (w *WorkerRelay) disconnected() {
w.ctxCancel()
w.conn.OnDisconnected()
}

View File

@ -206,7 +206,7 @@ func (p *WGEBPFProxy) storeTurnConn(turnConn net.Conn) (uint16, error) {
} }
func (p *WGEBPFProxy) removeTurnConn(turnConnID uint16) { func (p *WGEBPFProxy) removeTurnConn(turnConnID uint16) {
log.Tracef("remove turn conn from store by port: %d", turnConnID) log.Debugf("remove turn conn from store by port: %d", turnConnID)
p.turnConnMutex.Lock() p.turnConnMutex.Lock()
defer p.turnConnMutex.Unlock() defer p.turnConnMutex.Unlock()
delete(p.turnConnStore, turnConnID) delete(p.turnConnStore, turnConnID)