vfs: move writeback of dirty data out of close() method into its own method (FlushWrites) and remove close() call from Flush()

If a file handle is duplicated with dup() and the duplicate handle is
flushed, rclone will go ahead and close the file, making the original
file handle stale. This change removes the close() call from Flush() and
replaces it with FlushWrites() so that the file only gets closed when
Release() is called. The new FlushWrites method takes care of actually
writing the file back to the underlying storage.

Fixes #3381
This commit is contained in:
Brett Dutro 2019-10-06 15:05:21 -05:00 committed by Nick Craig-Wood
parent 0cac9d9bd0
commit 7d0d7e66ca
7 changed files with 131 additions and 34 deletions

View File

@ -77,6 +77,7 @@ func RunTests(t *testing.T, fn MountFn) {
t.Run("TestWriteFileOverwrite", TestWriteFileOverwrite) t.Run("TestWriteFileOverwrite", TestWriteFileOverwrite)
t.Run("TestWriteFileDoubleClose", TestWriteFileDoubleClose) t.Run("TestWriteFileDoubleClose", TestWriteFileDoubleClose)
t.Run("TestWriteFileFsync", TestWriteFileFsync) t.Run("TestWriteFileFsync", TestWriteFileFsync)
t.Run("TestWriteFileDup", TestWriteFileDup)
}) })
log.Printf("Finished test run with cache mode %v (ok=%v)", cacheMode, ok) log.Printf("Finished test run with cache mode %v (ok=%v)", cacheMode, ok)
if !ok { if !ok {

View File

@ -1,10 +1,13 @@
package mounttest package mounttest
import ( import (
"os"
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/rclone/rclone/vfs"
) )
// TestWriteFileNoWrite tests writing a file with no write()'s to it // TestWriteFileNoWrite tests writing a file with no write()'s to it
@ -82,3 +85,48 @@ func TestWriteFileFsync(t *testing.T) {
run.waitForWriters() run.waitForWriters()
run.rm(t, "to be synced") run.rm(t, "to be synced")
} }
// TestWriteFileDup tests behavior of mmap() in Python by using dup() on a file handle
func TestWriteFileDup(t *testing.T) {
run.skipIfNoFUSE(t)
if run.vfs.Opt.CacheMode < vfs.CacheModeWrites {
t.Skip("not supported on vfs-cache-mode < writes")
return
}
filepath := run.path("to be synced")
fh, err := osCreate(filepath)
require.NoError(t, err)
testData := []byte("0123456789")
err = fh.Truncate(int64(len(testData) + 2))
require.NoError(t, err)
err = fh.Sync()
require.NoError(t, err)
var dupFd uintptr
dupFd, err = writeTestDup(fh.Fd())
require.NoError(t, err)
dupFile := os.NewFile(dupFd, fh.Name())
_, err = dupFile.Write(testData)
require.NoError(t, err)
err = dupFile.Close()
require.NoError(t, err)
_, err = fh.Seek(int64(len(testData)), 0)
require.NoError(t, err)
_, err = fh.Write([]byte("10"))
require.NoError(t, err)
err = fh.Close()
require.NoError(t, err)
run.waitForWriters()
run.rm(t, "to be synced")
}

View File

@ -5,9 +5,21 @@ package mounttest
import ( import (
"runtime" "runtime"
"testing" "testing"
"golang.org/x/sys/windows"
) )
// TestWriteFileDoubleClose tests double close on write // TestWriteFileDoubleClose tests double close on write
func TestWriteFileDoubleClose(t *testing.T) { func TestWriteFileDoubleClose(t *testing.T) {
t.Skip("not supported on " + runtime.GOOS) t.Skip("not supported on " + runtime.GOOS)
} }
// writeTestDup performs the platform-specific implementation of the dup() syscall
func writeTestDup(oldfd uintptr) (uintptr, error) {
p, err := windows.GetCurrentProcess()
if err != nil {
return 0, err
}
var h windows.Handle
return uintptr(h), windows.DuplicateHandle(p, windows.Handle(oldfd), p, &h, 0, true, windows.DUPLICATE_SAME_ACCESS)
}

View File

@ -4,10 +4,12 @@ package mounttest
import ( import (
"runtime" "runtime"
"syscall"
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"golang.org/x/sys/unix"
"github.com/rclone/rclone/vfs"
) )
// TestWriteFileDoubleClose tests double close on write // TestWriteFileDoubleClose tests double close on write
@ -21,14 +23,14 @@ func TestWriteFileDoubleClose(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
fd := out.Fd() fd := out.Fd()
fd1, err := syscall.Dup(int(fd)) fd1, err := unix.Dup(int(fd))
assert.NoError(t, err) assert.NoError(t, err)
fd2, err := syscall.Dup(int(fd)) fd2, err := unix.Dup(int(fd))
assert.NoError(t, err) assert.NoError(t, err)
// close one of the dups - should produce no error // close one of the dups - should produce no error
err = syscall.Close(fd1) err = unix.Close(fd1)
assert.NoError(t, err) assert.NoError(t, err)
// write to the file // write to the file
@ -41,14 +43,26 @@ func TestWriteFileDoubleClose(t *testing.T) {
err = out.Close() err = out.Close()
assert.NoError(t, err) assert.NoError(t, err)
// write to the other dup - should produce an error // write to the other dup
_, err = syscall.Write(fd2, buf) _, err = unix.Write(fd2, buf)
if run.vfs.Opt.CacheMode < vfs.CacheModeWrites {
// produces an error if cache mode < writes
assert.Error(t, err, "input/output error") assert.Error(t, err, "input/output error")
} else {
// otherwise does not produce an error
assert.NoError(t, err)
}
// close the dup - should not produce an error // close the dup - should not produce an error
err = syscall.Close(fd2) err = unix.Close(fd2)
assert.NoError(t, err) assert.NoError(t, err)
run.waitForWriters() run.waitForWriters()
run.rm(t, "testdoubleclose") run.rm(t, "testdoubleclose")
} }
// writeTestDup performs the platform-specific implementation of the dup() unix
func writeTestDup(oldfd uintptr) (uintptr, error) {
newfd, err := unix.Dup(int(oldfd))
return uintptr(newfd), err
}

View File

@ -230,28 +230,14 @@ func (fh *RWFileHandle) modified() bool {
return true return true
} }
// close the file handle returning EBADF if it has been // flushWrites flushes any pending writes to cloud storage
// closed already.
// //
// Must be called with fh.mu held // Must be called with fh.muRW held
// func (fh *RWFileHandle) flushWrites(closeFile bool) error {
// Note that we leave the file around in the cache on error conditions if fh.closed && !closeFile {
// to give the user a chance to recover it. return nil
func (fh *RWFileHandle) close() (err error) { }
defer log.Trace(fh.logPrefix(), "")("err=%v", &err)
fh.file.muRW.Lock()
defer fh.file.muRW.Unlock()
if fh.closed {
return ECLOSED
}
fh.closed = true
defer func() {
if fh.opened {
fh.file.delRWOpen()
}
fh.d.vfs.cache.close(fh.remote)
}()
rdwrMode := fh.flags & accessModeMask rdwrMode := fh.flags & accessModeMask
writer := rdwrMode != os.O_RDONLY writer := rdwrMode != os.O_RDONLY
@ -284,8 +270,8 @@ func (fh *RWFileHandle) close() (err error) {
} }
// Close the underlying file // Close the underlying file
if fh.opened { if fh.opened && closeFile {
err = fh.File.Close() err := fh.File.Close()
if err != nil { if err != nil {
err = errors.Wrap(err, "failed to close cache file") err = errors.Wrap(err, "failed to close cache file")
return err return err
@ -314,6 +300,32 @@ func (fh *RWFileHandle) close() (err error) {
return nil return nil
} }
// close the file handle returning EBADF if it has been
// closed already.
//
// Must be called with fh.mu held
//
// Note that we leave the file around in the cache on error conditions
// to give the user a chance to recover it.
func (fh *RWFileHandle) close() (err error) {
defer log.Trace(fh.logPrefix(), "")("err=%v", &err)
fh.file.muRW.Lock()
defer fh.file.muRW.Unlock()
if fh.closed {
return ECLOSED
}
fh.closed = true
defer func() {
if fh.opened {
fh.file.delRWOpen()
}
fh.d.vfs.cache.close(fh.remote)
}()
return fh.flushWrites(true)
}
// Close closes the file // Close closes the file
func (fh *RWFileHandle) Close() error { func (fh *RWFileHandle) Close() error {
fh.mu.Lock() fh.mu.Lock()
@ -346,7 +358,11 @@ func (fh *RWFileHandle) Flush() error {
fs.Debugf(fh.logPrefix(), "RWFileHandle.Flush ignoring flush on unwritten handle") fs.Debugf(fh.logPrefix(), "RWFileHandle.Flush ignoring flush on unwritten handle")
return nil return nil
} }
err := fh.close()
fh.file.muRW.Lock()
defer fh.file.muRW.Unlock()
err := fh.flushWrites(false)
if err != nil { if err != nil {
fs.Errorf(fh.logPrefix(), "RWFileHandle.Flush error: %v", err) fs.Errorf(fh.logPrefix(), "RWFileHandle.Flush error: %v", err)
} else { } else {

View File

@ -457,14 +457,19 @@ func TestRWFileHandleFlushWrite(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, 5, n) assert.Equal(t, 5, n)
// Check Flush closes file if write called // Check Flush does not close file if write called
err = fh.Flush() err = fh.Flush()
assert.NoError(t, err) assert.NoError(t, err)
assert.True(t, fh.closed) assert.False(t, fh.closed)
// Check flush does nothing if called again // Check flush does nothing if called again
err = fh.Flush() err = fh.Flush()
assert.NoError(t, err) assert.NoError(t, err)
assert.False(t, fh.closed)
// Check that Close closes the file
err = fh.Close()
assert.NoError(t, err)
assert.True(t, fh.closed) assert.True(t, fh.closed)
} }

View File

@ -254,7 +254,7 @@ func (vfs *VFS) Fs() fs.Fs {
func (vfs *VFS) SetCacheMode(cacheMode CacheMode) { func (vfs *VFS) SetCacheMode(cacheMode CacheMode) {
vfs.Shutdown() vfs.Shutdown()
vfs.cache = nil vfs.cache = nil
if vfs.Opt.CacheMode > CacheModeOff { if cacheMode > CacheModeOff {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
cache, err := newCache(ctx, vfs.f, &vfs.Opt) // FIXME pass on context or get from Opt? cache, err := newCache(ctx, vfs.f, &vfs.Opt) // FIXME pass on context or get from Opt?
if err != nil { if err != nil {
@ -263,6 +263,7 @@ func (vfs *VFS) SetCacheMode(cacheMode CacheMode) {
cancel() cancel()
return return
} }
vfs.Opt.CacheMode = cacheMode
vfs.cancel = cancel vfs.cancel = cancel
vfs.cache = cache vfs.cache = cache
} }