diff --git a/fs/operations/copy.go b/fs/operations/copy.go new file mode 100644 index 000000000..f7a05cb04 --- /dev/null +++ b/fs/operations/copy.go @@ -0,0 +1,301 @@ +package operations + +import ( + "context" + "errors" + "fmt" + "io" + "path" + "strings" + "time" + + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/accounting" + "github.com/rclone/rclone/fs/fserrors" + "github.com/rclone/rclone/fs/hash" + "github.com/rclone/rclone/lib/atexit" + "github.com/rclone/rclone/lib/pacer" + "github.com/rclone/rclone/lib/random" +) + +// Used to remove a failed copy +// +// Returns whether the file was successfully removed or not +func removeFailedCopy(ctx context.Context, dst fs.Object) bool { + if dst == nil { + return false + } + fs.Infof(dst, "Removing failed copy") + removeErr := dst.Remove(ctx) + if removeErr != nil { + fs.Infof(dst, "Failed to remove failed copy: %s", removeErr) + return false + } + return true +} + +// Used to remove a failed partial copy +// +// Returns whether the file was successfully removed or not +func removeFailedPartialCopy(ctx context.Context, f fs.Fs, remotePartial string) bool { + o, err := f.NewObject(ctx, remotePartial) + if errors.Is(err, fs.ErrorObjectNotFound) { + return true + } else if err != nil { + fs.Infof(remotePartial, "Failed to remove failed partial copy: %s", err) + return false + } + return removeFailedCopy(ctx, o) +} + +// Copy src object to dst or f if nil. If dst is nil then it uses +// remote as the name of the new object. +// +// It returns the destination object if possible. Note that this may +// be nil. +func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Object, err error) { + ci := fs.GetConfig(ctx) + tr := accounting.Stats(ctx).NewTransfer(src) + defer func() { + tr.Done(ctx, err) + }() + newDst = dst + if SkipDestructive(ctx, src, "copy") { + in := tr.Account(ctx, nil) + in.DryRun(src.Size()) + return newDst, nil + } + maxTries := ci.LowLevelRetries + tries := 0 + doUpdate := dst != nil + hashType, hashOption := CommonHash(ctx, f, src.Fs()) + + if dst != nil { + remote = dst.Remote() + } + + var ( + inplace = true + remotePartial = remote + ) + if !ci.Inplace && f.Features().Move != nil && f.Features().PartialUploads && !strings.HasSuffix(remote, ".rclonelink") { + if len(ci.PartialSuffix) > 16 { + return nil, fmt.Errorf("expecting length of --partial-suffix to be not greater than %d but got %d", 16, len(ci.PartialSuffix)) + } + + // Avoid making the leaf name longer if it's already lengthy to avoid + // trouble with file name length limits. + suffix := "." + random.String(8) + ci.PartialSuffix + base := path.Base(remotePartial) + if len(base) > 100 { + remotePartial = remotePartial[:len(remotePartial)-len(suffix)] + suffix + } else { + remotePartial += suffix + } + inplace = false + } + + var actionTaken string + for { + // Try server-side copy first - if has optional interface and + // is same underlying remote + actionTaken = "Copied (server-side copy)" + if ci.MaxTransfer >= 0 { + var bytesSoFar int64 + if ci.CutoffMode == fs.CutoffModeCautious { + bytesSoFar = accounting.Stats(ctx).GetBytesWithPending() + src.Size() + } else { + bytesSoFar = accounting.Stats(ctx).GetBytes() + } + if bytesSoFar >= int64(ci.MaxTransfer) { + if ci.CutoffMode == fs.CutoffModeHard { + return nil, accounting.ErrorMaxTransferLimitReachedFatal + } + return nil, accounting.ErrorMaxTransferLimitReachedGraceful + } + } + if doCopy := f.Features().Copy; doCopy != nil && (SameConfig(src.Fs(), f) || (SameRemoteType(src.Fs(), f) && (f.Features().ServerSideAcrossConfigs || ci.ServerSideAcrossConfigs))) { + in := tr.Account(ctx, nil) // account the transfer + in.ServerSideTransferStart() + newDst, err = doCopy(ctx, src, remote) + if err == nil { + dst = newDst + in.ServerSideCopyEnd(dst.Size()) // account the bytes for the server-side transfer + _ = in.Close() + inplace = true + } else { + _ = in.Close() + } + if errors.Is(err, fs.ErrorCantCopy) { + tr.Reset(ctx) // skip incomplete accounting - will be overwritten by the manual copy below + } + } else { + err = fs.ErrorCantCopy + } + // If can't server-side copy, do it manually + if errors.Is(err, fs.ErrorCantCopy) { + // Remove partial files on premature exit + var atexitRemovePartial atexit.FnHandle + if !inplace { + atexitRemovePartial = atexit.Register(func() { + ctx := context.Background() + removeFailedPartialCopy(ctx, f, remotePartial) + }) + } + + uploadOptions := []fs.OpenOption{hashOption} + for _, option := range ci.UploadHeaders { + uploadOptions = append(uploadOptions, option) + } + if ci.MetadataSet != nil { + uploadOptions = append(uploadOptions, fs.MetadataOption(ci.MetadataSet)) + } + + if doMultiThreadCopy(ctx, f, src) { + dst, err = multiThreadCopy(ctx, f, remotePartial, src, ci.MultiThreadStreams, tr, uploadOptions...) + if err == nil { + newDst = dst + } + if doUpdate { + actionTaken = "Multi-thread Copied (replaced existing)" + } else { + actionTaken = "Multi-thread Copied (new)" + } + } else { + var in0 io.ReadCloser + options := []fs.OpenOption{hashOption} + for _, option := range ci.DownloadHeaders { + options = append(options, option) + } + in0, err = Open(ctx, src, options...) + if err != nil { + err = fmt.Errorf("failed to open source object: %w", err) + } else { + if src.Size() == -1 { + // -1 indicates unknown size. Use Rcat to handle both remotes supporting and not supporting PutStream. + if doUpdate { + actionTaken = "Copied (Rcat, replaced existing)" + } else { + actionTaken = "Copied (Rcat, new)" + } + // Make any metadata to pass to rcat + var meta fs.Metadata + if ci.Metadata { + meta, err = fs.GetMetadata(ctx, src) + if err != nil { + fs.Errorf(src, "Failed to read metadata: %v", err) + } + } + // NB Rcat closes in0 + dst, err = Rcat(ctx, f, remotePartial, in0, src.ModTime(ctx), meta) + newDst = dst + } else { + in := tr.Account(ctx, in0).WithBuffer() // account and buffer the transfer + var wrappedSrc fs.ObjectInfo = src + // We try to pass the original object if possible + if src.Remote() != remotePartial { + wrappedSrc = fs.NewOverrideRemote(src, remotePartial) + } + if doUpdate && inplace { + err = dst.Update(ctx, in, wrappedSrc, uploadOptions...) + } else { + dst, err = f.Put(ctx, in, wrappedSrc, uploadOptions...) + } + if doUpdate { + actionTaken = "Copied (replaced existing)" + } else { + actionTaken = "Copied (new)" + } + closeErr := in.Close() + if err == nil { + newDst = dst + err = closeErr + } + } + } + } + if !inplace { + atexit.Unregister(atexitRemovePartial) + } + + } + tries++ + if tries >= maxTries { + break + } + // Retry if err returned a retry error + if fserrors.ContextError(ctx, &err) { + break + } + var retry bool + if fserrors.IsRetryError(err) || fserrors.ShouldRetry(err) { + retry = true + } else if t, ok := pacer.IsRetryAfter(err); ok { + fs.Debugf(src, "Sleeping for %v (as indicated by the server) to obey Retry-After error: %v", t, err) + time.Sleep(t) + retry = true + } + if retry { + fs.Debugf(src, "Received error: %v - low level retry %d/%d", err, tries, maxTries) + tr.Reset(ctx) // skip incomplete accounting - will be overwritten by retry + continue + } + // otherwise finish + break + } + if err != nil { + err = fs.CountError(err) + fs.Errorf(src, "Failed to copy: %v", err) + if !inplace { + removeFailedPartialCopy(ctx, f, remotePartial) + } + return newDst, err + } + + // Verify sizes are the same after transfer + if sizeDiffers(ctx, src, dst) { + err = fmt.Errorf("corrupted on transfer: sizes differ %d vs %d", src.Size(), dst.Size()) + fs.Errorf(dst, "%v", err) + err = fs.CountError(err) + removeFailedCopy(ctx, dst) + return newDst, err + } + + // Verify hashes are the same after transfer - ignoring blank hashes + if hashType != hash.None { + // checkHashes has logged and counted errors + equal, _, srcSum, dstSum, _ := checkHashes(ctx, src, dst, hashType) + if !equal { + err = fmt.Errorf("corrupted on transfer: %v hash differ %q vs %q", hashType, srcSum, dstSum) + fs.Errorf(dst, "%v", err) + err = fs.CountError(err) + removeFailedCopy(ctx, dst) + return newDst, err + } + } + + // Move the copied file to its real destination. + if err == nil && !inplace && remotePartial != remote { + dst, err = f.Features().Move(ctx, newDst, remote) + if err == nil { + fs.Debugf(newDst, "renamed to: %s", remote) + newDst = dst + } else { + fs.Errorf(newDst, "partial file rename failed: %v", err) + err = fs.CountError(err) + removeFailedCopy(ctx, newDst) + return newDst, err + } + } + + if newDst != nil && src.String() != newDst.String() { + actionTaken = fmt.Sprintf("%s to: %s", actionTaken, newDst.String()) + } + fs.Infof(src, "%s%s", actionTaken, fs.LogValueHide("size", fs.SizeSuffix(src.Size()))) + return newDst, err +} + +// CopyFile moves a single file possibly to a new name +func CopyFile(ctx context.Context, fdst fs.Fs, fsrc fs.Fs, dstFileName string, srcFileName string) (err error) { + return moveOrCopyFile(ctx, fdst, fsrc, dstFileName, srcFileName, true) +} diff --git a/fs/operations/copy_test.go b/fs/operations/copy_test.go new file mode 100644 index 000000000..a2d7e5356 --- /dev/null +++ b/fs/operations/copy_test.go @@ -0,0 +1,375 @@ +package operations_test + +import ( + "context" + "crypto/rand" + "errors" + "strings" + "testing" + + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/accounting" + "github.com/rclone/rclone/fs/operations" + "github.com/rclone/rclone/fstest" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCopyFile(t *testing.T) { + ctx := context.Background() + r := fstest.NewRun(t) + + file1 := r.WriteFile("file1", "file1 contents", t1) + r.CheckLocalItems(t, file1) + + file2 := file1 + file2.Path = "sub/file2" + + err := operations.CopyFile(ctx, r.Fremote, r.Flocal, file2.Path, file1.Path) + require.NoError(t, err) + r.CheckLocalItems(t, file1) + r.CheckRemoteItems(t, file2) + + err = operations.CopyFile(ctx, r.Fremote, r.Flocal, file2.Path, file1.Path) + require.NoError(t, err) + r.CheckLocalItems(t, file1) + r.CheckRemoteItems(t, file2) + + err = operations.CopyFile(ctx, r.Fremote, r.Fremote, file2.Path, file2.Path) + require.NoError(t, err) + r.CheckLocalItems(t, file1) + r.CheckRemoteItems(t, file2) +} + +func TestCopyFileBackupDir(t *testing.T) { + ctx := context.Background() + ctx, ci := fs.AddConfig(ctx) + r := fstest.NewRun(t) + if !operations.CanServerSideMove(r.Fremote) { + t.Skip("Skipping test as remote does not support server-side move or copy") + } + + ci.BackupDir = r.FremoteName + "/backup" + + file1 := r.WriteFile("dst/file1", "file1 contents", t1) + r.CheckLocalItems(t, file1) + + file1old := r.WriteObject(ctx, "dst/file1", "file1 contents old", t1) + r.CheckRemoteItems(t, file1old) + + err := operations.CopyFile(ctx, r.Fremote, r.Flocal, file1.Path, file1.Path) + require.NoError(t, err) + r.CheckLocalItems(t, file1) + file1old.Path = "backup/dst/file1" + r.CheckRemoteItems(t, file1old, file1) +} + +// Test with CompareDest set +func TestCopyFileCompareDest(t *testing.T) { + ctx := context.Background() + ctx, ci := fs.AddConfig(ctx) + r := fstest.NewRun(t) + + ci.CompareDest = []string{r.FremoteName + "/CompareDest"} + fdst, err := fs.NewFs(ctx, r.FremoteName+"/dst") + require.NoError(t, err) + + // check empty dest, empty compare + file1 := r.WriteFile("one", "one", t1) + r.CheckLocalItems(t, file1) + + err = operations.CopyFile(ctx, fdst, r.Flocal, file1.Path, file1.Path) + require.NoError(t, err) + + file1dst := file1 + file1dst.Path = "dst/one" + + r.CheckRemoteItems(t, file1dst) + + // check old dest, empty compare + file1b := r.WriteFile("one", "onet2", t2) + r.CheckRemoteItems(t, file1dst) + r.CheckLocalItems(t, file1b) + + err = operations.CopyFile(ctx, fdst, r.Flocal, file1b.Path, file1b.Path) + require.NoError(t, err) + + file1bdst := file1b + file1bdst.Path = "dst/one" + + r.CheckRemoteItems(t, file1bdst) + + // check old dest, new compare + file3 := r.WriteObject(ctx, "dst/one", "one", t1) + file2 := r.WriteObject(ctx, "CompareDest/one", "onet2", t2) + file1c := r.WriteFile("one", "onet2", t2) + r.CheckRemoteItems(t, file2, file3) + r.CheckLocalItems(t, file1c) + + err = operations.CopyFile(ctx, fdst, r.Flocal, file1c.Path, file1c.Path) + require.NoError(t, err) + + r.CheckRemoteItems(t, file2, file3) + + // check empty dest, new compare + file4 := r.WriteObject(ctx, "CompareDest/two", "two", t2) + file5 := r.WriteFile("two", "two", t2) + r.CheckRemoteItems(t, file2, file3, file4) + r.CheckLocalItems(t, file1c, file5) + + err = operations.CopyFile(ctx, fdst, r.Flocal, file5.Path, file5.Path) + require.NoError(t, err) + + r.CheckRemoteItems(t, file2, file3, file4) + + // check new dest, new compare + err = operations.CopyFile(ctx, fdst, r.Flocal, file5.Path, file5.Path) + require.NoError(t, err) + + r.CheckRemoteItems(t, file2, file3, file4) + + // check empty dest, old compare + file5b := r.WriteFile("two", "twot3", t3) + r.CheckRemoteItems(t, file2, file3, file4) + r.CheckLocalItems(t, file1c, file5b) + + err = operations.CopyFile(ctx, fdst, r.Flocal, file5b.Path, file5b.Path) + require.NoError(t, err) + + file5bdst := file5b + file5bdst.Path = "dst/two" + + r.CheckRemoteItems(t, file2, file3, file4, file5bdst) +} + +// Test with CopyDest set +func TestCopyFileCopyDest(t *testing.T) { + ctx := context.Background() + ctx, ci := fs.AddConfig(ctx) + r := fstest.NewRun(t) + + if r.Fremote.Features().Copy == nil { + t.Skip("Skipping test as remote does not support server-side copy") + } + + ci.CopyDest = []string{r.FremoteName + "/CopyDest"} + + fdst, err := fs.NewFs(ctx, r.FremoteName+"/dst") + require.NoError(t, err) + + // check empty dest, empty copy + file1 := r.WriteFile("one", "one", t1) + r.CheckLocalItems(t, file1) + + err = operations.CopyFile(ctx, fdst, r.Flocal, file1.Path, file1.Path) + require.NoError(t, err) + + file1dst := file1 + file1dst.Path = "dst/one" + + r.CheckRemoteItems(t, file1dst) + + // check old dest, empty copy + file1b := r.WriteFile("one", "onet2", t2) + r.CheckRemoteItems(t, file1dst) + r.CheckLocalItems(t, file1b) + + err = operations.CopyFile(ctx, fdst, r.Flocal, file1b.Path, file1b.Path) + require.NoError(t, err) + + file1bdst := file1b + file1bdst.Path = "dst/one" + + r.CheckRemoteItems(t, file1bdst) + + // check old dest, new copy, backup-dir + + ci.BackupDir = r.FremoteName + "/BackupDir" + + file3 := r.WriteObject(ctx, "dst/one", "one", t1) + file2 := r.WriteObject(ctx, "CopyDest/one", "onet2", t2) + file1c := r.WriteFile("one", "onet2", t2) + r.CheckRemoteItems(t, file2, file3) + r.CheckLocalItems(t, file1c) + + err = operations.CopyFile(ctx, fdst, r.Flocal, file1c.Path, file1c.Path) + require.NoError(t, err) + + file2dst := file2 + file2dst.Path = "dst/one" + file3.Path = "BackupDir/one" + + r.CheckRemoteItems(t, file2, file2dst, file3) + ci.BackupDir = "" + + // check empty dest, new copy + file4 := r.WriteObject(ctx, "CopyDest/two", "two", t2) + file5 := r.WriteFile("two", "two", t2) + r.CheckRemoteItems(t, file2, file2dst, file3, file4) + r.CheckLocalItems(t, file1c, file5) + + err = operations.CopyFile(ctx, fdst, r.Flocal, file5.Path, file5.Path) + require.NoError(t, err) + + file4dst := file4 + file4dst.Path = "dst/two" + + r.CheckRemoteItems(t, file2, file2dst, file3, file4, file4dst) + + // check new dest, new copy + err = operations.CopyFile(ctx, fdst, r.Flocal, file5.Path, file5.Path) + require.NoError(t, err) + + r.CheckRemoteItems(t, file2, file2dst, file3, file4, file4dst) + + // check empty dest, old copy + file6 := r.WriteObject(ctx, "CopyDest/three", "three", t2) + file7 := r.WriteFile("three", "threet3", t3) + r.CheckRemoteItems(t, file2, file2dst, file3, file4, file4dst, file6) + r.CheckLocalItems(t, file1c, file5, file7) + + err = operations.CopyFile(ctx, fdst, r.Flocal, file7.Path, file7.Path) + require.NoError(t, err) + + file7dst := file7 + file7dst.Path = "dst/three" + + r.CheckRemoteItems(t, file2, file2dst, file3, file4, file4dst, file6, file7dst) +} + +func TestCopyInplace(t *testing.T) { + ctx := context.Background() + ctx, ci := fs.AddConfig(ctx) + r := fstest.NewRun(t) + + if !r.Fremote.Features().PartialUploads { + t.Skip("Partial uploads not supported") + } + + ci.Inplace = true + + file1 := r.WriteFile("file1", "file1 contents", t1) + r.CheckLocalItems(t, file1) + + file2 := file1 + file2.Path = "sub/file2" + + err := operations.CopyFile(ctx, r.Fremote, r.Flocal, file2.Path, file1.Path) + require.NoError(t, err) + r.CheckLocalItems(t, file1) + r.CheckRemoteItems(t, file2) + + err = operations.CopyFile(ctx, r.Fremote, r.Flocal, file2.Path, file1.Path) + require.NoError(t, err) + r.CheckLocalItems(t, file1) + r.CheckRemoteItems(t, file2) + + err = operations.CopyFile(ctx, r.Fremote, r.Fremote, file2.Path, file2.Path) + require.NoError(t, err) + r.CheckLocalItems(t, file1) + r.CheckRemoteItems(t, file2) +} + +func TestCopyLongFileName(t *testing.T) { + ctx := context.Background() + ctx, ci := fs.AddConfig(ctx) + r := fstest.NewRun(t) + + if !r.Fremote.Features().PartialUploads { + t.Skip("Partial uploads not supported") + } + + ci.Inplace = false // the default + + file1 := r.WriteFile("file1", "file1 contents", t1) + r.CheckLocalItems(t, file1) + + file2 := file1 + file2.Path = "sub/" + strings.Repeat("file2", 30) + + err := operations.CopyFile(ctx, r.Fremote, r.Flocal, file2.Path, file1.Path) + require.NoError(t, err) + r.CheckLocalItems(t, file1) + r.CheckRemoteItems(t, file2) + + err = operations.CopyFile(ctx, r.Fremote, r.Flocal, file2.Path, file1.Path) + require.NoError(t, err) + r.CheckLocalItems(t, file1) + r.CheckRemoteItems(t, file2) + + err = operations.CopyFile(ctx, r.Fremote, r.Fremote, file2.Path, file2.Path) + require.NoError(t, err) + r.CheckLocalItems(t, file1) + r.CheckRemoteItems(t, file2) +} + +func TestCopyFileMaxTransfer(t *testing.T) { + ctx := context.Background() + ctx, ci := fs.AddConfig(ctx) + r := fstest.NewRun(t) + defer accounting.Stats(ctx).ResetCounters() + + const sizeCutoff = 2048 + + // Make random incompressible data + randomData := make([]byte, sizeCutoff) + _, err := rand.Read(randomData) + require.NoError(t, err) + randomString := string(randomData) + + file1 := r.WriteFile("TestCopyFileMaxTransfer/file1", "file1 contents", t1) + file2 := r.WriteFile("TestCopyFileMaxTransfer/file2", "file2 contents"+randomString, t2) + file3 := r.WriteFile("TestCopyFileMaxTransfer/file3", "file3 contents"+randomString, t2) + file4 := r.WriteFile("TestCopyFileMaxTransfer/file4", "file4 contents"+randomString, t2) + + // Cutoff mode: Hard + ci.MaxTransfer = sizeCutoff + ci.CutoffMode = fs.CutoffModeHard + + // file1: Show a small file gets transferred OK + accounting.Stats(ctx).ResetCounters() + err = operations.CopyFile(ctx, r.Fremote, r.Flocal, file1.Path, file1.Path) + require.NoError(t, err) + r.CheckLocalItems(t, file1, file2, file3, file4) + r.CheckRemoteItems(t, file1) + + // file2: show a large file does not get transferred + accounting.Stats(ctx).ResetCounters() + err = operations.CopyFile(ctx, r.Fremote, r.Flocal, file2.Path, file2.Path) + require.NotNil(t, err, "Did not get expected max transfer limit error") + if !errors.Is(err, accounting.ErrorMaxTransferLimitReachedFatal) { + t.Log("Expecting error to contain accounting.ErrorMaxTransferLimitReachedFatal") + // Sometimes the backends or their SDKs don't pass the + // error through properly, so check that it at least + // has the text we expect in. + assert.Contains(t, err.Error(), "max transfer limit reached") + } + r.CheckLocalItems(t, file1, file2, file3, file4) + r.CheckRemoteItems(t, file1) + + // Cutoff mode: Cautious + ci.CutoffMode = fs.CutoffModeCautious + + // file3: show a large file does not get transferred + accounting.Stats(ctx).ResetCounters() + err = operations.CopyFile(ctx, r.Fremote, r.Flocal, file3.Path, file3.Path) + require.NotNil(t, err) + assert.True(t, errors.Is(err, accounting.ErrorMaxTransferLimitReachedGraceful)) + r.CheckLocalItems(t, file1, file2, file3, file4) + r.CheckRemoteItems(t, file1) + + if isChunker(r.Fremote) { + t.Log("skipping remainder of test for chunker as it involves multiple transfers") + return + } + + // Cutoff mode: Soft + ci.CutoffMode = fs.CutoffModeSoft + + // file4: show a large file does get transferred this time + accounting.Stats(ctx).ResetCounters() + err = operations.CopyFile(ctx, r.Fremote, r.Flocal, file4.Path, file4.Path) + require.NoError(t, err) + r.CheckLocalItems(t, file1, file2, file3, file4) + r.CheckRemoteItems(t, file1, file4) +} diff --git a/fs/operations/operations.go b/fs/operations/operations.go index 318cd6765..8fb55c184 100644 --- a/fs/operations/operations.go +++ b/fs/operations/operations.go @@ -34,7 +34,6 @@ import ( "github.com/rclone/rclone/fs/object" "github.com/rclone/rclone/fs/walk" "github.com/rclone/rclone/lib/atexit" - "github.com/rclone/rclone/lib/pacer" "github.com/rclone/rclone/lib/random" "github.com/rclone/rclone/lib/readers" "golang.org/x/sync/errgroup" @@ -281,36 +280,6 @@ func equal(ctx context.Context, src fs.ObjectInfo, dst fs.Object, opt equalOpt) return true } -// Used to remove a failed copy -// -// Returns whether the file was successfully removed or not -func removeFailedCopy(ctx context.Context, dst fs.Object) bool { - if dst == nil { - return false - } - fs.Infof(dst, "Removing failed copy") - removeErr := dst.Remove(ctx) - if removeErr != nil { - fs.Infof(dst, "Failed to remove failed copy: %s", removeErr) - return false - } - return true -} - -// Used to remove a failed partial copy -// -// Returns whether the file was successfully removed or not -func removeFailedPartialCopy(ctx context.Context, f fs.Fs, remotePartial string) bool { - o, err := f.NewObject(ctx, remotePartial) - if errors.Is(err, fs.ErrorObjectNotFound) { - return true - } else if err != nil { - fs.Infof(remotePartial, "Failed to remove failed partial copy: %s", err) - return false - } - return removeFailedCopy(ctx, o) -} - // CommonHash returns a single hash.Type and a HashOption with that // type which is in common between the two fs.Fs. func CommonHash(ctx context.Context, fa, fb fs.Info) (hash.Type, *fs.HashesOption) { @@ -328,253 +297,6 @@ func CommonHash(ctx context.Context, fa, fb fs.Info) (hash.Type, *fs.HashesOptio return hashType, &fs.HashesOption{Hashes: common} } -// Copy src object to dst or f if nil. If dst is nil then it uses -// remote as the name of the new object. -// -// It returns the destination object if possible. Note that this may -// be nil. -func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Object, err error) { - ci := fs.GetConfig(ctx) - tr := accounting.Stats(ctx).NewTransfer(src) - defer func() { - tr.Done(ctx, err) - }() - newDst = dst - if SkipDestructive(ctx, src, "copy") { - in := tr.Account(ctx, nil) - in.DryRun(src.Size()) - return newDst, nil - } - maxTries := ci.LowLevelRetries - tries := 0 - doUpdate := dst != nil - hashType, hashOption := CommonHash(ctx, f, src.Fs()) - - if dst != nil { - remote = dst.Remote() - } - - var ( - inplace = true - remotePartial = remote - ) - if !ci.Inplace && f.Features().Move != nil && f.Features().PartialUploads && !strings.HasSuffix(remote, ".rclonelink") { - if len(ci.PartialSuffix) > 16 { - return nil, fmt.Errorf("expecting length of --partial-suffix to be not greater than %d but got %d", 16, len(ci.PartialSuffix)) - } - - // Avoid making the leaf name longer if it's already lengthy to avoid - // trouble with file name length limits. - suffix := "." + random.String(8) + ci.PartialSuffix - base := path.Base(remotePartial) - if len(base) > 100 { - remotePartial = remotePartial[:len(remotePartial)-len(suffix)] + suffix - } else { - remotePartial += suffix - } - inplace = false - } - - var actionTaken string - for { - // Try server-side copy first - if has optional interface and - // is same underlying remote - actionTaken = "Copied (server-side copy)" - if ci.MaxTransfer >= 0 { - var bytesSoFar int64 - if ci.CutoffMode == fs.CutoffModeCautious { - bytesSoFar = accounting.Stats(ctx).GetBytesWithPending() + src.Size() - } else { - bytesSoFar = accounting.Stats(ctx).GetBytes() - } - if bytesSoFar >= int64(ci.MaxTransfer) { - if ci.CutoffMode == fs.CutoffModeHard { - return nil, accounting.ErrorMaxTransferLimitReachedFatal - } - return nil, accounting.ErrorMaxTransferLimitReachedGraceful - } - } - if doCopy := f.Features().Copy; doCopy != nil && (SameConfig(src.Fs(), f) || (SameRemoteType(src.Fs(), f) && (f.Features().ServerSideAcrossConfigs || ci.ServerSideAcrossConfigs))) { - in := tr.Account(ctx, nil) // account the transfer - in.ServerSideTransferStart() - newDst, err = doCopy(ctx, src, remote) - if err == nil { - dst = newDst - in.ServerSideCopyEnd(dst.Size()) // account the bytes for the server-side transfer - _ = in.Close() - inplace = true - } else { - _ = in.Close() - } - if errors.Is(err, fs.ErrorCantCopy) { - tr.Reset(ctx) // skip incomplete accounting - will be overwritten by the manual copy below - } - } else { - err = fs.ErrorCantCopy - } - // If can't server-side copy, do it manually - if errors.Is(err, fs.ErrorCantCopy) { - // Remove partial files on premature exit - var atexitRemovePartial atexit.FnHandle - if !inplace { - atexitRemovePartial = atexit.Register(func() { - ctx := context.Background() - removeFailedPartialCopy(ctx, f, remotePartial) - }) - } - - uploadOptions := []fs.OpenOption{hashOption} - for _, option := range ci.UploadHeaders { - uploadOptions = append(uploadOptions, option) - } - if ci.MetadataSet != nil { - uploadOptions = append(uploadOptions, fs.MetadataOption(ci.MetadataSet)) - } - - if doMultiThreadCopy(ctx, f, src) { - dst, err = multiThreadCopy(ctx, f, remotePartial, src, ci.MultiThreadStreams, tr, uploadOptions...) - if err == nil { - newDst = dst - } - if doUpdate { - actionTaken = "Multi-thread Copied (replaced existing)" - } else { - actionTaken = "Multi-thread Copied (new)" - } - } else { - var in0 io.ReadCloser - options := []fs.OpenOption{hashOption} - for _, option := range ci.DownloadHeaders { - options = append(options, option) - } - in0, err = Open(ctx, src, options...) - if err != nil { - err = fmt.Errorf("failed to open source object: %w", err) - } else { - if src.Size() == -1 { - // -1 indicates unknown size. Use Rcat to handle both remotes supporting and not supporting PutStream. - if doUpdate { - actionTaken = "Copied (Rcat, replaced existing)" - } else { - actionTaken = "Copied (Rcat, new)" - } - // Make any metadata to pass to rcat - var meta fs.Metadata - if ci.Metadata { - meta, err = fs.GetMetadata(ctx, src) - if err != nil { - fs.Errorf(src, "Failed to read metadata: %v", err) - } - } - // NB Rcat closes in0 - dst, err = Rcat(ctx, f, remotePartial, in0, src.ModTime(ctx), meta) - newDst = dst - } else { - in := tr.Account(ctx, in0).WithBuffer() // account and buffer the transfer - var wrappedSrc fs.ObjectInfo = src - // We try to pass the original object if possible - if src.Remote() != remotePartial { - wrappedSrc = fs.NewOverrideRemote(src, remotePartial) - } - if doUpdate && inplace { - err = dst.Update(ctx, in, wrappedSrc, uploadOptions...) - } else { - dst, err = f.Put(ctx, in, wrappedSrc, uploadOptions...) - } - if doUpdate { - actionTaken = "Copied (replaced existing)" - } else { - actionTaken = "Copied (new)" - } - closeErr := in.Close() - if err == nil { - newDst = dst - err = closeErr - } - } - } - } - if !inplace { - atexit.Unregister(atexitRemovePartial) - } - - } - tries++ - if tries >= maxTries { - break - } - // Retry if err returned a retry error - if fserrors.ContextError(ctx, &err) { - break - } - var retry bool - if fserrors.IsRetryError(err) || fserrors.ShouldRetry(err) { - retry = true - } else if t, ok := pacer.IsRetryAfter(err); ok { - fs.Debugf(src, "Sleeping for %v (as indicated by the server) to obey Retry-After error: %v", t, err) - time.Sleep(t) - retry = true - } - if retry { - fs.Debugf(src, "Received error: %v - low level retry %d/%d", err, tries, maxTries) - tr.Reset(ctx) // skip incomplete accounting - will be overwritten by retry - continue - } - // otherwise finish - break - } - if err != nil { - err = fs.CountError(err) - fs.Errorf(src, "Failed to copy: %v", err) - if !inplace { - removeFailedPartialCopy(ctx, f, remotePartial) - } - return newDst, err - } - - // Verify sizes are the same after transfer - if sizeDiffers(ctx, src, dst) { - err = fmt.Errorf("corrupted on transfer: sizes differ %d vs %d", src.Size(), dst.Size()) - fs.Errorf(dst, "%v", err) - err = fs.CountError(err) - removeFailedCopy(ctx, dst) - return newDst, err - } - - // Verify hashes are the same after transfer - ignoring blank hashes - if hashType != hash.None { - // checkHashes has logged and counted errors - equal, _, srcSum, dstSum, _ := checkHashes(ctx, src, dst, hashType) - if !equal { - err = fmt.Errorf("corrupted on transfer: %v hash differ %q vs %q", hashType, srcSum, dstSum) - fs.Errorf(dst, "%v", err) - err = fs.CountError(err) - removeFailedCopy(ctx, dst) - return newDst, err - } - } - - // Move the copied file to its real destination. - if err == nil && !inplace && remotePartial != remote { - dst, err = f.Features().Move(ctx, newDst, remote) - if err == nil { - fs.Debugf(newDst, "renamed to: %s", remote) - newDst = dst - } else { - fs.Errorf(newDst, "partial file rename failed: %v", err) - err = fs.CountError(err) - removeFailedCopy(ctx, newDst) - return newDst, err - } - } - - if newDst != nil && src.String() != newDst.String() { - actionTaken = fmt.Sprintf("%s to: %s", actionTaken, newDst.String()) - } - fs.Infof(src, "%s%s", actionTaken, fs.LogValueHide("size", fs.SizeSuffix(src.Size()))) - return newDst, err -} - // SameObject returns true if src and dst could be pointing to the // same object. func SameObject(src, dst fs.Object) bool { @@ -2077,11 +1799,6 @@ func MoveFile(ctx context.Context, fdst fs.Fs, fsrc fs.Fs, dstFileName string, s return moveOrCopyFile(ctx, fdst, fsrc, dstFileName, srcFileName, false) } -// CopyFile moves a single file possibly to a new name -func CopyFile(ctx context.Context, fdst fs.Fs, fsrc fs.Fs, dstFileName string, srcFileName string) (err error) { - return moveOrCopyFile(ctx, fdst, fsrc, dstFileName, srcFileName, true) -} - // SetTier changes tier of object in remote func SetTier(ctx context.Context, fsrc fs.Fs, tier string) error { return ListFn(ctx, fsrc, func(o fs.Object) { diff --git a/fs/operations/operations_test.go b/fs/operations/operations_test.go index 640807ca2..02f7b108c 100644 --- a/fs/operations/operations_test.go +++ b/fs/operations/operations_test.go @@ -22,7 +22,6 @@ package operations_test import ( "bytes" "context" - "crypto/rand" "errors" "fmt" "io" @@ -1021,294 +1020,6 @@ func TestMoveFileBackupDir(t *testing.T) { r.CheckRemoteItems(t, file1old, file1) } -func TestCopyFile(t *testing.T) { - ctx := context.Background() - r := fstest.NewRun(t) - - file1 := r.WriteFile("file1", "file1 contents", t1) - r.CheckLocalItems(t, file1) - - file2 := file1 - file2.Path = "sub/file2" - - err := operations.CopyFile(ctx, r.Fremote, r.Flocal, file2.Path, file1.Path) - require.NoError(t, err) - r.CheckLocalItems(t, file1) - r.CheckRemoteItems(t, file2) - - err = operations.CopyFile(ctx, r.Fremote, r.Flocal, file2.Path, file1.Path) - require.NoError(t, err) - r.CheckLocalItems(t, file1) - r.CheckRemoteItems(t, file2) - - err = operations.CopyFile(ctx, r.Fremote, r.Fremote, file2.Path, file2.Path) - require.NoError(t, err) - r.CheckLocalItems(t, file1) - r.CheckRemoteItems(t, file2) -} - -func TestCopyFileBackupDir(t *testing.T) { - ctx := context.Background() - ctx, ci := fs.AddConfig(ctx) - r := fstest.NewRun(t) - if !operations.CanServerSideMove(r.Fremote) { - t.Skip("Skipping test as remote does not support server-side move or copy") - } - - ci.BackupDir = r.FremoteName + "/backup" - - file1 := r.WriteFile("dst/file1", "file1 contents", t1) - r.CheckLocalItems(t, file1) - - file1old := r.WriteObject(ctx, "dst/file1", "file1 contents old", t1) - r.CheckRemoteItems(t, file1old) - - err := operations.CopyFile(ctx, r.Fremote, r.Flocal, file1.Path, file1.Path) - require.NoError(t, err) - r.CheckLocalItems(t, file1) - file1old.Path = "backup/dst/file1" - r.CheckRemoteItems(t, file1old, file1) -} - -// Test with CompareDest set -func TestCopyFileCompareDest(t *testing.T) { - ctx := context.Background() - ctx, ci := fs.AddConfig(ctx) - r := fstest.NewRun(t) - - ci.CompareDest = []string{r.FremoteName + "/CompareDest"} - fdst, err := fs.NewFs(ctx, r.FremoteName+"/dst") - require.NoError(t, err) - - // check empty dest, empty compare - file1 := r.WriteFile("one", "one", t1) - r.CheckLocalItems(t, file1) - - err = operations.CopyFile(ctx, fdst, r.Flocal, file1.Path, file1.Path) - require.NoError(t, err) - - file1dst := file1 - file1dst.Path = "dst/one" - - r.CheckRemoteItems(t, file1dst) - - // check old dest, empty compare - file1b := r.WriteFile("one", "onet2", t2) - r.CheckRemoteItems(t, file1dst) - r.CheckLocalItems(t, file1b) - - err = operations.CopyFile(ctx, fdst, r.Flocal, file1b.Path, file1b.Path) - require.NoError(t, err) - - file1bdst := file1b - file1bdst.Path = "dst/one" - - r.CheckRemoteItems(t, file1bdst) - - // check old dest, new compare - file3 := r.WriteObject(ctx, "dst/one", "one", t1) - file2 := r.WriteObject(ctx, "CompareDest/one", "onet2", t2) - file1c := r.WriteFile("one", "onet2", t2) - r.CheckRemoteItems(t, file2, file3) - r.CheckLocalItems(t, file1c) - - err = operations.CopyFile(ctx, fdst, r.Flocal, file1c.Path, file1c.Path) - require.NoError(t, err) - - r.CheckRemoteItems(t, file2, file3) - - // check empty dest, new compare - file4 := r.WriteObject(ctx, "CompareDest/two", "two", t2) - file5 := r.WriteFile("two", "two", t2) - r.CheckRemoteItems(t, file2, file3, file4) - r.CheckLocalItems(t, file1c, file5) - - err = operations.CopyFile(ctx, fdst, r.Flocal, file5.Path, file5.Path) - require.NoError(t, err) - - r.CheckRemoteItems(t, file2, file3, file4) - - // check new dest, new compare - err = operations.CopyFile(ctx, fdst, r.Flocal, file5.Path, file5.Path) - require.NoError(t, err) - - r.CheckRemoteItems(t, file2, file3, file4) - - // check empty dest, old compare - file5b := r.WriteFile("two", "twot3", t3) - r.CheckRemoteItems(t, file2, file3, file4) - r.CheckLocalItems(t, file1c, file5b) - - err = operations.CopyFile(ctx, fdst, r.Flocal, file5b.Path, file5b.Path) - require.NoError(t, err) - - file5bdst := file5b - file5bdst.Path = "dst/two" - - r.CheckRemoteItems(t, file2, file3, file4, file5bdst) -} - -// Test with CopyDest set -func TestCopyFileCopyDest(t *testing.T) { - ctx := context.Background() - ctx, ci := fs.AddConfig(ctx) - r := fstest.NewRun(t) - - if r.Fremote.Features().Copy == nil { - t.Skip("Skipping test as remote does not support server-side copy") - } - - ci.CopyDest = []string{r.FremoteName + "/CopyDest"} - - fdst, err := fs.NewFs(ctx, r.FremoteName+"/dst") - require.NoError(t, err) - - // check empty dest, empty copy - file1 := r.WriteFile("one", "one", t1) - r.CheckLocalItems(t, file1) - - err = operations.CopyFile(ctx, fdst, r.Flocal, file1.Path, file1.Path) - require.NoError(t, err) - - file1dst := file1 - file1dst.Path = "dst/one" - - r.CheckRemoteItems(t, file1dst) - - // check old dest, empty copy - file1b := r.WriteFile("one", "onet2", t2) - r.CheckRemoteItems(t, file1dst) - r.CheckLocalItems(t, file1b) - - err = operations.CopyFile(ctx, fdst, r.Flocal, file1b.Path, file1b.Path) - require.NoError(t, err) - - file1bdst := file1b - file1bdst.Path = "dst/one" - - r.CheckRemoteItems(t, file1bdst) - - // check old dest, new copy, backup-dir - - ci.BackupDir = r.FremoteName + "/BackupDir" - - file3 := r.WriteObject(ctx, "dst/one", "one", t1) - file2 := r.WriteObject(ctx, "CopyDest/one", "onet2", t2) - file1c := r.WriteFile("one", "onet2", t2) - r.CheckRemoteItems(t, file2, file3) - r.CheckLocalItems(t, file1c) - - err = operations.CopyFile(ctx, fdst, r.Flocal, file1c.Path, file1c.Path) - require.NoError(t, err) - - file2dst := file2 - file2dst.Path = "dst/one" - file3.Path = "BackupDir/one" - - r.CheckRemoteItems(t, file2, file2dst, file3) - ci.BackupDir = "" - - // check empty dest, new copy - file4 := r.WriteObject(ctx, "CopyDest/two", "two", t2) - file5 := r.WriteFile("two", "two", t2) - r.CheckRemoteItems(t, file2, file2dst, file3, file4) - r.CheckLocalItems(t, file1c, file5) - - err = operations.CopyFile(ctx, fdst, r.Flocal, file5.Path, file5.Path) - require.NoError(t, err) - - file4dst := file4 - file4dst.Path = "dst/two" - - r.CheckRemoteItems(t, file2, file2dst, file3, file4, file4dst) - - // check new dest, new copy - err = operations.CopyFile(ctx, fdst, r.Flocal, file5.Path, file5.Path) - require.NoError(t, err) - - r.CheckRemoteItems(t, file2, file2dst, file3, file4, file4dst) - - // check empty dest, old copy - file6 := r.WriteObject(ctx, "CopyDest/three", "three", t2) - file7 := r.WriteFile("three", "threet3", t3) - r.CheckRemoteItems(t, file2, file2dst, file3, file4, file4dst, file6) - r.CheckLocalItems(t, file1c, file5, file7) - - err = operations.CopyFile(ctx, fdst, r.Flocal, file7.Path, file7.Path) - require.NoError(t, err) - - file7dst := file7 - file7dst.Path = "dst/three" - - r.CheckRemoteItems(t, file2, file2dst, file3, file4, file4dst, file6, file7dst) -} - -func TestCopyInplace(t *testing.T) { - ctx := context.Background() - ctx, ci := fs.AddConfig(ctx) - r := fstest.NewRun(t) - - if !r.Fremote.Features().PartialUploads { - t.Skip("Partial uploads not supported") - } - - ci.Inplace = true - - file1 := r.WriteFile("file1", "file1 contents", t1) - r.CheckLocalItems(t, file1) - - file2 := file1 - file2.Path = "sub/file2" - - err := operations.CopyFile(ctx, r.Fremote, r.Flocal, file2.Path, file1.Path) - require.NoError(t, err) - r.CheckLocalItems(t, file1) - r.CheckRemoteItems(t, file2) - - err = operations.CopyFile(ctx, r.Fremote, r.Flocal, file2.Path, file1.Path) - require.NoError(t, err) - r.CheckLocalItems(t, file1) - r.CheckRemoteItems(t, file2) - - err = operations.CopyFile(ctx, r.Fremote, r.Fremote, file2.Path, file2.Path) - require.NoError(t, err) - r.CheckLocalItems(t, file1) - r.CheckRemoteItems(t, file2) -} - -func TestCopyLongFileName(t *testing.T) { - ctx := context.Background() - ctx, ci := fs.AddConfig(ctx) - r := fstest.NewRun(t) - - if !r.Fremote.Features().PartialUploads { - t.Skip("Partial uploads not supported") - } - - ci.Inplace = false // the default - - file1 := r.WriteFile("file1", "file1 contents", t1) - r.CheckLocalItems(t, file1) - - file2 := file1 - file2.Path = "sub/" + strings.Repeat("file2", 30) - - err := operations.CopyFile(ctx, r.Fremote, r.Flocal, file2.Path, file1.Path) - require.NoError(t, err) - r.CheckLocalItems(t, file1) - r.CheckRemoteItems(t, file2) - - err = operations.CopyFile(ctx, r.Fremote, r.Flocal, file2.Path, file1.Path) - require.NoError(t, err) - r.CheckLocalItems(t, file1) - r.CheckRemoteItems(t, file2) - - err = operations.CopyFile(ctx, r.Fremote, r.Fremote, file2.Path, file2.Path) - require.NoError(t, err) - r.CheckLocalItems(t, file1) - r.CheckRemoteItems(t, file2) -} - // testFsInfo is for unit testing fs.Info type testFsInfo struct { name string @@ -1884,77 +1595,6 @@ func TestRcatSizeMetadata(t *testing.T) { } } -func TestCopyFileMaxTransfer(t *testing.T) { - ctx := context.Background() - ctx, ci := fs.AddConfig(ctx) - r := fstest.NewRun(t) - defer accounting.Stats(ctx).ResetCounters() - - const sizeCutoff = 2048 - - // Make random incompressible data - randomData := make([]byte, sizeCutoff) - _, err := rand.Read(randomData) - require.NoError(t, err) - randomString := string(randomData) - - file1 := r.WriteFile("TestCopyFileMaxTransfer/file1", "file1 contents", t1) - file2 := r.WriteFile("TestCopyFileMaxTransfer/file2", "file2 contents"+randomString, t2) - file3 := r.WriteFile("TestCopyFileMaxTransfer/file3", "file3 contents"+randomString, t2) - file4 := r.WriteFile("TestCopyFileMaxTransfer/file4", "file4 contents"+randomString, t2) - - // Cutoff mode: Hard - ci.MaxTransfer = sizeCutoff - ci.CutoffMode = fs.CutoffModeHard - - // file1: Show a small file gets transferred OK - accounting.Stats(ctx).ResetCounters() - err = operations.CopyFile(ctx, r.Fremote, r.Flocal, file1.Path, file1.Path) - require.NoError(t, err) - r.CheckLocalItems(t, file1, file2, file3, file4) - r.CheckRemoteItems(t, file1) - - // file2: show a large file does not get transferred - accounting.Stats(ctx).ResetCounters() - err = operations.CopyFile(ctx, r.Fremote, r.Flocal, file2.Path, file2.Path) - require.NotNil(t, err, "Did not get expected max transfer limit error") - if !errors.Is(err, accounting.ErrorMaxTransferLimitReachedFatal) { - t.Log("Expecting error to contain accounting.ErrorMaxTransferLimitReachedFatal") - // Sometimes the backends or their SDKs don't pass the - // error through properly, so check that it at least - // has the text we expect in. - assert.Contains(t, err.Error(), "max transfer limit reached") - } - r.CheckLocalItems(t, file1, file2, file3, file4) - r.CheckRemoteItems(t, file1) - - // Cutoff mode: Cautious - ci.CutoffMode = fs.CutoffModeCautious - - // file3: show a large file does not get transferred - accounting.Stats(ctx).ResetCounters() - err = operations.CopyFile(ctx, r.Fremote, r.Flocal, file3.Path, file3.Path) - require.NotNil(t, err) - assert.True(t, errors.Is(err, accounting.ErrorMaxTransferLimitReachedGraceful)) - r.CheckLocalItems(t, file1, file2, file3, file4) - r.CheckRemoteItems(t, file1) - - if isChunker(r.Fremote) { - t.Log("skipping remainder of test for chunker as it involves multiple transfers") - return - } - - // Cutoff mode: Soft - ci.CutoffMode = fs.CutoffModeSoft - - // file4: show a large file does get transferred this time - accounting.Stats(ctx).ResetCounters() - err = operations.CopyFile(ctx, r.Fremote, r.Flocal, file4.Path, file4.Path) - require.NoError(t, err) - r.CheckLocalItems(t, file1, file2, file3, file4) - r.CheckRemoteItems(t, file1, file4) -} - func TestTouchDir(t *testing.T) { ctx := context.Background() r := fstest.NewRun(t) @@ -1968,6 +1608,7 @@ func TestTouchDir(t *testing.T) { file3 := r.WriteBoth(ctx, "sub dir/potato3", "hello", t2) r.CheckRemoteItems(t, file1, file2, file3) + accounting.GlobalStats().ResetCounters() timeValue := time.Date(2010, 9, 8, 7, 6, 5, 4, time.UTC) err := operations.TouchDir(ctx, r.Fremote, "", timeValue, true) require.NoError(t, err)