new realization - added block for reconnecting one machine

This commit is contained in:
crn4 2025-06-19 12:24:27 +02:00
parent 0a5f751343
commit 9d0cae862b
6 changed files with 296 additions and 57 deletions

View File

@ -1530,7 +1530,7 @@ func domainIsUpToDate(domain string, domainCategory string, userAuth nbcontext.U
return domainCategory == types.PrivateCategory || userAuth.DomainCategory != types.PrivateCategory || domain != userAuth.Domain return domainCategory == types.PrivateCategory || userAuth.DomainCategory != types.PrivateCategory || domain != userAuth.Domain
} }
func (am *DefaultAccountManager) AllowSync(wgPubKey, metahash string) bool { func (am *DefaultAccountManager) AllowSync(wgPubKey string, metahash uint64) bool {
return am.loginFilter.allowLogin(wgPubKey, metahash) return am.loginFilter.allowLogin(wgPubKey, metahash)
} }
@ -1555,7 +1555,7 @@ func (am *DefaultAccountManager) SyncAndMarkPeer(ctx context.Context, accountID
log.WithContext(ctx).Warnf("failed marking peer as connected %s %v", peerPubKey, err) log.WithContext(ctx).Warnf("failed marking peer as connected %s %v", peerPubKey, err)
} }
metahash := metaHash(meta) metahash := metaHash(meta, realIP.String())
am.loginFilter.addLogin(peerPubKey, metahash) am.loginFilter.addLogin(peerPubKey, metahash)
return peer, netMap, postureChecks, nil return peer, netMap, postureChecks, nil

View File

@ -117,5 +117,5 @@ type Manager interface {
UpdateToPrimaryAccount(ctx context.Context, accountId string) (*types.Account, error) UpdateToPrimaryAccount(ctx context.Context, accountId string) (*types.Account, error)
GetOwnerInfo(ctx context.Context, accountId string) (*types.UserInfo, error) GetOwnerInfo(ctx context.Context, accountId string) (*types.UserInfo, error)
GetCurrentUserInfo(ctx context.Context, userAuth nbcontext.UserAuth) (*users.UserInfoWithPermissions, error) GetCurrentUserInfo(ctx context.Context, userAuth nbcontext.UserAuth) (*users.UserInfoWithPermissions, error)
AllowSync(string, string) bool AllowSync(string, uint64) bool
} }

View File

