From 06a17f0eee9b3cffc261e65616624eb6124733a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Sun, 16 Feb 2025 22:16:42 +0100 Subject: [PATCH] Implement redirect to in eBPF proxy --- client/iface/wgproxy/ebpf/proxy.go | 16 ++--- client/iface/wgproxy/ebpf/wrapper.go | 80 ++++++++++++++++-------- client/iface/wgproxy/factory_kernel.go | 4 +- client/iface/wgproxy/proxy_linux_test.go | 6 +- client/iface/wgproxy/proxy_test.go | 4 +- client/internal/peer/conn.go | 1 + 6 files changed, 69 insertions(+), 42 deletions(-) diff --git a/client/iface/wgproxy/ebpf/proxy.go b/client/iface/wgproxy/ebpf/proxy.go index e21fc35d4..6462fbadb 100644 --- a/client/iface/wgproxy/ebpf/proxy.go +++ b/client/iface/wgproxy/ebpf/proxy.go @@ -26,6 +26,10 @@ const ( loopbackAddr = "127.0.0.1" ) +var ( + localHostNetIP = net.ParseIP("127.0.0.1") +) + // WGEBPFProxy definition for proxy with EBPF support type WGEBPFProxy struct { localWGListenPort int @@ -249,19 +253,17 @@ func (p *WGEBPFProxy) prepareSenderRawSocket() (net.PacketConn, error) { return packetConn, nil } -func (p *WGEBPFProxy) sendPkg(data []byte, port int) error { - localhost := net.ParseIP("127.0.0.1") - +func (p *WGEBPFProxy) sendPkg(data []byte, endpointAddr *net.UDPAddr) error { payload := gopacket.Payload(data) ipH := &layers.IPv4{ - DstIP: localhost, - SrcIP: localhost, + DstIP: localHostNetIP, + SrcIP: endpointAddr.IP, Version: 4, TTL: 64, Protocol: layers.IPProtocolUDP, } udpH := &layers.UDP{ - SrcPort: layers.UDPPort(port), + SrcPort: layers.UDPPort(endpointAddr.Port), DstPort: layers.UDPPort(p.localWGListenPort), } @@ -276,7 +278,7 @@ func (p *WGEBPFProxy) sendPkg(data []byte, port int) error { if err != nil { return fmt.Errorf("serialize layers: %w", err) } - if _, err = p.rawConn.WriteTo(layerBuffer.Bytes(), &net.IPAddr{IP: localhost}); err != nil { + if _, err = p.rawConn.WriteTo(layerBuffer.Bytes(), &net.IPAddr{IP: localHostNetIP}); err != nil { return fmt.Errorf("write to raw conn: %w", err) } return nil diff --git a/client/iface/wgproxy/ebpf/wrapper.go b/client/iface/wgproxy/ebpf/wrapper.go index 412afdee1..c30cc03f6 100644 --- a/client/iface/wgproxy/ebpf/wrapper.go +++ b/client/iface/wgproxy/ebpf/wrapper.go @@ -15,32 +15,39 @@ import ( // ProxyWrapper help to keep the remoteConn instance for net.Conn.Close function call type ProxyWrapper struct { - WgeBPFProxy *WGEBPFProxy + wgeBPFProxy *WGEBPFProxy remoteConn net.Conn ctx context.Context cancel context.CancelFunc - wgEndpointAddr *net.UDPAddr + wgRelayedEndpointAddr *net.UDPAddr + wgEndpointCurrentUsedAddr *net.UDPAddr - pausedMu sync.Mutex - paused bool - isStarted bool + paused bool + pausedCond *sync.Cond + isStarted bool } +func NewProxyWrapper(proxy *WGEBPFProxy) *ProxyWrapper { + return &ProxyWrapper{ + wgeBPFProxy: proxy, + pausedCond: sync.NewCond(&sync.Mutex{}), + } +} func (p *ProxyWrapper) AddTurnConn(ctx context.Context, endpoint *net.UDPAddr, remoteConn net.Conn) error { - addr, err := p.WgeBPFProxy.AddTurnConn(remoteConn) + addr, err := p.wgeBPFProxy.AddTurnConn(remoteConn) if err != nil { return fmt.Errorf("add turn conn: %w", err) } p.remoteConn = remoteConn p.ctx, p.cancel = context.WithCancel(ctx) - p.wgEndpointAddr = addr + p.wgRelayedEndpointAddr = addr return err } func (p *ProxyWrapper) EndpointAddr() *net.UDPAddr { - return p.wgEndpointAddr + return p.wgRelayedEndpointAddr } func (p *ProxyWrapper) Work() { @@ -48,14 +55,19 @@ func (p *ProxyWrapper) Work() { return } - p.pausedMu.Lock() + p.pausedCond.L.Lock() p.paused = false - p.pausedMu.Unlock() + + p.wgEndpointCurrentUsedAddr = p.wgRelayedEndpointAddr if !p.isStarted { p.isStarted = true go p.proxyToLocal(p.ctx) } + + p.pausedCond.L.Unlock() + // todo: review to should be inside the lock scope + p.pausedCond.Signal() } func (p *ProxyWrapper) Pause() { @@ -64,31 +76,42 @@ func (p *ProxyWrapper) Pause() { } log.Tracef("pause proxy reading from: %s", p.remoteConn.RemoteAddr()) - p.pausedMu.Lock() + p.pausedCond.L.Lock() p.paused = true - p.pausedMu.Unlock() + p.pausedCond.L.Unlock() } func (p *ProxyWrapper) RedirectTo(endpoint *net.UDPAddr) { - // todo implement me + p.pausedCond.L.Lock() + p.paused = false + + p.wgEndpointCurrentUsedAddr = endpoint + + p.pausedCond.L.Unlock() + p.pausedCond.Signal() } // CloseConn close the remoteConn and automatically remove the conn instance from the map -func (e *ProxyWrapper) CloseConn() error { - if e.cancel == nil { +func (p *ProxyWrapper) CloseConn() error { + if p.cancel == nil { return fmt.Errorf("proxy not started") } - e.cancel() + p.cancel() - if err := e.remoteConn.Close(); err != nil && !errors.Is(err, net.ErrClosed) { + p.pausedCond.L.Lock() + p.paused = false + p.pausedCond.L.Unlock() + p.pausedCond.Signal() + + if err := p.remoteConn.Close(); err != nil && !errors.Is(err, net.ErrClosed) { return fmt.Errorf("failed to close remote conn: %w", err) } return nil } func (p *ProxyWrapper) proxyToLocal(ctx context.Context) { - defer p.WgeBPFProxy.removeTurnConn(uint16(p.wgEndpointAddr.Port)) + defer p.wgeBPFProxy.removeTurnConn(uint16(p.wgRelayedEndpointAddr.Port)) buf := make([]byte, 1500) for { @@ -97,14 +120,21 @@ func (p *ProxyWrapper) proxyToLocal(ctx context.Context) { return } - p.pausedMu.Lock() - if p.paused { - p.pausedMu.Unlock() - continue + for { + p.pausedCond.L.Lock() + if p.paused { + p.pausedCond.Wait() + if !p.paused { + break + } + p.pausedCond.L.Unlock() + continue + } + break } - err = p.WgeBPFProxy.sendPkg(buf[:n], p.wgEndpointAddr.Port) - p.pausedMu.Unlock() + err = p.wgeBPFProxy.sendPkg(buf[:n], p.wgEndpointCurrentUsedAddr) + p.pausedCond.L.Unlock() if err != nil { if ctx.Err() != nil { @@ -122,7 +152,7 @@ func (p *ProxyWrapper) readFromRemote(ctx context.Context, buf []byte) (int, err return 0, ctx.Err() } if !errors.Is(err, io.EOF) { - log.Errorf("failed to read from turn conn (endpoint: :%d): %s", p.wgEndpointAddr.Port, err) + log.Errorf("failed to read from turn conn (endpoint: :%d): %s", p.wgRelayedEndpointAddr.Port, err) } return 0, err } diff --git a/client/iface/wgproxy/factory_kernel.go b/client/iface/wgproxy/factory_kernel.go index 3ad7dc59d..c5e0b290d 100644 --- a/client/iface/wgproxy/factory_kernel.go +++ b/client/iface/wgproxy/factory_kernel.go @@ -36,9 +36,7 @@ func (w *KernelFactory) GetProxy() Proxy { return udpProxy.NewWGUDPProxy(w.wgPort) } - return &ebpf.ProxyWrapper{ - WgeBPFProxy: w.ebpfProxy, - } + return ebpf.NewProxyWrapper(w.ebpfProxy) } func (w *KernelFactory) Free() error { diff --git a/client/iface/wgproxy/proxy_linux_test.go b/client/iface/wgproxy/proxy_linux_test.go index 298c98cc0..4d83b39bd 100644 --- a/client/iface/wgproxy/proxy_linux_test.go +++ b/client/iface/wgproxy/proxy_linux_test.go @@ -32,10 +32,8 @@ func TestProxyCloseByRemoteConnEBPF(t *testing.T) { proxy Proxy }{ { - name: "ebpf proxy", - proxy: &ebpf.ProxyWrapper{ - WgeBPFProxy: ebpfProxy, - }, + name: "ebpf proxy", + proxy: ebpf.NewProxyWrapper(ebpfProxy), }, } diff --git a/client/iface/wgproxy/proxy_test.go b/client/iface/wgproxy/proxy_test.go index 64b617621..2165b8aba 100644 --- a/client/iface/wgproxy/proxy_test.go +++ b/client/iface/wgproxy/proxy_test.go @@ -98,9 +98,7 @@ func TestProxyCloseByRemoteConn(t *testing.T) { t.Errorf("failed to free ebpf proxy: %s", err) } }() - proxyWrapper := &ebpf.ProxyWrapper{ - WgeBPFProxy: ebpfProxy, - } + proxyWrapper := ebpf.NewProxyWrapper(ebpfProxy) tests = append(tests, struct { name string diff --git a/client/internal/peer/conn.go b/client/internal/peer/conn.go index 927000647..b46dd33cb 100644 --- a/client/internal/peer/conn.go +++ b/client/internal/peer/conn.go @@ -387,6 +387,7 @@ func (conn *Conn) onICEConnectionIsReady(priority ConnPriority, iceConnInfo ICEC wgConfigWorkaround() if conn.wgProxyRelay != nil { + conn.log.Debugf("redirect packages from relayed conn to WireGuard") conn.wgProxyRelay.RedirectTo(ep) }