From 471531eb6a58894ac1e302a064d4ff136634d295 Mon Sep 17 00:00:00 2001
From: wiserain
Date: Wed, 17 Jul 2024 12:20:09 +0900
Subject: [PATCH] pikpak: optimize upload by pre-fetching gcid from API

This commit optimizes the PikPak upload process by pre-fetching the Global
Content Identifier (gcid) from the API server before falling back to
calculating it locally.

Previously, the gcid required for an upload was always calculated locally,
a resource-intensive and time-consuming step. By first checking for a cached
gcid on the server, the local calculation can often be avoided entirely.
This significantly improves upload speed, especially for large files.
---
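Note (illustrative sketch only, not part of the patch): the new upload path
prefers the gcid cached on the API server and only falls back to the local
hashing on a miss. The names resolveGcid, lookupCached and computeLocally
below are invented for this sketch and do not exist in the backend.

package main

import (
	"context"
	"errors"
	"fmt"
)

// resolveGcid returns the cached gcid when the API lookup succeeds and falls
// back to the expensive local hashing otherwise, mirroring the
// `err != nil || gcid == ""` check this patch adds to the upload path.
func resolveGcid(ctx context.Context,
	lookupCached func(context.Context) (string, error),
	computeLocally func() (string, error)) (string, error) {
	if gcid, err := lookupCached(ctx); err == nil && gcid != "" {
		return gcid, nil // server already knows this content
	}
	return computeLocally() // cache miss: hash the file locally as before
}

func main() {
	gcid, err := resolveGcid(context.Background(),
		func(ctx context.Context) (string, error) { return "", errors.New("not cached") }, // simulated cache miss
		func() (string, error) { return "LOCALLY-CALCULATED-GCID", nil },                  // dummy local result
	)
	fmt.Println(gcid, err)
}
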
 backend/pikpak/helper.go | 98 ++++++++++++++++++++++++++++++++++++++++
 backend/pikpak/pikpak.go | 52 +++++++++++----------
 2 files changed, 127 insertions(+), 23 deletions(-)

diff --git a/backend/pikpak/helper.go b/backend/pikpak/helper.go
index 24230312b..b296900a4 100644
--- a/backend/pikpak/helper.go
+++ b/backend/pikpak/helper.go
@@ -8,13 +8,16 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"math/rand"
 	"net/http"
 	"net/url"
 	"os"
 	"strconv"
+	"strings"
 	"time"
 
 	"github.com/rclone/rclone/backend/pikpak/api"
+	"github.com/rclone/rclone/fs"
 	"github.com/rclone/rclone/lib/rest"
 )
 
@@ -253,6 +256,37 @@ func (f *Fs) requestShare(ctx context.Context, req *api.RequestShare) (info *api
 	return
 }
 
+// getGcid retrieves Gcid cached in API server
+func (f *Fs) getGcid(ctx context.Context, src fs.ObjectInfo) (gcid string, err error) {
+	cid, err := calcCid(ctx, src)
+	if err != nil {
+		return
+	}
+
+	params := url.Values{}
+	params.Set("cid", cid)
+	params.Set("file_size", strconv.FormatInt(src.Size(), 10))
+	opts := rest.Opts{
+		Method:       "GET",
+		Path:         "/drive/v1/resource/cid",
+		Parameters:   params,
+		ExtraHeaders: map[string]string{"x-device-id": f.deviceID},
+	}
+
+	info := struct {
+		Gcid string `json:"gcid,omitempty"`
+	}{}
+	var resp *http.Response
+	err = f.pacer.Call(func() (bool, error) {
+		resp, err = f.rst.CallJSON(ctx, &opts, nil, &info)
+		return f.shouldRetry(ctx, resp, err)
+	})
+	if err != nil {
+		return "", err
+	}
+	return info.Gcid, nil
+}
+
 // Read the gcid of in returning a reader which will read the same contents
 //
 // The cleanup function should be called when out is finished with
@@ -306,6 +340,9 @@ func readGcid(in io.Reader, size, threshold int64) (gcid string, out io.Reader,
 	return
 }
 
+// calcGcid calculates Gcid from reader
+//
+// Gcid is a custom hash to index a file contents
 func calcGcid(r io.Reader, size int64) (string, error) {
 	calcBlockSize := func(j int64) int64 {
 		var psize int64 = 0x40000
@@ -330,3 +367,64 @@ func calcGcid(r io.Reader, size int64) (string, error) {
 	}
 	return hex.EncodeToString(totalHash.Sum(nil)), nil
 }
+
+// calcCid calculates Cid from source
+//
+// Cid is a simplified version of Gcid
+func calcCid(ctx context.Context, src fs.ObjectInfo) (cid string, err error) {
+	srcObj := fs.UnWrapObjectInfo(src)
+	if srcObj == nil {
+		return "", fmt.Errorf("failed to unwrap object from src: %s", src)
+	}
+
+	size := src.Size()
+	hash := sha1.New()
+	var rc io.ReadCloser
+
+	readHash := func(start, length int64) (err error) {
+		end := start + length - 1
+		if rc, err = srcObj.Open(ctx, &fs.RangeOption{Start: start, End: end}); err != nil {
+			return fmt.Errorf("failed to open src with range (%d, %d): %w", start, end, err)
+		}
+		defer fs.CheckClose(rc, &err)
+		_, err = io.Copy(hash, rc)
+		return err
+	}
+
+	if size <= 0xF000 { // 61440 = 60KB
+		err = readHash(0, size)
+	} else { // 20KB from three different parts
+		for _, start := range []int64{0, size / 3, size - 0x5000} {
+			err = readHash(start, 0x5000)
+			if err != nil {
+				break
+			}
+		}
+	}
+	if err != nil {
+		return "", fmt.Errorf("failed to hash: %w", err)
+	}
+	cid = strings.ToUpper(hex.EncodeToString(hash.Sum(nil)))
+	return
+}
+
+// randomly generates device id used for request header 'x-device-id'
+//
+// original javascript implementation
+//
+//	return "xxxxxxxxxxxx4xxxyxxxxxxxxxxxxxxx".replace(/[xy]/g, (e) => {
+//		const t = (16 * Math.random()) | 0;
+//		return ("x" == e ? t : (3 & t) | 8).toString(16);
+//	});
+func genDeviceID() string {
+	base := []byte("xxxxxxxxxxxx4xxxyxxxxxxxxxxxxxxx")
+	for i, char := range base {
+		switch char {
+		case 'x':
+			base[i] = fmt.Sprintf("%x", rand.Intn(16))[0]
+		case 'y':
+			base[i] = fmt.Sprintf("%x", rand.Intn(16)&3|8)[0]
+		}
+	}
+	return string(base)
+}
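For reference, a self-contained sketch of the sampling scheme the new calcCid
above uses, rewritten against a plain local file instead of an fs.ObjectInfo.
The fileCid name and the test path are invented for this sketch; the 0xF000
and 0x5000 thresholds and the SHA-1-over-three-windows layout follow the patch.

package main

import (
	"crypto/sha1"
	"encoding/hex"
	"fmt"
	"io"
	"os"
	"strings"
)

// fileCid hashes the whole content when it is at most 0xF000 (60 KiB) bytes,
// otherwise it hashes three 0x5000 (20 KiB) windows taken from the start,
// one third in, and the end of the file.
func fileCid(path string) (string, error) {
	f, err := os.Open(path)
	if err != nil {
		return "", err
	}
	defer f.Close()

	fi, err := f.Stat()
	if err != nil {
		return "", err
	}
	size := fi.Size()

	h := sha1.New()
	readHash := func(start, length int64) error {
		// read [start, start+length) into the running SHA-1
		_, err := io.Copy(h, io.NewSectionReader(f, start, length))
		return err
	}

	if size <= 0xF000 {
		err = readHash(0, size)
	} else {
		for _, start := range []int64{0, size / 3, size - 0x5000} {
			if err = readHash(start, 0x5000); err != nil {
				break
			}
		}
	}
	if err != nil {
		return "", err
	}
	return strings.ToUpper(hex.EncodeToString(h.Sum(nil))), nil
}

func main() {
	cid, err := fileCid("testfile.bin") // hypothetical input file
	fmt.Println(cid, err)
}
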
diff --git a/backend/pikpak/pikpak.go b/backend/pikpak/pikpak.go
index 81645c13c..3d666867f 100644
--- a/backend/pikpak/pikpak.go
+++ b/backend/pikpak/pikpak.go
@@ -274,6 +274,7 @@ type Fs struct {
 	dirCache     *dircache.DirCache // Map of directory path to directory id
 	pacer        *fs.Pacer          // pacer for API calls
 	rootFolderID string             // the id of the root folder
+	deviceID     string             // device id used for api requests
 	client       *http.Client       // authorized client
 	m            configmap.Mapper
 	tokenMu      *sync.Mutex // when renewing tokens
@@ -489,6 +490,7 @@ func newFs(ctx context.Context, name, path string, m configmap.Mapper) (*Fs, err
 		CanHaveEmptyDirectories: true, // can have empty directories
 		NoMultiThreading:        true, // can't have multiple threads downloading
 	}).Fill(ctx, f)
+	f.deviceID = genDeviceID()
 
 	if err := f.newClientWithPacer(ctx); err != nil {
 		return nil, err
@@ -1694,32 +1696,36 @@ func (o *Object) upload(ctx context.Context, in io.Reader, src fs.ObjectInfo, wi
 	}
 
 	// Calculate gcid; grabbed from package jottacloud
-	var gcid string
-	if srcObj := fs.UnWrapObjectInfo(src); srcObj != nil && srcObj.Fs().Features().IsLocal {
-		// No buffering; directly calculate gcid from source
-		rc, err := srcObj.Open(ctx)
-		if err != nil {
-			return fmt.Errorf("failed to open src: %w", err)
-		}
-		defer fs.CheckClose(rc, &err)
+	gcid, err := o.fs.getGcid(ctx, src)
+	if err != nil || gcid == "" {
+		fs.Debugf(o, "calculating gcid: %v", err)
+		if srcObj := fs.UnWrapObjectInfo(src); srcObj != nil && srcObj.Fs().Features().IsLocal {
+			// No buffering; directly calculate gcid from source
+			rc, err := srcObj.Open(ctx)
+			if err != nil {
+				return fmt.Errorf("failed to open src: %w", err)
+			}
+			defer fs.CheckClose(rc, &err)
 
-		if gcid, err = calcGcid(rc, srcObj.Size()); err != nil {
-			return fmt.Errorf("failed to calculate gcid: %w", err)
+			if gcid, err = calcGcid(rc, srcObj.Size()); err != nil {
+				return fmt.Errorf("failed to calculate gcid: %w", err)
+			}
+		} else {
+			// unwrap the accounting from the input, we use wrap to put it
+			// back on after the buffering
+			var wrap accounting.WrapFn
+			in, wrap = accounting.UnWrap(in)
+			var cleanup func()
+			gcid, in, cleanup, err = readGcid(in, size, int64(o.fs.opt.HashMemoryThreshold))
+			defer cleanup()
+			if err != nil {
+				return fmt.Errorf("failed to calculate gcid: %w", err)
+			}
+			// Wrap the accounting back onto the stream
+			in = wrap(in)
 		}
-	} else {
-		// unwrap the accounting from the input, we use wrap to put it
-		// back on after the buffering
-		var wrap accounting.WrapFn
-		in, wrap = accounting.UnWrap(in)
-		var cleanup func()
-		gcid, in, cleanup, err = readGcid(in, size, int64(o.fs.opt.HashMemoryThreshold))
-		defer cleanup()
-		if err != nil {
-			return fmt.Errorf("failed to calculate gcid: %w", err)
-		}
-		// Wrap the accounting back onto the stream
-		in = wrap(in)
 	}
+	fs.Debugf(o, "gcid = %s", gcid)
 
 	if !withTemp {
 		info, err := o.fs.upload(ctx, in, leaf, dirID, gcid, size, options...)
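To make the shape of the generated device id explicit, here is an illustrative
property test (not part of the patch) that could sit next to genDeviceID in
backend/pikpak. The assertions only restate the template
"xxxxxxxxxxxx4xxxyxxxxxxxxxxxxxxx": 32 lower-case hex digits, a literal '4' at
index 12, and one of 8, 9, a or b at index 16.

package pikpak

import (
	"regexp"
	"testing"
)

// TestGenDeviceID checks the shape of the id sent in the x-device-id header.
func TestGenDeviceID(t *testing.T) {
	// 12 hex digits, '4', 3 hex digits, [89ab], 15 hex digits = 32 characters.
	re := regexp.MustCompile(`^[0-9a-f]{12}4[0-9a-f]{3}[89ab][0-9a-f]{15}$`)
	for i := 0; i < 100; i++ {
		if id := genDeviceID(); !re.MatchString(id) {
			t.Errorf("unexpected device id %q", id)
		}
	}
}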