backends: change OpenChunkWriter interface to allow backend concurrency override

Before this change the concurrency used for an upload was rather
inconsistent.

- if size below `--backend-upload-cutoff` (default 200M) do single part upload.

- if size below `--multi-thread-cutoff` (default 256M) or using streaming
  uploads (eg `rclone rcat) do multipart upload using
  `--backend-upload-concurrency` to set the concurrency used by the uploader.

- otherwise do multipart upload using `--multi-thread-streams` to set the
  concurrency.

This change makes the default for the concurrency used be the
`--backend-upload-concurrency`. If `--multi-thread-streams` is set and larger
than the `--backend-upload-concurrency` then that will be used instead.

This means that if the user sets `--backend-upload-concurrency` then it will be
obeyed for all multipart/multi-thread transfers and the user can override them
all with `--multi-thread-streams`.

See: #7056
This commit is contained in:
Nick Craig-Wood 2023-09-01 17:25:15 +01:00
parent a7337b0a95
commit 2db0e23584
8 changed files with 124 additions and 82 deletions

View File

@ -1986,7 +1986,7 @@ type azChunkWriter struct {
// //
// Pass in the remote and the src object // Pass in the remote and the src object
// You can also use options to hint at the desired chunk size // You can also use options to hint at the desired chunk size
func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (chunkSizeResult int64, writer fs.ChunkWriter, err error) { func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (info fs.ChunkWriterInfo, writer fs.ChunkWriter, err error) {
// Temporary Object under construction // Temporary Object under construction
o := &Object{ o := &Object{
fs: f, fs: f,
@ -1994,7 +1994,7 @@ func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectIn
} }
ui, err := o.prepareUpload(ctx, src, options) ui, err := o.prepareUpload(ctx, src, options)
if err != nil { if err != nil {
return -1, nil, fmt.Errorf("failed to prepare upload: %w", err) return info, nil, fmt.Errorf("failed to prepare upload: %w", err)
} }
// Calculate correct partSize // Calculate correct partSize
@ -2020,7 +2020,7 @@ func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectIn
} else { } else {
partSize = chunksize.Calculator(remote, size, blockblob.MaxBlocks, f.opt.ChunkSize) partSize = chunksize.Calculator(remote, size, blockblob.MaxBlocks, f.opt.ChunkSize)
if partSize > fs.SizeSuffix(blockblob.MaxStageBlockBytes) { if partSize > fs.SizeSuffix(blockblob.MaxStageBlockBytes) {
return -1, nil, fmt.Errorf("can't upload as it is too big %v - takes more than %d chunks of %v", fs.SizeSuffix(size), fs.SizeSuffix(blockblob.MaxBlocks), fs.SizeSuffix(blockblob.MaxStageBlockBytes)) return info, nil, fmt.Errorf("can't upload as it is too big %v - takes more than %d chunks of %v", fs.SizeSuffix(size), fs.SizeSuffix(blockblob.MaxBlocks), fs.SizeSuffix(blockblob.MaxStageBlockBytes))
} }
totalParts = int(fs.SizeSuffix(size) / partSize) totalParts = int(fs.SizeSuffix(size) / partSize)
if fs.SizeSuffix(size)%partSize != 0 { if fs.SizeSuffix(size)%partSize != 0 {
@ -2037,8 +2037,13 @@ func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectIn
ui: ui, ui: ui,
o: o, o: o,
} }
info = fs.ChunkWriterInfo{
ChunkSize: int64(partSize),
Concurrency: o.fs.opt.UploadConcurrency,
//LeavePartsOnError: o.fs.opt.LeavePartsOnError,
}
fs.Debugf(o, "open chunk writer: started multipart upload") fs.Debugf(o, "open chunk writer: started multipart upload")
return int64(partSize), chunkWriter, nil return info, chunkWriter, nil
} }
// WriteChunk will write chunk number with reader bytes, where chunk number >= 0 // WriteChunk will write chunk number with reader bytes, where chunk number >= 0
@ -2165,9 +2170,7 @@ var warnStreamUpload sync.Once
func (o *Object) uploadMultipart(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (ui uploadInfo, err error) { func (o *Object) uploadMultipart(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (ui uploadInfo, err error) {
chunkWriter, err := multipart.UploadMultipart(ctx, src, in, multipart.UploadMultipartOptions{ chunkWriter, err := multipart.UploadMultipart(ctx, src, in, multipart.UploadMultipartOptions{
Open: o.fs, Open: o.fs,
Concurrency: o.fs.opt.UploadConcurrency,
OpenOptions: options, OpenOptions: options,
//LeavePartsOnError: o.fs.opt.LeavePartsOnError,
}) })
if err != nil { if err != nil {
return ui, err return ui, err

View File

@ -1897,9 +1897,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
} else if size > int64(o.fs.opt.UploadCutoff) { } else if size > int64(o.fs.opt.UploadCutoff) {
_, err := multipart.UploadMultipart(ctx, src, in, multipart.UploadMultipartOptions{ _, err := multipart.UploadMultipart(ctx, src, in, multipart.UploadMultipartOptions{
Open: o.fs, Open: o.fs,
Concurrency: o.fs.opt.UploadConcurrency,
OpenOptions: options, OpenOptions: options,
//LeavePartsOnError: o.fs.opt.LeavePartsOnError,
}) })
return err return err
} }
@ -2013,13 +2011,13 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
// //
// Pass in the remote and the src object // Pass in the remote and the src object
// You can also use options to hint at the desired chunk size // You can also use options to hint at the desired chunk size
func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (chunkSizeResult int64, writer fs.ChunkWriter, err error) { func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (info fs.ChunkWriterInfo, writer fs.ChunkWriter, err error) {
// FIXME what if file is smaller than 1 chunk? // FIXME what if file is smaller than 1 chunk?
if f.opt.Versions { if f.opt.Versions {
return -1, nil, errNotWithVersions return info, nil, errNotWithVersions
} }
if f.opt.VersionAt.IsSet() { if f.opt.VersionAt.IsSet() {
return -1, nil, errNotWithVersionAt return info, nil, errNotWithVersionAt
} }
//size := src.Size() //size := src.Size()
@ -2032,11 +2030,16 @@ func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectIn
bucket, _ := o.split() bucket, _ := o.split()
err = f.makeBucket(ctx, bucket) err = f.makeBucket(ctx, bucket)
if err != nil { if err != nil {
return -1, nil, err return info, nil, err
} }
info = fs.ChunkWriterInfo{
ChunkSize: int64(f.opt.ChunkSize),
Concurrency: o.fs.opt.UploadConcurrency,
//LeavePartsOnError: o.fs.opt.LeavePartsOnError,
}
up, err := f.newLargeUpload(ctx, o, nil, src, f.opt.ChunkSize, false, nil) up, err := f.newLargeUpload(ctx, o, nil, src, f.opt.ChunkSize, false, nil)
return int64(f.opt.ChunkSize), up, err return info, up, err
} }
// Remove an object // Remove an object

View File

@ -53,10 +53,8 @@ type objectChunkWriter struct {
func (o *Object) uploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.Reader, options ...fs.OpenOption) error { func (o *Object) uploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.Reader, options ...fs.OpenOption) error {
_, err := multipart.UploadMultipart(ctx, src, in, multipart.UploadMultipartOptions{ _, err := multipart.UploadMultipart(ctx, src, in, multipart.UploadMultipartOptions{
Open: o.fs, Open: o.fs,
Concurrency: o.fs.opt.UploadConcurrency, OpenOptions: options,
LeavePartsOnError: o.fs.opt.LeavePartsOnError,
OpenOptions: options,
}) })
return err return err
} }
@ -69,7 +67,7 @@ func (f *Fs) OpenChunkWriter(
ctx context.Context, ctx context.Context,
remote string, remote string,
src fs.ObjectInfo, src fs.ObjectInfo,
options ...fs.OpenOption) (chunkSizeResult int64, writer fs.ChunkWriter, err error) { options ...fs.OpenOption) (info fs.ChunkWriterInfo, writer fs.ChunkWriter, err error) {
// Temporary Object under construction // Temporary Object under construction
o := &Object{ o := &Object{
fs: f, fs: f,
@ -77,7 +75,7 @@ func (f *Fs) OpenChunkWriter(
} }
ui, err := o.prepareUpload(ctx, src, options) ui, err := o.prepareUpload(ctx, src, options)
if err != nil { if err != nil {
return -1, nil, fmt.Errorf("failed to prepare upload: %w", err) return info, nil, fmt.Errorf("failed to prepare upload: %w", err)
} }
uploadParts := f.opt.MaxUploadParts uploadParts := f.opt.MaxUploadParts
@ -105,7 +103,7 @@ func (f *Fs) OpenChunkWriter(
uploadID, existingParts, err := o.createMultipartUpload(ctx, ui.req) uploadID, existingParts, err := o.createMultipartUpload(ctx, ui.req)
if err != nil { if err != nil {
return -1, nil, fmt.Errorf("create multipart upload request failed: %w", err) return info, nil, fmt.Errorf("create multipart upload request failed: %w", err)
} }
bucketName, bucketPath := o.split() bucketName, bucketPath := o.split()
chunkWriter := &objectChunkWriter{ chunkWriter := &objectChunkWriter{
@ -119,8 +117,13 @@ func (f *Fs) OpenChunkWriter(
ui: ui, ui: ui,
o: o, o: o,
} }
info = fs.ChunkWriterInfo{
ChunkSize: int64(chunkSize),
Concurrency: o.fs.opt.UploadConcurrency,
LeavePartsOnError: o.fs.opt.LeavePartsOnError,
}
fs.Debugf(o, "open chunk writer: started multipart upload: %v", uploadID) fs.Debugf(o, "open chunk writer: started multipart upload: %v", uploadID)
return int64(chunkSize), chunkWriter, err return info, chunkWriter, err
} }
// WriteChunk will write chunk number with reader bytes, where chunk number >= 0 // WriteChunk will write chunk number with reader bytes, where chunk number >= 0

View File

@ -3115,7 +3115,6 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
if err != nil { if err != nil {
return nil, err return nil, err
} }
fs.Debugf(nil, "name = %q, root = %q, opt = %#v", name, root, opt)
err = checkUploadChunkSize(opt.ChunkSize) err = checkUploadChunkSize(opt.ChunkSize)
if err != nil { if err != nil {
return nil, fmt.Errorf("s3: chunk size: %w", err) return nil, fmt.Errorf("s3: chunk size: %w", err)
@ -5317,7 +5316,7 @@ type s3ChunkWriter struct {
// //
// Pass in the remote and the src object // Pass in the remote and the src object
// You can also use options to hint at the desired chunk size // You can also use options to hint at the desired chunk size
func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (chunkSizeResult int64, writer fs.ChunkWriter, err error) { func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (info fs.ChunkWriterInfo, writer fs.ChunkWriter, err error) {
// Temporary Object under construction // Temporary Object under construction
o := &Object{ o := &Object{
fs: f, fs: f,
@ -5325,7 +5324,7 @@ func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectIn
} }
ui, err := o.prepareUpload(ctx, src, options) ui, err := o.prepareUpload(ctx, src, options)
if err != nil { if err != nil {
return -1, nil, fmt.Errorf("failed to prepare upload: %w", err) return info, nil, fmt.Errorf("failed to prepare upload: %w", err)
} }
//structs.SetFrom(&mReq, req) //structs.SetFrom(&mReq, req)
@ -5361,7 +5360,7 @@ func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectIn
return f.shouldRetry(ctx, err) return f.shouldRetry(ctx, err)
}) })
if err != nil { if err != nil {
return -1, nil, fmt.Errorf("create multipart upload failed: %w", err) return info, nil, fmt.Errorf("create multipart upload failed: %w", err)
} }
chunkWriter := &s3ChunkWriter{ chunkWriter := &s3ChunkWriter{
@ -5376,8 +5375,13 @@ func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectIn
ui: ui, ui: ui,
o: o, o: o,
} }
info = fs.ChunkWriterInfo{
ChunkSize: int64(chunkSize),
Concurrency: o.fs.opt.UploadConcurrency,
LeavePartsOnError: o.fs.opt.LeavePartsOnError,
}
fs.Debugf(o, "open chunk writer: started multipart upload: %v", *mOut.UploadId) fs.Debugf(o, "open chunk writer: started multipart upload: %v", *mOut.UploadId)
return int64(chunkSize), chunkWriter, err return info, chunkWriter, err
} }
// add a part number and etag to the completed parts // add a part number and etag to the completed parts
@ -5527,10 +5531,8 @@ func (w *s3ChunkWriter) Close(ctx context.Context) (err error) {
func (o *Object) uploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.Reader, options ...fs.OpenOption) (wantETag, gotETag string, versionID *string, ui uploadInfo, err error) { func (o *Object) uploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.Reader, options ...fs.OpenOption) (wantETag, gotETag string, versionID *string, ui uploadInfo, err error) {
chunkWriter, err := multipart.UploadMultipart(ctx, src, in, multipart.UploadMultipartOptions{ chunkWriter, err := multipart.UploadMultipart(ctx, src, in, multipart.UploadMultipartOptions{
Open: o.fs, Open: o.fs,
Concurrency: o.fs.opt.UploadConcurrency, OpenOptions: options,
LeavePartsOnError: o.fs.opt.LeavePartsOnError,
OpenOptions: options,
}) })
if err != nil { if err != nil {
return wantETag, gotETag, versionID, ui, err return wantETag, gotETag, versionID, ui, err

View File

@ -1521,12 +1521,12 @@ This command line flag allows you to override that computed default.
### --multi-thread-write-buffer-size=SIZE ### ### --multi-thread-write-buffer-size=SIZE ###
When downloading with multiple threads, rclone will buffer SIZE bytes in When transferring with multiple threads, rclone will buffer SIZE bytes
memory before writing to disk for each thread. in memory before writing to disk for each thread.
This can improve performance if the underlying filesystem does not deal This can improve performance if the underlying filesystem does not deal
well with a lot of small writes in different positions of the file, so well with a lot of small writes in different positions of the file, so
if you see downloads being limited by disk write speed, you might want if you see transfers being limited by disk write speed, you might want
to experiment with different values. Specially for magnetic drives and to experiment with different values. Specially for magnetic drives and
remote file systems a higher value can be useful. remote file systems a higher value can be useful.
@ -1540,22 +1540,23 @@ multiples of 16k performed much better than other values.
### --multi-thread-chunk-size=SizeSuffix ### ### --multi-thread-chunk-size=SizeSuffix ###
Normally the chunk size for multi thread copies is set by the backend. Normally the chunk size for multi thread transfers is set by the backend.
However some backends such as `local` and `smb` (which implement However some backends such as `local` and `smb` (which implement `OpenWriterAt`
`OpenWriterAt` but not `OpenChunkWriter`) don't have a natural chunk but not `OpenChunkWriter`) don't have a natural chunk size.
size.
In this case the value of this option is used (default 64Mi). In this case the value of this option is used (default 64Mi).
### --multi-thread-cutoff=SIZE ### ### --multi-thread-cutoff=SIZE ###
When transferring files to capable backends above this size, rclone When transferring files above SIZE to capable backends, rclone will
will use multiple threads to download the file (default 256M). use multiple threads to transfer the file (default 256M).
Capable backends are marked in the Capable backends are marked in the
[overview](/overview/#optional-features) as `MultithreadUpload`. (They [overview](/overview/#optional-features) as `MultithreadUpload`. (They
need to implement either `OpenWriterAt` or `OpenChunkedWriter`). These need to implement either the `OpenWriterAt` or `OpenChunkedWriter`
include include, `local`, `s3`, `azureblob`, `b2` and `smb`. internal interfaces). These include include, `local`, `s3`,
`azureblob`, `b2`, `oracleobjectstorage` and `smb` at the time of
writing.
On the local disk, rclone preallocates the file (using On the local disk, rclone preallocates the file (using
`fallocate(FALLOC_FL_KEEP_SIZE)` on unix or `NTSetInformationFile` on `fallocate(FALLOC_FL_KEEP_SIZE)` on unix or `NTSetInformationFile` on
@ -1574,8 +1575,8 @@ This will work with the `sync`/`copy`/`move` commands and friends
mount` and `rclone serve` if `--vfs-cache-mode` is set to `writes` or mount` and `rclone serve` if `--vfs-cache-mode` is set to `writes` or
above. above.
**NB** that this **only** works supported backends as the destination **NB** that this **only** works with supported backends as the
but will work with any backend as the source. destination but will work with any backend as the source.
**NB** that multi-thread copies are disabled for local to local copies **NB** that multi-thread copies are disabled for local to local copies
as they are faster without unless `--multi-thread-streams` is set as they are faster without unless `--multi-thread-streams` is set
@ -1584,14 +1585,19 @@ explicitly.
**NB** on Windows using multi-thread transfers to the local disk will **NB** on Windows using multi-thread transfers to the local disk will
cause the resulting files to be [sparse](https://en.wikipedia.org/wiki/Sparse_file). cause the resulting files to be [sparse](https://en.wikipedia.org/wiki/Sparse_file).
Use `--local-no-sparse` to disable sparse files (which may cause long Use `--local-no-sparse` to disable sparse files (which may cause long
delays at the start of downloads) or disable multi-thread downloads delays at the start of transfers) or disable multi-thread transfers
with `--multi-thread-streams 0` with `--multi-thread-streams 0`
### --multi-thread-streams=N ### ### --multi-thread-streams=N ###
When using multi thread downloads (see above `--multi-thread-cutoff`) When using multi thread transfers (see above `--multi-thread-cutoff`)
this sets the number of streams to use. Set to `0` to disable multi this sets the number of streams to use. Set to `0` to disable multi
thread downloads (Default 4). thread transfers (Default 4).
If the backend has a `--backend-upload-concurrency` setting (eg
`--s3-upload-concurrency`) then this setting will be used as the
number of transfers instead if it is larger than the value of
`--multi-thread-streams` or `--multi-thread-streams` isn't set.
### --no-check-dest ### ### --no-check-dest ###

View File

@ -155,7 +155,7 @@ type Features struct {
// Pass in the remote and the src object // Pass in the remote and the src object
// You can also use options to hint at the desired chunk size // You can also use options to hint at the desired chunk size
// //
OpenChunkWriter func(ctx context.Context, remote string, src ObjectInfo, options ...OpenOption) (chunkSize int64, writer ChunkWriter, err error) OpenChunkWriter func(ctx context.Context, remote string, src ObjectInfo, options ...OpenOption) (info ChunkWriterInfo, writer ChunkWriter, err error)
// UserInfo returns info about the connected user // UserInfo returns info about the connected user
UserInfo func(ctx context.Context) (map[string]string, error) UserInfo func(ctx context.Context) (map[string]string, error)
@ -639,17 +639,24 @@ type OpenWriterAter interface {
// OpenWriterAtFn describes the OpenWriterAt function pointer // OpenWriterAtFn describes the OpenWriterAt function pointer
type OpenWriterAtFn func(ctx context.Context, remote string, size int64) (WriterAtCloser, error) type OpenWriterAtFn func(ctx context.Context, remote string, size int64) (WriterAtCloser, error)
// ChunkWriterInfo describes how a backend would like ChunkWriter called
type ChunkWriterInfo struct {
ChunkSize int64 // preferred chunk size
Concurrency int // how many chunks to write at once
LeavePartsOnError bool // if set don't delete parts uploaded so far on error
}
// OpenChunkWriter is an option interface for Fs to implement chunked writing // OpenChunkWriter is an option interface for Fs to implement chunked writing
type OpenChunkWriter interface { type OpenChunkWriter interface {
// OpenChunkWriter returns the chunk size and a ChunkWriter // OpenChunkWriter returns the chunk size and a ChunkWriter
// //
// Pass in the remote and the src object // Pass in the remote and the src object
// You can also use options to hint at the desired chunk size // You can also use options to hint at the desired chunk size
OpenChunkWriter(ctx context.Context, remote string, src ObjectInfo, options ...OpenOption) (chunkSize int64, writer ChunkWriter, err error) OpenChunkWriter(ctx context.Context, remote string, src ObjectInfo, options ...OpenOption) (info ChunkWriterInfo, writer ChunkWriter, err error)
} }
// OpenChunkWriterFn describes the OpenChunkWriter function pointer // OpenChunkWriterFn describes the OpenChunkWriter function pointer
type OpenChunkWriterFn func(ctx context.Context, remote string, src ObjectInfo, options ...OpenOption) (chunkSize int64, writer ChunkWriter, err error) type OpenChunkWriterFn func(ctx context.Context, remote string, src ObjectInfo, options ...OpenOption) (info ChunkWriterInfo, writer ChunkWriter, err error)
// ChunkWriter is returned by OpenChunkWriter to implement chunked writing // ChunkWriter is returned by OpenChunkWriter to implement chunked writing
type ChunkWriter interface { type ChunkWriter interface {

View File

@ -58,7 +58,6 @@ type multiThreadCopyState struct {
size int64 size int64
src fs.Object src fs.Object
acc *accounting.Account acc *accounting.Account
streams int
numChunks int numChunks int
noSeek bool // set if sure the receiving fs won't seek the input noSeek bool // set if sure the receiving fs won't seek the input
} }
@ -128,7 +127,7 @@ func calculateNumChunks(size int64, chunkSize int64) int {
// Copy src to (f, remote) using streams download threads. It tries to use the OpenChunkWriter feature // Copy src to (f, remote) using streams download threads. It tries to use the OpenChunkWriter feature
// and if that's not available it creates an adapter using OpenWriterAt // and if that's not available it creates an adapter using OpenWriterAt
func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object, streams int, tr *accounting.Transfer) (newDst fs.Object, err error) { func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object, concurrency int, tr *accounting.Transfer) (newDst fs.Object, err error) {
openChunkWriter := f.Features().OpenChunkWriter openChunkWriter := f.Features().OpenChunkWriter
ci := fs.GetConfig(ctx) ci := fs.GetConfig(ctx)
noseek := false noseek := false
@ -149,47 +148,61 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object,
return nil, fmt.Errorf("multi-thread copy: can't copy zero sized file") return nil, fmt.Errorf("multi-thread copy: can't copy zero sized file")
} }
g, gCtx := errgroup.WithContext(ctx) info, chunkWriter, err := openChunkWriter(ctx, remote, src)
g.SetLimit(streams)
chunkSize, chunkWriter, err := openChunkWriter(ctx, remote, src)
if err != nil { if err != nil {
return nil, fmt.Errorf("multi-thread copy: failed to open chunk writer: %w", err) return nil, fmt.Errorf("multi-thread copy: failed to open chunk writer: %w", err)
} }
uploadCtx, cancel := context.WithCancel(ctx)
defer cancel()
defer atexit.OnError(&err, func() { defer atexit.OnError(&err, func() {
fs.Debugf(src, "multi-thread copy: aborting transfer on exit") cancel()
if info.LeavePartsOnError {
return
}
fs.Debugf(src, "multi-thread copy: cancelling transfer on exit")
abortErr := chunkWriter.Abort(ctx) abortErr := chunkWriter.Abort(ctx)
if abortErr != nil { if abortErr != nil {
fs.Debugf(src, "multi-thread copy: abort failed: %v", abortErr) fs.Debugf(src, "multi-thread copy: abort failed: %v", abortErr)
} }
})() })()
if chunkSize > src.Size() { if info.ChunkSize > src.Size() {
fs.Debugf(src, "multi-thread copy: chunk size %v was bigger than source file size %v", fs.SizeSuffix(chunkSize), fs.SizeSuffix(src.Size())) fs.Debugf(src, "multi-thread copy: chunk size %v was bigger than source file size %v", fs.SizeSuffix(info.ChunkSize), fs.SizeSuffix(src.Size()))
chunkSize = src.Size() info.ChunkSize = src.Size()
} }
numChunks := calculateNumChunks(src.Size(), chunkSize) numChunks := calculateNumChunks(src.Size(), info.ChunkSize)
if streams > numChunks { if concurrency > numChunks {
fs.Debugf(src, "multi-thread copy: number of streams %d was bigger than number of chunks %d", streams, numChunks) fs.Debugf(src, "multi-thread copy: number of streams %d was bigger than number of chunks %d", concurrency, numChunks)
streams = numChunks concurrency = numChunks
} }
// Use the backend concurrency if it is higher than --multi-thread-streams or if --multi-thread-streams wasn't set explicitly
if !ci.MultiThreadSet || info.Concurrency > concurrency {
fs.Debugf(src, "multi-thread copy: using backend concurrency of %d instead of --multi-thread-streams %d", info.Concurrency, concurrency)
concurrency = info.Concurrency
}
if concurrency < 1 {
concurrency = 1
}
g, gCtx := errgroup.WithContext(uploadCtx)
g.SetLimit(concurrency)
mc := &multiThreadCopyState{ mc := &multiThreadCopyState{
ctx: gCtx, ctx: gCtx,
size: src.Size(), size: src.Size(),
src: src, src: src,
partSize: chunkSize, partSize: info.ChunkSize,
streams: streams,
numChunks: numChunks, numChunks: numChunks,
noSeek: noseek, noSeek: noseek,
} }
// Make accounting // Make accounting
mc.acc = tr.Account(ctx, nil) mc.acc = tr.Account(gCtx, nil)
fs.Debugf(src, "Starting multi-thread copy with %d chunks of size %v with %v parallel streams", mc.numChunks, fs.SizeSuffix(mc.partSize), mc.streams) fs.Debugf(src, "Starting multi-thread copy with %d chunks of size %v with %v parallel streams", mc.numChunks, fs.SizeSuffix(mc.partSize), concurrency)
for chunk := 0; chunk < mc.numChunks; chunk++ { for chunk := 0; chunk < mc.numChunks; chunk++ {
// Fail fast, in case an errgroup managed function returns an error // Fail fast, in case an errgroup managed function returns an error
if gCtx.Err() != nil { if gCtx.Err() != nil {
@ -307,10 +320,12 @@ func (w writerAtChunkWriter) Abort(ctx context.Context) error {
// openChunkWriterFromOpenWriterAt adapts an OpenWriterAtFn into an OpenChunkWriterFn using chunkSize and writeBufferSize // openChunkWriterFromOpenWriterAt adapts an OpenWriterAtFn into an OpenChunkWriterFn using chunkSize and writeBufferSize
func openChunkWriterFromOpenWriterAt(openWriterAt fs.OpenWriterAtFn, chunkSize int64, writeBufferSize int64, f fs.Fs) fs.OpenChunkWriterFn { func openChunkWriterFromOpenWriterAt(openWriterAt fs.OpenWriterAtFn, chunkSize int64, writeBufferSize int64, f fs.Fs) fs.OpenChunkWriterFn {
return func(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (chunkSizeResult int64, writer fs.ChunkWriter, err error) { return func(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (info fs.ChunkWriterInfo, writer fs.ChunkWriter, err error) {
ci := fs.GetConfig(ctx)
writerAt, err := openWriterAt(ctx, remote, src.Size()) writerAt, err := openWriterAt(ctx, remote, src.Size())
if err != nil { if err != nil {
return -1, nil, err return info, nil, err
} }
if writeBufferSize > 0 { if writeBufferSize > 0 {
@ -326,7 +341,10 @@ func openChunkWriterFromOpenWriterAt(openWriterAt fs.OpenWriterAtFn, chunkSize i
writeBufferSize: writeBufferSize, writeBufferSize: writeBufferSize,
f: f, f: f,
} }
info = fs.ChunkWriterInfo{
return chunkSize, chunkWriter, nil ChunkSize: chunkSize,
Concurrency: ci.MultiThreadStreams,
}
return info, chunkWriter, nil
} }
} }

View File

@ -45,10 +45,8 @@ func NewRW() *pool.RW {
// UploadMultipartOptions options for the generic multipart upload // UploadMultipartOptions options for the generic multipart upload
type UploadMultipartOptions struct { type UploadMultipartOptions struct {
Open fs.OpenChunkWriter // thing to call OpenChunkWriter on Open fs.OpenChunkWriter // thing to call OpenChunkWriter on
OpenOptions []fs.OpenOption // options for OpenChunkWriter OpenOptions []fs.OpenOption // options for OpenChunkWriter
Concurrency int // number of simultaneous uploads to do
LeavePartsOnError bool // if set don't delete parts uploaded so far on error
} }
// UploadMultipart does a generic multipart upload from src using f as OpenChunkWriter. // UploadMultipart does a generic multipart upload from src using f as OpenChunkWriter.
@ -57,22 +55,23 @@ type UploadMultipartOptions struct {
// //
// It returns the chunkWriter used in case the caller needs to extract any private info from it. // It returns the chunkWriter used in case the caller needs to extract any private info from it.
func UploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.Reader, opt UploadMultipartOptions) (chunkWriterOut fs.ChunkWriter, err error) { func UploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.Reader, opt UploadMultipartOptions) (chunkWriterOut fs.ChunkWriter, err error) {
chunkSize, chunkWriter, err := opt.Open.OpenChunkWriter(ctx, src.Remote(), src, opt.OpenOptions...) info, chunkWriter, err := opt.Open.OpenChunkWriter(ctx, src.Remote(), src, opt.OpenOptions...)
if err != nil { if err != nil {
return nil, fmt.Errorf("multipart upload failed to initialise: %w", err) return nil, fmt.Errorf("multipart upload failed to initialise: %w", err)
} }
// make concurrency machinery // make concurrency machinery
concurrency := opt.Concurrency concurrency := info.Concurrency
if concurrency < 1 { if concurrency < 1 {
concurrency = 1 concurrency = 1
} }
tokens := pacer.NewTokenDispenser(concurrency) tokens := pacer.NewTokenDispenser(concurrency)
uploadCtx, cancel := context.WithCancel(ctx) uploadCtx, cancel := context.WithCancel(ctx)
defer cancel()
defer atexit.OnError(&err, func() { defer atexit.OnError(&err, func() {
cancel() cancel()
if opt.LeavePartsOnError { if info.LeavePartsOnError {
return return
} }
fs.Debugf(src, "Cancelling multipart upload") fs.Debugf(src, "Cancelling multipart upload")
@ -83,10 +82,11 @@ func UploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.Reader, opt U
})() })()
var ( var (
g, gCtx = errgroup.WithContext(uploadCtx) g, gCtx = errgroup.WithContext(uploadCtx)
finished = false finished = false
off int64 off int64
size = src.Size() size = src.Size()
chunkSize = info.ChunkSize
) )
// Do the accounting manually // Do the accounting manually