vfs: cache: rework file downloader

- Download to multiple places at once in the stream
- Restart as necessary
- Timeout unused downloaders
- Close reader if too much data skipped
- Only use one file handle for writing, via the Item
- Implement --vfs-read-chunk-size and --vfs-read-chunk-limit
- Fix deadlock between asyncbuffer and the vfs cache downloader
- Fix display of the stream abandoned error, which should be ignored
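
For orientation before the diff: seen from the Item, the reworked flow looks roughly like the sketch below. exampleReadFlow is a hypothetical helper written for illustration only; the real call sites are Item.Open, Item._ensure and Item.Close in the second file of the diff.

    // Item.Open creates the set of downloaders once per cached file:
    //   item.downloaders = newDownloaders(item, item.c.fremote, item.name, item.o)
    func exampleReadFlow(item *Item, offset, size int64) error {
        r := ranges.Range{Pos: offset, Size: size}
        if item.hasRange(r) {
            // Data is already in the cache file - keep read-ahead going
            // for the window after r, but don't wait for anything.
            return item.downloaders.ensureDownloader(r)
        }
        // Block until a downloader (an existing one reading nearby, or a
        // newly started one) has written the whole range to the cache file.
        return item.downloaders.ensure(r)
    }
    // Item.Close and rename tear everything down with item.downloaders.close(nil),
    // which stops the downloaders and fails any waiters still blocked in ensure.

Multiple readers can block in ensure at once; each gets a waiter entry that is released as soon as its range lands in the cache file.
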
Nick Craig-Wood 2020-06-04 09:33:50 +01:00
parent 58a7faa281
commit 9ac5c6de14
2 changed files with 530 additions and 292 deletions


@@ -2,85 +2,305 @@ package vfscache
import ( import (
"context" "context"
"io"
"io/ioutil"
"os"
"sync" "sync"
"time"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/accounting" "github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/fs/asyncreader" "github.com/rclone/rclone/fs/asyncreader"
"github.com/rclone/rclone/fs/chunkedreader"
"github.com/rclone/rclone/fs/log" "github.com/rclone/rclone/fs/log"
"github.com/rclone/rclone/fs/operations"
"github.com/rclone/rclone/lib/file"
"github.com/rclone/rclone/lib/ranges" "github.com/rclone/rclone/lib/ranges"
"github.com/rclone/rclone/lib/readers"
) )
// downloader represents a running download for a file // FIXME implement max downloaders
type downloader struct {
// write only const (
mu sync.Mutex // max time a downloader can be idle before closing itself
maxDownloaderIdleTime = 5 * time.Second
// max number of bytes a reader should skip over before closing it
maxSkipBytes = 1024 * 1024
)
// downloaders is a number of downloader~s and a queue of waiters
// waiting for segments to be downloaded.
type downloaders struct {
// Write once - no locking required
ctx context.Context ctx context.Context
item *Item item *Item
src fs.Object // source object src fs.Object // source object
remote string
fcache fs.Fs // destination Fs fcache fs.Fs // destination Fs
osPath string osPath string
// per download // Read write
out *os.File // file we are writing to mu sync.Mutex
offset int64 // current offset dls []*downloader
waiters []waiter waiters []waiter
tr *accounting.Transfer
in *accounting.Account // input we are reading from
downloading bool // whether the download thread is running
finished chan struct{} // closed when download finished
} }
// waiter is a range we are waiting for and a channel to signal // waiter is a range we are waiting for and a channel to signal when
// the range is found
type waiter struct { type waiter struct {
r ranges.Range r ranges.Range
errChan chan<- error errChan chan<- error
} }
func newDownloader(item *Item, fcache fs.Fs, remote string, src fs.Object) (dl *downloader, err error) { // downloader represents a running download for part of a file.
defer log.Trace(src, "remote=%q", remote)("dl=%+v, err=%v", &dl, &err) type downloader struct {
// Write once
dls *downloaders // parent structure
quit chan struct{} // close to quit the downloader
wg sync.WaitGroup // to keep track of downloader goroutine
kick chan struct{} // kick the downloader when needed
dl = &downloader{ // Read write
mu sync.Mutex
start int64 // start offset
offset int64 // current offset
maxOffset int64 // maximum offset we are reading to
tr *accounting.Transfer
in *accounting.Account // input we are reading from
skipped int64 // number of bytes we have skipped sequentially
_closed bool // set to true if downloader is closed
stop bool // set to true if we have called _stop()
}
func newDownloaders(item *Item, fcache fs.Fs, remote string, src fs.Object) (dls *downloaders) {
dls = &downloaders{
ctx: context.Background(), ctx: context.Background(),
item: item, item: item,
src: src, src: src,
remote: remote,
fcache: fcache, fcache: fcache,
osPath: item.c.toOSPath(remote), osPath: item.c.toOSPath(remote),
} }
return dls
// make sure there is a cache file
_, err = os.Stat(dl.osPath)
if err == nil {
// do nothing
} else if os.IsNotExist(err) {
return nil, errors.New("vfs cache: internal error: newDownloader: called before Item.Open")
// fs.Debugf(src, "creating empty file")
// err = item._truncateToCurrentSize()
// if err != nil {
// return nil, errors.Wrap(err, "newDownloader: failed to create empty file")
// }
} else {
return nil, errors.Wrap(err, "newDownloader: failed to stat cache file")
} }
// Make a new downloader, starting it to download r
//
// call with lock held
func (dls *downloaders) _newDownloader(r ranges.Range) (dl *downloader, err error) {
defer log.Trace(dls.src, "r=%v", r)("err=%v", &err)
dl = &downloader{
kick: make(chan struct{}, 1),
quit: make(chan struct{}),
dls: dls,
start: r.Pos,
offset: r.Pos,
maxOffset: r.End(),
}
err = dl.open(dl.offset)
if err != nil {
_ = dl.close(err)
return nil, errors.Wrap(err, "failed to open downloader")
}
dls.dls = append(dls.dls, dl)
dl.wg.Add(1)
go func() {
defer dl.wg.Done()
err := dl.download()
_ = dl.close(err)
if err != nil {
fs.Errorf(dl.dls.src, "Failed to download: %v", err)
}
err = dl.dls.kickWaiters()
if err != nil {
fs.Errorf(dl.dls.src, "Failed to kick waiters: %v", err)
}
}()
return dl, nil return dl, nil
} }
// _removeClosed() removes any downloaders which are closed.
//
// Call with the mutex held
func (dls *downloaders) _removeClosed() {
newDownloaders := dls.dls[:0]
for _, dl := range dls.dls {
if !dl.closed() {
newDownloaders = append(newDownloaders, dl)
}
}
dls.dls = newDownloaders
}
// Close all running downloaders and return any unfulfilled waiters
// with inErr
func (dls *downloaders) close(inErr error) (err error) {
dls.mu.Lock()
defer dls.mu.Unlock()
dls._removeClosed()
for _, dl := range dls.dls {
dls.mu.Unlock()
closeErr := dl.stopAndClose(inErr)
dls.mu.Lock()
if closeErr != nil && err != nil {
err = closeErr
}
}
dls.dls = nil
dls._dispatchWaiters()
dls._closeWaiters(inErr)
return err
}
// Ensure a downloader is running to download r
func (dls *downloaders) ensure(r ranges.Range) (err error) {
defer log.Trace(dls.src, "r=%+v", r)("err=%v", &err)
dls.mu.Lock()
errChan := make(chan error)
waiter := waiter{
r: r,
errChan: errChan,
}
err = dls._ensureDownloader(r)
if err != nil {
dls.mu.Unlock()
return err
}
dls.waiters = append(dls.waiters, waiter)
dls.mu.Unlock()
return <-errChan
}
// close any waiters with the error passed in // close any waiters with the error passed in
// //
// call with lock held // call with lock held
func (dl *downloader) _closeWaiters(err error) { func (dls *downloaders) _closeWaiters(err error) {
for _, waiter := range dl.waiters { for _, waiter := range dls.waiters {
waiter.errChan <- err waiter.errChan <- err
} }
dl.waiters = nil dls.waiters = nil
}
// ensure a downloader is running for the range if required. If one isn't found
// then it starts it.
//
// call with lock held
func (dls *downloaders) _ensureDownloader(r ranges.Range) (err error) {
// FIXME this window could be a different config var?
window := int64(fs.Config.BufferSize)
// We may be reopening a downloader after a failure here or
// doing a tentative prefetch so check to see that we haven't
// read some stuff already.
//
// Clip r to stuff which needs downloading
r = dls.item.findMissing(r)
// If the range is entirely present then we only need to start a
// downloader if the window isn't full.
if r.IsEmpty() {
// Make a new range which includes the window
rWindow := r
if rWindow.Size < window {
rWindow.Size = window
}
// Clip rWindow to stuff which needs downloading
rWindow = dls.item.findMissing(rWindow)
// If rWindow is empty then just return without starting a
// downloader as there is no data within the window which needs
// downloading.
if rWindow.IsEmpty() {
return nil
}
// Start downloading at the start of the unread window
r.Pos = rWindow.Pos
// But don't write anything for the moment
r.Size = 0
}
var dl *downloader
// Look through downloaders to find one in range
// If there isn't one then start a new one
dls._removeClosed()
for _, dl = range dls.dls {
start, maxOffset := dl.getRange()
// The downloader's offset to offset+window is the gap
// in which we would like to re-use this
// downloader. The downloader will never reach before
// start and maxOffset+window is too far away - we'd
// rather start another downloader.
// fs.Debugf(nil, "r=%v start=%d, maxOffset=%d, found=%v", r, start, maxOffset, r.Pos >= start && r.Pos < maxOffset+window)
if r.Pos >= start && r.Pos < maxOffset+window {
// Found downloader which will soon have our data
dl.setRange(r)
return nil
}
}
// Downloader not found so start a new one
dl, err = dls._newDownloader(r)
if err != nil {
return errors.Wrap(err, "failed to start downloader")
}
return err
}
// ensure a downloader is running for the range if required. If one
// isn't found then it starts it
func (dls *downloaders) ensureDownloader(r ranges.Range) (err error) {
dls.mu.Lock()
defer dls.mu.Unlock()
return dls._ensureDownloader(r)
}
// _dispatchWaiters() sends any waiters which have completed back to
// their callers.
//
// Call with the mutex held
func (dls *downloaders) _dispatchWaiters() {
if len(dls.waiters) == 0 {
return
}
newWaiters := dls.waiters[:0]
for _, waiter := range dls.waiters {
if dls.item.hasRange(waiter.r) {
waiter.errChan <- nil
} else {
newWaiters = append(newWaiters, waiter)
}
}
dls.waiters = newWaiters
}
// Send any waiters which have completed back to their callers and make sure
// there is a downloader appropriate for each waiter
func (dls *downloaders) kickWaiters() (err error) {
dls.mu.Lock()
defer dls.mu.Unlock()
dls._dispatchWaiters()
if len(dls.waiters) == 0 {
return nil
}
// Make sure each waiter has a downloader
// This is an O(waiters*downloaders) algorithm
// However the number of waiters and the number of downloaders
// are both expected to be small.
for _, waiter := range dls.waiters {
err = dls._ensureDownloader(waiter.r)
if err != nil {
fs.Errorf(dls.src, "Restart download failed: %v", err)
}
}
return nil
} }
// Write writes len(p) bytes from p to the underlying data stream. It // Write writes len(p) bytes from p to the underlying data stream. It
@@ -91,221 +311,194 @@ func (dl *downloader) _closeWaiters(err error) {
// //
// Implementations must not retain p. // Implementations must not retain p.
func (dl *downloader) Write(p []byte) (n int, err error) { func (dl *downloader) Write(p []byte) (n int, err error) {
defer log.Trace(dl.src, "p_len=%d", len(p))("n=%d, err=%v", &n, &err) defer log.Trace(dl.dls.src, "p_len=%d", len(p))("n=%d, err=%v", &n, &err)
var ( // Kick the waiters on exit if some characters received
// Range we wish to write defer func() {
r = ranges.Range{Pos: dl.offset, Size: int64(len(p))} if n <= 0 {
curr ranges.Range return
present bool
nn int
)
// Check to see what regions are already present
dl.mu.Lock()
defer dl.mu.Unlock()
dl.item.mu.Lock()
defer dl.item.mu.Unlock()
// Write the range out ignoring already written chunks
// FIXME might stop downloading if we are ignoring chunks?
for err == nil && !r.IsEmpty() {
curr, r, present = dl.item.info.Rs.Find(r)
if curr.Pos != dl.offset {
return n, errors.New("internal error: offset of range is wrong")
} }
if present { if waitErr := dl.dls.kickWaiters(); waitErr != nil {
// if present want to skip this range fs.Errorf(dl.dls.src, "vfs cache: download write: failed to kick waiters: %v", waitErr)
fs.Debugf(dl.src, "skip chunk offset=%d size=%d", dl.offset, curr.Size) if err == nil {
nn = int(curr.Size) err = waitErr
_, err = dl.out.Seek(curr.Size, io.SeekCurrent)
if err != nil {
nn = 0
} }
} else {
// if range not present then we want to write it
fs.Debugf(dl.src, "write chunk offset=%d size=%d", dl.offset, curr.Size)
nn, err = dl.out.Write(p[:curr.Size])
dl.item.info.Rs.Insert(ranges.Range{Pos: dl.offset, Size: int64(nn)})
}
dl.offset += int64(nn)
p = p[nn:]
n += nn
}
if n > 0 {
if len(dl.waiters) > 0 {
newWaiters := dl.waiters[:0]
for _, waiter := range dl.waiters {
if dl.item.info.Rs.Present(waiter.r) {
waiter.errChan <- nil
} else {
newWaiters = append(newWaiters, waiter)
}
}
dl.waiters = newWaiters
}
}
if err != nil && err != io.EOF {
dl._closeWaiters(err)
}
return n, err
}
// start the download running from offset
func (dl *downloader) start(offset int64) (err error) {
err = dl.open(offset)
if err != nil {
_ = dl.close(err)
return errors.Wrap(err, "failed to open downloader")
}
go func() {
err := dl.download()
_ = dl.close(err)
if err != nil && errors.Cause(err) != asyncreader.ErrorStreamAbandoned {
fs.Errorf(dl.src, "Failed to download: %v", err)
// FIXME set an error here????
} }
}() }()
return nil dl.mu.Lock()
defer dl.mu.Unlock()
// Wait here if we have reached maxOffset until
// - we are quitting
// - we get kicked
// - timeout happens
if dl.offset >= dl.maxOffset {
var timeout = time.NewTimer(maxDownloaderIdleTime)
dl.mu.Unlock()
select {
case <-dl.quit:
dl.mu.Lock()
timeout.Stop()
case <-dl.kick:
dl.mu.Lock()
timeout.Stop()
case <-timeout.C:
// stop any future reading
dl.mu.Lock()
if !dl.stop {
fs.Debugf(dl.dls.src, "stopping download thread as it timed out")
dl._stop()
}
}
}
n, skipped, err := dl.dls.item.writeAtNoOverwrite(p, dl.offset)
if skipped == n {
dl.skipped += int64(skipped)
} else {
dl.skipped = 0
}
dl.offset += int64(n)
// Kill this downloader if skipped too many bytes
if !dl.stop && dl.skipped > maxSkipBytes {
fs.Debugf(dl.dls.src, "stopping download thread as it has skipped %d bytes", dl.skipped)
dl._stop()
}
return n, err
} }
// open the file from offset // open the file from offset
// //
// should be called on a fresh downloader // should be called on a fresh downloader
func (dl *downloader) open(offset int64) (err error) { func (dl *downloader) open(offset int64) (err error) {
defer log.Trace(dl.src, "offset=%d", offset)("err=%v", &err) defer log.Trace(dl.dls.src, "offset=%d", offset)("err=%v", &err)
dl.finished = make(chan struct{}) dl.tr = accounting.Stats(dl.dls.ctx).NewTransfer(dl.dls.src)
defer close(dl.finished)
dl.downloading = true
dl.tr = accounting.Stats(dl.ctx).NewTransfer(dl.src)
size := dl.src.Size() size := dl.dls.src.Size()
if size < 0 { if size < 0 {
// FIXME should just completely download these // FIXME should just completely download these
return errors.New("can't open unknown sized file") return errors.New("can't open unknown sized file")
} }
// FIXME hashType needs to ignore when --no-checksum is set too? Which is a VFS flag. // FIXME hashType needs to ignore when --no-checksum is set too? Which is a VFS flag.
var rangeOption *fs.RangeOption // var rangeOption *fs.RangeOption
if offset > 0 { // if offset > 0 {
rangeOption = &fs.RangeOption{Start: offset, End: size - 1} // rangeOption = &fs.RangeOption{Start: offset, End: size - 1}
} // }
in0, err := operations.NewReOpen(dl.ctx, dl.src, fs.Config.LowLevelRetries, dl.item.c.hashOption, rangeOption) // in0, err := operations.NewReOpen(dl.dls.ctx, dl.dls.src, fs.Config.LowLevelRetries, dl.dls.item.c.hashOption, rangeOption)
in0 := chunkedreader.New(context.TODO(), dl.dls.src, int64(dl.dls.item.c.opt.ChunkSize), int64(dl.dls.item.c.opt.ChunkSizeLimit))
_, err = in0.Seek(offset, 0)
if err != nil { if err != nil {
return errors.Wrap(err, "vfs reader: failed to open source file") return errors.Wrap(err, "vfs reader: failed to open source file")
} }
dl.in = dl.tr.Account(in0).WithBuffer() // account and buffer the transfer dl.in = dl.tr.Account(in0).WithBuffer() // account and buffer the transfer
dl.out, err = file.OpenFile(dl.osPath, os.O_CREATE|os.O_WRONLY, 0700)
if err != nil {
return errors.Wrap(err, "vfs reader: failed to open cache file")
}
dl.offset = offset dl.offset = offset
err = file.SetSparse(dl.out)
if err != nil {
fs.Debugf(dl.src, "vfs reader: failed to set as a sparse file: %v", err)
}
_, err = dl.out.Seek(offset, io.SeekStart)
if err != nil {
return errors.Wrap(err, "vfs reader: failed to seek")
}
// FIXME set mod time // FIXME set mod time
// FIXME check checksums // FIXME check checksums
return nil return nil
} }
var errStop = errors.New("vfs downloader: reading stopped") // close the downloader
func (dl *downloader) close(inErr error) (err error) {
// stop the downloader if running and close everything defer log.Trace(dl.dls.src, "inErr=%v", err)("err=%v", &err)
func (dl *downloader) stop() { checkErr := func(e error) {
defer log.Trace(dl.src, "")("") if e == nil || errors.Cause(err) == asyncreader.ErrorStreamAbandoned {
dl.mu.Lock()
if !dl.downloading || dl.in == nil {
dl.mu.Unlock()
return return
} }
err = e
// stop the downloader
dl.in.StopBuffering()
oldReader := dl.in.GetReader()
dl.in.UpdateReader(ioutil.NopCloser(readers.ErrorReader{Err: errStop}))
err := oldReader.Close()
if err != nil {
fs.Debugf(dl.src, "vfs downloader: stop close old failed: %v", err)
} }
dl.mu.Unlock()
// wait for downloader to finish...
<-dl.finished
}
func (dl *downloader) close(inErr error) (err error) {
defer log.Trace(dl.src, "inErr=%v", err)("err=%v", &err)
dl.stop()
dl.mu.Lock() dl.mu.Lock()
if dl.in != nil { if dl.in != nil {
fs.CheckClose(dl.in, &err) checkErr(dl.in.Close())
dl.in = nil dl.in = nil
} }
if dl.tr != nil { if dl.tr != nil {
dl.tr.Done(inErr) dl.tr.Done(inErr)
dl.tr = nil dl.tr = nil
} }
if dl.out != nil { dl._closed = true
fs.CheckClose(dl.out, &err)
dl.out = nil
}
dl._closeWaiters(err)
dl.downloading = false
dl.mu.Unlock() dl.mu.Unlock()
return nil return nil
} }
/* // closed returns true if the downloader has been closed already
FIXME func (dl *downloader) closed() bool {
need gating at all the Read/Write sites dl.mu.Lock()
need to pass in offset somehow and start the readfile off defer dl.mu.Unlock()
need to end when offset is reached return dl._closed
need to be able to quit on demand }
Need offset to be passed to NewReOpen
*/ // stop the downloader if running
// fetch the (offset, size) block from the remote file //
// Call with the mutex held
func (dl *downloader) _stop() {
defer log.Trace(dl.dls.src, "")("")
// exit if have already called _stop
if dl.stop {
return
}
dl.stop = true
// Signal quit now to unblock the downloader
close(dl.quit)
// stop the downloader by stopping the async reader buffering
// any more input. This causes all the stuff in the async
// buffer (which can be many MB) to be written to the disk
// before exiting.
if dl.in != nil {
dl.in.StopBuffering()
}
}
// stop the downloader if running then close it with the error passed in
func (dl *downloader) stopAndClose(inErr error) (err error) {
// Stop the downloader by closing its input
dl.mu.Lock()
dl._stop()
dl.mu.Unlock()
// wait for downloader to finish...
// do this without mutex as asyncreader
// calls back into Write() which needs the lock
dl.wg.Wait()
return dl.close(inErr)
}
// Start downloading to the local file starting at offset until maxOffset.
func (dl *downloader) download() (err error) { func (dl *downloader) download() (err error) {
defer log.Trace(dl.src, "")("err=%v", &err) defer log.Trace(dl.dls.src, "")("err=%v", &err)
_, err = dl.in.WriteTo(dl) _, err = dl.in.WriteTo(dl)
if err != nil { if err != nil && errors.Cause(err) != asyncreader.ErrorStreamAbandoned {
return errors.Wrap(err, "vfs reader: failed to write to cache file") return errors.Wrap(err, "vfs reader: failed to write to cache file")
} }
return nil return nil
} }
// ensure the range is present // setRange makes sure the downloader is downloading the range passed in
func (dl *downloader) ensure(r ranges.Range) (err error) { func (dl *downloader) setRange(r ranges.Range) {
defer log.Trace(dl.src, "r=%+v", r)("err=%v", &err)
errChan := make(chan error)
waiter := waiter{
r: r,
errChan: errChan,
}
dl.mu.Lock() dl.mu.Lock()
// FIXME racey - might have finished here maxOffset := r.End()
dl.waiters = append(dl.waiters, waiter) if maxOffset > dl.maxOffset {
dl.maxOffset = maxOffset
// fs.Debugf(dl.dls.src, "kicking downloader with maxOffset %d", maxOffset)
select {
case dl.kick <- struct{}{}:
default:
}
}
dl.mu.Unlock() dl.mu.Unlock()
return <-errChan
} }
// ensure the range is present // get the current range this downloader is working on
func (dl *downloader) running() bool { func (dl *downloader) getRange() (start, maxOffset int64) {
dl.mu.Lock() dl.mu.Lock()
defer dl.mu.Unlock() defer dl.mu.Unlock()
return dl.downloading return dl.start, dl.maxOffset
} }


@@ -46,7 +46,7 @@ type Item struct {
mu sync.Mutex // protect the variables mu sync.Mutex // protect the variables
name string // name in the VFS name string // name in the VFS
opens int // number of times file is open opens int // number of times file is open
downloader *downloader // if the file is being downloaded to cache downloaders *downloaders // a record of the downloaders in action - may be nil
o fs.Object // object we are caching - may be nil o fs.Object // object we are caching - may be nil
fd *os.File // handle we are using to read and write to the file fd *os.File // handle we are using to read and write to the file
metaDirty bool // set if the info needs writeback metaDirty bool // set if the info needs writeback
@@ -453,6 +453,9 @@ func (item *Item) Open(o fs.Object) (err error) {
// Relock the Item.mu for the return // Relock the Item.mu for the return
item.mu.Lock() item.mu.Lock()
// Create the downloaders
item.downloaders = newDownloaders(item, item.c.fremote, item.name, item.o)
return err return err
} }
@@ -514,32 +517,9 @@ func (item *Item) store(ctx context.Context, storeFn StoreFn) (err error) {
func (item *Item) Close(storeFn StoreFn) (err error) { func (item *Item) Close(storeFn StoreFn) (err error) {
defer log.Trace(item.o, "Item.Close")("err=%v", &err) defer log.Trace(item.o, "Item.Close")("err=%v", &err)
var ( var (
downloader *downloader downloaders *downloaders
syncWriteBack = item.c.opt.WriteBack <= 0 syncWriteBack = item.c.opt.WriteBack <= 0
) )
// FIXME need to unlock to kill downloader - should we
// re-arrange locking so this isn't necessary? maybe
// downloader should use the item mutex for locking? or put a
// finer lock on Rs?
//
// close downloader with mutex unlocked
// downloader.Write calls ensure which needs the lock
defer func() {
if downloader != nil {
closeErr := downloader.close(nil)
if closeErr != nil && err == nil {
err = closeErr
}
}
// save the metadata once more since it may be dirty
// after the downloader
item.mu.Lock()
saveErr := item._save()
if saveErr != nil && err == nil {
err = errors.Wrap(saveErr, "close failed to save item")
}
item.mu.Unlock()
}()
item.mu.Lock() item.mu.Lock()
defer item.mu.Unlock() defer item.mu.Unlock()
@@ -554,21 +534,43 @@ func (item *Item) Close(storeFn StoreFn) (err error) {
// Update the size on close // Update the size on close
_, _ = item._getSize() _, _ = item._getSize()
err = item._save()
if err != nil { // Accumulate and log errors
return errors.Wrap(err, "close failed to save item") checkErr := func(e error) {
if e != nil {
fs.Errorf(item.o, "vfs cache item close failed: %v", e)
if err == nil {
err = e
}
}
} }
// close the downloader // Close the downloaders
downloader = item.downloader if downloaders = item.downloaders; downloaders != nil {
item.downloader = nil item.downloaders = nil
// FIXME need to unlock to kill downloader - should we
// re-arrange locking so this isn't necessary? maybe
// downloader should use the item mutex for locking? or put a
// finer lock on Rs?
//
// downloader.Write calls ensure which needs the lock
// close downloader with mutex unlocked
item.mu.Unlock()
checkErr(downloaders.close(nil))
item.mu.Lock()
}
// close the file handle // close the file handle
if item.fd == nil { if item.fd == nil {
return errors.New("vfs cache item: internal error: didn't Open file") checkErr(errors.New("vfs cache item: internal error: didn't Open file"))
} } else {
err = item.fd.Close() checkErr(item.fd.Close())
item.fd = nil item.fd = nil
}
// save the metadata once more since it may be dirty
// after the downloader
checkErr(item._save())
// if the item hasn't been changed but has been completed then // if the item hasn't been changed but has been completed then
// set the modtime from the object otherwise set it from the info // set the modtime from the object otherwise set it from the info
@@ -585,7 +587,7 @@ func (item *Item) Close(storeFn StoreFn) (err error) {
fs.Debugf(item.name, "item changed - writeback in %v", item.c.opt.WriteBack) fs.Debugf(item.name, "item changed - writeback in %v", item.c.opt.WriteBack)
if syncWriteBack { if syncWriteBack {
// do synchronous writeback // do synchronous writeback
err = item._store(context.Background(), storeFn) checkErr(item._store(context.Background(), storeFn))
} else { } else {
// asynchronous writeback // asynchronous writeback
item.c.writeback.add(item, item.name, storeFn) item.c.writeback.add(item, item.name, storeFn)
@@ -720,35 +722,33 @@ func (item *Item) remove(reason string) (wasWriting bool) {
return item._remove(reason) return item._remove(reason)
} }
// create a downloader for the item
//
// call with item mutex held
func (item *Item) _newDownloader() (err error) {
// If no cached object then can't download
if item.o == nil {
return errors.New("vfs cache: internal error: tried to download nil object")
}
// If downloading the object already stop the downloader and restart it
if item.downloader != nil {
item.mu.Unlock()
_ = item.downloader.close(nil)
item.mu.Lock()
item.downloader = nil
}
item.downloader, err = newDownloader(item, item.c.fremote, item.name, item.o)
return err
}
// _present returns true if the whole file has been downloaded // _present returns true if the whole file has been downloaded
// //
// call with the lock held // call with the lock held
func (item *Item) _present() bool { func (item *Item) _present() bool {
if item.downloader != nil && item.downloader.running() {
return false
}
return item.info.Rs.Present(ranges.Range{Pos: 0, Size: item.info.Size}) return item.info.Rs.Present(ranges.Range{Pos: 0, Size: item.info.Size})
} }
// hasRange returns true if the current ranges entirely include range
func (item *Item) hasRange(r ranges.Range) bool {
item.mu.Lock()
defer item.mu.Unlock()
return item.info.Rs.Present(r)
}
// findMissing adjusts r returning a new ranges.Range which only
// contains the range which needs to be downloaded. This could be
// empty - check with IsEmpty. It also adjusts it to make sure it is
// not larger than the file.
func (item *Item) findMissing(r ranges.Range) (outr ranges.Range) {
item.mu.Lock()
defer item.mu.Unlock()
outr = item.info.Rs.FindMissing(r)
// Clip returned block to size of file
outr.Clip(item.info.Size)
return outr
}
// ensure the range from offset, size is present in the backing file // ensure the range from offset, size is present in the backing file
// //
// call with the item lock held // call with the item lock held
@@ -759,33 +759,21 @@ func (item *Item) _ensure(offset, size int64) (err error) {
} }
r := ranges.Range{Pos: offset, Size: size} r := ranges.Range{Pos: offset, Size: size}
present := item.info.Rs.Present(r) present := item.info.Rs.Present(r)
downloader := item.downloader
fs.Debugf(nil, "looking for range=%+v in %+v - present %v", r, item.info.Rs, present) fs.Debugf(nil, "looking for range=%+v in %+v - present %v", r, item.info.Rs, present)
if present {
return nil
}
// FIXME pass in offset here to decide to seek?
err = item._newDownloader()
if err != nil {
return errors.Wrap(err, "Ensure: failed to start downloader")
}
downloader = item.downloader
if downloader == nil {
return errors.New("internal error: downloader is nil")
}
if !downloader.running() {
// FIXME need to make sure we start in the correct place because some of offset,size might exist
// FIXME this could stop an old download
item.mu.Unlock()
err = downloader.start(offset)
item.mu.Lock()
if err != nil {
return errors.Wrap(err, "Ensure: failed to run downloader")
}
}
item.mu.Unlock() item.mu.Unlock()
defer item.mu.Lock() defer item.mu.Lock()
return item.downloader.ensure(r) if present {
// This is a file we are writing so no downloaders needed
if item.downloaders == nil {
return nil
}
// Otherwise start the downloader for the future if required
return item.downloaders.ensureDownloader(r)
}
if item.downloaders == nil {
return errors.New("internal error: downloaders is nil")
}
return item.downloaders.ensure(r)
} }
// _written marks the (offset, size) as present in the backing file // _written marks the (offset, size) as present in the backing file
@@ -796,7 +784,7 @@ func (item *Item) _ensure(offset, size int64) (err error) {
// call with lock held // call with lock held
func (item *Item) _written(offset, size int64) { func (item *Item) _written(offset, size int64) {
defer log.Trace(item.name, "offset=%d, size=%d", offset, size)("") defer log.Trace(item.name, "offset=%d, size=%d", offset, size)("")
item.info.Rs.Insert(ranges.Range{Pos: offset, Size: offset + size}) item.info.Rs.Insert(ranges.Range{Pos: offset, Size: size})
item.metaDirty = true item.metaDirty = true
} }
@@ -868,6 +856,9 @@ func (item *Item) WriteAt(b []byte, off int64) (n int, err error) {
item.mu.Unlock() item.mu.Unlock()
// Do the writing with Item.mu unlocked // Do the writing with Item.mu unlocked
n, err = item.fd.WriteAt(b, off) n, err = item.fd.WriteAt(b, off)
if err == nil && n != len(b) {
err = errors.Errorf("short write: tried to write %d but only %d written", len(b), n)
}
item.mu.Lock() item.mu.Lock()
item._written(off, int64(n)) item._written(off, int64(n))
if n > 0 { if n > 0 {
@@ -881,6 +872,60 @@ func (item *Item) WriteAt(b []byte, off int64) (n int, err error) {
return n, err return n, err
} }
// writeAtNoOverwrite writes b to the file, but will not overwrite
// already present ranges.
//
// This is used by the downloader to write bytes to the file
//
// It returns n, the total number of bytes processed, and skipped, the
// number of bytes which were processed but not actually written to the file.
func (item *Item) writeAtNoOverwrite(b []byte, off int64) (n int, skipped int, err error) {
item.mu.Lock()
var (
// Range we wish to write
r = ranges.Range{Pos: off, Size: int64(len(b))}
// Ranges that we need to write
foundRanges = item.info.Rs.FindAll(r)
// Length of each write
nn int
)
// Write the range out ignoring already written chunks
fs.Debugf(item.name, "Ranges = %v", item.info.Rs)
for i := range foundRanges {
foundRange := &foundRanges[i]
fs.Debugf(item.name, "foundRange[%d] = %v", i, foundRange)
if foundRange.R.Pos != off {
err = errors.New("internal error: offset of range is wrong")
break
}
size := int(foundRange.R.Size)
if foundRange.Present {
// if present want to skip this range
fs.Debugf(item.name, "skip chunk offset=%d size=%d", off, size)
nn = size
skipped += size
} else {
// if range not present then we want to write it
fs.Debugf(item.name, "write chunk offset=%d size=%d", off, size)
nn, err = item.fd.WriteAt(b[:size], off)
if err == nil && nn != size {
err = errors.Errorf("downloader: short write: tried to write %d but only %d written", size, nn)
}
item._written(off, int64(nn))
}
off += int64(nn)
b = b[nn:]
n += nn
if err != nil {
break
}
}
item.mu.Unlock()
return n, skipped, err
}
// Sync commits the current contents of the file to stable storage. Typically, // Sync commits the current contents of the file to stable storage. Typically,
// this means flushing the file system's in-memory copy of recently written // this means flushing the file system's in-memory copy of recently written
// data to disk. // data to disk.
@@ -904,11 +949,11 @@ func (item *Item) Sync() (err error) {
// rename the item // rename the item
func (item *Item) rename(name string, newName string, newObj fs.Object) (err error) { func (item *Item) rename(name string, newName string, newObj fs.Object) (err error) {
var downloader *downloader var downloaders *downloaders
// close downloader with mutex unlocked // close downloader with mutex unlocked
defer func() { defer func() {
if downloader != nil { if downloaders != nil {
_ = downloader.close(nil) _ = downloaders.close(nil)
} }
}() }()
@@ -916,8 +961,8 @@ func (item *Item) rename(name string, newName string, newObj fs.Object) (err err
defer item.mu.Unlock() defer item.mu.Unlock()
// stop downloader // stop downloader
downloader = item.downloader downloaders = item.downloaders
item.downloader = nil item.downloaders = nil
// Set internal state // Set internal state
item.name = newName item.name = newName