From 0a67f5be1a3251924e4ded6e182906b7f6ecf896 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Tue, 25 Jun 2024 15:13:08 +0200 Subject: [PATCH] Fix logic --- client/internal/peer/conn.go | 370 ++++++++++++--------- client/internal/peer/handshaker.go | 9 +- client/internal/peer/worker_ice.go | 122 ++----- client/internal/peer/worker_relay.go | 31 +- client/internal/wgproxy/proxy_userspace.go | 5 + relay/client/client.go | 2 +- relay/client/guard.go | 1 - relay/client/manager.go | 77 ++++- relay/client/manager_test.go | 12 +- relay/cmd/main.go | 2 +- relay/messages/id.go | 2 +- relay/server/server.go | 3 +- 12 files changed, 345 insertions(+), 291 deletions(-) diff --git a/client/internal/peer/conn.go b/client/internal/peer/conn.go index 3f7003187..f93d4e8a6 100644 --- a/client/internal/peer/conn.go +++ b/client/internal/peer/conn.go @@ -2,6 +2,7 @@ package peer import ( "context" + "math/rand" "net" "runtime" "strings" @@ -26,8 +27,8 @@ const ( defaultWgKeepAlive = 25 * time.Second connPriorityRelay ConnPriority = 1 - connPriorityICETurn = 1 - connPriorityICEP2P = 2 + connPriorityICETurn ConnPriority = 1 + connPriorityICEP2P ConnPriority = 2 ) type WgConfig struct { @@ -69,7 +70,6 @@ type WorkerCallbacks struct { OnICEConnReadyCallback func(ConnPriority, ICEConnInfo) OnICEStatusChanged func(ConnStatus) - DoHandshake func(*OfferAnswer, error) } type Conn struct { @@ -83,6 +83,7 @@ type Conn struct { wgProxyICE wgproxy.Proxy wgProxyRelay wgproxy.Proxy signaler *Signaler + relayManager *relayClient.Manager allowedIPsIP string handshaker *Handshaker @@ -101,6 +102,9 @@ type Conn struct { afterRemovePeerHooks []AfterRemovePeerHookFunc endpointRelay *net.UDPAddr + + iCEDisconnected chan struct{} + relayDisconnected chan struct{} } // NewConn creates a new not opened Conn to the remote peer. @@ -116,32 +120,36 @@ func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Statu connLog := log.WithField("peer", config.Key) var conn = &Conn{ - log: connLog, - ctx: ctx, - ctxCancel: ctxCancel, - config: config, - statusRecorder: statusRecorder, - wgProxyFactory: wgProxyFactory, - signaler: signaler, - allowedIPsIP: allowedIPsIP.String(), - statusRelay: StatusDisconnected, - statusICE: StatusDisconnected, + log: connLog, + ctx: ctx, + ctxCancel: ctxCancel, + config: config, + statusRecorder: statusRecorder, + wgProxyFactory: wgProxyFactory, + signaler: signaler, + relayManager: relayManager, + allowedIPsIP: allowedIPsIP.String(), + statusRelay: StatusDisconnected, + statusICE: StatusDisconnected, + iCEDisconnected: make(chan struct{}), + relayDisconnected: make(chan struct{}), } rFns := WorkerRelayCallbacks{ - OnConnReady: conn.relayConnectionIsReady, - OnStatusChanged: conn.onWorkerRelayStateChanged, + OnConnReady: conn.relayConnectionIsReady, + OnDisconnected: conn.onWorkerRelayStateDisconnected, } wFns := WorkerICECallbacks{ OnConnReady: conn.iCEConnectionIsReady, - OnStatusChanged: conn.onWorkerICEStateChanged, - DoHandshake: conn.doHandshake, + OnStatusChanged: conn.onWorkerICEStateDisconnected, } conn.handshaker = NewHandshaker(ctx, connLog, config, signaler) conn.workerRelay = NewWorkerRelay(ctx, connLog, config, relayManager, rFns) - conn.workerICE, err = NewWorkerICE(ctx, connLog, config, config.ICEConfig, signaler, iFaceDiscover, statusRecorder, wFns) + + relayIsSupportedLocally := conn.workerRelay.RelayIsSupportedLocally() + conn.workerICE, err = NewWorkerICE(ctx, connLog, config, config.ICEConfig, signaler, iFaceDiscover, statusRecorder, relayIsSupportedLocally, wFns) if err != nil { return nil, err } @@ -172,14 +180,21 @@ func (conn *Conn) Open() { conn.log.Warnf("error while updating the state err: %v", err) } - relayIsSupportedLocally := conn.workerRelay.RelayIsSupportedLocally() - go conn.workerICE.SetupICEConnection(relayIsSupportedLocally) + conn.waitRandomSleepTime() + + err = conn.doHandshake() + if err != nil { + conn.log.Errorf("failed to send offer: %v", err) + } + + go conn.reconnectLoop() } // Close closes this peer Conn issuing a close event to the Conn closeCh func (conn *Conn) Close() { conn.mu.Lock() defer conn.mu.Unlock() + conn.ctxCancel() if conn.wgProxyRelay != nil { @@ -198,7 +213,6 @@ func (conn *Conn) Close() { conn.wgProxyICE = nil } - // todo: is it problem if we try to remove a peer what is never existed? err := conn.config.WgConfig.WgInterface.RemovePeer(conn.config.WgConfig.RemoteKey) if err != nil { conn.log.Errorf("failed to remove wg endpoint: %v", err) @@ -268,7 +282,7 @@ func (conn *Conn) SetOnDisconnected(handler func(remotePeer string, wgIP string) } func (conn *Conn) OnRemoteOffer(offer OfferAnswer) bool { - conn.log.Debugf("OnRemoteOffer, on status ICE: %s, status relay: %s", conn.statusICE, conn.statusRelay) + conn.log.Debugf("OnRemoteOffer, on status ICE: %s, status Relay: %s", conn.statusICE, conn.statusRelay) return conn.handshaker.OnRemoteOffer(offer) } @@ -288,136 +302,39 @@ func (conn *Conn) GetKey() string { return conn.config.Key } -func (conn *Conn) onWorkerICEStateChanged(newState ConnStatus) { - conn.mu.Lock() - defer conn.mu.Unlock() - log.Debugf("ICE connection state changed to %s", newState) - defer func() { - conn.statusICE = newState - }() - - if conn.statusRelay == StatusConnected { - return +func (conn *Conn) reconnectLoop() { + ticker := time.NewTicker(conn.config.Timeout) // todo use the interval from config + if !conn.workerRelay.IsController() { + ticker.Stop() + } else { + defer ticker.Stop() } - if conn.evalStatus() == newState { - return - } - - if conn.endpointRelay != nil { - err := conn.configureWGEndpoint(conn.endpointRelay) - if err != nil { - conn.log.Errorf("failed to switch back to relay conn: %v", err) - } - // todo update status to relay related things - log.Debugf("switched back to relay connection") - return - } - - if newState > conn.statusICE { - peerState := State{ - PubKey: conn.config.Key, - ConnStatus: newState, - ConnStatusUpdate: time.Now(), - Mux: new(sync.RWMutex), - } - _ = conn.statusRecorder.UpdatePeerState(peerState) - } -} - -func (conn *Conn) onWorkerRelayStateChanged(newState ConnStatus) { - conn.mu.Lock() - defer conn.mu.Unlock() - defer func() { - conn.statusRelay = newState - }() - - conn.log.Debugf("Relay connection state changed to %s", newState) - - if conn.statusICE == StatusConnected { - return - } - - if conn.evalStatus() == newState { - return - } - - if newState > conn.statusRelay { - peerState := State{ - PubKey: conn.config.Key, - ConnStatus: newState, - ConnStatusUpdate: time.Now(), - Mux: new(sync.RWMutex), - } - _ = conn.statusRecorder.UpdatePeerState(peerState) - } -} - -func (conn *Conn) relayConnectionIsReady(rci RelayConnInfo) { - conn.mu.Lock() - defer conn.mu.Unlock() - - if conn.ctx.Err() != nil { - return - } - - conn.log.Debugf("relay connection is ready") - - conn.statusRelay = stateConnected - - // todo review this condition - if conn.currentConnType > connPriorityRelay { - if conn.statusICE == StatusConnected { - log.Debugf("do not switch to relay because current priority is: %v", conn.currentConnType) + for { + select { + case <-ticker.C: + // checks if there is peer connection is established via relay or ice and that it has a wireguard handshake and skip offer + // todo check wg handshake + if conn.statusRelay == StatusConnected && conn.statusICE == StatusConnected { + continue + } + case <-conn.relayDisconnected: + conn.log.Debugf("Relay connection is disconnected, start to send new offer") + ticker.Reset(10 * time.Second) + conn.waitRandomSleepTime() + case <-conn.iCEDisconnected: + conn.log.Debugf("ICE connection is disconnected, start to send new offer") + ticker.Reset(10 * time.Second) + conn.waitRandomSleepTime() + case <-conn.ctx.Done(): return } - } - if conn.currentConnType != 0 { - conn.log.Infof("update connection to Relay type") - } - - wgProxy := conn.wgProxyFactory.GetProxy(conn.ctx) - endpoint, err := wgProxy.AddTurnConn(rci.relayedConn) - if err != nil { - conn.log.Errorf("failed to add relayed net.Conn to local proxy: %v", err) - return - } - - endpointUdpAddr, _ := net.ResolveUDPAddr(endpoint.Network(), endpoint.String()) - conn.log.Debugf("conn resolved IP for %s: %s", endpoint, endpointUdpAddr.IP) - - conn.connID = nbnet.GenerateConnID() - for _, hook := range conn.beforeAddPeerHooks { - if err := hook(conn.connID, endpointUdpAddr.IP); err != nil { - conn.log.Errorf("Before add peer hook failed: %v", err) + err := conn.doHandshake() + if err != nil { + conn.log.Errorf("failed to do handshake: %v", err) } } - - err = conn.configureWGEndpoint(endpointUdpAddr) - if err != nil { - if err := wgProxy.CloseConn(); err != nil { - conn.log.Warnf("Failed to close relay connection: %v", err) - } - conn.log.Errorf("Failed to update wg peer configuration: %v", err) - return - } - conn.endpointRelay = endpointUdpAddr - - if conn.wgProxyRelay != nil { - if err := conn.wgProxyRelay.CloseConn(); err != nil { - conn.log.Warnf("failed to close depracated wg proxy conn: %v", err) - } - } - conn.wgProxyRelay = wgProxy - conn.currentConnType = connPriorityRelay - - peerState := State{ - Direct: false, - Relayed: true, - } - - conn.updateStatus(peerState, rci.rosenpassPubKey, rci.rosenpassAddr) } // configureConnection starts proxying traffic from/to local Wireguard and sets connection status to StatusConnected @@ -431,9 +348,8 @@ func (conn *Conn) iCEConnectionIsReady(priority ConnPriority, iceConnInfo ICECon conn.log.Debugf("ICE connection is ready") - conn.statusICE = stateConnected + conn.statusICE = StatusConnected - // todo review this condition if conn.currentConnType > priority { return } @@ -503,6 +419,150 @@ func (conn *Conn) iCEConnectionIsReady(priority ConnPriority, iceConnInfo ICECon conn.updateStatus(peerState, iceConnInfo.RosenpassPubKey, iceConnInfo.RosenpassAddr) } +// todo review to make sense to handle connection and disconnected status also? +func (conn *Conn) onWorkerICEStateDisconnected(newState ConnStatus) { + conn.mu.Lock() + defer conn.mu.Unlock() + + conn.log.Tracef("ICE connection state changed to %s", newState) + defer func() { + conn.statusICE = newState + + select { + case conn.iCEDisconnected <- struct{}{}: + default: + } + }() + + // switch back to relay connection + if conn.endpointRelay != nil { + conn.log.Debugf("ICE disconnected, set Relay to active connection") + err := conn.configureWGEndpoint(conn.endpointRelay) + if err != nil { + conn.log.Errorf("failed to switch to relay conn: %v", err) + } + // todo update status to relay related things + return + } + + if conn.statusRelay == StatusConnected { + return + } + + if conn.evalStatus() == newState { + return + } + + if newState > conn.statusICE { + peerState := State{ + PubKey: conn.config.Key, + ConnStatus: newState, + ConnStatusUpdate: time.Now(), + Mux: new(sync.RWMutex), + } + _ = conn.statusRecorder.UpdatePeerState(peerState) + } +} + +func (conn *Conn) relayConnectionIsReady(rci RelayConnInfo) { + conn.mu.Lock() + defer conn.mu.Unlock() + + if conn.ctx.Err() != nil { + return + } + + conn.log.Debugf("Relay connection is ready to use") + conn.statusRelay = StatusConnected + + wgProxy := conn.wgProxyFactory.GetProxy(conn.ctx) + endpoint, err := wgProxy.AddTurnConn(rci.relayedConn) + if err != nil { + conn.log.Errorf("failed to add relayed net.Conn to local proxy: %v", err) + return + } + + endpointUdpAddr, _ := net.ResolveUDPAddr(endpoint.Network(), endpoint.String()) + conn.endpointRelay = endpointUdpAddr + conn.log.Debugf("conn resolved IP for %s: %s", endpoint, endpointUdpAddr.IP) + + if conn.currentConnType > connPriorityRelay { + if conn.statusICE == StatusConnected { + log.Debugf("do not switch to relay because current priority is: %v", conn.currentConnType) + return + } + } + + conn.connID = nbnet.GenerateConnID() + for _, hook := range conn.beforeAddPeerHooks { + if err := hook(conn.connID, endpointUdpAddr.IP); err != nil { + conn.log.Errorf("Before add peer hook failed: %v", err) + } + } + + err = conn.configureWGEndpoint(endpointUdpAddr) + if err != nil { + if err := wgProxy.CloseConn(); err != nil { + conn.log.Warnf("Failed to close relay connection: %v", err) + } + conn.log.Errorf("Failed to update wg peer configuration: %v", err) + return + } + + if conn.wgProxyRelay != nil { + if err := conn.wgProxyRelay.CloseConn(); err != nil { + conn.log.Warnf("failed to close depracated wg proxy conn: %v", err) + } + } + conn.wgProxyRelay = wgProxy + conn.currentConnType = connPriorityRelay + + peerState := State{ + Direct: false, + Relayed: true, + } + + conn.log.Infof("start to communicate with peer via relay") + conn.updateStatus(peerState, rci.rosenpassPubKey, rci.rosenpassAddr) +} + +func (conn *Conn) onWorkerRelayStateDisconnected() { + conn.mu.Lock() + defer conn.mu.Unlock() + defer func() { + conn.statusRelay = StatusDisconnected + + select { + case conn.relayDisconnected <- struct{}{}: + default: + } + }() + + if conn.wgProxyRelay != nil { + conn.endpointRelay = nil + _ = conn.wgProxyRelay.CloseConn() + conn.wgProxyRelay = nil + } + + if conn.statusICE == StatusConnected { + return + } + + if conn.evalStatus() == StatusDisconnected { + return + } + + if StatusDisconnected > conn.statusRelay { + peerState := State{ + PubKey: conn.config.Key, + ConnStatus: StatusDisconnected, + ConnStatusUpdate: time.Now(), + Mux: new(sync.RWMutex), + } + _ = conn.statusRecorder.UpdatePeerState(peerState) + } +} + func (conn *Conn) configureWGEndpoint(addr *net.UDPAddr) error { return conn.config.WgConfig.WgInterface.UpdatePeer( conn.config.WgConfig.RemoteKey, @@ -531,7 +591,6 @@ func (conn *Conn) updateStatus(peerState State, remoteRosenpassPubKey []byte, re if conn.onConnected != nil { conn.onConnected(conn.config.Key, remoteRosenpassPubKey, conn.allowedIPsIP, remoteRosenpassAddr) } - return } func (conn *Conn) doHandshake() error { @@ -548,9 +607,24 @@ func (conn *Conn) doHandshake() error { if err == nil { ha.RelayAddr = addr.String() } + conn.log.Tracef("send new offer: %#v", ha) return conn.handshaker.SendOffer(ha) } +func (conn *Conn) waitRandomSleepTime() { + minWait := 500 + maxWait := 2000 + duration := time.Duration(rand.Intn(maxWait-minWait)+minWait) * time.Millisecond + + timeout := time.NewTimer(duration) + defer timeout.Stop() + + select { + case <-conn.ctx.Done(): + case <-timeout.C: + } +} + func (conn *Conn) evalStatus() ConnStatus { if conn.statusRelay == StatusConnected || conn.statusICE == StatusConnected { return StatusConnected diff --git a/client/internal/peer/handshaker.go b/client/internal/peer/handshaker.go index 8d4377e36..0ec615291 100644 --- a/client/internal/peer/handshaker.go +++ b/client/internal/peer/handshaker.go @@ -4,17 +4,12 @@ import ( "context" "errors" "sync" - "time" log "github.com/sirupsen/logrus" "github.com/netbirdio/netbird/version" ) -const ( - handshakeCacheTimeout = 3 * time.Second -) - var ( ErrSignalIsNotReady = errors.New("signal is not ready") ) @@ -65,9 +60,6 @@ type Handshaker struct { // remoteAnswerCh is a channel used to wait for remote credentials answer (confirmation of our offer) to proceed with the connection remoteAnswerCh chan OfferAnswer - remoteOfferAnswer *OfferAnswer - remoteOfferAnswerCreated time.Time - lastOfferArgs HandshakeArgs } @@ -88,6 +80,7 @@ func (h *Handshaker) AddOnNewOfferListener(offer func(remoteOfferAnswer *OfferAn func (h *Handshaker) Listen() { for { + log.Debugf("wait for remote offer confirmation") remoteOfferAnswer, err := h.waitForRemoteOfferConfirmation() if err != nil { if _, ok := err.(*ConnectionClosedError); ok { diff --git a/client/internal/peer/worker_ice.go b/client/internal/peer/worker_ice.go index 93ad070ae..75892ca52 100644 --- a/client/internal/peer/worker_ice.go +++ b/client/internal/peer/worker_ice.go @@ -3,7 +3,6 @@ package peer import ( "context" "fmt" - "math/rand" "net" "net/netip" "runtime" @@ -68,18 +67,18 @@ type ICEConnInfo struct { type WorkerICECallbacks struct { OnConnReady func(ConnPriority, ICEConnInfo) OnStatusChanged func(ConnStatus) - DoHandshake func() error } type WorkerICE struct { - ctx context.Context - log *log.Entry - config ConnConfig - configICE ICEConfig - signaler *Signaler - iFaceDiscover stdnet.ExternalIFaceDiscover - statusRecorder *Status - conn WorkerICECallbacks + ctx context.Context + log *log.Entry + config ConnConfig + configICE ICEConfig + signaler *Signaler + iFaceDiscover stdnet.ExternalIFaceDiscover + statusRecorder *Status + hasRelayOnLocally bool + conn WorkerICECallbacks selectedPriority ConnPriority @@ -90,24 +89,21 @@ type WorkerICE struct { sentExtraSrflx bool - localUfrag string - localPwd string - creadantialHasUsed bool - hasRelayOnLocally bool - tickerCancel context.CancelFunc - ticker *time.Ticker + localUfrag string + localPwd string } -func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, configICE ICEConfig, signaler *Signaler, ifaceDiscover stdnet.ExternalIFaceDiscover, statusRecorder *Status, callBacks WorkerICECallbacks) (*WorkerICE, error) { +func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, configICE ICEConfig, signaler *Signaler, ifaceDiscover stdnet.ExternalIFaceDiscover, statusRecorder *Status, hasRelayOnLocally bool, callBacks WorkerICECallbacks) (*WorkerICE, error) { w := &WorkerICE{ - ctx: ctx, - log: log, - config: config, - configICE: configICE, - signaler: signaler, - iFaceDiscover: ifaceDiscover, - statusRecorder: statusRecorder, - conn: callBacks, + ctx: ctx, + log: log, + config: config, + configICE: configICE, + signaler: signaler, + iFaceDiscover: ifaceDiscover, + statusRecorder: statusRecorder, + hasRelayOnLocally: hasRelayOnLocally, + conn: callBacks, } localUfrag, localPwd, err := generateICECredentials() @@ -119,62 +115,16 @@ func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, config return w, nil } -func (w *WorkerICE) SetupICEConnection(hasRelayOnLocally bool) { - w.muxAgent.Lock() - defer w.muxAgent.Unlock() - if w.agent != nil { - return - } - - w.hasRelayOnLocally = hasRelayOnLocally - go w.sendOffer() -} - -func (w *WorkerICE) sendOffer() { - w.ticker = time.NewTicker(w.config.Timeout) - defer w.ticker.Stop() - - tickerCtx, tickerCancel := context.WithCancel(w.ctx) - w.tickerCancel = tickerCancel - w.conn.OnStatusChanged(StatusConnecting) - - w.log.Debugf("ICE trigger a new handshake") - err := w.conn.DoHandshake() - if err != nil { - w.log.Errorf("%s", err) - } - - for { - w.log.Debugf("ICE trigger new reconnect handshake") - select { - case <-w.ticker.C: - err := w.conn.DoHandshake() - if err != nil { - w.log.Errorf("%s", err) - } - case <-tickerCtx.Done(): - w.log.Debugf("left reconnect loop") - return - case <-w.ctx.Done(): - return - } - } -} - func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) { - log.Debugf("OnNewOffer for ICE") + w.log.Debugf("OnNewOffer for ICE") w.muxAgent.Lock() if w.agent != nil { - log.Debugf("agent already exists, skipping the offer") + w.log.Debugf("agent already exists, skipping the offer") w.muxAgent.Unlock() return } - // cancel reconnection loop - w.log.Debugf("canceling reconnection loop") - w.tickerCancel() - var preferredCandidateTypes []ice.CandidateType if w.hasRelayOnLocally && remoteOfferAnswer.RelaySrvAddress != "" { w.selectedPriority = connPriorityICEP2P @@ -184,7 +134,7 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) { preferredCandidateTypes = candidateTypes() } - w.log.Debugf("recreate agent") + w.log.Debugf("recreate ICE agent") agentCtx, agentCancel := context.WithCancel(w.ctx) agent, err := w.reCreateAgent(agentCancel, preferredCandidateTypes) if err != nil { @@ -204,14 +154,14 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) { // will block until connection succeeded // but it won't release if ICE Agent went into Disconnected or Failed state, // so we have to cancel it with the provided context once agent detected a broken connection - w.log.Debugf("turnAgentDial") + w.log.Debugf("turn agent dial") remoteConn, err := w.turnAgentDial(agentCtx, remoteOfferAnswer) if err != nil { w.log.Debugf("failed to dial the remote peer: %s", err) return } + w.log.Debugf("agent dial succeeded") - w.log.Debugf("GetSelectedCandidatePair") pair, err := w.agent.GetSelectedCandidatePair() if err != nil { return @@ -240,7 +190,7 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) { Relayed: isRelayed(pair), RelayedOnLocal: isRelayCandidate(pair.Local), } - w.log.Debugf("on conn ready") + w.log.Debugf("on ICE conn read to use ready") go w.conn.OnConnReady(w.selectedPriority, ci) } @@ -325,7 +275,6 @@ func (w *WorkerICE) reCreateAgent(agentCancel context.CancelFunc, relaySupport [ w.agent = nil w.muxAgent.Unlock() - go w.sendOffer() } }) if err != nil { @@ -430,23 +379,6 @@ func (w *WorkerICE) turnAgentDial(ctx context.Context, remoteOfferAnswer *OfferA } } -// waitForReconnectTry waits for a random duration before trying to reconnect -func (w *WorkerICE) waitForReconnectTry() bool { - minWait := 500 - maxWait := 2000 - duration := time.Duration(rand.Intn(maxWait-minWait)+minWait) * time.Millisecond - - timeout := time.NewTimer(duration) - defer timeout.Stop() - - select { - case <-w.ctx.Done(): - return false - case <-timeout.C: - return true - } -} - func extraSrflxCandidate(candidate ice.Candidate) (*ice.CandidateServerReflexive, error) { relatedAdd := candidate.RelatedAddress() return ice.NewCandidateServerReflexive(&ice.CandidateServerReflexiveConfig{ diff --git a/client/internal/peer/worker_relay.go b/client/internal/peer/worker_relay.go index 158b8e068..5ad4a1fb5 100644 --- a/client/internal/peer/worker_relay.go +++ b/client/internal/peer/worker_relay.go @@ -2,6 +2,7 @@ package peer import ( "context" + "errors" "net" log "github.com/sirupsen/logrus" @@ -16,8 +17,8 @@ type RelayConnInfo struct { } type WorkerRelayCallbacks struct { - OnConnReady func(RelayConnInfo) - OnStatusChanged func(ConnStatus) + OnConnReady func(RelayConnInfo) + OnDisconnected func() } type WorkerRelay struct { @@ -41,9 +42,6 @@ func NewWorkerRelay(ctx context.Context, log *log.Entry, config ConnConfig, rela func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) { if !w.isRelaySupported(remoteOfferAnswer) { w.log.Infof("Relay is not supported by remote peer") - // todo should we retry? - // if the remote peer doesn't support relay make no sense to retry infinity - // but if the remote peer supports relay just the connection is lost we should retry return } @@ -55,12 +53,19 @@ func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) { } srv := w.preferredRelayServer(currentRelayAddress.String(), remoteOfferAnswer.RelaySrvAddress) - relayedConn, err := w.relayManager.OpenConn(srv, w.config.Key) + + relayedConn, err := w.relayManager.OpenConn(srv, w.config.Key, w.conn.OnDisconnected) if err != nil { + // todo handle all type errors + if errors.Is(err, relayClient.ErrConnAlreadyExists) { + w.log.Infof("do not need to reopen relay connection") + return + } w.log.Infof("do not need to reopen relay connection: %s", err) return } + w.log.Debugf("Relay connection established with %s", srv) go w.conn.OnConnReady(RelayConnInfo{ relayedConn: relayedConn, rosenpassPubKey: remoteOfferAnswer.RosenpassPubKey, @@ -72,6 +77,14 @@ func (w *WorkerRelay) RelayAddress() (net.Addr, error) { return w.relayManager.RelayAddress() } +func (w *WorkerRelay) IsController() bool { + return w.config.LocalKey > w.config.Key +} + +func (w *WorkerRelay) RelayIsSupportedLocally() bool { + return w.relayManager.HasRelayAddress() +} + func (w *WorkerRelay) isRelaySupported(answer *OfferAnswer) bool { if !w.relayManager.HasRelayAddress() { return false @@ -80,12 +93,8 @@ func (w *WorkerRelay) isRelaySupported(answer *OfferAnswer) bool { } func (w *WorkerRelay) preferredRelayServer(myRelayAddress, remoteRelayAddress string) string { - if w.config.LocalKey > w.config.Key { + if w.IsController() { return myRelayAddress } return remoteRelayAddress } - -func (w *WorkerRelay) RelayIsSupportedLocally() bool { - return w.relayManager.HasRelayAddress() -} diff --git a/client/internal/wgproxy/proxy_userspace.go b/client/internal/wgproxy/proxy_userspace.go index e51bfbdcd..c2c8a9b51 100644 --- a/client/internal/wgproxy/proxy_userspace.go +++ b/client/internal/wgproxy/proxy_userspace.go @@ -73,6 +73,7 @@ func (p *WGUserSpaceProxy) proxyToRemote() { default: n, err := p.localConn.Read(buf) if err != nil { + log.Debugf("failed to read from wg interface conn: %s", err) continue } @@ -80,6 +81,8 @@ func (p *WGUserSpaceProxy) proxyToRemote() { if err != nil { if err == io.EOF { p.cancel() + } else { + log.Debugf("failed to write to remote conn: %s", err) } continue } @@ -103,11 +106,13 @@ func (p *WGUserSpaceProxy) proxyToLocal() { p.cancel() return } + log.Errorf("failed to read from remote conn: %s", err) continue } _, err = p.localConn.Write(buf[:n]) if err != nil { + log.Debugf("failed to write to wg interface conn: %s", err) continue } } diff --git a/relay/client/client.go b/relay/client/client.go index 612080abd..1ba815018 100644 --- a/relay/client/client.go +++ b/relay/client/client.go @@ -115,7 +115,7 @@ func NewClient(ctx context.Context, serverAddress, peerID string) *Client { // Connect establishes a connection to the relay server. It blocks until the connection is established or an error occurs. func (c *Client) Connect() error { - log.Infof("connecting to relay server: %s", c.serverAddress) + c.log.Infof("connecting to relay server: %s", c.serverAddress) c.readLoopMutex.Lock() defer c.readLoopMutex.Unlock() diff --git a/relay/client/guard.go b/relay/client/guard.go index 5cea25653..050a9f0e7 100644 --- a/relay/client/guard.go +++ b/relay/client/guard.go @@ -21,7 +21,6 @@ func NewGuard(context context.Context, relayClient *Client) *Guard { ctx: context, relayClient: relayClient, } - return g } diff --git a/relay/client/manager.go b/relay/client/manager.go index 1810b8167..ae44be4b9 100644 --- a/relay/client/manager.go +++ b/relay/client/manager.go @@ -44,19 +44,22 @@ type Manager struct { relayClients map[string]*RelayTrack relayClientsMutex sync.RWMutex + + onDisconnectedListeners map[string]map[*func()]struct{} + listenerLock sync.Mutex } func NewManager(ctx context.Context, serverAddress string, peerID string) *Manager { return &Manager{ - ctx: ctx, - srvAddress: serverAddress, - peerID: peerID, - relayClients: make(map[string]*RelayTrack), + ctx: ctx, + srvAddress: serverAddress, + peerID: peerID, + relayClients: make(map[string]*RelayTrack), + onDisconnectedListeners: make(map[string]map[*func()]struct{}), } } // Serve starts the manager. It will establish a connection to the relay server and start the relay cleanup loop. -// todo: consider to return an error if the initial connection to the relay server is not established. func (m *Manager) Serve() error { if m.relayClient != nil { return fmt.Errorf("manager already serving") @@ -70,8 +73,9 @@ func (m *Manager) Serve() error { } m.reconnectGuard = NewGuard(m.ctx, m.relayClient) - m.relayClient.SetOnDisconnectListener(m.reconnectGuard.OnDisconnected) - + m.relayClient.SetOnDisconnectListener(func() { + m.onServerDisconnected(m.srvAddress) + }) m.startCleanupLoop() return nil @@ -80,7 +84,7 @@ func (m *Manager) Serve() error { // OpenConn opens a connection to the given peer key. If the peer is on the same relay server, the connection will be // established via the relay server. If the peer is on a different relay server, the manager will establish a new // connection to the relay server. -func (m *Manager) OpenConn(serverAddress, peerKey string) (net.Conn, error) { +func (m *Manager) OpenConn(serverAddress, peerKey string, onClosedListener func()) (net.Conn, error) { if m.relayClient == nil { return nil, errRelayClientNotConnected } @@ -90,13 +94,26 @@ func (m *Manager) OpenConn(serverAddress, peerKey string) (net.Conn, error) { return nil, err } + var ( + netConn net.Conn + ) if !foreign { log.Debugf("open peer connection via permanent server: %s", peerKey) - return m.relayClient.OpenConn(peerKey) + netConn, err = m.relayClient.OpenConn(peerKey) } else { log.Debugf("open peer connection via foreign server: %s", serverAddress) - return m.openConnVia(serverAddress, peerKey) + netConn, err = m.openConnVia(serverAddress, peerKey) } + + if err != nil { + return nil, err + } + + if onClosedListener != nil { + m.addListener(serverAddress, onClosedListener) + } + + return netConn, err } // RelayAddress returns the address of the permanent relay server. It could change if the network connection is lost. @@ -152,7 +169,7 @@ func (m *Manager) openConnVia(serverAddress, peerKey string) (net.Conn, error) { } // if connection closed then delete the relay client from the list relayClient.SetOnDisconnectListener(func() { - m.deleteRelayConn(serverAddress) + m.onServerDisconnected(serverAddress) }) rt.relayClient = relayClient rt.Unlock() @@ -164,11 +181,12 @@ func (m *Manager) openConnVia(serverAddress, peerKey string) (net.Conn, error) { return conn, nil } -func (m *Manager) deleteRelayConn(address string) { - log.Infof("deleting relay client for %s", address) - m.relayClientsMutex.Lock() - delete(m.relayClients, address) - m.relayClientsMutex.Unlock() +func (m *Manager) onServerDisconnected(serverAddress string) { + if serverAddress == m.srvAddress { + m.reconnectGuard.OnDisconnected() + } + + m.notifyOnDisconnectListeners(serverAddress) } func (m *Manager) isForeignServer(address string) (bool, error) { @@ -212,8 +230,33 @@ func (m *Manager) cleanUpUnusedRelays() { go func() { _ = rt.relayClient.Close() }() - log.Debugf("clean up relay client: %s", addr) + log.Debugf("clean up unused relay server connection: %s", addr) delete(m.relayClients, addr) rt.Unlock() } } + +func (m *Manager) addListener(serverAddress string, onClosedListener func()) { + m.listenerLock.Lock() + l, ok := m.onDisconnectedListeners[serverAddress] + if !ok { + l = make(map[*func()]struct{}) + } + l[&onClosedListener] = struct{}{} + m.onDisconnectedListeners[serverAddress] = l + m.listenerLock.Unlock() +} + +func (m *Manager) notifyOnDisconnectListeners(serverAddress string) { + m.listenerLock.Lock() + l, ok := m.onDisconnectedListeners[serverAddress] + if !ok { + return + } + for f := range l { + go (*f)() + } + delete(m.onDisconnectedListeners, serverAddress) + m.listenerLock.Unlock() + +} diff --git a/relay/client/manager_test.go b/relay/client/manager_test.go index f4de9b4de..69539cc5e 100644 --- a/relay/client/manager_test.go +++ b/relay/client/manager_test.go @@ -61,11 +61,11 @@ func TestForeignConn(t *testing.T) { if err != nil { t.Fatalf("failed to get relay address: %s", err) } - connAliceToBob, err := clientAlice.OpenConn(bobsSrvAddr.String(), idBob) + connAliceToBob, err := clientAlice.OpenConn(bobsSrvAddr.String(), idBob, nil) if err != nil { t.Fatalf("failed to bind channel: %s", err) } - connBobToAlice, err := clientBob.OpenConn(bobsSrvAddr.String(), idAlice) + connBobToAlice, err := clientBob.OpenConn(bobsSrvAddr.String(), idAlice, nil) if err != nil { t.Fatalf("failed to bind channel: %s", err) } @@ -139,7 +139,7 @@ func TestForeginConnClose(t *testing.T) { mgr := NewManager(mCtx, addr1, idAlice) mgr.Serve() - conn, err := mgr.OpenConn(addr2, "anotherpeer") + conn, err := mgr.OpenConn(addr2, "anotherpeer", nil) if err != nil { t.Fatalf("failed to bind channel: %s", err) } @@ -198,7 +198,7 @@ func TestForeginAutoClose(t *testing.T) { mgr.Serve() t.Log("open connection to another peer") - conn, err := mgr.OpenConn(addr2, "anotherpeer") + conn, err := mgr.OpenConn(addr2, "anotherpeer", nil) if err != nil { t.Fatalf("failed to bind channel: %s", err) } @@ -246,7 +246,7 @@ func TestAutoReconnect(t *testing.T) { if err != nil { t.Errorf("failed to get relay address: %s", err) } - conn, err := clientAlice.OpenConn(ra.String(), "bob") + conn, err := clientAlice.OpenConn(ra.String(), "bob", nil) if err != nil { t.Errorf("failed to bind channel: %s", err) } @@ -264,7 +264,7 @@ func TestAutoReconnect(t *testing.T) { time.Sleep(reconnectingTimeout + 1*time.Second) log.Infof("reopent the connection") - _, err = clientAlice.OpenConn(ra.String(), "bob") + _, err = clientAlice.OpenConn(ra.String(), "bob", nil) if err != nil { t.Errorf("failed to open channel: %s", err) } diff --git a/relay/cmd/main.go b/relay/cmd/main.go index dc5cfe69d..eb7d87a1f 100644 --- a/relay/cmd/main.go +++ b/relay/cmd/main.go @@ -32,7 +32,7 @@ func init() { func waitForExitSignal() { osSigs := make(chan os.Signal, 1) signal.Notify(osSigs, syscall.SIGINT, syscall.SIGTERM) - _ = <-osSigs + <-osSigs } func execute(cmd *cobra.Command, args []string) { diff --git a/relay/messages/id.go b/relay/messages/id.go index d37dc37f8..c2248fb2b 100644 --- a/relay/messages/id.go +++ b/relay/messages/id.go @@ -16,5 +16,5 @@ func HashID(peerID string) ([]byte, string) { } func HashIDToString(idHash []byte) string { - return base64.StdEncoding.EncodeToString(idHash[:]) + return base64.StdEncoding.EncodeToString(idHash) } diff --git a/relay/server/server.go b/relay/server/server.go index bb9666a53..e27fd0f4f 100644 --- a/relay/server/server.go +++ b/relay/server/server.go @@ -80,7 +80,7 @@ func (r *Server) Close() error { func (r *Server) accept(conn net.Conn) { peer, err := handShake(conn) if err != nil { - log.Errorf("failed to handshake wiht %s: %s", conn.RemoteAddr(), err) + log.Errorf("failed to handshake with %s: %s", conn.RemoteAddr(), err) cErr := conn.Close() if cErr != nil { log.Errorf("failed to close connection, %s: %s", conn.RemoteAddr(), cErr) @@ -134,7 +134,6 @@ func (r *Server) accept(conn net.Conn) { if err != nil { peer.Log.Errorf("failed to write transport message to: %s", dp.String()) } - return }() case messages.MsgClose: peer.Log.Infof("peer disconnected gracefully")