diff --git a/vfs/file.go b/vfs/file.go index ee526b937..891c6c1cc 100644 --- a/vfs/file.go +++ b/vfs/file.go @@ -558,6 +558,7 @@ func (f *File) Remove() (err error) { if wasWriting { // Ignore error deleting file if was writing it as it may not be uploaded yet err = nil + fs.Debugf(f._path(), "Ignoring File.Remove file error as uploading: %v", err) } else { fs.Debugf(f._path(), "File.Remove file error: %v", err) } diff --git a/vfs/vfscache/cache.go b/vfs/vfscache/cache.go index 296b6c4c5..0c2f213fc 100644 --- a/vfs/vfscache/cache.go +++ b/vfs/vfscache/cache.go @@ -315,9 +315,14 @@ func (c *Cache) Rename(name string, newName string, newObj fs.Object) (err error func (c *Cache) Remove(name string) (wasWriting bool) { name = clean(name) c.mu.Lock() - item, _ := c._get(name) - delete(c.item, name) + item := c.item[name] + if item != nil { + delete(c.item, name) + } c.mu.Unlock() + if item == nil { + return false + } return item.remove("file deleted") } diff --git a/vfs/vfscache/item.go b/vfs/vfscache/item.go index 09e50e3c5..db1a4e596 100644 --- a/vfs/vfscache/item.go +++ b/vfs/vfscache/item.go @@ -35,6 +35,11 @@ import ( // be taken before Item.mu. downloader may call into Item but Item may // **not** call downloader methods with Item.mu held +// NB Item and writeback are tightly linked so it is necessary to +// have a total lock ordering between them. writeback.mu must always +// be taken before Item.mu. writeback may call into Item but Item may +// **not** call writeback methods with Item.mu held + // Item is stored in the item map // // The Info field is written to the backing store to store status @@ -49,6 +54,7 @@ type Item struct { o fs.Object // object we are caching - may be nil fd *os.File // handle we are using to read and write to the file metaDirty bool // set if the info needs writeback + modified bool // set if the file has been modified since the last Open info Info // info about the file to persist to backing store } @@ -60,7 +66,7 @@ type Info struct { Size int64 // size of the file Rs ranges.Ranges // which parts of the file are present Fingerprint string // fingerprint of remote object - Dirty bool // set if the backing file has been modifed + Dirty bool // set if the backing file has been modified } // Items are a slice of *Item ordered by ATime @@ -111,7 +117,7 @@ func newItem(c *Cache, name string) (item *Item) { if os.IsNotExist(statErr) { item._removeMeta("cache file doesn't exist") } else { - item._remove(fmt.Sprintf("failed to stat cache file: %v", statErr)) + item.remove(fmt.Sprintf("failed to stat cache file: %v", statErr)) } } @@ -120,7 +126,7 @@ func newItem(c *Cache, name string) (item *Item) { if !exists { item._removeFile("metadata doesn't exist") } else if err != nil { - item._remove(fmt.Sprintf("failed to load metadata: %v", err)) + item.remove(fmt.Sprintf("failed to load metadata: %v", err)) } // Get size estimate (which is best we can do until Open() called) @@ -361,6 +367,10 @@ func (item *Item) _dirty() { item.info.ModTime = time.Now() item.info.ATime = item.info.ModTime item.metaDirty = true + if !item.modified { + item.modified = true + go item.c.writeback.cancelUpload(item) + } if !item.info.Dirty { item.info.Dirty = true err := item._save() @@ -410,6 +420,7 @@ func (item *Item) Open(o fs.Object) (err error) { if item.fd != nil { return errors.New("vfs cache item: internal error: didn't Close file") } + item.modified = false fd, err := file.OpenFile(osPath, os.O_RDWR, 0600) if err != nil { @@ -488,6 +499,7 @@ func (item *Item) _store(ctx context.Context, storeFn StoreFn) (err error) { fs.Errorf(item.name, "Failed to write metadata file: %v", err) } if storeFn != nil && item.o != nil { + fs.Debugf(item.name, "writeback object to VFS layer") // Write the object back to the VFS layer as last // thing we do with mutex unlocked item.mu.Unlock() @@ -595,10 +607,15 @@ func (item *Item) Close(storeFn StoreFn) (err error) { checkErr(item._store(context.Background(), storeFn)) } else { // asynchronous writeback - item.c.writeback.add(item, item.name, storeFn) + item.mu.Unlock() + item.c.writeback.add(item, item.name, item.modified, storeFn) + item.mu.Lock() } } + // mark as not modified now we have uploaded or queued for upload + item.modified = false + return err } @@ -711,7 +728,9 @@ func (item *Item) _removeMeta(reason string) { // call with lock held func (item *Item) _remove(reason string) (wasWriting bool) { // Cancel writeback, if any - wasWriting = item.c.writeback.cancel(item) + item.mu.Unlock() + wasWriting = item.c.writeback.remove(item) + item.mu.Lock() item.info.clean() item.metaDirty = false item._removeFile(reason) diff --git a/vfs/vfscache/writeback.go b/vfs/vfscache/writeback.go index 3d02a8237..bab5029d2 100644 --- a/vfs/vfscache/writeback.go +++ b/vfs/vfscache/writeback.go @@ -9,6 +9,7 @@ import ( "time" "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/fserrors" "github.com/rclone/rclone/vfs/vfscommon" ) @@ -56,7 +57,8 @@ type writeBackItem struct { index int // index into the priority queue for update item *Item // Item that needs writeback expiry time.Time // When this expires we will write it back - uploading bool // If we are uploading the item + uploading bool // True if item is being processed by upload() method + onHeap bool // true if this item is on the items heap cancel context.CancelFunc // To cancel the upload with done chan struct{} // closed when the cancellation completes storeFn StoreFn // To write the object back with @@ -149,21 +151,29 @@ func (wb *writeBack) _delItem(wbItem *writeBackItem) { // // call with the lock held func (wb *writeBack) _popItem() (wbItem *writeBackItem) { - return heap.Pop(&wb.items).(*writeBackItem) + wbItem = heap.Pop(&wb.items).(*writeBackItem) + wbItem.onHeap = false + return wbItem } // push a writeBackItem onto the items heap // // call with the lock held func (wb *writeBack) _pushItem(wbItem *writeBackItem) { - heap.Push(&wb.items, wbItem) + if !wbItem.onHeap { + heap.Push(&wb.items, wbItem) + wbItem.onHeap = true + } } // remove a writeBackItem from the items heap // // call with the lock held func (wb *writeBack) _removeItem(wbItem *writeBackItem) { - heap.Remove(&wb.items, wbItem.index) + if wbItem.onHeap { + heap.Remove(&wb.items, wbItem.index) + wbItem.onHeap = false + } } // peek the oldest writeBackItem - may be nil @@ -191,8 +201,10 @@ func (wb *writeBack) _resetTimer() { } // add adds an item to the writeback queue or resets its timer if it -// is already there -func (wb *writeBack) add(item *Item, name string, storeFn StoreFn) { +// is already there. +// +// if modified is false then it it doesn't a pending upload +func (wb *writeBack) add(item *Item, name string, modified bool, storeFn StoreFn) { wb.mu.Lock() defer wb.mu.Unlock() @@ -200,7 +212,7 @@ func (wb *writeBack) add(item *Item, name string, storeFn StoreFn) { if !ok { wbItem = wb._newItem(item, name) } else { - if wbItem.uploading { + if wbItem.uploading && modified { // We are uploading already so cancel the upload wb._cancelUpload(wbItem) } @@ -211,21 +223,21 @@ func (wb *writeBack) add(item *Item, name string, storeFn StoreFn) { wb._resetTimer() } -// cancel a writeback if there is one -func (wb *writeBack) cancel(item *Item) (found bool) { +// Call when a file is removed. This cancels a writeback if there is +// one and doesn't return the item to the queue. +func (wb *writeBack) remove(item *Item) (found bool) { wb.mu.Lock() defer wb.mu.Unlock() wbItem, found := wb.lookup[item] if found { - fs.Debugf(wbItem.name, "vfs cache: cancelling writeback") + fs.Debugf(wbItem.name, "vfs cache: cancelling writeback (uploading %v) %p item %p", wbItem.uploading, wbItem, wbItem.item) if wbItem.uploading { // We are uploading already so cancel the upload wb._cancelUpload(wbItem) - } else { - // Remove the item from the heap - wb._removeItem(wbItem) } + // Remove the item from the heap + wb._removeItem(wbItem) // Remove the item from the lookup map wb._delItem(wbItem) } @@ -247,6 +259,8 @@ func (wb *writeBack) _kickUploader() { } // upload the item - called as a goroutine +// +// uploading will have been incremented here already func (wb *writeBack) upload(ctx context.Context, wbItem *writeBackItem) { wb.mu.Lock() defer wb.mu.Unlock() @@ -258,10 +272,10 @@ func (wb *writeBack) upload(ctx context.Context, wbItem *writeBackItem) { wb.mu.Lock() wbItem.cancel() // cancel context to release resources since store done - if wbItem.uploading { - wbItem.uploading = false - wb.uploads-- - } + + fs.Debugf(wbItem.name, "uploading = false %p item %p", wbItem, wbItem.item) + wbItem.uploading = false + wb.uploads-- if err != nil { // FIXME should this have a max number of transfer attempts? @@ -269,7 +283,13 @@ func (wb *writeBack) upload(ctx context.Context, wbItem *writeBackItem) { if wbItem.delay > maxUploadDelay { wbItem.delay = maxUploadDelay } - fs.Errorf(wbItem.name, "vfs cache: failed to upload try #%d, will retry in %v: %v", wbItem.tries, wbItem.delay, err) + if _, uerr := fserrors.Cause(err); uerr == context.Canceled { + fs.Infof(wbItem.name, "vfs cache: upload canceled sucessfully") + // Upload was cancelled so reset timer + wbItem.delay = wb.opt.WriteBack + } else { + fs.Errorf(wbItem.name, "vfs cache: failed to upload try #%d, will retry in %v: %v", wbItem.tries, wbItem.delay, err) + } // push the item back on the queue for retry wb._pushItem(wbItem) wb.items._update(wbItem, time.Now().Add(wbItem.delay)) @@ -282,26 +302,42 @@ func (wb *writeBack) upload(ctx context.Context, wbItem *writeBackItem) { close(wbItem.done) } -// cancel the upload +// cancel the upload - the item should be on the heap after this returns // // call with lock held func (wb *writeBack) _cancelUpload(wbItem *writeBackItem) { if !wbItem.uploading { return } - fs.Debugf(wbItem.name, "vfs cache: cancelling upload") + fs.Infof(wbItem.name, "vfs cache: cancelling upload") if wbItem.cancel != nil { // Cancel the upload - this may or may not be effective wbItem.cancel() // wait for the uploader to finish + // + // we need to wait without the lock otherwise the + // background part will never run. + wb.mu.Unlock() <-wbItem.done - } - if wbItem.uploading { - wbItem.uploading = false - wb.uploads-- + wb.mu.Lock() } // uploading items are not on the heap so add them back wb._pushItem(wbItem) + fs.Infof(wbItem.name, "vfs cache: cancelled upload") +} + +// cancelUpload cancels the upload of the item if there is one in progress +// +// it returns true if there was an upload in progress +func (wb *writeBack) cancelUpload(item *Item) bool { + wb.mu.Lock() + defer wb.mu.Unlock() + wbItem, ok := wb.lookup[item] + if !ok || !wbItem.uploading { + return false + } + wb._cancelUpload(wbItem) + return true } // this uploads as many items as possible @@ -320,6 +356,7 @@ func (wb *writeBack) processItems(ctx context.Context) { resetTimer = true // Pop the item, mark as uploading and start the uploader wbItem = wb._popItem() + fs.Debugf(wbItem.name, "uploading = true %p item %p", wbItem, wbItem.item) wbItem.uploading = true wb.uploads++ newCtx, cancel := context.WithCancel(ctx)