From b33140ddeb733dedece91b1dd091c4fe3f1b3c43 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Sat, 22 Jul 2023 08:23:57 +0100 Subject: [PATCH] union: add :writeback to act as a simple cache This adds a :writeback tag to upstreams. If set on a single upstream then it writes back objects not found into that upstream. Fixes #6934 --- backend/union/entry.go | 33 +++++++++++++++++-- backend/union/union.go | 4 +++ backend/union/upstream/upstream.go | 51 ++++++++++++++++++++++++++++ docs/content/union.md | 53 ++++++++++++++++++++++++------ 4 files changed, 128 insertions(+), 13 deletions(-) diff --git a/backend/union/entry.go b/backend/union/entry.go index d794d6226..24ec2f518 100644 --- a/backend/union/entry.go +++ b/backend/union/entry.go @@ -17,8 +17,9 @@ import ( // This is a wrapped object which returns the Union Fs as its parent type Object struct { *upstream.Object - fs *Fs // what this object is part of - co []upstream.Entry + fs *Fs // what this object is part of + co []upstream.Entry + writebackMu sync.Mutex } // Directory describes a union Directory @@ -34,6 +35,13 @@ type entry interface { candidates() []upstream.Entry } +// Update o with the contents of newO excluding the lock +func (o *Object) update(newO *Object) { + o.Object = newO.Object + o.fs = newO.fs + o.co = newO.co +} + // UnWrapUpstream returns the upstream Object that this Object is wrapping func (o *Object) UnWrapUpstream() *upstream.Object { return o.Object @@ -67,7 +75,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op return err } // Update current object - *o = *newO.(*Object) + o.update(newO.(*Object)) return nil } else if err != nil { return err @@ -175,6 +183,25 @@ func (o *Object) SetTier(tier string) error { return do.SetTier(tier) } +// Open opens the file for read. 
Call Close() on the returned io.ReadCloser +func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadCloser, error) { + // Need some sort of locking to prevent multiple downloads + o.writebackMu.Lock() + defer o.writebackMu.Unlock() + + // FIXME what if correct object is already in o.co + + newObj, err := o.Object.Writeback(ctx) + if err != nil { + return nil, err + } + if newObj != nil { + o.Object = newObj + o.co = append(o.co, newObj) // FIXME should this append or overwrite or update? + } + return o.Object.Object.Open(ctx, options...) +} + // ModTime returns the modification date of the directory // It returns the latest ModTime of all candidates func (d *Directory) ModTime(ctx context.Context) (t time.Time) { diff --git a/backend/union/union.go b/backend/union/union.go index c87b6e2d7..eeaed8f31 100644 --- a/backend/union/union.go +++ b/backend/union/union.go @@ -877,6 +877,10 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e opt: *opt, upstreams: usedUpstreams, } + err = upstream.Prepare(f.upstreams) + if err != nil { + return nil, err + } f.actionPolicy, err = policy.Get(opt.ActionPolicy) if err != nil { return nil, err diff --git a/backend/union/upstream/upstream.go b/backend/union/upstream/upstream.go index aa4dcd5cc..07bd732d9 100644 --- a/backend/union/upstream/upstream.go +++ b/backend/union/upstream/upstream.go @@ -16,6 +16,7 @@ import ( "github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs/cache" "github.com/rclone/rclone/fs/fspath" + "github.com/rclone/rclone/fs/operations" ) var ( @@ -37,6 +38,8 @@ type Fs struct { cacheMutex sync.RWMutex cacheOnce sync.Once cacheUpdate bool // if the cache is updating + writeback bool // writeback to this upstream + writebackFs *Fs // if non zero, writeback to this upstream } // Directory describes a wrapped Directory @@ -86,6 +89,9 @@ func New(ctx context.Context, remote, root string, opt *common.Options) (*Fs, er f.writable = true f.creatable = false 
fsPath = fsPath[0 : len(fsPath)-3] + } else if strings.HasSuffix(fsPath, ":writeback") { + f.writeback = true + fsPath = fsPath[0 : len(fsPath)-len(":writeback")] } remote = configName + fsPath rFs, err := cache.Get(ctx, remote) @@ -103,6 +109,29 @@ func New(ctx context.Context, remote, root string, opt *common.Options) (*Fs, er return f, err } +// Prepare the configured upstreams as a group +func Prepare(fses []*Fs) error { + writebacks := 0 + var writebackFs *Fs + for _, f := range fses { + if f.writeback { + writebackFs = f + writebacks++ + } + } + if writebacks == 0 { + return nil + } else if writebacks > 1 { + return fmt.Errorf("can only have 1 :writeback not %d", writebacks) + } + for _, f := range fses { + if !f.writeback { + f.writebackFs = writebackFs + } + } + return nil +} + // WrapDirectory wraps an fs.Directory to include the info // of the upstream Fs func (f *Fs) WrapDirectory(e fs.Directory) *Directory { @@ -293,6 +322,28 @@ func (o *Object) Metadata(ctx context.Context) (fs.Metadata, error) { return do.Metadata(ctx) } +// Writeback writes the object back and returns a new object +// +// If it returns nil, nil then the original object is OK +func (o *Object) Writeback(ctx context.Context) (*Object, error) { + if o.f.writebackFs == nil { + return nil, nil + } + newObj, err := operations.Copy(ctx, o.f.writebackFs.Fs, nil, o.Object.Remote(), o.Object) + if err != nil { + return nil, err + } + // newObj could be nil here + if newObj == nil { + fs.Errorf(o, "nil Object returned from operations.Copy") + return nil, nil + } + return &Object{ + Object: newObj, + f: o.f, + }, err +} + // About gets quota information from the Fs func (f *Fs) About(ctx context.Context) (*fs.Usage, error) { if f.cacheExpiry.Load() <= time.Now().Unix() { diff --git a/docs/content/union.md b/docs/content/union.md index bce1c44aa..4d74aff44 100644 --- a/docs/content/union.md +++ b/docs/content/union.md @@ -6,24 +6,27 @@ versionIntroduced: "v1.44" # {{< icon "fa fa-link" >}} Union 
-The `union` remote provides a unification similar to UnionFS using other remotes. - -Paths may be as deep as required or a local path, -e.g. `remote:directory/subdirectory` or `/directory/subdirectory`. +The `union` backend joins several remotes together to make a single unified view of them. During the initial setup with `rclone config` you will specify the upstream -remotes as a space separated list. The upstream remotes can either be a local paths or other remotes. +remotes as a space separated list. The upstream remotes can either be local +paths or other remotes. -Attribute `:ro` and `:nc` can be attach to the end of path to tag the remote as **read only** or **no create**, -e.g. `remote:directory/subdirectory:ro` or `remote:directory/subdirectory:nc`. +The attributes `:ro`, `:nc` and `:writeback` can be attached to the end of the remote +to tag the remote as **read only**, **no create** or **writeback**, e.g. +`remote:directory/subdirectory:ro` or `remote:directory/subdirectory:nc`. + +- `:ro` means files will only be read from here and never written +- `:nc` means new files or directories won't be created here +- `:writeback` means files found in different remotes will be written back here. See the [writeback section](#writeback) for more info. Subfolders can be used in upstream remotes. Assume a union remote named `backup` with the remotes `mydrive:private/backup`. Invoking `rclone mkdir backup:desktop` is exactly the same as invoking `rclone mkdir mydrive:private/backup/desktop`. -There will be no special handling of paths containing `..` segments. -Invoking `rclone mkdir backup:../desktop` is exactly the same as invoking -`rclone mkdir mydrive:private/backup/../desktop`. +There is no special handling of paths containing `..` segments. Invoking `rclone +mkdir backup:../desktop` is exactly the same as invoking `rclone mkdir +mydrive:private/backup/../desktop`. 
## Configuration @@ -172,6 +175,36 @@ The policies definition are inspired by [trapexit/mergerfs](https://github.com/t | newest | Pick the file / directory with the largest mtime. | | rand (random) | Calls **all** and then randomizes. Returns only one upstream. | + +### Writeback {#writeback} + +The tag `:writeback` on an upstream remote can be used to make a simple cache +system like this: + +``` +[union] +type = union +action_policy = all +create_policy = all +search_policy = ff +upstreams = /local:writeback remote:dir +``` + +When files are opened for read, if the file is in `remote:dir` but not `/local` +then rclone will copy the file entirely into `/local` before returning a +reference to the file in `/local`. The copy will be done with the equivalent of +`rclone copy` so will use `--multi-thread-streams` if configured. Any copies +will be logged with an INFO log. + +When files are written, they will be written to both `remote:dir` and `/local`. + +As many remotes as desired can be added to `upstreams` but there should only be +one `:writeback` tag. + +Rclone does not manage the `:writeback` remote in any way other than writing +files back to it. So if you need to expire old files or manage the size then you +will have to do this yourself. + {{< rem autogenerated options start" - DO NOT EDIT - instead edit fs.RegInfo in backend/union/union.go then run make backenddocs" >}} ### Standard options