From e53892f53ba34224ca229526c7a0eeeac4b76091 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Thu, 23 Nov 2017 10:59:28 +0000 Subject: [PATCH] fs,drive,dropbox: Make and use new RepeatableReader variants to lower memory use RepeatableReaderSized has a pre-allocated buffer which should help with memory usage - before it grew the buffer. Since we know the size of the chunks, pre-allocating it should be much more efficient. RepeatableReaderBuffer uses the buffer passed in. RepeatableLimit* are convenience functions for wrapping a reader in an io.LimitReader and then a RepeatableReader with the same buffer size. --- drive/upload.go | 3 ++- dropbox/dropbox.go | 7 ++++--- fs/readers.go | 31 +++++++++++++++++++++++++++++++ 3 files changed, 37 insertions(+), 4 deletions(-) diff --git a/drive/upload.go b/drive/upload.go index 9c0032462..46d75c043 100644 --- a/drive/upload.go +++ b/drive/upload.go @@ -195,12 +195,13 @@ func (rx *resumableUpload) Upload() (*drive.File, error) { start := int64(0) var StatusCode int var err error + buf := make([]byte, int(chunkSize)) for start < rx.ContentLength { reqSize := rx.ContentLength - start if reqSize >= int64(chunkSize) { reqSize = int64(chunkSize) } - chunk := fs.NewRepeatableReader(io.LimitReader(rx.Media, reqSize)) + chunk := fs.NewRepeatableLimitReaderBuffer(rx.Media, buf, reqSize) // Transfer the chunk err = rx.f.pacer.Call(func() (bool, error) { diff --git a/dropbox/dropbox.go b/dropbox/dropbox.go index 49ce003d6..3b659c317 100644 --- a/dropbox/dropbox.go +++ b/dropbox/dropbox.go @@ -832,6 +832,7 @@ func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size chunks = int(size/chunkSize) + 1 } in := fs.NewCountingReader(in0) + buf := make([]byte, int(chunkSize)) fmtChunk := func(cur int, last bool) { if chunks == 0 && last { @@ -846,7 +847,7 @@ func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size // write the first chunk fmtChunk(1, false) var res 
*files.UploadSessionStartResult - chunk := fs.NewRepeatableReader(&io.LimitedReader{R: in, N: chunkSize}) + chunk := fs.NewRepeatableLimitReaderBuffer(in, buf, chunkSize) err = o.fs.pacer.Call(func() (bool, error) { // seek to the start in case this is a retry if _, err = chunk.Seek(0, 0); err != nil { @@ -882,7 +883,7 @@ func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size } cursor.Offset = in.BytesRead() fmtChunk(currentChunk, false) - chunk = fs.NewRepeatableReader(&io.LimitedReader{R: in, N: chunkSize}) + chunk = fs.NewRepeatableLimitReaderBuffer(in, buf, chunkSize) err = o.fs.pacer.Call(func() (bool, error) { // seek to the start in case this is a retry if _, err = chunk.Seek(0, 0); err != nil { @@ -905,7 +906,7 @@ func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size Commit: commitInfo, } fmtChunk(currentChunk, true) - chunk = fs.NewRepeatableReader(in) + chunk = fs.NewRepeatableReaderBuffer(in, buf) err = o.fs.pacer.Call(func() (bool, error) { // seek to the start in case this is a retry if _, err = chunk.Seek(0, 0); err != nil { diff --git a/fs/readers.go b/fs/readers.go index ef5b0c471..c7a9271a5 100644 --- a/fs/readers.go +++ b/fs/readers.go @@ -63,3 +63,34 @@ func (r *RepeatableReader) Read(b []byte) (n int, err error) { func NewRepeatableReader(r io.Reader) *RepeatableReader { return &RepeatableReader{in: r} } + +// NewRepeatableReaderSized creates a new repeatable reader from Reader r +// with an initial buffer of size. +func NewRepeatableReaderSized(r io.Reader, size int) *RepeatableReader { + return &RepeatableReader{ + in: r, + b: make([]byte, 0, size), + } +} + +// NewRepeatableLimitReader creates a new repeatable reader from Reader r +// with an initial buffer of size wrapped in an io.LimitReader to read +// only size. 
+func NewRepeatableLimitReader(r io.Reader, size int) *RepeatableReader { + return NewRepeatableReaderSized(io.LimitReader(r, int64(size)), size) +} + +// NewRepeatableReaderBuffer creates a new repeatable reader from Reader r +// using the buffer passed in. +func NewRepeatableReaderBuffer(r io.Reader, buf []byte) *RepeatableReader { + return &RepeatableReader{ + in: r, + b: buf[:0], + } +} + +// NewRepeatableLimitReaderBuffer creates a new repeatable reader from +// Reader r and buf wrapped in an io.LimitReader to read only size. +func NewRepeatableLimitReaderBuffer(r io.Reader, buf []byte, size int64) *RepeatableReader { + return NewRepeatableReaderBuffer(io.LimitReader(r, int64(size)), buf) +}