mirror of
https://github.com/netbirdio/netbird.git
synced 2024-12-14 02:41:34 +01:00
2a5cb16494
Can support firewalls with restricted WS rules, allow running the engine without Relay servers, and keep up to date with Relay address changes
406 lines
11 KiB
Go
406 lines
11 KiB
Go
package client
|
|
|
|
import (
|
|
"container/list"
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"reflect"
|
|
"sync"
|
|
"time"
|
|
|
|
log "github.com/sirupsen/logrus"
|
|
|
|
relayAuth "github.com/netbirdio/netbird/relay/auth/hmac"
|
|
)
|
|
|
|
var (
	// relayCleanupInterval is the period of the cleanup loop that looks for unused foreign relay
	// connections.
	relayCleanupInterval = time.Minute
	// keepUnusedServerTime is the age a foreign relay connection must exceed before the cleanup is
	// allowed to close it.
	keepUnusedServerTime = 5 * time.Second

	// ErrRelayClientNotConnected is returned when the manager has no home relay client yet.
	ErrRelayClientNotConnected = fmt.Errorf("relay client not connected")
)
|
|
|
|
// RelayTrack hold the relay clients for the foreign relay servers.
|
|
// With the mutex can ensure we can open new connection in case the relay connection has been established with
|
|
// the relay server.
|
|
type RelayTrack struct {
|
|
sync.RWMutex
|
|
relayClient *Client
|
|
err error
|
|
created time.Time
|
|
}
|
|
|
|
func NewRelayTrack() *RelayTrack {
|
|
return &RelayTrack{
|
|
created: time.Now(),
|
|
}
|
|
}
|
|
|
|
// OnServerCloseListener is a callback that is invoked when the connection to a relay server is closed.
type OnServerCloseListener func()
|
|
|
|
// ManagerService is the interface for the relay manager.
|
|
type ManagerService interface {
|
|
Serve() error
|
|
OpenConn(serverAddress, peerKey string) (net.Conn, error)
|
|
AddCloseListener(serverAddress string, onClosedListener OnServerCloseListener) error
|
|
RelayInstanceAddress() (string, error)
|
|
ServerURLs() []string
|
|
HasRelayAddress() bool
|
|
UpdateToken(token *relayAuth.Token) error
|
|
}
|
|
|
|
// Manager is a manager for the relay client instances. It establishes one persistent connection to the given relay URL
|
|
// and automatically reconnect to them in case disconnection.
|
|
// The manager also manage temporary relay connection. If a client wants to communicate with a client on a
|
|
// different relay server, the manager will establish a new connection to the relay server. The connection with these
|
|
// relay servers will be closed if there is no active connection. Periodically the manager will check if there is any
|
|
// unused relay connection and close it.
|
|
type Manager struct {
|
|
ctx context.Context
|
|
peerID string
|
|
running bool
|
|
tokenStore *relayAuth.TokenStore
|
|
serverPicker *ServerPicker
|
|
|
|
relayClient *Client
|
|
// the guard logic can overwrite the relayClient variable, this mutex protect the usage of the variable
|
|
relayClientMu sync.Mutex
|
|
reconnectGuard *Guard
|
|
|
|
relayClients map[string]*RelayTrack
|
|
relayClientsMutex sync.RWMutex
|
|
|
|
onDisconnectedListeners map[string]*list.List
|
|
onReconnectedListenerFn func()
|
|
listenerLock sync.Mutex
|
|
}
|
|
|
|
// NewManager creates a new manager instance.
|
|
// The serverURL address can be empty. In this case, the manager will not serve.
|
|
func NewManager(ctx context.Context, serverURLs []string, peerID string) *Manager {
|
|
tokenStore := &relayAuth.TokenStore{}
|
|
|
|
m := &Manager{
|
|
ctx: ctx,
|
|
peerID: peerID,
|
|
tokenStore: tokenStore,
|
|
serverPicker: &ServerPicker{
|
|
TokenStore: tokenStore,
|
|
PeerID: peerID,
|
|
},
|
|
relayClients: make(map[string]*RelayTrack),
|
|
onDisconnectedListeners: make(map[string]*list.List),
|
|
}
|
|
m.serverPicker.ServerURLs.Store(serverURLs)
|
|
m.reconnectGuard = NewGuard(m.serverPicker)
|
|
return m
|
|
}
|
|
|
|
// Serve starts the manager, attempting to establish a connection with the relay server.
|
|
// If the connection fails, it will keep trying to reconnect in the background.
|
|
// Additionally, it starts a cleanup loop to remove unused relay connections.
|
|
// The manager will automatically reconnect to the relay server in case of disconnection.
|
|
func (m *Manager) Serve() error {
|
|
if m.running {
|
|
return fmt.Errorf("manager already serving")
|
|
}
|
|
m.running = true
|
|
log.Debugf("starting relay client manager with %v relay servers", m.serverPicker.ServerURLs.Load())
|
|
|
|
client, err := m.serverPicker.PickServer(m.ctx)
|
|
if err != nil {
|
|
go m.reconnectGuard.StartReconnectTrys(m.ctx, nil)
|
|
} else {
|
|
m.storeClient(client)
|
|
}
|
|
|
|
go m.listenGuardEvent(m.ctx)
|
|
go m.startCleanupLoop()
|
|
return err
|
|
}
|
|
|
|
// OpenConn opens a connection to the given peer key. If the peer is on the same relay server, the connection will be
|
|
// established via the relay server. If the peer is on a different relay server, the manager will establish a new
|
|
// connection to the relay server. It returns back with a net.Conn what represent the remote peer connection.
|
|
func (m *Manager) OpenConn(serverAddress, peerKey string) (net.Conn, error) {
|
|
m.relayClientMu.Lock()
|
|
defer m.relayClientMu.Unlock()
|
|
|
|
if m.relayClient == nil {
|
|
return nil, ErrRelayClientNotConnected
|
|
}
|
|
|
|
foreign, err := m.isForeignServer(serverAddress)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var (
|
|
netConn net.Conn
|
|
)
|
|
if !foreign {
|
|
log.Debugf("open peer connection via permanent server: %s", peerKey)
|
|
netConn, err = m.relayClient.OpenConn(peerKey)
|
|
} else {
|
|
log.Debugf("open peer connection via foreign server: %s", serverAddress)
|
|
netConn, err = m.openConnVia(serverAddress, peerKey)
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return netConn, err
|
|
}
|
|
|
|
// Ready returns true if the home Relay client is connected to the relay server.
|
|
func (m *Manager) Ready() bool {
|
|
m.relayClientMu.Lock()
|
|
defer m.relayClientMu.Unlock()
|
|
|
|
if m.relayClient == nil {
|
|
return false
|
|
}
|
|
return m.relayClient.Ready()
|
|
}
|
|
|
|
func (m *Manager) SetOnReconnectedListener(f func()) {
|
|
m.onReconnectedListenerFn = f
|
|
}
|
|
|
|
// AddCloseListener adds a listener to the given server instance address. The listener will be called if the connection
|
|
// closed.
|
|
func (m *Manager) AddCloseListener(serverAddress string, onClosedListener OnServerCloseListener) error {
|
|
m.relayClientMu.Lock()
|
|
defer m.relayClientMu.Unlock()
|
|
|
|
if m.relayClient == nil {
|
|
return ErrRelayClientNotConnected
|
|
}
|
|
|
|
foreign, err := m.isForeignServer(serverAddress)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var listenerAddr string
|
|
if foreign {
|
|
listenerAddr = serverAddress
|
|
} else {
|
|
listenerAddr = m.relayClient.connectionURL
|
|
}
|
|
m.addListener(listenerAddr, onClosedListener)
|
|
return nil
|
|
}
|
|
|
|
// RelayInstanceAddress returns the address of the permanent relay server. It could change if the network connection is
|
|
// lost. This address will be sent to the target peer to choose the common relay server for the communication.
|
|
func (m *Manager) RelayInstanceAddress() (string, error) {
|
|
m.relayClientMu.Lock()
|
|
defer m.relayClientMu.Unlock()
|
|
|
|
if m.relayClient == nil {
|
|
return "", ErrRelayClientNotConnected
|
|
}
|
|
return m.relayClient.ServerInstanceURL()
|
|
}
|
|
|
|
// ServerURLs returns the addresses of the relay servers.
|
|
func (m *Manager) ServerURLs() []string {
|
|
return m.serverPicker.ServerURLs.Load().([]string)
|
|
}
|
|
|
|
// HasRelayAddress returns true if the manager is serving. With this method can check if the peer can communicate with
|
|
// Relay service.
|
|
func (m *Manager) HasRelayAddress() bool {
|
|
return len(m.serverPicker.ServerURLs.Load().([]string)) > 0
|
|
}
|
|
|
|
func (m *Manager) UpdateServerURLs(serverURLs []string) {
|
|
log.Infof("update relay server URLs: %v", serverURLs)
|
|
m.serverPicker.ServerURLs.Store(serverURLs)
|
|
}
|
|
|
|
// UpdateToken updates the token in the token store.
|
|
func (m *Manager) UpdateToken(token *relayAuth.Token) error {
|
|
return m.tokenStore.UpdateToken(token)
|
|
}
|
|
|
|
func (m *Manager) openConnVia(serverAddress, peerKey string) (net.Conn, error) {
|
|
// check if already has a connection to the desired relay server
|
|
m.relayClientsMutex.RLock()
|
|
rt, ok := m.relayClients[serverAddress]
|
|
if ok {
|
|
rt.RLock()
|
|
m.relayClientsMutex.RUnlock()
|
|
defer rt.RUnlock()
|
|
if rt.err != nil {
|
|
return nil, rt.err
|
|
}
|
|
return rt.relayClient.OpenConn(peerKey)
|
|
}
|
|
m.relayClientsMutex.RUnlock()
|
|
|
|
// if not, establish a new connection but check it again (because changed the lock type) before starting the
|
|
// connection
|
|
m.relayClientsMutex.Lock()
|
|
rt, ok = m.relayClients[serverAddress]
|
|
if ok {
|
|
rt.RLock()
|
|
m.relayClientsMutex.Unlock()
|
|
defer rt.RUnlock()
|
|
if rt.err != nil {
|
|
return nil, rt.err
|
|
}
|
|
return rt.relayClient.OpenConn(peerKey)
|
|
}
|
|
|
|
// create a new relay client and store it in the relayClients map
|
|
rt = NewRelayTrack()
|
|
rt.Lock()
|
|
m.relayClients[serverAddress] = rt
|
|
m.relayClientsMutex.Unlock()
|
|
|
|
relayClient := NewClient(m.ctx, serverAddress, m.tokenStore, m.peerID)
|
|
err := relayClient.Connect()
|
|
if err != nil {
|
|
rt.err = err
|
|
rt.Unlock()
|
|
m.relayClientsMutex.Lock()
|
|
delete(m.relayClients, serverAddress)
|
|
m.relayClientsMutex.Unlock()
|
|
return nil, err
|
|
}
|
|
// if connection closed then delete the relay client from the list
|
|
relayClient.SetOnDisconnectListener(m.onServerDisconnected)
|
|
rt.relayClient = relayClient
|
|
rt.Unlock()
|
|
|
|
conn, err := relayClient.OpenConn(peerKey)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return conn, nil
|
|
}
|
|
|
|
func (m *Manager) onServerConnected() {
|
|
if m.onReconnectedListenerFn == nil {
|
|
return
|
|
}
|
|
go m.onReconnectedListenerFn()
|
|
}
|
|
|
|
// onServerDisconnected start to reconnection for home server only
|
|
func (m *Manager) onServerDisconnected(serverAddress string) {
|
|
m.relayClientMu.Lock()
|
|
if serverAddress == m.relayClient.connectionURL {
|
|
go m.reconnectGuard.StartReconnectTrys(m.ctx, m.relayClient)
|
|
}
|
|
m.relayClientMu.Unlock()
|
|
|
|
m.notifyOnDisconnectListeners(serverAddress)
|
|
}
|
|
|
|
func (m *Manager) listenGuardEvent(ctx context.Context) {
|
|
for {
|
|
select {
|
|
case rc := <-m.reconnectGuard.OnNewRelayClient:
|
|
m.storeClient(rc)
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *Manager) storeClient(client *Client) {
|
|
m.relayClientMu.Lock()
|
|
defer m.relayClientMu.Unlock()
|
|
|
|
m.relayClient = client
|
|
m.relayClient.SetOnConnectedListener(m.onServerConnected)
|
|
m.relayClient.SetOnDisconnectListener(m.onServerDisconnected)
|
|
}
|
|
|
|
func (m *Manager) isForeignServer(address string) (bool, error) {
|
|
rAddr, err := m.relayClient.ServerInstanceURL()
|
|
if err != nil {
|
|
return false, fmt.Errorf("relay client not connected")
|
|
}
|
|
return rAddr != address, nil
|
|
}
|
|
|
|
func (m *Manager) startCleanupLoop() {
|
|
ticker := time.NewTicker(relayCleanupInterval)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-m.ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
m.cleanUpUnusedRelays()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *Manager) cleanUpUnusedRelays() {
|
|
m.relayClientsMutex.Lock()
|
|
defer m.relayClientsMutex.Unlock()
|
|
|
|
for addr, rt := range m.relayClients {
|
|
rt.Lock()
|
|
// if the connection failed to the server the relay client will be nil
|
|
// but the instance will be kept in the relayClients until the next locking
|
|
if rt.err != nil {
|
|
rt.Unlock()
|
|
continue
|
|
}
|
|
|
|
if time.Since(rt.created) <= keepUnusedServerTime {
|
|
rt.Unlock()
|
|
continue
|
|
}
|
|
|
|
if rt.relayClient.HasConns() {
|
|
rt.Unlock()
|
|
continue
|
|
}
|
|
rt.relayClient.SetOnDisconnectListener(nil)
|
|
go func() {
|
|
_ = rt.relayClient.Close()
|
|
}()
|
|
log.Debugf("clean up unused relay server connection: %s", addr)
|
|
delete(m.relayClients, addr)
|
|
rt.Unlock()
|
|
}
|
|
}
|
|
|
|
func (m *Manager) addListener(serverAddress string, onClosedListener OnServerCloseListener) {
|
|
m.listenerLock.Lock()
|
|
defer m.listenerLock.Unlock()
|
|
l, ok := m.onDisconnectedListeners[serverAddress]
|
|
if !ok {
|
|
l = list.New()
|
|
}
|
|
for e := l.Front(); e != nil; e = e.Next() {
|
|
if reflect.ValueOf(e.Value).Pointer() == reflect.ValueOf(onClosedListener).Pointer() {
|
|
return
|
|
}
|
|
}
|
|
l.PushBack(onClosedListener)
|
|
m.onDisconnectedListeners[serverAddress] = l
|
|
}
|
|
|
|
func (m *Manager) notifyOnDisconnectListeners(serverAddress string) {
|
|
m.listenerLock.Lock()
|
|
defer m.listenerLock.Unlock()
|
|
|
|
l, ok := m.onDisconnectedListeners[serverAddress]
|
|
if !ok {
|
|
return
|
|
}
|
|
for e := l.Front(); e != nil; e = e.Next() {
|
|
go e.Value.(OnServerCloseListener)()
|
|
}
|
|
delete(m.onDisconnectedListeners, serverAddress)
|
|
}
|