pikpak: rewrite upload to bypass AWS S3 manager - fixes #8629

This commit rewrites PikPak's upload path to handle file uploads directly
rather than relying on the generic AWS S3 upload manager. The primary
motivation is to address the upload failures reported in #8629.

* Added new `multipart.go` file implementing multipart uploads with the AWS S3 SDK.
* Removed dependency on AWS S3 manager; replaced with custom handling.
* Updated PikPak test package with new multipart upload tests,
  including configurable chunk size and upload cutoff.
* Added new configuration option `upload_cutoff` to control when chunked
  uploads are used (see the example after this list).
* Defined constraints for `chunk_size` and `upload_cutoff` (min/max values,
  validation).
* Adjusted default `upload_concurrency` from 5 to 4.
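
For reference, the new knobs surface as ordinary rclone backend settings. A
hypothetical configuration might look like the following (values are
illustrative only, and the flag name assumes rclone's usual --backend-option
naming convention):

    [pikpak]
    type = pikpak
    upload_cutoff = 200Mi
    chunk_size = 5Mi
    upload_concurrency = 4

    # or per transfer, e.g.
    rclone copy bigfile.bin pikpak:backup --pikpak-chunk-size 16Mi
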
Author: wiserain
Date: 2025-07-04 11:25:12 +09:00
Committed by: GitHub
Parent: a97425d9cb
Commit: 24eb8dcde0
3 changed files with 475 additions and 34 deletions

backend/pikpak/multipart.go (new file, 333 lines)

@@ -0,0 +1,333 @@
package pikpak

import (
    "context"
    "fmt"
    "io"
    "sort"
    "strings"
    "sync"
    "time"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/service/s3"
    "github.com/aws/aws-sdk-go-v2/service/s3/types"
    "github.com/rclone/rclone/backend/pikpak/api"
    "github.com/rclone/rclone/fs"
    "github.com/rclone/rclone/fs/accounting"
    "github.com/rclone/rclone/fs/chunksize"
    "github.com/rclone/rclone/fs/fserrors"
    "github.com/rclone/rclone/lib/atexit"
    "github.com/rclone/rclone/lib/pacer"
    "github.com/rclone/rclone/lib/pool"
    "golang.org/x/sync/errgroup"
)

const (
    bufferSize           = 1024 * 1024     // default size of the pages used in the reader
    bufferCacheSize      = 64              // max number of buffers to keep in cache
    bufferCacheFlushTime = 5 * time.Second // flush the cached buffers after this long
)

// bufferPool is a global pool of buffers
var (
    bufferPool     *pool.Pool
    bufferPoolOnce sync.Once
)

// get a buffer pool
func getPool() *pool.Pool {
    bufferPoolOnce.Do(func() {
        ci := fs.GetConfig(context.Background())
        // Initialise the buffer pool when used
        bufferPool = pool.New(bufferCacheFlushTime, bufferSize, bufferCacheSize, ci.UseMmap)
    })
    return bufferPool
}

// NewRW gets a pool.RW using the multipart pool
func NewRW() *pool.RW {
    return pool.NewRW(getPool())
}
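
// Note: this pool is shared by every transfer in the process. Each chunk in
// Upload below takes a buffer with NewRW(), fills it via io.CopyN, and must
// Close() it (see the free() helper) so the memory is returned to the pool.
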
// Upload does a multipart upload in parallel
func (w *pikpakChunkWriter) Upload(ctx context.Context) (err error) {
    // make concurrency machinery
    tokens := pacer.NewTokenDispenser(w.con)

    uploadCtx, cancel := context.WithCancel(ctx)
    defer cancel()
    defer atexit.OnError(&err, func() {
        cancel()
        fs.Debugf(w.o, "multipart upload: Cancelling...")
        errCancel := w.Abort(ctx)
        if errCancel != nil {
            fs.Debugf(w.o, "multipart upload: failed to cancel: %v", errCancel)
        }
    })()

    var (
        g, gCtx   = errgroup.WithContext(uploadCtx)
        finished  = false
        off       int64
        size      = w.size
        chunkSize = w.chunkSize
    )

    // Do the accounting manually
    in, acc := accounting.UnWrapAccounting(w.in)

    for partNum := int64(0); !finished; partNum++ {
        // Get a block of memory from the pool and token which limits concurrency.
        tokens.Get()
        rw := NewRW()
        if acc != nil {
            rw.SetAccounting(acc.AccountRead)
        }

        free := func() {
            // return the memory and token
            _ = rw.Close() // Can't return an error
            tokens.Put()
        }

        // Fail fast, in case an errgroup managed function returns an error
        // gCtx is cancelled. There is no point in uploading all the other parts.
        if gCtx.Err() != nil {
            free()
            break
        }

        // Read the chunk
        var n int64
        n, err = io.CopyN(rw, in, chunkSize)
        if err == io.EOF {
            if n == 0 && partNum != 0 { // end if no data and if not first chunk
                free()
                break
            }
            finished = true
        } else if err != nil {
            free()
            return fmt.Errorf("multipart upload: failed to read source: %w", err)
        }

        partNum := partNum
        partOff := off
        off += n
        g.Go(func() (err error) {
            defer free()
            fs.Debugf(w.o, "multipart upload: starting chunk %d size %v offset %v/%v", partNum, fs.SizeSuffix(n), fs.SizeSuffix(partOff), fs.SizeSuffix(size))
            _, err = w.WriteChunk(gCtx, int32(partNum), rw)
            return err
        })
    }

    err = g.Wait()
    if err != nil {
        return err
    }

    err = w.Close(ctx)
    if err != nil {
        return fmt.Errorf("multipart upload: failed to finalise: %w", err)
    }
    return nil
}

var warnStreamUpload sync.Once

// state of ChunkWriter
type pikpakChunkWriter struct {
    chunkSize      int64
    size           int64
    con            int
    f              *Fs
    o              *Object
    in             io.Reader
    mu             sync.Mutex
    completedParts []types.CompletedPart
    client         *s3.Client
    mOut           *s3.CreateMultipartUploadOutput
}

func (f *Fs) newChunkWriter(ctx context.Context, remote string, size int64, p *api.ResumableParams, in io.Reader, options ...fs.OpenOption) (w *pikpakChunkWriter, err error) {
    // Temporary Object under construction
    o := &Object{
        fs:     f,
        remote: remote,
    }

    // calculate size of parts
    chunkSize := f.opt.ChunkSize

    // size can be -1 here meaning we don't know the size of the incoming file. We use ChunkSize
    // buffers here (default 5 MiB). With a maximum number of parts (10,000) this will be a file of
    // 48 GiB which seems like a not too unreasonable limit.
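    // (Worked example, assuming the 5 MiB default chunk size: 5 MiB * 10,000
    // parts = 50,000 MiB, which is roughly 48.8 GiB, hence the figure above.)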
    if size == -1 {
        warnStreamUpload.Do(func() {
            fs.Logf(f, "Streaming uploads using chunk size %v will have maximum file size of %v",
                f.opt.ChunkSize, fs.SizeSuffix(int64(chunkSize)*int64(maxUploadParts)))
        })
    } else {
        chunkSize = chunksize.Calculator(o, size, maxUploadParts, chunkSize)
    }

    client, err := f.newS3Client(ctx, p)
    if err != nil {
        return nil, fmt.Errorf("failed to create upload client: %w", err)
    }

    w = &pikpakChunkWriter{
        chunkSize:      int64(chunkSize),
        size:           size,
        con:            max(1, f.opt.UploadConcurrency),
        f:              f,
        o:              o,
        in:             in,
        completedParts: make([]types.CompletedPart, 0),
        client:         client,
    }

    req := &s3.CreateMultipartUploadInput{
        Bucket: &p.Bucket,
        Key:    &p.Key,
    }
    // Apply upload options
    for _, option := range options {
        key, value := option.Header()
        lowerKey := strings.ToLower(key)
        switch lowerKey {
        case "":
            // ignore
        case "cache-control":
            req.CacheControl = aws.String(value)
        case "content-disposition":
            req.ContentDisposition = aws.String(value)
        case "content-encoding":
            req.ContentEncoding = aws.String(value)
        case "content-type":
            req.ContentType = aws.String(value)
        }
    }

    err = w.f.pacer.Call(func() (bool, error) {
        w.mOut, err = w.client.CreateMultipartUpload(ctx, req)
        return w.shouldRetry(ctx, err)
    })
    if err != nil {
        return nil, fmt.Errorf("create multipart upload failed: %w", err)
    }
    fs.Debugf(w.o, "multipart upload: %q initiated", *w.mOut.UploadId)
    return
}

// shouldRetry returns a boolean as to whether this err
// deserve to be retried. It returns the err as a convenience
func (w *pikpakChunkWriter) shouldRetry(ctx context.Context, err error) (bool, error) {
    if fserrors.ContextError(ctx, &err) {
        return false, err
    }
    if fserrors.ShouldRetry(err) {
        return true, err
    }
    return false, err
}

// add a part number and etag to the completed parts
func (w *pikpakChunkWriter) addCompletedPart(part types.CompletedPart) {
    w.mu.Lock()
    defer w.mu.Unlock()
    w.completedParts = append(w.completedParts, part)
}

// WriteChunk will write chunk number with reader bytes, where chunk number >= 0
func (w *pikpakChunkWriter) WriteChunk(ctx context.Context, chunkNumber int32, reader io.ReadSeeker) (currentChunkSize int64, err error) {
    if chunkNumber < 0 {
        err := fmt.Errorf("invalid chunk number provided: %v", chunkNumber)
        return -1, err
    }

    partNumber := chunkNumber + 1
    var res *s3.UploadPartOutput
    err = w.f.pacer.Call(func() (bool, error) {
        // Discover the size by seeking to the end
        currentChunkSize, err = reader.Seek(0, io.SeekEnd)
        if err != nil {
            return false, err
        }
        // rewind the reader on retry and after reading md5
        _, err := reader.Seek(0, io.SeekStart)
        if err != nil {
            return false, err
        }
        res, err = w.client.UploadPart(ctx, &s3.UploadPartInput{
            Bucket:     w.mOut.Bucket,
            Key:        w.mOut.Key,
            UploadId:   w.mOut.UploadId,
            PartNumber: &partNumber,
            Body:       reader,
        })
        if err != nil {
            if chunkNumber <= 8 {
                return w.shouldRetry(ctx, err)
            }
            // retry all chunks once have done the first few
            return true, err
        }
        return false, nil
    })
    if err != nil {
        return -1, fmt.Errorf("failed to upload chunk %d with %v bytes: %w", partNumber, currentChunkSize, err)
    }

    w.addCompletedPart(types.CompletedPart{
        PartNumber: &partNumber,
        ETag:       res.ETag,
    })

    fs.Debugf(w.o, "multipart upload: wrote chunk %d with %v bytes", partNumber, currentChunkSize)
    return currentChunkSize, err
}

// Abort the multipart upload
func (w *pikpakChunkWriter) Abort(ctx context.Context) (err error) {
    // Abort the upload session
    err = w.f.pacer.Call(func() (bool, error) {
        _, err = w.client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{
            Bucket:   w.mOut.Bucket,
            Key:      w.mOut.Key,
            UploadId: w.mOut.UploadId,
        })
        return w.shouldRetry(ctx, err)
    })
    if err != nil {
        return fmt.Errorf("failed to abort multipart upload %q: %w", *w.mOut.UploadId, err)
    }
    fs.Debugf(w.o, "multipart upload: %q aborted", *w.mOut.UploadId)
    return
}

// Close and finalise the multipart upload
func (w *pikpakChunkWriter) Close(ctx context.Context) (err error) {
    // sort the completed parts by part number
    sort.Slice(w.completedParts, func(i, j int) bool {
        return *w.completedParts[i].PartNumber < *w.completedParts[j].PartNumber
    })
    // Finalise the upload session
    err = w.f.pacer.Call(func() (bool, error) {
        _, err = w.client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
            Bucket:   w.mOut.Bucket,
            Key:      w.mOut.Key,
            UploadId: w.mOut.UploadId,
            MultipartUpload: &types.CompletedMultipartUpload{
                Parts: w.completedParts,
            },
        })
        return w.shouldRetry(ctx, err)
    })
    if err != nil {
        return fmt.Errorf("failed to complete multipart upload: %w", err)
    }
    fs.Debugf(w.o, "multipart upload: %q finished", *w.mOut.UploadId)
    return
}
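
Taken together, a caller drives the new writer in two steps. The sketch below
is condensed from uploadByResumable in the pikpak.go diff further down (error
handling abbreviated; names are those used in the patch):

    // Create the writer, which starts the S3 multipart upload session...
    w, err := f.newChunkWriter(ctx, name, size, p, in, options...)
    if err != nil {
        return fmt.Errorf("multipart upload failed to initialise: %w", err)
    }
    // ...then Upload reads chunk_size pieces from the source, uploads up to
    // upload_concurrency parts at once via WriteChunk, and finishes with
    // Close on success or Abort on error.
    return w.Upload(ctx)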

backend/pikpak/pikpak.go

@@ -41,12 +41,10 @@ import (
     "github.com/aws/aws-sdk-go-v2/aws"
     awsconfig "github.com/aws/aws-sdk-go-v2/config"
     "github.com/aws/aws-sdk-go-v2/credentials"
-    "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
     "github.com/aws/aws-sdk-go-v2/service/s3"
     "github.com/rclone/rclone/backend/pikpak/api"
     "github.com/rclone/rclone/fs"
     "github.com/rclone/rclone/fs/accounting"
-    "github.com/rclone/rclone/fs/chunksize"
     "github.com/rclone/rclone/fs/config"
     "github.com/rclone/rclone/fs/config/configmap"
     "github.com/rclone/rclone/fs/config/configstruct"
@@ -66,17 +64,22 @@ import (
 // Constants
 const (
     clientID = "YUMx5nI8ZU8Ap8pm"
     clientVersion = "2.0.0"
     packageName = "mypikpak.com"
     defaultUserAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:129.0) Gecko/20100101 Firefox/129.0"
     minSleep = 100 * time.Millisecond
     maxSleep = 2 * time.Second
     taskWaitTime = 500 * time.Millisecond
     decayConstant = 2 // bigger for slower decay, exponential
     rootURL = "https://api-drive.mypikpak.com"
-    minChunkSize = fs.SizeSuffix(manager.MinUploadPartSize)
-    defaultUploadConcurrency = manager.DefaultUploadConcurrency
+    maxUploadParts = 10000 // Part number must be an integer between 1 and 10000, inclusive.
+    defaultChunkSize = fs.SizeSuffix(1024 * 1024 * 5) // Part size should be in [100KB, 5GB]
+    minChunkSize = 100 * fs.Kibi
+    maxChunkSize = 5 * fs.Gibi
+    defaultUploadCutoff = fs.SizeSuffix(200 * 1024 * 1024)
+    maxUploadCutoff = 5 * fs.Gibi // maximum allowed size for singlepart uploads
 )

 // Globals
@@ -223,6 +226,14 @@ Fill in for rclone to use a non root folder as its starting point.
             Help: "Files bigger than this will be cached on disk to calculate hash if required.",
             Default: fs.SizeSuffix(10 * 1024 * 1024),
             Advanced: true,
+        }, {
+            Name: "upload_cutoff",
+            Help: `Cutoff for switching to chunked upload.
+
+Any files larger than this will be uploaded in chunks of chunk_size.
+The minimum is 0 and the maximum is 5 GiB.`,
+            Default: defaultUploadCutoff,
+            Advanced: true,
         }, {
             Name: "chunk_size",
             Help: `Chunk size for multipart uploads.
@@ -241,7 +252,7 @@ large file of known size to stay below the 10,000 chunks limit.
 Increasing the chunk size decreases the accuracy of the progress
 statistics displayed with "-P" flag.`,
-            Default: minChunkSize,
+            Default: defaultChunkSize,
             Advanced: true,
         }, {
             Name: "upload_concurrency",
@@ -257,7 +268,7 @@ in memory.
 If you are uploading small numbers of large files over high-speed links
 and these uploads do not fully utilize your bandwidth, then increasing
 this may help to speed up the transfers.`,
-            Default: defaultUploadConcurrency,
+            Default: 4,
             Advanced: true,
         }, {
             Name: config.ConfigEncoding,
@@ -294,6 +305,7 @@ type Options struct {
     NoMediaLink bool `config:"no_media_link"`
     HashMemoryThreshold fs.SizeSuffix `config:"hash_memory_limit"`
     ChunkSize fs.SizeSuffix `config:"chunk_size"`
+    UploadCutoff fs.SizeSuffix `config:"upload_cutoff"`
     UploadConcurrency int `config:"upload_concurrency"`
     Enc encoder.MultiEncoder `config:"encoding"`
 }
@@ -524,6 +536,39 @@ func (f *Fs) newClientWithPacer(ctx context.Context) (err error) {
     return nil
 }

+func checkUploadChunkSize(cs fs.SizeSuffix) error {
+    if cs < minChunkSize {
+        return fmt.Errorf("%s is less than %s", cs, minChunkSize)
+    }
+    if cs > maxChunkSize {
+        return fmt.Errorf("%s is greater than %s", cs, maxChunkSize)
+    }
+    return nil
+}
+
+func (f *Fs) setUploadChunkSize(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
+    err = checkUploadChunkSize(cs)
+    if err == nil {
+        old, f.opt.ChunkSize = f.opt.ChunkSize, cs
+    }
+    return
+}
+
+func checkUploadCutoff(cs fs.SizeSuffix) error {
+    if cs > maxUploadCutoff {
+        return fmt.Errorf("%s is greater than %s", cs, maxUploadCutoff)
+    }
+    return nil
+}
+
+func (f *Fs) setUploadCutoff(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
+    err = checkUploadCutoff(cs)
+    if err == nil {
+        old, f.opt.UploadCutoff = f.opt.UploadCutoff, cs
+    }
+    return
+}
+
 // newFs partially constructs Fs from the path
 //
 // It constructs a valid Fs but doesn't attempt to figure out whether
@@ -531,11 +576,17 @@ func (f *Fs) newClientWithPacer(ctx context.Context) (err error) {
 func newFs(ctx context.Context, name, path string, m configmap.Mapper) (*Fs, error) {
     // Parse config into Options struct
     opt := new(Options)
-    if err := configstruct.Set(m, opt); err != nil {
+    err := configstruct.Set(m, opt)
+    if err != nil {
         return nil, err
     }
-    if opt.ChunkSize < minChunkSize {
-        return nil, fmt.Errorf("chunk size must be at least %s", minChunkSize)
+    err = checkUploadChunkSize(opt.ChunkSize)
+    if err != nil {
+        return nil, fmt.Errorf("pikpak: chunk size: %w", err)
+    }
+    err = checkUploadCutoff(opt.UploadCutoff)
+    if err != nil {
+        return nil, fmt.Errorf("pikpak: upload cutoff: %w", err)
     }
     root := parsePath(path)
@@ -1260,9 +1311,7 @@ func (f *Fs) uploadByForm(ctx context.Context, in io.Reader, name string, size i
     return
 }

-func (f *Fs) uploadByResumable(ctx context.Context, in io.Reader, name string, size int64, resumable *api.Resumable) (err error) {
-    p := resumable.Params
-
+func (f *Fs) newS3Client(ctx context.Context, p *api.ResumableParams) (s3Client *s3.Client, err error) {
     // Create a credentials provider
     creds := credentials.NewStaticCredentialsProvider(p.AccessKeyID, p.AccessKeySecret, p.SecurityToken)
@@ -1272,22 +1321,64 @@
     if err != nil {
         return
     }
+    ci := fs.GetConfig(ctx)
+    cfg.RetryMaxAttempts = ci.LowLevelRetries
+    cfg.HTTPClient = getClient(ctx, &f.opt)
     client := s3.NewFromConfig(cfg, func(o *s3.Options) {
         o.BaseEndpoint = aws.String("https://mypikpak.com/")
+        o.RequestChecksumCalculation = aws.RequestChecksumCalculationWhenRequired
+        o.ResponseChecksumValidation = aws.ResponseChecksumValidationWhenRequired
     })
-    partSize := chunksize.Calculator(name, size, int(manager.MaxUploadParts), f.opt.ChunkSize)
-
-    // Create an uploader with custom options
-    uploader := manager.NewUploader(client, func(u *manager.Uploader) {
-        u.PartSize = int64(partSize)
-        u.Concurrency = f.opt.UploadConcurrency
-    })
-    // Perform an upload
-    _, err = uploader.Upload(ctx, &s3.PutObjectInput{
+    return client, nil
+}
+
+func (f *Fs) uploadByResumable(ctx context.Context, in io.Reader, name string, size int64, resumable *api.Resumable, options ...fs.OpenOption) (err error) {
+    p := resumable.Params
+
+    if size < 0 || size >= int64(f.opt.UploadCutoff) {
+        mu, err := f.newChunkWriter(ctx, name, size, p, in, options...)
+        if err != nil {
+            return fmt.Errorf("multipart upload failed to initialise: %w", err)
+        }
+        return mu.Upload(ctx)
+    }
+
+    // upload singlepart
+    client, err := f.newS3Client(ctx, p)
+    if err != nil {
+        return fmt.Errorf("failed to create upload client: %w", err)
+    }
+    req := &s3.PutObjectInput{
         Bucket: &p.Bucket,
         Key: &p.Key,
-        Body: in,
+        Body: io.NopCloser(in),
+    }
+    // Apply upload options
+    for _, option := range options {
+        key, value := option.Header()
+        lowerKey := strings.ToLower(key)
+        switch lowerKey {
+        case "":
+            // ignore
+        case "cache-control":
+            req.CacheControl = aws.String(value)
+        case "content-disposition":
+            req.ContentDisposition = aws.String(value)
+        case "content-encoding":
+            req.ContentEncoding = aws.String(value)
+        case "content-type":
+            req.ContentType = aws.String(value)
+        }
+    }
+    var s3opts = []func(*s3.Options){}
+    // Can't retry single part uploads as only have an io.Reader
+    s3opts = append(s3opts, func(o *s3.Options) {
+        o.RetryMaxAttempts = 1
+    })
+    err = f.pacer.CallNoRetry(func() (bool, error) {
+        _, err = client.PutObject(ctx, req, s3opts...)
+        return f.shouldRetry(ctx, nil, err)
     })
     return
 }
@@ -1345,7 +1436,7 @@ func (f *Fs) upload(ctx context.Context, in io.Reader, leaf, dirID, gcid string,
     if uploadType == api.UploadTypeForm && new.Form != nil {
         err = f.uploadByForm(ctx, in, req.Name, size, new.Form, options...)
     } else if uploadType == api.UploadTypeResumable && new.Resumable != nil {
-        err = f.uploadByResumable(ctx, in, leaf, size, new.Resumable)
+        err = f.uploadByResumable(ctx, in, leaf, size, new.Resumable, options...)
     } else {
         err = fmt.Errorf("no method available for uploading: %+v", new)
     }
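
For a sense of scale with the defaults introduced here (upload_cutoff 200 MiB,
chunk_size 5 MiB, upload_concurrency 4), the figures below are back-of-the-envelope
illustrations rather than part of the patch:

    150 MiB file   -> below the cutoff, uploaded with a single PutObject
    1 GiB file     -> multipart: ceil(1 GiB / 5 MiB) = 205 parts, at most 4 in
                      flight, so roughly 4 * 5 MiB = 20 MiB of chunk buffers
    unknown size   -> always multipart (size < 0), capped at 10,000 parts,
                      i.e. about 48.8 GiB at the 5 MiB default chunk size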

backend/pikpak/pikpak_test.go

@@ -1,10 +1,10 @@
 // Test PikPak filesystem interface
-package pikpak_test
+package pikpak

 import (
     "testing"

-    "github.com/rclone/rclone/backend/pikpak"
+    "github.com/rclone/rclone/fs"
     "github.com/rclone/rclone/fstest/fstests"
 )

@@ -12,6 +12,23 @@ import (
 func TestIntegration(t *testing.T) {
     fstests.Run(t, &fstests.Opt{
         RemoteName: "TestPikPak:",
-        NilObject: (*pikpak.Object)(nil),
+        NilObject: (*Object)(nil),
+        ChunkedUpload: fstests.ChunkedUploadConfig{
+            MinChunkSize: minChunkSize,
+            MaxChunkSize: maxChunkSize,
+        },
     })
 }
+
+func (f *Fs) SetUploadChunkSize(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
+    return f.setUploadChunkSize(cs)
+}
+
+func (f *Fs) SetUploadCutoff(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
+    return f.setUploadCutoff(cs)
+}
+
+var (
+    _ fstests.SetUploadChunkSizer = (*Fs)(nil)
+    _ fstests.SetUploadCutoffer = (*Fs)(nil)
+)
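
To exercise the new chunked-upload tests, the usual rclone integration-test
invocation should work, assuming a remote named TestPikPak: is configured
(the -remote flag is the standard fstests mechanism):

    cd backend/pikpak
    go test -v -remote TestPikPak:

Moving the test file from package pikpak_test into package pikpak is what lets
it reference the unexported minChunkSize/maxChunkSize constants and wrap the
unexported setters so fstests can vary chunk_size and upload_cutoff during the run.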