union: fix issues when using space-relavant and path-preserving policies

Path-preserving policy would need to look for the parent dir of operating path. Therefor if the operating path is the
same path as root passed in during NewFs, there would be no room for uplooking. And also About() might have
problem if the folder is no exist. RootFs is added to solve this problem.
This commit is contained in:
Max Sum 2020-01-31 11:34:46 +08:00 committed by Nick Craig-Wood
parent 0081971ade
commit c9374fbe5a
5 changed files with 72 additions and 40 deletions

View File

@ -2,6 +2,7 @@ package policy
import ( import (
"context" "context"
"path/filepath"
"sync" "sync"
"github.com/rclone/rclone/backend/union/upstream" "github.com/rclone/rclone/backend/union/upstream"
@ -27,7 +28,9 @@ func (p *EpAll) epall(ctx context.Context, upstreams []*upstream.Fs, path string
wg.Add(1) wg.Add(1)
i, u := i, u // Closure i, u := i, u // Closure
go func() { go func() {
if findEntry(ctx, u, path) != nil { rfs := u.RootFs
remote := filepath.Join(u.RootPath, path)
if findEntry(ctx, rfs, remote) != nil {
ufs[i] = u ufs[i] = u
} }
wg.Done() wg.Done()
@ -79,7 +82,7 @@ func (p *EpAll) Create(ctx context.Context, upstreams []*upstream.Fs, path strin
if len(upstreams) == 0 { if len(upstreams) == 0 {
return nil, fs.ErrorPermissionDenied return nil, fs.ErrorPermissionDenied
} }
upstreams, err := p.epall(ctx, upstreams, path) upstreams, err := p.epall(ctx, upstreams, path+"/..")
return upstreams, err return upstreams, err
} }

View File

@ -2,6 +2,7 @@ package policy
import ( import (
"context" "context"
"path/filepath"
"github.com/rclone/rclone/backend/union/upstream" "github.com/rclone/rclone/backend/union/upstream"
"github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs"
@ -20,7 +21,9 @@ func (p *EpFF) epff(ctx context.Context, upstreams []*upstream.Fs, path string)
for _, u := range upstreams { for _, u := range upstreams {
u := u // Closure u := u // Closure
go func() { go func() {
if findEntry(ctx, u, path) == nil { rfs := u.RootFs
remote := filepath.Join(u.RootPath, path)
if findEntry(ctx, rfs, remote) == nil {
u = nil u = nil
} }
ch <- u ch <- u
@ -79,7 +82,7 @@ func (p *EpFF) Create(ctx context.Context, upstreams []*upstream.Fs, path string
if len(upstreams) == 0 { if len(upstreams) == 0 {
return nil, fs.ErrorPermissionDenied return nil, fs.ErrorPermissionDenied
} }
u, err := p.epff(ctx, upstreams, path) u, err := p.epff(ctx, upstreams, path+"/..")
return []*upstream.Fs{u}, err return []*upstream.Fs{u}, err
} }

View File

@ -2,6 +2,7 @@ package policy
import ( import (
"context" "context"
"path/filepath"
"sync" "sync"
"time" "time"
@ -28,7 +29,9 @@ func (p *Newest) newest(ctx context.Context, upstreams []*upstream.Fs, path stri
i, u := i, u // Closure i, u := i, u // Closure
go func() { go func() {
defer wg.Done() defer wg.Done()
if e := findEntry(ctx, u, path); e != nil { rfs := u.RootFs
remote := filepath.Join(u.RootPath, path)
if e := findEntry(ctx, rfs, remote); e != nil {
ufs[i] = u ufs[i] = u
mtimes[i] = e.ModTime(ctx) mtimes[i] = e.ModTime(ctx)
} }
@ -112,7 +115,7 @@ func (p *Newest) Create(ctx context.Context, upstreams []*upstream.Fs, path stri
if len(upstreams) == 0 { if len(upstreams) == 0 {
return nil, fs.ErrorPermissionDenied return nil, fs.ErrorPermissionDenied
} }
u, err := p.newest(ctx, upstreams, path) u, err := p.newest(ctx, upstreams, path+"/..")
return []*upstream.Fs{u}, err return []*upstream.Fs{u}, err
} }

View File

@ -15,7 +15,6 @@ import (
"github.com/rclone/rclone/backend/union/policy" "github.com/rclone/rclone/backend/union/policy"
"github.com/rclone/rclone/backend/union/upstream" "github.com/rclone/rclone/backend/union/upstream"
"github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/cache"
"github.com/rclone/rclone/fs/config/configmap" "github.com/rclone/rclone/fs/config/configmap"
"github.com/rclone/rclone/fs/config/configstruct" "github.com/rclone/rclone/fs/config/configstruct"
"github.com/rclone/rclone/fs/hash" "github.com/rclone/rclone/fs/hash"
@ -143,21 +142,12 @@ func (f *Fs) Hashes() hash.Set {
// Mkdir makes the root directory of the Fs object // Mkdir makes the root directory of the Fs object
func (f *Fs) Mkdir(ctx context.Context, dir string) error { func (f *Fs) Mkdir(ctx context.Context, dir string) error {
var upstreams []*upstream.Fs upstreams, err := f.create(ctx, dir)
var err error if err == fs.ErrorObjectNotFound && dir != parentDir(dir) {
if dir == "" { if err := f.Mkdir(ctx, parentDir(dir)); err != nil {
pf, e := cache.Get(f.name + ":") return err
if e != nil {
return e
} }
pfs, ok := pf.(*Fs) upstreams, err = f.create(ctx, dir)
if !ok {
return errors.New("failed to get parent Fs")
}
upstreams, err = pfs.create(ctx, "")
dir = f.root
} else {
upstreams, err = f.create(ctx, parentDir(dir))
} }
if err != nil { if err != nil {
return err return err
@ -259,7 +249,7 @@ func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object,
errs[i] = errors.Wrap(fs.ErrorNotAFile, u.Name()) errs[i] = errors.Wrap(fs.ErrorNotAFile, u.Name())
return return
} }
mo, err := u.Features().Move(ctx, o, remote) mo, err := u.Features().Move(ctx, o.UnWrap(), remote)
if err != nil || mo == nil { if err != nil || mo == nil {
errs[i] = errors.Wrap(err, u.Name()) errs[i] = errors.Wrap(err, u.Name())
return return
@ -288,12 +278,12 @@ func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object,
// //
// If destination exists then return fs.ErrorDirExists // If destination exists then return fs.ErrorDirExists
func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string) error { func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string) error {
_, ok := src.(*Fs) sfs, ok := src.(*Fs)
if !ok { if !ok {
fs.Debugf(src, "Can't move directory - not same remote type") fs.Debugf(src, "Can't move directory - not same remote type")
return fs.ErrorCantDirMove return fs.ErrorCantDirMove
} }
upstreams, err := f.action(ctx, srcRemote) upstreams, err := sfs.action(ctx, srcRemote)
if err != nil { if err != nil {
return err return err
} }
@ -304,11 +294,30 @@ func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string
} }
errs := Errors(make([]error, len(upstreams))) errs := Errors(make([]error, len(upstreams)))
multithread(len(upstreams), func(i int) { multithread(len(upstreams), func(i int) {
u := upstreams[i] su := upstreams[i]
err := u.Features().DirMove(ctx, u, srcRemote, dstRemote) var du *upstream.Fs
errs[i] = errors.Wrap(err, u.Name()) for _, u := range f.upstreams {
if u.RootFs.Root() == su.RootFs.Root() {
du = u
}
}
if du == nil {
errs[i] = errors.Wrap(fs.ErrorCantDirMove, su.Name()+":"+su.Root())
return
}
err := du.Features().DirMove(ctx, su.Fs, srcRemote, dstRemote)
errs[i] = errors.Wrap(err, du.Name()+":"+du.Root())
}) })
return errs.Err() errs = errs.FilterNil()
if len(errs) == 0 {
return nil
}
for _, e := range errs {
if errors.Cause(e) != fs.ErrorDirExists {
return errs
}
}
return fs.ErrorDirExists
} }
// ChangeNotify calls the passed function with a path // ChangeNotify calls the passed function with a path
@ -355,7 +364,13 @@ func (f *Fs) DirCacheFlush() {
func (f *Fs) put(ctx context.Context, in io.Reader, src fs.ObjectInfo, stream bool, options ...fs.OpenOption) (fs.Object, error) { func (f *Fs) put(ctx context.Context, in io.Reader, src fs.ObjectInfo, stream bool, options ...fs.OpenOption) (fs.Object, error) {
srcPath := src.Remote() srcPath := src.Remote()
upstreams, err := f.create(ctx, parentDir(srcPath)) upstreams, err := f.create(ctx, srcPath)
if err == fs.ErrorObjectNotFound {
if err := f.Mkdir(ctx, parentDir(srcPath)); err != nil {
return nil, err
}
upstreams, err = f.create(ctx, srcPath)
}
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -23,6 +23,8 @@ var (
// Fs is a wrap of any fs and its configs // Fs is a wrap of any fs and its configs
type Fs struct { type Fs struct {
fs.Fs fs.Fs
RootFs fs.Fs
RootPath string
writable bool writable bool
creatable bool creatable bool
usage *fs.Usage // Cache the usage usage *fs.Usage // Cache the usage
@ -63,7 +65,8 @@ func New(remote, root string, cacheTime time.Duration) (*Fs, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
rFs := &Fs{ f := &Fs{
RootPath: root,
writable: true, writable: true,
creatable: true, creatable: true,
cacheExpiry: time.Now().Unix(), cacheExpiry: time.Now().Unix(),
@ -71,24 +74,29 @@ func New(remote, root string, cacheTime time.Duration) (*Fs, error) {
usage: &fs.Usage{}, usage: &fs.Usage{},
} }
if strings.HasSuffix(fsPath, ":ro") { if strings.HasSuffix(fsPath, ":ro") {
rFs.writable = false f.writable = false
rFs.creatable = false f.creatable = false
fsPath = fsPath[0 : len(fsPath)-3] fsPath = fsPath[0 : len(fsPath)-3]
} else if strings.HasSuffix(fsPath, ":nc") { } else if strings.HasSuffix(fsPath, ":nc") {
rFs.writable = true f.writable = true
rFs.creatable = false f.creatable = false
fsPath = fsPath[0 : len(fsPath)-3] fsPath = fsPath[0 : len(fsPath)-3]
} }
var rootString = path.Join(fsPath, filepath.ToSlash(root))
if configName != "local" { if configName != "local" {
rootString = configName + ":" + rootString fsPath = configName + ":" + fsPath
} }
rFs, err := cache.Get(fsPath)
if err != nil && err != fs.ErrorIsFile {
return nil, err
}
f.RootFs = rFs
rootString := path.Join(fsPath, filepath.ToSlash(root))
myFs, err := cache.Get(rootString) myFs, err := cache.Get(rootString)
if err != nil && err != fs.ErrorIsFile { if err != nil && err != fs.ErrorIsFile {
return nil, err return nil, err
} }
rFs.Fs = myFs f.Fs = myFs
return rFs, err return f, err
} }
// WrapDirectory wraps a fs.Directory to include the info // WrapDirectory wraps a fs.Directory to include the info
@ -274,7 +282,7 @@ func (f *Fs) GetUsedSpace() (int64, error) {
} }
func (f *Fs) updateUsage() (err error) { func (f *Fs) updateUsage() (err error) {
if do := f.Fs.Features().About; do == nil { if do := f.RootFs.Features().About; do == nil {
return ErrUsageFieldNotSupported return ErrUsageFieldNotSupported
} }
done := false done := false
@ -301,7 +309,7 @@ func (f *Fs) updateUsageCore(lock bool) error {
// Run in background, should not be cancelled by user // Run in background, should not be cancelled by user
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel() defer cancel()
usage, err := f.Features().About(ctx) usage, err := f.RootFs.Features().About(ctx)
if err != nil { if err != nil {
f.cacheUpdate = false f.cacheUpdate = false
return err return err