From 14f814b80688101f10ecf76cd92ea67e0c9d6e90 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Wed, 9 Sep 2015 23:23:37 +0100 Subject: [PATCH] acd: Retry on 429 errors with backoff and fix upload of 0 length files --- amazonclouddrive/amazonclouddrive.go | 79 +++++++++++++++++++++++++--- 1 file changed, 72 insertions(+), 7 deletions(-) diff --git a/amazonclouddrive/amazonclouddrive.go b/amazonclouddrive/amazonclouddrive.go index 257283a8c..e826cc788 100644 --- a/amazonclouddrive/amazonclouddrive.go +++ b/amazonclouddrive/amazonclouddrive.go @@ -10,16 +10,17 @@ FIXME make searching for directory in id and file in id more efficient FIXME make the default for no files and no dirs be (FILE & FOLDER) so we ignore assets completely! -FIXME detect 429 errors and return error with fs.RetryErrorf? - */ import ( "fmt" "io" "log" + "math/rand" + "net/http" "regexp" "strings" + "sync" "time" "github.com/ncw/go-acd" @@ -39,6 +40,8 @@ const ( assetKind = "ASSET" statusAvailable = "AVAILABLE" timeFormat = time.RFC3339 // 2014-03-07T22:31:12.173Z + minBackoff = 1 * time.Second + maxBackoff = 256 * time.Second ) // Globals @@ -84,6 +87,9 @@ type FsAcd struct { c *acd.Client // the connection to the acd server root string // the path we are working on dirCache *dircache.DirCache // Map of directory path to directory id + + backoffLock sync.Mutex + backoff time.Duration // current backoff } // FsObjectAcd describes a acd object @@ -204,11 +210,46 @@ func (f *FsAcd) NewFsObject(remote string) fs.Object { return f.newFsObjectWithInfo(remote, nil) } +// errChk checks the response and if it is a 429 error returns a +// RetryError, otherwise it returns just a plain error +// +// It also implements the backoff strategy +func (f *FsAcd) errChk(resp *http.Response, err error) error { + if err != nil && resp != nil && resp.StatusCode == 429 { + // Update backoff + f.backoffLock.Lock() + backoff := f.backoff + if backoff == 0 { + backoff = minBackoff + } else { + backoff *= 2 + if backoff > maxBackoff { + backoff = maxBackoff + } + } + f.backoff = backoff + f.backoffLock.Unlock() + // Sleep for the backoff time + sleepTime := time.Duration(rand.Int63n(int64(backoff))) + fs.Debug(f, "Retry error: backoff is now %v, sleeping for %v", backoff, sleepTime) + time.Sleep(sleepTime) + return fs.RetryError(err) + } + // Reset backoff on success + if err == nil { + f.backoffLock.Lock() + f.backoff = 0 + f.backoffLock.Unlock() + } + return err +} + // FindLeaf finds a directory of name leaf in the folder with ID pathId func (f *FsAcd) FindLeaf(pathId, leaf string) (pathIdOut string, found bool, err error) { //fs.Debug(f, "FindLeaf(%q, %q)", pathId, leaf) folder := acd.FolderFromId(pathId, f.c.Nodes) subFolder, _, err := folder.GetFolder(leaf) + // err = f.errChk(resp, err) if err != nil { if err == acd.ErrorNodeNotFound { //fs.Debug(f, "...Not found") @@ -231,6 +272,7 @@ func (f *FsAcd) CreateDir(pathId, leaf string) (newId string, err error) { //fs.Debug(f, "CreateDir(%q, %q)", pathId, leaf) folder := acd.FolderFromId(pathId, f.c.Nodes) info, _, err := folder.CreateFolder(leaf) + // err = f.errChk(resp, err) if err != nil { fs.Debug(f, "...Error %v", err) return "", err @@ -265,9 +307,11 @@ func (f *FsAcd) listAll(dirId string, title string, directoriesOnly bool, filesO Filters: query, } var nodes []*acd.Node + //var resp *http.Response OUTER: for { nodes, _, err = f.c.Nodes.GetNodes(&opts) + // err = f.errChk(resp, err) if err != nil { fs.Stats.Error() fs.ErrorLog(f, "Couldn't list files: %v", err) @@ -394,7 +438,14 @@ func (f *FsAcd) Put(in io.Reader, remote string, modTime time.Time, size int64) return nil, err } folder := acd.FolderFromId(directoryID, o.acd.c.Nodes) - info, _, err := folder.Put(in, leaf) + var info *acd.File + var resp *http.Response + if size != 0 { + info, resp, err = folder.Put(in, leaf) + } else { + info, resp, err = folder.PutSized(in, size, leaf) + } + err = f.errChk(resp, err) if err != nil { return nil, err } @@ -445,7 +496,9 @@ func (f *FsAcd) purgeCheck(check bool) error { } node := acd.NodeFromId(rootID, f.c.Nodes) - _, err = node.Trash() + var resp *http.Response + resp, err = node.Trash() + err = f.errChk(resp, err) if err != nil { return err } @@ -547,6 +600,7 @@ func (o *FsObjectAcd) readMetaData() (err error) { } folder := acd.FolderFromId(directoryID, o.acd.c.Nodes) info, _, err := folder.GetFile(leaf) + // err = o.acd.errChk(resp, err) if err != nil { fs.Debug(o, "Failed to read info: %s", err) return err @@ -588,7 +642,9 @@ func (o *FsObjectAcd) Storable() bool { // Open an object for read func (o *FsObjectAcd) Open() (in io.ReadCloser, err error) { file := acd.File{Node: o.info} - in, _, err = file.Open() + var resp *http.Response + in, resp, err = file.Open() + err = o.acd.errChk(resp, err) return in, err } @@ -597,7 +653,15 @@ func (o *FsObjectAcd) Open() (in io.ReadCloser, err error) { // The new object may have been created if an error is returned func (o *FsObjectAcd) Update(in io.Reader, modTime time.Time, size int64) error { file := acd.File{Node: o.info} - info, _, err := file.Overwrite(in) + var info *acd.File + var resp *http.Response + var err error + if size != 0 { + info, resp, err = file.OverwriteSized(in, size) + } else { + info, resp, err = file.Overwrite(in) + } + err = o.acd.errChk(resp, err) if err != nil { return err } @@ -607,7 +671,8 @@ func (o *FsObjectAcd) Update(in io.Reader, modTime time.Time, size int64) error // Remove an object func (o *FsObjectAcd) Remove() error { - _, err := o.info.Trash() + resp, err := o.info.Trash() + err = o.acd.errChk(resp, err) return err }