From 4ebf6e1c4c5b549ad6983b2a7a36874fd8a85dc4 Mon Sep 17 00:00:00 2001 From: Zoltan Papp Date: Wed, 25 Sep 2024 18:50:10 +0200 Subject: [PATCH] [client] Close the remote conn in proxy (#2626) Port the conn close call to eBPF proxy --- client/internal/engine.go | 2 +- client/internal/peer/conn.go | 8 +- client/internal/peer/conn_test.go | 8 +- .../internal/wgproxy/{ => ebpf}/portlookup.go | 2 +- .../wgproxy/{ => ebpf}/portlookup_test.go | 2 +- .../wgproxy/{proxy_ebpf.go => ebpf/proxy.go} | 169 ++++++++++-------- .../proxy_test.go} | 9 +- client/internal/wgproxy/ebpf/wrapper.go | 44 +++++ client/internal/wgproxy/factory.go | 22 --- client/internal/wgproxy/factory_linux.go | 33 +++- client/internal/wgproxy/factory_nonlinux.go | 16 +- client/internal/wgproxy/proxy.go | 6 +- client/internal/wgproxy/proxy_test.go | 128 +++++++++++++ client/internal/wgproxy/proxy_userspace.go | 129 ------------- client/internal/wgproxy/usp/proxy.go | 146 +++++++++++++++ relay/client/picker_test.go | 2 +- 16 files changed, 469 insertions(+), 257 deletions(-) rename client/internal/wgproxy/{ => ebpf}/portlookup.go (96%) rename client/internal/wgproxy/{ => ebpf}/portlookup_test.go (97%) rename client/internal/wgproxy/{proxy_ebpf.go => ebpf/proxy.go} (65%) rename client/internal/wgproxy/{proxy_ebpf_test.go => ebpf/proxy_test.go} (86%) create mode 100644 client/internal/wgproxy/ebpf/wrapper.go delete mode 100644 client/internal/wgproxy/factory.go create mode 100644 client/internal/wgproxy/proxy_test.go delete mode 100644 client/internal/wgproxy/proxy_userspace.go create mode 100644 client/internal/wgproxy/usp/proxy.go diff --git a/client/internal/engine.go b/client/internal/engine.go index b0deb5a29..463507ad8 100644 --- a/client/internal/engine.go +++ b/client/internal/engine.go @@ -292,7 +292,7 @@ func (e *Engine) Start() error { e.wgInterface = wgIface userspace := e.wgInterface.IsUserspaceBind() - e.wgProxyFactory = wgproxy.NewFactory(e.ctx, userspace, e.config.WgPort) + e.wgProxyFactory = wgproxy.NewFactory(userspace, e.config.WgPort) if e.config.RosenpassEnabled { log.Infof("rosenpass is enabled") diff --git a/client/internal/peer/conn.go b/client/internal/peer/conn.go index 911ddd228..ea6d892b9 100644 --- a/client/internal/peer/conn.go +++ b/client/internal/peer/conn.go @@ -527,8 +527,8 @@ func (conn *Conn) relayConnectionIsReady(rci RelayConnInfo) { conn.log.Debugf("Relay connection is ready to use") conn.statusRelay.Set(StatusConnected) - wgProxy := conn.wgProxyFactory.GetProxy(conn.ctx) - endpoint, err := wgProxy.AddTurnConn(rci.relayedConn) + wgProxy := conn.wgProxyFactory.GetProxy() + endpoint, err := wgProxy.AddTurnConn(conn.ctx, rci.relayedConn) if err != nil { conn.log.Errorf("failed to add relayed net.Conn to local proxy: %v", err) return @@ -775,8 +775,8 @@ func (conn *Conn) getEndpointForICEConnInfo(iceConnInfo ICEConnInfo) (net.Addr, return iceConnInfo.RemoteConn.RemoteAddr(), nil, nil } conn.log.Debugf("setup ice turn connection") - wgProxy := conn.wgProxyFactory.GetProxy(conn.ctx) - ep, err := wgProxy.AddTurnConn(iceConnInfo.RemoteConn) + wgProxy := conn.wgProxyFactory.GetProxy() + ep, err := wgProxy.AddTurnConn(conn.ctx, iceConnInfo.RemoteConn) if err != nil { conn.log.Errorf("failed to add turn net.Conn to local proxy: %v", err) if errClose := wgProxy.CloseConn(); errClose != nil { diff --git a/client/internal/peer/conn_test.go b/client/internal/peer/conn_test.go index 80c25f63c..22e5409f8 100644 --- a/client/internal/peer/conn_test.go +++ b/client/internal/peer/conn_test.go @@ -44,7 +44,7 @@ func TestNewConn_interfaceFilter(t *testing.T) { } func TestConn_GetKey(t *testing.T) { - wgProxyFactory := wgproxy.NewFactory(context.Background(), false, connConf.LocalWgPort) + wgProxyFactory := wgproxy.NewFactory(false, connConf.LocalWgPort) defer func() { _ = wgProxyFactory.Free() }() @@ -59,7 +59,7 @@ func TestConn_GetKey(t *testing.T) { } func TestConn_OnRemoteOffer(t *testing.T) { - wgProxyFactory := wgproxy.NewFactory(context.Background(), false, connConf.LocalWgPort) + wgProxyFactory := wgproxy.NewFactory(false, connConf.LocalWgPort) defer func() { _ = wgProxyFactory.Free() }() @@ -96,7 +96,7 @@ func TestConn_OnRemoteOffer(t *testing.T) { } func TestConn_OnRemoteAnswer(t *testing.T) { - wgProxyFactory := wgproxy.NewFactory(context.Background(), false, connConf.LocalWgPort) + wgProxyFactory := wgproxy.NewFactory(false, connConf.LocalWgPort) defer func() { _ = wgProxyFactory.Free() }() @@ -132,7 +132,7 @@ func TestConn_OnRemoteAnswer(t *testing.T) { wg.Wait() } func TestConn_Status(t *testing.T) { - wgProxyFactory := wgproxy.NewFactory(context.Background(), false, connConf.LocalWgPort) + wgProxyFactory := wgproxy.NewFactory(false, connConf.LocalWgPort) defer func() { _ = wgProxyFactory.Free() }() diff --git a/client/internal/wgproxy/portlookup.go b/client/internal/wgproxy/ebpf/portlookup.go similarity index 96% rename from client/internal/wgproxy/portlookup.go rename to client/internal/wgproxy/ebpf/portlookup.go index 6f3d33487..0e2c20c99 100644 --- a/client/internal/wgproxy/portlookup.go +++ b/client/internal/wgproxy/ebpf/portlookup.go @@ -1,4 +1,4 @@ -package wgproxy +package ebpf import ( "fmt" diff --git a/client/internal/wgproxy/portlookup_test.go b/client/internal/wgproxy/ebpf/portlookup_test.go similarity index 97% rename from client/internal/wgproxy/portlookup_test.go rename to client/internal/wgproxy/ebpf/portlookup_test.go index 6a386f330..92f4b8eee 100644 --- a/client/internal/wgproxy/portlookup_test.go +++ b/client/internal/wgproxy/ebpf/portlookup_test.go @@ -1,4 +1,4 @@ -package wgproxy +package ebpf import ( "fmt" diff --git a/client/internal/wgproxy/proxy_ebpf.go b/client/internal/wgproxy/ebpf/proxy.go similarity index 65% rename from client/internal/wgproxy/proxy_ebpf.go rename to client/internal/wgproxy/ebpf/proxy.go index d385cc4ca..4bd4bfff6 100644 --- a/client/internal/wgproxy/proxy_ebpf.go +++ b/client/internal/wgproxy/ebpf/proxy.go @@ -1,6 +1,6 @@ //go:build linux && !android -package wgproxy +package ebpf import ( "context" @@ -13,47 +13,49 @@ import ( "github.com/google/gopacket" "github.com/google/gopacket/layers" + "github.com/hashicorp/go-multierror" "github.com/pion/transport/v3" log "github.com/sirupsen/logrus" + nberrors "github.com/netbirdio/netbird/client/errors" "github.com/netbirdio/netbird/client/internal/ebpf" ebpfMgr "github.com/netbirdio/netbird/client/internal/ebpf/manager" nbnet "github.com/netbirdio/netbird/util/net" ) +const ( + loopbackAddr = "127.0.0.1" +) + // WGEBPFProxy definition for proxy with EBPF support type WGEBPFProxy struct { - ebpfManager ebpfMgr.Manager - - ctx context.Context - cancel context.CancelFunc - - lastUsedPort uint16 localWGListenPort int + ebpfManager ebpfMgr.Manager turnConnStore map[uint16]net.Conn turnConnMutex sync.Mutex - rawConn net.PacketConn - conn transport.UDPConn + lastUsedPort uint16 + rawConn net.PacketConn + conn transport.UDPConn + + ctx context.Context + ctxCancel context.CancelFunc } // NewWGEBPFProxy create new WGEBPFProxy instance -func NewWGEBPFProxy(ctx context.Context, wgPort int) *WGEBPFProxy { +func NewWGEBPFProxy(wgPort int) *WGEBPFProxy { log.Debugf("instantiate ebpf proxy") wgProxy := &WGEBPFProxy{ localWGListenPort: wgPort, ebpfManager: ebpf.GetEbpfManagerInstance(), - lastUsedPort: 0, turnConnStore: make(map[uint16]net.Conn), } - wgProxy.ctx, wgProxy.cancel = context.WithCancel(ctx) - return wgProxy } -// listen load ebpf program and listen the proxy -func (p *WGEBPFProxy) listen() error { +// Listen load ebpf program and listen the proxy +func (p *WGEBPFProxy) Listen() error { pl := portLookup{} wgPorxyPort, err := pl.searchFreePort() if err != nil { @@ -72,9 +74,11 @@ func (p *WGEBPFProxy) listen() error { addr := net.UDPAddr{ Port: wgPorxyPort, - IP: net.ParseIP("127.0.0.1"), + IP: net.ParseIP(loopbackAddr), } + p.ctx, p.ctxCancel = context.WithCancel(context.Background()) + conn, err := nbnet.ListenUDP("udp", &addr) if err != nil { cErr := p.Free() @@ -91,108 +95,112 @@ func (p *WGEBPFProxy) listen() error { } // AddTurnConn add new turn connection for the proxy -func (p *WGEBPFProxy) AddTurnConn(turnConn net.Conn) (net.Addr, error) { +func (p *WGEBPFProxy) AddTurnConn(ctx context.Context, turnConn net.Conn) (net.Addr, error) { wgEndpointPort, err := p.storeTurnConn(turnConn) if err != nil { return nil, err } - go p.proxyToLocal(wgEndpointPort, turnConn) + go p.proxyToLocal(ctx, wgEndpointPort, turnConn) log.Infof("turn conn added to wg proxy store: %s, endpoint port: :%d", turnConn.RemoteAddr(), wgEndpointPort) wgEndpoint := &net.UDPAddr{ - IP: net.ParseIP("127.0.0.1"), + IP: net.ParseIP(loopbackAddr), Port: int(wgEndpointPort), } return wgEndpoint, nil } -// CloseConn doing nothing because this type of proxy implementation does not store the connection -func (p *WGEBPFProxy) CloseConn() error { - return nil -} - -// Free resources +// Free resources except the remoteConns will be keep open. func (p *WGEBPFProxy) Free() error { log.Debugf("free up ebpf wg proxy") - var err1, err2, err3 error - if p.conn != nil { - err1 = p.conn.Close() + if p.ctx != nil && p.ctx.Err() != nil { + //nolint + return nil } - err2 = p.ebpfManager.FreeWGProxy() - if p.rawConn != nil { - err3 = p.rawConn.Close() + p.ctxCancel() + + var result *multierror.Error + if err := p.conn.Close(); err != nil { + result = multierror.Append(result, err) } - if err1 != nil { - return err1 + if err := p.ebpfManager.FreeWGProxy(); err != nil { + result = multierror.Append(result, err) } - if err2 != nil { - return err2 + if err := p.rawConn.Close(); err != nil { + result = multierror.Append(result, err) } - - return err3 + return nberrors.FormatErrorOrNil(result) } -func (p *WGEBPFProxy) proxyToLocal(endpointPort uint16, remoteConn net.Conn) { +func (p *WGEBPFProxy) proxyToLocal(ctx context.Context, endpointPort uint16, remoteConn net.Conn) { + defer p.removeTurnConn(endpointPort) + + var ( + err error + n int + ) buf := make([]byte, 1500) - var err error - defer func() { - p.removeTurnConn(endpointPort) - }() - for { - select { - case <-p.ctx.Done(): - return - default: - var n int - n, err = remoteConn.Read(buf) - if err != nil { - if err != io.EOF { - log.Errorf("failed to read from turn conn (endpoint: :%d): %s", endpointPort, err) - } + for ctx.Err() == nil { + n, err = remoteConn.Read(buf) + if err != nil { + if ctx.Err() != nil { return } - err = p.sendPkg(buf[:n], endpointPort) - if err != nil { - log.Errorf("failed to write out turn pkg to local conn: %v", err) + if err != io.EOF { + log.Errorf("failed to read from turn conn (endpoint: :%d): %s", endpointPort, err) } + return + } + + if err := p.sendPkg(buf[:n], endpointPort); err != nil { + if ctx.Err() != nil || p.ctx.Err() != nil { + return + } + log.Errorf("failed to write out turn pkg to local conn: %v", err) } } } // proxyToRemote read messages from local WireGuard interface and forward it to remote conn +// From this go routine has only one instance. func (p *WGEBPFProxy) proxyToRemote() { buf := make([]byte, 1500) - for { - select { - case <-p.ctx.Done(): - return - default: - n, addr, err := p.conn.ReadFromUDP(buf) - if err != nil { - log.Errorf("failed to read UDP pkg from WG: %s", err) + for p.ctx.Err() == nil { + if err := p.readAndForwardPacket(buf); err != nil { + if p.ctx.Err() != nil { return } - - p.turnConnMutex.Lock() - conn, ok := p.turnConnStore[uint16(addr.Port)] - p.turnConnMutex.Unlock() - if !ok { - log.Debugf("turn conn not found by port because conn already has been closed: %d", addr.Port) - continue - } - - _, err = conn.Write(buf[:n]) - if err != nil { - log.Debugf("failed to forward local wg pkg (%d) to remote turn conn: %s", addr.Port, err) - } + log.Errorf("failed to proxy packet to remote conn: %s", err) } } } +func (p *WGEBPFProxy) readAndForwardPacket(buf []byte) error { + n, addr, err := p.conn.ReadFromUDP(buf) + if err != nil { + return fmt.Errorf("failed to read UDP packet from WG: %w", err) + } + + p.turnConnMutex.Lock() + conn, ok := p.turnConnStore[uint16(addr.Port)] + p.turnConnMutex.Unlock() + if !ok { + if p.ctx.Err() == nil { + log.Debugf("turn conn not found by port because conn already has been closed: %d", addr.Port) + } + return nil + } + + if _, err := conn.Write(buf[:n]); err != nil { + return fmt.Errorf("failed to forward local WG packet (%d) to remote turn conn: %w", addr.Port, err) + } + return nil +} + func (p *WGEBPFProxy) storeTurnConn(turnConn net.Conn) (uint16, error) { p.turnConnMutex.Lock() defer p.turnConnMutex.Unlock() @@ -206,11 +214,14 @@ func (p *WGEBPFProxy) storeTurnConn(turnConn net.Conn) (uint16, error) { } func (p *WGEBPFProxy) removeTurnConn(turnConnID uint16) { - log.Debugf("remove turn conn from store by port: %d", turnConnID) p.turnConnMutex.Lock() defer p.turnConnMutex.Unlock() - delete(p.turnConnStore, turnConnID) + _, ok := p.turnConnStore[turnConnID] + if ok { + log.Debugf("remove turn conn from store by port: %d", turnConnID) + } + delete(p.turnConnStore, turnConnID) } func (p *WGEBPFProxy) nextFreePort() (uint16, error) { diff --git a/client/internal/wgproxy/proxy_ebpf_test.go b/client/internal/wgproxy/ebpf/proxy_test.go similarity index 86% rename from client/internal/wgproxy/proxy_ebpf_test.go rename to client/internal/wgproxy/ebpf/proxy_test.go index 821e64218..b15bc686c 100644 --- a/client/internal/wgproxy/proxy_ebpf_test.go +++ b/client/internal/wgproxy/ebpf/proxy_test.go @@ -1,14 +1,13 @@ //go:build linux && !android -package wgproxy +package ebpf import ( - "context" "testing" ) func TestWGEBPFProxy_connStore(t *testing.T) { - wgProxy := NewWGEBPFProxy(context.Background(), 1) + wgProxy := NewWGEBPFProxy(1) p, _ := wgProxy.storeTurnConn(nil) if p != 1 { @@ -28,7 +27,7 @@ func TestWGEBPFProxy_connStore(t *testing.T) { } func TestWGEBPFProxy_portCalculation_overflow(t *testing.T) { - wgProxy := NewWGEBPFProxy(context.Background(), 1) + wgProxy := NewWGEBPFProxy(1) _, _ = wgProxy.storeTurnConn(nil) wgProxy.lastUsedPort = 65535 @@ -44,7 +43,7 @@ func TestWGEBPFProxy_portCalculation_overflow(t *testing.T) { } func TestWGEBPFProxy_portCalculation_maxConn(t *testing.T) { - wgProxy := NewWGEBPFProxy(context.Background(), 1) + wgProxy := NewWGEBPFProxy(1) for i := 0; i < 65535; i++ { _, _ = wgProxy.storeTurnConn(nil) diff --git a/client/internal/wgproxy/ebpf/wrapper.go b/client/internal/wgproxy/ebpf/wrapper.go new file mode 100644 index 000000000..c5639f840 --- /dev/null +++ b/client/internal/wgproxy/ebpf/wrapper.go @@ -0,0 +1,44 @@ +//go:build linux && !android + +package ebpf + +import ( + "context" + "fmt" + "net" +) + +// ProxyWrapper help to keep the remoteConn instance for net.Conn.Close function call +type ProxyWrapper struct { + WgeBPFProxy *WGEBPFProxy + + remoteConn net.Conn + cancel context.CancelFunc // with thic cancel function, we stop remoteToLocal thread +} + +func (e *ProxyWrapper) AddTurnConn(ctx context.Context, remoteConn net.Conn) (net.Addr, error) { + ctxConn, cancel := context.WithCancel(ctx) + addr, err := e.WgeBPFProxy.AddTurnConn(ctxConn, remoteConn) + + if err != nil { + cancel() + return nil, fmt.Errorf("add turn conn: %w", err) + } + e.remoteConn = remoteConn + e.cancel = cancel + return addr, err +} + +// CloseConn close the remoteConn and automatically remove the conn instance from the map +func (e *ProxyWrapper) CloseConn() error { + if e.cancel == nil { + return fmt.Errorf("proxy not started") + } + + e.cancel() + + if err := e.remoteConn.Close(); err != nil { + return fmt.Errorf("failed to close remote conn: %w", err) + } + return nil +} diff --git a/client/internal/wgproxy/factory.go b/client/internal/wgproxy/factory.go deleted file mode 100644 index f4eb150b0..000000000 --- a/client/internal/wgproxy/factory.go +++ /dev/null @@ -1,22 +0,0 @@ -package wgproxy - -import "context" - -type Factory struct { - wgPort int - ebpfProxy Proxy -} - -func (w *Factory) GetProxy(ctx context.Context) Proxy { - if w.ebpfProxy != nil { - return w.ebpfProxy - } - return NewWGUserSpaceProxy(ctx, w.wgPort) -} - -func (w *Factory) Free() error { - if w.ebpfProxy != nil { - return w.ebpfProxy.Free() - } - return nil -} diff --git a/client/internal/wgproxy/factory_linux.go b/client/internal/wgproxy/factory_linux.go index d01ae7e74..369ba99db 100644 --- a/client/internal/wgproxy/factory_linux.go +++ b/client/internal/wgproxy/factory_linux.go @@ -3,20 +3,26 @@ package wgproxy import ( - "context" - log "github.com/sirupsen/logrus" + + "github.com/netbirdio/netbird/client/internal/wgproxy/ebpf" + "github.com/netbirdio/netbird/client/internal/wgproxy/usp" ) -func NewFactory(ctx context.Context, userspace bool, wgPort int) *Factory { +type Factory struct { + wgPort int + ebpfProxy *ebpf.WGEBPFProxy +} + +func NewFactory(userspace bool, wgPort int) *Factory { f := &Factory{wgPort: wgPort} if userspace { return f } - ebpfProxy := NewWGEBPFProxy(ctx, wgPort) - err := ebpfProxy.listen() + ebpfProxy := ebpf.NewWGEBPFProxy(wgPort) + err := ebpfProxy.Listen() if err != nil { log.Warnf("failed to initialize ebpf proxy, fallback to user space proxy: %s", err) return f @@ -25,3 +31,20 @@ func NewFactory(ctx context.Context, userspace bool, wgPort int) *Factory { f.ebpfProxy = ebpfProxy return f } + +func (w *Factory) GetProxy() Proxy { + if w.ebpfProxy != nil { + p := &ebpf.ProxyWrapper{ + WgeBPFProxy: w.ebpfProxy, + } + return p + } + return usp.NewWGUserSpaceProxy(w.wgPort) +} + +func (w *Factory) Free() error { + if w.ebpfProxy == nil { + return nil + } + return w.ebpfProxy.Free() +} diff --git a/client/internal/wgproxy/factory_nonlinux.go b/client/internal/wgproxy/factory_nonlinux.go index d1640c97d..f930b09b3 100644 --- a/client/internal/wgproxy/factory_nonlinux.go +++ b/client/internal/wgproxy/factory_nonlinux.go @@ -2,8 +2,20 @@ package wgproxy -import "context" +import "github.com/netbirdio/netbird/client/internal/wgproxy/usp" -func NewFactory(ctx context.Context, _ bool, wgPort int) *Factory { +type Factory struct { + wgPort int +} + +func NewFactory(_ bool, wgPort int) *Factory { return &Factory{wgPort: wgPort} } + +func (w *Factory) GetProxy() Proxy { + return usp.NewWGUserSpaceProxy(w.wgPort) +} + +func (w *Factory) Free() error { + return nil +} diff --git a/client/internal/wgproxy/proxy.go b/client/internal/wgproxy/proxy.go index b88df73a0..96fae8dd1 100644 --- a/client/internal/wgproxy/proxy.go +++ b/client/internal/wgproxy/proxy.go @@ -1,12 +1,12 @@ package wgproxy import ( + "context" "net" ) -// Proxy is a transfer layer between the Turn connection and the WireGuard +// Proxy is a transfer layer between the relayed connection and the WireGuard type Proxy interface { - AddTurnConn(turnConn net.Conn) (net.Addr, error) + AddTurnConn(ctx context.Context, turnConn net.Conn) (net.Addr, error) CloseConn() error - Free() error } diff --git a/client/internal/wgproxy/proxy_test.go b/client/internal/wgproxy/proxy_test.go new file mode 100644 index 000000000..b09e6be55 --- /dev/null +++ b/client/internal/wgproxy/proxy_test.go @@ -0,0 +1,128 @@ +//go:build linux + +package wgproxy + +import ( + "context" + "io" + "net" + "os" + "runtime" + "testing" + "time" + + "github.com/netbirdio/netbird/client/internal/wgproxy/ebpf" + "github.com/netbirdio/netbird/client/internal/wgproxy/usp" + "github.com/netbirdio/netbird/util" +) + +func TestMain(m *testing.M) { + _ = util.InitLog("trace", "console") + code := m.Run() + os.Exit(code) +} + +type mocConn struct { + closeChan chan struct{} + closed bool +} + +func newMockConn() *mocConn { + return &mocConn{ + closeChan: make(chan struct{}), + } +} + +func (m *mocConn) Read(b []byte) (n int, err error) { + <-m.closeChan + return 0, io.EOF +} + +func (m *mocConn) Write(b []byte) (n int, err error) { + <-m.closeChan + return 0, io.EOF +} + +func (m *mocConn) Close() error { + if m.closed == true { + return nil + } + + m.closed = true + close(m.closeChan) + return nil +} + +func (m *mocConn) LocalAddr() net.Addr { + panic("implement me") +} + +func (m *mocConn) RemoteAddr() net.Addr { + return &net.UDPAddr{ + IP: net.ParseIP("172.16.254.1"), + } +} + +func (m *mocConn) SetDeadline(t time.Time) error { + panic("implement me") +} + +func (m *mocConn) SetReadDeadline(t time.Time) error { + panic("implement me") +} + +func (m *mocConn) SetWriteDeadline(t time.Time) error { + panic("implement me") +} + +func TestProxyCloseByRemoteConn(t *testing.T) { + ctx := context.Background() + + tests := []struct { + name string + proxy Proxy + }{ + { + name: "userspace proxy", + proxy: usp.NewWGUserSpaceProxy(51830), + }, + } + + if runtime.GOOS == "linux" && os.Getenv("GITHUB_ACTIONS") != "true" { + ebpfProxy := ebpf.NewWGEBPFProxy(51831) + if err := ebpfProxy.Listen(); err != nil { + t.Fatalf("failed to initialize ebpf proxy: %s", err) + } + defer func() { + if err := ebpfProxy.Free(); err != nil { + t.Errorf("failed to free ebpf proxy: %s", err) + } + }() + proxyWrapper := &ebpf.ProxyWrapper{ + WgeBPFProxy: ebpfProxy, + } + + tests = append(tests, struct { + name string + proxy Proxy + }{ + name: "ebpf proxy", + proxy: proxyWrapper, + }) + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + relayedConn := newMockConn() + _, err := tt.proxy.AddTurnConn(ctx, relayedConn) + if err != nil { + t.Errorf("error: %v", err) + } + + _ = relayedConn.Close() + if err := tt.proxy.CloseConn(); err != nil { + t.Errorf("error: %v", err) + } + }) + } +} diff --git a/client/internal/wgproxy/proxy_userspace.go b/client/internal/wgproxy/proxy_userspace.go deleted file mode 100644 index 8fc640b6a..000000000 --- a/client/internal/wgproxy/proxy_userspace.go +++ /dev/null @@ -1,129 +0,0 @@ -package wgproxy - -import ( - "context" - "fmt" - "io" - "net" - - log "github.com/sirupsen/logrus" -) - -// WGUserSpaceProxy proxies -type WGUserSpaceProxy struct { - localWGListenPort int - ctx context.Context - cancel context.CancelFunc - - remoteConn net.Conn - localConn net.Conn -} - -// NewWGUserSpaceProxy instantiate a user space WireGuard proxy -func NewWGUserSpaceProxy(ctx context.Context, wgPort int) *WGUserSpaceProxy { - log.Debugf("Initializing new user space proxy with port %d", wgPort) - p := &WGUserSpaceProxy{ - localWGListenPort: wgPort, - } - p.ctx, p.cancel = context.WithCancel(ctx) - return p -} - -// AddTurnConn start the proxy with the given remote conn -func (p *WGUserSpaceProxy) AddTurnConn(remoteConn net.Conn) (net.Addr, error) { - p.remoteConn = remoteConn - - var err error - dialer := &net.Dialer{} - p.localConn, err = dialer.DialContext(p.ctx, "udp", fmt.Sprintf(":%d", p.localWGListenPort)) - if err != nil { - log.Errorf("failed dialing to local Wireguard port %s", err) - return nil, err - } - - go p.proxyToRemote() - go p.proxyToLocal() - - return p.localConn.LocalAddr(), err -} - -// CloseConn close the localConn -func (p *WGUserSpaceProxy) CloseConn() error { - p.cancel() - if p.localConn == nil { - return nil - } - - if p.remoteConn == nil { - return nil - } - - if err := p.remoteConn.Close(); err != nil { - log.Warnf("failed to close remote conn: %s", err) - } - return p.localConn.Close() -} - -// Free doing nothing because this implementation of proxy does not have global state -func (p *WGUserSpaceProxy) Free() error { - return nil -} - -// proxyToRemote proxies everything from Wireguard to the RemoteKey peer -// blocks -func (p *WGUserSpaceProxy) proxyToRemote() { - defer log.Infof("exit from proxyToRemote: %s", p.localConn.LocalAddr()) - - buf := make([]byte, 1500) - for { - select { - case <-p.ctx.Done(): - return - default: - n, err := p.localConn.Read(buf) - if err != nil { - log.Debugf("failed to read from wg interface conn: %s", err) - continue - } - - _, err = p.remoteConn.Write(buf[:n]) - if err != nil { - if err == io.EOF { - p.cancel() - } else { - log.Debugf("failed to write to remote conn: %s", err) - } - continue - } - } - } -} - -// proxyToLocal proxies everything from the RemoteKey peer to local Wireguard -// blocks -func (p *WGUserSpaceProxy) proxyToLocal() { - defer p.cancel() - defer log.Infof("exit from proxyToLocal: %s", p.localConn.LocalAddr()) - buf := make([]byte, 1500) - for { - select { - case <-p.ctx.Done(): - return - default: - n, err := p.remoteConn.Read(buf) - if err != nil { - if err == io.EOF { - return - } - log.Errorf("failed to read from remote conn: %s", err) - continue - } - - _, err = p.localConn.Write(buf[:n]) - if err != nil { - log.Debugf("failed to write to wg interface conn: %s", err) - continue - } - } - } -} diff --git a/client/internal/wgproxy/usp/proxy.go b/client/internal/wgproxy/usp/proxy.go new file mode 100644 index 000000000..83a8725d8 --- /dev/null +++ b/client/internal/wgproxy/usp/proxy.go @@ -0,0 +1,146 @@ +package usp + +import ( + "context" + "fmt" + "net" + "sync" + + "github.com/hashicorp/go-multierror" + log "github.com/sirupsen/logrus" + + "github.com/netbirdio/netbird/client/errors" +) + +// WGUserSpaceProxy proxies +type WGUserSpaceProxy struct { + localWGListenPort int + ctx context.Context + cancel context.CancelFunc + + remoteConn net.Conn + localConn net.Conn + closeMu sync.Mutex + closed bool +} + +// NewWGUserSpaceProxy instantiate a user space WireGuard proxy. This is not a thread safe implementation +func NewWGUserSpaceProxy(wgPort int) *WGUserSpaceProxy { + log.Debugf("Initializing new user space proxy with port %d", wgPort) + p := &WGUserSpaceProxy{ + localWGListenPort: wgPort, + } + return p +} + +// AddTurnConn start the proxy with the given remote conn +func (p *WGUserSpaceProxy) AddTurnConn(ctx context.Context, remoteConn net.Conn) (net.Addr, error) { + p.ctx, p.cancel = context.WithCancel(ctx) + + p.remoteConn = remoteConn + + var err error + dialer := net.Dialer{} + p.localConn, err = dialer.DialContext(p.ctx, "udp", fmt.Sprintf(":%d", p.localWGListenPort)) + if err != nil { + log.Errorf("failed dialing to local Wireguard port %s", err) + return nil, err + } + + go p.proxyToRemote() + go p.proxyToLocal() + + return p.localConn.LocalAddr(), err +} + +// CloseConn close the localConn +func (p *WGUserSpaceProxy) CloseConn() error { + if p.cancel == nil { + return fmt.Errorf("proxy not started") + } + return p.close() +} + +func (p *WGUserSpaceProxy) close() error { + p.closeMu.Lock() + defer p.closeMu.Unlock() + + // prevent double close + if p.closed { + return nil + } + p.closed = true + + p.cancel() + + var result *multierror.Error + if err := p.remoteConn.Close(); err != nil { + result = multierror.Append(result, fmt.Errorf("remote conn: %s", err)) + } + + if err := p.localConn.Close(); err != nil { + result = multierror.Append(result, fmt.Errorf("local conn: %s", err)) + } + return errors.FormatErrorOrNil(result) +} + +// proxyToRemote proxies from Wireguard to the RemoteKey +func (p *WGUserSpaceProxy) proxyToRemote() { + defer func() { + if err := p.close(); err != nil { + log.Warnf("error in proxy to remote loop: %s", err) + } + }() + + buf := make([]byte, 1500) + for p.ctx.Err() == nil { + n, err := p.localConn.Read(buf) + if err != nil { + if p.ctx.Err() != nil { + return + } + log.Debugf("failed to read from wg interface conn: %s", err) + return + } + + _, err = p.remoteConn.Write(buf[:n]) + if err != nil { + if p.ctx.Err() != nil { + return + } + + log.Debugf("failed to write to remote conn: %s", err) + return + } + } +} + +// proxyToLocal proxies from the Remote peer to local WireGuard +func (p *WGUserSpaceProxy) proxyToLocal() { + defer func() { + if err := p.close(); err != nil { + log.Warnf("error in proxy to local loop: %s", err) + } + }() + + buf := make([]byte, 1500) + for p.ctx.Err() == nil { + n, err := p.remoteConn.Read(buf) + if err != nil { + if p.ctx.Err() != nil { + return + } + log.Errorf("failed to read from remote conn: %s, %s", p.remoteConn.RemoteAddr(), err) + return + } + + _, err = p.localConn.Write(buf[:n]) + if err != nil { + if p.ctx.Err() != nil { + return + } + log.Debugf("failed to write to wg interface conn: %s", err) + continue + } + } +} diff --git a/relay/client/picker_test.go b/relay/client/picker_test.go index f5649d700..eb14581e0 100644 --- a/relay/client/picker_test.go +++ b/relay/client/picker_test.go @@ -13,7 +13,7 @@ func TestServerPicker_UnavailableServers(t *testing.T) { PeerID: "test", } - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() go func() {