Add --tpslimit and --tpslimit-burst to limit transactions per second for HTTP

This is useful if you are being rate limited or banned by your cloud
storage provider.
This commit is contained in:
Nick Craig-Wood 2017-07-13 05:11:24 +01:00
parent ec6c3f2686
commit 6f71260acf
4 changed files with 75 additions and 0 deletions

View File

@ -566,6 +566,40 @@ If using `--syslog` this sets the syslog facility (eg `KERN`, `USER`).
See `man syslog` for a list of possible facilities. The default See `man syslog` for a list of possible facilities. The default
facility is `DAEMON`. facility is `DAEMON`.
### --tpslimit float ###
Limit HTTP transactions per second to this. Default is 0 which is used
to mean unlimited transactions per second.
For example to limit rclone to 10 HTTP transactions per second use
`--tpslimit 10`, or to 1 transaction every 2 seconds use `--tpslimit
0.5`.
Use this when the number of transactions per second from rclone is
causing a problem with the cloud storage provider (eg getting you
banned or rate limited).
This can be very useful for `rclone mount` to control the behaviour of
applications using it.
See also `--tpslimit-burst`.
### --tpslimit-burst int ###
Max burst of transactions for `--tpslimit`. (default 1)
Normally `--tpslimit` will do exactly the number of transaction per
second specified. However if you supply `--tps-burst` then rclone can
save up some transactions from when it was idle giving a burst of up
to the parameter supplied.
For example if you provide `--tpslimit-burst 10` then if rclone has
been idle for more than 10*`--tpslimit` then it can do 10 transactions
very quickly before they are limited again.
This may be used to increase performance of `--tpslimit` without
changing the long term average number of transactions per second.
### --track-renames ### ### --track-renames ###
By default, rclone doesn't keep track of renamed files, so if you By default, rclone doesn't keep track of renamed files, so if you

View File

@ -96,6 +96,8 @@ var (
backupDir = StringP("backup-dir", "", "", "Make backups into hierarchy based in DIR.") backupDir = StringP("backup-dir", "", "", "Make backups into hierarchy based in DIR.")
suffix = StringP("suffix", "", "", "Suffix for use with --backup-dir.") suffix = StringP("suffix", "", "", "Suffix for use with --backup-dir.")
useListR = BoolP("fast-list", "", false, "Use recursive list if available. Uses more memory but fewer transactions.") useListR = BoolP("fast-list", "", false, "Use recursive list if available. Uses more memory but fewer transactions.")
tpsLimit = Float64P("tpslimit", "", 0, "Limit HTTP transactions per second to this.")
tpsLimitBurst = IntP("tpslimit-burst", "", 1, "Max burst of transactions for --tpslimit.")
logLevel = LogLevelNotice logLevel = LogLevelNotice
statsLogLevel = LogLevelInfo statsLogLevel = LogLevelInfo
bwLimit BwTimetable bwLimit BwTimetable
@ -228,6 +230,8 @@ type ConfigInfo struct {
Suffix string Suffix string
UseListR bool UseListR bool
BufferSize SizeSuffix BufferSize SizeSuffix
TPSLimit float64
TPSLimitBurst int
} }
// Return the path to the configuration file // Return the path to the configuration file
@ -364,6 +368,8 @@ func LoadConfig() {
Config.BackupDir = *backupDir Config.BackupDir = *backupDir
Config.Suffix = *suffix Config.Suffix = *suffix
Config.UseListR = *useListR Config.UseListR = *useListR
Config.TPSLimit = *tpsLimit
Config.TPSLimitBurst = *tpsLimitBurst
Config.BufferSize = bufferSize Config.BufferSize = bufferSize
ConfigPath = *configFile ConfigPath = *configFile
@ -413,6 +419,9 @@ func LoadConfig() {
// Start the bandwidth update ticker // Start the bandwidth update ticker
startTokenTicker() startTokenTicker()
// Start the transactions per second limiter
startHTTPTokenBucket()
} }
var errorConfigFileNotFound = errors.New("config file not found") var errorConfigFileNotFound = errors.New("config file not found")

View File

@ -286,6 +286,15 @@ func IntP(name, shorthand string, value int, usage string) (out *int) {
return out return out
} }
// Float64P defines a flag which can be overridden by an environment variable
//
// It is a thin wrapper around pflag.Float64P
func Float64P(name, shorthand string, value float64, usage string) (out *float64) {
out = pflag.Float64P(name, shorthand, value, usage)
setDefaultFromEnv(name)
return out
}
// DurationP defines a flag which can be overridden by an environment variable // DurationP defines a flag which can be overridden by an environment variable
// //
// It is a thin wrapper around pflag.DurationP // It is a thin wrapper around pflag.DurationP

View File

@ -11,6 +11,9 @@ import (
"reflect" "reflect"
"sync" "sync"
"time" "time"
"golang.org/x/net/context" // switch to "context" when we stop supporting go1.6
"golang.org/x/time/rate"
) )
const ( const (
@ -21,8 +24,21 @@ const (
var ( var (
transport http.RoundTripper transport http.RoundTripper
noTransport sync.Once noTransport sync.Once
tpsBucket *rate.Limiter // for limiting number of http transactions per second
) )
// Start the token bucket if necessary
func startHTTPTokenBucket() {
if Config.TPSLimit > 0 {
tpsBurst := Config.TPSLimitBurst
if tpsBurst < 1 {
tpsBurst = 1
}
tpsBucket = rate.NewLimiter(rate.Limit(Config.TPSLimit), tpsBurst)
Infof(nil, "Starting HTTP transaction limiter: max %g transactions/s with burst %d", Config.TPSLimit, tpsBurst)
}
}
// A net.Conn that sets a deadline for every Read or Write operation // A net.Conn that sets a deadline for every Read or Write operation
type timeoutConn struct { type timeoutConn struct {
net.Conn net.Conn
@ -217,6 +233,13 @@ func cleanAuth(buf []byte) []byte {
// RoundTrip implements the RoundTripper interface. // RoundTrip implements the RoundTripper interface.
func (t *Transport) RoundTrip(req *http.Request) (resp *http.Response, err error) { func (t *Transport) RoundTrip(req *http.Request) (resp *http.Response, err error) {
// Get transactions per second token first if limiting
if tpsBucket != nil {
tbErr := tpsBucket.Wait(context.Background()) // FIXME switch to req.Context() when we drop go1.6 support
if tbErr != nil {
Errorf(nil, "HTTP token bucket error: %v", err)
}
}
// Force user agent // Force user agent
req.Header.Set("User-Agent", UserAgent) req.Header.Set("User-Agent", UserAgent)
// Logf request // Logf request