pikpak: implement configurable chunk size for multipart upload

Previously, the fixed 10MB chunk size could lead to exceeding the maximum 
allowed number of parts for very large files. Similar to other backends, options for 
chunk size and upload concurrency are now user-configurable. Additionally, 
the internal library is used to automatically adjust chunk size to prevent exceeding 
upload part limitations.

Fixes #7850
This commit is contained in:
wiserain 2024-06-12 13:19:25 +09:00 committed by GitHub
parent 6db9f7180f
commit b7624287ac
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -18,7 +18,6 @@ package pikpak
// ------------------------------------------------------------ // ------------------------------------------------------------
// * List() with options starred-only // * List() with options starred-only
// * uploadByResumable() with configurable chunk-size
// * user-configurable list chunk // * user-configurable list chunk
// * backend command: untrash, iscached // * backend command: untrash, iscached
// * api(event,task) // * api(event,task)
@ -47,6 +46,7 @@ import (
"github.com/rclone/rclone/backend/pikpak/api" "github.com/rclone/rclone/backend/pikpak/api"
"github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/accounting" "github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/fs/chunksize"
"github.com/rclone/rclone/fs/config" "github.com/rclone/rclone/fs/config"
"github.com/rclone/rclone/fs/config/configmap" "github.com/rclone/rclone/fs/config/configmap"
"github.com/rclone/rclone/fs/config/configstruct" "github.com/rclone/rclone/fs/config/configstruct"
@ -72,6 +72,8 @@ const (
waitTime = 500 * time.Millisecond waitTime = 500 * time.Millisecond
decayConstant = 2 // bigger for slower decay, exponential decayConstant = 2 // bigger for slower decay, exponential
rootURL = "https://api-drive.mypikpak.com" rootURL = "https://api-drive.mypikpak.com"
minChunkSize = fs.SizeSuffix(s3manager.MinUploadPartSize)
defaultUploadConcurrency = s3manager.DefaultUploadConcurrency
) )
// Globals // Globals
@ -193,6 +195,42 @@ Fill in for rclone to use a non root folder as its starting point.
Help: "Files bigger than this will be cached on disk to calculate hash if required.", Help: "Files bigger than this will be cached on disk to calculate hash if required.",
Default: fs.SizeSuffix(10 * 1024 * 1024), Default: fs.SizeSuffix(10 * 1024 * 1024),
Advanced: true, Advanced: true,
}, {
Name: "chunk_size",
Help: `Chunk size for multipart uploads.
Large files will be uploaded in chunks of this size.
Note that this is stored in memory and there may be up to
"--transfers" * "--pikpak-upload-concurrency" chunks stored at once
in memory.
If you are transferring large files over high-speed links and you have
enough memory, then increasing this will speed up the transfers.
Rclone will automatically increase the chunk size when uploading a
large file of known size to stay below the 10,000 chunks limit.
Increasing the chunk size decreases the accuracy of the progress
statistics displayed with "-P" flag.`,
Default: minChunkSize,
Advanced: true,
}, {
Name: "upload_concurrency",
Help: `Concurrency for multipart uploads.
This is the number of chunks of the same file that are uploaded
concurrently for multipart uploads.
Note that chunks are stored in memory and there may be up to
"--transfers" * "--pikpak-upload-concurrency" chunks stored at once
in memory.
If you are uploading small numbers of large files over high-speed links
and these uploads do not fully utilize your bandwidth, then increasing
this may help to speed up the transfers.`,
Default: defaultUploadConcurrency,
Advanced: true,
}, { }, {
Name: config.ConfigEncoding, Name: config.ConfigEncoding,
Help: config.ConfigEncodingHelp, Help: config.ConfigEncodingHelp,
@ -223,6 +261,8 @@ type Options struct {
UseTrash bool `config:"use_trash"` UseTrash bool `config:"use_trash"`
TrashedOnly bool `config:"trashed_only"` TrashedOnly bool `config:"trashed_only"`
HashMemoryThreshold fs.SizeSuffix `config:"hash_memory_limit"` HashMemoryThreshold fs.SizeSuffix `config:"hash_memory_limit"`
ChunkSize fs.SizeSuffix `config:"chunk_size"`
UploadConcurrency int `config:"upload_concurrency"`
Enc encoder.MultiEncoder `config:"encoding"` Enc encoder.MultiEncoder `config:"encoding"`
} }
@ -432,6 +472,9 @@ func newFs(ctx context.Context, name, path string, m configmap.Mapper) (*Fs, err
if err := configstruct.Set(m, opt); err != nil { if err := configstruct.Set(m, opt); err != nil {
return nil, err return nil, err
} }
if opt.ChunkSize < minChunkSize {
return nil, fmt.Errorf("chunk size must be at least %s", minChunkSize)
}
root := parsePath(path) root := parsePath(path)
@ -1148,7 +1191,7 @@ func (f *Fs) uploadByForm(ctx context.Context, in io.Reader, name string, size i
return return
} }
func (f *Fs) uploadByResumable(ctx context.Context, in io.Reader, resumable *api.Resumable) (err error) { func (f *Fs) uploadByResumable(ctx context.Context, in io.Reader, name string, size int64, resumable *api.Resumable) (err error) {
p := resumable.Params p := resumable.Params
endpoint := strings.Join(strings.Split(p.Endpoint, ".")[1:], ".") // "mypikpak.com" endpoint := strings.Join(strings.Split(p.Endpoint, ".")[1:], ".") // "mypikpak.com"
@ -1161,19 +1204,21 @@ func (f *Fs) uploadByResumable(ctx context.Context, in io.Reader, resumable *api
if err != nil { if err != nil {
return return
} }
partSize := chunksize.Calculator(name, size, s3manager.MaxUploadParts, f.opt.ChunkSize)
uploader := s3manager.NewUploader(sess) // Create an uploader with the session and custom options
uploader := s3manager.NewUploader(sess, func(u *s3manager.Uploader) {
u.PartSize = int64(partSize)
u.Concurrency = f.opt.UploadConcurrency
})
// Upload input parameters // Upload input parameters
uParams := &s3manager.UploadInput{ uParams := &s3manager.UploadInput{
Bucket: &p.Bucket, Bucket: &p.Bucket,
Key: &p.Key, Key: &p.Key,
Body: in, Body: in,
} }
// Perform upload with options different than the those in the Uploader. // Perform an upload
_, err = uploader.UploadWithContext(ctx, uParams, func(u *s3manager.Uploader) { _, err = uploader.UploadWithContext(ctx, uParams)
// TODO can be user-configurable
u.PartSize = 10 * 1024 * 1024 // 10MB part size
})
return return
} }
@ -1224,7 +1269,7 @@ func (f *Fs) upload(ctx context.Context, in io.Reader, leaf, dirID, sha1Str stri
if uploadType == api.UploadTypeForm && new.Form != nil { if uploadType == api.UploadTypeForm && new.Form != nil {
err = f.uploadByForm(ctx, in, req.Name, size, new.Form, options...) err = f.uploadByForm(ctx, in, req.Name, size, new.Form, options...)
} else if uploadType == api.UploadTypeResumable && new.Resumable != nil { } else if uploadType == api.UploadTypeResumable && new.Resumable != nil {
err = f.uploadByResumable(ctx, in, new.Resumable) err = f.uploadByResumable(ctx, in, leaf, size, new.Resumable)
} else { } else {
err = fmt.Errorf("no method available for uploading: %+v", new) err = fmt.Errorf("no method available for uploading: %+v", new)
} }