mirror of
https://github.com/netbirdio/netbird.git
synced 2024-11-21 23:53:14 +01:00
[client] fix/proxy close (#2873)
When the remote peer switches the Relay instance then must to close the proxy connection to the old instance. It can cause issues when the remote peer switch connects to the Relay instance multiple times and then reconnects to an instance it had previously connected to.
This commit is contained in:
parent
b4d7605147
commit
30f025e7dd
@ -2,6 +2,7 @@ package bind
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"net/netip"
|
"net/netip"
|
||||||
@ -94,7 +95,10 @@ func (p *ProxyBind) close() error {
|
|||||||
|
|
||||||
p.Bind.RemoveEndpoint(p.wgAddr)
|
p.Bind.RemoveEndpoint(p.wgAddr)
|
||||||
|
|
||||||
return p.remoteConn.Close()
|
if rErr := p.remoteConn.Close(); rErr != nil && !errors.Is(rErr, net.ErrClosed) {
|
||||||
|
return rErr
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ProxyBind) proxyToLocal(ctx context.Context) {
|
func (p *ProxyBind) proxyToLocal(ctx context.Context) {
|
||||||
|
@ -77,7 +77,7 @@ func (e *ProxyWrapper) CloseConn() error {
|
|||||||
|
|
||||||
e.cancel()
|
e.cancel()
|
||||||
|
|
||||||
if err := e.remoteConn.Close(); err != nil {
|
if err := e.remoteConn.Close(); err != nil && !errors.Is(err, net.ErrClosed) {
|
||||||
return fmt.Errorf("failed to close remote conn: %w", err)
|
return fmt.Errorf("failed to close remote conn: %w", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -116,7 +116,7 @@ func (p *WGUDPProxy) close() error {
|
|||||||
p.cancel()
|
p.cancel()
|
||||||
|
|
||||||
var result *multierror.Error
|
var result *multierror.Error
|
||||||
if err := p.remoteConn.Close(); err != nil {
|
if err := p.remoteConn.Close(); err != nil && !errors.Is(err, net.ErrClosed) {
|
||||||
result = multierror.Append(result, fmt.Errorf("remote conn: %s", err))
|
result = multierror.Append(result, fmt.Errorf("remote conn: %s", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -442,7 +442,7 @@ func (conn *Conn) relayConnectionIsReady(rci RelayConnInfo) {
|
|||||||
|
|
||||||
if conn.iceP2PIsActive() {
|
if conn.iceP2PIsActive() {
|
||||||
conn.log.Debugf("do not switch to relay because current priority is: %v", conn.currentConnPriority)
|
conn.log.Debugf("do not switch to relay because current priority is: %v", conn.currentConnPriority)
|
||||||
conn.wgProxyRelay = wgProxy
|
conn.setRelayedProxy(wgProxy)
|
||||||
conn.statusRelay.Set(StatusConnected)
|
conn.statusRelay.Set(StatusConnected)
|
||||||
conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey)
|
conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey)
|
||||||
return
|
return
|
||||||
@ -465,7 +465,7 @@ func (conn *Conn) relayConnectionIsReady(rci RelayConnInfo) {
|
|||||||
wgConfigWorkaround()
|
wgConfigWorkaround()
|
||||||
conn.currentConnPriority = connPriorityRelay
|
conn.currentConnPriority = connPriorityRelay
|
||||||
conn.statusRelay.Set(StatusConnected)
|
conn.statusRelay.Set(StatusConnected)
|
||||||
conn.wgProxyRelay = wgProxy
|
conn.setRelayedProxy(wgProxy)
|
||||||
conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey)
|
conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey)
|
||||||
conn.log.Infof("start to communicate with peer via relay")
|
conn.log.Infof("start to communicate with peer via relay")
|
||||||
conn.doOnConnected(rci.rosenpassPubKey, rci.rosenpassAddr)
|
conn.doOnConnected(rci.rosenpassPubKey, rci.rosenpassAddr)
|
||||||
@ -736,6 +736,15 @@ func (conn *Conn) logTraceConnState() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (conn *Conn) setRelayedProxy(proxy wgproxy.Proxy) {
|
||||||
|
if conn.wgProxyRelay != nil {
|
||||||
|
if err := conn.wgProxyRelay.CloseConn(); err != nil {
|
||||||
|
conn.log.Warnf("failed to close deprecated wg proxy conn: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
conn.wgProxyRelay = proxy
|
||||||
|
}
|
||||||
|
|
||||||
func isController(config ConnConfig) bool {
|
func isController(config ConnConfig) bool {
|
||||||
return config.LocalKey > config.Key
|
return config.LocalKey > config.Key
|
||||||
}
|
}
|
||||||
|
@ -3,7 +3,6 @@ package client
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
"net"
|
"net"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@ -449,11 +448,11 @@ func (c *Client) writeTo(connReference *Conn, id string, dstID []byte, payload [
|
|||||||
conn, ok := c.conns[id]
|
conn, ok := c.conns[id]
|
||||||
c.mu.Unlock()
|
c.mu.Unlock()
|
||||||
if !ok {
|
if !ok {
|
||||||
return 0, io.EOF
|
return 0, net.ErrClosed
|
||||||
}
|
}
|
||||||
|
|
||||||
if conn.conn != connReference {
|
if conn.conn != connReference {
|
||||||
return 0, io.EOF
|
return 0, net.ErrClosed
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo: use buffer pool instead of create new transport msg.
|
// todo: use buffer pool instead of create new transport msg.
|
||||||
@ -508,7 +507,7 @@ func (c *Client) closeConn(connReference *Conn, id string) error {
|
|||||||
|
|
||||||
container, ok := c.conns[id]
|
container, ok := c.conns[id]
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("connection already closed")
|
return net.ErrClosed
|
||||||
}
|
}
|
||||||
|
|
||||||
if container.conn != connReference {
|
if container.conn != connReference {
|
||||||
|
@ -1,7 +1,6 @@
|
|||||||
package client
|
package client
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
|
||||||
"net"
|
"net"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
@ -40,7 +39,7 @@ func (c *Conn) Write(p []byte) (n int, err error) {
|
|||||||
func (c *Conn) Read(b []byte) (n int, err error) {
|
func (c *Conn) Read(b []byte) (n int, err error) {
|
||||||
msg, ok := <-c.messageChan
|
msg, ok := <-c.messageChan
|
||||||
if !ok {
|
if !ok {
|
||||||
return 0, io.EOF
|
return 0, net.ErrClosed
|
||||||
}
|
}
|
||||||
|
|
||||||
n = copy(b, msg.Payload)
|
n = copy(b, msg.Payload)
|
||||||
|
@ -4,7 +4,6 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
"net"
|
"net"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@ -100,7 +99,7 @@ func (c *Conn) isClosed() bool {
|
|||||||
|
|
||||||
func (c *Conn) ioErrHandling(err error) error {
|
func (c *Conn) ioErrHandling(err error) error {
|
||||||
if c.isClosed() {
|
if c.isClosed() {
|
||||||
return io.EOF
|
return net.ErrClosed
|
||||||
}
|
}
|
||||||
|
|
||||||
var wErr *websocket.CloseError
|
var wErr *websocket.CloseError
|
||||||
@ -108,7 +107,7 @@ func (c *Conn) ioErrHandling(err error) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if wErr.Code == websocket.StatusNormalClosure {
|
if wErr.Code == websocket.StatusNormalClosure {
|
||||||
return io.EOF
|
return net.ErrClosed
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -2,7 +2,7 @@ package server
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"io"
|
"errors"
|
||||||
"net"
|
"net"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@ -57,7 +57,7 @@ func (p *Peer) Work() {
|
|||||||
for {
|
for {
|
||||||
n, err := p.conn.Read(buf)
|
n, err := p.conn.Read(buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err != io.EOF {
|
if !errors.Is(err, net.ErrClosed) {
|
||||||
p.log.Errorf("failed to read message: %s", err)
|
p.log.Errorf("failed to read message: %s", err)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
|
Loading…
Reference in New Issue
Block a user