vfs: stop reading Dir members from outside dir.go

This commit is contained in:
Nick Craig-Wood 2020-04-14 18:14:24 +01:00
parent 238f26cc90
commit 19db0df639
4 changed files with 54 additions and 35 deletions

View File

@ -191,6 +191,26 @@ func (d *Dir) walk(fun func(*Dir)) {
fun(d) fun(d)
} }
// countActiveWriters returns the number of writers active in this
// directory and any subdirectories.
func (d *Dir) countActiveWriters() (writers int) {
d.walk(func(d *Dir) {
// NB d.mu is held by walk() here
fs.Debugf(d.path, "Looking for writers")
for leaf, item := range d.items {
fs.Debugf(leaf, "reading active writers")
if file, ok := item.(*File); ok {
n := file.activeWriters()
if n != 0 {
fs.Debugf(file, "active writers %d", n)
}
writers += n
}
}
})
return writers
}
// age returns the duration since the last time the directory contents // age returns the duration since the last time the directory contents
// was read and the content is cosidered stale. age will be 0 and // was read and the content is cosidered stale. age will be 0 and
// stale true if the last read time is empty. // stale true if the last read time is empty.
@ -698,9 +718,16 @@ func (d *Dir) Sync() error {
// VFS returns the instance of the VFS // VFS returns the instance of the VFS
func (d *Dir) VFS() *VFS { func (d *Dir) VFS() *VFS {
// No locking required
return d.vfs return d.vfs
} }
// Fs returns the Fs that the Dir is on
func (d *Dir) Fs() fs.Fs {
// No locking required
return d.f
}
// Truncate changes the size of the named file. // Truncate changes the size of the named file.
func (d *Dir) Truncate(size int64) error { func (d *Dir) Truncate(size int64) error {
return ENOSYS return ENOSYS

View File

@ -18,13 +18,19 @@ import (
// both have locks there is plenty of potential for deadlocks. In // both have locks there is plenty of potential for deadlocks. In
// order to mitigate this, we use the following conventions // order to mitigate this, we use the following conventions
// //
// File may read directly, without locking, from the read-only section // File may **only** call these methods from Dir with the File lock
// of the Dir object, eg vfs, and f members. File may **not** read any // held.
// other members directly.
// //
// File may **not** call Dir methods with the File lock held. This // Dir.Fs
// preserves total lock ordering and makes File subordinate to Dir as // Dir.VFS
// far as locking is concerned, preventing deadlocks. //
// (As these are read only and do not need to take the Dir mutex.)
//
// File may **not** call any other Dir methods with the File lock
// held. This preserves total lock ordering and makes File subordinate
// to Dir as far as locking is concerned, preventing deadlocks.
//
// File may **not** read any members of Dir directly.
// File represents a file // File represents a file
type File struct { type File struct {
@ -162,8 +168,8 @@ func (f *File) rename(ctx context.Context, destDir *Dir, newName string) error {
oldPendingRenameFun := f.pendingRenameFun oldPendingRenameFun := f.pendingRenameFun
f.mu.RUnlock() f.mu.RUnlock()
if features := d.f.Features(); features.Move == nil && features.Copy == nil { if features := d.Fs().Features(); features.Move == nil && features.Copy == nil {
err := errors.Errorf("Fs %q can't rename files (no server side Move or Copy)", d.f) err := errors.Errorf("Fs %q can't rename files (no server side Move or Copy)", d.Fs())
fs.Errorf(f.Path(), "Dir.Rename error: %v", err) fs.Errorf(f.Path(), "Dir.Rename error: %v", err)
return err return err
} }
@ -191,8 +197,8 @@ func (f *File) rename(ctx context.Context, destDir *Dir, newName string) error {
} }
// do the move of the remote object // do the move of the remote object
dstOverwritten, _ := d.f.NewObject(ctx, newPath) dstOverwritten, _ := d.Fs().NewObject(ctx, newPath)
newObject, err := operations.Move(ctx, d.f, dstOverwritten, newPath, o) newObject, err := operations.Move(ctx, d.Fs(), dstOverwritten, newPath, o)
if err != nil { if err != nil {
fs.Errorf(f.Path(), "File.Rename error: %v", err) fs.Errorf(f.Path(), "File.Rename error: %v", err)
return err return err
@ -611,7 +617,7 @@ func (f *File) VFS() *VFS {
func (f *File) Fs() fs.Fs { func (f *File) Fs() fs.Fs {
f.mu.RLock() f.mu.RLock()
defer f.mu.RUnlock() defer f.mu.RUnlock()
return f.d.f return f.d.Fs()
} }
// Open a file according to the flags provided // Open a file according to the flags provided

View File

@ -55,12 +55,12 @@ func newRWFileHandle(d *Dir, f *File, flags int) (fh *RWFileHandle, err error) {
} }
// mark the file as open in the cache - must be done before the mkdir // mark the file as open in the cache - must be done before the mkdir
fh.d.vfs.cache.open(fh.file.Path()) fh.d.VFS().cache.open(fh.file.Path())
// Make a place for the file // Make a place for the file
_, err = d.vfs.cache.mkdir(fh.file.Path()) _, err = d.VFS().cache.mkdir(fh.file.Path())
if err != nil { if err != nil {
fh.d.vfs.cache.close(fh.file.Path()) fh.d.VFS().cache.close(fh.file.Path())
return nil, errors.Wrap(err, "open RW handle failed to make cache directory") return nil, errors.Wrap(err, "open RW handle failed to make cache directory")
} }
@ -110,9 +110,9 @@ func (fh *RWFileHandle) openPending(truncate bool) (err error) {
// If the remote object exists AND its cached file exists locally AND there are no // If the remote object exists AND its cached file exists locally AND there are no
// other RW handles with it open, then attempt to update it. // other RW handles with it open, then attempt to update it.
if o != nil && fh.file.rwOpens() == 0 { if o != nil && fh.file.rwOpens() == 0 {
cacheObj, err := fh.d.vfs.cache.f.NewObject(context.TODO(), fh.file.Path()) cacheObj, err := fh.d.VFS().cache.f.NewObject(context.TODO(), fh.file.Path())
if err == nil && cacheObj != nil { if err == nil && cacheObj != nil {
_, err = copyObj(fh.d.vfs.cache.f, cacheObj, fh.file.Path(), o) _, err = copyObj(fh.d.VFS().cache.f, cacheObj, fh.file.Path(), o)
if err != nil { if err != nil {
return errors.Wrap(err, "open RW handle failed to update cached file") return errors.Wrap(err, "open RW handle failed to update cached file")
} }
@ -125,7 +125,7 @@ func (fh *RWFileHandle) openPending(truncate bool) (err error) {
// cache file does not exist, so need to fetch it if we have an object to fetch // cache file does not exist, so need to fetch it if we have an object to fetch
// it from // it from
if o != nil { if o != nil {
_, err = copyObj(fh.d.vfs.cache.f, nil, fh.file.Path(), o) _, err = copyObj(fh.d.VFS().cache.f, nil, fh.file.Path(), o)
if err != nil { if err != nil {
cause := errors.Cause(err) cause := errors.Cause(err)
if cause != fs.ErrorObjectNotFound && cause != fs.ErrorDirNotFound { if cause != fs.ErrorObjectNotFound && cause != fs.ErrorDirNotFound {
@ -277,7 +277,7 @@ func (fh *RWFileHandle) flushWrites(closeFile bool) error {
if isCopied { if isCopied {
// Transfer the temp file to the remote // Transfer the temp file to the remote
cacheObj, err := fh.d.vfs.cache.f.NewObject(context.TODO(), fh.file.Path()) cacheObj, err := fh.d.VFS().cache.f.NewObject(context.TODO(), fh.file.Path())
if err != nil { if err != nil {
err = errors.Wrap(err, "failed to find cache file") err = errors.Wrap(err, "failed to find cache file")
fs.Errorf(fh.logPrefix(), "%v", err) fs.Errorf(fh.logPrefix(), "%v", err)
@ -289,7 +289,7 @@ func (fh *RWFileHandle) flushWrites(closeFile bool) error {
if objOld != nil { if objOld != nil {
objPath = objOld.Remote() // use the path of the actual object if available objPath = objOld.Remote() // use the path of the actual object if available
} }
o, err := copyObj(fh.d.vfs.f, objOld, objPath, cacheObj) o, err := copyObj(fh.d.VFS().f, objOld, objPath, cacheObj)
if err != nil { if err != nil {
err = errors.Wrap(err, "failed to transfer file from cache to remote") err = errors.Wrap(err, "failed to transfer file from cache to remote")
fs.Errorf(fh.logPrefix(), "%v", err) fs.Errorf(fh.logPrefix(), "%v", err)
@ -322,7 +322,7 @@ func (fh *RWFileHandle) close() (err error) {
if fh.opened { if fh.opened {
fh.file.delRWOpen() fh.file.delRWOpen()
} }
fh.d.vfs.cache.close(fh.file.Path()) fh.d.VFS().cache.close(fh.file.Path())
}() }()
return fh.flushWrites(true) return fh.flushWrites(true)

View File

@ -305,21 +305,7 @@ func (vfs *VFS) WaitForWriters(timeout time.Duration) {
defer tick.Stop() defer tick.Stop()
tick.Stop() tick.Stop()
for { for {
writers := 0 writers := vfs.root.countActiveWriters()
vfs.root.walk(func(d *Dir) {
fs.Debugf(d.path, "Looking for writers")
// NB d.mu is held by walk() here
for leaf, item := range d.items {
fs.Debugf(leaf, "reading active writers")
if file, ok := item.(*File); ok {
n := file.activeWriters()
if n != 0 {
fs.Debugf(file, "active writers %d", n)
}
writers += n
}
}
})
if writers == 0 { if writers == 0 {
return return
} }