diff --git a/management/server/account.go b/management/server/account.go index 7aa96e626..1627959d2 100644 --- a/management/server/account.go +++ b/management/server/account.go @@ -6,11 +6,14 @@ import ( "fmt" "math/rand" "net" + "os" "reflect" "regexp" "slices" + "strconv" "strings" "sync" + "sync/atomic" "time" cacheStore "github.com/eko/gocache/lib/v4/store" @@ -94,6 +97,9 @@ type DefaultAccountManager struct { metrics telemetry.AppMetrics permissionsManager permissions.Manager + + accountUpdateLocks sync.Map + updateAccountPeersBufferInterval atomic.Int64 } // getJWTGroupsChanges calculates the changes needed to sync a user's JWT groups. @@ -188,6 +194,23 @@ func BuildManager( settingsManager: settingsManager, permissionsManager: permissionsManager, } + + var initialInterval int64 + intervalStr := os.Getenv("PEER_UPDATE_INTERVAL_MS") + interval, err := strconv.Atoi(intervalStr) + if err != nil { + initialInterval = 1 + } else { + initialInterval = int64(interval) * 10 + go func() { + time.Sleep(30 * time.Second) + am.updateAccountPeersBufferInterval.Store(int64(time.Duration(interval) * time.Millisecond)) + log.WithContext(ctx).Infof("set peer update buffer interval to %dms", interval) + }() + } + am.updateAccountPeersBufferInterval.Store(initialInterval) + log.WithContext(ctx).Infof("set peer update buffer interval to %dms", initialInterval) + accountsCounter, err := store.GetAccountsCounter(ctx) if err != nil { log.WithContext(ctx).Error(err) @@ -1224,7 +1247,7 @@ func (am *DefaultAccountManager) SyncUserJWTGroups(ctx context.Context, userAuth if removedGroupAffectsPeers || newGroupsAffectsPeers { log.WithContext(ctx).Tracef("user %s: JWT group membership changed, updating account peers", userAuth.UserId) - am.UpdateAccountPeers(ctx, userAuth.AccountId) + am.BufferUpdateAccountPeers(ctx, userAuth.AccountId) } } @@ -1463,7 +1486,7 @@ func (am *DefaultAccountManager) GetDNSDomain() string { func (am *DefaultAccountManager) onPeersInvalidated(ctx context.Context, accountID string) { log.WithContext(ctx).Debugf("validated peers has been invalidated for account %s", accountID) - am.UpdateAccountPeers(ctx, accountID) + am.BufferUpdateAccountPeers(ctx, accountID) } func (am *DefaultAccountManager) FindExistingPostureCheck(accountID string, checks *posture.ChecksDefinition) (*posture.Checks, error) { diff --git a/management/server/peer.go b/management/server/peer.go index 846f1dc61..05e3b176b 100644 --- a/management/server/peer.go +++ b/management/server/peer.go @@ -74,6 +74,10 @@ func (am *DefaultAccountManager) GetPeers(ctx context.Context, accountID, userID return []*nbpeer.Peer{}, nil } + return am.getUserAccessiblePeers(ctx, accountID, peersMap, peers) +} + +func (am *DefaultAccountManager) getUserAccessiblePeers(ctx context.Context, accountID string, peersMap map[string]*nbpeer.Peer, peers []*nbpeer.Peer) ([]*nbpeer.Peer, error) { account, err := am.requestBuffer.GetAccountWithBackpressure(ctx, accountID) if err != nil { return nil, err @@ -138,7 +142,7 @@ func (am *DefaultAccountManager) MarkPeerConnected(ctx context.Context, peerPubK if expired { // we need to update other peers because when peer login expires all other peers are notified to disconnect from // the expired one. Here we notify them that connection is now allowed again. - am.UpdateAccountPeers(ctx, accountID) + am.BufferUpdateAccountPeers(ctx, accountID) } return nil @@ -382,7 +386,7 @@ func (am *DefaultAccountManager) DeletePeer(ctx context.Context, accountID, peer } if updateAccountPeers { - am.UpdateAccountPeers(ctx, accountID) + am.BufferUpdateAccountPeers(ctx, accountID) } return nil @@ -652,7 +656,7 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s unlock = nil if updateAccountPeers { - am.UpdateAccountPeers(ctx, accountID) + am.BufferUpdateAccountPeers(ctx, accountID) } return am.getValidatedPeerWithMap(ctx, false, accountID, newPeer) @@ -747,7 +751,7 @@ func (am *DefaultAccountManager) SyncPeer(ctx context.Context, sync types.PeerSy } if isStatusChanged || sync.UpdateAccountPeers || (updated && len(postureChecks) > 0) { - am.UpdateAccountPeers(ctx, accountID) + am.BufferUpdateAccountPeers(ctx, accountID) } return am.getValidatedPeerWithMap(ctx, peerNotValid, accountID, peer) @@ -892,7 +896,7 @@ func (am *DefaultAccountManager) LoginPeer(ctx context.Context, login types.Peer unlockPeer = nil if updateRemotePeers || isStatusChanged || (isPeerUpdated && len(postureChecks) > 0) { - am.UpdateAccountPeers(ctx, accountID) + am.BufferUpdateAccountPeers(ctx, accountID) } return am.getValidatedPeerWithMap(ctx, isRequiresApproval, accountID, peer) @@ -1116,13 +1120,10 @@ func (am *DefaultAccountManager) GetPeer(ctx context.Context, accountID, peerID, return peer, nil } - // it is also possible that user doesn't own the peer but some of his peers have access to it, - // this is a valid case, show the peer as well. - userPeers, err := am.Store.GetUserPeers(ctx, store.LockingStrengthShare, accountID, userID) - if err != nil { - return nil, err - } + return am.checkIfUserOwnsPeer(ctx, accountID, userID, peer) +} +func (am *DefaultAccountManager) checkIfUserOwnsPeer(ctx context.Context, accountID, userID string, peer *nbpeer.Peer) (*nbpeer.Peer, error) { account, err := am.requestBuffer.GetAccountWithBackpressure(ctx, accountID) if err != nil { return nil, err @@ -1133,16 +1134,23 @@ func (am *DefaultAccountManager) GetPeer(ctx context.Context, accountID, peerID, return nil, err } + // it is also possible that user doesn't own the peer but some of his peers have access to it, + // this is a valid case, show the peer as well. + userPeers, err := am.Store.GetUserPeers(ctx, store.LockingStrengthShare, accountID, userID) + if err != nil { + return nil, err + } + for _, p := range userPeers { aclPeers, _ := account.GetPeerConnectionResources(ctx, p.ID, approvedPeersMap) for _, aclPeer := range aclPeers { - if aclPeer.ID == peerID { + if aclPeer.ID == peer.ID { return peer, nil } } } - return nil, status.Errorf(status.Internal, "user %s has no access to peer %s under account %s", userID, peerID, accountID) + return nil, status.Errorf(status.Internal, "user %s has no access to peer %s under account %s", userID, peer.ID, accountID) } // UpdateAccountPeers updates all peers that belong to an account. @@ -1220,6 +1228,21 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account } } +func (am *DefaultAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) { + mu, _ := am.accountUpdateLocks.LoadOrStore(accountID, &sync.Mutex{}) + lock := mu.(*sync.Mutex) + + if !lock.TryLock() { + return + } + + go func() { + time.Sleep(time.Duration(am.updateAccountPeersBufferInterval.Load())) + lock.Unlock() + am.UpdateAccountPeers(ctx, accountID) + }() +} + // UpdateAccountPeer updates a single peer that belongs to an account. // Should be called when changes need to be synced to a specific peer only. func (am *DefaultAccountManager) UpdateAccountPeer(ctx context.Context, accountId string, peerId string) {