fs,drive,dropbox: Make and use new RepeatableReader variants to lower memory use

RepeatableReaderSized has a pre-allocated buffer which should help
with memory usage - before it grew the buffer.  Since we know the size
of the chunks, pre-allocating it should be much more efficient.

RepeatableReaderBuffer uses the buffer passed in.

RepeatableLimit* are convenience funcitions for wrapping a reader in
an io.LimitReader and then a RepeatableReader with the same buffer
size.
This commit is contained in:
Nick Craig-Wood 2017-11-23 10:59:28 +00:00
parent 6c62fced60
commit e53892f53b
3 changed files with 37 additions and 4 deletions

View File

@ -195,12 +195,13 @@ func (rx *resumableUpload) Upload() (*drive.File, error) {
start := int64(0) start := int64(0)
var StatusCode int var StatusCode int
var err error var err error
buf := make([]byte, int(chunkSize))
for start < rx.ContentLength { for start < rx.ContentLength {
reqSize := rx.ContentLength - start reqSize := rx.ContentLength - start
if reqSize >= int64(chunkSize) { if reqSize >= int64(chunkSize) {
reqSize = int64(chunkSize) reqSize = int64(chunkSize)
} }
chunk := fs.NewRepeatableReader(io.LimitReader(rx.Media, reqSize)) chunk := fs.NewRepeatableLimitReaderBuffer(rx.Media, buf, reqSize)
// Transfer the chunk // Transfer the chunk
err = rx.f.pacer.Call(func() (bool, error) { err = rx.f.pacer.Call(func() (bool, error) {

View File

@ -832,6 +832,7 @@ func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size
chunks = int(size/chunkSize) + 1 chunks = int(size/chunkSize) + 1
} }
in := fs.NewCountingReader(in0) in := fs.NewCountingReader(in0)
buf := make([]byte, int(chunkSize))
fmtChunk := func(cur int, last bool) { fmtChunk := func(cur int, last bool) {
if chunks == 0 && last { if chunks == 0 && last {
@ -846,7 +847,7 @@ func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size
// write the first chunk // write the first chunk
fmtChunk(1, false) fmtChunk(1, false)
var res *files.UploadSessionStartResult var res *files.UploadSessionStartResult
chunk := fs.NewRepeatableReader(&io.LimitedReader{R: in, N: chunkSize}) chunk := fs.NewRepeatableLimitReaderBuffer(in, buf, chunkSize)
err = o.fs.pacer.Call(func() (bool, error) { err = o.fs.pacer.Call(func() (bool, error) {
// seek to the start in case this is a retry // seek to the start in case this is a retry
if _, err = chunk.Seek(0, 0); err != nil { if _, err = chunk.Seek(0, 0); err != nil {
@ -882,7 +883,7 @@ func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size
} }
cursor.Offset = in.BytesRead() cursor.Offset = in.BytesRead()
fmtChunk(currentChunk, false) fmtChunk(currentChunk, false)
chunk = fs.NewRepeatableReader(&io.LimitedReader{R: in, N: chunkSize}) chunk = fs.NewRepeatableLimitReaderBuffer(in, buf, chunkSize)
err = o.fs.pacer.Call(func() (bool, error) { err = o.fs.pacer.Call(func() (bool, error) {
// seek to the start in case this is a retry // seek to the start in case this is a retry
if _, err = chunk.Seek(0, 0); err != nil { if _, err = chunk.Seek(0, 0); err != nil {
@ -905,7 +906,7 @@ func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size
Commit: commitInfo, Commit: commitInfo,
} }
fmtChunk(currentChunk, true) fmtChunk(currentChunk, true)
chunk = fs.NewRepeatableReader(in) chunk = fs.NewRepeatableReaderBuffer(in, buf)
err = o.fs.pacer.Call(func() (bool, error) { err = o.fs.pacer.Call(func() (bool, error) {
// seek to the start in case this is a retry // seek to the start in case this is a retry
if _, err = chunk.Seek(0, 0); err != nil { if _, err = chunk.Seek(0, 0); err != nil {

View File

@ -63,3 +63,34 @@ func (r *RepeatableReader) Read(b []byte) (n int, err error) {
func NewRepeatableReader(r io.Reader) *RepeatableReader { func NewRepeatableReader(r io.Reader) *RepeatableReader {
return &RepeatableReader{in: r} return &RepeatableReader{in: r}
} }
// NewRepeatableReaderSized create new repeatable reader from Reader r
// with an initial buffer of size.
func NewRepeatableReaderSized(r io.Reader, size int) *RepeatableReader {
return &RepeatableReader{
in: r,
b: make([]byte, 0, size),
}
}
// NewRepeatableLimitReader create new repeatable reader from Reader r
// with an initial buffer of size wrapped in a io.LimitReader to read
// only size.
func NewRepeatableLimitReader(r io.Reader, size int) *RepeatableReader {
return NewRepeatableReaderSized(io.LimitReader(r, int64(size)), size)
}
// NewRepeatableReaderBuffer create new repeatable reader from Reader r
// using the buffer passed in.
func NewRepeatableReaderBuffer(r io.Reader, buf []byte) *RepeatableReader {
return &RepeatableReader{
in: r,
b: buf[:0],
}
}
// NewRepeatableLimitReaderBuffer create new repeatable reader from
// Reader r and buf wrapped in a io.LimitReader to read only size.
func NewRepeatableLimitReaderBuffer(r io.Reader, buf []byte, size int64) *RepeatableReader {
return NewRepeatableReaderBuffer(io.LimitReader(r, int64(size)), buf)
}