diff --git a/fs/accounting.go b/fs/accounting.go index f9cd600ea..ef85f4a8e 100644 --- a/fs/accounting.go +++ b/fs/accounting.go @@ -343,12 +343,8 @@ func NewAccount(in io.ReadCloser, obj Object) *Account { return NewAccountSizeName(in, obj.Size(), obj.Remote()) } -// NewAccountSizeNameWithBuffer makes a Account reader for an io.ReadCloser of -// the given size and name -// -// If the file is above a certain size it adds an Async reader -func NewAccountSizeNameWithBuffer(in io.ReadCloser, size int64, name string) *Account { - const bufSize = 1024 * 1024 +// bufferReadCloser returns a buffered version of in if necessary +func bufferReadCloser(in io.ReadCloser, size int64, name string) io.ReadCloser { var buffers int if size >= int64(Config.BufferSize) { buffers = int(int64(Config.BufferSize) / asyncBufferSize) @@ -364,7 +360,15 @@ func NewAccountSizeNameWithBuffer(in io.ReadCloser, size int64, name string) *Ac in = newIn } } - return NewAccountSizeName(in, size, name) + return in +} + +// NewAccountSizeNameWithBuffer makes a Account reader for an io.ReadCloser of +// the given size and name +// +// If the file is above a certain size it adds an Async reader +func NewAccountSizeNameWithBuffer(in io.ReadCloser, size int64, name string) *Account { + return NewAccountSizeName(bufferReadCloser(in, size, name), size, name) } // NewAccountWithBuffer makes a Account reader for an object @@ -384,7 +388,10 @@ func (acc *Account) GetReader() io.ReadCloser { // UpdateReader updates the underlying io.ReadCloser func (acc *Account) UpdateReader(in io.ReadCloser) { acc.mu.Lock() - acc.in = in + if asyncIn, ok := acc.in.(*asyncReader); ok { + asyncIn.Abandon() + } + acc.in = bufferReadCloser(in, acc.size, acc.name) acc.mu.Unlock() } diff --git a/fs/buffer.go b/fs/buffer.go index 754e79d7b..26cd1770c 100644 --- a/fs/buffer.go +++ b/fs/buffer.go @@ -152,9 +152,10 @@ func (a *asyncReader) WriteTo(w io.Writer) (n int64, err error) { } } -// Close will ensure that the underlying async reader is shut down. -// It will also close the input supplied on newAsyncReader. -func (a *asyncReader) Close() (err error) { +// close will ensure that the underlying async reader is shut down. +// If closeIn is set it will also close the input supplied on +// newAsyncReader. +func (a *asyncReader) close(closeIn bool) (err error) { // Return if already closed select { case <-a.exit: @@ -171,7 +172,22 @@ func (a *asyncReader) Close() (err error) { for b := range a.ready { a.putBuffer(b) } - return a.in.Close() + if closeIn { + return a.in.Close() + } + return nil +} + +// Close will ensure that the underlying async reader is shut down. +// It will also close the input supplied on newAsyncReader. +func (a *asyncReader) Close() (err error) { + return a.close(true) +} + +// Abandon will ensure that the underlying async reader is shut down. +// It will NOT close the input supplied on newAsyncReader. +func (a *asyncReader) Abandon() { + _ = a.close(false) } // Internal buffer