chunkedreader: add --vfs-read-chunk-streams to parallel read chunks

This converts the ChunkedReader into an interface and provides two
implementations one sequential and one parallel.

This can be used to improve the performance of the VFS on high
bandwidth or high latency links.

Fixes #4760
This commit is contained in:
Nick Craig-Wood 2024-03-12 16:57:16 +00:00
parent 10270a4354
commit 27b281ef69
11 changed files with 835 additions and 236 deletions

View File

@ -38,6 +38,7 @@ import (
const ( const (
initialChunkSize = 262144 // Initial and max sizes of chunks when reading parts of the file. Currently initialChunkSize = 262144 // Initial and max sizes of chunks when reading parts of the file. Currently
maxChunkSize = 8388608 // at 256 KiB and 8 MiB. maxChunkSize = 8388608 // at 256 KiB and 8 MiB.
chunkStreams = 0 // Streams to use for reading
bufferSize = 8388608 bufferSize = 8388608
heuristicBytes = 1048576 heuristicBytes = 1048576
@ -1362,7 +1363,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (rc io.Read
} }
} }
// Get a chunkedreader for the wrapped object // Get a chunkedreader for the wrapped object
chunkedReader := chunkedreader.New(ctx, o.Object, initialChunkSize, maxChunkSize) chunkedReader := chunkedreader.New(ctx, o.Object, initialChunkSize, maxChunkSize, chunkStreams)
// Get file handle // Get file handle
var file io.Reader var file io.Reader
if offset != 0 { if offset != 0 {

View File

@ -1,14 +1,12 @@
// Package chunkedreader provides functionality for reading in chunks. // Package chunkedreader provides functionality for reading a stream in chunks.
package chunkedreader package chunkedreader
import ( import (
"context" "context"
"errors" "errors"
"io" "io"
"sync"
"github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/hash"
) )
// io related errors returned by ChunkedReader // io related errors returned by ChunkedReader
@ -17,22 +15,13 @@ var (
ErrorInvalidSeek = errors.New("invalid seek position") ErrorInvalidSeek = errors.New("invalid seek position")
) )
// ChunkedReader is a reader for an Object with the possibility // ChunkedReader describes what a chunked reader can do.
// of reading the source in chunks of given size type ChunkedReader interface {
// io.Reader
// An initialChunkSize of <= 0 will disable chunked reading. io.Seeker
type ChunkedReader struct { io.Closer
ctx context.Context fs.RangeSeeker
mu sync.Mutex // protects following fields Open() (ChunkedReader, error)
o fs.Object // source to read from
rc io.ReadCloser // reader for the current open chunk
offset int64 // offset the next Read will start. -1 forces a reopen of o
chunkOffset int64 // beginning of the current or next chunk
chunkSize int64 // length of the current or next chunk. -1 will open o from chunkOffset to the end
initialChunkSize int64 // default chunkSize after the chunk specified by RangeSeek is complete
maxChunkSize int64 // consecutive read chunks will double in size until reached. -1 means no limit
customChunkSize bool // is the current chunkSize set by RangeSeek?
closed bool // has Close been called?
} }
// New returns a ChunkedReader for the Object. // New returns a ChunkedReader for the Object.
@ -41,210 +30,18 @@ type ChunkedReader struct {
// If maxChunkSize is greater than initialChunkSize, the chunk size will be // If maxChunkSize is greater than initialChunkSize, the chunk size will be
// doubled after each chunk read with a maximum of maxChunkSize. // doubled after each chunk read with a maximum of maxChunkSize.
// A Seek or RangeSeek will reset the chunk size to it's initial value // A Seek or RangeSeek will reset the chunk size to it's initial value
func New(ctx context.Context, o fs.Object, initialChunkSize int64, maxChunkSize int64) *ChunkedReader { func New(ctx context.Context, o fs.Object, initialChunkSize int64, maxChunkSize int64, streams int) ChunkedReader {
if initialChunkSize <= 0 { if initialChunkSize <= 0 {
initialChunkSize = -1 initialChunkSize = -1
} }
if maxChunkSize != -1 && maxChunkSize < initialChunkSize { if maxChunkSize != -1 && maxChunkSize < initialChunkSize {
maxChunkSize = initialChunkSize maxChunkSize = initialChunkSize
} }
return &ChunkedReader{ if streams < 0 {
ctx: ctx, streams = 0
o: o,
offset: -1,
chunkSize: initialChunkSize,
initialChunkSize: initialChunkSize,
maxChunkSize: maxChunkSize,
} }
if streams <= 1 || o.Size() < 0 {
return newSequential(ctx, o, initialChunkSize, maxChunkSize)
}
return newParallel(ctx, o, initialChunkSize, streams)
} }
// Read from the file - for details see io.Reader
func (cr *ChunkedReader) Read(p []byte) (n int, err error) {
cr.mu.Lock()
defer cr.mu.Unlock()
if cr.closed {
return 0, ErrorFileClosed
}
for reqSize := int64(len(p)); reqSize > 0; reqSize = int64(len(p)) {
// the current chunk boundary. valid only when chunkSize > 0
chunkEnd := cr.chunkOffset + cr.chunkSize
fs.Debugf(cr.o, "ChunkedReader.Read at %d length %d chunkOffset %d chunkSize %d", cr.offset, reqSize, cr.chunkOffset, cr.chunkSize)
switch {
case cr.chunkSize > 0 && cr.offset == chunkEnd: // last chunk read completely
cr.chunkOffset = cr.offset
if cr.customChunkSize { // last chunkSize was set by RangeSeek
cr.customChunkSize = false
cr.chunkSize = cr.initialChunkSize
} else {
cr.chunkSize *= 2
if cr.chunkSize > cr.maxChunkSize && cr.maxChunkSize != -1 {
cr.chunkSize = cr.maxChunkSize
}
}
// recalculate the chunk boundary. valid only when chunkSize > 0
chunkEnd = cr.chunkOffset + cr.chunkSize
fallthrough
case cr.offset == -1: // first Read or Read after RangeSeek
err = cr.openRange()
if err != nil {
return
}
}
var buf []byte
chunkRest := chunkEnd - cr.offset
// limit read to chunk boundaries if chunkSize > 0
if reqSize > chunkRest && cr.chunkSize > 0 {
buf, p = p[0:chunkRest], p[chunkRest:]
} else {
buf, p = p, nil
}
var rn int
rn, err = io.ReadFull(cr.rc, buf)
n += rn
cr.offset += int64(rn)
if err != nil {
if err == io.ErrUnexpectedEOF {
err = io.EOF
}
return
}
}
return n, nil
}
// Close the file - for details see io.Closer
//
// All methods on ChunkedReader will return ErrorFileClosed afterwards
func (cr *ChunkedReader) Close() error {
cr.mu.Lock()
defer cr.mu.Unlock()
if cr.closed {
return ErrorFileClosed
}
cr.closed = true
return cr.resetReader(nil, 0)
}
// Seek the file - for details see io.Seeker
func (cr *ChunkedReader) Seek(offset int64, whence int) (int64, error) {
return cr.RangeSeek(context.TODO(), offset, whence, -1)
}
// RangeSeek the file - for details see RangeSeeker
//
// The specified length will only apply to the next chunk opened.
// RangeSeek will not reopen the source until Read is called.
func (cr *ChunkedReader) RangeSeek(ctx context.Context, offset int64, whence int, length int64) (int64, error) {
cr.mu.Lock()
defer cr.mu.Unlock()
fs.Debugf(cr.o, "ChunkedReader.RangeSeek from %d to %d length %d", cr.offset, offset, length)
if cr.closed {
return 0, ErrorFileClosed
}
size := cr.o.Size()
switch whence {
case io.SeekStart:
cr.offset = 0
case io.SeekEnd:
cr.offset = size
}
// set the new chunk start
cr.chunkOffset = cr.offset + offset
// force reopen on next Read
cr.offset = -1
if length > 0 {
cr.customChunkSize = true
cr.chunkSize = length
} else {
cr.chunkSize = cr.initialChunkSize
}
if cr.chunkOffset < 0 || cr.chunkOffset >= size {
cr.chunkOffset = 0
return 0, ErrorInvalidSeek
}
return cr.chunkOffset, nil
}
// Open forces the connection to be opened
func (cr *ChunkedReader) Open() (*ChunkedReader, error) {
cr.mu.Lock()
defer cr.mu.Unlock()
if cr.rc != nil && cr.offset != -1 {
return cr, nil
}
return cr, cr.openRange()
}
// openRange will open the source Object with the current chunk range
//
// If the current open reader implements RangeSeeker, it is tried first.
// When RangeSeek fails, o.Open with a RangeOption is used.
//
// A length <= 0 will request till the end of the file
func (cr *ChunkedReader) openRange() error {
offset, length := cr.chunkOffset, cr.chunkSize
fs.Debugf(cr.o, "ChunkedReader.openRange at %d length %d", offset, length)
if cr.closed {
return ErrorFileClosed
}
if rs, ok := cr.rc.(fs.RangeSeeker); ok {
n, err := rs.RangeSeek(cr.ctx, offset, io.SeekStart, length)
if err == nil && n == offset {
cr.offset = offset
return nil
}
if err != nil {
fs.Debugf(cr.o, "ChunkedReader.openRange seek failed (%s). Trying Open", err)
} else {
fs.Debugf(cr.o, "ChunkedReader.openRange seeked to wrong offset. Wanted %d, got %d. Trying Open", offset, n)
}
}
var rc io.ReadCloser
var err error
if length <= 0 {
if offset == 0 {
rc, err = cr.o.Open(cr.ctx, &fs.HashesOption{Hashes: hash.Set(hash.None)})
} else {
rc, err = cr.o.Open(cr.ctx, &fs.HashesOption{Hashes: hash.Set(hash.None)}, &fs.RangeOption{Start: offset, End: -1})
}
} else {
rc, err = cr.o.Open(cr.ctx, &fs.HashesOption{Hashes: hash.Set(hash.None)}, &fs.RangeOption{Start: offset, End: offset + length - 1})
}
if err != nil {
return err
}
return cr.resetReader(rc, offset)
}
// resetReader switches the current reader to the given reader.
// The old reader will be Close'd before setting the new reader.
func (cr *ChunkedReader) resetReader(rc io.ReadCloser, offset int64) error {
if cr.rc != nil {
if err := cr.rc.Close(); err != nil {
return err
}
}
cr.rc = rc
cr.offset = offset
return nil
}
var (
_ io.ReadCloser = (*ChunkedReader)(nil)
_ io.Seeker = (*ChunkedReader)(nil)
_ fs.RangeSeeker = (*ChunkedReader)(nil)
)

View File

@ -7,21 +7,47 @@ import (
"math/rand" "math/rand"
"testing" "testing"
"github.com/rclone/rclone/fstest"
"github.com/rclone/rclone/fstest/mockobject" "github.com/rclone/rclone/fstest/mockobject"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestChunkedReader(t *testing.T) { // TestMain drives the tests
content := makeContent(t, 1024) func TestMain(m *testing.M) {
fstest.TestMain(m)
}
for _, mode := range mockobject.SeekModes { func TestChunkedReader(t *testing.T) {
t.Run(mode.String(), testRead(content, mode)) ctx := context.Background()
o := mockobject.New("test.bin").WithContent([]byte("hello"), mockobject.SeekModeRegular)
const MB = 1024 * 1024
for _, test := range []struct {
initialChunkSize int64
maxChunkSize int64
streams int
crType any
unknownSize bool
}{
{-1, MB, 0, new(sequential), false},
{MB, 10 * MB, 0, new(sequential), false},
{MB, 10 * MB, 1, new(sequential), false},
{MB, 10 * MB, 1, new(sequential), true},
{MB, 10 * MB, 2, new(parallel), false},
{MB, 10 * MB, 2, new(sequential), true},
} {
what := fmt.Sprintf("%+v", test)
o.SetUnknownSize(test.unknownSize)
cr := New(ctx, o, test.initialChunkSize, test.maxChunkSize, test.streams)
assert.IsType(t, test.crType, cr, what)
require.NoError(t, cr.Close(), what)
} }
} }
func testRead(content []byte, mode mockobject.SeekMode) func(*testing.T) { func testRead(content []byte, mode mockobject.SeekMode, streams int) func(*testing.T) {
return func(t *testing.T) { return func(t *testing.T) {
ctx := context.Background()
chunkSizes := []int64{-1, 0, 1, 15, 16, 17, 1023, 1024, 1025, 2000} chunkSizes := []int64{-1, 0, 1, 15, 16, 17, 1023, 1024, 1025, 2000}
offsets := []int64{0, 1, 2, 3, 4, 5, 7, 8, 9, 15, 16, 17, 31, 32, 33, offsets := []int64{0, 1, 2, 3, 4, 5, 7, 8, 9, 15, 16, 17, 31, 32, 33,
63, 64, 65, 511, 512, 513, 1023, 1024, 1025} 63, 64, 65, 511, 512, 513, 1023, 1024, 1025}
@ -39,13 +65,13 @@ func testRead(content []byte, mode mockobject.SeekMode) func(*testing.T) {
} }
t.Run(fmt.Sprintf("Chunksize_%d_%d", cs, csMax), func(t *testing.T) { t.Run(fmt.Sprintf("Chunksize_%d_%d", cs, csMax), func(t *testing.T) {
cr := New(context.Background(), o, cs, csMax) cr := New(ctx, o, cs, csMax, streams)
for _, offset := range offsets { for _, offset := range offsets {
for _, limit := range limits { for _, limit := range limits {
what := fmt.Sprintf("offset %d, limit %d", offset, limit) what := fmt.Sprintf("offset %d, limit %d", offset, limit)
p, err := cr.RangeSeek(context.Background(), offset, io.SeekStart, limit) p, err := cr.RangeSeek(ctx, offset, io.SeekStart, limit)
if offset >= cl { if offset >= cl {
require.Error(t, err, what) require.Error(t, err, what)
return return
@ -74,32 +100,33 @@ func testRead(content []byte, mode mockobject.SeekMode) func(*testing.T) {
} }
} }
func TestErrorAfterClose(t *testing.T) { func testErrorAfterClose(t *testing.T, streams int) {
ctx := context.Background()
content := makeContent(t, 1024) content := makeContent(t, 1024)
o := mockobject.New("test.bin").WithContent(content, mockobject.SeekModeNone) o := mockobject.New("test.bin").WithContent(content, mockobject.SeekModeNone)
// Close // Close
cr := New(context.Background(), o, 0, 0) cr := New(ctx, o, 0, 0, streams)
require.NoError(t, cr.Close()) require.NoError(t, cr.Close())
require.Error(t, cr.Close()) require.Error(t, cr.Close())
// Read // Read
cr = New(context.Background(), o, 0, 0) cr = New(ctx, o, 0, 0, streams)
require.NoError(t, cr.Close()) require.NoError(t, cr.Close())
var buf [1]byte var buf [1]byte
_, err := cr.Read(buf[:]) _, err := cr.Read(buf[:])
require.Error(t, err) require.Error(t, err)
// Seek // Seek
cr = New(context.Background(), o, 0, 0) cr = New(ctx, o, 0, 0, streams)
require.NoError(t, cr.Close()) require.NoError(t, cr.Close())
_, err = cr.Seek(1, io.SeekCurrent) _, err = cr.Seek(1, io.SeekCurrent)
require.Error(t, err) require.Error(t, err)
// RangeSeek // RangeSeek
cr = New(context.Background(), o, 0, 0) cr = New(ctx, o, 0, 0, streams)
require.NoError(t, cr.Close()) require.NoError(t, cr.Close())
_, err = cr.RangeSeek(context.Background(), 1, io.SeekCurrent, 0) _, err = cr.RangeSeek(ctx, 1, io.SeekCurrent, 0)
require.Error(t, err) require.Error(t, err)
} }

View File

@ -0,0 +1,383 @@
package chunkedreader
import (
"context"
"fmt"
"io"
"sync"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/fs/log"
"github.com/rclone/rclone/fs/operations"
"github.com/rclone/rclone/lib/multipart"
"github.com/rclone/rclone/lib/pool"
)
// parallel reads Object in chunks of a given size in parallel.
type parallel struct {
ctx context.Context
o fs.Object // source to read from
mu sync.Mutex // protects following fields
endStream int64 // offset we have started streams for
offset int64 // offset the read file pointer is at
chunkSize int64 // length of the chunks to read
nstreams int // number of streams to use
streams []*stream // the opened streams in offset order - the current one is first
closed bool // has Close been called?
}
// stream holds the info about a single download
type stream struct {
cr *parallel // parent reader
ctx context.Context // ctx to cancel if needed
cancel func() // cancel the stream
rc io.ReadCloser // reader that it is reading from, may be nil
offset int64 // where the stream is reading from
size int64 // and the size it is reading
readBytes int64 // bytes read from the stream
rw *pool.RW // buffer for read
err chan error // error returned from the read
name string // name of this stream for debugging
}
// Start a stream reading (offset, offset+size)
func (cr *parallel) newStream(ctx context.Context, offset, size int64) (s *stream, err error) {
ctx, cancel := context.WithCancel(ctx)
// Create the stream
rw := multipart.NewRW()
s = &stream{
cr: cr,
ctx: ctx,
cancel: cancel,
offset: offset,
size: size,
rw: rw,
err: make(chan error, 1),
}
s.name = fmt.Sprintf("stream(%d,%d,%p)", s.offset, s.size, s)
// Start the background read into the buffer
go s.readFrom(ctx)
// Return the stream to the caller
return s, nil
}
// read the file into the buffer
func (s *stream) readFrom(ctx context.Context) {
// Open the object at the correct range
fs.Debugf(s.cr.o, "%s: open", s.name)
rc, err := operations.Open(ctx, s.cr.o,
&fs.HashesOption{Hashes: hash.Set(hash.None)},
&fs.RangeOption{Start: s.offset, End: s.offset + s.size - 1})
if err != nil {
s.err <- fmt.Errorf("parallel chunked reader: failed to open stream at %d size %d: %w", s.offset, s.size, err)
return
}
s.rc = rc
fs.Debugf(s.cr.o, "%s: readfrom started", s.name)
_, err = s.rw.ReadFrom(s.rc)
fs.Debugf(s.cr.o, "%s: readfrom finished (%d bytes): %v", s.name, s.rw.Size(), err)
s.err <- err
}
// eof is true when we've read all the data we are expecting
func (s *stream) eof() bool {
return s.readBytes >= s.size
}
// read reads up to len(p) bytes into p. It returns the number of
// bytes read (0 <= n <= len(p)) and any error encountered. If some
// data is available but not len(p) bytes, read returns what is
// available instead of waiting for more.
func (s *stream) read(p []byte) (n int, err error) {
defer log.Trace(s.cr.o, "%s: Read len(p)=%d", s.name, len(p))("n=%d, err=%v", &n, &err)
if len(p) == 0 {
return n, nil
}
for {
var nn int
nn, err = s.rw.Read(p[n:])
fs.Debugf(s.cr.o, "%s: rw.Read nn=%d, err=%v", s.name, nn, err)
s.readBytes += int64(nn)
n += nn
if err != nil && err != io.EOF {
return n, err
}
if s.eof() {
return n, io.EOF
}
// Received a faux io.EOF because we haven't read all the data yet
if n >= len(p) {
break
}
// Wait for a write to happen to read more
s.rw.WaitWrite(s.ctx)
}
return n, nil
}
// Sets *perr to newErr if err is nil
func orErr(perr *error, newErr error) {
if *perr == nil {
*perr = newErr
}
}
// Close a stream
func (s *stream) close() (err error) {
defer log.Trace(s.cr.o, "%s: close", s.name)("err=%v", &err)
s.cancel()
err = <-s.err // wait for readFrom to stop and return error
orErr(&err, s.rw.Close())
if s.rc != nil {
orErr(&err, s.rc.Close())
}
if err != nil && err != io.EOF {
return fmt.Errorf("parallel chunked reader: failed to read stream at %d size %d: %w", s.offset, s.size, err)
}
return nil
}
// Make a new parallel chunked reader
//
// Mustn't be called for an unknown size object
func newParallel(ctx context.Context, o fs.Object, chunkSize int64, streams int) ChunkedReader {
// Make sure chunkSize is a multiple of multipart.BufferSize
if chunkSize < 0 {
chunkSize = multipart.BufferSize
}
newChunkSize := multipart.BufferSize * (chunkSize / multipart.BufferSize)
if newChunkSize < chunkSize {
newChunkSize += multipart.BufferSize
}
fs.Debugf(o, "newParallel chunkSize=%d, streams=%d", chunkSize, streams)
return &parallel{
ctx: ctx,
o: o,
offset: 0,
chunkSize: newChunkSize,
nstreams: streams,
}
}
// _open starts the file transferring at offset
//
// Call with the lock held
func (cr *parallel) _open() (err error) {
size := cr.o.Size()
if size < 0 {
return fmt.Errorf("parallel chunked reader: can't use multiple threads for unknown sized object %q", cr.o)
}
// Launched enough streams already
if cr.endStream >= size {
return nil
}
// Make sure cr.nstreams are running
for i := len(cr.streams); i < cr.nstreams; i++ {
// clip to length of file
chunkSize := cr.chunkSize
newEndStream := cr.endStream + chunkSize
if newEndStream > size {
chunkSize = size - cr.endStream
newEndStream = cr.endStream + chunkSize
}
s, err := cr.newStream(cr.ctx, cr.endStream, chunkSize)
if err != nil {
return err
}
cr.streams = append(cr.streams, s)
cr.endStream = newEndStream
if cr.endStream >= size {
break
}
}
return nil
}
// Finished reading the current stream so pop it off and destroy it
//
// Call with lock held
func (cr *parallel) _popStream() (err error) {
defer log.Trace(cr.o, "streams=%+v", cr.streams)("streams=%+v, err=%v", &cr.streams, &err)
if len(cr.streams) == 0 {
return nil
}
stream := cr.streams[0]
err = stream.close()
cr.streams[0] = nil
cr.streams = cr.streams[1:]
return err
}
// Get rid of all the streams
//
// Call with lock held
func (cr *parallel) _popStreams() (err error) {
defer log.Trace(cr.o, "streams=%+v", cr.streams)("streams=%+v, err=%v", &cr.streams, &err)
for len(cr.streams) > 0 {
orErr(&err, cr._popStream())
}
cr.streams = nil
return err
}
// Read from the file - for details see io.Reader
func (cr *parallel) Read(p []byte) (n int, err error) {
defer log.Trace(cr.o, "Read len(p)=%d", len(p))("n=%d, err=%v", &n, &err)
cr.mu.Lock()
defer cr.mu.Unlock()
if cr.closed {
return 0, ErrorFileClosed
}
for n < len(p) {
// Make sure we have the correct number of streams open
err = cr._open()
if err != nil {
return n, err
}
// No streams left means EOF
if len(cr.streams) == 0 {
return n, io.EOF
}
// Read from the stream
stream := cr.streams[0]
nn, err := stream.read(p[n:])
n += nn
cr.offset += int64(nn)
if err == io.EOF {
err = cr._popStream()
if err != nil {
break
}
} else if err != nil {
break
}
}
return n, err
}
// Close the file - for details see io.Closer
//
// All methods on ChunkedReader will return ErrorFileClosed afterwards
func (cr *parallel) Close() error {
cr.mu.Lock()
defer cr.mu.Unlock()
if cr.closed {
return ErrorFileClosed
}
cr.closed = true
// Close all the streams
return cr._popStreams()
}
// Seek the file - for details see io.Seeker
func (cr *parallel) Seek(offset int64, whence int) (int64, error) {
cr.mu.Lock()
defer cr.mu.Unlock()
fs.Debugf(cr.o, "parallel chunked reader: seek from %d to %d whence %d", cr.offset, offset, whence)
if cr.closed {
return 0, ErrorFileClosed
}
size := cr.o.Size()
currentOffset := cr.offset
switch whence {
case io.SeekStart:
currentOffset = 0
case io.SeekEnd:
currentOffset = size
}
// set the new chunk start
newOffset := currentOffset + offset
if newOffset < 0 || newOffset >= size {
return 0, ErrorInvalidSeek
}
// If seek pointer didn't move, return now
if newOffset == cr.offset {
fs.Debugf(cr.o, "parallel chunked reader: seek pointer didn't move")
return cr.offset, nil
}
cr.offset = newOffset
// Ditch out of range streams
for len(cr.streams) > 0 {
stream := cr.streams[0]
if newOffset >= stream.offset+stream.size {
_ = cr._popStream()
} else {
break
}
}
// If no streams remain we can just restart
if len(cr.streams) == 0 {
fs.Debugf(cr.o, "parallel chunked reader: no streams remain")
cr.endStream = cr.offset
return cr.offset, nil
}
// Current stream
stream := cr.streams[0]
// If new offset is before current stream then ditch all the streams
if newOffset < stream.offset {
_ = cr._popStreams()
fs.Debugf(cr.o, "parallel chunked reader: new offset is before current stream - ditch all")
cr.endStream = cr.offset
return cr.offset, nil
}
// Seek the current stream
streamOffset := newOffset - stream.offset
stream.readBytes = streamOffset // correct read value
fs.Debugf(cr.o, "parallel chunked reader: seek the current stream to %d", streamOffset)
// Wait for the read to the correct part of the data
for stream.rw.Size() < streamOffset {
stream.rw.WaitWrite(cr.ctx)
}
_, err := stream.rw.Seek(streamOffset, io.SeekStart)
if err != nil {
return cr.offset, fmt.Errorf("parallel chunked reader: failed to seek stream: %w", err)
}
return cr.offset, nil
}
// RangeSeek the file - for details see RangeSeeker
//
// In the parallel chunked reader this just acts like Seek
func (cr *parallel) RangeSeek(ctx context.Context, offset int64, whence int, length int64) (int64, error) {
return cr.Seek(offset, whence)
}
// Open forces the connection to be opened
func (cr *parallel) Open() (ChunkedReader, error) {
cr.mu.Lock()
defer cr.mu.Unlock()
return cr, cr._open()
}
var (
_ ChunkedReader = (*parallel)(nil)
)

View File

@ -0,0 +1,102 @@
package chunkedreader
import (
"context"
"io"
"math/rand"
"testing"
"github.com/rclone/rclone/fstest/mockobject"
"github.com/rclone/rclone/lib/multipart"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestParallel(t *testing.T) {
content := makeContent(t, 1024)
for _, mode := range mockobject.SeekModes {
t.Run(mode.String(), testRead(content, mode, 3))
}
}
func TestParallelErrorAfterClose(t *testing.T) {
testErrorAfterClose(t, 3)
}
func TestParallelLarge(t *testing.T) {
ctx := context.Background()
const streams = 3
const chunkSize = multipart.BufferSize
const size = (2*streams+1)*chunkSize + 255
content := makeContent(t, size)
o := mockobject.New("test.bin").WithContent(content, mockobject.SeekModeNone)
cr := New(ctx, o, chunkSize, 0, streams)
for _, test := range []struct {
name string
offset int64
seekMode int
}{
{name: "Straight", offset: 0, seekMode: -1},
{name: "Rewind", offset: 0, seekMode: io.SeekStart},
{name: "NearStart", offset: 1, seekMode: io.SeekStart},
{name: "NearEnd", offset: size - 2*chunkSize - 127, seekMode: io.SeekEnd},
} {
t.Run(test.name, func(t *testing.T) {
if test.seekMode >= 0 {
var n int64
var err error
if test.seekMode == io.SeekEnd {
n, err = cr.Seek(test.offset-size, test.seekMode)
} else {
n, err = cr.Seek(test.offset, test.seekMode)
}
require.NoError(t, err)
assert.Equal(t, test.offset, n)
}
got, err := io.ReadAll(cr)
require.NoError(t, err)
require.Equal(t, len(content[test.offset:]), len(got))
assert.Equal(t, content[test.offset:], got)
})
}
require.NoError(t, cr.Close())
t.Run("Seeky", func(t *testing.T) {
cr := New(ctx, o, chunkSize, 0, streams)
offset := 0
buf := make([]byte, 1024)
for {
// Read and check a random read
readSize := rand.Intn(1024)
readBuf := buf[:readSize]
n, err := cr.Read(readBuf)
require.Equal(t, content[offset:offset+n], readBuf[:n])
offset += n
if err == io.EOF {
assert.Equal(t, size, offset)
break
}
require.NoError(t, err)
// Now do a smaller random seek backwards
seekSize := rand.Intn(512)
if offset-seekSize < 0 {
seekSize = offset
}
nn, err := cr.Seek(-int64(seekSize), io.SeekCurrent)
offset -= seekSize
require.NoError(t, err)
assert.Equal(t, nn, int64(offset))
}
require.NoError(t, cr.Close())
})
}

View File

@ -0,0 +1,232 @@
package chunkedreader
import (
"context"
"io"
"sync"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/hash"
)
// sequential is a reader for an Object with the possibility
// of reading the source in chunks of given size
//
// An initialChunkSize of <= 0 will disable chunked reading.
type sequential struct {
ctx context.Context
mu sync.Mutex // protects following fields
o fs.Object // source to read from
rc io.ReadCloser // reader for the current open chunk
offset int64 // offset the next Read will start. -1 forces a reopen of o
chunkOffset int64 // beginning of the current or next chunk
chunkSize int64 // length of the current or next chunk. -1 will open o from chunkOffset to the end
initialChunkSize int64 // default chunkSize after the chunk specified by RangeSeek is complete
maxChunkSize int64 // consecutive read chunks will double in size until reached. -1 means no limit
customChunkSize bool // is the current chunkSize set by RangeSeek?
closed bool // has Close been called?
}
// Make a new sequential chunked reader
func newSequential(ctx context.Context, o fs.Object, initialChunkSize int64, maxChunkSize int64) ChunkedReader {
return &sequential{
ctx: ctx,
o: o,
offset: -1,
chunkSize: initialChunkSize,
initialChunkSize: initialChunkSize,
maxChunkSize: maxChunkSize,
}
}
// Read from the file - for details see io.Reader
func (cr *sequential) Read(p []byte) (n int, err error) {
cr.mu.Lock()
defer cr.mu.Unlock()
if cr.closed {
return 0, ErrorFileClosed
}
for reqSize := int64(len(p)); reqSize > 0; reqSize = int64(len(p)) {
// the current chunk boundary. valid only when chunkSize > 0
chunkEnd := cr.chunkOffset + cr.chunkSize
fs.Debugf(cr.o, "ChunkedReader.Read at %d length %d chunkOffset %d chunkSize %d", cr.offset, reqSize, cr.chunkOffset, cr.chunkSize)
switch {
case cr.chunkSize > 0 && cr.offset == chunkEnd: // last chunk read completely
cr.chunkOffset = cr.offset
if cr.customChunkSize { // last chunkSize was set by RangeSeek
cr.customChunkSize = false
cr.chunkSize = cr.initialChunkSize
} else {
cr.chunkSize *= 2
if cr.chunkSize > cr.maxChunkSize && cr.maxChunkSize != -1 {
cr.chunkSize = cr.maxChunkSize
}
}
// recalculate the chunk boundary. valid only when chunkSize > 0
chunkEnd = cr.chunkOffset + cr.chunkSize
fallthrough
case cr.offset == -1: // first Read or Read after RangeSeek
err = cr.openRange()
if err != nil {
return
}
}
var buf []byte
chunkRest := chunkEnd - cr.offset
// limit read to chunk boundaries if chunkSize > 0
if reqSize > chunkRest && cr.chunkSize > 0 {
buf, p = p[0:chunkRest], p[chunkRest:]
} else {
buf, p = p, nil
}
var rn int
rn, err = io.ReadFull(cr.rc, buf)
n += rn
cr.offset += int64(rn)
if err != nil {
if err == io.ErrUnexpectedEOF {
err = io.EOF
}
return
}
}
return n, nil
}
// Close the file - for details see io.Closer
//
// All methods on ChunkedReader will return ErrorFileClosed afterwards
func (cr *sequential) Close() error {
cr.mu.Lock()
defer cr.mu.Unlock()
if cr.closed {
return ErrorFileClosed
}
cr.closed = true
return cr.resetReader(nil, 0)
}
// Seek the file - for details see io.Seeker
func (cr *sequential) Seek(offset int64, whence int) (int64, error) {
return cr.RangeSeek(context.TODO(), offset, whence, -1)
}
// RangeSeek the file - for details see RangeSeeker
//
// The specified length will only apply to the next chunk opened.
// RangeSeek will not reopen the source until Read is called.
func (cr *sequential) RangeSeek(ctx context.Context, offset int64, whence int, length int64) (int64, error) {
cr.mu.Lock()
defer cr.mu.Unlock()
fs.Debugf(cr.o, "ChunkedReader.RangeSeek from %d to %d length %d", cr.offset, offset, length)
if cr.closed {
return 0, ErrorFileClosed
}
size := cr.o.Size()
switch whence {
case io.SeekStart:
cr.offset = 0
case io.SeekEnd:
if size < 0 {
return 0, ErrorInvalidSeek // Can't seek from end for unknown size
}
cr.offset = size
}
// set the new chunk start
cr.chunkOffset = cr.offset + offset
// force reopen on next Read
cr.offset = -1
if length > 0 {
cr.customChunkSize = true
cr.chunkSize = length
} else {
cr.chunkSize = cr.initialChunkSize
}
if cr.chunkOffset < 0 || cr.chunkOffset >= size {
cr.chunkOffset = 0
return 0, ErrorInvalidSeek
}
return cr.chunkOffset, nil
}
// Open forces the connection to be opened
func (cr *sequential) Open() (ChunkedReader, error) {
cr.mu.Lock()
defer cr.mu.Unlock()
if cr.rc != nil && cr.offset != -1 {
return cr, nil
}
return cr, cr.openRange()
}
// openRange will open the source Object with the current chunk range
//
// If the current open reader implements RangeSeeker, it is tried first.
// When RangeSeek fails, o.Open with a RangeOption is used.
//
// A length <= 0 will request till the end of the file
func (cr *sequential) openRange() error {
offset, length := cr.chunkOffset, cr.chunkSize
fs.Debugf(cr.o, "ChunkedReader.openRange at %d length %d", offset, length)
if cr.closed {
return ErrorFileClosed
}
if rs, ok := cr.rc.(fs.RangeSeeker); ok {
n, err := rs.RangeSeek(cr.ctx, offset, io.SeekStart, length)
if err == nil && n == offset {
cr.offset = offset
return nil
}
if err != nil {
fs.Debugf(cr.o, "ChunkedReader.openRange seek failed (%s). Trying Open", err)
} else {
fs.Debugf(cr.o, "ChunkedReader.openRange seeked to wrong offset. Wanted %d, got %d. Trying Open", offset, n)
}
}
var rc io.ReadCloser
var err error
if length <= 0 {
if offset == 0 {
rc, err = cr.o.Open(cr.ctx, &fs.HashesOption{Hashes: hash.Set(hash.None)})
} else {
rc, err = cr.o.Open(cr.ctx, &fs.HashesOption{Hashes: hash.Set(hash.None)}, &fs.RangeOption{Start: offset, End: -1})
}
} else {
rc, err = cr.o.Open(cr.ctx, &fs.HashesOption{Hashes: hash.Set(hash.None)}, &fs.RangeOption{Start: offset, End: offset + length - 1})
}
if err != nil {
return err
}
return cr.resetReader(rc, offset)
}
// resetReader switches the current reader to the given reader.
// The old reader will be Close'd before setting the new reader.
func (cr *sequential) resetReader(rc io.ReadCloser, offset int64) error {
if cr.rc != nil {
if err := cr.rc.Close(); err != nil {
return err
}
}
cr.rc = rc
cr.offset = offset
return nil
}
var (
_ ChunkedReader = (*sequential)(nil)
)

View File

@ -0,0 +1,20 @@
package chunkedreader
import (
"testing"
_ "github.com/rclone/rclone/backend/local"
"github.com/rclone/rclone/fstest/mockobject"
)
func TestSequential(t *testing.T) {
content := makeContent(t, 1024)
for _, mode := range mockobject.SeekModes {
t.Run(mode.String(), testRead(content, mode, 0))
}
}
func TestSequentialErrorAfterClose(t *testing.T) {
testErrorAfterClose(t, 0)
}

View File

@ -75,7 +75,8 @@ func (fh *ReadFileHandle) openPending() (err error) {
return nil return nil
} }
o := fh.file.getObject() o := fh.file.getObject()
r, err := chunkedreader.New(context.TODO(), o, int64(fh.file.VFS().Opt.ChunkSize), int64(fh.file.VFS().Opt.ChunkSizeLimit)).Open() opt := &fh.file.VFS().Opt
r, err := chunkedreader.New(context.TODO(), o, int64(opt.ChunkSize), int64(opt.ChunkSizeLimit), opt.ChunkStreams).Open()
if err != nil { if err != nil {
return err return err
} }
@ -127,7 +128,7 @@ func (fh *ReadFileHandle) seek(offset int64, reopen bool) (err error) {
} }
fh.r.StopBuffering() // stop the background reading first fh.r.StopBuffering() // stop the background reading first
oldReader := fh.r.GetReader() oldReader := fh.r.GetReader()
r, ok := oldReader.(*chunkedreader.ChunkedReader) r, ok := oldReader.(chunkedreader.ChunkedReader)
if !ok { if !ok {
fs.Logf(fh.remote, "ReadFileHandle.Read expected reader to be a ChunkedReader, got %T", oldReader) fs.Logf(fh.remote, "ReadFileHandle.Read expected reader to be a ChunkedReader, got %T", oldReader)
reopen = true reopen = true
@ -148,7 +149,8 @@ func (fh *ReadFileHandle) seek(offset int64, reopen bool) (err error) {
} }
// re-open with a seek // re-open with a seek
o := fh.file.getObject() o := fh.file.getObject()
r = chunkedreader.New(context.TODO(), o, int64(fh.file.VFS().Opt.ChunkSize), int64(fh.file.VFS().Opt.ChunkSizeLimit)) opt := &fh.file.VFS().Opt
r = chunkedreader.New(context.TODO(), o, int64(opt.ChunkSize), int64(opt.ChunkSizeLimit), opt.ChunkStreams)
_, err := r.Seek(offset, 0) _, err := r.Seek(offset, 0)
if err != nil { if err != nil {
fs.Debugf(fh.remote, "ReadFileHandle.Read seek failed: %v", err) fs.Debugf(fh.remote, "ReadFileHandle.Read seek failed: %v", err)

View File

@ -230,6 +230,11 @@ These flags control the chunking:
--vfs-read-chunk-size SizeSuffix Read the source objects in chunks (default 128M) --vfs-read-chunk-size SizeSuffix Read the source objects in chunks (default 128M)
--vfs-read-chunk-size-limit SizeSuffix Max chunk doubling size (default off) --vfs-read-chunk-size-limit SizeSuffix Max chunk doubling size (default off)
--vfs-read-chunk-streams int The number of parallel streams to read at once
The chunking behaves differently depending on the `--vfs-read-chunk-streams` parameter.
#### `--vfs-read-chunk-streams` == 0
Rclone will start reading a chunk of size `--vfs-read-chunk-size`, Rclone will start reading a chunk of size `--vfs-read-chunk-size`,
and then double the size for each read. When `--vfs-read-chunk-size-limit` is and then double the size for each read. When `--vfs-read-chunk-size-limit` is
@ -245,6 +250,30 @@ When `--vfs-read-chunk-size-limit 500M` is specified, the result would be
Setting `--vfs-read-chunk-size` to `0` or "off" disables chunked reading. Setting `--vfs-read-chunk-size` to `0` or "off" disables chunked reading.
The chunks will not be buffered in memory.
#### `--vfs-read-chunk-streams` > 0
Rclone reads `--vfs-read-chunk-streams` chunks of size
`--vfs-read-chunk-size` concurrently. The size for each read will stay
constant.
This improves performance performance massively on high latency links
or very high bandwidth links to high performance object stores.
Some experimentation will be needed to find the optimum values of
`--vfs-read-chunk-size` and `--vfs-read-chunk-streams` as these will
depend on the backend in use and the latency to the backend.
For high performance object stores (eg AWS S3) a reasonable place to
start might be `--vfs-read-chunk-streams 16` and
`--vfs-read-chunk-size 4M`. In testing with AWS S3 the performance
scaled roughly as the `--vfs-read-chunk-streams` setting.
Similar settings should work for high latency links, but depending on
the latency they may need more `--vfs-read-chunk-streams` in order to
get the throughput.
### VFS Performance ### VFS Performance
These flags may be used to enable/disable features of the VFS for These flags may be used to enable/disable features of the VFS for

View File

@ -540,7 +540,7 @@ func (dl *downloader) open(offset int64) (err error) {
// } // }
// in0, err := operations.NewReOpen(dl.dls.ctx, dl.dls.src, ci.LowLevelRetries, dl.dls.item.c.hashOption, rangeOption) // in0, err := operations.NewReOpen(dl.dls.ctx, dl.dls.src, ci.LowLevelRetries, dl.dls.item.c.hashOption, rangeOption)
in0 := chunkedreader.New(context.TODO(), dl.dls.src, int64(dl.dls.opt.ChunkSize), int64(dl.dls.opt.ChunkSizeLimit)) in0 := chunkedreader.New(context.TODO(), dl.dls.src, int64(dl.dls.opt.ChunkSize), int64(dl.dls.opt.ChunkSizeLimit), dl.dls.opt.ChunkStreams)
_, err = in0.Seek(offset, 0) _, err = in0.Seek(offset, 0)
if err != nil { if err != nil {
return fmt.Errorf("vfs reader: failed to open source file: %w", err) return fmt.Errorf("vfs reader: failed to open source file: %w", err)

View File

@ -79,6 +79,11 @@ var OptionsInfo = fs.Options{{
Default: fs.SizeSuffix(-1), Default: fs.SizeSuffix(-1),
Help: "If greater than --vfs-read-chunk-size, double the chunk size after each chunk read, until the limit is reached ('off' is unlimited)", Help: "If greater than --vfs-read-chunk-size, double the chunk size after each chunk read, until the limit is reached ('off' is unlimited)",
Groups: "VFS", Groups: "VFS",
}, {
Name: "vfs_read_chunk_streams",
Default: 0,
Help: "The number of parallel streams to read at once",
Groups: "VFS",
}, { }, {
Name: "dir_perms", Name: "dir_perms",
Default: FileMode(0777), Default: FileMode(0777),
@ -171,6 +176,7 @@ type Options struct {
FilePerms FileMode `config:"file_perms"` FilePerms FileMode `config:"file_perms"`
ChunkSize fs.SizeSuffix `config:"vfs_read_chunk_size"` // if > 0 read files in chunks ChunkSize fs.SizeSuffix `config:"vfs_read_chunk_size"` // if > 0 read files in chunks
ChunkSizeLimit fs.SizeSuffix `config:"vfs_read_chunk_size_limit"` // if > ChunkSize double the chunk size after each chunk until reached ChunkSizeLimit fs.SizeSuffix `config:"vfs_read_chunk_size_limit"` // if > ChunkSize double the chunk size after each chunk until reached
ChunkStreams int `config:"vfs_read_chunk_streams"` // Number of download streams to use
CacheMode CacheMode `config:"vfs_cache_mode"` CacheMode CacheMode `config:"vfs_cache_mode"`
CacheMaxAge fs.Duration `config:"vfs_cache_max_age"` CacheMaxAge fs.Duration `config:"vfs_cache_max_age"`
CacheMaxSize fs.SizeSuffix `config:"vfs_cache_max_size"` CacheMaxSize fs.SizeSuffix `config:"vfs_cache_max_size"`