diff --git a/go.mod b/go.mod index 7223a446b..9b5a4d256 100644 --- a/go.mod +++ b/go.mod @@ -71,6 +71,7 @@ require ( github.com/pion/transport/v3 v3.0.1 github.com/pion/turn/v3 v3.0.1 github.com/prometheus/client_golang v1.19.1 + github.com/r3labs/diff v1.1.0 github.com/r3labs/diff/v3 v3.0.1 github.com/rs/xid v1.3.0 github.com/shirou/gopsutil/v3 v3.24.4 diff --git a/go.sum b/go.sum index 5cd703bc8..1629a94b2 100644 --- a/go.sum +++ b/go.sum @@ -605,6 +605,8 @@ github.com/prometheus/common v0.53.0 h1:U2pL9w9nmJwJDa4qqLQ3ZaePJ6ZTwt7cMD3AG3+a github.com/prometheus/common v0.53.0/go.mod h1:BrxBKv3FWBIGXw89Mg1AeBq7FSyRzXWI3l3e7W3RN5U= github.com/prometheus/procfs v0.15.0 h1:A82kmvXJq2jTu5YUhSGNlYoxh85zLnKgPz4bMZgI5Ek= github.com/prometheus/procfs v0.15.0/go.mod h1:Y0RJ/Y5g5wJpkTisOtqwDSo4HwhGmLB4VQSw2sQJLHk= +github.com/r3labs/diff v1.1.0 h1:V53xhrbTHrWFWq3gI4b94AjgEJOerO1+1l0xyHOBi8M= +github.com/r3labs/diff v1.1.0/go.mod h1:7WjXasNzi0vJetRcB/RqNl5dlIsmXcTTLmF5IoH6Xig= github.com/r3labs/diff/v3 v3.0.1 h1:CBKqf3XmNRHXKmdU7mZP1w7TV0pDyVCis1AUHtA4Xtg= github.com/r3labs/diff/v3 v3.0.1/go.mod h1:f1S9bourRbiM66NskseyUdo0fTmEE0qKrikYJX63dgo= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= diff --git a/management/server/peer.go b/management/server/peer.go index 96ede1511..c7555ced2 100644 --- a/management/server/peer.go +++ b/management/server/peer.go @@ -990,10 +990,10 @@ func (am *DefaultAccountManager) updateAccountPeers(ctx context.Context, account customZone := account.GetPeersCustomZone(ctx, am.dnsDomain) for _, peer := range peers { - if !am.peersUpdateManager.HasChannel(peer.ID) { - log.WithContext(ctx).Tracef("peer %s doesn't have a channel, skipping network map update", peer.ID) - continue - } + // if !am.peersUpdateManager.HasChannel(peer.ID) { + // log.WithContext(ctx).Tracef("peer %s doesn't have a channel, skipping network map update", peer.ID) + // continue + // } wg.Add(1) semaphore <- struct{}{} @@ -1002,7 +1002,7 @@ func (am *DefaultAccountManager) updateAccountPeers(ctx context.Context, account defer func() { <-semaphore }() postureChecks := am.getPeerPostureChecks(account, p) - remotePeerNetworkMap := account.GetPeerNetworkMap(ctx, p.ID, customZone, approvedPeersMap, am.metrics.AccountManagerMetrics()) + remotePeerNetworkMap := account.GetPeerNetworkMap(ctx, p.ID, customZone, approvedPeersMap, nil) update := toSyncResponse(ctx, nil, p, nil, nil, remotePeerNetworkMap, am.GetDNSDomain(), postureChecks, dnsCache) am.peersUpdateManager.SendUpdate(ctx, p.ID, &UpdateMessage{Update: update, NetworkMap: remotePeerNetworkMap}) }(peer) diff --git a/management/server/updatechannel.go b/management/server/updatechannel.go index 6fb96c971..ff7c87fea 100644 --- a/management/server/updatechannel.go +++ b/management/server/updatechannel.go @@ -7,10 +7,11 @@ import ( "sync" "time" - "github.com/netbirdio/netbird/management/server/differs" "github.com/r3labs/diff/v3" log "github.com/sirupsen/logrus" + "github.com/netbirdio/netbird/management/server/differs" + "github.com/netbirdio/netbird/management/proto" "github.com/netbirdio/netbird/management/server/telemetry" ) @@ -76,18 +77,18 @@ func (p *PeersUpdateManager) SendUpdate(ctx context.Context, peerID string, upda p.peerUpdateMessage[peerID] = update } - if channel, ok := p.peerChannels[peerID]; ok { - found = true - select { - case channel <- update: - log.WithContext(ctx).Debugf("update was sent to channel for peer %s", peerID) - default: - dropped = true - log.WithContext(ctx).Warnf("channel for peer %s is %d full or closed", peerID, len(channel)) - } - } else { - log.WithContext(ctx).Debugf("peer %s has no channel", peerID) - } + // if channel, ok := p.peerChannels[peerID]; ok { + // found = true + // select { + // case channel <- update: + // log.WithContext(ctx).Debugf("update was sent to channel for peer %s", peerID) + // default: + // dropped = true + // log.WithContext(ctx).Warnf("channel for peer %s is %d full or closed", peerID, len(channel)) + // } + // } else { + // log.WithContext(ctx).Debugf("peer %s has no channel", peerID) + // } } // CreateChannel creates a go channel for a given peer used to deliver updates relevant to the peer. @@ -207,6 +208,8 @@ func (p *PeersUpdateManager) handlePeerMessageUpdate(ctx context.Context, peerID lastSentUpdate := p.peerUpdateMessage[peerID] p.channelsMux.RUnlock() + lastSentUpdate = update + if lastSentUpdate != nil { updated, err := isNewPeerUpdateMessage(ctx, lastSentUpdate, update) if err != nil { diff --git a/management/server/updatechannel_benchmark_test.go b/management/server/updatechannel_benchmark_test.go new file mode 100644 index 000000000..c155a7bc1 --- /dev/null +++ b/management/server/updatechannel_benchmark_test.go @@ -0,0 +1,163 @@ +package server + +import ( + "context" + "fmt" + "net/netip" + "testing" + "time" + + nbdns "github.com/netbirdio/netbird/dns" + nbgroup "github.com/netbirdio/netbird/management/server/group" + nbpeer "github.com/netbirdio/netbird/management/server/peer" + route2 "github.com/netbirdio/netbird/route" +) + +func initTestAccount(b *testing.B, numPerAccount int) *Account { + b.Helper() + + account := newAccountWithId(context.Background(), "account_id", "testuser", "") + groupALL, err := account.GetGroupAll() + if err != nil { + b.Fatal(err) + } + setupKey, _ := GenerateDefaultSetupKey() + account.SetupKeys[setupKey.Key] = setupKey + for n := 0; n < numPerAccount; n++ { + netIP := randomIPv4() + peerID := fmt.Sprintf("%s-peer-%d", account.Id, n) + + peer := &nbpeer.Peer{ + ID: peerID, + Key: peerID, + IP: netIP, + Name: peerID, + DNSLabel: peerID, + UserID: userID, + Status: &nbpeer.PeerStatus{Connected: false, LastSeen: time.Now()}, + SSHEnabled: false, + } + account.Peers[peerID] = peer + group, _ := account.GetGroupAll() + group.Peers = append(group.Peers, peerID) + user := &User{ + Id: fmt.Sprintf("%s-user-%d", account.Id, n), + AccountID: account.Id, + } + account.Users[user.Id] = user + route := &route2.Route{ + ID: route2.ID(fmt.Sprintf("network-id-%d", n)), + Description: "base route", + NetID: route2.NetID(fmt.Sprintf("network-id-%d", n)), + Network: netip.MustParsePrefix(netIP.String() + "/24"), + NetworkType: route2.IPv4Network, + Metric: 9999, + Masquerade: false, + Enabled: true, + Groups: []string{groupALL.ID}, + } + account.Routes[route.ID] = route + + group = &nbgroup.Group{ + ID: fmt.Sprintf("group-id-%d", n), + AccountID: account.Id, + Name: fmt.Sprintf("group-id-%d", n), + Issued: "api", + Peers: nil, + } + account.Groups[group.ID] = group + + nameserver := &nbdns.NameServerGroup{ + ID: fmt.Sprintf("nameserver-id-%d", n), + AccountID: account.Id, + Name: fmt.Sprintf("nameserver-id-%d", n), + Description: "", + NameServers: []nbdns.NameServer{{IP: netip.MustParseAddr(netIP.String()), NSType: nbdns.UDPNameServerType}}, + Groups: []string{group.ID}, + Primary: false, + Domains: nil, + Enabled: false, + SearchDomainsEnabled: false, + } + account.NameServerGroups[nameserver.ID] = nameserver + + setupKey, _ := GenerateDefaultSetupKey() + account.SetupKeys[setupKey.Key] = setupKey + } + + group := &nbgroup.Group{ + ID: "randomID", + AccountID: account.Id, + Name: "randomName", + Issued: "api", + Peers: groupALL.Peers[:numPerAccount-1], + } + account.Groups[group.ID] = group + + account.Policies = []*Policy{ + { + ID: "RuleDefault", + Name: "Default", + Description: "This is a default rule that allows connections between all the resources", + Enabled: true, + Rules: []*PolicyRule{ + { + ID: "RuleDefault", + Name: "Default", + Description: "This is a default rule that allows connections between all the resources", + Bidirectional: true, + Enabled: true, + Protocol: PolicyRuleProtocolTCP, + Action: PolicyTrafficActionAccept, + Sources: []string{ + group.ID, + }, + Destinations: []string{ + group.ID, + }, + }, + { + ID: "RuleDefault2", + Name: "Default", + Description: "This is a default rule that allows connections between all the resources", + Bidirectional: true, + Enabled: true, + Protocol: PolicyRuleProtocolUDP, + Action: PolicyTrafficActionAccept, + Sources: []string{ + groupALL.ID, + }, + Destinations: []string{ + groupALL.ID, + }, + }, + }, + }, + } + return account +} + +// 1000 - 6717416375 ns/op +// 500 - 1732888875 ns/op +func BenchmarkTest_updateAccountPeers100(b *testing.B) { + dataDir := b.TempDir() + store, cleanUp, err := NewTestStoreFromSQL(context.Background(), "", dataDir) + b.Cleanup(cleanUp) + + um := NewPeersUpdateManager(nil) + am, err := BuildManager(context.Background(), store, um, nil, "", "netbird.selfhosted", nil, nil, false, MocIntegratedValidator{}, nil) + if err != nil { + b.Fatal(err) + } + + account := initTestAccount(b, 100) + + err = store.SaveAccount(context.Background(), account) + if err != nil { + b.Fatal(err) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + am.updateAccountPeers(context.Background(), account) + } +}