qingstor: add upload chunk size/concurrency/cutoff control #2851

* --upload-chunk-size
* --upload-concurrency
* --upload-cutoff
This commit is contained in:
Nick Craig-Wood 2019-01-04 11:24:20 +00:00
parent 9685be64cd
commit a2341cc412
4 changed files with 166 additions and 21 deletions

View File

@ -72,14 +72,51 @@ func init() {
Help: "Number of connection retries.",
Default: 3,
Advanced: true,
}, {
Name: "upload_cutoff",
Help: `Cutoff for switching to chunked upload
Any files larger than this will be uploaded in chunks of chunk_size.
The minimum is 0 and the maximum is 5GB.`,
Default: defaultUploadCutoff,
Advanced: true,
}, {
Name: "chunk_size",
Help: `Chunk size to use for uploading.
When uploading files larger than upload_cutoff they will be uploaded
as multipart uploads using this chunk size.
Note that "--qingstor-upload-concurrency" chunks of this size are buffered
in memory per transfer.
If you are transferring large files over high speed links and you have
enough memory, then increasing this will speed up the transfers.`,
Default: minChunkSize,
Advanced: true,
}, {
Name: "upload_concurrency",
Help: `Concurrency for multipart uploads.
This is the number of chunks of the same file that are uploaded
concurrently.
If you are uploading small numbers of large file over high speed link
and these uploads do not fully utilize your bandwidth, then increasing
this may help to speed up the transfers.`,
Default: 4,
Advanced: true,
}},
})
}
// Constants
const (
listLimitSize = 1000 // Number of items to read at once
maxSizeForCopy = 1024 * 1024 * 1024 * 5 // The maximum size of object we can COPY
listLimitSize = 1000 // Number of items to read at once
maxSizeForCopy = 1024 * 1024 * 1024 * 5 // The maximum size of object we can COPY
minChunkSize = fs.SizeSuffix(minMultiPartSize)
defaultUploadCutoff = fs.SizeSuffix(200 * 1024 * 1024)
maxUploadCutoff = fs.SizeSuffix(5 * 1024 * 1024 * 1024)
)
// Globals
@ -92,12 +129,15 @@ func timestampToTime(tp int64) time.Time {
// Options defines the configuration for this backend
type Options struct {
EnvAuth bool `config:"env_auth"`
AccessKeyID string `config:"access_key_id"`
SecretAccessKey string `config:"secret_access_key"`
Endpoint string `config:"endpoint"`
Zone string `config:"zone"`
ConnectionRetries int `config:"connection_retries"`
EnvAuth bool `config:"env_auth"`
AccessKeyID string `config:"access_key_id"`
SecretAccessKey string `config:"secret_access_key"`
Endpoint string `config:"endpoint"`
Zone string `config:"zone"`
ConnectionRetries int `config:"connection_retries"`
UploadCutoff fs.SizeSuffix `config:"upload_cutoff"`
ChunkSize fs.SizeSuffix `config:"chunk_size"`
UploadConcurrency int `config:"upload_concurrency"`
}
// Fs represents a remote qingstor server
@ -227,6 +267,36 @@ func qsServiceConnection(opt *Options) (*qs.Service, error) {
return qs.Init(cf)
}
func checkUploadChunkSize(cs fs.SizeSuffix) error {
if cs < minChunkSize {
return errors.Errorf("%s is less than %s", cs, minChunkSize)
}
return nil
}
func (f *Fs) setUploadChunkSize(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
err = checkUploadChunkSize(cs)
if err == nil {
old, f.opt.ChunkSize = f.opt.ChunkSize, cs
}
return
}
func checkUploadCutoff(cs fs.SizeSuffix) error {
if cs > maxUploadCutoff {
return errors.Errorf("%s is greater than %s", cs, maxUploadCutoff)
}
return nil
}
func (f *Fs) setUploadCutoff(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
err = checkUploadCutoff(cs)
if err == nil {
old, f.opt.UploadCutoff = f.opt.UploadCutoff, cs
}
return
}
// NewFs constructs an Fs from the path, bucket:path
func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
// Parse config into Options struct
@ -235,6 +305,14 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
if err != nil {
return nil, err
}
err = checkUploadChunkSize(opt.ChunkSize)
if err != nil {
return nil, errors.Wrap(err, "qingstor: chunk size")
}
err = checkUploadCutoff(opt.UploadCutoff)
if err != nil {
return nil, errors.Wrap(err, "qingstor: upload cutoff")
}
bucket, key, err := qsParsePath(root)
if err != nil {
return nil, err
@ -913,16 +991,24 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio
mimeType := fs.MimeType(src)
req := uploadInput{
body: in,
qsSvc: o.fs.svc,
bucket: o.fs.bucket,
zone: o.fs.zone,
key: key,
mimeType: mimeType,
body: in,
qsSvc: o.fs.svc,
bucket: o.fs.bucket,
zone: o.fs.zone,
key: key,
mimeType: mimeType,
partSize: int64(o.fs.opt.ChunkSize),
concurrency: o.fs.opt.UploadConcurrency,
}
uploader := newUploader(&req)
err = uploader.upload()
size := src.Size()
multipart := size < 0 || size >= int64(o.fs.opt.UploadCutoff)
if multipart {
err = uploader.upload()
} else {
err = uploader.singlePartUpload(in, size)
}
if err != nil {
return err
}

View File

@ -2,12 +2,12 @@
// +build !plan9
package qingstor_test
package qingstor
import (
"testing"
"github.com/ncw/rclone/backend/qingstor"
"github.com/ncw/rclone/fs"
"github.com/ncw/rclone/fstest/fstests"
)
@ -15,6 +15,19 @@ import (
func TestIntegration(t *testing.T) {
fstests.Run(t, &fstests.Opt{
RemoteName: "TestQingStor:",
NilObject: (*qingstor.Object)(nil),
NilObject: (*Object)(nil),
ChunkedUpload: fstests.ChunkedUploadConfig{
MinChunkSize: minChunkSize,
},
})
}
func (f *Fs) SetUploadChunkSize(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
return f.setUploadChunkSize(cs)
}
func (f *Fs) SetUploadCutoff(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
return f.setUploadCutoff(cs)
}
var _ fstests.SetUploadChunkSizer = (*Fs)(nil)

View File

@ -152,11 +152,11 @@ func (u *uploader) init() {
}
// singlePartUpload upload a single object that contentLength less than "defaultUploadPartSize"
func (u *uploader) singlePartUpload(buf io.ReadSeeker) error {
func (u *uploader) singlePartUpload(buf io.Reader, size int64) error {
bucketInit, _ := u.bucketInit()
req := qs.PutObjectInput{
ContentLength: &u.readerPos,
ContentLength: &size,
ContentType: &u.cfg.mimeType,
Body: buf,
}
@ -180,7 +180,7 @@ func (u *uploader) upload() error {
reader, _, err := u.nextReader()
if err == io.EOF { // single part
fs.Debugf(u, "Uploading as single part object to QingStor")
return u.singlePartUpload(reader)
return u.singlePartUpload(reader, u.readerPos)
} else if err != nil {
return errors.Errorf("read upload data failed: %s", err)
}

View File

@ -234,4 +234,50 @@ Number of connection retries.
- Type: int
- Default: 3
#### --qingstor-upload-cutoff
Cutoff for switching to chunked upload
Any files larger than this will be uploaded in chunks of chunk_size.
The minimum is 0 and the maximum is 5GB.
- Config: upload_cutoff
- Env Var: RCLONE_QINGSTOR_UPLOAD_CUTOFF
- Type: SizeSuffix
- Default: 200M
#### --qingstor-chunk-size
Chunk size to use for uploading.
When uploading files larger than upload_cutoff they will be uploaded
as multipart uploads using this chunk size.
Note that "--qingstor-upload-concurrency" chunks of this size are buffered
in memory per transfer.
If you are transferring large files over high speed links and you have
enough memory, then increasing this will speed up the transfers.
- Config: chunk_size
- Env Var: RCLONE_QINGSTOR_CHUNK_SIZE
- Type: SizeSuffix
- Default: 4M
#### --qingstor-upload-concurrency
Concurrency for multipart uploads.
This is the number of chunks of the same file that are uploaded
concurrently.
If you are uploading small numbers of large file over high speed link
and these uploads do not fully utilize your bandwidth, then increasing
this may help to speed up the transfers.
- Config: upload_concurrency
- Env Var: RCLONE_QINGSTOR_UPLOAD_CONCURRENCY
- Type: int
- Default: 4
<!--- autogenerated options stop -->