Fallback to relay conn

This commit is contained in:
Zoltán Papp 2024-06-20 18:17:30 +02:00
parent c7db2c0524
commit 6801dcb3f6
6 changed files with 122 additions and 17 deletions

View File

@ -80,7 +80,8 @@ type Conn struct {
config ConnConfig
statusRecorder *Status
wgProxyFactory *wgproxy.Factory
wgProxy wgproxy.Proxy
wgProxyICE wgproxy.Proxy
wgProxyRelay wgproxy.Proxy
signaler *Signaler
allowedIPsIP string
handshaker *Handshaker
@ -99,6 +100,8 @@ type Conn struct {
afterRemovePeerHooks []AfterRemovePeerHookFunc
currentConnType ConnPriority
endpointRelay *net.UDPAddr
}
// NewConn creates a new not opened Conn to the remote peer.
@ -147,7 +150,6 @@ func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Statu
// Open opens connection to the remote peer
// It will try to establish a connection using ICE and in parallel with relay. The higher priority connection type will
// be used.
// todo implement on disconnected event from ICE and relay too.
func (conn *Conn) Open() {
conn.log.Debugf("open connection to peer")
@ -176,12 +178,20 @@ func (conn *Conn) Close() {
defer conn.mu.Unlock()
conn.ctxCancel()
if conn.wgProxy != nil {
err := conn.wgProxy.CloseConn()
if conn.wgProxyRelay != nil {
err := conn.wgProxyRelay.CloseConn()
if err != nil {
conn.log.Errorf("failed to close wg proxy: %v", err)
conn.log.Errorf("failed to close wg proxy for relay: %v", err)
}
conn.wgProxy = nil
conn.wgProxyRelay = nil
}
if conn.wgProxyICE != nil {
err := conn.wgProxyICE.CloseConn()
if err != nil {
conn.log.Errorf("failed to close wg proxy for ice: %v", err)
}
conn.wgProxyICE = nil
}
// todo: is it problem if we try to remove a peer what is never existed?
@ -277,6 +287,7 @@ func (conn *Conn) GetKey() string {
func (conn *Conn) onWorkerICEStateChanged(newState ConnStatus) {
conn.mu.Lock()
defer conn.mu.Unlock()
log.Debugf("ICE connection state changed to %s", newState)
defer func() {
conn.statusICE = newState
}()
@ -289,6 +300,16 @@ func (conn *Conn) onWorkerICEStateChanged(newState ConnStatus) {
return
}
if conn.endpointRelay != nil {
err := conn.configureWGEndpoint(conn.endpointRelay)
if err != nil {
conn.log.Errorf("failed to switch back to relay conn: %v", err)
}
// todo update status to relay related things
log.Debugf("switched back to relay connection")
return
}
if newState > conn.statusICE {
peerState := State{
PubKey: conn.config.Key,
@ -307,6 +328,8 @@ func (conn *Conn) onWorkerRelayStateChanged(newState ConnStatus) {
conn.statusRelay = newState
}()
conn.log.Debugf("Relay connection state changed to %s", newState)
if conn.statusICE == StatusConnected {
return
}
@ -363,7 +386,7 @@ func (conn *Conn) relayConnectionIsReady(rci RelayConnInfo) {
}
}
err = conn.config.WgConfig.WgInterface.UpdatePeer(conn.config.WgConfig.RemoteKey, conn.config.WgConfig.AllowedIps, defaultWgKeepAlive, endpointUdpAddr, conn.config.WgConfig.PreSharedKey)
err = conn.configureWGEndpoint(endpointUdpAddr)
if err != nil {
if err := wgProxy.CloseConn(); err != nil {
conn.log.Warnf("Failed to close relay connection: %v", err)
@ -371,14 +394,14 @@ func (conn *Conn) relayConnectionIsReady(rci RelayConnInfo) {
conn.log.Errorf("Failed to update wg peer configuration: %v", err)
return
}
conn.endpointRelay = endpointUdpAddr
if conn.wgProxy != nil {
if err := conn.wgProxy.CloseConn(); err != nil {
if conn.wgProxyRelay != nil {
if err := conn.wgProxyRelay.CloseConn(); err != nil {
conn.log.Warnf("failed to close depracated wg proxy conn: %v", err)
}
}
conn.wgProxy = wgProxy
conn.wgProxyRelay = wgProxy
conn.currentConnType = connPriorityRelay
peerState := State{
@ -408,6 +431,8 @@ func (conn *Conn) iCEConnectionIsReady(priority ConnPriority, iceConnInfo ICECon
if conn.currentConnType != 0 {
conn.log.Infof("update connection to ICE type")
} else {
conn.log.Infof("set ICE to active connection")
}
var (
@ -417,7 +442,7 @@ func (conn *Conn) iCEConnectionIsReady(priority ConnPriority, iceConnInfo ICECon
if iceConnInfo.RelayedOnLocal {
conn.log.Debugf("setup ice turn connection")
wgProxy = conn.wgProxyFactory.GetProxy(conn.ctx)
ep, err := conn.wgProxy.AddTurnConn(iceConnInfo.RemoteConn)
ep, err := conn.wgProxyICE.AddTurnConn(iceConnInfo.RemoteConn)
if err != nil {
conn.log.Errorf("failed to add turn net.Conn to local proxy: %v", err)
return
@ -448,12 +473,12 @@ func (conn *Conn) iCEConnectionIsReady(priority ConnPriority, iceConnInfo ICECon
return
}
if conn.wgProxy != nil {
if err := conn.wgProxy.CloseConn(); err != nil {
if conn.wgProxyICE != nil {
if err := conn.wgProxyICE.CloseConn(); err != nil {
conn.log.Warnf("failed to close depracated wg proxy conn: %v", err)
}
}
conn.wgProxy = wgProxy
conn.wgProxyICE = wgProxy
conn.currentConnType = priority
@ -469,6 +494,15 @@ func (conn *Conn) iCEConnectionIsReady(priority ConnPriority, iceConnInfo ICECon
conn.updateStatus(peerState, iceConnInfo.RosenpassPubKey, iceConnInfo.RosenpassAddr)
}
func (conn *Conn) configureWGEndpoint(addr *net.UDPAddr) error {
return conn.config.WgConfig.WgInterface.UpdatePeer(
conn.config.WgConfig.RemoteKey,
conn.config.WgConfig.AllowedIps,
defaultWgKeepAlive,
addr,
conn.config.WgConfig.PreSharedKey,
)
}
func (conn *Conn) updateStatus(peerState State, remoteRosenpassPubKey []byte, remoteRosenpassAddr string) {
peerState.PubKey = conn.config.Key
peerState.ConnStatus = StatusConnected

View File

@ -2,15 +2,19 @@ package peer
import (
"context"
"os"
"sync"
"testing"
"time"
"github.com/magiconair/properties/assert"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/client/internal/stdnet"
"github.com/netbirdio/netbird/client/internal/wgproxy"
"github.com/netbirdio/netbird/iface"
relayClient "github.com/netbirdio/netbird/relay/client"
"github.com/netbirdio/netbird/util"
)
var connConf = ConnConfig{
@ -23,6 +27,12 @@ var connConf = ConnConfig{
},
}
func TestMain(m *testing.M) {
_ = util.InitLog("trace", "console")
code := m.Run()
os.Exit(code)
}
func TestNewConn_interfaceFilter(t *testing.T) {
ignore := []string{iface.WgInterfaceDefault, "tun0", "zt", "ZeroTier", "utun", "wg", "ts",
"Tailscale", "tailscale"}
@ -158,3 +168,50 @@ func TestConn_Status(t *testing.T) {
})
}
}
func TestConn_Switch(t *testing.T) {
ctx := context.Background()
wgProxyFactory := wgproxy.NewFactory(ctx, connConf.LocalWgPort)
connConfAlice := ConnConfig{
Key: "LLHf3Ma6z6mdLbriAJbqhX7+nM/B71lgw2+91q3LfhU=",
LocalKey: "RRHf3Ma6z6mdLbriAJbqhX7+nM/B71lgw2+91q3LfhU=",
Timeout: time.Second,
LocalWgPort: 51820,
ICEConfig: ICEConfig{
InterfaceBlackList: nil,
},
WgConfig: WgConfig{
WgListenPort: 51820,
RemoteKey: "LLHf3Ma6z6mdLbriAJbqhX7+nM/B71lgw2+91q3LfhU=",
AllowedIps: "172.16.254.0/16",
},
}
relayManagerAlice := relayClient.NewManager(ctx, "127.0.0.1:1234", connConf.LocalKey)
connAlice, err := NewConn(ctx, connConfAlice, NewRecorder("https://mgm"), wgProxyFactory, nil, nil, relayManagerAlice)
if err != nil {
log.Fatalf("failed to create conn: %v", err)
}
connAlice.Open()
connConfbob := ConnConfig{
Key: "RRHf3Ma6z6mdLbriAJbqhX7+nM/B71lgw2+91q3LfhU=",
LocalKey: "LLHf3Ma6z6mdLbriAJbqhX7+nM/B71lgw2+91q3LfhU=",
Timeout: time.Second,
LocalWgPort: 51820,
ICEConfig: ICEConfig{
InterfaceBlackList: nil,
},
WgConfig: WgConfig{
WgListenPort: 51820,
RemoteKey: "RRHf3Ma6z6mdLbriAJbqhX7+nM/B71lgw2+91q3LfhU=",
AllowedIps: "172.16.254.0/16",
},
}
relayManagerBob := relayClient.NewManager(ctx, "127.0.0.1:1234", connConf.LocalKey)
connBob, err := NewConn(ctx, connConfbob, NewRecorder("https://mgm"), wgProxyFactory, nil, nil, relayManagerBob)
if err != nil {
log.Fatalf("failed to create conn: %v", err)
}
connBob.Open()
}

View File

@ -87,6 +87,7 @@ func (h *Handshaker) Handshake(args HandshakeArgs) (*OfferAnswer, error) {
h.mu.Lock()
defer h.mu.Unlock()
h.log.Infof("start handshake with remote peer")
h.handshakeArgs = args
cachedOfferAnswer, ok := h.cachedHandshake()
@ -195,6 +196,7 @@ func (h *Handshaker) waitForRemoteOfferConfirmation() (*OfferAnswer, error) {
case remoteOfferAnswer := <-h.remoteAnswerCh:
return &remoteOfferAnswer, nil
case <-timeout.C:
h.log.Debugf("handshake timeout")
return nil, NewConnectionTimeoutError(h.config.Key, h.config.Timeout)
case <-h.ctx.Done():
// closed externally

View File

@ -113,6 +113,7 @@ func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, config
// It is trying to reconnection in a loop until the context is canceled.
// In case of success connection it will call the onICEConnReady callback.
func (w *WorkerICE) SetupICEConnection(hasRelayOnLocally bool) {
time.Sleep(20 * time.Second)
for {
if !w.waitForReconnectTry() {
return

View File

@ -8,6 +8,17 @@ import (
func NewFactory(ctx context.Context, wgPort int) *Factory {
f := &Factory{wgPort: wgPort}
// todo: put it back
/*
ebpfProxy := NewWGEBPFProxy(ctx, wgPort)
err := ebpfProxy.listen()
if err != nil {
log.Warnf("failed to initialize ebpf proxy, fallback to user space proxy: %s", err)
return f
}
f.ebpfProxy = ebpfProxy
*/
return f
}

View File

@ -91,10 +91,10 @@ func (m *Manager) OpenConn(serverAddress, peerKey string) (net.Conn, error) {
}
if !foreign {
log.Debugf("open connection to permanent server: %s", peerKey)
log.Debugf("open peer connection via permanent server: %s", peerKey)
return m.relayClient.OpenConn(peerKey)
} else {
log.Debugf("open connection to foreign server: %s", serverAddress)
log.Debugf("open peer connection via foreign server: %s", serverAddress)
return m.openConnVia(serverAddress, peerKey)
}
}