rclone/backend/combine/combine.go
Nick Craig-Wood 821e084f28 combine: fix errors with backends shutting down while in use
Before this patch backends could be shutdown when they fell out of the
cache when they were in use with combine. This was particularly
noticeable with the dropbox backend which gave this error when
uploading files after the backend was Shutdown.

    Failed to copy: upload failed: batcher is shutting down

This patch gets the combine remote to pin them until it is finished.

See: https://forum.rclone.org/t/rclone-combine-upload-failed-batcher-is-shutting-down/32168
2022-08-04 10:13:41 +01:00

993 lines
25 KiB
Go

// Package combine implents a backend to combine multipe remotes in a directory tree
package combine
/*
Have API to add/remove branches in the combine
*/
import (
"context"
"errors"
"fmt"
"io"
"path"
"strings"
"sync"
"time"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/cache"
"github.com/rclone/rclone/fs/config/configmap"
"github.com/rclone/rclone/fs/config/configstruct"
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/fs/operations"
"github.com/rclone/rclone/fs/walk"
"golang.org/x/sync/errgroup"
)
// Register with Fs
func init() {
fsi := &fs.RegInfo{
Name: "combine",
Description: "Combine several remotes into one",
NewFs: NewFs,
MetadataInfo: &fs.MetadataInfo{
Help: `Any metadata supported by the underlying remote is read and written.`,
},
Options: []fs.Option{{
Name: "upstreams",
Help: `Upstreams for combining
These should be in the form
dir=remote:path dir2=remote2:path
Where before the = is specified the root directory and after is the remote to
put there.
Embedded spaces can be added using quotes
"dir=remote:path with space" "dir2=remote2:path with space"
`,
Required: true,
Default: fs.SpaceSepList(nil),
}},
}
fs.Register(fsi)
}
// Options defines the configuration for this backend
type Options struct {
Upstreams fs.SpaceSepList `config:"upstreams"`
}
// Fs represents a combine of upstreams
type Fs struct {
name string // name of this remote
features *fs.Features // optional features
opt Options // options for this Fs
root string // the path we are working on
hashSet hash.Set // common hashes
when time.Time // directory times
upstreams map[string]*upstream // map of upstreams
}
// adjustment stores the info to add a prefix to a path or chop characters off
type adjustment struct {
root string
rootSlash string
mountpoint string
mountpointSlash string
}
// newAdjustment makes a new path adjustment adjusting between mountpoint and root
//
// mountpoint is the point the upstream is mounted and root is the combine root
func newAdjustment(root, mountpoint string) (a adjustment) {
return adjustment{
root: root,
rootSlash: root + "/",
mountpoint: mountpoint,
mountpointSlash: mountpoint + "/",
}
}
var errNotUnderRoot = errors.New("file not under root")
// do makes the adjustment on s, mapping an upstream path into a combine path
func (a *adjustment) do(s string) (string, error) {
absPath := join(a.mountpoint, s)
if a.root == "" {
return absPath, nil
}
if absPath == a.root {
return "", nil
}
if !strings.HasPrefix(absPath, a.rootSlash) {
return "", errNotUnderRoot
}
return absPath[len(a.rootSlash):], nil
}
// undo makes the adjustment on s, mapping a combine path into an upstream path
func (a *adjustment) undo(s string) (string, error) {
absPath := join(a.root, s)
if absPath == a.mountpoint {
return "", nil
}
if !strings.HasPrefix(absPath, a.mountpointSlash) {
return "", errNotUnderRoot
}
return absPath[len(a.mountpointSlash):], nil
}
// upstream represents an upstream Fs
type upstream struct {
f fs.Fs
parent *Fs
dir string // directory the upstream is mounted
pathAdjustment adjustment // how to fiddle with the path
}
// Create an upstream from the directory it is mounted on and the remote
func (f *Fs) newUpstream(ctx context.Context, dir, remote string) (*upstream, error) {
uFs, err := cache.Get(ctx, remote)
if err == fs.ErrorIsFile {
return nil, fmt.Errorf("can't combine files yet, only directories %q: %w", remote, err)
}
if err != nil {
return nil, fmt.Errorf("failed to create upstream %q: %w", remote, err)
}
u := &upstream{
f: uFs,
parent: f,
dir: dir,
pathAdjustment: newAdjustment(f.root, dir),
}
cache.PinUntilFinalized(u.f, u)
return u, nil
}
// NewFs constructs an Fs from the path.
//
// The returned Fs is the actual Fs, referenced by remote in the config
func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (outFs fs.Fs, err error) {
// defer log.Trace(nil, "name=%q, root=%q, m=%v", name, root, m)("f=%+v, err=%v", &outFs, &err)
// Parse config into Options struct
opt := new(Options)
err = configstruct.Set(m, opt)
if err != nil {
return nil, err
}
// Backward compatible to old config
if len(opt.Upstreams) == 0 {
return nil, errors.New("combine can't point to an empty upstream - check the value of the upstreams setting")
}
for _, u := range opt.Upstreams {
if strings.HasPrefix(u, name+":") {
return nil, errors.New("can't point combine remote at itself - check the value of the upstreams setting")
}
}
isDir := false
for strings.HasSuffix(root, "/") {
root = root[:len(root)-1]
isDir = true
}
f := &Fs{
name: name,
root: root,
opt: *opt,
upstreams: make(map[string]*upstream, len(opt.Upstreams)),
when: time.Now(),
}
g, gCtx := errgroup.WithContext(ctx)
var mu sync.Mutex
for _, upstream := range opt.Upstreams {
upstream := upstream
g.Go(func() (err error) {
equal := strings.IndexRune(upstream, '=')
if equal < 0 {
return fmt.Errorf("no \"=\" in upstream definition %q", upstream)
}
dir, remote := upstream[:equal], upstream[equal+1:]
if dir == "" {
return fmt.Errorf("empty dir in upstream definition %q", upstream)
}
if remote == "" {
return fmt.Errorf("empty remote in upstream definition %q", upstream)
}
if strings.ContainsRune(dir, '/') {
return fmt.Errorf("dirs can't contain / (yet): %q", dir)
}
u, err := f.newUpstream(gCtx, dir, remote)
if err != nil {
return err
}
mu.Lock()
if _, found := f.upstreams[dir]; found {
err = fmt.Errorf("duplicate directory name %q", dir)
} else {
f.upstreams[dir] = u
}
mu.Unlock()
return err
})
}
err = g.Wait()
if err != nil {
return nil, err
}
// check features
var features = (&fs.Features{
CaseInsensitive: true,
DuplicateFiles: false,
ReadMimeType: true,
WriteMimeType: true,
CanHaveEmptyDirectories: true,
BucketBased: true,
SetTier: true,
GetTier: true,
ReadMetadata: true,
WriteMetadata: true,
UserMetadata: true,
}).Fill(ctx, f)
canMove := true
for _, u := range f.upstreams {
features = features.Mask(ctx, u.f) // Mask all upstream fs
if !operations.CanServerSideMove(u.f) {
canMove = false
}
}
// We can move if all remotes support Move or Copy
if canMove {
features.Move = f.Move
}
// Enable ListR when upstreams either support ListR or is local
// But not when all upstreams are local
if features.ListR == nil {
for _, u := range f.upstreams {
if u.f.Features().ListR != nil {
features.ListR = f.ListR
} else if !u.f.Features().IsLocal {
features.ListR = nil
break
}
}
}
// Enable Purge when any upstreams support it
if features.Purge == nil {
for _, u := range f.upstreams {
if u.f.Features().Purge != nil {
features.Purge = f.Purge
break
}
}
}
// Enable Shutdown when any upstreams support it
if features.Shutdown == nil {
for _, u := range f.upstreams {
if u.f.Features().Shutdown != nil {
features.Shutdown = f.Shutdown
break
}
}
}
// Enable DirCacheFlush when any upstreams support it
if features.DirCacheFlush == nil {
for _, u := range f.upstreams {
if u.f.Features().DirCacheFlush != nil {
features.DirCacheFlush = f.DirCacheFlush
break
}
}
}
// Enable ChangeNotify when any upstreams support it
if features.ChangeNotify == nil {
for _, u := range f.upstreams {
if u.f.Features().ChangeNotify != nil {
features.ChangeNotify = f.ChangeNotify
break
}
}
}
f.features = features
// Get common intersection of hashes
var hashSet hash.Set
var first = true
for _, u := range f.upstreams {
if first {
hashSet = u.f.Hashes()
first = false
} else {
hashSet = hashSet.Overlap(u.f.Hashes())
}
}
f.hashSet = hashSet
// Check to see if the root is actually a file
if f.root != "" && !isDir {
_, err := f.NewObject(ctx, "")
if err != nil {
if err == fs.ErrorObjectNotFound || err == fs.ErrorNotAFile || err == fs.ErrorIsDir {
// File doesn't exist or is a directory so return old f
return f, nil
}
return nil, err
}
// Check to see if the root path is actually an existing file
f.root = path.Dir(f.root)
if f.root == "." {
f.root = ""
}
// Adjust path adjustment to remove leaf
for _, u := range f.upstreams {
u.pathAdjustment = newAdjustment(f.root, u.dir)
}
return f, fs.ErrorIsFile
}
return f, nil
}
// Run a function over all the upstreams in parallel
func (f *Fs) multithread(ctx context.Context, fn func(context.Context, *upstream) error) error {
g, gCtx := errgroup.WithContext(ctx)
for _, u := range f.upstreams {
u := u
g.Go(func() (err error) {
return fn(gCtx, u)
})
}
return g.Wait()
}
// join the elements together but unline path.Join return empty string
func join(elem ...string) string {
result := path.Join(elem...)
if result == "." {
return ""
}
if len(result) > 0 && result[0] == '/' {
result = result[1:]
}
return result
}
// find the upstream for the remote passed in, returning the upstream and the adjusted path
func (f *Fs) findUpstream(remote string) (u *upstream, uRemote string, err error) {
// defer log.Trace(remote, "")("f=%v, uRemote=%q, err=%v", &u, &uRemote, &err)
for _, u := range f.upstreams {
uRemote, err = u.pathAdjustment.undo(remote)
if err == nil {
return u, uRemote, nil
}
}
return nil, "", fmt.Errorf("combine for remote %q: %w", remote, fs.ErrorDirNotFound)
}
// Name of the remote (as passed into NewFs)
func (f *Fs) Name() string {
return f.name
}
// Root of the remote (as passed into NewFs)
func (f *Fs) Root() string {
return f.root
}
// String converts this Fs to a string
func (f *Fs) String() string {
return fmt.Sprintf("combine root '%s'", f.root)
}
// Features returns the optional features of this Fs
func (f *Fs) Features() *fs.Features {
return f.features
}
// Rmdir removes the root directory of the Fs object
func (f *Fs) Rmdir(ctx context.Context, dir string) error {
// The root always exists
if f.root == "" && dir == "" {
return nil
}
u, uRemote, err := f.findUpstream(dir)
if err != nil {
return err
}
return u.f.Rmdir(ctx, uRemote)
}
// Hashes returns hash.HashNone to indicate remote hashing is unavailable
func (f *Fs) Hashes() hash.Set {
return f.hashSet
}
// Mkdir makes the root directory of the Fs object
func (f *Fs) Mkdir(ctx context.Context, dir string) error {
// The root always exists
if f.root == "" && dir == "" {
return nil
}
u, uRemote, err := f.findUpstream(dir)
if err != nil {
return err
}
return u.f.Mkdir(ctx, uRemote)
}
// purge the upstream or fallback to a slow way
func (u *upstream) purge(ctx context.Context, dir string) (err error) {
if do := u.f.Features().Purge; do != nil {
err = do(ctx, dir)
} else {
err = operations.Purge(ctx, u.f, dir)
}
return err
}
// Purge all files in the directory
//
// Implement this if you have a way of deleting all the files
// quicker than just running Remove() on the result of List()
//
// Return an error if it doesn't exist
func (f *Fs) Purge(ctx context.Context, dir string) error {
if f.root == "" && dir == "" {
return f.multithread(ctx, func(ctx context.Context, u *upstream) error {
return u.purge(ctx, "")
})
}
u, uRemote, err := f.findUpstream(dir)
if err != nil {
return err
}
return u.purge(ctx, uRemote)
}
// Copy src to this remote using server-side copy operations.
//
// This is stored with the remote path given
//
// It returns the destination Object and a possible error
//
// Will only be called if src.Fs().Name() == f.Name()
//
// If it isn't possible then return fs.ErrorCantCopy
func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
srcObj, ok := src.(*Object)
if !ok {
fs.Debugf(src, "Can't copy - not same remote type")
return nil, fs.ErrorCantCopy
}
dstU, dstRemote, err := f.findUpstream(remote)
if err != nil {
return nil, err
}
do := dstU.f.Features().Copy
if do == nil {
return nil, fs.ErrorCantCopy
}
o, err := do(ctx, srcObj.Object, dstRemote)
if err != nil {
return nil, err
}
return dstU.newObject(o), nil
}
// Move src to this remote using server-side move operations.
//
// This is stored with the remote path given
//
// It returns the destination Object and a possible error
//
// Will only be called if src.Fs().Name() == f.Name()
//
// If it isn't possible then return fs.ErrorCantMove
func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
srcObj, ok := src.(*Object)
if !ok {
fs.Debugf(src, "Can't move - not same remote type")
return nil, fs.ErrorCantMove
}
dstU, dstRemote, err := f.findUpstream(remote)
if err != nil {
return nil, err
}
do := dstU.f.Features().Move
useCopy := false
if do == nil {
do = dstU.f.Features().Copy
if do == nil {
return nil, fs.ErrorCantMove
}
useCopy = true
}
o, err := do(ctx, srcObj.Object, dstRemote)
if err != nil {
return nil, err
}
// If did Copy then remove the source object
if useCopy {
err = srcObj.Remove(ctx)
if err != nil {
return nil, err
}
}
return dstU.newObject(o), nil
}
// DirMove moves src, srcRemote to this remote at dstRemote
// using server-side move operations.
//
// Will only be called if src.Fs().Name() == f.Name()
//
// If it isn't possible then return fs.ErrorCantDirMove
//
// If destination exists then return fs.ErrorDirExists
func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string) (err error) {
// defer log.Trace(f, "src=%v, srcRemote=%q, dstRemote=%q", src, srcRemote, dstRemote)("err=%v", &err)
srcFs, ok := src.(*Fs)
if !ok {
fs.Debugf(src, "Can't move directory - not same remote type")
return fs.ErrorCantDirMove
}
dstU, dstURemote, err := f.findUpstream(dstRemote)
if err != nil {
return err
}
srcU, srcURemote, err := srcFs.findUpstream(srcRemote)
if err != nil {
return err
}
do := dstU.f.Features().DirMove
if do == nil {
return fs.ErrorCantDirMove
}
fs.Logf(dstU.f, "srcU.f=%v, srcURemote=%q, dstURemote=%q", srcU.f, srcURemote, dstURemote)
return do(ctx, srcU.f, srcURemote, dstURemote)
}
// ChangeNotify calls the passed function with a path
// that has had changes. If the implementation
// uses polling, it should adhere to the given interval.
// At least one value will be written to the channel,
// specifying the initial value and updated values might
// follow. A 0 Duration should pause the polling.
// The ChangeNotify implementation must empty the channel
// regularly. When the channel gets closed, the implementation
// should stop polling and release resources.
func (f *Fs) ChangeNotify(ctx context.Context, notifyFunc func(string, fs.EntryType), ch <-chan time.Duration) {
var uChans []chan time.Duration
for _, u := range f.upstreams {
u := u
if do := u.f.Features().ChangeNotify; do != nil {
ch := make(chan time.Duration)
uChans = append(uChans, ch)
wrappedNotifyFunc := func(path string, entryType fs.EntryType) {
newPath, err := u.pathAdjustment.do(path)
if err != nil {
fs.Logf(f, "ChangeNotify: unable to process %q: %s", path, err)
return
}
fs.Debugf(f, "ChangeNotify: path %q entryType %d", newPath, entryType)
notifyFunc(newPath, entryType)
}
do(ctx, wrappedNotifyFunc, ch)
}
}
go func() {
for i := range ch {
for _, c := range uChans {
c <- i
}
}
for _, c := range uChans {
close(c)
}
}()
}
// DirCacheFlush resets the directory cache - used in testing
// as an optional interface
func (f *Fs) DirCacheFlush() {
ctx := context.Background()
_ = f.multithread(ctx, func(ctx context.Context, u *upstream) error {
if do := u.f.Features().DirCacheFlush; do != nil {
do()
}
return nil
})
}
func (f *Fs) put(ctx context.Context, in io.Reader, src fs.ObjectInfo, stream bool, options ...fs.OpenOption) (fs.Object, error) {
srcPath := src.Remote()
u, uRemote, err := f.findUpstream(srcPath)
if err != nil {
return nil, err
}
uSrc := operations.NewOverrideRemote(src, uRemote)
var o fs.Object
if stream {
o, err = u.f.Features().PutStream(ctx, in, uSrc, options...)
} else {
o, err = u.f.Put(ctx, in, uSrc, options...)
}
if err != nil {
return nil, err
}
return u.newObject(o), nil
}
// Put in to the remote path with the modTime given of the given size
//
// May create the object even if it returns an error - if so
// will return the object and the error, otherwise will return
// nil and the error
func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
o, err := f.NewObject(ctx, src.Remote())
switch err {
case nil:
return o, o.Update(ctx, in, src, options...)
case fs.ErrorObjectNotFound:
return f.put(ctx, in, src, false, options...)
default:
return nil, err
}
}
// PutStream uploads to the remote path with the modTime given of indeterminate size
//
// May create the object even if it returns an error - if so
// will return the object and the error, otherwise will return
// nil and the error
func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
o, err := f.NewObject(ctx, src.Remote())
switch err {
case nil:
return o, o.Update(ctx, in, src, options...)
case fs.ErrorObjectNotFound:
return f.put(ctx, in, src, true, options...)
default:
return nil, err
}
}
// About gets quota information from the Fs
func (f *Fs) About(ctx context.Context) (*fs.Usage, error) {
usage := &fs.Usage{
Total: new(int64),
Used: new(int64),
Trashed: new(int64),
Other: new(int64),
Free: new(int64),
Objects: new(int64),
}
for _, u := range f.upstreams {
doAbout := u.f.Features().About
if doAbout == nil {
continue
}
usg, err := doAbout(ctx)
if errors.Is(err, fs.ErrorDirNotFound) {
continue
}
if err != nil {
return nil, err
}
if usg.Total != nil && usage.Total != nil {
*usage.Total += *usg.Total
} else {
usage.Total = nil
}
if usg.Used != nil && usage.Used != nil {
*usage.Used += *usg.Used
} else {
usage.Used = nil
}
if usg.Trashed != nil && usage.Trashed != nil {
*usage.Trashed += *usg.Trashed
} else {
usage.Trashed = nil
}
if usg.Other != nil && usage.Other != nil {
*usage.Other += *usg.Other
} else {
usage.Other = nil
}
if usg.Free != nil && usage.Free != nil {
*usage.Free += *usg.Free
} else {
usage.Free = nil
}
if usg.Objects != nil && usage.Objects != nil {
*usage.Objects += *usg.Objects
} else {
usage.Objects = nil
}
}
return usage, nil
}
// Wraps entries for this upstream
func (u *upstream) wrapEntries(ctx context.Context, entries fs.DirEntries) (fs.DirEntries, error) {
for i, entry := range entries {
switch x := entry.(type) {
case fs.Object:
entries[i] = u.newObject(x)
case fs.Directory:
newDir := fs.NewDirCopy(ctx, x)
newPath, err := u.pathAdjustment.do(newDir.Remote())
if err != nil {
return nil, err
}
newDir.SetRemote(newPath)
entries[i] = newDir
default:
return nil, fmt.Errorf("unknown entry type %T", entry)
}
}
return entries, nil
}
// List the objects and directories in dir into entries. The
// entries can be returned in any order but should be for a
// complete directory.
//
// dir should be "" to list the root, and should not have
// trailing slashes.
//
// This should return ErrDirNotFound if the directory isn't
// found.
func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
// defer log.Trace(f, "dir=%q", dir)("entries = %v, err=%v", &entries, &err)
if f.root == "" && dir == "" {
entries = make(fs.DirEntries, 0, len(f.upstreams))
for combineDir := range f.upstreams {
d := fs.NewDir(combineDir, f.when)
entries = append(entries, d)
}
return entries, nil
}
u, uRemote, err := f.findUpstream(dir)
if err != nil {
return nil, err
}
entries, err = u.f.List(ctx, uRemote)
if err != nil {
return nil, err
}
return u.wrapEntries(ctx, entries)
}
// ListR lists the objects and directories of the Fs starting
// from dir recursively into out.
//
// dir should be "" to start from the root, and should not
// have trailing slashes.
//
// This should return ErrDirNotFound if the directory isn't
// found.
//
// It should call callback for each tranche of entries read.
// These need not be returned in any particular order. If
// callback returns an error then the listing will stop
// immediately.
//
// Don't implement this unless you have a more efficient way
// of listing recursively that doing a directory traversal.
func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) {
// defer log.Trace(f, "dir=%q, callback=%v", dir, callback)("err=%v", &err)
if f.root == "" && dir == "" {
rootEntries, err := f.List(ctx, "")
if err != nil {
return err
}
err = callback(rootEntries)
if err != nil {
return err
}
var mu sync.Mutex
syncCallback := func(entries fs.DirEntries) error {
mu.Lock()
defer mu.Unlock()
return callback(entries)
}
err = f.multithread(ctx, func(ctx context.Context, u *upstream) error {
return f.ListR(ctx, u.dir, syncCallback)
})
if err != nil {
return err
}
return nil
}
u, uRemote, err := f.findUpstream(dir)
if err != nil {
return err
}
wrapCallback := func(entries fs.DirEntries) error {
entries, err := u.wrapEntries(ctx, entries)
if err != nil {
return err
}
return callback(entries)
}
if do := u.f.Features().ListR; do != nil {
err = do(ctx, uRemote, wrapCallback)
} else {
err = walk.ListR(ctx, u.f, uRemote, true, -1, walk.ListAll, wrapCallback)
}
if err == fs.ErrorDirNotFound {
err = nil
}
return err
}
// NewObject creates a new remote combine file object
func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
u, uRemote, err := f.findUpstream(remote)
if err != nil {
return nil, err
}
if uRemote == "" || strings.HasSuffix(uRemote, "/") {
return nil, fs.ErrorIsDir
}
o, err := u.f.NewObject(ctx, uRemote)
if err != nil {
return nil, err
}
return u.newObject(o), nil
}
// Precision is the greatest Precision of all upstreams
func (f *Fs) Precision() time.Duration {
var greatestPrecision time.Duration
for _, u := range f.upstreams {
uPrecision := u.f.Precision()
if uPrecision > greatestPrecision {
greatestPrecision = uPrecision
}
}
return greatestPrecision
}
// Shutdown the backend, closing any background tasks and any
// cached connections.
func (f *Fs) Shutdown(ctx context.Context) error {
return f.multithread(ctx, func(ctx context.Context, u *upstream) error {
if do := u.f.Features().Shutdown; do != nil {
return do(ctx)
}
return nil
})
}
// Object describes a wrapped Object
//
// This is a wrapped Object which knows its path prefix
type Object struct {
fs.Object
u *upstream
}
func (u *upstream) newObject(o fs.Object) *Object {
return &Object{
Object: o,
u: u,
}
}
// Fs returns read only access to the Fs that this object is part of
func (o *Object) Fs() fs.Info {
return o.u.parent
}
// String returns the remote path
func (o *Object) String() string {
return o.Remote()
}
// Remote returns the remote path
func (o *Object) Remote() string {
newPath, err := o.u.pathAdjustment.do(o.Object.String())
if err != nil {
fs.Errorf(o, "Bad object: %v", err)
return err.Error()
}
return newPath
}
// MimeType returns the content type of the Object if known
func (o *Object) MimeType(ctx context.Context) (mimeType string) {
if do, ok := o.Object.(fs.MimeTyper); ok {
mimeType = do.MimeType(ctx)
}
return mimeType
}
// UnWrap returns the Object that this Object is wrapping or
// nil if it isn't wrapping anything
func (o *Object) UnWrap() fs.Object {
return o.Object
}
// GetTier returns storage tier or class of the Object
func (o *Object) GetTier() string {
do, ok := o.Object.(fs.GetTierer)
if !ok {
return ""
}
return do.GetTier()
}
// ID returns the ID of the Object if known, or "" if not
func (o *Object) ID() string {
do, ok := o.Object.(fs.IDer)
if !ok {
return ""
}
return do.ID()
}
// Metadata returns metadata for an object
//
// It should return nil if there is no Metadata
func (o *Object) Metadata(ctx context.Context) (fs.Metadata, error) {
do, ok := o.Object.(fs.Metadataer)
if !ok {
return nil, nil
}
return do.Metadata(ctx)
}
// SetTier performs changing storage tier of the Object if
// multiple storage classes supported
func (o *Object) SetTier(tier string) error {
do, ok := o.Object.(fs.SetTierer)
if !ok {
return errors.New("underlying remote does not support SetTier")
}
return do.SetTier(tier)
}
// Check the interfaces are satisfied
var (
_ fs.Fs = (*Fs)(nil)
_ fs.Purger = (*Fs)(nil)
_ fs.PutStreamer = (*Fs)(nil)
_ fs.Copier = (*Fs)(nil)
_ fs.Mover = (*Fs)(nil)
_ fs.DirMover = (*Fs)(nil)
_ fs.DirCacheFlusher = (*Fs)(nil)
_ fs.ChangeNotifier = (*Fs)(nil)
_ fs.Abouter = (*Fs)(nil)
_ fs.ListRer = (*Fs)(nil)
_ fs.Shutdowner = (*Fs)(nil)
_ fs.FullObject = (*Object)(nil)
)