diff --git a/backend/zoho/api/types.go b/backend/zoho/api/types.go
index ed6751a44..4e5433701 100644
--- a/backend/zoho/api/types.go
+++ b/backend/zoho/api/types.go
@@ -180,11 +180,38 @@ func (ui *UploadInfo) GetUploadFileInfo() (*UploadFileInfo, error) {
 	return &ufi, nil
 }
 
+// LargeUploadInfo is once again a slightly different version of UploadInfo
+// returned as part of a LargeUploadResponse by the large file upload API.
+type LargeUploadInfo struct {
+	Attributes struct {
+		ParentID    string `json:"parent_id"`
+		FileName    string `json:"file_name"`
+		RessourceID string `json:"resource_id"`
+		FileInfo    string `json:"file_info"`
+	} `json:"attributes"`
+}
+
+// GetUploadFileInfo decodes the embedded FileInfo
+func (ui *LargeUploadInfo) GetUploadFileInfo() (*UploadFileInfo, error) {
+	var ufi UploadFileInfo
+	err := json.Unmarshal([]byte(ui.Attributes.FileInfo), &ufi)
+	if err != nil {
+		return nil, fmt.Errorf("failed to decode FileInfo: %w", err)
+	}
+	return &ufi, nil
+}
+
 // UploadResponse is the response to a file Upload
 type UploadResponse struct {
 	Uploads []UploadInfo `json:"data"`
 }
 
