Add resume feature

Added an interface and machinery for resuming failed uploads.
Implemented this interface in the local backend.
It can later be implemented by any backend that supports resuming.

Fixes #87
Maxwell Calman 2020-08-31 10:25:10 -04:00 committed by Ivan Andreev
parent ac2e5fde36
commit b015012d8b
19 changed files with 712 additions and 80 deletions
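In outline, the commit adds one optional interface plus one open option to the fs package; everything else in the diff is plumbing around them. The condensed sketch below restates the definitions added later in this commit and is illustrative only:

// Resumer is an optional interface for Fs, implemented by backends
// that can resume a failed upload.
type Resumer interface {
	Resume(ctx context.Context, remote, ID, hashName, hashState string) (Pos int64, err error)
}

// OptionResume is passed to Put/Update; a non-zero Pos tells the
// backend that the destination already holds the first Pos bytes.
type OptionResume struct {
	ID  string // resume this ID if set
	Pos int64  // and resume from this position
	// ... plus the source object, target Fs, remote name and cache bookkeeping
}

operations.Copy consults Features().Resume before uploading; if a cached resume ID validates, the backend's Put/Update skips Pos bytes of input and appends to the partial file.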

View File

@@ -18,7 +18,7 @@ func TestIntegration(t *testing.T) {
fstests.Run(t, &fstests.Opt{
RemoteName: "TestCache:",
NilObject: (*cache.Object)(nil),
UnimplementableFsMethods: []string{"PublicLink", "OpenWriterAt"},
UnimplementableFsMethods: []string{"PublicLink", "OpenWriterAt", "Resume"},
UnimplementableObjectMethods: []string{"MimeType", "ID", "GetTier", "SetTier"},
SkipInvalidUTF8: true, // invalid UTF-8 confuses the cache
})

View File

@@ -43,6 +43,7 @@ func TestIntegration(t *testing.T) {
"DirCacheFlush",
"UserInfo",
"Disconnect",
"Resume",
},
}
if *fstest.RemoteName == "" {

View File

@@ -23,7 +23,7 @@ func TestIntegration(t *testing.T) {
fstests.Run(t, &fstests.Opt{
RemoteName: *fstest.RemoteName,
NilObject: (*crypt.Object)(nil),
UnimplementableFsMethods: []string{"OpenWriterAt"},
UnimplementableFsMethods: []string{"OpenWriterAt", "Resume"},
UnimplementableObjectMethods: []string{"MimeType"},
})
}

View File

@@ -12,6 +12,7 @@ import (
"path"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"
"time"
@@ -24,6 +25,7 @@ import (
"github.com/rclone/rclone/fs/config/configstruct"
"github.com/rclone/rclone/fs/fserrors"
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/lib/atexit"
"github.com/rclone/rclone/lib/encoder"
"github.com/rclone/rclone/lib/file"
"github.com/rclone/rclone/lib/readers"
@@ -230,6 +232,7 @@ type Fs struct {
precision time.Duration // precision of local filesystem
warnedMu sync.Mutex // used for locking access to 'warned'.
warned map[string]struct{} // whether we have warned about this string
hashState map[string]string // set in resume(), used to restore hash state
// do os.Lstat or os.Stat
lstat func(name string) (os.FileInfo, error)
@@ -267,11 +270,12 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, error) {
}
f := &Fs{
name: name,
opt: *opt,
warned: make(map[string]struct{}),
dev: devUnset,
lstat: os.Lstat,
name: name,
opt: *opt,
warned: make(map[string]struct{}),
hashState: make(map[string]string),
dev: devUnset,
lstat: os.Lstat,
}
f.root = cleanRootPath(root, f.opt.NoUNC, f.opt.Enc)
f.features = (&fs.Features{
@@ -1115,6 +1119,7 @@ func (nwc nopWriterCloser) Close() error {
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
var out io.WriteCloser
var hasher *hash.MultiHasher
var resumeOpt *fs.OptionResume
for _, option := range options {
switch x := option.(type) {
@@ -1125,6 +1130,24 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
return err
}
}
case *fs.OptionResume:
resumeOpt = x // x is already asserted to *fs.OptionResume by the type switch
if resumeOpt.Pos != 0 {
fs.Logf(o, "Resuming at byte position: %d", resumeOpt.Pos)
// Discard bytes that already exist on backend
_, err := io.CopyN(ioutil.Discard, in, resumeOpt.Pos)
if err != nil {
return err
}
hashType := o.fs.Hashes().GetOne()
hasher, err = hash.NewMultiHasherTypes(hash.NewHashSet(hashType))
if err != nil {
return err
}
if err := hasher.RestoreHashState(hashType, o.fs.hashState[o.remote]); err != nil {
return err
}
}
}
}
@@ -1138,7 +1161,12 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
// If it is a translated link, just read in the contents, and
// then create a symlink
if !o.translatedLink {
f, err := file.OpenFile(o.path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
var f *os.File
if resumeOpt != nil && resumeOpt.Pos != 0 {
f, err = file.OpenFile(o.path, os.O_WRONLY|os.O_APPEND, 0666)
} else {
f, err = file.OpenFile(o.path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
}
if err != nil {
if runtime.GOOS == "windows" && os.IsPermission(err) {
// If permission denied on Windows might be trying to update a
@@ -1152,7 +1180,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
return err
}
}
if !o.fs.opt.NoPreAllocate {
if !o.fs.opt.NoPreAllocate && (resumeOpt == nil || resumeOpt.Pos == 0) {
// Pre-allocate the file for performance reasons
err = file.PreAllocate(src.Size(), f)
if err != nil {
@@ -1173,7 +1201,40 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
in = io.TeeReader(in, hasher)
}
_, err = io.Copy(out, in)
var cachingWg sync.WaitGroup // Used to halt code execution while the resume cache is written
var copyWg sync.WaitGroup // Ensure that io.Copy has returned before writing resume data
copyWg.Add(1)
// Context for the read side so that we can handle io.Copy being interrupted
ctxr, cancel := context.WithCancel(ctx)
// Create exit handler during Copy so that resume data can be written if interrupted
atexitHandle := atexit.Register(func() {
// If OptionResume was passed, call SetID to prepare for future resumes
// ID is the number of bytes written to the destination
if resumeOpt != nil && hasher != nil {
// Stops the copy so the cache is consistent with the remote
cachingWg.Add(1)
cancel()
copyWg.Wait()
fs.Infof(o, "Updating resume cache")
fileInfo, statErr := o.fs.lstat(o.path)
if statErr != nil {
return
}
writtenStr := strconv.FormatInt(fileInfo.Size(), 10)
hashType := o.fs.Hashes().GetOne()
hashState, err := hasher.GetHashState(hashType)
if err != nil {
return
}
_ = resumeOpt.SetID(ctx, writtenStr, hashType.String(), hashState)
}
})
cr := readers.NewContextReader(ctxr, in)
_, err = io.Copy(out, cr)
copyWg.Done()
atexit.Unregister(atexitHandle)
if errors.Is(err, context.Canceled) {
// If resume data is being written we want to wait here for the program to exit
cachingWg.Wait()
}
closeErr := out.Close()
if err == nil {
err = closeErr
@@ -1338,9 +1399,44 @@ func cleanRootPath(s string, noUNC bool, enc encoder.MultiEncoder) string {
return s
}
// Resume checks whether the (remote, ID) pair is valid and returns
// the point the file should be resumed from or an error.
func (f *Fs) Resume(ctx context.Context, remote, ID, hashName, hashState string) (Pos int64, err error) {
cachedPos, err := strconv.ParseInt(ID, 10, 64)
if err != nil {
return 0, err
}
// Compare hash of partial file on remote with partial hash in cache
remoteObject, err := f.NewObject(ctx, remote)
if err != nil {
return 0, err
}
if remoteObject.Size() != cachedPos {
return 0, errors.New("size on remote does not match resume cache")
}
hashType := hash.NameToType(hashName)
remoteHash, err := remoteObject.Hash(ctx, hashType)
if err != nil {
return 0, err
}
cachedHash, err := hash.SumPartialHash(hashName, hashState)
if err != nil {
return 0, err
}
// Hashes match, attempt resume
if cachedHash == remoteHash {
f.hashState[remote] = hashState
return cachedPos, nil
}
// No valid position found, restart from beginning
fs.Infof(remote, "Not resuming as cached hash state did not match hash state on remote")
return 0, nil
}
// Check the interfaces are satisfied
var (
_ fs.Fs = &Fs{}
_ fs.Resumer = &Fs{}
_ fs.Purger = &Fs{}
_ fs.PutStreamer = &Fs{}
_ fs.Mover = &Fs{}

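Summarizing the local backend's resume write path above: skip the bytes the destination already holds, then reopen the partial file for append instead of truncating it, with pre-allocation disabled. A minimal standalone sketch of that pattern follows; the helper name and simplified signature are illustrative, not code from this commit (imports: io, io/ioutil, os):

// appendFrom discards the first pos bytes of in (they are already on
// disk) and appends the remainder to the partial file at path.
func appendFrom(path string, in io.Reader, pos int64) error {
	// Discard the bytes the destination already holds.
	if _, err := io.CopyN(ioutil.Discard, in, pos); err != nil {
		return err // source ended before the resume point
	}
	// Append to the partial file rather than truncating it.
	f, err := os.OpenFile(path, os.O_WRONLY|os.O_APPEND, 0666)
	if err != nil {
		return err
	}
	if _, err = io.Copy(f, in); err != nil {
		_ = f.Close()
		return err
	}
	return f.Close()
}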
View File

@@ -18,7 +18,7 @@ func TestIntegration(t *testing.T) {
}
fstests.Run(t, &fstests.Opt{
RemoteName: *fstest.RemoteName,
UnimplementableFsMethods: []string{"OpenWriterAt", "DuplicateFiles"},
UnimplementableFsMethods: []string{"OpenWriterAt", "DuplicateFiles", "Resume"},
UnimplementableObjectMethods: []string{"MimeType"},
})
}

View File

@@ -130,6 +130,8 @@ type ConfigInfo struct {
FsCacheExpireDuration time.Duration
FsCacheExpireInterval time.Duration
DisableHTTP2 bool
MaxResumeCacheSize SizeSuffix
ResumeCutoff SizeSuffix
HumanReadable bool
KvLockTime time.Duration // maximum time to keep key-value database locked by process
}
@@ -163,6 +165,8 @@ func NewConfig() *ConfigInfo {
c.TPSLimitBurst = 1
c.MaxTransfer = -1
c.MaxBacklog = 10000
c.MaxResumeCacheSize = SizeSuffix(100 * 1024)
c.ResumeCutoff = -1
// We do not want to set the default here. We use this variable being empty as part of the fall-through of options.
// c.StatsOneLineDateFormat = "2006/01/02 15:04:05 - "
c.MultiThreadCutoff = SizeSuffix(250 * 1024 * 1024)

View File

@@ -132,6 +132,8 @@ func AddFlags(ci *fs.ConfigInfo, flagSet *pflag.FlagSet) {
flags.BoolVarP(flagSet, &ci.RefreshTimes, "refresh-times", "", ci.RefreshTimes, "Refresh the modtime of remote files")
flags.BoolVarP(flagSet, &ci.NoConsole, "no-console", "", ci.NoConsole, "Hide console window (supported on Windows only)")
flags.StringVarP(flagSet, &dscp, "dscp", "", "", "Set DSCP value to connections, value or name, e.g. CS1, LE, DF, AF21")
flags.FVarP(flagSet, &ci.MaxResumeCacheSize, "max-resume-cache-size", "", "The maximum size of the cache used to store data necessary for resuming uploads. When the storage grows beyond this size, the oldest resume data will be deleted (default 100k)")
flags.FVarP(flagSet, &ci.ResumeCutoff, "resume-cutoff", "", "If set, attempt to resume all partial uploads larger than this size. (default off)")
flags.DurationVarP(flagSet, &ci.FsCacheExpireDuration, "fs-cache-expire-duration", "", ci.FsCacheExpireDuration, "Cache remotes for this long (0 to disable caching)")
flags.DurationVarP(flagSet, &ci.FsCacheExpireInterval, "fs-cache-expire-interval", "", ci.FsCacheExpireInterval, "Interval to check for expired remotes")
flags.BoolVarP(flagSet, &ci.DisableHTTP2, "disable-http2", "", ci.DisableHTTP2, "Disable HTTP/2 in the global transport")

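Assuming the flag wiring above, a resume-enabled transfer would be invoked along these lines (sizes illustrative; resuming only takes effect on backends implementing Resumer, which in this commit is the local backend):

rclone copy --resume-cutoff 10M --max-resume-cache-size 200k remote:src /path/to/dst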
View File

@@ -163,6 +163,10 @@ type Features struct {
// Shutdown the backend, closing any background tasks and any
// cached connections.
Shutdown func(ctx context.Context) error
// Resume checks whether the (remote, ID) pair is valid and returns
// the point the file should be resumed from or an error.
Resume func(ctx context.Context, remote, ID, hashName, hashState string) (Pos int64, err error)
}
// Disable nil's out the named feature. If it isn't found then it
@@ -290,6 +294,9 @@ func (ft *Features) Fill(ctx context.Context, f Fs) *Features {
if do, ok := f.(Shutdowner); ok {
ft.Shutdown = do.Shutdown
}
if do, ok := f.(Resumer); ok {
ft.Resume = do.Resume
}
return ft.DisableList(GetConfig(ctx).DisableFeatures)
}
@@ -636,6 +643,13 @@ type Shutdowner interface {
Shutdown(ctx context.Context) error
}
// Resumer is an optional interface for Fs
type Resumer interface {
// Resume checks whether the (remote, ID) pair is valid and returns
// the point the file should be resumed from or an error.
Resume(ctx context.Context, remote, ID, hashName, hashState string) (Pos int64, err error)
}
// ObjectsChan is a channel of Objects
type ObjectsChan chan Object

View File

@@ -48,6 +48,7 @@ var (
ErrorNotImplemented = errors.New("optional feature not implemented")
ErrorCommandNotFound = errors.New("command not found")
ErrorFileNameTooLong = errors.New("file name too long")
ErrorCantResume = errors.New("can't resume file upload")
)
// CheckClose is a utility function used to check the return from

View File

@@ -4,6 +4,7 @@ import (
"crypto/md5"
"crypto/sha1"
"crypto/sha256"
"encoding"
"encoding/base64"
"encoding/hex"
"errors"
@@ -264,6 +265,67 @@ func (m *MultiHasher) Size() int64 {
return m.size
}
// GetHashState returns the partial hash state for the given hash type encoded as a string
func (m *MultiHasher) GetHashState(hashType Type) (string, error) {
h, ok := m.h[hashType]
if !ok {
return "", ErrUnsupported
}
marshaler, ok := h.(encoding.BinaryMarshaler)
if !ok {
return "", errors.New(hashType.String() + " does not implement encoding.BinaryMarshaler")
}
data, err := marshaler.MarshalBinary()
if err != nil {
return "", err
}
return base64.StdEncoding.EncodeToString(data), nil
}
// RestoreHashState restores the partial hash state for the passed hash type
func (m *MultiHasher) RestoreHashState(hashType Type, hashState string) error {
partialHashState, err := base64.StdEncoding.DecodeString(hashState)
if err != nil {
return err
}
h, ok := m.h[hashType]
if !ok {
return ErrUnsupported
}
unmarshaler, ok := h.(encoding.BinaryUnmarshaler)
if !ok {
return errors.New(hashType.String() + " does not implement encoding.BinaryUnmarshaler")
}
return unmarshaler.UnmarshalBinary(partialHashState)
}
// SumPartialHash returns the hash of the partial hash state
func SumPartialHash(hashName, hashState string) (string, error) {
partialHashDef, ok := name2hash[hashName]
if !ok {
return "", ErrUnsupported
}
partialHash := partialHashDef.newFunc()
partialHashState, err := base64.StdEncoding.DecodeString(hashState)
if err != nil {
return "", err
}
unmarshaler, ok := partialHash.(encoding.BinaryUnmarshaler)
if !ok {
return "", errors.New(hashName + " does not implement encoding.BinaryUnmarshaler")
}
if err := unmarshaler.UnmarshalBinary(partialHashState); err != nil {
return "", err
}
return hex.EncodeToString(partialHash.Sum(nil)), nil
}
// NameToType returns the requested hash type or None if the hash type isn't supported
func NameToType(hashName string) Type {
hashDef, ok := name2hash[hashName]
if !ok {
return None
}
return hashDef.hashType
}
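These three helpers are designed to round-trip: GetHashState snapshots a partial hash before the process exits, RestoreHashState loads that snapshot back into a live hasher, and SumPartialHash finalizes a snapshot without one. A hedged usage sketch (in-package, wrapper function illustrative, error handling abbreviated) using only the APIs added above plus the existing MultiHasher:

func resumeHashRoundTrip() error {
	// First (interrupted) run: hash what was uploaded, snapshot the state.
	h1, err := NewMultiHasherTypes(NewHashSet(MD5))
	if err != nil {
		return err
	}
	_, _ = h1.Write([]byte("first half "))
	state, err := h1.GetHashState(MD5) // base64 of the marshaled md5 state
	if err != nil {
		return err
	}
	// Second run: restore the snapshot, then hash only the remaining bytes.
	h2, err := NewMultiHasherTypes(NewHashSet(MD5))
	if err != nil {
		return err
	}
	if err := h2.RestoreHashState(MD5, state); err != nil {
		return err
	}
	_, _ = h2.Write([]byte("second half"))
	// h2 now reports the same MD5 as hashing the whole input in one pass.
	return nil
}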
// A Set Indicates one or more hash types.
type Set int

View File

@@ -3,13 +3,19 @@
package fs
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/lib/cacheroot"
)
// OpenOption is an interface describing options for Open
@@ -230,6 +236,144 @@ func (o *HashesOption) Mandatory() bool {
return false
}
// OptionResume defines a Put/Upload for doing resumes
type OptionResume struct {
ID string // resume this ID if set
Pos int64 // and resume from this position
Src Object
F Fs
Remote string
CacheCleaned bool
CacheDir string
}
// SetID will be called by backend's Put/Update function if the object's upload
// could be resumed upon failure
//
// SetID takes the passed resume ID, hash name and hash state, together with the
// Fingerprint of the src object, and stores them in --cache-dir so that future
// Copy operations can resume the upload if it fails
func (o *OptionResume) SetID(ctx context.Context, ID, hashName, hashState string) error {
ci := GetConfig(ctx)
// Get the Fingerprint of the src object so that future Copy operations can ensure the
// object hasn't changed before resuming an upload
fingerprint := Fingerprint(ctx, o.Src, true)
data, err := marshalResumeJSON(ctx, fingerprint, ID, hashName, hashState)
if err != nil {
return fmt.Errorf("failed to marshal data JSON: %w", err)
}
if len(data) < int(ci.MaxResumeCacheSize) {
// Each remote will have its own directory for cached resume files
dirPath, _, err := cacheroot.CreateCacheRoot(o.CacheDir, o.F.Name(), o.F.Root(), "resume")
if err != nil {
return err
}
err = os.MkdirAll(dirPath, os.ModePerm)
if err != nil {
return fmt.Errorf("failed to create cache directory %v: %w", dirPath, err)
}
// Write resume data to disk
cachePath := filepath.Join(dirPath, o.Remote)
cacheFile, err := os.Create(cachePath)
if err != nil {
return fmt.Errorf("failed to create cache file %v: %w", cachePath, err)
}
defer func() {
_ = cacheFile.Close()
}()
_, errWrite := cacheFile.Write(data)
if errWrite != nil {
return fmt.Errorf("failed to write JSON to file: %w", errWrite)
}
}
if !o.CacheCleaned {
rootCacheDir := filepath.Join(o.CacheDir, "resume")
if err := cleanResumeCache(ctx, rootCacheDir); err != nil {
return fmt.Errorf("failed to clean resume cache: %w", err)
}
}
o.CacheCleaned = true
return nil
}
// ResumeJSON is a struct for storing resume info in cache
type ResumeJSON struct {
Fingerprint string `json:"fprint"`
ID string `json:"id"`
HashName string `json:"hname"`
HashState string `json:"hstate"`
}
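Given those struct tags, the resume cache file written by SetID is one small JSON object per upload, e.g. (all values illustrative):

{"fprint":"1048576,2020-08-31 14:25:10 +0000 UTC","id":"524288","hname":"md5","hstate":"<base64 hash state>"}

For the local backend the id field is simply the count of bytes already written to the destination, as recorded by the atexit handler in Update above.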
func marshalResumeJSON(ctx context.Context, fprint, id, hashName, hashState string) ([]byte, error) {
resumedata := ResumeJSON{
Fingerprint: fprint,
ID: id,
HashName: hashName,
HashState: hashState,
}
data, err := json.Marshal(&resumedata)
return data, err
}
// cleanResumeCache checks the size of the resume cache and removes the oldest resume files if it exceeds the limit
func cleanResumeCache(ctx context.Context, rootCacheDir string) error {
ci := GetConfig(ctx)
var paths []string
pathsWithInfo := make(map[string]os.FileInfo)
totalCacheSize := int64(0)
walkErr := filepath.Walk(rootCacheDir,
func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
// Empty subdirectories in the resume cache dir can be removed.
// Removal fails harmlessly for non-empty directories, so failures
// are logged rather than returned.
if removeErr := os.Remove(path); removeErr != nil && !os.IsNotExist(removeErr) {
Debugf(path, "failed to remove resume cache subdirectory: %v", removeErr)
}
return nil
}
paths = append(paths, path)
pathsWithInfo[path] = info
totalCacheSize += info.Size()
return nil
})
if walkErr != nil {
return fmt.Errorf("error walking through cache when cleaning cache dir: %w", walkErr)
}
if totalCacheSize > int64(ci.MaxResumeCacheSize) {
sort.Slice(paths, func(i, j int) bool {
return pathsWithInfo[paths[i]].ModTime().Before(pathsWithInfo[paths[j]].ModTime())
})
for _, p := range paths {
if totalCacheSize < int64(ci.MaxResumeCacheSize) {
break
}
if err := os.Remove(p); err != nil {
return fmt.Errorf("error removing oldest cache file: %s: %w", p, err)
}
totalCacheSize -= pathsWithInfo[p].Size()
Debugf(p, "Successfully removed oldest cache file")
}
}
return nil
}
// Header formats the option as an http header
func (o *OptionResume) Header() (key string, value string) {
return "", ""
}
// String formats the option into human readable form
func (o *OptionResume) String() string {
return fmt.Sprintf("OptionResume(ID:%v, Pos:%v)", o.ID, o.Pos)
}
// Mandatory returns whether the option must be parsed or can be ignored
func (o *OptionResume) Mandatory() bool {
return false
}
// NullOption defines an Option which does nothing
type NullOption struct {
}

View File

@@ -0,0 +1,23 @@
//go:build !windows
// +build !windows
package operations
import (
"os"
"os/exec"
"syscall"
)
func sendInterrupt() error {
p, err := os.FindProcess(syscall.Getpid())
if err != nil {
return err
}
err = p.Signal(os.Interrupt)
return err
}
func setupCmd(cmd *exec.Cmd) {
// Only needed for windows
}

View File

@@ -0,0 +1,32 @@
//go:build windows
// +build windows
package operations
import (
"os/exec"
"syscall"
)
// Credit: https://github.com/golang/go/blob/6125d0c4265067cdb67af1340bf689975dd128f4/src/os/signal/signal_windows_test.go#L18
func sendInterrupt() error {
d, e := syscall.LoadDLL("kernel32.dll")
if e != nil {
return e
}
p, e := d.FindProc("GenerateConsoleCtrlEvent")
if e != nil {
return e
}
r, _, e := p.Call(syscall.CTRL_BREAK_EVENT, uintptr(syscall.Getpid()))
if r == 0 {
return e
}
return nil
}
func setupCmd(cmd *exec.Cmd) {
cmd.SysProcAttr = &syscall.SysProcAttr{
CreationFlags: syscall.CREATE_NEW_PROCESS_GROUP,
}
}

View File

@@ -33,6 +33,7 @@ import (
"github.com/rclone/rclone/fs/object"
"github.com/rclone/rclone/fs/walk"
"github.com/rclone/rclone/lib/atexit"
"github.com/rclone/rclone/lib/cacheroot"
"github.com/rclone/rclone/lib/pacer"
"github.com/rclone/rclone/lib/random"
"github.com/rclone/rclone/lib/readers"
@@ -364,6 +365,11 @@ func CommonHash(ctx context.Context, fa, fb fs.Info) (hash.Type, *fs.HashesOption) {
// be nil.
func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Object, err error) {
ci := fs.GetConfig(ctx)
var resumeOpt *fs.OptionResume
if f.Features().Resume != nil {
resumeOpt = createResumeOpt(ctx, f, remote, src)
}
tr := accounting.Stats(ctx).NewTransfer(src)
defer func() {
tr.Done(ctx, err)
@@ -461,6 +467,10 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Object, err error) {
wrappedSrc = NewOverrideRemote(src, remote)
}
options := []fs.OpenOption{hashOption}
// Appends OptionResume if it was set
if resumeOpt != nil {
options = append(options, resumeOpt)
}
for _, option := range ci.UploadHeaders {
options = append(options, option)
}
@@ -475,6 +485,17 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Object, err error) {
if err == nil {
newDst = dst
err = closeErr
cacheParent := config.GetCacheDir()
// Remove resume cache file (if one was created) when Put/Upload is successful
cacheDir, _, err := cacheroot.CreateCacheRoot(cacheParent, f.Name(), f.Root(), "resume")
if err != nil {
return nil, err
}
cacheFile := filepath.Join(cacheDir, remote)
removeErr := os.Remove(cacheFile)
if removeErr != nil && !os.IsNotExist(removeErr) {
return nil, fmt.Errorf("failed to remove resume cache file after upload: %w", removeErr)
}
}
}
}

fs/operations/resume.go (new file)
View File

@@ -0,0 +1,72 @@
package operations
import (
"context"
"encoding/json"
"io/ioutil"
"os"
"path/filepath"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/config"
"github.com/rclone/rclone/lib/cacheroot"
)
// createResumeOpt creates an OptionResume that will be passed to Put/Upload
func createResumeOpt(ctx context.Context, f fs.Fs, remote string, src fs.Object) (resumeOpt *fs.OptionResume) {
ci := fs.GetConfig(ctx)
cacheParent := config.GetCacheDir()
resumeOpt = &fs.OptionResume{ID: "", Pos: 0, Src: src, F: f, Remote: remote, CacheCleaned: false, CacheDir: cacheParent}
if ci.ResumeCutoff >= 0 {
cacheDir, _, err := cacheroot.CreateCacheRoot(cacheParent, f.Name(), f.Root(), "resume")
if err != nil {
return nil
}
cacheFile := filepath.Join(cacheDir, remote)
resumeID, hashName, hashState, attemptResume := readResumeCache(ctx, f, src, cacheFile)
if attemptResume {
fs.Debugf(f, "Existing resume cache file found: %s. A resume will now be attempted.", cacheFile)
position, resumeErr := f.Features().Resume(ctx, remote, resumeID, hashName, hashState)
if resumeErr != nil {
fs.Errorf(src, "Resume canceled: %v", resumeErr)
} else if position > int64(ci.ResumeCutoff) {
resumeOpt.Pos = position
}
}
}
return resumeOpt
}
// readResumeCache checks to see if a resume ID has been cached for the source object.
// If it finds one it returns it along with true to signal a resume can be attempted
func readResumeCache(ctx context.Context, f fs.Fs, src fs.Object, cacheName string) (resumeID, hashName, hashState string, attemptResume bool) {
existingCacheFile, openErr := os.Open(cacheName)
if openErr == nil {
defer func() {
_ = existingCacheFile.Close()
}()
rawData, readErr := ioutil.ReadAll(existingCacheFile)
if readErr == nil {
existingFingerprint, resumeID, hashName, hashState, unmarshalErr := unmarshalResumeJSON(ctx, rawData)
if unmarshalErr != nil {
fs.Debugf(f, "Failed to unmarshal Resume JSON: %s. Resume will not be attempted.", unmarshalErr.Error())
} else if existingFingerprint != "" {
// Check if the src object has changed by comparing new Fingerprint to Fingerprint in cache file
fingerprint := fs.Fingerprint(ctx, src, true)
if existingFingerprint == fingerprint {
return resumeID, hashName, hashState, true
}
}
}
}
return "", "", "", false
}
func unmarshalResumeJSON(ctx context.Context, data []byte) (fprint, id, hashName, hashState string, err error) {
var resumedata fs.ResumeJSON
err = json.Unmarshal(data, &resumedata)
if err != nil {
return "", "", "", "", err
}
return resumedata.Fingerprint, resumedata.ID, resumedata.HashName, resumedata.HashState, nil
}

View File

@@ -0,0 +1,163 @@
package operations
import (
"bytes"
"context"
"io"
"io/ioutil"
"log"
"math/rand"
"os"
"os/exec"
"runtime"
"strings"
"sync"
"testing"
"time"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fstest"
"github.com/rclone/rclone/fstest/mockobject"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type interruptReader struct {
once sync.Once
r io.Reader
}
// Read sends an OS specific interrupt signal and then reads 1 byte at a time
func (r *interruptReader) Read(b []byte) (n int, err error) {
r.once.Do(func() {
_ = sendInterrupt()
})
buffer := make([]byte, 1)
n, err = r.r.Read(buffer)
if n > 0 && len(b) > 0 {
b[0] = buffer[0]
}
// Simulate duration of a larger read without needing to test with a large file
// Allows for the interrupt to be handled before Copy completes
time.Sleep(time.Microsecond * 10)
return n, err
}
// resumeTestObject is a wrapper for a mockobject with a custom Open function
//
// n indicates the number of bytes to read before sending an
// interrupt signal
type resumeTestObject struct {
fs.Object
n int64
}
// Open opens the file for read. Call Close() on the returned io.ReadCloser
//
// The Reader will signal an interrupt after reading n bytes, then continue to read 1 byte at a time.
// If TestResume is successful, the interrupt will be processed and reads will be cancelled before running
// out of bytes to read
func (o *resumeTestObject) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadCloser, error) {
rc, err := o.Object.Open(ctx, options...)
if err != nil {
return nil, err
}
r := io.MultiReader(&io.LimitedReader{R: rc, N: o.n}, &interruptReader{r: rc})
// Wrap with Close in a new readCloser
rc = readCloser{Reader: r, Closer: rc}
return rc, nil
}
func makeContent(t *testing.T, size int) []byte {
content := make([]byte, size)
r := rand.New(rand.NewSource(42))
_, err := io.ReadFull(r, content)
assert.NoError(t, err)
return content
}
func TestResume(t *testing.T) {
ctx := context.Background()
r := fstest.NewRun(t)
defer r.Finalise()
ci := fs.GetConfig(ctx)
ci.ResumeCutoff = 0
// Contents for the mock object
var (
// Test contents must be large enough that io.Copy does not complete during the first Rclone Copy operation
resumeTestContents = makeContent(t, 1024)
expectedContents = resumeTestContents
)
// Create mockobjects with given breaks
createTestSrc := func(interrupt int64) (fs.Object, fs.Object) {
srcOrig := mockobject.New("potato").WithContent(resumeTestContents, mockobject.SeekModeNone)
srcOrig.SetFs(r.Flocal)
src := &resumeTestObject{
Object: srcOrig,
n: interrupt,
}
return src, srcOrig
}
checkContents := func(obj fs.Object, contents string) {
assert.NotNil(t, obj)
assert.Equal(t, int64(len(contents)), obj.Size())
r, err := obj.Open(ctx)
assert.NoError(t, err)
assert.NotNil(t, r)
if r == nil {
return
}
data, err := ioutil.ReadAll(r)
assert.NoError(t, err)
assert.Equal(t, contents, string(data))
_ = r.Close()
}
srcBreak, srcNoBreak := createTestSrc(2)
// Run first Copy only in a subprocess so that it can be interrupted without ending the test
// adapted from: https://stackoverflow.com/questions/26225513/how-to-test-os-exit-scenarios-in-go
if os.Getenv("RUNTEST") == "1" {
remoteRoot := os.Getenv("REMOTEROOT")
remoteFs, err := fs.NewFs(ctx, remoteRoot)
require.NoError(t, err)
_, _ = Copy(ctx, remoteFs, nil, "testdst", srcBreak)
// This should never be reached as the subroutine should exit during Copy
require.True(t, false, "Problem with test, first Copy operation should've been interrupted before completion")
return
}
// Start the subprocess
cmd := exec.Command(os.Args[0], "-test.run=TestResume")
cmd.Env = append(os.Environ(), "RUNTEST=1", "REMOTEROOT="+r.Fremote.Root())
cmd.Stdout = os.Stdout
setupCmd(cmd)
err := cmd.Run()
e, ok := err.(*exec.ExitError)
// Exit code after signal will be (128+signum) on Linux or (signum) on Windows
expectedErrorString := "exit status 1"
if runtime.GOOS == "windows" {
expectedErrorString = "exit status 2"
}
require.True(t, ok)
assert.Contains(t, e.Error(), expectedErrorString)
var buf bytes.Buffer
log.SetOutput(&buf)
defer func() {
log.SetOutput(os.Stderr)
}()
// Start copy again, but with no breaks
newDst, err := Copy(ctx, r.Fremote, nil, "testdst", srcNoBreak)
assert.NoError(t, err)
// Checks to see if a resume was initiated
// Resumed byte position can vary slightly depending how long it takes atexit to process the interrupt
assert.True(t, strings.Contains(buf.String(), "Resuming at byte position: "), "The upload did not resume when restarted. Message: %q", buf.String())
checkContents(newDst, string(expectedContents))
}

View File

@@ -0,0 +1,50 @@
package cacheroot
import (
"fmt"
"os"
"path/filepath"
"runtime"
"strings"
"github.com/pkg/errors"
"github.com/rclone/rclone/lib/encoder"
"github.com/rclone/rclone/lib/file"
)
// CreateCacheRoot will derive and make a subsystem cache path.
//
// Returned root OS path is an absolute path with UNC prefix,
// OS-specific path separators, and encoded with OS-specific encoder.
//
// Additionally it is returned as a standard path without UNC prefix,
// with slash path separators, and standard (internal) encoding.
//
// Care is taken when creating OS paths so that the ':' separator
// following a drive letter is not encoded, e.g. into unicode fullwidth colon.
//
// parentOSPath should contain an absolute local path in OS encoding.
//
// Note: instead of fs.Fs it takes name and root as plain strings
// to prevent import loops due to dependency on the fs package.
func CreateCacheRoot(parentOSPath, fsName, fsRoot, cacheName string) (rootOSPath, standardPath string, err error) {
// Get a relative cache path representing the remote.
relativeDir := fsRoot
if runtime.GOOS == "windows" && strings.HasPrefix(relativeDir, `//?/`) {
// Trim off the leading "//" to make the result
// valid for appending to another path.
relativeDir = relativeDir[2:]
}
relativeDir = fsName + "/" + relativeDir
// Derive and make the cache root directory
relativeOSPath := filepath.FromSlash(encoder.OS.FromStandardPath(relativeDir))
rootOSPath = file.UNCPath(filepath.Join(parentOSPath, cacheName, relativeOSPath))
if err = os.MkdirAll(rootOSPath, 0700); err != nil {
return "", "", errors.Wrapf(err, "failed to create %s cache directory", cacheName)
}
parentStdPath := encoder.OS.ToStandardPath(filepath.ToSlash(parentOSPath))
standardPath = fmt.Sprintf("%s/%s/%s", parentStdPath, cacheName, relativeDir)
return rootOSPath, standardPath, nil
}

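Both the resume cache and the reworked VFS cache below go through this helper; with default settings on Linux a call resolves roughly as follows (remote name and paths illustrative):

rootOSPath, stdPath, err := cacheroot.CreateCacheRoot(
	"/home/user/.cache/rclone", // parentOSPath, e.g. config.GetCacheDir()
	"mys3",                     // fsName
	"bucket/dir",               // fsRoot
	"resume",                   // cacheName
)
// rootOSPath: /home/user/.cache/rclone/resume/mys3/bucket/dir (created on disk)
// stdPath:    /home/user/.cache/rclone/resume/mys3/bucket/dir (suitable for fscache.Get)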
View File

@@ -9,7 +9,6 @@ import (
"sync/atomic"
"syscall"
"github.com/rclone/rclone/fs"
"golang.org/x/sys/unix"
)
@@ -48,7 +47,6 @@ func PreAllocate(size int64, out *os.File) (err error) {
// Try the next flags combination
index++
atomic.StoreInt32(&fallocFlagsIndex, index)
fs.Debugf(nil, "preAllocate: got error on fallocate, trying combination %d/%d: %v", index, len(fallocFlags), err)
goto again
}

View File

@@ -8,7 +8,6 @@ import (
"os"
"path"
"path/filepath"
"runtime"
"sort"
"strings"
"sync"
@@ -21,6 +20,7 @@ import (
"github.com/rclone/rclone/fs/fserrors"
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/fs/operations"
"github.com/rclone/rclone/lib/cacheroot"
"github.com/rclone/rclone/lib/encoder"
"github.com/rclone/rclone/lib/file"
"github.com/rclone/rclone/vfs/vfscache/writeback"
@@ -75,43 +75,30 @@ type AddVirtualFn func(remote string, size int64, isDir bool) error
// This starts background goroutines which can be cancelled with the
// context passed in.
func New(ctx context.Context, fremote fs.Fs, opt *vfscommon.Options, avFn AddVirtualFn) (*Cache, error) {
// Get cache root path.
// We need it in two variants: OS path as an absolute path with UNC prefix,
// OS-specific path separators, and encoded with OS-specific encoder. Standard path
// without UNC prefix, with slash path separators, and standard (internal) encoding.
// Care must be taken when creating OS paths so that the ':' separator following a
// drive letter is not encoded (e.g. into unicode fullwidth colon).
var err error
parentOSPath := config.GetCacheDir() // Assuming string contains a local absolute path in OS encoding
fs.Debugf(nil, "vfs cache: root is %q", parentOSPath)
parentPath := fromOSPath(parentOSPath)
// Get a relative cache path representing the remote.
relativeDirPath := fremote.Root() // This is a remote path in standard encoding
if runtime.GOOS == "windows" {
if strings.HasPrefix(relativeDirPath, `//?/`) {
relativeDirPath = relativeDirPath[2:] // Trim off the "//" for the result to be a valid when appending to another path
}
}
relativeDirPath = fremote.Name() + "/" + relativeDirPath
relativeDirOSPath := toOSPath(relativeDirPath)
// Create cache root dirs
var dataOSPath, metaOSPath string
if dataOSPath, metaOSPath, err = createRootDirs(parentOSPath, relativeDirOSPath); err != nil {
fsName, fsRoot := fremote.Name(), fremote.Root()
dataOSPath, dataStdPath, err := cacheroot.CreateCacheRoot(parentOSPath, fsName, fsRoot, "vfs")
if err != nil {
return nil, err
}
fdata, err := fscache.Get(ctx, dataStdPath)
if err != nil {
return nil, fmt.Errorf("failed to get data cache backend: %w", err)
}
fs.Debugf(nil, "vfs cache: data root is %q", dataOSPath)
fs.Debugf(nil, "vfs cache: metadata root is %q", metaOSPath)
// Get (create) cache backends
var fdata, fmeta fs.Fs
if fdata, fmeta, err = getBackends(ctx, parentPath, relativeDirPath); err != nil {
metaOSPath, metaStdPath, err := cacheroot.CreateCacheRoot(parentOSPath, fsName, fsRoot, "vfsMeta")
if err != nil {
return nil, err
}
hashType, hashOption := operations.CommonHash(ctx, fdata, fremote)
fmeta, err := fscache.Get(ctx, metaStdPath)
if err != nil {
return nil, fmt.Errorf("failed to get metadata cache backend: %w", err)
}
fs.Debugf(nil, "vfs cache: metadata root is %q", metaOSPath)
// Create the cache object
hashType, hashOption := operations.CommonHash(ctx, fdata, fremote)
c := &Cache{
fremote: fremote,
fcache: fdata,
@@ -150,23 +137,6 @@ func createDir(dir string) error {
return file.MkdirAll(dir, 0700)
}
// createRootDir creates a single cache root directory
func createRootDir(parentOSPath string, name string, relativeDirOSPath string) (path string, err error) {
path = file.UNCPath(filepath.Join(parentOSPath, name, relativeDirOSPath))
err = createDir(path)
return
}
// createRootDirs creates all cache root directories
func createRootDirs(parentOSPath string, relativeDirOSPath string) (dataOSPath string, metaOSPath string, err error) {
if dataOSPath, err = createRootDir(parentOSPath, "vfs", relativeDirOSPath); err != nil {
err = fmt.Errorf("failed to create data cache directory: %w", err)
} else if metaOSPath, err = createRootDir(parentOSPath, "vfsMeta", relativeDirOSPath); err != nil {
err = fmt.Errorf("failed to create metadata cache directory: %w", err)
}
return
}
// createItemDir creates the directory for named item in all cache roots
//
// Returns an os path for the data cache file.
@@ -186,22 +156,6 @@ func (c *Cache) createItemDir(name string) (string, error) {
return filepath.Join(parentPath, leaf), nil
}
// getBackend gets a backend for a cache root dir
func getBackend(ctx context.Context, parentPath string, name string, relativeDirPath string) (fs.Fs, error) {
path := fmt.Sprintf("%s/%s/%s", parentPath, name, relativeDirPath)
return fscache.Get(ctx, path)
}
// getBackends gets backends for all cache root dirs
func getBackends(ctx context.Context, parentPath string, relativeDirPath string) (fdata fs.Fs, fmeta fs.Fs, err error) {
if fdata, err = getBackend(ctx, parentPath, "vfs", relativeDirPath); err != nil {
err = fmt.Errorf("failed to get data cache backend: %w", err)
} else if fmeta, err = getBackend(ctx, parentPath, "vfsMeta", relativeDirPath); err != nil {
err = fmt.Errorf("failed to get metadata cache backend: %w", err)
}
return
}
// clean returns the cleaned version of name for use in the index map
//
// name should be a remote path not an osPath
@@ -214,11 +168,6 @@ func clean(name string) string {
return name
}
// fromOSPath turns a OS path into a standard/remote path
func fromOSPath(osPath string) string {
return encoder.OS.ToStandardPath(filepath.ToSlash(osPath))
}
// toOSPath turns a standard/remote path into an OS path
func toOSPath(standardPath string) string {
return filepath.FromSlash(encoder.OS.FromStandardPath(standardPath))