From 7e6fac8b1eaeccbf48e197e2c1f12627c3e69ca5 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Mon, 30 Dec 2019 23:17:06 +0000 Subject: [PATCH] s3: re-implement multipart upload to fix memory issues There have been quite a few reports of problems with the multipart uploader using too much memory and not retrying possible errors. Before this change the multipart uploader used the s3manager abstraction in the AWS SDK. There are numerous bug reports of this using up too much memory. This change re-implements a much simplified version of the s3manager code specialized for rclone's purposes. This should use much less memory and retry chunks properly. See: https://forum.rclone.org/t/memory-usage-s3-alike-to-glacier-without-big-directories/13563 See: https://forum.rclone.org/t/copy-from-local-to-s3-has-high-memory-usage/13405 See: https://forum.rclone.org/t/big-file-upload-to-s3-fails/13575 --- backend/s3/s3.go | 283 +++++++++++++++++++++++++++++++++++------------ 1 file changed, 215 insertions(+), 68 deletions(-) diff --git a/backend/s3/s3.go b/backend/s3/s3.go index 4b7b93a27..4aa3bd8a0 100644 --- a/backend/s3/s3.go +++ b/backend/s3/s3.go @@ -14,7 +14,9 @@ What happens if you CTRL-C a multipart upload */ import ( + "bytes" "context" + "crypto/md5" "encoding/base64" "encoding/hex" "encoding/xml" @@ -24,6 +26,7 @@ import ( "net/url" "path" "regexp" + "sort" "strconv" "strings" "sync" @@ -39,7 +42,6 @@ import ( "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" - "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/ncw/swift" "github.com/pkg/errors" "github.com/rclone/rclone/fs" @@ -52,7 +54,9 @@ import ( "github.com/rclone/rclone/fs/walk" "github.com/rclone/rclone/lib/bucket" "github.com/rclone/rclone/lib/pacer" + "github.com/rclone/rclone/lib/readers" "github.com/rclone/rclone/lib/rest" + "golang.org/x/sync/errgroup" ) const enc = encodings.S3 @@ -814,7 +818,8 @@ const ( metaMD5Hash = "Md5chksum" // the meta key to store md5hash in maxRetries = 10 // number of retries to make of operations maxSizeForCopy = 5 * 1024 * 1024 * 1024 // The maximum size of object we can COPY - minChunkSize = fs.SizeSuffix(s3manager.MinUploadPartSize) + maxUploadParts = 10000 // maximum allowed number of parts in a multi-part upload + minChunkSize = fs.SizeSuffix(1024 * 1024 * 5) defaultUploadCutoff = fs.SizeSuffix(200 * 1024 * 1024) maxUploadCutoff = fs.SizeSuffix(5 * 1024 * 1024 * 1024) minSleep = 10 * time.Millisecond // In case of error, start at 10ms sleep. @@ -2023,6 +2028,193 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read var warnStreamUpload sync.Once +func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, size int64, in io.Reader) (err error) { + f := o.fs + + // make concurrency machinery + concurrency := f.opt.UploadConcurrency + if concurrency < 1 { + concurrency = 1 + } + bufs := make(chan []byte, concurrency) + defer func() { + // empty the channel on exit + close(bufs) + for range bufs { + } + }() + for i := 0; i < concurrency; i++ { + bufs <- nil + } + + // calculate size of parts + partSize := int(f.opt.ChunkSize) + + // size can be -1 here meaning we don't know the size of the incoming file. We use ChunkSize + // buffers here (default 5MB). With a maximum number of parts (10,000) this will be a file of + // 48GB which seems like a not too unreasonable limit. + if size == -1 { + warnStreamUpload.Do(func() { + fs.Logf(f, "Streaming uploads using chunk size %v will have maximum file size of %v", + f.opt.ChunkSize, fs.SizeSuffix(partSize*maxUploadParts)) + }) + } else { + // Adjust partSize until the number of parts is small enough. + if size/int64(partSize) >= maxUploadParts { + // Calculate partition size rounded up to the nearest MB + partSize = int((((size / maxUploadParts) >> 20) + 1) << 20) + } + } + + var cout *s3.CreateMultipartUploadOutput + err = f.pacer.Call(func() (bool, error) { + var err error + cout, err = f.c.CreateMultipartUploadWithContext(ctx, &s3.CreateMultipartUploadInput{ + Bucket: req.Bucket, + ACL: req.ACL, + Key: req.Key, + ContentType: req.ContentType, + Metadata: req.Metadata, + ServerSideEncryption: req.ServerSideEncryption, + SSEKMSKeyId: req.SSEKMSKeyId, + StorageClass: req.StorageClass, + }) + return f.shouldRetry(err) + }) + if err != nil { + return errors.Wrap(err, "multipart upload failed to initialise") + } + uid := cout.UploadId + + defer func() { + if o.fs.opt.LeavePartsOnError { + return + } + if err != nil { + // We can try to abort the upload, but ignore the error. + fs.Debugf(o, "Cancelling multipart upload") + errCancel := f.pacer.Call(func() (bool, error) { + _, err := f.c.AbortMultipartUploadWithContext(ctx, &s3.AbortMultipartUploadInput{ + Bucket: req.Bucket, + Key: req.Key, + UploadId: uid, + RequestPayer: req.RequestPayer, + }) + return f.shouldRetry(err) + }) + if errCancel != nil { + fs.Debugf(o, "Failed to cancel multipart upload: %v", errCancel) + } + } + }() + + var ( + g, gCtx = errgroup.WithContext(ctx) + finished = false + partsMu sync.Mutex // to protect parts + parts []*s3.CompletedPart + off int64 + ) + + for partNum := int64(1); !finished; partNum++ { + // Get a block of memory from the channel (which limits concurrency) + buf := <-bufs + if buf == nil { + buf = make([]byte, partSize) + } + + // Read the chunk + var n int + n, err = readers.ReadFill(in, buf) // this can never return 0, nil + if err == io.EOF { + if n == 0 { + break + } + finished = true + } else if err != nil { + return errors.Wrap(err, "multipart upload failed to read source") + } + buf = buf[:n] + + partNum := partNum + fs.Debugf(o, "multipart upload starting chunk %d size %v offset %v/%v", partNum, fs.SizeSuffix(n), fs.SizeSuffix(off), fs.SizeSuffix(size)) + off += int64(n) + g.Go(func() (err error) { + partLength := int64(len(buf)) + + // create checksum of buffer for integrity checking + md5sumBinary := md5.Sum(buf) + md5sum := base64.StdEncoding.EncodeToString(md5sumBinary[:]) + + err = f.pacer.Call(func() (bool, error) { + uploadPartReq := &s3.UploadPartInput{ + Body: bytes.NewReader(buf), + Bucket: req.Bucket, + Key: req.Key, + PartNumber: &partNum, + UploadId: uid, + ContentMD5: &md5sum, + ContentLength: &partLength, + RequestPayer: req.RequestPayer, + SSECustomerAlgorithm: req.SSECustomerAlgorithm, + SSECustomerKey: req.SSECustomerKey, + SSECustomerKeyMD5: req.SSECustomerKeyMD5, + } + uout, err := f.c.UploadPartWithContext(gCtx, uploadPartReq) + if err != nil { + if partNum <= int64(concurrency) { + return f.shouldRetry(err) + } + // retry all chunks once have done the first batch + return true, err + } + partsMu.Lock() + parts = append(parts, &s3.CompletedPart{ + PartNumber: &partNum, + ETag: uout.ETag, + }) + partsMu.Unlock() + + return false, nil + }) + + // return the memory + bufs <- buf[:partSize] + + if err != nil { + return errors.Wrap(err, "multipart upload failed to upload part") + } + return nil + }) + } + err = g.Wait() + if err != nil { + return err + } + + // sort the completed parts by part number + sort.Slice(parts, func(i, j int) bool { + return *parts[i].PartNumber < *parts[j].PartNumber + }) + + err = f.pacer.Call(func() (bool, error) { + _, err := f.c.CompleteMultipartUploadWithContext(ctx, &s3.CompleteMultipartUploadInput{ + Bucket: req.Bucket, + Key: req.Key, + MultipartUpload: &s3.CompletedMultipartUpload{ + Parts: parts, + }, + RequestPayer: req.RequestPayer, + UploadId: uid, + }) + return f.shouldRetry(err) + }) + if err != nil { + return errors.Wrap(err, "multipart upload failed to finalise") + } + return nil +} + // Update the Object from in with modTime and size func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { bucket, bucketPath := o.split() @@ -2034,31 +2226,6 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op size := src.Size() multipart := size < 0 || size >= int64(o.fs.opt.UploadCutoff) - var uploader *s3manager.Uploader - if multipart { - uploader = s3manager.NewUploader(o.fs.ses, func(u *s3manager.Uploader) { - u.Concurrency = o.fs.opt.UploadConcurrency - u.LeavePartsOnError = o.fs.opt.LeavePartsOnError - u.S3 = o.fs.c - u.PartSize = int64(o.fs.opt.ChunkSize) - - // size can be -1 here meaning we don't know the size of the incoming file. We use ChunkSize - // buffers here (default 5MB). With a maximum number of parts (10,000) this will be a file of - // 48GB which seems like a not too unreasonable limit. - if size == -1 { - warnStreamUpload.Do(func() { - fs.Logf(o.fs, "Streaming uploads using chunk size %v will have maximum file size of %v", - o.fs.opt.ChunkSize, fs.SizeSuffix(u.PartSize*s3manager.MaxUploadParts)) - }) - return - } - // Adjust PartSize until the number of parts is small enough. - if size/u.PartSize >= s3manager.MaxUploadParts { - // Calculate partition size rounded up to the nearest MB - u.PartSize = (((size / s3manager.MaxUploadParts) >> 20) + 1) << 20 - } - }) - } // Set the mtime in the meta data metadata := map[string]*string{ @@ -2083,52 +2250,32 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op // Guess the content type mimeType := fs.MimeType(ctx, src) + req := s3.PutObjectInput{ + Bucket: &bucket, + ACL: &o.fs.opt.ACL, + Key: &bucketPath, + ContentType: &mimeType, + Metadata: metadata, + } + if md5sum != "" { + req.ContentMD5 = &md5sum + } + if o.fs.opt.ServerSideEncryption != "" { + req.ServerSideEncryption = &o.fs.opt.ServerSideEncryption + } + if o.fs.opt.SSEKMSKeyID != "" { + req.SSEKMSKeyId = &o.fs.opt.SSEKMSKeyID + } + if o.fs.opt.StorageClass != "" { + req.StorageClass = &o.fs.opt.StorageClass + } + if multipart { - req := s3manager.UploadInput{ - Bucket: &bucket, - ACL: &o.fs.opt.ACL, - Key: &bucketPath, - Body: in, - ContentType: &mimeType, - Metadata: metadata, - //ContentLength: &size, - } - if o.fs.opt.ServerSideEncryption != "" { - req.ServerSideEncryption = &o.fs.opt.ServerSideEncryption - } - if o.fs.opt.SSEKMSKeyID != "" { - req.SSEKMSKeyId = &o.fs.opt.SSEKMSKeyID - } - if o.fs.opt.StorageClass != "" { - req.StorageClass = &o.fs.opt.StorageClass - } - err = o.fs.pacer.CallNoRetry(func() (bool, error) { - _, err = uploader.UploadWithContext(ctx, &req) - return o.fs.shouldRetry(err) - }) + err = o.uploadMultipart(ctx, &req, size, in) if err != nil { return err } } else { - req := s3.PutObjectInput{ - Bucket: &bucket, - ACL: &o.fs.opt.ACL, - Key: &bucketPath, - ContentType: &mimeType, - Metadata: metadata, - } - if md5sum != "" { - req.ContentMD5 = &md5sum - } - if o.fs.opt.ServerSideEncryption != "" { - req.ServerSideEncryption = &o.fs.opt.ServerSideEncryption - } - if o.fs.opt.SSEKMSKeyID != "" { - req.SSEKMSKeyId = &o.fs.opt.SSEKMSKeyID - } - if o.fs.opt.StorageClass != "" { - req.StorageClass = &o.fs.opt.StorageClass - } // Create the request putObj, _ := o.fs.c.PutObjectRequest(&req)