From 6f1766dd9e74fffbbe52301f898c62ebaaff8e69 Mon Sep 17 00:00:00 2001 From: Shing Kit Chan Date: Thu, 31 Oct 2019 03:23:17 +0800 Subject: [PATCH] fs: Add support for --max-transfer-cutoff modes #2672 This also adds max transfer cut off check for server side copies too --- docs/content/docs.md | 14 +++++++ fs/accounting/accounting.go | 5 ++- fs/accounting/accounting_test.go | 17 +++++++++ fs/accounting/stats.go | 16 ++++++++ fs/config.go | 1 + fs/config/configflags/configflags.go | 38 ++++++++++++++----- fs/maxtransfermode.go | 12 ++++++ fs/operations/operations.go | 7 +++- fs/operations/operations_test.go | 56 ++++++++++++++++++++++++++++ 9 files changed, 155 insertions(+), 11 deletions(-) create mode 100644 fs/maxtransfermode.go diff --git a/docs/content/docs.md b/docs/content/docs.md index 70622bc3f..b9f3d626b 100644 --- a/docs/content/docs.md +++ b/docs/content/docs.md @@ -736,6 +736,20 @@ When the limit is reached all transfers will stop immediately. Rclone will exit with exit code 8 if the transfer limit is reached. +### --max-transfer-(hard,soft,cautious) ### + +This modifies the behavior of `--max-transfer` +Defaults to `--max-transfer-hard`. + +Specifiying `--max-transfer-hard` will stop transferring immediately +when Rclone reaches the limit. + +Specifiying `--max-transfer-soft` will stop starting new transfers +when Rclone reaches the limit. + +Specifiying `--max-transfer-cautious` will try to prevent Rclone +from reaching the limit. + ### --modify-window=TIME ### When checking whether a file has been modified, this is the maximum diff --git a/fs/accounting/accounting.go b/fs/accounting/accounting.go index dd9a5145f..dc34a7c94 100644 --- a/fs/accounting/accounting.go +++ b/fs/accounting/accounting.go @@ -61,7 +61,10 @@ func newAccountSizeName(stats *StatsInfo, in io.ReadCloser, size int64, name str exit: make(chan struct{}), avg: 0, lpTime: time.Now(), - max: int64(fs.Config.MaxTransfer), + max: -1, + } + if fs.Config.MaxTransferMode != fs.MaxTransferModeSoft { + acc.max = int64((fs.Config.MaxTransfer)) } go acc.averageLoop() stats.inProgress.set(acc.name, acc) diff --git a/fs/accounting/accounting_test.go b/fs/accounting/accounting_test.go index 3f365e493..b3b702503 100644 --- a/fs/accounting/accounting_test.go +++ b/fs/accounting/accounting_test.go @@ -197,9 +197,12 @@ func TestAccountAccounter(t *testing.T) { func TestAccountMaxTransfer(t *testing.T) { old := fs.Config.MaxTransfer + oldMode := fs.Config.MaxTransferMode + fs.Config.MaxTransfer = 15 defer func() { fs.Config.MaxTransfer = old + fs.Config.MaxTransferMode = oldMode }() in := ioutil.NopCloser(bytes.NewBuffer(make([]byte, 100))) @@ -218,6 +221,20 @@ func TestAccountMaxTransfer(t *testing.T) { assert.Equal(t, 0, n) assert.Equal(t, ErrorMaxTransferLimitReached, err) assert.True(t, fserrors.IsFatalError(err)) + + fs.Config.MaxTransferMode = fs.MaxTransferModeSoft + stats = NewStats() + acc = newAccountSizeName(stats, in, 1, "test") + + n, err = acc.Read(b) + assert.Equal(t, 10, n) + assert.NoError(t, err) + n, err = acc.Read(b) + assert.Equal(t, 10, n) + assert.NoError(t, err) + n, err = acc.Read(b) + assert.Equal(t, 10, n) + assert.NoError(t, err) } func TestShortenName(t *testing.T) { diff --git a/fs/accounting/stats.go b/fs/accounting/stats.go index 993afdcba..a61b88886 100644 --- a/fs/accounting/stats.go +++ b/fs/accounting/stats.go @@ -382,6 +382,22 @@ func (s *StatsInfo) GetBytes() int64 { return s.bytes } +// GetBytesWithPending returns the number of bytes transferred and remaining transfers +func (s *StatsInfo) GetBytesWithPending() int64 { + s.mu.RLock() + defer s.mu.RUnlock() + pending := int64(0) + for _, tr := range s.startedTransfers { + if tr.acc != nil { + bytes, size := tr.acc.progress() + if bytes < size { + pending += size - bytes + } + } + } + return s.bytes + pending +} + // Errors updates the stats for errors func (s *StatsInfo) Errors(errors int64) { s.mu.Lock() diff --git a/fs/config.go b/fs/config.go index 661cf405a..c7396b7bd 100644 --- a/fs/config.go +++ b/fs/config.go @@ -93,6 +93,7 @@ type ConfigInfo struct { UseServerModTime bool MaxTransfer SizeSuffix MaxDuration time.Duration + MaxTransferMode MaxTransferMode MaxBacklog int MaxStatsGroups int StatsOneLine bool diff --git a/fs/config/configflags/configflags.go b/fs/config/configflags/configflags.go index a70777a0a..454c1604b 100644 --- a/fs/config/configflags/configflags.go +++ b/fs/config/configflags/configflags.go @@ -20,15 +20,18 @@ import ( var ( // these will get interpreted into fs.Config via SetFlags() below - verbose int - quiet bool - dumpHeaders bool - dumpBodies bool - deleteBefore bool - deleteDuring bool - deleteAfter bool - bindAddr string - disableFeatures string + verbose int + quiet bool + dumpHeaders bool + dumpBodies bool + deleteBefore bool + deleteDuring bool + deleteAfter bool + bindAddr string + disableFeatures string + maxTransferHard bool + maxTransferSoft bool + maxTransferCautious bool ) // AddFlags adds the non filing system specific flags to the command @@ -94,6 +97,9 @@ func AddFlags(flagSet *pflag.FlagSet) { flags.FVarP(flagSet, &fs.Config.Dump, "dump", "", "List of items to dump from: "+fs.DumpFlagsList) flags.FVarP(flagSet, &fs.Config.MaxTransfer, "max-transfer", "", "Maximum size of data to transfer.") flags.DurationVarP(flagSet, &fs.Config.MaxDuration, "max-duration", "", 0, "Maximum duration rclone will transfer data for.") + flags.BoolVarP(flagSet, &maxTransferHard, "max-transfer-hard", "", false, "When transferring, stop immediately when --max-transfer is reached") + flags.BoolVarP(flagSet, &maxTransferSoft, "max-transfer-soft", "", false, "When transferring, stop starting new transfers when --max-transfer is reached") + flags.BoolVarP(flagSet, &maxTransferCautious, "max-transfer-cautious", "", false, "When transferring, try to avoid reaching --max-transfer") flags.IntVarP(flagSet, &fs.Config.MaxBacklog, "max-backlog", "", fs.Config.MaxBacklog, "Maximum number of objects in sync or check backlog.") flags.IntVarP(flagSet, &fs.Config.MaxStatsGroups, "max-stats-groups", "", fs.Config.MaxStatsGroups, "Maximum number of stats groups to keep in memory. On max oldest is discarded.") flags.BoolVarP(flagSet, &fs.Config.StatsOneLine, "stats-one-line", "", fs.Config.StatsOneLine, "Make the stats fit on one line.") @@ -178,6 +184,20 @@ func SetFlags() { fs.Config.DeleteMode = fs.DeleteModeDefault } + switch { + case maxTransferHard && (maxTransferSoft || maxTransferCautious), + maxTransferSoft && maxTransferCautious: + log.Fatalf(`Only one of --max-trans fer-hard, --max-transfer-soft or --max-transfer-cautious can be used.`) + case maxTransferHard: + fs.Config.MaxTransferMode = fs.MaxTransferModeHard + case maxTransferSoft: + fs.Config.MaxTransferMode = fs.MaxTransferModeSoft + case maxTransferCautious: + fs.Config.MaxTransferMode = fs.MaxTransferModeCautious + default: + fs.Config.MaxTransferMode = fs.MaxTransferModeDefault + } + if fs.Config.CompareDest != "" && fs.Config.CopyDest != "" { log.Fatalf(`Can't use --compare-dest with --copy-dest.`) } diff --git a/fs/maxtransfermode.go b/fs/maxtransfermode.go new file mode 100644 index 000000000..7d38e07b5 --- /dev/null +++ b/fs/maxtransfermode.go @@ -0,0 +1,12 @@ +package fs + +// MaxTransferMode describes the possible delete modes in the config +type MaxTransferMode byte + +// MaxTransferMode constants +const ( + MaxTransferModeHard MaxTransferMode = iota + MaxTransferModeSoft + MaxTransferModeCautious + MaxTransferModeDefault = MaxTransferModeHard +) diff --git a/fs/operations/operations.go b/fs/operations/operations.go index c04b5ab77..e4702fcc9 100644 --- a/fs/operations/operations.go +++ b/fs/operations/operations.go @@ -364,7 +364,8 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj actionTaken = "Copied (server side copy)" if doCopy := f.Features().Copy; doCopy != nil && (SameConfig(src.Fs(), f) || (SameRemoteType(src.Fs(), f) && f.Features().ServerSideAcrossConfigs)) { // Check transfer limit for server side copies - if fs.Config.MaxTransfer >= 0 && accounting.Stats(ctx).GetBytes() >= int64(fs.Config.MaxTransfer) { + if fs.Config.MaxTransfer >= 0 && (accounting.Stats(ctx).GetBytes() >= int64(fs.Config.MaxTransfer) || + (fs.Config.MaxTransferMode == fs.MaxTransferModeCautious && accounting.Stats(ctx).GetBytesWithPending()+src.Size() >= int64(fs.Config.MaxTransfer))) { return nil, accounting.ErrorMaxTransferLimitReached } in := tr.Account(nil) // account the transfer @@ -385,6 +386,10 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj } // If can't server side copy, do it manually if err == fs.ErrorCantCopy { + if fs.Config.MaxTransfer >= 0 && (accounting.Stats(ctx).GetBytes() >= int64(fs.Config.MaxTransfer) || + (fs.Config.MaxTransferMode == fs.MaxTransferModeCautious && accounting.Stats(ctx).GetBytesWithPending()+src.Size() >= int64(fs.Config.MaxTransfer))) { + return nil, accounting.ErrorMaxTransferLimitReached + } if doMultiThreadCopy(f, src) { // Number of streams proportional to size streams := src.Size() / int64(fs.Config.MultiThreadCutoff) diff --git a/fs/operations/operations_test.go b/fs/operations/operations_test.go index ddefac822..539fdc564 100644 --- a/fs/operations/operations_test.go +++ b/fs/operations/operations_test.go @@ -39,6 +39,7 @@ import ( "github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs/accounting" "github.com/rclone/rclone/fs/filter" + "github.com/rclone/rclone/fs/fserrors" "github.com/rclone/rclone/fs/fshttp" "github.com/rclone/rclone/fs/hash" "github.com/rclone/rclone/fs/operations" @@ -1527,3 +1528,58 @@ func TestRcatSize(t *testing.T) { // Check files exist fstest.CheckItems(t, r.Fremote, file1, file2) } + +func TestCopyFileMaxTransfer(t *testing.T) { + r := fstest.NewRun(t) + defer r.Finalise() + old := fs.Config.MaxTransfer + oldMode := fs.Config.MaxTransferMode + + defer func() { + fs.Config.MaxTransfer = old + fs.Config.MaxTransferMode = oldMode + accounting.Stats(context.Background()).ResetCounters() + }() + + file1 := r.WriteFile("file1", "file1 contents", t1) + file2 := r.WriteFile("file2", "file2 contents...........", t2) + + rfile1 := file1 + rfile1.Path = "sub/file1" + rfile2 := file2 + rfile2.Path = "sub/file2" + + fs.Config.MaxTransfer = 15 + fs.Config.MaxTransferMode = fs.MaxTransferModeHard + accounting.Stats(context.Background()).ResetCounters() + + err := operations.CopyFile(context.Background(), r.Fremote, r.Flocal, rfile1.Path, file1.Path) + require.NoError(t, err) + fstest.CheckItems(t, r.Flocal, file1, file2) + fstest.CheckItems(t, r.Fremote, rfile1) + + accounting.Stats(context.Background()).ResetCounters() + + err = operations.CopyFile(context.Background(), r.Fremote, r.Flocal, rfile2.Path, file2.Path) + fstest.CheckItems(t, r.Flocal, file1, file2) + fstest.CheckItems(t, r.Fremote, rfile1) + assert.Equal(t, accounting.ErrorMaxTransferLimitReached, err) + assert.True(t, fserrors.IsFatalError(err)) + + fs.Config.MaxTransferMode = fs.MaxTransferModeCautious + accounting.Stats(context.Background()).ResetCounters() + + err = operations.CopyFile(context.Background(), r.Fremote, r.Flocal, rfile2.Path, file2.Path) + fstest.CheckItems(t, r.Flocal, file1, file2) + fstest.CheckItems(t, r.Fremote, rfile1) + assert.Equal(t, accounting.ErrorMaxTransferLimitReached, err) + assert.True(t, fserrors.IsFatalError(err)) + + fs.Config.MaxTransferMode = fs.MaxTransferModeSoft + accounting.Stats(context.Background()).ResetCounters() + + err = operations.CopyFile(context.Background(), r.Fremote, r.Flocal, rfile2.Path, file2.Path) + require.NoError(t, err) + fstest.CheckItems(t, r.Flocal, file1, file2) + fstest.CheckItems(t, r.Fremote, rfile1, rfile2) +}