diff --git a/vfs/rc.go b/vfs/rc.go index 12b7df051..72060984b 100644 --- a/vfs/rc.go +++ b/vfs/rc.go @@ -11,6 +11,7 @@ import ( "github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs/cache" "github.com/rclone/rclone/fs/rc" + "github.com/rclone/rclone/vfs/vfscache/writeback" ) const getVFSHelp = ` @@ -485,3 +486,63 @@ func rcQueue(ctx context.Context, in rc.Params) (out rc.Params, err error) { } return vfs.cache.Queue(), nil } + +func init() { + rc.Add(rc.Call{ + Path: "vfs/queue-set-expiry", + Title: "Set the expiry time for an item queued for upload.", + Help: strings.ReplaceAll(` + +Use this to adjust the |expiry| time for an item in the upload queue. +You will need to read the |id| of the item using |vfs/queue| before +using this call. + +You can then set |expiry| to a floating point number of seconds from +now when the item is eligible for upload. If you want the item to be +uploaded as soon as possible then set it to a large negative number (eg +-1000000000). If you want the upload of the item to be delayed +for a long time then set it to a large positive number. + +Setting the |expiry| of an item which has already has started uploading +will have no effect - the item will carry on being uploaded. + +This will return an error if called with |--vfs-cache-mode| off or if +the |id| passed is not found. + +This takes the following parameters + +- |fs| - select the VFS in use (optional) +- |id| - a numeric ID as returned from |vfs/queue| +- |expiry| - a new expiry time as floating point seconds + +This returns an empty result on success, or an error. + +`, "|", "`") + getVFSHelp, + Fn: rcQueueSetExpiry, + }) +} + +func rcQueueSetExpiry(ctx context.Context, in rc.Params) (out rc.Params, err error) { + vfs, err := getVFS(in) + if err != nil { + return nil, err + } + if vfs.cache == nil { + return nil, rc.NewErrParamInvalid(errors.New("can't call this unless using the VFS cache")) + } + + // Read input values + id, err := in.GetInt64("id") + if err != nil { + return nil, err + } + expiry, err := in.GetFloat64("expiry") + if err != nil { + return nil, err + } + + // Set expiry + expiryTime := time.Now().Add(time.Duration(float64(time.Second) * expiry)) + err = vfs.cache.QueueSetExpiry(writeback.Handle(id), expiryTime) + return nil, err +} diff --git a/vfs/vfscache/cache.go b/vfs/vfscache/cache.go index dbd408cae..a0af31bd1 100644 --- a/vfs/vfscache/cache.go +++ b/vfs/vfscache/cache.go @@ -177,6 +177,11 @@ func (c *Cache) Queue() (out rc.Params) { return out } +// QueueSetExpiry updates the expiry of a single item in the upload queue +func (c *Cache) QueueSetExpiry(id writeback.Handle, expiry time.Time) error { + return c.writeback.SetExpiry(id, expiry) +} + // createDir creates a directory path, along with any necessary parents func createDir(dir string) error { return file.MkdirAll(dir, 0700) diff --git a/vfs/vfscache/cache_test.go b/vfs/vfscache/cache_test.go index 20ebee2e7..9f8abfa64 100644 --- a/vfs/vfscache/cache_test.go +++ b/vfs/vfscache/cache_test.go @@ -741,3 +741,13 @@ func TestCacheQueue(t *testing.T) { _, ok := queue.([]writeback.QueueInfo) require.True(t, ok) } + +func TestCacheQueueSetExpiry(t *testing.T) { + _, c := newTestCache(t) + + // Check this returns the correct error when called so we know + // it is plumbed in correctly. The actual tests are done in + // writeback. + err := c.QueueSetExpiry(123123, time.Now()) + assert.Equal(t, writeback.ErrorIDNotFound, err) +} diff --git a/vfs/vfscache/writeback/writeback.go b/vfs/vfscache/writeback/writeback.go index cf3380d4f..6212bfec9 100644 --- a/vfs/vfscache/writeback/writeback.go +++ b/vfs/vfscache/writeback/writeback.go @@ -511,3 +511,26 @@ func (wb *WriteBack) Queue() []QueueInfo { return items } + +// ErrorIDNotFound is returned from SetExpiry when the item is not found +var ErrorIDNotFound = errors.New("id not found in queue") + +// SetExpiry sets the expiry time for an item in the writeback queue. +// +// id should be as returned from the Queue call +// +// If the item isn't found then it will return ErrorIDNotFound +func (wb *WriteBack) SetExpiry(id Handle, expiry time.Time) error { + wb.mu.Lock() + defer wb.mu.Unlock() + + wbItem, ok := wb.lookup[id] + if !ok { + return ErrorIDNotFound + } + + // Update the expiry with the user requested value + wb.items._update(wbItem, expiry) + wb._resetTimer() + return nil +} diff --git a/vfs/vfscache/writeback/writeback_test.go b/vfs/vfscache/writeback/writeback_test.go index b862e283e..872d19e71 100644 --- a/vfs/vfscache/writeback/writeback_test.go +++ b/vfs/vfscache/writeback/writeback_test.go @@ -545,6 +545,50 @@ func TestWriteBackQueue(t *testing.T) { assert.Equal(t, []QueueInfo{}, queue) } +func TestWriteBackSetExpiry(t *testing.T) { + wb, cancel := newTestWriteBack(t) + defer cancel() + + err := wb.SetExpiry(123123123, time.Now()) + assert.Equal(t, ErrorIDNotFound, err) + + pi := newPutItem(t) + + id := wb.Add(0, "one", 10, true, pi.put) + wbItem := wb.lookup[id] + + // get the expiry time with locking so we don't cause races + getExpiry := func() time.Time { + wb.mu.Lock() + defer wb.mu.Unlock() + return wbItem.expiry + } + + expiry := time.Until(getExpiry()).Seconds() + assert.Greater(t, expiry, 0.0) + assert.Less(t, expiry, 1.0) + + newExpiry := time.Now().Add(100 * time.Second) + require.NoError(t, wb.SetExpiry(wbItem.id, newExpiry)) + assert.Equal(t, newExpiry, getExpiry()) + + // This starts the transfer + newExpiry = time.Now().Add(-100 * time.Second) + require.NoError(t, wb.SetExpiry(wbItem.id, newExpiry)) + assert.Equal(t, newExpiry, getExpiry()) + + <-pi.started + + expiry = time.Until(getExpiry()).Seconds() + assert.LessOrEqual(t, expiry, -100.0) + + pi.finish(nil) // transfer successful + waitUntilNoTransfers(t, wb) + + expiry = time.Until(getExpiry()).Seconds() + assert.LessOrEqual(t, expiry, -100.0) +} + // Test queuing more than fs.Config.Transfers func TestWriteBackMaxQueue(t *testing.T) { ctx := context.Background()