mirror of
https://github.com/netbirdio/netbird.git
synced 2025-06-22 10:41:24 +02:00
- add comments
- avoid double closing messages - add cleanup routine for relay manager
This commit is contained in:
parent
15818b72c6
commit
a40d4d2f32
@ -35,7 +35,6 @@ type connContainer struct {
|
|||||||
type Client struct {
|
type Client struct {
|
||||||
log *log.Entry
|
log *log.Entry
|
||||||
parentCtx context.Context
|
parentCtx context.Context
|
||||||
ctx context.Context
|
|
||||||
ctxCancel context.CancelFunc
|
ctxCancel context.CancelFunc
|
||||||
serverAddress string
|
serverAddress string
|
||||||
hashedID []byte
|
hashedID []byte
|
||||||
@ -66,13 +65,6 @@ func NewClient(ctx context.Context, serverAddress, peerID string) *Client {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetOnDisconnectListener sets a function that will be called when the connection to the relay server is closed.
|
|
||||||
func (c *Client) SetOnDisconnectListener(fn func()) {
|
|
||||||
c.listenerMutex.Lock()
|
|
||||||
defer c.listenerMutex.Unlock()
|
|
||||||
c.onDisconnectListener = fn
|
|
||||||
}
|
|
||||||
|
|
||||||
// Connect establishes a connection to the relay server. It blocks until the connection is established or an error occurs.
|
// Connect establishes a connection to the relay server. It blocks until the connection is established or an error occurs.
|
||||||
func (c *Client) Connect() error {
|
func (c *Client) Connect() error {
|
||||||
c.readLoopMutex.Lock()
|
c.readLoopMutex.Lock()
|
||||||
@ -92,8 +84,9 @@ func (c *Client) Connect() error {
|
|||||||
|
|
||||||
c.serviceIsRunning = true
|
c.serviceIsRunning = true
|
||||||
|
|
||||||
c.ctx, c.ctxCancel = context.WithCancel(c.parentCtx)
|
var ctx context.Context
|
||||||
context.AfterFunc(c.ctx, func() {
|
ctx, c.ctxCancel = context.WithCancel(c.parentCtx)
|
||||||
|
context.AfterFunc(ctx, func() {
|
||||||
cErr := c.Close()
|
cErr := c.Close()
|
||||||
if cErr != nil {
|
if cErr != nil {
|
||||||
log.Errorf("failed to close relay connection: %s", cErr)
|
log.Errorf("failed to close relay connection: %s", cErr)
|
||||||
@ -136,6 +129,19 @@ func (c *Client) RelayRemoteAddress() (net.Addr, error) {
|
|||||||
return c.remoteAddr, nil
|
return c.remoteAddr, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetOnDisconnectListener sets a function that will be called when the connection to the relay server is closed.
|
||||||
|
func (c *Client) SetOnDisconnectListener(fn func()) {
|
||||||
|
c.listenerMutex.Lock()
|
||||||
|
defer c.listenerMutex.Unlock()
|
||||||
|
c.onDisconnectListener = fn
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) HasConns() bool {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
return len(c.conns) > 0
|
||||||
|
}
|
||||||
|
|
||||||
// Close closes the connection to the relay server and all connections to other peers.
|
// Close closes the connection to the relay server and all connections to other peers.
|
||||||
func (c *Client) Close() error {
|
func (c *Client) Close() error {
|
||||||
c.readLoopMutex.Lock()
|
c.readLoopMutex.Lock()
|
||||||
@ -143,14 +149,17 @@ func (c *Client) Close() error {
|
|||||||
|
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
var err error
|
var err error
|
||||||
if c.serviceIsRunning {
|
if !c.serviceIsRunning {
|
||||||
c.serviceIsRunning = false
|
return nil
|
||||||
err = c.relayConn.Close()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
c.serviceIsRunning = false
|
||||||
|
err = c.relayConn.Close()
|
||||||
c.closeAllConns()
|
c.closeAllConns()
|
||||||
c.mu.Unlock()
|
c.mu.Unlock()
|
||||||
|
|
||||||
c.wgReadLoop.Wait()
|
c.wgReadLoop.Wait()
|
||||||
|
c.log.Infof("relay client ha been closed: %s", c.serverAddress)
|
||||||
c.ctxCancel()
|
c.ctxCancel()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -5,10 +5,18 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
relayCleanupInterval = 60 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
|
// RelayTrack hold the relay clients for the foregin relay servers.
|
||||||
|
// With the mutex can ensure we can open new connection in case the relay connection has been established with
|
||||||
|
// the relay server.
|
||||||
type RelayTrack struct {
|
type RelayTrack struct {
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
relayClient *Client
|
relayClient *Client
|
||||||
@ -18,6 +26,12 @@ func NewRelayTrack() *RelayTrack {
|
|||||||
return &RelayTrack{}
|
return &RelayTrack{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Manager is a manager for the relay client. It establish one persistent connection to the given relay server. In case
|
||||||
|
// of network error the manager will try to reconnect to the server.
|
||||||
|
// The manager also manage temproary relay connection. If a client wants to communicate with an another client on a
|
||||||
|
// different relay server, the manager will establish a new connection to the relay server. The connection with these
|
||||||
|
// relay servers will be closed if there is no active connection. Periodically the manager will check if there is any
|
||||||
|
// unused relay connection and close it.
|
||||||
type Manager struct {
|
type Manager struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
srvAddress string
|
srvAddress string
|
||||||
@ -39,18 +53,26 @@ func NewManager(ctx context.Context, serverAddress string, peerID string) *Manag
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) Serve() error {
|
// Serve starts the manager. It will establish a connection to the relay server and start the relay cleanup loop.
|
||||||
|
// todo: consider to return an error if the initial connection to the relay server is not established.
|
||||||
|
func (m *Manager) Serve() {
|
||||||
m.relayClient = NewClient(m.ctx, m.srvAddress, m.peerID)
|
m.relayClient = NewClient(m.ctx, m.srvAddress, m.peerID)
|
||||||
m.reconnectGuard = NewGuard(m.ctx, m.relayClient)
|
m.reconnectGuard = NewGuard(m.ctx, m.relayClient)
|
||||||
m.relayClient.SetOnDisconnectListener(m.reconnectGuard.OnDisconnected)
|
m.relayClient.SetOnDisconnectListener(m.reconnectGuard.OnDisconnected)
|
||||||
err := m.relayClient.Connect()
|
err := m.relayClient.Connect()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
log.Errorf("failed to connect to relay server, keep try to reconnect: %s", err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
m.startCleanupLoop()
|
||||||
|
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// OpenConn opens a connection to the given peer key. If the peer is on the same relay server, the connection will be
|
||||||
|
// established via the relay server. If the peer is on a different relay server, the manager will establish a new
|
||||||
|
// connection to the relay server.
|
||||||
func (m *Manager) OpenConn(serverAddress, peerKey string) (net.Conn, error) {
|
func (m *Manager) OpenConn(serverAddress, peerKey string) (net.Conn, error) {
|
||||||
if m.relayClient == nil {
|
if m.relayClient == nil {
|
||||||
return nil, fmt.Errorf("relay client not connected")
|
return nil, fmt.Errorf("relay client not connected")
|
||||||
@ -68,6 +90,8 @@ func (m *Manager) OpenConn(serverAddress, peerKey string) (net.Conn, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RelayAddress returns the address of the permanent relay server. It could change if the network connection is lost.
|
||||||
|
// This address will be sent to the target peer to choose the common relay server for the communication.
|
||||||
func (m *Manager) RelayAddress() (net.Addr, error) {
|
func (m *Manager) RelayAddress() (net.Addr, error) {
|
||||||
if m.relayClient == nil {
|
if m.relayClient == nil {
|
||||||
return nil, fmt.Errorf("relay client not connected")
|
return nil, fmt.Errorf("relay client not connected")
|
||||||
@ -76,20 +100,31 @@ func (m *Manager) RelayAddress() (net.Addr, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) openConnVia(serverAddress, peerKey string) (net.Conn, error) {
|
func (m *Manager) openConnVia(serverAddress, peerKey string) (net.Conn, error) {
|
||||||
|
// check if already has a connection to the desired relay server
|
||||||
m.relayClientsMutex.RLock()
|
m.relayClientsMutex.RLock()
|
||||||
relayTrack, ok := m.relayClients[serverAddress]
|
rt, ok := m.relayClients[serverAddress]
|
||||||
if ok {
|
if ok {
|
||||||
relayTrack.RLock()
|
rt.RLock()
|
||||||
m.relayClientsMutex.RUnlock()
|
m.relayClientsMutex.RUnlock()
|
||||||
defer relayTrack.RUnlock()
|
defer rt.RUnlock()
|
||||||
return relayTrack.relayClient.OpenConn(peerKey)
|
return rt.relayClient.OpenConn(peerKey)
|
||||||
}
|
}
|
||||||
m.relayClientsMutex.RUnlock()
|
m.relayClientsMutex.RUnlock()
|
||||||
|
|
||||||
rt := NewRelayTrack()
|
// if not, establish a new connection but check it again (because changed the lock type) before starting the
|
||||||
rt.Lock()
|
// connection
|
||||||
|
|
||||||
m.relayClientsMutex.Lock()
|
m.relayClientsMutex.Lock()
|
||||||
|
rt, ok = m.relayClients[serverAddress]
|
||||||
|
if ok {
|
||||||
|
rt.RLock()
|
||||||
|
m.relayClientsMutex.Unlock()
|
||||||
|
defer rt.RUnlock()
|
||||||
|
return rt.relayClient.OpenConn(peerKey)
|
||||||
|
}
|
||||||
|
|
||||||
|
// create a new relay client and store it in the relayClients map
|
||||||
|
rt = NewRelayTrack()
|
||||||
|
rt.Lock()
|
||||||
m.relayClients[serverAddress] = rt
|
m.relayClients[serverAddress] = rt
|
||||||
m.relayClientsMutex.Unlock()
|
m.relayClientsMutex.Unlock()
|
||||||
|
|
||||||
@ -102,25 +137,25 @@ func (m *Manager) openConnVia(serverAddress, peerKey string) (net.Conn, error) {
|
|||||||
m.relayClientsMutex.Unlock()
|
m.relayClientsMutex.Unlock()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
// if connection closed then delete the relay client from the list
|
||||||
relayClient.SetOnDisconnectListener(func() {
|
relayClient.SetOnDisconnectListener(func() {
|
||||||
m.deleteRelayConn(serverAddress)
|
m.deleteRelayConn(serverAddress)
|
||||||
})
|
})
|
||||||
|
rt.relayClient = relayClient
|
||||||
rt.Unlock()
|
rt.Unlock()
|
||||||
|
|
||||||
conn, err := relayClient.OpenConn(peerKey)
|
conn, err := relayClient.OpenConn(peerKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return conn, nil
|
return conn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) deleteRelayConn(address string) {
|
func (m *Manager) deleteRelayConn(address string) {
|
||||||
log.Infof("deleting relay client for %s", address)
|
log.Infof("deleting relay client for %s", address)
|
||||||
m.relayClientsMutex.Lock()
|
m.relayClientsMutex.Lock()
|
||||||
defer m.relayClientsMutex.Unlock()
|
|
||||||
|
|
||||||
delete(m.relayClients, address)
|
delete(m.relayClients, address)
|
||||||
|
m.relayClientsMutex.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) isForeignServer(address string) (bool, error) {
|
func (m *Manager) isForeignServer(address string) (bool, error) {
|
||||||
@ -130,3 +165,42 @@ func (m *Manager) isForeignServer(address string) (bool, error) {
|
|||||||
}
|
}
|
||||||
return rAddr.String() != address, nil
|
return rAddr.String() != address, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *Manager) startCleanupLoop() {
|
||||||
|
if m.ctx.Err() != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ticker := time.NewTicker(relayCleanupInterval)
|
||||||
|
go func() {
|
||||||
|
defer ticker.Stop()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-m.ctx.Done():
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
m.cleanUpUnusedRelays()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) cleanUpUnusedRelays() {
|
||||||
|
m.relayClientsMutex.Lock()
|
||||||
|
defer m.relayClientsMutex.Unlock()
|
||||||
|
|
||||||
|
for addr, rt := range m.relayClients {
|
||||||
|
rt.Lock()
|
||||||
|
if rt.relayClient.HasConns() {
|
||||||
|
rt.Unlock()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
rt.relayClient.SetOnDisconnectListener(nil)
|
||||||
|
go func() {
|
||||||
|
_ = rt.relayClient.Close()
|
||||||
|
}()
|
||||||
|
log.Debugf("clean up relay client: %s", addr)
|
||||||
|
delete(m.relayClients, addr)
|
||||||
|
rt.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -3,6 +3,7 @@ package client
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
@ -47,18 +48,13 @@ func TestForeignConn(t *testing.T) {
|
|||||||
idAlice := "alice"
|
idAlice := "alice"
|
||||||
log.Debugf("connect by alice")
|
log.Debugf("connect by alice")
|
||||||
clientAlice := NewManager(ctx, addr1, idAlice)
|
clientAlice := NewManager(ctx, addr1, idAlice)
|
||||||
err := clientAlice.Serve()
|
clientAlice.Serve()
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("failed to connect to server: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
idBob := "bob"
|
idBob := "bob"
|
||||||
log.Debugf("connect by bob")
|
log.Debugf("connect by bob")
|
||||||
clientBob := NewManager(ctx, addr2, idBob)
|
clientBob := NewManager(ctx, addr2, idBob)
|
||||||
err = clientBob.Serve()
|
clientBob.Serve()
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("failed to connect to server: %s", err)
|
|
||||||
}
|
|
||||||
bobsSrvAddr, err := clientBob.RelayAddress()
|
bobsSrvAddr, err := clientBob.RelayAddress()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("failed to get relay address: %s", err)
|
t.Fatalf("failed to get relay address: %s", err)
|
||||||
@ -137,10 +133,7 @@ func TestForeginConnClose(t *testing.T) {
|
|||||||
idAlice := "alice"
|
idAlice := "alice"
|
||||||
log.Debugf("connect by alice")
|
log.Debugf("connect by alice")
|
||||||
clientAlice := NewManager(ctx, addr1, idAlice)
|
clientAlice := NewManager(ctx, addr1, idAlice)
|
||||||
err := clientAlice.Serve()
|
clientAlice.Serve()
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("failed to connect to server: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
conn, err := clientAlice.OpenConn(addr2, "anotherpeer")
|
conn, err := clientAlice.OpenConn(addr2, "anotherpeer")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -154,3 +147,60 @@ func TestForeginConnClose(t *testing.T) {
|
|||||||
|
|
||||||
select {}
|
select {}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestForeginAutoClose(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
addr1 := "localhost:1234"
|
||||||
|
srv1 := server.NewServer()
|
||||||
|
go func() {
|
||||||
|
err := srv1.Listen(addr1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to bind server: %s", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
err := srv1.Close()
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("failed to close server: %s", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
addr2 := "localhost:2234"
|
||||||
|
srv2 := server.NewServer()
|
||||||
|
go func() {
|
||||||
|
err := srv2.Listen(addr2)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to bind server: %s", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
err := srv2.Close()
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("failed to close server: %s", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
idAlice := "alice"
|
||||||
|
log.Debugf("connect by alice")
|
||||||
|
mgr := NewManager(ctx, addr1, idAlice)
|
||||||
|
relayCleanupInterval = 2 * time.Second
|
||||||
|
mgr.Serve()
|
||||||
|
|
||||||
|
conn, err := mgr.OpenConn(addr2, "anotherpeer")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to bind channel: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = conn.Close()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to close connection: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(relayCleanupInterval + 1*time.Second)
|
||||||
|
if len(mgr.relayClients) != 0 {
|
||||||
|
t.Errorf("expected 0, got %d", len(mgr.relayClients))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -79,6 +79,7 @@ func (c *Conn) Close() error {
|
|||||||
return c.Conn.Close(websocket.StatusNormalClosure, "")
|
return c.Conn.Close(websocket.StatusNormalClosure, "")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// todo: fix io.EOF handling
|
||||||
func ioErrHandling(err error) error {
|
func ioErrHandling(err error) error {
|
||||||
var wErr *websocket.CloseError
|
var wErr *websocket.CloseError
|
||||||
if !errors.As(err, &wErr) {
|
if !errors.As(err, &wErr) {
|
||||||
|
@ -18,7 +18,6 @@ import (
|
|||||||
// Server
|
// Server
|
||||||
// todo:
|
// todo:
|
||||||
// authentication: provide JWT token via RPC call. The MGM server can forward the token to the agents.
|
// authentication: provide JWT token via RPC call. The MGM server can forward the token to the agents.
|
||||||
// connection timeout handling
|
|
||||||
type Server struct {
|
type Server struct {
|
||||||
store *Store
|
store *Store
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user