diff --git a/backend/s3/s3.go b/backend/s3/s3.go
index 4b7b93a27..4aa3bd8a0 100644
--- a/backend/s3/s3.go
+++ b/backend/s3/s3.go
@@ -14,7 +14,9 @@ What happens if you CTRL-C a multipart upload
 */
 
 import (
+	"bytes"
 	"context"
+	"crypto/md5"
 	"encoding/base64"
 	"encoding/hex"
 	"encoding/xml"
@@ -24,6 +26,7 @@ import (
 	"net/url"
 	"path"
 	"regexp"
+	"sort"
 	"strconv"
 	"strings"
 	"sync"
@@ -39,7 +42,6 @@ import (
 	"github.com/aws/aws-sdk-go/aws/request"
 	"github.com/aws/aws-sdk-go/aws/session"
 	"github.com/aws/aws-sdk-go/service/s3"
-	"github.com/aws/aws-sdk-go/service/s3/s3manager"
 	"github.com/ncw/swift"
 	"github.com/pkg/errors"
 	"github.com/rclone/rclone/fs"
@@ -52,7 +54,9 @@ import (
 	"github.com/rclone/rclone/fs/walk"
 	"github.com/rclone/rclone/lib/bucket"
 	"github.com/rclone/rclone/lib/pacer"
+	"github.com/rclone/rclone/lib/readers"
 	"github.com/rclone/rclone/lib/rest"
+	"golang.org/x/sync/errgroup"
 )
 
 const enc = encodings.S3
@@ -814,7 +818,8 @@ const (
 	metaMD5Hash         = "Md5chksum" // the meta key to store md5hash in
 	maxRetries          = 10          // number of retries to make of operations
 	maxSizeForCopy      = 5 * 1024 * 1024 * 1024 // The maximum size of object we can COPY
-	minChunkSize        = fs.SizeSuffix(s3manager.MinUploadPartSize)
+	maxUploadParts      = 10000 // maximum allowed number of parts in a multi-part upload
+	minChunkSize        = fs.SizeSuffix(1024 * 1024 * 5)
 	defaultUploadCutoff = fs.SizeSuffix(200 * 1024 * 1024)
 	maxUploadCutoff     = fs.SizeSuffix(5 * 1024 * 1024 * 1024)
 	minSleep            = 10 * time.Millisecond // In case of error, start at 10ms sleep.
@@ -2023,6 +2028,193 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
 
 var warnStreamUpload sync.Once
 
+func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, size int64, in io.Reader) (err error) {
+	f := o.fs
+
+	// make concurrency machinery
+	concurrency := f.opt.UploadConcurrency
+	if concurrency < 1 {
+		concurrency = 1
+	}
+	bufs := make(chan []byte, concurrency)
+	defer func() {
+		// empty the channel on exit
+		close(bufs)
+		for range bufs {
+		}
+	}()
+	for i := 0; i < concurrency; i++ {
+		bufs <- nil
+	}
+
+	// calculate size of parts
+	partSize := int(f.opt.ChunkSize)
+
+	// size can be -1 here meaning we don't know the size of the incoming file. We use ChunkSize
+	// buffers here (default 5MB). With a maximum number of parts (10,000) this will be a file of
+	// 48GB which seems like a not too unreasonable limit.
+	if size == -1 {
+		warnStreamUpload.Do(func() {
+			fs.Logf(f, "Streaming uploads using chunk size %v will have maximum file size of %v",
+				f.opt.ChunkSize, fs.SizeSuffix(partSize*maxUploadParts))
+		})
+	} else {
+		// Adjust partSize until the number of parts is small enough.
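+		// For example, a 100 GiB source gives size/maxUploadParts of about 10.2 MiB,
+		// which rounds up to an 11 MiB part size and keeps the part count below 10,000.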
+		if size/int64(partSize) >= maxUploadParts {
+			// Calculate partition size rounded up to the nearest MB
+			partSize = int((((size / maxUploadParts) >> 20) + 1) << 20)
+		}
+	}
+
+	var cout *s3.CreateMultipartUploadOutput
+	err = f.pacer.Call(func() (bool, error) {
+		var err error
+		cout, err = f.c.CreateMultipartUploadWithContext(ctx, &s3.CreateMultipartUploadInput{
+			Bucket:               req.Bucket,
+			ACL:                  req.ACL,
+			Key:                  req.Key,
+			ContentType:          req.ContentType,
+			Metadata:             req.Metadata,
+			ServerSideEncryption: req.ServerSideEncryption,
+			SSEKMSKeyId:          req.SSEKMSKeyId,
+			StorageClass:         req.StorageClass,
+		})
+		return f.shouldRetry(err)
+	})
+	if err != nil {
+		return errors.Wrap(err, "multipart upload failed to initialise")
+	}
+	uid := cout.UploadId
+
+	defer func() {
+		if o.fs.opt.LeavePartsOnError {
+			return
+		}
+		if err != nil {
+			// We can try to abort the upload, but ignore the error.
+			fs.Debugf(o, "Cancelling multipart upload")
+			errCancel := f.pacer.Call(func() (bool, error) {
+				_, err := f.c.AbortMultipartUploadWithContext(ctx, &s3.AbortMultipartUploadInput{
+					Bucket:       req.Bucket,
+					Key:          req.Key,
+					UploadId:     uid,
+					RequestPayer: req.RequestPayer,
+				})
+				return f.shouldRetry(err)
+			})
+			if errCancel != nil {
+				fs.Debugf(o, "Failed to cancel multipart upload: %v", errCancel)
+			}
+		}
+	}()
+
+	var (
+		g, gCtx  = errgroup.WithContext(ctx)
+		finished = false
+		partsMu  sync.Mutex // to protect parts
+		parts    []*s3.CompletedPart
+		off      int64
+	)
+
+	for partNum := int64(1); !finished; partNum++ {
+		// Get a block of memory from the channel (which limits concurrency)
+		buf := <-bufs
+		if buf == nil {
+			buf = make([]byte, partSize)
+		}
+
+		// Read the chunk
+		var n int
+		n, err = readers.ReadFill(in, buf) // this can never return 0, nil
+		if err == io.EOF {
+			if n == 0 {
+				break
+			}
+			finished = true
+		} else if err != nil {
+			return errors.Wrap(err, "multipart upload failed to read source")
+		}
+		buf = buf[:n]
+
+		partNum := partNum
+		fs.Debugf(o, "multipart upload starting chunk %d size %v offset %v/%v", partNum, fs.SizeSuffix(n), fs.SizeSuffix(off), fs.SizeSuffix(size))
+		off += int64(n)
+		g.Go(func() (err error) {
+			partLength := int64(len(buf))
+
+			// create checksum of buffer for integrity checking
+			md5sumBinary := md5.Sum(buf)
+			md5sum := base64.StdEncoding.EncodeToString(md5sumBinary[:])
+
+			err = f.pacer.Call(func() (bool, error) {
+				uploadPartReq := &s3.UploadPartInput{
+					Body:                 bytes.NewReader(buf),
+					Bucket:               req.Bucket,
+					Key:                  req.Key,
+					PartNumber:           &partNum,
+					UploadId:             uid,
+					ContentMD5:           &md5sum,
+					ContentLength:        &partLength,
+					RequestPayer:         req.RequestPayer,
+					SSECustomerAlgorithm: req.SSECustomerAlgorithm,
+					SSECustomerKey:       req.SSECustomerKey,
+					SSECustomerKeyMD5:    req.SSECustomerKeyMD5,
+				}
+				uout, err := f.c.UploadPartWithContext(gCtx, uploadPartReq)
+				if err != nil {
+					if partNum <= int64(concurrency) {
+						return f.shouldRetry(err)
+					}
+					// retry all chunks once have done the first batch
+					return true, err
+				}
+				partsMu.Lock()
+				parts = append(parts, &s3.CompletedPart{
+					PartNumber: &partNum,
+					ETag:       uout.ETag,
+				})
+				partsMu.Unlock()
+
+				return false, nil
+			})
+
+			// return the memory
+			bufs <- buf[:partSize]
+
+			if err != nil {
+				return errors.Wrap(err, "multipart upload failed to upload part")
+			}
+			return nil
+		})
+	}
+	err = g.Wait()
+	if err != nil {
+		return err
+	}
+
+	// sort the completed parts by part number
+	sort.Slice(parts, func(i, j int) bool {
+		return *parts[i].PartNumber < *parts[j].PartNumber
+	})
+
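+	// Finalise the upload: S3 assembles the uploaded parts into the final object
+	// and requires the completed part list in ascending PartNumber order, hence
+	// the sort above.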
+	err = f.pacer.Call(func() (bool, error) {
+		_, err := f.c.CompleteMultipartUploadWithContext(ctx, &s3.CompleteMultipartUploadInput{
+			Bucket: req.Bucket,
+			Key:    req.Key,
+			MultipartUpload: &s3.CompletedMultipartUpload{
+				Parts: parts,
+			},
+			RequestPayer: req.RequestPayer,
+			UploadId:     uid,
+		})
+		return f.shouldRetry(err)
+	})
+	if err != nil {
+		return errors.Wrap(err, "multipart upload failed to finalise")
+	}
+	return nil
+}
+
 // Update the Object from in with modTime and size
 func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
 	bucket, bucketPath := o.split()
@@ -2034,31 +2226,6 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
 	size := src.Size()
 
 	multipart := size < 0 || size >= int64(o.fs.opt.UploadCutoff)
-	var uploader *s3manager.Uploader
-	if multipart {
-		uploader = s3manager.NewUploader(o.fs.ses, func(u *s3manager.Uploader) {
-			u.Concurrency = o.fs.opt.UploadConcurrency
-			u.LeavePartsOnError = o.fs.opt.LeavePartsOnError
-			u.S3 = o.fs.c
-			u.PartSize = int64(o.fs.opt.ChunkSize)
-
-			// size can be -1 here meaning we don't know the size of the incoming file. We use ChunkSize
-			// buffers here (default 5MB). With a maximum number of parts (10,000) this will be a file of
-			// 48GB which seems like a not too unreasonable limit.
-			if size == -1 {
-				warnStreamUpload.Do(func() {
-					fs.Logf(o.fs, "Streaming uploads using chunk size %v will have maximum file size of %v",
-						o.fs.opt.ChunkSize, fs.SizeSuffix(u.PartSize*s3manager.MaxUploadParts))
-				})
-				return
-			}
-			// Adjust PartSize until the number of parts is small enough.
-			if size/u.PartSize >= s3manager.MaxUploadParts {
-				// Calculate partition size rounded up to the nearest MB
-				u.PartSize = (((size / s3manager.MaxUploadParts) >> 20) + 1) << 20
-			}
-		})
-	}
 
 	// Set the mtime in the meta data
 	metadata := map[string]*string{
@@ -2083,52 +2250,32 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
 
 	// Guess the content type
 	mimeType := fs.MimeType(ctx, src)
+	req := s3.PutObjectInput{
+		Bucket:      &bucket,
+		ACL:         &o.fs.opt.ACL,
+		Key:         &bucketPath,
+		ContentType: &mimeType,
+		Metadata:    metadata,
+	}
+	if md5sum != "" {
+		req.ContentMD5 = &md5sum
+	}
+	if o.fs.opt.ServerSideEncryption != "" {
+		req.ServerSideEncryption = &o.fs.opt.ServerSideEncryption
+	}
+	if o.fs.opt.SSEKMSKeyID != "" {
+		req.SSEKMSKeyId = &o.fs.opt.SSEKMSKeyID
+	}
+	if o.fs.opt.StorageClass != "" {
+		req.StorageClass = &o.fs.opt.StorageClass
+	}
+
 	if multipart {
-		req := s3manager.UploadInput{
-			Bucket:      &bucket,
-			ACL:         &o.fs.opt.ACL,
-			Key:         &bucketPath,
-			Body:        in,
-			ContentType: &mimeType,
-			Metadata:    metadata,
-			//ContentLength: &size,
-		}
-		if o.fs.opt.ServerSideEncryption != "" {
-			req.ServerSideEncryption = &o.fs.opt.ServerSideEncryption
-		}
-		if o.fs.opt.SSEKMSKeyID != "" {
-			req.SSEKMSKeyId = &o.fs.opt.SSEKMSKeyID
-		}
-		if o.fs.opt.StorageClass != "" {
-			req.StorageClass = &o.fs.opt.StorageClass
-		}
-		err = o.fs.pacer.CallNoRetry(func() (bool, error) {
-			_, err = uploader.UploadWithContext(ctx, &req)
-			return o.fs.shouldRetry(err)
-		})
+		err = o.uploadMultipart(ctx, &req, size, in)
 		if err != nil {
 			return err
 		}
 	} else {
-		req := s3.PutObjectInput{
-			Bucket:      &bucket,
-			ACL:         &o.fs.opt.ACL,
-			Key:         &bucketPath,
-			ContentType: &mimeType,
-			Metadata:    metadata,
-		}
-		if md5sum != "" {
-			req.ContentMD5 = &md5sum
-		}
-		if o.fs.opt.ServerSideEncryption != "" {
-			req.ServerSideEncryption = &o.fs.opt.ServerSideEncryption
-		}
-		if o.fs.opt.SSEKMSKeyID != "" {
-			req.SSEKMSKeyId = &o.fs.opt.SSEKMSKeyID
-		}
-		if o.fs.opt.StorageClass != "" {
-			req.StorageClass = &o.fs.opt.StorageClass
-		}
 		// Create the request
 		putObj, _ := o.fs.c.PutObjectRequest(&req)