Refactor Account interface

This commit is contained in:
Nick Craig-Wood 2017-02-17 09:15:24 +00:00
parent ac62ef430d
commit 033d1eb7af
4 changed files with 16 additions and 36 deletions

View File

@ -29,7 +29,7 @@ func newReadFileHandle(o fs.Object) (*ReadFileHandle, error) {
} }
fh := &ReadFileHandle{ fh := &ReadFileHandle{
o: o, o: o,
r: fs.NewAccountWithBuffer(r, o), // account the transfer r: fs.NewAccount(r, o).WithBuffer(), // account the transfer
} }
fs.Stats.Transferring(fh.o.Remote()) fs.Stats.Transferring(fh.o.Remote())
return fh, nil return fh, nil

View File

@ -38,7 +38,7 @@ func newWriteFileHandle(d *Dir, f *File, src fs.ObjectInfo) (*WriteFileHandle, e
file: f, file: f,
} }
fh.pipeReader, fh.pipeWriter = io.Pipe() fh.pipeReader, fh.pipeWriter = io.Pipe()
r := fs.NewAccountSizeNameWithBuffer(fh.pipeReader, 0, src.Remote()) // account the transfer r := fs.NewAccountSizeName(fh.pipeReader, 0, src.Remote()).WithBuffer() // account the transfer
go func() { go func() {
o, err := d.f.Put(r, src) o, err := d.f.Put(r, src)
fh.o = o fh.o = o

View File

@ -346,44 +346,27 @@ func NewAccount(in io.ReadCloser, obj Object) *Account {
return NewAccountSizeName(in, obj.Size(), obj.Remote()) return NewAccountSizeName(in, obj.Size(), obj.Remote())
} }
// bufferReadCloser returns a buffered version of in if necessary // WithBuffer - If the file is above a certain size it adds an Async reader
func bufferReadCloser(in io.ReadCloser, size int64, name string) io.ReadCloser { func (acc *Account) WithBuffer() *Account {
acc.withBuf = true
var buffers int var buffers int
if size >= int64(Config.BufferSize) { if acc.size >= int64(Config.BufferSize) {
buffers = int(int64(Config.BufferSize) / asyncBufferSize) buffers = int(int64(Config.BufferSize) / asyncBufferSize)
} else { } else {
buffers = int(size / asyncBufferSize) buffers = int(acc.size / asyncBufferSize)
} }
// On big files add a buffer // On big files add a buffer
if buffers > 0 { if buffers > 0 {
newIn, err := newAsyncReader(in, buffers) in, err := newAsyncReader(acc.in, buffers)
if err != nil { if err != nil {
Errorf(name, "Failed to make buffer: %v", err) Errorf(acc.name, "Failed to make buffer: %v", err)
} else { } else {
in = newIn acc.in = in
} }
} }
return in
}
// NewAccountSizeNameWithBuffer makes a Account reader for an io.ReadCloser of
// the given size and name
//
// If the file is above a certain size it adds an Async reader
func NewAccountSizeNameWithBuffer(in io.ReadCloser, size int64, name string) *Account {
acc := NewAccountSizeName(in, size, name)
acc.in = bufferReadCloser(in, size, name)
acc.withBuf = true
return acc return acc
} }
// NewAccountWithBuffer makes a Account reader for an object
//
// If the file is above a certain size it adds an Async reader
func NewAccountWithBuffer(in io.ReadCloser, obj Object) *Account {
return NewAccountSizeNameWithBuffer(in, obj.Size(), obj.Remote())
}
// GetReader returns the underlying io.ReadCloser // GetReader returns the underlying io.ReadCloser
func (acc *Account) GetReader() io.ReadCloser { func (acc *Account) GetReader() io.ReadCloser {
acc.mu.Lock() acc.mu.Lock()
@ -402,12 +385,9 @@ func (acc *Account) StopBuffering() {
func (acc *Account) UpdateReader(in io.ReadCloser) { func (acc *Account) UpdateReader(in io.ReadCloser) {
acc.mu.Lock() acc.mu.Lock()
acc.StopBuffering() acc.StopBuffering()
acc.origIn = in
if acc.withBuf {
acc.in = bufferReadCloser(in, acc.size, acc.name)
} else {
acc.in = in acc.in = in
} acc.origIn = in
acc.WithBuffer()
acc.mu.Unlock() acc.mu.Unlock()
} }

View File

@ -273,7 +273,7 @@ func Copy(f Fs, dst Object, remote string, src Object) (err error) {
if err != nil { if err != nil {
err = errors.Wrap(err, "failed to open source object") err = errors.Wrap(err, "failed to open source object")
} else { } else {
in := NewAccountWithBuffer(in0, src) // account and buffer the transfer in := NewAccount(in0, src).WithBuffer() // account and buffer the transfer
wrappedSrc := &overrideRemoteObject{Object: src, remote: remote} wrappedSrc := &overrideRemoteObject{Object: src, remote: remote}
if doUpdate { if doUpdate {
@ -843,14 +843,14 @@ func CheckIdentical(dst, src Object) (differ bool, err error) {
if err != nil { if err != nil {
return true, errors.Wrapf(err, "failed to open %q", dst) return true, errors.Wrapf(err, "failed to open %q", dst)
} }
in1 = NewAccountWithBuffer(in1, dst) // account and buffer the transfer in1 = NewAccount(in1, dst).WithBuffer() // account and buffer the transfer
defer CheckClose(in1, &err) defer CheckClose(in1, &err)
in2, err := src.Open() in2, err := src.Open()
if err != nil { if err != nil {
return true, errors.Wrapf(err, "failed to open %q", src) return true, errors.Wrapf(err, "failed to open %q", src)
} }
in2 = NewAccountWithBuffer(in2, src) // account and buffer the transfer in2 = NewAccount(in2, src).WithBuffer() // account and buffer the transfer
defer CheckClose(in2, &err) defer CheckClose(in2, &err)
return CheckEqualReaders(in1, in2) return CheckEqualReaders(in1, in2)
@ -1388,7 +1388,7 @@ func Cat(f Fs, w io.Writer, offset, count int64) error {
size = count size = count
} }
} }
in = NewAccountSizeNameWithBuffer(in, size, o.Remote()) // account and buffer the transfer in = NewAccountSizeName(in, size, o.Remote()).WithBuffer() // account and buffer the transfer
defer func() { defer func() {
err = in.Close() err = in.Close()
if err != nil { if err != nil {