cache: offline uploading

This commit is contained in:
remusb 2018-01-30 00:05:04 +02:00 committed by GitHub
parent c277a4096c
commit 40af98b0b3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 2690 additions and 782 deletions

634
backend/cache/cache.go vendored

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

78
backend/cache/cache_mount_unix_test.go vendored Normal file
View File

@ -0,0 +1,78 @@
// +build !plan9,!windows,go1.7
package cache_test
import (
"os"
"testing"
"time"
"bazil.org/fuse"
fusefs "bazil.org/fuse/fs"
"github.com/ncw/rclone/cmd/mount"
"github.com/ncw/rclone/cmd/mountlib"
"github.com/ncw/rclone/fs"
"github.com/stretchr/testify/require"
)
func (r *run) mountFs(t *testing.T, f fs.Fs) {
device := f.Name() + ":" + f.Root()
var options = []fuse.MountOption{
fuse.MaxReadahead(uint32(mountlib.MaxReadAhead)),
fuse.Subtype("rclone"),
fuse.FSName(device), fuse.VolumeName(device),
fuse.NoAppleDouble(),
fuse.NoAppleXattr(),
fuse.AllowOther(),
}
err := os.MkdirAll(r.mntDir, os.ModePerm)
require.NoError(t, err)
c, err := fuse.Mount(r.mntDir, options...)
require.NoError(t, err)
filesys := mount.NewFS(f)
server := fusefs.New(c, nil)
// Serve the mount point in the background returning error to errChan
r.unmountRes = make(chan error, 1)
go func() {
err := server.Serve(filesys)
closeErr := c.Close()
if err == nil {
err = closeErr
}
r.unmountRes <- err
}()
// check if the mount process has an error to report
<-c.Ready
require.NoError(t, c.MountError)
r.unmountFn = func() error {
// Shutdown the VFS
filesys.VFS.Shutdown()
return fuse.Unmount(r.mntDir)
}
r.vfs = filesys.VFS
r.isMounted = true
}
func (r *run) unmountFs(t *testing.T, f fs.Fs) {
var err error
for i := 0; i < 4; i++ {
err = r.unmountFn()
if err != nil {
//log.Printf("signal to umount failed - retrying: %v", err)
time.Sleep(3 * time.Second)
continue
}
break
}
require.NoError(t, err)
err = <-r.unmountRes
require.NoError(t, err)
err = r.vfs.CleanUp()
require.NoError(t, err)
r.isMounted = false
}

View File

@ -0,0 +1,124 @@
// +build windows,go1.7
package cache_test
import (
"fmt"
"os"
"testing"
"time"
"github.com/billziss-gh/cgofuse/fuse"
"github.com/ncw/rclone/cmd/cmount"
"github.com/ncw/rclone/cmd/mountlib"
"github.com/ncw/rclone/fs"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
)
// waitFor runs fn() until it returns true or the timeout expires
func waitFor(fn func() bool) (ok bool) {
const totalWait = 10 * time.Second
const individualWait = 10 * time.Millisecond
for i := 0; i < int(totalWait/individualWait); i++ {
ok = fn()
if ok {
return ok
}
time.Sleep(individualWait)
}
return false
}
func (r *run) mountFs(t *testing.T, f fs.Fs) {
// FIXME implement cmount
t.Skip("windows not supported yet")
device := f.Name() + ":" + f.Root()
options := []string{
"-o", "fsname=" + device,
"-o", "subtype=rclone",
"-o", fmt.Sprintf("max_readahead=%d", mountlib.MaxReadAhead),
"-o", "uid=-1",
"-o", "gid=-1",
"-o", "allow_other",
// This causes FUSE to supply O_TRUNC with the Open
// call which is more efficient for cmount. However
// it does not work with cgofuse on Windows with
// WinFSP so cmount must work with or without it.
"-o", "atomic_o_trunc",
"--FileSystemName=rclone",
}
fsys := cmount.NewFS(f)
host := fuse.NewFileSystemHost(fsys)
// Serve the mount point in the background returning error to errChan
r.unmountRes = make(chan error, 1)
go func() {
var err error
ok := host.Mount(r.mntDir, options)
if !ok {
err = errors.New("mount failed")
}
r.unmountRes <- err
}()
// unmount
r.unmountFn = func() error {
// Shutdown the VFS
fsys.VFS.Shutdown()
if host.Unmount() {
if !waitFor(func() bool {
_, err := os.Stat(r.mntDir)
return err != nil
}) {
t.Fatalf("mountpoint %q didn't disappear after unmount - continuing anyway", r.mntDir)
}
return nil
}
return errors.New("host unmount failed")
}
// Wait for the filesystem to become ready, checking the file
// system didn't blow up before starting
select {
case err := <-r.unmountRes:
require.NoError(t, err)
case <-time.After(time.Second * 3):
}
// Wait for the mount point to be available on Windows
// On Windows the Init signal comes slightly before the mount is ready
if !waitFor(func() bool {
_, err := os.Stat(r.mntDir)
return err == nil
}) {
t.Errorf("mountpoint %q didn't became available on mount", r.mntDir)
}
r.vfs = fsys.VFS
r.isMounted = true
}
func (r *run) unmountFs(t *testing.T, f fs.Fs) {
// FIXME implement cmount
t.Skip("windows not supported yet")
var err error
for i := 0; i < 4; i++ {
err = r.unmountFn()
if err != nil {
//log.Printf("signal to umount failed - retrying: %v", err)
time.Sleep(3 * time.Second)
continue
}
break
}
require.NoError(t, err)
err = <-r.unmountRes
require.NoError(t, err)
err = r.vfs.CleanUp()
require.NoError(t, err)
r.isMounted = false
}

View File

