diff --git a/docs/content/docs.md b/docs/content/docs.md index a38b6eccf..8c9312010 100644 --- a/docs/content/docs.md +++ b/docs/content/docs.md @@ -716,6 +716,17 @@ files not recursed through are considered excluded and will be deleted on the destination. Test first with `--dry-run` if you are not sure what will happen. +### --max-duration=TIME ### + +Rclone will stop scheduling new transfers when it has run for the +duration specified. + +Defaults to off. + +When the limit is reached any existing transfers will complete. + +Rclone won't exit with an error if the transfer limit is reached. + ### --max-transfer=SIZE ### Rclone will stop transferring when it has reached the size specified. diff --git a/fs/config.go b/fs/config.go index cb1932241..661cf405a 100644 --- a/fs/config.go +++ b/fs/config.go @@ -92,6 +92,7 @@ type ConfigInfo struct { PasswordCommand SpaceSepList UseServerModTime bool MaxTransfer SizeSuffix + MaxDuration time.Duration MaxBacklog int MaxStatsGroups int StatsOneLine bool diff --git a/fs/config/configflags/configflags.go b/fs/config/configflags/configflags.go index e76ff1935..a70777a0a 100644 --- a/fs/config/configflags/configflags.go +++ b/fs/config/configflags/configflags.go @@ -93,6 +93,7 @@ func AddFlags(flagSet *pflag.FlagSet) { flags.FVarP(flagSet, &fs.Config.StreamingUploadCutoff, "streaming-upload-cutoff", "", "Cutoff for switching to chunked upload if file size is unknown. Upload starts after reaching cutoff or when file ends.") flags.FVarP(flagSet, &fs.Config.Dump, "dump", "", "List of items to dump from: "+fs.DumpFlagsList) flags.FVarP(flagSet, &fs.Config.MaxTransfer, "max-transfer", "", "Maximum size of data to transfer.") + flags.DurationVarP(flagSet, &fs.Config.MaxDuration, "max-duration", "", 0, "Maximum duration rclone will transfer data for.") flags.IntVarP(flagSet, &fs.Config.MaxBacklog, "max-backlog", "", fs.Config.MaxBacklog, "Maximum number of objects in sync or check backlog.") flags.IntVarP(flagSet, &fs.Config.MaxStatsGroups, "max-stats-groups", "", fs.Config.MaxStatsGroups, "Maximum number of stats groups to keep in memory. On max oldest is discarded.") flags.BoolVarP(flagSet, &fs.Config.StatsOneLine, "stats-one-line", "", fs.Config.StatsOneLine, "Make the stats fit on one line.") diff --git a/fs/sync/sync.go b/fs/sync/sync.go index 7c3fc7230..e0be39fb6 100644 --- a/fs/sync/sync.go +++ b/fs/sync/sync.go @@ -7,6 +7,7 @@ import ( "path" "sort" "sync" + "time" "github.com/pkg/errors" "github.com/rclone/rclone/fs" @@ -102,7 +103,14 @@ func newSyncCopyMove(ctx context.Context, fdst, fsrc fs.Fs, deleteMode fs.Delete if err != nil { return nil, err } - s.ctx, s.cancel = context.WithCancel(ctx) + // If a max session duration has been defined add a deadline to the context + if fs.Config.MaxDuration > 0 { + endTime := time.Now().Add(fs.Config.MaxDuration) + fs.Infof(s.fdst, "Transfer session deadline: %s", endTime.Format("2006/01/02 15:04:05")) + s.ctx, s.cancel = context.WithDeadline(ctx, endTime) + } else { + s.ctx, s.cancel = context.WithCancel(ctx) + } if s.noTraverse && s.deleteMode != fs.DeleteModeOff { fs.Errorf(nil, "Ignoring --no-traverse with sync") s.noTraverse = false @@ -195,6 +203,9 @@ func (s *syncCopyMove) processError(err error) { if err == nil { return } + if err == context.DeadlineExceeded { + err = fserrors.NoRetryError(err) + } s.errorMu.Lock() defer s.errorMu.Unlock() switch { @@ -742,6 +753,9 @@ func (s *syncCopyMove) run() error { s.processError(deleteEmptyDirectories(s.ctx, s.fsrc, s.srcEmptyDirs)) } + // Read the error out of the context if there is one + s.processError(s.ctx.Err()) + // cancel the context to free resources s.cancel() return s.currentError() diff --git a/fs/sync/sync_test.go b/fs/sync/sync_test.go index 417fd5b4d..bd8194f9a 100644 --- a/fs/sync/sync_test.go +++ b/fs/sync/sync_test.go @@ -4,6 +4,7 @@ package sync import ( "context" + "fmt" "runtime" "strings" "testing" @@ -990,6 +991,48 @@ func TestSyncWithUpdateOlder(t *testing.T) { fstest.CheckItems(t, r.Fremote, oneO, twoF, threeF, fourF, fiveF) } +// Test with a max transfer duration +func TestSyncWithMaxDuration(t *testing.T) { + r := fstest.NewRun(t) + defer r.Finalise() + + maxDuration := 250 * time.Millisecond + fs.Config.MaxDuration = maxDuration + bytesPerSecond := 300 + accounting.SetBwLimit(fs.SizeSuffix(bytesPerSecond)) + oldTransfers := fs.Config.Transfers + fs.Config.Transfers = 1 + defer func() { + fs.Config.MaxDuration = 0 // reset back to default + fs.Config.Transfers = oldTransfers + accounting.SetBwLimit(fs.SizeSuffix(0)) + }() + + // 5 files of 60 bytes at 60 bytes/s 5 seconds + testFiles := make([]fstest.Item, 5) + for i := 0; i < len(testFiles); i++ { + testFiles[i] = r.WriteFile(fmt.Sprintf("file%d", i), "------------------------------------------------------------", t1) + } + + fstest.CheckListing(t, r.Flocal, testFiles) + + accounting.GlobalStats().ResetCounters() + startTime := time.Now() + err := Sync(context.Background(), r.Fremote, r.Flocal, false) + require.Equal(t, context.DeadlineExceeded, errors.Cause(err)) + err = accounting.GlobalStats().GetLastError() + require.NoError(t, err) + + elapsed := time.Since(startTime) + maxTransferTime := (time.Duration(len(testFiles)) * 60 * time.Second) / time.Duration(bytesPerSecond) + + what := fmt.Sprintf("expecting elapsed time %v between %v and %v", elapsed, maxDuration, maxTransferTime) + require.True(t, elapsed >= maxDuration, what) + require.True(t, elapsed < 5*time.Second, what) + // we must not have transferred all files during the session + require.True(t, accounting.GlobalStats().GetTransfers() < int64(len(testFiles))) +} + // Test with TrackRenames set func TestSyncWithTrackRenames(t *testing.T) { r := fstest.NewRun(t)