diff --git a/client/internal/engine.go b/client/internal/engine.go index ad9b79d7a..038f39e5c 100644 --- a/client/internal/engine.go +++ b/client/internal/engine.go @@ -20,8 +20,8 @@ import ( "github.com/netbirdio/netbird/client/internal/acl" "github.com/netbirdio/netbird/client/internal/dns" "github.com/netbirdio/netbird/client/internal/peer" - "github.com/netbirdio/netbird/client/internal/proxy" "github.com/netbirdio/netbird/client/internal/routemanager" + "github.com/netbirdio/netbird/client/internal/wgproxy" nbssh "github.com/netbirdio/netbird/client/ssh" nbdns "github.com/netbirdio/netbird/dns" "github.com/netbirdio/netbird/iface" @@ -101,7 +101,8 @@ type Engine struct { ctx context.Context - wgInterface *iface.WGIface + wgInterface *iface.WGIface + wgProxyFactory *wgproxy.Factory udpMux *bind.UniversalUDPMuxDefault udpMuxConn io.Closer @@ -132,6 +133,7 @@ func NewEngine( signalClient signal.Client, mgmClient mgm.Client, config *EngineConfig, mobileDep MobileDependency, statusRecorder *peer.Status, ) *Engine { + return &Engine{ ctx: ctx, cancel: cancel, @@ -146,6 +148,7 @@ func NewEngine( networkSerial: 0, sshServerFunc: nbssh.DefaultSSHServer, statusRecorder: statusRecorder, + wgProxyFactory: wgproxy.NewFactory(config.WgPort), } } @@ -282,7 +285,7 @@ func (e *Engine) modifyPeers(peersUpdate []*mgmProto.RemotePeerConfig) error { for _, p := range peersUpdate { peerPubKey := p.GetWgPubKey() if peerConn, ok := e.peerConns[peerPubKey]; ok { - if peerConn.GetConf().ProxyConfig.AllowedIps != strings.Join(p.AllowedIps, ",") { + if peerConn.WgConfig().AllowedIps != strings.Join(p.AllowedIps, ",") { modified = append(modified, p) continue } @@ -795,9 +798,7 @@ func (e *Engine) connWorker(conn *peer.Conn, peerKey string) { // we might have received new STUN and TURN servers meanwhile, so update them e.syncMsgMux.Lock() - conf := conn.GetConf() - conf.StunTurn = append(e.STUNs, e.TURNs...) - conn.UpdateConf(conf) + conn.UpdateStunTurn(append(e.STUNs, e.TURNs...)) e.syncMsgMux.Unlock() err := conn.Open() @@ -826,9 +827,9 @@ func (e *Engine) createPeerConn(pubKey string, allowedIPs string) (*peer.Conn, e stunTurn = append(stunTurn, e.STUNs...) stunTurn = append(stunTurn, e.TURNs...) - proxyConfig := proxy.Config{ + wgConfig := peer.WgConfig{ RemoteKey: pubKey, - WgListenAddr: fmt.Sprintf("127.0.0.1:%d", e.config.WgPort), + WgListenPort: e.config.WgPort, WgInterface: e.wgInterface, AllowedIps: allowedIPs, PreSharedKey: e.config.PreSharedKey, @@ -845,13 +846,13 @@ func (e *Engine) createPeerConn(pubKey string, allowedIPs string) (*peer.Conn, e Timeout: timeout, UDPMux: e.udpMux.UDPMuxDefault, UDPMuxSrflx: e.udpMux, - ProxyConfig: proxyConfig, + WgConfig: wgConfig, LocalWgPort: e.config.WgPort, NATExternalIPs: e.parseNATExternalIPMappings(), UserspaceBind: e.wgInterface.IsUserspaceBind(), } - peerConn, err := peer.NewConn(config, e.statusRecorder, e.mobileDep.TunAdapter, e.mobileDep.IFaceDiscover) + peerConn, err := peer.NewConn(config, e.statusRecorder, e.wgProxyFactory, e.mobileDep.TunAdapter, e.mobileDep.IFaceDiscover) if err != nil { return nil, err } @@ -1008,6 +1009,10 @@ func (e *Engine) parseNATExternalIPMappings() []string { } func (e *Engine) close() { + if err := e.wgProxyFactory.Free(); err != nil { + log.Errorf("failed closing ebpf proxy: %s", err) + } + log.Debugf("removing Netbird interface %s", e.config.WgIfaceName) if e.wgInterface != nil { if err := e.wgInterface.Close(); err != nil { diff --git a/client/internal/engine_test.go b/client/internal/engine_test.go index 4d67968f6..60c07c0c9 100644 --- a/client/internal/engine_test.go +++ b/client/internal/engine_test.go @@ -367,9 +367,9 @@ func TestEngine_UpdateNetworkMap(t *testing.T) { t.Errorf("expecting Engine.peerConns to contain peer %s", p) } expectedAllowedIPs := strings.Join(p.AllowedIps, ",") - if conn.GetConf().ProxyConfig.AllowedIps != expectedAllowedIPs { + if conn.WgConfig().AllowedIps != expectedAllowedIPs { t.Errorf("expecting peer %s to have AllowedIPs= %s, got %s", p.GetWgPubKey(), - expectedAllowedIPs, conn.GetConf().ProxyConfig.AllowedIps) + expectedAllowedIPs, conn.WgConfig().AllowedIps) } } }) diff --git a/client/internal/peer/conn.go b/client/internal/peer/conn.go index 9a3ef106b..9247ed3c5 100644 --- a/client/internal/peer/conn.go +++ b/client/internal/peer/conn.go @@ -10,9 +10,10 @@ import ( "github.com/pion/ice/v2" log "github.com/sirupsen/logrus" + "golang.zx2c4.com/wireguard/wgctrl/wgtypes" - "github.com/netbirdio/netbird/client/internal/proxy" "github.com/netbirdio/netbird/client/internal/stdnet" + "github.com/netbirdio/netbird/client/internal/wgproxy" "github.com/netbirdio/netbird/iface" "github.com/netbirdio/netbird/iface/bind" signal "github.com/netbirdio/netbird/signal/client" @@ -23,8 +24,18 @@ import ( const ( iceKeepAliveDefault = 4 * time.Second iceDisconnectedTimeoutDefault = 6 * time.Second + + defaultWgKeepAlive = 25 * time.Second ) +type WgConfig struct { + WgListenPort int + RemoteKey string + WgInterface *iface.WGIface + AllowedIps string + PreSharedKey *wgtypes.Key +} + // ConnConfig is a peer Connection configuration type ConnConfig struct { @@ -43,7 +54,7 @@ type ConnConfig struct { Timeout time.Duration - ProxyConfig proxy.Config + WgConfig WgConfig UDPMux ice.UDPMux UDPMuxSrflx ice.UniversalUDPMux @@ -98,7 +109,9 @@ type Conn struct { statusRecorder *Status - proxy proxy.Proxy + wgProxyFactory *wgproxy.Factory + wgProxy wgproxy.Proxy + remoteModeCh chan ModeMessage meta meta @@ -122,14 +135,19 @@ func (conn *Conn) GetConf() ConnConfig { return conn.config } -// UpdateConf updates the connection config -func (conn *Conn) UpdateConf(conf ConnConfig) { - conn.config = conf +// WgConfig returns the WireGuard config +func (conn *Conn) WgConfig() WgConfig { + return conn.config.WgConfig +} + +// UpdateStunTurn update the turn and stun addresses +func (conn *Conn) UpdateStunTurn(turnStun []*ice.URL) { + conn.config.StunTurn = turnStun } // NewConn creates a new not opened Conn to the remote peer. // To establish a connection run Conn.Open -func NewConn(config ConnConfig, statusRecorder *Status, adapter iface.TunAdapter, iFaceDiscover stdnet.ExternalIFaceDiscover) (*Conn, error) { +func NewConn(config ConnConfig, statusRecorder *Status, wgProxyFactory *wgproxy.Factory, adapter iface.TunAdapter, iFaceDiscover stdnet.ExternalIFaceDiscover) (*Conn, error) { return &Conn{ config: config, mu: sync.Mutex{}, @@ -139,6 +157,7 @@ func NewConn(config ConnConfig, statusRecorder *Status, adapter iface.TunAdapter remoteAnswerCh: make(chan OfferAnswer), statusRecorder: statusRecorder, remoteModeCh: make(chan ModeMessage, 1), + wgProxyFactory: wgProxyFactory, adapter: adapter, iFaceDiscover: iFaceDiscover, }, nil @@ -215,12 +234,12 @@ func (conn *Conn) candidateTypes() []ice.CandidateType { func (conn *Conn) Open() error { log.Debugf("trying to connect to peer %s", conn.config.Key) - peerState := State{PubKey: conn.config.Key} - - peerState.IP = strings.Split(conn.config.ProxyConfig.AllowedIps, "/")[0] - peerState.ConnStatusUpdate = time.Now() - peerState.ConnStatus = conn.status - + peerState := State{ + PubKey: conn.config.Key, + IP: strings.Split(conn.config.WgConfig.AllowedIps, "/")[0], + ConnStatusUpdate: time.Now(), + ConnStatus: conn.status, + } err := conn.statusRecorder.UpdatePeerState(peerState) if err != nil { log.Warnf("erro while updating the state of peer %s,err: %v", conn.config.Key, err) @@ -275,10 +294,11 @@ func (conn *Conn) Open() error { defer conn.notifyDisconnected() conn.mu.Unlock() - peerState = State{PubKey: conn.config.Key} - - peerState.ConnStatus = conn.status - peerState.ConnStatusUpdate = time.Now() + peerState = State{ + PubKey: conn.config.Key, + ConnStatus: conn.status, + ConnStatusUpdate: time.Now(), + } err = conn.statusRecorder.UpdatePeerState(peerState) if err != nil { log.Warnf("erro while updating the state of peer %s,err: %v", conn.config.Key, err) @@ -309,19 +329,12 @@ func (conn *Conn) Open() error { remoteWgPort = remoteOfferAnswer.WgListenPort } // the ice connection has been established successfully so we are ready to start the proxy - err = conn.startProxy(remoteConn, remoteWgPort) + remoteAddr, err := conn.configureConnection(remoteConn, remoteWgPort) if err != nil { return err } - if conn.proxy.Type() == proxy.TypeDirectNoProxy { - host, _, _ := net.SplitHostPort(remoteConn.LocalAddr().String()) - rhost, _, _ := net.SplitHostPort(remoteConn.RemoteAddr().String()) - // direct Wireguard connection - log.Infof("directly connected to peer %s [laddr <-> raddr] [%s:%d <-> %s:%d]", conn.config.Key, host, conn.config.LocalWgPort, rhost, remoteWgPort) - } else { - log.Infof("connected to peer %s [laddr <-> raddr] [%s <-> %s]", conn.config.Key, remoteConn.LocalAddr().String(), remoteConn.RemoteAddr().String()) - } + log.Infof("connected to peer %s, endpoint address: %s", conn.config.Key, remoteAddr.String()) // wait until connection disconnected or has been closed externally (upper layer, e.g. engine) select { @@ -338,54 +351,60 @@ func isRelayCandidate(candidate ice.Candidate) bool { return candidate.Type() == ice.CandidateTypeRelay } -// startProxy starts proxying traffic from/to local Wireguard and sets connection status to StatusConnected -func (conn *Conn) startProxy(remoteConn net.Conn, remoteWgPort int) error { +// configureConnection starts proxying traffic from/to local Wireguard and sets connection status to StatusConnected +func (conn *Conn) configureConnection(remoteConn net.Conn, remoteWgPort int) (net.Addr, error) { conn.mu.Lock() defer conn.mu.Unlock() - var pair *ice.CandidatePair pair, err := conn.agent.GetSelectedCandidatePair() if err != nil { - return err + return nil, err } - peerState := State{PubKey: conn.config.Key} - p := conn.getProxy(pair, remoteWgPort) - conn.proxy = p - err = p.Start(remoteConn) + var endpoint net.Addr + if isRelayCandidate(pair.Local) { + log.Debugf("setup relay connection") + conn.wgProxy = conn.wgProxyFactory.GetProxy() + endpoint, err = conn.wgProxy.AddTurnConn(remoteConn) + if err != nil { + return nil, err + } + } else { + // To support old version's with direct mode we attempt to punch an additional role with the remote wireguard port + go conn.punchRemoteWGPort(pair, remoteWgPort) + endpoint = remoteConn.RemoteAddr() + } + + endpointUdpAddr, _ := net.ResolveUDPAddr(endpoint.Network(), endpoint.String()) + + err = conn.config.WgConfig.WgInterface.UpdatePeer(conn.config.WgConfig.RemoteKey, conn.config.WgConfig.AllowedIps, defaultWgKeepAlive, endpointUdpAddr, conn.config.WgConfig.PreSharedKey) if err != nil { - return err + if conn.wgProxy != nil { + _ = conn.wgProxy.CloseConn() + } + return nil, err } conn.status = StatusConnected - peerState.ConnStatus = conn.status - peerState.ConnStatusUpdate = time.Now() - peerState.LocalIceCandidateType = pair.Local.Type().String() - peerState.RemoteIceCandidateType = pair.Remote.Type().String() + peerState := State{ + PubKey: conn.config.Key, + ConnStatus: conn.status, + ConnStatusUpdate: time.Now(), + LocalIceCandidateType: pair.Local.Type().String(), + RemoteIceCandidateType: pair.Remote.Type().String(), + Direct: !isRelayCandidate(pair.Local), + } if pair.Local.Type() == ice.CandidateTypeRelay || pair.Remote.Type() == ice.CandidateTypeRelay { peerState.Relayed = true } - peerState.Direct = p.Type() == proxy.TypeDirectNoProxy || p.Type() == proxy.TypeNoProxy err = conn.statusRecorder.UpdatePeerState(peerState) if err != nil { log.Warnf("unable to save peer's state, got error: %v", err) } - return nil -} - -// todo rename this method and the proxy package to something more appropriate -func (conn *Conn) getProxy(pair *ice.CandidatePair, remoteWgPort int) proxy.Proxy { - if isRelayCandidate(pair.Local) { - return proxy.NewWireGuardProxy(conn.config.ProxyConfig) - } - - // To support old version's with direct mode we attempt to punch an additional role with the remote wireguard port - go conn.punchRemoteWGPort(pair, remoteWgPort) - - return proxy.NewNoProxy(conn.config.ProxyConfig) + return endpoint, nil } func (conn *Conn) punchRemoteWGPort(pair *ice.CandidatePair, remoteWgPort int) { @@ -414,22 +433,22 @@ func (conn *Conn) cleanup() error { conn.mu.Lock() defer conn.mu.Unlock() + var err1, err2, err3 error if conn.agent != nil { - err := conn.agent.Close() - if err != nil { - return err + err1 = conn.agent.Close() + if err1 == nil { + conn.agent = nil } - conn.agent = nil } - if conn.proxy != nil { - err := conn.proxy.Close() - if err != nil { - return err - } - conn.proxy = nil + if conn.wgProxy != nil { + err2 = conn.wgProxy.CloseConn() + conn.wgProxy = nil } + // todo: is it problem if we try to remove a peer what is never existed? + err3 = conn.config.WgConfig.WgInterface.RemovePeer(conn.config.WgConfig.RemoteKey) + if conn.notifyDisconnected != nil { conn.notifyDisconnected() conn.notifyDisconnected = nil @@ -437,10 +456,11 @@ func (conn *Conn) cleanup() error { conn.status = StatusDisconnected - peerState := State{PubKey: conn.config.Key} - peerState.ConnStatus = conn.status - peerState.ConnStatusUpdate = time.Now() - + peerState := State{ + PubKey: conn.config.Key, + ConnStatus: conn.status, + ConnStatusUpdate: time.Now(), + } err := conn.statusRecorder.UpdatePeerState(peerState) if err != nil { // pretty common error because by that time Engine can already remove the peer and status won't be available. @@ -449,8 +469,13 @@ func (conn *Conn) cleanup() error { } log.Debugf("cleaned up connection to peer %s", conn.config.Key) - - return nil + if err1 != nil { + return err1 + } + if err2 != nil { + return err2 + } + return err3 } // SetSignalOffer sets a handler function to be triggered by Conn when a new connection offer has to be signalled to the remote peer diff --git a/client/internal/peer/conn_test.go b/client/internal/peer/conn_test.go index dedc381ac..ac2fa5c00 100644 --- a/client/internal/peer/conn_test.go +++ b/client/internal/peer/conn_test.go @@ -5,12 +5,11 @@ import ( "testing" "time" - "github.com/netbirdio/netbird/client/internal/stdnet" - "github.com/magiconair/properties/assert" "github.com/pion/ice/v2" - "github.com/netbirdio/netbird/client/internal/proxy" + "github.com/netbirdio/netbird/client/internal/stdnet" + "github.com/netbirdio/netbird/client/internal/wgproxy" "github.com/netbirdio/netbird/iface" ) @@ -20,7 +19,6 @@ var connConf = ConnConfig{ StunTurn: []*ice.URL{}, InterfaceBlackList: nil, Timeout: time.Second, - ProxyConfig: proxy.Config{}, LocalWgPort: 51820, } @@ -37,7 +35,11 @@ func TestNewConn_interfaceFilter(t *testing.T) { } func TestConn_GetKey(t *testing.T) { - conn, err := NewConn(connConf, nil, nil, nil) + wgProxyFactory := wgproxy.NewFactory(connConf.LocalWgPort) + defer func() { + _ = wgProxyFactory.Free() + }() + conn, err := NewConn(connConf, nil, wgProxyFactory, nil, nil) if err != nil { return } @@ -48,8 +50,11 @@ func TestConn_GetKey(t *testing.T) { } func TestConn_OnRemoteOffer(t *testing.T) { - - conn, err := NewConn(connConf, NewRecorder("https://mgm"), nil, nil) + wgProxyFactory := wgproxy.NewFactory(connConf.LocalWgPort) + defer func() { + _ = wgProxyFactory.Free() + }() + conn, err := NewConn(connConf, NewRecorder("https://mgm"), wgProxyFactory, nil, nil) if err != nil { return } @@ -82,8 +87,11 @@ func TestConn_OnRemoteOffer(t *testing.T) { } func TestConn_OnRemoteAnswer(t *testing.T) { - - conn, err := NewConn(connConf, NewRecorder("https://mgm"), nil, nil) + wgProxyFactory := wgproxy.NewFactory(connConf.LocalWgPort) + defer func() { + _ = wgProxyFactory.Free() + }() + conn, err := NewConn(connConf, NewRecorder("https://mgm"), wgProxyFactory, nil, nil) if err != nil { return } @@ -115,8 +123,11 @@ func TestConn_OnRemoteAnswer(t *testing.T) { wg.Wait() } func TestConn_Status(t *testing.T) { - - conn, err := NewConn(connConf, NewRecorder("https://mgm"), nil, nil) + wgProxyFactory := wgproxy.NewFactory(connConf.LocalWgPort) + defer func() { + _ = wgProxyFactory.Free() + }() + conn, err := NewConn(connConf, NewRecorder("https://mgm"), wgProxyFactory, nil, nil) if err != nil { return } @@ -142,8 +153,11 @@ func TestConn_Status(t *testing.T) { } func TestConn_Close(t *testing.T) { - - conn, err := NewConn(connConf, NewRecorder("https://mgm"), nil, nil) + wgProxyFactory := wgproxy.NewFactory(connConf.LocalWgPort) + defer func() { + _ = wgProxyFactory.Free() + }() + conn, err := NewConn(connConf, NewRecorder("https://mgm"), wgProxyFactory, nil, nil) if err != nil { return } diff --git a/client/internal/proxy/dummy.go b/client/internal/proxy/dummy.go deleted file mode 100644 index ebd7cd68f..000000000 --- a/client/internal/proxy/dummy.go +++ /dev/null @@ -1,72 +0,0 @@ -package proxy - -import ( - "context" - log "github.com/sirupsen/logrus" - "net" - "time" -) - -// DummyProxy just sends pings to the RemoteKey peer and reads responses -type DummyProxy struct { - conn net.Conn - remote string - ctx context.Context - cancel context.CancelFunc -} - -func NewDummyProxy(remote string) *DummyProxy { - p := &DummyProxy{remote: remote} - p.ctx, p.cancel = context.WithCancel(context.Background()) - return p -} - -func (p *DummyProxy) Close() error { - p.cancel() - return nil -} - -func (p *DummyProxy) Start(remoteConn net.Conn) error { - p.conn = remoteConn - go func() { - buf := make([]byte, 1500) - for { - select { - case <-p.ctx.Done(): - return - default: - _, err := p.conn.Read(buf) - if err != nil { - log.Errorf("error while reading RemoteKey %s proxy %v", p.remote, err) - return - } - //log.Debugf("received %s from %s", string(buf[:n]), p.remote) - } - - } - }() - - go func() { - for { - select { - case <-p.ctx.Done(): - return - default: - _, err := p.conn.Write([]byte("hello")) - //log.Debugf("sent ping to %s", p.remote) - if err != nil { - log.Errorf("error while writing to RemoteKey %s proxy %v", p.remote, err) - return - } - time.Sleep(5 * time.Second) - } - } - - }() - - return nil -} - -func (p *DummyProxy) Type() Type { - return TypeDummy -} diff --git a/client/internal/proxy/noproxy.go b/client/internal/proxy/noproxy.go deleted file mode 100644 index dcfe182fd..000000000 --- a/client/internal/proxy/noproxy.go +++ /dev/null @@ -1,42 +0,0 @@ -package proxy - -import ( - log "github.com/sirupsen/logrus" - "net" -) - -// NoProxy is used just to configure WireGuard without any local proxy in between. -// Used when the WireGuard interface is userspace and uses bind.ICEBind -type NoProxy struct { - config Config -} - -// NewNoProxy creates a new NoProxy with a provided config -func NewNoProxy(config Config) *NoProxy { - return &NoProxy{config: config} -} - -// Close removes peer from the WireGuard interface -func (p *NoProxy) Close() error { - err := p.config.WgInterface.RemovePeer(p.config.RemoteKey) - if err != nil { - return err - } - return nil -} - -// Start just updates WireGuard peer with the remote address -func (p *NoProxy) Start(remoteConn net.Conn) error { - - log.Debugf("using NoProxy to connect to peer %s at %s", p.config.RemoteKey, remoteConn.RemoteAddr().String()) - addr, err := net.ResolveUDPAddr("udp", remoteConn.RemoteAddr().String()) - if err != nil { - return err - } - return p.config.WgInterface.UpdatePeer(p.config.RemoteKey, p.config.AllowedIps, DefaultWgKeepAlive, - addr, p.config.PreSharedKey) -} - -func (p *NoProxy) Type() Type { - return TypeNoProxy -} diff --git a/client/internal/proxy/proxy.go b/client/internal/proxy/proxy.go deleted file mode 100644 index a0b9e98a1..000000000 --- a/client/internal/proxy/proxy.go +++ /dev/null @@ -1,35 +0,0 @@ -package proxy - -import ( - "github.com/netbirdio/netbird/iface" - "golang.zx2c4.com/wireguard/wgctrl/wgtypes" - "io" - "net" - "time" -) - -const DefaultWgKeepAlive = 25 * time.Second - -type Type string - -const ( - TypeDirectNoProxy Type = "DirectNoProxy" - TypeWireGuard Type = "WireGuard" - TypeDummy Type = "Dummy" - TypeNoProxy Type = "NoProxy" -) - -type Config struct { - WgListenAddr string - RemoteKey string - WgInterface *iface.WGIface - AllowedIps string - PreSharedKey *wgtypes.Key -} - -type Proxy interface { - io.Closer - // Start creates a local remoteConn and starts proxying data from/to remoteConn - Start(remoteConn net.Conn) error - Type() Type -} diff --git a/client/internal/proxy/wireguard.go b/client/internal/proxy/wireguard.go deleted file mode 100644 index ec3c6a730..000000000 --- a/client/internal/proxy/wireguard.go +++ /dev/null @@ -1,128 +0,0 @@ -package proxy - -import ( - "context" - log "github.com/sirupsen/logrus" - "net" -) - -// WireGuardProxy proxies -type WireGuardProxy struct { - ctx context.Context - cancel context.CancelFunc - - config Config - - remoteConn net.Conn - localConn net.Conn -} - -func NewWireGuardProxy(config Config) *WireGuardProxy { - p := &WireGuardProxy{config: config} - p.ctx, p.cancel = context.WithCancel(context.Background()) - return p -} - -func (p *WireGuardProxy) updateEndpoint() error { - udpAddr, err := net.ResolveUDPAddr(p.localConn.LocalAddr().Network(), p.localConn.LocalAddr().String()) - if err != nil { - return err - } - // add local proxy connection as a Wireguard peer - err = p.config.WgInterface.UpdatePeer(p.config.RemoteKey, p.config.AllowedIps, DefaultWgKeepAlive, - udpAddr, p.config.PreSharedKey) - if err != nil { - return err - } - - return nil -} - -func (p *WireGuardProxy) Start(remoteConn net.Conn) error { - p.remoteConn = remoteConn - - var err error - p.localConn, err = net.Dial("udp", p.config.WgListenAddr) - if err != nil { - log.Errorf("failed dialing to local Wireguard port %s", err) - return err - } - - err = p.updateEndpoint() - if err != nil { - log.Errorf("error while updating Wireguard peer endpoint [%s] %v", p.config.RemoteKey, err) - return err - } - - go p.proxyToRemote() - go p.proxyToLocal() - - return nil -} - -func (p *WireGuardProxy) Close() error { - p.cancel() - if c := p.localConn; c != nil { - err := p.localConn.Close() - if err != nil { - return err - } - } - err := p.config.WgInterface.RemovePeer(p.config.RemoteKey) - if err != nil { - return err - } - return nil -} - -// proxyToRemote proxies everything from Wireguard to the RemoteKey peer -// blocks -func (p *WireGuardProxy) proxyToRemote() { - - buf := make([]byte, 1500) - for { - select { - case <-p.ctx.Done(): - log.Debugf("stopped proxying to remote peer %s due to closed connection", p.config.RemoteKey) - return - default: - n, err := p.localConn.Read(buf) - if err != nil { - continue - } - - _, err = p.remoteConn.Write(buf[:n]) - if err != nil { - continue - } - } - } -} - -// proxyToLocal proxies everything from the RemoteKey peer to local Wireguard -// blocks -func (p *WireGuardProxy) proxyToLocal() { - - buf := make([]byte, 1500) - for { - select { - case <-p.ctx.Done(): - log.Debugf("stopped proxying from remote peer %s due to closed connection", p.config.RemoteKey) - return - default: - n, err := p.remoteConn.Read(buf) - if err != nil { - continue - } - - _, err = p.localConn.Write(buf[:n]) - if err != nil { - continue - } - } - } -} - -func (p *WireGuardProxy) Type() Type { - return TypeWireGuard -} diff --git a/client/internal/wgproxy/bpf/portreplace.c b/client/internal/wgproxy/bpf/portreplace.c new file mode 100644 index 000000000..25994116b --- /dev/null +++ b/client/internal/wgproxy/bpf/portreplace.c @@ -0,0 +1,90 @@ +#include +#include // ETH_P_IP +#include +#include +#include +#include +#include + +#define bpf_printk(fmt, ...) \ + ({ \ + char ____fmt[] = fmt; \ + bpf_trace_printk(____fmt, sizeof(____fmt), ##__VA_ARGS__); \ + }) + +const __u32 map_key_proxy_port = 0; +const __u32 map_key_wg_port = 1; + +struct bpf_map_def SEC("maps") xdp_port_map = { + .type = BPF_MAP_TYPE_ARRAY, + .key_size = sizeof(__u32), + .value_size = sizeof(__u16), + .max_entries = 10, +}; + +__u16 proxy_port = 0; +__u16 wg_port = 0; + +bool read_port_settings() { + __u16 *value; + value = bpf_map_lookup_elem(&xdp_port_map, &map_key_proxy_port); + if(!value) { + return false; + } + + proxy_port = *value; + + value = bpf_map_lookup_elem(&xdp_port_map, &map_key_wg_port); + if(!value) { + return false; + } + wg_port = *value; + + return true; +} + +SEC("xdp") +int xdp_prog_func(struct xdp_md *ctx) { + if(proxy_port == 0 || wg_port == 0) { + if(!read_port_settings()){ + return XDP_PASS; + } + bpf_printk("proxy port: %d, wg port: %d", proxy_port, wg_port); + } + + void *data = (void *)(long)ctx->data; + void *data_end = (void *)(long)ctx->data_end; + struct ethhdr *eth = data; + struct iphdr *ip = (data + sizeof(struct ethhdr)); + struct udphdr *udp = (data + sizeof(struct ethhdr) + sizeof(struct iphdr)); + + // return early if not enough data + if (data + sizeof(struct ethhdr) + sizeof(struct iphdr) + sizeof(struct udphdr) > data_end){ + return XDP_PASS; + } + + // skip non IPv4 packages + if (eth->h_proto != htons(ETH_P_IP)) { + return XDP_PASS; + } + + if (ip->protocol != IPPROTO_UDP) { + return XDP_PASS; + } + + // 2130706433 = 127.0.0.1 + if (ip->daddr != htonl(2130706433)) { + return XDP_PASS; + } + + if (udp->source != htons(wg_port)){ + return XDP_PASS; + } + + __be16 new_src_port = udp->dest; + __be16 new_dst_port = htons(proxy_port); + udp->dest = new_dst_port; + udp->source = new_src_port; + return XDP_PASS; +} +char _license[] SEC("license") = "GPL"; \ No newline at end of file diff --git a/client/internal/wgproxy/bpf_bpfeb.go b/client/internal/wgproxy/bpf_bpfeb.go new file mode 100644 index 000000000..37a5cb22c --- /dev/null +++ b/client/internal/wgproxy/bpf_bpfeb.go @@ -0,0 +1,120 @@ +// Code generated by bpf2go; DO NOT EDIT. +//go:build arm64be || armbe || mips || mips64 || mips64p32 || ppc64 || s390 || s390x || sparc || sparc64 +// +build arm64be armbe mips mips64 mips64p32 ppc64 s390 s390x sparc sparc64 + +package wgproxy + +import ( + "bytes" + _ "embed" + "fmt" + "io" + + "github.com/cilium/ebpf" +) + +// loadBpf returns the embedded CollectionSpec for bpf. +func loadBpf() (*ebpf.CollectionSpec, error) { + reader := bytes.NewReader(_BpfBytes) + spec, err := ebpf.LoadCollectionSpecFromReader(reader) + if err != nil { + return nil, fmt.Errorf("can't load bpf: %w", err) + } + + return spec, err +} + +// loadBpfObjects loads bpf and converts it into a struct. +// +// The following types are suitable as obj argument: +// +// *bpfObjects +// *bpfPrograms +// *bpfMaps +// +// See ebpf.CollectionSpec.LoadAndAssign documentation for details. +func loadBpfObjects(obj interface{}, opts *ebpf.CollectionOptions) error { + spec, err := loadBpf() + if err != nil { + return err + } + + return spec.LoadAndAssign(obj, opts) +} + +// bpfSpecs contains maps and programs before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. +type bpfSpecs struct { + bpfProgramSpecs + bpfMapSpecs +} + +// bpfSpecs contains programs before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. +type bpfProgramSpecs struct { + XdpProgFunc *ebpf.ProgramSpec `ebpf:"xdp_prog_func"` +} + +// bpfMapSpecs contains maps before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. +type bpfMapSpecs struct { + XdpPortMap *ebpf.MapSpec `ebpf:"xdp_port_map"` +} + +// bpfObjects contains all objects after they have been loaded into the kernel. +// +// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. +type bpfObjects struct { + bpfPrograms + bpfMaps +} + +func (o *bpfObjects) Close() error { + return _BpfClose( + &o.bpfPrograms, + &o.bpfMaps, + ) +} + +// bpfMaps contains all maps after they have been loaded into the kernel. +// +// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. +type bpfMaps struct { + XdpPortMap *ebpf.Map `ebpf:"xdp_port_map"` +} + +func (m *bpfMaps) Close() error { + return _BpfClose( + m.XdpPortMap, + ) +} + +// bpfPrograms contains all programs after they have been loaded into the kernel. +// +// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. +type bpfPrograms struct { + XdpProgFunc *ebpf.Program `ebpf:"xdp_prog_func"` +} + +func (p *bpfPrograms) Close() error { + return _BpfClose( + p.XdpProgFunc, + ) +} + +func _BpfClose(closers ...io.Closer) error { + for _, closer := range closers { + if err := closer.Close(); err != nil { + return err + } + } + return nil +} + +// Do not access this directly. +// +//go:embed bpf_bpfeb.o +var _BpfBytes []byte diff --git a/client/internal/wgproxy/bpf_bpfeb.o b/client/internal/wgproxy/bpf_bpfeb.o new file mode 100644 index 000000000..c3aa757d6 Binary files /dev/null and b/client/internal/wgproxy/bpf_bpfeb.o differ diff --git a/client/internal/wgproxy/bpf_bpfel.go b/client/internal/wgproxy/bpf_bpfel.go new file mode 100644 index 000000000..c553b9da4 --- /dev/null +++ b/client/internal/wgproxy/bpf_bpfel.go @@ -0,0 +1,120 @@ +// Code generated by bpf2go; DO NOT EDIT. +//go:build 386 || amd64 || amd64p32 || arm || arm64 || mips64le || mips64p32le || mipsle || ppc64le || riscv64 +// +build 386 amd64 amd64p32 arm arm64 mips64le mips64p32le mipsle ppc64le riscv64 + +package wgproxy + +import ( + "bytes" + _ "embed" + "fmt" + "io" + + "github.com/cilium/ebpf" +) + +// loadBpf returns the embedded CollectionSpec for bpf. +func loadBpf() (*ebpf.CollectionSpec, error) { + reader := bytes.NewReader(_BpfBytes) + spec, err := ebpf.LoadCollectionSpecFromReader(reader) + if err != nil { + return nil, fmt.Errorf("can't load bpf: %w", err) + } + + return spec, err +} + +// loadBpfObjects loads bpf and converts it into a struct. +// +// The following types are suitable as obj argument: +// +// *bpfObjects +// *bpfPrograms +// *bpfMaps +// +// See ebpf.CollectionSpec.LoadAndAssign documentation for details. +func loadBpfObjects(obj interface{}, opts *ebpf.CollectionOptions) error { + spec, err := loadBpf() + if err != nil { + return err + } + + return spec.LoadAndAssign(obj, opts) +} + +// bpfSpecs contains maps and programs before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. +type bpfSpecs struct { + bpfProgramSpecs + bpfMapSpecs +} + +// bpfSpecs contains programs before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. +type bpfProgramSpecs struct { + XdpProgFunc *ebpf.ProgramSpec `ebpf:"xdp_prog_func"` +} + +// bpfMapSpecs contains maps before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. +type bpfMapSpecs struct { + XdpPortMap *ebpf.MapSpec `ebpf:"xdp_port_map"` +} + +// bpfObjects contains all objects after they have been loaded into the kernel. +// +// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. +type bpfObjects struct { + bpfPrograms + bpfMaps +} + +func (o *bpfObjects) Close() error { + return _BpfClose( + &o.bpfPrograms, + &o.bpfMaps, + ) +} + +// bpfMaps contains all maps after they have been loaded into the kernel. +// +// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. +type bpfMaps struct { + XdpPortMap *ebpf.Map `ebpf:"xdp_port_map"` +} + +func (m *bpfMaps) Close() error { + return _BpfClose( + m.XdpPortMap, + ) +} + +// bpfPrograms contains all programs after they have been loaded into the kernel. +// +// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. +type bpfPrograms struct { + XdpProgFunc *ebpf.Program `ebpf:"xdp_prog_func"` +} + +func (p *bpfPrograms) Close() error { + return _BpfClose( + p.XdpProgFunc, + ) +} + +func _BpfClose(closers ...io.Closer) error { + for _, closer := range closers { + if err := closer.Close(); err != nil { + return err + } + } + return nil +} + +// Do not access this directly. +// +//go:embed bpf_bpfel.o +var _BpfBytes []byte diff --git a/client/internal/wgproxy/bpf_bpfel.o b/client/internal/wgproxy/bpf_bpfel.o new file mode 100644 index 000000000..d8b24d641 Binary files /dev/null and b/client/internal/wgproxy/bpf_bpfel.o differ diff --git a/client/internal/wgproxy/factory.go b/client/internal/wgproxy/factory.go new file mode 100644 index 000000000..0e1261500 --- /dev/null +++ b/client/internal/wgproxy/factory.go @@ -0,0 +1,20 @@ +package wgproxy + +type Factory struct { + wgPort int + ebpfProxy Proxy +} + +func (w *Factory) GetProxy() Proxy { + if w.ebpfProxy != nil { + return w.ebpfProxy + } + return NewWGUserSpaceProxy(w.wgPort) +} + +func (w *Factory) Free() error { + if w.ebpfProxy != nil { + return w.ebpfProxy.CloseConn() + } + return nil +} diff --git a/client/internal/wgproxy/factory_linux.go b/client/internal/wgproxy/factory_linux.go new file mode 100644 index 000000000..0ae9b57b3 --- /dev/null +++ b/client/internal/wgproxy/factory_linux.go @@ -0,0 +1,19 @@ +package wgproxy + +import ( + log "github.com/sirupsen/logrus" +) + +func NewFactory(wgPort int) *Factory { + f := &Factory{wgPort: wgPort} + + ebpfProxy := NewWGEBPFProxy(wgPort) + err := ebpfProxy.Listen() + if err != nil { + log.Errorf("failed to initialize ebpf proxy: %s", err) + return f + } + + f.ebpfProxy = ebpfProxy + return f +} diff --git a/client/internal/wgproxy/factory_nonlinux.go b/client/internal/wgproxy/factory_nonlinux.go new file mode 100644 index 000000000..c538efd84 --- /dev/null +++ b/client/internal/wgproxy/factory_nonlinux.go @@ -0,0 +1,7 @@ +//go:build !linux || android + +package wgproxy + +func NewFactory(wgPort int) *Factory { + return &Factory{wgPort: wgPort} +} diff --git a/client/internal/wgproxy/loader.go b/client/internal/wgproxy/loader.go new file mode 100644 index 000000000..5833b9c25 --- /dev/null +++ b/client/internal/wgproxy/loader.go @@ -0,0 +1,80 @@ +//go:build linux && !android + +package wgproxy + +import ( + _ "embed" + "net" + + "github.com/cilium/ebpf/link" + "github.com/cilium/ebpf/rlimit" +) + +const ( + mapKeyProxyPort uint32 = 0 + mapKeyWgPort uint32 = 1 +) + +//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -cc clang-14 bpf bpf/portreplace.c -- + +type eBPF struct { + link link.Link +} + +func newEBPF() *eBPF { + return &eBPF{} +} + +func (l *eBPF) load(proxyPort, wgPort int) error { + // it required for Docker + err := rlimit.RemoveMemlock() + if err != nil { + return err + } + + ifce, err := net.InterfaceByName("lo") + if err != nil { + return err + } + + // Load pre-compiled programs into the kernel. + objs := bpfObjects{} + err = loadBpfObjects(&objs, nil) + if err != nil { + return err + } + defer func() { + _ = objs.Close() + }() + + err = objs.XdpPortMap.Put(mapKeyProxyPort, uint16(proxyPort)) + if err != nil { + return err + } + + err = objs.XdpPortMap.Put(mapKeyWgPort, uint16(wgPort)) + if err != nil { + return err + } + + defer func() { + _ = objs.XdpPortMap.Close() + }() + + l.link, err = link.AttachXDP(link.XDPOptions{ + Program: objs.XdpProgFunc, + Interface: ifce.Index, + }) + if err != nil { + return err + } + + return err +} + +func (l *eBPF) free() error { + if l.link != nil { + return l.link.Close() + } + return nil +} diff --git a/client/internal/wgproxy/loader_test.go b/client/internal/wgproxy/loader_test.go new file mode 100644 index 000000000..ad0fed236 --- /dev/null +++ b/client/internal/wgproxy/loader_test.go @@ -0,0 +1,18 @@ +//go:build linux + +package wgproxy + +import ( + "testing" +) + +func Test_newEBPF(t *testing.T) { + ebpf := newEBPF() + err := ebpf.load(1234, 51892) + defer func() { + _ = ebpf.free() + }() + if err != nil { + t.Errorf("%s", err) + } +} diff --git a/client/internal/wgproxy/portlookup.go b/client/internal/wgproxy/portlookup.go new file mode 100644 index 000000000..6f3d33487 --- /dev/null +++ b/client/internal/wgproxy/portlookup.go @@ -0,0 +1,32 @@ +package wgproxy + +import ( + "fmt" + "net" +) + +const ( + portRangeStart = 3128 + portRangeEnd = 3228 +) + +type portLookup struct { +} + +func (pl portLookup) searchFreePort() (int, error) { + for i := portRangeStart; i <= portRangeEnd; i++ { + if pl.tryToBind(i) == nil { + return i, nil + } + } + return 0, fmt.Errorf("failed to bind free port for eBPF proxy") +} + +func (pl portLookup) tryToBind(port int) error { + l, err := net.ListenPacket("udp", fmt.Sprintf(":%d", port)) + if err != nil { + return err + } + _ = l.Close() + return nil +} diff --git a/client/internal/wgproxy/portlookup_test.go b/client/internal/wgproxy/portlookup_test.go new file mode 100644 index 000000000..6a386f330 --- /dev/null +++ b/client/internal/wgproxy/portlookup_test.go @@ -0,0 +1,42 @@ +package wgproxy + +import ( + "fmt" + "net" + "testing" +) + +func Test_portLookup_searchFreePort(t *testing.T) { + pl := portLookup{} + _, err := pl.searchFreePort() + if err != nil { + t.Fatal(err) + } +} + +func Test_portLookup_on_allocated(t *testing.T) { + pl := portLookup{} + + allocatedPort, err := allocatePort(portRangeStart) + if err != nil { + t.Fatal(err) + } + defer allocatedPort.Close() + + fp, err := pl.searchFreePort() + if err != nil { + t.Fatal(err) + } + + if fp != (portRangeStart + 1) { + t.Errorf("invalid free port, expected: %d, got: %d", portRangeStart+1, fp) + } +} + +func allocatePort(port int) (net.PacketConn, error) { + c, err := net.ListenPacket("udp", fmt.Sprintf(":%d", port)) + if err != nil { + return nil, err + } + return c, err +} diff --git a/client/internal/wgproxy/proxy.go b/client/internal/wgproxy/proxy.go new file mode 100644 index 000000000..16ebf0f35 --- /dev/null +++ b/client/internal/wgproxy/proxy.go @@ -0,0 +1,12 @@ +package wgproxy + +import ( + "net" +) + +// Proxy is a transfer layer between the Turn connection and the WireGuard +type Proxy interface { + AddTurnConn(urnConn net.Conn) (net.Addr, error) + CloseConn() error + Free() error +} diff --git a/client/internal/wgproxy/proxy_ebpf.go b/client/internal/wgproxy/proxy_ebpf.go new file mode 100644 index 000000000..7683aa52a --- /dev/null +++ b/client/internal/wgproxy/proxy_ebpf.go @@ -0,0 +1,250 @@ +//go:build linux && !android + +package wgproxy + +import ( + "fmt" + "io" + "net" + "os" + "sync" + "syscall" + + "github.com/google/gopacket" + "github.com/google/gopacket/layers" + + log "github.com/sirupsen/logrus" +) + +// WGEBPFProxy definition for proxy with eBPF support +type WGEBPFProxy struct { + ebpf *eBPF + lastUsedPort uint16 + localWGListenPort int + + turnConnStore map[uint16]net.Conn + turnConnMutex sync.Mutex + + rawConn net.PacketConn + conn *net.UDPConn +} + +// NewWGEBPFProxy create new WGEBPFProxy instance +func NewWGEBPFProxy(wgPort int) *WGEBPFProxy { + log.Debugf("instantiate ebpf proxy") + wgProxy := &WGEBPFProxy{ + localWGListenPort: wgPort, + ebpf: newEBPF(), + lastUsedPort: 0, + turnConnStore: make(map[uint16]net.Conn), + } + return wgProxy +} + +// Listen load ebpf program and listen the proxy +func (p *WGEBPFProxy) Listen() error { + pl := portLookup{} + wgPorxyPort, err := pl.searchFreePort() + if err != nil { + return err + } + + p.rawConn, err = p.prepareSenderRawSocket() + if err != nil { + return err + } + + err = p.ebpf.load(wgPorxyPort, p.localWGListenPort) + if err != nil { + return err + } + + addr := net.UDPAddr{ + Port: wgPorxyPort, + IP: net.ParseIP("127.0.0.1"), + } + + p.conn, err = net.ListenUDP("udp", &addr) + if err != nil { + cErr := p.Free() + if err != nil { + log.Errorf("failed to close the wgproxy: %s", cErr) + } + return err + } + + go p.proxyToRemote() + log.Infof("local wg proxy listening on: %d", wgPorxyPort) + return nil +} + +// AddTurnConn add new turn connection for the proxy +func (p *WGEBPFProxy) AddTurnConn(turnConn net.Conn) (net.Addr, error) { + wgEndpointPort, err := p.storeTurnConn(turnConn) + if err != nil { + return nil, err + } + + go p.proxyToLocal(wgEndpointPort, turnConn) + log.Infof("turn conn added to wg proxy store: %s, endpoint port: :%d", turnConn.RemoteAddr(), wgEndpointPort) + + wgEndpoint := &net.UDPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: int(wgEndpointPort), + } + return wgEndpoint, nil +} + +// CloseConn doing nothing because this type of proxy implementation does not store the connection +func (p *WGEBPFProxy) CloseConn() error { + return nil +} + +// Free resources +func (p *WGEBPFProxy) Free() error { + var err1, err2, err3 error + if p.conn != nil { + err1 = p.conn.Close() + } + + err2 = p.ebpf.free() + if p.rawConn != nil { + err3 = p.rawConn.Close() + } + + if err1 != nil { + return err1 + } + + if err2 != nil { + return err2 + } + + return err3 +} + +func (p *WGEBPFProxy) proxyToLocal(endpointPort uint16, remoteConn net.Conn) { + buf := make([]byte, 1500) + for { + n, err := remoteConn.Read(buf) + if err != nil { + if err != io.EOF { + log.Errorf("failed to read from turn conn (endpoint: :%d): %s", endpointPort, err) + } + p.removeTurnConn(endpointPort) + return + } + err = p.sendPkg(buf[:n], endpointPort) + if err != nil { + log.Errorf("failed to write out turn pkg to local conn: %v", err) + } + } +} + +// proxyToRemote read messages from local WireGuard interface and forward it to remote conn +func (p *WGEBPFProxy) proxyToRemote() { + buf := make([]byte, 1500) + for { + n, addr, err := p.conn.ReadFromUDP(buf) + if err != nil { + log.Errorf("failed to read UDP pkg from WG: %s", err) + return + } + + conn, ok := p.turnConnStore[uint16(addr.Port)] + if !ok { + log.Errorf("turn conn not found by port: %d", addr.Port) + continue + } + + _, err = conn.Write(buf[:n]) + if err != nil { + log.Debugf("failed to forward local wg pkg (%d) to remote turn conn: %s", addr.Port, err) + } + } +} + +func (p *WGEBPFProxy) storeTurnConn(turnConn net.Conn) (uint16, error) { + p.turnConnMutex.Lock() + defer p.turnConnMutex.Unlock() + + np, err := p.nextFreePort() + if err != nil { + return np, err + } + p.turnConnStore[np] = turnConn + return np, nil +} + +func (p *WGEBPFProxy) removeTurnConn(turnConnID uint16) { + log.Tracef("remove turn conn from store by port: %d", turnConnID) + p.turnConnMutex.Lock() + defer p.turnConnMutex.Unlock() + delete(p.turnConnStore, turnConnID) + +} + +func (p *WGEBPFProxy) nextFreePort() (uint16, error) { + if len(p.turnConnStore) == 65535 { + return 0, fmt.Errorf("reached maximum turn connection numbers") + } +generatePort: + if p.lastUsedPort == 65535 { + p.lastUsedPort = 1 + } else { + p.lastUsedPort++ + } + + if _, ok := p.turnConnStore[p.lastUsedPort]; ok { + goto generatePort + } + return p.lastUsedPort, nil +} + +func (p *WGEBPFProxy) prepareSenderRawSocket() (net.PacketConn, error) { + fd, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_RAW, syscall.IPPROTO_RAW) + if err != nil { + return nil, err + } + err = syscall.SetsockoptInt(fd, syscall.IPPROTO_IP, syscall.IP_HDRINCL, 1) + if err != nil { + return nil, err + } + err = syscall.SetsockoptString(fd, syscall.SOL_SOCKET, syscall.SO_BINDTODEVICE, "lo") + if err != nil { + return nil, err + } + + return net.FilePacketConn(os.NewFile(uintptr(fd), fmt.Sprintf("fd %d", fd))) +} + +func (p *WGEBPFProxy) sendPkg(data []byte, port uint16) error { + localhost := net.ParseIP("127.0.0.1") + + payload := gopacket.Payload(data) + ipH := &layers.IPv4{ + DstIP: localhost, + SrcIP: localhost, + Version: 4, + TTL: 64, + Protocol: layers.IPProtocolUDP, + } + udpH := &layers.UDP{ + SrcPort: layers.UDPPort(port), + DstPort: layers.UDPPort(p.localWGListenPort), + } + + err := udpH.SetNetworkLayerForChecksum(ipH) + if err != nil { + return err + } + + layerBuffer := gopacket.NewSerializeBuffer() + + err = gopacket.SerializeLayers(layerBuffer, gopacket.SerializeOptions{ComputeChecksums: true, FixLengths: true}, ipH, udpH, payload) + if err != nil { + return err + } + _, err = p.rawConn.WriteTo(layerBuffer.Bytes(), &net.IPAddr{IP: localhost}) + return err +} diff --git a/client/internal/wgproxy/proxy_ebpf_test.go b/client/internal/wgproxy/proxy_ebpf_test.go new file mode 100644 index 000000000..84c74cdcc --- /dev/null +++ b/client/internal/wgproxy/proxy_ebpf_test.go @@ -0,0 +1,56 @@ +//go:build linux && !android + +package wgproxy + +import ( + "testing" +) + +func TestWGEBPFProxy_connStore(t *testing.T) { + wgProxy := NewWGEBPFProxy(1) + + p, _ := wgProxy.storeTurnConn(nil) + if p != 1 { + t.Errorf("invalid initial port: %d", wgProxy.lastUsedPort) + } + + numOfConns := 10 + for i := 0; i < numOfConns; i++ { + p, _ = wgProxy.storeTurnConn(nil) + } + if p != uint16(numOfConns)+1 { + t.Errorf("invalid last used port: %d, expected: %d", p, numOfConns+1) + } + if len(wgProxy.turnConnStore) != numOfConns+1 { + t.Errorf("invalid store size: %d, expected: %d", len(wgProxy.turnConnStore), numOfConns+1) + } +} + +func TestWGEBPFProxy_portCalculation_overflow(t *testing.T) { + wgProxy := NewWGEBPFProxy(1) + + _, _ = wgProxy.storeTurnConn(nil) + wgProxy.lastUsedPort = 65535 + p, _ := wgProxy.storeTurnConn(nil) + + if len(wgProxy.turnConnStore) != 2 { + t.Errorf("invalid store size: %d, expected: %d", len(wgProxy.turnConnStore), 2) + } + + if p != 2 { + t.Errorf("invalid last used port: %d, expected: %d", p, 2) + } +} + +func TestWGEBPFProxy_portCalculation_maxConn(t *testing.T) { + wgProxy := NewWGEBPFProxy(1) + + for i := 0; i < 65535; i++ { + _, _ = wgProxy.storeTurnConn(nil) + } + + _, err := wgProxy.storeTurnConn(nil) + if err == nil { + t.Errorf("invalid turn conn store calculation") + } +} diff --git a/client/internal/wgproxy/proxy_userspace.go b/client/internal/wgproxy/proxy_userspace.go new file mode 100644 index 000000000..f2411e976 --- /dev/null +++ b/client/internal/wgproxy/proxy_userspace.go @@ -0,0 +1,105 @@ +package wgproxy + +import ( + "context" + "fmt" + "net" + + log "github.com/sirupsen/logrus" +) + +// WGUserSpaceProxy proxies +type WGUserSpaceProxy struct { + localWGListenPort int + ctx context.Context + cancel context.CancelFunc + + remoteConn net.Conn + localConn net.Conn +} + +// NewWGUserSpaceProxy instantiate a user space WireGuard proxy +func NewWGUserSpaceProxy(wgPort int) *WGUserSpaceProxy { + p := &WGUserSpaceProxy{ + localWGListenPort: wgPort, + } + p.ctx, p.cancel = context.WithCancel(context.Background()) + return p +} + +// AddTurnConn start the proxy with the given remote conn +func (p *WGUserSpaceProxy) AddTurnConn(remoteConn net.Conn) (net.Addr, error) { + p.remoteConn = remoteConn + + var err error + p.localConn, err = net.Dial("udp", fmt.Sprintf(":%d", p.localWGListenPort)) + if err != nil { + log.Errorf("failed dialing to local Wireguard port %s", err) + return nil, err + } + + go p.proxyToRemote() + go p.proxyToLocal() + + return p.localConn.LocalAddr(), err +} + +// CloseConn close the localConn +func (p *WGUserSpaceProxy) CloseConn() error { + p.cancel() + if p.localConn == nil { + return nil + } + return p.localConn.Close() +} + +// Free doing nothing because this implementation of proxy does not have global state +func (p *WGUserSpaceProxy) Free() error { + return nil +} + +// proxyToRemote proxies everything from Wireguard to the RemoteKey peer +// blocks +func (p *WGUserSpaceProxy) proxyToRemote() { + + buf := make([]byte, 1500) + for { + select { + case <-p.ctx.Done(): + return + default: + n, err := p.localConn.Read(buf) + if err != nil { + continue + } + + _, err = p.remoteConn.Write(buf[:n]) + if err != nil { + continue + } + } + } +} + +// proxyToLocal proxies everything from the RemoteKey peer to local Wireguard +// blocks +func (p *WGUserSpaceProxy) proxyToLocal() { + + buf := make([]byte, 1500) + for { + select { + case <-p.ctx.Done(): + return + default: + n, err := p.remoteConn.Read(buf) + if err != nil { + continue + } + + _, err = p.localConn.Write(buf[:n]) + if err != nil { + continue + } + } + } +} diff --git a/go.mod b/go.mod index 4cb1c48c2..32e25c1a0 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,7 @@ require ( require ( fyne.io/fyne/v2 v2.1.4 github.com/c-robinson/iplib v1.0.3 + github.com/cilium/ebpf v0.10.0 github.com/coreos/go-iptables v0.6.0 github.com/creack/pty v1.1.18 github.com/eko/gocache/v3 v3.1.1 @@ -125,7 +126,6 @@ require ( github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/common v0.37.0 // indirect github.com/prometheus/procfs v0.8.0 // indirect - github.com/rogpeppe/go-internal v1.8.0 // indirect github.com/spf13/cast v1.5.0 // indirect github.com/srwiley/oksvg v0.0.0-20200311192757-870daf9aa564 // indirect github.com/srwiley/rasterx v0.0.0-20200120212402-85cb7272f5e9 // indirect diff --git a/go.sum b/go.sum index b374e507e..da913db52 100644 --- a/go.sum +++ b/go.sum @@ -101,6 +101,8 @@ github.com/cilium/ebpf v0.0.0-20200110133405-4032b1d8aae3/go.mod h1:MA5e5Lr8slmE github.com/cilium/ebpf v0.4.0/go.mod h1:4tRaxcgiL706VnOzHOdBlY8IEAIdxINsQBcU4xJJXRs= github.com/cilium/ebpf v0.5.0/go.mod h1:4tRaxcgiL706VnOzHOdBlY8IEAIdxINsQBcU4xJJXRs= github.com/cilium/ebpf v0.7.0/go.mod h1:/oI2+1shJiTGAMgl6/RgJr36Eo1jzrRcAWbcXO2usCA= +github.com/cilium/ebpf v0.10.0 h1:nk5HPMeoBXtOzbkZBWym+ZWq1GIiHUsBFXxwewXAHLQ= +github.com/cilium/ebpf v0.10.0/go.mod h1:DPiVdY/kT534dgc9ERmvP8mWA+9gvwgKfRvk4nNWnoE= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= @@ -177,7 +179,7 @@ github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQL github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= -github.com/frankban/quicktest v1.14.3 h1:FJKSZTDHjyhriyC81FLQ0LY93eSai0ZyR/ZIkd3ZUKE= +github.com/frankban/quicktest v1.14.4 h1:g2rn0vABPOOXmZUj+vbmUp0lPoXEMuhTpIluN0XL9UY= github.com/fredbi/uri v0.0.0-20181227131451-3dcfdacbaaf3 h1:FDqhDm7pcsLhhWl1QtD8vlzI4mm59llRvNzrFg6/LAA= github.com/fredbi/uri v0.0.0-20181227131451-3dcfdacbaaf3/go.mod h1:CzM2G82Q9BDUvMTGHnXf/6OExw/Dz2ivDj48nVg7Lg8= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= @@ -419,7 +421,7 @@ github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFB github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= -github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.4-0.20190131011033-7dc38fb350b1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= @@ -552,7 +554,6 @@ github.com/pion/turn/v2 v2.1.0 h1:5wGHSgGhJhP/RpabkUb/T9PdsAjkGLS6toYz5HNzoSI= github.com/pion/turn/v2 v2.1.0/go.mod h1:yrT5XbXSGX1VFSF31A3c1kCNB5bBZgk/uu5LET162qs= github.com/pion/udp/v2 v2.0.1 h1:xP0z6WNux1zWEjhC7onRA3EwwSliXqu1ElUZAQhUP54= github.com/pion/udp/v2 v2.0.1/go.mod h1:B7uvTMP00lzWdyMr/1PVZXtV3wpPIxBRd4Wl6AksXn8= -github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -596,8 +597,7 @@ github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40T github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= -github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8= -github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= +github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/rs/cors v1.8.0 h1:P2KMzcFwrPoSjkF1WLRPsp3UMLyql8L4v9hQpVeK5so= github.com/rs/cors v1.8.0/go.mod h1:EBwu+T5AvHOcXwvZIkQFjUN6s8Czyqw12GL/Y0tUyRM= github.com/rs/xid v1.3.0 h1:6NjYksEUlhurdVehpc7S7dk6DAmcKv8V9gG0FsVN2U4= diff --git a/iface/bind/udp_mux_universal.go b/iface/bind/udp_mux_universal.go index 30c015623..9792bba55 100644 --- a/iface/bind/udp_mux_universal.go +++ b/iface/bind/udp_mux_universal.go @@ -82,7 +82,7 @@ func (m *UniversalUDPMuxDefault) ReadFromConn(ctx context.Context) { default: _, a, err := m.params.UDPConn.ReadFrom(buf) if err != nil { - log.Errorf("error while reading packet %s", err) + log.Errorf("error while reading packet: %s", err) continue } msg := &stun.Message{ diff --git a/iface/iface.go b/iface/iface.go index 6c7e1a1cd..58167d1e8 100644 --- a/iface/iface.go +++ b/iface/iface.go @@ -74,7 +74,7 @@ func (w *WGIface) UpdatePeer(peerKey string, allowedIps string, keepAlive time.D w.mu.Lock() defer w.mu.Unlock() - log.Debugf("updating interface %s peer %s: endpoint %s ", w.tun.DeviceName(), peerKey, endpoint) + log.Debugf("updating interface %s peer %s, endpoint %s ", w.tun.DeviceName(), peerKey, endpoint) return w.configurer.updatePeer(peerKey, allowedIps, keepAlive, endpoint, preSharedKey) }