diff --git a/b2/b2.go b/b2/b2.go
index 148c42ec1..d9ed477e5 100644
--- a/b2/b2.go
+++ b/b2/b2.go
@@ -26,6 +26,7 @@ import (
 
 	"github.com/ncw/rclone/b2/api"
 	"github.com/ncw/rclone/fs"
+	"github.com/ncw/rclone/pacer"
 	"github.com/ncw/rclone/rest"
 )
 
@@ -35,6 +36,9 @@ const (
 	timeKey       = "src_last_modified_millis"
 	timeHeader    = headerPrefix + timeKey
 	sha1Header    = "X-Bz-Content-Sha1"
+	minSleep      = 10 * time.Millisecond
+	maxSleep      = 2 * time.Second
+	decayConstant = 2 // bigger for slower decay, exponential
 )
 
 // Register with Fs
@@ -72,6 +76,7 @@ type Fs struct {
 	uploadMu sync.Mutex                // lock for upload variable
 	upload   api.GetUploadURLResponse // result of get upload URL call
 	authMu   sync.Mutex                // lock for authorizing the account
+	pacer    *pacer.Pacer              // To pace and retry the API calls
 }
 
 // Object describes a b2 object
@@ -123,6 +128,39 @@ func parsePath(path string) (bucket, directory string, err error) {
 	return
 }
 
