Refactor initial Relay connection approach.

- allow to run engine without Relay servers
- keep up to date the Relay address changes
This commit is contained in:
Zoltan Papp
2024-10-28 18:28:09 +01:00
parent 940f8b4547
commit 7bde2ac09c
8 changed files with 165 additions and 79 deletions

View File

@ -230,6 +230,7 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, probes *ProbeHold
relayURLs, token := parseRelayInfo(loginResp) relayURLs, token := parseRelayInfo(loginResp)
relayManager := relayClient.NewManager(engineCtx, relayURLs, myPrivateKey.PublicKey().String()) relayManager := relayClient.NewManager(engineCtx, relayURLs, myPrivateKey.PublicKey().String())
c.statusRecorder.SetRelayMgr(relayManager)
if len(relayURLs) > 0 { if len(relayURLs) > 0 {
if token != nil { if token != nil {
if err := relayManager.UpdateToken(token); err != nil { if err := relayManager.UpdateToken(token); err != nil {
@ -240,9 +241,7 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, probes *ProbeHold
log.Infof("connecting to the Relay service(s): %s", strings.Join(relayURLs, ", ")) log.Infof("connecting to the Relay service(s): %s", strings.Join(relayURLs, ", "))
if err = relayManager.Serve(); err != nil { if err = relayManager.Serve(); err != nil {
log.Error(err) log.Error(err)
return wrapErr(err)
} }
c.statusRecorder.SetRelayMgr(relayManager)
} }
peerConfig := loginResp.GetPeerConfig() peerConfig := loginResp.GetPeerConfig()

View File

@ -38,7 +38,6 @@ import (
"github.com/netbirdio/netbird/client/internal/routemanager/systemops" "github.com/netbirdio/netbird/client/internal/routemanager/systemops"
"github.com/netbirdio/netbird/client/internal/statemanager" "github.com/netbirdio/netbird/client/internal/statemanager"
nbssh "github.com/netbirdio/netbird/client/ssh" nbssh "github.com/netbirdio/netbird/client/ssh"
"github.com/netbirdio/netbird/client/system" "github.com/netbirdio/netbird/client/system"
nbdns "github.com/netbirdio/netbird/dns" nbdns "github.com/netbirdio/netbird/dns"
@ -171,7 +170,7 @@ type Engine struct {
relayManager *relayClient.Manager relayManager *relayClient.Manager
stateManager *statemanager.Manager stateManager *statemanager.Manager
srWatcher *guard.SRWatcher srWatcher *guard.SRWatcher
} }
// Peer is an instance of the Connection Peer // Peer is an instance of the Connection Peer
@ -538,6 +537,7 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
relayMsg := wCfg.GetRelay() relayMsg := wCfg.GetRelay()
if relayMsg != nil { if relayMsg != nil {
// when we receive token we expect valid address list too
c := &auth.Token{ c := &auth.Token{
Payload: relayMsg.GetTokenPayload(), Payload: relayMsg.GetTokenPayload(),
Signature: relayMsg.GetTokenSignature(), Signature: relayMsg.GetTokenSignature(),
@ -546,9 +546,16 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
log.Errorf("failed to update relay token: %v", err) log.Errorf("failed to update relay token: %v", err)
return fmt.Errorf("update relay token: %w", err) return fmt.Errorf("update relay token: %w", err)
} }
e.relayManager.UpdateServerURLs(relayMsg.Urls)
// Just in case the agent started with an MGM server where the relay was disabled but was later enabled.
// We can ignore all errors because the guard will manage the reconnection retries.
_ = e.relayManager.Serve()
} else {
e.relayManager.UpdateServerURLs(nil)
} }
// todo update relay address in the relay manager
// todo update signal // todo update signal
} }

View File

@ -141,7 +141,7 @@ type Client struct {
instanceURL *RelayAddr instanceURL *RelayAddr
muInstanceURL sync.Mutex muInstanceURL sync.Mutex
onDisconnectListener func() onDisconnectListener func(string)
onConnectedListener func() onConnectedListener func()
listenerMutex sync.Mutex listenerMutex sync.Mutex
} }
@ -234,7 +234,7 @@ func (c *Client) ServerInstanceURL() (string, error) {
} }
// SetOnDisconnectListener sets a function that will be called when the connection to the relay server is closed. // SetOnDisconnectListener sets a function that will be called when the connection to the relay server is closed.
func (c *Client) SetOnDisconnectListener(fn func()) { func (c *Client) SetOnDisconnectListener(fn func(string)) {
c.listenerMutex.Lock() c.listenerMutex.Lock()
defer c.listenerMutex.Unlock() defer c.listenerMutex.Unlock()
c.onDisconnectListener = fn c.onDisconnectListener = fn
@ -555,7 +555,7 @@ func (c *Client) notifyDisconnected() {
if c.onDisconnectListener == nil { if c.onDisconnectListener == nil {
return return
} }
go c.onDisconnectListener() go c.onDisconnectListener(c.connectionURL)
} }
func (c *Client) notifyConnected() { func (c *Client) notifyConnected() {

View File

@ -551,7 +551,7 @@ func TestCloseByServer(t *testing.T) {
} }
disconnected := make(chan struct{}) disconnected := make(chan struct{})
relayClient.SetOnDisconnectListener(func() { relayClient.SetOnDisconnectListener(func(_ string) {
log.Infof("client disconnected") log.Infof("client disconnected")
close(disconnected) close(disconnected)
}) })

View File

@ -13,23 +13,32 @@ var (
// Guard manage the reconnection tries to the Relay server in case of disconnection event. // Guard manage the reconnection tries to the Relay server in case of disconnection event.
type Guard struct { type Guard struct {
ctx context.Context // OnNewRelayClient is a channel that is used to notify the relay client about a new relay client instance.
relayClient *Client OnNewRelayClient chan *Client
serverPicker *ServerPicker
} }
// NewGuard creates a new guard for the relay client. // NewGuard creates a new guard for the relay client.
func NewGuard(context context.Context, relayClient *Client) *Guard { func NewGuard(sp *ServerPicker) *Guard {
g := &Guard{ g := &Guard{
ctx: context, OnNewRelayClient: make(chan *Client, 1),
relayClient: relayClient, serverPicker: sp,
} }
return g return g
} }
// OnDisconnected is called when the relay client is disconnected from the relay server. It will trigger the reconnection // StartReconnectTrys is called when the relay client is disconnected from the relay server.
// It attempts to reconnect to the relay server. The function first tries a quick reconnect
// to the same server that was used before, if the server URL is still valid. If the quick
// reconnect fails, it starts a ticker to periodically attempt server picking until it
// succeeds or the context is done.
//
// Parameters:
// - ctx: The context to control the lifecycle of the reconnection attempts.
// - relayClient: The relay client instance that was disconnected.
// todo prevent multiple reconnection instances. In the current usage it should not happen, but it is better to prevent // todo prevent multiple reconnection instances. In the current usage it should not happen, but it is better to prevent
func (g *Guard) OnDisconnected() { func (g *Guard) StartReconnectTrys(ctx context.Context, relayClient *Client) {
if g.quickReconnect() { if g.isServerURLStillValid(relayClient) && g.quickReconnect(ctx, relayClient) {
return return
} }
@ -39,30 +48,58 @@ func (g *Guard) OnDisconnected() {
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
err := g.relayClient.Connect() if err := g.retry(ctx); err != nil {
if err != nil { log.Errorf("failed to pick new Relay server: %s", err)
log.Errorf("failed to reconnect to relay server: %s", err)
continue continue
} }
return return
case <-g.ctx.Done(): case <-ctx.Done():
return return
} }
} }
} }
func (g *Guard) quickReconnect() bool { func (g *Guard) retry(ctx context.Context) error {
ctx, cancel := context.WithTimeout(g.ctx, 1500*time.Millisecond) relayClient, err := g.serverPicker.PickServer(ctx)
if err != nil {
return err
}
// prevent to work with a deprecated Relay client instance
g.drainRelayClientChan()
g.OnNewRelayClient <- relayClient
return nil
}
func (g *Guard) quickReconnect(parentCtx context.Context, rc *Client) bool {
ctx, cancel := context.WithTimeout(parentCtx, 1500*time.Millisecond)
defer cancel() defer cancel()
<-ctx.Done() <-ctx.Done()
if g.ctx.Err() != nil { if parentCtx.Err() != nil {
return false return false
} }
if err := g.relayClient.Connect(); err != nil { if err := rc.Connect(); err != nil {
log.Errorf("failed to reconnect to relay server: %s", err) log.Errorf("failed to reconnect to relay server: %s", err)
return false return false
} }
return true return true
} }
func (g *Guard) drainRelayClientChan() {
select {
case <-g.OnNewRelayClient:
default:
}
}
func (g *Guard) isServerURLStillValid(rc *Client) bool {
for _, url := range g.serverPicker.ServerURLs.Load().([]string) {
if url == rc.connectionURL {
return true
}
}
return false
}

