diff --git a/backend/pikpak/multipart.go b/backend/pikpak/multipart.go
new file mode 100644
index 000000000..85dc8f2d5
--- /dev/null
+++ b/backend/pikpak/multipart.go
@@ -0,0 +1,333 @@
+package pikpak
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"sort"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/aws/aws-sdk-go-v2/aws"
+	"github.com/aws/aws-sdk-go-v2/service/s3"
+	"github.com/aws/aws-sdk-go-v2/service/s3/types"
+	"github.com/rclone/rclone/backend/pikpak/api"
+	"github.com/rclone/rclone/fs"
+	"github.com/rclone/rclone/fs/accounting"
+	"github.com/rclone/rclone/fs/chunksize"
+	"github.com/rclone/rclone/fs/fserrors"
+	"github.com/rclone/rclone/lib/atexit"
+	"github.com/rclone/rclone/lib/pacer"
+	"github.com/rclone/rclone/lib/pool"
+	"golang.org/x/sync/errgroup"
+)
+
+const (
+	bufferSize           = 1024 * 1024     // default size of the pages used in the reader
+	bufferCacheSize      = 64              // max number of buffers to keep in cache
+	bufferCacheFlushTime = 5 * time.Second // flush the cached buffers after this long
+)
+
+// bufferPool is a global pool of buffers
+var (
+	bufferPool     *pool.Pool
+	bufferPoolOnce sync.Once
+)
+
+// get a buffer pool
+func getPool() *pool.Pool {
+	bufferPoolOnce.Do(func() {
+		ci := fs.GetConfig(context.Background())
+		// Initialise the buffer pool when used
+		bufferPool = pool.New(bufferCacheFlushTime, bufferSize, bufferCacheSize, ci.UseMmap)
+	})
+	return bufferPool
+}
+
+// NewRW gets a pool.RW using the multipart pool
+func NewRW() *pool.RW {
+	return pool.NewRW(getPool())
+}
+
+// Upload does a multipart upload in parallel
+func (w *pikpakChunkWriter) Upload(ctx context.Context) (err error) {
+	// make concurrency machinery
+	tokens := pacer.NewTokenDispenser(w.con)
+
+	uploadCtx, cancel := context.WithCancel(ctx)
+	defer cancel()
+	defer atexit.OnError(&err, func() {
+		cancel()
+		fs.Debugf(w.o, "multipart upload: Cancelling...")
+		errCancel := w.Abort(ctx)
+		if errCancel != nil {
+			fs.Debugf(w.o, "multipart upload: failed to cancel: %v", errCancel)
+		}
+	})()
+
+	var (
+		g, gCtx   = errgroup.WithContext(uploadCtx)
+		finished  = false
+		off       int64
+		size      = w.size
+		chunkSize = w.chunkSize
+	)
+
+	// Do the accounting manually
+	in, acc := accounting.UnWrapAccounting(w.in)
+
+	for partNum := int64(0); !finished; partNum++ {
+		// Get a block of memory from the pool and a token which limits concurrency
+		tokens.Get()
+		rw := NewRW()
+		if acc != nil {
+			rw.SetAccounting(acc.AccountRead)
+		}
+
+		free := func() {
+			// return the memory and token
+			_ = rw.Close() // Can't return an error
+			tokens.Put()
+		}
+
+		// Fail fast: if any errgroup-managed function has returned an error,
+		// gCtx is cancelled and there is no point in uploading the remaining parts.
+		if gCtx.Err() != nil {
+			free()
+			break
+		}
+
+		// Read the chunk
+		var n int64
+		n, err = io.CopyN(rw, in, chunkSize)
+		if err == io.EOF {
+			if n == 0 && partNum != 0 { // end if no data and if not first chunk
+				free()
+				break
+			}
+			finished = true
+		} else if err != nil {
+			free()
+			return fmt.Errorf("multipart upload: failed to read source: %w", err)
+		}
+
+		partNum := partNum
+		partOff := off
+		off += n
+		g.Go(func() (err error) {
+			defer free()
+			fs.Debugf(w.o, "multipart upload: starting chunk %d size %v offset %v/%v", partNum, fs.SizeSuffix(n), fs.SizeSuffix(partOff), fs.SizeSuffix(size))
+			_, err = w.WriteChunk(gCtx, int32(partNum), rw)
+			return err
+		})
+	}
+
+	err = g.Wait()
+	if err != nil {
+		return err
+	}
+
+	err = w.Close(ctx)
+	if err != nil {
+		return fmt.Errorf("multipart upload: failed to finalise: %w", err)
+	}
+
+	return nil
+}
+
+var warnStreamUpload sync.Once
+
+// state of ChunkWriter
+type pikpakChunkWriter struct {
+	chunkSize      int64
+	size           int64
+	con            int
+	f              *Fs
+	o              *Object
+	in             io.Reader
+	mu             sync.Mutex
+	completedParts []types.CompletedPart
+	client         *s3.Client
+	mOut           *s3.CreateMultipartUploadOutput
+}
+
+func (f *Fs) newChunkWriter(ctx context.Context, remote string, size int64, p *api.ResumableParams, in io.Reader, options ...fs.OpenOption) (w *pikpakChunkWriter, err error) {
+	// Temporary Object under construction
+	o := &Object{
+		fs:     f,
+		remote: remote,
+	}
+
+	// calculate size of parts
+	chunkSize := f.opt.ChunkSize
+
+	// size can be -1 here meaning we don't know the size of the incoming file. We use ChunkSize
+	// buffers here (default 5 MiB). With a maximum number of parts (10,000) this will be a file of
+	// 48 GiB, which seems a reasonable limit.
+	if size == -1 {
+		warnStreamUpload.Do(func() {
+			fs.Logf(f, "Streaming uploads using chunk size %v will have maximum file size of %v",
+				f.opt.ChunkSize, fs.SizeSuffix(int64(chunkSize)*int64(maxUploadParts)))
+		})
+	} else {
+		chunkSize = chunksize.Calculator(o, size, maxUploadParts, chunkSize)
+	}
+
+	client, err := f.newS3Client(ctx, p)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create upload client: %w", err)
+	}
+	w = &pikpakChunkWriter{
+		chunkSize:      int64(chunkSize),
+		size:           size,
+		con:            max(1, f.opt.UploadConcurrency),
+		f:              f,
+		o:              o,
+		in:             in,
+		completedParts: make([]types.CompletedPart, 0),
+		client:         client,
+	}
+
+	req := &s3.CreateMultipartUploadInput{
+		Bucket: &p.Bucket,
+		Key:    &p.Key,
+	}
+	// Apply upload options
+	for _, option := range options {
+		key, value := option.Header()
+		lowerKey := strings.ToLower(key)
+		switch lowerKey {
+		case "":
+			// ignore
+		case "cache-control":
+			req.CacheControl = aws.String(value)
+		case "content-disposition":
+			req.ContentDisposition = aws.String(value)
+		case "content-encoding":
+			req.ContentEncoding = aws.String(value)
+		case "content-type":
+			req.ContentType = aws.String(value)
+		}
+	}
+	err = w.f.pacer.Call(func() (bool, error) {
+		w.mOut, err = w.client.CreateMultipartUpload(ctx, req)
+		return w.shouldRetry(ctx, err)
+	})
+	if err != nil {
+		return nil, fmt.Errorf("create multipart upload failed: %w", err)
+	}
+	fs.Debugf(w.o, "multipart upload: %q initiated", *w.mOut.UploadId)
+	return
+}
+
+// shouldRetry returns a boolean as to whether this err
+// deserves to be retried. It returns the err as a convenience.
+func (w *pikpakChunkWriter) shouldRetry(ctx context.Context, err error) (bool, error) {
+	if fserrors.ContextError(ctx, &err) {
+		return false, err
+	}
+	if fserrors.ShouldRetry(err) {
+		return true, err
+	}
+	return false, err
+}
+
+// add a part number and etag to the completed parts
+func (w *pikpakChunkWriter) addCompletedPart(part types.CompletedPart) {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+	w.completedParts = append(w.completedParts, part)
+}
+
+// WriteChunk will write chunk number chunkNumber from reader, where chunkNumber >= 0
+func (w *pikpakChunkWriter) WriteChunk(ctx context.Context, chunkNumber int32, reader io.ReadSeeker) (currentChunkSize int64, err error) {
+	if chunkNumber < 0 {
+		err := fmt.Errorf("invalid chunk number provided: %v", chunkNumber)
+		return -1, err
+	}
+
+	partNumber := chunkNumber + 1
+	var res *s3.UploadPartOutput
+	err = w.f.pacer.Call(func() (bool, error) {
+		// Discover the size by seeking to the end
+		currentChunkSize, err = reader.Seek(0, io.SeekEnd)
+		if err != nil {
+			return false, err
+		}
+		// rewind the reader on retry and after discovering the size
+		_, err := reader.Seek(0, io.SeekStart)
+		if err != nil {
+			return false, err
+		}
+		res, err = w.client.UploadPart(ctx, &s3.UploadPartInput{
+			Bucket:     w.mOut.Bucket,
+			Key:        w.mOut.Key,
+			UploadId:   w.mOut.UploadId,
+			PartNumber: &partNumber,
+			Body:       reader,
+		})
+		if err != nil {
+			if chunkNumber <= 8 {
+				return w.shouldRetry(ctx, err)
+			}
+			// retry all chunks once we have done the first few
+			return true, err
+		}
+		return false, nil
+	})
+	if err != nil {
+		return -1, fmt.Errorf("failed to upload chunk %d with %v bytes: %w", partNumber, currentChunkSize, err)
+	}
+
+	w.addCompletedPart(types.CompletedPart{
+		PartNumber: &partNumber,
+		ETag:       res.ETag,
+	})
+
+	fs.Debugf(w.o, "multipart upload: wrote chunk %d with %v bytes", partNumber, currentChunkSize)
+	return currentChunkSize, err
+}
+
+// Abort the multipart upload
+func (w *pikpakChunkWriter) Abort(ctx context.Context) (err error) {
+	// Abort the upload session
+	err = w.f.pacer.Call(func() (bool, error) {
+		_, err = w.client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{
+			Bucket:   w.mOut.Bucket,
+			Key:      w.mOut.Key,
+			UploadId: w.mOut.UploadId,
+		})
+		return w.shouldRetry(ctx, err)
+	})
+	if err != nil {
+		return fmt.Errorf("failed to abort multipart upload %q: %w", *w.mOut.UploadId, err)
+	}
+	fs.Debugf(w.o, "multipart upload: %q aborted", *w.mOut.UploadId)
+	return
+}
+
+// Close and finalise the multipart upload
+func (w *pikpakChunkWriter) Close(ctx context.Context) (err error) {
+	// sort the completed parts by part number
+	sort.Slice(w.completedParts, func(i, j int) bool {
+		return *w.completedParts[i].PartNumber < *w.completedParts[j].PartNumber
+	})
+	// Finalise the upload session
+	err = w.f.pacer.Call(func() (bool, error) {
+		_, err = w.client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
+			Bucket:   w.mOut.Bucket,
+			Key:      w.mOut.Key,
+			UploadId: w.mOut.UploadId,
+			MultipartUpload: &types.CompletedMultipartUpload{
+				Parts: w.completedParts,
+			},
+		})
+		return w.shouldRetry(ctx, err)
+	})
+	if err != nil {
+		return fmt.Errorf("failed to complete multipart upload: %w", err)
+	}
+	fs.Debugf(w.o, "multipart upload: %q finished", *w.mOut.UploadId)
+	return
+}
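
[Editor's note] The Upload loop above reads each chunk into a pooled buffer, then hands it to an errgroup goroutine while a token dispenser caps how many chunks are in memory and in flight at once. The standalone sketch below shows the same bounded-concurrency shape using only a buffered channel in place of pacer.NewTokenDispenser; uploadPart is a hypothetical stand-in for the real WriteChunk call and is not part of this patch.

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// uploadPart is a placeholder for the real per-chunk upload (WriteChunk above).
func uploadPart(ctx context.Context, n int) error {
	fmt.Println("uploading part", n)
	return nil
}

func main() {
	const concurrency = 4
	tokens := make(chan struct{}, concurrency) // bounds chunks in memory and in flight

	g, gCtx := errgroup.WithContext(context.Background())
	for part := 0; part < 16; part++ {
		tokens <- struct{}{} // acquire a token before buffering the next chunk
		if gCtx.Err() != nil {
			// Fail fast: a part has already errored, so stop producing.
			<-tokens
			break
		}
		part := part
		g.Go(func() error {
			defer func() { <-tokens }() // release the token with the buffer
			return uploadPart(gCtx, part)
		})
	}
	if err := g.Wait(); err != nil {
		fmt.Println("multipart upload failed:", err)
	}
}
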
"github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" - "github.com/aws/aws-sdk-go-v2/feature/s3/manager" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/rclone/rclone/backend/pikpak/api" "github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs/accounting" - "github.com/rclone/rclone/fs/chunksize" "github.com/rclone/rclone/fs/config" "github.com/rclone/rclone/fs/config/configmap" "github.com/rclone/rclone/fs/config/configstruct" @@ -66,17 +64,22 @@ import ( // Constants const ( - clientID = "YUMx5nI8ZU8Ap8pm" - clientVersion = "2.0.0" - packageName = "mypikpak.com" - defaultUserAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:129.0) Gecko/20100101 Firefox/129.0" - minSleep = 100 * time.Millisecond - maxSleep = 2 * time.Second - taskWaitTime = 500 * time.Millisecond - decayConstant = 2 // bigger for slower decay, exponential - rootURL = "https://api-drive.mypikpak.com" - minChunkSize = fs.SizeSuffix(manager.MinUploadPartSize) - defaultUploadConcurrency = manager.DefaultUploadConcurrency + clientID = "YUMx5nI8ZU8Ap8pm" + clientVersion = "2.0.0" + packageName = "mypikpak.com" + defaultUserAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:129.0) Gecko/20100101 Firefox/129.0" + minSleep = 100 * time.Millisecond + maxSleep = 2 * time.Second + taskWaitTime = 500 * time.Millisecond + decayConstant = 2 // bigger for slower decay, exponential + rootURL = "https://api-drive.mypikpak.com" + + maxUploadParts = 10000 // Part number must be an integer between 1 and 10000, inclusive. + defaultChunkSize = fs.SizeSuffix(1024 * 1024 * 5) // Part size should be in [100KB, 5GB] + minChunkSize = 100 * fs.Kibi + maxChunkSize = 5 * fs.Gibi + defaultUploadCutoff = fs.SizeSuffix(200 * 1024 * 1024) + maxUploadCutoff = 5 * fs.Gibi // maximum allowed size for singlepart uploads ) // Globals @@ -223,6 +226,14 @@ Fill in for rclone to use a non root folder as its starting point. Help: "Files bigger than this will be cached on disk to calculate hash if required.", Default: fs.SizeSuffix(10 * 1024 * 1024), Advanced: true, + }, { + Name: "upload_cutoff", + Help: `Cutoff for switching to chunked upload. + +Any files larger than this will be uploaded in chunks of chunk_size. +The minimum is 0 and the maximum is 5 GiB.`, + Default: defaultUploadCutoff, + Advanced: true, }, { Name: "chunk_size", Help: `Chunk size for multipart uploads. @@ -241,7 +252,7 @@ large file of known size to stay below the 10,000 chunks limit. Increasing the chunk size decreases the accuracy of the progress statistics displayed with "-P" flag.`, - Default: minChunkSize, + Default: defaultChunkSize, Advanced: true, }, { Name: "upload_concurrency", @@ -257,7 +268,7 @@ in memory. 
@@ -223,6 +226,14 @@ Fill in for rclone to use a non root folder as its starting point.
 			Help:     "Files bigger than this will be cached on disk to calculate hash if required.",
 			Default:  fs.SizeSuffix(10 * 1024 * 1024),
 			Advanced: true,
+		}, {
+			Name: "upload_cutoff",
+			Help: `Cutoff for switching to chunked upload.
+
+Any files larger than this will be uploaded in chunks of chunk_size.
+The minimum is 0 and the maximum is 5 GiB.`,
+			Default:  defaultUploadCutoff,
+			Advanced: true,
 		}, {
 			Name: "chunk_size",
 			Help: `Chunk size for multipart uploads.
@@ -241,7 +252,7 @@ large file of known size to stay below the 10,000 chunks limit.
 
 Increasing the chunk size decreases the accuracy of the progress
 statistics displayed with "-P" flag.`,
-			Default:  minChunkSize,
+			Default:  defaultChunkSize,
 			Advanced: true,
 		}, {
 			Name: "upload_concurrency",
@@ -257,7 +268,7 @@ in memory.
 If you are uploading small numbers of large files over high-speed links
 and these uploads do not fully utilize your bandwidth, then increasing
 this may help to speed up the transfers.`,
-			Default:  defaultUploadConcurrency,
+			Default:  4,
 			Advanced: true,
 		}, {
 			Name:     config.ConfigEncoding,
@@ -294,6 +305,7 @@ type Options struct {
 	NoMediaLink         bool                 `config:"no_media_link"`
 	HashMemoryThreshold fs.SizeSuffix        `config:"hash_memory_limit"`
 	ChunkSize           fs.SizeSuffix        `config:"chunk_size"`
+	UploadCutoff        fs.SizeSuffix        `config:"upload_cutoff"`
 	UploadConcurrency   int                  `config:"upload_concurrency"`
 	Enc                 encoder.MultiEncoder `config:"encoding"`
 }
@@ -524,6 +536,39 @@ func (f *Fs) newClientWithPacer(ctx context.Context) (err error) {
 	return nil
 }
 
+func checkUploadChunkSize(cs fs.SizeSuffix) error {
+	if cs < minChunkSize {
+		return fmt.Errorf("%s is less than %s", cs, minChunkSize)
+	}
+	if cs > maxChunkSize {
+		return fmt.Errorf("%s is greater than %s", cs, maxChunkSize)
+	}
+	return nil
+}
+
+func (f *Fs) setUploadChunkSize(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
+	err = checkUploadChunkSize(cs)
+	if err == nil {
+		old, f.opt.ChunkSize = f.opt.ChunkSize, cs
+	}
+	return
+}
+
+func checkUploadCutoff(cs fs.SizeSuffix) error {
+	if cs > maxUploadCutoff {
+		return fmt.Errorf("%s is greater than %s", cs, maxUploadCutoff)
+	}
+	return nil
+}
+
+func (f *Fs) setUploadCutoff(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
+	err = checkUploadCutoff(cs)
+	if err == nil {
+		old, f.opt.UploadCutoff = f.opt.UploadCutoff, cs
+	}
+	return
+}
+
 // newFs partially constructs Fs from the path
 //
 // It constructs a valid Fs but doesn't attempt to figure out whether
@@ -531,11 +576,17 @@ func (f *Fs) newClientWithPacer(ctx context.Context) (err error) {
 func newFs(ctx context.Context, name, path string, m configmap.Mapper) (*Fs, error) {
 	// Parse config into Options struct
 	opt := new(Options)
-	if err := configstruct.Set(m, opt); err != nil {
+	err := configstruct.Set(m, opt)
+	if err != nil {
 		return nil, err
 	}
-	if opt.ChunkSize < minChunkSize {
-		return nil, fmt.Errorf("chunk size must be at least %s", minChunkSize)
+	err = checkUploadChunkSize(opt.ChunkSize)
+	if err != nil {
+		return nil, fmt.Errorf("pikpak: chunk size: %w", err)
+	}
+	err = checkUploadCutoff(opt.UploadCutoff)
+	if err != nil {
+		return nil, fmt.Errorf("pikpak: upload cutoff: %w", err)
 	}
 
 	root := parsePath(path)
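
[Editor's note] The refactored upload path in the next hunks picks between a single PutObject and the new chunk writer. The rule it applies is small enough to state as a sketch; the function name below is illustrative and not part of the patch.

// useMultipart mirrors the dispatch at the top of uploadByResumable below:
// streams of unknown length (size < 0) always go through the chunk writer,
// and anything at or over upload_cutoff is chunked so parts can be uploaded
// in parallel and retried individually.
func useMultipart(size, cutoff int64) bool {
	return size < 0 || size >= cutoff
}
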
@@ -1260,9 +1311,7 @@ func (f *Fs) uploadByForm(ctx context.Context, in io.Reader, name string, size i
 	return
 }
 
-func (f *Fs) uploadByResumable(ctx context.Context, in io.Reader, name string, size int64, resumable *api.Resumable) (err error) {
-	p := resumable.Params
-
+func (f *Fs) newS3Client(ctx context.Context, p *api.ResumableParams) (s3Client *s3.Client, err error) {
 	// Create a credentials provider
 	creds := credentials.NewStaticCredentialsProvider(p.AccessKeyID, p.AccessKeySecret, p.SecurityToken)
 
@@ -1272,22 +1321,64 @@ func (f *Fs) uploadByResumable(ctx context.Context, in io.Reader, name string, s
 	if err != nil {
 		return
 	}
+	ci := fs.GetConfig(ctx)
+	cfg.RetryMaxAttempts = ci.LowLevelRetries
+	cfg.HTTPClient = getClient(ctx, &f.opt)
 
 	client := s3.NewFromConfig(cfg, func(o *s3.Options) {
 		o.BaseEndpoint = aws.String("https://mypikpak.com/")
+		o.RequestChecksumCalculation = aws.RequestChecksumCalculationWhenRequired
+		o.ResponseChecksumValidation = aws.ResponseChecksumValidationWhenRequired
 	})
 
-	partSize := chunksize.Calculator(name, size, int(manager.MaxUploadParts), f.opt.ChunkSize)
+	return client, nil
+}
 
-	// Create an uploader with custom options
-	uploader := manager.NewUploader(client, func(u *manager.Uploader) {
-		u.PartSize = int64(partSize)
-		u.Concurrency = f.opt.UploadConcurrency
-	})
-	// Perform an upload
-	_, err = uploader.Upload(ctx, &s3.PutObjectInput{
+func (f *Fs) uploadByResumable(ctx context.Context, in io.Reader, name string, size int64, resumable *api.Resumable, options ...fs.OpenOption) (err error) {
+	p := resumable.Params
+
+	if size < 0 || size >= int64(f.opt.UploadCutoff) {
+		mu, err := f.newChunkWriter(ctx, name, size, p, in, options...)
+		if err != nil {
+			return fmt.Errorf("multipart upload failed to initialise: %w", err)
+		}
+		return mu.Upload(ctx)
+	}
+
+	// upload in a single part
+	client, err := f.newS3Client(ctx, p)
+	if err != nil {
+		return fmt.Errorf("failed to create upload client: %w", err)
+	}
+	req := &s3.PutObjectInput{
 		Bucket: &p.Bucket,
 		Key:    &p.Key,
-		Body:   in,
+		Body:   io.NopCloser(in),
+	}
+	// Apply upload options
+	for _, option := range options {
+		key, value := option.Header()
+		lowerKey := strings.ToLower(key)
+		switch lowerKey {
+		case "":
+			// ignore
+		case "cache-control":
+			req.CacheControl = aws.String(value)
+		case "content-disposition":
+			req.ContentDisposition = aws.String(value)
+		case "content-encoding":
+			req.ContentEncoding = aws.String(value)
+		case "content-type":
+			req.ContentType = aws.String(value)
+		}
+	}
+	var s3opts = []func(*s3.Options){}
+	// Can't retry single part uploads as we only have an io.Reader
+	s3opts = append(s3opts, func(o *s3.Options) {
+		o.RetryMaxAttempts = 1
+	})
+	err = f.pacer.CallNoRetry(func() (bool, error) {
+		_, err = client.PutObject(ctx, req, s3opts...)
+		return f.shouldRetry(ctx, nil, err)
 	})
 	return
 }
@@ -1345,7 +1436,7 @@ func (f *Fs) upload(ctx context.Context, in io.Reader, leaf, dirID, gcid string,
 	if uploadType == api.UploadTypeForm && new.Form != nil {
 		err = f.uploadByForm(ctx, in, req.Name, size, new.Form, options...)
 	} else if uploadType == api.UploadTypeResumable && new.Resumable != nil {
-		err = f.uploadByResumable(ctx, in, leaf, size, new.Resumable)
+		err = f.uploadByResumable(ctx, in, leaf, size, new.Resumable, options...)
 	} else {
 		err = fmt.Errorf("no method available for uploading: %+v", new)
 	}
diff --git a/backend/pikpak/pikpak_test.go b/backend/pikpak/pikpak_test.go
index cbc97589e..ac9d6b8c3 100644
--- a/backend/pikpak/pikpak_test.go
+++ b/backend/pikpak/pikpak_test.go
@@ -1,10 +1,10 @@
 // Test PikPak filesystem interface
-package pikpak_test
+package pikpak
 
 import (
 	"testing"
 
-	"github.com/rclone/rclone/backend/pikpak"
+	"github.com/rclone/rclone/fs"
 	"github.com/rclone/rclone/fstest/fstests"
 )
 
@@ -12,6 +12,23 @@ import (
 func TestIntegration(t *testing.T) {
 	fstests.Run(t, &fstests.Opt{
 		RemoteName: "TestPikPak:",
-		NilObject:  (*pikpak.Object)(nil),
+		NilObject:  (*Object)(nil),
+		ChunkedUpload: fstests.ChunkedUploadConfig{
+			MinChunkSize: minChunkSize,
+			MaxChunkSize: maxChunkSize,
+		},
 	})
 }
+
+func (f *Fs) SetUploadChunkSize(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
+	return f.setUploadChunkSize(cs)
+}
+
+func (f *Fs) SetUploadCutoff(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
+	return f.setUploadCutoff(cs)
+}
+
+var (
+	_ fstests.SetUploadChunkSizer = (*Fs)(nil)
+	_ fstests.SetUploadCutoffer   = (*Fs)(nil)
+)
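
[Editor's note] The test file moves from the external pikpak_test package into pikpak so the fstests harness can reach the unexported setters through the exported wrappers above; as with other rclone backends, the integration tests are typically run with "go test -v -remote TestPikPak:" from backend/pikpak. The helper below is an illustrative sketch, not part of the patch, showing the shape of use these wrappers enable: swap the chunk size for a test case and restore the previous value afterwards.

// withChunkSize runs fn with the chunk size temporarily set to cs,
// restoring the old value on exit (hypothetical helper for illustration).
func withChunkSize(f *Fs, cs fs.SizeSuffix, fn func()) error {
	old, err := f.SetUploadChunkSize(cs)
	if err != nil {
		return err
	}
	defer func() { _, _ = f.SetUploadChunkSize(old) }()
	fn()
	return nil
}
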