From e84790ef7976440974cb4c4f01e4712854346dcc Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Thu, 22 Nov 2018 22:15:52 +0000 Subject: [PATCH] swift: add pacer for retries to make swift more reliable #2740 --- backend/swift/swift.go | 135 +++++++++++++++++++++++++++++++++++------ 1 file changed, 116 insertions(+), 19 deletions(-) diff --git a/backend/swift/swift.go b/backend/swift/swift.go index 2ea38f7d1..f4d7299ca 100644 --- a/backend/swift/swift.go +++ b/backend/swift/swift.go @@ -21,6 +21,7 @@ import ( "github.com/ncw/rclone/fs/hash" "github.com/ncw/rclone/fs/operations" "github.com/ncw/rclone/fs/walk" + "github.com/ncw/rclone/lib/pacer" "github.com/ncw/swift" "github.com/pkg/errors" ) @@ -30,6 +31,7 @@ const ( directoryMarkerContentType = "application/directory" // content type of directory marker objects listChunks = 1000 // chunk size to read directory listings defaultChunkSize = 5 * fs.GibiByte + minSleep = 10 * time.Millisecond // In case of error, start at 10ms sleep. ) // SharedOptions are shared between swift and hubic @@ -187,6 +189,7 @@ type Fs struct { containerOK bool // true if we have created the container segmentsContainer string // container to store the segments (if any) in noCheckContainer bool // don't check the container before creating it + pacer *pacer.Pacer // To pace the API calls } // Object describes a swift object @@ -227,6 +230,32 @@ func (f *Fs) Features() *fs.Features { return f.features } +// retryErrorCodes is a slice of error codes that we will retry +var retryErrorCodes = []int{ + 401, // Unauthorized (eg "Token has expired") + 408, // Request Timeout + 409, // Conflict - various states that could be resolved on a retry + 429, // Rate exceeded. + 500, // Get occasional 500 Internal Server Error + 503, // Service Unavailable/Slow Down - "Reduce your request rate" + 504, // Gateway Time-out +} + +// shouldRetry returns a boolean as to whether this err deserves to be +// retried. It returns the err as a convenience +func shouldRetry(err error) (bool, error) { + // If this is an swift.Error object extract the HTTP error code + if swiftError, ok := err.(*swift.Error); ok { + for _, e := range retryErrorCodes { + if swiftError.StatusCode == e { + return true, err + } + } + } + // Check for generic failure conditions + return fserrors.ShouldRetry(err), err +} + // Pattern to match a swift path var matcher = regexp.MustCompile(`^/*([^/]*)(.*)$`) @@ -337,6 +366,7 @@ func NewFsWithConnection(opt *Options, name, root string, c *swift.Connection, n segmentsContainer: container + "_segments", root: directory, noCheckContainer: noCheckContainer, + pacer: pacer.New().SetMinSleep(minSleep).SetPacer(pacer.S3Pacer), } f.features = (&fs.Features{ ReadMimeType: true, @@ -346,7 +376,11 @@ func NewFsWithConnection(opt *Options, name, root string, c *swift.Connection, n if f.root != "" { f.root += "/" // Check to see if the object exists - ignoring directory markers - info, _, err := f.c.Object(container, directory) + var info swift.Object + err = f.pacer.Call(func() (bool, error) { + info, _, err = f.c.Object(container, directory) + return shouldRetry(err) + }) if err == nil && info.ContentType != directoryMarkerContentType { f.root = path.Dir(directory) if f.root == "." { @@ -436,7 +470,12 @@ func (f *Fs) listContainerRoot(container, root string, dir string, recurse bool, } rootLength := len(root) return f.c.ObjectsWalk(container, &opts, func(opts *swift.ObjectsOpts) (interface{}, error) { - objects, err := f.c.Objects(container, opts) + var objects []swift.Object + var err error + err = f.pacer.Call(func() (bool, error) { + objects, err = f.c.Objects(container, opts) + return shouldRetry(err) + }) if err == nil { for i := range objects { object := &objects[i] @@ -525,7 +564,11 @@ func (f *Fs) listContainers(dir string) (entries fs.DirEntries, err error) { if dir != "" { return nil, fs.ErrorListBucketRequired } - containers, err := f.c.ContainersAll(nil) + var containers []swift.Container + err = f.pacer.Call(func() (bool, error) { + containers, err = f.c.ContainersAll(nil) + return shouldRetry(err) + }) if err != nil { return nil, errors.Wrap(err, "container listing failed") } @@ -586,7 +629,12 @@ func (f *Fs) ListR(dir string, callback fs.ListRCallback) (err error) { // About gets quota information func (f *Fs) About() (*fs.Usage, error) { - containers, err := f.c.ContainersAll(nil) + var containers []swift.Container + var err error + err = f.pacer.Call(func() (bool, error) { + containers, err = f.c.ContainersAll(nil) + return shouldRetry(err) + }) if err != nil { return nil, errors.Wrap(err, "container listing failed") } @@ -636,14 +684,20 @@ func (f *Fs) Mkdir(dir string) error { // Check to see if container exists first var err error = swift.ContainerNotFound if !f.noCheckContainer { - _, _, err = f.c.Container(f.container) + err = f.pacer.Call(func() (bool, error) { + _, _, err = f.c.Container(f.container) + return shouldRetry(err) + }) } if err == swift.ContainerNotFound { headers := swift.Headers{} if f.opt.StoragePolicy != "" { headers["X-Storage-Policy"] = f.opt.StoragePolicy } - err = f.c.ContainerCreate(f.container, headers) + err = f.pacer.Call(func() (bool, error) { + err = f.c.ContainerCreate(f.container, headers) + return shouldRetry(err) + }) } if err == nil { f.containerOK = true @@ -660,7 +714,11 @@ func (f *Fs) Rmdir(dir string) error { if f.root != "" || dir != "" { return nil } - err := f.c.ContainerDelete(f.container) + var err error + err = f.pacer.Call(func() (bool, error) { + err = f.c.ContainerDelete(f.container) + return shouldRetry(err) + }) if err == nil { f.containerOK = false } @@ -719,7 +777,10 @@ func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) { return nil, fs.ErrorCantCopy } srcFs := srcObj.fs - _, err = f.c.ObjectCopy(srcFs.container, srcFs.root+srcObj.remote, f.container, f.root+remote, nil) + err = f.pacer.Call(func() (bool, error) { + _, err = f.c.ObjectCopy(srcFs.container, srcFs.root+srcObj.remote, f.container, f.root+remote, nil) + return shouldRetry(err) + }) if err != nil { return nil, err } @@ -809,7 +870,12 @@ func (o *Object) readMetaData() (err error) { if o.headers != nil { return nil } - info, h, err := o.fs.c.Object(o.fs.container, o.fs.root+o.remote) + var info swift.Object + var h swift.Headers + err = o.fs.pacer.Call(func() (bool, error) { + info, h, err = o.fs.c.Object(o.fs.container, o.fs.root+o.remote) + return shouldRetry(err) + }) if err != nil { if err == swift.ObjectNotFound { return fs.ErrorObjectNotFound @@ -861,7 +927,10 @@ func (o *Object) SetModTime(modTime time.Time) error { newHeaders[k] = v } } - return o.fs.c.ObjectUpdate(o.fs.container, o.fs.root+o.remote, newHeaders) + return o.fs.pacer.Call(func() (bool, error) { + err = o.fs.c.ObjectUpdate(o.fs.container, o.fs.root+o.remote, newHeaders) + return shouldRetry(err) + }) } // Storable returns if this object is storable @@ -876,7 +945,10 @@ func (o *Object) Storable() bool { func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) { headers := fs.OpenOptionHeaders(options) _, isRanging := headers["Range"] - in, _, err = o.fs.c.ObjectOpen(o.fs.container, o.fs.root+o.remote, !isRanging, headers) + err = o.fs.pacer.Call(func() (bool, error) { + in, _, err = o.fs.c.ObjectOpen(o.fs.container, o.fs.root+o.remote, !isRanging, headers) + return shouldRetry(err) + }) return } @@ -903,13 +975,20 @@ func (o *Object) removeSegments(except string) error { } segmentPath := segmentsRoot + remote fs.Debugf(o, "Removing segment file %q in container %q", segmentPath, o.fs.segmentsContainer) - return o.fs.c.ObjectDelete(o.fs.segmentsContainer, segmentPath) + var err error + return o.fs.pacer.Call(func() (bool, error) { + err = o.fs.c.ObjectDelete(o.fs.segmentsContainer, segmentPath) + return shouldRetry(err) + }) }) if err != nil { return err } // remove the segments container if empty, ignore errors - err = o.fs.c.ContainerDelete(o.fs.segmentsContainer) + err = o.fs.pacer.Call(func() (bool, error) { + err = o.fs.c.ContainerDelete(o.fs.segmentsContainer) + return shouldRetry(err) + }) if err == nil { fs.Debugf(o, "Removed empty container %q", o.fs.segmentsContainer) } @@ -938,13 +1017,19 @@ func urlEncode(str string) string { func (o *Object) updateChunks(in0 io.Reader, headers swift.Headers, size int64, contentType string) (string, error) { // Create the segmentsContainer if it doesn't exist var err error - _, _, err = o.fs.c.Container(o.fs.segmentsContainer) + err = o.fs.pacer.Call(func() (bool, error) { + _, _, err = o.fs.c.Container(o.fs.segmentsContainer) + return shouldRetry(err) + }) if err == swift.ContainerNotFound { headers := swift.Headers{} if o.fs.opt.StoragePolicy != "" { headers["X-Storage-Policy"] = o.fs.opt.StoragePolicy } - err = o.fs.c.ContainerCreate(o.fs.segmentsContainer, headers) + err = o.fs.pacer.Call(func() (bool, error) { + err = o.fs.c.ContainerCreate(o.fs.segmentsContainer, headers) + return shouldRetry(err) + }) } if err != nil { return "", err @@ -973,7 +1058,10 @@ func (o *Object) updateChunks(in0 io.Reader, headers swift.Headers, size int64, segmentReader := io.LimitReader(in, n) segmentPath := fmt.Sprintf("%s/%08d", segmentsPath, i) fs.Debugf(o, "Uploading segment file %q into %q", segmentPath, o.fs.segmentsContainer) - _, err := o.fs.c.ObjectPut(o.fs.segmentsContainer, segmentPath, segmentReader, true, "", "", headers) + err = o.fs.pacer.CallNoRetry(func() (bool, error) { + _, err = o.fs.c.ObjectPut(o.fs.segmentsContainer, segmentPath, segmentReader, true, "", "", headers) + return shouldRetry(err) + }) if err != nil { return "", err } @@ -984,7 +1072,10 @@ func (o *Object) updateChunks(in0 io.Reader, headers swift.Headers, size int64, headers["Content-Length"] = "0" // set Content-Length as we know it emptyReader := bytes.NewReader(nil) manifestName := o.fs.root + o.remote - _, err = o.fs.c.ObjectPut(o.fs.container, manifestName, emptyReader, true, "", contentType, headers) + err = o.fs.pacer.Call(func() (bool, error) { + _, err = o.fs.c.ObjectPut(o.fs.container, manifestName, emptyReader, true, "", contentType, headers) + return shouldRetry(err) + }) return uniquePrefix + "/", err } @@ -1021,7 +1112,10 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio } } else { headers["Content-Length"] = strconv.FormatInt(size, 10) // set Content-Length as we know it - _, err := o.fs.c.ObjectPut(o.fs.container, o.fs.root+o.remote, in, true, "", contentType, headers) + err = o.fs.pacer.CallNoRetry(func() (bool, error) { + _, err = o.fs.c.ObjectPut(o.fs.container, o.fs.root+o.remote, in, true, "", contentType, headers) + return shouldRetry(err) + }) if err != nil { return err } @@ -1047,7 +1141,10 @@ func (o *Object) Remove() error { return err } // Remove file/manifest first - err = o.fs.c.ObjectDelete(o.fs.container, o.fs.root+o.remote) + err = o.fs.pacer.Call(func() (bool, error) { + err = o.fs.c.ObjectDelete(o.fs.container, o.fs.root+o.remote) + return shouldRetry(err) + }) if err != nil { return err }