rclone/backend/putio/fs.go
Nick Craig-Wood 2121c0fa23 dircache: factor DirMove code out of backends into dircache
Before this change there was lots of duplicated code in all the
dircache using backends to support DirMove.

This change factors this code into the dircache library.
2020-06-25 09:41:36 +01:00

696 lines
21 KiB
Go

package putio
import (
"bytes"
"context"
"encoding/base64"
"fmt"
"io"
"net/http"
"net/url"
"path"
"strconv"
"strings"
"time"
"github.com/pkg/errors"
"github.com/putdotio/go-putio/putio"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/config/configmap"
"github.com/rclone/rclone/fs/config/configstruct"
"github.com/rclone/rclone/fs/fshttp"
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/lib/dircache"
"github.com/rclone/rclone/lib/oauthutil"
"github.com/rclone/rclone/lib/pacer"
"github.com/rclone/rclone/lib/readers"
)
// Fs represents a remote Putio server
type Fs struct {
name string // name of this remote
root string // the path we are working on
features *fs.Features // optional features
opt Options // options for this Fs
client *putio.Client // client for making API calls to Put.io
pacer *fs.Pacer // To pace the API calls
dirCache *dircache.DirCache // Map of directory path to directory id
httpClient *http.Client // base http client
oAuthClient *http.Client // http client with oauth Authorization
}
// ------------------------------------------------------------
// Name of the remote (as passed into NewFs)
func (f *Fs) Name() string {
return f.name
}
// Root of the remote (as passed into NewFs)
func (f *Fs) Root() string {
return f.root
}
// String converts this Fs to a string
func (f *Fs) String() string {
return fmt.Sprintf("Putio root '%s'", f.root)
}
// Features returns the optional features of this Fs
func (f *Fs) Features() *fs.Features {
return f.features
}
// parsePath parses a putio 'url'
func parsePath(path string) (root string) {
root = strings.Trim(path, "/")
return
}
// NewFs constructs an Fs from the path, container:path
func NewFs(name, root string, m configmap.Mapper) (f fs.Fs, err error) {
// defer log.Trace(name, "root=%v", root)("f=%+v, err=%v", &f, &err)
// Parse config into Options struct
opt := new(Options)
err = configstruct.Set(m, opt)
if err != nil {
return nil, err
}
root = parsePath(root)
httpClient := fshttp.NewClient(fs.Config)
oAuthClient, _, err := oauthutil.NewClientWithBaseClient(name, m, putioConfig, httpClient)
if err != nil {
return nil, errors.Wrap(err, "failed to configure putio")
}
p := &Fs{
name: name,
root: root,
opt: *opt,
pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
client: putio.NewClient(oAuthClient),
httpClient: httpClient,
oAuthClient: oAuthClient,
}
p.features = (&fs.Features{
DuplicateFiles: true,
ReadMimeType: true,
CanHaveEmptyDirectories: true,
}).Fill(p)
p.dirCache = dircache.New(root, "0", p)
ctx := context.Background()
// Find the current root
err = p.dirCache.FindRoot(ctx, false)
if err != nil {
// Assume it is a file
newRoot, remote := dircache.SplitPath(root)
tempF := *p
tempF.dirCache = dircache.New(newRoot, "0", &tempF)
tempF.root = newRoot
// Make new Fs which is the parent
err = tempF.dirCache.FindRoot(ctx, false)
if err != nil {
// No root so return old f
return p, nil
}
_, err := tempF.NewObject(ctx, remote)
if err != nil {
// unable to list folder so return old f
return p, nil
}
// XXX: update the old f here instead of returning tempF, since
// `features` were already filled with functions having *f as a receiver.
// See https://github.com/rclone/rclone/issues/2182
p.dirCache = tempF.dirCache
p.root = tempF.root
return p, fs.ErrorIsFile
}
// fs.Debugf(p, "Root id: %s", p.dirCache.RootID())
return p, nil
}
func itoa(i int64) string {
return strconv.FormatInt(i, 10)
}
func atoi(a string) int64 {
i, err := strconv.ParseInt(a, 10, 64)
if err != nil {
panic(err)
}
return i
}
// CreateDir makes a directory with pathID as parent and name leaf
func (f *Fs) CreateDir(ctx context.Context, pathID, leaf string) (newID string, err error) {
// defer log.Trace(f, "pathID=%v, leaf=%v", pathID, leaf)("newID=%v, err=%v", newID, &err)
parentID := atoi(pathID)
var entry putio.File
err = f.pacer.Call(func() (bool, error) {
// fs.Debugf(f, "creating folder. part: %s, parentID: %d", leaf, parentID)
entry, err = f.client.Files.CreateFolder(ctx, f.opt.Enc.FromStandardName(leaf), parentID)
return shouldRetry(err)
})
return itoa(entry.ID), err
}
// FindLeaf finds a directory of name leaf in the folder with ID pathID
func (f *Fs) FindLeaf(ctx context.Context, pathID, leaf string) (pathIDOut string, found bool, err error) {
// defer log.Trace(f, "pathID=%v, leaf=%v", pathID, leaf)("pathIDOut=%v, found=%v, err=%v", pathIDOut, found, &err)
if pathID == "0" && leaf == "" {
// that's the root directory
return pathID, true, nil
}
fileID := atoi(pathID)
var children []putio.File
err = f.pacer.Call(func() (bool, error) {
// fs.Debugf(f, "listing file: %d", fileID)
children, _, err = f.client.Files.List(ctx, fileID)
return shouldRetry(err)
})
if err != nil {
if perr, ok := err.(*putio.ErrorResponse); ok && perr.Response.StatusCode == 404 {
err = nil
}
return
}
for _, child := range children {
if f.opt.Enc.ToStandardName(child.Name) == leaf {
found = true
pathIDOut = itoa(child.ID)
if !child.IsDir() {
err = fs.ErrorIsFile
}
return
}
}
return
}
// List the objects and directories in dir into entries. The
// entries can be returned in any order but should be for a
// complete directory.
//
// dir should be "" to list the root, and should not have
// trailing slashes.
//
// This should return ErrDirNotFound if the directory isn't
// found.
func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
// defer log.Trace(f, "dir=%v", dir)("err=%v", &err)
directoryID, err := f.dirCache.FindDir(ctx, dir, false)
if err != nil {
return nil, err
}
parentID := atoi(directoryID)
var children []putio.File
err = f.pacer.Call(func() (bool, error) {
// fs.Debugf(f, "listing files inside List: %d", parentID)
children, _, err = f.client.Files.List(ctx, parentID)
return shouldRetry(err)
})
if err != nil {
return
}
for _, child := range children {
remote := path.Join(dir, f.opt.Enc.ToStandardName(child.Name))
// fs.Debugf(f, "child: %s", remote)
if child.IsDir() {
f.dirCache.Put(remote, itoa(child.ID))
d := fs.NewDir(remote, child.UpdatedAt.Time)
entries = append(entries, d)
} else {
o, err := f.newObjectWithInfo(ctx, remote, child)
if err != nil {
return nil, err
}
entries = append(entries, o)
}
}
return
}
// Put the object
//
// Copy the reader in to the new object which is returned
//
// The new object may have been created if an error is returned
func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (o fs.Object, err error) {
// defer log.Trace(f, "src=%+v", src)("o=%+v, err=%v", &o, &err)
exisitingObj, err := f.NewObject(ctx, src.Remote())
switch err {
case nil:
return exisitingObj, exisitingObj.Update(ctx, in, src, options...)
case fs.ErrorObjectNotFound:
// Not found so create it
return f.PutUnchecked(ctx, in, src, options...)
default:
return nil, err
}
}
// PutUnchecked uploads the object
//
// This will create a duplicate if we upload a new file without
// checking to see if there is one already - use Put() for that.
func (f *Fs) PutUnchecked(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (o fs.Object, err error) {
// defer log.Trace(f, "src=%+v", src)("o=%+v, err=%v", &o, &err)
size := src.Size()
remote := src.Remote()
leaf, directoryID, err := f.dirCache.FindPath(ctx, remote, true)
if err != nil {
return nil, err
}
loc, err := f.createUpload(ctx, leaf, size, directoryID, src.ModTime(ctx), options)
if err != nil {
return nil, err
}
fileID, err := f.sendUpload(ctx, loc, size, in)
if err != nil {
return nil, err
}
var entry putio.File
err = f.pacer.Call(func() (bool, error) {
// fs.Debugf(f, "getting file: %d", fileID)
entry, err = f.client.Files.Get(ctx, fileID)
return shouldRetry(err)
})
if err != nil {
return nil, err
}
return f.newObjectWithInfo(ctx, remote, entry)
}
func (f *Fs) createUpload(ctx context.Context, name string, size int64, parentID string, modTime time.Time, options []fs.OpenOption) (location string, err error) {
// defer log.Trace(f, "name=%v, size=%v, parentID=%v, modTime=%v", name, size, parentID, modTime.String())("location=%v, err=%v", location, &err)
err = f.pacer.Call(func() (bool, error) {
req, err := http.NewRequest("POST", "https://upload.put.io/files/", nil)
if err != nil {
return false, err
}
req = req.WithContext(ctx) // go1.13 can use NewRequestWithContext
req.Header.Set("tus-resumable", "1.0.0")
req.Header.Set("upload-length", strconv.FormatInt(size, 10))
b64name := base64.StdEncoding.EncodeToString([]byte(f.opt.Enc.FromStandardName(name)))
b64true := base64.StdEncoding.EncodeToString([]byte("true"))
b64parentID := base64.StdEncoding.EncodeToString([]byte(parentID))
b64modifiedAt := base64.StdEncoding.EncodeToString([]byte(modTime.Format(time.RFC3339)))
req.Header.Set("upload-metadata", fmt.Sprintf("name %s,no-torrent %s,parent_id %s,updated-at %s", b64name, b64true, b64parentID, b64modifiedAt))
fs.OpenOptionAddHTTPHeaders(req.Header, options)
resp, err := f.oAuthClient.Do(req)
retry, err := shouldRetry(err)
if retry {
return true, err
}
if err != nil {
return false, err
}
if resp.StatusCode != 201 {
return false, fmt.Errorf("unexpected status code from upload create: %d", resp.StatusCode)
}
location = resp.Header.Get("location")
if location == "" {
return false, errors.New("empty location header from upload create")
}
return false, nil
})
return
}
func (f *Fs) sendUpload(ctx context.Context, location string, size int64, in io.Reader) (fileID int64, err error) {
// defer log.Trace(f, "location=%v, size=%v", location, size)("fileID=%v, err=%v", &fileID, &err)
if size == 0 {
err = f.pacer.Call(func() (bool, error) {
fs.Debugf(f, "Sending zero length chunk")
_, fileID, err = f.transferChunk(ctx, location, 0, bytes.NewReader([]byte{}), 0)
return shouldRetry(err)
})
return
}
var clientOffset int64
var offsetMismatch bool
buf := make([]byte, defaultChunkSize)
for clientOffset < size {
chunkSize := size - clientOffset
if chunkSize >= int64(defaultChunkSize) {
chunkSize = int64(defaultChunkSize)
}
chunk := readers.NewRepeatableLimitReaderBuffer(in, buf, chunkSize)
chunkStart := clientOffset
reqSize := chunkSize
transferOffset := clientOffset
fs.Debugf(f, "chunkStart: %d, reqSize: %d", chunkStart, reqSize)
// Transfer the chunk
err = f.pacer.Call(func() (bool, error) {
if offsetMismatch {
// Get file offset and seek to the position
offset, err := f.getServerOffset(ctx, location)
if err != nil {
return shouldRetry(err)
}
sentBytes := offset - chunkStart
fs.Debugf(f, "sentBytes: %d", sentBytes)
_, err = chunk.Seek(sentBytes, io.SeekStart)
if err != nil {
return shouldRetry(err)
}
transferOffset = offset
reqSize = chunkSize - sentBytes
offsetMismatch = false
}
fs.Debugf(f, "Sending chunk. transferOffset: %d length: %d", transferOffset, reqSize)
var serverOffset int64
serverOffset, fileID, err = f.transferChunk(ctx, location, transferOffset, chunk, reqSize)
if cerr, ok := err.(*statusCodeError); ok && cerr.response.StatusCode == 409 {
offsetMismatch = true
return true, err
}
if serverOffset != (transferOffset + reqSize) {
offsetMismatch = true
return true, errors.New("connection broken")
}
return shouldRetry(err)
})
if err != nil {
return
}
clientOffset += chunkSize
}
return
}
func (f *Fs) getServerOffset(ctx context.Context, location string) (offset int64, err error) {
// defer log.Trace(f, "location=%v", location)("offset=%v, err=%v", &offset, &err)
req, err := f.makeUploadHeadRequest(ctx, location)
if err != nil {
return 0, err
}
resp, err := f.oAuthClient.Do(req)
if err != nil {
return 0, err
}
err = checkStatusCode(resp, 200)
if err != nil {
return 0, err
}
return strconv.ParseInt(resp.Header.Get("upload-offset"), 10, 64)
}
func (f *Fs) transferChunk(ctx context.Context, location string, start int64, chunk io.ReadSeeker, chunkSize int64) (serverOffset, fileID int64, err error) {
// defer log.Trace(f, "location=%v, start=%v, chunkSize=%v", location, start, chunkSize)("fileID=%v, err=%v", &fileID, &err)
req, err := f.makeUploadPatchRequest(ctx, location, chunk, start, chunkSize)
if err != nil {
return
}
resp, err := f.oAuthClient.Do(req)
if err != nil {
return
}
defer func() {
_ = resp.Body.Close()
}()
err = checkStatusCode(resp, 204)
if err != nil {
return
}
serverOffset, err = strconv.ParseInt(resp.Header.Get("upload-offset"), 10, 64)
if err != nil {
return
}
sfid := resp.Header.Get("putio-file-id")
if sfid != "" {
fileID, err = strconv.ParseInt(sfid, 10, 64)
if err != nil {
return
}
}
return
}
func (f *Fs) makeUploadHeadRequest(ctx context.Context, location string) (*http.Request, error) {
req, err := http.NewRequest("HEAD", location, nil)
if err != nil {
return nil, err
}
req = req.WithContext(ctx) // go1.13 can use NewRequestWithContext
req.Header.Set("tus-resumable", "1.0.0")
return req, nil
}
func (f *Fs) makeUploadPatchRequest(ctx context.Context, location string, in io.Reader, offset, length int64) (*http.Request, error) {
req, err := http.NewRequest("PATCH", location, in)
if err != nil {
return nil, err
}
req = req.WithContext(ctx) // go1.13 can use NewRequestWithContext
req.Header.Set("tus-resumable", "1.0.0")
req.Header.Set("upload-offset", strconv.FormatInt(offset, 10))
req.Header.Set("content-length", strconv.FormatInt(length, 10))
req.Header.Set("content-type", "application/offset+octet-stream")
return req, nil
}
// Mkdir creates the container if it doesn't exist
func (f *Fs) Mkdir(ctx context.Context, dir string) (err error) {
// defer log.Trace(f, "dir=%v", dir)("err=%v", &err)
_, err = f.dirCache.FindDir(ctx, dir, true)
return err
}
// Rmdir deletes the container
//
// Returns an error if it isn't empty
func (f *Fs) Rmdir(ctx context.Context, dir string) (err error) {
// defer log.Trace(f, "dir=%v", dir)("err=%v", &err)
root := strings.Trim(path.Join(f.root, dir), "/")
// can't remove root
if root == "" {
return errors.New("can't remove root directory")
}
// check directory exists
directoryID, err := f.dirCache.FindDir(ctx, dir, false)
if err != nil {
return errors.Wrap(err, "Rmdir")
}
dirID := atoi(directoryID)
// check directory empty
var children []putio.File
err = f.pacer.Call(func() (bool, error) {
// fs.Debugf(f, "listing files: %d", dirID)
children, _, err = f.client.Files.List(ctx, dirID)
return shouldRetry(err)
})
if err != nil {
return errors.Wrap(err, "Rmdir")
}
if len(children) != 0 {
return errors.New("directory not empty")
}
// remove it
err = f.pacer.Call(func() (bool, error) {
// fs.Debugf(f, "deleting file: %d", dirID)
err = f.client.Files.Delete(ctx, dirID)
return shouldRetry(err)
})
f.dirCache.FlushDir(dir)
return err
}
// Precision returns the precision
func (f *Fs) Precision() time.Duration {
return time.Second
}
// Purge deletes all the files and the container
//
// Optional interface: Only implement this if you have a way of
// deleting all the files quicker than just running Remove() on the
// result of List()
func (f *Fs) Purge(ctx context.Context) (err error) {
// defer log.Trace(f, "")("err=%v", &err)
if f.root == "" {
return errors.New("can't purge root directory")
}
rootIDs, err := f.dirCache.RootID(ctx, false)
if err != nil {
return err
}
rootID := atoi(rootIDs)
// Let putio delete the filesystem tree
err = f.pacer.Call(func() (bool, error) {
// fs.Debugf(f, "deleting file: %d", rootID)
err = f.client.Files.Delete(ctx, rootID)
return shouldRetry(err)
})
f.dirCache.ResetRoot()
return err
}
// Copy src to this remote using server side copy operations.
//
// This is stored with the remote path given
//
// It returns the destination Object and a possible error
//
// Will only be called if src.Fs().Name() == f.Name()
//
// If it isn't possible then return fs.ErrorCantCopy
func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (o fs.Object, err error) {
// defer log.Trace(f, "src=%+v, remote=%v", src, remote)("o=%+v, err=%v", &o, &err)
srcObj, ok := src.(*Object)
if !ok {
return nil, fs.ErrorCantCopy
}
leaf, directoryID, err := f.dirCache.FindPath(ctx, remote, true)
if err != nil {
return nil, err
}
err = f.pacer.Call(func() (bool, error) {
params := url.Values{}
params.Set("file_id", strconv.FormatInt(srcObj.file.ID, 10))
params.Set("parent_id", directoryID)
params.Set("name", f.opt.Enc.FromStandardName(leaf))
req, err := f.client.NewRequest(ctx, "POST", "/v2/files/copy", strings.NewReader(params.Encode()))
if err != nil {
return false, err
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
// fs.Debugf(f, "copying file (%d) to parent_id: %s", srcObj.file.ID, directoryID)
_, err = f.client.Do(req, nil)
return shouldRetry(err)
})
if err != nil {
return nil, err
}
return f.NewObject(ctx, remote)
}
// Move src to this remote using server side move operations.
//
// This is stored with the remote path given
//
// It returns the destination Object and a possible error
//
// Will only be called if src.Fs().Name() == f.Name()
//
// If it isn't possible then return fs.ErrorCantMove
func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (o fs.Object, err error) {
// defer log.Trace(f, "src=%+v, remote=%v", src, remote)("o=%+v, err=%v", &o, &err)
srcObj, ok := src.(*Object)
if !ok {
return nil, fs.ErrorCantMove
}
leaf, directoryID, err := f.dirCache.FindPath(ctx, remote, true)
if err != nil {
return nil, err
}
err = f.pacer.Call(func() (bool, error) {
params := url.Values{}
params.Set("file_id", strconv.FormatInt(srcObj.file.ID, 10))
params.Set("parent_id", directoryID)
params.Set("name", f.opt.Enc.FromStandardName(leaf))
req, err := f.client.NewRequest(ctx, "POST", "/v2/files/move", strings.NewReader(params.Encode()))
if err != nil {
return false, err
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
// fs.Debugf(f, "moving file (%d) to parent_id: %s", srcObj.file.ID, directoryID)
_, err = f.client.Do(req, nil)
return shouldRetry(err)
})
if err != nil {
return nil, err
}
return f.NewObject(ctx, remote)
}
// DirMove moves src, srcRemote to this remote at dstRemote
// using server side move operations.
//
// Will only be called if src.Fs().Name() == f.Name()
//
// If it isn't possible then return fs.ErrorCantDirMove
//
// If destination exists then return fs.ErrorDirExists
func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string) (err error) {
// defer log.Trace(f, "src=%+v, srcRemote=%v, dstRemote", src, srcRemote, dstRemote)("err=%v", &err)
srcFs, ok := src.(*Fs)
if !ok {
return fs.ErrorCantDirMove
}
srcID, _, _, dstDirectoryID, dstLeaf, err := f.dirCache.DirMove(ctx, srcFs.dirCache, srcFs.root, srcRemote, f.root, dstRemote)
if err != nil {
return err
}
err = f.pacer.Call(func() (bool, error) {
params := url.Values{}
params.Set("file_id", srcID)
params.Set("parent_id", dstDirectoryID)
params.Set("name", f.opt.Enc.FromStandardName(dstLeaf))
req, err := f.client.NewRequest(ctx, "POST", "/v2/files/move", strings.NewReader(params.Encode()))
if err != nil {
return false, err
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
// fs.Debugf(f, "moving file (%s) to parent_id: %s", srcID, dstDirectoryID)
_, err = f.client.Do(req, nil)
return shouldRetry(err)
})
srcFs.dirCache.FlushDir(srcRemote)
return err
}
// About gets quota information
func (f *Fs) About(ctx context.Context) (usage *fs.Usage, err error) {
// defer log.Trace(f, "")("usage=%+v, err=%v", usage, &err)
var ai putio.AccountInfo
err = f.pacer.Call(func() (bool, error) {
// fs.Debugf(f, "getting account info")
ai, err = f.client.Account.Info(ctx)
return shouldRetry(err)
})
if err != nil {
return nil, errors.Wrap(err, "about failed")
}
return &fs.Usage{
Total: fs.NewUsageValue(ai.Disk.Size), // quota of bytes that can be used
Used: fs.NewUsageValue(ai.Disk.Used), // bytes in use
Free: fs.NewUsageValue(ai.Disk.Avail), // bytes which can be uploaded before reaching the quota
}, nil
}
// Hashes returns the supported hash sets.
func (f *Fs) Hashes() hash.Set {
return hash.Set(hash.CRC32)
}
// DirCacheFlush resets the directory cache - used in testing as an
// optional interface
func (f *Fs) DirCacheFlush() {
// defer log.Trace(f, "")("")
f.dirCache.ResetRoot()
}
// CleanUp the trash in the Fs
func (f *Fs) CleanUp(ctx context.Context) (err error) {
// defer log.Trace(f, "")("err=%v", &err)
return f.pacer.Call(func() (bool, error) {
req, err := f.client.NewRequest(ctx, "POST", "/v2/trash/empty", nil)
if err != nil {
return false, err
}
// fs.Debugf(f, "emptying trash")
_, err = f.client.Do(req, nil)
return shouldRetry(err)
})
}