Add reconnect logic

This commit is contained in:
Zoltan Papp 2024-05-28 01:00:25 +02:00
parent 645a1f31a7
commit 076ce69a24
2 changed files with 206 additions and 57 deletions

View File

@ -3,6 +3,7 @@ package client
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/netbirdio/netbird/relay/client/dialer/udp"
"io" "io"
"net" "net"
"sync" "sync"
@ -10,7 +11,6 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/relay/client/dialer/udp"
"github.com/netbirdio/netbird/relay/messages" "github.com/netbirdio/netbird/relay/messages"
) )
@ -19,6 +19,10 @@ const (
serverResponseTimeout = 8 * time.Second serverResponseTimeout = 8 * time.Second
) )
// reconnectingTimeout is how long the client waits after a failed reconnect
// attempt before trying again. It is a variable rather than a constant.
var reconnectingTimeout = 5 * time.Second
type Msg struct { type Msg struct {
buf []byte buf []byte
} }
@ -35,12 +39,15 @@ type Client struct {
serverAddress string serverAddress string
hashedID []byte hashedID []byte
conns map[string]*connContainer // todo handle it in thread safe way relayConnIsEstablished bool
conns map[string]*connContainer
connsMutext sync.Mutex // protect conns and relayConnIsEstablished bool
relayConn net.Conn relayConn net.Conn
relayConnState bool serviceIsRunning bool
wgRelayConn sync.WaitGroup wgRelayConn sync.WaitGroup
mu sync.Mutex mu sync.Mutex
onDisconnected chan struct{}
} }
func NewClient(ctx context.Context, serverAddress, peerID string) *Client { func NewClient(ctx context.Context, serverAddress, peerID string) *Client {
@ -53,38 +60,30 @@ func NewClient(ctx context.Context, serverAddress, peerID string) *Client {
serverAddress: serverAddress, serverAddress: serverAddress,
hashedID: hashedID, hashedID: hashedID,
conns: make(map[string]*connContainer), conns: make(map[string]*connContainer),
onDisconnected: make(chan struct{}),
} }
} }
func (c *Client) Connect() error { func (c *Client) Connect() error {
c.mu.Lock() c.mu.Lock()
if c.relayConnState { if c.serviceIsRunning {
c.mu.Unlock() c.mu.Unlock()
return nil return nil
} }
conn, err := udp.Dial(c.serverAddress) err := c.connect()
if err != nil { if err != nil {
return err
}
c.relayConn = conn
err = c.handShake()
if err != nil {
cErr := conn.Close()
if cErr != nil {
log.Errorf("failed to close connection: %s", cErr)
}
c.relayConn = nil
return err
}
c.relayConnState = true
c.mu.Unlock() c.mu.Unlock()
return err
}
c.serviceIsRunning = true
c.wgRelayConn.Add(1) c.wgRelayConn.Add(1)
go c.readLoop() go c.readLoop()
c.mu.Unlock()
go func() { go func() {
<-c.ctx.Done() <-c.ctx.Done()
cErr := c.close() cErr := c.close()
@ -93,13 +92,50 @@ func (c *Client) Connect() error {
} }
}() }()
go c.reconnectGuard()
return nil return nil
} }
// reconnectGuard re-establishes the relay connection whenever the read loop
// exits while the service is still marked as running.
//
// It loops until serviceIsRunning is false or the client context is
// cancelled. After a failed attempt it waits reconnectingTimeout before
// retrying; the wait is abandoned as soon as the context is done so the
// goroutine does not outlive a cancelled client by a full back-off period.
func (c *Client) reconnectGuard() {
	for {
		// Returns once every read loop registered on the wait group has called Done.
		c.wgRelayConn.Wait()

		c.mu.Lock()
		if !c.serviceIsRunning {
			c.mu.Unlock()
			return
		}

		log.Infof("reconnecting to relay server")
		if err := c.connect(); err != nil {
			log.Errorf("failed to reconnect to relay server: %s", err)
			// Release the lock before backing off so other callers are not blocked.
			c.mu.Unlock()

			retry := time.NewTimer(reconnectingTimeout)
			select {
			case <-c.ctx.Done():
				retry.Stop()
				return
			case <-retry.C:
			}
			continue
		}
		log.Infof("reconnected to relay server")

		// Register the new read loop while still holding the lock, then start it.
		c.wgRelayConn.Add(1)
		go c.readLoop()
		c.mu.Unlock()
	}
}
func (c *Client) OpenConn(dstPeerID string) (net.Conn, error) { func (c *Client) OpenConn(dstPeerID string) (net.Conn, error) {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
if !c.relayConnState {
c.connsMutext.Lock()
defer c.connsMutext.Unlock()
if !c.relayConnIsEstablished {
return nil, fmt.Errorf("relay connection is not established")
}
if !c.serviceIsRunning {
return nil, fmt.Errorf("relay connection is not established") return nil, fmt.Errorf("relay connection is not established")
} }
@ -120,26 +156,41 @@ func (c *Client) Close() error {
return c.close() return c.close()
} }
// connect dials the relay server at c.serverAddress, stores the resulting
// connection in c.relayConn and runs the handshake.
//
// If the handshake fails, the connection is closed (a close error is only
// logged), c.relayConn is reset to nil and the handshake error is returned.
// On success relayConnIsEstablished is set to true.
//
// NOTE(review): this method takes no lock itself; the visible callers hold
// c.mu while calling it — confirm for any new caller.
func (c *Client) connect() error {
	relayConn, dialErr := udp.Dial(c.serverAddress)
	if dialErr != nil {
		return dialErr
	}
	// Assigned before the handshake; presumably handShake talks over c.relayConn.
	c.relayConn = relayConn

	if hsErr := c.handShake(); hsErr != nil {
		if closeErr := relayConn.Close(); closeErr != nil {
			log.Errorf("failed to close connection: %s", closeErr)
		}
		c.relayConn = nil
		return hsErr
	}

	c.relayConnIsEstablished = true
	return nil
}
func (c *Client) close() error { func (c *Client) close() error {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
if !c.relayConnState { if !c.serviceIsRunning {
return nil return nil
} }
c.relayConnState = false c.serviceIsRunning = false
err := c.relayConn.Close() err := c.relayConn.Close()
c.wgRelayConn.Wait() c.wgRelayConn.Wait()
// close all Conn types
for _, container := range c.conns {
close(container.messages)
}
c.conns = make(map[string]*connContainer)
return err return err
} }
@ -189,17 +240,13 @@ func (c *Client) handShake() error {
} }
func (c *Client) readLoop() { func (c *Client) readLoop() {
defer func() {
c.log.Tracef("exit from read loop")
c.wgRelayConn.Done()
}()
var errExit error var errExit error
var n int var n int
for { for {
buf := make([]byte, bufferSize) buf := make([]byte, bufferSize)
n, errExit = c.relayConn.Read(buf) n, errExit = c.relayConn.Read(buf)
if errExit != nil { if errExit != nil {
if c.relayConnState { if c.serviceIsRunning {
c.log.Debugf("failed to read message from relay server: %s", errExit) c.log.Debugf("failed to read message from relay server: %s", errExit)
} }
break break
@ -232,10 +279,20 @@ func (c *Client) readLoop() {
} }
} }
if c.relayConnState { if c.serviceIsRunning {
c.log.Errorf("failed to read message from relay server: %s", errExit)
_ = c.relayConn.Close() _ = c.relayConn.Close()
} }
c.connsMutext.Lock()
c.relayConnIsEstablished = false
for _, container := range c.conns {
close(container.messages)
}
c.conns = make(map[string]*connContainer)
c.connsMutext.Unlock()
c.log.Tracef("exit from read loop")
c.wgRelayConn.Done()
} }
func (c *Client) writeTo(id string, dstID []byte, payload []byte) (int, error) { func (c *Client) writeTo(id string, dstID []byte, payload []byte) (int, error) {
@ -275,6 +332,9 @@ func (c *Client) closeConn(id string) error {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
c.connsMutext.Lock()
defer c.connsMutext.Unlock()
conn, ok := c.conns[id] conn, ok := c.conns[id]
if !ok { if !ok {
return fmt.Errorf("connection already closed") return fmt.Errorf("connection already closed")

View File

@ -1,16 +1,16 @@
package test package client
import ( import (
"context" "context"
"net" "net"
"os" "os"
"testing" "testing"
"time"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/util" "github.com/netbirdio/netbird/util"
"github.com/netbirdio/netbird/relay/client"
"github.com/netbirdio/netbird/relay/server" "github.com/netbirdio/netbird/relay/server"
) )
@ -39,21 +39,21 @@ func TestClient(t *testing.T) {
} }
}() }()
clientAlice := client.NewClient(ctx, addr, "alice") clientAlice := NewClient(ctx, addr, "alice")
err := clientAlice.Connect() err := clientAlice.Connect()
if err != nil { if err != nil {
t.Fatalf("failed to connect to server: %s", err) t.Fatalf("failed to connect to server: %s", err)
} }
defer clientAlice.Close() defer clientAlice.Close()
clientPlaceHolder := client.NewClient(ctx, addr, "clientPlaceHolder") clientPlaceHolder := NewClient(ctx, addr, "clientPlaceHolder")
err = clientPlaceHolder.Connect() err = clientPlaceHolder.Connect()
if err != nil { if err != nil {
t.Fatalf("failed to connect to server: %s", err) t.Fatalf("failed to connect to server: %s", err)
} }
defer clientPlaceHolder.Close() defer clientPlaceHolder.Close()
clientBob := client.NewClient(ctx, addr, "bob") clientBob := NewClient(ctx, addr, "bob")
err = clientBob.Connect() err = clientBob.Connect()
if err != nil { if err != nil {
t.Fatalf("failed to connect to server: %s", err) t.Fatalf("failed to connect to server: %s", err)
@ -107,7 +107,7 @@ func TestRegistration(t *testing.T) {
} }
}() }()
clientAlice := client.NewClient(ctx, addr, "alice") clientAlice := NewClient(ctx, addr, "alice")
err := clientAlice.Connect() err := clientAlice.Connect()
if err != nil { if err != nil {
t.Fatalf("failed to connect to server: %s", err) t.Fatalf("failed to connect to server: %s", err)
@ -140,7 +140,7 @@ func TestRegistrationTimeout(t *testing.T) {
} }
defer tcpListener.Close() defer tcpListener.Close()
clientAlice := client.NewClient(ctx, "127.0.0.1:1234", "alice") clientAlice := NewClient(ctx, "127.0.0.1:1234", "alice")
err = clientAlice.Connect() err = clientAlice.Connect()
if err == nil { if err == nil {
t.Errorf("failed to connect to server: %s", err) t.Errorf("failed to connect to server: %s", err)
@ -173,7 +173,7 @@ func TestEcho(t *testing.T) {
} }
}() }()
clientAlice := client.NewClient(ctx, addr, idAlice) clientAlice := NewClient(ctx, addr, idAlice)
err := clientAlice.Connect() err := clientAlice.Connect()
if err != nil { if err != nil {
t.Fatalf("failed to connect to server: %s", err) t.Fatalf("failed to connect to server: %s", err)
@ -185,7 +185,7 @@ func TestEcho(t *testing.T) {
} }
}() }()
clientBob := client.NewClient(ctx, addr, idBob) clientBob := NewClient(ctx, addr, idBob)
err = clientBob.Connect() err = clientBob.Connect()
if err != nil { if err != nil {
t.Fatalf("failed to connect to server: %s", err) t.Fatalf("failed to connect to server: %s", err)
@ -254,7 +254,7 @@ func TestBindToUnavailabePeer(t *testing.T) {
} }
}() }()
clientAlice := client.NewClient(ctx, addr, "alice") clientAlice := NewClient(ctx, addr, "alice")
err := clientAlice.Connect() err := clientAlice.Connect()
if err != nil { if err != nil {
t.Errorf("failed to connect to server: %s", err) t.Errorf("failed to connect to server: %s", err)
@ -293,7 +293,7 @@ func TestBindReconnect(t *testing.T) {
} }
}() }()
clientAlice := client.NewClient(ctx, addr, "alice") clientAlice := NewClient(ctx, addr, "alice")
err := clientAlice.Connect() err := clientAlice.Connect()
if err != nil { if err != nil {
t.Errorf("failed to connect to server: %s", err) t.Errorf("failed to connect to server: %s", err)
@ -304,7 +304,7 @@ func TestBindReconnect(t *testing.T) {
t.Errorf("failed to bind channel: %s", err) t.Errorf("failed to bind channel: %s", err)
} }
clientBob := client.NewClient(ctx, addr, "bob") clientBob := NewClient(ctx, addr, "bob")
err = clientBob.Connect() err = clientBob.Connect()
if err != nil { if err != nil {
t.Errorf("failed to connect to server: %s", err) t.Errorf("failed to connect to server: %s", err)
@ -321,7 +321,7 @@ func TestBindReconnect(t *testing.T) {
t.Errorf("failed to close client: %s", err) t.Errorf("failed to close client: %s", err)
} }
clientAlice = client.NewClient(ctx, addr, "alice") clientAlice = NewClient(ctx, addr, "alice")
err = clientAlice.Connect() err = clientAlice.Connect()
if err != nil { if err != nil {
t.Errorf("failed to connect to server: %s", err) t.Errorf("failed to connect to server: %s", err)
@ -375,7 +375,7 @@ func TestCloseConn(t *testing.T) {
} }
}() }()
clientAlice := client.NewClient(ctx, addr, "alice") clientAlice := NewClient(ctx, addr, "alice")
err := clientAlice.Connect() err := clientAlice.Connect()
if err != nil { if err != nil {
t.Errorf("failed to connect to server: %s", err) t.Errorf("failed to connect to server: %s", err)
@ -402,3 +402,92 @@ func TestCloseConn(t *testing.T) {
t.Errorf("unexpected writing from closed connection") t.Errorf("unexpected writing from closed connection")
} }
} }
// TestAutoReconnect checks that, after the relay connection is closed
// underneath the client, an already opened peer connection reports a read
// error and a new peer connection can be opened once reconnectingTimeout
// has elapsed.
func TestAutoReconnect(t *testing.T) {
	ctx := context.Background()

	addr := "localhost:1234"
	srv := server.NewServer()
	go func() {
		err := srv.Listen(addr)
		if err != nil {
			t.Errorf("failed to bind server: %s", err)
		}
	}()

	defer func() {
		err := srv.Close()
		if err != nil {
			log.Errorf("failed to close server: %s", err)
		}
	}()

	clientAlice := NewClient(ctx, addr, "alice")
	err := clientAlice.Connect()
	if err != nil {
		// Nothing below is meaningful without a connected client.
		t.Fatalf("failed to connect to server: %s", err)
	}
	// Stop the client so its reconnect goroutine does not keep dialing the
	// shared address while later tests run. Registered after the server's
	// deferred Close, so it runs first. The error is only logged: the relay
	// connection may already be closed at this point.
	defer func() {
		if cErr := clientAlice.Close(); cErr != nil {
			log.Errorf("failed to close client: %s", cErr)
		}
	}()

	conn, err := clientAlice.OpenConn("bob")
	if err != nil {
		// conn is used below; continuing with a nil conn would panic.
		t.Fatalf("failed to bind channel: %s", err)
	}

	// Simulate a broken relay connection.
	_ = clientAlice.relayConn.Close()

	_, err = conn.Read(make([]byte, 1))
	if err == nil {
		t.Errorf("unexpected reading from closed connection")
	}

	log.Infof("waiting for reconnection")
	time.Sleep(reconnectingTimeout)

	_, err = clientAlice.OpenConn("bob")
	if err != nil {
		t.Errorf("failed to open channel: %s", err)
	}
}
// TestCloseRelayConn checks that, right after the relay connection is closed
// underneath the client, an already opened peer connection reports a read
// error and opening a new peer connection is rejected.
//
// NOTE(review): the final assertion runs without waiting, so it appears to
// depend on the client not having reconnected yet — confirm this timing
// assumption holds on slow and fast machines alike.
func TestCloseRelayConn(t *testing.T) {
	ctx := context.Background()

	addr := "localhost:1234"
	srv := server.NewServer()
	go func() {
		err := srv.Listen(addr)
		if err != nil {
			t.Errorf("failed to bind server: %s", err)
		}
	}()

	defer func() {
		err := srv.Close()
		if err != nil {
			log.Errorf("failed to close server: %s", err)
		}
	}()

	clientAlice := NewClient(ctx, addr, "alice")
	err := clientAlice.Connect()
	if err != nil {
		// Nothing below is meaningful without a connected client.
		t.Fatalf("failed to connect to server: %s", err)
	}
	// Stop the client so its reconnect goroutine does not keep dialing the
	// shared address while later tests run. Registered after the server's
	// deferred Close, so it runs first. The error is only logged: the relay
	// connection may already be closed at this point.
	defer func() {
		if cErr := clientAlice.Close(); cErr != nil {
			log.Errorf("failed to close client: %s", cErr)
		}
	}()

	conn, err := clientAlice.OpenConn("bob")
	if err != nil {
		// conn is used below; continuing with a nil conn would panic.
		t.Fatalf("failed to bind channel: %s", err)
	}

	// Simulate a broken relay connection.
	_ = clientAlice.relayConn.Close()

	_, err = conn.Read(make([]byte, 1))
	if err == nil {
		t.Errorf("unexpected reading from closed connection")
	}

	_, err = clientAlice.OpenConn("bob")
	if err == nil {
		t.Errorf("unexpected opening connection to closed server")
	}
}