diff --git a/cmd/mount/read.go b/cmd/mount/read.go index 091d33acc..5360e97d4 100644 --- a/cmd/mount/read.go +++ b/cmd/mount/read.go @@ -16,7 +16,7 @@ import ( type ReadFileHandle struct { mu sync.Mutex closed bool // set if handle has been closed - r io.ReadCloser + r *fs.Account o fs.Object readCalled bool // set if read has been called offset int64 @@ -29,7 +29,7 @@ func newReadFileHandle(o fs.Object) (*ReadFileHandle, error) { } fh := &ReadFileHandle{ o: o, - r: fs.NewAccount(r, o), // account and buffer the transfer + r: fs.NewAccount(r, o), // account the transfer } fs.Stats.Transferring(fh.o.Remote()) return fh, nil @@ -46,7 +46,8 @@ var _ fusefs.HandleReader = (*ReadFileHandle)(nil) // Must be called with fh.mu held func (fh *ReadFileHandle) seek(offset int64) error { // Can we seek it directly? - if do, ok := fh.r.(io.Seeker); ok { + oldReader := fh.r.GetReader() + if do, ok := oldReader.(io.Seeker); ok { fs.Debug(fh.o, "ReadFileHandle.seek from %d to %d (io.Seeker)", fh.offset, offset) _, err := do.Seek(offset, 0) if err != nil { @@ -61,11 +62,12 @@ func (fh *ReadFileHandle) seek(offset int64) error { fs.Debug(fh.o, "ReadFileHandle.Read seek failed: %v", err) return err } - err = fh.r.Close() + err = oldReader.Close() if err != nil { fs.Debug(fh.o, "ReadFileHandle.Read seek close old failed: %v", err) } - fh.r = fs.NewAccount(r, fh.o) // account and buffer the transfer + // fh.r = fs.NewAccount(r, fh.o) // account the transfer + fh.r.UpdateReader(r) } fh.offset = offset return nil diff --git a/cmd/mount/write.go b/cmd/mount/write.go index 8e1da2b92..c9420a556 100644 --- a/cmd/mount/write.go +++ b/cmd/mount/write.go @@ -38,7 +38,7 @@ func newWriteFileHandle(d *Dir, f *File, src fs.ObjectInfo) (*WriteFileHandle, e file: f, } fh.pipeReader, fh.pipeWriter = io.Pipe() - r := fs.NewAccountSizeName(fh.pipeReader, 0, src.Remote()) // account and buffer the transfer + r := fs.NewAccountSizeName(fh.pipeReader, 0, src.Remote()) // account the transfer go func() { o, err := d.f.Put(r, src) fh.o = o diff --git a/fs/accounting.go b/fs/accounting.go index 355753ebb..be649562c 100644 --- a/fs/accounting.go +++ b/fs/accounting.go @@ -278,22 +278,7 @@ type Account struct { // NewAccountSizeName makes a Account reader for an io.ReadCloser of // the given size and name -// -// If the file is above a certain size it adds an Async reader func NewAccountSizeName(in io.ReadCloser, size int64, name string) *Account { - // On big files add a buffer - if size > 10<<20 { - const memUsed = 16 * 1024 * 1024 - const bufSize = 128 * 1024 - const buffers = memUsed / bufSize - newIn, err := newAsyncReader(in, buffers, bufSize) - if err != nil { - ErrorLog(name, "Failed to make buffer: %v", err) - } else { - in = newIn - } - } - acc := &Account{ in: in, size: size, @@ -312,6 +297,47 @@ func NewAccount(in io.ReadCloser, obj Object) *Account { return NewAccountSizeName(in, obj.Size(), obj.Remote()) } +// NewAccountSizeName makes a Account reader for an io.ReadCloser of +// the given size and name +// +// If the file is above a certain size it adds an Async reader +func NewAccountSizeNameWithBuffer(in io.ReadCloser, size int64, name string) *Account { + // On big files add a buffer + if size > 10<<20 { + const memUsed = 16 * 1024 * 1024 + const bufSize = 128 * 1024 + const buffers = memUsed / bufSize + newIn, err := newAsyncReader(in, buffers, bufSize) + if err != nil { + ErrorLog(name, "Failed to make buffer: %v", err) + } else { + in = newIn + } + } + return NewAccountSizeName(in, size, name) +} + +// NewAccountWithBuffer makes a Account reader for an object +// +// If the file is above a certain size it adds an Async reader +func NewAccountWithBuffer(in io.ReadCloser, obj Object) *Account { + return NewAccountSizeNameWithBuffer(in, obj.Size(), obj.Remote()) +} + +// GetReader returns the underlying io.ReadCloser +func (acc *Account) GetReader() io.ReadCloser { + acc.mu.Lock() + defer acc.mu.Unlock() + return acc.in +} + +// UpdateReader updates the underlying io.ReadCloser +func (acc *Account) UpdateReader(in io.ReadCloser) { + acc.mu.Lock() + acc.in = in + acc.mu.Unlock() +} + // disableWholeFileAccounting turns off the whole file accounting func (acc *Account) disableWholeFileAccounting() { acc.mu.Lock() diff --git a/fs/operations.go b/fs/operations.go index 006dece92..16c479e33 100644 --- a/fs/operations.go +++ b/fs/operations.go @@ -264,7 +264,7 @@ func Copy(f Fs, dst Object, remote string, src Object) (err error) { if err != nil { err = errors.Wrap(err, "failed to open source object") } else { - in := NewAccount(in0, src) // account and buffer the transfer + in := NewAccountWithBuffer(in0, src) // account and buffer the transfer wrappedSrc := &overrideRemoteObject{Object: src, remote: remote} if doUpdate { @@ -1126,7 +1126,7 @@ func Cat(f Fs, w io.Writer) error { ErrorLog(o, "Failed to close: %v", err) } }() - inAccounted := NewAccount(in, o) // account and buffer the transfer + inAccounted := NewAccountWithBuffer(in, o) // account and buffer the transfer _, err = io.Copy(w, inAccounted) if err != nil { Stats.Error()