rc: add vfs/queue-set-expiry to adjust expiry of items in the VFS queue

This commit is contained in:
Nick Craig-Wood 2024-06-20 17:00:41 +01:00
parent 59acb9dfa9
commit 16e0245a8e
5 changed files with 143 additions and 0 deletions

View File

@ -11,6 +11,7 @@ import (
"github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/cache" "github.com/rclone/rclone/fs/cache"
"github.com/rclone/rclone/fs/rc" "github.com/rclone/rclone/fs/rc"
"github.com/rclone/rclone/vfs/vfscache/writeback"
) )
const getVFSHelp = ` const getVFSHelp = `
@ -485,3 +486,63 @@ func rcQueue(ctx context.Context, in rc.Params) (out rc.Params, err error) {
} }
return vfs.cache.Queue(), nil return vfs.cache.Queue(), nil
} }
func init() {
rc.Add(rc.Call{
Path: "vfs/queue-set-expiry",
Title: "Set the expiry time for an item queued for upload.",
Help: strings.ReplaceAll(`
Use this to adjust the |expiry| time for an item in the upload queue.
You will need to read the |id| of the item using |vfs/queue| before
using this call.
You can then set |expiry| to a floating point number of seconds from
now when the item is eligible for upload. If you want the item to be
uploaded as soon as possible then set it to a large negative number (eg
-1000000000). If you want the upload of the item to be delayed
for a long time then set it to a large positive number.
Setting the |expiry| of an item which has already has started uploading
will have no effect - the item will carry on being uploaded.
This will return an error if called with |--vfs-cache-mode| off or if
the |id| passed is not found.
This takes the following parameters
- |fs| - select the VFS in use (optional)
- |id| - a numeric ID as returned from |vfs/queue|
- |expiry| - a new expiry time as floating point seconds
This returns an empty result on success, or an error.
`, "|", "`") + getVFSHelp,
Fn: rcQueueSetExpiry,
})
}
func rcQueueSetExpiry(ctx context.Context, in rc.Params) (out rc.Params, err error) {
vfs, err := getVFS(in)
if err != nil {
return nil, err
}
if vfs.cache == nil {
return nil, rc.NewErrParamInvalid(errors.New("can't call this unless using the VFS cache"))
}
// Read input values
id, err := in.GetInt64("id")
if err != nil {
return nil, err
}
expiry, err := in.GetFloat64("expiry")
if err != nil {
return nil, err
}
// Set expiry
expiryTime := time.Now().Add(time.Duration(float64(time.Second) * expiry))
err = vfs.cache.QueueSetExpiry(writeback.Handle(id), expiryTime)
return nil, err
}

View File

@ -177,6 +177,11 @@ func (c *Cache) Queue() (out rc.Params) {
return out return out
} }
// QueueSetExpiry updates the expiry of a single item in the upload queue
func (c *Cache) QueueSetExpiry(id writeback.Handle, expiry time.Time) error {
return c.writeback.SetExpiry(id, expiry)
}
// createDir creates a directory path, along with any necessary parents // createDir creates a directory path, along with any necessary parents
func createDir(dir string) error { func createDir(dir string) error {
return file.MkdirAll(dir, 0700) return file.MkdirAll(dir, 0700)

View File

@ -741,3 +741,13 @@ func TestCacheQueue(t *testing.T) {
_, ok := queue.([]writeback.QueueInfo) _, ok := queue.([]writeback.QueueInfo)
require.True(t, ok) require.True(t, ok)
} }
func TestCacheQueueSetExpiry(t *testing.T) {
_, c := newTestCache(t)
// Check this returns the correct error when called so we know
// it is plumbed in correctly. The actual tests are done in
// writeback.
err := c.QueueSetExpiry(123123, time.Now())
assert.Equal(t, writeback.ErrorIDNotFound, err)
}

View File

@ -511,3 +511,26 @@ func (wb *WriteBack) Queue() []QueueInfo {
return items return items
} }
// ErrorIDNotFound is returned from SetExpiry when the item is not found
var ErrorIDNotFound = errors.New("id not found in queue")
// SetExpiry sets the expiry time for an item in the writeback queue.
//
// id should be as returned from the Queue call
//
// If the item isn't found then it will return ErrorIDNotFound
func (wb *WriteBack) SetExpiry(id Handle, expiry time.Time) error {
wb.mu.Lock()
defer wb.mu.Unlock()
wbItem, ok := wb.lookup[id]
if !ok {
return ErrorIDNotFound
}
// Update the expiry with the user requested value
wb.items._update(wbItem, expiry)
wb._resetTimer()
return nil
}

View File

@ -545,6 +545,50 @@ func TestWriteBackQueue(t *testing.T) {
assert.Equal(t, []QueueInfo{}, queue) assert.Equal(t, []QueueInfo{}, queue)
} }
func TestWriteBackSetExpiry(t *testing.T) {
wb, cancel := newTestWriteBack(t)
defer cancel()
err := wb.SetExpiry(123123123, time.Now())
assert.Equal(t, ErrorIDNotFound, err)
pi := newPutItem(t)
id := wb.Add(0, "one", 10, true, pi.put)
wbItem := wb.lookup[id]
// get the expiry time with locking so we don't cause races
getExpiry := func() time.Time {
wb.mu.Lock()
defer wb.mu.Unlock()
return wbItem.expiry
}
expiry := time.Until(getExpiry()).Seconds()
assert.Greater(t, expiry, 0.0)
assert.Less(t, expiry, 1.0)
newExpiry := time.Now().Add(100 * time.Second)
require.NoError(t, wb.SetExpiry(wbItem.id, newExpiry))
assert.Equal(t, newExpiry, getExpiry())
// This starts the transfer
newExpiry = time.Now().Add(-100 * time.Second)
require.NoError(t, wb.SetExpiry(wbItem.id, newExpiry))
assert.Equal(t, newExpiry, getExpiry())
<-pi.started
expiry = time.Until(getExpiry()).Seconds()
assert.LessOrEqual(t, expiry, -100.0)
pi.finish(nil) // transfer successful
waitUntilNoTransfers(t, wb)
expiry = time.Until(getExpiry()).Seconds()
assert.LessOrEqual(t, expiry, -100.0)
}
// Test queuing more than fs.Config.Transfers // Test queuing more than fs.Config.Transfers
func TestWriteBackMaxQueue(t *testing.T) { func TestWriteBackMaxQueue(t *testing.T) {
ctx := context.Background() ctx := context.Background()