From 463a18aa0713a55e12e4cb6d201144121fc60ba5 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Mon, 14 Dec 2020 21:02:20 +0000 Subject: [PATCH] fs/accounting: make edge bandwidth limiters have smaller bursts to make smoother This change decreases the edge limiter burst size which dramatically increases the smoothness of the bandwidth limiting. The core bandwidth limiter remains with a large burst so it isn't affected by double rate limiting on the edge limiters. See: #4395 See: https://forum.rclone.org/t/bwlimit-is-not-really-smooth/20947 --- fs/accounting/token_bucket.go | 44 ++++++++++++++++++++++++----------- 1 file changed, 30 insertions(+), 14 deletions(-) diff --git a/fs/accounting/token_bucket.go b/fs/accounting/token_bucket.go index 377d9af44..80831c4f2 100644 --- a/fs/accounting/token_bucket.go +++ b/fs/accounting/token_bucket.go @@ -27,6 +27,16 @@ const ( type buckets [TokenBucketSlots]*rate.Limiter +// can't request more than this many bytes at once +// +// set small for edge bandwidth limiters, but big for core bandwidth +// limiters since we may be using both at once +var maxBurstSizes = [TokenBucketSlots]int{ + TokenBucketSlotAccounting: 4 * 1024 * 1024, + TokenBucketSlotTransportRx: 4 * 1024, + TokenBucketSlotTransportTx: 4 * 1024, +} + // tokenBucket holds info about the rate limiters in use type tokenBucket struct { mu sync.RWMutex // protects the token bucket variables @@ -53,28 +63,26 @@ func (bs *buckets) _setOff() { } } -const maxBurstSize = 4 * 1024 * 1024 // must be bigger than the biggest request - // make a new empty token bucket with the bandwidth(s) given func newTokenBucket(bandwidth fs.BwPair) (tbs buckets) { bandwidthAccounting := fs.SizeSuffix(-1) if bandwidth.Tx > 0 { - tbs[TokenBucketSlotTransportTx] = rate.NewLimiter(rate.Limit(bandwidth.Tx), maxBurstSize) + tbs[TokenBucketSlotTransportTx] = rate.NewLimiter(rate.Limit(bandwidth.Tx), maxBurstSizes[TokenBucketSlotTransportTx]) bandwidthAccounting = bandwidth.Tx } if bandwidth.Rx > 0 { - tbs[TokenBucketSlotTransportRx] = rate.NewLimiter(rate.Limit(bandwidth.Rx), maxBurstSize) + tbs[TokenBucketSlotTransportRx] = rate.NewLimiter(rate.Limit(bandwidth.Rx), maxBurstSizes[TokenBucketSlotTransportRx]) if bandwidth.Rx > bandwidthAccounting { bandwidthAccounting = bandwidth.Rx } } if bandwidthAccounting > 0 { - tbs[TokenBucketSlotAccounting] = rate.NewLimiter(rate.Limit(bandwidthAccounting), maxBurstSize) + tbs[TokenBucketSlotAccounting] = rate.NewLimiter(rate.Limit(bandwidthAccounting), maxBurstSizes[TokenBucketSlotAccounting]) } - for _, tb := range tbs { + for i, tb := range tbs { if tb != nil { // empty the bucket - err := tb.WaitN(context.Background(), maxBurstSize) + err := tb.WaitN(context.Background(), maxBurstSizes[i]) if err != nil { fs.Errorf(nil, "Failed to empty token bucket: %v", err) } @@ -149,20 +157,28 @@ func (tb *tokenBucket) StartTokenTicker(ctx context.Context) { }() } -// LimitBandwidth sleeps for the correct amount of time for the passage -// of n bytes according to the current bandwidth limit +// LimitBandwidth sleeps for the correct amount of time for the +// passage of n bytes according to the current bandwidth limit. func (tb *tokenBucket) LimitBandwidth(i TokenBucketSlot, n int) { tb.mu.RLock() + t := tb.curr[i] + maxBurstSize := maxBurstSizes[i] + tb.mu.RUnlock() // Limit the transfer speed if required - if tb.curr[i] != nil { - err := tb.curr[i].WaitN(context.Background(), n) - if err != nil { - fs.Errorf(nil, "Token bucket error: %v", err) + if t != nil && n > 0 { + // wait in chunks of maxBurstSize + for toWait := maxBurstSize; n > 0; n -= toWait { + if n < maxBurstSize { + toWait = n + } + err := t.WaitN(context.Background(), toWait) + if err != nil { + fs.Errorf(nil, "Token bucket error: %v", err) + } } } - tb.mu.RUnlock() } // SetBwLimit sets the current bandwidth limit