- Increase queue size and drop the overflowed messages (#2617)

- Explicitly close the net.Conn in the user-space wgProxy when closing the wgProxy
- Add extra logs
This commit is contained in:
Zoltan Papp 2024-09-19 13:49:09 +02:00 committed by GitHub
parent 28cbb4b70f
commit 6f0fd1d1b3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 40 additions and 14 deletions

View File

@ -518,6 +518,9 @@ func (conn *Conn) relayConnectionIsReady(rci RelayConnInfo) {
defer conn.mu.Unlock() defer conn.mu.Unlock()
if conn.ctx.Err() != nil { if conn.ctx.Err() != nil {
if err := rci.relayedConn.Close(); err != nil {
log.Warnf("failed to close unnecessary relayed connection: %v", err)
}
return return
} }
@ -530,6 +533,7 @@ func (conn *Conn) relayConnectionIsReady(rci RelayConnInfo) {
conn.log.Errorf("failed to add relayed net.Conn to local proxy: %v", err) conn.log.Errorf("failed to add relayed net.Conn to local proxy: %v", err)
return return
} }
conn.log.Infof("created new wgProxy for relay connection: %s", endpoint)
endpointUdpAddr, _ := net.ResolveUDPAddr(endpoint.Network(), endpoint.String()) endpointUdpAddr, _ := net.ResolveUDPAddr(endpoint.Network(), endpoint.String())
conn.endpointRelay = endpointUdpAddr conn.endpointRelay = endpointUdpAddr
@ -775,9 +779,8 @@ func (conn *Conn) getEndpointForICEConnInfo(iceConnInfo ICEConnInfo) (net.Addr,
ep, err := wgProxy.AddTurnConn(iceConnInfo.RemoteConn) ep, err := wgProxy.AddTurnConn(iceConnInfo.RemoteConn)
if err != nil { if err != nil {
conn.log.Errorf("failed to add turn net.Conn to local proxy: %v", err) conn.log.Errorf("failed to add turn net.Conn to local proxy: %v", err)
err = wgProxy.CloseConn() if errClose := wgProxy.CloseConn(); errClose != nil {
if err != nil { conn.log.Warnf("failed to close turn proxy connection: %v", errClose)
conn.log.Warnf("failed to close turn proxy connection: %v", err)
} }
return nil, nil, err return nil, nil, err
} }

View File

@ -32,8 +32,8 @@ func NewWGUserSpaceProxy(ctx context.Context, wgPort int) *WGUserSpaceProxy {
} }
// AddTurnConn start the proxy with the given remote conn // AddTurnConn start the proxy with the given remote conn
func (p *WGUserSpaceProxy) AddTurnConn(turnConn net.Conn) (net.Addr, error) { func (p *WGUserSpaceProxy) AddTurnConn(remoteConn net.Conn) (net.Addr, error) {
p.remoteConn = turnConn p.remoteConn = remoteConn
var err error var err error
p.localConn, err = nbnet.NewDialer().DialContext(p.ctx, "udp", fmt.Sprintf(":%d", p.localWGListenPort)) p.localConn, err = nbnet.NewDialer().DialContext(p.ctx, "udp", fmt.Sprintf(":%d", p.localWGListenPort))
@ -54,6 +54,14 @@ func (p *WGUserSpaceProxy) CloseConn() error {
if p.localConn == nil { if p.localConn == nil {
return nil return nil
} }
if p.remoteConn == nil {
return nil
}
if err := p.remoteConn.Close(); err != nil {
log.Warnf("failed to close remote conn: %s", err)
}
return p.localConn.Close() return p.localConn.Close()
} }
@ -65,6 +73,8 @@ func (p *WGUserSpaceProxy) Free() error {
// proxyToRemote proxies everything from Wireguard to the RemoteKey peer // proxyToRemote proxies everything from Wireguard to the RemoteKey peer
// blocks // blocks
func (p *WGUserSpaceProxy) proxyToRemote() { func (p *WGUserSpaceProxy) proxyToRemote() {
defer log.Infof("exit from proxyToRemote: %s", p.localConn.LocalAddr())
buf := make([]byte, 1500) buf := make([]byte, 1500)
for { for {
select { select {
@ -93,7 +103,8 @@ func (p *WGUserSpaceProxy) proxyToRemote() {
// proxyToLocal proxies everything from the RemoteKey peer to local Wireguard // proxyToLocal proxies everything from the RemoteKey peer to local Wireguard
// blocks // blocks
func (p *WGUserSpaceProxy) proxyToLocal() { func (p *WGUserSpaceProxy) proxyToLocal() {
defer p.cancel()
defer log.Infof("exit from proxyToLocal: %s", p.localConn.LocalAddr())
buf := make([]byte, 1500) buf := make([]byte, 1500)
for { for {
select { select {
@ -103,7 +114,6 @@ func (p *WGUserSpaceProxy) proxyToLocal() {
n, err := p.remoteConn.Read(buf) n, err := p.remoteConn.Read(buf)
if err != nil { if err != nil {
if err == io.EOF { if err == io.EOF {
p.cancel()
return return
} }
log.Errorf("failed to read from remote conn: %s", err) log.Errorf("failed to read from remote conn: %s", err)

View File

@ -58,7 +58,10 @@ func (m *Msg) Free() {
m.bufPool.Put(m.bufPtr) m.bufPool.Put(m.bufPtr)
} }
// connContainer is a container for the connection to the peer. It is responsible for managing the messages from the
// server and forwarding them to the upper layer content reader.
type connContainer struct { type connContainer struct {
log *log.Entry
conn *Conn conn *Conn
messages chan Msg messages chan Msg
msgChanLock sync.Mutex msgChanLock sync.Mutex
@ -67,10 +70,10 @@ type connContainer struct {
cancel context.CancelFunc cancel context.CancelFunc
} }
func newConnContainer(conn *Conn, messages chan Msg) *connContainer { func newConnContainer(log *log.Entry, conn *Conn, messages chan Msg) *connContainer {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
return &connContainer{ return &connContainer{
log: log,
conn: conn, conn: conn,
messages: messages, messages: messages,
ctx: ctx, ctx: ctx,
@ -91,6 +94,10 @@ func (cc *connContainer) writeMsg(msg Msg) {
case cc.messages <- msg: case cc.messages <- msg:
case <-cc.ctx.Done(): case <-cc.ctx.Done():
msg.Free() msg.Free()
default:
msg.Free()
cc.log.Infof("message queue is full")
// todo consider to close the connection
} }
} }
@ -141,8 +148,8 @@ type Client struct {
// NewClient creates a new client for the relay server. The client is not connected to the server until the Connect // NewClient creates a new client for the relay server. The client is not connected to the server until the Connect
func NewClient(ctx context.Context, serverURL string, authTokenStore *auth.TokenStore, peerID string) *Client { func NewClient(ctx context.Context, serverURL string, authTokenStore *auth.TokenStore, peerID string) *Client {
hashedID, hashedStringId := messages.HashID(peerID) hashedID, hashedStringId := messages.HashID(peerID)
return &Client{ c := &Client{
log: log.WithFields(log.Fields{"client_id": hashedStringId, "relay": serverURL}), log: log.WithFields(log.Fields{"relay": serverURL}),
parentCtx: ctx, parentCtx: ctx,
connectionURL: serverURL, connectionURL: serverURL,
authTokenStore: authTokenStore, authTokenStore: authTokenStore,
@ -155,6 +162,8 @@ func NewClient(ctx context.Context, serverURL string, authTokenStore *auth.Token
}, },
conns: make(map[string]*connContainer), conns: make(map[string]*connContainer),
} }
c.log.Infof("create new relay connection: local peerID: %s, local peer hashedID: %s", peerID, hashedStringId)
return c
} }
// Connect establishes a connection to the relay server. It blocks until the connection is established or an error occurs. // Connect establishes a connection to the relay server. It blocks until the connection is established or an error occurs.
@ -203,10 +212,10 @@ func (c *Client) OpenConn(dstPeerID string) (net.Conn, error) {
} }
c.log.Infof("open connection to peer: %s", hashedStringID) c.log.Infof("open connection to peer: %s", hashedStringID)
msgChannel := make(chan Msg, 2) msgChannel := make(chan Msg, 100)
conn := NewConn(c, hashedID, hashedStringID, msgChannel, c.instanceURL) conn := NewConn(c, hashedID, hashedStringID, msgChannel, c.instanceURL)
c.conns[hashedStringID] = newConnContainer(conn, msgChannel) c.conns[hashedStringID] = newConnContainer(c.log, conn, msgChannel)
return conn, nil return conn, nil
} }
@ -455,7 +464,10 @@ func (c *Client) listenForStopEvents(hc *healthcheck.Receiver, conn net.Conn, in
} }
c.log.Errorf("health check timeout") c.log.Errorf("health check timeout")
internalStopFlag.set() internalStopFlag.set()
_ = conn.Close() // ignore the err because the readLoop will handle it if err := conn.Close(); err != nil {
// ignore the err handling because the readLoop will handle it
c.log.Warnf("failed to close connection: %s", err)
}
return return
case <-c.parentCtx.Done(): case <-c.parentCtx.Done():
err := c.close(true) err := c.close(true)
@ -486,6 +498,7 @@ func (c *Client) closeConn(connReference *Conn, id string) error {
if container.conn != connReference { if container.conn != connReference {
return fmt.Errorf("conn reference mismatch") return fmt.Errorf("conn reference mismatch")
} }
c.log.Infof("free up connection to peer: %s", id)
delete(c.conns, id) delete(c.conns, id)
container.close() container.close()