mirror of
https://github.com/rclone/rclone.git
synced 2024-12-25 00:19:13 +01:00
2e21c58e6a
This is done by making fs.Config private and attaching it to the context instead. The Config should be obtained with fs.GetConfig and fs.AddConfig should be used to get a new mutable config that can be changed.
198 lines
5.1 KiB
Go
198 lines
5.1 KiB
Go
package accounting
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/pkg/errors"
|
|
"github.com/rclone/rclone/fs"
|
|
"github.com/rclone/rclone/fs/rc"
|
|
"golang.org/x/time/rate"
|
|
)
|
|
|
|
// Globals
|
|
var (
|
|
tokenBucketMu sync.Mutex // protects the token bucket variables
|
|
tokenBucket *rate.Limiter
|
|
prevTokenBucket = tokenBucket
|
|
bwLimitToggledOff = false
|
|
currLimitMu sync.Mutex // protects changes to the timeslot
|
|
currLimit fs.BwTimeSlot
|
|
)
|
|
|
|
// maxBurstSize is the capacity of the token bucket in bytes (4 MiB).
const maxBurstSize = 4 << 20 // must be bigger than the biggest request
|
|
|
|
// make a new empty token bucket with the bandwidth given
|
|
func newTokenBucket(bandwidth fs.SizeSuffix) *rate.Limiter {
|
|
newTokenBucket := rate.NewLimiter(rate.Limit(bandwidth), maxBurstSize)
|
|
// empty the bucket
|
|
err := newTokenBucket.WaitN(context.Background(), maxBurstSize)
|
|
if err != nil {
|
|
fs.Errorf(nil, "Failed to empty token bucket: %v", err)
|
|
}
|
|
return newTokenBucket
|
|
}
|
|
|
|
// StartTokenBucket starts the token bucket if necessary
|
|
func StartTokenBucket(ctx context.Context) {
|
|
ci := fs.GetConfig(ctx)
|
|
currLimitMu.Lock()
|
|
currLimit := ci.BwLimit.LimitAt(time.Now())
|
|
currLimitMu.Unlock()
|
|
|
|
if currLimit.Bandwidth > 0 {
|
|
tokenBucket = newTokenBucket(currLimit.Bandwidth)
|
|
fs.Infof(nil, "Starting bandwidth limiter at %vBytes/s", &currLimit.Bandwidth)
|
|
|
|
// Start the SIGUSR2 signal handler to toggle bandwidth.
|
|
// This function does nothing in windows systems.
|
|
startSignalHandler()
|
|
}
|
|
}
|
|
|
|
// StartTokenTicker creates a ticker to update the bandwidth limiter every minute.
|
|
func StartTokenTicker(ctx context.Context) {
|
|
ci := fs.GetConfig(ctx)
|
|
// If the timetable has a single entry or was not specified, we don't need
|
|
// a ticker to update the bandwidth.
|
|
if len(ci.BwLimit) <= 1 {
|
|
return
|
|
}
|
|
|
|
ticker := time.NewTicker(time.Minute)
|
|
go func() {
|
|
for range ticker.C {
|
|
limitNow := ci.BwLimit.LimitAt(time.Now())
|
|
currLimitMu.Lock()
|
|
|
|
if currLimit.Bandwidth != limitNow.Bandwidth {
|
|
tokenBucketMu.Lock()
|
|
|
|
// If bwlimit is toggled off, the change should only
|
|
// become active on the next toggle, which causes
|
|
// an exchange of tokenBucket <-> prevTokenBucket
|
|
var targetBucket **rate.Limiter
|
|
if bwLimitToggledOff {
|
|
targetBucket = &prevTokenBucket
|
|
} else {
|
|
targetBucket = &tokenBucket
|
|
}
|
|
|
|
// Set new bandwidth. If unlimited, set tokenbucket to nil.
|
|
if limitNow.Bandwidth > 0 {
|
|
*targetBucket = newTokenBucket(limitNow.Bandwidth)
|
|
if bwLimitToggledOff {
|
|
fs.Logf(nil, "Scheduled bandwidth change. "+
|
|
"Limit will be set to %vBytes/s when toggled on again.", &limitNow.Bandwidth)
|
|
} else {
|
|
fs.Logf(nil, "Scheduled bandwidth change. Limit set to %vBytes/s", &limitNow.Bandwidth)
|
|
}
|
|
} else {
|
|
*targetBucket = nil
|
|
fs.Logf(nil, "Scheduled bandwidth change. Bandwidth limits disabled")
|
|
}
|
|
|
|
currLimit = limitNow
|
|
tokenBucketMu.Unlock()
|
|
}
|
|
currLimitMu.Unlock()
|
|
}
|
|
}()
|
|
}
|
|
|
|
// limitBandwidth sleeps for the correct amount of time for the passage
|
|
// of n bytes according to the current bandwidth limit
|
|
func limitBandwidth(n int) {
|
|
tokenBucketMu.Lock()
|
|
|
|
// Limit the transfer speed if required
|
|
if tokenBucket != nil {
|
|
err := tokenBucket.WaitN(context.Background(), n)
|
|
if err != nil {
|
|
fs.Errorf(nil, "Token bucket error: %v", err)
|
|
}
|
|
}
|
|
|
|
tokenBucketMu.Unlock()
|
|
}
|
|
|
|
// SetBwLimit sets the current bandwidth limit
|
|
func SetBwLimit(bandwidth fs.SizeSuffix) {
|
|
tokenBucketMu.Lock()
|
|
defer tokenBucketMu.Unlock()
|
|
if bandwidth > 0 {
|
|
tokenBucket = newTokenBucket(bandwidth)
|
|
fs.Logf(nil, "Bandwidth limit set to %v", bandwidth)
|
|
} else {
|
|
tokenBucket = nil
|
|
fs.Logf(nil, "Bandwidth limit reset to unlimited")
|
|
}
|
|
}
|
|
|
|
// Remote control for the token bucket
|
|
func init() {
|
|
rc.Add(rc.Call{
|
|
Path: "core/bwlimit",
|
|
Fn: func(ctx context.Context, in rc.Params) (out rc.Params, err error) {
|
|
if in["rate"] != nil {
|
|
bwlimit, err := in.GetString("rate")
|
|
if err != nil {
|
|
return out, err
|
|
}
|
|
var bws fs.BwTimetable
|
|
err = bws.Set(bwlimit)
|
|
if err != nil {
|
|
return out, errors.Wrap(err, "bad bwlimit")
|
|
}
|
|
if len(bws) != 1 {
|
|
return out, errors.New("need exactly 1 bandwidth setting")
|
|
}
|
|
bw := bws[0]
|
|
SetBwLimit(bw.Bandwidth)
|
|
}
|
|
bytesPerSecond := int64(-1)
|
|
if tokenBucket != nil {
|
|
bytesPerSecond = int64(tokenBucket.Limit())
|
|
}
|
|
out = rc.Params{
|
|
"rate": fs.SizeSuffix(bytesPerSecond).String(),
|
|
"bytesPerSecond": bytesPerSecond,
|
|
}
|
|
return out, nil
|
|
},
|
|
Title: "Set the bandwidth limit.",
|
|
Help: `
|
|
This sets the bandwidth limit to that passed in.
|
|
|
|
Eg
|
|
|
|
rclone rc core/bwlimit rate=off
|
|
{
|
|
"bytesPerSecond": -1,
|
|
"rate": "off"
|
|
}
|
|
rclone rc core/bwlimit rate=1M
|
|
{
|
|
"bytesPerSecond": 1048576,
|
|
"rate": "1M"
|
|
}
|
|
|
|
|
|
If the rate parameter is not supplied then the bandwidth is queried
|
|
|
|
rclone rc core/bwlimit
|
|
{
|
|
"bytesPerSecond": 1048576,
|
|
"rate": "1M"
|
|
}
|
|
|
|
The format of the parameter is exactly the same as passed to --bwlimit
|
|
except only one bandwidth may be specified.
|
|
|
|
In either case "rate" is returned as a human readable string, and
|
|
"bytesPerSecond" is returned as a number.
|
|
`,
|
|
})
|
|
}
|