[management] Buffer updateAccountPeers calls (#3644)

This commit is contained in:
Pascal Fischer 2025-04-11 17:21:05 +02:00 committed by GitHub
parent fd2a21c65d
commit b9f82e2f8a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 61 additions and 15 deletions

View File

@ -6,11 +6,14 @@ import (
"fmt" "fmt"
"math/rand" "math/rand"
"net" "net"
"os"
"reflect" "reflect"
"regexp" "regexp"
"slices" "slices"
"strconv"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
cacheStore "github.com/eko/gocache/lib/v4/store" cacheStore "github.com/eko/gocache/lib/v4/store"
@ -94,6 +97,9 @@ type DefaultAccountManager struct {
metrics telemetry.AppMetrics metrics telemetry.AppMetrics
permissionsManager permissions.Manager permissionsManager permissions.Manager
accountUpdateLocks sync.Map
updateAccountPeersBufferInterval atomic.Int64
} }
// getJWTGroupsChanges calculates the changes needed to sync a user's JWT groups. // getJWTGroupsChanges calculates the changes needed to sync a user's JWT groups.
@ -188,6 +194,23 @@ func BuildManager(
settingsManager: settingsManager, settingsManager: settingsManager,
permissionsManager: permissionsManager, permissionsManager: permissionsManager,
} }
var initialInterval int64
intervalStr := os.Getenv("PEER_UPDATE_INTERVAL_MS")
interval, err := strconv.Atoi(intervalStr)
if err != nil {
initialInterval = 1
} else {
initialInterval = int64(interval) * 10
go func() {
time.Sleep(30 * time.Second)
am.updateAccountPeersBufferInterval.Store(int64(time.Duration(interval) * time.Millisecond))
log.WithContext(ctx).Infof("set peer update buffer interval to %dms", interval)
}()
}
am.updateAccountPeersBufferInterval.Store(initialInterval)
log.WithContext(ctx).Infof("set peer update buffer interval to %dms", initialInterval)
accountsCounter, err := store.GetAccountsCounter(ctx) accountsCounter, err := store.GetAccountsCounter(ctx)
if err != nil { if err != nil {
log.WithContext(ctx).Error(err) log.WithContext(ctx).Error(err)
@ -1224,7 +1247,7 @@ func (am *DefaultAccountManager) SyncUserJWTGroups(ctx context.Context, userAuth
if removedGroupAffectsPeers || newGroupsAffectsPeers { if removedGroupAffectsPeers || newGroupsAffectsPeers {
log.WithContext(ctx).Tracef("user %s: JWT group membership changed, updating account peers", userAuth.UserId) log.WithContext(ctx).Tracef("user %s: JWT group membership changed, updating account peers", userAuth.UserId)
am.UpdateAccountPeers(ctx, userAuth.AccountId) am.BufferUpdateAccountPeers(ctx, userAuth.AccountId)
} }
} }
@ -1463,7 +1486,7 @@ func (am *DefaultAccountManager) GetDNSDomain() string {
func (am *DefaultAccountManager) onPeersInvalidated(ctx context.Context, accountID string) { func (am *DefaultAccountManager) onPeersInvalidated(ctx context.Context, accountID string) {
log.WithContext(ctx).Debugf("validated peers has been invalidated for account %s", accountID) log.WithContext(ctx).Debugf("validated peers has been invalidated for account %s", accountID)
am.UpdateAccountPeers(ctx, accountID) am.BufferUpdateAccountPeers(ctx, accountID)
} }
func (am *DefaultAccountManager) FindExistingPostureCheck(accountID string, checks *posture.ChecksDefinition) (*posture.Checks, error) { func (am *DefaultAccountManager) FindExistingPostureCheck(accountID string, checks *posture.ChecksDefinition) (*posture.Checks, error) {

View File

@ -74,6 +74,10 @@ func (am *DefaultAccountManager) GetPeers(ctx context.Context, accountID, userID
return []*nbpeer.Peer{}, nil return []*nbpeer.Peer{}, nil
} }
return am.getUserAccessiblePeers(ctx, accountID, peersMap, peers)
}
func (am *DefaultAccountManager) getUserAccessiblePeers(ctx context.Context, accountID string, peersMap map[string]*nbpeer.Peer, peers []*nbpeer.Peer) ([]*nbpeer.Peer, error) {
account, err := am.requestBuffer.GetAccountWithBackpressure(ctx, accountID) account, err := am.requestBuffer.GetAccountWithBackpressure(ctx, accountID)
if err != nil { if err != nil {
return nil, err return nil, err
@ -138,7 +142,7 @@ func (am *DefaultAccountManager) MarkPeerConnected(ctx context.Context, peerPubK
if expired { if expired {
// we need to update other peers because when peer login expires all other peers are notified to disconnect from // we need to update other peers because when peer login expires all other peers are notified to disconnect from
// the expired one. Here we notify them that connection is now allowed again. // the expired one. Here we notify them that connection is now allowed again.
am.UpdateAccountPeers(ctx, accountID) am.BufferUpdateAccountPeers(ctx, accountID)
} }
return nil return nil
@ -382,7 +386,7 @@ func (am *DefaultAccountManager) DeletePeer(ctx context.Context, accountID, peer
} }
if updateAccountPeers { if updateAccountPeers {
am.UpdateAccountPeers(ctx, accountID) am.BufferUpdateAccountPeers(ctx, accountID)
} }
return nil return nil
@ -652,7 +656,7 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
unlock = nil unlock = nil
if updateAccountPeers { if updateAccountPeers {
am.UpdateAccountPeers(ctx, accountID) am.BufferUpdateAccountPeers(ctx, accountID)
} }
return am.getValidatedPeerWithMap(ctx, false, accountID, newPeer) return am.getValidatedPeerWithMap(ctx, false, accountID, newPeer)
@ -747,7 +751,7 @@ func (am *DefaultAccountManager) SyncPeer(ctx context.Context, sync types.PeerSy
} }
if isStatusChanged || sync.UpdateAccountPeers || (updated && len(postureChecks) > 0) { if isStatusChanged || sync.UpdateAccountPeers || (updated && len(postureChecks) > 0) {
am.UpdateAccountPeers(ctx, accountID) am.BufferUpdateAccountPeers(ctx, accountID)
} }
return am.getValidatedPeerWithMap(ctx, peerNotValid, accountID, peer) return am.getValidatedPeerWithMap(ctx, peerNotValid, accountID, peer)
@ -892,7 +896,7 @@ func (am *DefaultAccountManager) LoginPeer(ctx context.Context, login types.Peer
unlockPeer = nil unlockPeer = nil
if updateRemotePeers || isStatusChanged || (isPeerUpdated && len(postureChecks) > 0) { if updateRemotePeers || isStatusChanged || (isPeerUpdated && len(postureChecks) > 0) {
am.UpdateAccountPeers(ctx, accountID) am.BufferUpdateAccountPeers(ctx, accountID)
} }
return am.getValidatedPeerWithMap(ctx, isRequiresApproval, accountID, peer) return am.getValidatedPeerWithMap(ctx, isRequiresApproval, accountID, peer)
@ -1116,13 +1120,10 @@ func (am *DefaultAccountManager) GetPeer(ctx context.Context, accountID, peerID,
return peer, nil return peer, nil
} }
// it is also possible that user doesn't own the peer but some of his peers have access to it, return am.checkIfUserOwnsPeer(ctx, accountID, userID, peer)
// this is a valid case, show the peer as well. }
userPeers, err := am.Store.GetUserPeers(ctx, store.LockingStrengthShare, accountID, userID)
if err != nil {
return nil, err
}
func (am *DefaultAccountManager) checkIfUserOwnsPeer(ctx context.Context, accountID, userID string, peer *nbpeer.Peer) (*nbpeer.Peer, error) {
account, err := am.requestBuffer.GetAccountWithBackpressure(ctx, accountID) account, err := am.requestBuffer.GetAccountWithBackpressure(ctx, accountID)
if err != nil { if err != nil {
return nil, err return nil, err
@ -1133,16 +1134,23 @@ func (am *DefaultAccountManager) GetPeer(ctx context.Context, accountID, peerID,
return nil, err return nil, err
} }
// it is also possible that user doesn't own the peer but some of his peers have access to it,
// this is a valid case, show the peer as well.
userPeers, err := am.Store.GetUserPeers(ctx, store.LockingStrengthShare, accountID, userID)
if err != nil {
return nil, err
}
for _, p := range userPeers { for _, p := range userPeers {
aclPeers, _ := account.GetPeerConnectionResources(ctx, p.ID, approvedPeersMap) aclPeers, _ := account.GetPeerConnectionResources(ctx, p.ID, approvedPeersMap)
for _, aclPeer := range aclPeers { for _, aclPeer := range aclPeers {
if aclPeer.ID == peerID { if aclPeer.ID == peer.ID {
return peer, nil return peer, nil
} }
} }
} }
return nil, status.Errorf(status.Internal, "user %s has no access to peer %s under account %s", userID, peerID, accountID) return nil, status.Errorf(status.Internal, "user %s has no access to peer %s under account %s", userID, peer.ID, accountID)
} }
// UpdateAccountPeers updates all peers that belong to an account. // UpdateAccountPeers updates all peers that belong to an account.
@ -1220,6 +1228,21 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
} }
} }
func (am *DefaultAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) {
mu, _ := am.accountUpdateLocks.LoadOrStore(accountID, &sync.Mutex{})
lock := mu.(*sync.Mutex)
if !lock.TryLock() {
return
}
go func() {
time.Sleep(time.Duration(am.updateAccountPeersBufferInterval.Load()))
lock.Unlock()
am.UpdateAccountPeers(ctx, accountID)
}()
}
// UpdateAccountPeer updates a single peer that belongs to an account. // UpdateAccountPeer updates a single peer that belongs to an account.
// Should be called when changes need to be synced to a specific peer only. // Should be called when changes need to be synced to a specific peer only.
func (am *DefaultAccountManager) UpdateAccountPeer(ctx context.Context, accountId string, peerId string) { func (am *DefaultAccountManager) UpdateAccountPeer(ctx context.Context, accountId string, peerId string) {