diff --git a/vfs/vfscache/item.go b/vfs/vfscache/item.go index 702847719..b76688db1 100644 --- a/vfs/vfscache/item.go +++ b/vfs/vfscache/item.go @@ -737,7 +737,7 @@ func (item *Item) Close(storeFn StoreFn) (err error) { item.c.writeback.SetID(&item.writeBackID) id := item.writeBackID item.mu.Unlock() - item.c.writeback.Add(id, item.name, item.modified, func(ctx context.Context) error { + item.c.writeback.Add(id, item.name, item.info.Size, item.modified, func(ctx context.Context) error { return item.store(ctx, storeFn) }) item.mu.Lock() diff --git a/vfs/vfscache/writeback/writeback.go b/vfs/vfscache/writeback/writeback.go index 9801a2d3c..33c5616c2 100644 --- a/vfs/vfscache/writeback/writeback.go +++ b/vfs/vfscache/writeback/writeback.go @@ -62,6 +62,7 @@ func New(ctx context.Context, opt *vfscommon.Options) *WriteBack { // writeBack.mu must be held to manipulate this type writeBackItem struct { name string // name of the item so we don't have to read it from item + size int64 // size of the item so we don't have to read it from item id Handle // id of the item index int // index into the priority queue for update expiry time.Time // When this expires we will write it back @@ -135,10 +136,11 @@ func (wb *WriteBack) _newExpiry() time.Time { // make a new writeBackItem // // call with the lock held -func (wb *WriteBack) _newItem(id Handle, name string) *writeBackItem { +func (wb *WriteBack) _newItem(id Handle, name string, size int64) *writeBackItem { wb.SetID(&id) wbItem := &writeBackItem{ name: name, + size: size, expiry: wb._newExpiry(), delay: time.Duration(wb.opt.WriteBack), id: id, @@ -256,13 +258,13 @@ func (wb *WriteBack) SetID(pid *Handle) { // // If modified is false then it it doesn't cancel a pending upload if // there is one as there is no need. -func (wb *WriteBack) Add(id Handle, name string, modified bool, putFn PutFn) Handle { +func (wb *WriteBack) Add(id Handle, name string, size int64, modified bool, putFn PutFn) Handle { wb.mu.Lock() defer wb.mu.Unlock() wbItem, ok := wb.lookup[id] if !ok { - wbItem = wb._newItem(id, name) + wbItem = wb._newItem(id, name, size) } else { if wbItem.uploading && modified { // We are uploading already so cancel the upload @@ -272,6 +274,7 @@ func (wb *WriteBack) Add(id Handle, name string, modified bool, putFn PutFn) Han wb.items._update(wbItem, wb._newExpiry()) } wbItem.putFn = putFn + wbItem.size = size wb._resetTimer() return wbItem.id } diff --git a/vfs/vfscache/writeback/writeback_test.go b/vfs/vfscache/writeback/writeback_test.go index 64645eccf..21d8bae7e 100644 --- a/vfs/vfscache/writeback/writeback_test.go +++ b/vfs/vfscache/writeback/writeback_test.go @@ -122,15 +122,15 @@ func TestWriteBackItemCRUD(t *testing.T) { // _peekItem empty assert.Nil(t, wb._peekItem()) - wbItem1 := wb._newItem(0, "one") + wbItem1 := wb._newItem(0, "one", 10) checkOnHeap(t, wb, wbItem1) checkInLookup(t, wb, wbItem1) - wbItem2 := wb._newItem(0, "two") + wbItem2 := wb._newItem(0, "two", 10) checkOnHeap(t, wb, wbItem2) checkInLookup(t, wb, wbItem2) - wbItem3 := wb._newItem(0, "three") + wbItem3 := wb._newItem(0, "three", 10) checkOnHeap(t, wb, wbItem3) checkInLookup(t, wb, wbItem3) @@ -201,7 +201,7 @@ func TestWriteBackResetTimer(t *testing.T) { // Check timer is stopped assertTimerRunning(t, wb, false) - _ = wb._newItem(0, "three") + _ = wb._newItem(0, "three", 10) // Reset the timer on an queue with stuff wb._resetTimer() @@ -297,7 +297,7 @@ func TestWriteBackAddOK(t *testing.T) { wb.SetID(&inID) assert.Equal(t, Handle(1), inID) - id := wb.Add(inID, "one", true, pi.put) + id := wb.Add(inID, "one", 10, true, pi.put) assert.Equal(t, inID, id) wbItem := wb.lookup[id] checkOnHeap(t, wb, wbItem) @@ -321,7 +321,7 @@ func TestWriteBackAddFailRetry(t *testing.T) { pi := newPutItem(t) - id := wb.Add(0, "one", true, pi.put) + id := wb.Add(0, "one", 10, true, pi.put) wbItem := wb.lookup[id] checkOnHeap(t, wb, wbItem) checkInLookup(t, wb, wbItem) @@ -354,8 +354,9 @@ func TestWriteBackAddUpdate(t *testing.T) { pi := newPutItem(t) - id := wb.Add(0, "one", true, pi.put) + id := wb.Add(0, "one", 10, true, pi.put) wbItem := wb.lookup[id] + assert.Equal(t, int64(10), wbItem.size) // check size checkOnHeap(t, wb, wbItem) checkInLookup(t, wb, wbItem) assert.Equal(t, "one", wb.string(t)) @@ -367,9 +368,10 @@ func TestWriteBackAddUpdate(t *testing.T) { // Now the upload has started add another one pi2 := newPutItem(t) - id2 := wb.Add(id, "one", true, pi2.put) + id2 := wb.Add(id, "one", 20, true, pi2.put) assert.Equal(t, id, id2) - checkOnHeap(t, wb, wbItem) // object awaiting writeback time + assert.Equal(t, int64(20), wbItem.size) // check size has changed + checkOnHeap(t, wb, wbItem) // object awaiting writeback time checkInLookup(t, wb, wbItem) // check the previous transfer was cancelled @@ -393,7 +395,7 @@ func TestWriteBackAddUpdateNotModified(t *testing.T) { pi := newPutItem(t) - id := wb.Add(0, "one", false, pi.put) + id := wb.Add(0, "one", 10, false, pi.put) wbItem := wb.lookup[id] checkOnHeap(t, wb, wbItem) checkInLookup(t, wb, wbItem) @@ -406,7 +408,7 @@ func TestWriteBackAddUpdateNotModified(t *testing.T) { // Now the upload has started add another one pi2 := newPutItem(t) - id2 := wb.Add(id, "one", false, pi2.put) + id2 := wb.Add(id, "one", 10, false, pi2.put) assert.Equal(t, id, id2) checkNotOnHeap(t, wb, wbItem) // object still being transferred checkInLookup(t, wb, wbItem) @@ -432,7 +434,7 @@ func TestWriteBackAddUpdateNotStarted(t *testing.T) { pi := newPutItem(t) - id := wb.Add(0, "one", true, pi.put) + id := wb.Add(0, "one", 10, true, pi.put) wbItem := wb.lookup[id] checkOnHeap(t, wb, wbItem) checkInLookup(t, wb, wbItem) @@ -441,7 +443,7 @@ func TestWriteBackAddUpdateNotStarted(t *testing.T) { // Immediately add another upload before the first has started pi2 := newPutItem(t) - id2 := wb.Add(id, "one", true, pi2.put) + id2 := wb.Add(id, "one", 10, true, pi2.put) assert.Equal(t, id, id2) checkOnHeap(t, wb, wbItem) // object still awaiting transfer checkInLookup(t, wb, wbItem) @@ -470,7 +472,7 @@ func TestWriteBackGetStats(t *testing.T) { pi := newPutItem(t) - wb.Add(0, "one", true, pi.put) + wb.Add(0, "one", 10, true, pi.put) inProgress, queued := wb.Stats() assert.Equal(t, queued, 1) @@ -506,7 +508,7 @@ func TestWriteBackMaxQueue(t *testing.T) { for i := 0; i < toTransfer; i++ { pi := newPutItem(t) pis = append(pis, pi) - wb.Add(0, fmt.Sprintf("number%d", 1), true, pi.put) + wb.Add(0, fmt.Sprintf("number%d", 1), 10, true, pi.put) } inProgress, queued := wb.Stats() @@ -551,7 +553,7 @@ func TestWriteBackRename(t *testing.T) { // add item pi1 := newPutItem(t) - id := wb.Add(0, "one", true, pi1.put) + id := wb.Add(0, "one", 10, true, pi1.put) wbItem := wb.lookup[id] checkOnHeap(t, wb, wbItem) checkInLookup(t, wb, wbItem) @@ -566,7 +568,7 @@ func TestWriteBackRename(t *testing.T) { // add item pi2 := newPutItem(t) - id = wb.Add(id, "two", true, pi2.put) + id = wb.Add(id, "two", 10, true, pi2.put) wbItem = wb.lookup[id] checkOnHeap(t, wb, wbItem) checkInLookup(t, wb, wbItem) @@ -591,9 +593,9 @@ func TestWriteBackRenameDuplicates(t *testing.T) { wb, cancel := newTestWriteBack(t) defer cancel() - // add item "one" + // add item "one", 10 pi1 := newPutItem(t) - id1 := wb.Add(0, "one", true, pi1.put) + id1 := wb.Add(0, "one", 10, true, pi1.put) wbItem1 := wb.lookup[id1] checkOnHeap(t, wb, wbItem1) checkInLookup(t, wb, wbItem1) @@ -605,7 +607,7 @@ func TestWriteBackRenameDuplicates(t *testing.T) { // add item "two" pi2 := newPutItem(t) - id2 := wb.Add(0, "two", true, pi2.put) + id2 := wb.Add(0, "two", 10, true, pi2.put) wbItem2 := wb.lookup[id2] checkOnHeap(t, wb, wbItem2) checkInLookup(t, wb, wbItem2) @@ -641,7 +643,7 @@ func TestWriteBackCancelUpload(t *testing.T) { // add item pi := newPutItem(t) - id := wb.Add(0, "one", true, pi.put) + id := wb.Add(0, "one", 10, true, pi.put) wbItem := wb.lookup[id] checkOnHeap(t, wb, wbItem) checkInLookup(t, wb, wbItem)