2024-09-08 12:06:14 +02:00
|
|
|
package client
|
|
|
|
|
|
|
|
import (
|
|
|
|
"container/list"
|
|
|
|
"context"
|
|
|
|
"fmt"
|
|
|
|
"net"
|
|
|
|
"reflect"
|
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
log "github.com/sirupsen/logrus"
|
|
|
|
|
|
|
|
relayAuth "github.com/netbirdio/netbird/relay/auth/hmac"
|
|
|
|
)
|
|
|
|
|
|
|
|
var (
|
|
|
|
relayCleanupInterval = 60 * time.Second
|
2024-11-01 12:33:29 +01:00
|
|
|
keepUnusedServerTime = 5 * time.Second
|
2024-09-08 12:06:14 +02:00
|
|
|
|
|
|
|
ErrRelayClientNotConnected = fmt.Errorf("relay client not connected")
|
|
|
|
)
|
|
|
|
|
|
|
|
// RelayTrack hold the relay clients for the foreign relay servers.
|
|
|
|
// With the mutex can ensure we can open new connection in case the relay connection has been established with
|
|
|
|
// the relay server.
|
|
|
|
type RelayTrack struct {
|
|
|
|
sync.RWMutex
|
|
|
|
relayClient *Client
|
2024-09-09 18:12:32 +02:00
|
|
|
err error
|
2024-11-01 12:33:29 +01:00
|
|
|
created time.Time
|
2024-09-08 12:06:14 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func NewRelayTrack() *RelayTrack {
|
2024-11-01 12:33:29 +01:00
|
|
|
return &RelayTrack{
|
|
|
|
created: time.Now(),
|
|
|
|
}
|
2024-09-08 12:06:14 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
type OnServerCloseListener func()
|
|
|
|
|
|
|
|
// ManagerService is the interface for the relay manager.
|
|
|
|
type ManagerService interface {
|
|
|
|
Serve() error
|
|
|
|
OpenConn(serverAddress, peerKey string) (net.Conn, error)
|
|
|
|
AddCloseListener(serverAddress string, onClosedListener OnServerCloseListener) error
|
|
|
|
RelayInstanceAddress() (string, error)
|
|
|
|
ServerURLs() []string
|
|
|
|
HasRelayAddress() bool
|
|
|
|
UpdateToken(token *relayAuth.Token) error
|
|
|
|
}
|
|
|
|
|
|
|
|
// Manager is a manager for the relay client instances. It establishes one persistent connection to the given relay URL
|
|
|
|
// and automatically reconnect to them in case disconnection.
|
|
|
|
// The manager also manage temporary relay connection. If a client wants to communicate with a client on a
|
|
|
|
// different relay server, the manager will establish a new connection to the relay server. The connection with these
|
|
|
|
// relay servers will be closed if there is no active connection. Periodically the manager will check if there is any
|
|
|
|
// unused relay connection and close it.
|
|
|
|
type Manager struct {
|
2024-11-22 18:12:34 +01:00
|
|
|
ctx context.Context
|
|
|
|
peerID string
|
|
|
|
running bool
|
|
|
|
tokenStore *relayAuth.TokenStore
|
|
|
|
serverPicker *ServerPicker
|
2024-09-08 12:06:14 +02:00
|
|
|
|
2024-11-22 18:12:34 +01:00
|
|
|
relayClient *Client
|
|
|
|
// the guard logic can overwrite the relayClient variable, this mutex protect the usage of the variable
|
|
|
|
relayClientMu sync.Mutex
|
2024-09-08 12:06:14 +02:00
|
|
|
reconnectGuard *Guard
|
|
|
|
|
|
|
|
relayClients map[string]*RelayTrack
|
|
|
|
relayClientsMutex sync.RWMutex
|
|
|
|
|
|
|
|
onDisconnectedListeners map[string]*list.List
|
2024-10-24 11:43:14 +02:00
|
|
|
onReconnectedListenerFn func()
|
2024-09-08 12:06:14 +02:00
|
|
|
listenerLock sync.Mutex
|
|
|
|
}
|
|
|
|
|
|
|
|
// NewManager creates a new manager instance.
|
|
|
|
// The serverURL address can be empty. In this case, the manager will not serve.
|
|
|
|
func NewManager(ctx context.Context, serverURLs []string, peerID string) *Manager {
|
2024-11-22 18:12:34 +01:00
|
|
|
tokenStore := &relayAuth.TokenStore{}
|
|
|
|
|
|
|
|
m := &Manager{
|
|
|
|
ctx: ctx,
|
|
|
|
peerID: peerID,
|
|
|
|
tokenStore: tokenStore,
|
|
|
|
serverPicker: &ServerPicker{
|
|
|
|
TokenStore: tokenStore,
|
|
|
|
PeerID: peerID,
|
|
|
|
},
|
2024-09-08 12:06:14 +02:00
|
|
|
relayClients: make(map[string]*RelayTrack),
|
|
|
|
onDisconnectedListeners: make(map[string]*list.List),
|
|
|
|
}
|
2024-11-22 18:12:34 +01:00
|
|
|
m.serverPicker.ServerURLs.Store(serverURLs)
|
|
|
|
m.reconnectGuard = NewGuard(m.serverPicker)
|
|
|
|
return m
|
2024-09-08 12:06:14 +02:00
|
|
|
}
|
|
|
|
|
2024-11-22 18:12:34 +01:00
|
|
|
// Serve starts the manager, attempting to establish a connection with the relay server.
|
|
|
|
// If the connection fails, it will keep trying to reconnect in the background.
|
|
|
|
// Additionally, it starts a cleanup loop to remove unused relay connections.
|
|
|
|
// The manager will automatically reconnect to the relay server in case of disconnection.
|
2024-09-08 12:06:14 +02:00
|
|
|
func (m *Manager) Serve() error {
|
2024-11-22 18:12:34 +01:00
|
|
|
if m.running {
|
2024-09-08 12:06:14 +02:00
|
|
|
return fmt.Errorf("manager already serving")
|
|
|
|
}
|
2024-11-22 18:12:34 +01:00
|
|
|
m.running = true
|
|
|
|
log.Debugf("starting relay client manager with %v relay servers", m.serverPicker.ServerURLs.Load())
|
2024-09-08 12:06:14 +02:00
|
|
|
|
2024-11-22 18:12:34 +01:00
|
|
|
client, err := m.serverPicker.PickServer(m.ctx)
|
2024-09-16 16:11:10 +02:00
|
|
|
if err != nil {
|
2024-11-22 18:12:34 +01:00
|
|
|
go m.reconnectGuard.StartReconnectTrys(m.ctx, nil)
|
|
|
|
} else {
|
|
|
|
m.storeClient(client)
|
2024-09-08 12:06:14 +02:00
|
|
|
}
|
2024-09-16 16:11:10 +02:00
|
|
|
|
2024-11-22 18:12:34 +01:00
|
|
|
go m.listenGuardEvent(m.ctx)
|
|
|
|
go m.startCleanupLoop()
|
|
|
|
return err
|
2024-09-08 12:06:14 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
// OpenConn opens a connection to the given peer key. If the peer is on the same relay server, the connection will be
|
|
|
|
// established via the relay server. If the peer is on a different relay server, the manager will establish a new
|
|
|
|
// connection to the relay server. It returns back with a net.Conn what represent the remote peer connection.
|
|
|
|
func (m *Manager) OpenConn(serverAddress, peerKey string) (net.Conn, error) {
|
2024-11-22 18:12:34 +01:00
|
|
|
m.relayClientMu.Lock()
|
|
|
|
defer m.relayClientMu.Unlock()
|
|
|
|
|
2024-09-08 12:06:14 +02:00
|
|
|
if m.relayClient == nil {
|
|
|
|
return nil, ErrRelayClientNotConnected
|
|
|
|
}
|
|
|
|
|
|
|
|
foreign, err := m.isForeignServer(serverAddress)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
var (
|
|
|
|
netConn net.Conn
|
|
|
|
)
|
|
|
|
if !foreign {
|
|
|
|
log.Debugf("open peer connection via permanent server: %s", peerKey)
|
|
|
|
netConn, err = m.relayClient.OpenConn(peerKey)
|
|
|
|
} else {
|
|
|
|
log.Debugf("open peer connection via foreign server: %s", serverAddress)
|
|
|
|
netConn, err = m.openConnVia(serverAddress, peerKey)
|
|
|
|
}
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
return netConn, err
|
|
|
|
}
|
|
|
|
|
2024-10-24 11:43:14 +02:00
|
|
|
// Ready returns true if the home Relay client is connected to the relay server.
|
|
|
|
func (m *Manager) Ready() bool {
|
2024-11-22 18:12:34 +01:00
|
|
|
m.relayClientMu.Lock()
|
|
|
|
defer m.relayClientMu.Unlock()
|
|
|
|
|
2024-10-24 11:43:14 +02:00
|
|
|
if m.relayClient == nil {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
return m.relayClient.Ready()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *Manager) SetOnReconnectedListener(f func()) {
|
|
|
|
m.onReconnectedListenerFn = f
|
|
|
|
}
|
|
|
|
|
2024-09-08 12:06:14 +02:00
|
|
|
// AddCloseListener adds a listener to the given server instance address. The listener will be called if the connection
|
|
|
|
// closed.
|
|
|
|
func (m *Manager) AddCloseListener(serverAddress string, onClosedListener OnServerCloseListener) error {
|
2024-11-22 18:12:34 +01:00
|
|
|
m.relayClientMu.Lock()
|
|
|
|
defer m.relayClientMu.Unlock()
|
|
|
|
|
|
|
|
if m.relayClient == nil {
|
|
|
|
return ErrRelayClientNotConnected
|
|
|
|
}
|
|
|
|
|
2024-09-08 12:06:14 +02:00
|
|
|
foreign, err := m.isForeignServer(serverAddress)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
var listenerAddr string
|
|
|
|
if foreign {
|
|
|
|
listenerAddr = serverAddress
|
|
|
|
} else {
|
|
|
|
listenerAddr = m.relayClient.connectionURL
|
|
|
|
}
|
|
|
|
m.addListener(listenerAddr, onClosedListener)
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// RelayInstanceAddress returns the address of the permanent relay server. It could change if the network connection is
|
|
|
|
// lost. This address will be sent to the target peer to choose the common relay server for the communication.
|
|
|
|
func (m *Manager) RelayInstanceAddress() (string, error) {
|
2024-11-22 18:12:34 +01:00
|
|
|
m.relayClientMu.Lock()
|
|
|
|
defer m.relayClientMu.Unlock()
|
|
|
|
|
2024-09-08 12:06:14 +02:00
|
|
|
if m.relayClient == nil {
|
|
|
|
return "", ErrRelayClientNotConnected
|
|
|
|
}
|
|
|
|
return m.relayClient.ServerInstanceURL()
|
|
|
|
}
|
|
|
|
|
|
|
|
// ServerURLs returns the addresses of the relay servers.
|
|
|
|
func (m *Manager) ServerURLs() []string {
|
2024-11-22 18:12:34 +01:00
|
|
|
return m.serverPicker.ServerURLs.Load().([]string)
|
2024-09-08 12:06:14 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
// HasRelayAddress returns true if the manager is serving. With this method can check if the peer can communicate with
|
|
|
|
// Relay service.
|
|
|
|
func (m *Manager) HasRelayAddress() bool {
|
2024-11-22 18:12:34 +01:00
|
|
|
return len(m.serverPicker.ServerURLs.Load().([]string)) > 0
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *Manager) UpdateServerURLs(serverURLs []string) {
|
|
|
|
log.Infof("update relay server URLs: %v", serverURLs)
|
|
|
|
m.serverPicker.ServerURLs.Store(serverURLs)
|
2024-09-08 12:06:14 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
// UpdateToken updates the token in the token store.
|
|
|
|
func (m *Manager) UpdateToken(token *relayAuth.Token) error {
|
|
|
|
return m.tokenStore.UpdateToken(token)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *Manager) openConnVia(serverAddress, peerKey string) (net.Conn, error) {
|
|
|
|
// check if already has a connection to the desired relay server
|
|
|
|
m.relayClientsMutex.RLock()
|
|
|
|
rt, ok := m.relayClients[serverAddress]
|
|
|
|
if ok {
|
|
|
|
rt.RLock()
|
|
|
|
m.relayClientsMutex.RUnlock()
|
|
|
|
defer rt.RUnlock()
|
2024-09-09 18:12:32 +02:00
|
|
|
if rt.err != nil {
|
|
|
|
return nil, rt.err
|
|
|
|
}
|
2024-09-08 12:06:14 +02:00
|
|
|
return rt.relayClient.OpenConn(peerKey)
|
|
|
|
}
|
|
|
|
m.relayClientsMutex.RUnlock()
|
|
|
|
|
|
|
|
// if not, establish a new connection but check it again (because changed the lock type) before starting the
|
|
|
|
// connection
|
|
|
|
m.relayClientsMutex.Lock()
|
|
|
|
rt, ok = m.relayClients[serverAddress]
|
|
|
|
if ok {
|
|
|
|
rt.RLock()
|
|
|
|
m.relayClientsMutex.Unlock()
|
|
|
|
defer rt.RUnlock()
|
2024-09-09 18:12:32 +02:00
|
|
|
if rt.err != nil {
|
|
|
|
return nil, rt.err
|
|
|
|
}
|
2024-09-08 12:06:14 +02:00
|
|
|
return rt.relayClient.OpenConn(peerKey)
|
|
|
|
}
|
|
|
|
|
|
|
|
// create a new relay client and store it in the relayClients map
|
|
|
|
rt = NewRelayTrack()
|
|
|
|
rt.Lock()
|
|
|
|
m.relayClients[serverAddress] = rt
|
|
|
|
m.relayClientsMutex.Unlock()
|
|
|
|
|
|
|
|
relayClient := NewClient(m.ctx, serverAddress, m.tokenStore, m.peerID)
|
|
|
|
err := relayClient.Connect()
|
|
|
|
if err != nil {
|
2024-09-09 18:12:32 +02:00
|
|
|
rt.err = err
|
2024-09-08 12:06:14 +02:00
|
|
|
rt.Unlock()
|
|
|
|
m.relayClientsMutex.Lock()
|
|
|
|
delete(m.relayClients, serverAddress)
|
|
|
|
m.relayClientsMutex.Unlock()
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
// if connection closed then delete the relay client from the list
|
2024-11-22 18:12:34 +01:00
|
|
|
relayClient.SetOnDisconnectListener(m.onServerDisconnected)
|
2024-09-08 12:06:14 +02:00
|
|
|
rt.relayClient = relayClient
|
|
|
|
rt.Unlock()
|
|
|
|
|
|
|
|
conn, err := relayClient.OpenConn(peerKey)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return conn, nil
|
|
|
|
}
|
|
|
|
|
2024-10-24 11:43:14 +02:00
|
|
|
func (m *Manager) onServerConnected() {
|
|
|
|
if m.onReconnectedListenerFn == nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
go m.onReconnectedListenerFn()
|
|
|
|
}
|
|
|
|
|
2024-11-22 18:12:34 +01:00
|
|
|
// onServerDisconnected start to reconnection for home server only
|
2024-09-08 12:06:14 +02:00
|
|
|
func (m *Manager) onServerDisconnected(serverAddress string) {
|
2024-11-22 18:12:34 +01:00
|
|
|
m.relayClientMu.Lock()
|
2024-09-08 12:06:14 +02:00
|
|
|
if serverAddress == m.relayClient.connectionURL {
|
2024-11-22 18:12:34 +01:00
|
|
|
go m.reconnectGuard.StartReconnectTrys(m.ctx, m.relayClient)
|
2024-09-08 12:06:14 +02:00
|
|
|
}
|
2024-11-22 18:12:34 +01:00
|
|
|
m.relayClientMu.Unlock()
|
2024-09-08 12:06:14 +02:00
|
|
|
|
|
|
|
m.notifyOnDisconnectListeners(serverAddress)
|
|
|
|
}
|
|
|
|
|
2024-11-22 18:12:34 +01:00
|
|
|
func (m *Manager) listenGuardEvent(ctx context.Context) {
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case rc := <-m.reconnectGuard.OnNewRelayClient:
|
|
|
|
m.storeClient(rc)
|
|
|
|
case <-ctx.Done():
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *Manager) storeClient(client *Client) {
|
|
|
|
m.relayClientMu.Lock()
|
|
|
|
defer m.relayClientMu.Unlock()
|
|
|
|
|
|
|
|
m.relayClient = client
|
|
|
|
m.relayClient.SetOnConnectedListener(m.onServerConnected)
|
|
|
|
m.relayClient.SetOnDisconnectListener(m.onServerDisconnected)
|
|
|
|
}
|
|
|
|
|
2024-09-08 12:06:14 +02:00
|
|
|
func (m *Manager) isForeignServer(address string) (bool, error) {
|
|
|
|
rAddr, err := m.relayClient.ServerInstanceURL()
|
|
|
|
if err != nil {
|
|
|
|
return false, fmt.Errorf("relay client not connected")
|
|
|
|
}
|
|
|
|
return rAddr != address, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *Manager) startCleanupLoop() {
|
|
|
|
ticker := time.NewTicker(relayCleanupInterval)
|
2024-11-22 18:12:34 +01:00
|
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-m.ctx.Done():
|
|
|
|
return
|
|
|
|
case <-ticker.C:
|
|
|
|
m.cleanUpUnusedRelays()
|
2024-09-08 12:06:14 +02:00
|
|
|
}
|
2024-11-22 18:12:34 +01:00
|
|
|
}
|
2024-09-08 12:06:14 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func (m *Manager) cleanUpUnusedRelays() {
|
|
|
|
m.relayClientsMutex.Lock()
|
|
|
|
defer m.relayClientsMutex.Unlock()
|
|
|
|
|
|
|
|
for addr, rt := range m.relayClients {
|
|
|
|
rt.Lock()
|
2024-11-01 12:33:29 +01:00
|
|
|
// if the connection failed to the server the relay client will be nil
|
|
|
|
// but the instance will be kept in the relayClients until the next locking
|
|
|
|
if rt.err != nil {
|
|
|
|
rt.Unlock()
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
if time.Since(rt.created) <= keepUnusedServerTime {
|
|
|
|
rt.Unlock()
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2024-09-08 12:06:14 +02:00
|
|
|
if rt.relayClient.HasConns() {
|
|
|
|
rt.Unlock()
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
rt.relayClient.SetOnDisconnectListener(nil)
|
|
|
|
go func() {
|
|
|
|
_ = rt.relayClient.Close()
|
|
|
|
}()
|
|
|
|
log.Debugf("clean up unused relay server connection: %s", addr)
|
|
|
|
delete(m.relayClients, addr)
|
|
|
|
rt.Unlock()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *Manager) addListener(serverAddress string, onClosedListener OnServerCloseListener) {
|
|
|
|
m.listenerLock.Lock()
|
|
|
|
defer m.listenerLock.Unlock()
|
|
|
|
l, ok := m.onDisconnectedListeners[serverAddress]
|
|
|
|
if !ok {
|
|
|
|
l = list.New()
|
|
|
|
}
|
|
|
|
for e := l.Front(); e != nil; e = e.Next() {
|
|
|
|
if reflect.ValueOf(e.Value).Pointer() == reflect.ValueOf(onClosedListener).Pointer() {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
l.PushBack(onClosedListener)
|
|
|
|
m.onDisconnectedListeners[serverAddress] = l
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *Manager) notifyOnDisconnectListeners(serverAddress string) {
|
|
|
|
m.listenerLock.Lock()
|
|
|
|
defer m.listenerLock.Unlock()
|
|
|
|
|
|
|
|
l, ok := m.onDisconnectedListeners[serverAddress]
|
|
|
|
if !ok {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
for e := l.Front(); e != nil; e = e.Next() {
|
|
|
|
go e.Value.(OnServerCloseListener)()
|
|
|
|
}
|
|
|
|
delete(m.onDisconnectedListeners, serverAddress)
|
|
|
|
}
|