@ -141,8 +141,10 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
if err != nil { if err != nil {
return err return err
} }
realIP := getRealIP(ctx)
sRealIP := realIP.String()
peerMeta := extractPeerMeta(ctx, syncReq.GetMeta()) peerMeta := extractPeerMeta(ctx, syncReq.GetMeta())
metahashed := metaHash(peerMeta) metahashed := metaHash(peerMeta, sRealIP)
if !s.accountManager.AllowSync(peerKey.String(), metahashed) { if !s.accountManager.AllowSync(peerKey.String(), metahashed) {
return mapError(ctx, internalStatus.ErrPeerAlreadyLoggedIn) return mapError(ctx, internalStatus.ErrPeerAlreadyLoggedIn)
} }
@ -171,8 +173,7 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
// nolint:staticcheck // nolint:staticcheck
ctx = context.WithValue(ctx, nbContext.AccountIDKey, accountID) ctx = context.WithValue(ctx, nbContext.AccountIDKey, accountID)
realIP := getRealIP(ctx) log.WithContext(ctx).Debugf("Sync request from peer [%s] [%s]", req.WgPubKey, sRealIP)
log.WithContext(ctx).Debugf("Sync request from peer [%s] [%s]", req.WgPubKey, realIP.String())
if syncReq.GetMeta() == nil { if syncReq.GetMeta() == nil {
log.WithContext(ctx).Tracef("peer system meta has to be provided on sync. Peer %s, remote addr %s", peerKey.String(), realIP) log.WithContext(ctx).Tracef("peer system meta has to be provided on sync. Peer %s, remote addr %s", peerKey.String(), realIP)
@ -447,7 +448,8 @@ func (s *GRPCServer) Login(ctx context.Context, req *proto.EncryptedMessage) (*p
s.appMetrics.GRPCMetrics().CountLoginRequest() s.appMetrics.GRPCMetrics().CountLoginRequest()
} }
realIP := getRealIP(ctx) realIP := getRealIP(ctx)
log.WithContext(ctx).Debugf("Login request from peer [%s] [%s]", req.WgPubKey, realIP.String()) sRealIP := realIP.String()
log.WithContext(ctx).Debugf("Login request from peer [%s] [%s]", req.WgPubKey, sRealIP)
loginReq := &proto.LoginRequest{} loginReq := &proto.LoginRequest{}
peerKey, err := s.parseRequest(ctx, req, loginReq) peerKey, err := s.parseRequest(ctx, req, loginReq)
@ -456,7 +458,7 @@ func (s *GRPCServer) Login(ctx context.Context, req *proto.EncryptedMessage) (*p
} }
peerMeta := extractPeerMeta(ctx, loginReq.GetMeta()) peerMeta := extractPeerMeta(ctx, loginReq.GetMeta())
metahashed := metaHash(peerMeta) metahashed := metaHash(peerMeta, sRealIP)
if !s.accountManager.AllowSync(peerKey.String(), metahashed) { if !s.accountManager.AllowSync(peerKey.String(), metahashed) {
return nil, mapError(ctx, internalStatus.ErrPeerAlreadyLoggedIn) return nil, mapError(ctx, internalStatus.ErrPeerAlreadyLoggedIn)
} }

View File

