rcat: configurable small files cutoff and implement proper upload verification

This commit is contained in:
Stefan Breunig 2017-09-11 08:26:53 +02:00
parent 57817397a0
commit 80b1f2a494
4 changed files with 170 additions and 125 deletions

View File

@ -22,13 +22,24 @@ rclone rcat reads from standard input (stdin) and copies it to a
single remote file. single remote file.
echo "hello world" | rclone rcat remote:path/to/file echo "hello world" | rclone rcat remote:path/to/file
ffmpeg - | rclone rcat --checksum remote:path/to/file
Note that since the size is not known in advance, chunking options If the remote file already exists, it will be overwritten.
will likely be ignored. The upload can also not be retried because
the data is not kept around until the upload succeeds. If you need rcat will try to upload small files in a single request, which is
to transfer a lot of data, you're better off caching locally and usually more efficient than the streaming/chunked upload endpoints,
then ` + "`rclone move`" + ` it to the destination. which use multiple requests. Exact behaviour depends on the remote.
`, What is considered a small file may be set through
` + "`--streaming-upload-cutoff`" + `. Uploading only starts after
the cutoff is reached or if the file ends before that. The data
must fit into RAM. The cutoff needs to be small enough to adhere
the limits of your remote, please see there. Generally speaking,
setting this cutoff too high will decrease your performance.
Note that the upload can also not be retried because the data is
not kept around until the upload succeeds. If you need to transfer
a lot of data, you're better off caching locally and then
` + "`rclone move`" + ` it to the destination.`,
Run: func(command *cobra.Command, args []string) { Run: func(command *cobra.Command, args []string) {
cmd.CheckArgs(1, 1, command, args) cmd.CheckArgs(1, 1, command, args)

View File

@ -64,48 +64,49 @@ var (
// Config is the global config // Config is the global config
Config = &ConfigInfo{} Config = &ConfigInfo{}
// Flags // Flags
verbose = CountP("verbose", "v", "Print lots more stuff (repeat for more)") verbose = CountP("verbose", "v", "Print lots more stuff (repeat for more)")
quiet = BoolP("quiet", "q", false, "Print as little stuff as possible") quiet = BoolP("quiet", "q", false, "Print as little stuff as possible")
modifyWindow = DurationP("modify-window", "", time.Nanosecond, "Max time diff to be considered the same") modifyWindow = DurationP("modify-window", "", time.Nanosecond, "Max time diff to be considered the same")
checkers = IntP("checkers", "", 8, "Number of checkers to run in parallel.") checkers = IntP("checkers", "", 8, "Number of checkers to run in parallel.")
transfers = IntP("transfers", "", 4, "Number of file transfers to run in parallel.") transfers = IntP("transfers", "", 4, "Number of file transfers to run in parallel.")
configFile = StringP("config", "", ConfigPath, "Config file.") configFile = StringP("config", "", ConfigPath, "Config file.")
checkSum = BoolP("checksum", "c", false, "Skip based on checksum & size, not mod-time & size") checkSum = BoolP("checksum", "c", false, "Skip based on checksum & size, not mod-time & size")
sizeOnly = BoolP("size-only", "", false, "Skip based on size only, not mod-time or checksum") sizeOnly = BoolP("size-only", "", false, "Skip based on size only, not mod-time or checksum")
ignoreTimes = BoolP("ignore-times", "I", false, "Don't skip files that match size and time - transfer all files") ignoreTimes = BoolP("ignore-times", "I", false, "Don't skip files that match size and time - transfer all files")
ignoreExisting = BoolP("ignore-existing", "", false, "Skip all files that exist on destination") ignoreExisting = BoolP("ignore-existing", "", false, "Skip all files that exist on destination")
dryRun = BoolP("dry-run", "n", false, "Do a trial run with no permanent changes") dryRun = BoolP("dry-run", "n", false, "Do a trial run with no permanent changes")
connectTimeout = DurationP("contimeout", "", 60*time.Second, "Connect timeout") connectTimeout = DurationP("contimeout", "", 60*time.Second, "Connect timeout")
timeout = DurationP("timeout", "", 5*60*time.Second, "IO idle timeout") timeout = DurationP("timeout", "", 5*60*time.Second, "IO idle timeout")
dumpHeaders = BoolP("dump-headers", "", false, "Dump HTTP headers - may contain sensitive info") dumpHeaders = BoolP("dump-headers", "", false, "Dump HTTP headers - may contain sensitive info")
dumpBodies = BoolP("dump-bodies", "", false, "Dump HTTP headers and bodies - may contain sensitive info") dumpBodies = BoolP("dump-bodies", "", false, "Dump HTTP headers and bodies - may contain sensitive info")
dumpAuth = BoolP("dump-auth", "", false, "Dump HTTP headers with auth info") dumpAuth = BoolP("dump-auth", "", false, "Dump HTTP headers with auth info")
skipVerify = BoolP("no-check-certificate", "", false, "Do not verify the server SSL certificate. Insecure.") skipVerify = BoolP("no-check-certificate", "", false, "Do not verify the server SSL certificate. Insecure.")
AskPassword = BoolP("ask-password", "", true, "Allow prompt for password for encrypted configuration.") AskPassword = BoolP("ask-password", "", true, "Allow prompt for password for encrypted configuration.")
deleteBefore = BoolP("delete-before", "", false, "When synchronizing, delete files on destination before transfering") deleteBefore = BoolP("delete-before", "", false, "When synchronizing, delete files on destination before transfering")
deleteDuring = BoolP("delete-during", "", false, "When synchronizing, delete files during transfer (default)") deleteDuring = BoolP("delete-during", "", false, "When synchronizing, delete files during transfer (default)")
deleteAfter = BoolP("delete-after", "", false, "When synchronizing, delete files on destination after transfering") deleteAfter = BoolP("delete-after", "", false, "When synchronizing, delete files on destination after transfering")
trackRenames = BoolP("track-renames", "", false, "When synchronizing, track file renames and do a server side move if possible") trackRenames = BoolP("track-renames", "", false, "When synchronizing, track file renames and do a server side move if possible")
lowLevelRetries = IntP("low-level-retries", "", 10, "Number of low level retries to do.") lowLevelRetries = IntP("low-level-retries", "", 10, "Number of low level retries to do.")
updateOlder = BoolP("update", "u", false, "Skip files that are newer on the destination.") updateOlder = BoolP("update", "u", false, "Skip files that are newer on the destination.")
noGzip = BoolP("no-gzip-encoding", "", false, "Don't set Accept-Encoding: gzip.") noGzip = BoolP("no-gzip-encoding", "", false, "Don't set Accept-Encoding: gzip.")
maxDepth = IntP("max-depth", "", -1, "If set limits the recursion depth to this.") maxDepth = IntP("max-depth", "", -1, "If set limits the recursion depth to this.")
ignoreSize = BoolP("ignore-size", "", false, "Ignore size when skipping use mod-time or checksum.") ignoreSize = BoolP("ignore-size", "", false, "Ignore size when skipping use mod-time or checksum.")
ignoreChecksum = BoolP("ignore-checksum", "", false, "Skip post copy check of checksums.") ignoreChecksum = BoolP("ignore-checksum", "", false, "Skip post copy check of checksums.")
noTraverse = BoolP("no-traverse", "", false, "Don't traverse destination file system on copy.") noTraverse = BoolP("no-traverse", "", false, "Don't traverse destination file system on copy.")
noUpdateModTime = BoolP("no-update-modtime", "", false, "Don't update destination mod-time if files identical.") noUpdateModTime = BoolP("no-update-modtime", "", false, "Don't update destination mod-time if files identical.")
backupDir = StringP("backup-dir", "", "", "Make backups into hierarchy based in DIR.") backupDir = StringP("backup-dir", "", "", "Make backups into hierarchy based in DIR.")
suffix = StringP("suffix", "", "", "Suffix for use with --backup-dir.") suffix = StringP("suffix", "", "", "Suffix for use with --backup-dir.")
useListR = BoolP("fast-list", "", false, "Use recursive list if available. Uses more memory but fewer transactions.") useListR = BoolP("fast-list", "", false, "Use recursive list if available. Uses more memory but fewer transactions.")
tpsLimit = Float64P("tpslimit", "", 0, "Limit HTTP transactions per second to this.") tpsLimit = Float64P("tpslimit", "", 0, "Limit HTTP transactions per second to this.")
tpsLimitBurst = IntP("tpslimit-burst", "", 1, "Max burst of transactions for --tpslimit.") tpsLimitBurst = IntP("tpslimit-burst", "", 1, "Max burst of transactions for --tpslimit.")
bindAddr = StringP("bind", "", "", "Local address to bind to for outgoing connections, IPv4, IPv6 or name.") bindAddr = StringP("bind", "", "", "Local address to bind to for outgoing connections, IPv4, IPv6 or name.")
disableFeatures = StringP("disable", "", "", "Disable a comma separated list of features. Use help to see a list.") disableFeatures = StringP("disable", "", "", "Disable a comma separated list of features. Use help to see a list.")
userAgent = StringP("user-agent", "", "rclone/"+Version, "Set the user-agent to a specified string. The default is rclone/ version") userAgent = StringP("user-agent", "", "rclone/"+Version, "Set the user-agent to a specified string. The default is rclone/ version")
logLevel = LogLevelNotice streamingUploadCutoff = SizeSuffix(100 * 1024)
statsLogLevel = LogLevelInfo logLevel = LogLevelNotice
bwLimit BwTimetable statsLogLevel = LogLevelInfo
bufferSize SizeSuffix = 16 << 20 bwLimit BwTimetable
bufferSize SizeSuffix = 16 << 20
// Key to use for password en/decryption. // Key to use for password en/decryption.
// When nil, no encryption will be used for saving. // When nil, no encryption will be used for saving.
@ -117,6 +118,7 @@ func init() {
VarP(&statsLogLevel, "stats-log-level", "", "Log level to show --stats output DEBUG|INFO|NOTICE|ERROR") VarP(&statsLogLevel, "stats-log-level", "", "Log level to show --stats output DEBUG|INFO|NOTICE|ERROR")
VarP(&bwLimit, "bwlimit", "", "Bandwidth limit in kBytes/s, or use suffix b|k|M|G or a full timetable.") VarP(&bwLimit, "bwlimit", "", "Bandwidth limit in kBytes/s, or use suffix b|k|M|G or a full timetable.")
VarP(&bufferSize, "buffer-size", "", "Buffer size when copying files.") VarP(&bufferSize, "buffer-size", "", "Buffer size when copying files.")
VarP(&streamingUploadCutoff, "streaming-upload-cutoff", "", "Cutoff for switching to chunked upload if file size is unknown. Upload starts after reaching cutoff or when file ends.")
} }
// crypt internals // crypt internals
@ -202,42 +204,43 @@ func MustReveal(x string) string {
// ConfigInfo is filesystem config options // ConfigInfo is filesystem config options
type ConfigInfo struct { type ConfigInfo struct {
LogLevel LogLevel LogLevel LogLevel
StatsLogLevel LogLevel StatsLogLevel LogLevel
DryRun bool DryRun bool
CheckSum bool CheckSum bool
SizeOnly bool SizeOnly bool
IgnoreTimes bool IgnoreTimes bool
IgnoreExisting bool IgnoreExisting bool
ModifyWindow time.Duration ModifyWindow time.Duration
Checkers int Checkers int
Transfers int Transfers int
ConnectTimeout time.Duration // Connect timeout ConnectTimeout time.Duration // Connect timeout
Timeout time.Duration // Data channel timeout Timeout time.Duration // Data channel timeout
DumpHeaders bool DumpHeaders bool
DumpBodies bool DumpBodies bool
DumpAuth bool DumpAuth bool
Filter *Filter Filter *Filter
InsecureSkipVerify bool // Skip server certificate verification InsecureSkipVerify bool // Skip server certificate verification
DeleteMode DeleteMode DeleteMode DeleteMode
TrackRenames bool // Track file renames. TrackRenames bool // Track file renames.
LowLevelRetries int LowLevelRetries int
UpdateOlder bool // Skip files that are newer on the destination UpdateOlder bool // Skip files that are newer on the destination
NoGzip bool // Disable compression NoGzip bool // Disable compression
MaxDepth int MaxDepth int
IgnoreSize bool IgnoreSize bool
IgnoreChecksum bool IgnoreChecksum bool
NoTraverse bool NoTraverse bool
NoUpdateModTime bool NoUpdateModTime bool
DataRateUnit string DataRateUnit string
BackupDir string BackupDir string
Suffix string Suffix string
UseListR bool UseListR bool
BufferSize SizeSuffix BufferSize SizeSuffix
TPSLimit float64 TPSLimit float64
TPSLimitBurst int TPSLimitBurst int
BindAddr net.IP BindAddr net.IP
DisableFeatures []string DisableFeatures []string
StreamingUploadCutoff SizeSuffix
} }
// Return the path to the configuration file // Return the path to the configuration file
@ -377,6 +380,7 @@ func LoadConfig() {
Config.TPSLimit = *tpsLimit Config.TPSLimit = *tpsLimit
Config.TPSLimitBurst = *tpsLimitBurst Config.TPSLimitBurst = *tpsLimitBurst
Config.BufferSize = bufferSize Config.BufferSize = bufferSize
Config.StreamingUploadCutoff = streamingUploadCutoff
Config.TrackRenames = *trackRenames Config.TrackRenames = *trackRenames

View File

@ -65,7 +65,7 @@ func HashEquals(src, dst string) bool {
// err - may return an error which will already have been logged // err - may return an error which will already have been logged
// //
// If an error is returned it will return equal as false // If an error is returned it will return equal as false
func CheckHashes(src, dst Object) (equal bool, hash HashType, err error) { func CheckHashes(src ObjectInfo, dst Object) (equal bool, hash HashType, err error) {
common := src.Fs().Hashes().Overlap(dst.Fs().Hashes()) common := src.Fs().Hashes().Overlap(dst.Fs().Hashes())
// Debugf(nil, "Shared hashes: %v", common) // Debugf(nil, "Shared hashes: %v", common)
if common.Count() == 0 { if common.Count() == 0 {
@ -115,11 +115,11 @@ func CheckHashes(src, dst Object) (equal bool, hash HashType, err error) {
// //
// Otherwise the file is considered to be not equal including if there // Otherwise the file is considered to be not equal including if there
// were errors reading info. // were errors reading info.
func Equal(src, dst Object) bool { func Equal(src ObjectInfo, dst Object) bool {
return equal(src, dst, Config.SizeOnly, Config.CheckSum) return equal(src, dst, Config.SizeOnly, Config.CheckSum)
} }
func equal(src, dst Object, sizeOnly, checkSum bool) bool { func equal(src ObjectInfo, dst Object, sizeOnly, checkSum bool) bool {
if !Config.IgnoreSize { if !Config.IgnoreSize {
if src.Size() != dst.Size() { if src.Size() != dst.Size() {
Debugf(src, "Sizes differ") Debugf(src, "Sizes differ")
@ -1587,27 +1587,44 @@ func Rcat(fdst Fs, dstFileName string, in0 io.ReadCloser, modTime time.Time) (er
Stats.Transferring(dstFileName) Stats.Transferring(dstFileName)
defer func() { defer func() {
Stats.DoneTransferring(dstFileName, err == nil) Stats.DoneTransferring(dstFileName, err == nil)
if err := in0.Close(); err != nil { if err = in0.Close(); err != nil {
Debugf(fdst, "Rcat: failed to close source: %v", err) Debugf(fdst, "Rcat: failed to close source: %v", err)
} }
}() }()
hashOption := &HashesOption{Hashes: NewHashSet()} hashOption := &HashesOption{Hashes: fdst.Hashes()}
hash, err := NewMultiHasherTypes(fdst.Hashes())
in := in0 if err != nil {
buf := make([]byte, 100*1024)
if n, err := io.ReadFull(in0, buf); err != nil {
Debugf(fdst, "File to upload is small, uploading instead of streaming")
in = ioutil.NopCloser(bytes.NewReader(buf[:n]))
in = NewAccountSizeName(in, int64(n), dstFileName).WithBuffer()
if !Config.SizeOnly {
hashOption = &HashesOption{Hashes: HashSet(fdst.Hashes().GetOne())}
}
objInfo := NewStaticObjectInfo(dstFileName, modTime, int64(n), false, nil, nil)
_, err := fdst.Put(in, objInfo, hashOption)
return err return err
} }
in = ioutil.NopCloser(io.MultiReader(bytes.NewReader(buf), in0)) readCounter := NewCountingReader(in0)
trackingIn := io.TeeReader(readCounter, hash)
compare := func(dst Object) error {
src := NewStaticObjectInfo(dstFileName, modTime, int64(readCounter.BytesRead()), false, hash.Sums(), fdst)
if !Equal(src, dst) {
Stats.Error()
err = errors.Errorf("corrupted on transfer")
Errorf(dst, "%v", err)
return err
}
return nil
}
// check if file small enough for direct upload
buf := make([]byte, Config.StreamingUploadCutoff)
if n, err := io.ReadFull(trackingIn, buf); err == io.EOF || err == io.ErrUnexpectedEOF {
Debugf(fdst, "File to upload is small (%d bytes), uploading instead of streaming", n)
in := ioutil.NopCloser(bytes.NewReader(buf[:n]))
in = NewAccountSizeName(in, int64(n), dstFileName).WithBuffer()
objInfo := NewStaticObjectInfo(dstFileName, modTime, int64(n), false, nil, nil)
dst, err := fdst.Put(in, objInfo, hashOption)
if err != nil {
return err
}
return compare(dst)
}
in := ioutil.NopCloser(io.MultiReader(bytes.NewReader(buf), trackingIn))
fStreamTo := fdst fStreamTo := fdst
canStream := fdst.Features().PutStream != nil canStream := fdst.Features().PutStream != nil
@ -1626,10 +1643,6 @@ func Rcat(fdst Fs, dstFileName string, in0 io.ReadCloser, modTime time.Time) (er
fStreamTo = tmpLocalFs fStreamTo = tmpLocalFs
} }
if !Config.SizeOnly {
hashOption = &HashesOption{Hashes: HashSet(fStreamTo.Hashes().GetOne())}
}
in = NewAccountSizeName(in, -1, dstFileName).WithBuffer() in = NewAccountSizeName(in, -1, dstFileName).WithBuffer()
if Config.DryRun { if Config.DryRun {
@ -1641,10 +1654,16 @@ func Rcat(fdst Fs, dstFileName string, in0 io.ReadCloser, modTime time.Time) (er
objInfo := NewStaticObjectInfo(dstFileName, modTime, -1, false, nil, nil) objInfo := NewStaticObjectInfo(dstFileName, modTime, -1, false, nil, nil)
tmpObj, err := fStreamTo.Features().PutStream(in, objInfo, hashOption) tmpObj, err := fStreamTo.Features().PutStream(in, objInfo, hashOption)
if err == nil && !canStream { if err != nil {
err = Copy(fdst, nil, dstFileName, tmpObj) return err
} }
return err if err = compare(tmpObj); err != nil {
return err
}
if !canStream {
return Copy(fdst, nil, dstFileName, tmpObj)
}
return nil
} }
// Rmdirs removes any empty directories (or directories only // Rmdirs removes any empty directories (or directories only

View File

@ -732,28 +732,39 @@ func TestCat(t *testing.T) {
} }
func TestRcat(t *testing.T) { func TestRcat(t *testing.T) {
r := NewRun(t) checkSumBefore := fs.Config.CheckSum
defer r.Finalise() defer func() { fs.Config.CheckSum = checkSumBefore }()
fstest.CheckListing(t, r.fremote, []fstest.Item{}) check := func() {
r := NewRun(t)
defer r.Finalise()
data1 := "this is some really nice test data" fstest.CheckListing(t, r.fremote, []fstest.Item{})
path1 := "small_file_from_pipe"
data2 := string(make([]byte, 100*1024+1)) data1 := "this is some really nice test data"
path2 := "big_file_from_pipe" path1 := "small_file_from_pipe"
in := ioutil.NopCloser(strings.NewReader(data1)) data2 := string(make([]byte, fs.Config.StreamingUploadCutoff+1))
err := fs.Rcat(r.fremote, path1, in, t1) path2 := "big_file_from_pipe"
require.NoError(t, err)
in = ioutil.NopCloser(strings.NewReader(data2)) in := ioutil.NopCloser(strings.NewReader(data1))
err = fs.Rcat(r.fremote, path2, in, t2) err := fs.Rcat(r.fremote, path1, in, t1)
require.NoError(t, err) require.NoError(t, err)
file1 := fstest.NewItem(path1, data1, t1) in = ioutil.NopCloser(strings.NewReader(data2))
file2 := fstest.NewItem(path2, data2, t2) err = fs.Rcat(r.fremote, path2, in, t2)
fstest.CheckItems(t, r.fremote, file1, file2) require.NoError(t, err)
file1 := fstest.NewItem(path1, data1, t1)
file2 := fstest.NewItem(path2, data2, t2)
fstest.CheckItems(t, r.fremote, file1, file2)
}
fs.Config.CheckSum = true
check()
fs.Config.CheckSum = false
check()
} }
func TestRmdirs(t *testing.T) { func TestRmdirs(t *testing.T) {