updated approach to filtering - through sync

This commit is contained in:
crn4
2025-06-16 09:40:25 +02:00
parent 5b09804a17
commit 4619d39e17
7 changed files with 106 additions and 85 deletions

View File

@ -1530,6 +1530,10 @@ func domainIsUpToDate(domain string, domainCategory string, userAuth nbcontext.U
return domainCategory == types.PrivateCategory || userAuth.DomainCategory != types.PrivateCategory || domain != userAuth.Domain return domainCategory == types.PrivateCategory || userAuth.DomainCategory != types.PrivateCategory || domain != userAuth.Domain
} }
func (am *DefaultAccountManager) AllowSync(wgPubKey, metahash string) bool {
return am.loginFilter.allowLogin(wgPubKey, metahash)
}
func (am *DefaultAccountManager) SyncAndMarkPeer(ctx context.Context, accountID string, peerPubKey string, meta nbpeer.PeerSystemMeta, realIP net.IP) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, error) { func (am *DefaultAccountManager) SyncAndMarkPeer(ctx context.Context, accountID string, peerPubKey string, meta nbpeer.PeerSystemMeta, realIP net.IP) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, error) {
start := time.Now() start := time.Now()
defer func() { defer func() {
@ -1551,6 +1555,9 @@ func (am *DefaultAccountManager) SyncAndMarkPeer(ctx context.Context, accountID
log.WithContext(ctx).Warnf("failed marking peer as connected %s %v", peerPubKey, err) log.WithContext(ctx).Warnf("failed marking peer as connected %s %v", peerPubKey, err)
} }
metahash := metaHash(meta)
am.loginFilter.addLogin(peerPubKey, metahash)
return peer, netMap, postureChecks, nil return peer, netMap, postureChecks, nil
} }

View File

@ -117,4 +117,5 @@ type Manager interface {
UpdateToPrimaryAccount(ctx context.Context, accountId string) (*types.Account, error) UpdateToPrimaryAccount(ctx context.Context, accountId string) (*types.Account, error)
GetOwnerInfo(ctx context.Context, accountId string) (*types.UserInfo, error) GetOwnerInfo(ctx context.Context, accountId string) (*types.UserInfo, error)
GetCurrentUserInfo(ctx context.Context, userAuth nbcontext.UserAuth) (*users.UserInfoWithPermissions, error) GetCurrentUserInfo(ctx context.Context, userAuth nbcontext.UserAuth) (*users.UserInfoWithPermissions, error)
AllowSync(string, string) bool
} }

View File

@ -141,6 +141,11 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
if err != nil { if err != nil {
return err return err
} }
peerMeta := extractPeerMeta(ctx, syncReq.GetMeta())
metahashed := metaHash(peerMeta)
if !s.accountManager.AllowSync(peerKey.String(), metahashed) {
return internalStatus.ErrPeerAlreadyLoggedIn
}
// nolint:staticcheck // nolint:staticcheck
ctx = context.WithValue(ctx, nbContext.PeerIDKey, peerKey.String()) ctx = context.WithValue(ctx, nbContext.PeerIDKey, peerKey.String())
@ -173,7 +178,7 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
log.WithContext(ctx).Tracef("peer system meta has to be provided on sync. Peer %s, remote addr %s", peerKey.String(), realIP) log.WithContext(ctx).Tracef("peer system meta has to be provided on sync. Peer %s, remote addr %s", peerKey.String(), realIP)
} }
peer, netMap, postureChecks, err := s.accountManager.SyncAndMarkPeer(ctx, accountID, peerKey.String(), extractPeerMeta(ctx, syncReq.GetMeta()), realIP) peer, netMap, postureChecks, err := s.accountManager.SyncAndMarkPeer(ctx, accountID, peerKey.String(), peerMeta, realIP)
if err != nil { if err != nil {
log.WithContext(ctx).Debugf("error while syncing peer %s: %v", peerKey.String(), err) log.WithContext(ctx).Debugf("error while syncing peer %s: %v", peerKey.String(), err)
return mapError(ctx, err) return mapError(ctx, err)

View File

@ -3,36 +3,46 @@ package server
import ( import (
"strings" "strings"
"sync" "sync"
"time"
nbpeer "github.com/netbirdio/netbird/management/server/peer" nbpeer "github.com/netbirdio/netbird/management/server/peer"
) )
const ( const (
loginFilterSize = 100_000 // Size of the login filter map, making it large enough for a future loginFilterSize = 100_000 // Size of the login filter map, making it large enough for a future
filterTimeout = 5 * time.Minute // Duration to secure the previous login information in the filter
) )
type loginFilter struct { type loginFilter struct {
mu sync.RWMutex mu sync.RWMutex
logged map[string]string logged map[string]metahash
}
type metahash struct {
hash string
lastlogin time.Time
} }
func newLoginFilter() *loginFilter { func newLoginFilter() *loginFilter {
return &loginFilter{ return &loginFilter{
logged: make(map[string]string, loginFilterSize), logged: make(map[string]metahash, loginFilterSize),
} }
} }
func (l *loginFilter) addLogin(wgPubKey, metaHash string) { func (l *loginFilter) addLogin(wgPubKey, metaHash string) {
l.mu.Lock() l.mu.Lock()
defer l.mu.Unlock() defer l.mu.Unlock()
l.logged[wgPubKey] = metaHash l.logged[wgPubKey] = metahash{
hash: metaHash,
lastlogin: time.Now(),
}
} }
func (l *loginFilter) allowLogin(wgPubKey, metaHash string) bool { func (l *loginFilter) allowLogin(wgPubKey, metaHash string) bool {
l.mu.RLock() l.mu.RLock()
defer l.mu.RUnlock() defer l.mu.RUnlock()
if loggedMetaHash, ok := l.logged[wgPubKey]; ok { if loggedMetaHash, ok := l.logged[wgPubKey]; ok {
return loggedMetaHash == metaHash return loggedMetaHash.hash == metaHash && time.Since(loggedMetaHash.lastlogin) < filterTimeout
} }
return true return true
} }

View File

@ -119,6 +119,8 @@ type MockAccountManager struct {
GetAccountMetaFunc func(ctx context.Context, accountID, userID string) (*types.AccountMeta, error) GetAccountMetaFunc func(ctx context.Context, accountID, userID string) (*types.AccountMeta, error)
GetOrCreateAccountByPrivateDomainFunc func(ctx context.Context, initiatorId, domain string) (*types.Account, bool, error) GetOrCreateAccountByPrivateDomainFunc func(ctx context.Context, initiatorId, domain string) (*types.Account, bool, error)
AllowSyncFunc func(string, string) bool
} }
func (am *MockAccountManager) UpdateAccountPeers(ctx context.Context, accountID string) { func (am *MockAccountManager) UpdateAccountPeers(ctx context.Context, accountID string) {
@ -890,3 +892,7 @@ func (am *MockAccountManager) GetCurrentUserInfo(ctx context.Context, userAuth n
} }
return nil, status.Errorf(codes.Unimplemented, "method GetCurrentUserInfo is not implemented") return nil, status.Errorf(codes.Unimplemented, "method GetCurrentUserInfo is not implemented")
} }
func (am *MockAccountManager) AllowSync(_, _ string) bool {
return true
}

