diff --git a/backend/pikpak/pikpak.go b/backend/pikpak/pikpak.go index 9b6a527cf..4a3fbd3d9 100644 --- a/backend/pikpak/pikpak.go +++ b/backend/pikpak/pikpak.go @@ -18,7 +18,6 @@ package pikpak // ------------------------------------------------------------ // * List() with options starred-only -// * uploadByResumable() with configurable chunk-size // * user-configurable list chunk // * backend command: untrash, iscached // * api(event,task) @@ -47,6 +46,7 @@ import ( "github.com/rclone/rclone/backend/pikpak/api" "github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs/accounting" + "github.com/rclone/rclone/fs/chunksize" "github.com/rclone/rclone/fs/config" "github.com/rclone/rclone/fs/config/configmap" "github.com/rclone/rclone/fs/config/configstruct" @@ -72,6 +72,8 @@ const ( waitTime = 500 * time.Millisecond decayConstant = 2 // bigger for slower decay, exponential rootURL = "https://api-drive.mypikpak.com" + minChunkSize = fs.SizeSuffix(s3manager.MinUploadPartSize) + defaultUploadConcurrency = s3manager.DefaultUploadConcurrency ) // Globals @@ -193,6 +195,42 @@ Fill in for rclone to use a non root folder as its starting point. Help: "Files bigger than this will be cached on disk to calculate hash if required.", Default: fs.SizeSuffix(10 * 1024 * 1024), Advanced: true, + }, { + Name: "chunk_size", + Help: `Chunk size for multipart uploads. + +Large files will be uploaded in chunks of this size. + +Note that this is stored in memory and there may be up to +"--transfers" * "--pikpak-upload-concurrency" chunks stored at once +in memory. + +If you are transferring large files over high-speed links and you have +enough memory, then increasing this will speed up the transfers. + +Rclone will automatically increase the chunk size when uploading a +large file of known size to stay below the 10,000 chunks limit. + +Increasing the chunk size decreases the accuracy of the progress +statistics displayed with "-P" flag.`, + Default: minChunkSize, + Advanced: true, + }, { + Name: "upload_concurrency", + Help: `Concurrency for multipart uploads. + +This is the number of chunks of the same file that are uploaded +concurrently for multipart uploads. + +Note that chunks are stored in memory and there may be up to +"--transfers" * "--pikpak-upload-concurrency" chunks stored at once +in memory. + +If you are uploading small numbers of large files over high-speed links +and these uploads do not fully utilize your bandwidth, then increasing +this may help to speed up the transfers.`, + Default: defaultUploadConcurrency, + Advanced: true, }, { Name: config.ConfigEncoding, Help: config.ConfigEncodingHelp, @@ -223,6 +261,8 @@ type Options struct { UseTrash bool `config:"use_trash"` TrashedOnly bool `config:"trashed_only"` HashMemoryThreshold fs.SizeSuffix `config:"hash_memory_limit"` + ChunkSize fs.SizeSuffix `config:"chunk_size"` + UploadConcurrency int `config:"upload_concurrency"` Enc encoder.MultiEncoder `config:"encoding"` } @@ -432,6 +472,9 @@ func newFs(ctx context.Context, name, path string, m configmap.Mapper) (*Fs, err if err := configstruct.Set(m, opt); err != nil { return nil, err } + if opt.ChunkSize < minChunkSize { + return nil, fmt.Errorf("chunk size must be at least %s", minChunkSize) + } root := parsePath(path) @@ -1148,7 +1191,7 @@ func (f *Fs) uploadByForm(ctx context.Context, in io.Reader, name string, size i return } -func (f *Fs) uploadByResumable(ctx context.Context, in io.Reader, resumable *api.Resumable) (err error) { +func (f *Fs) uploadByResumable(ctx context.Context, in io.Reader, name string, size int64, resumable *api.Resumable) (err error) { p := resumable.Params endpoint := strings.Join(strings.Split(p.Endpoint, ".")[1:], ".") // "mypikpak.com" @@ -1161,19 +1204,21 @@ func (f *Fs) uploadByResumable(ctx context.Context, in io.Reader, resumable *api if err != nil { return } + partSize := chunksize.Calculator(name, size, s3manager.MaxUploadParts, f.opt.ChunkSize) - uploader := s3manager.NewUploader(sess) + // Create an uploader with the session and custom options + uploader := s3manager.NewUploader(sess, func(u *s3manager.Uploader) { + u.PartSize = int64(partSize) + u.Concurrency = f.opt.UploadConcurrency + }) // Upload input parameters uParams := &s3manager.UploadInput{ Bucket: &p.Bucket, Key: &p.Key, Body: in, } - // Perform upload with options different than the those in the Uploader. - _, err = uploader.UploadWithContext(ctx, uParams, func(u *s3manager.Uploader) { - // TODO can be user-configurable - u.PartSize = 10 * 1024 * 1024 // 10MB part size - }) + // Perform an upload + _, err = uploader.UploadWithContext(ctx, uParams) return } @@ -1224,7 +1269,7 @@ func (f *Fs) upload(ctx context.Context, in io.Reader, leaf, dirID, sha1Str stri if uploadType == api.UploadTypeForm && new.Form != nil { err = f.uploadByForm(ctx, in, req.Name, size, new.Form, options...) } else if uploadType == api.UploadTypeResumable && new.Resumable != nil { - err = f.uploadByResumable(ctx, in, new.Resumable) + err = f.uploadByResumable(ctx, in, leaf, size, new.Resumable) } else { err = fmt.Errorf("no method available for uploading: %+v", new) }