b2: Add pacing, retries and reauthentication - fixes #310

This commit is contained in:
Nick Craig-Wood 2016-02-23 22:15:20 +00:00
parent ee5e34a19c
commit 6f46270735

105
b2/b2.go
View File

@ -26,6 +26,7 @@ import (
"github.com/ncw/rclone/b2/api" "github.com/ncw/rclone/b2/api"
"github.com/ncw/rclone/fs" "github.com/ncw/rclone/fs"
"github.com/ncw/rclone/pacer"
"github.com/ncw/rclone/rest" "github.com/ncw/rclone/rest"
) )
@ -35,6 +36,9 @@ const (
timeKey = "src_last_modified_millis" timeKey = "src_last_modified_millis"
timeHeader = headerPrefix + timeKey timeHeader = headerPrefix + timeKey
sha1Header = "X-Bz-Content-Sha1" sha1Header = "X-Bz-Content-Sha1"
minSleep = 10 * time.Millisecond
maxSleep = 2 * time.Second
decayConstant = 2 // bigger for slower decay, exponential
) )
// Register with Fs // Register with Fs
@ -72,6 +76,7 @@ type Fs struct {
uploadMu sync.Mutex // lock for upload variable uploadMu sync.Mutex // lock for upload variable
upload api.GetUploadURLResponse // result of get upload URL call upload api.GetUploadURLResponse // result of get upload URL call
authMu sync.Mutex // lock for authorizing the account authMu sync.Mutex // lock for authorizing the account
pacer *pacer.Pacer // To pace and retry the API calls
} }
// Object describes a b2 object // Object describes a b2 object
@ -123,6 +128,39 @@ func parsePath(path string) (bucket, directory string, err error) {
return return
} }
// retryErrorCodes is a slice of error codes that we will retry
var retryErrorCodes = []int{
401, // Unauthorized (eg "Token has expired")
408, // Request Timeout
429, // Rate exceeded.
500, // Get occasional 500 Internal Server Error
503, // Service Unavailable
504, // Gateway Time-out
}
// shouldRetryNoAuth returns a boolean as to whether this resp and err
// deserve to be retried. It returns the err as a convenience
func (f *Fs) shouldRetryNoReauth(resp *http.Response, err error) (bool, error) {
return fs.ShouldRetry(err) || fs.ShouldRetryHTTP(resp, retryErrorCodes), err
}
// shouldRetry returns a boolean as to whether this resp and err
// deserve to be retried. It returns the err as a convenience
func (f *Fs) shouldRetry(resp *http.Response, err error) (bool, error) {
if resp != nil && resp.StatusCode == 401 {
fs.Debug(f, "b2 auth token expired refetching")
// Reauth
authErr := f.authorizeAccount()
if authErr != nil {
err = authErr
}
// Refetch upload URL
f.clearUploadURL()
return true, err
}
return f.shouldRetryNoReauth(resp, err)
}
// errorHandler parses a non 2xx error response into an error // errorHandler parses a non 2xx error response into an error
func errorHandler(resp *http.Response) error { func errorHandler(resp *http.Response) error {
// Decode error response // Decode error response
@ -166,8 +204,12 @@ func NewFs(name, root string) (fs.Fs, error) {
key: key, key: key,
endpoint: endpoint, endpoint: endpoint,
srv: rest.NewClient(fs.Config.Client()).SetErrorHandler(errorHandler), srv: rest.NewClient(fs.Config.Client()).SetErrorHandler(errorHandler),
pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
}
err = f.authorizeAccount()
if err != nil {
return nil, fmt.Errorf("Failed to authorize account: %v", err)
} }
f.authorizeAccount()
if f.root != "" { if f.root != "" {
f.root += "/" f.root += "/"
// Check to see if the (bucket,directory) is actually an existing file // Check to see if the (bucket,directory) is actually an existing file
@ -201,7 +243,10 @@ func (f *Fs) authorizeAccount() error {
Password: f.key, Password: f.key,
ExtraHeaders: map[string]string{"Authorization": ""}, // unset the Authorization for this request ExtraHeaders: map[string]string{"Authorization": ""}, // unset the Authorization for this request
} }
_, err := f.srv.CallJSON(&opts, nil, &f.info) err := f.pacer.Call(func() (bool, error) {
resp, err := f.srv.CallJSON(&opts, nil, &f.info)
return f.shouldRetryNoReauth(resp, err)
})
if err != nil { if err != nil {
return fmt.Errorf("Failed to authenticate: %v", err) return fmt.Errorf("Failed to authenticate: %v", err)
} }
@ -225,7 +270,10 @@ func (f *Fs) getUploadURL() (string, string, error) {
var request = api.GetUploadURLRequest{ var request = api.GetUploadURLRequest{
BucketID: bucketID, BucketID: bucketID,
} }
_, err := f.srv.CallJSON(&opts, &request, &f.upload) err := f.pacer.Call(func() (bool, error) {
resp, err := f.srv.CallJSON(&opts, &request, &f.upload)
return f.shouldRetryNoReauth(resp, err)
})
if err != nil { if err != nil {
return "", "", fmt.Errorf("Failed to get upload URL: %v", err) return "", "", fmt.Errorf("Failed to get upload URL: %v", err)
} }
@ -311,7 +359,10 @@ func (f *Fs) list(prefix string, limit int, hidden bool, fn listFn) error {
opts.Path = "/b2_list_file_versions" opts.Path = "/b2_list_file_versions"
} }
for { for {
_, err = f.srv.CallJSON(&opts, &request, &response) err := f.pacer.Call(func() (bool, error) {
resp, err := f.srv.CallJSON(&opts, &request, &response)
return f.shouldRetry(resp, err)
})
if err != nil { if err != nil {
if err == errEndList { if err == errEndList {
return nil return nil
@ -379,7 +430,10 @@ func (f *Fs) listBuckets(fn listBucketFn) error {
Method: "POST", Method: "POST",
Path: "/b2_list_buckets", Path: "/b2_list_buckets",
} }
_, err := f.srv.CallJSON(&opts, &account, &response) err := f.pacer.Call(func() (bool, error) {
resp, err := f.srv.CallJSON(&opts, &account, &response)
return f.shouldRetry(resp, err)
})
if err != nil { if err != nil {
return err return err
} }
@ -498,7 +552,10 @@ func (f *Fs) Mkdir() error {
Type: "allPrivate", Type: "allPrivate",
} }
var response api.Bucket var response api.Bucket
_, err := f.srv.CallJSON(&opts, &request, &response) err := f.pacer.Call(func() (bool, error) {
resp, err := f.srv.CallJSON(&opts, &request, &response)
return f.shouldRetry(resp, err)
})
if err != nil { if err != nil {
if apiErr, ok := err.(*api.Error); ok { if apiErr, ok := err.(*api.Error); ok {
if apiErr.Code == "duplicate_bucket_name" { if apiErr.Code == "duplicate_bucket_name" {
@ -531,7 +588,10 @@ func (f *Fs) Rmdir() error {
AccountID: f.info.AccountID, AccountID: f.info.AccountID,
} }
var response api.Bucket var response api.Bucket
_, err = f.srv.CallJSON(&opts, &request, &response) err = f.pacer.Call(func() (bool, error) {
resp, err := f.srv.CallJSON(&opts, &request, &response)
return f.shouldRetry(resp, err)
})
if err != nil { if err != nil {
return fmt.Errorf("Failed to delete bucket: %v", err) return fmt.Errorf("Failed to delete bucket: %v", err)
} }
@ -556,7 +616,10 @@ func (f *Fs) deleteByID(ID, Name string) error {
Name: Name, Name: Name,
} }
var response api.File var response api.File
_, err := f.srv.CallJSON(&opts, &request, &response) err := f.pacer.Call(func() (bool, error) {
resp, err := f.srv.CallJSON(&opts, &request, &response)
return f.shouldRetry(resp, err)
})
if err != nil { if err != nil {
return fmt.Errorf("Failed to delete %q: %v", Name, err) return fmt.Errorf("Failed to delete %q: %v", Name, err)
} }
@ -737,7 +800,10 @@ func (o *Object) readFileMetadata() error {
ID: o.info.ID, ID: o.info.ID,
} }
var response api.FileInfo var response api.FileInfo
_, err = o.fs.srv.CallJSON(&opts, &request, &response) err = o.fs.pacer.Call(func() (bool, error) {
resp, err := o.fs.srv.CallJSON(&opts, &request, &response)
return o.fs.shouldRetry(resp, err)
})
if err != nil { if err != nil {
fs.Debug(o, "Failed to get file info: %v", err) fs.Debug(o, "Failed to get file info: %v", err)
return err return err
@ -833,7 +899,11 @@ func (o *Object) Open() (in io.ReadCloser, err error) {
Absolute: true, Absolute: true,
Path: o.fs.info.DownloadURL + "/file/" + urlEncode(o.fs.bucket) + "/" + urlEncode(o.fs.root+o.remote), Path: o.fs.info.DownloadURL + "/file/" + urlEncode(o.fs.bucket) + "/" + urlEncode(o.fs.root+o.remote),
} }
resp, err := o.fs.srv.Call(&opts) var resp *http.Response
err = o.fs.pacer.Call(func() (bool, error) {
resp, err = o.fs.srv.Call(&opts)
return o.fs.shouldRetry(resp, err)
})
if err != nil { if err != nil {
return nil, fmt.Errorf("Failed to open for download: %v", err) return nil, fmt.Errorf("Failed to open for download: %v", err)
} }
@ -1005,9 +1075,13 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo) (err error) {
ContentLength: &size, ContentLength: &size,
} }
var response api.FileInfo var response api.FileInfo
_, err = o.fs.srv.CallJSON(&opts, nil, &response) // Don't retry, return a retry error instead
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
resp, err := o.fs.srv.CallJSON(&opts, nil, &response)
return o.fs.shouldRetry(resp, err)
})
if err != nil { if err != nil {
return fmt.Errorf("Failed to upload: %v", err) return err
} }
o.info.ID = response.ID o.info.ID = response.ID
o.info.Name = response.Name o.info.Name = response.Name
@ -1030,10 +1104,13 @@ func (o *Object) Remove() error {
} }
var request = api.HideFileRequest{ var request = api.HideFileRequest{
BucketID: bucketID, BucketID: bucketID,
Name: o.info.Name, Name: o.fs.root + o.remote,
} }
var response api.File var response api.File
_, err = o.fs.srv.CallJSON(&opts, &request, &response) err = o.fs.pacer.Call(func() (bool, error) {
resp, err := o.fs.srv.CallJSON(&opts, &request, &response)
return o.fs.shouldRetry(resp, err)
})
if err != nil { if err != nil {
return fmt.Errorf("Failed to delete file: %v", err) return fmt.Errorf("Failed to delete file: %v", err)
} }