View File

@ -789,11 +789,6 @@ func (am *DefaultAccountManager) handlePeerLoginNotFound(ctx context.Context, lo
// LoginPeer logs in or registers a peer. // LoginPeer logs in or registers a peer.
// If peer doesn't exist the function checks whether a setup key or a user is present and registers a new peer if so. // If peer doesn't exist the function checks whether a setup key or a user is present and registers a new peer if so.
func (am *DefaultAccountManager) LoginPeer(ctx context.Context, login types.PeerLogin) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, error) { func (am *DefaultAccountManager) LoginPeer(ctx context.Context, login types.PeerLogin) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, error) {
metahash := metaHash(login.Meta)
if !am.loginFilter.allowLogin(login.WireGuardPubKey, metahash) {
return nil, nil, nil, status.ErrPeerAlreadyLoggedIn
}
accountID, err := am.Store.GetAccountIDByPeerPubKey(ctx, login.WireGuardPubKey) accountID, err := am.Store.GetAccountIDByPeerPubKey(ctx, login.WireGuardPubKey)
if err != nil { if err != nil {
return am.handlePeerLoginNotFound(ctx, login, err) return am.handlePeerLoginNotFound(ctx, login, err)
@ -905,8 +900,6 @@ func (am *DefaultAccountManager) LoginPeer(ctx context.Context, login types.Peer
am.BufferUpdateAccountPeers(ctx, accountID) am.BufferUpdateAccountPeers(ctx, accountID)
} }
am.loginFilter.addLogin(login.WireGuardPubKey, metahash)
return am.getValidatedPeerWithMap(ctx, isRequiresApproval, accountID, peer) return am.getValidatedPeerWithMap(ctx, isRequiresApproval, accountID, peer)
} }

View File

