diff --git a/backend/pikpak/helper.go b/backend/pikpak/helper.go index 6351ef1b4..2e0306d2b 100644 --- a/backend/pikpak/helper.go +++ b/backend/pikpak/helper.go @@ -12,6 +12,7 @@ import ( "net/url" "os" "strconv" + "time" "github.com/rclone/rclone/backend/pikpak/api" "github.com/rclone/rclone/lib/rest" @@ -82,19 +83,21 @@ func (f *Fs) getVIPInfo(ctx context.Context) (info *api.VIP, err error) { // action can be one of batch{Copy,Delete,Trash,Untrash} func (f *Fs) requestBatchAction(ctx context.Context, action string, req *api.RequestBatch) (err error) { opts := rest.Opts{ - Method: "POST", - Path: "/drive/v1/files:" + action, - NoResponse: true, // Only returns `{"task_id":""} + Method: "POST", + Path: "/drive/v1/files:" + action, } + info := struct { + TaskID string `json:"task_id"` + }{} var resp *http.Response err = f.pacer.Call(func() (bool, error) { - resp, err = f.rst.CallJSON(ctx, &opts, &req, nil) + resp, err = f.rst.CallJSON(ctx, &opts, &req, &info) return f.shouldRetry(ctx, resp, err) }) if err != nil { return fmt.Errorf("batch action %q failed: %w", action, err) } - return nil + return f.waitTask(ctx, info.TaskID) } // requestNewTask requests a new api.NewTask and returns api.Task @@ -179,8 +182,8 @@ func (f *Fs) getTask(ctx context.Context, ID string, checkPhase bool) (info *api resp, err = f.rst.CallJSON(ctx, &opts, nil, &info) if checkPhase { if err == nil && info.Phase != api.PhaseTypeComplete { - // could be pending right after file is created/uploaded. - return true, errors.New(info.Phase) + // could be pending right after the task is created + return true, fmt.Errorf("%s (%s) is still in %s", info.Name, info.Type, info.Phase) } } return f.shouldRetry(ctx, resp, err) @@ -188,6 +191,18 @@ func (f *Fs) getTask(ctx context.Context, ID string, checkPhase bool) (info *api return } +// waitTask waits for async tasks to be completed +func (f *Fs) waitTask(ctx context.Context, ID string) (err error) { + time.Sleep(taskWaitTime) + if info, err := f.getTask(ctx, ID, true); err != nil { + if info == nil { + return fmt.Errorf("can't verify the task is completed: %q", ID) + } + return fmt.Errorf("can't verify the task is completed: %#v", info) + } + return +} + // deleteTask remove a task having the specified ID func (f *Fs) deleteTask(ctx context.Context, ID string, deleteFiles bool) (err error) { params := url.Values{} diff --git a/backend/pikpak/pikpak.go b/backend/pikpak/pikpak.go index 4a3fbd3d9..565180cde 100644 --- a/backend/pikpak/pikpak.go +++ b/backend/pikpak/pikpak.go @@ -69,7 +69,7 @@ const ( rcloneEncryptedClientSecret = "aqrmB6M1YJ1DWCBxVxFSjFo7wzWEky494YMmkqgAl1do1WKOe2E" minSleep = 100 * time.Millisecond maxSleep = 2 * time.Second - waitTime = 500 * time.Millisecond + taskWaitTime = 500 * time.Millisecond decayConstant = 2 // bigger for slower decay, exponential rootURL = "https://api-drive.mypikpak.com" minChunkSize = fs.SizeSuffix(s3manager.MinUploadPartSize) @@ -917,19 +917,21 @@ func (f *Fs) Purge(ctx context.Context, dir string) error { // CleanUp empties the trash func (f *Fs) CleanUp(ctx context.Context) (err error) { opts := rest.Opts{ - Method: "PATCH", - Path: "/drive/v1/files/trash:empty", - NoResponse: true, // Only returns `{"task_id":""} + Method: "PATCH", + Path: "/drive/v1/files/trash:empty", } + info := struct { + TaskID string `json:"task_id"` + }{} var resp *http.Response err = f.pacer.Call(func() (bool, error) { - resp, err = f.rst.Call(ctx, &opts) + resp, err = f.rst.CallJSON(ctx, &opts, nil, &info) return f.shouldRetry(ctx, resp, err) }) if err != nil { return fmt.Errorf("couldn't empty trash: %w", err) } - return nil + return f.waitTask(ctx, info.TaskID) } // Move the object @@ -1262,8 +1264,8 @@ func (f *Fs) upload(ctx context.Context, in io.Reader, leaf, dirID, sha1Str stri if cancelErr := f.deleteTask(ctx, new.Task.ID, false); cancelErr != nil { fs.Logf(leaf, "failed to cancel upload: %v", cancelErr) } - fs.Debugf(leaf, "waiting %v for the cancellation to be effective", waitTime) - time.Sleep(waitTime) + fs.Debugf(leaf, "waiting %v for the cancellation to be effective", taskWaitTime) + time.Sleep(taskWaitTime) })() if uploadType == api.UploadTypeForm && new.Form != nil { @@ -1277,12 +1279,7 @@ func (f *Fs) upload(ctx context.Context, in io.Reader, leaf, dirID, sha1Str stri if err != nil { return nil, fmt.Errorf("failed to upload: %w", err) } - fs.Debugf(leaf, "sleeping for %v before checking upload status", waitTime) - time.Sleep(waitTime) - if _, err = f.getTask(ctx, new.Task.ID, true); err != nil { - return nil, fmt.Errorf("unable to complete the upload: %w", err) - } - return new.File, nil + return new.File, f.waitTask(ctx, new.Task.ID) } // Put the object