netbird/relay/client/manager.go

263 lines
6.8 KiB
Go
Raw Normal View History

2024-05-23 13:24:02 +02:00
package client
import (
"context"
2024-05-29 16:40:26 +02:00
"fmt"
2024-05-26 22:14:33 +02:00
"net"
"sync"
"time"
2024-06-01 11:48:15 +02:00
log "github.com/sirupsen/logrus"
2024-05-23 13:24:02 +02:00
)
var (
relayCleanupInterval = 60 * time.Second
errRelayClientNotConnected = fmt.Errorf("relay client not connected")
)
// RelayTrack hold the relay clients for the foregin relay servers.
// With the mutex can ensure we can open new connection in case the relay connection has been established with
// the relay server.
2024-06-01 11:48:15 +02:00
type RelayTrack struct {
sync.RWMutex
relayClient *Client
}
func NewRelayTrack() *RelayTrack {
return &RelayTrack{}
}
// Manager is a manager for the relay client. It establish one persistent connection to the given relay server. In case
// of network error the manager will try to reconnect to the server.
// The manager also manage temproary relay connection. If a client wants to communicate with an another client on a
// different relay server, the manager will establish a new connection to the relay server. The connection with these
// relay servers will be closed if there is no active connection. Periodically the manager will check if there is any
// unused relay connection and close it.
2024-05-23 13:24:02 +02:00
type Manager struct {
2024-07-02 11:57:17 +02:00
ctx context.Context
serverURL string
peerID string
2024-05-23 13:24:02 +02:00
relayClient *Client
reconnectGuard *Guard
2024-05-23 13:24:02 +02:00
2024-06-01 11:48:15 +02:00
relayClients map[string]*RelayTrack
relayClientsMutex sync.RWMutex
2024-06-25 15:13:08 +02:00
onDisconnectedListeners map[string]map[*func()]struct{}
listenerLock sync.Mutex
2024-05-23 13:24:02 +02:00
}
2024-07-02 11:57:17 +02:00
func NewManager(ctx context.Context, serverURL string, peerID string) *Manager {
2024-05-23 13:24:02 +02:00
return &Manager{
2024-06-25 15:13:08 +02:00
ctx: ctx,
2024-07-02 11:57:17 +02:00
serverURL: serverURL,
2024-06-25 15:13:08 +02:00
peerID: peerID,
relayClients: make(map[string]*RelayTrack),
onDisconnectedListeners: make(map[string]map[*func()]struct{}),
2024-05-23 13:24:02 +02:00
}
}
// Serve starts the manager. It will establish a connection to the relay server and start the relay cleanup loop.
func (m *Manager) Serve() error {
if m.relayClient != nil {
return fmt.Errorf("manager already serving")
}
2024-07-02 11:57:17 +02:00
m.relayClient = NewClient(m.ctx, m.serverURL, m.peerID)
2024-05-29 16:40:26 +02:00
err := m.relayClient.Connect()
if err != nil {
log.Errorf("failed to connect to relay server: %s", err)
return err
2024-05-23 13:24:02 +02:00
}
m.reconnectGuard = NewGuard(m.ctx, m.relayClient)
2024-06-25 15:13:08 +02:00
m.relayClient.SetOnDisconnectListener(func() {
2024-07-02 11:57:17 +02:00
m.onServerDisconnected(m.serverURL)
2024-06-25 15:13:08 +02:00
})
m.startCleanupLoop()
return nil
2024-05-26 22:14:33 +02:00
}
// OpenConn opens a connection to the given peer key. If the peer is on the same relay server, the connection will be
// established via the relay server. If the peer is on a different relay server, the manager will establish a new
// connection to the relay server.
2024-06-25 15:13:08 +02:00
func (m *Manager) OpenConn(serverAddress, peerKey string, onClosedListener func()) (net.Conn, error) {
2024-05-29 16:40:26 +02:00
if m.relayClient == nil {
return nil, errRelayClientNotConnected
2024-05-29 16:40:26 +02:00
}
foreign, err := m.isForeignServer(serverAddress)
2024-05-29 16:40:26 +02:00
if err != nil {
return nil, err
2024-05-29 16:40:26 +02:00
}
2024-06-25 15:13:08 +02:00
var (
netConn net.Conn
)
2024-06-01 11:48:15 +02:00
if !foreign {
2024-06-20 18:17:30 +02:00
log.Debugf("open peer connection via permanent server: %s", peerKey)
2024-06-25 15:13:08 +02:00
netConn, err = m.relayClient.OpenConn(peerKey)
2024-06-01 11:48:15 +02:00
} else {
2024-06-20 18:17:30 +02:00
log.Debugf("open peer connection via foreign server: %s", serverAddress)
2024-06-25 15:13:08 +02:00
netConn, err = m.openConnVia(serverAddress, peerKey)
}
if err != nil {
return nil, err
}
if onClosedListener != nil {
m.addListener(serverAddress, onClosedListener)
}
2024-06-25 15:13:08 +02:00
return netConn, err
2024-05-26 22:14:33 +02:00
}
2024-07-02 11:57:17 +02:00
// RelayInstanceAddress returns the address of the permanent relay server. It could change if the network connection is lost.
// This address will be sent to the target peer to choose the common relay server for the communication.
2024-07-02 11:57:17 +02:00
func (m *Manager) RelayInstanceAddress() (string, error) {
2024-05-29 16:40:26 +02:00
if m.relayClient == nil {
2024-07-02 11:57:17 +02:00
return "", errRelayClientNotConnected
2024-05-29 16:40:26 +02:00
}
2024-07-02 11:57:17 +02:00
return m.relayClient.ServerInstanceURL()
}
2024-05-29 16:40:26 +02:00
2024-06-18 11:10:17 +02:00
func (m *Manager) HasRelayAddress() bool {
2024-07-02 11:57:17 +02:00
return m.serverURL != ""
}
func (m *Manager) openConnVia(serverAddress, peerKey string) (net.Conn, error) {
// check if already has a connection to the desired relay server
2024-06-01 11:48:15 +02:00
m.relayClientsMutex.RLock()
rt, ok := m.relayClients[serverAddress]
2024-05-29 16:40:26 +02:00
if ok {
rt.RLock()
2024-06-01 11:48:15 +02:00
m.relayClientsMutex.RUnlock()
defer rt.RUnlock()
return rt.relayClient.OpenConn(peerKey)
2024-05-29 16:40:26 +02:00
}
2024-06-01 11:48:15 +02:00
m.relayClientsMutex.RUnlock()
// if not, establish a new connection but check it again (because changed the lock type) before starting the
// connection
2024-06-01 11:48:15 +02:00
m.relayClientsMutex.Lock()
rt, ok = m.relayClients[serverAddress]
if ok {
rt.RLock()
m.relayClientsMutex.Unlock()
defer rt.RUnlock()
return rt.relayClient.OpenConn(peerKey)
}
// create a new relay client and store it in the relayClients map
rt = NewRelayTrack()
rt.Lock()
2024-06-01 11:48:15 +02:00
m.relayClients[serverAddress] = rt
m.relayClientsMutex.Unlock()
2024-05-29 16:40:26 +02:00
2024-06-01 11:48:15 +02:00
relayClient := NewClient(m.ctx, serverAddress, m.peerID)
err := relayClient.Connect()
2024-05-29 16:40:26 +02:00
if err != nil {
2024-06-01 11:48:15 +02:00
rt.Unlock()
m.relayClientsMutex.Lock()
delete(m.relayClients, serverAddress)
m.relayClientsMutex.Unlock()
2024-05-29 16:40:26 +02:00
return nil, err
}
// if connection closed then delete the relay client from the list
relayClient.SetOnDisconnectListener(func() {
2024-06-25 15:13:08 +02:00
m.onServerDisconnected(serverAddress)
})
rt.relayClient = relayClient
2024-06-01 11:48:15 +02:00
rt.Unlock()
2024-05-29 16:40:26 +02:00
conn, err := relayClient.OpenConn(peerKey)
if err != nil {
return nil, err
}
return conn, nil
2024-05-23 13:24:02 +02:00
}
2024-06-25 15:13:08 +02:00
func (m *Manager) onServerDisconnected(serverAddress string) {
2024-07-02 11:57:17 +02:00
if serverAddress == m.serverURL {
2024-06-25 15:13:08 +02:00
m.reconnectGuard.OnDisconnected()
}
m.notifyOnDisconnectListeners(serverAddress)
}
func (m *Manager) isForeignServer(address string) (bool, error) {
2024-07-02 11:57:17 +02:00
rAddr, err := m.relayClient.ServerInstanceURL()
if err != nil {
return false, fmt.Errorf("relay client not connected")
}
2024-07-02 11:57:17 +02:00
return rAddr != address, nil
}
func (m *Manager) startCleanupLoop() {
if m.ctx.Err() != nil {
return
}
ticker := time.NewTicker(relayCleanupInterval)
go func() {
defer ticker.Stop()
for {
select {
case <-m.ctx.Done():
return
case <-ticker.C:
m.cleanUpUnusedRelays()
}
}
}()
}
func (m *Manager) cleanUpUnusedRelays() {
m.relayClientsMutex.Lock()
defer m.relayClientsMutex.Unlock()
for addr, rt := range m.relayClients {
rt.Lock()
if rt.relayClient.HasConns() {
rt.Unlock()
continue
}
rt.relayClient.SetOnDisconnectListener(nil)
go func() {
_ = rt.relayClient.Close()
}()
2024-06-25 15:13:08 +02:00
log.Debugf("clean up unused relay server connection: %s", addr)
delete(m.relayClients, addr)
rt.Unlock()
}
}
2024-06-25 15:13:08 +02:00
func (m *Manager) addListener(serverAddress string, onClosedListener func()) {
m.listenerLock.Lock()
l, ok := m.onDisconnectedListeners[serverAddress]
if !ok {
l = make(map[*func()]struct{})
}
l[&onClosedListener] = struct{}{}
m.onDisconnectedListeners[serverAddress] = l
m.listenerLock.Unlock()
}
func (m *Manager) notifyOnDisconnectListeners(serverAddress string) {
m.listenerLock.Lock()
l, ok := m.onDisconnectedListeners[serverAddress]
if !ok {
return
}
for f := range l {
go (*f)()
}
delete(m.onDisconnectedListeners, serverAddress)
m.listenerLock.Unlock()
}