@ -10,7 +10,6 @@ import (
"net/netip" "net/netip"
"os" "os"
"runtime" "runtime"
"strconv"
"strings" "strings"
"testing" "testing"
"time" "time"
@ -1679,87 +1678,87 @@ func Test_LoginPeer(t *testing.T) {
} }
} }
func Test_LoginPeerMultipleAccess(t *testing.T) { // func Test_LoginPeerMultipleAccess(t *testing.T) {
if runtime.GOOS == "windows" { // if runtime.GOOS == "windows" {
t.Skip("The SQLite store is not properly supported by Windows yet") // t.Skip("The SQLite store is not properly supported by Windows yet")
} // }
s, cleanup, err := store.NewTestStoreFromSQL(context.Background(), "testdata/extended-store.sql", t.TempDir()) // s, cleanup, err := store.NewTestStoreFromSQL(context.Background(), "testdata/extended-store.sql", t.TempDir())
if err != nil { // if err != nil {
t.Fatal(err) // t.Fatal(err)
} // }
defer cleanup() // defer cleanup()
eventStore := &activity.InMemoryEventStore{} // eventStore := &activity.InMemoryEventStore{}
metrics, err := telemetry.NewDefaultAppMetrics(context.Background()) // metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
assert.NoError(t, err) // assert.NoError(t, err)
ctrl := gomock.NewController(t) // ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish) // t.Cleanup(ctrl.Finish)
settingsMockManager := settings.NewMockManager(ctrl) // settingsMockManager := settings.NewMockManager(ctrl)
permissionsManager := permissions.NewManager(s) // permissionsManager := permissions.NewManager(s)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager) // am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
assert.NoError(t, err) // assert.NoError(t, err)
existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b" // existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b"
_, err = s.GetAccount(context.Background(), existingAccountID) // _, err = s.GetAccount(context.Background(), existingAccountID)
require.NoError(t, err, "Failed to get existing account, check testdata/extended-store.sql. Account ID: %s", existingAccountID) // require.NoError(t, err, "Failed to get existing account, check testdata/extended-store.sql. Account ID: %s", existingAccountID)
setupKey := "A2C8E62B-38F5-4553-B31E-DD66C696CEBB" // setupKey := "A2C8E62B-38F5-4553-B31E-DD66C696CEBB"
peer := &nbpeer.Peer{ // peer := &nbpeer.Peer{
ID: xid.New().String(), // ID: xid.New().String(),
AccountID: existingAccountID, // AccountID: existingAccountID,
UserID: "", // UserID: "",
IP: net.IP{123, 123, 123, 123}, // IP: net.IP{123, 123, 123, 123},
Meta: nbpeer.PeerSystemMeta{ // Meta: nbpeer.PeerSystemMeta{
Hostname: "Peer", // Hostname: "Peer",
GoOS: "linux", // GoOS: "linux",
}, // },
Name: "PeerName", // Name: "PeerName",
DNSLabel: "peer.test", // DNSLabel: "peer.test",
Status: &nbpeer.PeerStatus{Connected: false, LastSeen: time.Now()}, // Status: &nbpeer.PeerStatus{Connected: false, LastSeen: time.Now()},
SSHEnabled: false, // SSHEnabled: false,
} // }
_, _, _, err = am.AddPeer(context.Background(), setupKey, "", peer) // _, _, _, err = am.AddPeer(context.Background(), setupKey, "", peer)
require.NoError(t, err, "Expected no error when adding peer with setup key: %s", setupKey) // require.NoError(t, err, "Expected no error when adding peer with setup key: %s", setupKey)
testCases := []struct { // testCases := []struct {
name string // name string
n int // n int
}{ // }{
{ // {
name: "10 logins", // name: "10 logins",
n: 10, // n: 10,
}, // },
} // }
for _, tc := range testCases { // for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) { // t.Run(tc.name, func(t *testing.T) {
actual := 1 // First login is always successful // actual := 1 // First login is always successful
for i := range tc.n { // for i := range tc.n {
loginInput := types.PeerLogin{ // loginInput := types.PeerLogin{
WireGuardPubKey: peer.ID, // WireGuardPubKey: peer.ID,
SSHKey: "test-ssh-key", // SSHKey: "test-ssh-key",
Meta: nbpeer.PeerSystemMeta{ // Meta: nbpeer.PeerSystemMeta{
Hostname: "peer" + strconv.Itoa(i), // Hostname: "peer" + strconv.Itoa(i),
}, // },
UserID: "", // UserID: "",
SetupKey: setupKey, // SetupKey: setupKey,
ConnectionIP: net.ParseIP("192.0.2.100"), // ConnectionIP: net.ParseIP("192.0.2.100"),
} // }
_, _, _, loginErr := am.LoginPeer(context.Background(), loginInput) // _, _, _, loginErr := am.LoginPeer(context.Background(), loginInput)
if loginErr != nil { // if loginErr != nil && errors.Is(loginErr, status.ErrPeerAlreadyLoggedIn) {
actual++ // actual++
} // }
time.Sleep(time.Millisecond * 100) // time.Sleep(time.Millisecond * 100)
} // }
require.Equal(t, tc.n-1, actual, "Expected %d insuccessful logins, got %d", tc.n, actual) // require.Equal(t, tc.n-1, actual, "Expected %d insuccessful logins, got %d", tc.n, actual)
}) // })
} // }
} // }
func TestPeerAccountPeersUpdate(t *testing.T) { func TestPeerAccountPeersUpdate(t *testing.T) {
manager, account, peer1, peer2, peer3 := setupNetworkMapTest(t) manager, account, peer1, peer2, peer3 := setupNetworkMapTest(t)