Replace token bucket limiter github.com/tsenart/tb with golang.org/x/time/rate

In tests tsenart/tb has proved inaccurate at low rates.
This commit is contained in:
Nick Craig-Wood 2017-07-14 05:28:04 +01:00
parent 470642f2b7
commit 62e28d0a72

View File

@ -12,19 +12,33 @@ import (
"time" "time"
"github.com/VividCortex/ewma" "github.com/VividCortex/ewma"
"github.com/tsenart/tb" "golang.org/x/net/context" // switch to "context" when we stop supporting go1.6
"golang.org/x/time/rate"
) )
// Globals // Globals
var ( var (
Stats = NewStats() Stats = NewStats()
tokenBucketMu sync.Mutex // protects the token bucket variables tokenBucketMu sync.Mutex // protects the token bucket variables
tokenBucket *tb.Bucket tokenBucket *rate.Limiter
prevTokenBucket = tokenBucket prevTokenBucket = tokenBucket
currLimitMu sync.Mutex // protects changes to the timeslot currLimitMu sync.Mutex // protects changes to the timeslot
currLimit BwTimeSlot currLimit BwTimeSlot
) )
const maxBurstSize = 1 * 1024 * 1024 // must be bigger than the biggest request
// make a new empty token bucket with the bandwidth given
func newTokenBucket(bandwidth SizeSuffix) *rate.Limiter {
tokenBucket = rate.NewLimiter(rate.Limit(bandwidth), maxBurstSize)
// empty the bucket
err := tokenBucket.WaitN(context.Background(), maxBurstSize)
if err != nil {
Errorf(nil, "Failed to empty token bucket: %v", err)
}
return tokenBucket
}
// Start the token bucket if necessary // Start the token bucket if necessary
func startTokenBucket() { func startTokenBucket() {
currLimitMu.Lock() currLimitMu.Lock()
@ -32,7 +46,7 @@ func startTokenBucket() {
currLimitMu.Unlock() currLimitMu.Unlock()
if currLimit.bandwidth > 0 { if currLimit.bandwidth > 0 {
tokenBucket = tb.NewBucket(int64(currLimit.bandwidth), 100*time.Millisecond) tokenBucket = newTokenBucket(currLimit.bandwidth)
Infof(nil, "Starting bandwidth limiter at %vBytes/s", &currLimit.bandwidth) Infof(nil, "Starting bandwidth limiter at %vBytes/s", &currLimit.bandwidth)
// Start the SIGUSR2 signal handler to toggle bandwidth. // Start the SIGUSR2 signal handler to toggle bandwidth.
@ -57,16 +71,10 @@ func startTokenTicker() {
if currLimit.bandwidth != limitNow.bandwidth { if currLimit.bandwidth != limitNow.bandwidth {
tokenBucketMu.Lock() tokenBucketMu.Lock()
if tokenBucket != nil {
err := tokenBucket.Close()
if err != nil {
Debugf(nil, "Error closing token bucket: %v", err)
}
}
// Set new bandwidth. If unlimited, set tokenbucket to nil. // Set new bandwidth. If unlimited, set tokenbucket to nil.
if limitNow.bandwidth > 0 { if limitNow.bandwidth > 0 {
tokenBucket = tb.NewBucket(int64(limitNow.bandwidth), 100*time.Millisecond) tokenBucket = newTokenBucket(limitNow.bandwidth)
Logf(nil, "Scheduled bandwidth change. Limit set to %vBytes/s", &limitNow.bandwidth) Logf(nil, "Scheduled bandwidth change. Limit set to %vBytes/s", &limitNow.bandwidth)
} else { } else {
tokenBucket = nil tokenBucket = nil
@ -446,11 +454,13 @@ func (acc *Account) read(in io.Reader, p []byte) (n int, err error) {
// Get the token bucket in use // Get the token bucket in use
tokenBucketMu.Lock() tokenBucketMu.Lock()
tb := tokenBucket
// Limit the transfer speed if required // Limit the transfer speed if required
if tb != nil { if tokenBucket != nil {
tb.Wait(int64(n)) tbErr := tokenBucket.WaitN(context.Background(), n)
if tbErr != nil {
Errorf(nil, "Token bucket error: %v", err)
}
} }
tokenBucketMu.Unlock() tokenBucketMu.Unlock()
return return