mirror of
https://github.com/netbirdio/netbird.git
synced 2024-11-07 16:54:16 +01:00
Reduce the peer status notifications (#956)
Reduce the peer status notifications When receive new network map invoke multiple notifications for every single peers. It cause high cpu usage We handle the in a batch the peer notification in update network map. - Remove the unnecessary UpdatePeerFQDN calls in addNewPeer - Fix notification in RemovePeer function - Involve FinishPeerListModifications logic
This commit is contained in:
parent
cb7ecd1cc4
commit
dd29f4c01e
@ -605,6 +605,7 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
|
|||||||
// cleanup request, most likely our peer has been deleted
|
// cleanup request, most likely our peer has been deleted
|
||||||
if networkMap.GetRemotePeersIsEmpty() {
|
if networkMap.GetRemotePeersIsEmpty() {
|
||||||
err := e.removeAllPeers()
|
err := e.removeAllPeers()
|
||||||
|
e.statusRecorder.FinishPeerListModifications()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -624,6 +625,8 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
e.statusRecorder.FinishPeerListModifications()
|
||||||
|
|
||||||
// update SSHServer by adding remote peer SSH keys
|
// update SSHServer by adding remote peer SSH keys
|
||||||
if !isNil(e.sshServer) {
|
if !isNil(e.sshServer) {
|
||||||
for _, config := range networkMap.GetRemotePeers() {
|
for _, config := range networkMap.GetRemotePeers() {
|
||||||
@ -759,17 +762,13 @@ func (e *Engine) addNewPeer(peerConfig *mgmProto.RemotePeerConfig) error {
|
|||||||
}
|
}
|
||||||
e.peerConns[peerKey] = conn
|
e.peerConns[peerKey] = conn
|
||||||
|
|
||||||
err = e.statusRecorder.AddPeer(peerKey)
|
err = e.statusRecorder.AddPeer(peerKey, peerConfig.Fqdn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnf("error adding peer %s to status recorder, got error: %v", peerKey, err)
|
log.Warnf("error adding peer %s to status recorder, got error: %v", peerKey, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
go e.connWorker(conn, peerKey)
|
go e.connWorker(conn, peerKey)
|
||||||
}
|
}
|
||||||
err := e.statusRecorder.UpdatePeerFQDN(peerKey, peerConfig.Fqdn)
|
|
||||||
if err != nil {
|
|
||||||
log.Warnf("error updating peer's %s fqdn in the status recorder, got error: %v", peerKey, err)
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -59,6 +59,11 @@ type Status struct {
|
|||||||
mgmAddress string
|
mgmAddress string
|
||||||
signalAddress string
|
signalAddress string
|
||||||
notifier *notifier
|
notifier *notifier
|
||||||
|
|
||||||
|
// To reduce the number of notification invocation this bool will be true when need to call the notification
|
||||||
|
// Some Peer actions mostly used by in a batch when the network map has been synchronized. In these type of events
|
||||||
|
// set to true this variable and at the end of the processing we will reset it by the FinishPeerListModifications()
|
||||||
|
peerListChangedForNotification bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewRecorder returns a new Status instance
|
// NewRecorder returns a new Status instance
|
||||||
@ -78,11 +83,13 @@ func (d *Status) ReplaceOfflinePeers(replacement []State) {
|
|||||||
defer d.mux.Unlock()
|
defer d.mux.Unlock()
|
||||||
d.offlinePeers = make([]State, len(replacement))
|
d.offlinePeers = make([]State, len(replacement))
|
||||||
copy(d.offlinePeers, replacement)
|
copy(d.offlinePeers, replacement)
|
||||||
d.notifyPeerListChanged()
|
|
||||||
|
// todo we should set to true in case if the list changed only
|
||||||
|
d.peerListChangedForNotification = true
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddPeer adds peer to Daemon status map
|
// AddPeer adds peer to Daemon status map
|
||||||
func (d *Status) AddPeer(peerPubKey string) error {
|
func (d *Status) AddPeer(peerPubKey string, fqdn string) error {
|
||||||
d.mux.Lock()
|
d.mux.Lock()
|
||||||
defer d.mux.Unlock()
|
defer d.mux.Unlock()
|
||||||
|
|
||||||
@ -90,7 +97,12 @@ func (d *Status) AddPeer(peerPubKey string) error {
|
|||||||
if ok {
|
if ok {
|
||||||
return errors.New("peer already exist")
|
return errors.New("peer already exist")
|
||||||
}
|
}
|
||||||
d.peers[peerPubKey] = State{PubKey: peerPubKey, ConnStatus: StatusDisconnected}
|
d.peers[peerPubKey] = State{
|
||||||
|
PubKey: peerPubKey,
|
||||||
|
ConnStatus: StatusDisconnected,
|
||||||
|
FQDN: fqdn,
|
||||||
|
}
|
||||||
|
d.peerListChangedForNotification = true
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -112,13 +124,13 @@ func (d *Status) RemovePeer(peerPubKey string) error {
|
|||||||
defer d.mux.Unlock()
|
defer d.mux.Unlock()
|
||||||
|
|
||||||
_, ok := d.peers[peerPubKey]
|
_, ok := d.peers[peerPubKey]
|
||||||
if ok {
|
if !ok {
|
||||||
delete(d.peers, peerPubKey)
|
return errors.New("no peer with to remove")
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
d.notifyPeerListChanged()
|
delete(d.peers, peerPubKey)
|
||||||
return errors.New("no peer with to remove")
|
d.peerListChangedForNotification = true
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdatePeerState updates peer status
|
// UpdatePeerState updates peer status
|
||||||
@ -188,10 +200,23 @@ func (d *Status) UpdatePeerFQDN(peerPubKey, fqdn string) error {
|
|||||||
peerState.FQDN = fqdn
|
peerState.FQDN = fqdn
|
||||||
d.peers[peerPubKey] = peerState
|
d.peers[peerPubKey] = peerState
|
||||||
|
|
||||||
d.notifyPeerListChanged()
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FinishPeerListModifications this event invoke the notification
|
||||||
|
func (d *Status) FinishPeerListModifications() {
|
||||||
|
d.mux.Lock()
|
||||||
|
|
||||||
|
if !d.peerListChangedForNotification {
|
||||||
|
d.mux.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
d.peerListChangedForNotification = false
|
||||||
|
d.mux.Unlock()
|
||||||
|
|
||||||
|
d.notifyPeerListChanged()
|
||||||
|
}
|
||||||
|
|
||||||
// GetPeerStateChangeNotifier returns a change notifier channel for a peer
|
// GetPeerStateChangeNotifier returns a change notifier channel for a peer
|
||||||
func (d *Status) GetPeerStateChangeNotifier(peer string) <-chan struct{} {
|
func (d *Status) GetPeerStateChangeNotifier(peer string) <-chan struct{} {
|
||||||
d.mux.Lock()
|
d.mux.Lock()
|
||||||
|
@ -9,13 +9,13 @@ import (
|
|||||||
func TestAddPeer(t *testing.T) {
|
func TestAddPeer(t *testing.T) {
|
||||||
key := "abc"
|
key := "abc"
|
||||||
status := NewRecorder("https://mgm")
|
status := NewRecorder("https://mgm")
|
||||||
err := status.AddPeer(key)
|
err := status.AddPeer(key, "abc.netbird")
|
||||||
assert.NoError(t, err, "shouldn't return error")
|
assert.NoError(t, err, "shouldn't return error")
|
||||||
|
|
||||||
_, exists := status.peers[key]
|
_, exists := status.peers[key]
|
||||||
assert.True(t, exists, "value was found")
|
assert.True(t, exists, "value was found")
|
||||||
|
|
||||||
err = status.AddPeer(key)
|
err = status.AddPeer(key, "abc.netbird")
|
||||||
|
|
||||||
assert.Error(t, err, "should return error on duplicate")
|
assert.Error(t, err, "should return error on duplicate")
|
||||||
}
|
}
|
||||||
@ -23,7 +23,7 @@ func TestAddPeer(t *testing.T) {
|
|||||||
func TestGetPeer(t *testing.T) {
|
func TestGetPeer(t *testing.T) {
|
||||||
key := "abc"
|
key := "abc"
|
||||||
status := NewRecorder("https://mgm")
|
status := NewRecorder("https://mgm")
|
||||||
err := status.AddPeer(key)
|
err := status.AddPeer(key, "abc.netbird")
|
||||||
assert.NoError(t, err, "shouldn't return error")
|
assert.NoError(t, err, "shouldn't return error")
|
||||||
|
|
||||||
peerStatus, err := status.GetPeer(key)
|
peerStatus, err := status.GetPeer(key)
|
||||||
|
Loading…
Reference in New Issue
Block a user