mirror of
https://github.com/rclone/rclone.git
synced 2025-01-12 01:08:37 +01:00
af030f74f5
This makes WriteAt for non cached files wait a short time if it gets an out of order write (which would normally cause an error) to see if the gap will be filled with an in order write. The makes the SFTP backend work fine with --vfs-cache-mode off
325 lines
9.3 KiB
Go
325 lines
9.3 KiB
Go
package vfs
|
|
|
|
import (
|
|
"io"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/ncw/rclone/fs"
|
|
"github.com/ncw/rclone/fs/operations"
|
|
)
|
|
|
|
// WriteFileHandle is an open for write handle on a File
|
|
type WriteFileHandle struct {
|
|
baseHandle
|
|
mu sync.Mutex
|
|
closed bool // set if handle has been closed
|
|
remote string
|
|
pipeWriter *io.PipeWriter
|
|
o fs.Object
|
|
result chan error
|
|
file *File
|
|
writeCalled bool // set the first time Write() is called
|
|
offset int64
|
|
opened bool
|
|
flags int
|
|
truncated bool
|
|
}
|
|
|
|
// Check interfaces
|
|
var (
|
|
_ io.Writer = (*WriteFileHandle)(nil)
|
|
_ io.WriterAt = (*WriteFileHandle)(nil)
|
|
_ io.Closer = (*WriteFileHandle)(nil)
|
|
)
|
|
|
|
func newWriteFileHandle(d *Dir, f *File, remote string, flags int) (*WriteFileHandle, error) {
|
|
fh := &WriteFileHandle{
|
|
remote: remote,
|
|
flags: flags,
|
|
result: make(chan error, 1),
|
|
file: f,
|
|
}
|
|
fh.file.addWriter(fh)
|
|
return fh, nil
|
|
}
|
|
|
|
// returns whether it is OK to truncate the file
|
|
func (fh *WriteFileHandle) safeToTruncate() bool {
|
|
return fh.truncated || fh.flags&os.O_TRUNC != 0 || !fh.file.exists()
|
|
}
|
|
|
|
// openPending opens the file if there is a pending open
|
|
//
|
|
// call with the lock held
|
|
func (fh *WriteFileHandle) openPending() (err error) {
|
|
if fh.opened {
|
|
return nil
|
|
}
|
|
if !fh.safeToTruncate() {
|
|
fs.Errorf(fh.remote, "WriteFileHandle: Can't open for write without O_TRUNC on existing file without --vfs-cache-mode >= writes")
|
|
return EPERM
|
|
}
|
|
var pipeReader *io.PipeReader
|
|
pipeReader, fh.pipeWriter = io.Pipe()
|
|
go func() {
|
|
// NB Rcat deals with Stats.Transferring etc
|
|
o, err := operations.Rcat(fh.file.d.f, fh.remote, pipeReader, time.Now())
|
|
if err != nil {
|
|
fs.Errorf(fh.remote, "WriteFileHandle.New Rcat failed: %v", err)
|
|
}
|
|
// Close the pipeReader so the pipeWriter fails with ErrClosedPipe
|
|
_ = pipeReader.Close()
|
|
fh.o = o
|
|
fh.result <- err
|
|
}()
|
|
fh.file.setSize(0)
|
|
fh.truncated = true
|
|
fh.file.d.addObject(fh.file) // make sure the directory has this object in it now
|
|
fh.opened = true
|
|
return nil
|
|
}
|
|
|
|
// String converts it to printable
|
|
func (fh *WriteFileHandle) String() string {
|
|
if fh == nil {
|
|
return "<nil *WriteFileHandle>"
|
|
}
|
|
fh.mu.Lock()
|
|
defer fh.mu.Unlock()
|
|
if fh.file == nil {
|
|
return "<nil *WriteFileHandle.file>"
|
|
}
|
|
return fh.file.String() + " (w)"
|
|
}
|
|
|
|
// Node returns the Node assocuated with this - satisfies Noder interface
|
|
func (fh *WriteFileHandle) Node() Node {
|
|
fh.mu.Lock()
|
|
defer fh.mu.Unlock()
|
|
return fh.file
|
|
}
|
|
|
|
// WriteAt writes len(p) bytes from p to the underlying data stream at offset
|
|
// off. It returns the number of bytes written from p (0 <= n <= len(p)) and
|
|
// any error encountered that caused the write to stop early. WriteAt must
|
|
// return a non-nil error if it returns n < len(p).
|
|
//
|
|
// If WriteAt is writing to a destination with a seek offset, WriteAt should
|
|
// not affect nor be affected by the underlying seek offset.
|
|
//
|
|
// Clients of WriteAt can execute parallel WriteAt calls on the same
|
|
// destination if the ranges do not overlap.
|
|
//
|
|
// Implementations must not retain p.
|
|
func (fh *WriteFileHandle) WriteAt(p []byte, off int64) (n int, err error) {
|
|
fh.mu.Lock()
|
|
defer fh.mu.Unlock()
|
|
return fh.writeAt(p, off)
|
|
}
|
|
|
|
// Implementatino of WriteAt - call with lock held
|
|
func (fh *WriteFileHandle) writeAt(p []byte, off int64) (n int, err error) {
|
|
// defer log.Trace(fh.remote, "len=%d off=%d", len(p), off)("n=%d, fh.off=%d, err=%v", &n, &fh.offset, &err)
|
|
if fh.closed {
|
|
fs.Errorf(fh.remote, "WriteFileHandle.Write: error: %v", EBADF)
|
|
return 0, ECLOSED
|
|
}
|
|
// Wait a short time for sequential writes to appear
|
|
const maxTries = 1000
|
|
const sleepTime = 1 * time.Millisecond
|
|
for try := 1; fh.offset != off && try <= maxTries; try++ {
|
|
//fs.Debugf(fh.remote, "waiting for in sequence write %d/%d", try, maxTries)
|
|
fh.mu.Unlock()
|
|
time.Sleep(sleepTime)
|
|
fh.mu.Lock()
|
|
}
|
|
if fh.offset != off {
|
|
fs.Errorf(fh.remote, "WriteFileHandle.Write: can't seek in file without --vfs-cache-mode >= writes")
|
|
return 0, ESPIPE
|
|
}
|
|
if err = fh.openPending(); err != nil {
|
|
return 0, err
|
|
}
|
|
fh.writeCalled = true
|
|
n, err = fh.pipeWriter.Write(p)
|
|
fh.offset += int64(n)
|
|
fh.file.setSize(fh.offset)
|
|
if err != nil {
|
|
fs.Errorf(fh.remote, "WriteFileHandle.Write error: %v", err)
|
|
return 0, err
|
|
}
|
|
// fs.Debugf(fh.remote, "WriteFileHandle.Write OK (%d bytes written)", n)
|
|
return n, nil
|
|
}
|
|
|
|
// Write writes len(p) bytes from p to the underlying data stream. It returns
|
|
// the number of bytes written from p (0 <= n <= len(p)) and any error
|
|
// encountered that caused the write to stop early. Write must return a non-nil
|
|
// error if it returns n < len(p). Write must not modify the slice data, even
|
|
// temporarily.
|
|
//
|
|
// Implementations must not retain p.
|
|
func (fh *WriteFileHandle) Write(p []byte) (n int, err error) {
|
|
fh.mu.Lock()
|
|
defer fh.mu.Unlock()
|
|
// Since we can't seek, just call WriteAt with the current offset
|
|
return fh.writeAt(p, fh.offset)
|
|
}
|
|
|
|
// WriteString a string to the file
|
|
func (fh *WriteFileHandle) WriteString(s string) (n int, err error) {
|
|
return fh.Write([]byte(s))
|
|
}
|
|
|
|
// Offset returns the offset of the file pointer
|
|
func (fh *WriteFileHandle) Offset() (offset int64) {
|
|
fh.mu.Lock()
|
|
defer fh.mu.Unlock()
|
|
return fh.offset
|
|
}
|
|
|
|
// close the file handle returning EBADF if it has been
|
|
// closed already.
|
|
//
|
|
// Must be called with fh.mu held
|
|
func (fh *WriteFileHandle) close() (err error) {
|
|
if fh.closed {
|
|
return ECLOSED
|
|
}
|
|
fh.closed = true
|
|
// leave writer open until file is transferred
|
|
defer func() {
|
|
fh.file.delWriter(fh, false)
|
|
fh.file.finishWriterClose()
|
|
}()
|
|
// If file not opened and not safe to truncate then then leave file intact
|
|
if !fh.opened && !fh.safeToTruncate() {
|
|
return nil
|
|
}
|
|
if err = fh.openPending(); err != nil {
|
|
return err
|
|
}
|
|
writeCloseErr := fh.pipeWriter.Close()
|
|
err = <-fh.result
|
|
if err == nil {
|
|
fh.file.setObject(fh.o)
|
|
err = writeCloseErr
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Close closes the file
|
|
func (fh *WriteFileHandle) Close() error {
|
|
fh.mu.Lock()
|
|
defer fh.mu.Unlock()
|
|
return fh.close()
|
|
}
|
|
|
|
// Flush is called on each close() of a file descriptor. So if a
|
|
// filesystem wants to return write errors in close() and the file has
|
|
// cached dirty data, this is a good place to write back data and
|
|
// return any errors. Since many applications ignore close() errors
|
|
// this is not always useful.
|
|
//
|
|
// NOTE: The flush() method may be called more than once for each
|
|
// open(). This happens if more than one file descriptor refers to an
|
|
// opened file due to dup(), dup2() or fork() calls. It is not
|
|
// possible to determine if a flush is final, so each flush should be
|
|
// treated equally. Multiple write-flush sequences are relatively
|
|
// rare, so this shouldn't be a problem.
|
|
//
|
|
// Filesystems shouldn't assume that flush will always be called after
|
|
// some writes, or that if will be called at all.
|
|
func (fh *WriteFileHandle) Flush() error {
|
|
fh.mu.Lock()
|
|
defer fh.mu.Unlock()
|
|
if fh.closed {
|
|
fs.Debugf(fh.remote, "WriteFileHandle.Flush nothing to do")
|
|
return nil
|
|
}
|
|
// fs.Debugf(fh.remote, "WriteFileHandle.Flush")
|
|
// If Write hasn't been called then ignore the Flush - Release
|
|
// will pick it up
|
|
if !fh.writeCalled {
|
|
fs.Debugf(fh.remote, "WriteFileHandle.Flush unwritten handle, writing 0 bytes to avoid race conditions")
|
|
_, err := fh.writeAt([]byte{}, fh.offset)
|
|
return err
|
|
}
|
|
err := fh.close()
|
|
if err != nil {
|
|
fs.Errorf(fh.remote, "WriteFileHandle.Flush error: %v", err)
|
|
} else {
|
|
// fs.Debugf(fh.remote, "WriteFileHandle.Flush OK")
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Release is called when we are finished with the file handle
|
|
//
|
|
// It isn't called directly from userspace so the error is ignored by
|
|
// the kernel
|
|
func (fh *WriteFileHandle) Release() error {
|
|
fh.mu.Lock()
|
|
defer fh.mu.Unlock()
|
|
if fh.closed {
|
|
fs.Debugf(fh.remote, "WriteFileHandle.Release nothing to do")
|
|
return nil
|
|
}
|
|
fs.Debugf(fh.remote, "WriteFileHandle.Release closing")
|
|
err := fh.close()
|
|
if err != nil {
|
|
fs.Errorf(fh.remote, "WriteFileHandle.Release error: %v", err)
|
|
} else {
|
|
// fs.Debugf(fh.remote, "WriteFileHandle.Release OK")
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Stat returns info about the file
|
|
func (fh *WriteFileHandle) Stat() (os.FileInfo, error) {
|
|
fh.mu.Lock()
|
|
defer fh.mu.Unlock()
|
|
return fh.file, nil
|
|
}
|
|
|
|
// Truncate file to given size
|
|
func (fh *WriteFileHandle) Truncate(size int64) (err error) {
|
|
fh.mu.Lock()
|
|
defer fh.mu.Unlock()
|
|
if fh.closed {
|
|
return ECLOSED
|
|
}
|
|
if size != fh.offset {
|
|
fs.Errorf(fh.remote, "WriteFileHandle: Truncate: Can't change size without --vfs-cache-mode >= writes")
|
|
return EPERM
|
|
}
|
|
// File is correct size
|
|
if size == 0 {
|
|
fh.truncated = true
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Read reads up to len(p) bytes into p.
|
|
func (fh *WriteFileHandle) Read(p []byte) (n int, err error) {
|
|
fs.Errorf(fh.remote, "WriteFileHandle: Read: Can't read and write to file without --vfs-cache-mode >= minimal")
|
|
return 0, EPERM
|
|
}
|
|
|
|
// ReadAt reads len(p) bytes into p starting at offset off in the
|
|
// underlying input source. It returns the number of bytes read (0 <=
|
|
// n <= len(p)) and any error encountered.
|
|
func (fh *WriteFileHandle) ReadAt(p []byte, off int64) (n int, err error) {
|
|
fs.Errorf(fh.remote, "WriteFileHandle: ReadAt: Can't read and write to file without --vfs-cache-mode >= minimal")
|
|
return 0, EPERM
|
|
}
|
|
|
|
// Sync commits the current contents of the file to stable storage. Typically,
|
|
// this means flushing the file system's in-memory copy of recently written
|
|
// data to disk.
|
|
func (fh *WriteFileHandle) Sync() error {
|
|
return nil
|
|
}
|