operations: fix and tidy multithread code

- fix docs and error messages for multithread
- use sync/errgroup built in concurrency limiting
- re-arrange multithread code
- don't continue multi-thread uploads if one part fails
This commit is contained in:
Nick Craig-Wood 2023-08-14 18:05:19 +01:00
parent e6fde67491
commit f3bd02f0ef

View File

@ -11,34 +11,12 @@ import (
"github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/lib/readers"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)
const (
multithreadChunkSize = 64 << 10
)
// An offsetWriter maps writes at offset base to offset base+off in the underlying writer.
//
// Modified from the go source code. Can be replaced with
// io.OffsetWriter when we no longer need to support go1.19
type offsetWriter struct {
w io.WriterAt
off int64 // the current offset
}
// newOffsetWriter returns an offsetWriter that writes to w
// starting at offset off.
func newOffsetWriter(w io.WriterAt, off int64) *offsetWriter {
return &offsetWriter{w, off}
}
func (o *offsetWriter) Write(p []byte) (n int, err error) {
n, err = o.w.WriteAt(p, o.off)
o.off += int64(n)
return
}
// Return a boolean as to whether we should use multi thread copy for
// this transfer
func doMultiThreadCopy(ctx context.Context, f fs.Fs, src fs.Object) bool {
@ -102,10 +80,12 @@ func (mc *multiThreadCopyState) copyStream(ctx context.Context, stream int, writ
rc, err := Open(ctx, mc.src, &fs.RangeOption{Start: start, End: end - 1})
if err != nil {
return fmt.Errorf("multipart copy: failed to open source: %w", err)
return fmt.Errorf("multi-thread copy: failed to open source: %w", err)
}
defer fs.CheckClose(rc, &err)
// FIXME NewRepeatableReader is allocating - need to be more careful with the memory allocations
// Also allocating for copy to local which doesn't need it
bytesWritten, err := writer.WriteChunk(stream, readers.NewRepeatableReader(rc))
if err != nil {
return err
@ -128,7 +108,6 @@ func calculateNumChunks(size int64, chunkSize int64) int {
if size%chunkSize != 0 {
numChunks++
}
return int(numChunks)
}
@ -140,7 +119,7 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object,
if openChunkWriter == nil {
openWriterAt := f.Features().OpenWriterAt
if openWriterAt == nil {
return nil, errors.New("multi-part copy: neither OpenChunkWriter nor OpenWriterAt supported")
return nil, errors.New("multi-thread copy: neither OpenChunkWriter nor OpenWriterAt supported")
}
openChunkWriter = openChunkWriterFromOpenWriterAt(openWriterAt, int64(ci.MultiThreadChunkSize), int64(ci.MultiThreadWriteBufferSize), f)
}
@ -153,7 +132,12 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object,
}
g, gCtx := errgroup.WithContext(ctx)
g.SetLimit(streams)
chunkSize, chunkWriter, err := openChunkWriter(ctx, remote, src)
if err != nil {
return nil, fmt.Errorf("multi-thread copy: failed to open chunk writer: %w", err)
}
if chunkSize > src.Size() {
fs.Debugf(src, "multi-thread copy: chunk size %v was bigger than source file size %v", fs.SizeSuffix(chunkSize), fs.SizeSuffix(src.Size()))
@ -175,25 +159,18 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object,
numChunks: numChunks,
}
if err != nil {
return nil, fmt.Errorf("multipart copy: failed to open chunk writer: %w", err)
}
// Make accounting
mc.acc = tr.Account(ctx, nil)
fs.Debugf(src, "Starting multi-thread copy with %d parts of size %v with %v parallel streams", mc.numChunks, fs.SizeSuffix(mc.partSize), mc.streams)
sem := semaphore.NewWeighted(int64(mc.streams))
for chunk := 0; chunk < mc.numChunks; chunk++ {
fs.Debugf(src, "Acquiring semaphore...")
if err := sem.Acquire(ctx, 1); err != nil {
fs.Errorf(src, "Failed to acquire semaphore: %v", err)
// Fail fast, in case an errgroup managed function returns an error
if gCtx.Err() != nil {
break
}
currChunk := chunk
g.Go(func() (err error) {
defer sem.Release(1)
return mc.copyStream(gCtx, currChunk, chunkWriter)
chunk := chunk
g.Go(func() error {
return mc.copyStream(gCtx, chunk, chunkWriter)
})
}
@ -224,6 +201,28 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object,
return obj, nil
}
// An offsetWriter maps writes at offset base to offset base+off in the underlying writer.
//
// Modified from the go source code. Can be replaced with
// io.OffsetWriter when we no longer need to support go1.19
type offsetWriter struct {
w io.WriterAt
off int64 // the current offset
}
// newOffsetWriter returns an offsetWriter that writes to w
// starting at offset off.
func newOffsetWriter(w io.WriterAt, off int64) *offsetWriter {
return &offsetWriter{w, off}
}
func (o *offsetWriter) Write(p []byte) (n int, err error) {
n, err = o.w.WriteAt(p, o.off)
o.off += int64(n)
return
}
// writerAtChunkWriter converts a WriterAtCloser into a ChunkWriter
type writerAtChunkWriter struct {
ctx context.Context
remote string
@ -235,6 +234,7 @@ type writerAtChunkWriter struct {
f fs.Fs
}
// WriteChunk writes chunkNumber from reader
func (w writerAtChunkWriter) WriteChunk(chunkNumber int, reader io.ReadSeeker) (int64, error) {
fs.Debugf(w.remote, "writing chunk %v", chunkNumber)
@ -254,21 +254,23 @@ func (w writerAtChunkWriter) WriteChunk(chunkNumber int, reader io.ReadSeeker) (
if n != bytesToWrite {
return -1, fmt.Errorf("expected to write %v bytes for chunk %v, but wrote %v bytes", bytesToWrite, chunkNumber, n)
}
// if we were buffering, flush do disk
// if we were buffering, flush to disk
switch w := writer.(type) {
case *bufio.Writer:
er2 := w.Flush()
if er2 != nil {
return -1, fmt.Errorf("multipart copy: flush failed: %w", err)
return -1, fmt.Errorf("multi-thread copy: flush failed: %w", err)
}
}
return n, nil
}
// Close the chunk writing
func (w writerAtChunkWriter) Close() error {
return w.writerAt.Close()
}
// Abort the chunk writing
func (w writerAtChunkWriter) Abort() error {
obj, err := w.f.NewObject(w.ctx, w.remote)
if err != nil {