+// LargeUploadResponse is the response returned by the large file upload API.
+type LargeUploadResponse struct {
+	Uploads []LargeUploadInfo `json:"data"`
+	Status  string            `json:"status"`
+}
+
 // WriteMetadataRequest is used to write metadata for a
 // single item
 type WriteMetadataRequest struct {
diff --git a/backend/zoho/zoho.go b/backend/zoho/zoho.go
index dad316b49..d6f79a1f3 100644
--- a/backend/zoho/zoho.go
+++ b/backend/zoho/zoho.go
@@ -14,6 +14,7 @@ import (
 	"strings"
 	"time"
 
+	"github.com/google/uuid"
 	"github.com/rclone/rclone/lib/encoder"
 	"github.com/rclone/rclone/lib/pacer"
 	"github.com/rclone/rclone/lib/random"
@@ -63,6 +64,7 @@ var (
 	}
 	rootURL     = "https://workdrive.zoho.eu/api/v1"
 	downloadURL = "https://download.zoho.eu/v1/workdrive"
+	uploadURL   = "https://upload.zoho.eu/workdrive-api/v1"
 	accountsURL = "https://accounts.zoho.eu"
 )
 
@@ -202,14 +204,15 @@ type Options struct {
 
 // Fs represents a remote workdrive
 type Fs struct {
-	name        string             // name of this remote
-	root        string             // the path we are working on
-	opt         Options            // parsed options
-	features    *fs.Features       // optional features
-	srv         *rest.Client       // the connection to the server
-	downloadsrv *rest.Client       // the connection to the Download server
-	dirCache    *dircache.DirCache // Map of directory path to directory id
-	pacer       *fs.Pacer          // pacer for API calls
+	name        string             // name of this remote
+	root        string             // the path we are working on
+	opt         Options            // parsed options
+	features    *fs.Features       // optional features
+	srv         *rest.Client       // the connection to the server
+	downloadsrv *rest.Client       // the connection to the download server
+	uploadsrv   *rest.Client       // the connection to the upload server
+	dirCache    *dircache.DirCache // Map of directory path to directory id
+	pacer       *fs.Pacer          // pacer for API calls
 }
 
 // Object describes a Zoho WorkDrive object
@@ -232,7 +235,8 @@ func setupRegion(m configmap.Mapper) error {
 		return errors.New("no region set")
 	}
 	rootURL = fmt.Sprintf("https://workdrive.zoho.%s/api/v1", region)
-	downloadURL = fmt.Sprintf("https://download.zoho.%s/v1/workdrive",region)
+	downloadURL = fmt.Sprintf("https://download.zoho.%s/v1/workdrive", region)
+	uploadURL = fmt.Sprintf("https://upload.zoho.%s/workdrive-api/v1", region)
 	accountsURL = fmt.Sprintf("https://accounts.zoho.%s", region)
 	oauthConfig.Endpoint.AuthURL = fmt.Sprintf("https://accounts.zoho.%s/oauth/v2/auth", region)
 	oauthConfig.Endpoint.TokenURL = fmt.Sprintf("https://accounts.zoho.%s/oauth/v2/token", region)
@@ -405,12 +409,13 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
 	}
 
 	f := &Fs{
-		name:        name,
-		root:        root,
-		opt:         *opt,
-		srv:         rest.NewClient(oAuthClient).SetRoot(rootURL),
-		downloadsrv: rest.NewClient(oAuthClient).SetRoot(downloadURL),
-		pacer:       fs.NewPacer(ctx, pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
+		name:        name,
+		root:        root,
+		opt:         *opt,
+		srv:         rest.NewClient(oAuthClient).SetRoot(rootURL),
+		downloadsrv: rest.NewClient(oAuthClient).SetRoot(downloadURL),
+		uploadsrv:   rest.NewClient(oAuthClient).SetRoot(uploadURL),
+		pacer:       fs.NewPacer(ctx, pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
 	}
 	f.features = (&fs.Features{
 		CanHaveEmptyDirectories: true,
@@ -648,9 +653,61 @@ func (f *Fs) createObject(ctx context.Context, remote string, size int64, modTim
 	return
 }
 
+func (f *Fs) uploadLargeFile(ctx context.Context, name string, parent string, size int64, in io.Reader, options ...fs.OpenOption) (*api.Item, error) {
+	opts := rest.Opts{
+		Method:        "POST",
+		Path:          "/stream/upload",
+		Body:          in,
+		ContentLength: &size,
+		ContentType:   "application/octet-stream",
+		Options:       options,
+		ExtraHeaders: map[string]string{
+			"x-filename":          url.QueryEscape(name),
+			"x-parent_id":         parent,
+			"override-name-exist": "true",
+			"upload-id":           uuid.New().String(),
+			"x-streammode":        "1",
+		},
+	}
+
+	var err error
+	var resp *http.Response
+	var uploadResponse *api.LargeUploadResponse
+	err = f.pacer.CallNoRetry(func() (bool, error) {
+		resp, err = f.uploadsrv.CallJSON(ctx, &opts, nil, &uploadResponse)
+		return shouldRetry(ctx, resp, err)
+	})
+	if err != nil {
+		return nil, fmt.Errorf("upload large error: %w", err)
+	}
+	if len(uploadResponse.Uploads) != 1 {
+		return nil, errors.New("upload: invalid response")
+	}
+	upload := uploadResponse.Uploads[0]
+	uploadInfo, err := upload.GetUploadFileInfo()
+	if err != nil {
+		return nil, fmt.Errorf("upload error: %w", err)
+	}
+
+	// Fill in the api.Item from the api.UploadFileInfo
+	var info api.Item
+	info.ID = upload.Attributes.RessourceID
+	info.Attributes.Name = upload.Attributes.FileName
+	// info.Attributes.Type = not used
+	info.Attributes.IsFolder = false
+	// info.Attributes.CreatedTime = not used
+	info.Attributes.ModifiedTime = uploadInfo.GetModTime()
+	// info.Attributes.UploadedTime = 0 not used
+	info.Attributes.StorageInfo.Size = uploadInfo.Size
+	info.Attributes.StorageInfo.FileCount = 0
+	info.Attributes.StorageInfo.FolderCount = 0
+
+	return &info, nil
+}
+
 func (f *Fs) upload(ctx context.Context, name string, parent string, size int64, in io.Reader, options ...fs.OpenOption) (*api.Item, error) {
 	params := url.Values{}
-	params.Set("filename", name)
+	params.Set("filename", url.QueryEscape(name))
 	params.Set("parent_id", parent)
 	params.Set("override-name-exist", strconv.FormatBool(true))
 	formReader, contentType, overhead, err := rest.MultipartUpload(ctx, in, nil, "content", name)
@@ -710,21 +767,40 @@ func (f *Fs) upload(ctx context.Context, name string, parent string, size int64,
 //
 // The new object may have been created if an error is returned
 func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
-	size := src.Size()
-	remote := src.Remote()
+	existingObj, err := f.NewObject(ctx, src.Remote())
+	switch err {
+	case nil:
+		return existingObj, existingObj.Update(ctx, in, src, options...)
+	case fs.ErrorObjectNotFound:
+		size := src.Size()
+		remote := src.Remote()
 
-	// Create the directory for the object if it doesn't exist
-	leaf, directoryID, err := f.dirCache.FindPath(ctx, remote, true)
-	if err != nil {
+		// Create the directory for the object if it doesn't exist
+		leaf, directoryID, err := f.dirCache.FindPath(ctx, remote, true)
+		if err != nil {
+			return nil, err
+		}
+
+		// use normal upload API for small sizes (<10MiB)
+		if size < 10*1024*1024 {
+			info, err := f.upload(ctx, f.opt.Enc.FromStandardName(leaf), directoryID, size, in, options...)
+			if err != nil {
+				return nil, err
+			}
+
+			return f.newObjectWithInfo(ctx, remote, info)
+		}
+
+		// large file API otherwise
+		info, err := f.uploadLargeFile(ctx, f.opt.Enc.FromStandardName(leaf), directoryID, size, in, options...)
+		if err != nil {
+			return nil, err
+		}
+
+		return f.newObjectWithInfo(ctx, remote, info)
+	default:
 		return nil, err
 	}
-
-	// Upload the file
-	info, err := f.upload(ctx, f.opt.Enc.FromStandardName(leaf), directoryID, size, in, options...)
-	if err != nil {
-		return nil, err
-	}
-	return f.newObjectWithInfo(ctx, remote, info)
 }
 
 // Mkdir creates the container if it doesn't exist
@@ -1188,11 +1264,22 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
 		return err
 	}
 
-	// Overwrite the old file
-	info, err := o.fs.upload(ctx, o.fs.opt.Enc.FromStandardName(leaf), directoryID, size, in, options...)
+	// use normal upload API for small sizes (<10MiB)
+	if size < 10*1024*1024 {
+		info, err := o.fs.upload(ctx, o.fs.opt.Enc.FromStandardName(leaf), directoryID, size, in, options...)
+		if err != nil {
+			return err
+		}
+
+		return o.setMetaData(info)
+	}
+
+	// large file API otherwise
+	info, err := o.fs.uploadLargeFile(ctx, o.fs.opt.Enc.FromStandardName(leaf), directoryID, size, in, options...)
 	if err != nil {
 		return err
 	}
+
 	return o.setMetaData(info)
 }
 
diff --git a/backend/zoho/zoho_test.go b/backend/zoho/zoho_test.go
index 1314440fa..75eb4a95b 100644
--- a/backend/zoho/zoho_test.go
+++ b/backend/zoho/zoho_test.go
@@ -11,7 +11,8 @@ import (
 // TestIntegration runs integration tests against the remote
 func TestIntegration(t *testing.T) {
 	fstests.Run(t, &fstests.Opt{
-		RemoteName: "TestZoho:",
-		NilObject:  (*zoho.Object)(nil),
+		RemoteName:      "TestZoho:",
+		SkipInvalidUTF8: true,
+		NilObject:       (*zoho.Object)(nil),
 	})
 }