mirror of
https://github.com/rclone/rclone.git
synced 2025-01-25 15:49:33 +01:00
fs: Add support for --max-transfer-cutoff modes #2672
This also adds max transfer cut off check for server side copies too
This commit is contained in:
parent
7d70eb0346
commit
6f1766dd9e
@ -736,6 +736,20 @@ When the limit is reached all transfers will stop immediately.
|
|||||||
|
|
||||||
Rclone will exit with exit code 8 if the transfer limit is reached.
|
Rclone will exit with exit code 8 if the transfer limit is reached.
|
||||||
|
|
||||||
|
### --max-transfer-(hard,soft,cautious) ###
|
||||||
|
|
||||||
|
This modifies the behavior of `--max-transfer`
|
||||||
|
Defaults to `--max-transfer-hard`.
|
||||||
|
|
||||||
|
Specifiying `--max-transfer-hard` will stop transferring immediately
|
||||||
|
when Rclone reaches the limit.
|
||||||
|
|
||||||
|
Specifiying `--max-transfer-soft` will stop starting new transfers
|
||||||
|
when Rclone reaches the limit.
|
||||||
|
|
||||||
|
Specifiying `--max-transfer-cautious` will try to prevent Rclone
|
||||||
|
from reaching the limit.
|
||||||
|
|
||||||
### --modify-window=TIME ###
|
### --modify-window=TIME ###
|
||||||
|
|
||||||
When checking whether a file has been modified, this is the maximum
|
When checking whether a file has been modified, this is the maximum
|
||||||
|
@ -61,7 +61,10 @@ func newAccountSizeName(stats *StatsInfo, in io.ReadCloser, size int64, name str
|
|||||||
exit: make(chan struct{}),
|
exit: make(chan struct{}),
|
||||||
avg: 0,
|
avg: 0,
|
||||||
lpTime: time.Now(),
|
lpTime: time.Now(),
|
||||||
max: int64(fs.Config.MaxTransfer),
|
max: -1,
|
||||||
|
}
|
||||||
|
if fs.Config.MaxTransferMode != fs.MaxTransferModeSoft {
|
||||||
|
acc.max = int64((fs.Config.MaxTransfer))
|
||||||
}
|
}
|
||||||
go acc.averageLoop()
|
go acc.averageLoop()
|
||||||
stats.inProgress.set(acc.name, acc)
|
stats.inProgress.set(acc.name, acc)
|
||||||
|
@ -197,9 +197,12 @@ func TestAccountAccounter(t *testing.T) {
|
|||||||
|
|
||||||
func TestAccountMaxTransfer(t *testing.T) {
|
func TestAccountMaxTransfer(t *testing.T) {
|
||||||
old := fs.Config.MaxTransfer
|
old := fs.Config.MaxTransfer
|
||||||
|
oldMode := fs.Config.MaxTransferMode
|
||||||
|
|
||||||
fs.Config.MaxTransfer = 15
|
fs.Config.MaxTransfer = 15
|
||||||
defer func() {
|
defer func() {
|
||||||
fs.Config.MaxTransfer = old
|
fs.Config.MaxTransfer = old
|
||||||
|
fs.Config.MaxTransferMode = oldMode
|
||||||
}()
|
}()
|
||||||
|
|
||||||
in := ioutil.NopCloser(bytes.NewBuffer(make([]byte, 100)))
|
in := ioutil.NopCloser(bytes.NewBuffer(make([]byte, 100)))
|
||||||
@ -218,6 +221,20 @@ func TestAccountMaxTransfer(t *testing.T) {
|
|||||||
assert.Equal(t, 0, n)
|
assert.Equal(t, 0, n)
|
||||||
assert.Equal(t, ErrorMaxTransferLimitReached, err)
|
assert.Equal(t, ErrorMaxTransferLimitReached, err)
|
||||||
assert.True(t, fserrors.IsFatalError(err))
|
assert.True(t, fserrors.IsFatalError(err))
|
||||||
|
|
||||||
|
fs.Config.MaxTransferMode = fs.MaxTransferModeSoft
|
||||||
|
stats = NewStats()
|
||||||
|
acc = newAccountSizeName(stats, in, 1, "test")
|
||||||
|
|
||||||
|
n, err = acc.Read(b)
|
||||||
|
assert.Equal(t, 10, n)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
n, err = acc.Read(b)
|
||||||
|
assert.Equal(t, 10, n)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
n, err = acc.Read(b)
|
||||||
|
assert.Equal(t, 10, n)
|
||||||
|
assert.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestShortenName(t *testing.T) {
|
func TestShortenName(t *testing.T) {
|
||||||
|
@ -382,6 +382,22 @@ func (s *StatsInfo) GetBytes() int64 {
|
|||||||
return s.bytes
|
return s.bytes
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetBytesWithPending returns the number of bytes transferred and remaining transfers
|
||||||
|
func (s *StatsInfo) GetBytesWithPending() int64 {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
pending := int64(0)
|
||||||
|
for _, tr := range s.startedTransfers {
|
||||||
|
if tr.acc != nil {
|
||||||
|
bytes, size := tr.acc.progress()
|
||||||
|
if bytes < size {
|
||||||
|
pending += size - bytes
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return s.bytes + pending
|
||||||
|
}
|
||||||
|
|
||||||
// Errors updates the stats for errors
|
// Errors updates the stats for errors
|
||||||
func (s *StatsInfo) Errors(errors int64) {
|
func (s *StatsInfo) Errors(errors int64) {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
|
@ -93,6 +93,7 @@ type ConfigInfo struct {
|
|||||||
UseServerModTime bool
|
UseServerModTime bool
|
||||||
MaxTransfer SizeSuffix
|
MaxTransfer SizeSuffix
|
||||||
MaxDuration time.Duration
|
MaxDuration time.Duration
|
||||||
|
MaxTransferMode MaxTransferMode
|
||||||
MaxBacklog int
|
MaxBacklog int
|
||||||
MaxStatsGroups int
|
MaxStatsGroups int
|
||||||
StatsOneLine bool
|
StatsOneLine bool
|
||||||
|
@ -20,15 +20,18 @@ import (
|
|||||||
|
|
||||||
var (
|
var (
|
||||||
// these will get interpreted into fs.Config via SetFlags() below
|
// these will get interpreted into fs.Config via SetFlags() below
|
||||||
verbose int
|
verbose int
|
||||||
quiet bool
|
quiet bool
|
||||||
dumpHeaders bool
|
dumpHeaders bool
|
||||||
dumpBodies bool
|
dumpBodies bool
|
||||||
deleteBefore bool
|
deleteBefore bool
|
||||||
deleteDuring bool
|
deleteDuring bool
|
||||||
deleteAfter bool
|
deleteAfter bool
|
||||||
bindAddr string
|
bindAddr string
|
||||||
disableFeatures string
|
disableFeatures string
|
||||||
|
maxTransferHard bool
|
||||||
|
maxTransferSoft bool
|
||||||
|
maxTransferCautious bool
|
||||||
)
|
)
|
||||||
|
|
||||||
// AddFlags adds the non filing system specific flags to the command
|
// AddFlags adds the non filing system specific flags to the command
|
||||||
@ -94,6 +97,9 @@ func AddFlags(flagSet *pflag.FlagSet) {
|
|||||||
flags.FVarP(flagSet, &fs.Config.Dump, "dump", "", "List of items to dump from: "+fs.DumpFlagsList)
|
flags.FVarP(flagSet, &fs.Config.Dump, "dump", "", "List of items to dump from: "+fs.DumpFlagsList)
|
||||||
flags.FVarP(flagSet, &fs.Config.MaxTransfer, "max-transfer", "", "Maximum size of data to transfer.")
|
flags.FVarP(flagSet, &fs.Config.MaxTransfer, "max-transfer", "", "Maximum size of data to transfer.")
|
||||||
flags.DurationVarP(flagSet, &fs.Config.MaxDuration, "max-duration", "", 0, "Maximum duration rclone will transfer data for.")
|
flags.DurationVarP(flagSet, &fs.Config.MaxDuration, "max-duration", "", 0, "Maximum duration rclone will transfer data for.")
|
||||||
|
flags.BoolVarP(flagSet, &maxTransferHard, "max-transfer-hard", "", false, "When transferring, stop immediately when --max-transfer is reached")
|
||||||
|
flags.BoolVarP(flagSet, &maxTransferSoft, "max-transfer-soft", "", false, "When transferring, stop starting new transfers when --max-transfer is reached")
|
||||||
|
flags.BoolVarP(flagSet, &maxTransferCautious, "max-transfer-cautious", "", false, "When transferring, try to avoid reaching --max-transfer")
|
||||||
flags.IntVarP(flagSet, &fs.Config.MaxBacklog, "max-backlog", "", fs.Config.MaxBacklog, "Maximum number of objects in sync or check backlog.")
|
flags.IntVarP(flagSet, &fs.Config.MaxBacklog, "max-backlog", "", fs.Config.MaxBacklog, "Maximum number of objects in sync or check backlog.")
|
||||||
flags.IntVarP(flagSet, &fs.Config.MaxStatsGroups, "max-stats-groups", "", fs.Config.MaxStatsGroups, "Maximum number of stats groups to keep in memory. On max oldest is discarded.")
|
flags.IntVarP(flagSet, &fs.Config.MaxStatsGroups, "max-stats-groups", "", fs.Config.MaxStatsGroups, "Maximum number of stats groups to keep in memory. On max oldest is discarded.")
|
||||||
flags.BoolVarP(flagSet, &fs.Config.StatsOneLine, "stats-one-line", "", fs.Config.StatsOneLine, "Make the stats fit on one line.")
|
flags.BoolVarP(flagSet, &fs.Config.StatsOneLine, "stats-one-line", "", fs.Config.StatsOneLine, "Make the stats fit on one line.")
|
||||||
@ -178,6 +184,20 @@ func SetFlags() {
|
|||||||
fs.Config.DeleteMode = fs.DeleteModeDefault
|
fs.Config.DeleteMode = fs.DeleteModeDefault
|
||||||
}
|
}
|
||||||
|
|
||||||
|
switch {
|
||||||
|
case maxTransferHard && (maxTransferSoft || maxTransferCautious),
|
||||||
|
maxTransferSoft && maxTransferCautious:
|
||||||
|
log.Fatalf(`Only one of --max-trans fer-hard, --max-transfer-soft or --max-transfer-cautious can be used.`)
|
||||||
|
case maxTransferHard:
|
||||||
|
fs.Config.MaxTransferMode = fs.MaxTransferModeHard
|
||||||
|
case maxTransferSoft:
|
||||||
|
fs.Config.MaxTransferMode = fs.MaxTransferModeSoft
|
||||||
|
case maxTransferCautious:
|
||||||
|
fs.Config.MaxTransferMode = fs.MaxTransferModeCautious
|
||||||
|
default:
|
||||||
|
fs.Config.MaxTransferMode = fs.MaxTransferModeDefault
|
||||||
|
}
|
||||||
|
|
||||||
if fs.Config.CompareDest != "" && fs.Config.CopyDest != "" {
|
if fs.Config.CompareDest != "" && fs.Config.CopyDest != "" {
|
||||||
log.Fatalf(`Can't use --compare-dest with --copy-dest.`)
|
log.Fatalf(`Can't use --compare-dest with --copy-dest.`)
|
||||||
}
|
}
|
||||||
|
12
fs/maxtransfermode.go
Normal file
12
fs/maxtransfermode.go
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
package fs
|
||||||
|
|
||||||
|
// MaxTransferMode describes the possible delete modes in the config
|
||||||
|
type MaxTransferMode byte
|
||||||
|
|
||||||
|
// MaxTransferMode constants
|
||||||
|
const (
|
||||||
|
MaxTransferModeHard MaxTransferMode = iota
|
||||||
|
MaxTransferModeSoft
|
||||||
|
MaxTransferModeCautious
|
||||||
|
MaxTransferModeDefault = MaxTransferModeHard
|
||||||
|
)
|
@ -364,7 +364,8 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj
|
|||||||
actionTaken = "Copied (server side copy)"
|
actionTaken = "Copied (server side copy)"
|
||||||
if doCopy := f.Features().Copy; doCopy != nil && (SameConfig(src.Fs(), f) || (SameRemoteType(src.Fs(), f) && f.Features().ServerSideAcrossConfigs)) {
|
if doCopy := f.Features().Copy; doCopy != nil && (SameConfig(src.Fs(), f) || (SameRemoteType(src.Fs(), f) && f.Features().ServerSideAcrossConfigs)) {
|
||||||
// Check transfer limit for server side copies
|
// Check transfer limit for server side copies
|
||||||
if fs.Config.MaxTransfer >= 0 && accounting.Stats(ctx).GetBytes() >= int64(fs.Config.MaxTransfer) {
|
if fs.Config.MaxTransfer >= 0 && (accounting.Stats(ctx).GetBytes() >= int64(fs.Config.MaxTransfer) ||
|
||||||
|
(fs.Config.MaxTransferMode == fs.MaxTransferModeCautious && accounting.Stats(ctx).GetBytesWithPending()+src.Size() >= int64(fs.Config.MaxTransfer))) {
|
||||||
return nil, accounting.ErrorMaxTransferLimitReached
|
return nil, accounting.ErrorMaxTransferLimitReached
|
||||||
}
|
}
|
||||||
in := tr.Account(nil) // account the transfer
|
in := tr.Account(nil) // account the transfer
|
||||||
@ -385,6 +386,10 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj
|
|||||||
}
|
}
|
||||||
// If can't server side copy, do it manually
|
// If can't server side copy, do it manually
|
||||||
if err == fs.ErrorCantCopy {
|
if err == fs.ErrorCantCopy {
|
||||||
|
if fs.Config.MaxTransfer >= 0 && (accounting.Stats(ctx).GetBytes() >= int64(fs.Config.MaxTransfer) ||
|
||||||
|
(fs.Config.MaxTransferMode == fs.MaxTransferModeCautious && accounting.Stats(ctx).GetBytesWithPending()+src.Size() >= int64(fs.Config.MaxTransfer))) {
|
||||||
|
return nil, accounting.ErrorMaxTransferLimitReached
|
||||||
|
}
|
||||||
if doMultiThreadCopy(f, src) {
|
if doMultiThreadCopy(f, src) {
|
||||||
// Number of streams proportional to size
|
// Number of streams proportional to size
|
||||||
streams := src.Size() / int64(fs.Config.MultiThreadCutoff)
|
streams := src.Size() / int64(fs.Config.MultiThreadCutoff)
|
||||||
|
@ -39,6 +39,7 @@ import (
|
|||||||
"github.com/rclone/rclone/fs"
|
"github.com/rclone/rclone/fs"
|
||||||
"github.com/rclone/rclone/fs/accounting"
|
"github.com/rclone/rclone/fs/accounting"
|
||||||
"github.com/rclone/rclone/fs/filter"
|
"github.com/rclone/rclone/fs/filter"
|
||||||
|
"github.com/rclone/rclone/fs/fserrors"
|
||||||
"github.com/rclone/rclone/fs/fshttp"
|
"github.com/rclone/rclone/fs/fshttp"
|
||||||
"github.com/rclone/rclone/fs/hash"
|
"github.com/rclone/rclone/fs/hash"
|
||||||
"github.com/rclone/rclone/fs/operations"
|
"github.com/rclone/rclone/fs/operations"
|
||||||
@ -1527,3 +1528,58 @@ func TestRcatSize(t *testing.T) {
|
|||||||
// Check files exist
|
// Check files exist
|
||||||
fstest.CheckItems(t, r.Fremote, file1, file2)
|
fstest.CheckItems(t, r.Fremote, file1, file2)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCopyFileMaxTransfer(t *testing.T) {
|
||||||
|
r := fstest.NewRun(t)
|
||||||
|
defer r.Finalise()
|
||||||
|
old := fs.Config.MaxTransfer
|
||||||
|
oldMode := fs.Config.MaxTransferMode
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
fs.Config.MaxTransfer = old
|
||||||
|
fs.Config.MaxTransferMode = oldMode
|
||||||
|
accounting.Stats(context.Background()).ResetCounters()
|
||||||
|
}()
|
||||||
|
|
||||||
|
file1 := r.WriteFile("file1", "file1 contents", t1)
|
||||||
|
file2 := r.WriteFile("file2", "file2 contents...........", t2)
|
||||||
|
|
||||||
|
rfile1 := file1
|
||||||
|
rfile1.Path = "sub/file1"
|
||||||
|
rfile2 := file2
|
||||||
|
rfile2.Path = "sub/file2"
|
||||||
|
|
||||||
|
fs.Config.MaxTransfer = 15
|
||||||
|
fs.Config.MaxTransferMode = fs.MaxTransferModeHard
|
||||||
|
accounting.Stats(context.Background()).ResetCounters()
|
||||||
|
|
||||||
|
err := operations.CopyFile(context.Background(), r.Fremote, r.Flocal, rfile1.Path, file1.Path)
|
||||||
|
require.NoError(t, err)
|
||||||
|
fstest.CheckItems(t, r.Flocal, file1, file2)
|
||||||
|
fstest.CheckItems(t, r.Fremote, rfile1)
|
||||||
|
|
||||||
|
accounting.Stats(context.Background()).ResetCounters()
|
||||||
|
|
||||||
|
err = operations.CopyFile(context.Background(), r.Fremote, r.Flocal, rfile2.Path, file2.Path)
|
||||||
|
fstest.CheckItems(t, r.Flocal, file1, file2)
|
||||||
|
fstest.CheckItems(t, r.Fremote, rfile1)
|
||||||
|
assert.Equal(t, accounting.ErrorMaxTransferLimitReached, err)
|
||||||
|
assert.True(t, fserrors.IsFatalError(err))
|
||||||
|
|
||||||
|
fs.Config.MaxTransferMode = fs.MaxTransferModeCautious
|
||||||
|
accounting.Stats(context.Background()).ResetCounters()
|
||||||
|
|
||||||
|
err = operations.CopyFile(context.Background(), r.Fremote, r.Flocal, rfile2.Path, file2.Path)
|
||||||
|
fstest.CheckItems(t, r.Flocal, file1, file2)
|
||||||
|
fstest.CheckItems(t, r.Fremote, rfile1)
|
||||||
|
assert.Equal(t, accounting.ErrorMaxTransferLimitReached, err)
|
||||||
|
assert.True(t, fserrors.IsFatalError(err))
|
||||||
|
|
||||||
|
fs.Config.MaxTransferMode = fs.MaxTransferModeSoft
|
||||||
|
accounting.Stats(context.Background()).ResetCounters()
|
||||||
|
|
||||||
|
err = operations.CopyFile(context.Background(), r.Fremote, r.Flocal, rfile2.Path, file2.Path)
|
||||||
|
require.NoError(t, err)
|
||||||
|
fstest.CheckItems(t, r.Flocal, file1, file2)
|
||||||
|
fstest.CheckItems(t, r.Fremote, rfile1, rfile2)
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user