Merge branch 'feature/relay-status' into feature/relay-integration

This commit is contained in:
Zoltán Papp 2024-07-12 11:41:21 +02:00
commit add4e9f4e4
8 changed files with 60 additions and 13 deletions

View File

@ -163,7 +163,10 @@ func runInForegroundMode(ctx context.Context, cmd *cobra.Command) error {
ctx, cancel = context.WithCancel(ctx) ctx, cancel = context.WithCancel(ctx)
SetupCloseHandler(ctx, cancel) SetupCloseHandler(ctx, cancel)
connectClient := internal.NewConnectClient(ctx, config, peer.NewRecorder(config.ManagementURL.String())) r := peer.NewRecorder(config.ManagementURL.String())
r.GetFullStatus()
connectClient := internal.NewConnectClient(ctx, config, r)
return connectClient.Run() return connectClient.Run()
} }

View File

@ -257,6 +257,7 @@ func (c *ConnectClient) run(
log.Error(err) log.Error(err)
return wrapErr(err) return wrapErr(err)
} }
c.statusRecorder.SetRelayMgr(relayManager)
} }
peerConfig := loginResp.GetPeerConfig() peerConfig := loginResp.GetPeerConfig()

View File

@ -13,6 +13,7 @@ import (
"github.com/netbirdio/netbird/client/internal/relay" "github.com/netbirdio/netbird/client/internal/relay"
"github.com/netbirdio/netbird/iface" "github.com/netbirdio/netbird/iface"
"github.com/netbirdio/netbird/management/domain" "github.com/netbirdio/netbird/management/domain"
relayClient "github.com/netbirdio/netbird/relay/client"
) )
// State contains the latest state of a peer // State contains the latest state of a peer
@ -142,6 +143,8 @@ type Status struct {
// Some Peer actions mostly used by in a batch when the network map has been synchronized. In these type of events // Some Peer actions mostly used by in a batch when the network map has been synchronized. In these type of events
// set to true this variable and at the end of the processing we will reset it by the FinishPeerListModifications() // set to true this variable and at the end of the processing we will reset it by the FinishPeerListModifications()
peerListChangedForNotification bool peerListChangedForNotification bool
relayMgr *relayClient.Manager
} }
// NewRecorder returns a new Status instance // NewRecorder returns a new Status instance
@ -156,6 +159,12 @@ func NewRecorder(mgmAddress string) *Status {
} }
} }
func (d *Status) SetRelayMgr(manager *relayClient.Manager) {
d.mux.Lock()
defer d.mux.Unlock()
d.relayMgr = manager
}
// ReplaceOfflinePeers replaces // ReplaceOfflinePeers replaces
func (d *Status) ReplaceOfflinePeers(replacement []State) { func (d *Status) ReplaceOfflinePeers(replacement []State) {
d.mux.Lock() d.mux.Lock()
@ -503,9 +512,30 @@ func (d *Status) GetSignalState() SignalState {
} }
func (d *Status) GetRelayStates() []relay.ProbeResult { func (d *Status) GetRelayStates() []relay.ProbeResult {
if d.relayMgr == nil {
return d.relayStates return d.relayStates
} }
// extend the list of stun, turn servers with relay address
relaysState := make([]relay.ProbeResult, len(d.relayStates), len(d.relayStates)+1)
copy(relaysState, d.relayStates)
relayState := relay.ProbeResult{}
// if the server connection is not established then we will use the general address
// in case of connection we will use the instance specific address
instanceAddr, err := d.relayMgr.RelayInstanceAddress()
if err != nil {
relayState.URI = d.relayMgr.ServerURL()
relayState.Err = err
} else {
relayState.URI = instanceAddr
}
relaysState = append(relaysState, relayState)
return relaysState
}
func (d *Status) GetDNSStates() []NSGroupState { func (d *Status) GetDNSStates() []NSGroupState {
return d.nsGroupStates return d.nsGroupStates
} }

View File

@ -17,7 +17,7 @@ import (
// ProbeResult holds the info about the result of a relay probe request // ProbeResult holds the info about the result of a relay probe request
type ProbeResult struct { type ProbeResult struct {
URI *stun.URI URI string
Err error Err error
Addr string Addr string
} }
@ -176,7 +176,7 @@ func ProbeAll(
wg.Add(1) wg.Add(1)
go func(res *ProbeResult, stunURI *stun.URI) { go func(res *ProbeResult, stunURI *stun.URI) {
defer wg.Done() defer wg.Done()
res.URI = stunURI res.URI = stunURI.String()
res.Addr, res.Err = fn(ctx, stunURI) res.Addr, res.Err = fn(ctx, stunURI)
}(&results[i], uri) }(&results[i], uri)
} }

View File

@ -174,8 +174,8 @@ func seedFromStatus(a *anonymize.Anonymizer, status *peer.FullStatus) {
} }
for _, relay := range status.Relays { for _, relay := range status.Relays {
if relay.URI != nil { if relay.URI != "" {
a.AnonymizeURI(relay.URI.String()) a.AnonymizeURI(relay.URI)
} }
} }
} }

