mirror of
https://github.com/rclone/rclone.git
synced 2024-11-22 16:34:30 +01:00
mount: this removes the async buffering as it was killing seek performance
This commit is contained in:
parent
442861581a
commit
0117aeafbf
@ -16,7 +16,7 @@ import (
|
|||||||
type ReadFileHandle struct {
|
type ReadFileHandle struct {
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
closed bool // set if handle has been closed
|
closed bool // set if handle has been closed
|
||||||
r io.ReadCloser
|
r *fs.Account
|
||||||
o fs.Object
|
o fs.Object
|
||||||
readCalled bool // set if read has been called
|
readCalled bool // set if read has been called
|
||||||
offset int64
|
offset int64
|
||||||
@ -29,7 +29,7 @@ func newReadFileHandle(o fs.Object) (*ReadFileHandle, error) {
|
|||||||
}
|
}
|
||||||
fh := &ReadFileHandle{
|
fh := &ReadFileHandle{
|
||||||
o: o,
|
o: o,
|
||||||
r: fs.NewAccount(r, o), // account and buffer the transfer
|
r: fs.NewAccount(r, o), // account the transfer
|
||||||
}
|
}
|
||||||
fs.Stats.Transferring(fh.o.Remote())
|
fs.Stats.Transferring(fh.o.Remote())
|
||||||
return fh, nil
|
return fh, nil
|
||||||
@ -46,7 +46,8 @@ var _ fusefs.HandleReader = (*ReadFileHandle)(nil)
|
|||||||
// Must be called with fh.mu held
|
// Must be called with fh.mu held
|
||||||
func (fh *ReadFileHandle) seek(offset int64) error {
|
func (fh *ReadFileHandle) seek(offset int64) error {
|
||||||
// Can we seek it directly?
|
// Can we seek it directly?
|
||||||
if do, ok := fh.r.(io.Seeker); ok {
|
oldReader := fh.r.GetReader()
|
||||||
|
if do, ok := oldReader.(io.Seeker); ok {
|
||||||
fs.Debug(fh.o, "ReadFileHandle.seek from %d to %d (io.Seeker)", fh.offset, offset)
|
fs.Debug(fh.o, "ReadFileHandle.seek from %d to %d (io.Seeker)", fh.offset, offset)
|
||||||
_, err := do.Seek(offset, 0)
|
_, err := do.Seek(offset, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -61,11 +62,12 @@ func (fh *ReadFileHandle) seek(offset int64) error {
|
|||||||
fs.Debug(fh.o, "ReadFileHandle.Read seek failed: %v", err)
|
fs.Debug(fh.o, "ReadFileHandle.Read seek failed: %v", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
err = fh.r.Close()
|
err = oldReader.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fs.Debug(fh.o, "ReadFileHandle.Read seek close old failed: %v", err)
|
fs.Debug(fh.o, "ReadFileHandle.Read seek close old failed: %v", err)
|
||||||
}
|
}
|
||||||
fh.r = fs.NewAccount(r, fh.o) // account and buffer the transfer
|
// fh.r = fs.NewAccount(r, fh.o) // account the transfer
|
||||||
|
fh.r.UpdateReader(r)
|
||||||
}
|
}
|
||||||
fh.offset = offset
|
fh.offset = offset
|
||||||
return nil
|
return nil
|
||||||
|
@ -38,7 +38,7 @@ func newWriteFileHandle(d *Dir, f *File, src fs.ObjectInfo) (*WriteFileHandle, e
|
|||||||
file: f,
|
file: f,
|
||||||
}
|
}
|
||||||
fh.pipeReader, fh.pipeWriter = io.Pipe()
|
fh.pipeReader, fh.pipeWriter = io.Pipe()
|
||||||
r := fs.NewAccountSizeName(fh.pipeReader, 0, src.Remote()) // account and buffer the transfer
|
r := fs.NewAccountSizeName(fh.pipeReader, 0, src.Remote()) // account the transfer
|
||||||
go func() {
|
go func() {
|
||||||
o, err := d.f.Put(r, src)
|
o, err := d.f.Put(r, src)
|
||||||
fh.o = o
|
fh.o = o
|
||||||
|
@ -278,22 +278,7 @@ type Account struct {
|
|||||||
|
|
||||||
// NewAccountSizeName makes a Account reader for an io.ReadCloser of
|
// NewAccountSizeName makes a Account reader for an io.ReadCloser of
|
||||||
// the given size and name
|
// the given size and name
|
||||||
//
|
|
||||||
// If the file is above a certain size it adds an Async reader
|
|
||||||
func NewAccountSizeName(in io.ReadCloser, size int64, name string) *Account {
|
func NewAccountSizeName(in io.ReadCloser, size int64, name string) *Account {
|
||||||
// On big files add a buffer
|
|
||||||
if size > 10<<20 {
|
|
||||||
const memUsed = 16 * 1024 * 1024
|
|
||||||
const bufSize = 128 * 1024
|
|
||||||
const buffers = memUsed / bufSize
|
|
||||||
newIn, err := newAsyncReader(in, buffers, bufSize)
|
|
||||||
if err != nil {
|
|
||||||
ErrorLog(name, "Failed to make buffer: %v", err)
|
|
||||||
} else {
|
|
||||||
in = newIn
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
acc := &Account{
|
acc := &Account{
|
||||||
in: in,
|
in: in,
|
||||||
size: size,
|
size: size,
|
||||||
@ -312,6 +297,47 @@ func NewAccount(in io.ReadCloser, obj Object) *Account {
|
|||||||
return NewAccountSizeName(in, obj.Size(), obj.Remote())
|
return NewAccountSizeName(in, obj.Size(), obj.Remote())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewAccountSizeName makes a Account reader for an io.ReadCloser of
|
||||||
|
// the given size and name
|
||||||
|
//
|
||||||
|
// If the file is above a certain size it adds an Async reader
|
||||||
|
func NewAccountSizeNameWithBuffer(in io.ReadCloser, size int64, name string) *Account {
|
||||||
|
// On big files add a buffer
|
||||||
|
if size > 10<<20 {
|
||||||
|
const memUsed = 16 * 1024 * 1024
|
||||||
|
const bufSize = 128 * 1024
|
||||||
|
const buffers = memUsed / bufSize
|
||||||
|
newIn, err := newAsyncReader(in, buffers, bufSize)
|
||||||
|
if err != nil {
|
||||||
|
ErrorLog(name, "Failed to make buffer: %v", err)
|
||||||
|
} else {
|
||||||
|
in = newIn
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return NewAccountSizeName(in, size, name)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewAccountWithBuffer makes a Account reader for an object
|
||||||
|
//
|
||||||
|
// If the file is above a certain size it adds an Async reader
|
||||||
|
func NewAccountWithBuffer(in io.ReadCloser, obj Object) *Account {
|
||||||
|
return NewAccountSizeNameWithBuffer(in, obj.Size(), obj.Remote())
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetReader returns the underlying io.ReadCloser
|
||||||
|
func (acc *Account) GetReader() io.ReadCloser {
|
||||||
|
acc.mu.Lock()
|
||||||
|
defer acc.mu.Unlock()
|
||||||
|
return acc.in
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpdateReader updates the underlying io.ReadCloser
|
||||||
|
func (acc *Account) UpdateReader(in io.ReadCloser) {
|
||||||
|
acc.mu.Lock()
|
||||||
|
acc.in = in
|
||||||
|
acc.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
// disableWholeFileAccounting turns off the whole file accounting
|
// disableWholeFileAccounting turns off the whole file accounting
|
||||||
func (acc *Account) disableWholeFileAccounting() {
|
func (acc *Account) disableWholeFileAccounting() {
|
||||||
acc.mu.Lock()
|
acc.mu.Lock()
|
||||||
|
@ -264,7 +264,7 @@ func Copy(f Fs, dst Object, remote string, src Object) (err error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
err = errors.Wrap(err, "failed to open source object")
|
err = errors.Wrap(err, "failed to open source object")
|
||||||
} else {
|
} else {
|
||||||
in := NewAccount(in0, src) // account and buffer the transfer
|
in := NewAccountWithBuffer(in0, src) // account and buffer the transfer
|
||||||
|
|
||||||
wrappedSrc := &overrideRemoteObject{Object: src, remote: remote}
|
wrappedSrc := &overrideRemoteObject{Object: src, remote: remote}
|
||||||
if doUpdate {
|
if doUpdate {
|
||||||
@ -1126,7 +1126,7 @@ func Cat(f Fs, w io.Writer) error {
|
|||||||
ErrorLog(o, "Failed to close: %v", err)
|
ErrorLog(o, "Failed to close: %v", err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
inAccounted := NewAccount(in, o) // account and buffer the transfer
|
inAccounted := NewAccountWithBuffer(in, o) // account and buffer the transfer
|
||||||
_, err = io.Copy(w, inAccounted)
|
_, err = io.Copy(w, inAccounted)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
Stats.Error()
|
Stats.Error()
|
||||||
|
Loading…
Reference in New Issue
Block a user