diff --git a/backend/s3/s3.go b/backend/s3/s3.go
index 2679f7156..020c3e4b8 100644
--- a/backend/s3/s3.go
+++ b/backend/s3/s3.go
@@ -39,9 +39,11 @@ import (
 	"github.com/ncw/rclone/fs"
 	"github.com/ncw/rclone/fs/config/configmap"
 	"github.com/ncw/rclone/fs/config/configstruct"
+	"github.com/ncw/rclone/fs/fserrors"
 	"github.com/ncw/rclone/fs/fshttp"
 	"github.com/ncw/rclone/fs/hash"
 	"github.com/ncw/rclone/fs/walk"
+	"github.com/ncw/rclone/lib/pacer"
 	"github.com/ncw/rclone/lib/rest"
 	"github.com/ncw/swift"
 	"github.com/pkg/errors"
@@ -570,6 +572,7 @@ const (
 	maxRetries     = 10                            // number of retries to make of operations
 	maxSizeForCopy = 5 * 1024 * 1024 * 1024        // The maximum size of object we can COPY
 	maxFileSize    = 5 * 1024 * 1024 * 1024 * 1024 // largest possible upload file size
+	minSleep       = 10 * time.Millisecond         // In case of error, start at 10ms sleep.
 )
 
 // Options defines the configuration for this backend
@@ -604,6 +607,7 @@ type Fs struct {
 	bucketOKMu    sync.Mutex   // mutex to protect bucket OK
 	bucketOK      bool         // true if we have created the bucket
 	bucketDeleted bool         // true if we have deleted the bucket
+	pacer         *pacer.Pacer // To pace the API calls
 }
 
 // Object describes a s3 object
@@ -649,6 +653,37 @@ func (f *Fs) Features() *fs.Features {
 	return f.features
 }
 
+// retryErrorCodes is a slice of error codes that we will retry
+// See: https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
+var retryErrorCodes = []int{
+	409, // Conflict - various states that could be resolved on a retry
+	503, // Service Unavailable/Slow Down - "Reduce your request rate"
+}
+
+// S3 is pretty resilient, and the built-in retry handling is probably sufficient,
+// as it should notice closed connections and timeouts, which are the most likely
+// sorts of failure mode.
+func shouldRetry(err error) (bool, error) {
+
+	// If this is an awserr object, try to extract more useful information to determine if we should retry
+	if awsError, ok := err.(awserr.Error); ok {
+		// Simple case, check the original embedded error in case it's generically retriable
+		if fserrors.ShouldRetry(awsError.OrigErr()) {
+			return true, err
+		}
+		// Failing that, if it's a RequestFailure it probably has an HTTP status code we can check
+		if reqErr, ok := err.(awserr.RequestFailure); ok {
+			for _, e := range retryErrorCodes {
+				if reqErr.StatusCode() == e {
+					return true, err
+				}
+			}
+		}
+	}
+	// OK, not an awserr - check for generic failure conditions
+	return fserrors.ShouldRetry(err), err
+}
+
 // Pattern to match a s3 path
 var matcher = regexp.MustCompile(`^/*([^/]*)(.*)$`)
 
@@ -774,6 +809,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
 		c:      c,
 		bucket: bucket,
 		ses:    ses,
+		pacer:  pacer.New().SetMinSleep(minSleep).SetPacer(pacer.S3Pacer),
 	}
 	f.features = (&fs.Features{
 		ReadMimeType: true,
@@ -787,7 +823,10 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
 			Bucket: &f.bucket,
 			Key:    &directory,
 		}
-		_, err = f.c.HeadObject(&req)
+		err = f.pacer.Call(func() (bool, error) {
+			_, err = f.c.HeadObject(&req)
+			return shouldRetry(err)
+		})
 		if err == nil {
 			f.root = path.Dir(directory)
 			if f.root == "." {
@@ -864,7 +903,12 @@ func (f *Fs) list(dir string, recurse bool, fn listFn) error {
 		MaxKeys:   &maxKeys,
 		Marker:    marker,
 	}
-	resp, err := f.c.ListObjects(&req)
+	var resp *s3.ListObjectsOutput
+	var err error
+	err = f.pacer.Call(func() (bool, error) {
+		resp, err = f.c.ListObjects(&req)
+		return shouldRetry(err)
+	})
 	if err != nil {
 		if awsErr, ok := err.(awserr.RequestFailure); ok {
 			if awsErr.StatusCode() == http.StatusNotFound {
@@ -989,7 +1033,11 @@ func (f *Fs) listBuckets(dir string) (entries fs.DirEntries, err error) {
 		return nil, fs.ErrorListBucketRequired
 	}
 	req := s3.ListBucketsInput{}
-	resp, err := f.c.ListBuckets(&req)
+	var resp *s3.ListBucketsOutput
+	err = f.pacer.Call(func() (bool, error) {
+		resp, err = f.c.ListBuckets(&req)
+		return shouldRetry(err)
+	})
 	if err != nil {
 		return nil, err
 	}
@@ -1074,7 +1122,10 @@ func (f *Fs) dirExists() (bool, error) {
 	req := s3.HeadBucketInput{
 		Bucket: &f.bucket,
 	}
-	_, err := f.c.HeadBucket(&req)
+	err := f.pacer.Call(func() (bool, error) {
+		_, err := f.c.HeadBucket(&req)
+		return shouldRetry(err)
+	})
 	if err == nil {
 		return true, nil
 	}
@@ -1111,7 +1162,10 @@ func (f *Fs) Mkdir(dir string) error {
 			LocationConstraint: &f.opt.LocationConstraint,
 		}
 	}
-	_, err := f.c.CreateBucket(&req)
+	err := f.pacer.Call(func() (bool, error) {
+		_, err := f.c.CreateBucket(&req)
+		return shouldRetry(err)
+	})
 	if err, ok := err.(awserr.Error); ok {
 		if err.Code() == "BucketAlreadyOwnedByYou" {
 			err = nil
@@ -1136,7 +1190,10 @@ func (f *Fs) Rmdir(dir string) error {
 	req := s3.DeleteBucketInput{
 		Bucket: &f.bucket,
 	}
-	_, err := f.c.DeleteBucket(&req)
+	err := f.pacer.Call(func() (bool, error) {
+		_, err := f.c.DeleteBucket(&req)
+		return shouldRetry(err)
+	})
 	if err == nil {
 		f.bucketOK = false
 		f.bucketDeleted = true
@@ -1183,7 +1240,10 @@ func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) {
 		CopySource:        &source,
 		MetadataDirective: aws.String(s3.MetadataDirectiveCopy),
 	}
-	_, err = f.c.CopyObject(&req)
+	err = f.pacer.Call(func() (bool, error) {
+		_, err = f.c.CopyObject(&req)
+		return shouldRetry(err)
+	})
 	if err != nil {
 		return nil, err
 	}
@@ -1260,7 +1320,12 @@ func (o *Object) readMetaData() (err error) {
 		Bucket: &o.fs.bucket,
 		Key:    &key,
 	}
-	resp, err := o.fs.c.HeadObject(&req)
+	var resp *s3.HeadObjectOutput
+	err = o.fs.pacer.Call(func() (bool, error) {
+		var err error
+		resp, err = o.fs.c.HeadObject(&req)
+		return shouldRetry(err)
+	})
 	if err != nil {
 		if awsErr, ok := err.(awserr.RequestFailure); ok {
 			if awsErr.StatusCode() == http.StatusNotFound {
@@ -1344,7 +1409,10 @@ func (o *Object) SetModTime(modTime time.Time) error {
 		Metadata:          o.meta,
 		MetadataDirective: &directive,
 	}
-	_, err = o.fs.c.CopyObject(&req)
+	err = o.fs.pacer.Call(func() (bool, error) {
+		_, err := o.fs.c.CopyObject(&req)
+		return shouldRetry(err)
+	})
 	return err
 }
 
@@ -1371,7 +1439,12 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
 			}
 		}
 	}
-	resp, err := o.fs.c.GetObject(&req)
+	var resp *s3.GetObjectOutput
+	err = o.fs.pacer.Call(func() (bool, error) {
+		var err error
+		resp, err = o.fs.c.GetObject(&req)
+		return shouldRetry(err)
+	})
 	if err, ok := err.(awserr.RequestFailure); ok {
 		if err.Code() == "InvalidObjectState" {
 			return nil, errors.Errorf("Object in GLACIER, restore first: %v", key)
@@ -1450,7 +1523,10 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio
 	if o.fs.opt.StorageClass != "" {
 		req.StorageClass = &o.fs.opt.StorageClass
 	}
-	_, err = uploader.Upload(&req)
+	err = o.fs.pacer.CallNoRetry(func() (bool, error) {
+		_, err = uploader.Upload(&req)
+		return shouldRetry(err)
+	})
 	if err != nil {
 		return err
 	}
@@ -1468,7 +1544,10 @@ func (o *Object) Remove() error {
 		Bucket: &o.fs.bucket,
 		Key:    &key,
 	}
-	_, err := o.fs.c.DeleteObject(&req)
+	err := o.fs.pacer.Call(func() (bool, error) {
+		_, err := o.fs.c.DeleteObject(&req)
+		return shouldRetry(err)
+	})
 	return err
 }
 
diff --git a/lib/pacer/pacer.go b/lib/pacer/pacer.go
index fb9ae9977..8d94e5fe8 100644
--- a/lib/pacer/pacer.go
+++ b/lib/pacer/pacer.go
@@ -59,6 +59,12 @@ const (
 	//
 	// See https://developers.google.com/drive/v2/web/handle-errors#exponential-backoff
 	GoogleDrivePacer
+
+	// S3Pacer is a specialised pacer for S3
+	//
+	// It is basically the defaultPacer, but allows the sleep time to go to 0
+	// when things are going well.
+	S3Pacer
 )
 
 // Paced is a function which is called by the Call and CallNoRetry
@@ -185,6 +191,8 @@ func (p *Pacer) SetPacer(t Type) *Pacer {
 		p.calculatePace = p.acdPacer
 	case GoogleDrivePacer:
 		p.calculatePace = p.drivePacer
+	case S3Pacer:
+		p.calculatePace = p.s3Pacer
 	default:
 		p.calculatePace = p.defaultPacer
 	}
@@ -309,6 +317,46 @@ func (p *Pacer) drivePacer(retry bool) {
 	}
 }
 
+// s3Pacer implements a pacer compatible with our expectations of S3, where it tries not to
+// delay at all between successful calls, but backs off in the default fashion in response
+// to any errors.
+// The assumption is that errors should be exceedingly rare (S3 seems to have largely solved
+// the sort of scalability questions rclone is likely to run into), and in the happy case
+// it can handle calls with no delays between them.
+//
+// Basically defaultPacer, but with some handling of sleepTime going to/from 0ms.
+// minSleep is only used as the initial backoff after an error, not as a floor.
+//
+// Call with p.mu held
+func (p *Pacer) s3Pacer(retry bool) {
+	oldSleepTime := p.sleepTime
+	if retry {
+		if p.attackConstant == 0 {
+			p.sleepTime = p.maxSleep
+		} else {
+			if p.sleepTime == 0 {
+				p.sleepTime = p.minSleep
+			} else {
+				p.sleepTime = (p.sleepTime << p.attackConstant) / ((1 << p.attackConstant) - 1)
+			}
+		}
+		if p.sleepTime > p.maxSleep {
+			p.sleepTime = p.maxSleep
+		}
+		if p.sleepTime != oldSleepTime {
+			fs.Debugf("pacer", "Rate limited, increasing sleep to %v", p.sleepTime)
+		}
+	} else {
+		p.sleepTime = (p.sleepTime<<p.decayConstant - p.sleepTime) >> p.decayConstant
+		if p.sleepTime < p.minSleep {
+			p.sleepTime = 0
+		}
+		if p.sleepTime != oldSleepTime {
+			fs.Debugf("pacer", "Reducing sleep to %v", p.sleepTime)
+		}
+	}
+}
+
 // endCall implements the pacing algorithm
 //
 // This should calculate a new sleepTime.  It takes a boolean as to
diff --git a/lib/pacer/pacer_test.go b/lib/pacer/pacer_test.go
index fbddaec02..1b2793f7a 100644
--- a/lib/pacer/pacer_test.go
+++ b/lib/pacer/pacer_test.go
@@ -340,6 +340,32 @@ func TestGoogleDrivePacer(t *testing.T) {
 	}
 }
 
+func TestS3Pacer(t *testing.T) {
+	p := New().SetMinSleep(10 * time.Millisecond).SetPacer(S3Pacer).SetMaxSleep(time.Second).SetDecayConstant(2)
+	for _, test := range []struct {
+		in    time.Duration
+		retry bool
+		want  time.Duration
+	}{
+		{0, true, 10 * time.Millisecond},                      // Things were going OK, we failed once, back off to minSleep
+		{10 * time.Millisecond, true, 20 * time.Millisecond},  // Another fail, double the backoff
+		{10 * time.Millisecond, false, 0},                     // Things start going OK when we're at minSleep; should result in no sleep
+		{12 * time.Millisecond, false, 0},                     // *near* minSleep and going OK, decay would take us below minSleep, so go to 0
+		{0, false, 0},                                         // Things have been going OK; not retrying should keep sleep at 0
+		{time.Second, true, time.Second},                      // Check maxSleep is enforced
+		{(3 * time.Second) / 4, true, time.Second},            // Check attack heading to maxSleep doesn't exceed maxSleep
+		{time.Second, false, 750 * time.Millisecond},          // Check decay from maxSleep
+		{48 * time.Millisecond, false, 36 * time.Millisecond}, // Check simple decay above minSleep
+	} {
+		p.sleepTime = test.in
+		p.s3Pacer(test.retry)
+		got := p.sleepTime
+		if got != test.want {
+			t.Errorf("bad sleep for %v with retry %v: want %v got %v", test.in, test.retry, test.want, got)
+		}
+	}
+}
+
 func TestEndCall(t *testing.T) {
 	p := New().SetMaxConnections(5)
 	emptyTokens(p)
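For context on the calling pattern the patch applies to every SDK call, here is a minimal, self-contained sketch (not part of the patch) of pacer.Call wrapping a throttled operation. Only pacer.New, SetMinSleep, SetPacer, S3Pacer and Call come from the code above; fakeHeadBucket and shouldRetrySketch are hypothetical stand-ins for an AWS SDK call and the backend's shouldRetry.

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/ncw/rclone/lib/pacer"
)

// fakeHeadBucket is a hypothetical stand-in for an AWS SDK call such as
// f.c.HeadBucket: it fails twice with a throttling-style error, then succeeds.
func fakeHeadBucket(attempt *int) error {
	*attempt++
	if *attempt < 3 {
		return errors.New("503 SlowDown: please reduce your request rate")
	}
	return nil
}

// shouldRetrySketch mirrors the shape of the backend's shouldRetry: return
// (true, err) to make the pacer back off and retry, (false, err) to give up.
func shouldRetrySketch(err error) (bool, error) {
	return err != nil, err
}

func main() {
	// Same construction as in NewFs above: no delay while calls succeed,
	// exponential backoff starting at minSleep once an error is seen.
	p := pacer.New().SetMinSleep(10 * time.Millisecond).SetPacer(pacer.S3Pacer)

	attempt := 0
	err := p.Call(func() (bool, error) {
		return shouldRetrySketch(fakeHeadBucket(&attempt))
	})
	fmt.Printf("finished after %d attempt(s), err = %v\n", attempt, err)
}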