mirror of
https://github.com/rclone/rclone.git
synced 2025-01-20 21:28:41 +01:00
d068e0b1a9
Before this change backends which supported more than one hash (eg
pcloud) or backends which wrapped backends supporting more than one
hash (combine) would fail the TestMultithreadCopy and
TestMultithreadCopyAbort tests with an error like:
Failed to make new multi hasher: requested set 000001ff contains unknown hash types
This was caused by the tests limiting the globally available hashes to
the first hash supplied by the backend.
This was added in this commit
d5d28a7513
operations: fix overwrite of destination when multi-thread transfer fails
to overcome the tests taking >100s on the local backend because they
made every single hash that the local backend supports. It brought this time
down to 20s.
This commit fixes the problem and retains the CPU speedup by only
applying the fix from the original commit if the destination backend
is the local backend. This fixes the common case (testing on the local
backend). This does not fix the problem for a backend which wraps the
local backend (eg combine) but this is run only on the integration
test machine and not on all the CI.
336 lines
10 KiB
Go
336 lines
10 KiB
Go
package operations
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/rclone/rclone/fs/accounting"
|
|
"github.com/rclone/rclone/fs/hash"
|
|
"github.com/rclone/rclone/fs/object"
|
|
"github.com/rclone/rclone/fstest/mockfs"
|
|
"github.com/rclone/rclone/fstest/mockobject"
|
|
"github.com/rclone/rclone/lib/random"
|
|
|
|
"github.com/rclone/rclone/fs"
|
|
"github.com/rclone/rclone/fstest"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
func TestDoMultiThreadCopy(t *testing.T) {
|
|
ctx := context.Background()
|
|
ci := fs.GetConfig(ctx)
|
|
f, err := mockfs.NewFs(ctx, "potato", "", nil)
|
|
require.NoError(t, err)
|
|
src := mockobject.New("file.txt").WithContent([]byte(random.String(100)), mockobject.SeekModeNone)
|
|
srcFs, err := mockfs.NewFs(ctx, "sausage", "", nil)
|
|
require.NoError(t, err)
|
|
src.SetFs(srcFs)
|
|
|
|
oldStreams := ci.MultiThreadStreams
|
|
oldCutoff := ci.MultiThreadCutoff
|
|
oldIsSet := ci.MultiThreadSet
|
|
defer func() {
|
|
ci.MultiThreadStreams = oldStreams
|
|
ci.MultiThreadCutoff = oldCutoff
|
|
ci.MultiThreadSet = oldIsSet
|
|
}()
|
|
|
|
ci.MultiThreadStreams, ci.MultiThreadCutoff = 4, 50
|
|
ci.MultiThreadSet = false
|
|
|
|
nullWriterAt := func(ctx context.Context, remote string, size int64) (fs.WriterAtCloser, error) {
|
|
panic("don't call me")
|
|
}
|
|
f.Features().OpenWriterAt = nullWriterAt
|
|
|
|
assert.True(t, doMultiThreadCopy(ctx, f, src))
|
|
|
|
ci.MultiThreadStreams = 0
|
|
assert.False(t, doMultiThreadCopy(ctx, f, src))
|
|
ci.MultiThreadStreams = 1
|
|
assert.False(t, doMultiThreadCopy(ctx, f, src))
|
|
ci.MultiThreadStreams = 2
|
|
assert.True(t, doMultiThreadCopy(ctx, f, src))
|
|
|
|
ci.MultiThreadCutoff = 200
|
|
assert.False(t, doMultiThreadCopy(ctx, f, src))
|
|
ci.MultiThreadCutoff = 101
|
|
assert.False(t, doMultiThreadCopy(ctx, f, src))
|
|
ci.MultiThreadCutoff = 100
|
|
assert.True(t, doMultiThreadCopy(ctx, f, src))
|
|
|
|
f.Features().OpenWriterAt = nil
|
|
assert.False(t, doMultiThreadCopy(ctx, f, src))
|
|
f.Features().OpenWriterAt = nullWriterAt
|
|
assert.True(t, doMultiThreadCopy(ctx, f, src))
|
|
|
|
f.Features().IsLocal = true
|
|
srcFs.Features().IsLocal = true
|
|
assert.False(t, doMultiThreadCopy(ctx, f, src))
|
|
ci.MultiThreadSet = true
|
|
assert.True(t, doMultiThreadCopy(ctx, f, src))
|
|
ci.MultiThreadSet = false
|
|
assert.False(t, doMultiThreadCopy(ctx, f, src))
|
|
srcFs.Features().IsLocal = false
|
|
assert.True(t, doMultiThreadCopy(ctx, f, src))
|
|
srcFs.Features().IsLocal = true
|
|
assert.False(t, doMultiThreadCopy(ctx, f, src))
|
|
f.Features().IsLocal = false
|
|
assert.True(t, doMultiThreadCopy(ctx, f, src))
|
|
srcFs.Features().IsLocal = false
|
|
assert.True(t, doMultiThreadCopy(ctx, f, src))
|
|
|
|
srcFs.Features().NoMultiThreading = true
|
|
assert.False(t, doMultiThreadCopy(ctx, f, src))
|
|
srcFs.Features().NoMultiThreading = false
|
|
assert.True(t, doMultiThreadCopy(ctx, f, src))
|
|
}
|
|
|
|
func TestMultithreadCalculateNumChunks(t *testing.T) {
|
|
for _, test := range []struct {
|
|
size int64
|
|
chunkSize int64
|
|
wantNumChunks int
|
|
}{
|
|
{size: 1, chunkSize: multithreadChunkSize, wantNumChunks: 1},
|
|
{size: 1 << 20, chunkSize: 1, wantNumChunks: 1 << 20},
|
|
{size: 1 << 20, chunkSize: 2, wantNumChunks: 1 << 19},
|
|
{size: (1 << 20) + 1, chunkSize: 2, wantNumChunks: (1 << 19) + 1},
|
|
{size: (1 << 20) - 1, chunkSize: 2, wantNumChunks: 1 << 19},
|
|
} {
|
|
t.Run(fmt.Sprintf("%+v", test), func(t *testing.T) {
|
|
mc := &multiThreadCopyState{
|
|
size: test.size,
|
|
}
|
|
mc.numChunks = calculateNumChunks(test.size, test.chunkSize)
|
|
assert.Equal(t, test.wantNumChunks, mc.numChunks)
|
|
})
|
|
}
|
|
}
|
|
|
|
// Skip if not multithread, returning the chunkSize otherwise
|
|
func skipIfNotMultithread(ctx context.Context, t *testing.T, r *fstest.Run) int {
|
|
features := r.Fremote.Features()
|
|
if features.OpenChunkWriter == nil && features.OpenWriterAt == nil {
|
|
t.Skip("multithread writing not supported")
|
|
}
|
|
|
|
// Only support one hash for the local backend otherwise we end up spending a huge amount of CPU on hashing!
|
|
if r.Fremote.Features().IsLocal {
|
|
oldHashes := hash.SupportOnly([]hash.Type{r.Fremote.Hashes().GetOne()})
|
|
t.Cleanup(func() {
|
|
_ = hash.SupportOnly(oldHashes)
|
|
})
|
|
}
|
|
|
|
ci := fs.GetConfig(ctx)
|
|
chunkSize := int(ci.MultiThreadChunkSize)
|
|
if features.OpenChunkWriter != nil {
|
|
//OpenChunkWriter func(ctx context.Context, remote string, src ObjectInfo, options ...OpenOption) (info ChunkWriterInfo, writer ChunkWriter, err error)
|
|
const fileName = "chunksize-probe"
|
|
src := object.NewStaticObjectInfo(fileName, time.Now(), int64(100*fs.Mebi), true, nil, nil)
|
|
info, writer, err := features.OpenChunkWriter(ctx, fileName, src)
|
|
require.NoError(t, err)
|
|
chunkSize = int(info.ChunkSize)
|
|
err = writer.Abort(ctx)
|
|
require.NoError(t, err)
|
|
}
|
|
return chunkSize
|
|
}
|
|
|
|
func TestMultithreadCopy(t *testing.T) {
|
|
r := fstest.NewRun(t)
|
|
ctx := context.Background()
|
|
chunkSize := skipIfNotMultithread(ctx, t, r)
|
|
// Check every other transfer for metadata
|
|
checkMetadata := false
|
|
ctx, ci := fs.AddConfig(ctx)
|
|
|
|
for _, upload := range []bool{false, true} {
|
|
for _, test := range []struct {
|
|
size int
|
|
streams int
|
|
}{
|
|
{size: chunkSize*2 - 1, streams: 2},
|
|
{size: chunkSize * 2, streams: 2},
|
|
{size: chunkSize*2 + 1, streams: 2},
|
|
} {
|
|
checkMetadata = !checkMetadata
|
|
ci.Metadata = checkMetadata
|
|
fileName := fmt.Sprintf("test-multithread-copy-%v-%d-%d", upload, test.size, test.streams)
|
|
t.Run(fmt.Sprintf("upload=%v,size=%v,streams=%v", upload, test.size, test.streams), func(t *testing.T) {
|
|
if *fstest.SizeLimit > 0 && int64(test.size) > *fstest.SizeLimit {
|
|
t.Skipf("exceeded file size limit %d > %d", test.size, *fstest.SizeLimit)
|
|
}
|
|
var (
|
|
contents = random.String(test.size)
|
|
t1 = fstest.Time("2001-02-03T04:05:06.499999999Z")
|
|
file1 fstest.Item
|
|
src, dst fs.Object
|
|
err error
|
|
testMetadata = fs.Metadata{
|
|
// System metadata supported by all backends
|
|
"mtime": t1.Format(time.RFC3339Nano),
|
|
// User metadata
|
|
"potato": "jersey",
|
|
}
|
|
)
|
|
|
|
var fSrc, fDst fs.Fs
|
|
if upload {
|
|
file1 = r.WriteFile(fileName, contents, t1)
|
|
r.CheckRemoteItems(t)
|
|
r.CheckLocalItems(t, file1)
|
|
fDst, fSrc = r.Fremote, r.Flocal
|
|
} else {
|
|
file1 = r.WriteObject(ctx, fileName, contents, t1)
|
|
r.CheckRemoteItems(t, file1)
|
|
r.CheckLocalItems(t)
|
|
fDst, fSrc = r.Flocal, r.Fremote
|
|
}
|
|
src, err = fSrc.NewObject(ctx, fileName)
|
|
require.NoError(t, err)
|
|
|
|
do, canSetMetadata := src.(fs.SetMetadataer)
|
|
if checkMetadata && canSetMetadata {
|
|
// Set metadata on the source if required
|
|
err := do.SetMetadata(ctx, testMetadata)
|
|
if err == fs.ErrorNotImplemented {
|
|
canSetMetadata = false
|
|
} else {
|
|
require.NoError(t, err)
|
|
fstest.CheckEntryMetadata(ctx, t, r.Flocal, src, testMetadata)
|
|
}
|
|
}
|
|
|
|
accounting.GlobalStats().ResetCounters()
|
|
tr := accounting.GlobalStats().NewTransfer(src, nil)
|
|
|
|
defer func() {
|
|
tr.Done(ctx, err)
|
|
}()
|
|
|
|
dst, err = multiThreadCopy(ctx, fDst, fileName, src, test.streams, tr)
|
|
require.NoError(t, err)
|
|
|
|
assert.Equal(t, src.Size(), dst.Size())
|
|
assert.Equal(t, fileName, dst.Remote())
|
|
fstest.CheckListingWithPrecision(t, fSrc, []fstest.Item{file1}, nil, fs.GetModifyWindow(ctx, fDst, fSrc))
|
|
fstest.CheckListingWithPrecision(t, fDst, []fstest.Item{file1}, nil, fs.GetModifyWindow(ctx, fDst, fSrc))
|
|
|
|
if checkMetadata && canSetMetadata && fDst.Features().ReadMetadata {
|
|
fstest.CheckEntryMetadata(ctx, t, fDst, dst, testMetadata)
|
|
}
|
|
|
|
require.NoError(t, dst.Remove(ctx))
|
|
require.NoError(t, src.Remove(ctx))
|
|
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
type errorObject struct {
|
|
fs.Object
|
|
size int64
|
|
wg *sync.WaitGroup
|
|
}
|
|
|
|
// Open opens the file for read. Call Close() on the returned io.ReadCloser
|
|
//
|
|
// Remember this is called multiple times whenever the backend seeks (eg having read checksum)
|
|
func (o errorObject) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadCloser, error) {
|
|
fs.Debugf(nil, "Open with options = %v", options)
|
|
rc, err := o.Object.Open(ctx, options...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// Return an error reader for the second segment
|
|
for _, option := range options {
|
|
if ropt, ok := option.(*fs.RangeOption); ok {
|
|
end := ropt.End + 1
|
|
if end >= o.size {
|
|
// Give the other chunks a chance to start
|
|
time.Sleep(time.Second)
|
|
// Wait for chunks to upload first
|
|
o.wg.Wait()
|
|
fs.Debugf(nil, "Returning error reader")
|
|
return errorReadCloser{rc}, nil
|
|
}
|
|
}
|
|
}
|
|
o.wg.Add(1)
|
|
return wgReadCloser{rc, o.wg}, nil
|
|
}
|
|
|
|
type errorReadCloser struct {
|
|
io.ReadCloser
|
|
}
|
|
|
|
func (rc errorReadCloser) Read(p []byte) (n int, err error) {
|
|
fs.Debugf(nil, "BOOM: simulated read failure")
|
|
return 0, errors.New("BOOM: simulated read failure")
|
|
}
|
|
|
|
// wgReadCloser is an io.ReadCloser which calls Done on a wait group when it
// is closed. Reads go straight to the embedded io.ReadCloser.
type wgReadCloser struct {
	io.ReadCloser
	wg *sync.WaitGroup
}

// Close marks this reader as finished on the wait group first and then closes
// the embedded io.ReadCloser, returning whatever error that Close produced.
func (rc wgReadCloser) Close() (err error) {
	rc.wg.Done()
	err = rc.ReadCloser.Close()
	return err
}
|
|
|
|
// Make sure aborting the multi-thread copy doesn't overwrite an existing file.
|
|
func TestMultithreadCopyAbort(t *testing.T) {
|
|
r := fstest.NewRun(t)
|
|
ctx := context.Background()
|
|
chunkSize := skipIfNotMultithread(ctx, t, r)
|
|
size := 2*chunkSize + 1
|
|
|
|
if *fstest.SizeLimit > 0 && int64(size) > *fstest.SizeLimit {
|
|
t.Skipf("exceeded file size limit %d > %d", size, *fstest.SizeLimit)
|
|
}
|
|
|
|
// first write a canary file which we are trying not to overwrite
|
|
const fileName = "test-multithread-abort"
|
|
contents := random.String(100)
|
|
t1 := fstest.Time("2001-02-03T04:05:06.499999999Z")
|
|
canary := r.WriteObject(ctx, fileName, contents, t1)
|
|
r.CheckRemoteItems(t, canary)
|
|
|
|
// Now write a local file to upload
|
|
file1 := r.WriteFile(fileName, random.String(size), t1)
|
|
r.CheckLocalItems(t, file1)
|
|
|
|
src, err := r.Flocal.NewObject(ctx, fileName)
|
|
require.NoError(t, err)
|
|
accounting.GlobalStats().ResetCounters()
|
|
tr := accounting.GlobalStats().NewTransfer(src, nil)
|
|
|
|
defer func() {
|
|
tr.Done(ctx, err)
|
|
}()
|
|
wg := new(sync.WaitGroup)
|
|
dst, err := multiThreadCopy(ctx, r.Fremote, fileName, errorObject{src, int64(size), wg}, 1, tr)
|
|
assert.Error(t, err)
|
|
assert.Nil(t, dst)
|
|
|
|
if r.Fremote.Features().PartialUploads {
|
|
r.CheckRemoteItems(t)
|
|
|
|
} else {
|
|
r.CheckRemoteItems(t, canary)
|
|
o, err := r.Fremote.NewObject(ctx, fileName)
|
|
require.NoError(t, err)
|
|
require.NoError(t, o.Remove(ctx))
|
|
}
|
|
}
|