rclone/fs/accounting/token_bucket.go
Josh Soref d0888edc0a Spelling fixes
Fix spelling of: above, already, anonymous, associated,
authentication, bandwidth, because, between, blocks, calculate,
candidates, cautious, changelog, cleaner, clipboard, command,
completely, concurrently, considered, constructs, corrupt, current,
daemon, dependencies, deprecated, directory, dispatcher, download,
eligible, ellipsis, encrypter, endpoint, entrieslist, essentially,
existing writers, existing, expires, filesystem, flushing, frequently,
hierarchy, however, implementation, implements, inaccurate,
individually, insensitive, longer, maximum, metadata, modified,
multipart, namedirfirst, nextcloud, obscured, opened, optional,
owncloud, pacific, passphrase, password, permanently, persimmon,
positive, potato, protocol, quota, receiving, recommends, referring,
requires, revisited, satisfied, satisfies, satisfy, semver,
serialized, session, storage, strategies, stringlist, successful,
supported, surprise, temporarily, temporary, transactions, unneeded,
update, uploads, wrapped

Signed-off-by: Josh Soref <jsoref@users.noreply.github.com>
2020-10-14 15:21:31 +01:00

196 lines
5.0 KiB
Go

package accounting
import (
"context"
"sync"
"time"
"github.com/pkg/errors"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/rc"
"golang.org/x/time/rate"
)
// Globals
var (
	tokenBucketMu     sync.Mutex    // protects the token bucket variables
	tokenBucket       *rate.Limiter // limiter currently in force; nil means no limit
	prevTokenBucket   *rate.Limiter // exchanged with tokenBucket on a toggle; starts nil
	bwLimitToggledOff bool          // starts false

	currLimitMu sync.Mutex    // protects changes to the timeslot
	currLimit   fs.BwTimeSlot // timeslot the limiter was last configured from
)
// maxBurstSize is the burst size given to every token bucket (4 MiB).
// It must be bigger than the biggest request.
const maxBurstSize = 4 << 20
// newTokenBucket returns a limiter running at the given bandwidth with a
// burst of maxBurstSize, after consuming maxBurstSize tokens from it so
// that the bucket starts out empty.
//
// A failure to empty the bucket is logged and the limiter is returned
// anyway.
func newTokenBucket(bandwidth fs.SizeSuffix) *rate.Limiter {
	tb := rate.NewLimiter(rate.Limit(bandwidth), maxBurstSize)
	// Take the whole burst allowance straight away to empty the bucket.
	if err := tb.WaitN(context.Background(), maxBurstSize); err != nil {
		fs.Errorf(nil, "Failed to empty token bucket: %v", err)
	}
	return tb
}
// StartTokenBucket starts the token bucket if necessary
//
// It looks up the bandwidth limit that the configured timetable gives for
// the current time and records it in the package-level currLimit, which
// the goroutine started by StartTokenTicker compares against to detect
// scheduled changes. If that limit is greater than zero a new token bucket
// is installed and the signal handler is started; otherwise nothing else
// happens.
func StartTokenBucket() {
	// Assign to the package-level currLimit rather than declaring a local
	// with ":=", otherwise the global stays at its zero value and the
	// ticker sees a bogus "change" on its first tick.
	currLimitMu.Lock()
	currLimit = fs.Config.BwLimit.LimitAt(time.Now())
	limit := currLimit // private copy to use once the lock is released
	currLimitMu.Unlock()

	if limit.Bandwidth > 0 {
		// tokenBucket is shared with limitBandwidth, SetBwLimit and the
		// ticker goroutine, all of which access it under tokenBucketMu.
		tokenBucketMu.Lock()
		tokenBucket = newTokenBucket(limit.Bandwidth)
		tokenBucketMu.Unlock()
		fs.Infof(nil, "Starting bandwidth limiter at %vBytes/s", &limit.Bandwidth)

		// Start the SIGUSR2 signal handler to toggle bandwidth.
		// This function does nothing in windows systems.
		startSignalHandler()
	}
}
// StartTokenTicker creates a ticker to update the bandwidth limiter every minute.
//
// Nothing is started when the bandwidth timetable has one entry or none,
// because the limit can then never change. Otherwise a goroutine wakes up
// once a minute, works out the limit for the current time and, if its
// bandwidth differs from the one recorded in currLimit, replaces the
// relevant token bucket. The ticker is not stopped anywhere in this
// function.
func StartTokenTicker() {
	// A timetable with a single entry, or none at all, needs no polling.
	if len(fs.Config.BwLimit) <= 1 {
		return
	}

	ticker := time.NewTicker(time.Minute)
	go func() {
		for range ticker.C {
			limitNow := fs.Config.BwLimit.LimitAt(time.Now())

			currLimitMu.Lock()
			if currLimit.Bandwidth == limitNow.Bandwidth {
				// Same bandwidth as before: nothing to do this minute.
				currLimitMu.Unlock()
				continue
			}

			tokenBucketMu.Lock()

			// While the bwlimit is toggled off the new setting must only
			// take effect on the next toggle, which exchanges
			// tokenBucket <-> prevTokenBucket, so update the parked
			// bucket instead of the live one.
			target := &tokenBucket
			if bwLimitToggledOff {
				target = &prevTokenBucket
			}

			// Apply the new bandwidth; a nil bucket means unlimited.
			switch {
			case limitNow.Bandwidth <= 0:
				*target = nil
				fs.Logf(nil, "Scheduled bandwidth change. Bandwidth limits disabled")
			case bwLimitToggledOff:
				*target = newTokenBucket(limitNow.Bandwidth)
				fs.Logf(nil, "Scheduled bandwidth change. "+
					"Limit will be set to %vBytes/s when toggled on again.", &limitNow.Bandwidth)
			default:
				*target = newTokenBucket(limitNow.Bandwidth)
				fs.Logf(nil, "Scheduled bandwidth change. Limit set to %vBytes/s", &limitNow.Bandwidth)
			}

			currLimit = limitNow
			tokenBucketMu.Unlock()
			currLimitMu.Unlock()
		}
	}()
}
// limitBandwidth sleeps for the correct amount of time for the passage
// of n bytes according to the current bandwidth limit
//
// It returns straight away when no token bucket is installed. The token
// bucket mutex is held for the whole wait. Errors from the token bucket
// are logged, not returned.
func limitBandwidth(n int) {
	tokenBucketMu.Lock()
	defer tokenBucketMu.Unlock()

	// No bucket means no limit is in force.
	if tokenBucket == nil {
		return
	}
	if err := tokenBucket.WaitN(context.Background(), n); err != nil {
		fs.Errorf(nil, "Token bucket error: %v", err)
	}
}
// SetBwLimit sets the current bandwidth limit
//
// A bandwidth greater than zero installs a new token bucket running at
// that rate; anything else removes the token bucket so transfers are
// unlimited. The change is logged in both cases.
func SetBwLimit(bandwidth fs.SizeSuffix) {
	tokenBucketMu.Lock()
	defer tokenBucketMu.Unlock()

	if bandwidth <= 0 {
		// No positive rate given: drop the bucket altogether.
		tokenBucket = nil
		fs.Logf(nil, "Bandwidth limit reset to unlimited")
		return
	}
	tokenBucket = newTokenBucket(bandwidth)
	fs.Logf(nil, "Bandwidth limit set to %v", bandwidth)
}
// Remote control for the token bucket
//
// init registers the "core/bwlimit" rc call. If the call is given a
// "rate" parameter it is parsed as a bandwidth timetable, which must
// contain exactly one entry, and applied with SetBwLimit. Whether or not
// a rate was supplied, the call returns the limit currently in force as
// "rate" (human readable string) and "bytesPerSecond" (number, -1 when no
// token bucket is installed).
func init() {
	rc.Add(rc.Call{
		Path: "core/bwlimit",
		Fn: func(ctx context.Context, in rc.Params) (out rc.Params, err error) {
			if in["rate"] != nil {
				// Assign to the named result instead of shadowing it
				// with ":=".
				var bwlimit string
				bwlimit, err = in.GetString("rate")
				if err != nil {
					return out, err
				}
				var bws fs.BwTimetable
				err = bws.Set(bwlimit)
				if err != nil {
					return out, errors.Wrap(err, "bad bwlimit")
				}
				if len(bws) != 1 {
					return out, errors.New("need exactly 1 bandwidth setting")
				}
				bw := bws[0]
				SetBwLimit(bw.Bandwidth)
			}

			// Read the limiter under the mutex: SetBwLimit and the
			// scheduled ticker goroutine replace tokenBucket while
			// holding tokenBucketMu, so an unlocked read here is a data
			// race.
			tokenBucketMu.Lock()
			bytesPerSecond := int64(-1)
			if tokenBucket != nil {
				bytesPerSecond = int64(tokenBucket.Limit())
			}
			tokenBucketMu.Unlock()

			out = rc.Params{
				"rate":           fs.SizeSuffix(bytesPerSecond).String(),
				"bytesPerSecond": bytesPerSecond,
			}
			return out, nil
		},
		Title: "Set the bandwidth limit.",
		Help: `
This sets the bandwidth limit to that passed in.
Eg
rclone rc core/bwlimit rate=off
{
"bytesPerSecond": -1,
"rate": "off"
}
rclone rc core/bwlimit rate=1M
{
"bytesPerSecond": 1048576,
"rate": "1M"
}
If the rate parameter is not supplied then the bandwidth is queried
rclone rc core/bwlimit
{
"bytesPerSecond": 1048576,
"rate": "1M"
}
The format of the parameter is exactly the same as passed to --bwlimit
except only one bandwidth may be specified.
In either case "rate" is returned as a human readable string, and
"bytesPerSecond" is returned as a number.
`,
	})
}