mirror of
https://github.com/rclone/rclone.git
synced 2024-11-29 20:04:52 +01:00
889 lines
29 KiB
Go
889 lines
29 KiB
Go
package hidrive
|
|
|
|
// This file is for helper-functions which may provide more general and
|
|
// specialized functionality than the generic interfaces.
|
|
// There are two sections:
|
|
// 1. methods bound to Fs
|
|
// 2. other functions independent from Fs used throughout the package
|
|
|
|
// NOTE: Functions accessing paths expect any relative paths
|
|
// to be resolved prior to execution with resolvePath(...).
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"io"
|
|
"net/http"
|
|
"path"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/rclone/rclone/backend/hidrive/api"
|
|
"github.com/rclone/rclone/fs"
|
|
"github.com/rclone/rclone/fs/accounting"
|
|
"github.com/rclone/rclone/fs/fserrors"
|
|
"github.com/rclone/rclone/lib/ranges"
|
|
"github.com/rclone/rclone/lib/readers"
|
|
"github.com/rclone/rclone/lib/rest"
|
|
"golang.org/x/sync/errgroup"
|
|
"golang.org/x/sync/semaphore"
|
|
)
|
|
|
|
const (
|
|
// MaximumUploadBytes represents the maximum amount of bytes
|
|
// a single upload-operation will support.
|
|
MaximumUploadBytes = 2147483647 // = 2GiB - 1
|
|
// iterationChunkSize represents the chunk size used to iterate directory contents.
|
|
iterationChunkSize = 5000
|
|
)
|
|
|
|
var (
|
|
// retryErrorCodes is a slice of error codes that we will always retry.
|
|
retryErrorCodes = []int{
|
|
429, // Too Many Requests
|
|
500, // Internal Server Error
|
|
502, // Bad Gateway
|
|
503, // Service Unavailable
|
|
504, // Gateway Timeout
|
|
509, // Bandwidth Limit Exceeded
|
|
}
|
|
// ErrorFileExists is returned when a query tries to create a file
|
|
// that already exists.
|
|
ErrorFileExists = errors.New("destination file already exists")
|
|
)
|
|
|
|
// MemberType represents the possible types of entries a directory can contain.
|
|
type MemberType string
|
|
|
|
// possible values for MemberType
|
|
const (
|
|
AllMembers MemberType = "all"
|
|
NoMembers MemberType = "none"
|
|
DirectoryMembers MemberType = api.HiDriveObjectTypeDirectory
|
|
FileMembers MemberType = api.HiDriveObjectTypeFile
|
|
SymlinkMembers MemberType = api.HiDriveObjectTypeSymlink
|
|
)
|
|
|
|
// SortByField represents possible fields to sort entries of a directory by.
|
|
type SortByField string
|
|
|
|
// possible values for SortByField
|
|
const (
|
|
descendingSort string = "-"
|
|
SortByName SortByField = "name"
|
|
SortByModTime SortByField = "mtime"
|
|
SortByObjectType SortByField = "type"
|
|
SortBySize SortByField = "size"
|
|
SortByNameDescending SortByField = SortByField(descendingSort) + SortByName
|
|
SortByModTimeDescending SortByField = SortByField(descendingSort) + SortByModTime
|
|
SortByObjectTypeDescending SortByField = SortByField(descendingSort) + SortByObjectType
|
|
SortBySizeDescending SortByField = SortByField(descendingSort) + SortBySize
|
|
)
|
|
|
|
var (
|
|
// Unsorted disables sorting and can therefore not be combined with other values.
|
|
Unsorted = []SortByField{"none"}
|
|
// DefaultSorted does not specify how to sort and
|
|
// therefore implies the default sort order.
|
|
DefaultSorted = []SortByField{}
|
|
)
|
|
|
|
// CopyOrMoveOperationType represents the possible types of copy- and move-operations.
|
|
type CopyOrMoveOperationType int
|
|
|
|
// possible values for CopyOrMoveOperationType
|
|
const (
|
|
MoveOriginal CopyOrMoveOperationType = iota
|
|
CopyOriginal
|
|
CopyOriginalPreserveModTime
|
|
)
|
|
|
|
// OnExistAction represents possible actions the API should take,
|
|
// when a request tries to create a path that already exists.
|
|
type OnExistAction string
|
|
|
|
// possible values for OnExistAction
|
|
const (
|
|
// IgnoreOnExist instructs the API not to execute
|
|
// the request in case of a conflict, but to return an error.
|
|
IgnoreOnExist OnExistAction = "ignore"
|
|
// AutoNameOnExist instructs the API to automatically rename
|
|
// any conflicting request-objects.
|
|
AutoNameOnExist OnExistAction = "autoname"
|
|
// OverwriteOnExist instructs the API to overwrite any conflicting files.
|
|
// This can only be used, if the request operates on files directly.
|
|
// (For example when moving/copying a file.)
|
|
// For most requests this action will simply be ignored.
|
|
OverwriteOnExist OnExistAction = "overwrite"
|
|
)
|
|
|
|
// shouldRetry returns a boolean as to whether this resp and err deserve to be retried.
|
|
// It tries to expire/invalidate the token, if necessary.
|
|
// It returns the err as a convenience.
|
|
func (f *Fs) shouldRetry(ctx context.Context, resp *http.Response, err error) (bool, error) {
|
|
if fserrors.ContextError(ctx, &err) {
|
|
return false, err
|
|
}
|
|
if resp != nil && (resp.StatusCode == 401 || isHTTPError(err, 401)) && len(resp.Header["Www-Authenticate"]) > 0 {
|
|
fs.Debugf(f, "Token might be invalid: %v", err)
|
|
if f.tokenRenewer != nil {
|
|
iErr := f.tokenRenewer.Expire()
|
|
if iErr == nil {
|
|
return true, err
|
|
}
|
|
}
|
|
}
|
|
return fserrors.ShouldRetry(err) || fserrors.ShouldRetryHTTP(resp, retryErrorCodes), err
|
|
}
|
|
|
|
// resolvePath resolves the given (relative) path and
|
|
// returns a path suitable for API-calls.
|
|
// This will consider the root-path of the fs and any needed prefixes.
|
|
//
|
|
// Any relative paths passed to functions that access these paths should
|
|
// be resolved with this first!
|
|
func (f *Fs) resolvePath(objectPath string) string {
|
|
resolved := path.Join(f.opt.RootPrefix, f.root, f.opt.Enc.FromStandardPath(objectPath))
|
|
return resolved
|
|
}
|
|
|
|
// iterateOverDirectory calls the given function callback
|
|
// on each item found in a given directory.
|
|
//
|
|
// If callback ever returns true then this exits early with found = true.
|
|
func (f *Fs) iterateOverDirectory(ctx context.Context, directory string, searchOnly MemberType, callback func(*api.HiDriveObject) bool, fields []string, sortBy []SortByField) (found bool, err error) {
|
|
parameters := api.NewQueryParameters()
|
|
parameters.SetPath(directory)
|
|
parameters.AddFields("members.", fields...)
|
|
parameters.AddFields("", api.DirectoryContentFields...)
|
|
parameters.Set("members", string(searchOnly))
|
|
for _, v := range sortBy {
|
|
// The explicit conversion is necessary for each element.
|
|
parameters.AddList("sort", ",", string(v))
|
|
}
|
|
|
|
opts := rest.Opts{
|
|
Method: "GET",
|
|
Path: "/dir",
|
|
Parameters: parameters.Values,
|
|
}
|
|
|
|
iterateContent := func(result *api.DirectoryContent, err error) (bool, error) {
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
for _, item := range result.Entries {
|
|
item.Name = f.opt.Enc.ToStandardName(item.Name)
|
|
if callback(&item) {
|
|
return true, nil
|
|
}
|
|
}
|
|
return false, nil
|
|
}
|
|
return f.paginateDirectoryAccess(ctx, &opts, iterationChunkSize, 0, iterateContent)
|
|
}
|
|
|
|
// paginateDirectoryAccess executes requests specified via ctx and opts
|
|
// which should produce api.DirectoryContent.
|
|
// This will paginate the requests using limit starting at the given offset.
|
|
//
|
|
// The given function callback is called on each api.DirectoryContent found
|
|
// along with any errors that occurred.
|
|
// If callback ever returns true then this exits early with found = true.
|
|
// If callback ever returns an error then this exits early with that error.
|
|
func (f *Fs) paginateDirectoryAccess(ctx context.Context, opts *rest.Opts, limit int64, offset int64, callback func(*api.DirectoryContent, error) (bool, error)) (found bool, err error) {
|
|
for {
|
|
opts.Parameters.Set("limit", strconv.FormatInt(offset, 10)+","+strconv.FormatInt(limit, 10))
|
|
|
|
var result api.DirectoryContent
|
|
var resp *http.Response
|
|
err = f.pacer.Call(func() (bool, error) {
|
|
resp, err = f.srv.CallJSON(ctx, opts, nil, &result)
|
|
return f.shouldRetry(ctx, resp, err)
|
|
})
|
|
|
|
found, err = callback(&result, err)
|
|
if found || err != nil {
|
|
return found, err
|
|
}
|
|
|
|
offset += int64(len(result.Entries))
|
|
if offset >= result.TotalCount || limit > int64(len(result.Entries)) {
|
|
break
|
|
}
|
|
}
|
|
return false, nil
|
|
}
|
|
|
|
// fetchMetadataForPath reads the metadata from the path.
|
|
func (f *Fs) fetchMetadataForPath(ctx context.Context, path string, fields []string) (*api.HiDriveObject, error) {
|
|
parameters := api.NewQueryParameters()
|
|
parameters.SetPath(path)
|
|
parameters.AddFields("", fields...)
|
|
|
|
opts := rest.Opts{
|
|
Method: "GET",
|
|
Path: "/meta",
|
|
Parameters: parameters.Values,
|
|
}
|
|
|
|
var result api.HiDriveObject
|
|
var resp *http.Response
|
|
var err error
|
|
err = f.pacer.Call(func() (bool, error) {
|
|
resp, err = f.srv.CallJSON(ctx, &opts, nil, &result)
|
|
return f.shouldRetry(ctx, resp, err)
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &result, nil
|
|
}
|
|
|
|
// copyOrMove copies or moves a directory or file
|
|
// from the source-path to the destination-path.
|
|
//
|
|
// The operation will only be successful
|
|
// if the parent-directory of the destination-path exists.
|
|
//
|
|
// NOTE: Use the explicit methods instead of directly invoking this method.
|
|
// (Those are: copyDirectory, moveDirectory, copyFile, moveFile.)
|
|
func (f *Fs) copyOrMove(ctx context.Context, isDirectory bool, operationType CopyOrMoveOperationType, source string, destination string, onExist OnExistAction) (*api.HiDriveObject, error) {
|
|
parameters := api.NewQueryParameters()
|
|
parameters.Set("src", source)
|
|
parameters.Set("dst", destination)
|
|
if onExist == AutoNameOnExist ||
|
|
(onExist == OverwriteOnExist && !isDirectory) {
|
|
parameters.Set("on_exist", string(onExist))
|
|
}
|
|
|
|
endpoint := "/"
|
|
if isDirectory {
|
|
endpoint += "dir"
|
|
} else {
|
|
endpoint += "file"
|
|
}
|
|
switch operationType {
|
|
case MoveOriginal:
|
|
endpoint += "/move"
|
|
case CopyOriginalPreserveModTime:
|
|
parameters.Set("preserve_mtime", strconv.FormatBool(true))
|
|
fallthrough
|
|
case CopyOriginal:
|
|
endpoint += "/copy"
|
|
}
|
|
|
|
opts := rest.Opts{
|
|
Method: "POST",
|
|
Path: endpoint,
|
|
Parameters: parameters.Values,
|
|
}
|
|
|
|
var result api.HiDriveObject
|
|
var resp *http.Response
|
|
var err error
|
|
err = f.pacer.Call(func() (bool, error) {
|
|
resp, err = f.srv.CallJSON(ctx, &opts, nil, &result)
|
|
return f.shouldRetry(ctx, resp, err)
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &result, nil
|
|
}
|
|
|
|
// copyDirectory moves the directory at the source-path to the destination-path and
|
|
// returns the resulting api-object if successful.
|
|
//
|
|
// The operation will only be successful
|
|
// if the parent-directory of the destination-path exists.
|
|
func (f *Fs) copyDirectory(ctx context.Context, source string, destination string, onExist OnExistAction) (*api.HiDriveObject, error) {
|
|
return f.copyOrMove(ctx, true, CopyOriginalPreserveModTime, source, destination, onExist)
|
|
}
|
|
|
|
// moveDirectory moves the directory at the source-path to the destination-path and
|
|
// returns the resulting api-object if successful.
|
|
//
|
|
// The operation will only be successful
|
|
// if the parent-directory of the destination-path exists.
|
|
func (f *Fs) moveDirectory(ctx context.Context, source string, destination string, onExist OnExistAction) (*api.HiDriveObject, error) {
|
|
return f.copyOrMove(ctx, true, MoveOriginal, source, destination, onExist)
|
|
}
|
|
|
|
// copyFile copies the file at the source-path to the destination-path and
|
|
// returns the resulting api-object if successful.
|
|
//
|
|
// The operation will only be successful
|
|
// if the parent-directory of the destination-path exists.
|
|
//
|
|
// NOTE: This operation will expand sparse areas in the content of the source-file
|
|
// to blocks of 0-bytes in the destination-file.
|
|
func (f *Fs) copyFile(ctx context.Context, source string, destination string, onExist OnExistAction) (*api.HiDriveObject, error) {
|
|
return f.copyOrMove(ctx, false, CopyOriginalPreserveModTime, source, destination, onExist)
|
|
}
|
|
|
|
// moveFile moves the file at the source-path to the destination-path and
|
|
// returns the resulting api-object if successful.
|
|
//
|
|
// The operation will only be successful
|
|
// if the parent-directory of the destination-path exists.
|
|
//
|
|
// NOTE: This operation may expand sparse areas in the content of the source-file
|
|
// to blocks of 0-bytes in the destination-file.
|
|
func (f *Fs) moveFile(ctx context.Context, source string, destination string, onExist OnExistAction) (*api.HiDriveObject, error) {
|
|
return f.copyOrMove(ctx, false, MoveOriginal, source, destination, onExist)
|
|
}
|
|
|
|
// createDirectory creates the directory at the given path and
|
|
// returns the resulting api-object if successful.
|
|
//
|
|
// The directory will only be created if its parent-directory exists.
|
|
// This returns fs.ErrorDirNotFound if the parent-directory is not found.
|
|
// This returns fs.ErrorDirExists if the directory already exists.
|
|
func (f *Fs) createDirectory(ctx context.Context, directory string, onExist OnExistAction) (*api.HiDriveObject, error) {
|
|
parameters := api.NewQueryParameters()
|
|
parameters.SetPath(directory)
|
|
if onExist == AutoNameOnExist {
|
|
parameters.Set("on_exist", string(onExist))
|
|
}
|
|
|
|
opts := rest.Opts{
|
|
Method: "POST",
|
|
Path: "/dir",
|
|
Parameters: parameters.Values,
|
|
}
|
|
|
|
var result api.HiDriveObject
|
|
var resp *http.Response
|
|
var err error
|
|
err = f.pacer.Call(func() (bool, error) {
|
|
resp, err = f.srv.CallJSON(ctx, &opts, nil, &result)
|
|
return f.shouldRetry(ctx, resp, err)
|
|
})
|
|
|
|
switch {
|
|
case err == nil:
|
|
return &result, nil
|
|
case isHTTPError(err, 404):
|
|
return nil, fs.ErrorDirNotFound
|
|
case isHTTPError(err, 409):
|
|
return nil, fs.ErrorDirExists
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
// createDirectories creates the directory at the given path
|
|
// along with any missing parent directories and
|
|
// returns the resulting api-object (of the created directory) if successful.
|
|
//
|
|
// This returns fs.ErrorDirExists if the directory already exists.
|
|
//
|
|
// If an error occurs while the parent directories are being created,
|
|
// any directories already created will NOT be deleted again.
|
|
func (f *Fs) createDirectories(ctx context.Context, directory string, onExist OnExistAction) (*api.HiDriveObject, error) {
|
|
result, err := f.createDirectory(ctx, directory, onExist)
|
|
if err == nil {
|
|
return result, nil
|
|
}
|
|
if err != fs.ErrorDirNotFound {
|
|
return nil, err
|
|
}
|
|
parentDirectory := path.Dir(directory)
|
|
_, err = f.createDirectories(ctx, parentDirectory, onExist)
|
|
if err != nil && err != fs.ErrorDirExists {
|
|
return nil, err
|
|
}
|
|
// NOTE: Ignoring fs.ErrorDirExists does no harm,
|
|
// since it does not mean the child directory cannot be created.
|
|
return f.createDirectory(ctx, directory, onExist)
|
|
}
|
|
|
|
// deleteDirectory deletes the directory at the given path.
|
|
//
|
|
// If recursive is false, the directory will only be deleted if it is empty.
|
|
// If recursive is true, the directory will be deleted regardless of its content.
|
|
// This returns fs.ErrorDirNotFound if the directory is not found.
|
|
// This returns fs.ErrorDirectoryNotEmpty if the directory is not empty and
|
|
// recursive is false.
|
|
func (f *Fs) deleteDirectory(ctx context.Context, directory string, recursive bool) error {
|
|
parameters := api.NewQueryParameters()
|
|
parameters.SetPath(directory)
|
|
parameters.Set("recursive", strconv.FormatBool(recursive))
|
|
|
|
opts := rest.Opts{
|
|
Method: "DELETE",
|
|
Path: "/dir",
|
|
Parameters: parameters.Values,
|
|
NoResponse: true,
|
|
}
|
|
|
|
var resp *http.Response
|
|
var err error
|
|
err = f.pacer.Call(func() (bool, error) {
|
|
resp, err = f.srv.Call(ctx, &opts)
|
|
return f.shouldRetry(ctx, resp, err)
|
|
})
|
|
|
|
switch {
|
|
case isHTTPError(err, 404):
|
|
return fs.ErrorDirNotFound
|
|
case isHTTPError(err, 409):
|
|
return fs.ErrorDirectoryNotEmpty
|
|
}
|
|
return err
|
|
}
|
|
|
|
// deleteObject deletes the object/file at the given path.
|
|
//
|
|
// This returns fs.ErrorObjectNotFound if the object is not found.
|
|
func (f *Fs) deleteObject(ctx context.Context, path string) error {
|
|
parameters := api.NewQueryParameters()
|
|
parameters.SetPath(path)
|
|
|
|
opts := rest.Opts{
|
|
Method: "DELETE",
|
|
Path: "/file",
|
|
Parameters: parameters.Values,
|
|
NoResponse: true,
|
|
}
|
|
|
|
var resp *http.Response
|
|
var err error
|
|
err = f.pacer.Call(func() (bool, error) {
|
|
resp, err = f.srv.Call(ctx, &opts)
|
|
return f.shouldRetry(ctx, resp, err)
|
|
})
|
|
|
|
if isHTTPError(err, 404) {
|
|
return fs.ErrorObjectNotFound
|
|
}
|
|
return err
|
|
}
|
|
|
|
// createFile creates a file at the given path
|
|
// with the content of the io.ReadSeeker.
|
|
// This guarantees that existing files will not be overwritten.
|
|
// The maximum size of the content is limited by MaximumUploadBytes.
|
|
// The io.ReadSeeker should be resettable by seeking to its start.
|
|
// If modTime is not the zero time instant,
|
|
// it will be set as the file's modification time after the operation.
|
|
//
|
|
// This returns fs.ErrorDirNotFound
|
|
// if the parent directory of the file is not found.
|
|
// This returns ErrorFileExists if a file already exists at the specified path.
|
|
func (f *Fs) createFile(ctx context.Context, path string, content io.ReadSeeker, modTime time.Time, onExist OnExistAction) (*api.HiDriveObject, error) {
|
|
parameters := api.NewQueryParameters()
|
|
parameters.SetFileInDirectory(path)
|
|
if onExist == AutoNameOnExist {
|
|
parameters.Set("on_exist", string(onExist))
|
|
}
|
|
|
|
var err error
|
|
if !modTime.IsZero() {
|
|
err = parameters.SetTime("mtime", modTime)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
opts := rest.Opts{
|
|
Method: "POST",
|
|
Path: "/file",
|
|
Body: content,
|
|
ContentType: "application/octet-stream",
|
|
Parameters: parameters.Values,
|
|
}
|
|
|
|
var result api.HiDriveObject
|
|
var resp *http.Response
|
|
err = f.pacer.Call(func() (bool, error) {
|
|
// Reset the reading index (in case this is a retry).
|
|
if _, err = content.Seek(0, io.SeekStart); err != nil {
|
|
return false, err
|
|
}
|
|
resp, err = f.srv.CallJSON(ctx, &opts, nil, &result)
|
|
return f.shouldRetry(ctx, resp, err)
|
|
})
|
|
|
|
switch {
|
|
case err == nil:
|
|
return &result, nil
|
|
case isHTTPError(err, 404):
|
|
return nil, fs.ErrorDirNotFound
|
|
case isHTTPError(err, 409):
|
|
return nil, ErrorFileExists
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
// overwriteFile updates the content of the file at the given path
|
|
// with the content of the io.ReadSeeker.
|
|
// If the file does not exist it will be created.
|
|
// The maximum size of the content is limited by MaximumUploadBytes.
|
|
// The io.ReadSeeker should be resettable by seeking to its start.
|
|
// If modTime is not the zero time instant,
|
|
// it will be set as the file's modification time after the operation.
|
|
//
|
|
// This returns fs.ErrorDirNotFound
|
|
// if the parent directory of the file is not found.
|
|
func (f *Fs) overwriteFile(ctx context.Context, path string, content io.ReadSeeker, modTime time.Time) (*api.HiDriveObject, error) {
|
|
parameters := api.NewQueryParameters()
|
|
parameters.SetFileInDirectory(path)
|
|
|
|
var err error
|
|
if !modTime.IsZero() {
|
|
err = parameters.SetTime("mtime", modTime)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
opts := rest.Opts{
|
|
Method: "PUT",
|
|
Path: "/file",
|
|
Body: content,
|
|
ContentType: "application/octet-stream",
|
|
Parameters: parameters.Values,
|
|
}
|
|
|
|
var result api.HiDriveObject
|
|
var resp *http.Response
|
|
err = f.pacer.Call(func() (bool, error) {
|
|
// Reset the reading index (in case this is a retry).
|
|
if _, err = content.Seek(0, io.SeekStart); err != nil {
|
|
return false, err
|
|
}
|
|
resp, err = f.srv.CallJSON(ctx, &opts, nil, &result)
|
|
return f.shouldRetry(ctx, resp, err)
|
|
})
|
|
|
|
switch {
|
|
case err == nil:
|
|
return &result, nil
|
|
case isHTTPError(err, 404):
|
|
return nil, fs.ErrorDirNotFound
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
// uploadFileChunked updates the content of the existing file at the given path
|
|
// with the content of the io.Reader.
|
|
// Returns the position of the last successfully written byte, stopping before the first failed write.
|
|
// If nothing was written this will be 0.
|
|
// Returns the resulting api-object if successful.
|
|
//
|
|
// Replaces the file contents by uploading multiple chunks of the given size in parallel.
|
|
// Therefore this can and be used to upload files of any size efficiently.
|
|
// The number of parallel transfers is limited by transferLimit which should larger than 0.
|
|
// If modTime is not the zero time instant,
|
|
// it will be set as the file's modification time after the operation.
|
|
//
|
|
// NOTE: This method uses updateFileChunked and may create sparse files,
|
|
// if the upload of a chunk fails unexpectedly.
|
|
// See note about sparse files in patchFile.
|
|
// If any of the uploads fail, the process will be aborted and
|
|
// the first error that occurred will be returned.
|
|
// This is not an atomic operation,
|
|
// therefore if the upload fails the file may be partially modified.
|
|
//
|
|
// This returns fs.ErrorObjectNotFound if the object is not found.
|
|
func (f *Fs) uploadFileChunked(ctx context.Context, path string, content io.Reader, modTime time.Time, chunkSize int, transferLimit int64) (okSize uint64, info *api.HiDriveObject, err error) {
|
|
okSize, err = f.updateFileChunked(ctx, path, content, 0, chunkSize, transferLimit)
|
|
|
|
if err == nil {
|
|
info, err = f.resizeFile(ctx, path, okSize, modTime)
|
|
}
|
|
return okSize, info, err
|
|
}
|
|
|
|
// updateFileChunked updates the content of the existing file at the given path
|
|
// starting at the given offset.
|
|
// Returns the position of the last successfully written byte, stopping before the first failed write.
|
|
// If nothing was written this will be 0.
|
|
//
|
|
// Replaces the file contents starting from the given byte offset
|
|
// with the content of the io.Reader.
|
|
// If the offset is beyond the file end, the file is extended up to the offset.
|
|
//
|
|
// The upload is done multiple chunks of the given size in parallel.
|
|
// Therefore this can and be used to upload files of any size efficiently.
|
|
// The number of parallel transfers is limited by transferLimit which should larger than 0.
|
|
//
|
|
// NOTE: Because it is inefficient to set the modification time with every chunk,
|
|
// setting it to a specific value must be done in a separate request
|
|
// after this operation finishes.
|
|
//
|
|
// NOTE: This method uses patchFile and may create sparse files,
|
|
// especially if the upload of a chunk fails unexpectedly.
|
|
// See note about sparse files in patchFile.
|
|
// If any of the uploads fail, the process will be aborted and
|
|
// the first error that occurred will be returned.
|
|
// This is not an atomic operation,
|
|
// therefore if the upload fails the file may be partially modified.
|
|
//
|
|
// This returns fs.ErrorObjectNotFound if the object is not found.
|
|
func (f *Fs) updateFileChunked(ctx context.Context, path string, content io.Reader, offset uint64, chunkSize int, transferLimit int64) (okSize uint64, err error) {
|
|
var (
|
|
okChunksMu sync.Mutex // protects the variables below
|
|
okChunks []ranges.Range
|
|
)
|
|
g, gCtx := errgroup.WithContext(ctx)
|
|
transferSemaphore := semaphore.NewWeighted(transferLimit)
|
|
|
|
var readErr error
|
|
startMoreTransfers := true
|
|
zeroTime := time.Time{}
|
|
for chunk := uint64(0); startMoreTransfers; chunk++ {
|
|
// Acquire semaphore to limit number of transfers in parallel.
|
|
readErr = transferSemaphore.Acquire(gCtx, 1)
|
|
if readErr != nil {
|
|
break
|
|
}
|
|
|
|
// Read a chunk of data.
|
|
chunkReader, bytesRead, readErr := readerForChunk(content, chunkSize)
|
|
if bytesRead < chunkSize {
|
|
startMoreTransfers = false
|
|
}
|
|
if readErr != nil || bytesRead <= 0 {
|
|
break
|
|
}
|
|
|
|
// Transfer the chunk.
|
|
chunkOffset := uint64(chunkSize)*chunk + offset
|
|
g.Go(func() error {
|
|
// After this upload is done,
|
|
// signal that another transfer can be started.
|
|
defer transferSemaphore.Release(1)
|
|
uploadErr := f.patchFile(gCtx, path, cachedReader(chunkReader), chunkOffset, zeroTime)
|
|
if uploadErr == nil {
|
|
// Remember successfully written chunks.
|
|
okChunksMu.Lock()
|
|
okChunks = append(okChunks, ranges.Range{Pos: int64(chunkOffset), Size: int64(bytesRead)})
|
|
okChunksMu.Unlock()
|
|
fs.Debugf(f, "Done uploading chunk of size %v at offset %v.", bytesRead, chunkOffset)
|
|
} else {
|
|
fs.Infof(f, "Error while uploading chunk at offset %v. Error is %v.", chunkOffset, uploadErr)
|
|
}
|
|
return uploadErr
|
|
})
|
|
}
|
|
|
|
if readErr != nil {
|
|
// Log the error in case it is later ignored because of an upload-error.
|
|
fs.Infof(f, "Error while reading/preparing to upload a chunk. Error is %v.", readErr)
|
|
}
|
|
|
|
err = g.Wait()
|
|
|
|
// Compute the first continuous range of the file content,
|
|
// which does not contain any failed chunks.
|
|
// Do not forget to add the file content up to the starting offset,
|
|
// which is presumed to be already correct.
|
|
rs := ranges.Ranges{}
|
|
rs.Insert(ranges.Range{Pos: 0, Size: int64(offset)})
|
|
for _, chunkRange := range okChunks {
|
|
rs.Insert(chunkRange)
|
|
}
|
|
if len(rs) > 0 && rs[0].Pos == 0 {
|
|
okSize = uint64(rs[0].Size)
|
|
}
|
|
|
|
if err != nil {
|
|
return okSize, err
|
|
}
|
|
if readErr != nil {
|
|
return okSize, readErr
|
|
}
|
|
|
|
return okSize, nil
|
|
}
|
|
|
|
// patchFile updates the content of the existing file at the given path
|
|
// starting at the given offset.
|
|
//
|
|
// Replaces the file contents starting from the given byte offset
|
|
// with the content of the io.ReadSeeker.
|
|
// If the offset is beyond the file end, the file is extended up to the offset.
|
|
// The maximum size of the update is limited by MaximumUploadBytes.
|
|
// The io.ReadSeeker should be resettable by seeking to its start.
|
|
// If modTime is not the zero time instant,
|
|
// it will be set as the file's modification time after the operation.
|
|
//
|
|
// NOTE: By extending the file up to the offset this may create sparse files,
|
|
// which allocate less space on the file system than their apparent size indicates,
|
|
// since holes between data chunks are "real" holes
|
|
// and not regions made up of consecutive 0-bytes.
|
|
// Subsequent operations (such as copying data)
|
|
// usually expand the holes into regions of 0-bytes.
|
|
//
|
|
// This returns fs.ErrorObjectNotFound if the object is not found.
|
|
func (f *Fs) patchFile(ctx context.Context, path string, content io.ReadSeeker, offset uint64, modTime time.Time) error {
|
|
parameters := api.NewQueryParameters()
|
|
parameters.SetPath(path)
|
|
parameters.Set("offset", strconv.FormatUint(offset, 10))
|
|
|
|
if !modTime.IsZero() {
|
|
err := parameters.SetTime("mtime", modTime)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
opts := rest.Opts{
|
|
Method: "PATCH",
|
|
Path: "/file",
|
|
Body: content,
|
|
ContentType: "application/octet-stream",
|
|
Parameters: parameters.Values,
|
|
NoResponse: true,
|
|
}
|
|
|
|
var resp *http.Response
|
|
var err error
|
|
err = f.pacer.Call(func() (bool, error) {
|
|
// Reset the reading index (in case this is a retry).
|
|
_, err = content.Seek(0, io.SeekStart)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
resp, err = f.srv.Call(ctx, &opts)
|
|
if isHTTPError(err, 423) {
|
|
return true, err
|
|
}
|
|
return f.shouldRetry(ctx, resp, err)
|
|
})
|
|
|
|
if isHTTPError(err, 404) {
|
|
return fs.ErrorObjectNotFound
|
|
}
|
|
return err
|
|
}
|
|
|
|
// resizeFile updates the existing file at the given path to be of the given size
|
|
// and returns the resulting api-object if successful.
|
|
//
|
|
// If the given size is smaller than the current filesize,
|
|
// the file is cut/truncated at that position.
|
|
// If the given size is larger, the file is extended up to that position.
|
|
// If modTime is not the zero time instant,
|
|
// it will be set as the file's modification time after the operation.
|
|
//
|
|
// NOTE: By extending the file this may create sparse files,
|
|
// which allocate less space on the file system than their apparent size indicates,
|
|
// since holes between data chunks are "real" holes
|
|
// and not regions made up of consecutive 0-bytes.
|
|
// Subsequent operations (such as copying data)
|
|
// usually expand the holes into regions of 0-bytes.
|
|
//
|
|
// This returns fs.ErrorObjectNotFound if the object is not found.
|
|
func (f *Fs) resizeFile(ctx context.Context, path string, size uint64, modTime time.Time) (*api.HiDriveObject, error) {
|
|
parameters := api.NewQueryParameters()
|
|
parameters.SetPath(path)
|
|
parameters.Set("size", strconv.FormatUint(size, 10))
|
|
|
|
if !modTime.IsZero() {
|
|
err := parameters.SetTime("mtime", modTime)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
opts := rest.Opts{
|
|
Method: "POST",
|
|
Path: "/file/truncate",
|
|
Parameters: parameters.Values,
|
|
}
|
|
|
|
var result api.HiDriveObject
|
|
var resp *http.Response
|
|
var err error
|
|
err = f.pacer.Call(func() (bool, error) {
|
|
resp, err = f.srv.CallJSON(ctx, &opts, nil, &result)
|
|
return f.shouldRetry(ctx, resp, err)
|
|
})
|
|
|
|
switch {
|
|
case err == nil:
|
|
return &result, nil
|
|
case isHTTPError(err, 404):
|
|
return nil, fs.ErrorObjectNotFound
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
// ------------------------------------------------------------
|
|
|
|
// isHTTPError compares the numerical status code
|
|
// of an api.Error to the given HTTP status.
|
|
//
|
|
// If the given error is not an api.Error or
|
|
// a numerical status code could not be determined, this returns false.
|
|
// Otherwise this returns whether the status code of the error is equal to the given status.
|
|
func isHTTPError(err error, status int64) bool {
|
|
if apiErr, ok := err.(*api.Error); ok {
|
|
errStatus, decodeErr := apiErr.Code.Int64()
|
|
if decodeErr == nil && errStatus == status {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// createHiDriveScopes creates oauth-scopes
|
|
// from the given user-role and access-permissions.
|
|
//
|
|
// If the arguments are empty, they will not be included in the result.
|
|
func createHiDriveScopes(role string, access string) []string {
|
|
switch {
|
|
case role != "" && access != "":
|
|
return []string{access + "," + role}
|
|
case role != "":
|
|
return []string{role}
|
|
case access != "":
|
|
return []string{access}
|
|
}
|
|
return []string{}
|
|
}
|
|
|
|
// cachedReader returns a version of the reader that caches its contents and
|
|
// can therefore be reset using Seek.
|
|
func cachedReader(reader io.Reader) io.ReadSeeker {
|
|
bytesReader, ok := reader.(*bytes.Reader)
|
|
if ok {
|
|
return bytesReader
|
|
}
|
|
|
|
repeatableReader, ok := reader.(*readers.RepeatableReader)
|
|
if ok {
|
|
return repeatableReader
|
|
}
|
|
|
|
return readers.NewRepeatableReader(reader)
|
|
}
|
|
|
|
// readerForChunk reads a chunk of bytes from reader (after handling any accounting).
|
|
// Returns a new io.Reader (chunkReader) for that chunk
|
|
// and the number of bytes that have been read from reader.
|
|
func readerForChunk(reader io.Reader, length int) (chunkReader io.Reader, bytesRead int, err error) {
|
|
// Unwrap any accounting from the input if present.
|
|
reader, wrap := accounting.UnWrap(reader)
|
|
|
|
// Read a chunk of data.
|
|
buffer := make([]byte, length)
|
|
bytesRead, err = io.ReadFull(reader, buffer)
|
|
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
|
err = nil
|
|
}
|
|
if err != nil {
|
|
return nil, bytesRead, err
|
|
}
|
|
// Truncate unused capacity.
|
|
buffer = buffer[:bytesRead]
|
|
|
|
// Use wrap to put any accounting back for chunkReader.
|
|
return wrap(bytes.NewReader(buffer)), bytesRead, nil
|
|
}
|