+// retryErrorCodes is a slice of error codes that we will retry
+var retryErrorCodes = []int{
+	401, // Unauthorized (eg "Token has expired")
+	408, // Request Timeout
+	429, // Rate exceeded.
+	500, // Get occasional 500 Internal Server Error
+	503, // Service Unavailable
+	504, // Gateway Time-out
+}
+
+// shouldRetryNoReauth returns a boolean as to whether this resp and err
+// deserve to be retried.  It returns the err as a convenience
+func (f *Fs) shouldRetryNoReauth(resp *http.Response, err error) (bool, error) {
+	return fs.ShouldRetry(err) || fs.ShouldRetryHTTP(resp, retryErrorCodes), err
+}
+
+// shouldRetry returns a boolean as to whether this resp and err
+// deserve to be retried.  It returns the err as a convenience
+func (f *Fs) shouldRetry(resp *http.Response, err error) (bool, error) {
+	if resp != nil && resp.StatusCode == 401 {
+		fs.Debug(f, "b2 auth token expired refetching")
+		// Reauth
+		authErr := f.authorizeAccount()
+		if authErr != nil {
+			err = authErr
+		}
+		// Refetch upload URL
+		f.clearUploadURL()
+		return true, err
+	}
+	return f.shouldRetryNoReauth(resp, err)
+}
+
 // errorHandler parses a non 2xx error response into an error
 func errorHandler(resp *http.Response) error {
 	// Decode error response
@@ -166,8 +204,12 @@ func NewFs(name, root string) (fs.Fs, error) {
 		key:      key,
 		endpoint: endpoint,
 		srv:      rest.NewClient(fs.Config.Client()).SetErrorHandler(errorHandler),
+		pacer:    pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
+	}
+	err = f.authorizeAccount()
+	if err != nil {
+		return nil, fmt.Errorf("Failed to authorize account: %v", err)
 	}
-	f.authorizeAccount()
 	if f.root != "" {
 		f.root += "/"
 		// Check to see if the (bucket,directory) is actually an existing file
@@ -201,7 +243,10 @@ func (f *Fs) authorizeAccount() error {
 		Password:     f.key,
 		ExtraHeaders: map[string]string{"Authorization": ""}, // unset the Authorization for this request
 	}
-	_, err := f.srv.CallJSON(&opts, nil, &f.info)
+	err := f.pacer.Call(func() (bool, error) {
+		resp, err := f.srv.CallJSON(&opts, nil, &f.info)
+		return f.shouldRetryNoReauth(resp, err)
+	})
 	if err != nil {
 		return fmt.Errorf("Failed to authenticate: %v", err)
 	}
@@ -225,7 +270,10 @@ func (f *Fs) getUploadURL() (string, string, error) {
 	var request = api.GetUploadURLRequest{
 		BucketID: bucketID,
 	}
-	_, err := f.srv.CallJSON(&opts, &request, &f.upload)
+	err := f.pacer.Call(func() (bool, error) {
+		resp, err := f.srv.CallJSON(&opts, &request, &f.upload)
+		return f.shouldRetryNoReauth(resp, err)
+	})
 	if err != nil {
 		return "", "", fmt.Errorf("Failed to get upload URL: %v", err)
 	}
@@ -311,7 +359,10 @@ func (f *Fs) list(prefix string, limit int, hidden bool, fn listFn) error {
 		opts.Path = "/b2_list_file_versions"
 	}
 	for {
-		_, err = f.srv.CallJSON(&opts, &request, &response)
+		err := f.pacer.Call(func() (bool, error) {
+			resp, err := f.srv.CallJSON(&opts, &request, &response)
+			return f.shouldRetry(resp, err)
+		})
 		if err != nil {
 			if err == errEndList {
 				return nil
@@ -379,7 +430,10 @@ func (f *Fs) listBuckets(fn listBucketFn) error {
 		Method: "POST",
 		Path:   "/b2_list_buckets",
 	}
-	_, err := f.srv.CallJSON(&opts, &account, &response)
+	err := f.pacer.Call(func() (bool, error) {
+		resp, err := f.srv.CallJSON(&opts, &account, &response)
+		return f.shouldRetry(resp, err)
+	})
 	if err != nil {
 		return err
 	}
@@ -498,7 +552,10 @@ func (f *Fs) Mkdir() error {
 		Type:      "allPrivate",
 	}
 	var response api.Bucket
-	_, err := f.srv.CallJSON(&opts, &request, &response)
+	err := f.pacer.Call(func() (bool, error) {
+		resp, err := f.srv.CallJSON(&opts, &request, &response)
+		return f.shouldRetry(resp, err)
+	})
 	if err != nil {
 		if apiErr, ok := err.(*api.Error); ok {
 			if apiErr.Code == "duplicate_bucket_name" {
@@ -531,7 +588,10 @@ func (f *Fs) Rmdir() error {
 		AccountID: f.info.AccountID,
 	}
 	var response api.Bucket
-	_, err = f.srv.CallJSON(&opts, &request, &response)
+	err = f.pacer.Call(func() (bool, error) {
+		resp, err := f.srv.CallJSON(&opts, &request, &response)
+		return f.shouldRetry(resp, err)
+	})
 	if err != nil {
 		return fmt.Errorf("Failed to delete bucket: %v", err)
 	}
@@ -556,7 +616,10 @@ func (f *Fs) deleteByID(ID, Name string) error {
 		Name: Name,
 	}
 	var response api.File
-	_, err := f.srv.CallJSON(&opts, &request, &response)
+	err := f.pacer.Call(func() (bool, error) {
+		resp, err := f.srv.CallJSON(&opts, &request, &response)
+		return f.shouldRetry(resp, err)
+	})
 	if err != nil {
 		return fmt.Errorf("Failed to delete %q: %v", Name, err)
 	}
@@ -737,7 +800,10 @@ func (o *Object) readFileMetadata() error {
 		ID: o.info.ID,
 	}
 	var response api.FileInfo
-	_, err = o.fs.srv.CallJSON(&opts, &request, &response)
+	err = o.fs.pacer.Call(func() (bool, error) {
+		resp, err := o.fs.srv.CallJSON(&opts, &request, &response)
+		return o.fs.shouldRetry(resp, err)
+	})
 	if err != nil {
 		fs.Debug(o, "Failed to get file info: %v", err)
 		return err
@@ -833,7 +899,11 @@ func (o *Object) Open() (in io.ReadCloser, err error) {
 		Absolute: true,
 		Path:     o.fs.info.DownloadURL + "/file/" + urlEncode(o.fs.bucket) + "/" + urlEncode(o.fs.root+o.remote),
 	}
-	resp, err := o.fs.srv.Call(&opts)
+	var resp *http.Response
+	err = o.fs.pacer.Call(func() (bool, error) {
+		resp, err = o.fs.srv.Call(&opts)
+		return o.fs.shouldRetry(resp, err)
+	})
 	if err != nil {
 		return nil, fmt.Errorf("Failed to open for download: %v", err)
 	}
@@ -1005,9 +1075,13 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo) (err error) {
 		ContentLength: &size,
 	}
 	var response api.FileInfo
-	_, err = o.fs.srv.CallJSON(&opts, nil, &response)
+	// Don't retry, return a retry error instead
+	err = o.fs.pacer.CallNoRetry(func() (bool, error) {
+		resp, err := o.fs.srv.CallJSON(&opts, nil, &response)
+		return o.fs.shouldRetry(resp, err)
+	})
 	if err != nil {
-		return fmt.Errorf("Failed to upload: %v", err)
+		return err
 	}
 	o.info.ID = response.ID
 	o.info.Name = response.Name
@@ -1030,10 +1104,13 @@ func (o *Object) Remove() error {
 	}
 	var request = api.HideFileRequest{
 		BucketID: bucketID,
-		Name:     o.info.Name,
+		Name:     o.fs.root + o.remote,
 	}
 	var response api.File
-	_, err = o.fs.srv.CallJSON(&opts, &request, &response)
+	err = o.fs.pacer.Call(func() (bool, error) {
+		resp, err := o.fs.srv.CallJSON(&opts, &request, &response)
+		return o.fs.shouldRetry(resp, err)
+	})
 	if err != nil {
 		return fmt.Errorf("Failed to delete file: %v", err)
 	}