diff --git a/fs/accounting.go b/fs/accounting.go index 0e7f7c38e..4ab8a5bab 100644 --- a/fs/accounting.go +++ b/fs/accounting.go @@ -12,19 +12,33 @@ import ( "time" "github.com/VividCortex/ewma" - "github.com/tsenart/tb" + "golang.org/x/net/context" // switch to "context" when we stop supporting go1.6 + "golang.org/x/time/rate" ) // Globals var ( Stats = NewStats() tokenBucketMu sync.Mutex // protects the token bucket variables - tokenBucket *tb.Bucket + tokenBucket *rate.Limiter prevTokenBucket = tokenBucket currLimitMu sync.Mutex // protects changes to the timeslot currLimit BwTimeSlot ) +const maxBurstSize = 1 * 1024 * 1024 // must be bigger than the biggest request + +// make a new empty token bucket with the bandwidth given +func newTokenBucket(bandwidth SizeSuffix) *rate.Limiter { + tokenBucket = rate.NewLimiter(rate.Limit(bandwidth), maxBurstSize) + // empty the bucket + err := tokenBucket.WaitN(context.Background(), maxBurstSize) + if err != nil { + Errorf(nil, "Failed to empty token bucket: %v", err) + } + return tokenBucket +} + // Start the token bucket if necessary func startTokenBucket() { currLimitMu.Lock() @@ -32,7 +46,7 @@ func startTokenBucket() { currLimitMu.Unlock() if currLimit.bandwidth > 0 { - tokenBucket = tb.NewBucket(int64(currLimit.bandwidth), 100*time.Millisecond) + tokenBucket = newTokenBucket(currLimit.bandwidth) Infof(nil, "Starting bandwidth limiter at %vBytes/s", &currLimit.bandwidth) // Start the SIGUSR2 signal handler to toggle bandwidth. @@ -57,16 +71,10 @@ func startTokenTicker() { if currLimit.bandwidth != limitNow.bandwidth { tokenBucketMu.Lock() - if tokenBucket != nil { - err := tokenBucket.Close() - if err != nil { - Debugf(nil, "Error closing token bucket: %v", err) - } - } // Set new bandwidth. If unlimited, set tokenbucket to nil. if limitNow.bandwidth > 0 { - tokenBucket = tb.NewBucket(int64(limitNow.bandwidth), 100*time.Millisecond) + tokenBucket = newTokenBucket(limitNow.bandwidth) Logf(nil, "Scheduled bandwidth change. Limit set to %vBytes/s", &limitNow.bandwidth) } else { tokenBucket = nil @@ -446,11 +454,13 @@ func (acc *Account) read(in io.Reader, p []byte) (n int, err error) { // Get the token bucket in use tokenBucketMu.Lock() - tb := tokenBucket // Limit the transfer speed if required - if tb != nil { - tb.Wait(int64(n)) + if tokenBucket != nil { + tbErr := tokenBucket.WaitN(context.Background(), n) + if tbErr != nil { + Errorf(nil, "Token bucket error: %v", err) + } } tokenBucketMu.Unlock() return