View File

@ -53,12 +53,15 @@ type ManagerService interface {
// relay servers will be closed if there is no active connection. Periodically the manager will check if there is any // relay servers will be closed if there is no active connection. Periodically the manager will check if there is any
// unused relay connection and close it. // unused relay connection and close it.
type Manager struct { type Manager struct {
ctx context.Context ctx context.Context
serverURLs []string peerID string
peerID string running bool
tokenStore *relayAuth.TokenStore tokenStore *relayAuth.TokenStore
serverPicker *ServerPicker
relayClient *Client relayClient *Client
// the guard logic can overwrite the relayClient variable, this mutex protect the usage of the variable
relayClientMu sync.Mutex
reconnectGuard *Guard reconnectGuard *Guard
relayClients map[string]*RelayTrack relayClients map[string]*RelayTrack
@ -72,48 +75,53 @@ type Manager struct {
// NewManager creates a new manager instance. // NewManager creates a new manager instance.
// The serverURL address can be empty. In this case, the manager will not serve. // The serverURL address can be empty. In this case, the manager will not serve.
func NewManager(ctx context.Context, serverURLs []string, peerID string) *Manager { func NewManager(ctx context.Context, serverURLs []string, peerID string) *Manager {
return &Manager{ tokenStore := &relayAuth.TokenStore{}
ctx: ctx,
serverURLs: serverURLs, m := &Manager{
peerID: peerID, ctx: ctx,
tokenStore: &relayAuth.TokenStore{}, peerID: peerID,
tokenStore: tokenStore,
serverPicker: &ServerPicker{
TokenStore: tokenStore,
PeerID: peerID,
},
relayClients: make(map[string]*RelayTrack), relayClients: make(map[string]*RelayTrack),
onDisconnectedListeners: make(map[string]*list.List), onDisconnectedListeners: make(map[string]*list.List),
} }
m.serverPicker.ServerURLs.Store(serverURLs)
return m
} }
// Serve starts the manager. It will establish a connection to the relay server and start the relay cleanup loop for // Serve starts the manager, attempting to establish a connection with the relay server.
// the unused relay connections. The manager will automatically reconnect to the relay server in case of disconnection. // If the connection fails, it will keep trying to reconnect in the background.
// Additionally, it starts a cleanup loop to remove unused relay connections.
// The manager will automatically reconnect to the relay server in case of disconnection.
func (m *Manager) Serve() error { func (m *Manager) Serve() error {
if m.relayClient != nil { if m.running {
return fmt.Errorf("manager already serving") return fmt.Errorf("manager already serving")
} }
log.Debugf("starting relay client manager with %v relay servers", m.serverURLs) m.running = true
log.Debugf("starting relay client manager with %v relay servers", m.serverPicker.ServerURLs.Load())
sp := ServerPicker{ m.reconnectGuard = NewGuard(m.serverPicker)
TokenStore: m.tokenStore, go m.listenGuardEvent(m.ctx)
PeerID: m.peerID,
client, err := m.serverPicker.PickServer(m.ctx)
if err == nil {
m.storeClient(client)
} }
client, err := sp.PickServer(m.ctx, m.serverURLs) go m.startCleanupLoop()
if err != nil { return err
return err
}
m.relayClient = client
m.reconnectGuard = NewGuard(m.ctx, m.relayClient)
m.relayClient.SetOnConnectedListener(m.onServerConnected)
m.relayClient.SetOnDisconnectListener(func() {
m.onServerDisconnected(client.connectionURL)
})
m.startCleanupLoop()
return nil
} }
// OpenConn opens a connection to the given peer key. If the peer is on the same relay server, the connection will be // OpenConn opens a connection to the given peer key. If the peer is on the same relay server, the connection will be
// established via the relay server. If the peer is on a different relay server, the manager will establish a new // established via the relay server. If the peer is on a different relay server, the manager will establish a new
// connection to the relay server. It returns back with a net.Conn what represent the remote peer connection. // connection to the relay server. It returns back with a net.Conn what represent the remote peer connection.
func (m *Manager) OpenConn(serverAddress, peerKey string) (net.Conn, error) { func (m *Manager) OpenConn(serverAddress, peerKey string) (net.Conn, error) {
m.relayClientMu.Lock()
defer m.relayClientMu.Unlock()
if m.relayClient == nil { if m.relayClient == nil {
return nil, ErrRelayClientNotConnected return nil, ErrRelayClientNotConnected
} }
@ -142,6 +150,9 @@ func (m *Manager) OpenConn(serverAddress, peerKey string) (net.Conn, error) {
// Ready returns true if the home Relay client is connected to the relay server. // Ready returns true if the home Relay client is connected to the relay server.
func (m *Manager) Ready() bool { func (m *Manager) Ready() bool {
m.relayClientMu.Lock()
defer m.relayClientMu.Unlock()
if m.relayClient == nil { if m.relayClient == nil {
return false return false
} }
@ -155,6 +166,13 @@ func (m *Manager) SetOnReconnectedListener(f func()) {
// AddCloseListener adds a listener to the given server instance address. The listener will be called if the connection // AddCloseListener adds a listener to the given server instance address. The listener will be called if the connection
// closed. // closed.
func (m *Manager) AddCloseListener(serverAddress string, onClosedListener OnServerCloseListener) error { func (m *Manager) AddCloseListener(serverAddress string, onClosedListener OnServerCloseListener) error {
m.relayClientMu.Lock()
defer m.relayClientMu.Unlock()
if m.relayClient == nil {
return ErrRelayClientNotConnected
}
foreign, err := m.isForeignServer(serverAddress) foreign, err := m.isForeignServer(serverAddress)
if err != nil { if err != nil {
return err return err
@ -173,6 +191,9 @@ func (m *Manager) AddCloseListener(serverAddress string, onClosedListener OnServ
// RelayInstanceAddress returns the address of the permanent relay server. It could change if the network connection is // RelayInstanceAddress returns the address of the permanent relay server. It could change if the network connection is
// lost. This address will be sent to the target peer to choose the common relay server for the communication. // lost. This address will be sent to the target peer to choose the common relay server for the communication.
func (m *Manager) RelayInstanceAddress() (string, error) { func (m *Manager) RelayInstanceAddress() (string, error) {
m.relayClientMu.Lock()
defer m.relayClientMu.Unlock()
if m.relayClient == nil { if m.relayClient == nil {
return "", ErrRelayClientNotConnected return "", ErrRelayClientNotConnected
} }
@ -181,13 +202,17 @@ func (m *Manager) RelayInstanceAddress() (string, error) {
// ServerURLs returns the addresses of the relay servers. // ServerURLs returns the addresses of the relay servers.
func (m *Manager) ServerURLs() []string { func (m *Manager) ServerURLs() []string {
return m.serverURLs return m.serverPicker.ServerURLs.Load().([]string)
} }
// HasRelayAddress returns true if the manager is serving. With this method can check if the peer can communicate with // HasRelayAddress returns true if the manager is serving. With this method can check if the peer can communicate with
// Relay service. // Relay service.
func (m *Manager) HasRelayAddress() bool { func (m *Manager) HasRelayAddress() bool {
return len(m.serverURLs) > 0 return len(m.serverPicker.ServerURLs.Load().([]string)) > 0
}
func (m *Manager) UpdateServerURLs(serverURLs []string) {
m.serverPicker.ServerURLs.Store(serverURLs)
} }
// UpdateToken updates the token in the token store. // UpdateToken updates the token in the token store.
@ -241,9 +266,7 @@ func (m *Manager) openConnVia(serverAddress, peerKey string) (net.Conn, error) {
return nil, err return nil, err
} }
// if connection closed then delete the relay client from the list // if connection closed then delete the relay client from the list
relayClient.SetOnDisconnectListener(func() { relayClient.SetOnDisconnectListener(m.onServerDisconnected)
m.onServerDisconnected(serverAddress)
})
rt.relayClient = relayClient rt.relayClient = relayClient
rt.Unlock() rt.Unlock()
@ -261,14 +284,37 @@ func (m *Manager) onServerConnected() {
go m.onReconnectedListenerFn() go m.onReconnectedListenerFn()
} }
// onServerDisconnected start to reconnection for home server only
func (m *Manager) onServerDisconnected(serverAddress string) { func (m *Manager) onServerDisconnected(serverAddress string) {
m.relayClientMu.Lock()
if serverAddress == m.relayClient.connectionURL { if serverAddress == m.relayClient.connectionURL {
go m.reconnectGuard.OnDisconnected() go m.reconnectGuard.StartReconnectTrys(m.ctx, m.relayClient)
} }
m.relayClientMu.Unlock()
m.notifyOnDisconnectListeners(serverAddress) m.notifyOnDisconnectListeners(serverAddress)
} }
func (m *Manager) listenGuardEvent(ctx context.Context) {
for {
select {
case rc := <-m.reconnectGuard.OnNewRelayClient:
m.storeClient(rc)
case <-ctx.Done():
return
}
}
}
func (m *Manager) storeClient(client *Client) {
m.relayClientMu.Lock()
defer m.relayClientMu.Unlock()
m.relayClient = client
m.relayClient.SetOnConnectedListener(m.onServerConnected)
m.relayClient.SetOnDisconnectListener(m.onServerDisconnected)
}
func (m *Manager) isForeignServer(address string) (bool, error) { func (m *Manager) isForeignServer(address string) (bool, error) {
rAddr, err := m.relayClient.ServerInstanceURL() rAddr, err := m.relayClient.ServerInstanceURL()
if err != nil { if err != nil {
@ -278,22 +324,16 @@ func (m *Manager) isForeignServer(address string) (bool, error) {
} }
func (m *Manager) startCleanupLoop() { func (m *Manager) startCleanupLoop() {
if m.ctx.Err() != nil {
return
}
ticker := time.NewTicker(relayCleanupInterval) ticker := time.NewTicker(relayCleanupInterval)
go func() { defer ticker.Stop()
defer ticker.Stop() for {
for { select {
select { case <-m.ctx.Done():
case <-m.ctx.Done(): return
return case <-ticker.C:
case <-ticker.C: m.cleanUpUnusedRelays()
m.cleanUpUnusedRelays()
}
} }
}() }
} }
func (m *Manager) cleanUpUnusedRelays() { func (m *Manager) cleanUpUnusedRelays() {

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"sync/atomic"
"time" "time"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -24,20 +25,21 @@ type connResult struct {
type ServerPicker struct { type ServerPicker struct {
TokenStore *auth.TokenStore TokenStore *auth.TokenStore
ServerURLs atomic.Value
PeerID string PeerID string
} }
func (sp *ServerPicker) PickServer(parentCtx context.Context, urls []string) (*Client, error) { func (sp *ServerPicker) PickServer(parentCtx context.Context) (*Client, error) {
ctx, cancel := context.WithTimeout(parentCtx, connectionTimeout) ctx, cancel := context.WithTimeout(parentCtx, connectionTimeout)
defer cancel() defer cancel()
totalServers := len(urls) totalServers := len(sp.ServerURLs.Load().([]string))
connResultChan := make(chan connResult, totalServers) connResultChan := make(chan connResult, totalServers)
successChan := make(chan connResult, 1) successChan := make(chan connResult, 1)
concurrentLimiter := make(chan struct{}, maxConcurrentServers) concurrentLimiter := make(chan struct{}, maxConcurrentServers)
for _, url := range urls { for _, url := range sp.ServerURLs.Load().([]string) {
// todo check if we have a successful connection so we do not need to connect to other servers // todo check if we have a successful connection so we do not need to connect to other servers
concurrentLimiter <- struct{}{} concurrentLimiter <- struct{}{}
go func(url string) { go func(url string) {

View File

@ -12,12 +12,13 @@ func TestServerPicker_UnavailableServers(t *testing.T) {
TokenStore: nil, TokenStore: nil,
PeerID: "test", PeerID: "test",
} }
sp.ServerURLs.Store([]string{"rel://dummy1", "rel://dummy2"})
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel() defer cancel()
go func() { go func() {
_, err := sp.PickServer(ctx, []string{"rel://dummy1", "rel://dummy2"}) _, err := sp.PickServer(ctx)
if err == nil { if err == nil {
t.Error(err) t.Error(err)
} }