mirror of
https://github.com/netbirdio/netbird.git
synced 2024-12-02 13:03:56 +01:00
7bda385e1b
* Skip peer update on unchanged network map (#2236) * Enhance network updates by skipping unchanged messages Optimizes the network update process by skipping updates where no changes in the peer update message received. * Add unit tests * add locks * Improve concurrency and update peer message handling * Refactor account manager network update tests * fix test * Fix inverted network map update condition * Add default group and policy to test data * Run peer updates in a separate goroutine * Refactor * Refactor lock * Fix peers update by including NetworkMap and posture Checks * go mod tidy * fix merge Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com> * fix merge Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com> * [management] Skip account peers update if no changes affect peers (#2310) * Remove incrementing network serial and updating peers after group deletion * Update account peer if posture check is linked to policy * Remove account peers update on saving setup key * Refactor group link checking into re-usable functions * Add HasPeers function to group * Refactor group management * Optimize group change effects on account peers * Update account peers if ns group has peers * Refactor group changes * Optimize account peers update in DNS settings * Optimize update of account peers on jwt groups sync * Refactor peer account updates for efficiency * Optimize peer update on user deletion and changes * Remove condition check for network serial update * Optimize account peers updates on route changes * Remove UpdatePeerSSHKey method * Remove unused isPolicyRuleGroupsEmpty * Add tests for peer update behavior on posture check changes * Add tests for peer update behavior on policy changes * Add tests for peer update behavior on group changes * Add tests for peer update behavior on dns settings changes * Refactor * Add tests for peer update behavior on name server changes * Add tests for peer update behavior on user changes * Add tests for peer update behavior on route 
changes * fix tests * Add tests for peer update behavior on setup key changes * Add tests for peer update behavior on peers changes * fix merge * Fix tests * go mod tidy * Add NameServer and Route comparators * Update network map diff logic with custom comparators * Add tests * Refactor duplicate diff handling logic * fix linter * fix tests * Refactor policy group handling and update logic. Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com> * Update route check by checking if group has peers Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com> * Refactor posture check policy linking logic Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com> * Simplify peer update condition in DNS management Refactor the condition for updating account peers to remove redundant checks Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com> * fix tests Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com> * fix merge Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com> * add policy tests Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com> * add posture checks tests Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com> * fix user and setup key tests Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com> * fix account and route tests Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com> * fix typo Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com> * fix nameserver tests Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com> * fix routes tests Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com> * fix group tests Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com> * upgrade diff package Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com> * fix nameserver tests Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com> * use generic differ for netip.Addr and netip.Prefix Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com> * go mod tidy Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com> * add peer tests Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com> * fix merge Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com> * fix 
management suite tests Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com> * fix postgres tests Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com> * enable diff nil structs comparison Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com> * skip the update only last sent the serial is larger Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com> * refactor peer and user Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com> * skip spell check for groupD Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com> * Refactor group, ns group, policy and posture checks Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com> * skip spell check for GroupD Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com> * update account policy check before verifying policy status Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com> * Update management/server/route_test.go Co-authored-by: Maycon Santos <mlsmaycon@gmail.com> * Update management/server/route_test.go Co-authored-by: Maycon Santos <mlsmaycon@gmail.com> * Update management/server/route_test.go Co-authored-by: Maycon Santos <mlsmaycon@gmail.com> * Update management/server/route_test.go Co-authored-by: Maycon Santos <mlsmaycon@gmail.com> * Update management/server/route_test.go Co-authored-by: Maycon Santos <mlsmaycon@gmail.com> * add tests missing tests for dns setting groups Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com> * add tests for posture checks changes Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com> * add ns group and policy tests Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com> * add route and group tests Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com> * increase Linux test timeout to 10 minutes Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com> * Run diff for client posture checks only Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com> * add panic recovery and detailed logging in peer update comparison Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com> * Fix tests Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com> --------- 
Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com> Co-authored-by: Maycon Santos <mlsmaycon@gmail.com> --------- Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com> Co-authored-by: Maycon Santos <mlsmaycon@gmail.com>
package server
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"runtime/debug"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/netbirdio/netbird/management/server/differs"
|
|
"github.com/r3labs/diff/v3"
|
|
log "github.com/sirupsen/logrus"
|
|
|
|
"github.com/netbirdio/netbird/management/proto"
|
|
"github.com/netbirdio/netbird/management/server/telemetry"
|
|
)
|
|
|
|
// channelBufferSize is the capacity of every per-peer update channel made by
// CreateChannel. SendUpdate never blocks, so once a peer has this many
// undelivered messages queued, further updates for it are dropped.
const channelBufferSize = 100
|
|
|
|
// UpdateMessage is the unit queued on a peer's update channel.
type UpdateMessage struct {
	// Update is the sync response built for the peer.
	Update *proto.SyncResponse
	// NetworkMap is the network map the update was built from. SendUpdate
	// only runs its "unchanged, skip" comparison when this field is non-nil.
	NetworkMap *NetworkMap
}
|
|
|
|
// PeersUpdateManager owns the per-peer update channels and remembers the last
// update recorded for each peer so unchanged updates can be skipped.
type PeersUpdateManager struct {
	// peerChannels holds one update channel per connected peer, keyed by Peer.ID.
	peerChannels map[string]chan *UpdateMessage
	// peerUpdateMessage holds the last recorded UpdateMessage per peer, keyed by Peer.ID.
	peerUpdateMessage map[string]*UpdateMessage
	// channelsMux guards both peerChannels and peerUpdateMessage.
	channelsMux *sync.RWMutex
	// metrics records timing/counters for the manager's operations; methods
	// skip recording when it is nil.
	metrics telemetry.AppMetrics
}
|
|
|
|
// NewPeersUpdateManager builds a PeersUpdateManager with empty channel and
// last-message maps and its own RWMutex. The given metrics sink is stored
// as-is; the manager's methods check it for nil before recording.
func NewPeersUpdateManager(metrics telemetry.AppMetrics) *PeersUpdateManager {
	manager := &PeersUpdateManager{
		metrics:           metrics,
		channelsMux:       &sync.RWMutex{},
		peerUpdateMessage: make(map[string]*UpdateMessage),
		peerChannels:      make(map[string]chan *UpdateMessage),
	}
	return manager
}
|
|
|
|
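// SendUpdate queues update on the channel registered for peerID without
// blocking.
//
// Updates that carry a NetworkMap are first compared with the last message
// recorded for the peer: they are skipped when handlePeerMessageUpdate reports
// no change, or when the recorded serial is larger than the new one. Updates
// without a NetworkMap bypass both checks.
//
// The last-sent record is written only once the message has actually been
// queued. Recording a message that was dropped (buffer full) or that had no
// channel to go to would make a later, identical update look "unchanged" and
// be skipped, leaving the peer on an outdated network map until something
// else changes.
func (p *PeersUpdateManager) SendUpdate(ctx context.Context, peerID string, update *UpdateMessage) {
	start := time.Now()
	var found, dropped bool

	// skip sending sync update to the peer if there is no change in update message,
	// it will not check on turn credential refresh as we do not send network map or client posture checks
	if update.NetworkMap != nil {
		if updated := p.handlePeerMessageUpdate(ctx, peerID, update); !updated {
			return
		}
	}

	p.channelsMux.Lock()

	defer func() {
		p.channelsMux.Unlock()
		if p.metrics != nil {
			p.metrics.UpdateChannelMetrics().CountSendUpdateDuration(time.Since(start), found, dropped)
		}
	}()

	if update.NetworkMap != nil {
		// The comparison above ran under a read lock that has since been
		// released, so the serial is re-checked here under the write lock.
		lastSentUpdate := p.peerUpdateMessage[peerID]
		if lastSentUpdate != nil && lastSentUpdate.Update.NetworkMap.GetSerial() > update.Update.NetworkMap.GetSerial() {
			log.WithContext(ctx).Debugf("peer %s new network map serial: %d not greater than last sent: %d, skip sending update",
				peerID, update.Update.NetworkMap.GetSerial(), lastSentUpdate.Update.NetworkMap.GetSerial())
			return
		}
	}

	channel, ok := p.peerChannels[peerID]
	if !ok {
		log.WithContext(ctx).Debugf("peer %s has no channel", peerID)
		return
	}
	found = true

	select {
	case channel <- update:
		// Record the message as "last sent" only now that it is queued.
		if update.NetworkMap != nil {
			p.peerUpdateMessage[peerID] = update
		}
		log.WithContext(ctx).Debugf("update was sent to channel for peer %s", peerID)
	default:
		dropped = true
		log.WithContext(ctx).Warnf("channel for peer %s is %d full or closed", peerID, len(channel))
	}
}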
|
|
|
|
// CreateChannel registers a fresh buffered update channel for peerID and
// returns it. If the peer already had a channel, that channel is closed and
// the peer's last-sent record is discarded before the new one is installed.
func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) chan *UpdateMessage {
	start := time.Now()

	var closed bool

	p.channelsMux.Lock()
	defer func() {
		p.channelsMux.Unlock()
		if p.metrics != nil {
			p.metrics.UpdateChannelMetrics().CountCreateChannelDuration(time.Since(start), closed)
		}
	}()

	if previous, exists := p.peerChannels[peerID]; exists {
		closed = true
		// Remove the map entry before closing so no sender can find a closed channel.
		delete(p.peerChannels, peerID)
		close(previous)
		delete(p.peerUpdateMessage, peerID)
	}
	// mbragin: todo shouldn't it be more? or configurable?
	fresh := make(chan *UpdateMessage, channelBufferSize)
	p.peerChannels[peerID] = fresh

	log.WithContext(ctx).Debugf("opened updates channel for a peer %s", peerID)

	return fresh
}
|
|
|
|
// closeChannel removes and closes the update channel of peerID, if one exists,
// and drops the peer's last-sent record. It takes no lock itself: CloseChannel
// and CloseChannels acquire channelsMux before calling it.
func (p *PeersUpdateManager) closeChannel(ctx context.Context, peerID string) {
	existing, found := p.peerChannels[peerID]
	if found {
		delete(p.peerChannels, peerID)
		close(existing)
		delete(p.peerUpdateMessage, peerID)
	}

	// Logged whether or not a channel was present.
	log.WithContext(ctx).Debugf("closed updates channel of a peer %s", peerID)
}
|
|
|
|
// CloseChannels closes the update channel of every peer in peerIDs while
// holding channelsMux once for the whole batch.
func (p *PeersUpdateManager) CloseChannels(ctx context.Context, peerIDs []string) {
	start := time.Now()

	p.channelsMux.Lock()
	defer func() {
		p.channelsMux.Unlock()
		if p.metrics != nil {
			p.metrics.UpdateChannelMetrics().CountCloseChannelsDuration(time.Since(start), len(peerIDs))
		}
	}()

	for i := range peerIDs {
		p.closeChannel(ctx, peerIDs[i])
	}
}
|
|
|
|
// CloseChannel closes the update channel of a single peer under channelsMux.
func (p *PeersUpdateManager) CloseChannel(ctx context.Context, peerID string) {
	start := time.Now()

	p.channelsMux.Lock()
	defer func() {
		p.channelsMux.Unlock()
		if p.metrics == nil {
			return
		}
		p.metrics.UpdateChannelMetrics().CountCloseChannelDuration(time.Since(start))
	}()

	p.closeChannel(ctx, peerID)
}
|
|
|
|
// GetAllConnectedPeers returns a new set holding the ID of every peer that
// currently has an update channel. The result is a copy, so callers may keep
// or modify it without touching the manager's state.
func (p *PeersUpdateManager) GetAllConnectedPeers() map[string]struct{} {
	start := time.Now()

	p.channelsMux.Lock()

	connected := map[string]struct{}{}

	defer func() {
		p.channelsMux.Unlock()
		if p.metrics != nil {
			p.metrics.UpdateChannelMetrics().CountGetAllConnectedPeersDuration(time.Since(start), len(connected))
		}
	}()

	for peerID := range p.peerChannels {
		connected[peerID] = struct{}{}
	}

	return connected
}
|
|
|
|
// HasChannel reports whether an update channel is registered for peerID.
func (p *PeersUpdateManager) HasChannel(peerID string) bool {
	start := time.Now()

	p.channelsMux.Lock()

	defer func() {
		p.channelsMux.Unlock()
		if p.metrics == nil {
			return
		}
		p.metrics.UpdateChannelMetrics().CountHasChannelDuration(time.Since(start))
	}()

	_, registered := p.peerChannels[peerID]

	return registered
}
|
|
|
|
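// handlePeerMessageUpdate reports whether update should be sent to peerID.
//
// It returns true when no message has been recorded for the peer yet, when
// isNewPeerUpdateMessage reports a change, or when the comparison itself
// fails. It returns false only when the comparison succeeded and found nothing
// new.
//
// A failed comparison is treated as "send": the manager cannot tell whether
// the peer's view is current, and withholding the update could leave the peer
// on an outdated network map. This matches isNewPeerUpdateMessage, whose
// recover handler also sets its result to "new".
func (p *PeersUpdateManager) handlePeerMessageUpdate(ctx context.Context, peerID string, update *UpdateMessage) bool {
	// Only the map lookup needs the lock; the comparison runs outside it.
	p.channelsMux.RLock()
	lastSentUpdate := p.peerUpdateMessage[peerID]
	p.channelsMux.RUnlock()

	if lastSentUpdate == nil {
		return true
	}

	updated, err := isNewPeerUpdateMessage(ctx, lastSentUpdate, update)
	if err != nil {
		log.WithContext(ctx).Errorf("error checking for SyncResponse updates: %v", err)
		return true
	}
	if !updated {
		log.WithContext(ctx).Debugf("peer %s network map is not updated, skip sending update", peerID)
		return false
	}

	return true
}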
|
|
|
|
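// isNewPeerUpdateMessage reports whether currUpdateToSend differs from
// lastSentUpdate and should therefore be sent.
//
// It returns false when the last sent network map serial is larger than the
// current one. Otherwise it diffs the posture-check file lists and then the
// network maps, and returns true as soon as either diff is non-empty.
//
// A non-nil error means the differ could not be created or a diff failed.
// A panic raised during the comparison is recovered, logged with its value and
// stack trace, and reported as (true, nil) so the update is still sent.
func isNewPeerUpdateMessage(ctx context.Context, lastSentUpdate, currUpdateToSend *UpdateMessage) (isNew bool, err error) {
	defer func() {
		if r := recover(); r != nil {
			// Log at error level: logrus Panicf panics again after logging,
			// which would escape this handler and skip the assignment below.
			log.WithContext(ctx).Errorf("recovered from panic comparing peer update messages: %v. Trace: %s", r, debug.Stack())
			isNew, err = true, nil
		}
	}()

	if lastSentUpdate.Update.NetworkMap.GetSerial() > currUpdateToSend.Update.NetworkMap.GetSerial() {
		return false, nil
	}

	differ, err := diff.NewDiffer(
		diff.CustomValueDiffers(&differs.NetIPAddr{}),
		diff.CustomValueDiffers(&differs.NetIPPrefix{}),
	)
	if err != nil {
		return false, fmt.Errorf("failed to create differ: %w", err)
	}

	lastSentFiles := getChecksFiles(lastSentUpdate.Update.Checks)
	currFiles := getChecksFiles(currUpdateToSend.Update.Checks)

	changelog, err := differ.Diff(lastSentFiles, currFiles)
	if err != nil {
		return false, fmt.Errorf("failed to diff checks: %w", err)
	}
	if len(changelog) > 0 {
		return true, nil
	}

	changelog, err = differ.Diff(lastSentUpdate.NetworkMap, currUpdateToSend.NetworkMap)
	if err != nil {
		return false, fmt.Errorf("failed to diff network map: %w", err)
	}
	return len(changelog) > 0, nil
}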
|
|
|
|
// getChecksFiles flattens the file lists of all given checks into one slice,
// in input order. The result is never nil, even when checks is empty.
func getChecksFiles(checks []*proto.Checks) []string {
	collected := make([]string, 0, len(checks))
	for i := range checks {
		collected = append(collected, checks[i].GetFiles()...)
	}
	return collected
}
|