pacer: make pacer more flexible

Make the pacer package more flexible by extracting the pace calculation
functions into a separate interface. This also allows to move features
that require the fs package like logging and custom errors into the fs
package.

Also add a RetryAfterError sentinel error that can be used to signal a
desired retry time to the Calculator.
This commit is contained in:
Fabian Möller 2019-02-09 21:52:15 +01:00 committed by Nick Craig-Wood
parent 9ed721a3f6
commit 61616ba864
21 changed files with 822 additions and 631 deletions

View File

@ -155,7 +155,7 @@ type Fs struct {
noAuthClient *http.Client // unauthenticated http client
root string // the path we are working on
dirCache *dircache.DirCache // Map of directory path to directory id
pacer *pacer.Pacer // pacer for API calls
pacer *fs.Pacer // pacer for API calls
trueRootID string // ID of true root directory
tokenRenewer *oauthutil.Renew // renew the token on expiry
}
@ -273,7 +273,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
root: root,
opt: *opt,
c: c,
pacer: pacer.New().SetMinSleep(minSleep).SetPacer(pacer.AmazonCloudDrivePacer),
pacer: fs.NewPacer(pacer.NewAmazonCloudDrive(pacer.MinSleep(minSleep))),
noAuthClient: fshttp.NewClient(fs.Config),
}
f.features = (&fs.Features{

View File

@ -144,7 +144,7 @@ type Fs struct {
containerOKMu sync.Mutex // mutex to protect container OK
containerOK bool // true if we have created the container
containerDeleted bool // true if we have deleted the container
pacer *pacer.Pacer // To pace and retry the API calls
pacer *fs.Pacer // To pace and retry the API calls
uploadToken *pacer.TokenDispenser // control concurrency
}
@ -347,7 +347,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
opt: *opt,
container: container,
root: directory,
pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant).SetPacer(pacer.S3Pacer),
pacer: fs.NewPacer(pacer.NewS3(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
uploadToken: pacer.NewTokenDispenser(fs.Config.Transfers),
client: fshttp.NewClient(fs.Config),
}

View File

@ -167,7 +167,7 @@ type Fs struct {
uploadMu sync.Mutex // lock for upload variable
uploads []*api.GetUploadURLResponse // result of get upload URL calls
authMu sync.Mutex // lock for authorizing the account
pacer *pacer.Pacer // To pace and retry the API calls
pacer *fs.Pacer // To pace and retry the API calls
bufferTokens chan []byte // control concurrency of multipart uploads
}
@ -251,13 +251,7 @@ func (f *Fs) shouldRetryNoReauth(resp *http.Response, err error) (bool, error) {
fs.Errorf(f, "Malformed %s header %q: %v", retryAfterHeader, retryAfterString, err)
}
}
retryAfterDuration := time.Duration(retryAfter) * time.Second
if f.pacer.GetSleep() < retryAfterDuration {
fs.Debugf(f, "Setting sleep to %v after error: %v", retryAfterDuration, err)
// We set 1/2 the value here because the pacer will double it immediately
f.pacer.SetSleep(retryAfterDuration / 2)
}
return true, err
return true, pacer.RetryAfterError(err, time.Duration(retryAfter)*time.Second)
}
return fserrors.ShouldRetry(err) || fserrors.ShouldRetryHTTP(resp, retryErrorCodes), err
}
@ -363,7 +357,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
bucket: bucket,
root: directory,
srv: rest.NewClient(fshttp.NewClient(fs.Config)).SetErrorHandler(errorHandler),
pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
}
f.features = (&fs.Features{
ReadMimeType: true,

View File

@ -111,7 +111,7 @@ type Fs struct {
features *fs.Features // optional features
srv *rest.Client // the connection to the one drive server
dirCache *dircache.DirCache // Map of directory path to directory id
pacer *pacer.Pacer // pacer for API calls
pacer *fs.Pacer // pacer for API calls
tokenRenewer *oauthutil.Renew // renew the token on expiry
uploadToken *pacer.TokenDispenser // control concurrency
}
@ -260,7 +260,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
root: root,
opt: *opt,
srv: rest.NewClient(oAuthClient).SetRoot(rootURL),
pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
uploadToken: pacer.NewTokenDispenser(fs.Config.Transfers),
}
f.features = (&fs.Features{

View File

@ -426,7 +426,7 @@ type Fs struct {
client *http.Client // authorized client
rootFolderID string // the id of the root folder
dirCache *dircache.DirCache // Map of directory path to directory id
pacer *pacer.Pacer // To pace the API calls
pacer *fs.Pacer // To pace the API calls
exportExtensions []string // preferred extensions to download docs
importMimeTypes []string // MIME types to convert to docs
isTeamDrive bool // true if this is a team drive
@ -789,8 +789,8 @@ func configTeamDrive(opt *Options, m configmap.Mapper, name string) error {
}
// newPacer makes a pacer configured for drive
func newPacer(opt *Options) *pacer.Pacer {
return pacer.New().SetMinSleep(time.Duration(opt.PacerMinSleep)).SetBurst(opt.PacerBurst).SetPacer(pacer.GoogleDrivePacer)
func newPacer(opt *Options) *fs.Pacer {
return fs.NewPacer(pacer.NewGoogleDrive(pacer.MinSleep(opt.PacerMinSleep), pacer.Burst(opt.PacerBurst)))
}
func getServiceAccountClient(opt *Options, credentialsData []byte) (*http.Client, error) {

View File

@ -160,7 +160,7 @@ type Fs struct {
team team.Client // for the Teams API
slashRoot string // root with "/" prefix, lowercase
slashRootSlash string // root with "/" prefix and postfix, lowercase
pacer *pacer.Pacer // To pace the API calls
pacer *fs.Pacer // To pace the API calls
ns string // The namespace we are using or "" for none
}
@ -209,7 +209,7 @@ func shouldRetry(err error) (bool, error) {
case auth.RateLimitAPIError:
if e.RateLimitError.RetryAfter > 0 {
fs.Debugf(baseErrString, "Too many requests or write operations. Trying again in %d seconds.", e.RateLimitError.RetryAfter)
time.Sleep(time.Duration(e.RateLimitError.RetryAfter) * time.Second)
err = pacer.RetryAfterError(err, time.Duration(e.RateLimitError.RetryAfter)*time.Second)
}
return true, err
}
@ -273,7 +273,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
f := &Fs{
name: name,
opt: *opt,
pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
}
config := dropbox.Config{
LogLevel: dropbox.LogOff, // logging in the SDK: LogOff, LogDebug, LogInfo

View File

@ -256,7 +256,7 @@ type Fs struct {
bucket string // the bucket we are working on
bucketOKMu sync.Mutex // mutex to protect bucket OK
bucketOK bool // true if we have created the bucket
pacer *pacer.Pacer // To pace the API calls
pacer *fs.Pacer // To pace the API calls
}
// Object describes a storage object
@ -395,7 +395,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
bucket: bucket,
root: directory,
opt: *opt,
pacer: pacer.New().SetMinSleep(minSleep).SetPacer(pacer.GoogleDrivePacer),
pacer: fs.NewPacer(pacer.NewGoogleDrive(pacer.MinSleep(minSleep))),
}
f.features = (&fs.Features{
ReadMimeType: true,

View File

@ -190,7 +190,7 @@ type Fs struct {
endpointURL string
srv *rest.Client
apiSrv *rest.Client
pacer *pacer.Pacer
pacer *fs.Pacer
tokenRenewer *oauthutil.Renew // renew the token on expiry
}
@ -403,7 +403,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
opt: *opt,
srv: rest.NewClient(oAuthClient).SetRoot(rootURL),
apiSrv: rest.NewClient(oAuthClient).SetRoot(apiURL),
pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
}
f.features = (&fs.Features{
CaseInsensitive: true,

View File

@ -98,7 +98,7 @@ type Fs struct {
opt Options // parsed config options
features *fs.Features // optional features
srv *mega.Mega // the connection to the server
pacer *pacer.Pacer // pacer for API calls
pacer *fs.Pacer // pacer for API calls
rootNodeMu sync.Mutex // mutex for _rootNode
_rootNode *mega.Node // root node - call findRoot to use this
mkdirMu sync.Mutex // used to serialize calls to mkdir / rmdir
@ -217,7 +217,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
root: root,
opt: *opt,
srv: srv,
pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
}
f.features = (&fs.Features{
DuplicateFiles: true,

View File

@ -261,7 +261,7 @@ type Fs struct {
features *fs.Features // optional features
srv *rest.Client // the connection to the one drive server
dirCache *dircache.DirCache // Map of directory path to directory id
pacer *pacer.Pacer // pacer for API calls
pacer *fs.Pacer // pacer for API calls
tokenRenewer *oauthutil.Renew // renew the token on expiry
driveID string // ID to use for querying Microsoft Graph
driveType string // https://developer.microsoft.com/en-us/graph/docs/api-reference/v1.0/resources/drive
@ -475,7 +475,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
driveID: opt.DriveID,
driveType: opt.DriveType,
srv: rest.NewClient(oAuthClient).SetRoot(graphURL + "/drives/" + opt.DriveID),
pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
}
f.features = (&fs.Features{
CaseInsensitive: true,

View File

@ -65,7 +65,7 @@ type Fs struct {
opt Options // parsed options
features *fs.Features // optional features
srv *rest.Client // the connection to the server
pacer *pacer.Pacer // To pace and retry the API calls
pacer *fs.Pacer // To pace and retry the API calls
session UserSessionInfo // contains the session data
dirCache *dircache.DirCache // Map of directory path to directory id
}
@ -144,7 +144,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
root: root,
opt: *opt,
srv: rest.NewClient(fshttp.NewClient(fs.Config)).SetErrorHandler(errorHandler),
pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
}
f.dirCache = dircache.New(root, "0", f)

View File

@ -95,7 +95,7 @@ type Fs struct {
features *fs.Features // optional features
srv *rest.Client // the connection to the server
dirCache *dircache.DirCache // Map of directory path to directory id
pacer *pacer.Pacer // pacer for API calls
pacer *fs.Pacer // pacer for API calls
tokenRenewer *oauthutil.Renew // renew the token on expiry
}
@ -254,7 +254,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
root: root,
opt: *opt,
srv: rest.NewClient(oAuthClient).SetRoot(rootURL),
pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
}
f.features = (&fs.Features{
CaseInsensitive: false,

View File

@ -782,7 +782,7 @@ type Fs struct {
bucketOKMu sync.Mutex // mutex to protect bucket OK
bucketOK bool // true if we have created the bucket
bucketDeleted bool // true if we have deleted the bucket
pacer *pacer.Pacer // To pace the API calls
pacer *fs.Pacer // To pace the API calls
srv *http.Client // a plain http client
}
@ -1055,7 +1055,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
c: c,
bucket: bucket,
ses: ses,
pacer: pacer.New().SetMinSleep(minSleep).SetPacer(pacer.S3Pacer),
pacer: fs.NewPacer(pacer.NewS3(pacer.MinSleep(minSleep))),
srv: fshttp.NewClient(fs.Config),
}
f.features = (&fs.Features{

View File

@ -216,7 +216,7 @@ type Fs struct {
containerOK bool // true if we have created the container
segmentsContainer string // container to store the segments (if any) in
noCheckContainer bool // don't check the container before creating it
pacer *pacer.Pacer // To pace the API calls
pacer *fs.Pacer // To pace the API calls
}
// Object describes a swift object
@ -401,7 +401,7 @@ func NewFsWithConnection(opt *Options, name, root string, c *swift.Connection, n
segmentsContainer: container + "_segments",
root: directory,
noCheckContainer: noCheckContainer,
pacer: pacer.New().SetMinSleep(minSleep).SetPacer(pacer.S3Pacer),
pacer: fs.NewPacer(pacer.NewS3(pacer.MinSleep(minSleep))),
}
f.features = (&fs.Features{
ReadMimeType: true,

View File

@ -101,7 +101,7 @@ type Fs struct {
endpoint *url.URL // URL of the host
endpointURL string // endpoint as a string
srv *rest.Client // the connection to the one drive server
pacer *pacer.Pacer // pacer for API calls
pacer *fs.Pacer // pacer for API calls
precision time.Duration // mod time precision
canStream bool // set if can stream
useOCMtime bool // set if can use X-OC-Mtime
@ -318,7 +318,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
endpoint: u,
endpointURL: u.String(),
srv: rest.NewClient(fshttp.NewClient(fs.Config)).SetRoot(u.String()),
pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
precision: fs.ModTimeNotSupported,
}
f.features = (&fs.Features{

View File

@ -93,7 +93,7 @@ type Fs struct {
opt Options // parsed options
features *fs.Features // optional features
srv *rest.Client // the connection to the yandex server
pacer *pacer.Pacer // pacer for API calls
pacer *fs.Pacer // pacer for API calls
diskRoot string // root path with "disk:/" container name
}
@ -269,7 +269,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
name: name,
opt: *opt,
srv: rest.NewClient(oAuthClient).SetRoot(rootURL),
pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
}
f.setRoot(root)
f.features = (&fs.Features{

View File

@ -16,8 +16,10 @@ import (
"github.com/ncw/rclone/fs/config/configmap"
"github.com/ncw/rclone/fs/config/configstruct"
"github.com/ncw/rclone/fs/fserrors"
"github.com/ncw/rclone/fs/fspath"
"github.com/ncw/rclone/fs/hash"
"github.com/ncw/rclone/lib/pacer"
"github.com/pkg/errors"
)
@ -1112,3 +1114,90 @@ func GetModifyWindow(fss ...Info) time.Duration {
}
return window
}
// Pacer is a simple wrapper around a pacer.Pacer with logging.
type Pacer struct {
*pacer.Pacer
}
type logCalculator struct {
pacer.Calculator
}
// NewPacer creates a Pacer for the given Fs and Calculator.
func NewPacer(c pacer.Calculator) *Pacer {
p := &Pacer{
Pacer: pacer.New(
pacer.InvokerOption(pacerInvoker),
pacer.MaxConnectionsOption(Config.Checkers+Config.Transfers),
pacer.RetriesOption(Config.LowLevelRetries),
pacer.CalculatorOption(c),
),
}
p.SetCalculator(c)
return p
}
func (d *logCalculator) Calculate(state pacer.State) time.Duration {
type causer interface {
Cause() error
}
if c, ok := state.LastError.(causer); ok {
state.LastError = c.Cause()
} else {
Logf("pacer", "Invalid error in fs.Pacer: %t", state.LastError)
}
oldSleepTime := state.SleepTime
newSleepTime := d.Calculator.Calculate(state)
if state.ConsecutiveRetries > 0 {
if newSleepTime != oldSleepTime {
Debugf("pacer", "Rate limited, increasing sleep to %v", newSleepTime)
}
} else {
if newSleepTime != oldSleepTime {
Debugf("pacer", "Reducing sleep to %v", newSleepTime)
}
}
return newSleepTime
}
// SetCalculator sets the pacing algorithm. Don't modify the Calculator object
// afterwards, use the ModifyCalculator method when needed.
//
// It will choose the default algorithm if nil is passed in.
func (p *Pacer) SetCalculator(c pacer.Calculator) {
switch c.(type) {
case *logCalculator:
Logf("pacer", "Invalid Calculator in fs.Pacer.SetCalculator")
case nil:
c = &logCalculator{pacer.NewDefault()}
default:
c = &logCalculator{c}
}
p.Pacer.SetCalculator(c)
}
// ModifyCalculator calls the given function with the currently configured
// Calculator and the Pacer lock held.
func (p *Pacer) ModifyCalculator(f func(pacer.Calculator)) {
p.ModifyCalculator(func(c pacer.Calculator) {
switch _c := c.(type) {
case *logCalculator:
f(_c.Calculator)
default:
Logf("pacer", "Invalid Calculator in fs.Pacer: %t", c)
f(c)
}
})
}
func pacerInvoker(try, retries int, f pacer.Paced) (retry bool, err error) {
retry, err = f()
if retry {
Debugf("pacer", "low level retry %d/%d (error %v)", try, retries, err)
err = fserrors.RetryError(err)
}
return
}

View File

@ -2,8 +2,15 @@ package fs
import (
"strings"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/ncw/rclone/fs/fserrors"
"github.com/ncw/rclone/lib/pacer"
"github.com/pkg/errors"
"github.com/spf13/pflag"
"github.com/stretchr/testify/assert"
)
@ -70,3 +77,47 @@ func TestOption(t *testing.T) {
err = d.Set("sdfsdf")
assert.Error(t, err)
}
var errFoo = errors.New("foo")
type dummyPaced struct {
retry bool
called int
wait *sync.Cond
}
func (dp *dummyPaced) fn() (bool, error) {
if dp.wait != nil {
dp.wait.L.Lock()
dp.wait.Wait()
dp.wait.L.Unlock()
}
dp.called++
return dp.retry, errFoo
}
func TestPacerCall(t *testing.T) {
expectedCalled := Config.LowLevelRetries
if expectedCalled == 0 {
expectedCalled = 20
Config.LowLevelRetries = expectedCalled
defer func() {
Config.LowLevelRetries = 0
}()
}
p := NewPacer(pacer.NewDefault(pacer.MinSleep(1*time.Millisecond), pacer.MaxSleep(2*time.Millisecond)))
dp := &dummyPaced{retry: true}
err := p.Call(dp.fn)
require.Equal(t, expectedCalled, dp.called)
require.Implements(t, (*fserrors.Retrier)(nil), err)
}
func TestPacerCallNoRetry(t *testing.T) {
p := NewPacer(pacer.NewDefault(pacer.MinSleep(1*time.Millisecond), pacer.MaxSleep(2*time.Millisecond)))
dp := &dummyPaced{retry: true}
err := p.CallNoRetry(dp.fn)
require.Equal(t, 1, dp.called)
require.Implements(t, (*fserrors.Retrier)(nil), err)
}

View File

@ -2,74 +2,69 @@
package pacer
import (
"context"
"math/rand"
"sync"
"time"
"github.com/ncw/rclone/fs"
"github.com/ncw/rclone/fs/fserrors"
"golang.org/x/time/rate"
"github.com/ncw/rclone/lib/errors"
)
// Pacer state
type Pacer struct {
mu sync.Mutex // Protecting read/writes
minSleep time.Duration // minimum sleep time
maxSleep time.Duration // maximum sleep time
burst int // number of calls to send without rate limiting
limiter *rate.Limiter // rate limiter for the minsleep
decayConstant uint // decay constant
attackConstant uint // attack constant
pacer chan struct{} // To pace the operations
sleepTime time.Duration // Time to sleep for each transaction
retries int // Max number of retries
maxConnections int // Maximum number of concurrent connections
connTokens chan struct{} // Connection tokens
calculatePace func(bool) // switchable pacing algorithm - call with mu held
consecutiveRetries int // number of consecutive retries
// State represents the public Pacer state that will be passed to the
// configured Calculator
type State struct {
SleepTime time.Duration // current time to sleep before adding the pacer token back
ConsecutiveRetries int // number of consecutive retries, will be 0 when the last invoker call returned false
LastError error // the error returned by the last invoker call or nil
}
// Type is for selecting different pacing algorithms
type Type int
// Calculator is a generic calculation function for a Pacer.
type Calculator interface {
// Calculate takes the current Pacer state and returns the sleep time after which
// the next Pacer call will be done.
Calculate(state State) time.Duration
}
const (
// DefaultPacer is a truncated exponential attack and decay.
//
// On retries the sleep time is doubled, on non errors then
// sleeptime decays according to the decay constant as set
// with SetDecayConstant.
//
// The sleep never goes below that set with SetMinSleep or
// above that set with SetMaxSleep.
DefaultPacer = Type(iota)
// Pacer is the primary type of the pacer package. It allows to retry calls
// with a configurable delay in between.
type Pacer struct {
pacerOptions
mu sync.Mutex // Protecting read/writes
pacer chan struct{} // To pace the operations
connTokens chan struct{} // Connection tokens
state State
}
type pacerOptions struct {
maxConnections int // Maximum number of concurrent connections
retries int // Max number of retries
calculator Calculator // switchable pacing algorithm - call with mu held
invoker InvokerFunc // wrapper function used to invoke the target function
}
// AmazonCloudDrivePacer is a specialised pacer for Amazon Drive
//
// It implements a truncated exponential backoff strategy with
// randomization. Normally operations are paced at the
// interval set with SetMinSleep. On errors the sleep timer
// is set to 0..2**retries seconds.
//
// See https://developer.amazon.com/public/apis/experience/cloud-drive/content/restful-api-best-practices
AmazonCloudDrivePacer
// InvokerFunc is the signature of the wrapper function used to invoke the
// target function in Pacer.
type InvokerFunc func(try, tries int, f Paced) (bool, error)
// GoogleDrivePacer is a specialised pacer for Google Drive
//
// It implements a truncated exponential backoff strategy with
// randomization. Normally operations are paced at the
// interval set with SetMinSleep. On errors the sleep timer
// is set to (2 ^ n) + random_number_milliseconds seconds
//
// See https://developers.google.com/drive/v2/web/handle-errors#exponential-backoff
GoogleDrivePacer
// Option can be used in New to configure the Pacer.
type Option func(*pacerOptions)
// S3Pacer is a specialised pacer for S3
//
// It is basically the defaultPacer, but allows the sleep time to go to 0
// when things are going well.
S3Pacer
)
// CalculatorOption sets a Calculator for the new Pacer.
func CalculatorOption(c Calculator) Option {
return func(p *pacerOptions) { p.calculator = c }
}
// RetriesOption sets the retries number for the new Pacer.
func RetriesOption(retries int) Option {
return func(p *pacerOptions) { p.retries = retries }
}
// MaxConnectionsOption sets the maximum connections number for the new Pacer.
func MaxConnectionsOption(maxConnections int) Option {
return func(p *pacerOptions) { p.maxConnections = maxConnections }
}
// InvokerOption sets a InvokerFunc for the new Pacer.
func InvokerOption(invoker InvokerFunc) Option {
return func(p *pacerOptions) { p.invoker = invoker }
}
// Paced is a function which is called by the Call and CallNoRetry
// methods. It should return a boolean, true if it would like to be
@ -77,19 +72,27 @@ const (
// wrapped in a RetryError.
type Paced func() (bool, error)
// New returns a Pacer with sensible defaults
func New() *Pacer {
// New returns a Pacer with sensible defaults.
func New(options ...Option) *Pacer {
opts := pacerOptions{
maxConnections: 10,
retries: 3,
}
for _, o := range options {
o(&opts)
}
p := &Pacer{
maxSleep: 2 * time.Second,
decayConstant: 2,
attackConstant: 1,
retries: fs.Config.LowLevelRetries,
pacerOptions: opts,
pacer: make(chan struct{}, 1),
}
p.sleepTime = p.minSleep
p.SetPacer(DefaultPacer)
p.SetMaxConnections(fs.Config.Checkers + fs.Config.Transfers)
p.SetMinSleep(10 * time.Millisecond)
if p.calculator == nil {
p.SetCalculator(nil)
}
p.state.SleepTime = p.calculator.Calculate(p.state)
if p.invoker == nil {
p.invoker = invoke
}
p.SetMaxConnections(p.maxConnections)
// Put the first pacing token in
p.pacer <- struct{}{}
@ -97,54 +100,11 @@ func New() *Pacer {
return p
}
// SetSleep sets the current sleep time
func (p *Pacer) SetSleep(t time.Duration) *Pacer {
p.mu.Lock()
defer p.mu.Unlock()
p.sleepTime = t
return p
}
// GetSleep gets the current sleep time
func (p *Pacer) GetSleep() time.Duration {
p.mu.Lock()
defer p.mu.Unlock()
return p.sleepTime
}
// SetMinSleep sets the minimum sleep time for the pacer
func (p *Pacer) SetMinSleep(t time.Duration) *Pacer {
p.mu.Lock()
defer p.mu.Unlock()
p.minSleep = t
p.sleepTime = p.minSleep
p.limiter = rate.NewLimiter(rate.Every(p.minSleep), p.burst)
return p
}
// SetBurst sets the burst with no limiting of the pacer
func (p *Pacer) SetBurst(n int) *Pacer {
p.mu.Lock()
defer p.mu.Unlock()
p.burst = n
p.limiter = rate.NewLimiter(rate.Every(p.minSleep), p.burst)
return p
}
// SetMaxSleep sets the maximum sleep time for the pacer
func (p *Pacer) SetMaxSleep(t time.Duration) *Pacer {
p.mu.Lock()
defer p.mu.Unlock()
p.maxSleep = t
p.sleepTime = p.minSleep
return p
}
// SetMaxConnections sets the maximum number of concurrent connections.
// Setting the value to 0 will allow unlimited number of connections.
// Should not be changed once you have started calling the pacer.
// By default this will be set to fs.Config.Checkers.
func (p *Pacer) SetMaxConnections(n int) *Pacer {
func (p *Pacer) SetMaxConnections(n int) {
p.mu.Lock()
defer p.mu.Unlock()
p.maxConnections = n
@ -156,61 +116,34 @@ func (p *Pacer) SetMaxConnections(n int) *Pacer {
p.connTokens <- struct{}{}
}
}
return p
}
// SetDecayConstant sets the decay constant for the pacer
//
// This is the speed the time falls back to the minimum after errors
// have occurred.
//
// bigger for slower decay, exponential. 1 is halve, 0 is go straight to minimum
func (p *Pacer) SetDecayConstant(decay uint) *Pacer {
p.mu.Lock()
defer p.mu.Unlock()
p.decayConstant = decay
return p
}
// SetAttackConstant sets the attack constant for the pacer
//
// This is the speed the time grows from the minimum after errors have
// occurred.
//
// bigger for slower attack, 1 is double, 0 is go straight to maximum
func (p *Pacer) SetAttackConstant(attack uint) *Pacer {
p.mu.Lock()
defer p.mu.Unlock()
p.attackConstant = attack
return p
}
// SetRetries sets the max number of tries for Call
func (p *Pacer) SetRetries(retries int) *Pacer {
// SetRetries sets the max number of retries for Call
func (p *Pacer) SetRetries(retries int) {
p.mu.Lock()
defer p.mu.Unlock()
p.retries = retries
return p
}
// SetPacer sets the pacing algorithm
// SetCalculator sets the pacing algorithm. Don't modify the Calculator object
// afterwards, use the ModifyCalculator method when needed.
//
// It will choose the default algorithm if an incorrect value is
// passed in.
func (p *Pacer) SetPacer(t Type) *Pacer {
// It will choose the default algorithm if nil is passed in.
func (p *Pacer) SetCalculator(c Calculator) {
p.mu.Lock()
defer p.mu.Unlock()
switch t {
case AmazonCloudDrivePacer:
p.calculatePace = p.acdPacer
case GoogleDrivePacer:
p.calculatePace = p.drivePacer
case S3Pacer:
p.calculatePace = p.s3Pacer
default:
p.calculatePace = p.defaultPacer
if c == nil {
c = NewDefault()
}
return p
p.calculator = c
}
// ModifyCalculator calls the given function with the currently configured
// Calculator and the Pacer lock held.
func (p *Pacer) ModifyCalculator(f func(Calculator)) {
p.mu.Lock()
f(p.calculator)
p.mu.Unlock()
}
// Start a call to the API
@ -230,170 +163,29 @@ func (p *Pacer) beginCall() {
p.mu.Lock()
// Restart the timer
go func(sleepTime, minSleep time.Duration) {
// fs.Debugf(f, "New sleep for %v at %v", t, time.Now())
// Sleep the minimum time with the rate limiter
if minSleep > 0 && sleepTime >= minSleep {
_ = p.limiter.Wait(context.Background())
sleepTime -= minSleep
}
// Then sleep the remaining time
if sleepTime > 0 {
time.Sleep(sleepTime)
}
go func(t time.Duration) {
time.Sleep(t)
p.pacer <- struct{}{}
}(p.sleepTime, p.minSleep)
}(p.state.SleepTime)
p.mu.Unlock()
}
// exponentialImplementation implements a exponentialImplementation up
// and down pacing algorithm
//
// See the description for DefaultPacer
//
// This should calculate a new sleepTime. It takes a boolean as to
// whether the operation should be retried or not.
//
// Call with p.mu held
func (p *Pacer) defaultPacer(retry bool) {
oldSleepTime := p.sleepTime
if retry {
if p.attackConstant == 0 {
p.sleepTime = p.maxSleep
} else {
p.sleepTime = (p.sleepTime << p.attackConstant) / ((1 << p.attackConstant) - 1)
}
if p.sleepTime > p.maxSleep {
p.sleepTime = p.maxSleep
}
if p.sleepTime != oldSleepTime {
fs.Debugf("pacer", "Rate limited, increasing sleep to %v", p.sleepTime)
}
} else {
p.sleepTime = (p.sleepTime<<p.decayConstant - p.sleepTime) >> p.decayConstant
if p.sleepTime < p.minSleep {
p.sleepTime = p.minSleep
}
if p.sleepTime != oldSleepTime {
fs.Debugf("pacer", "Reducing sleep to %v", p.sleepTime)
}
}
}
// acdPacer implements a truncated exponential backoff
// strategy with randomization for Amazon Drive
//
// See the description for AmazonCloudDrivePacer
//
// This should calculate a new sleepTime. It takes a boolean as to
// whether the operation should be retried or not.
//
// Call with p.mu held
func (p *Pacer) acdPacer(retry bool) {
consecutiveRetries := p.consecutiveRetries
if consecutiveRetries == 0 {
if p.sleepTime != p.minSleep {
p.sleepTime = p.minSleep
fs.Debugf("pacer", "Resetting sleep to minimum %v on success", p.sleepTime)
}
} else {
if consecutiveRetries > 9 {
consecutiveRetries = 9
}
// consecutiveRetries starts at 1 so
// maxSleep is 2**(consecutiveRetries-1) seconds
maxSleep := time.Second << uint(consecutiveRetries-1)
// actual sleep is random from 0..maxSleep
p.sleepTime = time.Duration(rand.Int63n(int64(maxSleep)))
if p.sleepTime < p.minSleep {
p.sleepTime = p.minSleep
}
fs.Debugf("pacer", "Rate limited, sleeping for %v (%d consecutive low level retries)", p.sleepTime, p.consecutiveRetries)
}
}
// drivePacer implements a truncated exponential backoff strategy with
// randomization for Google Drive
//
// See the description for GoogleDrivePacer
//
// This should calculate a new sleepTime. It takes a boolean as to
// whether the operation should be retried or not.
//
// Call with p.mu held
func (p *Pacer) drivePacer(retry bool) {
consecutiveRetries := p.consecutiveRetries
if consecutiveRetries == 0 {
if p.sleepTime != p.minSleep {
p.sleepTime = p.minSleep
fs.Debugf("pacer", "Resetting sleep to minimum %v on success", p.sleepTime)
}
} else {
if consecutiveRetries > 5 {
consecutiveRetries = 5
}
// consecutiveRetries starts at 1 so go from 1,2,3,4,5,5 => 1,2,4,8,16,16
// maxSleep is 2**(consecutiveRetries-1) seconds + random milliseconds
p.sleepTime = time.Second<<uint(consecutiveRetries-1) + time.Duration(rand.Int63n(int64(time.Second)))
fs.Debugf("pacer", "Rate limited, sleeping for %v (%d consecutive low level retries)", p.sleepTime, p.consecutiveRetries)
}
}
// s3Pacer implements a pacer compatible with our expectations of S3, where it tries to not
// delay at all between successful calls, but backs off in the default fashion in response
// to any errors.
// The assumption is that errors should be exceedingly rare (S3 seems to have largely solved
// the sort of scability questions rclone is likely to run into), and in the happy case
// it can handle calls with no delays between them.
//
// Basically defaultPacer, but with some handling of sleepTime going to/from 0ms
// Ignores minSleep entirely
//
// Call with p.mu held
func (p *Pacer) s3Pacer(retry bool) {
oldSleepTime := p.sleepTime
if retry {
if p.attackConstant == 0 {
p.sleepTime = p.maxSleep
} else {
if p.sleepTime == 0 {
p.sleepTime = p.minSleep
} else {
p.sleepTime = (p.sleepTime << p.attackConstant) / ((1 << p.attackConstant) - 1)
}
}
if p.sleepTime > p.maxSleep {
p.sleepTime = p.maxSleep
}
if p.sleepTime != oldSleepTime {
fs.Debugf("pacer", "Rate limited, increasing sleep to %v", p.sleepTime)
}
} else {
p.sleepTime = (p.sleepTime<<p.decayConstant - p.sleepTime) >> p.decayConstant
if p.sleepTime < p.minSleep {
p.sleepTime = 0
}
if p.sleepTime != oldSleepTime {
fs.Debugf("pacer", "Reducing sleep to %v", p.sleepTime)
}
}
}
// endCall implements the pacing algorithm
//
// This should calculate a new sleepTime. It takes a boolean as to
// whether the operation should be retried or not.
func (p *Pacer) endCall(retry bool) {
func (p *Pacer) endCall(retry bool, err error) {
if p.maxConnections > 0 {
p.connTokens <- struct{}{}
}
p.mu.Lock()
if retry {
p.consecutiveRetries++
p.state.ConsecutiveRetries++
} else {
p.consecutiveRetries = 0
p.state.ConsecutiveRetries = 0
}
p.calculatePace(retry)
p.state.LastError = err
p.state.SleepTime = p.calculator.Calculate(p.state)
p.mu.Unlock()
}
@ -402,15 +194,11 @@ func (p *Pacer) call(fn Paced, retries int) (err error) {
var retry bool
for i := 1; i <= retries; i++ {
p.beginCall()
retry, err = fn()
p.endCall(retry)
retry, err = p.invoker(i, retries, fn)
p.endCall(retry, err)
if !retry {
break
}
fs.Debugf("pacer", "low level retry %d/%d (error %v)", i, retries, err)
}
if retry {
err = fserrors.RetryError(err)
}
return err
}
@ -436,3 +224,41 @@ func (p *Pacer) Call(fn Paced) (err error) {
func (p *Pacer) CallNoRetry(fn Paced) error {
return p.call(fn, 1)
}
func invoke(try, tries int, f Paced) (bool, error) {
return f()
}
type retryAfterError struct {
error
retryAfter time.Duration
}
func (r *retryAfterError) Error() string {
return r.error.Error()
}
func (r *retryAfterError) Cause() error {
return r.error
}
// RetryAfterError returns a wrapped error that can be used by Calculator implementations
func RetryAfterError(err error, retryAfter time.Duration) error {
return &retryAfterError{
error: err,
retryAfter: retryAfter,
}
}
// IsRetryAfter returns true if the the error or any of it's Cause's is an error
// returned by RetryAfterError. It also returns the associated Duration if possible.
func IsRetryAfter(err error) (retryAfter time.Duration, isRetryAfter bool) {
errors.Walk(err, func(err error) bool {
if r, ok := err.(*retryAfterError); ok {
retryAfter, isRetryAfter = r.retryAfter, true
return true
}
return false
})
return
}

View File

@ -1,181 +1,85 @@
package pacer
import (
"fmt"
"sync"
"testing"
"time"
"github.com/ncw/rclone/fs"
"github.com/ncw/rclone/fs/fserrors"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)
func TestNew(t *testing.T) {
const expectedRetries = 7
fs.Config.LowLevelRetries = expectedRetries
p := New()
if p.minSleep != 10*time.Millisecond {
t.Errorf("minSleep")
}
if p.maxSleep != 2*time.Second {
t.Errorf("maxSleep")
}
if p.sleepTime != p.minSleep {
t.Errorf("sleepTime")
}
if p.retries != expectedRetries {
t.Errorf("retries want %v got %v", expectedRetries, p.retries)
}
if p.decayConstant != 2 {
t.Errorf("decayConstant")
}
if p.attackConstant != 1 {
t.Errorf("attackConstant")
}
if cap(p.pacer) != 1 {
t.Errorf("pacer 1")
}
if len(p.pacer) != 1 {
t.Errorf("pacer 2")
}
if fmt.Sprintf("%p", p.calculatePace) != fmt.Sprintf("%p", p.defaultPacer) {
t.Errorf("calculatePace")
}
if p.maxConnections != fs.Config.Checkers+fs.Config.Transfers {
t.Errorf("maxConnections")
}
if cap(p.connTokens) != fs.Config.Checkers+fs.Config.Transfers {
t.Errorf("connTokens")
}
if p.consecutiveRetries != 0 {
t.Errorf("consecutiveRetries")
}
}
func TestSetSleep(t *testing.T) {
p := New().SetSleep(2 * time.Millisecond)
if p.sleepTime != 2*time.Millisecond {
t.Errorf("didn't set")
}
}
func TestGetSleep(t *testing.T) {
p := New().SetSleep(2 * time.Millisecond)
if p.GetSleep() != 2*time.Millisecond {
t.Errorf("didn't get")
}
}
func TestSetMinSleep(t *testing.T) {
p := New().SetMinSleep(1 * time.Millisecond)
if p.minSleep != 1*time.Millisecond {
t.Errorf("didn't set")
}
}
func TestSetMaxSleep(t *testing.T) {
p := New().SetMaxSleep(100 * time.Second)
if p.maxSleep != 100*time.Second {
t.Errorf("didn't set")
const expectedConnections = 9
p := New(RetriesOption(expectedRetries), MaxConnectionsOption(expectedConnections))
if d, ok := p.calculator.(*Default); ok {
assert.Equal(t, 10*time.Millisecond, d.minSleep)
assert.Equal(t, 2*time.Second, d.maxSleep)
assert.Equal(t, d.minSleep, p.state.SleepTime)
assert.Equal(t, uint(2), d.decayConstant)
assert.Equal(t, uint(1), d.attackConstant)
} else {
t.Errorf("calculator")
}
assert.Equal(t, expectedRetries, p.retries)
assert.Equal(t, 1, cap(p.pacer))
assert.Equal(t, 1, len(p.pacer))
assert.Equal(t, expectedConnections, p.maxConnections)
assert.Equal(t, expectedConnections, cap(p.connTokens))
assert.Equal(t, 0, p.state.ConsecutiveRetries)
}
func TestMaxConnections(t *testing.T) {
p := New().SetMaxConnections(20)
if p.maxConnections != 20 {
t.Errorf("maxConnections")
}
if cap(p.connTokens) != 20 {
t.Errorf("connTokens")
}
p := New()
p.SetMaxConnections(20)
assert.Equal(t, 20, p.maxConnections)
assert.Equal(t, 20, cap(p.connTokens))
p.SetMaxConnections(0)
if p.maxConnections != 0 {
t.Errorf("maxConnections is not 0")
}
if p.connTokens != nil {
t.Errorf("connTokens is not nil")
}
}
func TestSetDecayConstant(t *testing.T) {
p := New().SetDecayConstant(17)
if p.decayConstant != 17 {
t.Errorf("didn't set")
}
assert.Equal(t, 0, p.maxConnections)
assert.Nil(t, p.connTokens)
}
func TestDecay(t *testing.T) {
p := New().SetMinSleep(time.Microsecond).SetPacer(DefaultPacer).SetMaxSleep(time.Second)
c := NewDefault(MinSleep(1*time.Microsecond), MaxSleep(1*time.Second))
for _, test := range []struct {
in time.Duration
in State
attackConstant uint
want time.Duration
}{
{8 * time.Millisecond, 1, 4 * time.Millisecond},
{1 * time.Millisecond, 0, time.Microsecond},
{1 * time.Millisecond, 2, (3 * time.Millisecond) / 4},
{1 * time.Millisecond, 3, (7 * time.Millisecond) / 8},
{State{SleepTime: 8 * time.Millisecond}, 1, 4 * time.Millisecond},
{State{SleepTime: 1 * time.Millisecond}, 0, 1 * time.Microsecond},
{State{SleepTime: 1 * time.Millisecond}, 2, (3 * time.Millisecond) / 4},
{State{SleepTime: 1 * time.Millisecond}, 3, (7 * time.Millisecond) / 8},
} {
p.sleepTime = test.in
p.SetDecayConstant(test.attackConstant)
p.defaultPacer(false)
got := p.sleepTime
if got != test.want {
t.Errorf("bad sleep want %v got %v", test.want, got)
}
}
}
func TestSetAttackConstant(t *testing.T) {
p := New().SetAttackConstant(19)
if p.attackConstant != 19 {
t.Errorf("didn't set")
c.decayConstant = test.attackConstant
got := c.Calculate(test.in)
assert.Equal(t, test.want, got, "test: %+v", test)
}
}
func TestAttack(t *testing.T) {
p := New().SetMinSleep(time.Microsecond).SetPacer(DefaultPacer).SetMaxSleep(time.Second)
c := NewDefault(MinSleep(1*time.Microsecond), MaxSleep(1*time.Second))
for _, test := range []struct {
in time.Duration
in State
attackConstant uint
want time.Duration
}{
{1 * time.Millisecond, 1, 2 * time.Millisecond},
{1 * time.Millisecond, 0, time.Second},
{1 * time.Millisecond, 2, (4 * time.Millisecond) / 3},
{1 * time.Millisecond, 3, (8 * time.Millisecond) / 7},
{State{SleepTime: 1 * time.Millisecond, ConsecutiveRetries: 1}, 1, 2 * time.Millisecond},
{State{SleepTime: 1 * time.Millisecond, ConsecutiveRetries: 1}, 0, 1 * time.Second},
{State{SleepTime: 1 * time.Millisecond, ConsecutiveRetries: 1}, 2, (4 * time.Millisecond) / 3},
{State{SleepTime: 1 * time.Millisecond, ConsecutiveRetries: 1}, 3, (8 * time.Millisecond) / 7},
} {
p.sleepTime = test.in
p.SetAttackConstant(test.attackConstant)
p.defaultPacer(true)
got := p.sleepTime
if got != test.want {
t.Errorf("bad sleep want %v got %v", test.want, got)
c.attackConstant = test.attackConstant
got := c.Calculate(test.in)
assert.Equal(t, test.want, got, "test: %+v", test)
}
}
}
func TestSetRetries(t *testing.T) {
p := New().SetRetries(18)
if p.retries != 18 {
t.Errorf("didn't set")
}
}
func TestSetPacer(t *testing.T) {
p := New().SetPacer(AmazonCloudDrivePacer)
if fmt.Sprintf("%p", p.calculatePace) != fmt.Sprintf("%p", p.acdPacer) {
t.Errorf("calculatePace is not acdPacer")
}
p.SetPacer(GoogleDrivePacer)
if fmt.Sprintf("%p", p.calculatePace) != fmt.Sprintf("%p", p.drivePacer) {
t.Errorf("calculatePace is not drivePacer")
}
p.SetPacer(DefaultPacer)
if fmt.Sprintf("%p", p.calculatePace) != fmt.Sprintf("%p", p.defaultPacer) {
t.Errorf("calculatePace is not defaultPacer")
}
p := New()
p.SetRetries(18)
assert.Equal(t, 18, p.retries)
}
// emptyTokens empties the pacer of all its tokens
@ -200,7 +104,7 @@ func waitForPace(p *Pacer, duration time.Duration) (when time.Time) {
}
func TestBeginCall(t *testing.T) {
p := New().SetMaxConnections(10).SetMinSleep(1 * time.Millisecond)
p := New(MaxConnectionsOption(10), CalculatorOption(NewDefault(MinSleep(1*time.Millisecond))))
emptyTokens(p)
go p.beginCall()
if !waitForPace(p, 10*time.Millisecond).IsZero() {
@ -223,7 +127,7 @@ func TestBeginCall(t *testing.T) {
}
func TestBeginCallZeroConnections(t *testing.T) {
p := New().SetMaxConnections(0).SetMinSleep(1 * time.Millisecond)
p := New(MaxConnectionsOption(0), CalculatorOption(NewDefault(MinSleep(1*time.Millisecond))))
emptyTokens(p)
go p.beginCall()
if !waitForPace(p, 10*time.Millisecond).IsZero() {
@ -241,155 +145,144 @@ func TestBeginCallZeroConnections(t *testing.T) {
}
func TestDefaultPacer(t *testing.T) {
p := New().SetMinSleep(time.Millisecond).SetPacer(DefaultPacer).SetMaxSleep(time.Second).SetDecayConstant(2)
c := NewDefault(MinSleep(1*time.Millisecond), MaxSleep(1*time.Second), DecayConstant(2))
for _, test := range []struct {
in time.Duration
retry bool
state State
want time.Duration
}{
{time.Millisecond, true, 2 * time.Millisecond},
{time.Second, true, time.Second},
{(3 * time.Second) / 4, true, time.Second},
{time.Second, false, 750 * time.Millisecond},
{1000 * time.Microsecond, false, time.Millisecond},
{1200 * time.Microsecond, false, time.Millisecond},
{State{SleepTime: 1 * time.Millisecond, ConsecutiveRetries: 1}, 2 * time.Millisecond},
{State{SleepTime: 1 * time.Second, ConsecutiveRetries: 1}, 1 * time.Second},
{State{SleepTime: (3 * time.Second) / 4, ConsecutiveRetries: 1}, 1 * time.Second},
{State{SleepTime: 1 * time.Second}, 750 * time.Millisecond},
{State{SleepTime: 1000 * time.Microsecond}, 1 * time.Millisecond},
{State{SleepTime: 1200 * time.Microsecond}, 1 * time.Millisecond},
} {
p.sleepTime = test.in
p.defaultPacer(test.retry)
got := p.sleepTime
if got != test.want {
t.Errorf("bad sleep want %v got %v", test.want, got)
}
got := c.Calculate(test.state)
assert.Equal(t, test.want, got, "test: %+v", test)
}
}
func TestAmazonCloudDrivePacer(t *testing.T) {
p := New().SetMinSleep(time.Millisecond).SetPacer(AmazonCloudDrivePacer).SetMaxSleep(time.Second).SetDecayConstant(2)
c := NewAmazonCloudDrive(MinSleep(1 * time.Millisecond))
// Do lots of times because of the random number!
for _, test := range []struct {
in time.Duration
consecutiveRetries int
retry bool
state State
want time.Duration
}{
{time.Millisecond, 0, true, time.Millisecond},
{10 * time.Millisecond, 0, true, time.Millisecond},
{1 * time.Second, 1, true, 500 * time.Millisecond},
{1 * time.Second, 2, true, 1 * time.Second},
{1 * time.Second, 3, true, 2 * time.Second},
{1 * time.Second, 4, true, 4 * time.Second},
{1 * time.Second, 5, true, 8 * time.Second},
{1 * time.Second, 6, true, 16 * time.Second},
{1 * time.Second, 7, true, 32 * time.Second},
{1 * time.Second, 8, true, 64 * time.Second},
{1 * time.Second, 9, true, 128 * time.Second},
{1 * time.Second, 10, true, 128 * time.Second},
{1 * time.Second, 11, true, 128 * time.Second},
{State{SleepTime: 1 * time.Millisecond, ConsecutiveRetries: 0}, 1 * time.Millisecond},
{State{SleepTime: 10 * time.Millisecond, ConsecutiveRetries: 0}, 1 * time.Millisecond},
{State{SleepTime: 1 * time.Second, ConsecutiveRetries: 1}, 500 * time.Millisecond},
{State{SleepTime: 1 * time.Second, ConsecutiveRetries: 2}, 1 * time.Second},
{State{SleepTime: 1 * time.Second, ConsecutiveRetries: 3}, 2 * time.Second},
{State{SleepTime: 1 * time.Second, ConsecutiveRetries: 4}, 4 * time.Second},
{State{SleepTime: 1 * time.Second, ConsecutiveRetries: 5}, 8 * time.Second},
{State{SleepTime: 1 * time.Second, ConsecutiveRetries: 6}, 16 * time.Second},
{State{SleepTime: 1 * time.Second, ConsecutiveRetries: 7}, 32 * time.Second},
{State{SleepTime: 1 * time.Second, ConsecutiveRetries: 8}, 64 * time.Second},
{State{SleepTime: 1 * time.Second, ConsecutiveRetries: 9}, 128 * time.Second},
{State{SleepTime: 1 * time.Second, ConsecutiveRetries: 10}, 128 * time.Second},
{State{SleepTime: 1 * time.Second, ConsecutiveRetries: 11}, 128 * time.Second},
} {
const n = 1000
var sum time.Duration
// measure average time over n cycles
for i := 0; i < n; i++ {
p.sleepTime = test.in
p.consecutiveRetries = test.consecutiveRetries
p.acdPacer(test.retry)
sum += p.sleepTime
sum += c.Calculate(test.state)
}
got := sum / n
//t.Logf("%+v: got = %v", test, got)
if got < (test.want*9)/10 || got > (test.want*11)/10 {
t.Fatalf("%+v: bad sleep want %v+/-10%% got %v", test, test.want, got)
}
assert.False(t, got < (test.want*9)/10 || got > (test.want*11)/10, "test: %+v", test)
}
}
func TestGoogleDrivePacer(t *testing.T) {
p := New().SetMinSleep(time.Millisecond).SetPacer(GoogleDrivePacer).SetMaxSleep(time.Second).SetDecayConstant(2)
// Do lots of times because of the random number!
for _, test := range []struct {
in time.Duration
consecutiveRetries int
retry bool
state State
want time.Duration
}{
{time.Millisecond, 0, true, time.Millisecond},
{10 * time.Millisecond, 0, true, time.Millisecond},
{1 * time.Second, 1, true, 1*time.Second + 500*time.Millisecond},
{1 * time.Second, 2, true, 2*time.Second + 500*time.Millisecond},
{1 * time.Second, 3, true, 4*time.Second + 500*time.Millisecond},
{1 * time.Second, 4, true, 8*time.Second + 500*time.Millisecond},
{1 * time.Second, 5, true, 16*time.Second + 500*time.Millisecond},
{1 * time.Second, 6, true, 16*time.Second + 500*time.Millisecond},
{1 * time.Second, 7, true, 16*time.Second + 500*time.Millisecond},
{State{SleepTime: 1 * time.Millisecond}, 0},
{State{SleepTime: 10 * time.Millisecond}, 0},
{State{SleepTime: 1 * time.Second, ConsecutiveRetries: 1}, 1*time.Second + 500*time.Millisecond},
{State{SleepTime: 1 * time.Second, ConsecutiveRetries: 2}, 2*time.Second + 500*time.Millisecond},
{State{SleepTime: 1 * time.Second, ConsecutiveRetries: 3}, 4*time.Second + 500*time.Millisecond},
{State{SleepTime: 1 * time.Second, ConsecutiveRetries: 4}, 8*time.Second + 500*time.Millisecond},
{State{SleepTime: 1 * time.Second, ConsecutiveRetries: 5}, 16*time.Second + 500*time.Millisecond},
{State{SleepTime: 1 * time.Second, ConsecutiveRetries: 6}, 16*time.Second + 500*time.Millisecond},
{State{SleepTime: 1 * time.Second, ConsecutiveRetries: 7}, 16*time.Second + 500*time.Millisecond},
} {
const n = 1000
var sum time.Duration
// measure average time over n cycles
for i := 0; i < n; i++ {
p.sleepTime = test.in
p.consecutiveRetries = test.consecutiveRetries
p.drivePacer(test.retry)
sum += p.sleepTime
c := NewGoogleDrive(MinSleep(1 * time.Millisecond))
sum += c.Calculate(test.state)
}
got := sum / n
//t.Logf("%+v: got = %v", test, got)
if got < (test.want*9)/10 || got > (test.want*11)/10 {
t.Fatalf("%+v: bad sleep want %v+/-10%% got %v", test, test.want, got)
assert.False(t, got < (test.want*9)/10 || got > (test.want*11)/10, "test: %+v, got: %v", test, got)
}
const minSleep = 2 * time.Millisecond
for _, test := range []struct {
calls int
want int
}{
{1, 0},
{9, 0},
{10, 0},
{11, 1},
{12, 2},
} {
c := NewGoogleDrive(MinSleep(minSleep), Burst(10))
count := 0
for i := 0; i < test.calls; i++ {
sleep := c.Calculate(State{})
time.Sleep(sleep)
if sleep != 0 {
count++
}
}
assert.Equalf(t, test.want, count, "test: %+v, got: %v", test, count)
}
}
func TestS3Pacer(t *testing.T) {
p := New().SetMinSleep(10 * time.Millisecond).SetPacer(S3Pacer).SetMaxSleep(time.Second).SetDecayConstant(2)
c := NewS3(MinSleep(10*time.Millisecond), MaxSleep(1*time.Second), DecayConstant(2))
for _, test := range []struct {
in time.Duration
retry bool
state State
want time.Duration
}{
{0, true, 10 * time.Millisecond}, //Things were going ok, we failed once, back off to minSleep
{10 * time.Millisecond, true, 20 * time.Millisecond}, //Another fail, double the backoff
{10 * time.Millisecond, false, 0}, //Things start going ok when we're at minSleep; should result in no sleep
{12 * time.Millisecond, false, 0}, //*near* minsleep and going ok, decay would take below minSleep, should go to 0
{0, false, 0}, //Things have been going ok; not retrying should keep sleep at 0
{time.Second, true, time.Second}, //Check maxSleep is enforced
{(3 * time.Second) / 4, true, time.Second}, //Check attack heading to maxSleep doesn't exceed maxSleep
{time.Second, false, 750 * time.Millisecond}, //Check decay from maxSleep
{48 * time.Millisecond, false, 36 * time.Millisecond}, //Check simple decay above minSleep
{State{SleepTime: 0, ConsecutiveRetries: 1}, 10 * time.Millisecond}, //Things were going ok, we failed once, back off to minSleep
{State{SleepTime: 10 * time.Millisecond, ConsecutiveRetries: 1}, 20 * time.Millisecond}, //Another fail, double the backoff
{State{SleepTime: 10 * time.Millisecond}, 0}, //Things start going ok when we're at minSleep; should result in no sleep
{State{SleepTime: 12 * time.Millisecond}, 0}, //*near* minsleep and going ok, decay would take below minSleep, should go to 0
{State{SleepTime: 0}, 0}, //Things have been going ok; not retrying should keep sleep at 0
{State{SleepTime: 1 * time.Second, ConsecutiveRetries: 1}, 1 * time.Second}, //Check maxSleep is enforced
{State{SleepTime: (3 * time.Second) / 4, ConsecutiveRetries: 1}, 1 * time.Second}, //Check attack heading to maxSleep doesn't exceed maxSleep
{State{SleepTime: 1 * time.Second}, 750 * time.Millisecond}, //Check decay from maxSleep
{State{SleepTime: 48 * time.Millisecond}, 36 * time.Millisecond}, //Check simple decay above minSleep
} {
p.sleepTime = test.in
p.s3Pacer(test.retry)
got := p.sleepTime
if got != test.want {
t.Errorf("bad sleep for %v with retry %v: want %v got %v", test.in, test.retry, test.want, got)
}
got := c.Calculate(test.state)
assert.Equal(t, test.want, got, "test: %+v", test)
}
}
func TestEndCall(t *testing.T) {
p := New().SetMaxConnections(5)
p := New(MaxConnectionsOption(5))
emptyTokens(p)
p.consecutiveRetries = 1
p.endCall(true)
if len(p.connTokens) != 1 {
t.Errorf("Expecting 1 token")
}
if p.consecutiveRetries != 2 {
t.Errorf("Bad consecutive retries")
}
p.state.ConsecutiveRetries = 1
p.endCall(true, nil)
assert.Equal(t, 1, len(p.connTokens))
assert.Equal(t, 2, p.state.ConsecutiveRetries)
}
func TestEndCallZeroConnections(t *testing.T) {
p := New().SetMaxConnections(0)
p := New(MaxConnectionsOption(0))
emptyTokens(p)
p.consecutiveRetries = 1
p.endCall(false)
if len(p.connTokens) != 0 {
t.Errorf("Expecting 0 token")
}
if p.consecutiveRetries != 0 {
t.Errorf("Bad consecutive retries")
}
p.state.ConsecutiveRetries = 1
p.endCall(false, nil)
assert.Equal(t, 0, len(p.connTokens))
assert.Equal(t, 0, p.state.ConsecutiveRetries)
}
var errFoo = errors.New("foo")
@ -397,67 +290,79 @@ var errFoo = errors.New("foo")
type dummyPaced struct {
retry bool
called int
wait *sync.Cond
}
func (dp *dummyPaced) fn() (bool, error) {
if dp.wait != nil {
dp.wait.L.Lock()
dp.called++
dp.wait.Wait()
dp.wait.L.Unlock()
} else {
dp.called++
}
return dp.retry, errFoo
}
func Test_callNoRetry(t *testing.T) {
p := New().SetMinSleep(time.Millisecond).SetMaxSleep(2 * time.Millisecond)
func TestCallFixed(t *testing.T) {
p := New(CalculatorOption(NewDefault(MinSleep(1*time.Millisecond), MaxSleep(2*time.Millisecond))))
dp := &dummyPaced{retry: false}
err := p.call(dp.fn, 10)
if dp.called != 1 {
t.Errorf("called want %d got %d", 1, dp.called)
}
if err != errFoo {
t.Errorf("err want %v got %v", errFoo, err)
}
assert.Equal(t, 1, dp.called)
assert.Equal(t, errFoo, err)
}
func Test_callRetry(t *testing.T) {
p := New().SetMinSleep(time.Millisecond).SetMaxSleep(2 * time.Millisecond)
p := New(CalculatorOption(NewDefault(MinSleep(1*time.Millisecond), MaxSleep(2*time.Millisecond))))
dp := &dummyPaced{retry: true}
err := p.call(dp.fn, 10)
if dp.called != 10 {
t.Errorf("called want %d got %d", 10, dp.called)
}
if err == errFoo {
t.Errorf("err didn't want %v got %v", errFoo, err)
}
_, ok := err.(fserrors.Retrier)
if !ok {
t.Errorf("didn't return a retry error")
}
assert.Equal(t, 10, dp.called)
assert.Equal(t, errFoo, err)
}
func TestCall(t *testing.T) {
p := New().SetMinSleep(time.Millisecond).SetMaxSleep(2 * time.Millisecond).SetRetries(20)
p := New(RetriesOption(20), CalculatorOption(NewDefault(MinSleep(1*time.Millisecond), MaxSleep(2*time.Millisecond))))
dp := &dummyPaced{retry: true}
err := p.Call(dp.fn)
if dp.called != 20 {
t.Errorf("called want %d got %d", 20, dp.called)
}
_, ok := err.(fserrors.Retrier)
if !ok {
t.Errorf("didn't return a retry error")
}
assert.Equal(t, 20, dp.called)
assert.Equal(t, errFoo, err)
}
func TestCallNoRetry(t *testing.T) {
p := New().SetMinSleep(time.Millisecond).SetMaxSleep(2 * time.Millisecond).SetRetries(20)
func TestCallParallel(t *testing.T) {
p := New(MaxConnectionsOption(3), RetriesOption(1), CalculatorOption(NewDefault(MinSleep(100*time.Microsecond), MaxSleep(1*time.Millisecond))))
dp := &dummyPaced{retry: true}
err := p.CallNoRetry(dp.fn)
if dp.called != 1 {
t.Errorf("called want %d got %d", 1, dp.called)
wait := sync.NewCond(&sync.Mutex{})
funcs := make([]*dummyPaced, 5)
for i := range funcs {
dp := &dummyPaced{wait: wait}
funcs[i] = dp
go func() {
assert.Equal(t, errFoo, p.CallNoRetry(dp.fn))
}()
}
_, ok := err.(fserrors.Retrier)
if !ok {
t.Errorf("didn't return a retry error")
time.Sleep(10 * time.Millisecond)
called := 0
wait.L.Lock()
for _, dp := range funcs {
called += dp.called
}
wait.L.Unlock()
assert.Equal(t, 3, called)
wait.Broadcast()
time.Sleep(20 * time.Millisecond)
called = 0
wait.L.Lock()
for _, dp := range funcs {
called += dp.called
}
wait.L.Unlock()
assert.Equal(t, 5, called)
wait.Broadcast()
}

326
lib/pacer/pacers.go Normal file
View File

@ -0,0 +1,326 @@
package pacer
import (
"math/rand"
"time"
"golang.org/x/time/rate"
)
type (
// MinSleep configures the minimum sleep time of a Calculator
MinSleep time.Duration
// MaxSleep configures the maximum sleep time of a Calculator
MaxSleep time.Duration
// DecayConstant configures the decay constant time of a Calculator
DecayConstant uint
// AttackConstant configures the attack constant of a Calculator
AttackConstant uint
// Burst configures the number of API calls to allow without sleeping
Burst int
)
// Default is a truncated exponential attack and decay.
//
// On retries the sleep time is doubled, on non errors then sleeptime decays
// according to the decay constant as set with SetDecayConstant.
//
// The sleep never goes below that set with SetMinSleep or above that set
// with SetMaxSleep.
type Default struct {
minSleep time.Duration // minimum sleep time
maxSleep time.Duration // maximum sleep time
decayConstant uint // decay constant
attackConstant uint // attack constant
}
// DefaultOption is the interface implemented by all options for the Default Calculator
type DefaultOption interface {
ApplyDefault(*Default)
}
// NewDefault creates a Calculator used by Pacer as the default.
func NewDefault(opts ...DefaultOption) *Default {
c := &Default{
minSleep: 10 * time.Millisecond,
maxSleep: 2 * time.Second,
decayConstant: 2,
attackConstant: 1,
}
c.Update(opts...)
return c
}
// Update applies the Calculator options.
func (c *Default) Update(opts ...DefaultOption) {
for _, opt := range opts {
opt.ApplyDefault(c)
}
}
// ApplyDefault updates the value on the Calculator
func (o MinSleep) ApplyDefault(c *Default) {
c.minSleep = time.Duration(o)
}
// ApplyDefault updates the value on the Calculator
func (o MaxSleep) ApplyDefault(c *Default) {
c.maxSleep = time.Duration(o)
}
// ApplyDefault updates the value on the Calculator
func (o DecayConstant) ApplyDefault(c *Default) {
c.decayConstant = uint(o)
}
// ApplyDefault updates the value on the Calculator
func (o AttackConstant) ApplyDefault(c *Default) {
c.attackConstant = uint(o)
}
// Calculate takes the current Pacer state and return the wait time until the next try.
func (c *Default) Calculate(state State) time.Duration {
if t, ok := IsRetryAfter(state.LastError); ok {
if t < c.minSleep {
return c.minSleep
}
return t
}
if state.ConsecutiveRetries > 0 {
sleepTime := c.maxSleep
if c.attackConstant != 0 {
sleepTime = (state.SleepTime << c.attackConstant) / ((1 << c.attackConstant) - 1)
}
if sleepTime > c.maxSleep {
sleepTime = c.maxSleep
}
return sleepTime
}
sleepTime := (state.SleepTime<<c.decayConstant - state.SleepTime) >> c.decayConstant
if sleepTime < c.minSleep {
sleepTime = c.minSleep
}
return sleepTime
}
// AmazonCloudDrive is a specialized pacer for Amazon Drive
//
// It implements a truncated exponential backoff strategy with randomization.
// Normally operations are paced at the interval set with SetMinSleep. On errors
// the sleep timer is set to 0..2**retries seconds.
//
// See https://developer.amazon.com/public/apis/experience/cloud-drive/content/restful-api-best-practices
type AmazonCloudDrive struct {
minSleep time.Duration // minimum sleep time
}
// AmazonCloudDriveOption is the interface implemented by all options for the AmazonCloudDrive Calculator
type AmazonCloudDriveOption interface {
ApplyAmazonCloudDrive(*AmazonCloudDrive)
}
// NewAmazonCloudDrive returns a new AmazonCloudDrive Calculator with default values
func NewAmazonCloudDrive(opts ...AmazonCloudDriveOption) *AmazonCloudDrive {
c := &AmazonCloudDrive{
minSleep: 10 * time.Millisecond,
}
c.Update(opts...)
return c
}
// Update applies the Calculator options.
func (c *AmazonCloudDrive) Update(opts ...AmazonCloudDriveOption) {
for _, opt := range opts {
opt.ApplyAmazonCloudDrive(c)
}
}
// ApplyAmazonCloudDrive updates the value on the Calculator
func (o MinSleep) ApplyAmazonCloudDrive(c *AmazonCloudDrive) {
c.minSleep = time.Duration(o)
}
// Calculate takes the current Pacer state and return the wait time until the next try.
func (c *AmazonCloudDrive) Calculate(state State) time.Duration {
if t, ok := IsRetryAfter(state.LastError); ok {
if t < c.minSleep {
return c.minSleep
}
return t
}
consecutiveRetries := state.ConsecutiveRetries
if consecutiveRetries == 0 {
return c.minSleep
}
if consecutiveRetries > 9 {
consecutiveRetries = 9
}
// consecutiveRetries starts at 1 so
// maxSleep is 2**(consecutiveRetries-1) seconds
maxSleep := time.Second << uint(consecutiveRetries-1)
// actual sleep is random from 0..maxSleep
sleepTime := time.Duration(rand.Int63n(int64(maxSleep)))
if sleepTime < c.minSleep {
sleepTime = c.minSleep
}
return sleepTime
}
// GoogleDrive is a specialized pacer for Google Drive
//
// It implements a truncated exponential backoff strategy with randomization.
// Normally operations are paced at the interval set with SetMinSleep. On errors
// the sleep timer is set to (2 ^ n) + random_number_milliseconds seconds.
//
// See https://developers.google.com/drive/v2/web/handle-errors#exponential-backoff
type GoogleDrive struct {
minSleep time.Duration // minimum sleep time
burst int // number of requests without sleeping
limiter *rate.Limiter // rate limiter for the minSleep
}
// GoogleDriveOption is the interface implemented by all options for the GoogleDrive Calculator
type GoogleDriveOption interface {
ApplyGoogleDrive(*GoogleDrive)
}
// NewGoogleDrive returns a new GoogleDrive Calculator with default values
func NewGoogleDrive(opts ...GoogleDriveOption) *GoogleDrive {
c := &GoogleDrive{
minSleep: 10 * time.Millisecond,
burst: 1,
}
c.Update(opts...)
return c
}
// Update applies the Calculator options.
func (c *GoogleDrive) Update(opts ...GoogleDriveOption) {
for _, opt := range opts {
opt.ApplyGoogleDrive(c)
}
if c.burst <= 0 {
c.burst = 1
}
c.limiter = rate.NewLimiter(rate.Every(c.minSleep), c.burst)
}
// ApplyGoogleDrive updates the value on the Calculator
func (o MinSleep) ApplyGoogleDrive(c *GoogleDrive) {
c.minSleep = time.Duration(o)
}
// ApplyGoogleDrive updates the value on the Calculator
func (o Burst) ApplyGoogleDrive(c *GoogleDrive) {
c.burst = int(o)
}
// Calculate takes the current Pacer state and return the wait time until the next try.
func (c *GoogleDrive) Calculate(state State) time.Duration {
if t, ok := IsRetryAfter(state.LastError); ok {
if t < c.minSleep {
return c.minSleep
}
return t
}
consecutiveRetries := state.ConsecutiveRetries
if consecutiveRetries == 0 {
return c.limiter.Reserve().Delay()
}
if consecutiveRetries > 5 {
consecutiveRetries = 5
}
// consecutiveRetries starts at 1 so go from 1,2,3,4,5,5 => 1,2,4,8,16,16
// maxSleep is 2**(consecutiveRetries-1) seconds + random milliseconds
return time.Second<<uint(consecutiveRetries-1) + time.Duration(rand.Int63n(int64(time.Second)))
}
// S3 implements a pacer compatible with our expectations of S3, where it tries to not
// delay at all between successful calls, but backs off in the default fashion in response
// to any errors.
// The assumption is that errors should be exceedingly rare (S3 seems to have largely solved
// the sort of stability questions rclone is likely to run into), and in the happy case
// it can handle calls with no delays between them.
//
// Basically defaultPacer, but with some handling of sleepTime going to/from 0ms
type S3 struct {
minSleep time.Duration // minimum sleep time
maxSleep time.Duration // maximum sleep time
decayConstant uint // decay constant
attackConstant uint // attack constant
}
// S3Option is the interface implemented by all options for the S3 Calculator
type S3Option interface {
ApplyS3(*S3)
}
// NewS3 returns a new S3 Calculator with default values
func NewS3(opts ...S3Option) *S3 {
c := &S3{
maxSleep: 2 * time.Second,
decayConstant: 2,
attackConstant: 1,
}
c.Update(opts...)
return c
}
// Update applies the Calculator options.
func (c *S3) Update(opts ...S3Option) {
for _, opt := range opts {
opt.ApplyS3(c)
}
}
// ApplyS3 updates the value on the Calculator
func (o MaxSleep) ApplyS3(c *S3) {
c.maxSleep = time.Duration(o)
}
// ApplyS3 updates the value on the Calculator
func (o MinSleep) ApplyS3(c *S3) {
c.minSleep = time.Duration(o)
}
// ApplyS3 updates the value on the Calculator
func (o DecayConstant) ApplyS3(c *S3) {
c.decayConstant = uint(o)
}
// ApplyS3 updates the value on the Calculator
func (o AttackConstant) ApplyS3(c *S3) {
c.attackConstant = uint(o)
}
// Calculate takes the current Pacer state and return the wait time until the next try.
func (c *S3) Calculate(state State) time.Duration {
if t, ok := IsRetryAfter(state.LastError); ok {
if t < c.minSleep {
return c.minSleep
}
return t
}
if state.ConsecutiveRetries > 0 {
if c.attackConstant == 0 {
return c.maxSleep
}
if state.SleepTime == 0 {
return c.minSleep
}
sleepTime := (state.SleepTime << c.attackConstant) / ((1 << c.attackConstant) - 1)
if sleepTime > c.maxSleep {
sleepTime = c.maxSleep
}
return sleepTime
}
sleepTime := (state.SleepTime<<c.decayConstant - state.SleepTime) >> c.decayConstant
if sleepTime < c.minSleep {
sleepTime = 0
}
return sleepTime
}