s3: Auto detect region for buckets on operation failure - fixes #2915

If an incorrect region error is returned while using a bucket, then the
region is updated, the session is remade, and the operation is retried.
Nick Craig-Wood 2019-01-16 13:35:19 +00:00
parent 0855608bc1
commit e31578e03c
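
Not part of the commit: a minimal standalone sketch of the bucket-region lookup this change relies on, using the aws-sdk-go v1 calls that appear in the diff below (GetBucketLocation plus NormalizeBucketLocation). The bucket name and starting region here are made up.

package main

import (
	"fmt"
	"log"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)

func main() {
	// Start from some configured region (possibly the wrong one for the bucket).
	sess := session.Must(session.NewSession(aws.NewConfig().WithRegion("us-east-1")))
	svc := s3.New(sess)

	out, err := svc.GetBucketLocation(&s3.GetBucketLocationInput{
		Bucket: aws.String("example-bucket"), // hypothetical bucket name
	})
	if err != nil {
		log.Fatal(err)
	}
	// NormalizeBucketLocation maps the legacy constraints to usable region
	// names, e.g. "" -> "us-east-1" and "EU" -> "eu-west-1".
	fmt.Println(s3.NormalizeBucketLocation(aws.StringValue(out.LocationConstraint)))
}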


@@ -833,7 +833,7 @@ var retryErrorCodes = []int{
 //S3 is pretty resilient, and the built in retry handling is probably sufficient
 // as it should notice closed connections and timeouts which are the most likely
 // sort of failure modes
-func shouldRetry(err error) (bool, error) {
+func (f *Fs) shouldRetry(err error) (bool, error) {
 	// If this is an awserr object, try and extract more useful information to determine if we should retry
 	if awsError, ok := err.(awserr.Error); ok {
 		// Simple case, check the original embedded error in case it's generically retriable
@@ -842,6 +842,15 @@ func shouldRetry(err error) (bool, error) {
 		}
 		// Failing that, if it's a RequestFailure it's probably got an http status code we can check
 		if reqErr, ok := err.(awserr.RequestFailure); ok {
+			// 301 if wrong region for bucket
+			if reqErr.StatusCode() == http.StatusMovedPermanently {
+				urfbErr := f.updateRegionForBucket()
+				if urfbErr != nil {
+					fs.Errorf(f, "Failed to update region for bucket: %v", urfbErr)
+					return false, err
+				}
+				return true, err
+			}
 			for _, e := range retryErrorCodes {
 				if reqErr.StatusCode() == e {
 					return true, err
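
Not part of the commit: the wrong-region check added above keys off the HTTP 301 S3 returns when a bucket is addressed through the wrong regional endpoint. Pulled out as a standalone helper for illustration; the helper name and the synthetic error below are hypothetical.

package main

import (
	"fmt"
	"net/http"

	"github.com/aws/aws-sdk-go/aws/awserr"
)

// isWrongRegion reports whether err looks like S3's "wrong region for this
// bucket" response: an awserr.RequestFailure carrying a 301 status code.
func isWrongRegion(err error) bool {
	if reqErr, ok := err.(awserr.RequestFailure); ok {
		return reqErr.StatusCode() == http.StatusMovedPermanently
	}
	return false
}

func main() {
	// Build a synthetic 301 RequestFailure to exercise the check.
	err := awserr.NewRequestFailure(
		awserr.New("PermanentRedirect", "incorrect region for bucket", nil),
		http.StatusMovedPermanently, "request-id",
	)
	fmt.Println(isWrongRegion(err)) // true
}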
@@ -930,12 +939,17 @@ func s3Connection(opt *Options) (*s3.S3, *session.Session, error) {
 		opt.ForcePathStyle = false
 	}
 	awsConfig := aws.NewConfig().
-		WithRegion(opt.Region).
 		WithMaxRetries(maxRetries).
 		WithCredentials(cred).
-		WithEndpoint(opt.Endpoint).
 		WithHTTPClient(fshttp.NewClient(fs.Config)).
 		WithS3ForcePathStyle(opt.ForcePathStyle)
+	if opt.Region != "" {
+		awsConfig.WithRegion(opt.Region)
+	}
+	if opt.Endpoint != "" {
+		awsConfig.WithEndpoint(opt.Endpoint)
+	}
 	// awsConfig.WithLogLevel(aws.LogDebugWithSigning)
 	awsSessionOpts := session.Options{
 		Config: *awsConfig,
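
A side note on the conditional setup above, not from the commit: aws-sdk-go v1 Config setters such as WithRegion mutate the receiver and return it for chaining, which is why awsConfig.WithRegion(...) takes effect inside the if blocks without reassigning the result. A tiny illustration:

package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
)

func main() {
	cfg := aws.NewConfig().WithMaxRetries(10)
	// WithRegion sets cfg.Region on the receiver and returns cfg, so the
	// return value can be ignored when no further chaining is needed.
	cfg.WithRegion("eu-west-1")
	fmt.Println(aws.StringValue(cfg.Region), aws.IntValue(cfg.MaxRetries))
}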
@@ -1052,7 +1066,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
 		}
 		err = f.pacer.Call(func() (bool, error) {
 			_, err = f.c.HeadObject(&req)
-			return shouldRetry(err)
+			return f.shouldRetry(err)
 		})
 		if err == nil {
 			f.root = path.Dir(directory)
@@ -1102,6 +1116,51 @@ func (f *Fs) NewObject(remote string) (fs.Object, error) {
 	return f.newObjectWithInfo(remote, nil)
 }
 
+// Gets the bucket location
+func (f *Fs) getBucketLocation() (string, error) {
+	req := s3.GetBucketLocationInput{
+		Bucket: &f.bucket,
+	}
+	var resp *s3.GetBucketLocationOutput
+	var err error
+	err = f.pacer.Call(func() (bool, error) {
+		resp, err = f.c.GetBucketLocation(&req)
+		return f.shouldRetry(err)
+	})
+	if err != nil {
+		return "", err
+	}
+	return s3.NormalizeBucketLocation(aws.StringValue(resp.LocationConstraint)), nil
+}
+
+// Updates the region for the bucket by reading the region from the
+// bucket then updating the session.
+func (f *Fs) updateRegionForBucket() error {
+	region, err := f.getBucketLocation()
+	if err != nil {
+		return errors.Wrap(err, "reading bucket location failed")
+	}
+	if aws.StringValue(f.c.Config.Endpoint) != "" {
+		return errors.Errorf("can't set region to %q as endpoint is set", region)
+	}
+	if aws.StringValue(f.c.Config.Region) == region {
+		return errors.Errorf("region is already %q - not updating", region)
+	}
+
+	// Make a new session with the new region
+	oldRegion := f.opt.Region
+	f.opt.Region = region
+	c, ses, err := s3Connection(&f.opt)
+	if err != nil {
+		return errors.Wrap(err, "creating new session failed")
+	}
+
+	f.c = c
+	f.ses = ses
+	fs.Logf(f, "Switched region to %q from %q", region, oldRegion)
+	return nil
+}
+
 // listFn is called from list to handle an object.
 type listFn func(remote string, object *s3.Object, isDirectory bool) error
 
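Not part of the commit: updateRegionForBucket above swaps in a fresh client and session by calling s3Connection with the corrected region. A rough standalone equivalent of that "remake the session" step with aws-sdk-go v1 (the function name and target region are hypothetical):

package main

import (
	"fmt"
	"log"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)

// newClientForRegion builds a new session and S3 client pinned to region,
// mirroring the session remake, not rclone's actual s3Connection helper.
func newClientForRegion(region string) (*s3.S3, *session.Session, error) {
	ses, err := session.NewSessionWithOptions(session.Options{
		Config: *aws.NewConfig().WithRegion(region),
	})
	if err != nil {
		return nil, nil, err
	}
	return s3.New(ses), ses, nil
}

func main() {
	c, _, err := newClientForRegion("eu-west-2") // hypothetical detected region
	if err != nil {
		log.Fatal(err)
	}
	// The client's effective region can be read back, like the
	// aws.StringValue(f.c.Config.Region) check in the diff above.
	fmt.Println(aws.StringValue(c.Config.Region))
}
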
@@ -1134,7 +1193,7 @@ func (f *Fs) list(dir string, recurse bool, fn listFn) error {
 		var err error
 		err = f.pacer.Call(func() (bool, error) {
 			resp, err = f.c.ListObjects(&req)
-			return shouldRetry(err)
+			return f.shouldRetry(err)
 		})
 		if err != nil {
 			if awsErr, ok := err.(awserr.RequestFailure); ok {
@@ -1263,7 +1322,7 @@ func (f *Fs) listBuckets(dir string) (entries fs.DirEntries, err error) {
 	var resp *s3.ListBucketsOutput
 	err = f.pacer.Call(func() (bool, error) {
 		resp, err = f.c.ListBuckets(&req)
-		return shouldRetry(err)
+		return f.shouldRetry(err)
 	})
 	if err != nil {
 		return nil, err
@@ -1351,7 +1410,7 @@ func (f *Fs) dirExists() (bool, error) {
 	}
 	err := f.pacer.Call(func() (bool, error) {
 		_, err := f.c.HeadBucket(&req)
-		return shouldRetry(err)
+		return f.shouldRetry(err)
 	})
 	if err == nil {
 		return true, nil
@@ -1391,7 +1450,7 @@ func (f *Fs) Mkdir(dir string) error {
 	}
 	err := f.pacer.Call(func() (bool, error) {
 		_, err := f.c.CreateBucket(&req)
-		return shouldRetry(err)
+		return f.shouldRetry(err)
 	})
 	if err, ok := err.(awserr.Error); ok {
 		if err.Code() == "BucketAlreadyOwnedByYou" {
@@ -1420,7 +1479,7 @@ func (f *Fs) Rmdir(dir string) error {
 	}
 	err := f.pacer.Call(func() (bool, error) {
 		_, err := f.c.DeleteBucket(&req)
-		return shouldRetry(err)
+		return f.shouldRetry(err)
 	})
 	if err == nil {
 		f.bucketOK = false
@@ -1481,7 +1540,7 @@ func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) {
 	}
 	err = f.pacer.Call(func() (bool, error) {
 		_, err = f.c.CopyObject(&req)
-		return shouldRetry(err)
+		return f.shouldRetry(err)
 	})
 	if err != nil {
 		return nil, err
@@ -1563,7 +1622,7 @@ func (o *Object) readMetaData() (err error) {
 	err = o.fs.pacer.Call(func() (bool, error) {
 		var err error
 		resp, err = o.fs.c.HeadObject(&req)
-		return shouldRetry(err)
+		return o.fs.shouldRetry(err)
 	})
 	if err != nil {
 		if awsErr, ok := err.(awserr.RequestFailure); ok {
@@ -1659,7 +1718,7 @@ func (o *Object) SetModTime(modTime time.Time) error {
 	}
 	err = o.fs.pacer.Call(func() (bool, error) {
 		_, err := o.fs.c.CopyObject(&req)
-		return shouldRetry(err)
+		return o.fs.shouldRetry(err)
 	})
 	return err
 }
@@ -1691,7 +1750,7 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
 	err = o.fs.pacer.Call(func() (bool, error) {
 		var err error
 		resp, err = o.fs.c.GetObject(&req)
-		return shouldRetry(err)
+		return o.fs.shouldRetry(err)
 	})
 	if err, ok := err.(awserr.RequestFailure); ok {
 		if err.Code() == "InvalidObjectState" {
@@ -1782,7 +1841,7 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio
 		}
 		err = o.fs.pacer.CallNoRetry(func() (bool, error) {
 			_, err = uploader.Upload(&req)
-			return shouldRetry(err)
+			return o.fs.shouldRetry(err)
 		})
 		if err != nil {
 			return err
@@ -1838,11 +1897,11 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio
 		err = o.fs.pacer.CallNoRetry(func() (bool, error) {
 			resp, err := o.fs.srv.Do(httpReq)
 			if err != nil {
-				return shouldRetry(err)
+				return o.fs.shouldRetry(err)
 			}
 			body, err := rest.ReadBody(resp)
 			if err != nil {
-				return shouldRetry(err)
+				return o.fs.shouldRetry(err)
 			}
 			if resp.StatusCode >= 200 && resp.StatusCode < 299 {
 				return false, nil
@@ -1870,7 +1929,7 @@ func (o *Object) Remove() error {
 	}
 	err := o.fs.pacer.Call(func() (bool, error) {
 		_, err := o.fs.c.DeleteObject(&req)
-		return shouldRetry(err)
+		return o.fs.shouldRetry(err)
 	})
 	return err
 }