s3: re-implement multipart upload to fix memory issues

There have been a number of reports of the multipart uploader using
too much memory and failing to retry errors that could have been retried.

Before this change the multipart uploader used the s3manager
abstraction from the AWS SDK, which has been the subject of numerous
bug reports about excessive memory use.

This change re-implements a much simplified version of the s3manager
code specialized for rclone's purposes.

This should use much less memory and retry chunks properly.
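
As an illustration - a minimal sketch, not the code in this commit -
the new uploader is built around a buffer-pool pattern: a channel of
reusable buffers caps memory at roughly chunk size times concurrency
per transfer, while an errgroup uploads the parts in parallel. The
uploadPart function below is a hypothetical stand-in for the real
per-part S3 call.

    package main

    import (
        "fmt"
        "io"
        "strings"

        "golang.org/x/sync/errgroup"
    )

    // uploadPart is a hypothetical stand-in for the real per-part S3 upload.
    func uploadPart(partNum int, chunk []byte) error {
        fmt.Printf("part %d: %d bytes\n", partNum, len(chunk))
        return nil
    }

    func uploadMultipart(in io.Reader, chunkSize, concurrency int) error {
        // bufs is a bounded buffer pool: at most `concurrency` chunk-sized
        // buffers are ever allocated at once.
        bufs := make(chan []byte, concurrency)
        for i := 0; i < concurrency; i++ {
            bufs <- nil // buffers are allocated lazily on first use
        }
        var g errgroup.Group
        for partNum := 1; ; partNum++ {
            buf := <-bufs // blocks until a buffer (and hence a slot) is free
            if buf == nil {
                buf = make([]byte, chunkSize)
            }
            n, err := io.ReadFull(in, buf)
            if err == io.EOF { // nothing left to read
                break
            }
            if err != nil && err != io.ErrUnexpectedEOF {
                return err
            }
            chunk, partNum := buf[:n], partNum
            g.Go(func() error {
                uploadErr := uploadPart(partNum, chunk)
                bufs <- chunk[:cap(chunk)] // return the buffer to the pool
                return uploadErr
            })
            if err == io.ErrUnexpectedEOF { // short read: that was the last part
                break
            }
        }
        return g.Wait()
    }

    func main() {
        in := strings.NewReader(strings.Repeat("x", 23))
        if err := uploadMultipart(in, 10, 2); err != nil {
            fmt.Println("upload failed:", err)
        }
    }

With the defaults at the time (5 MiB chunks, upload concurrency 4) this
bounds the buffered data at roughly 20 MiB per transfer rather than
letting it grow with the size of the file.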

See: https://forum.rclone.org/t/memory-usage-s3-alike-to-glacier-without-big-directories/13563
See: https://forum.rclone.org/t/copy-from-local-to-s3-has-high-memory-usage/13405
See: https://forum.rclone.org/t/big-file-upload-to-s3-fails/13575
Nick Craig-Wood 2019-12-30 23:17:06 +00:00
parent 2e0774f3cf
commit 7e6fac8b1e

@@ -14,7 +14,9 @@ What happens if you CTRL-C a multipart upload
*/
import (
"bytes"
"context"
"crypto/md5"
"encoding/base64"
"encoding/hex"
"encoding/xml"
@@ -24,6 +26,7 @@ import (
"net/url"
"path"
"regexp"
"sort"
"strconv"
"strings"
"sync"
@@ -39,7 +42,6 @@ import (
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/ncw/swift"
"github.com/pkg/errors"
"github.com/rclone/rclone/fs"
@@ -52,7 +54,9 @@ import (
"github.com/rclone/rclone/fs/walk"
"github.com/rclone/rclone/lib/bucket"
"github.com/rclone/rclone/lib/pacer"
"github.com/rclone/rclone/lib/readers"
"github.com/rclone/rclone/lib/rest"
"golang.org/x/sync/errgroup"
)
const enc = encodings.S3
@@ -814,7 +818,8 @@ const (
metaMD5Hash = "Md5chksum" // the meta key to store md5hash in
maxRetries = 10 // number of retries to make of operations
maxSizeForCopy = 5 * 1024 * 1024 * 1024 // The maximum size of object we can COPY
minChunkSize = fs.SizeSuffix(s3manager.MinUploadPartSize)
maxUploadParts = 10000 // maximum allowed number of parts in a multi-part upload
minChunkSize = fs.SizeSuffix(1024 * 1024 * 5) // 5 MiB is the minimum part size S3 allows
defaultUploadCutoff = fs.SizeSuffix(200 * 1024 * 1024)
maxUploadCutoff = fs.SizeSuffix(5 * 1024 * 1024 * 1024)
minSleep = 10 * time.Millisecond // In case of error, start at 10ms sleep.
@@ -2023,6 +2028,193 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
var warnStreamUpload sync.Once
func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, size int64, in io.Reader) (err error) {
f := o.fs
// make concurrency machinery
concurrency := f.opt.UploadConcurrency
if concurrency < 1 {
concurrency = 1
}
bufs := make(chan []byte, concurrency)
defer func() {
// empty the channel on exit
close(bufs)
for range bufs {
}
}()
for i := 0; i < concurrency; i++ {
bufs <- nil
}
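// Note: bufs doubles as a bounded buffer pool - at most `concurrency`
// chunk-sized buffers are ever allocated, so the uploader's buffer memory
// is capped at roughly UploadConcurrency * ChunkSize per transfer.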
// calculate size of parts
partSize := int(f.opt.ChunkSize)
// size can be -1 here meaning we don't know the size of the incoming file. We use ChunkSize
// buffers here (default 5 MiB). With the maximum number of parts (10,000) this gives a
// maximum file size of about 48 GiB, which seems a reasonable limit.
if size == -1 {
warnStreamUpload.Do(func() {
fs.Logf(f, "Streaming uploads using chunk size %v will have maximum file size of %v",
f.opt.ChunkSize, fs.SizeSuffix(partSize*maxUploadParts))
})
} else {
// Adjust partSize until the number of parts is small enough.
if size/int64(partSize) >= maxUploadParts {
// Calculate partition size rounded up to the nearest MB
partSize = int((((size / maxUploadParts) >> 20) + 1) << 20)
}
}
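// For example, a 1 TiB source gives 1 TiB / 10,000 ~ 104.9 MiB, which is
// rounded up to a 105 MiB part size, for roughly 9,987 parts.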
var cout *s3.CreateMultipartUploadOutput
err = f.pacer.Call(func() (bool, error) {
var err error
cout, err = f.c.CreateMultipartUploadWithContext(ctx, &s3.CreateMultipartUploadInput{
Bucket: req.Bucket,
ACL: req.ACL,
Key: req.Key,
ContentType: req.ContentType,
Metadata: req.Metadata,
ServerSideEncryption: req.ServerSideEncryption,
SSEKMSKeyId: req.SSEKMSKeyId,
StorageClass: req.StorageClass,
})
return f.shouldRetry(err)
})
if err != nil {
return errors.Wrap(err, "multipart upload failed to initialise")
}
uid := cout.UploadId
defer func() {
if o.fs.opt.LeavePartsOnError {
return
}
if err != nil {
// We can try to abort the upload, but ignore the error.
fs.Debugf(o, "Cancelling multipart upload")
errCancel := f.pacer.Call(func() (bool, error) {
_, err := f.c.AbortMultipartUploadWithContext(ctx, &s3.AbortMultipartUploadInput{
Bucket: req.Bucket,
Key: req.Key,
UploadId: uid,
RequestPayer: req.RequestPayer,
})
return f.shouldRetry(err)
})
if errCancel != nil {
fs.Debugf(o, "Failed to cancel multipart upload: %v", errCancel)
}
}
}()
var (
g, gCtx = errgroup.WithContext(ctx)
finished = false
partsMu sync.Mutex // to protect parts
parts []*s3.CompletedPart
off int64
)
for partNum := int64(1); !finished; partNum++ {
// Get a block of memory from the channel (which limits concurrency)
buf := <-bufs
if buf == nil {
buf = make([]byte, partSize)
}
// Read the chunk
var n int
n, err = readers.ReadFill(in, buf) // this can never return 0, nil
if err == io.EOF {
if n == 0 {
break
}
finished = true
} else if err != nil {
return errors.Wrap(err, "multipart upload failed to read source")
}
buf = buf[:n]
partNum := partNum
fs.Debugf(o, "multipart upload starting chunk %d size %v offset %v/%v", partNum, fs.SizeSuffix(n), fs.SizeSuffix(off), fs.SizeSuffix(size))
off += int64(n)
g.Go(func() (err error) {
partLength := int64(len(buf))
// create checksum of buffer for integrity checking
md5sumBinary := md5.Sum(buf)
md5sum := base64.StdEncoding.EncodeToString(md5sumBinary[:])
err = f.pacer.Call(func() (bool, error) {
uploadPartReq := &s3.UploadPartInput{
Body: bytes.NewReader(buf),
Bucket: req.Bucket,
Key: req.Key,
PartNumber: &partNum,
UploadId: uid,
ContentMD5: &md5sum,
ContentLength: &partLength,
RequestPayer: req.RequestPayer,
SSECustomerAlgorithm: req.SSECustomerAlgorithm,
SSECustomerKey: req.SSECustomerKey,
SSECustomerKeyMD5: req.SSECustomerKeyMD5,
}
uout, err := f.c.UploadPartWithContext(gCtx, uploadPartReq)
if err != nil {
if partNum <= int64(concurrency) {
return f.shouldRetry(err)
}
// retry all chunks once we have done the first batch
return true, err
}
partsMu.Lock()
parts = append(parts, &s3.CompletedPart{
PartNumber: &partNum,
ETag: uout.ETag,
})
partsMu.Unlock()
return false, nil
})
// return the memory
bufs <- buf[:partSize]
if err != nil {
return errors.Wrap(err, "multipart upload failed to upload part")
}
return nil
})
}
err = g.Wait()
if err != nil {
return err
}
// sort the completed parts by part number
sort.Slice(parts, func(i, j int) bool {
return *parts[i].PartNumber < *parts[j].PartNumber
})
err = f.pacer.Call(func() (bool, error) {
_, err := f.c.CompleteMultipartUploadWithContext(ctx, &s3.CompleteMultipartUploadInput{
Bucket: req.Bucket,
Key: req.Key,
MultipartUpload: &s3.CompletedMultipartUpload{
Parts: parts,
},
RequestPayer: req.RequestPayer,
UploadId: uid,
})
return f.shouldRetry(err)
})
if err != nil {
return errors.Wrap(err, "multipart upload failed to finalise")
}
return nil
}
// Update the Object from in with modTime and size
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
bucket, bucketPath := o.split()
@@ -2034,31 +2226,6 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
size := src.Size()
multipart := size < 0 || size >= int64(o.fs.opt.UploadCutoff)
var uploader *s3manager.Uploader
if multipart {
uploader = s3manager.NewUploader(o.fs.ses, func(u *s3manager.Uploader) {
u.Concurrency = o.fs.opt.UploadConcurrency
u.LeavePartsOnError = o.fs.opt.LeavePartsOnError
u.S3 = o.fs.c
u.PartSize = int64(o.fs.opt.ChunkSize)
// size can be -1 here meaning we don't know the size of the incoming file. We use ChunkSize
// buffers here (default 5MB). With a maximum number of parts (10,000) this will be a file of
// 48GB which seems like a not too unreasonable limit.
if size == -1 {
warnStreamUpload.Do(func() {
fs.Logf(o.fs, "Streaming uploads using chunk size %v will have maximum file size of %v",
o.fs.opt.ChunkSize, fs.SizeSuffix(u.PartSize*s3manager.MaxUploadParts))
})
return
}
// Adjust PartSize until the number of parts is small enough.
if size/u.PartSize >= s3manager.MaxUploadParts {
// Calculate partition size rounded up to the nearest MB
u.PartSize = (((size / s3manager.MaxUploadParts) >> 20) + 1) << 20
}
})
}
// Set the mtime in the meta data
metadata := map[string]*string{
@@ -2083,33 +2250,6 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
// Guess the content type
mimeType := fs.MimeType(ctx, src)
if multipart {
req := s3manager.UploadInput{
Bucket: &bucket,
ACL: &o.fs.opt.ACL,
Key: &bucketPath,
Body: in,
ContentType: &mimeType,
Metadata: metadata,
//ContentLength: &size,
}
if o.fs.opt.ServerSideEncryption != "" {
req.ServerSideEncryption = &o.fs.opt.ServerSideEncryption
}
if o.fs.opt.SSEKMSKeyID != "" {
req.SSEKMSKeyId = &o.fs.opt.SSEKMSKeyID
}
if o.fs.opt.StorageClass != "" {
req.StorageClass = &o.fs.opt.StorageClass
}
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
_, err = uploader.UploadWithContext(ctx, &req)
return o.fs.shouldRetry(err)
})
if err != nil {
return err
}
} else {
req := s3.PutObjectInput{
Bucket: &bucket,
ACL: &o.fs.opt.ACL,
@@ -2130,6 +2270,13 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
req.StorageClass = &o.fs.opt.StorageClass
}
if multipart {
err = o.uploadMultipart(ctx, &req, size, in)
if err != nil {
return err
}
} else {
// Create the request
putObj, _ := o.fs.c.PutObjectRequest(&req)