mirror of
https://github.com/netbirdio/netbird.git
synced 2025-01-12 17:08:36 +01:00
dd29f4c01e
Reduce the peer status notifications When receive new network map invoke multiple notifications for every single peers. It cause high cpu usage We handle the in a batch the peer notification in update network map. - Remove the unnecessary UpdatePeerFQDN calls in addNewPeer - Fix notification in RemovePeer function - Involve FinishPeerListModifications logic
362 lines
8.6 KiB
Go
362 lines
8.6 KiB
Go
package peer
|
|
|
|
import (
|
|
"errors"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// State contains the latest state of a peer
|
|
type State struct {
|
|
IP string
|
|
PubKey string
|
|
FQDN string
|
|
ConnStatus ConnStatus
|
|
ConnStatusUpdate time.Time
|
|
Relayed bool
|
|
Direct bool
|
|
LocalIceCandidateType string
|
|
RemoteIceCandidateType string
|
|
}
|
|
|
|
// LocalPeerState contains the latest state of the local peer
|
|
type LocalPeerState struct {
|
|
IP string
|
|
PubKey string
|
|
KernelInterface bool
|
|
FQDN string
|
|
}
|
|
|
|
// SignalState contains the latest state of a signal connection
|
|
type SignalState struct {
|
|
URL string
|
|
Connected bool
|
|
}
|
|
|
|
// ManagementState contains the latest state of a management connection
|
|
type ManagementState struct {
|
|
URL string
|
|
Connected bool
|
|
}
|
|
|
|
// FullStatus contains the full state held by the Status instance
|
|
type FullStatus struct {
|
|
Peers []State
|
|
ManagementState ManagementState
|
|
SignalState SignalState
|
|
LocalPeerState LocalPeerState
|
|
}
|
|
|
|
// Status holds a state of peers, signal and management connections
|
|
type Status struct {
|
|
mux sync.Mutex
|
|
peers map[string]State
|
|
changeNotify map[string]chan struct{}
|
|
signalState bool
|
|
managementState bool
|
|
localPeer LocalPeerState
|
|
offlinePeers []State
|
|
mgmAddress string
|
|
signalAddress string
|
|
notifier *notifier
|
|
|
|
// To reduce the number of notification invocation this bool will be true when need to call the notification
|
|
// Some Peer actions mostly used by in a batch when the network map has been synchronized. In these type of events
|
|
// set to true this variable and at the end of the processing we will reset it by the FinishPeerListModifications()
|
|
peerListChangedForNotification bool
|
|
}
|
|
|
|
// NewRecorder returns a new Status instance
|
|
func NewRecorder(mgmAddress string) *Status {
|
|
return &Status{
|
|
peers: make(map[string]State),
|
|
changeNotify: make(map[string]chan struct{}),
|
|
offlinePeers: make([]State, 0),
|
|
notifier: newNotifier(),
|
|
mgmAddress: mgmAddress,
|
|
}
|
|
}
|
|
|
|
// ReplaceOfflinePeers replaces
|
|
func (d *Status) ReplaceOfflinePeers(replacement []State) {
|
|
d.mux.Lock()
|
|
defer d.mux.Unlock()
|
|
d.offlinePeers = make([]State, len(replacement))
|
|
copy(d.offlinePeers, replacement)
|
|
|
|
// todo we should set to true in case if the list changed only
|
|
d.peerListChangedForNotification = true
|
|
}
|
|
|
|
// AddPeer adds peer to Daemon status map
|
|
func (d *Status) AddPeer(peerPubKey string, fqdn string) error {
|
|
d.mux.Lock()
|
|
defer d.mux.Unlock()
|
|
|
|
_, ok := d.peers[peerPubKey]
|
|
if ok {
|
|
return errors.New("peer already exist")
|
|
}
|
|
d.peers[peerPubKey] = State{
|
|
PubKey: peerPubKey,
|
|
ConnStatus: StatusDisconnected,
|
|
FQDN: fqdn,
|
|
}
|
|
d.peerListChangedForNotification = true
|
|
return nil
|
|
}
|
|
|
|
// GetPeer adds peer to Daemon status map
|
|
func (d *Status) GetPeer(peerPubKey string) (State, error) {
|
|
d.mux.Lock()
|
|
defer d.mux.Unlock()
|
|
|
|
state, ok := d.peers[peerPubKey]
|
|
if !ok {
|
|
return State{}, errors.New("peer not found")
|
|
}
|
|
return state, nil
|
|
}
|
|
|
|
// RemovePeer removes peer from Daemon status map
|
|
func (d *Status) RemovePeer(peerPubKey string) error {
|
|
d.mux.Lock()
|
|
defer d.mux.Unlock()
|
|
|
|
_, ok := d.peers[peerPubKey]
|
|
if !ok {
|
|
return errors.New("no peer with to remove")
|
|
}
|
|
|
|
delete(d.peers, peerPubKey)
|
|
d.peerListChangedForNotification = true
|
|
return nil
|
|
}
|
|
|
|
// UpdatePeerState updates peer status
|
|
func (d *Status) UpdatePeerState(receivedState State) error {
|
|
d.mux.Lock()
|
|
defer d.mux.Unlock()
|
|
|
|
peerState, ok := d.peers[receivedState.PubKey]
|
|
if !ok {
|
|
return errors.New("peer doesn't exist")
|
|
}
|
|
|
|
if receivedState.IP != "" {
|
|
peerState.IP = receivedState.IP
|
|
}
|
|
|
|
skipNotification := shouldSkipNotify(receivedState, peerState)
|
|
|
|
if receivedState.ConnStatus != peerState.ConnStatus {
|
|
peerState.ConnStatus = receivedState.ConnStatus
|
|
peerState.ConnStatusUpdate = receivedState.ConnStatusUpdate
|
|
peerState.Direct = receivedState.Direct
|
|
peerState.Relayed = receivedState.Relayed
|
|
peerState.LocalIceCandidateType = receivedState.LocalIceCandidateType
|
|
peerState.RemoteIceCandidateType = receivedState.RemoteIceCandidateType
|
|
}
|
|
|
|
d.peers[receivedState.PubKey] = peerState
|
|
|
|
if skipNotification {
|
|
return nil
|
|
}
|
|
|
|
ch, found := d.changeNotify[receivedState.PubKey]
|
|
if found && ch != nil {
|
|
close(ch)
|
|
d.changeNotify[receivedState.PubKey] = nil
|
|
}
|
|
|
|
d.notifyPeerListChanged()
|
|
return nil
|
|
}
|
|
|
|
func shouldSkipNotify(new, curr State) bool {
|
|
switch {
|
|
case new.ConnStatus == StatusConnecting:
|
|
return true
|
|
case new.ConnStatus == StatusDisconnected && curr.ConnStatus == StatusConnecting:
|
|
return true
|
|
case new.ConnStatus == StatusDisconnected && curr.ConnStatus == StatusDisconnected:
|
|
return curr.IP != ""
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
// UpdatePeerFQDN update peer's state fqdn only
|
|
func (d *Status) UpdatePeerFQDN(peerPubKey, fqdn string) error {
|
|
d.mux.Lock()
|
|
defer d.mux.Unlock()
|
|
|
|
peerState, ok := d.peers[peerPubKey]
|
|
if !ok {
|
|
return errors.New("peer doesn't exist")
|
|
}
|
|
|
|
peerState.FQDN = fqdn
|
|
d.peers[peerPubKey] = peerState
|
|
|
|
return nil
|
|
}
|
|
|
|
// FinishPeerListModifications this event invoke the notification
|
|
func (d *Status) FinishPeerListModifications() {
|
|
d.mux.Lock()
|
|
|
|
if !d.peerListChangedForNotification {
|
|
d.mux.Unlock()
|
|
return
|
|
}
|
|
d.peerListChangedForNotification = false
|
|
d.mux.Unlock()
|
|
|
|
d.notifyPeerListChanged()
|
|
}
|
|
|
|
// GetPeerStateChangeNotifier returns a change notifier channel for a peer
|
|
func (d *Status) GetPeerStateChangeNotifier(peer string) <-chan struct{} {
|
|
d.mux.Lock()
|
|
defer d.mux.Unlock()
|
|
ch, found := d.changeNotify[peer]
|
|
if !found || ch == nil {
|
|
ch = make(chan struct{})
|
|
d.changeNotify[peer] = ch
|
|
}
|
|
return ch
|
|
}
|
|
|
|
// UpdateLocalPeerState updates local peer status
|
|
func (d *Status) UpdateLocalPeerState(localPeerState LocalPeerState) {
|
|
d.mux.Lock()
|
|
defer d.mux.Unlock()
|
|
|
|
d.localPeer = localPeerState
|
|
d.notifyAddressChanged()
|
|
}
|
|
|
|
// CleanLocalPeerState cleans local peer status
|
|
func (d *Status) CleanLocalPeerState() {
|
|
d.mux.Lock()
|
|
defer d.mux.Unlock()
|
|
|
|
d.localPeer = LocalPeerState{}
|
|
d.notifyAddressChanged()
|
|
}
|
|
|
|
// MarkManagementDisconnected sets ManagementState to disconnected
|
|
func (d *Status) MarkManagementDisconnected() {
|
|
d.mux.Lock()
|
|
defer d.mux.Unlock()
|
|
defer d.onConnectionChanged()
|
|
|
|
d.managementState = false
|
|
}
|
|
|
|
// MarkManagementConnected sets ManagementState to connected
|
|
func (d *Status) MarkManagementConnected() {
|
|
d.mux.Lock()
|
|
defer d.mux.Unlock()
|
|
defer d.onConnectionChanged()
|
|
|
|
d.managementState = true
|
|
}
|
|
|
|
// UpdateSignalAddress update the address of the signal server
|
|
func (d *Status) UpdateSignalAddress(signalURL string) {
|
|
d.mux.Lock()
|
|
defer d.mux.Unlock()
|
|
d.signalAddress = signalURL
|
|
}
|
|
|
|
// UpdateManagementAddress update the address of the management server
|
|
func (d *Status) UpdateManagementAddress(mgmAddress string) {
|
|
d.mux.Lock()
|
|
defer d.mux.Unlock()
|
|
d.mgmAddress = mgmAddress
|
|
}
|
|
|
|
// MarkSignalDisconnected sets SignalState to disconnected
|
|
func (d *Status) MarkSignalDisconnected() {
|
|
d.mux.Lock()
|
|
defer d.mux.Unlock()
|
|
defer d.onConnectionChanged()
|
|
|
|
d.signalState = false
|
|
}
|
|
|
|
// MarkSignalConnected sets SignalState to connected
|
|
func (d *Status) MarkSignalConnected() {
|
|
d.mux.Lock()
|
|
defer d.mux.Unlock()
|
|
defer d.onConnectionChanged()
|
|
|
|
d.signalState = true
|
|
}
|
|
|
|
// GetFullStatus gets full status
|
|
func (d *Status) GetFullStatus() FullStatus {
|
|
d.mux.Lock()
|
|
defer d.mux.Unlock()
|
|
|
|
fullStatus := FullStatus{
|
|
ManagementState: ManagementState{
|
|
d.mgmAddress,
|
|
d.managementState,
|
|
},
|
|
SignalState: SignalState{
|
|
d.signalAddress,
|
|
d.signalState,
|
|
},
|
|
LocalPeerState: d.localPeer,
|
|
}
|
|
|
|
for _, status := range d.peers {
|
|
fullStatus.Peers = append(fullStatus.Peers, status)
|
|
}
|
|
|
|
fullStatus.Peers = append(fullStatus.Peers, d.offlinePeers...)
|
|
|
|
return fullStatus
|
|
}
|
|
|
|
// ClientStart will notify all listeners about the new service state
|
|
func (d *Status) ClientStart() {
|
|
d.notifier.clientStart()
|
|
}
|
|
|
|
// ClientStop will notify all listeners about the new service state
|
|
func (d *Status) ClientStop() {
|
|
d.notifier.clientStop()
|
|
}
|
|
|
|
// ClientTeardown will notify all listeners about the service is under teardown
|
|
func (d *Status) ClientTeardown() {
|
|
d.notifier.clientTearDown()
|
|
}
|
|
|
|
// SetConnectionListener set a listener to the notifier
|
|
func (d *Status) SetConnectionListener(listener Listener) {
|
|
d.notifier.setListener(listener)
|
|
}
|
|
|
|
// RemoveConnectionListener remove the listener from the notifier
|
|
func (d *Status) RemoveConnectionListener() {
|
|
d.notifier.removeListener()
|
|
}
|
|
|
|
func (d *Status) onConnectionChanged() {
|
|
d.notifier.updateServerStates(d.managementState, d.signalState)
|
|
}
|
|
|
|
func (d *Status) notifyPeerListChanged() {
|
|
d.notifier.peerListChanged(len(d.peers) + len(d.offlinePeers))
|
|
}
|
|
|
|
func (d *Status) notifyAddressChanged() {
|
|
d.notifier.localAddressChanged(d.localPeer.FQDN, d.localPeer.IP)
|
|
}
|