diff --git a/backend/compress/compress.go b/backend/compress/compress.go index 038391602..d632b815d 100644 --- a/backend/compress/compress.go +++ b/backend/compress/compress.go @@ -38,6 +38,7 @@ import ( const ( initialChunkSize = 262144 // Initial and max sizes of chunks when reading parts of the file. Currently maxChunkSize = 8388608 // at 256 KiB and 8 MiB. + chunkStreams = 0 // Streams to use for reading bufferSize = 8388608 heuristicBytes = 1048576 @@ -1362,7 +1363,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (rc io.Read } } // Get a chunkedreader for the wrapped object - chunkedReader := chunkedreader.New(ctx, o.Object, initialChunkSize, maxChunkSize) + chunkedReader := chunkedreader.New(ctx, o.Object, initialChunkSize, maxChunkSize, chunkStreams) // Get file handle var file io.Reader if offset != 0 { diff --git a/fs/chunkedreader/chunkedreader.go b/fs/chunkedreader/chunkedreader.go index 66cf666d2..2d36dcee1 100644 --- a/fs/chunkedreader/chunkedreader.go +++ b/fs/chunkedreader/chunkedreader.go @@ -1,14 +1,12 @@ -// Package chunkedreader provides functionality for reading in chunks. +// Package chunkedreader provides functionality for reading a stream in chunks. package chunkedreader import ( "context" "errors" "io" - "sync" "github.com/rclone/rclone/fs" - "github.com/rclone/rclone/fs/hash" ) // io related errors returned by ChunkedReader @@ -17,22 +15,13 @@ var ( ErrorInvalidSeek = errors.New("invalid seek position") ) -// ChunkedReader is a reader for an Object with the possibility -// of reading the source in chunks of given size -// -// An initialChunkSize of <= 0 will disable chunked reading. -type ChunkedReader struct { - ctx context.Context - mu sync.Mutex // protects following fields - o fs.Object // source to read from - rc io.ReadCloser // reader for the current open chunk - offset int64 // offset the next Read will start. -1 forces a reopen of o - chunkOffset int64 // beginning of the current or next chunk - chunkSize int64 // length of the current or next chunk. -1 will open o from chunkOffset to the end - initialChunkSize int64 // default chunkSize after the chunk specified by RangeSeek is complete - maxChunkSize int64 // consecutive read chunks will double in size until reached. -1 means no limit - customChunkSize bool // is the current chunkSize set by RangeSeek? - closed bool // has Close been called? +// ChunkedReader describes what a chunked reader can do. +type ChunkedReader interface { + io.Reader + io.Seeker + io.Closer + fs.RangeSeeker + Open() (ChunkedReader, error) } // New returns a ChunkedReader for the Object. @@ -41,210 +30,18 @@ type ChunkedReader struct { // If maxChunkSize is greater than initialChunkSize, the chunk size will be // doubled after each chunk read with a maximum of maxChunkSize. // A Seek or RangeSeek will reset the chunk size to it's initial value -func New(ctx context.Context, o fs.Object, initialChunkSize int64, maxChunkSize int64) *ChunkedReader { +func New(ctx context.Context, o fs.Object, initialChunkSize int64, maxChunkSize int64, streams int) ChunkedReader { if initialChunkSize <= 0 { initialChunkSize = -1 } if maxChunkSize != -1 && maxChunkSize < initialChunkSize { maxChunkSize = initialChunkSize } - return &ChunkedReader{ - ctx: ctx, - o: o, - offset: -1, - chunkSize: initialChunkSize, - initialChunkSize: initialChunkSize, - maxChunkSize: maxChunkSize, + if streams < 0 { + streams = 0 } + if streams <= 1 || o.Size() < 0 { + return newSequential(ctx, o, initialChunkSize, maxChunkSize) + } + return newParallel(ctx, o, initialChunkSize, streams) } - -// Read from the file - for details see io.Reader -func (cr *ChunkedReader) Read(p []byte) (n int, err error) { - cr.mu.Lock() - defer cr.mu.Unlock() - - if cr.closed { - return 0, ErrorFileClosed - } - - for reqSize := int64(len(p)); reqSize > 0; reqSize = int64(len(p)) { - // the current chunk boundary. valid only when chunkSize > 0 - chunkEnd := cr.chunkOffset + cr.chunkSize - - fs.Debugf(cr.o, "ChunkedReader.Read at %d length %d chunkOffset %d chunkSize %d", cr.offset, reqSize, cr.chunkOffset, cr.chunkSize) - - switch { - case cr.chunkSize > 0 && cr.offset == chunkEnd: // last chunk read completely - cr.chunkOffset = cr.offset - if cr.customChunkSize { // last chunkSize was set by RangeSeek - cr.customChunkSize = false - cr.chunkSize = cr.initialChunkSize - } else { - cr.chunkSize *= 2 - if cr.chunkSize > cr.maxChunkSize && cr.maxChunkSize != -1 { - cr.chunkSize = cr.maxChunkSize - } - } - // recalculate the chunk boundary. valid only when chunkSize > 0 - chunkEnd = cr.chunkOffset + cr.chunkSize - fallthrough - case cr.offset == -1: // first Read or Read after RangeSeek - err = cr.openRange() - if err != nil { - return - } - } - - var buf []byte - chunkRest := chunkEnd - cr.offset - // limit read to chunk boundaries if chunkSize > 0 - if reqSize > chunkRest && cr.chunkSize > 0 { - buf, p = p[0:chunkRest], p[chunkRest:] - } else { - buf, p = p, nil - } - var rn int - rn, err = io.ReadFull(cr.rc, buf) - n += rn - cr.offset += int64(rn) - if err != nil { - if err == io.ErrUnexpectedEOF { - err = io.EOF - } - return - } - } - return n, nil -} - -// Close the file - for details see io.Closer -// -// All methods on ChunkedReader will return ErrorFileClosed afterwards -func (cr *ChunkedReader) Close() error { - cr.mu.Lock() - defer cr.mu.Unlock() - - if cr.closed { - return ErrorFileClosed - } - cr.closed = true - - return cr.resetReader(nil, 0) -} - -// Seek the file - for details see io.Seeker -func (cr *ChunkedReader) Seek(offset int64, whence int) (int64, error) { - return cr.RangeSeek(context.TODO(), offset, whence, -1) -} - -// RangeSeek the file - for details see RangeSeeker -// -// The specified length will only apply to the next chunk opened. -// RangeSeek will not reopen the source until Read is called. -func (cr *ChunkedReader) RangeSeek(ctx context.Context, offset int64, whence int, length int64) (int64, error) { - cr.mu.Lock() - defer cr.mu.Unlock() - - fs.Debugf(cr.o, "ChunkedReader.RangeSeek from %d to %d length %d", cr.offset, offset, length) - - if cr.closed { - return 0, ErrorFileClosed - } - - size := cr.o.Size() - switch whence { - case io.SeekStart: - cr.offset = 0 - case io.SeekEnd: - cr.offset = size - } - // set the new chunk start - cr.chunkOffset = cr.offset + offset - // force reopen on next Read - cr.offset = -1 - if length > 0 { - cr.customChunkSize = true - cr.chunkSize = length - } else { - cr.chunkSize = cr.initialChunkSize - } - if cr.chunkOffset < 0 || cr.chunkOffset >= size { - cr.chunkOffset = 0 - return 0, ErrorInvalidSeek - } - return cr.chunkOffset, nil -} - -// Open forces the connection to be opened -func (cr *ChunkedReader) Open() (*ChunkedReader, error) { - cr.mu.Lock() - defer cr.mu.Unlock() - - if cr.rc != nil && cr.offset != -1 { - return cr, nil - } - return cr, cr.openRange() -} - -// openRange will open the source Object with the current chunk range -// -// If the current open reader implements RangeSeeker, it is tried first. -// When RangeSeek fails, o.Open with a RangeOption is used. -// -// A length <= 0 will request till the end of the file -func (cr *ChunkedReader) openRange() error { - offset, length := cr.chunkOffset, cr.chunkSize - fs.Debugf(cr.o, "ChunkedReader.openRange at %d length %d", offset, length) - - if cr.closed { - return ErrorFileClosed - } - - if rs, ok := cr.rc.(fs.RangeSeeker); ok { - n, err := rs.RangeSeek(cr.ctx, offset, io.SeekStart, length) - if err == nil && n == offset { - cr.offset = offset - return nil - } - if err != nil { - fs.Debugf(cr.o, "ChunkedReader.openRange seek failed (%s). Trying Open", err) - } else { - fs.Debugf(cr.o, "ChunkedReader.openRange seeked to wrong offset. Wanted %d, got %d. Trying Open", offset, n) - } - } - - var rc io.ReadCloser - var err error - if length <= 0 { - if offset == 0 { - rc, err = cr.o.Open(cr.ctx, &fs.HashesOption{Hashes: hash.Set(hash.None)}) - } else { - rc, err = cr.o.Open(cr.ctx, &fs.HashesOption{Hashes: hash.Set(hash.None)}, &fs.RangeOption{Start: offset, End: -1}) - } - } else { - rc, err = cr.o.Open(cr.ctx, &fs.HashesOption{Hashes: hash.Set(hash.None)}, &fs.RangeOption{Start: offset, End: offset + length - 1}) - } - if err != nil { - return err - } - return cr.resetReader(rc, offset) -} - -// resetReader switches the current reader to the given reader. -// The old reader will be Close'd before setting the new reader. -func (cr *ChunkedReader) resetReader(rc io.ReadCloser, offset int64) error { - if cr.rc != nil { - if err := cr.rc.Close(); err != nil { - return err - } - } - cr.rc = rc - cr.offset = offset - return nil -} - -var ( - _ io.ReadCloser = (*ChunkedReader)(nil) - _ io.Seeker = (*ChunkedReader)(nil) - _ fs.RangeSeeker = (*ChunkedReader)(nil) -) diff --git a/fs/chunkedreader/chunkedreader_test.go b/fs/chunkedreader/chunkedreader_test.go index d9fb3cccb..62c72094e 100644 --- a/fs/chunkedreader/chunkedreader_test.go +++ b/fs/chunkedreader/chunkedreader_test.go @@ -7,21 +7,47 @@ import ( "math/rand" "testing" + "github.com/rclone/rclone/fstest" "github.com/rclone/rclone/fstest/mockobject" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -func TestChunkedReader(t *testing.T) { - content := makeContent(t, 1024) +// TestMain drives the tests +func TestMain(m *testing.M) { + fstest.TestMain(m) +} - for _, mode := range mockobject.SeekModes { - t.Run(mode.String(), testRead(content, mode)) +func TestChunkedReader(t *testing.T) { + ctx := context.Background() + o := mockobject.New("test.bin").WithContent([]byte("hello"), mockobject.SeekModeRegular) + const MB = 1024 * 1024 + + for _, test := range []struct { + initialChunkSize int64 + maxChunkSize int64 + streams int + crType any + unknownSize bool + }{ + {-1, MB, 0, new(sequential), false}, + {MB, 10 * MB, 0, new(sequential), false}, + {MB, 10 * MB, 1, new(sequential), false}, + {MB, 10 * MB, 1, new(sequential), true}, + {MB, 10 * MB, 2, new(parallel), false}, + {MB, 10 * MB, 2, new(sequential), true}, + } { + what := fmt.Sprintf("%+v", test) + o.SetUnknownSize(test.unknownSize) + cr := New(ctx, o, test.initialChunkSize, test.maxChunkSize, test.streams) + assert.IsType(t, test.crType, cr, what) + require.NoError(t, cr.Close(), what) } } -func testRead(content []byte, mode mockobject.SeekMode) func(*testing.T) { +func testRead(content []byte, mode mockobject.SeekMode, streams int) func(*testing.T) { return func(t *testing.T) { + ctx := context.Background() chunkSizes := []int64{-1, 0, 1, 15, 16, 17, 1023, 1024, 1025, 2000} offsets := []int64{0, 1, 2, 3, 4, 5, 7, 8, 9, 15, 16, 17, 31, 32, 33, 63, 64, 65, 511, 512, 513, 1023, 1024, 1025} @@ -39,13 +65,13 @@ func testRead(content []byte, mode mockobject.SeekMode) func(*testing.T) { } t.Run(fmt.Sprintf("Chunksize_%d_%d", cs, csMax), func(t *testing.T) { - cr := New(context.Background(), o, cs, csMax) + cr := New(ctx, o, cs, csMax, streams) for _, offset := range offsets { for _, limit := range limits { what := fmt.Sprintf("offset %d, limit %d", offset, limit) - p, err := cr.RangeSeek(context.Background(), offset, io.SeekStart, limit) + p, err := cr.RangeSeek(ctx, offset, io.SeekStart, limit) if offset >= cl { require.Error(t, err, what) return @@ -74,32 +100,33 @@ func testRead(content []byte, mode mockobject.SeekMode) func(*testing.T) { } } -func TestErrorAfterClose(t *testing.T) { +func testErrorAfterClose(t *testing.T, streams int) { + ctx := context.Background() content := makeContent(t, 1024) o := mockobject.New("test.bin").WithContent(content, mockobject.SeekModeNone) // Close - cr := New(context.Background(), o, 0, 0) + cr := New(ctx, o, 0, 0, streams) require.NoError(t, cr.Close()) require.Error(t, cr.Close()) // Read - cr = New(context.Background(), o, 0, 0) + cr = New(ctx, o, 0, 0, streams) require.NoError(t, cr.Close()) var buf [1]byte _, err := cr.Read(buf[:]) require.Error(t, err) // Seek - cr = New(context.Background(), o, 0, 0) + cr = New(ctx, o, 0, 0, streams) require.NoError(t, cr.Close()) _, err = cr.Seek(1, io.SeekCurrent) require.Error(t, err) // RangeSeek - cr = New(context.Background(), o, 0, 0) + cr = New(ctx, o, 0, 0, streams) require.NoError(t, cr.Close()) - _, err = cr.RangeSeek(context.Background(), 1, io.SeekCurrent, 0) + _, err = cr.RangeSeek(ctx, 1, io.SeekCurrent, 0) require.Error(t, err) } diff --git a/fs/chunkedreader/parallel.go b/fs/chunkedreader/parallel.go new file mode 100644 index 000000000..15d0da4a1 --- /dev/null +++ b/fs/chunkedreader/parallel.go @@ -0,0 +1,383 @@ +package chunkedreader + +import ( + "context" + "fmt" + "io" + "sync" + + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/hash" + "github.com/rclone/rclone/fs/log" + "github.com/rclone/rclone/fs/operations" + "github.com/rclone/rclone/lib/multipart" + "github.com/rclone/rclone/lib/pool" +) + +// parallel reads Object in chunks of a given size in parallel. +type parallel struct { + ctx context.Context + o fs.Object // source to read from + mu sync.Mutex // protects following fields + endStream int64 // offset we have started streams for + offset int64 // offset the read file pointer is at + chunkSize int64 // length of the chunks to read + nstreams int // number of streams to use + streams []*stream // the opened streams in offset order - the current one is first + closed bool // has Close been called? +} + +// stream holds the info about a single download +type stream struct { + cr *parallel // parent reader + ctx context.Context // ctx to cancel if needed + cancel func() // cancel the stream + rc io.ReadCloser // reader that it is reading from, may be nil + offset int64 // where the stream is reading from + size int64 // and the size it is reading + readBytes int64 // bytes read from the stream + rw *pool.RW // buffer for read + err chan error // error returned from the read + name string // name of this stream for debugging +} + +// Start a stream reading (offset, offset+size) +func (cr *parallel) newStream(ctx context.Context, offset, size int64) (s *stream, err error) { + ctx, cancel := context.WithCancel(ctx) + + // Create the stream + rw := multipart.NewRW() + s = &stream{ + cr: cr, + ctx: ctx, + cancel: cancel, + offset: offset, + size: size, + rw: rw, + err: make(chan error, 1), + } + s.name = fmt.Sprintf("stream(%d,%d,%p)", s.offset, s.size, s) + + // Start the background read into the buffer + go s.readFrom(ctx) + + // Return the stream to the caller + return s, nil +} + +// read the file into the buffer +func (s *stream) readFrom(ctx context.Context) { + // Open the object at the correct range + fs.Debugf(s.cr.o, "%s: open", s.name) + rc, err := operations.Open(ctx, s.cr.o, + &fs.HashesOption{Hashes: hash.Set(hash.None)}, + &fs.RangeOption{Start: s.offset, End: s.offset + s.size - 1}) + if err != nil { + s.err <- fmt.Errorf("parallel chunked reader: failed to open stream at %d size %d: %w", s.offset, s.size, err) + return + } + s.rc = rc + + fs.Debugf(s.cr.o, "%s: readfrom started", s.name) + _, err = s.rw.ReadFrom(s.rc) + fs.Debugf(s.cr.o, "%s: readfrom finished (%d bytes): %v", s.name, s.rw.Size(), err) + s.err <- err +} + +// eof is true when we've read all the data we are expecting +func (s *stream) eof() bool { + return s.readBytes >= s.size +} + +// read reads up to len(p) bytes into p. It returns the number of +// bytes read (0 <= n <= len(p)) and any error encountered. If some +// data is available but not len(p) bytes, read returns what is +// available instead of waiting for more. +func (s *stream) read(p []byte) (n int, err error) { + defer log.Trace(s.cr.o, "%s: Read len(p)=%d", s.name, len(p))("n=%d, err=%v", &n, &err) + if len(p) == 0 { + return n, nil + } + for { + var nn int + nn, err = s.rw.Read(p[n:]) + fs.Debugf(s.cr.o, "%s: rw.Read nn=%d, err=%v", s.name, nn, err) + s.readBytes += int64(nn) + n += nn + if err != nil && err != io.EOF { + return n, err + } + if s.eof() { + return n, io.EOF + } + // Received a faux io.EOF because we haven't read all the data yet + if n >= len(p) { + break + } + // Wait for a write to happen to read more + s.rw.WaitWrite(s.ctx) + } + return n, nil +} + +// Sets *perr to newErr if err is nil +func orErr(perr *error, newErr error) { + if *perr == nil { + *perr = newErr + } +} + +// Close a stream +func (s *stream) close() (err error) { + defer log.Trace(s.cr.o, "%s: close", s.name)("err=%v", &err) + s.cancel() + err = <-s.err // wait for readFrom to stop and return error + orErr(&err, s.rw.Close()) + if s.rc != nil { + orErr(&err, s.rc.Close()) + } + if err != nil && err != io.EOF { + return fmt.Errorf("parallel chunked reader: failed to read stream at %d size %d: %w", s.offset, s.size, err) + } + return nil +} + +// Make a new parallel chunked reader +// +// Mustn't be called for an unknown size object +func newParallel(ctx context.Context, o fs.Object, chunkSize int64, streams int) ChunkedReader { + // Make sure chunkSize is a multiple of multipart.BufferSize + if chunkSize < 0 { + chunkSize = multipart.BufferSize + } + newChunkSize := multipart.BufferSize * (chunkSize / multipart.BufferSize) + if newChunkSize < chunkSize { + newChunkSize += multipart.BufferSize + } + + fs.Debugf(o, "newParallel chunkSize=%d, streams=%d", chunkSize, streams) + + return ¶llel{ + ctx: ctx, + o: o, + offset: 0, + chunkSize: newChunkSize, + nstreams: streams, + } +} + +// _open starts the file transferring at offset +// +// Call with the lock held +func (cr *parallel) _open() (err error) { + size := cr.o.Size() + if size < 0 { + return fmt.Errorf("parallel chunked reader: can't use multiple threads for unknown sized object %q", cr.o) + } + // Launched enough streams already + if cr.endStream >= size { + return nil + } + + // Make sure cr.nstreams are running + for i := len(cr.streams); i < cr.nstreams; i++ { + // clip to length of file + chunkSize := cr.chunkSize + newEndStream := cr.endStream + chunkSize + if newEndStream > size { + chunkSize = size - cr.endStream + newEndStream = cr.endStream + chunkSize + } + + s, err := cr.newStream(cr.ctx, cr.endStream, chunkSize) + if err != nil { + return err + } + cr.streams = append(cr.streams, s) + cr.endStream = newEndStream + + if cr.endStream >= size { + break + } + } + + return nil +} + +// Finished reading the current stream so pop it off and destroy it +// +// Call with lock held +func (cr *parallel) _popStream() (err error) { + defer log.Trace(cr.o, "streams=%+v", cr.streams)("streams=%+v, err=%v", &cr.streams, &err) + if len(cr.streams) == 0 { + return nil + } + stream := cr.streams[0] + err = stream.close() + cr.streams[0] = nil + cr.streams = cr.streams[1:] + return err +} + +// Get rid of all the streams +// +// Call with lock held +func (cr *parallel) _popStreams() (err error) { + defer log.Trace(cr.o, "streams=%+v", cr.streams)("streams=%+v, err=%v", &cr.streams, &err) + for len(cr.streams) > 0 { + orErr(&err, cr._popStream()) + } + cr.streams = nil + return err +} + +// Read from the file - for details see io.Reader +func (cr *parallel) Read(p []byte) (n int, err error) { + defer log.Trace(cr.o, "Read len(p)=%d", len(p))("n=%d, err=%v", &n, &err) + cr.mu.Lock() + defer cr.mu.Unlock() + + if cr.closed { + return 0, ErrorFileClosed + } + + for n < len(p) { + // Make sure we have the correct number of streams open + err = cr._open() + if err != nil { + return n, err + } + + // No streams left means EOF + if len(cr.streams) == 0 { + return n, io.EOF + } + + // Read from the stream + stream := cr.streams[0] + nn, err := stream.read(p[n:]) + n += nn + cr.offset += int64(nn) + if err == io.EOF { + err = cr._popStream() + if err != nil { + break + } + } else if err != nil { + break + } + } + return n, err +} + +// Close the file - for details see io.Closer +// +// All methods on ChunkedReader will return ErrorFileClosed afterwards +func (cr *parallel) Close() error { + cr.mu.Lock() + defer cr.mu.Unlock() + + if cr.closed { + return ErrorFileClosed + } + cr.closed = true + + // Close all the streams + return cr._popStreams() +} + +// Seek the file - for details see io.Seeker +func (cr *parallel) Seek(offset int64, whence int) (int64, error) { + cr.mu.Lock() + defer cr.mu.Unlock() + + fs.Debugf(cr.o, "parallel chunked reader: seek from %d to %d whence %d", cr.offset, offset, whence) + + if cr.closed { + return 0, ErrorFileClosed + } + + size := cr.o.Size() + currentOffset := cr.offset + switch whence { + case io.SeekStart: + currentOffset = 0 + case io.SeekEnd: + currentOffset = size + } + // set the new chunk start + newOffset := currentOffset + offset + if newOffset < 0 || newOffset >= size { + return 0, ErrorInvalidSeek + } + + // If seek pointer didn't move, return now + if newOffset == cr.offset { + fs.Debugf(cr.o, "parallel chunked reader: seek pointer didn't move") + return cr.offset, nil + } + + cr.offset = newOffset + + // Ditch out of range streams + for len(cr.streams) > 0 { + stream := cr.streams[0] + if newOffset >= stream.offset+stream.size { + _ = cr._popStream() + } else { + break + } + } + + // If no streams remain we can just restart + if len(cr.streams) == 0 { + fs.Debugf(cr.o, "parallel chunked reader: no streams remain") + cr.endStream = cr.offset + return cr.offset, nil + } + + // Current stream + stream := cr.streams[0] + + // If new offset is before current stream then ditch all the streams + if newOffset < stream.offset { + _ = cr._popStreams() + fs.Debugf(cr.o, "parallel chunked reader: new offset is before current stream - ditch all") + cr.endStream = cr.offset + return cr.offset, nil + } + + // Seek the current stream + streamOffset := newOffset - stream.offset + stream.readBytes = streamOffset // correct read value + fs.Debugf(cr.o, "parallel chunked reader: seek the current stream to %d", streamOffset) + // Wait for the read to the correct part of the data + for stream.rw.Size() < streamOffset { + stream.rw.WaitWrite(cr.ctx) + } + _, err := stream.rw.Seek(streamOffset, io.SeekStart) + if err != nil { + return cr.offset, fmt.Errorf("parallel chunked reader: failed to seek stream: %w", err) + } + + return cr.offset, nil +} + +// RangeSeek the file - for details see RangeSeeker +// +// In the parallel chunked reader this just acts like Seek +func (cr *parallel) RangeSeek(ctx context.Context, offset int64, whence int, length int64) (int64, error) { + return cr.Seek(offset, whence) +} + +// Open forces the connection to be opened +func (cr *parallel) Open() (ChunkedReader, error) { + cr.mu.Lock() + defer cr.mu.Unlock() + + return cr, cr._open() +} + +var ( + _ ChunkedReader = (*parallel)(nil) +) diff --git a/fs/chunkedreader/parallel_test.go b/fs/chunkedreader/parallel_test.go new file mode 100644 index 000000000..5df6492e9 --- /dev/null +++ b/fs/chunkedreader/parallel_test.go @@ -0,0 +1,102 @@ +package chunkedreader + +import ( + "context" + "io" + "math/rand" + "testing" + + "github.com/rclone/rclone/fstest/mockobject" + "github.com/rclone/rclone/lib/multipart" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestParallel(t *testing.T) { + content := makeContent(t, 1024) + + for _, mode := range mockobject.SeekModes { + t.Run(mode.String(), testRead(content, mode, 3)) + } +} + +func TestParallelErrorAfterClose(t *testing.T) { + testErrorAfterClose(t, 3) +} + +func TestParallelLarge(t *testing.T) { + ctx := context.Background() + const streams = 3 + const chunkSize = multipart.BufferSize + const size = (2*streams+1)*chunkSize + 255 + content := makeContent(t, size) + o := mockobject.New("test.bin").WithContent(content, mockobject.SeekModeNone) + + cr := New(ctx, o, chunkSize, 0, streams) + + for _, test := range []struct { + name string + offset int64 + seekMode int + }{ + {name: "Straight", offset: 0, seekMode: -1}, + {name: "Rewind", offset: 0, seekMode: io.SeekStart}, + {name: "NearStart", offset: 1, seekMode: io.SeekStart}, + {name: "NearEnd", offset: size - 2*chunkSize - 127, seekMode: io.SeekEnd}, + } { + t.Run(test.name, func(t *testing.T) { + if test.seekMode >= 0 { + var n int64 + var err error + if test.seekMode == io.SeekEnd { + n, err = cr.Seek(test.offset-size, test.seekMode) + } else { + n, err = cr.Seek(test.offset, test.seekMode) + } + require.NoError(t, err) + assert.Equal(t, test.offset, n) + } + got, err := io.ReadAll(cr) + require.NoError(t, err) + require.Equal(t, len(content[test.offset:]), len(got)) + assert.Equal(t, content[test.offset:], got) + }) + } + + require.NoError(t, cr.Close()) + + t.Run("Seeky", func(t *testing.T) { + cr := New(ctx, o, chunkSize, 0, streams) + offset := 0 + buf := make([]byte, 1024) + + for { + // Read and check a random read + readSize := rand.Intn(1024) + readBuf := buf[:readSize] + n, err := cr.Read(readBuf) + + require.Equal(t, content[offset:offset+n], readBuf[:n]) + offset += n + + if err == io.EOF { + assert.Equal(t, size, offset) + break + } + require.NoError(t, err) + + // Now do a smaller random seek backwards + seekSize := rand.Intn(512) + if offset-seekSize < 0 { + seekSize = offset + } + nn, err := cr.Seek(-int64(seekSize), io.SeekCurrent) + offset -= seekSize + require.NoError(t, err) + assert.Equal(t, nn, int64(offset)) + } + + require.NoError(t, cr.Close()) + }) + +} diff --git a/fs/chunkedreader/sequential.go b/fs/chunkedreader/sequential.go new file mode 100644 index 000000000..ee9400fb4 --- /dev/null +++ b/fs/chunkedreader/sequential.go @@ -0,0 +1,232 @@ +package chunkedreader + +import ( + "context" + "io" + "sync" + + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/hash" +) + +// sequential is a reader for an Object with the possibility +// of reading the source in chunks of given size +// +// An initialChunkSize of <= 0 will disable chunked reading. +type sequential struct { + ctx context.Context + mu sync.Mutex // protects following fields + o fs.Object // source to read from + rc io.ReadCloser // reader for the current open chunk + offset int64 // offset the next Read will start. -1 forces a reopen of o + chunkOffset int64 // beginning of the current or next chunk + chunkSize int64 // length of the current or next chunk. -1 will open o from chunkOffset to the end + initialChunkSize int64 // default chunkSize after the chunk specified by RangeSeek is complete + maxChunkSize int64 // consecutive read chunks will double in size until reached. -1 means no limit + customChunkSize bool // is the current chunkSize set by RangeSeek? + closed bool // has Close been called? +} + +// Make a new sequential chunked reader +func newSequential(ctx context.Context, o fs.Object, initialChunkSize int64, maxChunkSize int64) ChunkedReader { + return &sequential{ + ctx: ctx, + o: o, + offset: -1, + chunkSize: initialChunkSize, + initialChunkSize: initialChunkSize, + maxChunkSize: maxChunkSize, + } +} + +// Read from the file - for details see io.Reader +func (cr *sequential) Read(p []byte) (n int, err error) { + cr.mu.Lock() + defer cr.mu.Unlock() + + if cr.closed { + return 0, ErrorFileClosed + } + + for reqSize := int64(len(p)); reqSize > 0; reqSize = int64(len(p)) { + // the current chunk boundary. valid only when chunkSize > 0 + chunkEnd := cr.chunkOffset + cr.chunkSize + + fs.Debugf(cr.o, "ChunkedReader.Read at %d length %d chunkOffset %d chunkSize %d", cr.offset, reqSize, cr.chunkOffset, cr.chunkSize) + + switch { + case cr.chunkSize > 0 && cr.offset == chunkEnd: // last chunk read completely + cr.chunkOffset = cr.offset + if cr.customChunkSize { // last chunkSize was set by RangeSeek + cr.customChunkSize = false + cr.chunkSize = cr.initialChunkSize + } else { + cr.chunkSize *= 2 + if cr.chunkSize > cr.maxChunkSize && cr.maxChunkSize != -1 { + cr.chunkSize = cr.maxChunkSize + } + } + // recalculate the chunk boundary. valid only when chunkSize > 0 + chunkEnd = cr.chunkOffset + cr.chunkSize + fallthrough + case cr.offset == -1: // first Read or Read after RangeSeek + err = cr.openRange() + if err != nil { + return + } + } + + var buf []byte + chunkRest := chunkEnd - cr.offset + // limit read to chunk boundaries if chunkSize > 0 + if reqSize > chunkRest && cr.chunkSize > 0 { + buf, p = p[0:chunkRest], p[chunkRest:] + } else { + buf, p = p, nil + } + var rn int + rn, err = io.ReadFull(cr.rc, buf) + n += rn + cr.offset += int64(rn) + if err != nil { + if err == io.ErrUnexpectedEOF { + err = io.EOF + } + return + } + } + return n, nil +} + +// Close the file - for details see io.Closer +// +// All methods on ChunkedReader will return ErrorFileClosed afterwards +func (cr *sequential) Close() error { + cr.mu.Lock() + defer cr.mu.Unlock() + + if cr.closed { + return ErrorFileClosed + } + cr.closed = true + + return cr.resetReader(nil, 0) +} + +// Seek the file - for details see io.Seeker +func (cr *sequential) Seek(offset int64, whence int) (int64, error) { + return cr.RangeSeek(context.TODO(), offset, whence, -1) +} + +// RangeSeek the file - for details see RangeSeeker +// +// The specified length will only apply to the next chunk opened. +// RangeSeek will not reopen the source until Read is called. +func (cr *sequential) RangeSeek(ctx context.Context, offset int64, whence int, length int64) (int64, error) { + cr.mu.Lock() + defer cr.mu.Unlock() + + fs.Debugf(cr.o, "ChunkedReader.RangeSeek from %d to %d length %d", cr.offset, offset, length) + + if cr.closed { + return 0, ErrorFileClosed + } + + size := cr.o.Size() + switch whence { + case io.SeekStart: + cr.offset = 0 + case io.SeekEnd: + if size < 0 { + return 0, ErrorInvalidSeek // Can't seek from end for unknown size + } + cr.offset = size + } + // set the new chunk start + cr.chunkOffset = cr.offset + offset + // force reopen on next Read + cr.offset = -1 + if length > 0 { + cr.customChunkSize = true + cr.chunkSize = length + } else { + cr.chunkSize = cr.initialChunkSize + } + if cr.chunkOffset < 0 || cr.chunkOffset >= size { + cr.chunkOffset = 0 + return 0, ErrorInvalidSeek + } + return cr.chunkOffset, nil +} + +// Open forces the connection to be opened +func (cr *sequential) Open() (ChunkedReader, error) { + cr.mu.Lock() + defer cr.mu.Unlock() + + if cr.rc != nil && cr.offset != -1 { + return cr, nil + } + return cr, cr.openRange() +} + +// openRange will open the source Object with the current chunk range +// +// If the current open reader implements RangeSeeker, it is tried first. +// When RangeSeek fails, o.Open with a RangeOption is used. +// +// A length <= 0 will request till the end of the file +func (cr *sequential) openRange() error { + offset, length := cr.chunkOffset, cr.chunkSize + fs.Debugf(cr.o, "ChunkedReader.openRange at %d length %d", offset, length) + + if cr.closed { + return ErrorFileClosed + } + + if rs, ok := cr.rc.(fs.RangeSeeker); ok { + n, err := rs.RangeSeek(cr.ctx, offset, io.SeekStart, length) + if err == nil && n == offset { + cr.offset = offset + return nil + } + if err != nil { + fs.Debugf(cr.o, "ChunkedReader.openRange seek failed (%s). Trying Open", err) + } else { + fs.Debugf(cr.o, "ChunkedReader.openRange seeked to wrong offset. Wanted %d, got %d. Trying Open", offset, n) + } + } + + var rc io.ReadCloser + var err error + if length <= 0 { + if offset == 0 { + rc, err = cr.o.Open(cr.ctx, &fs.HashesOption{Hashes: hash.Set(hash.None)}) + } else { + rc, err = cr.o.Open(cr.ctx, &fs.HashesOption{Hashes: hash.Set(hash.None)}, &fs.RangeOption{Start: offset, End: -1}) + } + } else { + rc, err = cr.o.Open(cr.ctx, &fs.HashesOption{Hashes: hash.Set(hash.None)}, &fs.RangeOption{Start: offset, End: offset + length - 1}) + } + if err != nil { + return err + } + return cr.resetReader(rc, offset) +} + +// resetReader switches the current reader to the given reader. +// The old reader will be Close'd before setting the new reader. +func (cr *sequential) resetReader(rc io.ReadCloser, offset int64) error { + if cr.rc != nil { + if err := cr.rc.Close(); err != nil { + return err + } + } + cr.rc = rc + cr.offset = offset + return nil +} + +var ( + _ ChunkedReader = (*sequential)(nil) +) diff --git a/fs/chunkedreader/sequential_test.go b/fs/chunkedreader/sequential_test.go new file mode 100644 index 000000000..b384c893d --- /dev/null +++ b/fs/chunkedreader/sequential_test.go @@ -0,0 +1,20 @@ +package chunkedreader + +import ( + "testing" + + _ "github.com/rclone/rclone/backend/local" + "github.com/rclone/rclone/fstest/mockobject" +) + +func TestSequential(t *testing.T) { + content := makeContent(t, 1024) + + for _, mode := range mockobject.SeekModes { + t.Run(mode.String(), testRead(content, mode, 0)) + } +} + +func TestSequentialErrorAfterClose(t *testing.T) { + testErrorAfterClose(t, 0) +} diff --git a/vfs/read.go b/vfs/read.go index 6ea0de8e7..aa2b6555a 100644 --- a/vfs/read.go +++ b/vfs/read.go @@ -75,7 +75,8 @@ func (fh *ReadFileHandle) openPending() (err error) { return nil } o := fh.file.getObject() - r, err := chunkedreader.New(context.TODO(), o, int64(fh.file.VFS().Opt.ChunkSize), int64(fh.file.VFS().Opt.ChunkSizeLimit)).Open() + opt := &fh.file.VFS().Opt + r, err := chunkedreader.New(context.TODO(), o, int64(opt.ChunkSize), int64(opt.ChunkSizeLimit), opt.ChunkStreams).Open() if err != nil { return err } @@ -127,7 +128,7 @@ func (fh *ReadFileHandle) seek(offset int64, reopen bool) (err error) { } fh.r.StopBuffering() // stop the background reading first oldReader := fh.r.GetReader() - r, ok := oldReader.(*chunkedreader.ChunkedReader) + r, ok := oldReader.(chunkedreader.ChunkedReader) if !ok { fs.Logf(fh.remote, "ReadFileHandle.Read expected reader to be a ChunkedReader, got %T", oldReader) reopen = true @@ -148,7 +149,8 @@ func (fh *ReadFileHandle) seek(offset int64, reopen bool) (err error) { } // re-open with a seek o := fh.file.getObject() - r = chunkedreader.New(context.TODO(), o, int64(fh.file.VFS().Opt.ChunkSize), int64(fh.file.VFS().Opt.ChunkSizeLimit)) + opt := &fh.file.VFS().Opt + r = chunkedreader.New(context.TODO(), o, int64(opt.ChunkSize), int64(opt.ChunkSizeLimit), opt.ChunkStreams) _, err := r.Seek(offset, 0) if err != nil { fs.Debugf(fh.remote, "ReadFileHandle.Read seek failed: %v", err) diff --git a/vfs/vfs.md b/vfs/vfs.md index 8d5350764..d8f407cc4 100644 --- a/vfs/vfs.md +++ b/vfs/vfs.md @@ -230,6 +230,11 @@ These flags control the chunking: --vfs-read-chunk-size SizeSuffix Read the source objects in chunks (default 128M) --vfs-read-chunk-size-limit SizeSuffix Max chunk doubling size (default off) + --vfs-read-chunk-streams int The number of parallel streams to read at once + +The chunking behaves differently depending on the `--vfs-read-chunk-streams` parameter. + +#### `--vfs-read-chunk-streams` == 0 Rclone will start reading a chunk of size `--vfs-read-chunk-size`, and then double the size for each read. When `--vfs-read-chunk-size-limit` is @@ -245,6 +250,30 @@ When `--vfs-read-chunk-size-limit 500M` is specified, the result would be Setting `--vfs-read-chunk-size` to `0` or "off" disables chunked reading. +The chunks will not be buffered in memory. + +#### `--vfs-read-chunk-streams` > 0 + +Rclone reads `--vfs-read-chunk-streams` chunks of size +`--vfs-read-chunk-size` concurrently. The size for each read will stay +constant. + +This improves performance performance massively on high latency links +or very high bandwidth links to high performance object stores. + +Some experimentation will be needed to find the optimum values of +`--vfs-read-chunk-size` and `--vfs-read-chunk-streams` as these will +depend on the backend in use and the latency to the backend. + +For high performance object stores (eg AWS S3) a reasonable place to +start might be `--vfs-read-chunk-streams 16` and +`--vfs-read-chunk-size 4M`. In testing with AWS S3 the performance +scaled roughly as the `--vfs-read-chunk-streams` setting. + +Similar settings should work for high latency links, but depending on +the latency they may need more `--vfs-read-chunk-streams` in order to +get the throughput. + ### VFS Performance These flags may be used to enable/disable features of the VFS for diff --git a/vfs/vfscache/downloaders/downloaders.go b/vfs/vfscache/downloaders/downloaders.go index 2546afdd9..87783fb6b 100644 --- a/vfs/vfscache/downloaders/downloaders.go +++ b/vfs/vfscache/downloaders/downloaders.go @@ -540,7 +540,7 @@ func (dl *downloader) open(offset int64) (err error) { // } // in0, err := operations.NewReOpen(dl.dls.ctx, dl.dls.src, ci.LowLevelRetries, dl.dls.item.c.hashOption, rangeOption) - in0 := chunkedreader.New(context.TODO(), dl.dls.src, int64(dl.dls.opt.ChunkSize), int64(dl.dls.opt.ChunkSizeLimit)) + in0 := chunkedreader.New(context.TODO(), dl.dls.src, int64(dl.dls.opt.ChunkSize), int64(dl.dls.opt.ChunkSizeLimit), dl.dls.opt.ChunkStreams) _, err = in0.Seek(offset, 0) if err != nil { return fmt.Errorf("vfs reader: failed to open source file: %w", err) diff --git a/vfs/vfscommon/options.go b/vfs/vfscommon/options.go index 12ccd8664..591ce5044 100644 --- a/vfs/vfscommon/options.go +++ b/vfs/vfscommon/options.go @@ -79,6 +79,11 @@ var OptionsInfo = fs.Options{{ Default: fs.SizeSuffix(-1), Help: "If greater than --vfs-read-chunk-size, double the chunk size after each chunk read, until the limit is reached ('off' is unlimited)", Groups: "VFS", +}, { + Name: "vfs_read_chunk_streams", + Default: 0, + Help: "The number of parallel streams to read at once", + Groups: "VFS", }, { Name: "dir_perms", Default: FileMode(0777), @@ -171,6 +176,7 @@ type Options struct { FilePerms FileMode `config:"file_perms"` ChunkSize fs.SizeSuffix `config:"vfs_read_chunk_size"` // if > 0 read files in chunks ChunkSizeLimit fs.SizeSuffix `config:"vfs_read_chunk_size_limit"` // if > ChunkSize double the chunk size after each chunk until reached + ChunkStreams int `config:"vfs_read_chunk_streams"` // Number of download streams to use CacheMode CacheMode `config:"vfs_cache_mode"` CacheMaxAge fs.Duration `config:"vfs_cache_max_age"` CacheMaxSize fs.SizeSuffix `config:"vfs_cache_max_size"`