diff --git a/vfs/vfscache/downloader.go b/vfs/vfscache/downloader.go
index f85406044..b47b3f86b 100644
--- a/vfs/vfscache/downloader.go
+++ b/vfs/vfscache/downloader.go
@@ -21,6 +21,8 @@ const (
 	maxDownloaderIdleTime = 5 * time.Second
 	// max number of bytes a reader should skip over before closing it
 	maxSkipBytes = 1024 * 1024
+	// time between background kicks of waiters to pick up errors
+	backgroundKickerInterval = 5 * time.Second
 )
 
 // downloaders is a number of downloader~s and a queue of waiters
@@ -28,11 +30,13 @@ const (
 type downloaders struct {
 	// Write once - no locking required
 	ctx    context.Context
+	cancel context.CancelFunc
 	item   *Item
 	src    fs.Object // source object
 	remote string
 	fcache fs.Fs // destination Fs
 	osPath string
+	wg     sync.WaitGroup
 
 	// Read write
 	mu sync.Mutex
@@ -68,14 +72,39 @@ type downloader struct {
 }
 
 func newDownloaders(item *Item, fcache fs.Fs, remote string, src fs.Object) (dls *downloaders) {
+	if src == nil {
+		panic("internal error: newDownloaders called with nil src object")
+	}
+	ctx, cancel := context.WithCancel(context.Background())
 	dls = &downloaders{
-		ctx:    context.Background(),
+		ctx:    ctx,
+		cancel: cancel,
 		item:   item,
 		src:    src,
 		remote: remote,
 		fcache: fcache,
 		osPath: item.c.toOSPath(remote),
 	}
+	// Kick the waiters every backgroundKickerInterval until the
+	// context is cancelled so that failed downloads get retried.
+	dls.wg.Add(1)
+	go func() {
+		defer dls.wg.Done()
+		ticker := time.NewTicker(backgroundKickerInterval)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ticker.C:
+				err := dls.kickWaiters()
+				if err != nil {
+					fs.Errorf(dls.src, "Failed to kick waiters: %v", err)
+				}
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+
 	return dls
 }
 
@@ -146,6 +175,11 @@ func (dls *downloaders) close(inErr error) (err error) {
 			err = closeErr
 		}
 	}
+	// NOTE(review): if dls.mu is held here and kickWaiters takes it, a
+	// kick already in progress would block this Wait forever - confirm
+	// the locking in close and kickWaiters.
+	dls.cancel()
+	dls.wg.Wait()
 	dls.dls = nil
 	dls._dispatchWaiters()
 	dls._closeWaiters(inErr)
@@ -296,10 +330,11 @@ func (dls *downloaders) kickWaiters() (err error) {
 	for _, waiter := range dls.waiters {
 		err = dls._ensureDownloader(waiter.r)
 		if err != nil {
+			// Failures here will be retried by background kicker
 			fs.Errorf(dls.src, "Restart download failed: %v", err)
 		}
 	}
 
 	return nil
 }
 
diff --git a/vfs/vfscache/item.go b/vfs/vfscache/item.go
index 1d1d1779b..09e50e3c5 100644
--- a/vfs/vfscache/item.go
+++ b/vfs/vfscache/item.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"io"
 	"os"
 	"sync"
 	"time"
@@ -32,13 +33,11 @@ import (
 // NB Item and downloader are tightly linked so it is necessary to
 // have a total lock ordering between them. downloader.mu must always
 // be taken before Item.mu. downloader may call into Item but Item may
-// **not** call downloader methods with Item.mu held, except for
-//
-// - downloader.running
+// **not** call downloader methods with Item.mu held
 
 // Item is stored in the item map
 //
-// These are written to the backing store to store status
+// The Info field is written to the backing store to store status
 type Item struct {
 	// read only
 	c *Cache // cache this is part of
@@ -145,13 +144,6 @@ func (item *Item) getATime() time.Time {
 	return item.info.ATime
 }
 
-// getName returns the name of the item
-func (item *Item) getName() string {
-	item.mu.Lock()
-	defer item.mu.Unlock()
-	return item.name
-}
-
 // getDiskSize returns the size on disk (approximately) of the item
 //
 // We return the sizes of the chunks we have fetched, however there is
@@ -277,6 +269,10 @@ func (item *Item) Truncate(size int64) (err error) {
 	item.mu.Lock()
 	defer item.mu.Unlock()
 
+	if item.fd == nil {
+		return errors.New("vfs cache item truncate: internal error: didn't Open file")
+	}
+
 	// Read old size
 	oldSize, err := item._getSize()
 	if err != nil {
@@ -454,7 +450,9 @@ func (item *Item) Open(o fs.Object) (err error) {
 	item.mu.Lock()
 
 	// Create the downloaders
-	item.downloaders = newDownloaders(item, item.c.fremote, item.name, item.o)
+	if item.o != nil {
+		item.downloaders = newDownloaders(item, item.c.fremote, item.name, item.o)
+	}
 
 	return err
 }
@@ -466,12 +464,6 @@ func (item *Item) Open(o fs.Object) (err error) {
 func (item *Item) _store(ctx context.Context, storeFn StoreFn) (err error) {
 	defer log.Trace(item.name, "item=%p", item)("err=%v", &err)
 
-	// Ensure any segments not transferred are brought in
-	err = item._ensure(0, item.info.Size)
-	if err != nil {
-		return errors.Wrap(err, "vfs cache: failed to download missing parts of cache file")
-	}
-
 	// Transfer the temp file to the remote
 	cacheObj, err := item.c.fcache.NewObject(ctx, item.name)
 	if err != nil && err != fs.ErrorObjectNotFound {
@@ -535,6 +527,19 @@ func (item *Item) Close(storeFn StoreFn) (err error) {
 	// Update the size on close
 	_, _ = item._getSize()
 
+	// If the file is dirty ensure any segments not transferred
+	// are brought in first.
+	//
+	// FIXME It would be nice to do this asynchronously however it
+	// would require keeping the downloaders alive after the item
+	// has been closed
+	if item.info.Dirty && item.o != nil {
+		err = item._ensure(0, item.info.Size)
+		if err != nil {
+			return errors.Wrap(err, "vfs cache: failed to download missing parts of cache file")
+		}
+	}
+
 	// Accumulate and log errors
 	checkErr := func(e error) {
 		if e != nil {
@@ -599,6 +604,8 @@ func (item *Item) Close(storeFn StoreFn) (err error) {
 
 // reload is called with valid items recovered from a cache reload.
 //
+// If they are dirty then it makes sure they get uploaded
+//
 // it is called before the cache has started so opens will be 0 and
 // metaDirty will be false.
 func (item *Item) reload(ctx context.Context) error {
@@ -729,6 +736,13 @@ func (item *Item) _present() bool {
 	return item.info.Rs.Present(ranges.Range{Pos: 0, Size: item.info.Size})
 }
 
+// present returns true if the whole file has been downloaded
+func (item *Item) present() bool {
+	item.mu.Lock()
+	defer item.mu.Unlock()
+	return item._present()
+}
+
 // hasRange returns true if the current ranges entirely include range
 func (item *Item) hasRange(r ranges.Range) bool {
 	item.mu.Lock()
@@ -781,6 +795,10 @@ func (item *Item) _ensure(offset, size int64) (err error) {
 // This is called by the downloader downloading file segments and the
 // vfs layer writing to the file.
 //
+// This doesn't mark the item as Dirty - that is the responsibility
+// of the caller as we don't know here whether we are adding reads or
+// writes to the cache file.
+//
 // call with lock held
 func (item *Item) _written(offset, size int64) {
 	defer log.Trace(item.name, "offset=%d, size=%d", offset, size)("")
@@ -835,6 +853,10 @@ func (item *Item) ReadAt(b []byte, off int64) (n int, err error) {
 		item.mu.Unlock()
 		return 0, errors.New("vfs cache item ReadAt: internal error: didn't Open file")
 	}
+	if off < 0 {
+		item.mu.Unlock()
+		return 0, io.EOF
+	}
 	err = item._ensure(off, int64(len(b)))
 	if err != nil {
 		item.mu.Unlock()
@@ -865,6 +887,14 @@ func (item *Item) WriteAt(b []byte, off int64) (n int, err error) {
 		item._dirty()
 	}
 	end := off + int64(n)
+	// Writing off the end of the file so need to make some
+	// zeroes. we do this by showing that we have written to the
+	// new parts of the file.
+	if off > item.info.Size {
+		item._written(item.info.Size, off-item.info.Size)
+		item._dirty()
+	}
+	// Update size
 	if end > item.info.Size {
 		item.info.Size = end
 	}
diff --git a/vfs/vfscache/item_test.go b/vfs/vfscache/item_test.go
new file mode 100644
index 000000000..91fdf2137
--- /dev/null
+++ b/vfs/vfscache/item_test.go
@@ -0,0 +1,596 @@
+package vfscache
+
+// FIXME need to test async writeback here
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"math/rand"
+	"os"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/rclone/rclone/fs"
+	"github.com/rclone/rclone/fstest"
+	"github.com/rclone/rclone/lib/random"
+	"github.com/rclone/rclone/lib/readers"
+	"github.com/rclone/rclone/vfs/vfscommon"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+var zeroes = string(make([]byte, 100))
+
+func newItemTestCache(t *testing.T) (r *fstest.Run, c *Cache, cleanup func()) {
+	opt := vfscommon.DefaultOpt
+
+	// Disable the cache cleaner as it interferes with these tests
+	opt.CachePollInterval = 0
+
+	// Enable synchronous write back (no write back delay)
+	opt.WriteBack = 0
+
+	return newTestCacheOpt(t, opt)
+}
+
+// Check the object has contents
+func checkObject(t *testing.T, r *fstest.Run, remote string, contents string) {
+	obj, err := r.Fremote.NewObject(context.Background(), remote)
+	require.NoError(t, err)
+	in, err := obj.Open(context.Background())
+	require.NoError(t, err)
+	buf, err := ioutil.ReadAll(in)
+	require.NoError(t, err)
+	require.NoError(t, in.Close())
+	assert.Equal(t, contents, string(buf))
+}
+
+func newFileLength(t *testing.T, r *fstest.Run, c *Cache, remote string, length int) (contents string, obj fs.Object, item *Item) {
+	contents = random.String(length)
+	r.WriteObject(context.Background(), remote, contents, time.Now())
+	item, _ = c.get(remote)
+	obj, err := r.Fremote.NewObject(context.Background(), remote)
+	require.NoError(t, err)
+	return
+}
+
+func newFile(t *testing.T, r *fstest.Run, c *Cache, remote string) (contents string, obj fs.Object, item *Item) {
+	return newFileLength(t, r, c, remote, 100)
+}
+
+func TestItemExists(t *testing.T) {
+	_, c, cleanup := newItemTestCache(t)
+	defer cleanup()
+	item, _ := c.get("potato")
+
+	assert.False(t, item.Exists())
+	require.NoError(t, item.Open(nil))
+	assert.True(t, item.Exists())
+	require.NoError(t, item.Close(nil))
+	assert.True(t, item.Exists())
+	item.remove("test")
+	assert.False(t, item.Exists())
+}
+
+func TestItemGetSize(t *testing.T) {
+	r, c, cleanup := newItemTestCache(t)
+	defer cleanup()
+	item, _ := c.get("potato")
+	require.NoError(t, item.Open(nil))
+
+	size, err := item.GetSize()
+	require.NoError(t, err)
+	assert.Equal(t, int64(0), size)
+
+	n, err := item.WriteAt([]byte("hello"), 0)
+	require.NoError(t, err)
+	assert.Equal(t, 5, n)
+
+	size, err = item.GetSize()
+	require.NoError(t, err)
+	assert.Equal(t, int64(5), size)
+
+	require.NoError(t, item.Close(nil))
+	checkObject(t, r, "potato", "hello")
+}
+
+func TestItemDirty(t *testing.T) {
+	r, c, cleanup := newItemTestCache(t)
+	defer cleanup()
+	item, _ := c.get("potato")
+	require.NoError(t, item.Open(nil))
+
+	assert.Equal(t, false, item.IsDirty())
+
+	n, err := item.WriteAt([]byte("hello"), 0)
+	require.NoError(t, err)
+	assert.Equal(t, 5, n)
+
+	assert.Equal(t, true, item.IsDirty())
+
+	require.NoError(t, item.Close(nil))
+
+	// Sync writeback so expect clean here
+	assert.Equal(t, false, item.IsDirty())
+
+	item.Dirty()
+
+	assert.Equal(t, true, item.IsDirty())
+	checkObject(t, r, "potato", "hello")
+}
+
+func TestItemSync(t *testing.T) {
+	_, c, cleanup := newItemTestCache(t)
+	defer cleanup()
+	item, _ := c.get("potato")
+
+	require.Error(t, item.Sync())
+
+	require.NoError(t, item.Open(nil))
+
+	require.NoError(t, item.Sync())
+
+	require.NoError(t, item.Close(nil))
+}
+
+func TestItemTruncateNew(t *testing.T) {
+	r, c, cleanup := newItemTestCache(t)
+	defer cleanup()
+	item, _ := c.get("potato")
+
+	require.Error(t, item.Truncate(0))
+
+	require.NoError(t, item.Open(nil))
+
+	require.NoError(t, item.Truncate(100))
+
+	size, err := item.GetSize()
+	require.NoError(t, err)
+	assert.Equal(t, int64(100), size)
+
+	// Check the Close callback works
+	callbackCalled := false
+	callback := func(o fs.Object) {
+		callbackCalled = true
+		assert.Equal(t, "potato", o.Remote())
+		assert.Equal(t, int64(100), o.Size())
+	}
+	require.NoError(t, item.Close(callback))
+	assert.True(t, callbackCalled)
+
+	checkObject(t, r, "potato", zeroes)
+}
+
+func TestItemTruncateExisting(t *testing.T) {
+	r, c, cleanup := newItemTestCache(t)
+	defer cleanup()
+
+	contents, obj, item := newFile(t, r, c, "existing")
+
+	require.Error(t, item.Truncate(40))
+	checkObject(t, r, "existing", contents)
+
+	require.NoError(t, item.Open(obj))
+
+	require.NoError(t, item.Truncate(40))
+
+	require.NoError(t, item.Truncate(60))
+
+	require.NoError(t, item.Close(nil))
+
+	checkObject(t, r, "existing", contents[:40]+zeroes[:20])
+}
+
+func TestItemReadAt(t *testing.T) {
+	r, c, cleanup := newItemTestCache(t)
+	defer cleanup()
+
+	contents, obj, item := newFile(t, r, c, "existing")
+	buf := make([]byte, 10)
+
+	_, err := item.ReadAt(buf, 10)
+	require.Error(t, err)
+
+	require.NoError(t, item.Open(obj))
+
+	n, err := item.ReadAt(buf, 10)
+	assert.Equal(t, 10, n)
+	require.NoError(t, err)
+	assert.Equal(t, contents[10:20], string(buf[:n]))
+
+	n, err = item.ReadAt(buf, 95)
+	assert.Equal(t, 5, n)
+	assert.Equal(t, io.EOF, err)
+	assert.Equal(t, contents[95:], string(buf[:n]))
+
+	n, err = item.ReadAt(buf, 1000)
+	assert.Equal(t, 0, n)
+	assert.Equal(t, io.EOF, err)
+	assert.Equal(t, contents[:0], string(buf[:n]))
+
+	n, err = item.ReadAt(buf, -1)
+	assert.Equal(t, 0, n)
+	assert.Equal(t, io.EOF, err)
+	assert.Equal(t, contents[:0], string(buf[:n]))
+
+	require.NoError(t, item.Close(nil))
+}
+
+func TestItemWriteAtNew(t *testing.T) {
+	r, c, cleanup := newItemTestCache(t)
+	defer cleanup()
+	item, _ := c.get("potato")
+	buf := make([]byte, 10)
+
+	_, err := item.WriteAt(buf, 10)
+	require.Error(t, err)
+
+	require.NoError(t, item.Open(nil))
+
+	assert.Equal(t, int64(0), item.getDiskSize())
+
+	n, err := item.WriteAt([]byte("HELLO"), 10)
+	require.NoError(t, err)
+	assert.Equal(t, 5, n)
+
+	// FIXME we account for the sparse data we've "written" to
+	// disk here so this is actually 5 bytes higher than expected
+	assert.Equal(t, int64(15), item.getDiskSize())
+
+	n, err = item.WriteAt([]byte("THEND"), 20)
+	require.NoError(t, err)
+	assert.Equal(t, 5, n)
+
+	assert.Equal(t, int64(25), item.getDiskSize())
+
+	require.NoError(t, item.Close(nil))
+
+	checkObject(t, r, "potato", zeroes[:10]+"HELLO"+zeroes[:5]+"THEND")
+}
+
+func TestItemWriteAtExisting(t *testing.T) {
+	r, c, cleanup := newItemTestCache(t)
+	defer cleanup()
+
+	contents, obj, item := newFile(t, r, c, "existing")
+
+	require.NoError(t, item.Open(obj))
+
+	n, err := item.WriteAt([]byte("HELLO"), 10)
+	require.NoError(t, err)
+	assert.Equal(t, 5, n)
+
+	n, err = item.WriteAt([]byte("THEND"), 95)
+	require.NoError(t, err)
+	assert.Equal(t, 5, n)
+
+	n, err = item.WriteAt([]byte("THEVERYEND"), 120)
+	require.NoError(t, err)
+	assert.Equal(t, 10, n)
+
+	require.NoError(t, item.Close(nil))
+
+	checkObject(t, r, "existing", contents[:10]+"HELLO"+contents[15:95]+"THEND"+zeroes[:20]+"THEVERYEND")
+}
+
+func TestItemLoadMeta(t *testing.T) {
+	r, c, cleanup := newItemTestCache(t)
+	defer cleanup()
+
+	contents, obj, item := newFile(t, r, c, "existing")
+	_ = contents
+
+	// Open the object to create metadata for it
+	require.NoError(t, item.Open(obj))
+	require.NoError(t, item.Close(nil))
+	info := item.info
+
+	// Remove the item from the cache
+	c.mu.Lock()
+	delete(c.item, item.name)
+	c.mu.Unlock()
+
+	// Reload the item so we have to load the metadata
+	item2, _ := c._get("existing")
+	require.NoError(t, item2.Open(obj))
+	// NOTE(review): this reads item.info, not item2.info, so the info
+	// assertion below compares item's info with itself - confirm intent
+	info2 := item.info
+	require.NoError(t, item2.Close(nil))
+
+	// Check that the item is different
+	assert.NotEqual(t, item, item2)
+	// ... but the info is the same
+	assert.Equal(t, info, info2)
+}
+
+func TestItemReload(t *testing.T) {
+	r, c, cleanup := newItemTestCache(t)
+	defer cleanup()
+
+	contents, obj, item := newFile(t, r, c, "existing")
+	_ = contents
+
+	// Open the object to create metadata for it
+	require.NoError(t, item.Open(obj))
+
+	// Make it dirty
+	n, err := item.WriteAt([]byte("THEENDMYFRIEND"), 95)
+	require.NoError(t, err)
+	assert.Equal(t, 14, n)
+	assert.True(t, item.IsDirty())
+
+	// Close the file to pacify Windows, but don't call item.Close()
+	item.mu.Lock()
+	require.NoError(t, item.fd.Close())
+	item.fd = nil
+	item.mu.Unlock()
+
+	// Remove the item from the cache
+	c.mu.Lock()
+	delete(c.item, item.name)
+	c.mu.Unlock()
+
+	// Reload the item so we have to load the metadata and restart
+	// the transfer
+	item2, _ := c._get("existing")
+	require.NoError(t, item2.reload(context.Background()))
+	assert.False(t, item2.IsDirty())
+
+	// Check that the item is different
+	assert.NotEqual(t, item, item2)
+
+	// And check the contents got written back to the remote
+	checkObject(t, r, "existing", contents[:95]+"THEENDMYFRIEND")
+}
+
+func TestItemReloadRemoteGone(t *testing.T) {
+	r, c, cleanup := newItemTestCache(t)
+	defer cleanup()
+
+	contents, obj, item := newFile(t, r, c, "existing")
+	_ = contents
+
+	// Open the object to create metadata for it
+	require.NoError(t, item.Open(obj))
+
+	size, err := item.GetSize()
+	require.NoError(t, err)
+	assert.Equal(t, int64(100), size)
+
+	// Read something to instantiate the cache file
+	buf := make([]byte, 10)
+	_, err = item.ReadAt(buf, 10)
+	require.NoError(t, err)
+
+	// Test cache file present
+	_, err = os.Stat(item.c.toOSPath(item.name))
+	require.NoError(t, err)
+
+	require.NoError(t, item.Close(nil))
+
+	// Remove the remote object
+	require.NoError(t, obj.Remove(context.Background()))
+
+	// Re-open with no object
+	require.NoError(t, item.Open(nil))
+
+	// Check size is now 0
+	size, err = item.GetSize()
+	require.NoError(t, err)
+	assert.Equal(t, int64(0), size)
+
+	// Test cache file is now empty
+	fi, err := os.Stat(item.c.toOSPath(item.name))
+	require.NoError(t, err)
+	assert.Equal(t, int64(0), fi.Size())
+
+	require.NoError(t, item.Close(nil))
+}
+
+func TestItemReloadCacheStale(t *testing.T) {
+	r, c, cleanup := newItemTestCache(t)
+	defer cleanup()
+
+	contents, obj, item := newFile(t, r, c, "existing")
+
+	// Open the object to create metadata for it
+	require.NoError(t, item.Open(obj))
+
+	size, err := item.GetSize()
+	require.NoError(t, err)
+	assert.Equal(t, int64(100), size)
+
+	// Read something to instantiate the cache file
+	buf := make([]byte, 10)
+	_, err = item.ReadAt(buf, 10)
+	require.NoError(t, err)
+
+	// Test cache file present
+	_, err = os.Stat(item.c.toOSPath(item.name))
+	require.NoError(t, err)
+
+	require.NoError(t, item.Close(nil))
+
+	// Update the remote to something different
+	contents2, obj, item := newFileLength(t, r, c, "existing", 110)
+	assert.NotEqual(t, contents, contents2)
+
+	// Re-open with updated object
+	require.NoError(t, item.Open(obj))
+
+	// Check size is now 110
+	size, err = item.GetSize()
+	require.NoError(t, err)
+	assert.Equal(t, int64(110), size)
+
+	// Test cache file is now correct size
+	fi, err := os.Stat(item.c.toOSPath(item.name))
+	require.NoError(t, err)
+	assert.Equal(t, int64(110), fi.Size())
+
+	// Write to the file to make it dirty
+	// This checks we aren't re-using stale data
+	n, err := item.WriteAt([]byte("HELLO"), 0)
+	require.NoError(t, err)
+	assert.Equal(t, 5, n)
+	assert.Equal(t, true, item.IsDirty())
+
+	require.NoError(t, item.Close(nil))
+
+	// Now check with all that swizzling stuff around that the
+	// object is correct
+
+	checkObject(t, r, "existing", "HELLO"+contents2[5:])
+}
+
+func TestItemReadWrite(t *testing.T) {
+	r, c, cleanup := newItemTestCache(t)
+	defer cleanup()
+	const (
+		size     = 50*1024*1024 + 123
+		fileName = "large"
+	)
+
+	item, _ := c.get(fileName)
+	require.NoError(t, item.Open(nil))
+
+	// Create the test file
+	in := readers.NewPatternReader(size)
+	buf := make([]byte, 1024*1024)
+	buf2 := make([]byte, 1024*1024)
+	offset := int64(0)
+	for {
+		n, err := in.Read(buf)
+		n2, err2 := item.WriteAt(buf[:n], offset)
+		offset += int64(n2)
+		require.NoError(t, err2)
+		require.Equal(t, n, n2)
+		if err == io.EOF {
+			break
+		}
+		require.NoError(t, err)
+	}
+
+	// Check it is the right size
+	readSize, err := item.GetSize()
+	require.NoError(t, err)
+	assert.Equal(t, int64(size), readSize)
+
+	require.NoError(t, item.Close(nil))
+
+	assert.False(t, item.remove(fileName))
+
+	obj, err := r.Fremote.NewObject(context.Background(), fileName)
+	require.NoError(t, err)
+	assert.Equal(t, int64(size), obj.Size())
+
+	// read and check a block of size N at offset
+	// It returns eof true if the end of file has been reached
+	readCheckBuf := func(t *testing.T, in io.ReadSeeker, buf, buf2 []byte, item *Item, offset int64, N int) (n int, eof bool) {
+		what := fmt.Sprintf("buf=%p, buf2=%p, item=%p, offset=%d, N=%d", buf, buf2, item, offset, N)
+		n, err := item.ReadAt(buf, offset)
+
+		_, err2 := in.Seek(offset, io.SeekStart)
+		require.NoError(t, err2, what)
+		n2, err2 := in.Read(buf2[:n])
+		require.Equal(t, n, n2, what)
+		assert.Equal(t, buf[:n], buf2[:n2], what)
+
+		if err == io.EOF {
+			return n, true
+		}
+		require.NoError(t, err, what)
+		require.NoError(t, err2, what)
+		return n, false
+	}
+	readCheck := func(t *testing.T, item *Item, offset int64, N int) (n int, eof bool) {
+		return readCheckBuf(t, in, buf, buf2, item, offset, N)
+	}
+
+	// Read it back sequentially
+	t.Run("Sequential", func(t *testing.T) {
+		require.NoError(t, item.Open(obj))
+		assert.False(t, item.present())
+		offset := int64(0)
+		for {
+			n, eof := readCheck(t, item, offset, len(buf))
+			offset += int64(n)
+			if eof {
+				break
+			}
+		}
+		assert.Equal(t, int64(size), offset)
+		require.NoError(t, item.Close(nil))
+		assert.False(t, item.remove(fileName))
+	})
+
+	// Read it back randomly
+	t.Run("Random", func(t *testing.T) {
+		require.NoError(t, item.Open(obj))
+		assert.False(t, item.present())
+		for !item.present() {
+			blockSize := rand.Intn(len(buf))
+			offset := rand.Int63n(size+2*int64(blockSize)) - int64(blockSize)
+			if offset < 0 {
+				offset = 0
+			}
+			_, _ = readCheck(t, item, offset, blockSize)
+		}
+		require.NoError(t, item.Close(nil))
+		assert.False(t, item.remove(fileName))
+	})
+
+	// Read it back randomly concurrently
+	t.Run("RandomConcurrent", func(t *testing.T) {
+		require.NoError(t, item.Open(obj))
+		assert.False(t, item.present())
+		var wg sync.WaitGroup
+		for i := 0; i < 8; i++ {
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				in := readers.NewPatternReader(size)
+				buf := make([]byte, 1024*1024)
+				buf2 := make([]byte, 1024*1024)
+				for !item.present() {
+					blockSize := rand.Intn(len(buf))
+					offset := rand.Int63n(size+2*int64(blockSize)) - int64(blockSize)
+					if offset < 0 {
+						offset = 0
+					}
+					_, _ = readCheckBuf(t, in, buf, buf2, item, offset, blockSize)
+				}
+			}()
+		}
+		wg.Wait()
+		require.NoError(t, item.Close(nil))
+		assert.False(t, item.remove(fileName))
+	})
+
+	// Read it back in reverse which creates the maximum number of
+	// downloaders
+	t.Run("Reverse", func(t *testing.T) {
+		require.NoError(t, item.Open(obj))
+		assert.False(t, item.present())
+		offset := int64(size)
+		for {
+			blockSize := len(buf)
+			offset -= int64(blockSize)
+			if offset < 0 {
+				// shrink the block by the overshoot before clamping the offset
+				blockSize += int(offset)
+				offset = 0
+			}
+			_, _ = readCheck(t, item, offset, blockSize)
+			if offset == 0 {
+				break
+			}
+		}
+		require.NoError(t, item.Close(nil))
+		assert.False(t, item.remove(fileName))
+	})
+}