@ -1,7 +1,7 @@
package server package server
import ( import (
"strings" "hash/fnv"
"sync" "sync"
"time" "time"
@ -11,38 +11,86 @@ import (
const ( const (
loginFilterSize = 100_000 // Size of the login filter map, making it large enough for a future loginFilterSize = 100_000 // Size of the login filter map, making it large enough for a future
filterTimeout = 5 * time.Minute // Duration to secure the previous login information in the filter filterTimeout = 5 * time.Minute // Duration to secure the previous login information in the filter
reconnTreshold = 5 * time.Minute
blockDuration = 10 * time.Minute // Duration for which a user is banned after exceeding the reconnection limit
reconnLimitForBan = 30 // Number of reconnections within the reconnTrashold that triggers a ban
) )
type config struct {
loginFilterSize int
filterTimeout time.Duration
reconnTreshold time.Duration
blockDuration time.Duration
reconnLimitForBan int
}
type loginFilter struct { type loginFilter struct {
mu sync.RWMutex mu sync.RWMutex
cfg *config
logged map[string]metahash logged map[string]metahash
} }
type metahash struct { type metahash struct {
hash string hash uint64
lastlogin time.Time counter int
banned bool
firstLogin time.Time
lastSeen time.Time
}
func initCfg() *config {
return &config{
loginFilterSize: loginFilterSize,
filterTimeout: filterTimeout,
reconnTreshold: reconnTreshold,
blockDuration: blockDuration,
reconnLimitForBan: reconnLimitForBan,
}
} }
func newLoginFilter() *loginFilter { func newLoginFilter() *loginFilter {
return newLoginFilterWithCfg(initCfg())
}
func newLoginFilterWithCfg(cfg *config) *loginFilter {
return &loginFilter{ return &loginFilter{
logged: make(map[string]metahash, loginFilterSize), logged: make(map[string]metahash, cfg.loginFilterSize),
cfg: cfg,
} }
} }
func (l *loginFilter) addLogin(wgPubKey, metaHash string) { func (l *loginFilter) addLogin(wgPubKey string, metaHash uint64) {
l.mu.Lock() l.mu.Lock()
defer l.mu.Unlock() defer l.mu.Unlock()
l.logged[wgPubKey] = metahash{ mh, ok := l.logged[wgPubKey]
if !ok || mh.banned {
mh = metahash{
hash: metaHash, hash: metaHash,
lastlogin: time.Now(), firstLogin: time.Now(),
} }
}
mh.counter++
mh.hash = metaHash
mh.lastSeen = time.Now()
if mh.counter > l.cfg.reconnLimitForBan && mh.lastSeen.Sub(mh.firstLogin) < l.cfg.reconnTreshold {
mh.banned = true
}
l.logged[wgPubKey] = mh
} }
func (l *loginFilter) allowLogin(wgPubKey, metaHash string) bool { func (l *loginFilter) allowLogin(wgPubKey string, metaHash uint64) bool {
l.mu.RLock() l.mu.RLock()
defer l.mu.RUnlock() defer l.mu.RUnlock()
if loggedMetaHash, ok := l.logged[wgPubKey]; ok { mh, ok := l.logged[wgPubKey]
return loggedMetaHash.hash == metaHash && time.Since(loggedMetaHash.lastlogin) < filterTimeout if !ok {
return true
}
if mh.banned && time.Since(mh.lastSeen) < l.cfg.blockDuration {
return false
}
if mh.hash != metaHash && time.Since(mh.lastSeen) < l.cfg.filterTimeout {
return false
} }
return true return true
} }
@ -53,16 +101,22 @@ func (l *loginFilter) removeLogin(wgPubKey string) {
delete(l.logged, wgPubKey) delete(l.logged, wgPubKey)
} }
func metaHash(meta nbpeer.PeerSystemMeta) string { func metaHash(meta nbpeer.PeerSystemMeta, pubip string) uint64 {
estimatedSize := len(meta.WtVersion) + len(meta.OSVersion) + len(meta.KernelVersion) + len(meta.Hostname) h := fnv.New64a()
var b strings.Builder if len(meta.NetworkAddresses) != 0 {
b.Grow(estimatedSize) for _, na := range meta.NetworkAddresses {
h.Write([]byte(na.Mac))
}
}
b.WriteString(meta.WtVersion) h.Write([]byte(meta.WtVersion))
b.WriteString(meta.OSVersion) h.Write([]byte(meta.OSVersion))
b.WriteString(meta.KernelVersion) h.Write([]byte(meta.KernelVersion))
b.WriteString(meta.Hostname) h.Write([]byte(meta.Hostname))
h.Write([]byte(meta.SystemSerialNumber))
h.Write([]byte(pubip))
return h.Sum64()
return b.String()
} }

View File

@ -1,47 +1,230 @@
package server package server
import ( import (
"fmt"
"hash/fnv" "hash/fnv"
"strconv"
"strings"
"testing" "testing"
"time"
"github.com/stretchr/testify/suite"
"github.com/netbirdio/netbird/management/server/peer"
nbpeer "github.com/netbirdio/netbird/management/server/peer" nbpeer "github.com/netbirdio/netbird/management/server/peer"
) )
func BenchmarkMetaHash(b *testing.B) { func testCfg() *config {
meta := peer.PeerSystemMeta{ return &config{
WtVersion: "1.0.0", filterTimeout: 20 * time.Millisecond,
OSVersion: "Linux 5.4.0", reconnTreshold: 50 * time.Millisecond,
KernelVersion: "5.4.0-42-generic", blockDuration: 100 * time.Millisecond,
Hostname: "test-host", reconnLimitForBan: 3,
} }
b.Run("fnv", func(b *testing.B) {
for i := 0; i < b.N; i++ {
metaHashFnv(meta)
}
})
b.Run("builder", func(b *testing.B) {
for i := 0; i < b.N; i++ {
metaHash(meta)
}
})
b.Run("strings", func(b *testing.B) {
for i := 0; i < b.N; i++ {
metaHashStrings(meta)
}
})
} }
func metaHashStrings(meta nbpeer.PeerSystemMeta) string { type LoginFilterTestSuite struct {
return meta.WtVersion + meta.OSVersion + meta.KernelVersion + meta.Hostname suite.Suite
filter *loginFilter
} }
func metaHashFnv(meta nbpeer.PeerSystemMeta) string { func (s *LoginFilterTestSuite) SetupTest() {
s.filter = newLoginFilterWithCfg(testCfg())
}
func TestLoginFilterTestSuite(t *testing.T) {
suite.Run(t, new(LoginFilterTestSuite))
}
func (s *LoginFilterTestSuite) TestFirstLogin() {
pubKey := "PUB_KEY_A"
meta := uint64(4353457657645)
s.True(s.filter.allowLogin(pubKey, meta), "should allow a new peer")
s.filter.addLogin(pubKey, meta)
s.Require().Contains(s.filter.logged, pubKey)
s.Equal(1, s.filter.logged[pubKey].counter)
}
func (s *LoginFilterTestSuite) TestFlappingPeerTriggersBan() {
pubKey := "PUB_KEY_A"
meta := uint64(4353457657645)
limit := s.filter.cfg.reconnLimitForBan
for range limit {
s.filter.addLogin(pubKey, meta)
}
s.True(s.filter.allowLogin(pubKey, meta), "should still allow login at the limit boundary")
s.filter.addLogin(pubKey, meta)
s.False(s.filter.allowLogin(pubKey, meta), "should deny login after exceeding the limit")
s.True(s.filter.logged[pubKey].banned, "peer should be marked as banned")
}
func (s *LoginFilterTestSuite) TestBannedPeerIsDenied() {
pubKey := "PUB_KEY_A"
meta := uint64(4353457657645)
s.filter.logged[pubKey] = metahash{
hash: meta,
banned: true,
lastSeen: time.Now(),
}
s.False(s.filter.allowLogin(pubKey, meta))
}
func (s *LoginFilterTestSuite) TestPeerIsAllowedAfterBanExpires() {
pubKey := "PUB_KEY_A"
meta := uint64(4353457657645)
s.filter.logged[pubKey] = metahash{
hash: meta,
banned: true,
lastSeen: time.Now().Add(-(s.filter.cfg.blockDuration + time.Second)),
}
s.True(s.filter.allowLogin(pubKey, meta), "should allow login after ban expires")
s.filter.addLogin(pubKey, meta)
s.Require().Contains(s.filter.logged, pubKey)
entry := s.filter.logged[pubKey]
s.False(entry.banned, "ban should be lifted on new login")
s.Equal(1, entry.counter, "counter should be reset")
}
func (s *LoginFilterTestSuite) TestDifferentHashIsBlockedWhenActive() {
pubKey := "PUB_KEY_A"
meta1 := uint64(23424223423)
meta2 := uint64(99878798987987)
s.filter.addLogin(pubKey, meta1)
s.False(s.filter.allowLogin(pubKey, meta2))
}
func (s *LoginFilterTestSuite) TestDifferentHashIsAllowedAfterTimeout() {
pubKey := "PUB_KEY_A"
meta1 := uint64(23424223423)
meta2 := uint64(99878798987987)
s.filter.addLogin(pubKey, meta1)
s.Require().Contains(s.filter.logged, pubKey)
entry := s.filter.logged[pubKey]
entry.lastSeen = time.Now().Add(-(s.filter.cfg.filterTimeout + time.Second))
s.filter.logged[pubKey] = entry
s.True(s.filter.allowLogin(pubKey, meta2))
}
func (s *LoginFilterTestSuite) TestRemovedPeerCanLogin() {
pubKey := "PUB_KEY_A"
meta := uint64(4353457657645)
s.filter.addLogin(pubKey, meta)
s.Require().Contains(s.filter.logged, pubKey)
s.filter.removeLogin(pubKey)
s.NotContains(s.filter.logged, pubKey)
s.True(s.filter.allowLogin(pubKey, meta))
}
func BenchmarkHashingMethods(b *testing.B) {
meta := nbpeer.PeerSystemMeta{
WtVersion: "1.25.1",
OSVersion: "Ubuntu 22.04.3 LTS",
KernelVersion: "5.15.0-76-generic",
Hostname: "prod-server-database-01",
SystemSerialNumber: "PC-1234567890",
NetworkAddresses: []nbpeer.NetworkAddress{{Mac: "00:1B:44:11:3A:B7"}, {Mac: "00:1B:44:11:3A:B8"}},
}
pubip := "8.8.8.8"
var resultString string
var resultUint uint64
b.Run("BuilderString", func(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
resultString = builderString(meta, pubip)
}
})
b.Run("FnvHashToString", func(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
resultString = fnvHashToString(meta, pubip)
}
})
b.Run("FnvHashToUint64 - used", func(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
resultUint = metaHash(meta, pubip)
}
})
_ = resultString
_ = resultUint
}
func fnvHashToString(meta nbpeer.PeerSystemMeta, pubip string) string {
h := fnv.New64a() h := fnv.New64a()
if len(meta.NetworkAddresses) != 0 {
for _, na := range meta.NetworkAddresses {
h.Write([]byte(na.Mac))
}
}
h.Write([]byte(meta.WtVersion)) h.Write([]byte(meta.WtVersion))
h.Write([]byte(meta.OSVersion)) h.Write([]byte(meta.OSVersion))
h.Write([]byte(meta.KernelVersion)) h.Write([]byte(meta.KernelVersion))
h.Write([]byte(meta.Hostname)) h.Write([]byte(meta.Hostname))
return fmt.Sprintf("%x", h.Sum64()) h.Write([]byte(meta.SystemSerialNumber))
h.Write([]byte(pubip))
return strconv.FormatUint(h.Sum64(), 16)
}
func builderString(meta nbpeer.PeerSystemMeta, pubip string) string {
mac := getMacAddress(meta.NetworkAddresses)
estimatedSize := len(meta.WtVersion) + len(meta.OSVersion) + len(meta.KernelVersion) + len(meta.Hostname) + len(meta.SystemSerialNumber) +
len(pubip) + len(mac) + 6
var b strings.Builder
b.Grow(estimatedSize)
b.WriteString(meta.WtVersion)
b.WriteByte('|')
b.WriteString(meta.OSVersion)
b.WriteByte('|')
b.WriteString(meta.KernelVersion)
b.WriteByte('|')
b.WriteString(meta.Hostname)
b.WriteByte('|')
b.WriteString(meta.SystemSerialNumber)
b.WriteByte('|')
b.WriteString(pubip)
b.WriteByte('|')
b.WriteString(mac)
return b.String()
}
func getMacAddress(nas []nbpeer.NetworkAddress) string {
if len(nas) == 0 {
return ""
}
macs := make([]string, 0, len(nas))
for _, na := range nas {
macs = append(macs, na.Mac)
}
return strings.Join(macs, "/")
} }

View File

@ -120,7 +120,7 @@ type MockAccountManager struct {
GetOrCreateAccountByPrivateDomainFunc func(ctx context.Context, initiatorId, domain string) (*types.Account, bool, error) GetOrCreateAccountByPrivateDomainFunc func(ctx context.Context, initiatorId, domain string) (*types.Account, bool, error)
AllowSyncFunc func(string, string) bool AllowSyncFunc func(string, uint64) bool
} }
func (am *MockAccountManager) UpdateAccountPeers(ctx context.Context, accountID string) { func (am *MockAccountManager) UpdateAccountPeers(ctx context.Context, accountID string) {
@ -893,6 +893,6 @@ func (am *MockAccountManager) GetCurrentUserInfo(ctx context.Context, userAuth n
return nil, status.Errorf(codes.Unimplemented, "method GetCurrentUserInfo is not implemented") return nil, status.Errorf(codes.Unimplemented, "method GetCurrentUserInfo is not implemented")
} }
func (am *MockAccountManager) AllowSync(_, _ string) bool { func (am *MockAccountManager) AllowSync(_ string, _ uint64) bool {
return true return true
} }