Check wg handshake status on worker relay

This commit is contained in:
Zoltán Papp 2024-07-23 22:43:20 +02:00
parent 2576221315
commit e9e3b8ba10
4 changed files with 63 additions and 19 deletions

View File

@ -336,13 +336,12 @@ func (conn *Conn) reconnectLoopWithRetry() {
return return
} }
// checks if there is peer connection is established via relay or ice and that it has a wireguard handshake and skip offer // checks if there is peer connection is established via relay or ice
conn.log.Tracef("ticker timedout, relay state: %s, ice state: %s", conn.statusRelay, conn.statusICE) conn.log.Infof("ticker timedout, relay state: %s, ice state: %s", conn.statusRelay, conn.statusICE)
if conn.isConnected() { if conn.isConnected() {
continue continue
} }
conn.log.Debugf("ticker timed out, retry to do handshake")
err := conn.handshaker.sendOffer() err := conn.handshaker.sendOffer()
if err != nil { if err != nil {
conn.log.Errorf("failed to do handshake: %v", err) conn.log.Errorf("failed to do handshake: %v", err)
@ -722,18 +721,7 @@ func (conn *Conn) isConnected() bool {
if conn.statusICE != StatusConnected && conn.statusICE != StatusConnecting { if conn.statusICE != StatusConnected && conn.statusICE != StatusConnecting {
return false return false
} }
wgStats, err := conn.config.WgConfig.WgInterface.GetStats(conn.config.Key)
if err != nil {
conn.log.Errorf("failed to get wg stats: %v", err)
return false
}
if time.Since(wgStats.LastHandshake) > 2*time.Minute {
return false
}
return true return true
} }
func isRosenpassEnabled(remoteRosenpassPubKey []byte) bool { func isRosenpassEnabled(remoteRosenpassPubKey []byte) bool {

View File

@ -4,12 +4,19 @@ import (
"context" "context"
"errors" "errors"
"net" "net"
"time"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/iface"
relayClient "github.com/netbirdio/netbird/relay/client" relayClient "github.com/netbirdio/netbird/relay/client"
) )
var (
	// wgHandshakePeriod is the maximum accepted age of the last WireGuard
	// handshake; an older handshake is treated as timed out.
	wgHandshakePeriod = 2 * time.Minute
	// wgHandshakeOvertime is the delay before the first handshake check and
	// the extra margin added on top of wgHandshakePeriod when the next check
	// is scheduled.
	wgHandshakeOvertime = 30 * time.Second
)
type RelayConnInfo struct { type RelayConnInfo struct {
relayedConn net.Conn relayedConn net.Conn
rosenpassPubKey []byte rosenpassPubKey []byte
@ -25,11 +32,12 @@ type WorkerRelay struct {
ctx context.Context ctx context.Context
log *log.Entry log *log.Entry
config ConnConfig config ConnConfig
relayManager *relayClient.Manager wgInterface iface.IWGIface
relayManager relayClient.ManagerService
conn WorkerRelayCallbacks conn WorkerRelayCallbacks
} }
func NewWorkerRelay(ctx context.Context, log *log.Entry, config ConnConfig, relayManager *relayClient.Manager, callbacks WorkerRelayCallbacks) *WorkerRelay { func NewWorkerRelay(ctx context.Context, log *log.Entry, config ConnConfig, relayManager relayClient.ManagerService, callbacks WorkerRelayCallbacks) *WorkerRelay {
return &WorkerRelay{ return &WorkerRelay{
ctx: ctx, ctx: ctx,
log: log, log: log,
@ -48,7 +56,7 @@ func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
// the relayManager will return with error in case if the connection has lost with relay server // the relayManager will return with error in case if the connection has lost with relay server
currentRelayAddress, err := w.relayManager.RelayInstanceAddress() currentRelayAddress, err := w.relayManager.RelayInstanceAddress()
if err != nil { if err != nil {
w.log.Infof("local Relay connection is lost, skipping connection attempt") w.log.Errorf("failed to handle new offer: %s", err)
return return
} }
@ -61,10 +69,12 @@ func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
w.log.Infof("do not need to reopen relay connection") w.log.Infof("do not need to reopen relay connection")
return return
} }
w.log.Infof("do not need to reopen relay connection: %s", err) w.log.Errorf("failed to open connection via Relay: %s", err)
return return
} }
go w.wgStateCheck(relayedConn)
w.log.Debugf("Relay connection established with %s", srv) w.log.Debugf("Relay connection established with %s", srv)
go w.conn.OnConnReady(RelayConnInfo{ go w.conn.OnConnReady(RelayConnInfo{
relayedConn: relayedConn, relayedConn: relayedConn,
@ -85,6 +95,35 @@ func (w *WorkerRelay) RelayIsSupportedLocally() bool {
return w.relayManager.HasRelayAddress() return w.relayManager.HasRelayAddress()
} }
// wgStateCheck periodically checks the WireGuard handshake while a relayed
// connection is in use. The first check runs wgHandshakeOvertime after the
// call. When the last handshake is older than wgHandshakePeriod, conn is
// closed, the callbacks are notified through OnDisconnected and the function
// returns. It also returns as soon as w.ctx is done.
func (w *WorkerRelay) wgStateCheck(conn net.Conn) {
	timer := time.NewTimer(wgHandshakeOvertime)
	defer timer.Stop()

	for {
		select {
		case <-timer.C:
			lastHandshake, err := w.wgState()
			if err != nil {
				w.log.Errorf("failed to read wg stats: %v", err)
				// The timer has already fired. Without re-arming it the loop
				// would never check the handshake again and would only wake
				// up on context cancellation.
				timer.Reset(wgHandshakeOvertime)
				continue
			}

			// Use the per-peer logger so the entry carries the same fields as
			// every other message emitted by this worker.
			w.log.Infof("last handshake: %v", lastHandshake)

			if time.Since(lastHandshake) > wgHandshakePeriod {
				w.log.Infof("Wireguard handshake timed out, closing relay connection")
				// Best effort: the connection is being dropped anyway, so a
				// close error carries no actionable information.
				_ = conn.Close()
				w.conn.OnDisconnected()
				return
			}

			// Schedule the next check for the moment the handshake would
			// exceed wgHandshakePeriod, plus the wgHandshakeOvertime margin.
			nextCheck := lastHandshake.Add(wgHandshakePeriod + wgHandshakeOvertime)
			timer.Reset(time.Until(nextCheck))
		case <-w.ctx.Done():
			return
		}
	}
}
func (w *WorkerRelay) isRelaySupported(answer *OfferAnswer) bool { func (w *WorkerRelay) isRelaySupported(answer *OfferAnswer) bool {
if !w.relayManager.HasRelayAddress() { if !w.relayManager.HasRelayAddress() {
return false return false
@ -98,3 +137,11 @@ func (w *WorkerRelay) preferredRelayServer(myRelayAddress, remoteRelayAddress st
} }
return remoteRelayAddress return remoteRelayAddress
} }
// wgState returns the time of the last WireGuard handshake reported by the
// WireGuard interface stats for the configured peer key. If the stats cannot
// be read, it returns the zero time together with the error.
func (w *WorkerRelay) wgState() (time.Time, error) {
	peerKey := w.config.Key
	stats, err := w.config.WgConfig.WgInterface.GetStats(peerKey)
	if err != nil {
		return time.Time{}, err
	}

	return stats.LastHandshake, nil
}

View File

@ -181,7 +181,7 @@ func (p *WGEBPFProxy) proxyToRemote() {
conn, ok := p.turnConnStore[uint16(addr.Port)] conn, ok := p.turnConnStore[uint16(addr.Port)]
p.turnConnMutex.Unlock() p.turnConnMutex.Unlock()
if !ok { if !ok {
log.Infof("turn conn not found by port: %d", addr.Port) log.Infof("turn conn not found by port, exit form proxy: %d", addr.Port)
return // todo replace it to return. For debug troubleshooting keep it return // todo replace it to return. For debug troubleshooting keep it
} }

View File

@ -30,6 +30,15 @@ func NewRelayTrack() *RelayTrack {
return &RelayTrack{} return &RelayTrack{}
} }
// ManagerService lists the relay manager operations as an interface, so that
// consumers can depend on this interface instead of a concrete manager type.
// NOTE(review): presumably implemented by Manager — confirm.
type ManagerService interface {
// Serve runs the manager.
// NOTE(review): presumably establishes the connection to the relay server — confirm.
Serve() error
// OpenConn returns a net.Conn for the given relay server address and peer key.
// NOTE(review): onClosedListener is presumably invoked when that connection is closed — confirm.
OpenConn(serverAddress, peerKey string, onClosedListener func()) (net.Conn, error)
// RelayInstanceAddress returns the address of the relay instance in use.
// Callers treat an error as the connection to the relay server being lost.
RelayInstanceAddress() (string, error)
// ServerURL returns the relay server URL.
// NOTE(review): presumably the URL the manager was configured with — confirm.
ServerURL() string
// HasRelayAddress reports whether a relay address is available; callers use
// it to decide whether relay is supported locally.
HasRelayAddress() bool
// UpdateToken passes a new relay auth token to the manager.
// NOTE(review): presumably used for subsequent authentication with the relay server — confirm.
UpdateToken(token *relayAuth.Token)
}
// Manager is a manager for the relay client. It establishes one persistent connection to the given relay server. In case // Manager is a manager for the relay client. It establishes one persistent connection to the given relay server. In case
// of network error the manager will try to reconnect to the server. // of network error the manager will try to reconnect to the server.
// The manager also manages temporary relay connections. If a client wants to communicate with another client on a // The manager also manages temporary relay connections. If a client wants to communicate with another client on a