fs: Add support for --max-transfer-cutoff modes #2672

This also adds max transfer cut off check for server side copies too
This commit is contained in:
Shing Kit Chan 2019-10-31 03:23:17 +08:00 committed by Nick Craig-Wood
parent 7d70eb0346
commit 6f1766dd9e
9 changed files with 155 additions and 11 deletions

View File

@ -736,6 +736,20 @@ When the limit is reached all transfers will stop immediately.
Rclone will exit with exit code 8 if the transfer limit is reached. Rclone will exit with exit code 8 if the transfer limit is reached.
### --max-transfer-(hard,soft,cautious) ###
This modifies the behavior of `--max-transfer`
Defaults to `--max-transfer-hard`.
Specifiying `--max-transfer-hard` will stop transferring immediately
when Rclone reaches the limit.
Specifiying `--max-transfer-soft` will stop starting new transfers
when Rclone reaches the limit.
Specifiying `--max-transfer-cautious` will try to prevent Rclone
from reaching the limit.
### --modify-window=TIME ### ### --modify-window=TIME ###
When checking whether a file has been modified, this is the maximum When checking whether a file has been modified, this is the maximum

View File

@ -61,7 +61,10 @@ func newAccountSizeName(stats *StatsInfo, in io.ReadCloser, size int64, name str
exit: make(chan struct{}), exit: make(chan struct{}),
avg: 0, avg: 0,
lpTime: time.Now(), lpTime: time.Now(),
max: int64(fs.Config.MaxTransfer), max: -1,
}
if fs.Config.MaxTransferMode != fs.MaxTransferModeSoft {
acc.max = int64((fs.Config.MaxTransfer))
} }
go acc.averageLoop() go acc.averageLoop()
stats.inProgress.set(acc.name, acc) stats.inProgress.set(acc.name, acc)

View File

