diff --git a/docs/content/docs.md b/docs/content/docs.md index fe68b65b7..8dc82d96e 100644 --- a/docs/content/docs.md +++ b/docs/content/docs.md @@ -1511,6 +1511,25 @@ if you are reading and writing to an OS X filing system this will be This command line flag allows you to override that computed default. +### --multi-thread-write-buffer-size=SIZE ### + +When downloading with multiple threads, rclone will buffer SIZE bytes in +memory before writing to disk for each thread. + +This can improve performance if the underlying filesystem does not deal +well with a lot of small writes in different positions of the file, so +if you see downloads being limited by disk write speed, you might want +to experiment with different values. Specially for magnetic drives and +remote file systems a higher value can be useful. + +Nevertheless, the default of `128k` should be fine for almost all use +cases, so before changing it ensure that network is not really your +bottleneck. + +As a final hint, size is not the only factor: block size (or similar +concept) can have an impact. In one case, we observed that exact +multiples of 16k performed much better than other values. + ### --multi-thread-cutoff=SIZE ### When downloading files to the local backend above this size, rclone diff --git a/fs/config.go b/fs/config.go index a6de70b85..060567447 100644 --- a/fs/config.go +++ b/fs/config.go @@ -51,101 +51,102 @@ var ( // ConfigInfo is filesystem config options type ConfigInfo struct { - LogLevel LogLevel - StatsLogLevel LogLevel - UseJSONLog bool - DryRun bool - Interactive bool - CheckSum bool - SizeOnly bool - IgnoreTimes bool - IgnoreExisting bool - IgnoreErrors bool - ModifyWindow time.Duration - Checkers int - Transfers int - ConnectTimeout time.Duration // Connect timeout - Timeout time.Duration // Data channel timeout - ExpectContinueTimeout time.Duration - Dump DumpFlags - InsecureSkipVerify bool // Skip server certificate verification - DeleteMode DeleteMode - MaxDelete int64 - MaxDeleteSize SizeSuffix - TrackRenames bool // Track file renames. - TrackRenamesStrategy string // Comma separated list of strategies used to track renames - LowLevelRetries int - UpdateOlder bool // Skip files that are newer on the destination - NoGzip bool // Disable compression - MaxDepth int - IgnoreSize bool - IgnoreChecksum bool - IgnoreCaseSync bool - NoTraverse bool - CheckFirst bool - NoCheckDest bool - NoUnicodeNormalization bool - NoUpdateModTime bool - DataRateUnit string - CompareDest []string - CopyDest []string - BackupDir string - Suffix string - SuffixKeepExtension bool - UseListR bool - BufferSize SizeSuffix - BwLimit BwTimetable - BwLimitFile BwTimetable - TPSLimit float64 - TPSLimitBurst int - BindAddr net.IP - DisableFeatures []string - UserAgent string - Immutable bool - AutoConfirm bool - StreamingUploadCutoff SizeSuffix - StatsFileNameLength int - AskPassword bool - PasswordCommand SpaceSepList - UseServerModTime bool - MaxTransfer SizeSuffix - MaxDuration time.Duration - CutoffMode CutoffMode - MaxBacklog int - MaxStatsGroups int - StatsOneLine bool - StatsOneLineDate bool // If we want a date prefix at all - StatsOneLineDateFormat string // If we want to customize the prefix - ErrorOnNoTransfer bool // Set appropriate exit code if no files transferred - Progress bool - ProgressTerminalTitle bool - Cookie bool - UseMmap bool - CaCert []string // Client Side CA - ClientCert string // Client Side Cert - ClientKey string // Client Side Key - MultiThreadCutoff SizeSuffix - MultiThreadStreams int - MultiThreadSet bool // whether MultiThreadStreams was set (set in fs/config/configflags) - OrderBy string // instructions on how to order the transfer - UploadHeaders []*HTTPOption - DownloadHeaders []*HTTPOption - Headers []*HTTPOption - MetadataSet Metadata // extra metadata to write when uploading - RefreshTimes bool - NoConsole bool - TrafficClass uint8 - FsCacheExpireDuration time.Duration - FsCacheExpireInterval time.Duration - DisableHTTP2 bool - HumanReadable bool - KvLockTime time.Duration // maximum time to keep key-value database locked by process - DisableHTTPKeepAlives bool - Metadata bool - ServerSideAcrossConfigs bool - TerminalColorMode TerminalColorMode - DefaultTime Time // time that directories with no time should display - Inplace bool // Download directly to destination file instead of atomic download to temp/rename + LogLevel LogLevel + StatsLogLevel LogLevel + UseJSONLog bool + DryRun bool + Interactive bool + CheckSum bool + SizeOnly bool + IgnoreTimes bool + IgnoreExisting bool + IgnoreErrors bool + ModifyWindow time.Duration + Checkers int + Transfers int + ConnectTimeout time.Duration // Connect timeout + Timeout time.Duration // Data channel timeout + ExpectContinueTimeout time.Duration + Dump DumpFlags + InsecureSkipVerify bool // Skip server certificate verification + DeleteMode DeleteMode + MaxDelete int64 + MaxDeleteSize SizeSuffix + TrackRenames bool // Track file renames. + TrackRenamesStrategy string // Comma separated list of strategies used to track renames + LowLevelRetries int + UpdateOlder bool // Skip files that are newer on the destination + NoGzip bool // Disable compression + MaxDepth int + IgnoreSize bool + IgnoreChecksum bool + IgnoreCaseSync bool + NoTraverse bool + CheckFirst bool + NoCheckDest bool + NoUnicodeNormalization bool + NoUpdateModTime bool + DataRateUnit string + CompareDest []string + CopyDest []string + BackupDir string + Suffix string + SuffixKeepExtension bool + UseListR bool + BufferSize SizeSuffix + MultiThreadWriteBufferSize SizeSuffix + BwLimit BwTimetable + BwLimitFile BwTimetable + TPSLimit float64 + TPSLimitBurst int + BindAddr net.IP + DisableFeatures []string + UserAgent string + Immutable bool + AutoConfirm bool + StreamingUploadCutoff SizeSuffix + StatsFileNameLength int + AskPassword bool + PasswordCommand SpaceSepList + UseServerModTime bool + MaxTransfer SizeSuffix + MaxDuration time.Duration + CutoffMode CutoffMode + MaxBacklog int + MaxStatsGroups int + StatsOneLine bool + StatsOneLineDate bool // If we want a date prefix at all + StatsOneLineDateFormat string // If we want to customize the prefix + ErrorOnNoTransfer bool // Set appropriate exit code if no files transferred + Progress bool + ProgressTerminalTitle bool + Cookie bool + UseMmap bool + CaCert []string // Client Side CA + ClientCert string // Client Side Cert + ClientKey string // Client Side Key + MultiThreadCutoff SizeSuffix + MultiThreadStreams int + MultiThreadSet bool // whether MultiThreadStreams was set (set in fs/config/configflags) + OrderBy string // instructions on how to order the transfer + UploadHeaders []*HTTPOption + DownloadHeaders []*HTTPOption + Headers []*HTTPOption + MetadataSet Metadata // extra metadata to write when uploading + RefreshTimes bool + NoConsole bool + TrafficClass uint8 + FsCacheExpireDuration time.Duration + FsCacheExpireInterval time.Duration + DisableHTTP2 bool + HumanReadable bool + KvLockTime time.Duration // maximum time to keep key-value database locked by process + DisableHTTPKeepAlives bool + Metadata bool + ServerSideAcrossConfigs bool + TerminalColorMode TerminalColorMode + DefaultTime Time // time that directories with no time should display + Inplace bool // Download directly to destination file instead of atomic download to temp/rename } // NewConfig creates a new config with everything set to the default @@ -170,6 +171,7 @@ func NewConfig() *ConfigInfo { c.MaxDepth = -1 c.DataRateUnit = "bytes" c.BufferSize = SizeSuffix(16 << 20) + c.MultiThreadWriteBufferSize = SizeSuffix(128 * 1024) c.UserAgent = "rclone/" + Version c.StreamingUploadCutoff = SizeSuffix(100 * 1024) c.MaxStatsGroups = 1000 diff --git a/fs/config/configflags/configflags.go b/fs/config/configflags/configflags.go index 60047227e..a3eeedf70 100644 --- a/fs/config/configflags/configflags.go +++ b/fs/config/configflags/configflags.go @@ -126,6 +126,7 @@ func AddFlags(ci *fs.ConfigInfo, flagSet *pflag.FlagSet) { flags.StringVarP(flagSet, &ci.ClientKey, "client-key", "", ci.ClientKey, "Client SSL private key (PEM) for mutual TLS auth") flags.FVarP(flagSet, &ci.MultiThreadCutoff, "multi-thread-cutoff", "", "Use multi-thread downloads for files above this size") flags.IntVarP(flagSet, &ci.MultiThreadStreams, "multi-thread-streams", "", ci.MultiThreadStreams, "Max number of streams to use for multi-thread downloads") + flags.FVarP(flagSet, &ci.MultiThreadWriteBufferSize, "multi-thread-write-buffer-size", "", "In memory buffer size for writing when in multi-thread mode") flags.BoolVarP(flagSet, &ci.UseJSONLog, "use-json-log", "", ci.UseJSONLog, "Use json log format") flags.StringVarP(flagSet, &ci.OrderBy, "order-by", "", ci.OrderBy, "Instructions on how to order the transfers, e.g. 'size,descending'") flags.StringArrayVarP(flagSet, &uploadHeaders, "header-upload", "", nil, "Set HTTP header for upload transactions") diff --git a/fs/operations/multithread.go b/fs/operations/multithread.go index b4b79a5a6..5ab41b9a2 100644 --- a/fs/operations/multithread.go +++ b/fs/operations/multithread.go @@ -1,6 +1,7 @@ package operations import ( + "bufio" "context" "errors" "fmt" @@ -12,11 +13,32 @@ import ( ) const ( - multithreadChunkSize = 64 << 10 - multithreadChunkSizeMask = multithreadChunkSize - 1 - multithreadBufferSize = 32 * 1024 + multithreadChunkSize = 64 << 10 + multithreadChunkSizeMask = multithreadChunkSize - 1 + multithreadReadBufferSize = 32 * 1024 ) +// An offsetWriter maps writes at offset base to offset base+off in the underlying writer. +// +// Modified from the go source code. Can be replaced with +// io.OffsetWriter when we no longer need to support go1.19 +type offsetWriter struct { + w io.WriterAt + off int64 // the current offset +} + +// newOffsetWriter returns an offsetWriter that writes to w +// starting at offset off. +func newOffsetWriter(w io.WriterAt, off int64) *offsetWriter { + return &offsetWriter{w, off} +} + +func (o *offsetWriter) Write(p []byte) (n int, err error) { + n, err = o.w.WriteAt(p, o.off) + o.off += int64(n) + return +} + // Return a boolean as to whether we should use multi thread copy for // this transfer func doMultiThreadCopy(ctx context.Context, f fs.Fs, src fs.Object) bool { @@ -62,6 +84,7 @@ type multiThreadCopyState struct { // Copy a single stream into place func (mc *multiThreadCopyState) copyStream(ctx context.Context, stream int) (err error) { + ci := fs.GetConfig(ctx) defer func() { if err != nil { fs.Debugf(mc.src, "multi-thread copy: stream %d/%d failed: %v", stream+1, mc.streams, err) @@ -84,8 +107,13 @@ func (mc *multiThreadCopyState) copyStream(ctx context.Context, stream int) (err } defer fs.CheckClose(rc, &err) + var writer io.Writer = newOffsetWriter(mc.wc, start) + if ci.MultiThreadWriteBufferSize > 0 { + writer = bufio.NewWriterSize(writer, int(ci.MultiThreadWriteBufferSize)) + fs.Debugf(mc.src, "multi-thread copy: write buffer set to %v", ci.MultiThreadWriteBufferSize) + } // Copy the data - buf := make([]byte, multithreadBufferSize) + buf := make([]byte, multithreadReadBufferSize) offset := start for { // Check if context cancelled and exit if so @@ -98,7 +126,7 @@ func (mc *multiThreadCopyState) copyStream(ctx context.Context, stream int) (err if err != nil { return fmt.Errorf("multipart copy: accounting failed: %w", err) } - nw, ew := mc.wc.WriteAt(buf[0:nr], offset) + nw, ew := writer.Write(buf[0:nr]) if nw > 0 { offset += int64(nw) } @@ -113,6 +141,16 @@ func (mc *multiThreadCopyState) copyStream(ctx context.Context, stream int) (err if er != io.EOF { return fmt.Errorf("multipart copy: read failed: %w", er) } + + // if we were buffering, flush do disk + switch w := writer.(type) { + case *bufio.Writer: + er2 := w.Flush() + if er2 != nil { + return fmt.Errorf("multipart copy: flush failed: %w", er2) + } + } + break } }