Refactor peer scheduler to retry every 3 seconds on errors

Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com>
This commit is contained in:
bcmmbaga 2025-01-14 22:13:11 +03:00
parent acb5340d40
commit 84aea32118
No known key found for this signature in database
GPG Key ID: 511EED5C928AD547
2 changed files with 23 additions and 13 deletions

View File

@ -45,6 +45,7 @@ import (
const (
CacheExpirationMax = 7 * 24 * 3600 * time.Second // 7 days
CacheExpirationMin = 3 * 24 * 3600 * time.Second // 3 days
peerSchedulerRetryInterval = 3 * time.Second
emptyUserID = "empty user ID in claims"
errorGettingDomainAccIDFmt = "error getting account ID by private domain: %v"
)
@ -469,7 +470,7 @@ func (am *DefaultAccountManager) peerLoginExpirationJob(ctx context.Context, acc
expiredPeers, err := am.getExpiredPeers(ctx, accountID)
if err != nil {
return 0, false
return peerSchedulerRetryInterval, true
}
var peerIDs []string
@ -481,7 +482,7 @@ func (am *DefaultAccountManager) peerLoginExpirationJob(ctx context.Context, acc
if err := am.expireAndUpdatePeers(ctx, accountID, expiredPeers); err != nil {
log.WithContext(ctx).Errorf("failed updating account peers while expiring peers for account %s", accountID)
return 0, false
return peerSchedulerRetryInterval, true
}
return am.getNextPeerExpiration(ctx, accountID)
@ -504,7 +505,7 @@ func (am *DefaultAccountManager) peerInactivityExpirationJob(ctx context.Context
inactivePeers, err := am.getInactivePeers(ctx, accountID)
if err != nil {
log.WithContext(ctx).Errorf("failed getting inactive peers for account %s", accountID)
return 0, false
return peerSchedulerRetryInterval, true
}
var peerIDs []string
@ -516,7 +517,7 @@ func (am *DefaultAccountManager) peerInactivityExpirationJob(ctx context.Context
if err := am.expireAndUpdatePeers(ctx, accountID, inactivePeers); err != nil {
log.Errorf("failed updating account peers while expiring peers for account %s", accountID)
return 0, false
return peerSchedulerRetryInterval, true
}
return am.getNextInactivePeerExpiration(ctx, accountID)

View File

@ -335,6 +335,15 @@ func (am *DefaultAccountManager) DeletePeer(ctx context.Context, accountID, peer
unlock := am.Store.AcquireWriteLockByUID(ctx, accountID)
defer unlock()
user, err := am.Store.GetUserByUserID(ctx, store.LockingStrengthShare, userID)
if err != nil {
return err
}
if user.AccountID != accountID {
return status.NewUserNotPartOfAccountError()
}
peerAccountID, err := am.Store.GetAccountIDByPeerID(ctx, store.LockingStrengthShare, peerID)
if err != nil {
return err
@ -1139,6 +1148,11 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
// UpdateAccountPeer updates a single peer that belongs to an account.
// Should be called when changes need to be synced to a specific peer only.
func (am *DefaultAccountManager) UpdateAccountPeer(ctx context.Context, accountId string, peerId string) {
if !am.peersUpdateManager.HasChannel(peerId) {
log.WithContext(ctx).Tracef("peer %s doesn't have a channel, skipping network map update", peerId)
return
}
account, err := am.requestBuffer.GetAccountWithBackpressure(ctx, accountId)
if err != nil {
log.WithContext(ctx).Errorf("failed to send out updates to peer %s. failed to get account: %v", peerId, err)
@ -1151,11 +1165,6 @@ func (am *DefaultAccountManager) UpdateAccountPeer(ctx context.Context, accountI
return
}
if !am.peersUpdateManager.HasChannel(peerId) {
log.WithContext(ctx).Tracef("peer %s doesn't have a channel, skipping network map update", peerId)
return
}
approvedPeersMap, err := am.integratedPeerValidator.GetValidatedPeers(account.Id, account.Groups, account.Peers, account.Settings.Extra)
if err != nil {
log.WithContext(ctx).Errorf("failed to send update to peer %s, failed to validate peers: %v", peerId, err)
@ -1185,7 +1194,7 @@ func (am *DefaultAccountManager) getNextPeerExpiration(ctx context.Context, acco
peersWithExpiry, err := am.Store.GetAccountPeersWithExpiration(ctx, store.LockingStrengthShare, accountID)
if err != nil {
log.WithContext(ctx).Errorf("failed to get peers with expiration: %v", err)
return 0, false
return peerSchedulerRetryInterval, true
}
if len(peersWithExpiry) == 0 {
@ -1195,7 +1204,7 @@ func (am *DefaultAccountManager) getNextPeerExpiration(ctx context.Context, acco
settings, err := am.Store.GetAccountSettings(ctx, store.LockingStrengthShare, accountID)
if err != nil {
log.WithContext(ctx).Errorf("failed to get account settings: %v", err)
return 0, false
return peerSchedulerRetryInterval, true
}
var nextExpiry *time.Duration
@ -1229,7 +1238,7 @@ func (am *DefaultAccountManager) getNextInactivePeerExpiration(ctx context.Conte
peersWithInactivity, err := am.Store.GetAccountPeersWithInactivity(ctx, store.LockingStrengthShare, accountID)
if err != nil {
log.WithContext(ctx).Errorf("failed to get peers with inactivity: %v", err)
return 0, false
return peerSchedulerRetryInterval, true
}
if len(peersWithInactivity) == 0 {
@ -1239,7 +1248,7 @@ func (am *DefaultAccountManager) getNextInactivePeerExpiration(ctx context.Conte
settings, err := am.Store.GetAccountSettings(ctx, store.LockingStrengthShare, accountID)
if err != nil {
log.WithContext(ctx).Errorf("failed to get account settings: %v", err)
return 0, false
return peerSchedulerRetryInterval, true
}
var nextExpiry *time.Duration