From 7fb53a031cbfe5e74790e626f18e69756ec99e48 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Thu, 1 Mar 2018 15:50:23 +0000 Subject: [PATCH] vfs: don't cache the object in read and read/write handles This should help with inconsistent reads when the source object changes. --- vfs/file.go | 11 +++++++-- vfs/read.go | 60 +++++++++++++++++++++++++---------------------- vfs/read_write.go | 16 ++++++------- 3 files changed, 49 insertions(+), 38 deletions(-) diff --git a/vfs/file.go b/vfs/file.go index e44a0e512..8765741fb 100644 --- a/vfs/file.go +++ b/vfs/file.go @@ -252,6 +252,13 @@ func (f *File) setObject(o fs.Object) { f.d.addObject(f) } +// Get the current fs.Object - may be nil +func (f *File) getObject() fs.Object { + f.mu.Lock() + defer f.mu.Unlock() + return f.o +} + // exists returns whether the file exists already func (f *File) exists() bool { f.mu.Lock() @@ -284,13 +291,13 @@ func (f *File) waitForValidObject() (o fs.Object, err error) { // openRead open the file for read func (f *File) openRead() (fh *ReadFileHandle, err error) { // if o is nil it isn't valid yet - o, err := f.waitForValidObject() + _, err = f.waitForValidObject() if err != nil { return nil, err } // fs.Debugf(o, "File.openRead") - fh, err = newReadFileHandle(f, o) + fh, err = newReadFileHandle(f) if err != nil { fs.Errorf(f, "File.openRead failed: %v", err) return nil, err diff --git a/vfs/read.go b/vfs/read.go index e3d830a9b..1e6faf80b 100644 --- a/vfs/read.go +++ b/vfs/read.go @@ -17,7 +17,6 @@ type ReadFileHandle struct { mu sync.Mutex closed bool // set if handle has been closed r *accounting.Account - o fs.Object readCalled bool // set if read has been called size int64 // size of the object offset int64 // offset of read of o @@ -26,6 +25,7 @@ type ReadFileHandle struct { file *File hash *hash.MultiHasher opened bool + remote string } // Check interfaces @@ -36,9 +36,10 @@ var ( _ io.Closer = (*ReadFileHandle)(nil) ) -func newReadFileHandle(f *File, o fs.Object) (*ReadFileHandle, error) { +func newReadFileHandle(f *File) (*ReadFileHandle, error) { var mhash *hash.MultiHasher var err error + o := f.getObject() if !f.d.vfs.Opt.NoChecksum { mhash, err = hash.NewMultiHasherTypes(o.Fs().Hashes()) if err != nil { @@ -47,7 +48,7 @@ func newReadFileHandle(f *File, o fs.Object) (*ReadFileHandle, error) { } fh := &ReadFileHandle{ - o: o, + remote: o.Remote(), noSeek: f.d.vfs.Opt.NoSeek, file: f, hash: mhash, @@ -62,13 +63,14 @@ func (fh *ReadFileHandle) openPending() (err error) { if fh.opened { return nil } - r, err := fh.o.Open() + o := fh.file.getObject() + r, err := o.Open() if err != nil { return err } - fh.r = accounting.NewAccount(r, fh.o).WithBuffer() // account the transfer + fh.r = accounting.NewAccount(r, o).WithBuffer() // account the transfer fh.opened = true - accounting.Stats.Transferring(fh.o.Remote()) + accounting.Stats.Transferring(o.Remote()) return nil } @@ -107,23 +109,24 @@ func (fh *ReadFileHandle) seek(offset int64, reopen bool) (err error) { r := oldReader // Can we seek it directly? if do, ok := oldReader.(io.Seeker); !reopen && ok { - fs.Debugf(fh.o, "ReadFileHandle.seek from %d to %d (io.Seeker)", fh.offset, offset) + fs.Debugf(fh.remote, "ReadFileHandle.seek from %d to %d (io.Seeker)", fh.offset, offset) _, err = do.Seek(offset, 0) if err != nil { - fs.Debugf(fh.o, "ReadFileHandle.Read io.Seeker failed: %v", err) + fs.Debugf(fh.remote, "ReadFileHandle.Read io.Seeker failed: %v", err) return err } } else { - fs.Debugf(fh.o, "ReadFileHandle.seek from %d to %d", fh.offset, offset) + fs.Debugf(fh.remote, "ReadFileHandle.seek from %d to %d", fh.offset, offset) // close old one err = oldReader.Close() if err != nil { - fs.Debugf(fh.o, "ReadFileHandle.Read seek close old failed: %v", err) + fs.Debugf(fh.remote, "ReadFileHandle.Read seek close old failed: %v", err) } // re-open with a seek - r, err = fh.o.Open(&fs.SeekOption{Offset: offset}) + o := fh.file.getObject() + r, err = o.Open(&fs.SeekOption{Offset: offset}) if err != nil { - fs.Debugf(fh.o, "ReadFileHandle.Read seek failed: %v", err) + fs.Debugf(fh.remote, "ReadFileHandle.Read seek failed: %v", err) return err } } @@ -187,9 +190,9 @@ func (fh *ReadFileHandle) readAt(p []byte, off int64) (n int, err error) { if err != nil { return 0, err } - // fs.Debugf(fh.o, "ReadFileHandle.Read size %d offset %d", reqSize, off) + // fs.Debugf(fh.remote, "ReadFileHandle.Read size %d offset %d", reqSize, off) if fh.closed { - fs.Errorf(fh.o, "ReadFileHandle.Read error: %v", EBADF) + fs.Errorf(fh.remote, "ReadFileHandle.Read error: %v", EBADF) return 0, ECLOSED } doSeek := off != fh.offset @@ -206,7 +209,7 @@ func (fh *ReadFileHandle) readAt(p []byte, off int64) (n int, err error) { // file - if so just return EOF leaving the underlying // file in an unchanged state. if off >= fh.size { - fs.Debugf(fh.o, "ReadFileHandle.Read attempt to read beyond end of file: %d > %d", off, fh.size) + fs.Debugf(fh.remote, "ReadFileHandle.Read attempt to read beyond end of file: %d > %d", off, fh.size) return 0, io.EOF } // Otherwise do the seek @@ -235,20 +238,20 @@ func (fh *ReadFileHandle) readAt(p []byte, off int64) (n int, err error) { break } retries++ - fs.Errorf(fh.o, "ReadFileHandle.Read error: low level retry %d/%d: %v", retries, fs.Config.LowLevelRetries, err) + fs.Errorf(fh.remote, "ReadFileHandle.Read error: low level retry %d/%d: %v", retries, fs.Config.LowLevelRetries, err) doSeek = true doReopen = true } if err != nil { - fs.Errorf(fh.o, "ReadFileHandle.Read error: %v", err) + fs.Errorf(fh.remote, "ReadFileHandle.Read error: %v", err) } else { fh.offset = newOffset - // fs.Debugf(fh.o, "ReadFileHandle.Read OK") + // fs.Debugf(fh.remote, "ReadFileHandle.Read OK") if fh.hash != nil { _, err = fh.hash.Write(p[:n]) if err != nil { - fs.Errorf(fh.o, "ReadFileHandle.Read HashError: %v", err) + fs.Errorf(fh.remote, "ReadFileHandle.Read HashError: %v", err) return 0, err } } @@ -266,8 +269,9 @@ func (fh *ReadFileHandle) checkHash() error { return nil } + o := fh.file.getObject() for hashType, dstSum := range fh.hash.Sums() { - srcSum, err := fh.o.Hash(hashType) + srcSum, err := o.Hash(hashType) if err != nil { return err } @@ -324,7 +328,7 @@ func (fh *ReadFileHandle) close() error { fh.closed = true if fh.opened { - accounting.Stats.DoneTransferring(fh.o.Remote(), true) + accounting.Stats.DoneTransferring(fh.remote, true) // Close first so that we have hashes err := fh.r.Close() if err != nil { @@ -355,14 +359,14 @@ func (fh *ReadFileHandle) Flush() error { if !fh.opened { return nil } - // fs.Debugf(fh.o, "ReadFileHandle.Flush") + // fs.Debugf(fh.remote, "ReadFileHandle.Flush") if err := fh.checkHash(); err != nil { - fs.Errorf(fh.o, "ReadFileHandle.Flush error: %v", err) + fs.Errorf(fh.remote, "ReadFileHandle.Flush error: %v", err) return err } - // fs.Debugf(fh.o, "ReadFileHandle.Flush OK") + // fs.Debugf(fh.remote, "ReadFileHandle.Flush OK") return nil } @@ -377,15 +381,15 @@ func (fh *ReadFileHandle) Release() error { return nil } if fh.closed { - fs.Debugf(fh.o, "ReadFileHandle.Release nothing to do") + fs.Debugf(fh.remote, "ReadFileHandle.Release nothing to do") return nil } - fs.Debugf(fh.o, "ReadFileHandle.Release closing") + fs.Debugf(fh.remote, "ReadFileHandle.Release closing") err := fh.close() if err != nil { - fs.Errorf(fh.o, "ReadFileHandle.Release error: %v", err) + fs.Errorf(fh.remote, "ReadFileHandle.Release error: %v", err) } else { - // fs.Debugf(fh.o, "ReadFileHandle.Release OK") + // fs.Debugf(fh.remote, "ReadFileHandle.Release OK") } return err } diff --git a/vfs/read_write.go b/vfs/read_write.go index 5c02f8282..5243220fc 100644 --- a/vfs/read_write.go +++ b/vfs/read_write.go @@ -22,8 +22,7 @@ import ( type RWFileHandle struct { *os.File mu sync.Mutex - closed bool // set if handle has been closed - o fs.Object // may be nil + closed bool // set if handle has been closed remote string file *File d *Dir @@ -51,7 +50,6 @@ func newRWFileHandle(d *Dir, f *File, remote string, flags int) (fh *RWFileHandl } fh = &RWFileHandle{ - o: f.o, file: f, d: d, remote: remote, @@ -107,16 +105,18 @@ func (fh *RWFileHandle) openPending(truncate bool) (err error) { fh.file.muOpen.Lock() defer fh.file.muOpen.Unlock() + o := fh.file.getObject() + var fd *os.File cacheFileOpenFlags := fh.flags // if not truncating the file, need to read it first if fh.flags&os.O_TRUNC == 0 && !truncate { // If the remote object exists AND its cached file exists locally AND there are no // other handles with it open writers, then attempt to update it. - if fh.o != nil && fh.d.vfs.cache.opens(fh.remote) <= 1 { + if o != nil && fh.d.vfs.cache.opens(fh.remote) <= 1 { cacheObj, err := fh.d.vfs.cache.f.NewObject(fh.remote) if err == nil && cacheObj != nil { - cacheObj, err = copyObj(fh.d.vfs.cache.f, cacheObj, fh.remote, fh.o) + cacheObj, err = copyObj(fh.d.vfs.cache.f, cacheObj, fh.remote, o) if err != nil { return errors.Wrap(err, "open RW handle failed to update cached file") } @@ -128,8 +128,8 @@ func (fh *RWFileHandle) openPending(truncate bool) (err error) { if os.IsNotExist(err) { // cache file does not exist, so need to fetch it if we have an object to fetch // it from - if fh.o != nil { - _, err = copyObj(fh.d.vfs.cache.f, nil, fh.remote, fh.o) + if o != nil { + _, err = copyObj(fh.d.vfs.cache.f, nil, fh.remote, o) if err != nil { cause := errors.Cause(err) if cause != fs.ErrorObjectNotFound && cause != fs.ErrorDirNotFound { @@ -296,7 +296,7 @@ func (fh *RWFileHandle) close() (err error) { return err } - o, err := copyObj(fh.d.vfs.f, fh.o, fh.remote, cacheObj) + o, err := copyObj(fh.d.vfs.f, fh.file.getObject(), fh.remote, cacheObj) if err != nil { err = errors.Wrap(err, "failed to transfer file from cache to remote") fs.Errorf(fh.logPrefix(), "%v", err)