diff --git a/backend/s3/s3.go b/backend/s3/s3.go index dd894db18..4e2a15cde 100644 --- a/backend/s3/s3.go +++ b/backend/s3/s3.go @@ -12,6 +12,7 @@ import ( "errors" "fmt" "io" + "io/ioutil" "net/http" "net/url" "path" @@ -32,7 +33,6 @@ import ( "github.com/aws/aws-sdk-go/aws/endpoints" "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/aws/session" - v4 "github.com/aws/aws-sdk-go/aws/signer/v4" "github.com/aws/aws-sdk-go/service/s3" "github.com/ncw/swift/v2" "github.com/rclone/rclone/fs" @@ -2044,7 +2044,6 @@ type Fs struct { ctx context.Context // global context for reading config features *fs.Features // optional features c *s3.S3 // the connection to the s3 server - cu *s3.S3 // unsigned connection to the s3 server for PutObject ses *session.Session // the s3 session rootBucket string // bucket part of root (if any) rootDirectory string // directory part of root (if any) @@ -2181,11 +2180,7 @@ func getClient(ctx context.Context, opt *Options) *http.Client { } // s3Connection makes a connection to s3 -// -// If unsignedBody is set then the connection is configured for -// unsigned bodies which is necessary for PutObject if we don't want -// it to seek -func s3Connection(ctx context.Context, opt *Options, client *http.Client) (*s3.S3, *s3.S3, *session.Session, error) { +func s3Connection(ctx context.Context, opt *Options, client *http.Client) (*s3.S3, *session.Session, error) { ci := fs.GetConfig(ctx) // Make the auth v := credentials.Value{ @@ -2202,7 +2197,7 @@ func s3Connection(ctx context.Context, opt *Options, client *http.Client) (*s3.S // start a new AWS session awsSession, err := session.NewSession() if err != nil { - return nil, nil, nil, fmt.Errorf("NewSession: %w", err) + return nil, nil, fmt.Errorf("NewSession: %w", err) } // first provider to supply a credential set "wins" @@ -2242,9 +2237,9 @@ func s3Connection(ctx context.Context, opt *Options, client *http.Client) (*s3.S // if no access key/secret and iam is explicitly disabled then fall back to anon interaction cred = credentials.AnonymousCredentials case v.AccessKeyID == "": - return nil, nil, nil, errors.New("access_key_id not found") + return nil, nil, errors.New("access_key_id not found") case v.SecretAccessKey == "": - return nil, nil, nil, errors.New("secret_access_key not found") + return nil, nil, errors.New("secret_access_key not found") } if opt.Region == "" { @@ -2283,36 +2278,25 @@ func s3Connection(ctx context.Context, opt *Options, client *http.Client) (*s3.S // (from the shared config file) if the passed-in Options.Config.Credentials is nil. awsSessionOpts.Config.Credentials = nil } - // Setting this stops PutObject reading the body twice and seeking - // We add our own Content-MD5 for data protection - awsSessionOpts.Config.S3DisableContentMD5Validation = aws.Bool(true) ses, err := session.NewSessionWithOptions(awsSessionOpts) if err != nil { - return nil, nil, nil, err + return nil, nil, err } - newC := func(unsignedBody bool) *s3.S3 { - c := s3.New(ses) - if opt.V2Auth || opt.Region == "other-v2-signature" { - fs.Debugf(nil, "Using v2 auth") - signer := func(req *request.Request) { - // Ignore AnonymousCredentials object - if req.Config.Credentials == credentials.AnonymousCredentials { - return - } - sign(v.AccessKeyID, v.SecretAccessKey, req.HTTPRequest) + c := s3.New(ses) + if opt.V2Auth || opt.Region == "other-v2-signature" { + fs.Debugf(nil, "Using v2 auth") + signer := func(req *request.Request) { + // Ignore AnonymousCredentials object + if req.Config.Credentials == credentials.AnonymousCredentials { + return } - c.Handlers.Sign.Clear() - c.Handlers.Sign.PushBackNamed(corehandlers.BuildContentLengthHandler) - c.Handlers.Sign.PushBack(signer) - } else if unsignedBody { - // If the body is unsigned then tell the signer that we aren't signing the payload - c.Handlers.Sign.Clear() - c.Handlers.Sign.PushBackNamed(corehandlers.BuildContentLengthHandler) - c.Handlers.Sign.PushBackNamed(v4.BuildNamedHandler("v4.SignRequestHandler.WithUnsignedPayload", v4.WithUnsignedPayload)) + sign(v.AccessKeyID, v.SecretAccessKey, req.HTTPRequest) } - return c + c.Handlers.Sign.Clear() + c.Handlers.Sign.PushBackNamed(corehandlers.BuildContentLengthHandler) + c.Handlers.Sign.PushBack(signer) } - return newC(false), newC(true), ses, nil + return c, ses, nil } func checkUploadChunkSize(cs fs.SizeSuffix) error { @@ -2501,7 +2485,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e opt.SSECustomerKeyMD5 = base64.StdEncoding.EncodeToString(md5sumBinary[:]) } srv := getClient(ctx, opt) - c, cu, ses, err := s3Connection(ctx, opt, srv) + c, ses, err := s3Connection(ctx, opt, srv) if err != nil { return nil, err } @@ -2519,7 +2503,6 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e ci: ci, ctx: ctx, c: c, - cu: cu, ses: ses, pacer: pc, cache: bucket.NewCache(), @@ -2643,12 +2626,11 @@ func (f *Fs) updateRegionForBucket(ctx context.Context, bucket string) error { // Make a new session with the new region oldRegion := f.opt.Region f.opt.Region = region - c, cu, ses, err := s3Connection(f.ctx, &f.opt, f.srv) + c, ses, err := s3Connection(f.ctx, &f.opt, f.srv) if err != nil { return fmt.Errorf("creating new session failed: %w", err) } f.c = c - f.cu = cu f.ses = ses fs.Logf(f, "Switched region to %q from %q", region, oldRegion) @@ -4143,48 +4125,23 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si return etag, nil } -// unWrapAwsError unwraps AWS errors, looking for a non AWS error -// -// It returns true if one was found and the error, or false and the -// error passed in. -func unWrapAwsError(err error) (found bool, outErr error) { - if awsErr, ok := err.(awserr.Error); ok { - var origErrs []error - if batchErr, ok := awsErr.(awserr.BatchError); ok { - origErrs = batchErr.OrigErrs() - } else { - origErrs = []error{awsErr.OrigErr()} - } - for _, origErr := range origErrs { - found, newErr := unWrapAwsError(origErr) - if found { - return found, newErr - } - } - return false, err - } - return true, err -} - // Upload a single part using PutObject func (o *Object) uploadSinglepartPutObject(ctx context.Context, req *s3.PutObjectInput, size int64, in io.Reader) (etag string, lastModified time.Time, err error) { - req.Body = readers.NewFakeSeeker(in, size) - var resp *s3.PutObjectOutput + r, resp := o.fs.c.PutObjectRequest(req) + if req.ContentLength != nil && *req.ContentLength == 0 { + // Can't upload zero length files like this for some reason + r.Body = bytes.NewReader([]byte{}) + } else { + r.SetStreamingBody(ioutil.NopCloser(in)) + } + r.SetContext(ctx) + r.HTTPRequest.Header.Set("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD") + err = o.fs.pacer.CallNoRetry(func() (bool, error) { - resp, err = o.fs.cu.PutObject(req) + err := r.Send() return o.fs.shouldRetry(ctx, err) }) if err != nil { - // Return the underlying error if we have a Serialization error if possible - // - // Serialization errors are synthesized locally in the SDK (not returned from the - // server). We'll get one if the SDK attempts a retry, however the FakeSeeker will - // remember the previous error from Read and return that. - if do, ok := err.(awserr.Error); ok && do.Code() == request.ErrCodeSerialization { - if found, newErr := unWrapAwsError(err); found { - err = newErr - } - } return etag, lastModified, err } lastModified = time.Now()