rename request buffer and update default interval (#2459)

This commit is contained in:
pascal-fischer 2024-08-21 11:44:52 +02:00 committed by GitHub
parent 3ed90728e6
commit 0f0415b92a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 16 additions and 14 deletions

View File

@ -161,7 +161,7 @@ type DefaultAccountManager struct {
eventStore activity.Store eventStore activity.Store
geo *geolocation.Geolocation geo *geolocation.Geolocation
cache *AccountCache requestBuffer *AccountRequestBuffer
// singleAccountMode indicates whether the instance has a single account. // singleAccountMode indicates whether the instance has a single account.
// If true, then every new user will end up under the same account. // If true, then every new user will end up under the same account.
@ -969,7 +969,7 @@ func BuildManager(
userDeleteFromIDPEnabled: userDeleteFromIDPEnabled, userDeleteFromIDPEnabled: userDeleteFromIDPEnabled,
integratedPeerValidator: integratedPeerValidator, integratedPeerValidator: integratedPeerValidator,
metrics: metrics, metrics: metrics,
cache: NewAccountCache(ctx, store), requestBuffer: NewAccountRequestBuffer(ctx, store),
} }
allAccounts := store.GetAllAccounts(ctx) allAccounts := store.GetAllAccounts(ctx)
// enable single account mode only if configured by user and number of existing accounts is not grater than 1 // enable single account mode only if configured by user and number of existing accounts is not grater than 1

View File

@ -21,7 +21,7 @@ type AccountResult struct {
Err error Err error
} }
type AccountCache struct { type AccountRequestBuffer struct {
store Store store Store
getAccountRequests map[string][]*AccountRequest getAccountRequests map[string][]*AccountRequest
mu sync.Mutex mu sync.Mutex
@ -29,17 +29,19 @@ type AccountCache struct {
bufferInterval time.Duration bufferInterval time.Duration
} }
func NewAccountCache(ctx context.Context, store Store) *AccountCache { func NewAccountRequestBuffer(ctx context.Context, store Store) *AccountRequestBuffer {
bufferIntervalStr := os.Getenv("NB_GET_ACCOUNT_BUFFER_INTERVAL") bufferIntervalStr := os.Getenv("NB_GET_ACCOUNT_BUFFER_INTERVAL")
bufferInterval, err := time.ParseDuration(bufferIntervalStr) bufferInterval, err := time.ParseDuration(bufferIntervalStr)
if err != nil && bufferIntervalStr != "" { if err != nil {
log.WithContext(ctx).Warnf("failed to parse account cache buffer interval: %s", err) if bufferIntervalStr != "" {
bufferInterval = 300 * time.Millisecond log.WithContext(ctx).Warnf("failed to parse account request buffer interval: %s", err)
}
bufferInterval = 100 * time.Millisecond
} }
log.WithContext(ctx).Infof("set account cache buffer interval to %s", bufferInterval) log.WithContext(ctx).Infof("set account request buffer interval to %s", bufferInterval)
ac := AccountCache{ ac := AccountRequestBuffer{
store: store, store: store,
getAccountRequests: make(map[string][]*AccountRequest), getAccountRequests: make(map[string][]*AccountRequest),
getAccountRequestCh: make(chan *AccountRequest), getAccountRequestCh: make(chan *AccountRequest),
@ -50,7 +52,7 @@ func NewAccountCache(ctx context.Context, store Store) *AccountCache {
return &ac return &ac
} }
func (ac *AccountCache) GetAccountWithBackpressure(ctx context.Context, accountID string) (*Account, error) { func (ac *AccountRequestBuffer) GetAccountWithBackpressure(ctx context.Context, accountID string) (*Account, error) {
req := &AccountRequest{ req := &AccountRequest{
AccountID: accountID, AccountID: accountID,
ResultChan: make(chan *AccountResult, 1), ResultChan: make(chan *AccountResult, 1),
@ -65,7 +67,7 @@ func (ac *AccountCache) GetAccountWithBackpressure(ctx context.Context, accountI
return result.Account, result.Err return result.Account, result.Err
} }
func (ac *AccountCache) processGetAccountBatch(ctx context.Context, accountID string) { func (ac *AccountRequestBuffer) processGetAccountBatch(ctx context.Context, accountID string) {
ac.mu.Lock() ac.mu.Lock()
requests := ac.getAccountRequests[accountID] requests := ac.getAccountRequests[accountID]
delete(ac.getAccountRequests, accountID) delete(ac.getAccountRequests, accountID)
@ -86,7 +88,7 @@ func (ac *AccountCache) processGetAccountBatch(ctx context.Context, accountID st
} }
} }
func (ac *AccountCache) processGetAccountRequests(ctx context.Context) { func (ac *AccountRequestBuffer) processGetAccountRequests(ctx context.Context) {
for { for {
select { select {
case req := <-ac.getAccountRequestCh: case req := <-ac.getAccountRequestCh:

View File

@ -654,7 +654,7 @@ func Test_LoginPerformance(t *testing.T) {
// {"M", 250, 1}, // {"M", 250, 1},
// {"L", 500, 1}, // {"L", 500, 1},
// {"XL", 750, 1}, // {"XL", 750, 1},
{"XXL", 1000, 5}, {"XXL", 2000, 1},
} }
log.SetOutput(io.Discard) log.SetOutput(io.Discard)

View File

@ -714,7 +714,7 @@ func (am *DefaultAccountManager) LoginPeer(ctx context.Context, login PeerLogin)
unlockPeer() unlockPeer()
unlockPeer = nil unlockPeer = nil
account, err := am.cache.GetAccountWithBackpressure(ctx, accountID) account, err := am.requestBuffer.GetAccountWithBackpressure(ctx, accountID)
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, nil, err
} }