@ -197,9 +197,12 @@ func TestAccountAccounter(t *testing.T) {
func TestAccountMaxTransfer(t *testing.T) { func TestAccountMaxTransfer(t *testing.T) {
old := fs.Config.MaxTransfer old := fs.Config.MaxTransfer
oldMode := fs.Config.MaxTransferMode
fs.Config.MaxTransfer = 15 fs.Config.MaxTransfer = 15
defer func() { defer func() {
fs.Config.MaxTransfer = old fs.Config.MaxTransfer = old
fs.Config.MaxTransferMode = oldMode
}() }()
in := ioutil.NopCloser(bytes.NewBuffer(make([]byte, 100))) in := ioutil.NopCloser(bytes.NewBuffer(make([]byte, 100)))
@ -218,6 +221,20 @@ func TestAccountMaxTransfer(t *testing.T) {
assert.Equal(t, 0, n) assert.Equal(t, 0, n)
assert.Equal(t, ErrorMaxTransferLimitReached, err) assert.Equal(t, ErrorMaxTransferLimitReached, err)
assert.True(t, fserrors.IsFatalError(err)) assert.True(t, fserrors.IsFatalError(err))
fs.Config.MaxTransferMode = fs.MaxTransferModeSoft
stats = NewStats()
acc = newAccountSizeName(stats, in, 1, "test")
n, err = acc.Read(b)
assert.Equal(t, 10, n)
assert.NoError(t, err)
n, err = acc.Read(b)
assert.Equal(t, 10, n)
assert.NoError(t, err)
n, err = acc.Read(b)
assert.Equal(t, 10, n)
assert.NoError(t, err)
} }
func TestShortenName(t *testing.T) { func TestShortenName(t *testing.T) {

View File

@ -382,6 +382,22 @@ func (s *StatsInfo) GetBytes() int64 {
return s.bytes return s.bytes
} }
// GetBytesWithPending returns the number of bytes transferred and remaining transfers
func (s *StatsInfo) GetBytesWithPending() int64 {
s.mu.RLock()
defer s.mu.RUnlock()
pending := int64(0)
for _, tr := range s.startedTransfers {
if tr.acc != nil {
bytes, size := tr.acc.progress()
if bytes < size {
pending += size - bytes
}
}
}
return s.bytes + pending
}
// Errors updates the stats for errors // Errors updates the stats for errors
func (s *StatsInfo) Errors(errors int64) { func (s *StatsInfo) Errors(errors int64) {
s.mu.Lock() s.mu.Lock()

View File

@ -93,6 +93,7 @@ type ConfigInfo struct {
UseServerModTime bool UseServerModTime bool
MaxTransfer SizeSuffix MaxTransfer SizeSuffix
MaxDuration time.Duration MaxDuration time.Duration
MaxTransferMode MaxTransferMode
MaxBacklog int MaxBacklog int
MaxStatsGroups int MaxStatsGroups int
StatsOneLine bool StatsOneLine bool

View File

@ -29,6 +29,9 @@ var (
deleteAfter bool deleteAfter bool
bindAddr string bindAddr string
disableFeatures string disableFeatures string
maxTransferHard bool
maxTransferSoft bool
maxTransferCautious bool
) )
// AddFlags adds the non filing system specific flags to the command // AddFlags adds the non filing system specific flags to the command
@ -94,6 +97,9 @@ func AddFlags(flagSet *pflag.FlagSet) {
flags.FVarP(flagSet, &fs.Config.Dump, "dump", "", "List of items to dump from: "+fs.DumpFlagsList) flags.FVarP(flagSet, &fs.Config.Dump, "dump", "", "List of items to dump from: "+fs.DumpFlagsList)
flags.FVarP(flagSet, &fs.Config.MaxTransfer, "max-transfer", "", "Maximum size of data to transfer.") flags.FVarP(flagSet, &fs.Config.MaxTransfer, "max-transfer", "", "Maximum size of data to transfer.")
flags.DurationVarP(flagSet, &fs.Config.MaxDuration, "max-duration", "", 0, "Maximum duration rclone will transfer data for.") flags.DurationVarP(flagSet, &fs.Config.MaxDuration, "max-duration", "", 0, "Maximum duration rclone will transfer data for.")
flags.BoolVarP(flagSet, &maxTransferHard, "max-transfer-hard", "", false, "When transferring, stop immediately when --max-transfer is reached")
flags.BoolVarP(flagSet, &maxTransferSoft, "max-transfer-soft", "", false, "When transferring, stop starting new transfers when --max-transfer is reached")
flags.BoolVarP(flagSet, &maxTransferCautious, "max-transfer-cautious", "", false, "When transferring, try to avoid reaching --max-transfer")
flags.IntVarP(flagSet, &fs.Config.MaxBacklog, "max-backlog", "", fs.Config.MaxBacklog, "Maximum number of objects in sync or check backlog.") flags.IntVarP(flagSet, &fs.Config.MaxBacklog, "max-backlog", "", fs.Config.MaxBacklog, "Maximum number of objects in sync or check backlog.")
flags.IntVarP(flagSet, &fs.Config.MaxStatsGroups, "max-stats-groups", "", fs.Config.MaxStatsGroups, "Maximum number of stats groups to keep in memory. On max oldest is discarded.") flags.IntVarP(flagSet, &fs.Config.MaxStatsGroups, "max-stats-groups", "", fs.Config.MaxStatsGroups, "Maximum number of stats groups to keep in memory. On max oldest is discarded.")
flags.BoolVarP(flagSet, &fs.Config.StatsOneLine, "stats-one-line", "", fs.Config.StatsOneLine, "Make the stats fit on one line.") flags.BoolVarP(flagSet, &fs.Config.StatsOneLine, "stats-one-line", "", fs.Config.StatsOneLine, "Make the stats fit on one line.")
@ -178,6 +184,20 @@ func SetFlags() {
fs.Config.DeleteMode = fs.DeleteModeDefault fs.Config.DeleteMode = fs.DeleteModeDefault
} }
switch {
case maxTransferHard && (maxTransferSoft || maxTransferCautious),
maxTransferSoft && maxTransferCautious:
log.Fatalf(`Only one of --max-trans fer-hard, --max-transfer-soft or --max-transfer-cautious can be used.`)
case maxTransferHard:
fs.Config.MaxTransferMode = fs.MaxTransferModeHard
case maxTransferSoft:
fs.Config.MaxTransferMode = fs.MaxTransferModeSoft
case maxTransferCautious:
fs.Config.MaxTransferMode = fs.MaxTransferModeCautious
default:
fs.Config.MaxTransferMode = fs.MaxTransferModeDefault
}
if fs.Config.CompareDest != "" && fs.Config.CopyDest != "" { if fs.Config.CompareDest != "" && fs.Config.CopyDest != "" {
log.Fatalf(`Can't use --compare-dest with --copy-dest.`) log.Fatalf(`Can't use --compare-dest with --copy-dest.`)
} }

12
fs/maxtransfermode.go Normal file
View File

@ -0,0 +1,12 @@
package fs
// MaxTransferMode describes the possible delete modes in the config
type MaxTransferMode byte
// MaxTransferMode constants
const (
MaxTransferModeHard MaxTransferMode = iota
MaxTransferModeSoft
MaxTransferModeCautious
MaxTransferModeDefault = MaxTransferModeHard
)

View File

@ -364,7 +364,8 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj
actionTaken = "Copied (server side copy)" actionTaken = "Copied (server side copy)"
if doCopy := f.Features().Copy; doCopy != nil && (SameConfig(src.Fs(), f) || (SameRemoteType(src.Fs(), f) && f.Features().ServerSideAcrossConfigs)) { if doCopy := f.Features().Copy; doCopy != nil && (SameConfig(src.Fs(), f) || (SameRemoteType(src.Fs(), f) && f.Features().ServerSideAcrossConfigs)) {
// Check transfer limit for server side copies // Check transfer limit for server side copies
if fs.Config.MaxTransfer >= 0 && accounting.Stats(ctx).GetBytes() >= int64(fs.Config.MaxTransfer) { if fs.Config.MaxTransfer >= 0 && (accounting.Stats(ctx).GetBytes() >= int64(fs.Config.MaxTransfer) ||
(fs.Config.MaxTransferMode == fs.MaxTransferModeCautious && accounting.Stats(ctx).GetBytesWithPending()+src.Size() >= int64(fs.Config.MaxTransfer))) {
return nil, accounting.ErrorMaxTransferLimitReached return nil, accounting.ErrorMaxTransferLimitReached
} }
in := tr.Account(nil) // account the transfer in := tr.Account(nil) // account the transfer
@ -385,6 +386,10 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj
} }
// If can't server side copy, do it manually // If can't server side copy, do it manually
if err == fs.ErrorCantCopy { if err == fs.ErrorCantCopy {
if fs.Config.MaxTransfer >= 0 && (accounting.Stats(ctx).GetBytes() >= int64(fs.Config.MaxTransfer) ||
(fs.Config.MaxTransferMode == fs.MaxTransferModeCautious && accounting.Stats(ctx).GetBytesWithPending()+src.Size() >= int64(fs.Config.MaxTransfer))) {
return nil, accounting.ErrorMaxTransferLimitReached
}
if doMultiThreadCopy(f, src) { if doMultiThreadCopy(f, src) {
// Number of streams proportional to size // Number of streams proportional to size
streams := src.Size() / int64(fs.Config.MultiThreadCutoff) streams := src.Size() / int64(fs.Config.MultiThreadCutoff)

View File

@ -39,6 +39,7 @@ import (
"github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/accounting" "github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/fs/filter" "github.com/rclone/rclone/fs/filter"
"github.com/rclone/rclone/fs/fserrors"
"github.com/rclone/rclone/fs/fshttp" "github.com/rclone/rclone/fs/fshttp"
"github.com/rclone/rclone/fs/hash" "github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/fs/operations" "github.com/rclone/rclone/fs/operations"
@ -1527,3 +1528,58 @@ func TestRcatSize(t *testing.T) {
// Check files exist // Check files exist
fstest.CheckItems(t, r.Fremote, file1, file2) fstest.CheckItems(t, r.Fremote, file1, file2)
} }
func TestCopyFileMaxTransfer(t *testing.T) {
r := fstest.NewRun(t)
defer r.Finalise()
old := fs.Config.MaxTransfer
oldMode := fs.Config.MaxTransferMode
defer func() {
fs.Config.MaxTransfer = old
fs.Config.MaxTransferMode = oldMode
accounting.Stats(context.Background()).ResetCounters()
}()
file1 := r.WriteFile("file1", "file1 contents", t1)
file2 := r.WriteFile("file2", "file2 contents...........", t2)
rfile1 := file1
rfile1.Path = "sub/file1"
rfile2 := file2
rfile2.Path = "sub/file2"
fs.Config.MaxTransfer = 15
fs.Config.MaxTransferMode = fs.MaxTransferModeHard
accounting.Stats(context.Background()).ResetCounters()
err := operations.CopyFile(context.Background(), r.Fremote, r.Flocal, rfile1.Path, file1.Path)
require.NoError(t, err)
fstest.CheckItems(t, r.Flocal, file1, file2)
fstest.CheckItems(t, r.Fremote, rfile1)
accounting.Stats(context.Background()).ResetCounters()
err = operations.CopyFile(context.Background(), r.Fremote, r.Flocal, rfile2.Path, file2.Path)
fstest.CheckItems(t, r.Flocal, file1, file2)
fstest.CheckItems(t, r.Fremote, rfile1)
assert.Equal(t, accounting.ErrorMaxTransferLimitReached, err)
assert.True(t, fserrors.IsFatalError(err))
fs.Config.MaxTransferMode = fs.MaxTransferModeCautious
accounting.Stats(context.Background()).ResetCounters()
err = operations.CopyFile(context.Background(), r.Fremote, r.Flocal, rfile2.Path, file2.Path)
fstest.CheckItems(t, r.Flocal, file1, file2)
fstest.CheckItems(t, r.Fremote, rfile1)
assert.Equal(t, accounting.ErrorMaxTransferLimitReached, err)
assert.True(t, fserrors.IsFatalError(err))
fs.Config.MaxTransferMode = fs.MaxTransferModeSoft
accounting.Stats(context.Background()).ResetCounters()
err = operations.CopyFile(context.Background(), r.Fremote, r.Flocal, rfile2.Path, file2.Path)
require.NoError(t, err)
fstest.CheckItems(t, r.Flocal, file1, file2)
fstest.CheckItems(t, r.Fremote, rfile1, rfile2)
}