mirror of
https://github.com/rclone/rclone.git
synced 2024-11-23 08:54:10 +01:00
e7c20e0bce
Before this change, when using rclone copy or move with --backup-dir and the source was a single file, rclone would fail to use the backup directory. This change looks up the backup directory in the Fs cache and uses it as appropriate. This affects any commands which call operations.MoveFile or operations.CopyFile which includes rclone move/moveto/copy/copyto where the source is a single file. Fixes #3219
1753 lines
49 KiB
Go
1753 lines
49 KiB
Go
// Package operations does generic operations on filesystems and objects
|
|
package operations
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/csv"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"path"
|
|
"path/filepath"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/ncw/rclone/fs"
|
|
"github.com/ncw/rclone/fs/accounting"
|
|
"github.com/ncw/rclone/fs/cache"
|
|
"github.com/ncw/rclone/fs/fserrors"
|
|
"github.com/ncw/rclone/fs/fshttp"
|
|
"github.com/ncw/rclone/fs/hash"
|
|
"github.com/ncw/rclone/fs/march"
|
|
"github.com/ncw/rclone/fs/object"
|
|
"github.com/ncw/rclone/fs/walk"
|
|
"github.com/ncw/rclone/lib/readers"
|
|
"github.com/pkg/errors"
|
|
"golang.org/x/sync/errgroup"
|
|
)
|
|
|
|
// CheckHashes checks the two files to see if they have common
|
|
// known hash types and compares them
|
|
//
|
|
// Returns
|
|
//
|
|
// equal - which is equality of the hashes
|
|
//
|
|
// hash - the HashType. This is HashNone if either of the hashes were
|
|
// unset or a compatible hash couldn't be found.
|
|
//
|
|
// err - may return an error which will already have been logged
|
|
//
|
|
// If an error is returned it will return equal as false
|
|
func CheckHashes(src fs.ObjectInfo, dst fs.Object) (equal bool, ht hash.Type, err error) {
|
|
common := src.Fs().Hashes().Overlap(dst.Fs().Hashes())
|
|
// fs.Debugf(nil, "Shared hashes: %v", common)
|
|
if common.Count() == 0 {
|
|
return true, hash.None, nil
|
|
}
|
|
ht = common.GetOne()
|
|
srcHash, err := src.Hash(ht)
|
|
if err != nil {
|
|
fs.CountError(err)
|
|
fs.Errorf(src, "Failed to calculate src hash: %v", err)
|
|
return false, ht, err
|
|
}
|
|
if srcHash == "" {
|
|
return true, hash.None, nil
|
|
}
|
|
dstHash, err := dst.Hash(ht)
|
|
if err != nil {
|
|
fs.CountError(err)
|
|
fs.Errorf(dst, "Failed to calculate dst hash: %v", err)
|
|
return false, ht, err
|
|
}
|
|
if dstHash == "" {
|
|
return true, hash.None, nil
|
|
}
|
|
if srcHash != dstHash {
|
|
fs.Debugf(src, "%v = %s (%v)", ht, srcHash, src.Fs())
|
|
fs.Debugf(dst, "%v = %s (%v)", ht, dstHash, dst.Fs())
|
|
}
|
|
return srcHash == dstHash, ht, nil
|
|
}
|
|
|
|
// Equal checks to see if the src and dst objects are equal by looking at
|
|
// size, mtime and hash
|
|
//
|
|
// If the src and dst size are different then it is considered to be
|
|
// not equal. If --size-only is in effect then this is the only check
|
|
// that is done. If --ignore-size is in effect then this check is
|
|
// skipped and the files are considered the same size.
|
|
//
|
|
// If the size is the same and the mtime is the same then it is
|
|
// considered to be equal. This check is skipped if using --checksum.
|
|
//
|
|
// If the size is the same and mtime is different, unreadable or
|
|
// --checksum is set and the hash is the same then the file is
|
|
// considered to be equal. In this case the mtime on the dst is
|
|
// updated if --checksum is not set.
|
|
//
|
|
// Otherwise the file is considered to be not equal including if there
|
|
// were errors reading info.
|
|
func Equal(src fs.ObjectInfo, dst fs.Object) bool {
|
|
return equal(src, dst, fs.Config.SizeOnly, fs.Config.CheckSum)
|
|
}
|
|
|
|
// sizeDiffers compare the size of src and dst taking into account the
|
|
// various ways of ignoring sizes
|
|
func sizeDiffers(src, dst fs.ObjectInfo) bool {
|
|
if fs.Config.IgnoreSize || src.Size() < 0 || dst.Size() < 0 {
|
|
return false
|
|
}
|
|
return src.Size() != dst.Size()
|
|
}
|
|
|
|
var checksumWarning sync.Once
|
|
|
|
func equal(src fs.ObjectInfo, dst fs.Object, sizeOnly, checkSum bool) bool {
|
|
if sizeDiffers(src, dst) {
|
|
fs.Debugf(src, "Sizes differ (src %d vs dst %d)", src.Size(), dst.Size())
|
|
return false
|
|
}
|
|
if sizeOnly {
|
|
fs.Debugf(src, "Sizes identical")
|
|
return true
|
|
}
|
|
|
|
// Assert: Size is equal or being ignored
|
|
|
|
// If checking checksum and not modtime
|
|
if checkSum {
|
|
// Check the hash
|
|
same, ht, _ := CheckHashes(src, dst)
|
|
if !same {
|
|
fs.Debugf(src, "%v differ", ht)
|
|
return false
|
|
}
|
|
if ht == hash.None {
|
|
checksumWarning.Do(func() {
|
|
fs.Logf(dst.Fs(), "--checksum is in use but the source and destination have no hashes in common; falling back to --size-only")
|
|
})
|
|
fs.Debugf(src, "Size of src and dst objects identical")
|
|
} else {
|
|
fs.Debugf(src, "Size and %v of src and dst objects identical", ht)
|
|
}
|
|
return true
|
|
}
|
|
|
|
// Sizes the same so check the mtime
|
|
modifyWindow := fs.GetModifyWindow(src.Fs(), dst.Fs())
|
|
if modifyWindow == fs.ModTimeNotSupported {
|
|
fs.Debugf(src, "Sizes identical")
|
|
return true
|
|
}
|
|
srcModTime := src.ModTime()
|
|
dstModTime := dst.ModTime()
|
|
dt := dstModTime.Sub(srcModTime)
|
|
if dt < modifyWindow && dt > -modifyWindow {
|
|
fs.Debugf(src, "Size and modification time the same (differ by %s, within tolerance %s)", dt, modifyWindow)
|
|
return true
|
|
}
|
|
|
|
fs.Debugf(src, "Modification times differ by %s: %v, %v", dt, srcModTime, dstModTime)
|
|
|
|
// Check if the hashes are the same
|
|
same, ht, _ := CheckHashes(src, dst)
|
|
if !same {
|
|
fs.Debugf(src, "%v differ", ht)
|
|
return false
|
|
}
|
|
if ht == hash.None {
|
|
// if couldn't check hash, return that they differ
|
|
return false
|
|
}
|
|
|
|
// mod time differs but hash is the same to reset mod time if required
|
|
if !fs.Config.NoUpdateModTime {
|
|
if fs.Config.DryRun {
|
|
fs.Logf(src, "Not updating modification time as --dry-run")
|
|
} else {
|
|
// Size and hash the same but mtime different
|
|
// Error if objects are treated as immutable
|
|
if fs.Config.Immutable {
|
|
fs.Errorf(dst, "Timestamp mismatch between immutable objects")
|
|
return false
|
|
}
|
|
// Update the mtime of the dst object here
|
|
err := dst.SetModTime(srcModTime)
|
|
if err == fs.ErrorCantSetModTime {
|
|
fs.Debugf(dst, "src and dst identical but can't set mod time without re-uploading")
|
|
return false
|
|
} else if err == fs.ErrorCantSetModTimeWithoutDelete {
|
|
fs.Debugf(dst, "src and dst identical but can't set mod time without deleting and re-uploading")
|
|
// Remove the file if BackupDir isn't set. If BackupDir is set we would rather have the old file
|
|
// put in the BackupDir than deleted which is what will happen if we don't delete it.
|
|
if fs.Config.BackupDir == "" {
|
|
err = dst.Remove()
|
|
if err != nil {
|
|
fs.Errorf(dst, "failed to delete before re-upload: %v", err)
|
|
}
|
|
}
|
|
return false
|
|
} else if err != nil {
|
|
fs.CountError(err)
|
|
fs.Errorf(dst, "Failed to set modification time: %v", err)
|
|
} else {
|
|
fs.Infof(src, "Updated modification time in destination")
|
|
}
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
// Used to remove a failed copy
|
|
//
|
|
// Returns whether the file was successfully removed or not
|
|
func removeFailedCopy(dst fs.Object) bool {
|
|
if dst == nil {
|
|
return false
|
|
}
|
|
fs.Infof(dst, "Removing failed copy")
|
|
removeErr := dst.Remove()
|
|
if removeErr != nil {
|
|
fs.Infof(dst, "Failed to remove failed copy: %s", removeErr)
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
// Wrapper to override the remote for an object
|
|
type overrideRemoteObject struct {
|
|
fs.Object
|
|
remote string
|
|
}
|
|
|
|
// Remote returns the overridden remote name
|
|
func (o *overrideRemoteObject) Remote() string {
|
|
return o.remote
|
|
}
|
|
|
|
// MimeType returns the mime type of the underlying object or "" if it
|
|
// can't be worked out
|
|
func (o *overrideRemoteObject) MimeType() string {
|
|
if do, ok := o.Object.(fs.MimeTyper); ok {
|
|
return do.MimeType()
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// Check interface is satisfied
|
|
var _ fs.MimeTyper = (*overrideRemoteObject)(nil)
|
|
|
|
// Copy src object to dst or f if nil. If dst is nil then it uses
|
|
// remote as the name of the new object.
|
|
//
|
|
// It returns the destination object if possible. Note that this may
|
|
// be nil.
|
|
func Copy(f fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Object, err error) {
|
|
accounting.Stats.Transferring(src.Remote())
|
|
defer func() {
|
|
accounting.Stats.DoneTransferring(src.Remote(), err == nil)
|
|
}()
|
|
newDst = dst
|
|
if fs.Config.DryRun {
|
|
fs.Logf(src, "Not copying as --dry-run")
|
|
return newDst, nil
|
|
}
|
|
maxTries := fs.Config.LowLevelRetries
|
|
tries := 0
|
|
doUpdate := dst != nil
|
|
// work out which hash to use - limit to 1 hash in common
|
|
var common hash.Set
|
|
hashType := hash.None
|
|
if !fs.Config.SizeOnly {
|
|
common = src.Fs().Hashes().Overlap(f.Hashes())
|
|
if common.Count() > 0 {
|
|
hashType = common.GetOne()
|
|
common = hash.Set(hashType)
|
|
}
|
|
}
|
|
hashOption := &fs.HashesOption{Hashes: common}
|
|
var actionTaken string
|
|
for {
|
|
// Try server side copy first - if has optional interface and
|
|
// is same underlying remote
|
|
actionTaken = "Copied (server side copy)"
|
|
if doCopy := f.Features().Copy; doCopy != nil && (SameConfig(src.Fs(), f) || (SameRemoteType(src.Fs(), f) && f.Features().ServerSideAcrossConfigs)) {
|
|
// Check transfer limit for server side copies
|
|
if fs.Config.MaxTransfer >= 0 && accounting.Stats.GetBytes() >= int64(fs.Config.MaxTransfer) {
|
|
return nil, accounting.ErrorMaxTransferLimitReached
|
|
}
|
|
newDst, err = doCopy(src, remote)
|
|
if err == nil {
|
|
dst = newDst
|
|
accounting.Stats.Bytes(dst.Size()) // account the bytes for the server side transfer
|
|
}
|
|
} else {
|
|
err = fs.ErrorCantCopy
|
|
}
|
|
// If can't server side copy, do it manually
|
|
if err == fs.ErrorCantCopy {
|
|
if doOpenWriterAt := f.Features().OpenWriterAt; doOpenWriterAt != nil && src.Size() >= int64(fs.Config.MultiThreadCutoff) && fs.Config.MultiThreadStreams > 1 {
|
|
// Number of streams proportional to size
|
|
streams := src.Size() / int64(fs.Config.MultiThreadCutoff)
|
|
// With maximum
|
|
if streams > int64(fs.Config.MultiThreadStreams) {
|
|
streams = int64(fs.Config.MultiThreadStreams)
|
|
}
|
|
if streams < 2 {
|
|
streams = 2
|
|
}
|
|
dst, err = multiThreadCopy(f, remote, src, int(streams))
|
|
if doUpdate {
|
|
actionTaken = "Multi-thread Copied (replaced existing)"
|
|
} else {
|
|
actionTaken = "Multi-thread Copied (new)"
|
|
}
|
|
} else {
|
|
var in0 io.ReadCloser
|
|
in0, err = newReOpen(src, hashOption, nil, fs.Config.LowLevelRetries)
|
|
if err != nil {
|
|
err = errors.Wrap(err, "failed to open source object")
|
|
} else {
|
|
if src.Size() == -1 {
|
|
// -1 indicates unknown size. Use Rcat to handle both remotes supporting and not supporting PutStream.
|
|
if doUpdate {
|
|
actionTaken = "Copied (Rcat, replaced existing)"
|
|
} else {
|
|
actionTaken = "Copied (Rcat, new)"
|
|
}
|
|
dst, err = Rcat(f, remote, in0, src.ModTime())
|
|
newDst = dst
|
|
} else {
|
|
in := accounting.NewAccount(in0, src).WithBuffer() // account and buffer the transfer
|
|
var wrappedSrc fs.ObjectInfo = src
|
|
// We try to pass the original object if possible
|
|
if src.Remote() != remote {
|
|
wrappedSrc = &overrideRemoteObject{Object: src, remote: remote}
|
|
}
|
|
if doUpdate {
|
|
actionTaken = "Copied (replaced existing)"
|
|
err = dst.Update(in, wrappedSrc, hashOption)
|
|
} else {
|
|
actionTaken = "Copied (new)"
|
|
dst, err = f.Put(in, wrappedSrc, hashOption)
|
|
}
|
|
closeErr := in.Close()
|
|
if err == nil {
|
|
newDst = dst
|
|
err = closeErr
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
tries++
|
|
if tries >= maxTries {
|
|
break
|
|
}
|
|
// Retry if err returned a retry error
|
|
if fserrors.IsRetryError(err) || fserrors.ShouldRetry(err) {
|
|
fs.Debugf(src, "Received error: %v - low level retry %d/%d", err, tries, maxTries)
|
|
continue
|
|
}
|
|
// otherwise finish
|
|
break
|
|
}
|
|
if err != nil {
|
|
fs.CountError(err)
|
|
fs.Errorf(src, "Failed to copy: %v", err)
|
|
return newDst, err
|
|
}
|
|
|
|
// Verify sizes are the same after transfer
|
|
if sizeDiffers(src, dst) {
|
|
err = errors.Errorf("corrupted on transfer: sizes differ %d vs %d", src.Size(), dst.Size())
|
|
fs.Errorf(dst, "%v", err)
|
|
fs.CountError(err)
|
|
removeFailedCopy(dst)
|
|
return newDst, err
|
|
}
|
|
|
|
// Verify hashes are the same after transfer - ignoring blank hashes
|
|
if !fs.Config.IgnoreChecksum && hashType != hash.None {
|
|
var srcSum string
|
|
srcSum, err = src.Hash(hashType)
|
|
if err != nil {
|
|
fs.CountError(err)
|
|
fs.Errorf(src, "Failed to read src hash: %v", err)
|
|
} else if srcSum != "" {
|
|
var dstSum string
|
|
dstSum, err = dst.Hash(hashType)
|
|
if err != nil {
|
|
fs.CountError(err)
|
|
fs.Errorf(dst, "Failed to read hash: %v", err)
|
|
} else if !hash.Equals(srcSum, dstSum) {
|
|
err = errors.Errorf("corrupted on transfer: %v hash differ %q vs %q", hashType, srcSum, dstSum)
|
|
fs.Errorf(dst, "%v", err)
|
|
fs.CountError(err)
|
|
removeFailedCopy(dst)
|
|
return newDst, err
|
|
}
|
|
}
|
|
}
|
|
|
|
fs.Infof(src, actionTaken)
|
|
return newDst, err
|
|
}
|
|
|
|
// Move src object to dst or fdst if nil. If dst is nil then it uses
|
|
// remote as the name of the new object.
|
|
//
|
|
// Note that you must check the destination does not exist before
|
|
// calling this and pass it as dst. If you pass dst=nil and the
|
|
// destination does exist then this may create duplicates or return
|
|
// errors.
|
|
//
|
|
// It returns the destination object if possible. Note that this may
|
|
// be nil.
|
|
func Move(fdst fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Object, err error) {
|
|
accounting.Stats.Checking(src.Remote())
|
|
defer func() {
|
|
accounting.Stats.DoneChecking(src.Remote())
|
|
}()
|
|
newDst = dst
|
|
if fs.Config.DryRun {
|
|
fs.Logf(src, "Not moving as --dry-run")
|
|
return newDst, nil
|
|
}
|
|
// See if we have Move available
|
|
if doMove := fdst.Features().Move; doMove != nil && (SameConfig(src.Fs(), fdst) || (SameRemoteType(src.Fs(), fdst) && fdst.Features().ServerSideAcrossConfigs)) {
|
|
// Delete destination if it exists
|
|
if dst != nil {
|
|
err = DeleteFile(dst)
|
|
if err != nil {
|
|
return newDst, err
|
|
}
|
|
}
|
|
// Move dst <- src
|
|
newDst, err = doMove(src, remote)
|
|
switch err {
|
|
case nil:
|
|
fs.Infof(src, "Moved (server side)")
|
|
return newDst, nil
|
|
case fs.ErrorCantMove:
|
|
fs.Debugf(src, "Can't move, switching to copy")
|
|
default:
|
|
fs.CountError(err)
|
|
fs.Errorf(src, "Couldn't move: %v", err)
|
|
return newDst, err
|
|
}
|
|
}
|
|
// Move not found or didn't work so copy dst <- src
|
|
newDst, err = Copy(fdst, dst, remote, src)
|
|
if err != nil {
|
|
fs.Errorf(src, "Not deleting source as copy failed: %v", err)
|
|
return newDst, err
|
|
}
|
|
// Delete src if no error on copy
|
|
return newDst, DeleteFile(src)
|
|
}
|
|
|
|
// CanServerSideMove returns true if fdst support server side moves or
|
|
// server side copies
|
|
//
|
|
// Some remotes simulate rename by server-side copy and delete, so include
|
|
// remotes that implements either Mover or Copier.
|
|
func CanServerSideMove(fdst fs.Fs) bool {
|
|
canMove := fdst.Features().Move != nil
|
|
canCopy := fdst.Features().Copy != nil
|
|
return canMove || canCopy
|
|
}
|
|
|
|
// SuffixName adds the current --suffix to the remote, obeying
|
|
// --suffix-keep-extension if set
|
|
func SuffixName(remote string) string {
|
|
if fs.Config.Suffix == "" {
|
|
return remote
|
|
}
|
|
if fs.Config.SuffixKeepExtension {
|
|
ext := path.Ext(remote)
|
|
base := remote[:len(remote)-len(ext)]
|
|
return base + fs.Config.Suffix + ext
|
|
}
|
|
return remote + fs.Config.Suffix
|
|
}
|
|
|
|
// DeleteFileWithBackupDir deletes a single file respecting --dry-run
|
|
// and accumulating stats and errors.
|
|
//
|
|
// If backupDir is set then it moves the file to there instead of
|
|
// deleting
|
|
func DeleteFileWithBackupDir(dst fs.Object, backupDir fs.Fs) (err error) {
|
|
accounting.Stats.Checking(dst.Remote())
|
|
numDeletes := accounting.Stats.Deletes(1)
|
|
if fs.Config.MaxDelete != -1 && numDeletes > fs.Config.MaxDelete {
|
|
return fserrors.FatalError(errors.New("--max-delete threshold reached"))
|
|
}
|
|
action, actioned, actioning := "delete", "Deleted", "deleting"
|
|
if backupDir != nil {
|
|
action, actioned, actioning = "move into backup dir", "Moved into backup dir", "moving into backup dir"
|
|
}
|
|
if fs.Config.DryRun {
|
|
fs.Logf(dst, "Not %s as --dry-run", actioning)
|
|
} else if backupDir != nil {
|
|
if !SameConfig(dst.Fs(), backupDir) {
|
|
err = errors.New("parameter to --backup-dir has to be on the same remote as destination")
|
|
} else {
|
|
remoteWithSuffix := SuffixName(dst.Remote())
|
|
overwritten, _ := backupDir.NewObject(remoteWithSuffix)
|
|
_, err = Move(backupDir, overwritten, remoteWithSuffix, dst)
|
|
}
|
|
} else {
|
|
err = dst.Remove()
|
|
}
|
|
if err != nil {
|
|
fs.CountError(err)
|
|
fs.Errorf(dst, "Couldn't %s: %v", action, err)
|
|
} else if !fs.Config.DryRun {
|
|
fs.Infof(dst, actioned)
|
|
}
|
|
accounting.Stats.DoneChecking(dst.Remote())
|
|
return err
|
|
}
|
|
|
|
// DeleteFile deletes a single file respecting --dry-run and accumulating stats and errors.
|
|
//
|
|
// If useBackupDir is set and --backup-dir is in effect then it moves
|
|
// the file to there instead of deleting
|
|
func DeleteFile(dst fs.Object) (err error) {
|
|
return DeleteFileWithBackupDir(dst, nil)
|
|
}
|
|
|
|
// DeleteFilesWithBackupDir removes all the files passed in the
|
|
// channel
|
|
//
|
|
// If backupDir is set the files will be placed into that directory
|
|
// instead of being deleted.
|
|
func DeleteFilesWithBackupDir(toBeDeleted fs.ObjectsChan, backupDir fs.Fs) error {
|
|
var wg sync.WaitGroup
|
|
wg.Add(fs.Config.Transfers)
|
|
var errorCount int32
|
|
var fatalErrorCount int32
|
|
|
|
for i := 0; i < fs.Config.Transfers; i++ {
|
|
go func() {
|
|
defer wg.Done()
|
|
for dst := range toBeDeleted {
|
|
err := DeleteFileWithBackupDir(dst, backupDir)
|
|
if err != nil {
|
|
atomic.AddInt32(&errorCount, 1)
|
|
if fserrors.IsFatalError(err) {
|
|
fs.Errorf(nil, "Got fatal error on delete: %s", err)
|
|
atomic.AddInt32(&fatalErrorCount, 1)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
fs.Infof(nil, "Waiting for deletions to finish")
|
|
wg.Wait()
|
|
if errorCount > 0 {
|
|
err := errors.Errorf("failed to delete %d files", errorCount)
|
|
if fatalErrorCount > 0 {
|
|
return fserrors.FatalError(err)
|
|
}
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// DeleteFiles removes all the files passed in the channel
|
|
func DeleteFiles(toBeDeleted fs.ObjectsChan) error {
|
|
return DeleteFilesWithBackupDir(toBeDeleted, nil)
|
|
}
|
|
|
|
// SameRemoteType returns true if fdst and fsrc are the same type
|
|
func SameRemoteType(fdst, fsrc fs.Info) bool {
|
|
return fmt.Sprintf("%T", fdst) == fmt.Sprintf("%T", fsrc)
|
|
}
|
|
|
|
// SameConfig returns true if fdst and fsrc are using the same config
|
|
// file entry
|
|
func SameConfig(fdst, fsrc fs.Info) bool {
|
|
return fdst.Name() == fsrc.Name()
|
|
}
|
|
|
|
// Same returns true if fdst and fsrc point to the same underlying Fs
|
|
func Same(fdst, fsrc fs.Info) bool {
|
|
return SameConfig(fdst, fsrc) && strings.Trim(fdst.Root(), "/") == strings.Trim(fsrc.Root(), "/")
|
|
}
|
|
|
|
// Overlapping returns true if fdst and fsrc point to the same
|
|
// underlying Fs and they overlap.
|
|
func Overlapping(fdst, fsrc fs.Info) bool {
|
|
if !SameConfig(fdst, fsrc) {
|
|
return false
|
|
}
|
|
// Return the Root with a trailing / if not empty
|
|
fixedRoot := func(f fs.Info) string {
|
|
s := strings.Trim(filepath.ToSlash(f.Root()), "/")
|
|
if s != "" {
|
|
s += "/"
|
|
}
|
|
return s
|
|
}
|
|
fdstRoot := fixedRoot(fdst)
|
|
fsrcRoot := fixedRoot(fsrc)
|
|
return strings.HasPrefix(fdstRoot, fsrcRoot) || strings.HasPrefix(fsrcRoot, fdstRoot)
|
|
}
|
|
|
|
// checkIdentical checks to see if dst and src are identical
|
|
//
|
|
// it returns true if differences were found
|
|
// it also returns whether it couldn't be hashed
|
|
func checkIdentical(dst, src fs.Object) (differ bool, noHash bool) {
|
|
same, ht, err := CheckHashes(src, dst)
|
|
if err != nil {
|
|
// CheckHashes will log and count errors
|
|
return true, false
|
|
}
|
|
if ht == hash.None {
|
|
return false, true
|
|
}
|
|
if !same {
|
|
err = errors.Errorf("%v differ", ht)
|
|
fs.Errorf(src, "%v", err)
|
|
fs.CountError(err)
|
|
return true, false
|
|
}
|
|
return false, false
|
|
}
|
|
|
|
// checkFn is the the type of the checking function used in CheckFn()
|
|
type checkFn func(a, b fs.Object) (differ bool, noHash bool)
|
|
|
|
// checkMarch is used to march over two Fses in the same way as
|
|
// sync/copy
|
|
type checkMarch struct {
|
|
fdst, fsrc fs.Fs
|
|
check checkFn
|
|
oneway bool
|
|
differences int32
|
|
noHashes int32
|
|
srcFilesMissing int32
|
|
dstFilesMissing int32
|
|
matches int32
|
|
}
|
|
|
|
// DstOnly have an object which is in the destination only
|
|
func (c *checkMarch) DstOnly(dst fs.DirEntry) (recurse bool) {
|
|
switch dst.(type) {
|
|
case fs.Object:
|
|
if c.oneway {
|
|
return false
|
|
}
|
|
err := errors.Errorf("File not in %v", c.fsrc)
|
|
fs.Errorf(dst, "%v", err)
|
|
fs.CountError(err)
|
|
atomic.AddInt32(&c.differences, 1)
|
|
atomic.AddInt32(&c.srcFilesMissing, 1)
|
|
case fs.Directory:
|
|
// Do the same thing to the entire contents of the directory
|
|
return true
|
|
default:
|
|
panic("Bad object in DirEntries")
|
|
}
|
|
return false
|
|
}
|
|
|
|
// SrcOnly have an object which is in the source only
|
|
func (c *checkMarch) SrcOnly(src fs.DirEntry) (recurse bool) {
|
|
switch src.(type) {
|
|
case fs.Object:
|
|
err := errors.Errorf("File not in %v", c.fdst)
|
|
fs.Errorf(src, "%v", err)
|
|
fs.CountError(err)
|
|
atomic.AddInt32(&c.differences, 1)
|
|
atomic.AddInt32(&c.dstFilesMissing, 1)
|
|
case fs.Directory:
|
|
// Do the same thing to the entire contents of the directory
|
|
return true
|
|
default:
|
|
panic("Bad object in DirEntries")
|
|
}
|
|
return false
|
|
}
|
|
|
|
// check to see if two objects are identical using the check function
|
|
func (c *checkMarch) checkIdentical(dst, src fs.Object) (differ bool, noHash bool) {
|
|
accounting.Stats.Checking(src.Remote())
|
|
defer accounting.Stats.DoneChecking(src.Remote())
|
|
if sizeDiffers(src, dst) {
|
|
err := errors.Errorf("Sizes differ")
|
|
fs.Errorf(src, "%v", err)
|
|
fs.CountError(err)
|
|
return true, false
|
|
}
|
|
if fs.Config.SizeOnly {
|
|
return false, false
|
|
}
|
|
return c.check(dst, src)
|
|
}
|
|
|
|
// Match is called when src and dst are present, so sync src to dst
|
|
func (c *checkMarch) Match(dst, src fs.DirEntry) (recurse bool) {
|
|
switch srcX := src.(type) {
|
|
case fs.Object:
|
|
dstX, ok := dst.(fs.Object)
|
|
if ok {
|
|
differ, noHash := c.checkIdentical(dstX, srcX)
|
|
if differ {
|
|
atomic.AddInt32(&c.differences, 1)
|
|
} else {
|
|
atomic.AddInt32(&c.matches, 1)
|
|
fs.Debugf(dstX, "OK")
|
|
}
|
|
if noHash {
|
|
atomic.AddInt32(&c.noHashes, 1)
|
|
}
|
|
} else {
|
|
err := errors.Errorf("is file on %v but directory on %v", c.fsrc, c.fdst)
|
|
fs.Errorf(src, "%v", err)
|
|
fs.CountError(err)
|
|
atomic.AddInt32(&c.differences, 1)
|
|
atomic.AddInt32(&c.dstFilesMissing, 1)
|
|
}
|
|
case fs.Directory:
|
|
// Do the same thing to the entire contents of the directory
|
|
_, ok := dst.(fs.Directory)
|
|
if ok {
|
|
return true
|
|
}
|
|
err := errors.Errorf("is file on %v but directory on %v", c.fdst, c.fsrc)
|
|
fs.Errorf(dst, "%v", err)
|
|
fs.CountError(err)
|
|
atomic.AddInt32(&c.differences, 1)
|
|
atomic.AddInt32(&c.srcFilesMissing, 1)
|
|
|
|
default:
|
|
panic("Bad object in DirEntries")
|
|
}
|
|
return false
|
|
}
|
|
|
|
// CheckFn checks the files in fsrc and fdst according to Size and
|
|
// hash using checkFunction on each file to check the hashes.
|
|
//
|
|
// checkFunction sees if dst and src are identical
|
|
//
|
|
// it returns true if differences were found
|
|
// it also returns whether it couldn't be hashed
|
|
func CheckFn(fdst, fsrc fs.Fs, check checkFn, oneway bool) error {
|
|
c := &checkMarch{
|
|
fdst: fdst,
|
|
fsrc: fsrc,
|
|
check: check,
|
|
oneway: oneway,
|
|
}
|
|
|
|
// set up a march over fdst and fsrc
|
|
m := &march.March{
|
|
Ctx: context.Background(),
|
|
Fdst: fdst,
|
|
Fsrc: fsrc,
|
|
Dir: "",
|
|
Callback: c,
|
|
}
|
|
fs.Infof(fdst, "Waiting for checks to finish")
|
|
m.Run()
|
|
|
|
if c.dstFilesMissing > 0 {
|
|
fs.Logf(fdst, "%d files missing", c.dstFilesMissing)
|
|
}
|
|
if c.srcFilesMissing > 0 {
|
|
fs.Logf(fsrc, "%d files missing", c.srcFilesMissing)
|
|
}
|
|
|
|
fs.Logf(fdst, "%d differences found", accounting.Stats.GetErrors())
|
|
if c.noHashes > 0 {
|
|
fs.Logf(fdst, "%d hashes could not be checked", c.noHashes)
|
|
}
|
|
if c.matches > 0 {
|
|
fs.Logf(fdst, "%d matching files", c.matches)
|
|
}
|
|
if c.differences > 0 {
|
|
return errors.Errorf("%d differences found", c.differences)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Check the files in fsrc and fdst according to Size and hash
|
|
func Check(fdst, fsrc fs.Fs, oneway bool) error {
|
|
return CheckFn(fdst, fsrc, checkIdentical, oneway)
|
|
}
|
|
|
|
// CheckEqualReaders checks to see if in1 and in2 have the same
|
|
// content when read.
|
|
//
|
|
// it returns true if differences were found
|
|
func CheckEqualReaders(in1, in2 io.Reader) (differ bool, err error) {
|
|
const bufSize = 64 * 1024
|
|
buf1 := make([]byte, bufSize)
|
|
buf2 := make([]byte, bufSize)
|
|
for {
|
|
n1, err1 := readers.ReadFill(in1, buf1)
|
|
n2, err2 := readers.ReadFill(in2, buf2)
|
|
// check errors
|
|
if err1 != nil && err1 != io.EOF {
|
|
return true, err1
|
|
} else if err2 != nil && err2 != io.EOF {
|
|
return true, err2
|
|
}
|
|
// err1 && err2 are nil or io.EOF here
|
|
// process the data
|
|
if n1 != n2 || !bytes.Equal(buf1[:n1], buf2[:n2]) {
|
|
return true, nil
|
|
}
|
|
// if both streams finished the we have finished
|
|
if err1 == io.EOF && err2 == io.EOF {
|
|
break
|
|
}
|
|
}
|
|
return false, nil
|
|
}
|
|
|
|
// CheckIdentical checks to see if dst and src are identical by
|
|
// reading all their bytes if necessary.
|
|
//
|
|
// it returns true if differences were found
|
|
func CheckIdentical(dst, src fs.Object) (differ bool, err error) {
|
|
in1, err := dst.Open()
|
|
if err != nil {
|
|
return true, errors.Wrapf(err, "failed to open %q", dst)
|
|
}
|
|
in1 = accounting.NewAccount(in1, dst).WithBuffer() // account and buffer the transfer
|
|
defer fs.CheckClose(in1, &err)
|
|
|
|
in2, err := src.Open()
|
|
if err != nil {
|
|
return true, errors.Wrapf(err, "failed to open %q", src)
|
|
}
|
|
in2 = accounting.NewAccount(in2, src).WithBuffer() // account and buffer the transfer
|
|
defer fs.CheckClose(in2, &err)
|
|
|
|
return CheckEqualReaders(in1, in2)
|
|
}
|
|
|
|
// CheckDownload checks the files in fsrc and fdst according to Size
|
|
// and the actual contents of the files.
|
|
func CheckDownload(fdst, fsrc fs.Fs, oneway bool) error {
|
|
check := func(a, b fs.Object) (differ bool, noHash bool) {
|
|
differ, err := CheckIdentical(a, b)
|
|
if err != nil {
|
|
fs.CountError(err)
|
|
fs.Errorf(a, "Failed to download: %v", err)
|
|
return true, true
|
|
}
|
|
return differ, false
|
|
}
|
|
return CheckFn(fdst, fsrc, check, oneway)
|
|
}
|
|
|
|
// ListFn lists the Fs to the supplied function
|
|
//
|
|
// Lists in parallel which may get them out of order
|
|
func ListFn(f fs.Fs, fn func(fs.Object)) error {
|
|
return walk.ListR(f, "", false, fs.Config.MaxDepth, walk.ListObjects, func(entries fs.DirEntries) error {
|
|
entries.ForObject(fn)
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// mutex for synchronized output
|
|
var outMutex sync.Mutex
|
|
|
|
// Synchronized fmt.Fprintf
|
|
//
|
|
// Ignores errors from Fprintf
|
|
func syncFprintf(w io.Writer, format string, a ...interface{}) {
|
|
outMutex.Lock()
|
|
defer outMutex.Unlock()
|
|
_, _ = fmt.Fprintf(w, format, a...)
|
|
}
|
|
|
|
// List the Fs to the supplied writer
|
|
//
|
|
// Shows size and path - obeys includes and excludes
|
|
//
|
|
// Lists in parallel which may get them out of order
|
|
func List(f fs.Fs, w io.Writer) error {
|
|
return ListFn(f, func(o fs.Object) {
|
|
syncFprintf(w, "%9d %s\n", o.Size(), o.Remote())
|
|
})
|
|
}
|
|
|
|
// ListLong lists the Fs to the supplied writer
|
|
//
|
|
// Shows size, mod time and path - obeys includes and excludes
|
|
//
|
|
// Lists in parallel which may get them out of order
|
|
func ListLong(f fs.Fs, w io.Writer) error {
|
|
return ListFn(f, func(o fs.Object) {
|
|
accounting.Stats.Checking(o.Remote())
|
|
modTime := o.ModTime()
|
|
accounting.Stats.DoneChecking(o.Remote())
|
|
syncFprintf(w, "%9d %s %s\n", o.Size(), modTime.Local().Format("2006-01-02 15:04:05.000000000"), o.Remote())
|
|
})
|
|
}
|
|
|
|
// Md5sum list the Fs to the supplied writer
|
|
//
|
|
// Produces the same output as the md5sum command - obeys includes and
|
|
// excludes
|
|
//
|
|
// Lists in parallel which may get them out of order
|
|
func Md5sum(f fs.Fs, w io.Writer) error {
|
|
return HashLister(hash.MD5, f, w)
|
|
}
|
|
|
|
// Sha1sum list the Fs to the supplied writer
|
|
//
|
|
// Obeys includes and excludes
|
|
//
|
|
// Lists in parallel which may get them out of order
|
|
func Sha1sum(f fs.Fs, w io.Writer) error {
|
|
return HashLister(hash.SHA1, f, w)
|
|
}
|
|
|
|
// DropboxHashSum list the Fs to the supplied writer
|
|
//
|
|
// Obeys includes and excludes
|
|
//
|
|
// Lists in parallel which may get them out of order
|
|
func DropboxHashSum(f fs.Fs, w io.Writer) error {
|
|
return HashLister(hash.Dropbox, f, w)
|
|
}
|
|
|
|
// hashSum returns the human readable hash for ht passed in. This may
|
|
// be UNSUPPORTED or ERROR.
|
|
func hashSum(ht hash.Type, o fs.Object) string {
|
|
accounting.Stats.Checking(o.Remote())
|
|
sum, err := o.Hash(ht)
|
|
accounting.Stats.DoneChecking(o.Remote())
|
|
if err == hash.ErrUnsupported {
|
|
sum = "UNSUPPORTED"
|
|
} else if err != nil {
|
|
fs.Debugf(o, "Failed to read %v: %v", ht, err)
|
|
sum = "ERROR"
|
|
}
|
|
return sum
|
|
}
|
|
|
|
// HashLister does a md5sum equivalent for the hash type passed in
|
|
func HashLister(ht hash.Type, f fs.Fs, w io.Writer) error {
|
|
return ListFn(f, func(o fs.Object) {
|
|
sum := hashSum(ht, o)
|
|
syncFprintf(w, "%*s %s\n", hash.Width[ht], sum, o.Remote())
|
|
})
|
|
}
|
|
|
|
// Count counts the objects and their sizes in the Fs
|
|
//
|
|
// Obeys includes and excludes
|
|
func Count(f fs.Fs) (objects int64, size int64, err error) {
|
|
err = ListFn(f, func(o fs.Object) {
|
|
atomic.AddInt64(&objects, 1)
|
|
atomic.AddInt64(&size, o.Size())
|
|
})
|
|
return
|
|
}
|
|
|
|
// ConfigMaxDepth returns the depth to use for a recursive or non recursive listing.
|
|
func ConfigMaxDepth(recursive bool) int {
|
|
depth := fs.Config.MaxDepth
|
|
if !recursive && depth < 0 {
|
|
depth = 1
|
|
}
|
|
return depth
|
|
}
|
|
|
|
// ListDir lists the directories/buckets/containers in the Fs to the supplied writer
|
|
func ListDir(f fs.Fs, w io.Writer) error {
|
|
return walk.ListR(f, "", false, ConfigMaxDepth(false), walk.ListDirs, func(entries fs.DirEntries) error {
|
|
entries.ForDir(func(dir fs.Directory) {
|
|
if dir != nil {
|
|
syncFprintf(w, "%12d %13s %9d %s\n", dir.Size(), dir.ModTime().Local().Format("2006-01-02 15:04:05"), dir.Items(), dir.Remote())
|
|
}
|
|
})
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// Mkdir makes a destination directory or container
|
|
func Mkdir(f fs.Fs, dir string) error {
|
|
if fs.Config.DryRun {
|
|
fs.Logf(fs.LogDirName(f, dir), "Not making directory as dry run is set")
|
|
return nil
|
|
}
|
|
fs.Debugf(fs.LogDirName(f, dir), "Making directory")
|
|
err := f.Mkdir(dir)
|
|
if err != nil {
|
|
fs.CountError(err)
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// TryRmdir removes a container but not if not empty. It doesn't
|
|
// count errors but may return one.
|
|
func TryRmdir(f fs.Fs, dir string) error {
|
|
if fs.Config.DryRun {
|
|
fs.Logf(fs.LogDirName(f, dir), "Not deleting as dry run is set")
|
|
return nil
|
|
}
|
|
fs.Debugf(fs.LogDirName(f, dir), "Removing directory")
|
|
return f.Rmdir(dir)
|
|
}
|
|
|
|
// Rmdir removes a container but not if not empty
|
|
func Rmdir(f fs.Fs, dir string) error {
|
|
err := TryRmdir(f, dir)
|
|
if err != nil {
|
|
fs.CountError(err)
|
|
return err
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Purge removes a directory and all of its contents
|
|
func Purge(f fs.Fs, dir string) error {
|
|
doFallbackPurge := true
|
|
var err error
|
|
if dir == "" {
|
|
// FIXME change the Purge interface so it takes a dir - see #1891
|
|
if doPurge := f.Features().Purge; doPurge != nil {
|
|
doFallbackPurge = false
|
|
if fs.Config.DryRun {
|
|
fs.Logf(f, "Not purging as --dry-run set")
|
|
} else {
|
|
err = doPurge()
|
|
if err == fs.ErrorCantPurge {
|
|
doFallbackPurge = true
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if doFallbackPurge {
|
|
// DeleteFiles and Rmdir observe --dry-run
|
|
err = DeleteFiles(listToChan(f, dir))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = Rmdirs(f, dir, false)
|
|
}
|
|
if err != nil {
|
|
fs.CountError(err)
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Delete removes all the contents of a container. Unlike Purge, it
|
|
// obeys includes and excludes.
|
|
func Delete(f fs.Fs) error {
|
|
delChan := make(fs.ObjectsChan, fs.Config.Transfers)
|
|
delErr := make(chan error, 1)
|
|
go func() {
|
|
delErr <- DeleteFiles(delChan)
|
|
}()
|
|
err := ListFn(f, func(o fs.Object) {
|
|
delChan <- o
|
|
})
|
|
close(delChan)
|
|
delError := <-delErr
|
|
if err == nil {
|
|
err = delError
|
|
}
|
|
return err
|
|
}
|
|
|
|
// listToChan will transfer all objects in the listing to the output
|
|
//
|
|
// If an error occurs, the error will be logged, and it will close the
|
|
// channel.
|
|
//
|
|
// If the error was ErrorDirNotFound then it will be ignored
|
|
func listToChan(f fs.Fs, dir string) fs.ObjectsChan {
|
|
o := make(fs.ObjectsChan, fs.Config.Checkers)
|
|
go func() {
|
|
defer close(o)
|
|
err := walk.ListR(f, dir, true, fs.Config.MaxDepth, walk.ListObjects, func(entries fs.DirEntries) error {
|
|
entries.ForObject(func(obj fs.Object) {
|
|
o <- obj
|
|
})
|
|
return nil
|
|
})
|
|
if err != nil && err != fs.ErrorDirNotFound {
|
|
err = errors.Wrap(err, "failed to list")
|
|
fs.CountError(err)
|
|
fs.Errorf(nil, "%v", err)
|
|
}
|
|
}()
|
|
return o
|
|
}
|
|
|
|
// CleanUp removes the trash for the Fs
|
|
func CleanUp(f fs.Fs) error {
|
|
doCleanUp := f.Features().CleanUp
|
|
if doCleanUp == nil {
|
|
return errors.Errorf("%v doesn't support cleanup", f)
|
|
}
|
|
if fs.Config.DryRun {
|
|
fs.Logf(f, "Not running cleanup as --dry-run set")
|
|
return nil
|
|
}
|
|
return doCleanUp()
|
|
}
|
|
|
|
// wrap a Reader and a Closer together into a ReadCloser
|
|
type readCloser struct {
|
|
io.Reader
|
|
io.Closer
|
|
}
|
|
|
|
// Cat any files to the io.Writer
|
|
//
|
|
// if offset == 0 it will be ignored
|
|
// if offset > 0 then the file will be seeked to that offset
|
|
// if offset < 0 then the file will be seeked that far from the end
|
|
//
|
|
// if count < 0 then it will be ignored
|
|
// if count >= 0 then only that many characters will be output
|
|
func Cat(f fs.Fs, w io.Writer, offset, count int64) error {
|
|
var mu sync.Mutex
|
|
return ListFn(f, func(o fs.Object) {
|
|
var err error
|
|
accounting.Stats.Transferring(o.Remote())
|
|
defer func() {
|
|
accounting.Stats.DoneTransferring(o.Remote(), err == nil)
|
|
}()
|
|
opt := fs.RangeOption{Start: offset, End: -1}
|
|
size := o.Size()
|
|
if opt.Start < 0 {
|
|
opt.Start += size
|
|
}
|
|
if count >= 0 {
|
|
opt.End = opt.Start + count - 1
|
|
}
|
|
var options []fs.OpenOption
|
|
if opt.Start > 0 || opt.End >= 0 {
|
|
options = append(options, &opt)
|
|
}
|
|
in, err := o.Open(options...)
|
|
if err != nil {
|
|
fs.CountError(err)
|
|
fs.Errorf(o, "Failed to open: %v", err)
|
|
return
|
|
}
|
|
if count >= 0 {
|
|
in = &readCloser{Reader: &io.LimitedReader{R: in, N: count}, Closer: in}
|
|
// reduce remaining size to count
|
|
if size > count {
|
|
size = count
|
|
}
|
|
}
|
|
in = accounting.NewAccountSizeName(in, size, o.Remote()).WithBuffer() // account and buffer the transfer
|
|
defer func() {
|
|
err = in.Close()
|
|
if err != nil {
|
|
fs.CountError(err)
|
|
fs.Errorf(o, "Failed to close: %v", err)
|
|
}
|
|
}()
|
|
// take the lock just before we output stuff, so at the last possible moment
|
|
mu.Lock()
|
|
defer mu.Unlock()
|
|
_, err = io.Copy(w, in)
|
|
if err != nil {
|
|
fs.CountError(err)
|
|
fs.Errorf(o, "Failed to send to output: %v", err)
|
|
}
|
|
})
|
|
}
|
|
|
|
// Rcat reads data from the Reader until EOF and uploads it to a file on remote
|
|
func Rcat(fdst fs.Fs, dstFileName string, in io.ReadCloser, modTime time.Time) (dst fs.Object, err error) {
|
|
accounting.Stats.Transferring(dstFileName)
|
|
in = accounting.NewAccountSizeName(in, -1, dstFileName).WithBuffer()
|
|
defer func() {
|
|
accounting.Stats.DoneTransferring(dstFileName, err == nil)
|
|
if otherErr := in.Close(); otherErr != nil {
|
|
fs.Debugf(fdst, "Rcat: failed to close source: %v", err)
|
|
}
|
|
}()
|
|
|
|
hashOption := &fs.HashesOption{Hashes: fdst.Hashes()}
|
|
hash, err := hash.NewMultiHasherTypes(fdst.Hashes())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
readCounter := readers.NewCountingReader(in)
|
|
trackingIn := io.TeeReader(readCounter, hash)
|
|
|
|
compare := func(dst fs.Object) error {
|
|
src := object.NewStaticObjectInfo(dstFileName, modTime, int64(readCounter.BytesRead()), false, hash.Sums(), fdst)
|
|
if !Equal(src, dst) {
|
|
err = errors.Errorf("corrupted on transfer")
|
|
fs.CountError(err)
|
|
fs.Errorf(dst, "%v", err)
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// check if file small enough for direct upload
|
|
buf := make([]byte, fs.Config.StreamingUploadCutoff)
|
|
if n, err := io.ReadFull(trackingIn, buf); err == io.EOF || err == io.ErrUnexpectedEOF {
|
|
fs.Debugf(fdst, "File to upload is small (%d bytes), uploading instead of streaming", n)
|
|
src := object.NewMemoryObject(dstFileName, modTime, buf[:n])
|
|
return Copy(fdst, nil, dstFileName, src)
|
|
}
|
|
|
|
// Make a new ReadCloser with the bits we've already read
|
|
in = &readCloser{
|
|
Reader: io.MultiReader(bytes.NewReader(buf), trackingIn),
|
|
Closer: in,
|
|
}
|
|
|
|
fStreamTo := fdst
|
|
canStream := fdst.Features().PutStream != nil
|
|
if !canStream {
|
|
fs.Debugf(fdst, "Target remote doesn't support streaming uploads, creating temporary local FS to spool file")
|
|
tmpLocalFs, err := fs.TemporaryLocalFs()
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "Failed to create temporary local FS to spool file")
|
|
}
|
|
defer func() {
|
|
err := Purge(tmpLocalFs, "")
|
|
if err != nil {
|
|
fs.Infof(tmpLocalFs, "Failed to cleanup temporary FS: %v", err)
|
|
}
|
|
}()
|
|
fStreamTo = tmpLocalFs
|
|
}
|
|
|
|
if fs.Config.DryRun {
|
|
fs.Logf("stdin", "Not uploading as --dry-run")
|
|
// prevents "broken pipe" errors
|
|
_, err = io.Copy(ioutil.Discard, in)
|
|
return nil, err
|
|
}
|
|
|
|
objInfo := object.NewStaticObjectInfo(dstFileName, modTime, -1, false, nil, nil)
|
|
if dst, err = fStreamTo.Features().PutStream(in, objInfo, hashOption); err != nil {
|
|
return dst, err
|
|
}
|
|
if err = compare(dst); err != nil {
|
|
return dst, err
|
|
}
|
|
if !canStream {
|
|
// copy dst (which is the local object we have just streamed to) to the remote
|
|
return Copy(fdst, nil, dstFileName, dst)
|
|
}
|
|
return dst, nil
|
|
}
|
|
|
|
// PublicLink adds a "readable by anyone with link" permission on the given file or folder.
|
|
func PublicLink(f fs.Fs, remote string) (string, error) {
|
|
doPublicLink := f.Features().PublicLink
|
|
if doPublicLink == nil {
|
|
return "", errors.Errorf("%v doesn't support public links", f)
|
|
}
|
|
return doPublicLink(remote)
|
|
}
|
|
|
|
// Rmdirs removes any empty directories (or directories only
|
|
// containing empty directories) under f, including f.
|
|
func Rmdirs(f fs.Fs, dir string, leaveRoot bool) error {
|
|
dirEmpty := make(map[string]bool)
|
|
dirEmpty[dir] = !leaveRoot
|
|
err := walk.Walk(f, dir, true, fs.Config.MaxDepth, func(dirPath string, entries fs.DirEntries, err error) error {
|
|
if err != nil {
|
|
fs.CountError(err)
|
|
fs.Errorf(f, "Failed to list %q: %v", dirPath, err)
|
|
return nil
|
|
}
|
|
for _, entry := range entries {
|
|
switch x := entry.(type) {
|
|
case fs.Directory:
|
|
// add a new directory as empty
|
|
dir := x.Remote()
|
|
_, found := dirEmpty[dir]
|
|
if !found {
|
|
dirEmpty[dir] = true
|
|
}
|
|
case fs.Object:
|
|
// mark the parents of the file as being non-empty
|
|
dir := x.Remote()
|
|
for dir != "" {
|
|
dir = path.Dir(dir)
|
|
if dir == "." || dir == "/" {
|
|
dir = ""
|
|
}
|
|
empty, found := dirEmpty[dir]
|
|
// End if we reach a directory which is non-empty
|
|
if found && !empty {
|
|
break
|
|
}
|
|
dirEmpty[dir] = false
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return errors.Wrap(err, "failed to rmdirs")
|
|
}
|
|
// Now delete the empty directories, starting from the longest path
|
|
var toDelete []string
|
|
for dir, empty := range dirEmpty {
|
|
if empty {
|
|
toDelete = append(toDelete, dir)
|
|
}
|
|
}
|
|
sort.Strings(toDelete)
|
|
for i := len(toDelete) - 1; i >= 0; i-- {
|
|
dir := toDelete[i]
|
|
err := TryRmdir(f, dir)
|
|
if err != nil {
|
|
fs.CountError(err)
|
|
fs.Errorf(dir, "Failed to rmdir: %v", err)
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// NeedTransfer checks to see if src needs to be copied to dst using
|
|
// the current config.
|
|
//
|
|
// Returns a flag which indicates whether the file needs to be
|
|
// transferred or not.
|
|
func NeedTransfer(dst, src fs.Object) bool {
|
|
if dst == nil {
|
|
fs.Debugf(src, "Couldn't find file - need to transfer")
|
|
return true
|
|
}
|
|
// If we should ignore existing files, don't transfer
|
|
if fs.Config.IgnoreExisting {
|
|
fs.Debugf(src, "Destination exists, skipping")
|
|
return false
|
|
}
|
|
// If we should upload unconditionally
|
|
if fs.Config.IgnoreTimes {
|
|
fs.Debugf(src, "Transferring unconditionally as --ignore-times is in use")
|
|
return true
|
|
}
|
|
// If UpdateOlder is in effect, skip if dst is newer than src
|
|
if fs.Config.UpdateOlder {
|
|
srcModTime := src.ModTime()
|
|
dstModTime := dst.ModTime()
|
|
dt := dstModTime.Sub(srcModTime)
|
|
// If have a mutually agreed precision then use that
|
|
modifyWindow := fs.GetModifyWindow(dst.Fs(), src.Fs())
|
|
if modifyWindow == fs.ModTimeNotSupported {
|
|
// Otherwise use 1 second as a safe default as
|
|
// the resolution of the time a file was
|
|
// uploaded.
|
|
modifyWindow = time.Second
|
|
}
|
|
switch {
|
|
case dt >= modifyWindow:
|
|
fs.Debugf(src, "Destination is newer than source, skipping")
|
|
return false
|
|
case dt <= -modifyWindow:
|
|
fs.Debugf(src, "Destination is older than source, transferring")
|
|
default:
|
|
if src.Size() == dst.Size() {
|
|
fs.Debugf(src, "Destination mod time is within %v of source and sizes identical, skipping", modifyWindow)
|
|
return false
|
|
}
|
|
fs.Debugf(src, "Destination mod time is within %v of source but sizes differ, transferring", modifyWindow)
|
|
}
|
|
} else {
|
|
// Check to see if changed or not
|
|
if Equal(src, dst) {
|
|
fs.Debugf(src, "Unchanged skipping")
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
// RcatSize reads data from the Reader until EOF and uploads it to a file on remote.
|
|
// Pass in size >=0 if known, <0 if not known
|
|
func RcatSize(fdst fs.Fs, dstFileName string, in io.ReadCloser, size int64, modTime time.Time) (dst fs.Object, err error) {
|
|
var obj fs.Object
|
|
|
|
if size >= 0 {
|
|
// Size known use Put
|
|
accounting.Stats.Transferring(dstFileName)
|
|
body := ioutil.NopCloser(in) // we let the server close the body
|
|
in := accounting.NewAccountSizeName(body, size, dstFileName) // account the transfer (no buffering)
|
|
|
|
if fs.Config.DryRun {
|
|
fs.Logf("stdin", "Not uploading as --dry-run")
|
|
// prevents "broken pipe" errors
|
|
_, err = io.Copy(ioutil.Discard, in)
|
|
return nil, err
|
|
}
|
|
|
|
var err error
|
|
defer func() {
|
|
closeErr := in.Close()
|
|
if closeErr != nil {
|
|
accounting.Stats.Error(closeErr)
|
|
fs.Errorf(dstFileName, "Post request: close failed: %v", closeErr)
|
|
}
|
|
accounting.Stats.DoneTransferring(dstFileName, err == nil)
|
|
}()
|
|
info := object.NewStaticObjectInfo(dstFileName, modTime, size, true, nil, fdst)
|
|
obj, err = fdst.Put(in, info)
|
|
if err != nil {
|
|
fs.Errorf(dstFileName, "Post request put error: %v", err)
|
|
|
|
return nil, err
|
|
}
|
|
} else {
|
|
// Size unknown use Rcat
|
|
obj, err = Rcat(fdst, dstFileName, in, modTime)
|
|
if err != nil {
|
|
fs.Errorf(dstFileName, "Post request rcat error: %v", err)
|
|
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return obj, nil
|
|
}
|
|
|
|
// CopyURL copies the data from the url to (fdst, dstFileName)
|
|
func CopyURL(fdst fs.Fs, dstFileName string, url string) (dst fs.Object, err error) {
|
|
client := fshttp.NewClient(fs.Config)
|
|
resp, err := client.Get(url)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer fs.CheckClose(resp.Body, &err)
|
|
return RcatSize(fdst, dstFileName, resp.Body, resp.ContentLength, time.Now())
|
|
}
|
|
|
|
// moveOrCopyFile moves or copies a single file possibly to a new name
|
|
func moveOrCopyFile(fdst fs.Fs, fsrc fs.Fs, dstFileName string, srcFileName string, cp bool) (err error) {
|
|
dstFilePath := path.Join(fdst.Root(), dstFileName)
|
|
srcFilePath := path.Join(fsrc.Root(), srcFileName)
|
|
if fdst.Name() == fsrc.Name() && dstFilePath == srcFilePath {
|
|
fs.Debugf(fdst, "don't need to copy/move %s, it is already at target location", dstFileName)
|
|
return nil
|
|
}
|
|
|
|
// Choose operations
|
|
Op := Move
|
|
if cp {
|
|
Op = Copy
|
|
}
|
|
|
|
// Find src object
|
|
srcObj, err := fsrc.NewObject(srcFileName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Find dst object if it exists
|
|
dstObj, err := fdst.NewObject(dstFileName)
|
|
if err == fs.ErrorObjectNotFound {
|
|
dstObj = nil
|
|
} else if err != nil {
|
|
return err
|
|
}
|
|
|
|
if NeedTransfer(dstObj, srcObj) {
|
|
// If destination already exists, then we must move it into --backup-dir if required
|
|
if dstObj != nil && fs.Config.BackupDir != "" {
|
|
backupDir, err := cache.Get(fs.Config.BackupDir)
|
|
if err != nil {
|
|
return errors.Wrap(err, "creating Fs for --backup-dir failed")
|
|
}
|
|
remoteWithSuffix := SuffixName(dstObj.Remote())
|
|
overwritten, _ := backupDir.NewObject(remoteWithSuffix)
|
|
_, err = Move(backupDir, overwritten, remoteWithSuffix, dstObj)
|
|
if err != nil {
|
|
return errors.Wrap(err, "moving to --backup-dir failed")
|
|
}
|
|
// If successful zero out the dstObj as it is no longer there
|
|
dstObj = nil
|
|
}
|
|
|
|
_, err = Op(fdst, dstObj, dstFileName, srcObj)
|
|
} else {
|
|
accounting.Stats.Checking(srcFileName)
|
|
if !cp {
|
|
err = DeleteFile(srcObj)
|
|
}
|
|
defer accounting.Stats.DoneChecking(srcFileName)
|
|
}
|
|
return err
|
|
}
|
|
|
|
// MoveFile moves a single file possibly to a new name
|
|
func MoveFile(fdst fs.Fs, fsrc fs.Fs, dstFileName string, srcFileName string) (err error) {
|
|
return moveOrCopyFile(fdst, fsrc, dstFileName, srcFileName, false)
|
|
}
|
|
|
|
// CopyFile moves a single file possibly to a new name
|
|
func CopyFile(fdst fs.Fs, fsrc fs.Fs, dstFileName string, srcFileName string) (err error) {
|
|
return moveOrCopyFile(fdst, fsrc, dstFileName, srcFileName, true)
|
|
}
|
|
|
|
// SetTier changes tier of object in remote
|
|
func SetTier(fsrc fs.Fs, tier string) error {
|
|
return ListFn(fsrc, func(o fs.Object) {
|
|
objImpl, ok := o.(fs.SetTierer)
|
|
if !ok {
|
|
fs.Errorf(fsrc, "Remote object does not implement SetTier")
|
|
return
|
|
}
|
|
err := objImpl.SetTier(tier)
|
|
if err != nil {
|
|
fs.Errorf(fsrc, "Failed to do SetTier, %v", err)
|
|
}
|
|
})
|
|
}
|
|
|
|
// ListFormat defines files information print format
|
|
type ListFormat struct {
|
|
separator string
|
|
dirSlash bool
|
|
absolute bool
|
|
output []func(entry *ListJSONItem) string
|
|
csv *csv.Writer
|
|
buf bytes.Buffer
|
|
}
|
|
|
|
// SetSeparator changes separator in struct
|
|
func (l *ListFormat) SetSeparator(separator string) {
|
|
l.separator = separator
|
|
}
|
|
|
|
// SetDirSlash defines if slash should be printed
|
|
func (l *ListFormat) SetDirSlash(dirSlash bool) {
|
|
l.dirSlash = dirSlash
|
|
}
|
|
|
|
// SetAbsolute prints a leading slash in front of path names
|
|
func (l *ListFormat) SetAbsolute(absolute bool) {
|
|
l.absolute = absolute
|
|
}
|
|
|
|
// SetCSV defines if the output should be csv
|
|
//
|
|
// Note that you should call SetSeparator before this if you want a
|
|
// custom separator
|
|
func (l *ListFormat) SetCSV(useCSV bool) {
|
|
if useCSV {
|
|
l.csv = csv.NewWriter(&l.buf)
|
|
if l.separator != "" {
|
|
l.csv.Comma = []rune(l.separator)[0]
|
|
}
|
|
} else {
|
|
l.csv = nil
|
|
}
|
|
}
|
|
|
|
// SetOutput sets functions used to create files information
|
|
func (l *ListFormat) SetOutput(output []func(entry *ListJSONItem) string) {
|
|
l.output = output
|
|
}
|
|
|
|
// AddModTime adds file's Mod Time to output
|
|
func (l *ListFormat) AddModTime() {
|
|
l.AppendOutput(func(entry *ListJSONItem) string {
|
|
return entry.ModTime.When.Local().Format("2006-01-02 15:04:05")
|
|
})
|
|
}
|
|
|
|
// AddSize adds file's size to output
|
|
func (l *ListFormat) AddSize() {
|
|
l.AppendOutput(func(entry *ListJSONItem) string {
|
|
return strconv.FormatInt(entry.Size, 10)
|
|
})
|
|
}
|
|
|
|
// normalisePath makes sure the path has the correct slashes for the current mode
|
|
func (l *ListFormat) normalisePath(entry *ListJSONItem, remote string) string {
|
|
if l.absolute && !strings.HasPrefix(remote, "/") {
|
|
remote = "/" + remote
|
|
}
|
|
if entry.IsDir && l.dirSlash {
|
|
remote += "/"
|
|
}
|
|
return remote
|
|
}
|
|
|
|
// AddPath adds path to file to output
|
|
func (l *ListFormat) AddPath() {
|
|
l.AppendOutput(func(entry *ListJSONItem) string {
|
|
return l.normalisePath(entry, entry.Path)
|
|
})
|
|
}
|
|
|
|
// AddEncrypted adds the encrypted path to file to output
|
|
func (l *ListFormat) AddEncrypted() {
|
|
l.AppendOutput(func(entry *ListJSONItem) string {
|
|
return l.normalisePath(entry, entry.Encrypted)
|
|
})
|
|
}
|
|
|
|
// AddHash adds the hash of the type given to the output
|
|
func (l *ListFormat) AddHash(ht hash.Type) {
|
|
hashName := ht.String()
|
|
l.AppendOutput(func(entry *ListJSONItem) string {
|
|
if entry.IsDir {
|
|
return ""
|
|
}
|
|
return entry.Hashes[hashName]
|
|
})
|
|
}
|
|
|
|
// AddID adds file's ID to the output if known
|
|
func (l *ListFormat) AddID() {
|
|
l.AppendOutput(func(entry *ListJSONItem) string {
|
|
return entry.ID
|
|
})
|
|
}
|
|
|
|
// AddOrigID adds file's Original ID to the output if known
|
|
func (l *ListFormat) AddOrigID() {
|
|
l.AppendOutput(func(entry *ListJSONItem) string {
|
|
return entry.OrigID
|
|
})
|
|
}
|
|
|
|
// AddTier adds file's Tier to the output if known
|
|
func (l *ListFormat) AddTier() {
|
|
l.AppendOutput(func(entry *ListJSONItem) string {
|
|
return entry.Tier
|
|
})
|
|
}
|
|
|
|
// AddMimeType adds file's MimeType to the output if known
|
|
func (l *ListFormat) AddMimeType() {
|
|
l.AppendOutput(func(entry *ListJSONItem) string {
|
|
return entry.MimeType
|
|
})
|
|
}
|
|
|
|
// AppendOutput adds string generated by specific function to printed output
|
|
func (l *ListFormat) AppendOutput(functionToAppend func(item *ListJSONItem) string) {
|
|
l.output = append(l.output, functionToAppend)
|
|
}
|
|
|
|
// Format prints information about the DirEntry in the format defined
|
|
func (l *ListFormat) Format(entry *ListJSONItem) (result string) {
|
|
var out []string
|
|
for _, fun := range l.output {
|
|
out = append(out, fun(entry))
|
|
}
|
|
if l.csv != nil {
|
|
l.buf.Reset()
|
|
_ = l.csv.Write(out) // can't fail writing to bytes.Buffer
|
|
l.csv.Flush()
|
|
result = strings.TrimRight(l.buf.String(), "\n")
|
|
} else {
|
|
result = strings.Join(out, l.separator)
|
|
}
|
|
return result
|
|
}
|
|
|
|
// DirMove renames srcRemote to dstRemote
|
|
//
|
|
// It does this by loading the directory tree into memory (using ListR
|
|
// if available) and doing renames in parallel.
|
|
func DirMove(f fs.Fs, srcRemote, dstRemote string) (err error) {
|
|
// Use DirMove if possible
|
|
if doDirMove := f.Features().DirMove; doDirMove != nil {
|
|
return doDirMove(f, srcRemote, dstRemote)
|
|
}
|
|
|
|
// Load the directory tree into memory
|
|
tree, err := walk.NewDirTree(f, srcRemote, true, -1)
|
|
if err != nil {
|
|
return errors.Wrap(err, "RenameDir tree walk")
|
|
}
|
|
|
|
// Get the directories in sorted order
|
|
dirs := tree.Dirs()
|
|
|
|
// Make the destination directories - must be done in order not in parallel
|
|
for _, dir := range dirs {
|
|
dstPath := dstRemote + dir[len(srcRemote):]
|
|
err := f.Mkdir(dstPath)
|
|
if err != nil {
|
|
return errors.Wrap(err, "RenameDir mkdir")
|
|
}
|
|
}
|
|
|
|
// Rename the files in parallel
|
|
type rename struct {
|
|
o fs.Object
|
|
newPath string
|
|
}
|
|
renames := make(chan rename, fs.Config.Transfers)
|
|
g, ctx := errgroup.WithContext(context.Background())
|
|
for i := 0; i < fs.Config.Transfers; i++ {
|
|
g.Go(func() error {
|
|
for job := range renames {
|
|
dstOverwritten, _ := f.NewObject(job.newPath)
|
|
_, err := Move(f, dstOverwritten, job.newPath, job.o)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
for dir, entries := range tree {
|
|
dstPath := dstRemote + dir[len(srcRemote):]
|
|
for _, entry := range entries {
|
|
if o, ok := entry.(fs.Object); ok {
|
|
renames <- rename{o, path.Join(dstPath, path.Base(o.Remote()))}
|
|
}
|
|
}
|
|
}
|
|
close(renames)
|
|
err = g.Wait()
|
|
if err != nil {
|
|
return errors.Wrap(err, "RenameDir renames")
|
|
}
|
|
|
|
// Remove the source directories in reverse order
|
|
for i := len(dirs) - 1; i >= 0; i-- {
|
|
err := f.Rmdir(dirs[i])
|
|
if err != nil {
|
|
return errors.Wrap(err, "RenameDir rmdir")
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|