vfs: add option to read source files in chunks

This commit is contained in:
Fabian Möller 2018-02-18 15:18:12 +01:00 committed by Nick Craig-Wood
parent 9fdf273614
commit bd3ad1ac3e
4 changed files with 41 additions and 8 deletions

View File

@ -189,6 +189,25 @@ to use Type=notify. In this case the service will enter the started state
after the mountpoint has been successfully set up.
Units having the rclone ` + commandName + ` service specified as a requirement
will see all files and folders immediately in this mode.
### chunked reading ###
--vfs-read-chunk-size will enable reading the source objects in parts.
This can reduce the download quota used on some remotes by requesting only the chunks
of the file that are actually read, at the cost of an increased number of requests.
When --vfs-read-chunk-size-limit is also specified and greater than --vfs-read-chunk-size,
the chunk size for each open file is doubled after each chunk read, until the
specified limit is reached. A value of -1 disables the limit, and the chunk size will
grow indefinitely.
With --vfs-read-chunk-size 100M and --vfs-read-chunk-size-limit 0 the following
parts will be downloaded: 0-100M, 100M-200M, 200M-300M, 300M-400M and so on.
When --vfs-read-chunk-size-limit 500M is specified, the result would be
0-100M, 100M-300M, 300M-700M, 700M-1200M, 1200M-1700M and so on.
Chunked reading will only work with --vfs-cache-mode < full, as the file will always
be copied to the vfs cache before opening with --vfs-cache-mode full.
` + vfs.Help, ` + vfs.Help,
Run: func(command *cobra.Command, args []string) { Run: func(command *cobra.Command, args []string) {
cmd.CheckArgs(2, 2, command, args) cmd.CheckArgs(2, 2, command, args)

View File

@ -7,6 +7,7 @@ import (
"github.com/ncw/rclone/fs" "github.com/ncw/rclone/fs"
"github.com/ncw/rclone/fs/accounting" "github.com/ncw/rclone/fs/accounting"
"github.com/ncw/rclone/fs/chunkedreader"
"github.com/ncw/rclone/fs/hash" "github.com/ncw/rclone/fs/hash"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -64,7 +65,7 @@ func (fh *ReadFileHandle) openPending() (err error) {
return nil return nil
} }
o := fh.file.getObject() o := fh.file.getObject()
r, err := o.Open() r, err := chunkedreader.New(o, int64(fh.file.d.vfs.Opt.ChunkSize), int64(fh.file.d.vfs.Opt.ChunkSizeLimit)).Open()
if err != nil { if err != nil {
return err return err
} }
@ -106,13 +107,16 @@ func (fh *ReadFileHandle) seek(offset int64, reopen bool) (err error) {
fh.r.StopBuffering() // stop the background reading first fh.r.StopBuffering() // stop the background reading first
fh.hash = nil fh.hash = nil
oldReader := fh.r.GetReader() oldReader := fh.r.GetReader()
r := oldReader r, ok := oldReader.(*chunkedreader.ChunkedReader)
// Can we seek it directly? if !ok {
if do, ok := oldReader.(io.Seeker); !reopen && ok { fs.Logf(fh.remote, "ReadFileHandle.Read expected reader to be a ChunkedReader, got %T", oldReader)
fs.Debugf(fh.remote, "ReadFileHandle.seek from %d to %d (io.Seeker)", fh.offset, offset) reopen = true
_, err = do.Seek(offset, io.SeekStart) }
if !reopen {
fs.Debugf(fh.remote, "ReadFileHandle.seek from %d to %d (fs.RangeSeeker)", fh.offset, offset)
_, err = r.RangeSeek(offset, io.SeekStart, -1)
if err != nil { if err != nil {
fs.Debugf(fh.remote, "ReadFileHandle.Read io.Seeker failed: %v", err) fs.Debugf(fh.remote, "ReadFileHandle.Read fs.RangeSeeker failed: %v", err)
return err return err
} }
} else { } else {
@ -124,7 +128,13 @@ func (fh *ReadFileHandle) seek(offset int64, reopen bool) (err error) {
} }
// re-open with a seek // re-open with a seek
o := fh.file.getObject() o := fh.file.getObject()
r, err = o.Open(&fs.SeekOption{Offset: offset}) r = chunkedreader.New(o, int64(fh.file.d.vfs.Opt.ChunkSize), int64(fh.file.d.vfs.Opt.ChunkSizeLimit))
_, err := r.Seek(offset, 0)
if err != nil {
fs.Debugf(fh.remote, "ReadFileHandle.Read seek failed: %v", err)
return err
}
r, err = r.Open()
if err != nil { if err != nil {
fs.Debugf(fh.remote, "ReadFileHandle.Read seek failed: %v", err) fs.Debugf(fh.remote, "ReadFileHandle.Read seek failed: %v", err)
return err return err

View File

@ -189,6 +189,8 @@ type Options struct {
GID uint32 GID uint32
DirPerms os.FileMode DirPerms os.FileMode
FilePerms os.FileMode FilePerms os.FileMode
ChunkSize fs.SizeSuffix // if > 0 read files in chunks
ChunkSizeLimit fs.SizeSuffix // if > ChunkSize double the chunk size after each chunk until reached
CacheMode CacheMode CacheMode CacheMode
CacheMaxAge time.Duration CacheMaxAge time.Duration
CachePollInterval time.Duration CachePollInterval time.Duration

View File

@ -23,5 +23,7 @@ func AddFlags(flagSet *pflag.FlagSet) {
flags.FVarP(flagSet, &Opt.CacheMode, "vfs-cache-mode", "", "Cache mode off|minimal|writes|full") flags.FVarP(flagSet, &Opt.CacheMode, "vfs-cache-mode", "", "Cache mode off|minimal|writes|full")
flags.DurationVarP(flagSet, &Opt.CachePollInterval, "vfs-cache-poll-interval", "", Opt.CachePollInterval, "Interval to poll the cache for stale objects.") flags.DurationVarP(flagSet, &Opt.CachePollInterval, "vfs-cache-poll-interval", "", Opt.CachePollInterval, "Interval to poll the cache for stale objects.")
flags.DurationVarP(flagSet, &Opt.CacheMaxAge, "vfs-cache-max-age", "", Opt.CacheMaxAge, "Max age of objects in the cache.") flags.DurationVarP(flagSet, &Opt.CacheMaxAge, "vfs-cache-max-age", "", Opt.CacheMaxAge, "Max age of objects in the cache.")
flags.FVarP(flagSet, &Opt.ChunkSize, "vfs-read-chunk-size", "", "Read the source objects in chunks.")
flags.FVarP(flagSet, &Opt.ChunkSizeLimit, "vfs-read-chunk-size-limit", "", "If greater than --vfs-read-chunk-size, double the chunk size after each chunk read, until the limit is reached. -1 is unlimited.")
platformFlags(flagSet) platformFlags(flagSet)
} }