diff --git a/client/internal/peer/conn.go b/client/internal/peer/conn.go index 5327e31d2..911ddd228 100644 --- a/client/internal/peer/conn.go +++ b/client/internal/peer/conn.go @@ -518,6 +518,9 @@ func (conn *Conn) relayConnectionIsReady(rci RelayConnInfo) { defer conn.mu.Unlock() if conn.ctx.Err() != nil { + if err := rci.relayedConn.Close(); err != nil { + conn.log.Warnf("failed to close unnecessary relayed connection: %v", err) + } return } @@ -530,6 +533,7 @@ func (conn *Conn) relayConnectionIsReady(rci RelayConnInfo) { conn.log.Errorf("failed to add relayed net.Conn to local proxy: %v", err) return } + conn.log.Infof("created new wgProxy for relay connection: %s", endpoint) endpointUdpAddr, _ := net.ResolveUDPAddr(endpoint.Network(), endpoint.String()) conn.endpointRelay = endpointUdpAddr @@ -775,9 +779,8 @@ func (conn *Conn) getEndpointForICEConnInfo(iceConnInfo ICEConnInfo) (net.Addr, ep, err := wgProxy.AddTurnConn(iceConnInfo.RemoteConn) if err != nil { conn.log.Errorf("failed to add turn net.Conn to local proxy: %v", err) - err = wgProxy.CloseConn() - if err != nil { - conn.log.Warnf("failed to close turn proxy connection: %v", err) + if errClose := wgProxy.CloseConn(); errClose != nil { + conn.log.Warnf("failed to close turn proxy connection: %v", errClose) } return nil, nil, err } diff --git a/client/internal/wgproxy/proxy_userspace.go b/client/internal/wgproxy/proxy_userspace.go index c2c8a9b51..701f615b9 100644 --- a/client/internal/wgproxy/proxy_userspace.go +++ b/client/internal/wgproxy/proxy_userspace.go @@ -32,8 +32,8 @@ func NewWGUserSpaceProxy(ctx context.Context, wgPort int) *WGUserSpaceProxy { } // AddTurnConn start the proxy with the given remote conn -func (p *WGUserSpaceProxy) AddTurnConn(turnConn net.Conn) (net.Addr, error) { - p.remoteConn = turnConn +func (p *WGUserSpaceProxy) AddTurnConn(remoteConn net.Conn) (net.Addr, error) { + p.remoteConn = remoteConn var err error p.localConn, err = nbnet.NewDialer().DialContext(p.ctx, "udp", 
fmt.Sprintf(":%d", p.localWGListenPort)) @@ -54,6 +54,14 @@ func (p *WGUserSpaceProxy) CloseConn() error { if p.localConn == nil { return nil } + + if p.remoteConn == nil { + return p.localConn.Close() + } + + if err := p.remoteConn.Close(); err != nil { + log.Warnf("failed to close remote conn: %s", err) + } return p.localConn.Close() } @@ -65,6 +73,8 @@ func (p *WGUserSpaceProxy) Free() error { // proxyToRemote proxies everything from Wireguard to the RemoteKey peer // blocks func (p *WGUserSpaceProxy) proxyToRemote() { + defer log.Infof("exit from proxyToRemote: %s", p.localConn.LocalAddr()) + buf := make([]byte, 1500) for { select { @@ -93,7 +103,8 @@ // proxyToLocal proxies everything from the RemoteKey peer to local Wireguard // blocks func (p *WGUserSpaceProxy) proxyToLocal() { - + defer p.cancel() + defer log.Infof("exit from proxyToLocal: %s", p.localConn.LocalAddr()) buf := make([]byte, 1500) for { select { @@ -103,7 +114,6 @@ n, err := p.remoteConn.Read(buf) if err != nil { if err == io.EOF { - p.cancel() return } log.Errorf("failed to read from remote conn: %s", err) diff --git a/relay/client/client.go b/relay/client/client.go index e431c029d..90bc3ac41 100644 --- a/relay/client/client.go +++ b/relay/client/client.go @@ -58,7 +58,10 @@ func (m *Msg) Free() { m.bufPool.Put(m.bufPtr) } +// connContainer is a container for the connection to the peer. It is responsible for managing the messages from the +// server and forwarding them to the upper layer content reader. 
type connContainer struct { + log *log.Entry conn *Conn messages chan Msg msgChanLock sync.Mutex @@ -67,10 +70,11 @@ type connContainer struct { cancel context.CancelFunc } -func newConnContainer(conn *Conn, messages chan Msg) *connContainer { +func newConnContainer(log *log.Entry, conn *Conn, messages chan Msg) *connContainer { ctx, cancel := context.WithCancel(context.Background()) return &connContainer{ + log: log, conn: conn, messages: messages, ctx: ctx, @@ -91,6 +94,10 @@ func (cc *connContainer) writeMsg(msg Msg) { case cc.messages <- msg: case <-cc.ctx.Done(): msg.Free() + default: + msg.Free() + cc.log.Infof("message queue is full") + // TODO: consider closing the connection } } @@ -141,8 +148,8 @@ type Client struct { // NewClient creates a new client for the relay server. The client is not connected to the server until the Connect func NewClient(ctx context.Context, serverURL string, authTokenStore *auth.TokenStore, peerID string) *Client { hashedID, hashedStringId := messages.HashID(peerID) - return &Client{ - log: log.WithFields(log.Fields{"client_id": hashedStringId, "relay": serverURL}), + c := &Client{ + log: log.WithFields(log.Fields{"relay": serverURL}), parentCtx: ctx, connectionURL: serverURL, authTokenStore: authTokenStore, @@ -155,6 +162,8 @@ func NewClient(ctx context.Context, serverURL string, authTokenStore *auth.Token }, conns: make(map[string]*connContainer), } + c.log.Infof("create new relay connection: local peerID: %s, local peer hashedID: %s", peerID, hashedStringId) + return c } // Connect establishes a connection to the relay server. It blocks until the connection is established or an error occurs. 
@@ -203,10 +212,10 @@ func (c *Client) OpenConn(dstPeerID string) (net.Conn, error) { } c.log.Infof("open connection to peer: %s", hashedStringID) - msgChannel := make(chan Msg, 2) + msgChannel := make(chan Msg, 100) conn := NewConn(c, hashedID, hashedStringID, msgChannel, c.instanceURL) - c.conns[hashedStringID] = newConnContainer(conn, msgChannel) + c.conns[hashedStringID] = newConnContainer(c.log, conn, msgChannel) return conn, nil } @@ -455,7 +464,10 @@ func (c *Client) listenForStopEvents(hc *healthcheck.Receiver, conn net.Conn, in } c.log.Errorf("health check timeout") internalStopFlag.set() - _ = conn.Close() // ignore the err because the readLoop will handle it + if err := conn.Close(); err != nil { + // best-effort close; the readLoop is responsible for handling the shutdown + c.log.Warnf("failed to close connection: %s", err) + } return case <-c.parentCtx.Done(): err := c.close(true) @@ -486,6 +498,7 @@ func (c *Client) closeConn(connReference *Conn, id string) error { if container.conn != connReference { return fmt.Errorf("conn reference mismatch") } + c.log.Infof("free up connection to peer: %s", id) delete(c.conns, id) container.close()