@ -93,15 +93,7 @@ func (d *Directory) String() string {
// Remote returns the remote path
func (d *Directory) Remote() string {
p := cleanPath(path.Join(d.Dir, d.Name))
if d.CacheFs.Root() != "" {
p = p[len(d.CacheFs.Root()):] // trim out root
if len(p) > 0 { // remove first separator
p = p[1:]
}
}
return p
return d.CacheFs.cleanRootFromPath(d.abs())
}
// abs returns the absolute path to the dir

View File

@ -9,14 +9,42 @@ import (
"sync"
"time"
"path"
"runtime"
"strings"
"github.com/ncw/rclone/fs"
"github.com/ncw/rclone/fs/operations"
"github.com/pkg/errors"
)
var uploaderMap = make(map[string]*backgroundWriter)
var uploaderMapMx sync.Mutex
// initBackgroundUploader returns a single instance
func initBackgroundUploader(fs *Fs) (*backgroundWriter, error) {
// write lock to create one
uploaderMapMx.Lock()
defer uploaderMapMx.Unlock()
if b, ok := uploaderMap[fs.String()]; ok {
// if it was already started we close it so that it can be started again
if b.running {
b.close()
} else {
return b, nil
}
}
bb := newBackgroundWriter(fs)
uploaderMap[fs.String()] = bb
return uploaderMap[fs.String()], nil
}
// Handle is managing the read/write/seek operations on an open handle
type Handle struct {
cachedObject *Object
memory ChunkStorage
cfs *Fs
memory *Memory
preloadQueue chan int64
preloadOffset int64
offset int64
@ -31,20 +59,21 @@ type Handle struct {
}
// NewObjectHandle returns a new Handle for an existing Object
func NewObjectHandle(o *Object) *Handle {
func NewObjectHandle(o *Object, cfs *Fs) *Handle {
r := &Handle{
cachedObject: o,
cfs: cfs,
offset: 0,
preloadOffset: -1, // -1 to trigger the first preload
UseMemory: o.CacheFs.chunkMemory,
UseMemory: cfs.chunkMemory,
reading: false,
}
r.seenOffsets = make(map[int64]bool)
r.memory = NewMemory(-1)
// create a larger buffer to queue up requests
r.preloadQueue = make(chan int64, o.CacheFs.totalWorkers*10)
r.preloadQueue = make(chan int64, r.cfs.totalWorkers*10)
r.confirmReading = make(chan bool)
r.startReadWorkers()
return r
@ -52,11 +81,11 @@ func NewObjectHandle(o *Object) *Handle {
// cacheFs is a convenience method to get the parent cache FS of the object's manager
func (r *Handle) cacheFs() *Fs {
return r.cachedObject.CacheFs
return r.cfs
}
// storage is a convenience method to get the persistent storage of the object's manager
func (r *Handle) storage() Storage {
func (r *Handle) storage() *Persistent {
return r.cacheFs().cache
}
@ -76,7 +105,7 @@ func (r *Handle) startReadWorkers() {
if !r.cacheFs().plexConnector.isConnected() {
err := r.cacheFs().plexConnector.authenticate()
if err != nil {
fs.Infof(r, "failed to authenticate to Plex: %v", err)
fs.Errorf(r, "failed to authenticate to Plex: %v", err)
}
}
if r.cacheFs().plexConnector.isConnected() {
@ -113,7 +142,7 @@ func (r *Handle) scaleWorkers(desired int) {
}
// ignore first scale out from 0
if current != 0 {
fs.Infof(r, "scale workers to %v", desired)
fs.Debugf(r, "scale workers to %v", desired)
}
}
@ -156,7 +185,6 @@ func (r *Handle) queueOffset(offset int64) {
if r.UseMemory {
go r.memory.CleanChunksByNeed(offset)
}
go r.cacheFs().CleanUpCache(false)
r.confirmExternalReading()
r.preloadOffset = offset
@ -305,7 +333,6 @@ func (r *Handle) Close() error {
}
}
go r.cacheFs().CleanUpCache(false)
fs.Debugf(r, "cache reader closed %v", r.offset)
return nil
}
@ -357,11 +384,11 @@ func (w *worker) String() string {
// - if it supports seeking it will seek to the desired offset and return the same reader
// - if it doesn't support seeking it will close a possible existing one and open at the desired offset
// - if there's no reader associated with this worker, it will create one
func (w *worker) reader(offset, end int64) (io.ReadCloser, error) {
func (w *worker) reader(offset, end int64, closeOpen bool) (io.ReadCloser, error) {
var err error
r := w.rc
if w.rc == nil {
r, err = w.r.cacheFs().OpenRateLimited(func() (io.ReadCloser, error) {
r, err = w.r.cacheFs().openRateLimited(func() (io.ReadCloser, error) {
return w.r.cachedObject.Object.Open(&fs.SeekOption{Offset: offset}, &fs.RangeOption{Start: offset, End: end})
})
if err != nil {
@ -370,14 +397,16 @@ func (w *worker) reader(offset, end int64) (io.ReadCloser, error) {
return r, nil
}
if !closeOpen {
seekerObj, ok := r.(io.Seeker)
if ok {
_, err = seekerObj.Seek(offset, os.SEEK_SET)
return r, err
}
}
_ = w.rc.Close()
return w.r.cacheFs().OpenRateLimited(func() (io.ReadCloser, error) {
return w.r.cacheFs().openRateLimited(func() (io.ReadCloser, error) {
r, err = w.r.cachedObject.Object.Open(&fs.SeekOption{Offset: offset}, &fs.RangeOption{Start: offset, End: end})
if err != nil {
return nil, err
@ -463,10 +492,18 @@ func (w *worker) download(chunkStart, chunkEnd int64, retry int) {
time.Sleep(time.Second * time.Duration(retry))
}
w.rc, err = w.reader(chunkStart, chunkEnd)
closeOpen := false
if retry > 0 {
closeOpen = true
}
w.rc, err = w.reader(chunkStart, chunkEnd, closeOpen)
// we seem to be getting only errors so we abort
if err != nil {
fs.Errorf(w, "object open failed %v: %v", chunkStart, err)
err = w.r.cachedObject.refreshFromSource(true)
if err != nil {
fs.Errorf(w, "%v", err)
}
w.download(chunkStart, chunkEnd, retry+1)
return
}
@ -476,6 +513,10 @@ func (w *worker) download(chunkStart, chunkEnd int64, retry int) {
sourceRead, err = io.ReadFull(w.rc, data)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
fs.Errorf(w, "failed to read chunk %v: %v", chunkStart, err)
err = w.r.cachedObject.refreshFromSource(true)
if err != nil {
fs.Errorf(w, "%v", err)
}
w.download(chunkStart, chunkEnd, retry+1)
return
}
@ -483,7 +524,7 @@ func (w *worker) download(chunkStart, chunkEnd int64, retry int) {
if err == io.ErrUnexpectedEOF {
fs.Debugf(w, "partial downloaded chunk %v", fs.SizeSuffix(chunkStart))
} else {
fs.Debugf(w, "downloaded chunk %v", fs.SizeSuffix(chunkStart))
fs.Debugf(w, "downloaded chunk %v", chunkStart)
}
if w.r.UseMemory {
@ -499,6 +540,115 @@ func (w *worker) download(chunkStart, chunkEnd int64, retry int) {
}
}
const (
// BackgroundUploadStarted is a state for a temp file that has started upload
BackgroundUploadStarted = iota
// BackgroundUploadCompleted is a state for a temp file that has completed upload
BackgroundUploadCompleted
// BackgroundUploadError is a state for a temp file that has an error upload
BackgroundUploadError
)
// BackgroundUploadState is an entity that maps to an existing file which is stored on the temp fs
type BackgroundUploadState struct {
Remote string
Status int
Error error
}
type backgroundWriter struct {
fs *Fs
stateCh chan int
running bool
notifyCh chan BackgroundUploadState
}
func newBackgroundWriter(f *Fs) *backgroundWriter {
b := &backgroundWriter{
fs: f,
stateCh: make(chan int),
notifyCh: make(chan BackgroundUploadState),
}
return b
}
func (b *backgroundWriter) close() {
b.stateCh <- 2
}
func (b *backgroundWriter) pause() {
b.stateCh <- 1
}
func (b *backgroundWriter) play() {
b.stateCh <- 0
}
func (b *backgroundWriter) notify(remote string, status int, err error) {
state := BackgroundUploadState{
Remote: remote,
Status: status,
Error: err,
}
select {
case b.notifyCh <- state:
fs.Debugf(remote, "notified background upload state: %v", state.Status)
default:
}
}
func (b *backgroundWriter) run() {
state := 0
for {
b.running = true
select {
case s := <-b.stateCh:
state = s
default:
//
}
switch state {
case 1:
runtime.Gosched()
time.Sleep(time.Millisecond * 500)
continue
case 2:
b.running = false
return
}
absPath, err := b.fs.cache.getPendingUpload(b.fs.Root(), b.fs.tempWriteWait)
if err != nil || absPath == "" || !b.fs.isRootInPath(absPath) {
time.Sleep(time.Second)
continue
}
remote := b.fs.cleanRootFromPath(absPath)
b.notify(remote, BackgroundUploadStarted, nil)
fs.Infof(remote, "background upload: started upload")
err = operations.MoveFile(b.fs.UnWrap(), b.fs.tempFs, remote, remote)
if err != nil {
b.notify(remote, BackgroundUploadError, err)
_ = b.fs.cache.rollbackPendingUpload(absPath)
fs.Errorf(remote, "background upload: %v", err)
continue
}
fs.Infof(remote, "background upload: uploaded entry")
err = b.fs.cache.removePendingUpload(absPath)
if err != nil && !strings.Contains(err.Error(), "pending upload not found") {
fs.Errorf(remote, "background upload: %v", err)
}
parentCd := NewDirectory(b.fs, cleanPath(path.Dir(remote)))
err = b.fs.cache.ExpireDir(parentCd)
if err != nil {
fs.Errorf(parentCd, "background upload: cache expire error: %v", err)
}
fs.Infof(remote, "finished background upload")
b.notify(remote, BackgroundUploadCompleted, nil)
}
}
// Check the interfaces are satisfied
var (
_ io.ReadCloser = (*Handle)(nil)

View File

@ -3,24 +3,28 @@
package cache
import (
"encoding/json"
"io"
"os"
"path"
"sync"
"time"
"strconv"
"github.com/ncw/rclone/fs"
"github.com/ncw/rclone/fs/hash"
"github.com/ncw/rclone/lib/readers"
"github.com/pkg/errors"
)
const (
objectInCache = "Object"
objectPendingUpload = "TempObject"
)
// Object is a generic file like object that stores basic information about it
type Object struct {
fs.Object `json:"-"`
ParentFs fs.Fs `json:"-"` // parent fs
CacheFs *Fs `json:"-"` // cache fs
Name string `json:"name"` // name of the directory
Dir string `json:"dir"` // abs path of the object
@ -29,79 +33,64 @@ type Object struct {
CacheStorable bool `json:"storable"` // says whether this object can be stored
CacheType string `json:"cacheType"`
CacheTs time.Time `json:"cacheTs"`
cacheHashes map[hash.Type]string // all supported hashes cached
CacheHashes map[hash.Type]string // all supported hashes cached
refreshMutex sync.Mutex
}
// NewObject builds one from a generic fs.Object
func NewObject(f *Fs, remote string) *Object { //0745 379 768
func NewObject(f *Fs, remote string) *Object {
fullRemote := path.Join(f.Root(), remote)
dir, name := path.Split(fullRemote)
cacheType := objectInCache
parentFs := f.UnWrap()
if f.tempWritePath != "" {
_, err := f.cache.SearchPendingUpload(fullRemote)
if err == nil { // queued for upload
cacheType = objectPendingUpload
parentFs = f.tempFs
fs.Debugf(fullRemote, "pending upload found")
}
}
co := &Object{
ParentFs: parentFs,
CacheFs: f,
Name: cleanPath(name),
Dir: cleanPath(dir),
CacheModTime: time.Now().UnixNano(),
CacheSize: 0,
CacheStorable: false,
CacheType: "Object",
CacheType: cacheType,
CacheTs: time.Now(),
}
return co
}
// MarshalJSON is needed to override the hashes map (needed to support older versions of Go)
func (o *Object) MarshalJSON() ([]byte, error) {
hashes := make(map[string]string)
for k, v := range o.cacheHashes {
hashes[strconv.Itoa(int(k))] = v
}
type Alias Object
return json.Marshal(&struct {
Hashes map[string]string `json:"hashes"`
*Alias
}{
Alias: (*Alias)(o),
Hashes: hashes,
})
}
// UnmarshalJSON is needed to override the CacheHashes map (needed to support older versions of Go)
func (o *Object) UnmarshalJSON(b []byte) error {
type Alias Object
aux := &struct {
Hashes map[string]string `json:"hashes"`
*Alias
}{
Alias: (*Alias)(o),
}
if err := json.Unmarshal(b, &aux); err != nil {
return err
}
o.cacheHashes = make(map[hash.Type]string)
for k, v := range aux.Hashes {
ht, _ := strconv.Atoi(k)
o.cacheHashes[hash.Type(ht)] = v
}
return nil
}
// ObjectFromOriginal builds one from a generic fs.Object
func ObjectFromOriginal(f *Fs, o fs.Object) *Object {
var co *Object
fullRemote := cleanPath(path.Join(f.Root(), o.Remote()))
dir, name := path.Split(fullRemote)
cacheType := objectInCache
parentFs := f.UnWrap()
if f.tempWritePath != "" {
_, err := f.cache.SearchPendingUpload(fullRemote)
if err == nil { // queued for upload
cacheType = objectPendingUpload
parentFs = f.tempFs
fs.Debugf(fullRemote, "pending upload found")
}
}
co = &Object{
ParentFs: parentFs,
CacheFs: f,
Name: cleanPath(name),
Dir: cleanPath(dir),
CacheType: "Object",
CacheType: cacheType,
CacheTs: time.Now(),
}
co.updateData(o)
@ -114,7 +103,7 @@ func (o *Object) updateData(source fs.Object) {
o.CacheSize = source.Size()
o.CacheStorable = source.Storable()
o.CacheTs = time.Now()
o.cacheHashes = make(map[hash.Type]string)
o.CacheHashes = make(map[hash.Type]string)
}
// Fs returns its FS info
@ -133,14 +122,7 @@ func (o *Object) String() string {
// Remote returns the remote path
func (o *Object) Remote() string {
p := path.Join(o.Dir, o.Name)
if o.CacheFs.Root() != "" {
p = p[len(o.CacheFs.Root()):] // trim out root
if len(p) > 0 { // remove first separator
p = p[1:]
}
}
return p
return o.CacheFs.cleanRootFromPath(p)
}
// abs returns the absolute path to the object
@ -148,17 +130,6 @@ func (o *Object) abs() string {
return path.Join(o.Dir, o.Name)
}
// parentRemote returns the absolute path parent remote
func (o *Object) parentRemote() string {
absPath := o.abs()
return cleanPath(path.Dir(absPath))
}
// parentDir returns the absolute path parent remote
func (o *Object) parentDir() *Directory {
return NewDirectory(o.CacheFs, cleanPath(path.Dir(o.Remote())))
}
// ModTime returns the cached ModTime
func (o *Object) ModTime() time.Time {
return time.Unix(0, o.CacheModTime)
@ -175,17 +146,24 @@ func (o *Object) Storable() bool {
}
// refreshFromSource requests the original FS for the object in case it comes from a cached entry
func (o *Object) refreshFromSource() error {
func (o *Object) refreshFromSource(force bool) error {
o.refreshMutex.Lock()
defer o.refreshMutex.Unlock()
var err error
var liveObject fs.Object
if o.Object != nil {
if o.Object != nil && !force {
return nil
}
liveObject, err := o.CacheFs.Fs.NewObject(o.Remote())
if o.isTempFile() {
liveObject, err = o.ParentFs.NewObject(o.Remote())
err = errors.Wrapf(err, "in parent fs %v", o.ParentFs)
} else {
liveObject, err = o.CacheFs.Fs.NewObject(o.Remote())
err = errors.Wrapf(err, "in cache fs %v", o.CacheFs.Fs)
}
if err != nil {
fs.Errorf(o, "error refreshing object: %v", err)
fs.Errorf(o, "error refreshing object in : %v", err)
return err
}
o.updateData(liveObject)
@ -196,7 +174,7 @@ func (o *Object) refreshFromSource() error {
// SetModTime sets the ModTime of this object
func (o *Object) SetModTime(t time.Time) error {
if err := o.refreshFromSource(); err != nil {
if err := o.refreshFromSource(false); err != nil {
return err
}
@ -207,19 +185,19 @@ func (o *Object) SetModTime(t time.Time) error {
o.CacheModTime = t.UnixNano()
o.persist()
fs.Debugf(o.Fs(), "updated ModTime %v: %v", o, t)
fs.Debugf(o, "updated ModTime: %v", t)
return nil
}
// Open is used to request a specific part of the file using fs.RangeOption
func (o *Object) Open(options ...fs.OpenOption) (io.ReadCloser, error) {
if err := o.refreshFromSource(); err != nil {
if err := o.refreshFromSource(true); err != nil {
return nil, err
}
var err error
cacheReader := NewObjectHandle(o)
cacheReader := NewObjectHandle(o, o.CacheFs)
var offset, limit int64 = 0, -1
for _, option := range options {
switch x := option.(type) {
@ -239,23 +217,34 @@ func (o *Object) Open(options ...fs.OpenOption) (io.ReadCloser, error) {
// Update will change the object data
func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
if err := o.refreshFromSource(); err != nil {
if err := o.refreshFromSource(false); err != nil {
return err
}
fs.Infof(o, "updating object contents with size %v", src.Size())
// deleting cached chunks and info to be replaced with new ones
_ = o.CacheFs.cache.RemoveObject(o.abs())
// pause background uploads if active
if o.CacheFs.tempWritePath != "" {
o.CacheFs.backgroundRunner.pause()
defer o.CacheFs.backgroundRunner.play()
// don't allow started uploads
if o.isTempFile() && o.tempFileStartedUpload() {
return errors.Errorf("%v is currently uploading, can't update", o)
}
}
fs.Debugf(o, "updating object contents with size %v", src.Size())
// FIXME use reliable upload
err := o.Object.Update(in, src, options...)
if err != nil {
fs.Errorf(o, "error updating source: %v", err)
return err
}
// deleting cached chunks and info to be replaced with new ones
_ = o.CacheFs.cache.RemoveObject(o.abs())
o.CacheModTime = src.ModTime().UnixNano()
o.CacheSize = src.Size()
o.cacheHashes = make(map[hash.Type]string)
o.CacheHashes = make(map[hash.Type]string)
o.CacheTs = time.Now()
o.persist()
return nil
@ -263,41 +252,50 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio
// Remove deletes the object from both the cache and the source
func (o *Object) Remove() error {
if err := o.refreshFromSource(); err != nil {
if err := o.refreshFromSource(false); err != nil {
return err
}
// pause background uploads if active
if o.CacheFs.tempWritePath != "" {
o.CacheFs.backgroundRunner.pause()
defer o.CacheFs.backgroundRunner.play()
// don't allow started uploads
if o.isTempFile() && o.tempFileStartedUpload() {
return errors.Errorf("%v is currently uploading, can't delete", o)
}
}
err := o.Object.Remove()
if err != nil {
return err
}
fs.Infof(o, "removing object")
fs.Debugf(o, "removing object")
_ = o.CacheFs.cache.RemoveObject(o.abs())
return err
_ = o.CacheFs.cache.removePendingUpload(o.abs())
_ = o.CacheFs.cache.ExpireDir(NewDirectory(o.CacheFs, cleanPath(path.Dir(o.Remote()))))
return nil
}
// Hash requests a hash of the object and stores in the cache
// since it might or might not be called, this is lazy loaded
func (o *Object) Hash(ht hash.Type) (string, error) {
if o.cacheHashes == nil {
o.cacheHashes = make(map[hash.Type]string)
if o.CacheHashes == nil {
o.CacheHashes = make(map[hash.Type]string)
}
cachedHash, found := o.cacheHashes[ht]
cachedHash, found := o.CacheHashes[ht]
if found {
return cachedHash, nil
}
if err := o.refreshFromSource(); err != nil {
if err := o.refreshFromSource(false); err != nil {
return "", err
}
liveHash, err := o.Object.Hash(ht)
if err != nil {
return "", err
}
o.cacheHashes[ht] = liveHash
o.CacheHashes[ht] = liveHash
o.persist()
fs.Debugf(o, "object hash cached: %v", liveHash)
@ -314,6 +312,25 @@ func (o *Object) persist() *Object {
return o
}
func (o *Object) isTempFile() bool {
_, err := o.CacheFs.cache.SearchPendingUpload(o.abs())
if err != nil {
o.CacheType = objectInCache
return false
}
o.CacheType = objectPendingUpload
return true
}
func (o *Object) tempFileStartedUpload() bool {
started, err := o.CacheFs.cache.SearchPendingUpload(o.abs())
if err != nil {
return false
}
return started
}
var (
_ fs.Object = (*Object)(nil)
)

39
backend/cache/plex.go vendored
View File

@ -12,6 +12,9 @@ import (
"sync"
"bytes"
"io/ioutil"
"github.com/ncw/rclone/fs"
"github.com/ncw/rclone/fs/config"
)
@ -127,6 +130,17 @@ func (p *plexConnector) isConfigured() bool {
}
func (p *plexConnector) isPlaying(co *Object) bool {
var err error
remote := co.Remote()
if cr, yes := p.f.isWrappedByCrypt(); yes {
remote, err = cr.DecryptFileName(co.Remote())
if err != nil {
fs.Errorf("plex", "can not decrypt wrapped file: %v", err)
return false
}
}
isPlaying := false
req, err := http.NewRequest("GET", fmt.Sprintf("%s/status/sessions", p.url.String()), nil)
if err != nil {
@ -180,31 +194,12 @@ func (p *plexConnector) isPlaying(co *Object) bool {
if err != nil {
return false
}
var data map[string]interface{}
err = json.NewDecoder(resp.Body).Decode(&data)
var data []byte
data, err = ioutil.ReadAll(resp.Body)
if err != nil {
return false
}
remote := co.Remote()
if cr, yes := co.CacheFs.isWrappedByCrypt(); yes {
remote, err = cr.DecryptFileName(co.Remote())
if err != nil {
fs.Errorf("plex", "can not decrypt wrapped file: %v", err)
continue
}
}
fpGen, ok := get(data, "MediaContainer", "Metadata", 0, "Media", 0, "Part", 0, "file")
if !ok {
fs.Errorf("plex", "failed to understand: %v", data)
continue
}
fp, ok := fpGen.(string)
if !ok {
fs.Errorf("plex", "failed to understand: %v", fp)
continue
}
if strings.Contains(fp, remote) {
if bytes.Contains(data, []byte(remote)) {
isPlaying = true
break
}

View File

@ -14,8 +14,6 @@ import (
// Memory is a wrapper of transient storage for a go-cache store
type Memory struct {
ChunkStorage
db *cache.Cache
}

View File

@ -16,8 +16,11 @@ import (
"io/ioutil"
"fmt"
bolt "github.com/coreos/bbolt"
"github.com/ncw/rclone/fs"
"github.com/ncw/rclone/fs/walk"
"github.com/pkg/errors"
)
@ -26,6 +29,7 @@ const (
RootBucket = "root"
RootTsBucket = "rootTs"
DataTsBucket = "dataTs"
tempBucket = "pending"
)
// Features flags for this storage type
@ -42,6 +46,12 @@ func GetPersistent(dbPath, chunkPath string, f *Features) (*Persistent, error) {
boltMapMx.Lock()
defer boltMapMx.Unlock()
if b, ok := boltMap[dbPath]; ok {
if !b.open {
err := b.connect()
if err != nil {
return nil, err
}
}
return b, nil
}
@ -59,14 +69,25 @@ type chunkInfo struct {
Size int64
}
type tempUploadInfo struct {
DestPath string
AddedOn time.Time
Started bool
}
// String representation of a tempUploadInfo
func (t *tempUploadInfo) String() string {
return fmt.Sprintf("%v - %v (%v)", t.DestPath, t.Started, t.AddedOn)
}
// Persistent is a wrapper of persistent storage for a bolt.DB file
type Persistent struct {
Storage
dbPath string
dataPath string
open bool
db *bolt.DB
cleanupMux sync.Mutex
tempQueueMux sync.Mutex
features *Features
}
@ -78,7 +99,7 @@ func newPersistent(dbPath, chunkPath string, f *Features) (*Persistent, error) {
features: f,
}
err := b.Connect()
err := b.connect()
if err != nil {
fs.Errorf(dbPath, "Error opening storage cache. Is there another rclone running on the same remote? %v", err)
return nil, err
@ -92,41 +113,32 @@ func (b *Persistent) String() string {
return "<Cache DB> " + b.dbPath
}
// Connect creates a connection to the configured file
// connect creates a connection to the configured file
// refreshDb will delete the file before to create an empty DB if it's set to true
func (b *Persistent) Connect() error {
var db *bolt.DB
func (b *Persistent) connect() error {
var err error
if b.features.PurgeDb {
err := os.Remove(b.dbPath)
if err != nil {
fs.Errorf(b, "failed to remove cache file: %v", err)
}
err = os.RemoveAll(b.dataPath)
if err != nil {
fs.Errorf(b, "failed to remove cache data: %v", err)
}
}
err = os.MkdirAll(b.dataPath, os.ModePerm)
if err != nil {
return errors.Wrapf(err, "failed to create a data directory %q", b.dataPath)
}
db, err = bolt.Open(b.dbPath, 0644, &bolt.Options{Timeout: 1 * time.Second})
b.db, err = bolt.Open(b.dbPath, 0644, &bolt.Options{Timeout: 1 * time.Second})
if err != nil {
return errors.Wrapf(err, "failed to open a cache connection to %q", b.dbPath)
}
_ = db.Update(func(tx *bolt.Tx) error {
if b.features.PurgeDb {
b.Purge()
}
_ = b.db.Update(func(tx *bolt.Tx) error {
_, _ = tx.CreateBucketIfNotExists([]byte(RootBucket))
_, _ = tx.CreateBucketIfNotExists([]byte(RootTsBucket))
_, _ = tx.CreateBucketIfNotExists([]byte(DataTsBucket))
_, _ = tx.CreateBucketIfNotExists([]byte(tempBucket))
return nil
})
b.db = db
b.open = true
return nil
}
@ -136,7 +148,9 @@ func (b *Persistent) getBucket(dir string, createIfMissing bool, tx *bolt.Tx) *b
cleanPath(dir)
entries := strings.FieldsFunc(dir, func(c rune) bool {
return os.PathSeparator == c
// cover Windows where rclone still uses '/' as path separator
// this should be safe as '/' is not a valid Windows character
return (os.PathSeparator == c || c == rune('/'))
})
bucket := tx.Bucket([]byte(RootBucket))
@ -478,6 +492,7 @@ func (b *Persistent) CleanChunksBySize(maxSize int64) {
b.cleanupMux.Lock()
defer b.cleanupMux.Unlock()
var cntChunks int
var roughlyCleaned fs.SizeSuffix
err := b.db.Update(func(tx *bolt.Tx) error {
dataTsBucket := tx.Bucket([]byte(DataTsBucket))
@ -499,6 +514,7 @@ func (b *Persistent) CleanChunksBySize(maxSize int64) {
if totalSize > maxSize {
needToClean := totalSize - maxSize
roughlyCleaned = fs.SizeSuffix(needToClean)
for k, v := c.First(); k != nil; k, v = c.Next() {
var ci chunkInfo
err := json.Unmarshal(v, &ci)
@ -521,7 +537,10 @@ func (b *Persistent) CleanChunksBySize(maxSize int64) {
}
}
}
fs.Infof("cache", "deleted (%v) chunks", cntChunks)
if cntChunks > 0 {
fs.Infof("cache-cleanup", "chunks %v, est. size: %v", cntChunks, roughlyCleaned.String())
}
return nil
})
@ -691,6 +710,313 @@ func (b *Persistent) iterateBuckets(buk *bolt.Bucket, bucketFn func(name string)
return err
}
func (b *Persistent) dumpRoot() string {
var itBuckets func(buk *bolt.Bucket) map[string]interface{}
itBuckets = func(buk *bolt.Bucket) map[string]interface{} {
m := make(map[string]interface{})
c := buk.Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
if v == nil {
buk2 := buk.Bucket(k)
m[string(k)] = itBuckets(buk2)
} else {
m[string(k)] = "-"
}
}
return m
}
var mm map[string]interface{}
_ = b.db.View(func(tx *bolt.Tx) error {
mm = itBuckets(tx.Bucket([]byte(RootBucket)))
return nil
})
raw, _ := json.MarshalIndent(mm, "", " ")
return string(raw)
}
// addPendingUpload adds a new file to the pending queue of uploads
func (b *Persistent) addPendingUpload(destPath string, started bool) error {
return b.db.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists([]byte(tempBucket))
if err != nil {
return errors.Errorf("couldn't bucket for %v", tempBucket)
}
tempObj := &tempUploadInfo{
DestPath: destPath,
AddedOn: time.Now(),
Started: started,
}
// cache Object Info
encoded, err := json.Marshal(tempObj)
if err != nil {
return errors.Errorf("couldn't marshal object (%v) info: %v", destPath, err)
}
err = bucket.Put([]byte(destPath), []byte(encoded))
if err != nil {
return errors.Errorf("couldn't cache object (%v) info: %v", destPath, err)
}
return nil
})
}
// getPendingUpload returns the next file from the pending queue of uploads
func (b *Persistent) getPendingUpload(inRoot string, waitTime time.Duration) (destPath string, err error) {
b.tempQueueMux.Lock()
defer b.tempQueueMux.Unlock()
err = b.db.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists([]byte(tempBucket))
if err != nil {
return errors.Errorf("couldn't bucket for %v", tempBucket)
}
c := bucket.Cursor()
for k, v := c.Seek([]byte(inRoot)); k != nil && bytes.HasPrefix(k, []byte(inRoot)); k, v = c.Next() {
//for k, v := c.First(); k != nil; k, v = c.Next() {
var tempObj = &tempUploadInfo{}
err = json.Unmarshal(v, tempObj)
if err != nil {
fs.Errorf(b, "failed to read pending upload: %v", err)
continue
}
// skip over started uploads
if tempObj.Started || time.Now().Before(tempObj.AddedOn.Add(waitTime)) {
continue
}
tempObj.Started = true
v2, err := json.Marshal(tempObj)
if err != nil {
fs.Errorf(b, "failed to update pending upload: %v", err)
continue
}
err = bucket.Put(k, v2)
if err != nil {
fs.Errorf(b, "failed to update pending upload: %v", err)
continue
}
destPath = tempObj.DestPath
return nil
}
return errors.Errorf("no pending upload found")
})
return destPath, err
}
// SearchPendingUpload returns the file info from the pending queue of uploads
func (b *Persistent) SearchPendingUpload(remote string) (started bool, err error) {
err = b.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(tempBucket))
if bucket == nil {
return errors.Errorf("couldn't bucket for %v", tempBucket)
}
var tempObj = &tempUploadInfo{}
v := bucket.Get([]byte(remote))
err = json.Unmarshal(v, tempObj)
if err != nil {
return errors.Errorf("pending upload (%v) not found %v", remote, err)
}
started = tempObj.Started
return nil
})
return started, err
}
// searchPendingUploadFromDir files currently pending upload from a single dir
func (b *Persistent) searchPendingUploadFromDir(dir string) (remotes []string, err error) {
err = b.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(tempBucket))
if bucket == nil {
return errors.Errorf("couldn't bucket for %v", tempBucket)
}
c := bucket.Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
var tempObj = &tempUploadInfo{}
err = json.Unmarshal(v, tempObj)
if err != nil {
fs.Errorf(b, "failed to read pending upload: %v", err)
continue
}
parentDir := cleanPath(path.Dir(tempObj.DestPath))
if dir == parentDir {
remotes = append(remotes, tempObj.DestPath)
}
}
return nil
})
return remotes, err
}
func (b *Persistent) rollbackPendingUpload(remote string) error {
b.tempQueueMux.Lock()
defer b.tempQueueMux.Unlock()
return b.db.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists([]byte(tempBucket))
if err != nil {
return errors.Errorf("couldn't bucket for %v", tempBucket)
}
var tempObj = &tempUploadInfo{}
v := bucket.Get([]byte(remote))
err = json.Unmarshal(v, tempObj)
if err != nil {
return errors.Errorf("pending upload (%v) not found %v", remote, err)
}
tempObj.Started = false
v2, err := json.Marshal(tempObj)
if err != nil {
return errors.Errorf("pending upload not updated %v", err)
}
err = bucket.Put([]byte(tempObj.DestPath), v2)
if err != nil {
return errors.Errorf("pending upload not updated %v", err)
}
return nil
})
}
func (b *Persistent) removePendingUpload(remote string) error {
b.tempQueueMux.Lock()
defer b.tempQueueMux.Unlock()
return b.db.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists([]byte(tempBucket))
if err != nil {
return errors.Errorf("couldn't bucket for %v", tempBucket)
}
return bucket.Delete([]byte(remote))
})
}
// updatePendingUpload allows to update an existing item in the queue while checking if it's not started in the same
// transaction. If it is started, it will not allow the update
func (b *Persistent) updatePendingUpload(remote string, fn func(item *tempUploadInfo) error) error {
b.tempQueueMux.Lock()
defer b.tempQueueMux.Unlock()
return b.db.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists([]byte(tempBucket))
if err != nil {
return errors.Errorf("couldn't bucket for %v", tempBucket)
}
var tempObj = &tempUploadInfo{}
v := bucket.Get([]byte(remote))
err = json.Unmarshal(v, tempObj)
if err != nil {
return errors.Errorf("pending upload (%v) not found %v", remote, err)
}
if tempObj.Started {
return errors.Errorf("pending upload already started %v", remote)
}
err = fn(tempObj)
if err != nil {
return err
}
if remote != tempObj.DestPath {
err := bucket.Delete([]byte(remote))
if err != nil {
return err
}
// if this is removed then the entry can be removed too
if tempObj.DestPath == "" {
return nil
}
}
v2, err := json.Marshal(tempObj)
if err != nil {
return errors.Errorf("pending upload not updated %v", err)
}
err = bucket.Put([]byte(tempObj.DestPath), v2)
if err != nil {
return errors.Errorf("pending upload not updated %v", err)
}
return nil
})
}
// SetPendingUploadToStarted is a way to mark an entry as started (even if it's not already)
// TO BE USED IN TESTING ONLY
func (b *Persistent) SetPendingUploadToStarted(remote string) error {
return b.updatePendingUpload(remote, func(item *tempUploadInfo) error {
item.Started = true
return nil
})
}
// ReconcileTempUploads will recursively look for all the files in the temp directory and add them to the queue
func (b *Persistent) ReconcileTempUploads(cacheFs *Fs) error {
return b.db.Update(func(tx *bolt.Tx) error {
_ = tx.DeleteBucket([]byte(tempBucket))
bucket, err := tx.CreateBucketIfNotExists([]byte(tempBucket))
if err != nil {
return err
}
var queuedEntries []fs.Object
err = walk.Walk(cacheFs.tempFs, "", true, -1, func(path string, entries fs.DirEntries, err error) error {
for _, o := range entries {
if oo, ok := o.(fs.Object); ok {
queuedEntries = append(queuedEntries, oo)
}
}
return nil
})
if err != nil {
return err
}
fs.Debugf(cacheFs, "reconciling temporary uploads")
for _, queuedEntry := range queuedEntries {
destPath := path.Join(cacheFs.Root(), queuedEntry.Remote())
tempObj := &tempUploadInfo{
DestPath: destPath,
AddedOn: time.Now(),
Started: false,
}
// cache Object Info
encoded, err := json.Marshal(tempObj)
if err != nil {
return errors.Errorf("couldn't marshal object (%v) info: %v", queuedEntry, err)
}
err = bucket.Put([]byte(destPath), []byte(encoded))
if err != nil {
return errors.Errorf("couldn't cache object (%v) info: %v", destPath, err)
}
fs.Debugf(cacheFs, "reconciled temporary upload: %v", destPath)
}
return nil
})
}
// PurgeTempUploads will remove all the pending uploads from the queue
// TO BE USED IN TESTING ONLY
func (b *Persistent) PurgeTempUploads() {
b.tempQueueMux.Lock()
defer b.tempQueueMux.Unlock()
_ = b.db.Update(func(tx *bolt.Tx) error {
_ = tx.DeleteBucket([]byte(tempBucket))
_, _ = tx.CreateBucketIfNotExists([]byte(tempBucket))
return nil
})
}
// Close should be called when the program ends gracefully
func (b *Persistent) Close() {
b.cleanupMux.Lock()
@ -700,6 +1026,7 @@ func (b *Persistent) Close() {
if err != nil {
fs.Errorf(b, "closing handle: %v", err)
}
b.open = false
}
// itob returns an 8-byte big endian representation of v.

View File

@ -109,12 +109,38 @@ To start a cached mount
rclone mount --allow-other test-cache: /var/tmp/test-cache
### Write Features ###
### Offline uploading ###
In an effort to make writing through cache more reliable, the backend
now supports this feature which can be activated by specifying a
`cache-tmp-upload-path`.
A files goes through these states when using this feature:
1. An upload is started (usually by copying a file on the cache remote)
2. When the copy to the temporary location is complete the file is part
of the cached remote and looks and behaves like any other file (reading included)
3. After `cache-tmp-wait-time` passes and the file is next in line, `rclone move`
is used to move the file to the cloud provider
4. Reading the file still works during the upload but most modifications on it will be prohibited
5. Once the move is complete the file is unlocked for modifications as it
becomes as any other regular file
6. If the file is being read through `cache` when it's actually
deleted from the temporary path then `cache` will simply swap the source
to the cloud provider without interrupting the reading (small blip can happen though)
Files are uploaded in sequence and only one file is uploaded at a time.
Uploads will be stored in a queue and be processed based on the order they were added.
The queue and the temporary storage is persistent across restarts and even purges of the cache.
### Write Support ###
Writes are supported through `cache`.
One caveat is that a mounted cache remote does not add any retry or fallback
mechanism to the upload operation. This will depend on the implementation
of the wrapped remote.
of the wrapped remote. Consider using `Offline uploading` for reliable writes.
One special case is covered with `cache-writes` which will cache the file
data at the same time as the upload when it is enabled making it available
@ -157,6 +183,16 @@ Affected settings:
### Known issues ###
#### Mount and --dir-cache-time ####
--dir-cache-time controls the first layer of directory caching which works at the mount layer.
Being an independent caching mechanism from the `cache` backend, it will manage its own entries
based on the configured time.
To avoid getting in a scenario where dir cache has obsolete data and cache would have the correct
one, try to set `--dir-cache-time` to a lower time than `--cache-info-age`. Default values are
already configured in this way.
#### Windows support - Experimental ####
There are a couple of issues with Windows `mount` functionality that still require some investigations.
@ -341,3 +377,23 @@ you can enable this flag to have their data stored in the cache store at the
same time during upload.
**Default**: not set
#### --cache-tmp-upload-path=PATH ####
This is the path where `cache` will use as a temporary storage for new files
that need to be uploaded to the cloud provider.
Specifying a value will enable this feature. Without it, it is completely disabled
and files will be uploaded directly to the cloud provider
**Default**: empty
#### --cache-tmp-wait-time=DURATION ####
This is the duration that a file must wait in the temporary location
_cache-tmp-upload-path_ before it is selected for upload.
Note that only one file is uploaded at a time and it can take longer to
start the upload if a queue formed for this purpose.
**Default**: 15m