mirror of
https://github.com/rclone/rclone.git
synced 2024-11-29 20:04:52 +01:00
b33140ddeb
This adds a :writeback tag to upstreams. If set on a single upstream then it writes back objects not found into that upstream. Fixes #6934
462 lines
11 KiB
Go
462 lines
11 KiB
Go
// Package upstream provides utility functionality to union.
|
|
package upstream
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"math"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/rclone/rclone/backend/union/common"
|
|
"github.com/rclone/rclone/fs"
|
|
"github.com/rclone/rclone/fs/cache"
|
|
"github.com/rclone/rclone/fs/fspath"
|
|
"github.com/rclone/rclone/fs/operations"
|
|
)
|
|
|
|
var (
|
|
// ErrUsageFieldNotSupported stats the usage field is not supported by the backend
|
|
ErrUsageFieldNotSupported = errors.New("this usage field is not supported")
|
|
)
|
|
|
|
// Fs is a wrap of any fs and its configs
|
|
type Fs struct {
|
|
fs.Fs
|
|
RootFs fs.Fs
|
|
RootPath string
|
|
Opt *common.Options
|
|
writable bool
|
|
creatable bool
|
|
usage *fs.Usage // Cache the usage
|
|
cacheTime time.Duration // cache duration
|
|
cacheExpiry atomic.Int64 // usage cache expiry time
|
|
cacheMutex sync.RWMutex
|
|
cacheOnce sync.Once
|
|
cacheUpdate bool // if the cache is updating
|
|
writeback bool // writeback to this upstream
|
|
writebackFs *Fs // if non zero, writeback to this upstream
|
|
}
|
|
|
|
// Directory describes a wrapped Directory
|
|
//
|
|
// This is a wrapped Directory which contains the upstream Fs
|
|
type Directory struct {
|
|
fs.Directory
|
|
f *Fs
|
|
}
|
|
|
|
// Object describes a wrapped Object
|
|
//
|
|
// This is a wrapped Object which contains the upstream Fs
|
|
type Object struct {
|
|
fs.Object
|
|
f *Fs
|
|
}
|
|
|
|
// Entry describe a wrapped fs.DirEntry interface with the
|
|
// information of upstream Fs
|
|
type Entry interface {
|
|
fs.DirEntry
|
|
UpstreamFs() *Fs
|
|
}
|
|
|
|
// New creates a new Fs based on the
|
|
// string formatted `type:root_path(:ro/:nc)`
|
|
func New(ctx context.Context, remote, root string, opt *common.Options) (*Fs, error) {
|
|
configName, fsPath, err := fspath.SplitFs(remote)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
f := &Fs{
|
|
RootPath: strings.TrimRight(root, "/"),
|
|
Opt: opt,
|
|
writable: true,
|
|
creatable: true,
|
|
cacheTime: time.Duration(opt.CacheTime) * time.Second,
|
|
usage: &fs.Usage{},
|
|
}
|
|
f.cacheExpiry.Store(time.Now().Unix())
|
|
if strings.HasSuffix(fsPath, ":ro") {
|
|
f.writable = false
|
|
f.creatable = false
|
|
fsPath = fsPath[0 : len(fsPath)-3]
|
|
} else if strings.HasSuffix(fsPath, ":nc") {
|
|
f.writable = true
|
|
f.creatable = false
|
|
fsPath = fsPath[0 : len(fsPath)-3]
|
|
} else if strings.HasSuffix(fsPath, ":writeback") {
|
|
f.writeback = true
|
|
fsPath = fsPath[0 : len(fsPath)-len(":writeback")]
|
|
}
|
|
remote = configName + fsPath
|
|
rFs, err := cache.Get(ctx, remote)
|
|
if err != nil && err != fs.ErrorIsFile {
|
|
return nil, err
|
|
}
|
|
f.RootFs = rFs
|
|
rootString := fspath.JoinRootPath(remote, root)
|
|
myFs, err := cache.Get(ctx, rootString)
|
|
if err != nil && err != fs.ErrorIsFile {
|
|
return nil, err
|
|
}
|
|
f.Fs = myFs
|
|
cache.PinUntilFinalized(f.Fs, f)
|
|
return f, err
|
|
}
|
|
|
|
// Prepare the configured upstreams as a group
|
|
func Prepare(fses []*Fs) error {
|
|
writebacks := 0
|
|
var writebackFs *Fs
|
|
for _, f := range fses {
|
|
if f.writeback {
|
|
writebackFs = f
|
|
writebacks++
|
|
}
|
|
}
|
|
if writebacks == 0 {
|
|
return nil
|
|
} else if writebacks > 1 {
|
|
return fmt.Errorf("can only have 1 :writeback not %d", writebacks)
|
|
}
|
|
for _, f := range fses {
|
|
if !f.writeback {
|
|
f.writebackFs = writebackFs
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// WrapDirectory wraps an fs.Directory to include the info
|
|
// of the upstream Fs
|
|
func (f *Fs) WrapDirectory(e fs.Directory) *Directory {
|
|
if e == nil {
|
|
return nil
|
|
}
|
|
return &Directory{
|
|
Directory: e,
|
|
f: f,
|
|
}
|
|
}
|
|
|
|
// WrapObject wraps an fs.Object to include the info
|
|
// of the upstream Fs
|
|
func (f *Fs) WrapObject(o fs.Object) *Object {
|
|
if o == nil {
|
|
return nil
|
|
}
|
|
return &Object{
|
|
Object: o,
|
|
f: f,
|
|
}
|
|
}
|
|
|
|
// WrapEntry wraps an fs.DirEntry to include the info
|
|
// of the upstream Fs
|
|
func (f *Fs) WrapEntry(e fs.DirEntry) (Entry, error) {
|
|
switch e := e.(type) {
|
|
case fs.Object:
|
|
return f.WrapObject(e), nil
|
|
case fs.Directory:
|
|
return f.WrapDirectory(e), nil
|
|
default:
|
|
return nil, fmt.Errorf("unknown object type %T", e)
|
|
}
|
|
}
|
|
|
|
// UpstreamFs get the upstream Fs the entry is stored in
|
|
func (e *Directory) UpstreamFs() *Fs {
|
|
return e.f
|
|
}
|
|
|
|
// UpstreamFs get the upstream Fs the entry is stored in
|
|
func (o *Object) UpstreamFs() *Fs {
|
|
return o.f
|
|
}
|
|
|
|
// UnWrap returns the Object that this Object is wrapping or
|
|
// nil if it isn't wrapping anything
|
|
func (o *Object) UnWrap() fs.Object {
|
|
return o.Object
|
|
}
|
|
|
|
// IsCreatable return if the fs is allowed to create new objects
|
|
func (f *Fs) IsCreatable() bool {
|
|
return f.creatable
|
|
}
|
|
|
|
// IsWritable return if the fs is allowed to write
|
|
func (f *Fs) IsWritable() bool {
|
|
return f.writable
|
|
}
|
|
|
|
// Put in to the remote path with the modTime given of the given size
|
|
//
|
|
// May create the object even if it returns an error - if so
|
|
// will return the object and the error, otherwise will return
|
|
// nil and the error
|
|
func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
|
|
o, err := f.Fs.Put(ctx, in, src, options...)
|
|
if err != nil {
|
|
return o, err
|
|
}
|
|
f.cacheMutex.Lock()
|
|
defer f.cacheMutex.Unlock()
|
|
size := src.Size()
|
|
if f.usage.Used != nil {
|
|
*f.usage.Used += size
|
|
}
|
|
if f.usage.Free != nil {
|
|
*f.usage.Free -= size
|
|
}
|
|
if f.usage.Objects != nil {
|
|
*f.usage.Objects++
|
|
}
|
|
return o, nil
|
|
}
|
|
|
|
// PutStream uploads to the remote path with the modTime given of indeterminate size
|
|
//
|
|
// May create the object even if it returns an error - if so
|
|
// will return the object and the error, otherwise will return
|
|
// nil and the error
|
|
func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
|
|
do := f.Features().PutStream
|
|
if do == nil {
|
|
return nil, fs.ErrorNotImplemented
|
|
}
|
|
o, err := do(ctx, in, src, options...)
|
|
if err != nil {
|
|
return o, err
|
|
}
|
|
f.cacheMutex.Lock()
|
|
defer f.cacheMutex.Unlock()
|
|
size := o.Size()
|
|
if f.usage.Used != nil {
|
|
*f.usage.Used += size
|
|
}
|
|
if f.usage.Free != nil {
|
|
*f.usage.Free -= size
|
|
}
|
|
if f.usage.Objects != nil {
|
|
*f.usage.Objects++
|
|
}
|
|
return o, nil
|
|
}
|
|
|
|
// Update in to the object with the modTime given of the given size
|
|
//
|
|
// When called from outside an Fs by rclone, src.Size() will always be >= 0.
|
|
// But for unknown-sized objects (indicated by src.Size() == -1), Upload should either
|
|
// return an error or update the object properly (rather than e.g. calling panic).
|
|
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
|
|
size := o.Size()
|
|
err := o.Object.Update(ctx, in, src, options...)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
o.f.cacheMutex.Lock()
|
|
defer o.f.cacheMutex.Unlock()
|
|
delta := o.Size() - size
|
|
if delta <= 0 {
|
|
return nil
|
|
}
|
|
if o.f.usage.Used != nil {
|
|
*o.f.usage.Used += size
|
|
}
|
|
if o.f.usage.Free != nil {
|
|
*o.f.usage.Free -= size
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetTier returns storage tier or class of the Object
|
|
func (o *Object) GetTier() string {
|
|
do, ok := o.Object.(fs.GetTierer)
|
|
if !ok {
|
|
return ""
|
|
}
|
|
return do.GetTier()
|
|
}
|
|
|
|
// ID returns the ID of the Object if known, or "" if not
|
|
func (o *Object) ID() string {
|
|
do, ok := o.Object.(fs.IDer)
|
|
if !ok {
|
|
return ""
|
|
}
|
|
return do.ID()
|
|
}
|
|
|
|
// MimeType returns the content type of the Object if known
|
|
func (o *Object) MimeType(ctx context.Context) (mimeType string) {
|
|
if do, ok := o.Object.(fs.MimeTyper); ok {
|
|
mimeType = do.MimeType(ctx)
|
|
}
|
|
return mimeType
|
|
}
|
|
|
|
// SetTier performs changing storage tier of the Object if
|
|
// multiple storage classes supported
|
|
func (o *Object) SetTier(tier string) error {
|
|
do, ok := o.Object.(fs.SetTierer)
|
|
if !ok {
|
|
return errors.New("underlying remote does not support SetTier")
|
|
}
|
|
return do.SetTier(tier)
|
|
}
|
|
|
|
// Metadata returns metadata for an object
|
|
//
|
|
// It should return nil if there is no Metadata
|
|
func (o *Object) Metadata(ctx context.Context) (fs.Metadata, error) {
|
|
do, ok := o.Object.(fs.Metadataer)
|
|
if !ok {
|
|
return nil, nil
|
|
}
|
|
return do.Metadata(ctx)
|
|
}
|
|
|
|
// Writeback writes the object back and returns a new object
|
|
//
|
|
// If it returns nil, nil then the original object is OK
|
|
func (o *Object) Writeback(ctx context.Context) (*Object, error) {
|
|
if o.f.writebackFs == nil {
|
|
return nil, nil
|
|
}
|
|
newObj, err := operations.Copy(ctx, o.f.writebackFs.Fs, nil, o.Object.Remote(), o.Object)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// newObj could be nil here
|
|
if newObj == nil {
|
|
fs.Errorf(o, "nil Object returned from operations.Copy")
|
|
return nil, nil
|
|
}
|
|
return &Object{
|
|
Object: newObj,
|
|
f: o.f,
|
|
}, err
|
|
}
|
|
|
|
// About gets quota information from the Fs
|
|
func (f *Fs) About(ctx context.Context) (*fs.Usage, error) {
|
|
if f.cacheExpiry.Load() <= time.Now().Unix() {
|
|
err := f.updateUsage()
|
|
if err != nil {
|
|
return nil, ErrUsageFieldNotSupported
|
|
}
|
|
}
|
|
f.cacheMutex.RLock()
|
|
defer f.cacheMutex.RUnlock()
|
|
return f.usage, nil
|
|
}
|
|
|
|
// GetFreeSpace get the free space of the fs
|
|
//
|
|
// This is returned as 0..math.MaxInt64-1 leaving math.MaxInt64 as a sentinel
|
|
func (f *Fs) GetFreeSpace() (int64, error) {
|
|
if f.cacheExpiry.Load() <= time.Now().Unix() {
|
|
err := f.updateUsage()
|
|
if err != nil {
|
|
return math.MaxInt64 - 1, ErrUsageFieldNotSupported
|
|
}
|
|
}
|
|
f.cacheMutex.RLock()
|
|
defer f.cacheMutex.RUnlock()
|
|
if f.usage.Free == nil {
|
|
return math.MaxInt64 - 1, ErrUsageFieldNotSupported
|
|
}
|
|
return *f.usage.Free, nil
|
|
}
|
|
|
|
// GetUsedSpace get the used space of the fs
|
|
//
|
|
// This is returned as 0..math.MaxInt64-1 leaving math.MaxInt64 as a sentinel
|
|
func (f *Fs) GetUsedSpace() (int64, error) {
|
|
if f.cacheExpiry.Load() <= time.Now().Unix() {
|
|
err := f.updateUsage()
|
|
if err != nil {
|
|
return 0, ErrUsageFieldNotSupported
|
|
}
|
|
}
|
|
f.cacheMutex.RLock()
|
|
defer f.cacheMutex.RUnlock()
|
|
if f.usage.Used == nil {
|
|
return 0, ErrUsageFieldNotSupported
|
|
}
|
|
return *f.usage.Used, nil
|
|
}
|
|
|
|
// GetNumObjects get the number of objects of the fs
|
|
func (f *Fs) GetNumObjects() (int64, error) {
|
|
if f.cacheExpiry.Load() <= time.Now().Unix() {
|
|
err := f.updateUsage()
|
|
if err != nil {
|
|
return 0, ErrUsageFieldNotSupported
|
|
}
|
|
}
|
|
f.cacheMutex.RLock()
|
|
defer f.cacheMutex.RUnlock()
|
|
if f.usage.Objects == nil {
|
|
return 0, ErrUsageFieldNotSupported
|
|
}
|
|
return *f.usage.Objects, nil
|
|
}
|
|
|
|
func (f *Fs) updateUsage() (err error) {
|
|
if do := f.RootFs.Features().About; do == nil {
|
|
return ErrUsageFieldNotSupported
|
|
}
|
|
done := false
|
|
f.cacheOnce.Do(func() {
|
|
f.cacheMutex.Lock()
|
|
err = f.updateUsageCore(false)
|
|
f.cacheMutex.Unlock()
|
|
done = true
|
|
})
|
|
if done {
|
|
return err
|
|
}
|
|
if !f.cacheUpdate {
|
|
f.cacheUpdate = true
|
|
go func() {
|
|
_ = f.updateUsageCore(true)
|
|
f.cacheUpdate = false
|
|
}()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (f *Fs) updateUsageCore(lock bool) error {
|
|
// Run in background, should not be cancelled by user
|
|
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
|
|
defer cancel()
|
|
usage, err := f.RootFs.Features().About(ctx)
|
|
if err != nil {
|
|
f.cacheUpdate = false
|
|
if errors.Is(err, fs.ErrorDirNotFound) {
|
|
err = nil
|
|
}
|
|
return err
|
|
}
|
|
if lock {
|
|
f.cacheMutex.Lock()
|
|
defer f.cacheMutex.Unlock()
|
|
}
|
|
// Store usage
|
|
f.cacheExpiry.Store(time.Now().Add(f.cacheTime).Unix())
|
|
f.usage = usage
|
|
return nil
|
|
}
|
|
|
|
// Check the interfaces are satisfied
|
|
var (
|
|
_ fs.FullObject = (*Object)(nil)
|
|
)
|