diff --git a/fs/accounting/token_bucket.go b/fs/accounting/token_bucket.go index 80831c4f2..377d9af44 100644 --- a/fs/accounting/token_bucket.go +++ b/fs/accounting/token_bucket.go @@ -27,16 +27,6 @@ const ( type buckets [TokenBucketSlots]*rate.Limiter -// can't request more than this many bytes at once -// -// set small for edge bandwidth limiters, but big for core bandwidth -// limiters since we may be using both at once -var maxBurstSizes = [TokenBucketSlots]int{ - TokenBucketSlotAccounting: 4 * 1024 * 1024, - TokenBucketSlotTransportRx: 4 * 1024, - TokenBucketSlotTransportTx: 4 * 1024, -} - // tokenBucket holds info about the rate limiters in use type tokenBucket struct { mu sync.RWMutex // protects the token bucket variables @@ -63,26 +53,28 @@ func (bs *buckets) _setOff() { } } +const maxBurstSize = 4 * 1024 * 1024 // must be bigger than the biggest request + // make a new empty token bucket with the bandwidth(s) given func newTokenBucket(bandwidth fs.BwPair) (tbs buckets) { bandwidthAccounting := fs.SizeSuffix(-1) if bandwidth.Tx > 0 { - tbs[TokenBucketSlotTransportTx] = rate.NewLimiter(rate.Limit(bandwidth.Tx), maxBurstSizes[TokenBucketSlotTransportTx]) + tbs[TokenBucketSlotTransportTx] = rate.NewLimiter(rate.Limit(bandwidth.Tx), maxBurstSize) bandwidthAccounting = bandwidth.Tx } if bandwidth.Rx > 0 { - tbs[TokenBucketSlotTransportRx] = rate.NewLimiter(rate.Limit(bandwidth.Rx), maxBurstSizes[TokenBucketSlotTransportRx]) + tbs[TokenBucketSlotTransportRx] = rate.NewLimiter(rate.Limit(bandwidth.Rx), maxBurstSize) if bandwidth.Rx > bandwidthAccounting { bandwidthAccounting = bandwidth.Rx } } if bandwidthAccounting > 0 { - tbs[TokenBucketSlotAccounting] = rate.NewLimiter(rate.Limit(bandwidthAccounting), maxBurstSizes[TokenBucketSlotAccounting]) + tbs[TokenBucketSlotAccounting] = rate.NewLimiter(rate.Limit(bandwidthAccounting), maxBurstSize) } - for i, tb := range tbs { + for _, tb := range tbs { if tb != nil { // empty the bucket - err := tb.WaitN(context.Background(), maxBurstSizes[i]) + err := tb.WaitN(context.Background(), maxBurstSize) if err != nil { fs.Errorf(nil, "Failed to empty token bucket: %v", err) } @@ -157,28 +149,20 @@ func (tb *tokenBucket) StartTokenTicker(ctx context.Context) { }() } -// LimitBandwidth sleeps for the correct amount of time for the -// passage of n bytes according to the current bandwidth limit. +// LimitBandwidth sleeps for the correct amount of time for the passage +// of n bytes according to the current bandwidth limit func (tb *tokenBucket) LimitBandwidth(i TokenBucketSlot, n int) { tb.mu.RLock() - t := tb.curr[i] - maxBurstSize := maxBurstSizes[i] - tb.mu.RUnlock() // Limit the transfer speed if required - if t != nil && n > 0 { - // wait in chunks of maxBurstSize - for toWait := maxBurstSize; n > 0; n -= toWait { - if n < maxBurstSize { - toWait = n - } - err := t.WaitN(context.Background(), toWait) - if err != nil { - fs.Errorf(nil, "Token bucket error: %v", err) - } + if tb.curr[i] != nil { + err := tb.curr[i].WaitN(context.Background(), n) + if err != nil { + fs.Errorf(nil, "Token bucket error: %v", err) } } + tb.mu.RUnlock() } // SetBwLimit sets the current bandwidth limit