diff --git a/fs/accounting/accounting.go b/fs/accounting/accounting.go index c23ce74df..6fa098c0e 100644 --- a/fs/accounting/accounting.go +++ b/fs/accounting/accounting.go @@ -96,6 +96,16 @@ func (acc *Account) GetReader() io.ReadCloser { return acc.origIn } +// GetAsyncReader returns the current AsyncReader or nil if Account is unbuffered +func (acc *Account) GetAsyncReader() *asyncreader.AsyncReader { + acc.mu.Lock() + defer acc.mu.Unlock() + if asyncIn, ok := acc.in.(*asyncreader.AsyncReader); ok { + return asyncIn + } + return nil +} + // StopBuffering stops the async buffer doing any more buffering func (acc *Account) StopBuffering() { if asyncIn, ok := acc.in.(*asyncreader.AsyncReader); ok { diff --git a/fs/asyncreader/asyncreader.go b/fs/asyncreader/asyncreader.go index e19a4766d..9f7437d79 100644 --- a/fs/asyncreader/asyncreader.go +++ b/fs/asyncreader/asyncreader.go @@ -180,6 +180,76 @@ func (a *AsyncReader) WriteTo(w io.Writer) (n int64, err error) { } } +// SkipBytes will try to seek 'skip' bytes relative to the current position. +// On success it returns true. If 'skip' is outside the current buffer data or +// an error occurs, Abandon is called and false is returned. +func (a *AsyncReader) SkipBytes(skip int) (ok bool) { + a.mu.Lock() + defer func() { + a.mu.Unlock() + if !ok { + a.Abandon() + } + }() + + if a.err != nil { + return false + } + if skip < 0 { + // seek backwards if skip is inside current buffer + if a.cur != nil && a.cur.offset+skip >= 0 { + a.cur.offset += skip + return true + } + return false + } + // early return if skip is past the maximum buffer capacity + if skip >= (len(a.ready)+1)*BufferSize { + return false + } + + refillTokens := 0 + for { + if a.cur.isEmpty() { + if a.cur != nil { + a.putBuffer(a.cur) + refillTokens++ + a.cur = nil + } + select { + case b, ok := <-a.ready: + if !ok { + return false + } + a.cur = b + default: + return false + } + } + + n := len(a.cur.buffer()) + if n > skip { + n = skip + } + a.cur.increment(n) + skip -= n + if skip == 0 { + for ; refillTokens > 0; refillTokens-- { + a.token <- struct{}{} + } + // If at end of buffer, store any error, if present + if a.cur.isEmpty() && a.cur.err != nil { + a.err = a.cur.err + } + return true + } + if a.cur.err != nil { + a.err = a.cur.err + return false + } + } +} + // Abandon will ensure that the underlying async reader is shut down. // It will NOT close the input supplied on New. func (a *AsyncReader) Abandon() { diff --git a/fs/asyncreader/asyncreader_test.go b/fs/asyncreader/asyncreader_test.go index 2f9648f91..43e6c6de3 100644 --- a/fs/asyncreader/asyncreader_test.go +++ b/fs/asyncreader/asyncreader_test.go @@ -3,14 +3,17 @@ package asyncreader import ( "bufio" "bytes" + "fmt" "io" "io/ioutil" + "math/rand" "strings" "sync" "testing" "testing/iotest" "time" + "github.com/ncw/rclone/lib/readers" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -281,3 +284,78 @@ func testAsyncReaderClose(t *testing.T, writeto bool) { } func TestAsyncReaderCloseRead(t *testing.T) { testAsyncReaderClose(t, false) } func TestAsyncReaderCloseWriteTo(t *testing.T) { testAsyncReaderClose(t, true) } + +func TestAsyncReaderSkipBytes(t *testing.T) { + t.Parallel() + data := make([]byte, 15000) + buf := make([]byte, len(data)) + r := rand.New(rand.NewSource(42)) + + n, err := r.Read(data) + require.NoError(t, err) + require.Equal(t, len(data), n) + + initialReads := []int{0, 1, 100, 2048, + softStartInitial - 1, softStartInitial, softStartInitial + 1, + 8000, len(data)} + skips := []int{-1000, -101, -100, -99, 0, 1, 2048, + softStartInitial - 1, softStartInitial, softStartInitial + 1, + 8000, len(data), BufferSize, 2 * BufferSize} + + for buffers := 1; buffers <= 5; buffers++ { + t.Run(fmt.Sprintf("%d", buffers), func(t *testing.T) { + for _, initialRead := range initialReads { + t.Run(fmt.Sprintf("%d", initialRead), func(t *testing.T) { + for _, skip := range skips { + t.Run(fmt.Sprintf("%d", skip), func(t *testing.T) { + ar, err := New(ioutil.NopCloser(bytes.NewReader(data)), buffers) + require.NoError(t, err) + + wantSkipFalse := false + buf = buf[:initialRead] + n, err := readers.ReadFill(ar, buf) + if initialRead >= len(data) { + wantSkipFalse = true + if initialRead > len(data) { + assert.Equal(t, err, io.EOF) + } else { + assert.True(t, err == nil || err == io.EOF) + } + assert.Equal(t, len(data), n) + assert.Equal(t, data, buf[:len(data)]) + } else { + assert.NoError(t, err) + assert.Equal(t, initialRead, n) + assert.Equal(t, data[:initialRead], buf) + } + + skipped := ar.SkipBytes(skip) + buf = buf[:1024] + n, err = readers.ReadFill(ar, buf) + offset := initialRead + skip + if skipped { + assert.False(t, wantSkipFalse) + l := len(buf) + if offset >= len(data) { + assert.Equal(t, err, io.EOF) + } else { + if offset+1024 >= len(data) { + l = len(data) - offset + } + assert.Equal(t, l, n) + assert.Equal(t, data[offset:offset+l], buf[:l]) + } + } else { + if initialRead >= len(data) { + assert.Equal(t, err, io.EOF) + } else { + assert.True(t, err == errorStreamAbandoned || err == io.EOF) + } + } + }) + } + }) + } + }) + } +} diff --git a/vfs/read.go b/vfs/read.go index 5a3cecd6d..3e7b1b920 100644 --- a/vfs/read.go +++ b/vfs/read.go @@ -104,8 +104,16 @@ func (fh *ReadFileHandle) seek(offset int64, reopen bool) (err error) { if fh.noSeek { return ESPIPE } - fh.r.StopBuffering() // stop the background reading first fh.hash = nil + if !reopen { + ar := fh.r.GetAsyncReader() + // try to fullfill the seek with buffer discard + if ar != nil && ar.SkipBytes(int(offset-fh.offset)) { + fh.offset = offset + return nil + } + } + fh.r.StopBuffering() // stop the background reading first oldReader := fh.r.GetReader() r, ok := oldReader.(*chunkedreader.ChunkedReader) if !ok {