Wg ebpf proxy (#911)

Add an eBPF proxy between TURN (relay) and WireGuard to reduce the number of ports used by the NetBird agent.
- Separate the WireGuard configuration from the proxy logic
- With the eBPF type proxy there is only a single proxy instance
- If eBPF is not supported, fall back to the original proxy implementation

Because the eBPF type proxy and the original proxy have different signatures,
a factory structure is used to create the appropriate implementation.
Zoltan Papp 2023-07-26 14:00:47 +02:00 committed by GitHub
parent 6dee89379b
commit b0364da67c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 1117 additions and 379 deletions
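
For orientation, here is a minimal usage sketch of the wgproxy API introduced by this diff. It is not part of the commit: the helper configureRelay and the port value 51820 are illustrative; only the Factory and Proxy calls come from the code below.

package main

import (
    "log"
    "net"

    "github.com/netbirdio/netbird/client/internal/wgproxy"
)

// configureRelay mirrors the call pattern of the peer connection code in this PR:
// GetProxy returns the shared eBPF proxy when it could be loaded, otherwise a fresh
// user space proxy, and AddTurnConn returns the local address that is set as the
// WireGuard peer endpoint instead of a dedicated per-peer proxy port.
func configureRelay(factory *wgproxy.Factory, turnConn net.Conn) (net.Addr, error) {
    p := factory.GetProxy()
    return p.AddTurnConn(turnConn)
}

func main() {
    factory := wgproxy.NewFactory(51820) // local WireGuard listen port (example value)
    defer func() {
        _ = factory.Free() // releases the shared eBPF proxy, if one was loaded
    }()
    log.Println("wgproxy factory ready; hand TURN connections to configureRelay")
}

The engine creates one Factory per agent, so with eBPF support all relayed peers share a single local proxy port; without it, each relayed peer falls back to its own user space proxy connection.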

View File

@ -20,8 +20,8 @@ import (
    "github.com/netbirdio/netbird/client/internal/acl"
    "github.com/netbirdio/netbird/client/internal/dns"
    "github.com/netbirdio/netbird/client/internal/peer"
-   "github.com/netbirdio/netbird/client/internal/proxy"
    "github.com/netbirdio/netbird/client/internal/routemanager"
+   "github.com/netbirdio/netbird/client/internal/wgproxy"
    nbssh "github.com/netbirdio/netbird/client/ssh"
    nbdns "github.com/netbirdio/netbird/dns"
    "github.com/netbirdio/netbird/iface"
@ -101,7 +101,8 @@ type Engine struct {
    ctx context.Context
    wgInterface    *iface.WGIface
+   wgProxyFactory *wgproxy.Factory
    udpMux     *bind.UniversalUDPMuxDefault
    udpMuxConn io.Closer
@ -132,6 +133,7 @@ func NewEngine(
    signalClient signal.Client, mgmClient mgm.Client,
    config *EngineConfig, mobileDep MobileDependency, statusRecorder *peer.Status,
) *Engine {
    return &Engine{
        ctx:    ctx,
        cancel: cancel,
@ -146,6 +148,7 @@ func NewEngine(
        networkSerial:  0,
        sshServerFunc:  nbssh.DefaultSSHServer,
        statusRecorder: statusRecorder,
+       wgProxyFactory: wgproxy.NewFactory(config.WgPort),
    }
}
@ -282,7 +285,7 @@ func (e *Engine) modifyPeers(peersUpdate []*mgmProto.RemotePeerConfig) error {
    for _, p := range peersUpdate {
        peerPubKey := p.GetWgPubKey()
        if peerConn, ok := e.peerConns[peerPubKey]; ok {
-           if peerConn.GetConf().ProxyConfig.AllowedIps != strings.Join(p.AllowedIps, ",") {
+           if peerConn.WgConfig().AllowedIps != strings.Join(p.AllowedIps, ",") {
                modified = append(modified, p)
                continue
            }
@ -795,9 +798,7 @@ func (e *Engine) connWorker(conn *peer.Conn, peerKey string) {
    // we might have received new STUN and TURN servers meanwhile, so update them
    e.syncMsgMux.Lock()
-   conf := conn.GetConf()
-   conf.StunTurn = append(e.STUNs, e.TURNs...)
-   conn.UpdateConf(conf)
+   conn.UpdateStunTurn(append(e.STUNs, e.TURNs...))
    e.syncMsgMux.Unlock()
    err := conn.Open()
@ -826,9 +827,9 @@ func (e *Engine) createPeerConn(pubKey string, allowedIPs string) (*peer.Conn, e
    stunTurn = append(stunTurn, e.STUNs...)
    stunTurn = append(stunTurn, e.TURNs...)
-   proxyConfig := proxy.Config{
+   wgConfig := peer.WgConfig{
        RemoteKey:    pubKey,
-       WgListenAddr: fmt.Sprintf("127.0.0.1:%d", e.config.WgPort),
+       WgListenPort: e.config.WgPort,
        WgInterface:  e.wgInterface,
        AllowedIps:   allowedIPs,
        PreSharedKey: e.config.PreSharedKey,
@ -845,13 +846,13 @@ func (e *Engine) createPeerConn(pubKey string, allowedIPs string) (*peer.Conn, e
        Timeout:     timeout,
        UDPMux:      e.udpMux.UDPMuxDefault,
        UDPMuxSrflx: e.udpMux,
-       ProxyConfig: proxyConfig,
+       WgConfig:    wgConfig,
        LocalWgPort: e.config.WgPort,
        NATExternalIPs: e.parseNATExternalIPMappings(),
        UserspaceBind:  e.wgInterface.IsUserspaceBind(),
    }
-   peerConn, err := peer.NewConn(config, e.statusRecorder, e.mobileDep.TunAdapter, e.mobileDep.IFaceDiscover)
+   peerConn, err := peer.NewConn(config, e.statusRecorder, e.wgProxyFactory, e.mobileDep.TunAdapter, e.mobileDep.IFaceDiscover)
    if err != nil {
        return nil, err
    }
@ -1008,6 +1009,10 @@ func (e *Engine) parseNATExternalIPMappings() []string {
}
func (e *Engine) close() {
+   if err := e.wgProxyFactory.Free(); err != nil {
+       log.Errorf("failed closing ebpf proxy: %s", err)
+   }
    log.Debugf("removing Netbird interface %s", e.config.WgIfaceName)
    if e.wgInterface != nil {
        if err := e.wgInterface.Close(); err != nil {

View File

@ -367,9 +367,9 @@ func TestEngine_UpdateNetworkMap(t *testing.T) {
            t.Errorf("expecting Engine.peerConns to contain peer %s", p)
        }
        expectedAllowedIPs := strings.Join(p.AllowedIps, ",")
-       if conn.GetConf().ProxyConfig.AllowedIps != expectedAllowedIPs {
+       if conn.WgConfig().AllowedIps != expectedAllowedIPs {
            t.Errorf("expecting peer %s to have AllowedIPs= %s, got %s", p.GetWgPubKey(),
-               expectedAllowedIPs, conn.GetConf().ProxyConfig.AllowedIps)
+               expectedAllowedIPs, conn.WgConfig().AllowedIps)
        }
    }
})

View File

@ -10,9 +10,10 @@ import (
    "github.com/pion/ice/v2"
    log "github.com/sirupsen/logrus"
+   "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
-   "github.com/netbirdio/netbird/client/internal/proxy"
    "github.com/netbirdio/netbird/client/internal/stdnet"
+   "github.com/netbirdio/netbird/client/internal/wgproxy"
    "github.com/netbirdio/netbird/iface"
    "github.com/netbirdio/netbird/iface/bind"
    signal "github.com/netbirdio/netbird/signal/client"
@ -23,8 +24,18 @@ import (
const (
    iceKeepAliveDefault           = 4 * time.Second
    iceDisconnectedTimeoutDefault = 6 * time.Second
+   defaultWgKeepAlive            = 25 * time.Second
)
+type WgConfig struct {
+   WgListenPort int
+   RemoteKey    string
+   WgInterface  *iface.WGIface
+   AllowedIps   string
+   PreSharedKey *wgtypes.Key
+}
// ConnConfig is a peer Connection configuration
type ConnConfig struct {
@ -43,7 +54,7 @@ type ConnConfig struct {
    Timeout time.Duration
-   ProxyConfig proxy.Config
+   WgConfig WgConfig
    UDPMux      ice.UDPMux
    UDPMuxSrflx ice.UniversalUDPMux
@ -98,7 +109,9 @@ type Conn struct {
    statusRecorder *Status
-   proxy proxy.Proxy
+   wgProxyFactory *wgproxy.Factory
+   wgProxy        wgproxy.Proxy
    remoteModeCh chan ModeMessage
    meta         meta
@ -122,14 +135,19 @@ func (conn *Conn) GetConf() ConnConfig {
    return conn.config
}
-// UpdateConf updates the connection config
-func (conn *Conn) UpdateConf(conf ConnConfig) {
-   conn.config = conf
+// WgConfig returns the WireGuard config
+func (conn *Conn) WgConfig() WgConfig {
+   return conn.config.WgConfig
+}
+// UpdateStunTurn update the turn and stun addresses
+func (conn *Conn) UpdateStunTurn(turnStun []*ice.URL) {
+   conn.config.StunTurn = turnStun
}
// NewConn creates a new not opened Conn to the remote peer.
// To establish a connection run Conn.Open
-func NewConn(config ConnConfig, statusRecorder *Status, adapter iface.TunAdapter, iFaceDiscover stdnet.ExternalIFaceDiscover) (*Conn, error) {
+func NewConn(config ConnConfig, statusRecorder *Status, wgProxyFactory *wgproxy.Factory, adapter iface.TunAdapter, iFaceDiscover stdnet.ExternalIFaceDiscover) (*Conn, error) {
    return &Conn{
        config: config,
        mu:     sync.Mutex{},
@ -139,6 +157,7 @@ func NewConn(config ConnConfig, statusRecorder *Status, adapter iface.TunAdapter
        remoteAnswerCh: make(chan OfferAnswer),
        statusRecorder: statusRecorder,
        remoteModeCh:   make(chan ModeMessage, 1),
+       wgProxyFactory: wgProxyFactory,
        adapter:        adapter,
        iFaceDiscover:  iFaceDiscover,
    }, nil
@ -215,12 +234,12 @@ func (conn *Conn) candidateTypes() []ice.CandidateType {
func (conn *Conn) Open() error {
    log.Debugf("trying to connect to peer %s", conn.config.Key)
-   peerState := State{PubKey: conn.config.Key}
-   peerState.IP = strings.Split(conn.config.ProxyConfig.AllowedIps, "/")[0]
-   peerState.ConnStatusUpdate = time.Now()
-   peerState.ConnStatus = conn.status
+   peerState := State{
+       PubKey:           conn.config.Key,
+       IP:               strings.Split(conn.config.WgConfig.AllowedIps, "/")[0],
+       ConnStatusUpdate: time.Now(),
+       ConnStatus:       conn.status,
+   }
    err := conn.statusRecorder.UpdatePeerState(peerState)
    if err != nil {
        log.Warnf("erro while updating the state of peer %s,err: %v", conn.config.Key, err)
@ -275,10 +294,11 @@ func (conn *Conn) Open() error {
    defer conn.notifyDisconnected()
    conn.mu.Unlock()
-   peerState = State{PubKey: conn.config.Key}
-   peerState.ConnStatus = conn.status
-   peerState.ConnStatusUpdate = time.Now()
+   peerState = State{
+       PubKey:           conn.config.Key,
+       ConnStatus:       conn.status,
+       ConnStatusUpdate: time.Now(),
+   }
    err = conn.statusRecorder.UpdatePeerState(peerState)
    if err != nil {
        log.Warnf("erro while updating the state of peer %s,err: %v", conn.config.Key, err)
@ -309,19 +329,12 @@ func (conn *Conn) Open() error {
        remoteWgPort = remoteOfferAnswer.WgListenPort
    }
    // the ice connection has been established successfully so we are ready to start the proxy
-   err = conn.startProxy(remoteConn, remoteWgPort)
+   remoteAddr, err := conn.configureConnection(remoteConn, remoteWgPort)
    if err != nil {
        return err
    }
-   if conn.proxy.Type() == proxy.TypeDirectNoProxy {
-       host, _, _ := net.SplitHostPort(remoteConn.LocalAddr().String())
-       rhost, _, _ := net.SplitHostPort(remoteConn.RemoteAddr().String())
-       // direct Wireguard connection
-       log.Infof("directly connected to peer %s [laddr <-> raddr] [%s:%d <-> %s:%d]", conn.config.Key, host, conn.config.LocalWgPort, rhost, remoteWgPort)
-   } else {
-       log.Infof("connected to peer %s [laddr <-> raddr] [%s <-> %s]", conn.config.Key, remoteConn.LocalAddr().String(), remoteConn.RemoteAddr().String())
-   }
+   log.Infof("connected to peer %s, endpoint address: %s", conn.config.Key, remoteAddr.String())
    // wait until connection disconnected or has been closed externally (upper layer, e.g. engine)
    select {
@ -338,54 +351,60 @@ func isRelayCandidate(candidate ice.Candidate) bool {
    return candidate.Type() == ice.CandidateTypeRelay
}
-// startProxy starts proxying traffic from/to local Wireguard and sets connection status to StatusConnected
-func (conn *Conn) startProxy(remoteConn net.Conn, remoteWgPort int) error {
+// configureConnection starts proxying traffic from/to local Wireguard and sets connection status to StatusConnected
+func (conn *Conn) configureConnection(remoteConn net.Conn, remoteWgPort int) (net.Addr, error) {
    conn.mu.Lock()
    defer conn.mu.Unlock()
-   var pair *ice.CandidatePair
    pair, err := conn.agent.GetSelectedCandidatePair()
    if err != nil {
-       return err
+       return nil, err
    }
-   peerState := State{PubKey: conn.config.Key}
-   p := conn.getProxy(pair, remoteWgPort)
-   conn.proxy = p
-   err = p.Start(remoteConn)
+   var endpoint net.Addr
+   if isRelayCandidate(pair.Local) {
+       log.Debugf("setup relay connection")
+       conn.wgProxy = conn.wgProxyFactory.GetProxy()
+       endpoint, err = conn.wgProxy.AddTurnConn(remoteConn)
+       if err != nil {
+           return nil, err
+       }
+   } else {
+       // To support old version's with direct mode we attempt to punch an additional role with the remote wireguard port
+       go conn.punchRemoteWGPort(pair, remoteWgPort)
+       endpoint = remoteConn.RemoteAddr()
+   }
+   endpointUdpAddr, _ := net.ResolveUDPAddr(endpoint.Network(), endpoint.String())
+   err = conn.config.WgConfig.WgInterface.UpdatePeer(conn.config.WgConfig.RemoteKey, conn.config.WgConfig.AllowedIps, defaultWgKeepAlive, endpointUdpAddr, conn.config.WgConfig.PreSharedKey)
    if err != nil {
-       return err
+       if conn.wgProxy != nil {
+           _ = conn.wgProxy.CloseConn()
+       }
+       return nil, err
    }
    conn.status = StatusConnected
-   peerState.ConnStatus = conn.status
-   peerState.ConnStatusUpdate = time.Now()
-   peerState.LocalIceCandidateType = pair.Local.Type().String()
-   peerState.RemoteIceCandidateType = pair.Remote.Type().String()
+   peerState := State{
+       PubKey:                 conn.config.Key,
+       ConnStatus:             conn.status,
+       ConnStatusUpdate:       time.Now(),
+       LocalIceCandidateType:  pair.Local.Type().String(),
+       RemoteIceCandidateType: pair.Remote.Type().String(),
+       Direct:                 !isRelayCandidate(pair.Local),
+   }
    if pair.Local.Type() == ice.CandidateTypeRelay || pair.Remote.Type() == ice.CandidateTypeRelay {
        peerState.Relayed = true
    }
-   peerState.Direct = p.Type() == proxy.TypeDirectNoProxy || p.Type() == proxy.TypeNoProxy
    err = conn.statusRecorder.UpdatePeerState(peerState)
    if err != nil {
        log.Warnf("unable to save peer's state, got error: %v", err)
    }
-   return nil
-}
-// todo rename this method and the proxy package to something more appropriate
-func (conn *Conn) getProxy(pair *ice.CandidatePair, remoteWgPort int) proxy.Proxy {
-   if isRelayCandidate(pair.Local) {
-       return proxy.NewWireGuardProxy(conn.config.ProxyConfig)
-   }
-   // To support old version's with direct mode we attempt to punch an additional role with the remote wireguard port
-   go conn.punchRemoteWGPort(pair, remoteWgPort)
-   return proxy.NewNoProxy(conn.config.ProxyConfig)
+   return endpoint, nil
}
func (conn *Conn) punchRemoteWGPort(pair *ice.CandidatePair, remoteWgPort int) {
@ -414,22 +433,22 @@ func (conn *Conn) cleanup() error {
    conn.mu.Lock()
    defer conn.mu.Unlock()
+   var err1, err2, err3 error
    if conn.agent != nil {
-       err := conn.agent.Close()
-       if err != nil {
-           return err
+       err1 = conn.agent.Close()
+       if err1 == nil {
+           conn.agent = nil
        }
-       conn.agent = nil
    }
-   if conn.proxy != nil {
-       err := conn.proxy.Close()
-       if err != nil {
-           return err
-       }
-       conn.proxy = nil
+   if conn.wgProxy != nil {
+       err2 = conn.wgProxy.CloseConn()
+       conn.wgProxy = nil
    }
+   // todo: is it problem if we try to remove a peer what is never existed?
+   err3 = conn.config.WgConfig.WgInterface.RemovePeer(conn.config.WgConfig.RemoteKey)
    if conn.notifyDisconnected != nil {
        conn.notifyDisconnected()
        conn.notifyDisconnected = nil
@ -437,10 +456,11 @@ func (conn *Conn) cleanup() error {
    conn.status = StatusDisconnected
-   peerState := State{PubKey: conn.config.Key}
-   peerState.ConnStatus = conn.status
-   peerState.ConnStatusUpdate = time.Now()
+   peerState := State{
+       PubKey:           conn.config.Key,
+       ConnStatus:       conn.status,
+       ConnStatusUpdate: time.Now(),
+   }
    err := conn.statusRecorder.UpdatePeerState(peerState)
    if err != nil {
        // pretty common error because by that time Engine can already remove the peer and status won't be available.
@ -449,8 +469,13 @@ func (conn *Conn) cleanup() error {
    }
    log.Debugf("cleaned up connection to peer %s", conn.config.Key)
-   return nil
+   if err1 != nil {
+       return err1
+   }
+   if err2 != nil {
+       return err2
+   }
+   return err3
}
// SetSignalOffer sets a handler function to be triggered by Conn when a new connection offer has to be signalled to the remote peer

View File

@ -5,12 +5,11 @@ import (
    "testing"
    "time"
-   "github.com/netbirdio/netbird/client/internal/stdnet"
    "github.com/magiconair/properties/assert"
    "github.com/pion/ice/v2"
-   "github.com/netbirdio/netbird/client/internal/proxy"
+   "github.com/netbirdio/netbird/client/internal/stdnet"
+   "github.com/netbirdio/netbird/client/internal/wgproxy"
    "github.com/netbirdio/netbird/iface"
)
@ -20,7 +19,6 @@ var connConf = ConnConfig{
    StunTurn:           []*ice.URL{},
    InterfaceBlackList: nil,
    Timeout:            time.Second,
-   ProxyConfig:        proxy.Config{},
    LocalWgPort:        51820,
}
@ -37,7 +35,11 @@ func TestNewConn_interfaceFilter(t *testing.T) {
}
func TestConn_GetKey(t *testing.T) {
-   conn, err := NewConn(connConf, nil, nil, nil)
+   wgProxyFactory := wgproxy.NewFactory(connConf.LocalWgPort)
+   defer func() {
+       _ = wgProxyFactory.Free()
+   }()
+   conn, err := NewConn(connConf, nil, wgProxyFactory, nil, nil)
    if err != nil {
        return
    }
@ -48,8 +50,11 @@ func TestConn_GetKey(t *testing.T) {
}
func TestConn_OnRemoteOffer(t *testing.T) {
-   conn, err := NewConn(connConf, NewRecorder("https://mgm"), nil, nil)
+   wgProxyFactory := wgproxy.NewFactory(connConf.LocalWgPort)
+   defer func() {
+       _ = wgProxyFactory.Free()
+   }()
+   conn, err := NewConn(connConf, NewRecorder("https://mgm"), wgProxyFactory, nil, nil)
    if err != nil {
        return
    }
@ -82,8 +87,11 @@ func TestConn_OnRemoteOffer(t *testing.T) {
}
func TestConn_OnRemoteAnswer(t *testing.T) {
-   conn, err := NewConn(connConf, NewRecorder("https://mgm"), nil, nil)
+   wgProxyFactory := wgproxy.NewFactory(connConf.LocalWgPort)
+   defer func() {
+       _ = wgProxyFactory.Free()
+   }()
+   conn, err := NewConn(connConf, NewRecorder("https://mgm"), wgProxyFactory, nil, nil)
    if err != nil {
        return
    }
@ -115,8 +123,11 @@ func TestConn_OnRemoteAnswer(t *testing.T) {
    wg.Wait()
}
func TestConn_Status(t *testing.T) {
-   conn, err := NewConn(connConf, NewRecorder("https://mgm"), nil, nil)
+   wgProxyFactory := wgproxy.NewFactory(connConf.LocalWgPort)
+   defer func() {
+       _ = wgProxyFactory.Free()
+   }()
+   conn, err := NewConn(connConf, NewRecorder("https://mgm"), wgProxyFactory, nil, nil)
    if err != nil {
        return
    }
@ -142,8 +153,11 @@ func TestConn_Status(t *testing.T) {
}
func TestConn_Close(t *testing.T) {
-   conn, err := NewConn(connConf, NewRecorder("https://mgm"), nil, nil)
+   wgProxyFactory := wgproxy.NewFactory(connConf.LocalWgPort)
+   defer func() {
+       _ = wgProxyFactory.Free()
+   }()
+   conn, err := NewConn(connConf, NewRecorder("https://mgm"), wgProxyFactory, nil, nil)
    if err != nil {
        return
    }

View File

@ -1,72 +0,0 @@
package proxy
import (
"context"
log "github.com/sirupsen/logrus"
"net"
"time"
)
// DummyProxy just sends pings to the RemoteKey peer and reads responses
type DummyProxy struct {
conn net.Conn
remote string
ctx context.Context
cancel context.CancelFunc
}
func NewDummyProxy(remote string) *DummyProxy {
p := &DummyProxy{remote: remote}
p.ctx, p.cancel = context.WithCancel(context.Background())
return p
}
func (p *DummyProxy) Close() error {
p.cancel()
return nil
}
func (p *DummyProxy) Start(remoteConn net.Conn) error {
p.conn = remoteConn
go func() {
buf := make([]byte, 1500)
for {
select {
case <-p.ctx.Done():
return
default:
_, err := p.conn.Read(buf)
if err != nil {
log.Errorf("error while reading RemoteKey %s proxy %v", p.remote, err)
return
}
//log.Debugf("received %s from %s", string(buf[:n]), p.remote)
}
}
}()
go func() {
for {
select {
case <-p.ctx.Done():
return
default:
_, err := p.conn.Write([]byte("hello"))
//log.Debugf("sent ping to %s", p.remote)
if err != nil {
log.Errorf("error while writing to RemoteKey %s proxy %v", p.remote, err)
return
}
time.Sleep(5 * time.Second)
}
}
}()
return nil
}
func (p *DummyProxy) Type() Type {
return TypeDummy
}

View File

@ -1,42 +0,0 @@
package proxy
import (
log "github.com/sirupsen/logrus"
"net"
)
// NoProxy is used just to configure WireGuard without any local proxy in between.
// Used when the WireGuard interface is userspace and uses bind.ICEBind
type NoProxy struct {
config Config
}
// NewNoProxy creates a new NoProxy with a provided config
func NewNoProxy(config Config) *NoProxy {
return &NoProxy{config: config}
}
// Close removes peer from the WireGuard interface
func (p *NoProxy) Close() error {
err := p.config.WgInterface.RemovePeer(p.config.RemoteKey)
if err != nil {
return err
}
return nil
}
// Start just updates WireGuard peer with the remote address
func (p *NoProxy) Start(remoteConn net.Conn) error {
log.Debugf("using NoProxy to connect to peer %s at %s", p.config.RemoteKey, remoteConn.RemoteAddr().String())
addr, err := net.ResolveUDPAddr("udp", remoteConn.RemoteAddr().String())
if err != nil {
return err
}
return p.config.WgInterface.UpdatePeer(p.config.RemoteKey, p.config.AllowedIps, DefaultWgKeepAlive,
addr, p.config.PreSharedKey)
}
func (p *NoProxy) Type() Type {
return TypeNoProxy
}

View File

@ -1,35 +0,0 @@
package proxy
import (
"github.com/netbirdio/netbird/iface"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"io"
"net"
"time"
)
const DefaultWgKeepAlive = 25 * time.Second
type Type string
const (
TypeDirectNoProxy Type = "DirectNoProxy"
TypeWireGuard Type = "WireGuard"
TypeDummy Type = "Dummy"
TypeNoProxy Type = "NoProxy"
)
type Config struct {
WgListenAddr string
RemoteKey string
WgInterface *iface.WGIface
AllowedIps string
PreSharedKey *wgtypes.Key
}
type Proxy interface {
io.Closer
// Start creates a local remoteConn and starts proxying data from/to remoteConn
Start(remoteConn net.Conn) error
Type() Type
}

View File

@ -1,128 +0,0 @@
package proxy
import (
"context"
log "github.com/sirupsen/logrus"
"net"
)
// WireGuardProxy proxies
type WireGuardProxy struct {
ctx context.Context
cancel context.CancelFunc
config Config
remoteConn net.Conn
localConn net.Conn
}
func NewWireGuardProxy(config Config) *WireGuardProxy {
p := &WireGuardProxy{config: config}
p.ctx, p.cancel = context.WithCancel(context.Background())
return p
}
func (p *WireGuardProxy) updateEndpoint() error {
udpAddr, err := net.ResolveUDPAddr(p.localConn.LocalAddr().Network(), p.localConn.LocalAddr().String())
if err != nil {
return err
}
// add local proxy connection as a Wireguard peer
err = p.config.WgInterface.UpdatePeer(p.config.RemoteKey, p.config.AllowedIps, DefaultWgKeepAlive,
udpAddr, p.config.PreSharedKey)
if err != nil {
return err
}
return nil
}
func (p *WireGuardProxy) Start(remoteConn net.Conn) error {
p.remoteConn = remoteConn
var err error
p.localConn, err = net.Dial("udp", p.config.WgListenAddr)
if err != nil {
log.Errorf("failed dialing to local Wireguard port %s", err)
return err
}
err = p.updateEndpoint()
if err != nil {
log.Errorf("error while updating Wireguard peer endpoint [%s] %v", p.config.RemoteKey, err)
return err
}
go p.proxyToRemote()
go p.proxyToLocal()
return nil
}
func (p *WireGuardProxy) Close() error {
p.cancel()
if c := p.localConn; c != nil {
err := p.localConn.Close()
if err != nil {
return err
}
}
err := p.config.WgInterface.RemovePeer(p.config.RemoteKey)
if err != nil {
return err
}
return nil
}
// proxyToRemote proxies everything from Wireguard to the RemoteKey peer
// blocks
func (p *WireGuardProxy) proxyToRemote() {
buf := make([]byte, 1500)
for {
select {
case <-p.ctx.Done():
log.Debugf("stopped proxying to remote peer %s due to closed connection", p.config.RemoteKey)
return
default:
n, err := p.localConn.Read(buf)
if err != nil {
continue
}
_, err = p.remoteConn.Write(buf[:n])
if err != nil {
continue
}
}
}
}
// proxyToLocal proxies everything from the RemoteKey peer to local Wireguard
// blocks
func (p *WireGuardProxy) proxyToLocal() {
buf := make([]byte, 1500)
for {
select {
case <-p.ctx.Done():
log.Debugf("stopped proxying from remote peer %s due to closed connection", p.config.RemoteKey)
return
default:
n, err := p.remoteConn.Read(buf)
if err != nil {
continue
}
_, err = p.localConn.Write(buf[:n])
if err != nil {
continue
}
}
}
}
func (p *WireGuardProxy) Type() Type {
return TypeWireGuard
}

View File

@ -0,0 +1,90 @@
#include <stdbool.h>
#include <linux/if_ether.h> // ETH_P_IP
#include <linux/udp.h>
#include <linux/ip.h>
#include <netinet/in.h>
#include <linux/bpf.h>
#include <bpf/bpf_helpers.h>
#define bpf_printk(fmt, ...) \
({ \
char ____fmt[] = fmt; \
bpf_trace_printk(____fmt, sizeof(____fmt), ##__VA_ARGS__); \
})
const __u32 map_key_proxy_port = 0;
const __u32 map_key_wg_port = 1;
struct bpf_map_def SEC("maps") xdp_port_map = {
.type = BPF_MAP_TYPE_ARRAY,
.key_size = sizeof(__u32),
.value_size = sizeof(__u16),
.max_entries = 10,
};
__u16 proxy_port = 0;
__u16 wg_port = 0;
bool read_port_settings() {
__u16 *value;
value = bpf_map_lookup_elem(&xdp_port_map, &map_key_proxy_port);
if(!value) {
return false;
}
proxy_port = *value;
value = bpf_map_lookup_elem(&xdp_port_map, &map_key_wg_port);
if(!value) {
return false;
}
wg_port = *value;
return true;
}
SEC("xdp")
int xdp_prog_func(struct xdp_md *ctx) {
if(proxy_port == 0 || wg_port == 0) {
if(!read_port_settings()){
return XDP_PASS;
}
bpf_printk("proxy port: %d, wg port: %d", proxy_port, wg_port);
}
void *data = (void *)(long)ctx->data;
void *data_end = (void *)(long)ctx->data_end;
struct ethhdr *eth = data;
struct iphdr *ip = (data + sizeof(struct ethhdr));
struct udphdr *udp = (data + sizeof(struct ethhdr) + sizeof(struct iphdr));
// return early if not enough data
if (data + sizeof(struct ethhdr) + sizeof(struct iphdr) + sizeof(struct udphdr) > data_end){
return XDP_PASS;
}
// skip non IPv4 packages
if (eth->h_proto != htons(ETH_P_IP)) {
return XDP_PASS;
}
if (ip->protocol != IPPROTO_UDP) {
return XDP_PASS;
}
// 2130706433 = 127.0.0.1
if (ip->daddr != htonl(2130706433)) {
return XDP_PASS;
}
if (udp->source != htons(wg_port)){
return XDP_PASS;
}
__be16 new_src_port = udp->dest;
__be16 new_dst_port = htons(proxy_port);
udp->dest = new_dst_port;
udp->source = new_src_port;
return XDP_PASS;
}
char _license[] SEC("license") = "GPL";

View File

@ -0,0 +1,120 @@
// Code generated by bpf2go; DO NOT EDIT.
//go:build arm64be || armbe || mips || mips64 || mips64p32 || ppc64 || s390 || s390x || sparc || sparc64
// +build arm64be armbe mips mips64 mips64p32 ppc64 s390 s390x sparc sparc64
package wgproxy
import (
"bytes"
_ "embed"
"fmt"
"io"
"github.com/cilium/ebpf"
)
// loadBpf returns the embedded CollectionSpec for bpf.
func loadBpf() (*ebpf.CollectionSpec, error) {
reader := bytes.NewReader(_BpfBytes)
spec, err := ebpf.LoadCollectionSpecFromReader(reader)
if err != nil {
return nil, fmt.Errorf("can't load bpf: %w", err)
}
return spec, err
}
// loadBpfObjects loads bpf and converts it into a struct.
//
// The following types are suitable as obj argument:
//
// *bpfObjects
// *bpfPrograms
// *bpfMaps
//
// See ebpf.CollectionSpec.LoadAndAssign documentation for details.
func loadBpfObjects(obj interface{}, opts *ebpf.CollectionOptions) error {
spec, err := loadBpf()
if err != nil {
return err
}
return spec.LoadAndAssign(obj, opts)
}
// bpfSpecs contains maps and programs before they are loaded into the kernel.
//
// It can be passed ebpf.CollectionSpec.Assign.
type bpfSpecs struct {
bpfProgramSpecs
bpfMapSpecs
}
// bpfSpecs contains programs before they are loaded into the kernel.
//
// It can be passed ebpf.CollectionSpec.Assign.
type bpfProgramSpecs struct {
XdpProgFunc *ebpf.ProgramSpec `ebpf:"xdp_prog_func"`
}
// bpfMapSpecs contains maps before they are loaded into the kernel.
//
// It can be passed ebpf.CollectionSpec.Assign.
type bpfMapSpecs struct {
XdpPortMap *ebpf.MapSpec `ebpf:"xdp_port_map"`
}
// bpfObjects contains all objects after they have been loaded into the kernel.
//
// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign.
type bpfObjects struct {
bpfPrograms
bpfMaps
}
func (o *bpfObjects) Close() error {
return _BpfClose(
&o.bpfPrograms,
&o.bpfMaps,
)
}
// bpfMaps contains all maps after they have been loaded into the kernel.
//
// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign.
type bpfMaps struct {
XdpPortMap *ebpf.Map `ebpf:"xdp_port_map"`
}
func (m *bpfMaps) Close() error {
return _BpfClose(
m.XdpPortMap,
)
}
// bpfPrograms contains all programs after they have been loaded into the kernel.
//
// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign.
type bpfPrograms struct {
XdpProgFunc *ebpf.Program `ebpf:"xdp_prog_func"`
}
func (p *bpfPrograms) Close() error {
return _BpfClose(
p.XdpProgFunc,
)
}
func _BpfClose(closers ...io.Closer) error {
for _, closer := range closers {
if err := closer.Close(); err != nil {
return err
}
}
return nil
}
// Do not access this directly.
//
//go:embed bpf_bpfeb.o
var _BpfBytes []byte

Binary file not shown.

View File

@ -0,0 +1,120 @@
// Code generated by bpf2go; DO NOT EDIT.
//go:build 386 || amd64 || amd64p32 || arm || arm64 || mips64le || mips64p32le || mipsle || ppc64le || riscv64
// +build 386 amd64 amd64p32 arm arm64 mips64le mips64p32le mipsle ppc64le riscv64
package wgproxy
import (
"bytes"
_ "embed"
"fmt"
"io"
"github.com/cilium/ebpf"
)
// loadBpf returns the embedded CollectionSpec for bpf.
func loadBpf() (*ebpf.CollectionSpec, error) {
reader := bytes.NewReader(_BpfBytes)
spec, err := ebpf.LoadCollectionSpecFromReader(reader)
if err != nil {
return nil, fmt.Errorf("can't load bpf: %w", err)
}
return spec, err
}
// loadBpfObjects loads bpf and converts it into a struct.
//
// The following types are suitable as obj argument:
//
// *bpfObjects
// *bpfPrograms
// *bpfMaps
//
// See ebpf.CollectionSpec.LoadAndAssign documentation for details.
func loadBpfObjects(obj interface{}, opts *ebpf.CollectionOptions) error {
spec, err := loadBpf()
if err != nil {
return err
}
return spec.LoadAndAssign(obj, opts)
}
// bpfSpecs contains maps and programs before they are loaded into the kernel.
//
// It can be passed ebpf.CollectionSpec.Assign.
type bpfSpecs struct {
bpfProgramSpecs
bpfMapSpecs
}
// bpfSpecs contains programs before they are loaded into the kernel.
//
// It can be passed ebpf.CollectionSpec.Assign.
type bpfProgramSpecs struct {
XdpProgFunc *ebpf.ProgramSpec `ebpf:"xdp_prog_func"`
}
// bpfMapSpecs contains maps before they are loaded into the kernel.
//
// It can be passed ebpf.CollectionSpec.Assign.
type bpfMapSpecs struct {
XdpPortMap *ebpf.MapSpec `ebpf:"xdp_port_map"`
}
// bpfObjects contains all objects after they have been loaded into the kernel.
//
// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign.
type bpfObjects struct {
bpfPrograms
bpfMaps
}
func (o *bpfObjects) Close() error {
return _BpfClose(
&o.bpfPrograms,
&o.bpfMaps,
)
}
// bpfMaps contains all maps after they have been loaded into the kernel.
//
// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign.
type bpfMaps struct {
XdpPortMap *ebpf.Map `ebpf:"xdp_port_map"`
}
func (m *bpfMaps) Close() error {
return _BpfClose(
m.XdpPortMap,
)
}
// bpfPrograms contains all programs after they have been loaded into the kernel.
//
// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign.
type bpfPrograms struct {
XdpProgFunc *ebpf.Program `ebpf:"xdp_prog_func"`
}
func (p *bpfPrograms) Close() error {
return _BpfClose(
p.XdpProgFunc,
)
}
func _BpfClose(closers ...io.Closer) error {
for _, closer := range closers {
if err := closer.Close(); err != nil {
return err
}
}
return nil
}
// Do not access this directly.
//
//go:embed bpf_bpfel.o
var _BpfBytes []byte

Binary file not shown.

View File

@ -0,0 +1,20 @@
package wgproxy
type Factory struct {
wgPort int
ebpfProxy Proxy
}
func (w *Factory) GetProxy() Proxy {
if w.ebpfProxy != nil {
return w.ebpfProxy
}
return NewWGUserSpaceProxy(w.wgPort)
}
func (w *Factory) Free() error {
if w.ebpfProxy != nil {
return w.ebpfProxy.CloseConn()
}
return nil
}

View File

@ -0,0 +1,19 @@
package wgproxy
import (
log "github.com/sirupsen/logrus"
)
func NewFactory(wgPort int) *Factory {
f := &Factory{wgPort: wgPort}
ebpfProxy := NewWGEBPFProxy(wgPort)
err := ebpfProxy.Listen()
if err != nil {
log.Errorf("failed to initialize ebpf proxy: %s", err)
return f
}
f.ebpfProxy = ebpfProxy
return f
}

View File

@ -0,0 +1,7 @@
//go:build !linux || android
package wgproxy
func NewFactory(wgPort int) *Factory {
return &Factory{wgPort: wgPort}
}

View File

@ -0,0 +1,80 @@
//go:build linux && !android
package wgproxy
import (
_ "embed"
"net"
"github.com/cilium/ebpf/link"
"github.com/cilium/ebpf/rlimit"
)
const (
mapKeyProxyPort uint32 = 0
mapKeyWgPort uint32 = 1
)
//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -cc clang-14 bpf bpf/portreplace.c --
type eBPF struct {
link link.Link
}
func newEBPF() *eBPF {
return &eBPF{}
}
func (l *eBPF) load(proxyPort, wgPort int) error {
// it required for Docker
err := rlimit.RemoveMemlock()
if err != nil {
return err
}
ifce, err := net.InterfaceByName("lo")
if err != nil {
return err
}
// Load pre-compiled programs into the kernel.
objs := bpfObjects{}
err = loadBpfObjects(&objs, nil)
if err != nil {
return err
}
defer func() {
_ = objs.Close()
}()
err = objs.XdpPortMap.Put(mapKeyProxyPort, uint16(proxyPort))
if err != nil {
return err
}
err = objs.XdpPortMap.Put(mapKeyWgPort, uint16(wgPort))
if err != nil {
return err
}
defer func() {
_ = objs.XdpPortMap.Close()
}()
l.link, err = link.AttachXDP(link.XDPOptions{
Program: objs.XdpProgFunc,
Interface: ifce.Index,
})
if err != nil {
return err
}
return err
}
func (l *eBPF) free() error {
if l.link != nil {
return l.link.Close()
}
return nil
}

View File

@ -0,0 +1,18 @@
//go:build linux
package wgproxy
import (
"testing"
)
func Test_newEBPF(t *testing.T) {
ebpf := newEBPF()
err := ebpf.load(1234, 51892)
defer func() {
_ = ebpf.free()
}()
if err != nil {
t.Errorf("%s", err)
}
}

View File

@ -0,0 +1,32 @@
package wgproxy
import (
"fmt"
"net"
)
const (
portRangeStart = 3128
portRangeEnd = 3228
)
type portLookup struct {
}
func (pl portLookup) searchFreePort() (int, error) {
for i := portRangeStart; i <= portRangeEnd; i++ {
if pl.tryToBind(i) == nil {
return i, nil
}
}
return 0, fmt.Errorf("failed to bind free port for eBPF proxy")
}
func (pl portLookup) tryToBind(port int) error {
l, err := net.ListenPacket("udp", fmt.Sprintf(":%d", port))
if err != nil {
return err
}
_ = l.Close()
return nil
}

View File

@ -0,0 +1,42 @@
package wgproxy
import (
"fmt"
"net"
"testing"
)
func Test_portLookup_searchFreePort(t *testing.T) {
pl := portLookup{}
_, err := pl.searchFreePort()
if err != nil {
t.Fatal(err)
}
}
func Test_portLookup_on_allocated(t *testing.T) {
pl := portLookup{}
allocatedPort, err := allocatePort(portRangeStart)
if err != nil {
t.Fatal(err)
}
defer allocatedPort.Close()
fp, err := pl.searchFreePort()
if err != nil {
t.Fatal(err)
}
if fp != (portRangeStart + 1) {
t.Errorf("invalid free port, expected: %d, got: %d", portRangeStart+1, fp)
}
}
func allocatePort(port int) (net.PacketConn, error) {
c, err := net.ListenPacket("udp", fmt.Sprintf(":%d", port))
if err != nil {
return nil, err
}
return c, err
}

View File

@ -0,0 +1,12 @@
package wgproxy
import (
"net"
)
// Proxy is a transfer layer between the Turn connection and the WireGuard
type Proxy interface {
AddTurnConn(urnConn net.Conn) (net.Addr, error)
CloseConn() error
Free() error
}

View File

@ -0,0 +1,250 @@
//go:build linux && !android
package wgproxy
import (
"fmt"
"io"
"net"
"os"
"sync"
"syscall"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
log "github.com/sirupsen/logrus"
)
// WGEBPFProxy definition for proxy with eBPF support
type WGEBPFProxy struct {
ebpf *eBPF
lastUsedPort uint16
localWGListenPort int
turnConnStore map[uint16]net.Conn
turnConnMutex sync.Mutex
rawConn net.PacketConn
conn *net.UDPConn
}
// NewWGEBPFProxy create new WGEBPFProxy instance
func NewWGEBPFProxy(wgPort int) *WGEBPFProxy {
log.Debugf("instantiate ebpf proxy")
wgProxy := &WGEBPFProxy{
localWGListenPort: wgPort,
ebpf: newEBPF(),
lastUsedPort: 0,
turnConnStore: make(map[uint16]net.Conn),
}
return wgProxy
}
// Listen load ebpf program and listen the proxy
func (p *WGEBPFProxy) Listen() error {
pl := portLookup{}
wgPorxyPort, err := pl.searchFreePort()
if err != nil {
return err
}
p.rawConn, err = p.prepareSenderRawSocket()
if err != nil {
return err
}
err = p.ebpf.load(wgPorxyPort, p.localWGListenPort)
if err != nil {
return err
}
addr := net.UDPAddr{
Port: wgPorxyPort,
IP: net.ParseIP("127.0.0.1"),
}
p.conn, err = net.ListenUDP("udp", &addr)
if err != nil {
cErr := p.Free()
if err != nil {
log.Errorf("failed to close the wgproxy: %s", cErr)
}
return err
}
go p.proxyToRemote()
log.Infof("local wg proxy listening on: %d", wgPorxyPort)
return nil
}
// AddTurnConn add new turn connection for the proxy
func (p *WGEBPFProxy) AddTurnConn(turnConn net.Conn) (net.Addr, error) {
wgEndpointPort, err := p.storeTurnConn(turnConn)
if err != nil {
return nil, err
}
go p.proxyToLocal(wgEndpointPort, turnConn)
log.Infof("turn conn added to wg proxy store: %s, endpoint port: :%d", turnConn.RemoteAddr(), wgEndpointPort)
wgEndpoint := &net.UDPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: int(wgEndpointPort),
}
return wgEndpoint, nil
}
// CloseConn doing nothing because this type of proxy implementation does not store the connection
func (p *WGEBPFProxy) CloseConn() error {
return nil
}
// Free resources
func (p *WGEBPFProxy) Free() error {
var err1, err2, err3 error
if p.conn != nil {
err1 = p.conn.Close()
}
err2 = p.ebpf.free()
if p.rawConn != nil {
err3 = p.rawConn.Close()
}
if err1 != nil {
return err1
}
if err2 != nil {
return err2
}
return err3
}
func (p *WGEBPFProxy) proxyToLocal(endpointPort uint16, remoteConn net.Conn) {
buf := make([]byte, 1500)
for {
n, err := remoteConn.Read(buf)
if err != nil {
if err != io.EOF {
log.Errorf("failed to read from turn conn (endpoint: :%d): %s", endpointPort, err)
}
p.removeTurnConn(endpointPort)
return
}
err = p.sendPkg(buf[:n], endpointPort)
if err != nil {
log.Errorf("failed to write out turn pkg to local conn: %v", err)
}
}
}
// proxyToRemote read messages from local WireGuard interface and forward it to remote conn
func (p *WGEBPFProxy) proxyToRemote() {
buf := make([]byte, 1500)
for {
n, addr, err := p.conn.ReadFromUDP(buf)
if err != nil {
log.Errorf("failed to read UDP pkg from WG: %s", err)
return
}
conn, ok := p.turnConnStore[uint16(addr.Port)]
if !ok {
log.Errorf("turn conn not found by port: %d", addr.Port)
continue
}
_, err = conn.Write(buf[:n])
if err != nil {
log.Debugf("failed to forward local wg pkg (%d) to remote turn conn: %s", addr.Port, err)
}
}
}
func (p *WGEBPFProxy) storeTurnConn(turnConn net.Conn) (uint16, error) {
p.turnConnMutex.Lock()
defer p.turnConnMutex.Unlock()
np, err := p.nextFreePort()
if err != nil {
return np, err
}
p.turnConnStore[np] = turnConn
return np, nil
}
func (p *WGEBPFProxy) removeTurnConn(turnConnID uint16) {
log.Tracef("remove turn conn from store by port: %d", turnConnID)
p.turnConnMutex.Lock()
defer p.turnConnMutex.Unlock()
delete(p.turnConnStore, turnConnID)
}
func (p *WGEBPFProxy) nextFreePort() (uint16, error) {
if len(p.turnConnStore) == 65535 {
return 0, fmt.Errorf("reached maximum turn connection numbers")
}
generatePort:
if p.lastUsedPort == 65535 {
p.lastUsedPort = 1
} else {
p.lastUsedPort++
}
if _, ok := p.turnConnStore[p.lastUsedPort]; ok {
goto generatePort
}
return p.lastUsedPort, nil
}
func (p *WGEBPFProxy) prepareSenderRawSocket() (net.PacketConn, error) {
fd, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_RAW, syscall.IPPROTO_RAW)
if err != nil {
return nil, err
}
err = syscall.SetsockoptInt(fd, syscall.IPPROTO_IP, syscall.IP_HDRINCL, 1)
if err != nil {
return nil, err
}
err = syscall.SetsockoptString(fd, syscall.SOL_SOCKET, syscall.SO_BINDTODEVICE, "lo")
if err != nil {
return nil, err
}
return net.FilePacketConn(os.NewFile(uintptr(fd), fmt.Sprintf("fd %d", fd)))
}
func (p *WGEBPFProxy) sendPkg(data []byte, port uint16) error {
localhost := net.ParseIP("127.0.0.1")
payload := gopacket.Payload(data)
ipH := &layers.IPv4{
DstIP: localhost,
SrcIP: localhost,
Version: 4,
TTL: 64,
Protocol: layers.IPProtocolUDP,
}
udpH := &layers.UDP{
SrcPort: layers.UDPPort(port),
DstPort: layers.UDPPort(p.localWGListenPort),
}
err := udpH.SetNetworkLayerForChecksum(ipH)
if err != nil {
return err
}
layerBuffer := gopacket.NewSerializeBuffer()
err = gopacket.SerializeLayers(layerBuffer, gopacket.SerializeOptions{ComputeChecksums: true, FixLengths: true}, ipH, udpH, payload)
if err != nil {
return err
}
_, err = p.rawConn.WriteTo(layerBuffer.Bytes(), &net.IPAddr{IP: localhost})
return err
}

View File

@ -0,0 +1,56 @@
//go:build linux && !android
package wgproxy
import (
"testing"
)
func TestWGEBPFProxy_connStore(t *testing.T) {
wgProxy := NewWGEBPFProxy(1)
p, _ := wgProxy.storeTurnConn(nil)
if p != 1 {
t.Errorf("invalid initial port: %d", wgProxy.lastUsedPort)
}
numOfConns := 10
for i := 0; i < numOfConns; i++ {
p, _ = wgProxy.storeTurnConn(nil)
}
if p != uint16(numOfConns)+1 {
t.Errorf("invalid last used port: %d, expected: %d", p, numOfConns+1)
}
if len(wgProxy.turnConnStore) != numOfConns+1 {
t.Errorf("invalid store size: %d, expected: %d", len(wgProxy.turnConnStore), numOfConns+1)
}
}
func TestWGEBPFProxy_portCalculation_overflow(t *testing.T) {
wgProxy := NewWGEBPFProxy(1)
_, _ = wgProxy.storeTurnConn(nil)
wgProxy.lastUsedPort = 65535
p, _ := wgProxy.storeTurnConn(nil)
if len(wgProxy.turnConnStore) != 2 {
t.Errorf("invalid store size: %d, expected: %d", len(wgProxy.turnConnStore), 2)
}
if p != 2 {
t.Errorf("invalid last used port: %d, expected: %d", p, 2)
}
}
func TestWGEBPFProxy_portCalculation_maxConn(t *testing.T) {
wgProxy := NewWGEBPFProxy(1)
for i := 0; i < 65535; i++ {
_, _ = wgProxy.storeTurnConn(nil)
}
_, err := wgProxy.storeTurnConn(nil)
if err == nil {
t.Errorf("invalid turn conn store calculation")
}
}

View File

@ -0,0 +1,105 @@
package wgproxy
import (
"context"
"fmt"
"net"
log "github.com/sirupsen/logrus"
)
// WGUserSpaceProxy proxies
type WGUserSpaceProxy struct {
localWGListenPort int
ctx context.Context
cancel context.CancelFunc
remoteConn net.Conn
localConn net.Conn
}
// NewWGUserSpaceProxy instantiate a user space WireGuard proxy
func NewWGUserSpaceProxy(wgPort int) *WGUserSpaceProxy {
p := &WGUserSpaceProxy{
localWGListenPort: wgPort,
}
p.ctx, p.cancel = context.WithCancel(context.Background())
return p
}
// AddTurnConn start the proxy with the given remote conn
func (p *WGUserSpaceProxy) AddTurnConn(remoteConn net.Conn) (net.Addr, error) {
p.remoteConn = remoteConn
var err error
p.localConn, err = net.Dial("udp", fmt.Sprintf(":%d", p.localWGListenPort))
if err != nil {
log.Errorf("failed dialing to local Wireguard port %s", err)
return nil, err
}
go p.proxyToRemote()
go p.proxyToLocal()
return p.localConn.LocalAddr(), err
}
// CloseConn close the localConn
func (p *WGUserSpaceProxy) CloseConn() error {
p.cancel()
if p.localConn == nil {
return nil
}
return p.localConn.Close()
}
// Free doing nothing because this implementation of proxy does not have global state
func (p *WGUserSpaceProxy) Free() error {
return nil
}
// proxyToRemote proxies everything from Wireguard to the RemoteKey peer
// blocks
func (p *WGUserSpaceProxy) proxyToRemote() {
buf := make([]byte, 1500)
for {
select {
case <-p.ctx.Done():
return
default:
n, err := p.localConn.Read(buf)
if err != nil {
continue
}
_, err = p.remoteConn.Write(buf[:n])
if err != nil {
continue
}
}
}
}
// proxyToLocal proxies everything from the RemoteKey peer to local Wireguard
// blocks
func (p *WGUserSpaceProxy) proxyToLocal() {
buf := make([]byte, 1500)
for {
select {
case <-p.ctx.Done():
return
default:
n, err := p.remoteConn.Read(buf)
if err != nil {
continue
}
_, err = p.localConn.Write(buf[:n])
if err != nil {
continue
}
}
}
}

go.mod
View File

@ -30,6 +30,7 @@ require (
require (
    fyne.io/fyne/v2 v2.1.4
    github.com/c-robinson/iplib v1.0.3
+   github.com/cilium/ebpf v0.10.0
    github.com/coreos/go-iptables v0.6.0
    github.com/creack/pty v1.1.18
    github.com/eko/gocache/v3 v3.1.1
@ -125,7 +126,6 @@ require (
    github.com/prometheus/client_model v0.3.0 // indirect
    github.com/prometheus/common v0.37.0 // indirect
    github.com/prometheus/procfs v0.8.0 // indirect
-   github.com/rogpeppe/go-internal v1.8.0 // indirect
    github.com/spf13/cast v1.5.0 // indirect
    github.com/srwiley/oksvg v0.0.0-20200311192757-870daf9aa564 // indirect
    github.com/srwiley/rasterx v0.0.0-20200120212402-85cb7272f5e9 // indirect

go.sum
View File

@ -101,6 +101,8 @@ github.com/cilium/ebpf v0.0.0-20200110133405-4032b1d8aae3/go.mod h1:MA5e5Lr8slmE
github.com/cilium/ebpf v0.4.0/go.mod h1:4tRaxcgiL706VnOzHOdBlY8IEAIdxINsQBcU4xJJXRs=
github.com/cilium/ebpf v0.5.0/go.mod h1:4tRaxcgiL706VnOzHOdBlY8IEAIdxINsQBcU4xJJXRs=
github.com/cilium/ebpf v0.7.0/go.mod h1:/oI2+1shJiTGAMgl6/RgJr36Eo1jzrRcAWbcXO2usCA=
+github.com/cilium/ebpf v0.10.0 h1:nk5HPMeoBXtOzbkZBWym+ZWq1GIiHUsBFXxwewXAHLQ=
+github.com/cilium/ebpf v0.10.0/go.mod h1:DPiVdY/kT534dgc9ERmvP8mWA+9gvwgKfRvk4nNWnoE=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
@ -177,7 +179,7 @@ github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQL
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k=
-github.com/frankban/quicktest v1.14.3 h1:FJKSZTDHjyhriyC81FLQ0LY93eSai0ZyR/ZIkd3ZUKE=
+github.com/frankban/quicktest v1.14.4 h1:g2rn0vABPOOXmZUj+vbmUp0lPoXEMuhTpIluN0XL9UY=
github.com/fredbi/uri v0.0.0-20181227131451-3dcfdacbaaf3 h1:FDqhDm7pcsLhhWl1QtD8vlzI4mm59llRvNzrFg6/LAA=
github.com/fredbi/uri v0.0.0-20181227131451-3dcfdacbaaf3/go.mod h1:CzM2G82Q9BDUvMTGHnXf/6OExw/Dz2ivDj48nVg7Lg8=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
@ -419,7 +421,7 @@ github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFB
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
-github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
+github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.4-0.20190131011033-7dc38fb350b1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
@ -552,7 +554,6 @@ github.com/pion/turn/v2 v2.1.0 h1:5wGHSgGhJhP/RpabkUb/T9PdsAjkGLS6toYz5HNzoSI=
github.com/pion/turn/v2 v2.1.0/go.mod h1:yrT5XbXSGX1VFSF31A3c1kCNB5bBZgk/uu5LET162qs=
github.com/pion/udp/v2 v2.0.1 h1:xP0z6WNux1zWEjhC7onRA3EwwSliXqu1ElUZAQhUP54=
github.com/pion/udp/v2 v2.0.1/go.mod h1:B7uvTMP00lzWdyMr/1PVZXtV3wpPIxBRd4Wl6AksXn8=
-github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
@ -596,8 +597,7 @@ github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40T
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
-github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8=
-github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE=
+github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rs/cors v1.8.0 h1:P2KMzcFwrPoSjkF1WLRPsp3UMLyql8L4v9hQpVeK5so=
github.com/rs/cors v1.8.0/go.mod h1:EBwu+T5AvHOcXwvZIkQFjUN6s8Czyqw12GL/Y0tUyRM=
github.com/rs/xid v1.3.0 h1:6NjYksEUlhurdVehpc7S7dk6DAmcKv8V9gG0FsVN2U4=

View File

@ -82,7 +82,7 @@ func (m *UniversalUDPMuxDefault) ReadFromConn(ctx context.Context) {
        default:
            _, a, err := m.params.UDPConn.ReadFrom(buf)
            if err != nil {
-               log.Errorf("error while reading packet %s", err)
+               log.Errorf("error while reading packet: %s", err)
                continue
            }
            msg := &stun.Message{

View File

@ -74,7 +74,7 @@ func (w *WGIface) UpdatePeer(peerKey string, allowedIps string, keepAlive time.D
    w.mu.Lock()
    defer w.mu.Unlock()
-   log.Debugf("updating interface %s peer %s: endpoint %s ", w.tun.DeviceName(), peerKey, endpoint)
+   log.Debugf("updating interface %s peer %s, endpoint %s ", w.tun.DeviceName(), peerKey, endpoint)
    return w.configurer.updatePeer(peerKey, allowedIps, keepAlive, endpoint, preSharedKey)
}