From 179f978f75ed974225cdd86f13eb89c1ff219f49 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Fri, 13 Oct 2023 12:22:50 +0100 Subject: [PATCH] operations: refactor Copy into methods on a temporary object operations.Copy had become very unwieldy. This refactors it into methods on a copy object which is created for the duration of the copy. This makes it much easier to read and reason about. --- fs/operations/copy.go | 581 ++++++++++++++++++++++++------------------ 1 file changed, 336 insertions(+), 245 deletions(-) diff --git a/fs/operations/copy.go b/fs/operations/copy.go index f7a05cb04..b396b2d8e 100644 --- a/fs/operations/copy.go +++ b/fs/operations/copy.go @@ -1,3 +1,7 @@ +// This file implements operations.Copy +// +// This is probably the most important operation in rclone. + package operations import ( @@ -18,34 +22,326 @@ import ( "github.com/rclone/rclone/lib/random" ) +// State of the copy +type copy struct { + f fs.Fs // destination fs.Fs + dstFeatures *fs.Features // Features() for fs.Fs + dst fs.Object // destination object to update, may be nil + remote string // destination path, used if dst is nil + src fs.Object // source object + ci *fs.ConfigInfo // current config + maxTries int // max number of tries to do the copy + doUpdate bool // whether we are updating an existing file or not + hashType hash.Type // common hash to use + hashOption *fs.HashesOption // open option for the common hash + tr *accounting.Transfer // accounting for the transfer + inplace bool // set if we are updating inplace and not using a partial name + remoteForCopy string // the name used for the transfer, either remote or remote+".partial" +} + // Used to remove a failed copy -// -// Returns whether the file was successfully removed or not -func removeFailedCopy(ctx context.Context, dst fs.Object) bool { - if dst == nil { - return false +func (c *copy) removeFailedCopy(ctx context.Context, o fs.Object) { + if o == nil { + return } - fs.Infof(dst, "Removing failed
copy") - removeErr := dst.Remove(ctx) - if removeErr != nil { - fs.Infof(dst, "Failed to remove failed copy: %s", removeErr) - return false + fs.Infof(o, "Removing failed copy") + err := o.Remove(ctx) + if err != nil { + fs.Infof(o, "Failed to remove failed copy: %s", err) } - return true } // Used to remove a failed partial copy -// -// Returns whether the file was successfully removed or not -func removeFailedPartialCopy(ctx context.Context, f fs.Fs, remotePartial string) bool { - o, err := f.NewObject(ctx, remotePartial) +func (c *copy) removeFailedPartialCopy(ctx context.Context, f fs.Fs, remote string) { + o, err := f.NewObject(ctx, remote) if errors.Is(err, fs.ErrorObjectNotFound) { - return true - } else if err != nil { - fs.Infof(remotePartial, "Failed to remove failed partial copy: %s", err) - return false + // Assume object has been deleted + return } - return removeFailedCopy(ctx, o) + if err != nil { + fs.Infof(remote, "Failed to remove failed partial copy: %s", err) + return + } + c.removeFailedCopy(ctx, o) +} + +// Check to see if we should be using a partial name and return the name for the copy and the inplace flag +func (c *copy) checkPartial() (remoteForCopy string, inplace bool, err error) { + remoteForCopy = c.remote + if c.ci.Inplace || c.dstFeatures.Move == nil || !c.dstFeatures.PartialUploads || strings.HasSuffix(c.remote, ".rclonelink") { + return remoteForCopy, true, nil + } + if len(c.ci.PartialSuffix) > 16 { + return remoteForCopy, true, fmt.Errorf("expecting length of --partial-suffix to be not greater than %d but got %d", 16, len(c.ci.PartialSuffix)) + } + // Avoid making the leaf name longer if it's already lengthy to avoid + // trouble with file name length limits. + suffix := "." 
+ random.String(8) + c.ci.PartialSuffix + base := path.Base(remoteForCopy) + if len(base) > 100 { + remoteForCopy = remoteForCopy[:len(remoteForCopy)-len(suffix)] + suffix + } else { + remoteForCopy += suffix + } + return remoteForCopy, false, nil +} + +// Check to see if we have hit max transfer limits +func (c *copy) checkLimits(ctx context.Context) (err error) { + if c.ci.MaxTransfer < 0 { + return nil + } + var bytesSoFar int64 + if c.ci.CutoffMode == fs.CutoffModeCautious { + bytesSoFar = accounting.Stats(ctx).GetBytesWithPending() + c.src.Size() + } else { + bytesSoFar = accounting.Stats(ctx).GetBytes() + } + if bytesSoFar >= int64(c.ci.MaxTransfer) { + if c.ci.CutoffMode == fs.CutoffModeHard { + return accounting.ErrorMaxTransferLimitReachedFatal + } + return accounting.ErrorMaxTransferLimitReachedGraceful + } + return nil +} + +// Server side copy c.src to (c.f, c.remote) if possible or return fs.ErrorCantCopy if not +func (c *copy) serverSideCopy(ctx context.Context) (actionTaken string, newDst fs.Object, err error) { + doCopy := c.dstFeatures.Copy + serverSideCopyOK := false + if doCopy == nil { + serverSideCopyOK = false + } else if SameConfig(c.src.Fs(), c.f) { + serverSideCopyOK = true + } else if SameRemoteType(c.src.Fs(), c.f) { + serverSideCopyOK = c.dstFeatures.ServerSideAcrossConfigs || c.ci.ServerSideAcrossConfigs + } + if !serverSideCopyOK { + return actionTaken, nil, fs.ErrorCantCopy + } + in := c.tr.Account(ctx, nil) // account the transfer + in.ServerSideTransferStart() + newDst, err = doCopy(ctx, c.src, c.remote) // copy to the final name: inplace is set below so no rename follows + if err == nil { + in.ServerSideCopyEnd(newDst.Size()) // account the bytes for the server-side transfer + _ = in.Close() + c.inplace = true + } else { + _ = in.Close() + } + if errors.Is(err, fs.ErrorCantCopy) { + c.tr.Reset(ctx) // skip incomplete accounting - will be overwritten by the manual copy + } + actionTaken = "Copied (server-side copy)" + return actionTaken, newDst, err +} + +// Copy c.src to (c.f, 
c.remoteForCopy) using multiThreadCopy +func (c *copy) multiThreadCopy(ctx context.Context, uploadOptions []fs.OpenOption) (actionTaken string, newDst fs.Object, err error) { + newDst, err = multiThreadCopy(ctx, c.f, c.remoteForCopy, c.src, c.ci.MultiThreadStreams, c.tr, uploadOptions...) + if c.doUpdate { + actionTaken = "Multi-thread Copied (replaced existing)" + } else { + actionTaken = "Multi-thread Copied (new)" + } + return actionTaken, newDst, err +} + +// Copy the stream from in to (c.f, c.remoteForCopy) and close it +// +// Use Rcat to handle both remotes supporting and not supporting PutStream. +func (c *copy) rcat(ctx context.Context, in io.ReadCloser) (actionTaken string, newDst fs.Object, err error) { + // Make any metadata to pass to rcat + var meta fs.Metadata + if c.ci.Metadata { + meta, err = fs.GetMetadata(ctx, c.src) + if err != nil { + fs.Errorf(c.src, "Failed to read metadata: %v", err) + } + } + + // NB Rcat closes in + newDst, err = Rcat(ctx, c.f, c.remoteForCopy, in, c.src.ModTime(ctx), meta) + if c.doUpdate { + actionTaken = "Copied (Rcat, replaced existing)" + } else { + actionTaken = "Copied (Rcat, new)" + } + return actionTaken, newDst, err +} + +// Copy the stream from in to (c.f, c.remoteForCopy) and close it +func (c *copy) updateOrPut(ctx context.Context, in io.ReadCloser, uploadOptions []fs.OpenOption) (actionTaken string, newDst fs.Object, err error) { + // account and buffer the transfer + inAcc := c.tr.Account(ctx, in).WithBuffer() + var wrappedSrc fs.ObjectInfo = c.src + + // We try to pass the original object if possible + if c.src.Remote() != c.remoteForCopy { + wrappedSrc = fs.NewOverrideRemote(c.src, c.remoteForCopy) + } + if c.doUpdate && c.inplace { + err = c.dst.Update(ctx, inAcc, wrappedSrc, uploadOptions...) + // Make sure newDst is c.dst since we updated it + if err == nil { + newDst = c.dst + } + } else { + newDst, err = c.f.Put(ctx, inAcc, wrappedSrc, uploadOptions...) 
+ } + closeErr := inAcc.Close() + if err == nil { + err = closeErr + } + if c.doUpdate { + actionTaken = "Copied (replaced existing)" + } else { + actionTaken = "Copied (new)" + } + return actionTaken, newDst, err +} + +// Do a manual copy by reading the bytes and writing them +func (c *copy) manualCopy(ctx context.Context) (actionTaken string, newDst fs.Object, err error) { + // Remove partial files on premature exit + if !c.inplace { + defer atexit.Unregister(atexit.Register(func() { + ctx := context.Background() + c.removeFailedPartialCopy(ctx, c.f, c.remoteForCopy) + })) + } + + // Options for the upload + uploadOptions := []fs.OpenOption{c.hashOption} + for _, option := range c.ci.UploadHeaders { + uploadOptions = append(uploadOptions, option) + } + if c.ci.MetadataSet != nil { + uploadOptions = append(uploadOptions, fs.MetadataOption(c.ci.MetadataSet)) + } + + // Options for the download + downloadOptions := []fs.OpenOption{c.hashOption} + for _, option := range c.ci.DownloadHeaders { + downloadOptions = append(downloadOptions, option) + } + + if doMultiThreadCopy(ctx, c.f, c.src) { + return c.multiThreadCopy(ctx, uploadOptions) + } + + var in io.ReadCloser + in, err = Open(ctx, c.src, downloadOptions...) 
+ if err != nil { + return actionTaken, nil, fmt.Errorf("failed to open source object: %w", err) + } + + // Note that c.rcat and c.updateOrPut close in + if c.src.Size() == -1 { + return c.rcat(ctx, in) + } + return c.updateOrPut(ctx, in, uploadOptions) +} + +// Verify the copy +func (c *copy) verify(ctx context.Context, newDst fs.Object) (err error) { + // Verify sizes are the same after transfer + if sizeDiffers(ctx, c.src, newDst) { + return fmt.Errorf("corrupted on transfer: sizes differ %d vs %d", c.src.Size(), newDst.Size()) + } + // Verify hashes are the same after transfer - ignoring blank hashes + if c.hashType != hash.None { + // checkHashes has logged and counted errors + equal, _, srcSum, dstSum, _ := checkHashes(ctx, c.src, newDst, c.hashType) + if !equal { + return fmt.Errorf("corrupted on transfer: %v hash differ %q vs %q", c.hashType, srcSum, dstSum) + } + } + return nil +} + +// copy src object to dst or f if nil. If dst is nil then it uses +// remote as the name of the new object. +// +// It returns the destination object if possible. Note that this may +// be nil. 
+func (c *copy) copy(ctx context.Context) (newDst fs.Object, err error) { + var actionTaken string + retry := true + for tries := 0; retry && tries < c.maxTries; tries++ { + // Check we haven't hit any accounting limits + err = c.checkLimits(ctx) + if err != nil { + return nil, err + } + + // Try server side copy + actionTaken, newDst, err = c.serverSideCopy(ctx) + + // If can't server-side copy, do it manually + if errors.Is(err, fs.ErrorCantCopy) { + actionTaken, newDst, err = c.manualCopy(ctx) + } + + // End if ctx is in error + if fserrors.ContextError(ctx, &err) { + break + } + + // Retry if err returned a retry error + retry = false + if fserrors.IsRetryError(err) || fserrors.ShouldRetry(err) { + retry = true + } else if t, ok := pacer.IsRetryAfter(err); ok { + fs.Debugf(c.src, "Sleeping for %v (as indicated by the server) to obey Retry-After error: %v", t, err) + time.Sleep(t) + retry = true + } + if retry { + fs.Debugf(c.src, "Received error: %v - low level retry %d/%d", err, tries, c.maxTries) + c.tr.Reset(ctx) // skip incomplete accounting - will be overwritten by retry + continue + } + } + if err != nil { + err = fs.CountError(err) + fs.Errorf(c.src, "Failed to copy: %v", err) + if !c.inplace { + c.removeFailedPartialCopy(ctx, c.f, c.remoteForCopy) + } + return newDst, err + } + + // Verify the copy + err = c.verify(ctx, newDst) + if err != nil { + fs.Errorf(newDst, "%v", err) + err = fs.CountError(err) + c.removeFailedCopy(ctx, newDst) + return nil, err + } + + // Move the copied file to its real destination. 
+ if !c.inplace && c.remoteForCopy != c.remote { + movedNewDst, err := c.dstFeatures.Move(ctx, newDst, c.remote) + if err != nil { + fs.Errorf(newDst, "partial file rename failed: %v", err) + err = fs.CountError(err) + c.removeFailedCopy(ctx, newDst) + return nil, err + } + fs.Debugf(newDst, "renamed to: %s", c.remote) + newDst = movedNewDst + } + + // Log what we have done + if newDst != nil && c.src.String() != newDst.String() { + actionTaken = fmt.Sprintf("%s to: %s", actionTaken, newDst.String()) + } + fs.Infof(c.src, "%s%s", actionTaken, fs.LogValueHide("size", fs.SizeSuffix(c.src.Size()))) + + return newDst, nil } // Copy src object to dst or f if nil. If dst is nil then it uses @@ -59,240 +355,35 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj defer func() { tr.Done(ctx, err) }() - newDst = dst if SkipDestructive(ctx, src, "copy") { in := tr.Account(ctx, nil) in.DryRun(src.Size()) return newDst, nil } - maxTries := ci.LowLevelRetries - tries := 0 - doUpdate := dst != nil - hashType, hashOption := CommonHash(ctx, f, src.Fs()) - - if dst != nil { - remote = dst.Remote() + c := &copy{ + f: f, + dstFeatures: f.Features(), + dst: dst, + remote: remote, + src: src, + ci: ci, + tr: tr, + maxTries: ci.LowLevelRetries, + doUpdate: dst != nil, } - - var ( - inplace = true - remotePartial = remote - ) - if !ci.Inplace && f.Features().Move != nil && f.Features().PartialUploads && !strings.HasSuffix(remote, ".rclonelink") { - if len(ci.PartialSuffix) > 16 { - return nil, fmt.Errorf("expecting length of --partial-suffix to be not greater than %d but got %d", 16, len(ci.PartialSuffix)) - } - - // Avoid making the leaf name longer if it's already lengthy to avoid - // trouble with file name length limits. - suffix := "." 
+ random.String(8) + ci.PartialSuffix - base := path.Base(remotePartial) - if len(base) > 100 { - remotePartial = remotePartial[:len(remotePartial)-len(suffix)] + suffix - } else { - remotePartial += suffix - } - inplace = false - } - - var actionTaken string - for { - // Try server-side copy first - if has optional interface and - // is same underlying remote - actionTaken = "Copied (server-side copy)" - if ci.MaxTransfer >= 0 { - var bytesSoFar int64 - if ci.CutoffMode == fs.CutoffModeCautious { - bytesSoFar = accounting.Stats(ctx).GetBytesWithPending() + src.Size() - } else { - bytesSoFar = accounting.Stats(ctx).GetBytes() - } - if bytesSoFar >= int64(ci.MaxTransfer) { - if ci.CutoffMode == fs.CutoffModeHard { - return nil, accounting.ErrorMaxTransferLimitReachedFatal - } - return nil, accounting.ErrorMaxTransferLimitReachedGraceful - } - } - if doCopy := f.Features().Copy; doCopy != nil && (SameConfig(src.Fs(), f) || (SameRemoteType(src.Fs(), f) && (f.Features().ServerSideAcrossConfigs || ci.ServerSideAcrossConfigs))) { - in := tr.Account(ctx, nil) // account the transfer - in.ServerSideTransferStart() - newDst, err = doCopy(ctx, src, remote) - if err == nil { - dst = newDst - in.ServerSideCopyEnd(dst.Size()) // account the bytes for the server-side transfer - _ = in.Close() - inplace = true - } else { - _ = in.Close() - } - if errors.Is(err, fs.ErrorCantCopy) { - tr.Reset(ctx) // skip incomplete accounting - will be overwritten by the manual copy below - } - } else { - err = fs.ErrorCantCopy - } - // If can't server-side copy, do it manually - if errors.Is(err, fs.ErrorCantCopy) { - // Remove partial files on premature exit - var atexitRemovePartial atexit.FnHandle - if !inplace { - atexitRemovePartial = atexit.Register(func() { - ctx := context.Background() - removeFailedPartialCopy(ctx, f, remotePartial) - }) - } - - uploadOptions := []fs.OpenOption{hashOption} - for _, option := range ci.UploadHeaders { - uploadOptions = append(uploadOptions, option) - } - 
if ci.MetadataSet != nil { - uploadOptions = append(uploadOptions, fs.MetadataOption(ci.MetadataSet)) - } - - if doMultiThreadCopy(ctx, f, src) { - dst, err = multiThreadCopy(ctx, f, remotePartial, src, ci.MultiThreadStreams, tr, uploadOptions...) - if err == nil { - newDst = dst - } - if doUpdate { - actionTaken = "Multi-thread Copied (replaced existing)" - } else { - actionTaken = "Multi-thread Copied (new)" - } - } else { - var in0 io.ReadCloser - options := []fs.OpenOption{hashOption} - for _, option := range ci.DownloadHeaders { - options = append(options, option) - } - in0, err = Open(ctx, src, options...) - if err != nil { - err = fmt.Errorf("failed to open source object: %w", err) - } else { - if src.Size() == -1 { - // -1 indicates unknown size. Use Rcat to handle both remotes supporting and not supporting PutStream. - if doUpdate { - actionTaken = "Copied (Rcat, replaced existing)" - } else { - actionTaken = "Copied (Rcat, new)" - } - // Make any metadata to pass to rcat - var meta fs.Metadata - if ci.Metadata { - meta, err = fs.GetMetadata(ctx, src) - if err != nil { - fs.Errorf(src, "Failed to read metadata: %v", err) - } - } - // NB Rcat closes in0 - dst, err = Rcat(ctx, f, remotePartial, in0, src.ModTime(ctx), meta) - newDst = dst - } else { - in := tr.Account(ctx, in0).WithBuffer() // account and buffer the transfer - var wrappedSrc fs.ObjectInfo = src - // We try to pass the original object if possible - if src.Remote() != remotePartial { - wrappedSrc = fs.NewOverrideRemote(src, remotePartial) - } - if doUpdate && inplace { - err = dst.Update(ctx, in, wrappedSrc, uploadOptions...) - } else { - dst, err = f.Put(ctx, in, wrappedSrc, uploadOptions...) 
- } - if doUpdate { - actionTaken = "Copied (replaced existing)" - } else { - actionTaken = "Copied (new)" - } - closeErr := in.Close() - if err == nil { - newDst = dst - err = closeErr - } - } - } - } - if !inplace { - atexit.Unregister(atexitRemovePartial) - } - - } - tries++ - if tries >= maxTries { - break - } - // Retry if err returned a retry error - if fserrors.ContextError(ctx, &err) { - break - } - var retry bool - if fserrors.IsRetryError(err) || fserrors.ShouldRetry(err) { - retry = true - } else if t, ok := pacer.IsRetryAfter(err); ok { - fs.Debugf(src, "Sleeping for %v (as indicated by the server) to obey Retry-After error: %v", t, err) - time.Sleep(t) - retry = true - } - if retry { - fs.Debugf(src, "Received error: %v - low level retry %d/%d", err, tries, maxTries) - tr.Reset(ctx) // skip incomplete accounting - will be overwritten by retry - continue - } - // otherwise finish - break + c.hashType, c.hashOption = CommonHash(ctx, f, src.Fs()) + if c.dst != nil { + c.remote = c.dst.Remote() } + // Are we using partials? 
+ // + // If so set the flag and update the name we use for the copy + c.remoteForCopy, c.inplace, err = c.checkPartial() if err != nil { - err = fs.CountError(err) - fs.Errorf(src, "Failed to copy: %v", err) - if !inplace { - removeFailedPartialCopy(ctx, f, remotePartial) - } - return newDst, err + return nil, err } - - // Verify sizes are the same after transfer - if sizeDiffers(ctx, src, dst) { - err = fmt.Errorf("corrupted on transfer: sizes differ %d vs %d", src.Size(), dst.Size()) - fs.Errorf(dst, "%v", err) - err = fs.CountError(err) - removeFailedCopy(ctx, dst) - return newDst, err - } - - // Verify hashes are the same after transfer - ignoring blank hashes - if hashType != hash.None { - // checkHashes has logged and counted errors - equal, _, srcSum, dstSum, _ := checkHashes(ctx, src, dst, hashType) - if !equal { - err = fmt.Errorf("corrupted on transfer: %v hash differ %q vs %q", hashType, srcSum, dstSum) - fs.Errorf(dst, "%v", err) - err = fs.CountError(err) - removeFailedCopy(ctx, dst) - return newDst, err - } - } - - // Move the copied file to its real destination. - if err == nil && !inplace && remotePartial != remote { - dst, err = f.Features().Move(ctx, newDst, remote) - if err == nil { - fs.Debugf(newDst, "renamed to: %s", remote) - newDst = dst - } else { - fs.Errorf(newDst, "partial file rename failed: %v", err) - err = fs.CountError(err) - removeFailedCopy(ctx, newDst) - return newDst, err - } - } - - if newDst != nil && src.String() != newDst.String() { - actionTaken = fmt.Sprintf("%s to: %s", actionTaken, newDst.String()) - } - fs.Infof(src, "%s%s", actionTaken, fs.LogValueHide("size", fs.SizeSuffix(src.Size()))) - return newDst, err + // Do the copy now everything is set up + return c.copy(ctx) } // CopyFile moves a single file possibly to a new name