View File

@ -745,7 +745,7 @@ func toProtoFullStatus(fullStatus peer.FullStatus) *proto.FullStatus {
for _, relayState := range fullStatus.Relays { for _, relayState := range fullStatus.Relays {
pbRelayState := &proto.RelayState{ pbRelayState := &proto.RelayState{
URI: relayState.URI.String(), URI: relayState.URI,
Available: relayState.Err == nil, Available: relayState.Err == nil,
} }
if err := relayState.Err; err != nil { if err := relayState.Err; err != nil {

View File

@ -111,6 +111,7 @@ type Client struct {
readLoopMutex sync.Mutex readLoopMutex sync.Mutex
wgReadLoop sync.WaitGroup wgReadLoop sync.WaitGroup
instanceURL string instanceURL string
muInstanceURL sync.Mutex
onDisconnectListener func() onDisconnectListener func()
listenerMutex sync.Mutex listenerMutex sync.Mutex
@ -190,8 +191,8 @@ func (c *Client) OpenConn(dstPeerID string) (net.Conn, error) {
// ServerInstanceURL returns the address of the relay server. It could change after the close and reopen the connection. // ServerInstanceURL returns the address of the relay server. It could change after the close and reopen the connection.
func (c *Client) ServerInstanceURL() (string, error) { func (c *Client) ServerInstanceURL() (string, error) {
c.mu.Lock() c.muInstanceURL.Lock()
defer c.mu.Unlock() defer c.muInstanceURL.Unlock()
if c.instanceURL == "" { if c.instanceURL == "" {
return "", fmt.Errorf("relay connection is not established") return "", fmt.Errorf("relay connection is not established")
} }
@ -272,7 +273,9 @@ func (c *Client) handShake() error {
if err != nil { if err != nil {
return err return err
} }
c.muInstanceURL.Lock()
c.instanceURL = ia c.instanceURL = ia
c.muInstanceURL.Unlock()
return nil return nil
} }
@ -310,6 +313,11 @@ func (c *Client) readLoop(relayConn net.Conn) {
} }
hc.Stop() hc.Stop()
c.muInstanceURL.Lock()
c.instanceURL = ""
c.muInstanceURL.Unlock()
c.notifyDisconnected() c.notifyDisconnected()
c.wgReadLoop.Done() c.wgReadLoop.Done()
_ = c.close(false) _ = c.close(false)

View File

@ -129,10 +129,19 @@ func (m *Manager) RelayInstanceAddress() (string, error) {
return m.relayClient.ServerInstanceURL() return m.relayClient.ServerInstanceURL()
} }
// ServerURL returns the address of the permanent relay server.
func (m *Manager) ServerURL() string {
return m.serverURL
}
func (m *Manager) HasRelayAddress() bool { func (m *Manager) HasRelayAddress() bool {
return m.serverURL != "" return m.serverURL != ""
} }
func (m *Manager) UpdateToken(token *relayAuth.Token) {
m.tokenStore.UpdateToken(token)
}
func (m *Manager) openConnVia(serverAddress, peerKey string) (net.Conn, error) { func (m *Manager) openConnVia(serverAddress, peerKey string) (net.Conn, error) {
// check if already has a connection to the desired relay server // check if already has a connection to the desired relay server
m.relayClientsMutex.RLock() m.relayClientsMutex.RLock()
@ -264,7 +273,3 @@ func (m *Manager) notifyOnDisconnectListeners(serverAddress string) {
m.listenerLock.Unlock() m.listenerLock.Unlock()
} }
func (m *Manager) UpdateToken(token *relayAuth.Token) {
m.tokenStore.UpdateToken(token)
}