netbird/client/internal/peer/worker_relay.go

166 lines
4.0 KiB
Go
Raw Normal View History

package peer
import (
"context"
2024-06-25 15:13:08 +02:00
"errors"
"net"
"time"
log "github.com/sirupsen/logrus"
relayClient "github.com/netbirdio/netbird/relay/client"
)
var (
wgHandshakePeriod = 2 * time.Minute
wgHandshakeOvertime = 30000 * time.Millisecond
)
type RelayConnInfo struct {
relayedConn net.Conn
rosenpassPubKey []byte
rosenpassAddr string
}
2024-06-19 11:52:40 +02:00
type WorkerRelayCallbacks struct {
2024-06-25 15:13:08 +02:00
OnConnReady func(RelayConnInfo)
OnDisconnected func()
2024-06-19 11:52:40 +02:00
}
2024-06-18 11:22:40 +02:00
type WorkerRelay struct {
2024-07-23 23:04:38 +02:00
parentCtx context.Context
2024-06-19 11:52:40 +02:00
log *log.Entry
config ConnConfig
relayManager relayClient.ManagerService
2024-06-19 11:52:40 +02:00
conn WorkerRelayCallbacks
2024-07-23 23:04:38 +02:00
ctxCancel context.CancelFunc
}
func NewWorkerRelay(ctx context.Context, log *log.Entry, config ConnConfig, relayManager relayClient.ManagerService, callbacks WorkerRelayCallbacks) *WorkerRelay {
r := &WorkerRelay{
2024-07-23 23:04:38 +02:00
parentCtx: ctx,
2024-06-19 11:52:40 +02:00
log: log,
config: config,
2024-06-21 15:35:15 +02:00
relayManager: relayManager,
2024-06-19 11:52:40 +02:00
conn: callbacks,
}
return r
}
2024-06-21 12:35:28 +02:00
func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
if !w.isRelaySupported(remoteOfferAnswer) {
w.log.Infof("Relay is not supported by remote peer")
return
}
// the relayManager will return with error in case if the connection has lost with relay server
2024-07-02 11:57:17 +02:00
currentRelayAddress, err := w.relayManager.RelayInstanceAddress()
2024-06-21 12:35:28 +02:00
if err != nil {
w.log.Errorf("failed to handle new offer: %s", err)
2024-06-21 12:35:28 +02:00
return
}
2024-07-02 11:57:17 +02:00
srv := w.preferredRelayServer(currentRelayAddress, remoteOfferAnswer.RelaySrvAddress)
2024-06-25 15:13:08 +02:00
relayedConn, err := w.relayManager.OpenConn(srv, w.config.Key)
2024-06-21 12:35:28 +02:00
if err != nil {
2024-06-25 15:13:08 +02:00
// todo handle all type errors
if errors.Is(err, relayClient.ErrConnAlreadyExists) {
w.log.Infof("do not need to reopen relay connection")
return
}
w.log.Errorf("failed to open connection via Relay: %s", err)
2024-06-21 12:35:28 +02:00
return
}
2024-06-21 12:35:28 +02:00
ctx, ctxCancel := context.WithCancel(w.parentCtx)
w.ctxCancel = ctxCancel
2024-07-25 12:37:59 +02:00
err = w.relayManager.AddCloseListener(srv, w.disconnected)
if err != nil {
log.Errorf("failed to add close listener: %s", err)
_ = relayedConn.Close()
ctxCancel()
return
}
go w.wgStateCheck(ctx, relayedConn)
w.log.Debugf("peer conn opened via Relay: %s", srv)
2024-06-21 12:35:28 +02:00
go w.conn.OnConnReady(RelayConnInfo{
relayedConn: relayedConn,
rosenpassPubKey: remoteOfferAnswer.RosenpassPubKey,
rosenpassAddr: remoteOfferAnswer.RosenpassAddr,
})
}
2024-07-02 11:57:17 +02:00
func (w *WorkerRelay) RelayInstanceAddress() (string, error) {
return w.relayManager.RelayInstanceAddress()
}
2024-06-25 15:13:08 +02:00
func (w *WorkerRelay) IsController() bool {
return w.config.LocalKey > w.config.Key
}
func (w *WorkerRelay) RelayIsSupportedLocally() bool {
return w.relayManager.HasRelayAddress()
}
// wgStateCheck help to check the state of the wireguard handshake and relay connection
func (w *WorkerRelay) wgStateCheck(ctx context.Context, conn net.Conn) {
timer := time.NewTimer(wgHandshakeOvertime)
defer timer.Stop()
for {
select {
case <-timer.C:
lastHandshake, err := w.wgState()
if err != nil {
w.log.Errorf("failed to read wg stats: %v", err)
continue
}
2024-07-23 23:04:38 +02:00
w.log.Tracef("last handshake: %v", lastHandshake)
if time.Since(lastHandshake) > wgHandshakePeriod {
w.log.Infof("Wireguard handshake timed out, closing relay connection")
_ = conn.Close()
w.conn.OnDisconnected()
return
}
2024-07-24 18:01:43 +02:00
resetTime := time.Until(lastHandshake.Add(wgHandshakeOvertime + wgHandshakePeriod))
timer.Reset(resetTime)
case <-ctx.Done():
return
}
}
}
2024-06-18 11:22:40 +02:00
func (w *WorkerRelay) isRelaySupported(answer *OfferAnswer) bool {
2024-06-19 11:52:40 +02:00
if !w.relayManager.HasRelayAddress() {
return false
}
return answer.RelaySrvAddress != ""
}
2024-06-18 11:22:40 +02:00
func (w *WorkerRelay) preferredRelayServer(myRelayAddress, remoteRelayAddress string) string {
2024-06-25 15:13:08 +02:00
if w.IsController() {
return myRelayAddress
}
return remoteRelayAddress
}
func (w *WorkerRelay) wgState() (time.Time, error) {
wgState, err := w.config.WgConfig.WgInterface.GetStats(w.config.Key)
if err != nil {
return time.Time{}, err
}
return wgState.LastHandshake, nil
}
2024-07-23 23:04:38 +02:00
func (w *WorkerRelay) disconnected() {
if w.ctxCancel != nil {
w.ctxCancel()
}
2024-07-23 23:04:38 +02:00
w.conn.OnDisconnected()
}