mirror of
https://github.com/rclone/rclone.git
synced 2025-01-08 07:20:42 +01:00
386ca20792
Before this patch backends could be shutdown when they fell out of the cache when they were in use with combine. This was particularly noticeable with the dropbox backend which gave this error when uploading files after the backend was Shutdown. Failed to copy: upload failed: batcher is shutting down This patch gets the combine remote to pin them until it is finished. See: https://forum.rclone.org/t/rclone-combine-upload-failed-batcher-is-shutting-down/32168
993 lines
25 KiB
Go
993 lines
25 KiB
Go
// Package combine implents a backend to combine multipe remotes in a directory tree
|
|
package combine
|
|
|
|
/*
|
|
Have API to add/remove branches in the combine
|
|
*/
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"path"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/rclone/rclone/fs"
|
|
"github.com/rclone/rclone/fs/cache"
|
|
"github.com/rclone/rclone/fs/config/configmap"
|
|
"github.com/rclone/rclone/fs/config/configstruct"
|
|
"github.com/rclone/rclone/fs/hash"
|
|
"github.com/rclone/rclone/fs/operations"
|
|
"github.com/rclone/rclone/fs/walk"
|
|
"golang.org/x/sync/errgroup"
|
|
)
|
|
|
|
// Register with Fs
|
|
func init() {
|
|
fsi := &fs.RegInfo{
|
|
Name: "combine",
|
|
Description: "Combine several remotes into one",
|
|
NewFs: NewFs,
|
|
MetadataInfo: &fs.MetadataInfo{
|
|
Help: `Any metadata supported by the underlying remote is read and written.`,
|
|
},
|
|
Options: []fs.Option{{
|
|
Name: "upstreams",
|
|
Help: `Upstreams for combining
|
|
|
|
These should be in the form
|
|
|
|
dir=remote:path dir2=remote2:path
|
|
|
|
Where before the = is specified the root directory and after is the remote to
|
|
put there.
|
|
|
|
Embedded spaces can be added using quotes
|
|
|
|
"dir=remote:path with space" "dir2=remote2:path with space"
|
|
|
|
`,
|
|
Required: true,
|
|
Default: fs.SpaceSepList(nil),
|
|
}},
|
|
}
|
|
fs.Register(fsi)
|
|
}
|
|
|
|
// Options defines the configuration for this backend
|
|
type Options struct {
|
|
Upstreams fs.SpaceSepList `config:"upstreams"`
|
|
}
|
|
|
|
// Fs represents a combine of upstreams
|
|
type Fs struct {
|
|
name string // name of this remote
|
|
features *fs.Features // optional features
|
|
opt Options // options for this Fs
|
|
root string // the path we are working on
|
|
hashSet hash.Set // common hashes
|
|
when time.Time // directory times
|
|
upstreams map[string]*upstream // map of upstreams
|
|
}
|
|
|
|
// adjustment stores the info to add a prefix to a path or chop characters off
|
|
type adjustment struct {
|
|
root string
|
|
rootSlash string
|
|
mountpoint string
|
|
mountpointSlash string
|
|
}
|
|
|
|
// newAdjustment makes a new path adjustment adjusting between mountpoint and root
|
|
//
|
|
// mountpoint is the point the upstream is mounted and root is the combine root
|
|
func newAdjustment(root, mountpoint string) (a adjustment) {
|
|
return adjustment{
|
|
root: root,
|
|
rootSlash: root + "/",
|
|
mountpoint: mountpoint,
|
|
mountpointSlash: mountpoint + "/",
|
|
}
|
|
}
|
|
|
|
var errNotUnderRoot = errors.New("file not under root")
|
|
|
|
// do makes the adjustment on s, mapping an upstream path into a combine path
|
|
func (a *adjustment) do(s string) (string, error) {
|
|
absPath := join(a.mountpoint, s)
|
|
if a.root == "" {
|
|
return absPath, nil
|
|
}
|
|
if absPath == a.root {
|
|
return "", nil
|
|
}
|
|
if !strings.HasPrefix(absPath, a.rootSlash) {
|
|
return "", errNotUnderRoot
|
|
}
|
|
return absPath[len(a.rootSlash):], nil
|
|
}
|
|
|
|
// undo makes the adjustment on s, mapping a combine path into an upstream path
|
|
func (a *adjustment) undo(s string) (string, error) {
|
|
absPath := join(a.root, s)
|
|
if absPath == a.mountpoint {
|
|
return "", nil
|
|
}
|
|
if !strings.HasPrefix(absPath, a.mountpointSlash) {
|
|
return "", errNotUnderRoot
|
|
}
|
|
return absPath[len(a.mountpointSlash):], nil
|
|
}
|
|
|
|
// upstream represents an upstream Fs
|
|
type upstream struct {
|
|
f fs.Fs
|
|
parent *Fs
|
|
dir string // directory the upstream is mounted
|
|
pathAdjustment adjustment // how to fiddle with the path
|
|
}
|
|
|
|
// Create an upstream from the directory it is mounted on and the remote
|
|
func (f *Fs) newUpstream(ctx context.Context, dir, remote string) (*upstream, error) {
|
|
uFs, err := cache.Get(ctx, remote)
|
|
if err == fs.ErrorIsFile {
|
|
return nil, fmt.Errorf("can't combine files yet, only directories %q: %w", remote, err)
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create upstream %q: %w", remote, err)
|
|
}
|
|
u := &upstream{
|
|
f: uFs,
|
|
parent: f,
|
|
dir: dir,
|
|
pathAdjustment: newAdjustment(f.root, dir),
|
|
}
|
|
cache.PinUntilFinalized(u.f, u)
|
|
return u, nil
|
|
}
|
|
|
|
// NewFs constructs an Fs from the path.
|
|
//
|
|
// The returned Fs is the actual Fs, referenced by remote in the config
|
|
func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (outFs fs.Fs, err error) {
|
|
// defer log.Trace(nil, "name=%q, root=%q, m=%v", name, root, m)("f=%+v, err=%v", &outFs, &err)
|
|
// Parse config into Options struct
|
|
opt := new(Options)
|
|
err = configstruct.Set(m, opt)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// Backward compatible to old config
|
|
if len(opt.Upstreams) == 0 {
|
|
return nil, errors.New("combine can't point to an empty upstream - check the value of the upstreams setting")
|
|
}
|
|
for _, u := range opt.Upstreams {
|
|
if strings.HasPrefix(u, name+":") {
|
|
return nil, errors.New("can't point combine remote at itself - check the value of the upstreams setting")
|
|
}
|
|
}
|
|
isDir := false
|
|
for strings.HasSuffix(root, "/") {
|
|
root = root[:len(root)-1]
|
|
isDir = true
|
|
}
|
|
|
|
f := &Fs{
|
|
name: name,
|
|
root: root,
|
|
opt: *opt,
|
|
upstreams: make(map[string]*upstream, len(opt.Upstreams)),
|
|
when: time.Now(),
|
|
}
|
|
|
|
g, gCtx := errgroup.WithContext(ctx)
|
|
var mu sync.Mutex
|
|
for _, upstream := range opt.Upstreams {
|
|
upstream := upstream
|
|
g.Go(func() (err error) {
|
|
equal := strings.IndexRune(upstream, '=')
|
|
if equal < 0 {
|
|
return fmt.Errorf("no \"=\" in upstream definition %q", upstream)
|
|
}
|
|
dir, remote := upstream[:equal], upstream[equal+1:]
|
|
if dir == "" {
|
|
return fmt.Errorf("empty dir in upstream definition %q", upstream)
|
|
}
|
|
if remote == "" {
|
|
return fmt.Errorf("empty remote in upstream definition %q", upstream)
|
|
}
|
|
if strings.ContainsRune(dir, '/') {
|
|
return fmt.Errorf("dirs can't contain / (yet): %q", dir)
|
|
}
|
|
u, err := f.newUpstream(gCtx, dir, remote)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
mu.Lock()
|
|
if _, found := f.upstreams[dir]; found {
|
|
err = fmt.Errorf("duplicate directory name %q", dir)
|
|
} else {
|
|
f.upstreams[dir] = u
|
|
}
|
|
mu.Unlock()
|
|
return err
|
|
})
|
|
}
|
|
err = g.Wait()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// check features
|
|
var features = (&fs.Features{
|
|
CaseInsensitive: true,
|
|
DuplicateFiles: false,
|
|
ReadMimeType: true,
|
|
WriteMimeType: true,
|
|
CanHaveEmptyDirectories: true,
|
|
BucketBased: true,
|
|
SetTier: true,
|
|
GetTier: true,
|
|
ReadMetadata: true,
|
|
WriteMetadata: true,
|
|
UserMetadata: true,
|
|
}).Fill(ctx, f)
|
|
canMove := true
|
|
for _, u := range f.upstreams {
|
|
features = features.Mask(ctx, u.f) // Mask all upstream fs
|
|
if !operations.CanServerSideMove(u.f) {
|
|
canMove = false
|
|
}
|
|
}
|
|
// We can move if all remotes support Move or Copy
|
|
if canMove {
|
|
features.Move = f.Move
|
|
}
|
|
|
|
// Enable ListR when upstreams either support ListR or is local
|
|
// But not when all upstreams are local
|
|
if features.ListR == nil {
|
|
for _, u := range f.upstreams {
|
|
if u.f.Features().ListR != nil {
|
|
features.ListR = f.ListR
|
|
} else if !u.f.Features().IsLocal {
|
|
features.ListR = nil
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// Enable Purge when any upstreams support it
|
|
if features.Purge == nil {
|
|
for _, u := range f.upstreams {
|
|
if u.f.Features().Purge != nil {
|
|
features.Purge = f.Purge
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// Enable Shutdown when any upstreams support it
|
|
if features.Shutdown == nil {
|
|
for _, u := range f.upstreams {
|
|
if u.f.Features().Shutdown != nil {
|
|
features.Shutdown = f.Shutdown
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// Enable DirCacheFlush when any upstreams support it
|
|
if features.DirCacheFlush == nil {
|
|
for _, u := range f.upstreams {
|
|
if u.f.Features().DirCacheFlush != nil {
|
|
features.DirCacheFlush = f.DirCacheFlush
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// Enable ChangeNotify when any upstreams support it
|
|
if features.ChangeNotify == nil {
|
|
for _, u := range f.upstreams {
|
|
if u.f.Features().ChangeNotify != nil {
|
|
features.ChangeNotify = f.ChangeNotify
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
f.features = features
|
|
|
|
// Get common intersection of hashes
|
|
var hashSet hash.Set
|
|
var first = true
|
|
for _, u := range f.upstreams {
|
|
if first {
|
|
hashSet = u.f.Hashes()
|
|
first = false
|
|
} else {
|
|
hashSet = hashSet.Overlap(u.f.Hashes())
|
|
}
|
|
}
|
|
f.hashSet = hashSet
|
|
|
|
// Check to see if the root is actually a file
|
|
if f.root != "" && !isDir {
|
|
_, err := f.NewObject(ctx, "")
|
|
if err != nil {
|
|
if err == fs.ErrorObjectNotFound || err == fs.ErrorNotAFile || err == fs.ErrorIsDir {
|
|
// File doesn't exist or is a directory so return old f
|
|
return f, nil
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
// Check to see if the root path is actually an existing file
|
|
f.root = path.Dir(f.root)
|
|
if f.root == "." {
|
|
f.root = ""
|
|
}
|
|
// Adjust path adjustment to remove leaf
|
|
for _, u := range f.upstreams {
|
|
u.pathAdjustment = newAdjustment(f.root, u.dir)
|
|
}
|
|
return f, fs.ErrorIsFile
|
|
}
|
|
return f, nil
|
|
}
|
|
|
|
// Run a function over all the upstreams in parallel
|
|
func (f *Fs) multithread(ctx context.Context, fn func(context.Context, *upstream) error) error {
|
|
g, gCtx := errgroup.WithContext(ctx)
|
|
for _, u := range f.upstreams {
|
|
u := u
|
|
g.Go(func() (err error) {
|
|
return fn(gCtx, u)
|
|
})
|
|
}
|
|
return g.Wait()
|
|
}
|
|
|
|
// join the elements together but unline path.Join return empty string
|
|
func join(elem ...string) string {
|
|
result := path.Join(elem...)
|
|
if result == "." {
|
|
return ""
|
|
}
|
|
if len(result) > 0 && result[0] == '/' {
|
|
result = result[1:]
|
|
}
|
|
return result
|
|
}
|
|
|
|
// find the upstream for the remote passed in, returning the upstream and the adjusted path
|
|
func (f *Fs) findUpstream(remote string) (u *upstream, uRemote string, err error) {
|
|
// defer log.Trace(remote, "")("f=%v, uRemote=%q, err=%v", &u, &uRemote, &err)
|
|
for _, u := range f.upstreams {
|
|
uRemote, err = u.pathAdjustment.undo(remote)
|
|
if err == nil {
|
|
return u, uRemote, nil
|
|
}
|
|
}
|
|
return nil, "", fmt.Errorf("combine for remote %q: %w", remote, fs.ErrorDirNotFound)
|
|
}
|
|
|
|
// Name of the remote (as passed into NewFs)
|
|
func (f *Fs) Name() string {
|
|
return f.name
|
|
}
|
|
|
|
// Root of the remote (as passed into NewFs)
|
|
func (f *Fs) Root() string {
|
|
return f.root
|
|
}
|
|
|
|
// String converts this Fs to a string
|
|
func (f *Fs) String() string {
|
|
return fmt.Sprintf("combine root '%s'", f.root)
|
|
}
|
|
|
|
// Features returns the optional features of this Fs
|
|
func (f *Fs) Features() *fs.Features {
|
|
return f.features
|
|
}
|
|
|
|
// Rmdir removes the root directory of the Fs object
|
|
func (f *Fs) Rmdir(ctx context.Context, dir string) error {
|
|
// The root always exists
|
|
if f.root == "" && dir == "" {
|
|
return nil
|
|
}
|
|
u, uRemote, err := f.findUpstream(dir)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return u.f.Rmdir(ctx, uRemote)
|
|
}
|
|
|
|
// Hashes returns hash.HashNone to indicate remote hashing is unavailable
|
|
func (f *Fs) Hashes() hash.Set {
|
|
return f.hashSet
|
|
}
|
|
|
|
// Mkdir makes the root directory of the Fs object
|
|
func (f *Fs) Mkdir(ctx context.Context, dir string) error {
|
|
// The root always exists
|
|
if f.root == "" && dir == "" {
|
|
return nil
|
|
}
|
|
u, uRemote, err := f.findUpstream(dir)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return u.f.Mkdir(ctx, uRemote)
|
|
}
|
|
|
|
// purge the upstream or fallback to a slow way
|
|
func (u *upstream) purge(ctx context.Context, dir string) (err error) {
|
|
if do := u.f.Features().Purge; do != nil {
|
|
err = do(ctx, dir)
|
|
} else {
|
|
err = operations.Purge(ctx, u.f, dir)
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Purge all files in the directory
|
|
//
|
|
// Implement this if you have a way of deleting all the files
|
|
// quicker than just running Remove() on the result of List()
|
|
//
|
|
// Return an error if it doesn't exist
|
|
func (f *Fs) Purge(ctx context.Context, dir string) error {
|
|
if f.root == "" && dir == "" {
|
|
return f.multithread(ctx, func(ctx context.Context, u *upstream) error {
|
|
return u.purge(ctx, "")
|
|
})
|
|
}
|
|
u, uRemote, err := f.findUpstream(dir)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return u.purge(ctx, uRemote)
|
|
}
|
|
|
|
// Copy src to this remote using server-side copy operations.
|
|
//
|
|
// This is stored with the remote path given
|
|
//
|
|
// It returns the destination Object and a possible error
|
|
//
|
|
// Will only be called if src.Fs().Name() == f.Name()
|
|
//
|
|
// If it isn't possible then return fs.ErrorCantCopy
|
|
func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
|
|
srcObj, ok := src.(*Object)
|
|
if !ok {
|
|
fs.Debugf(src, "Can't copy - not same remote type")
|
|
return nil, fs.ErrorCantCopy
|
|
}
|
|
|
|
dstU, dstRemote, err := f.findUpstream(remote)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
do := dstU.f.Features().Copy
|
|
if do == nil {
|
|
return nil, fs.ErrorCantCopy
|
|
}
|
|
|
|
o, err := do(ctx, srcObj.Object, dstRemote)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return dstU.newObject(o), nil
|
|
}
|
|
|
|
// Move src to this remote using server-side move operations.
|
|
//
|
|
// This is stored with the remote path given
|
|
//
|
|
// It returns the destination Object and a possible error
|
|
//
|
|
// Will only be called if src.Fs().Name() == f.Name()
|
|
//
|
|
// If it isn't possible then return fs.ErrorCantMove
|
|
func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
|
|
srcObj, ok := src.(*Object)
|
|
if !ok {
|
|
fs.Debugf(src, "Can't move - not same remote type")
|
|
return nil, fs.ErrorCantMove
|
|
}
|
|
|
|
dstU, dstRemote, err := f.findUpstream(remote)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
do := dstU.f.Features().Move
|
|
useCopy := false
|
|
if do == nil {
|
|
do = dstU.f.Features().Copy
|
|
if do == nil {
|
|
return nil, fs.ErrorCantMove
|
|
}
|
|
useCopy = true
|
|
}
|
|
|
|
o, err := do(ctx, srcObj.Object, dstRemote)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// If did Copy then remove the source object
|
|
if useCopy {
|
|
err = srcObj.Remove(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return dstU.newObject(o), nil
|
|
}
|
|
|
|
// DirMove moves src, srcRemote to this remote at dstRemote
|
|
// using server-side move operations.
|
|
//
|
|
// Will only be called if src.Fs().Name() == f.Name()
|
|
//
|
|
// If it isn't possible then return fs.ErrorCantDirMove
|
|
//
|
|
// If destination exists then return fs.ErrorDirExists
|
|
func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string) (err error) {
|
|
// defer log.Trace(f, "src=%v, srcRemote=%q, dstRemote=%q", src, srcRemote, dstRemote)("err=%v", &err)
|
|
srcFs, ok := src.(*Fs)
|
|
if !ok {
|
|
fs.Debugf(src, "Can't move directory - not same remote type")
|
|
return fs.ErrorCantDirMove
|
|
}
|
|
|
|
dstU, dstURemote, err := f.findUpstream(dstRemote)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
srcU, srcURemote, err := srcFs.findUpstream(srcRemote)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
do := dstU.f.Features().DirMove
|
|
if do == nil {
|
|
return fs.ErrorCantDirMove
|
|
}
|
|
|
|
fs.Logf(dstU.f, "srcU.f=%v, srcURemote=%q, dstURemote=%q", srcU.f, srcURemote, dstURemote)
|
|
return do(ctx, srcU.f, srcURemote, dstURemote)
|
|
}
|
|
|
|
// ChangeNotify calls the passed function with a path
|
|
// that has had changes. If the implementation
|
|
// uses polling, it should adhere to the given interval.
|
|
// At least one value will be written to the channel,
|
|
// specifying the initial value and updated values might
|
|
// follow. A 0 Duration should pause the polling.
|
|
// The ChangeNotify implementation must empty the channel
|
|
// regularly. When the channel gets closed, the implementation
|
|
// should stop polling and release resources.
|
|
func (f *Fs) ChangeNotify(ctx context.Context, notifyFunc func(string, fs.EntryType), ch <-chan time.Duration) {
|
|
var uChans []chan time.Duration
|
|
|
|
for _, u := range f.upstreams {
|
|
u := u
|
|
if do := u.f.Features().ChangeNotify; do != nil {
|
|
ch := make(chan time.Duration)
|
|
uChans = append(uChans, ch)
|
|
wrappedNotifyFunc := func(path string, entryType fs.EntryType) {
|
|
newPath, err := u.pathAdjustment.do(path)
|
|
if err != nil {
|
|
fs.Logf(f, "ChangeNotify: unable to process %q: %s", path, err)
|
|
return
|
|
}
|
|
fs.Debugf(f, "ChangeNotify: path %q entryType %d", newPath, entryType)
|
|
notifyFunc(newPath, entryType)
|
|
}
|
|
do(ctx, wrappedNotifyFunc, ch)
|
|
}
|
|
}
|
|
|
|
go func() {
|
|
for i := range ch {
|
|
for _, c := range uChans {
|
|
c <- i
|
|
}
|
|
}
|
|
for _, c := range uChans {
|
|
close(c)
|
|
}
|
|
}()
|
|
}
|
|
|
|
// DirCacheFlush resets the directory cache - used in testing
|
|
// as an optional interface
|
|
func (f *Fs) DirCacheFlush() {
|
|
ctx := context.Background()
|
|
_ = f.multithread(ctx, func(ctx context.Context, u *upstream) error {
|
|
if do := u.f.Features().DirCacheFlush; do != nil {
|
|
do()
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (f *Fs) put(ctx context.Context, in io.Reader, src fs.ObjectInfo, stream bool, options ...fs.OpenOption) (fs.Object, error) {
|
|
srcPath := src.Remote()
|
|
u, uRemote, err := f.findUpstream(srcPath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
uSrc := operations.NewOverrideRemote(src, uRemote)
|
|
var o fs.Object
|
|
if stream {
|
|
o, err = u.f.Features().PutStream(ctx, in, uSrc, options...)
|
|
} else {
|
|
o, err = u.f.Put(ctx, in, uSrc, options...)
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return u.newObject(o), nil
|
|
}
|
|
|
|
// Put in to the remote path with the modTime given of the given size
|
|
//
|
|
// May create the object even if it returns an error - if so
|
|
// will return the object and the error, otherwise will return
|
|
// nil and the error
|
|
func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
|
|
o, err := f.NewObject(ctx, src.Remote())
|
|
switch err {
|
|
case nil:
|
|
return o, o.Update(ctx, in, src, options...)
|
|
case fs.ErrorObjectNotFound:
|
|
return f.put(ctx, in, src, false, options...)
|
|
default:
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
// PutStream uploads to the remote path with the modTime given of indeterminate size
|
|
//
|
|
// May create the object even if it returns an error - if so
|
|
// will return the object and the error, otherwise will return
|
|
// nil and the error
|
|
func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
|
|
o, err := f.NewObject(ctx, src.Remote())
|
|
switch err {
|
|
case nil:
|
|
return o, o.Update(ctx, in, src, options...)
|
|
case fs.ErrorObjectNotFound:
|
|
return f.put(ctx, in, src, true, options...)
|
|
default:
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
// About gets quota information from the Fs
|
|
func (f *Fs) About(ctx context.Context) (*fs.Usage, error) {
|
|
usage := &fs.Usage{
|
|
Total: new(int64),
|
|
Used: new(int64),
|
|
Trashed: new(int64),
|
|
Other: new(int64),
|
|
Free: new(int64),
|
|
Objects: new(int64),
|
|
}
|
|
for _, u := range f.upstreams {
|
|
doAbout := u.f.Features().About
|
|
if doAbout == nil {
|
|
continue
|
|
}
|
|
usg, err := doAbout(ctx)
|
|
if errors.Is(err, fs.ErrorDirNotFound) {
|
|
continue
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if usg.Total != nil && usage.Total != nil {
|
|
*usage.Total += *usg.Total
|
|
} else {
|
|
usage.Total = nil
|
|
}
|
|
if usg.Used != nil && usage.Used != nil {
|
|
*usage.Used += *usg.Used
|
|
} else {
|
|
usage.Used = nil
|
|
}
|
|
if usg.Trashed != nil && usage.Trashed != nil {
|
|
*usage.Trashed += *usg.Trashed
|
|
} else {
|
|
usage.Trashed = nil
|
|
}
|
|
if usg.Other != nil && usage.Other != nil {
|
|
*usage.Other += *usg.Other
|
|
} else {
|
|
usage.Other = nil
|
|
}
|
|
if usg.Free != nil && usage.Free != nil {
|
|
*usage.Free += *usg.Free
|
|
} else {
|
|
usage.Free = nil
|
|
}
|
|
if usg.Objects != nil && usage.Objects != nil {
|
|
*usage.Objects += *usg.Objects
|
|
} else {
|
|
usage.Objects = nil
|
|
}
|
|
}
|
|
return usage, nil
|
|
}
|
|
|
|
// Wraps entries for this upstream
|
|
func (u *upstream) wrapEntries(ctx context.Context, entries fs.DirEntries) (fs.DirEntries, error) {
|
|
for i, entry := range entries {
|
|
switch x := entry.(type) {
|
|
case fs.Object:
|
|
entries[i] = u.newObject(x)
|
|
case fs.Directory:
|
|
newDir := fs.NewDirCopy(ctx, x)
|
|
newPath, err := u.pathAdjustment.do(newDir.Remote())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
newDir.SetRemote(newPath)
|
|
entries[i] = newDir
|
|
default:
|
|
return nil, fmt.Errorf("unknown entry type %T", entry)
|
|
}
|
|
}
|
|
return entries, nil
|
|
}
|
|
|
|
// List the objects and directories in dir into entries. The
|
|
// entries can be returned in any order but should be for a
|
|
// complete directory.
|
|
//
|
|
// dir should be "" to list the root, and should not have
|
|
// trailing slashes.
|
|
//
|
|
// This should return ErrDirNotFound if the directory isn't
|
|
// found.
|
|
func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
|
|
// defer log.Trace(f, "dir=%q", dir)("entries = %v, err=%v", &entries, &err)
|
|
if f.root == "" && dir == "" {
|
|
entries = make(fs.DirEntries, 0, len(f.upstreams))
|
|
for combineDir := range f.upstreams {
|
|
d := fs.NewDir(combineDir, f.when)
|
|
entries = append(entries, d)
|
|
}
|
|
return entries, nil
|
|
}
|
|
u, uRemote, err := f.findUpstream(dir)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
entries, err = u.f.List(ctx, uRemote)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return u.wrapEntries(ctx, entries)
|
|
}
|
|
|
|
// ListR lists the objects and directories of the Fs starting
|
|
// from dir recursively into out.
|
|
//
|
|
// dir should be "" to start from the root, and should not
|
|
// have trailing slashes.
|
|
//
|
|
// This should return ErrDirNotFound if the directory isn't
|
|
// found.
|
|
//
|
|
// It should call callback for each tranche of entries read.
|
|
// These need not be returned in any particular order. If
|
|
// callback returns an error then the listing will stop
|
|
// immediately.
|
|
//
|
|
// Don't implement this unless you have a more efficient way
|
|
// of listing recursively that doing a directory traversal.
|
|
func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) {
|
|
// defer log.Trace(f, "dir=%q, callback=%v", dir, callback)("err=%v", &err)
|
|
if f.root == "" && dir == "" {
|
|
rootEntries, err := f.List(ctx, "")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = callback(rootEntries)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
var mu sync.Mutex
|
|
syncCallback := func(entries fs.DirEntries) error {
|
|
mu.Lock()
|
|
defer mu.Unlock()
|
|
return callback(entries)
|
|
}
|
|
err = f.multithread(ctx, func(ctx context.Context, u *upstream) error {
|
|
return f.ListR(ctx, u.dir, syncCallback)
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
u, uRemote, err := f.findUpstream(dir)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
wrapCallback := func(entries fs.DirEntries) error {
|
|
entries, err := u.wrapEntries(ctx, entries)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return callback(entries)
|
|
}
|
|
if do := u.f.Features().ListR; do != nil {
|
|
err = do(ctx, uRemote, wrapCallback)
|
|
} else {
|
|
err = walk.ListR(ctx, u.f, uRemote, true, -1, walk.ListAll, wrapCallback)
|
|
}
|
|
if err == fs.ErrorDirNotFound {
|
|
err = nil
|
|
}
|
|
return err
|
|
}
|
|
|
|
// NewObject creates a new remote combine file object
|
|
func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
|
|
u, uRemote, err := f.findUpstream(remote)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if uRemote == "" || strings.HasSuffix(uRemote, "/") {
|
|
return nil, fs.ErrorIsDir
|
|
}
|
|
o, err := u.f.NewObject(ctx, uRemote)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return u.newObject(o), nil
|
|
}
|
|
|
|
// Precision is the greatest Precision of all upstreams
|
|
func (f *Fs) Precision() time.Duration {
|
|
var greatestPrecision time.Duration
|
|
for _, u := range f.upstreams {
|
|
uPrecision := u.f.Precision()
|
|
if uPrecision > greatestPrecision {
|
|
greatestPrecision = uPrecision
|
|
}
|
|
}
|
|
return greatestPrecision
|
|
}
|
|
|
|
// Shutdown the backend, closing any background tasks and any
|
|
// cached connections.
|
|
func (f *Fs) Shutdown(ctx context.Context) error {
|
|
return f.multithread(ctx, func(ctx context.Context, u *upstream) error {
|
|
if do := u.f.Features().Shutdown; do != nil {
|
|
return do(ctx)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// Object describes a wrapped Object
|
|
//
|
|
// This is a wrapped Object which knows its path prefix
|
|
type Object struct {
|
|
fs.Object
|
|
u *upstream
|
|
}
|
|
|
|
func (u *upstream) newObject(o fs.Object) *Object {
|
|
return &Object{
|
|
Object: o,
|
|
u: u,
|
|
}
|
|
}
|
|
|
|
// Fs returns read only access to the Fs that this object is part of
|
|
func (o *Object) Fs() fs.Info {
|
|
return o.u.parent
|
|
}
|
|
|
|
// String returns the remote path
|
|
func (o *Object) String() string {
|
|
return o.Remote()
|
|
}
|
|
|
|
// Remote returns the remote path
|
|
func (o *Object) Remote() string {
|
|
newPath, err := o.u.pathAdjustment.do(o.Object.String())
|
|
if err != nil {
|
|
fs.Errorf(o, "Bad object: %v", err)
|
|
return err.Error()
|
|
}
|
|
return newPath
|
|
}
|
|
|
|
// MimeType returns the content type of the Object if known
|
|
func (o *Object) MimeType(ctx context.Context) (mimeType string) {
|
|
if do, ok := o.Object.(fs.MimeTyper); ok {
|
|
mimeType = do.MimeType(ctx)
|
|
}
|
|
return mimeType
|
|
}
|
|
|
|
// UnWrap returns the Object that this Object is wrapping or
|
|
// nil if it isn't wrapping anything
|
|
func (o *Object) UnWrap() fs.Object {
|
|
return o.Object
|
|
}
|
|
|
|
// GetTier returns storage tier or class of the Object
|
|
func (o *Object) GetTier() string {
|
|
do, ok := o.Object.(fs.GetTierer)
|
|
if !ok {
|
|
return ""
|
|
}
|
|
return do.GetTier()
|
|
}
|
|
|
|
// ID returns the ID of the Object if known, or "" if not
|
|
func (o *Object) ID() string {
|
|
do, ok := o.Object.(fs.IDer)
|
|
if !ok {
|
|
return ""
|
|
}
|
|
return do.ID()
|
|
}
|
|
|
|
// Metadata returns metadata for an object
|
|
//
|
|
// It should return nil if there is no Metadata
|
|
func (o *Object) Metadata(ctx context.Context) (fs.Metadata, error) {
|
|
do, ok := o.Object.(fs.Metadataer)
|
|
if !ok {
|
|
return nil, nil
|
|
}
|
|
return do.Metadata(ctx)
|
|
}
|
|
|
|
// SetTier performs changing storage tier of the Object if
|
|
// multiple storage classes supported
|
|
func (o *Object) SetTier(tier string) error {
|
|
do, ok := o.Object.(fs.SetTierer)
|
|
if !ok {
|
|
return errors.New("underlying remote does not support SetTier")
|
|
}
|
|
return do.SetTier(tier)
|
|
}
|
|
|
|
// Check the interfaces are satisfied
|
|
var (
|
|
_ fs.Fs = (*Fs)(nil)
|
|
_ fs.Purger = (*Fs)(nil)
|
|
_ fs.PutStreamer = (*Fs)(nil)
|
|
_ fs.Copier = (*Fs)(nil)
|
|
_ fs.Mover = (*Fs)(nil)
|
|
_ fs.DirMover = (*Fs)(nil)
|
|
_ fs.DirCacheFlusher = (*Fs)(nil)
|
|
_ fs.ChangeNotifier = (*Fs)(nil)
|
|
_ fs.Abouter = (*Fs)(nil)
|
|
_ fs.ListRer = (*Fs)(nil)
|
|
_ fs.Shutdowner = (*Fs)(nil)
|
|
_ fs.FullObject = (*Object)(nil)
|
|
)
|