union: implement write on multiple remotes

Introduce policy from mergerfs.
This commit is contained in:
Max Sum 2019-11-30 22:41:39 +08:00 committed by Nick Craig-Wood
parent a492c0fb0e
commit da9a44ea5e
9 changed files with 1477 additions and 137 deletions

View File

@ -0,0 +1,44 @@
package policy
import (
"context"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/backend/union/upstream"
)
func init(){
registerPolicy("all", &All{})
}
// All policy behaves the same as EpAll except for the CREATE category
// Action category: same as epall.
// Create category: apply to all branches.
// Search category: same as epall.
type All struct {
EpAll
}
// Create category policy, governing the creation of files and directories
func (p *All) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) {
if len(upstreams) == 0 {
return nil, fs.ErrorObjectNotFound
}
upstreams = filterNC(upstreams)
if len(upstreams) == 0 {
return nil, fs.ErrorPermissionDenied
}
return upstreams, nil
}
// CreateEntries is CREATE category policy but receving a set of candidate entries
func (p *All) CreateEntries(entries ...upstream.Entry) ([]upstream.Entry, error) {
if len(entries) == 0 {
return nil, fs.ErrorObjectNotFound
}
entries = filterNCEntries(entries)
if len(entries) == 0 {
return nil, fs.ErrorPermissionDenied
}
return entries, nil
}

View File

@ -0,0 +1,72 @@
package policy
import (
"context"
"sync"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/backend/union/upstream"
)
func init(){
registerPolicy("epall", &EpAll{})
}
// EpAll stands for existing path, All
// Action category: apply to all found.
// Create category: apply to all found.
// Search category: same as epff.
type EpAll struct {
EpFF
}
func (p *EpAll) epall(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) {
var wg sync.WaitGroup
ufs := make([]*upstream.Fs, len(upstreams))
for i, u := range upstreams {
wg.Add(1)
i, u := i, u // Closure
go func() {
if exists(ctx, u, path) {
ufs[i] = u
}
wg.Done()
}()
}
wg.Wait()
var results []*upstream.Fs
for _, f := range ufs {
if f != nil {
results = append(results, f)
}
}
if len(results) == 0 {
return nil, fs.ErrorObjectNotFound
}
return results, nil
}
// Action category policy, governing the modification of files and directories
func (p *EpAll) Action(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) {
if len(upstreams) == 0 {
return nil, fs.ErrorObjectNotFound
}
upstreams = filterRO(upstreams)
if len(upstreams) == 0 {
return nil, fs.ErrorPermissionDenied
}
return p.epall(ctx, upstreams, path)
}
// Create category policy, governing the creation of files and directories
func (p *EpAll) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) {
if len(upstreams) == 0 {
return nil, fs.ErrorObjectNotFound
}
upstreams = filterNC(upstreams)
if len(upstreams) == 0 {
return nil, fs.ErrorPermissionDenied
}
upstreams, err := p.epall(ctx, upstreams, path)
return upstreams, err
}

View File

@ -0,0 +1,112 @@
package policy
import (
"context"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/backend/union/upstream"
)
func init(){
registerPolicy("epff", &EpFF{})
}
// EpFF stands for existing path, first found
// Given the order of the candidates, act on the first one found where the relative path exists.
type EpFF struct {}
func (p *EpFF) epff(ctx context.Context, upstreams []*upstream.Fs, path string) (*upstream.Fs, error) {
ch := make(chan *upstream.Fs)
for _, r := range upstreams {
r := r // Closure
go func() {
if !exists(ctx, r, path) {
r = nil
}
ch <- r
}()
}
var r *upstream.Fs
for i := 0; i < len(upstreams); i++ {
r = <- ch
if r != nil {
// close remaining goroutines
go func(num int) {
defer close(ch)
for i := 0; i < num; i++ {
<- ch
}
}(len(upstreams) - 1 - i)
}
}
if r == nil {
return nil, fs.ErrorObjectNotFound
}
return r, nil
}
// Action category policy, governing the modification of files and directories
func (p *EpFF) Action(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) {
if len(upstreams) == 0 {
return nil, fs.ErrorObjectNotFound
}
upstreams = filterRO(upstreams)
if len(upstreams) == 0 {
return nil, fs.ErrorPermissionDenied
}
r, err := p.epff(ctx, upstreams, path)
return []*upstream.Fs{r}, err
}
// ActionEntries is ACTION category policy but receving a set of candidate entries
func (p *EpFF) ActionEntries(entries ...upstream.Entry) ([]upstream.Entry, error) {
if len(entries) == 0 {
return nil, fs.ErrorObjectNotFound
}
entries = filterROEntries(entries)
if len(entries) == 0 {
return nil, fs.ErrorPermissionDenied
}
return entries[:1], nil
}
// Create category policy, governing the creation of files and directories
func (p *EpFF) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) {
if len(upstreams) == 0 {
return nil, fs.ErrorObjectNotFound
}
upstreams = filterNC(upstreams)
if len(upstreams) == 0 {
return nil, fs.ErrorPermissionDenied
}
r, err := p.epff(ctx, upstreams, path)
return []*upstream.Fs{r}, err
}
// CreateEntries is CREATE category policy but receving a set of candidate entries
func (p *EpFF) CreateEntries(entries ...upstream.Entry) ([]upstream.Entry, error) {
if len(entries) == 0 {
return nil, fs.ErrorObjectNotFound
}
entries = filterNCEntries(entries)
if len(entries) == 0 {
return nil, fs.ErrorPermissionDenied
}
return entries[:1], nil
}
// Search category policy, governing the access to files and directories
func (p *EpFF) Search(ctx context.Context, upstreams []*upstream.Fs, path string) (*upstream.Fs, error) {
if len(upstreams) == 0 {
return nil, fs.ErrorObjectNotFound
}
return p.epff(ctx, upstreams, path)
}
// SearchEntries is SEARCH category policy but receving a set of candidate entries
func (p *EpFF) SearchEntries(entries ...upstream.Entry) (upstream.Entry, error) {
if len(entries) == 0 {
return nil, fs.ErrorObjectNotFound
}
return entries[0], nil
}

View File

@ -0,0 +1,110 @@
package policy
import (
"context"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/backend/union/upstream"
)
func init(){
registerPolicy("epmfs", &EpMfs{})
}
// EpMfs stands for existing path, most free space
// Of all the branches on which the path exists choose the drive with the most free space.
type EpMfs struct {
EpAll
}
func (p *EpMfs) mfs(upstreams []*upstream.Fs) (*upstream.Fs, error) {
var maxFreeSpace int64
var mfsupstream *upstream.Fs
for _, r := range upstreams {
space, err := r.GetFreeSpace()
if err != nil {
return nil, err
}
if maxFreeSpace < space {
maxFreeSpace = space
mfsupstream = r
}
}
return mfsupstream, nil
}
func (p *EpMfs) mfsEntries(entries []upstream.Entry) (upstream.Entry, error) {
var maxFreeSpace int64
var mfsEntry upstream.Entry
for _, e := range entries {
space, err := e.UpstreamFs().GetFreeSpace()
if err != nil {
return nil, err
}
if maxFreeSpace < space {
maxFreeSpace = space
mfsEntry = e
}
}
return mfsEntry, nil
}
// Action category policy, governing the modification of files and directories
func (p *EpMfs) Action(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) {
upstreams, err := p.EpAll.Action(ctx, upstreams, path)
if err != nil {
return nil, err
}
r, err := p.mfs(upstreams)
return []*upstream.Fs{r}, err
}
// ActionEntries is ACTION category policy but receving a set of candidate entries
func (p *EpMfs) ActionEntries(entries ...upstream.Entry) ([]upstream.Entry, error) {
entries, err := p.EpAll.ActionEntries(entries...)
if err != nil {
return nil, err
}
e, err := p.mfsEntries(entries)
return []upstream.Entry{e}, err
}
// Create category policy, governing the creation of files and directories
func (p *EpMfs) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) {
upstreams, err := p.EpAll.Create(ctx, upstreams, path)
if err != nil {
return nil, err
}
r, err := p.mfs(upstreams)
return []*upstream.Fs{r}, err
}
// CreateEntries is CREATE category policy but receving a set of candidate entries
func (p *EpMfs) CreateEntries(entries ...upstream.Entry) ([]upstream.Entry, error) {
entries, err := p.EpAll.CreateEntries(entries...)
if err != nil {
return nil, err
}
e, err := p.mfsEntries(entries)
return []upstream.Entry{e}, err
}
// Search category policy, governing the access to files and directories
func (p *EpMfs) Search(ctx context.Context, upstreams []*upstream.Fs, path string) (*upstream.Fs, error) {
if len(upstreams) == 0 {
return nil, fs.ErrorObjectNotFound
}
upstreams, err := p.epall(ctx, upstreams, path)
if err != nil {
return nil, err
}
return p.mfs(upstreams)
}
// SearchEntries is SEARCH category policy but receving a set of candidate entries
func (p *EpMfs) SearchEntries(entries ...upstream.Entry) (upstream.Entry, error) {
if len(entries) == 0 {
return nil, fs.ErrorObjectNotFound
}
return p.mfsEntries(entries)
}

View File

@ -0,0 +1,32 @@
package policy
import (
"context"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/backend/union/upstream"
)
func init(){
registerPolicy("ff", &FF{})
}
// FF stands for first found
// Search category: same as epff.
// Action category: same as epff.
// Create category: Given the order of the candiates, act on the first one found.
type FF struct {
EpFF
}
// Create category policy, governing the creation of files and directories
func (p *FF) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) {
if len(upstreams) == 0 {
return nil, fs.ErrorObjectNotFound
}
upstreams = filterNC(upstreams)
if len(upstreams) == 0 {
return upstreams, fs.ErrorPermissionDenied
}
return upstreams[:1], nil
}

View File

@ -0,0 +1,31 @@
package policy
import (
"context"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/backend/union/upstream"
)
func init(){
registerPolicy("mfs", &Mfs{})
}
// Mfs stands for most free space
// Of all the candidates on which the path exists choose the one with the most free space.
type Mfs struct {
EpMfs
}
// Create category policy, governing the creation of files and directories
func (p *Mfs) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) {
if len(upstreams) == 0 {
return nil, fs.ErrorObjectNotFound
}
upstreams = filterNC(upstreams)
if len(upstreams) == 0 {
return nil, fs.ErrorPermissionDenied
}
r, err := p.mfs(upstreams)
return []*upstream.Fs{r}, err
}

View File

@ -0,0 +1,123 @@
package policy
import (
"context"
"strings"
"path"
"path/filepath"
"github.com/pkg/errors"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/backend/union/upstream"
)
var policies = make(map[string]Policy)
// Policy is the interface of a set of defined behavior choosing
// the upstream Fs to operate on
type Policy interface {
// Action category policy, governing the modification of files and directories
Action(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error)
// Create category policy, governing the creation of files and directories
Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error)
// Search category policy, governing the access to files and directories
Search(ctx context.Context, upstreams []*upstream.Fs, path string) (*upstream.Fs, error)
// ActionEntries is ACTION category policy but receving a set of candidate entries
ActionEntries(entries ...upstream.Entry) ([]upstream.Entry, error)
// CreateEntries is CREATE category policy but receving a set of candidate entries
CreateEntries(entries ...upstream.Entry) ([]upstream.Entry, error)
// SearchEntries is SEARCH category policy but receving a set of candidate entries
SearchEntries(entries ...upstream.Entry) (upstream.Entry, error)
}
func registerPolicy(name string, p Policy) {
policies[strings.ToLower(name)] = p
}
// Get a Policy from the list
func Get(name string) (Policy, error) {
p, ok := policies[strings.ToLower(name)]
if !ok {
return nil, errors.Errorf("didn't find policy called %q", name)
}
return p, nil
}
func filterRO(ufs []*upstream.Fs) (wufs []*upstream.Fs) {
for _, u := range ufs {
if u.IsWritable() {
wufs = append(wufs, u)
}
}
return wufs
}
func filterROEntries(ue []upstream.Entry) (wue []upstream.Entry) {
for _, e := range ue {
if e.UpstreamFs().IsWritable() {
wue = append(wue, e)
}
}
return wue
}
func filterNC(ufs []*upstream.Fs) (wufs []*upstream.Fs) {
for _, u := range ufs {
if u.IsCreatable() {
wufs = append(wufs, u)
}
}
return wufs
}
func filterNCEntries(ue []upstream.Entry) (wue []upstream.Entry) {
for _, e := range ue {
if e.UpstreamFs().IsCreatable() {
wue = append(wue, e)
}
}
return wue
}
func parentDir(absPath string) string {
parent := path.Dir(strings.TrimRight(absPath, "/"))
if parent == "." {
parent = ""
}
return parent
}
func clean(absPath string) string {
cleanPath := path.Clean(filepath.ToSlash(absPath))
if cleanPath == "." {
cleanPath = ""
}
return cleanPath
}
func exists(ctx context.Context, f fs.Fs, remote string) bool {
remote = clean(remote)
dir := parentDir(remote)
if remote == dir {
return true
}
found := false
entries, _ := f.List(ctx, dir);
for _, e := range entries {
eRemote := e.Remote()
if f.Features().CaseInsensitive {
found = strings.EqualFold(remote, eRemote)
} else {
found = (remote == eRemote)
}
if found {
break
}
}
return found
}

View File

@ -2,16 +2,20 @@ package union
import (
"context"
"bufio"
"fmt"
"io"
"path"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/pkg/errors"
"github.com/rclone/rclone/backend/union/upstream"
"github.com/rclone/rclone/backend/union/policy"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/cache"
"github.com/rclone/rclone/fs/config/configmap"
"github.com/rclone/rclone/fs/config/configstruct"
"github.com/rclone/rclone/fs/hash"
@ -21,12 +25,32 @@ import (
func init() {
fsi := &fs.RegInfo{
Name: "union",
Description: "Union merges the contents of several remotes",
Description: "Union merges the contents of several upstream fs",
NewFs: NewFs,
Options: []fs.Option{{
Name: "remotes",
Help: "List of space separated remotes.\nCan be 'remotea:test/dir remoteb:', '\"remotea:test/space dir\" remoteb:', etc.\nThe last remote is used to write to.",
Name: "upstreams",
Help: "List of space separated upstreams.\nCan be 'upstreama:test/dir upstreamb:', '\"remotea:test/space:ro dir\" remoteb:', etc.\n",
Required: true,
}, {
Name: "action_policy",
Help: "Policy to choose upstream on ACTION class.",
Required: true,
Default: "epall",
}, {
Name: "create_policy",
Help: "Policy to choose upstream on CREATE class.",
Required: true,
Default: "epmfs",
}, {
Name: "search_policy",
Help: "Policy to choose upstream on SEARCH class.",
Required: true,
Default: "ff",
}, {
Name: "cache_time",
Help: "Cache time of usage and free space (in seconds)",
Required: true,
Default: 120,
}},
}
fs.Register(fsi)
@ -34,34 +58,75 @@ func init() {
// Options defines the configuration for this backend
type Options struct {
Remotes fs.SpaceSepList `config:"remotes"`
Upstreams fs.SpaceSepList `config:"upstreams"`
ActionPolicy string `config:"action_policy"`
CreatePolicy string `config:"create_policy"`
SearchPolicy string `config:"search_policy"`
CacheTime int `config:"cache_time"`
}
// Fs represents a union of remotes
// Fs represents a union of upstreams
type Fs struct {
name string // name of this remote
features *fs.Features // optional features
opt Options // options for this Fs
root string // the path we are working on
remotes []fs.Fs // slice of remotes
wr fs.Fs // writable remote
hashSet hash.Set // intersection of hash types
name string // name of this remote
features *fs.Features // optional features
opt Options // options for this Fs
root string // the path we are working on
upstreams []*upstream.Fs // slice of upstreams
hashSet hash.Set // intersection of hash types
actionPolicy policy.Policy // policy for ACTION
createPolicy policy.Policy // policy for CREATE
searchPolicy policy.Policy // policy for SEARCH
}
// Object describes a union Object
//
// This is a wrapped object which returns the Union Fs as its parent
type Object struct {
fs.Object
*upstream.Object
fs *Fs // what this object is part of
co []upstream.Entry
}
// Wrap an existing object in the union Object
func (f *Fs) wrapObject(o fs.Object) *Object {
return &Object{
Object: o,
fs: f,
// Directory describes a union Directory
//
// This is a wrapped object contains all candidates
type Directory struct {
*upstream.Directory
cd []upstream.Entry
}
type entry interface {
upstream.Entry
candidates() []upstream.Entry
}
// Wrap candidate objects in to an union Object
func (f *Fs) wrapEntries(entries ...upstream.Entry) (entry, error) {
e, err := f.searchEntries(entries...)
if err != nil {
return nil, err
}
switch e.(type) {
case *upstream.Object:
return &Object {
Object: e.(*upstream.Object),
fs: f,
co: entries,
}, nil
case *upstream.Directory:
return &Directory {
Directory: e.(*upstream.Directory),
cd: entries,
}, nil
default:
return nil, errors.Errorf("unknown object type %T", e)
}
}
// UnWrap returns the Object that this Object is wrapping or
// nil if it isn't wrapping anything
func (o *Object) UnWrap() *upstream.Object {
return o.Object
}
// Fs returns the union Fs as the parent
@ -69,6 +134,133 @@ func (o *Object) Fs() fs.Info {
return o.fs
}
func (o *Object) candidates() []upstream.Entry {
return o.co
}
func (d *Directory) candidates() []upstream.Entry {
return d.cd
}
// Update in to the object with the modTime given of the given size
//
// When called from outside a Fs by rclone, src.Size() will always be >= 0.
// But for unknown-sized objects (indicated by src.Size() == -1), Upload should either
// return an error or update the object properly (rather than e.g. calling panic).
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
f := o.Fs().(*Fs)
entries, err := f.actionEntries(o.candidates()...)
if err != nil {
return err
}
// Get mutliple reader
readers := make([]io.Reader, len(entries))
writers := make([]io.Writer, len(entries))
errs := make([]error, len(entries) + 1)
for i := range entries {
r, w := io.Pipe()
bw := bufio.NewWriter(w)
readers[i], writers[i] = r, bw
defer func () {
w.Close()
}()
}
go func() {
mw := io.MultiWriter(writers...)
_, errs[len(entries)] = io.Copy(mw, in)
for _, bw := range writers {
bw.(*bufio.Writer).Flush()
}
}()
// Multi-threading
multithread(len(entries), func(i int){
if o, ok := entries[i].(*upstream.Object); ok {
errs[i] = o.Update(ctx, readers[i], src, options...)
} else {
errs[i] = fs.ErrorNotAFile
}
})
// Get an object for future operation
for _, err := range errs {
if err != nil {
return err
}
}
return nil
}
// Remove candidate objects selected by ACTION policy
func (o *Object) Remove(ctx context.Context) error {
f := o.Fs().(*Fs)
entries, err := f.actionEntries(o.candidates()...)
if err != nil {
return err
}
errs := make([]error, len(entries))
multithread(len(entries), func(i int){
if o, ok := entries[i].(*upstream.Object); ok {
errs[i] = o.Remove(ctx)
} else {
errs[i] = fs.ErrorNotAFile
}
})
for _, err := range errs {
if err != nil {
return err
}
}
return nil
}
// SetModTime sets the metadata on the object to set the modification date
func (o *Object) SetModTime(ctx context.Context, t time.Time) error {
f := o.Fs().(*Fs)
entries, err := f.actionEntries(o.candidates()...)
if err != nil {
return err
}
var wg sync.WaitGroup
errs := make([]error, len(entries))
multithread(len(entries), func(i int){
if o, ok := entries[i].(*upstream.Object); ok {
errs[i] = o.SetModTime(ctx, t)
} else {
errs[i] = fs.ErrorNotAFile
}
})
wg.Wait()
for _, err := range errs {
if err != nil {
return err
}
}
return nil
}
// ModTime returns the modification date of the directory
// It returns the latest ModTime of all candidates
func (d *Directory) ModTime(ctx context.Context) (t time.Time) {
entries := d.candidates()
times := make([]time.Time, len(entries))
multithread(len(entries), func(i int){
times[i] = entries[i].ModTime(ctx)
})
for _, ti := range times {
if t.Before(ti) {
t = ti
}
}
return t
}
// Size returns the size of the directory
// It returns the sum of all candidates
func (d *Directory) Size() (s int64) {
for _, e := range d.candidates() {
s += e.Size()
}
return s
}
// Name of the remote (as passed into NewFs)
func (f *Fs) Name() string {
return f.name
@ -91,7 +283,20 @@ func (f *Fs) Features() *fs.Features {
// Rmdir removes the root directory of the Fs object
func (f *Fs) Rmdir(ctx context.Context, dir string) error {
return f.wr.Rmdir(ctx, dir)
upstreams, err := f.action(ctx, dir)
if err != nil {
return err
}
errs := make([]error, len(upstreams))
multithread(len(upstreams), func(i int){
errs[i] = upstreams[i].Rmdir(ctx, dir)
})
for _, err := range errs {
if err != nil {
return err
}
}
return nil
}
// Hashes returns hash.HashNone to indicate remote hashing is unavailable
@ -101,7 +306,21 @@ func (f *Fs) Hashes() hash.Set {
// Mkdir makes the root directory of the Fs object
func (f *Fs) Mkdir(ctx context.Context, dir string) error {
return f.wr.Mkdir(ctx, dir)
parent := parentDir(dir)
upstreams, err := f.create(ctx, parent)
if err != nil {
return err
}
errs := make([]error, len(upstreams))
multithread(len(upstreams), func(i int){
errs[i] = upstreams[i].Mkdir(ctx, dir)
})
for _, err := range errs {
if err != nil {
return err
}
}
return nil
}
// Purge all files in the root and the root directory
@ -111,7 +330,27 @@ func (f *Fs) Mkdir(ctx context.Context, dir string) error {
//
// Return an error if it doesn't exist
func (f *Fs) Purge(ctx context.Context) error {
return f.wr.Features().Purge(ctx)
for _, r := range f.upstreams {
if r.Features().Purge == nil {
return fs.ErrorCantPurge
}
}
for _, r := range f.upstreams {
// Can't Purge if any read-only upstreams
if !r.IsWritable() {
return fs.ErrorPermissionDenied
}
}
errs := make([]error, len(f.upstreams))
multithread(len(f.upstreams), func(i int){
errs[i] = f.upstreams[i].Features().Purge(ctx)
})
for _, err := range errs {
if err != nil {
return err
}
}
return nil
}
// Copy src to this remote using server side copy operations.
@ -124,15 +363,26 @@ func (f *Fs) Purge(ctx context.Context) error {
//
// If it isn't possible then return fs.ErrorCantCopy
func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
if src.Fs() != f.wr {
srcObj, ok := src.(*Object)
if !ok {
fs.Debugf(src, "Can't copy - not same remote type")
return nil, fs.ErrorCantCopy
}
o, err := f.wr.Features().Copy(ctx, src, remote)
if err != nil {
o := srcObj.UnWrap()
u := o.UpstreamFs()
do := u.Features().Copy
if do == nil {
return nil, fs.ErrorCantCopy
}
if !u.IsCreatable() {
return nil, fs.ErrorPermissionDenied
}
co, err := do(ctx, o, remote)
if err != nil || co == nil {
return nil, err
}
return f.wrapObject(o), nil
wo, err := f.wrapEntries(u.WrapObject(co))
return wo.(*Object), err
}
// Move src to this remote using server side move operations.
@ -145,15 +395,52 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object,
//
// If it isn't possible then return fs.ErrorCantMove
func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
if src.Fs() != f.wr {
o, ok := src.(*Object)
if !ok {
fs.Debugf(src, "Can't move - not same remote type")
return nil, fs.ErrorCantMove
}
o, err := f.wr.Features().Move(ctx, src, remote)
entries, err := f.actionEntries(o.candidates()...)
if err != nil {
return nil, err
}
return f.wrapObject(o), err
for _, e := range entries {
if e.UpstreamFs().Features().Move == nil {
return nil, fs.ErrorCantMove
}
}
objs := make([]*upstream.Object, len(entries))
errs := make([]error, len(entries))
multithread(len(entries), func(i int){
u := entries[i].UpstreamFs()
o, ok := entries[i].(*upstream.Object);
if !ok {
errs[i] = fs.ErrorNotAFile
return
}
mo, err := u.Features().Move(ctx, o, remote)
if err != nil || mo == nil {
errs[i] = err
return
}
objs[i] = u.WrapObject(mo)
})
var en []upstream.Entry
for _, o := range objs {
if o != nil {
en = append(en, o)
}
}
e, err := f.wrapEntries(en...)
if err != nil {
return nil, err
}
for _, err := range errs {
if err != nil {
return e.(*Object), err
}
}
return e.(*Object), nil
}
// DirMove moves src, srcRemote to this remote at dstRemote
@ -165,12 +452,31 @@ func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object,
//
// If destination exists then return fs.ErrorDirExists
func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string) error {
srcFs, ok := src.(*Fs)
_, ok := src.(*Fs)
if !ok {
fs.Debugf(srcFs, "Can't move directory - not same remote type")
fs.Debugf(src, "Can't move directory - not same remote type")
return fs.ErrorCantDirMove
}
return f.wr.Features().DirMove(ctx, srcFs.wr, srcRemote, dstRemote)
upstreams, err := f.action(ctx, srcRemote)
if err != nil {
return err
}
for _, u := range upstreams {
if u.Features().DirMove == nil {
return fs.ErrorCantDirMove
}
}
errs := make([]error, len(upstreams))
multithread(len(upstreams), func(i int){
u := upstreams[i]
errs[i] = u.Features().DirMove(ctx, u, srcRemote, dstRemote)
})
for _, err := range errs {
if err != nil {
return err
}
}
return nil
}
// ChangeNotify calls the passed function with a path
@ -183,23 +489,23 @@ func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string
// regularly. When the channel gets closed, the implementation
// should stop polling and release resources.
func (f *Fs) ChangeNotify(ctx context.Context, fn func(string, fs.EntryType), ch <-chan time.Duration) {
var remoteChans []chan time.Duration
var uChans []chan time.Duration
for _, remote := range f.remotes {
if ChangeNotify := remote.Features().ChangeNotify; ChangeNotify != nil {
for _, u := range f.upstreams {
if ChangeNotify := u.Features().ChangeNotify; ChangeNotify != nil {
ch := make(chan time.Duration)
remoteChans = append(remoteChans, ch)
uChans = append(uChans, ch)
ChangeNotify(ctx, fn, ch)
}
}
go func() {
for i := range ch {
for _, c := range remoteChans {
for _, c := range uChans {
c <- i
}
}
for _, c := range remoteChans {
for _, c := range uChans {
close(c)
}
}()
@ -208,29 +514,81 @@ func (f *Fs) ChangeNotify(ctx context.Context, fn func(string, fs.EntryType), ch
// DirCacheFlush resets the directory cache - used in testing
// as an optional interface
func (f *Fs) DirCacheFlush() {
for _, remote := range f.remotes {
if DirCacheFlush := remote.Features().DirCacheFlush; DirCacheFlush != nil {
DirCacheFlush()
multithread(len(f.upstreams), func(i int){
do := f.upstreams[i].Features().DirCacheFlush;
if do != nil {
do()
}
})
}
func (f *Fs) put(ctx context.Context, in io.Reader, src fs.ObjectInfo, stream bool, options ...fs.OpenOption) (fs.Object, error) {
srcPath := src.Remote()
u, err := f.search(ctx, srcPath)
upstreams := []*upstream.Fs{u}
if err != nil || len(upstreams) == 0 {
upstreams, err = f.create(ctx, parentDir(srcPath))
if err != nil {
return nil, err
}
}
}
// PutStream uploads to the remote path with the modTime given of indeterminate size
//
// May create the object even if it returns an error - if so
// will return the object and the error, otherwise will return
// nil and the error
func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
o, err := f.wr.Features().PutStream(ctx, in, src, options...)
if err != nil {
return nil, err
// Get mutliple reader
readers := make([]*io.PipeReader, len(upstreams))
writers := make([]*io.PipeWriter, len(upstreams))
errs := make([]error, len(upstreams) + 1)
for i := range upstreams {
r, w := io.Pipe()
readers[i], writers[i] = r, w
}
return f.wrapObject(o), err
}
// About gets quota information from the Fs
func (f *Fs) About(ctx context.Context) (*fs.Usage, error) {
return f.wr.Features().About(ctx)
go func() {
bufw := make([]io.Writer, len(writers))
for i, w := range writers {
bufw[i] = bufio.NewWriter(w)
}
mw := io.MultiWriter(bufw...)
_, err := io.Copy(mw, in)
if err != nil {
errs[len(upstreams)] = err
}
for _, bw := range bufw {
bw.(*bufio.Writer).Flush()
}
}()
// Multi-threading
var wg sync.WaitGroup
objs := make([]upstream.Entry, len(upstreams))
for i, u := range upstreams {
wg.Add(1)
i, u := i, u // Closure
go func() {
defer wg.Done()
var o fs.Object
var err error
if stream {
o, err = u.Features().PutStream(ctx, readers[i], src, options...)
} else {
o, err = u.Put(ctx, readers[i], src, options...)
}
if err != nil {
errs[i] = err
return
}
objs[i] = u.WrapObject(o)
}()
}
wg.Wait()
for _, w := range writers {
w.Close()
}
// Get an object for future operation
e, _ := f.wrapEntries(objs...)
for _, err := range errs {
if err != nil {
return e.(*Object), err
}
}
return e.(*Object), nil
}
// Put in to the remote path with the modTime given of the given size
@ -239,11 +597,65 @@ func (f *Fs) About(ctx context.Context) (*fs.Usage, error) {
// will return the object and the error, otherwise will return
// nil and the error
func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
o, err := f.wr.Put(ctx, in, src, options...)
if err != nil {
return nil, err
return f.put(ctx, in, src, false, options...)
}
// PutStream uploads to the remote path with the modTime given of indeterminate size
//
// May create the object even if it returns an error - if so
// will return the object and the error, otherwise will return
// nil and the error
func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
return f.put(ctx, in, src, true, options...)
}
// About gets quota information from the Fs
func (f *Fs) About(ctx context.Context) (*fs.Usage, error) {
usage := &fs.Usage{
Total: new(int64),
Used: new(int64),
Trashed: new(int64),
Other: new(int64),
Free: new(int64),
Objects: new(int64),
}
return f.wrapObject(o), err
for _, u := range f.upstreams {
usg, err := u.About(ctx)
if err != nil {
return nil, err
}
if usg.Total != nil && usage.Total != nil {
*usage.Total += *usg.Total
} else {
usage.Total = nil
}
if usg.Used != nil && usage.Used != nil {
*usage.Used += *usg.Used
} else {
usage.Used = nil
}
if usg.Trashed != nil && usage.Trashed != nil {
*usage.Trashed += *usg.Trashed
} else {
usage.Trashed = nil
}
if usg.Other != nil && usage.Other != nil {
*usage.Other += *usg.Other
} else {
usage.Other = nil
}
if usg.Free != nil && usage.Free != nil {
*usage.Free += *usg.Free
} else {
usage.Free = nil
}
if usg.Objects != nil && usage.Objects != nil {
*usage.Objects += *usg.Objects
} else {
usage.Objects = nil
}
}
return usage, nil
}
// List the objects and directories in dir into entries. The
@ -256,60 +668,132 @@ func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options .
// This should return ErrDirNotFound if the directory isn't
// found.
func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
set := make(map[string]fs.DirEntry)
found := false
for _, remote := range f.remotes {
var remoteEntries, err = remote.List(ctx, dir)
if err == fs.ErrorDirNotFound {
continue
var found int32
entriess := make([][]upstream.Entry, len(f.upstreams))
errs := make([]error, len(f.upstreams))
multithread(len(f.upstreams), func(i int){
u := f.upstreams[i]
entries, err := u.List(ctx, dir)
if err != nil && err != fs.ErrorDirNotFound {
errs[i] = err
return
}
if err != nil {
return nil, errors.Wrapf(err, "List failed on %v", remote)
atomic.StoreInt32(&found, 1)
uEntries := make([]upstream.Entry, len(entries))
for j, e := range entries {
uEntries[j], _ = u.WrapEntry(e)
}
found = true
for _, remoteEntry := range remoteEntries {
set[remoteEntry.Remote()] = remoteEntry
}
}
if !found {
entriess[i] = uEntries
})
if found == 0 {
return nil, fs.ErrorDirNotFound
}
for _, entry := range set {
if o, ok := entry.(fs.Object); ok {
entry = f.wrapObject(o)
entries, err = f.mergeDirEntries(entriess)
if err != nil {
return entries, err
}
for _, err := range errs {
if err != nil {
return entries, err
}
entries = append(entries, entry)
}
return entries, nil
}
// NewObject creates a new remote union file object based on the first Object it finds (reverse remote order)
func (f *Fs) NewObject(ctx context.Context, path string) (fs.Object, error) {
for i := range f.remotes {
var remote = f.remotes[len(f.remotes)-i-1]
var obj, err = remote.NewObject(ctx, path)
if err == fs.ErrorObjectNotFound {
continue
// NewObject creates a new remote union file object
func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
objs := make([]*upstream.Object, len(f.upstreams))
errs := make([]error, len(f.upstreams))
multithread(len(f.upstreams), func(i int){
u := f.upstreams[i]
o, err := u.NewObject(ctx, remote)
if err != nil && err != fs.ErrorObjectNotFound {
errs[i] = err
return
}
if err != nil {
return nil, errors.Wrapf(err, "NewObject failed on %v", remote)
objs[i] = u.WrapObject(o)
})
var entries []upstream.Entry
for _, o := range objs {
if o != nil {
entries = append(entries, o)
}
return f.wrapObject(obj), nil
}
return nil, fs.ErrorObjectNotFound
if len(entries) == 0 {
return nil, fs.ErrorObjectNotFound
}
e, err := f.wrapEntries(entries...)
if err != nil {
return nil, err
}
for _, err := range errs {
if err != nil {
return e.(*Object), err
}
}
return e.(*Object), nil
}
// Precision is the greatest Precision of all remotes
// Precision is the greatest Precision of all upstreams
func (f *Fs) Precision() time.Duration {
var greatestPrecision time.Duration
for _, remote := range f.remotes {
if remote.Precision() > greatestPrecision {
greatestPrecision = remote.Precision()
for _, u := range f.upstreams {
if u.Precision() > greatestPrecision {
greatestPrecision = u.Precision()
}
}
return greatestPrecision
}
func (f *Fs) action(ctx context.Context, path string) ([]*upstream.Fs, error) {
return f.actionPolicy.Action(ctx, f.upstreams, path)
}
func (f *Fs) actionEntries(entries ...upstream.Entry) ([]upstream.Entry, error) {
return f.actionPolicy.ActionEntries(entries...)
}
func (f *Fs) create(ctx context.Context, path string) ([]*upstream.Fs, error) {
return f.createPolicy.Create(ctx, f.upstreams, path)
}
func (f *Fs) createEntries(entries ...upstream.Entry) ([]upstream.Entry, error) {
return f.createPolicy.CreateEntries(entries...)
}
func (f *Fs) search(ctx context.Context, path string) (*upstream.Fs, error) {
return f.searchPolicy.Search(ctx, f.upstreams, path)
}
func (f *Fs) searchEntries(entries ...upstream.Entry) (upstream.Entry, error) {
return f.searchPolicy.SearchEntries(entries...)
}
func (f *Fs) mergeDirEntries(entriess [][]upstream.Entry) (fs.DirEntries, error) {
entryMap := make(map[string]([]upstream.Entry))
for _, en := range entriess {
if en == nil {
continue
}
for _, entry := range en {
remote := entry.Remote()
if f.Features().CaseInsensitive {
remote = strings.ToLower(remote)
}
entryMap[remote] = append(entryMap[remote], entry)
}
}
var entries fs.DirEntries
for path := range entryMap {
e, err := f.wrapEntries(entryMap[path]...)
if err != nil {
return nil, err
}
entries = append(entries, e)
}
return entries, nil
}
// NewFs constructs an Fs from the path.
//
// The returned Fs is the actual Fs, referenced by remote in the config
@ -320,51 +804,46 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
if err != nil {
return nil, err
}
if len(opt.Remotes) == 0 {
return nil, errors.New("union can't point to an empty remote - check the value of the remotes setting")
if len(opt.Upstreams) == 0 {
return nil, errors.New("union can't point to an empty upstream - check the value of the upstreams setting")
}
if len(opt.Remotes) == 1 {
return nil, errors.New("union can't point to a single remote - check the value of the remotes setting")
if len(opt.Upstreams) == 1 {
return nil, errors.New("union can't point to a single upstream - check the value of the upstreams setting")
}
for _, remote := range opt.Remotes {
if strings.HasPrefix(remote, name+":") {
return nil, errors.New("can't point union remote at itself - check the value of the remote setting")
for _, u := range opt.Upstreams {
if strings.HasPrefix(u, name+":") {
return nil, errors.New("can't point union remote at itself - check the value of the upstreams setting")
}
}
var remotes []fs.Fs
for i := range opt.Remotes {
var upstreams []*upstream.Fs
for i := range opt.Upstreams {
// Last remote first so we return the correct (last) matching fs in case of fs.ErrorIsFile
var remote = opt.Remotes[len(opt.Remotes)-i-1]
_, configName, fsPath, err := fs.ParseRemote(remote)
var u = opt.Upstreams[len(opt.Upstreams)-i-1]
uFs, err := upstream.New(u, root, time.Duration(opt.CacheTime) * time.Second)
if err != nil {
return nil, err
}
var rootString = path.Join(fsPath, filepath.ToSlash(root))
if configName != "local" {
rootString = configName + ":" + rootString
}
myFs, err := cache.Get(rootString)
if err != nil {
if err == fs.ErrorIsFile {
return myFs, err
}
return nil, err
}
remotes = append(remotes, myFs)
}
// Reverse the remotes again so they are in the order as before
for i, j := 0, len(remotes)-1; i < j; i, j = i+1, j-1 {
remotes[i], remotes[j] = remotes[j], remotes[i]
upstreams = append(upstreams, uFs)
}
f := &Fs{
name: name,
root: root,
opt: *opt,
remotes: remotes,
wr: remotes[len(remotes)-1],
name: name,
root: root,
opt: *opt,
upstreams: upstreams,
}
f.actionPolicy, err = policy.Get(opt.ActionPolicy)
if err != nil {
return nil, err
}
f.createPolicy, err = policy.Get(opt.CreatePolicy)
if err != nil {
return nil, err
}
f.searchPolicy, err = policy.Get(opt.SearchPolicy)
if err != nil {
return nil, err
}
var features = (&fs.Features{
CaseInsensitive: true,
@ -376,9 +855,14 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
SetTier: true,
GetTier: true,
}).Fill(f)
features = features.Mask(f.wr) // mask the features just on the writable fs
for _, f := range upstreams {
if !f.IsWritable() {
continue
}
features = features.Mask(f) // Mask all writable upstream fs
}
// Really need the union of all remotes for these, so
// Really need the union of all upstreams for these, so
// re-instate and calculate separately.
features.ChangeNotify = f.ChangeNotify
features.DirCacheFlush = f.DirCacheFlush
@ -388,12 +872,12 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
// Clear ChangeNotify and DirCacheFlush if all are nil
clearChangeNotify := true
clearDirCacheFlush := true
for _, remote := range f.remotes {
remoteFeatures := remote.Features()
if remoteFeatures.ChangeNotify != nil {
for _, u := range f.upstreams {
uFeatures := u.Features()
if uFeatures.ChangeNotify != nil {
clearChangeNotify = false
}
if remoteFeatures.DirCacheFlush != nil {
if uFeatures.DirCacheFlush != nil {
clearDirCacheFlush = false
}
}
@ -407,15 +891,36 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
f.features = features
// Get common intersection of hashes
hashSet := f.remotes[0].Hashes()
for _, remote := range f.remotes[1:] {
hashSet = hashSet.Overlap(remote.Hashes())
hashSet := f.upstreams[0].Hashes()
for _, u := range f.upstreams[1:] {
hashSet = hashSet.Overlap(u.Hashes())
}
f.hashSet = hashSet
return f, nil
}
func parentDir(absPath string) string {
parent := path.Dir(strings.TrimRight(filepath.ToSlash(absPath), "/"))
if parent == "." {
parent = ""
}
return parent
}
func multithread(num int, fn func(int)) {
var wg sync.WaitGroup
for i := 0; i < num; i++ {
wg.Add(1)
i := i
go func() {
defer wg.Done()
fn(i)
}()
}
wg.Wait()
}
// Check the interfaces are satisfied
var (
_ fs.Fs = (*Fs)(nil)

View File

@ -0,0 +1,311 @@
package upstream
import (
"context"
"io"
"path"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/pkg/errors"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/cache"
)
var (
// ErrUsageFieldNotSupported stats the usage field is not supported by the backend
ErrUsageFieldNotSupported = errors.New("this usage field is not supported")
)
const (
unInitilized uint32 = iota
initilizing
normal
updating
)
// Fs is a wrap of any fs and its configs
type Fs struct {
fs.Fs
writable bool
creatable bool
usage *fs.Usage // Cache the usage
cacheMutex sync.RWMutex
cacheExpiry int64 // usage cache expiry time
cacheTime time.Duration // cache duration
cacheState uint32 // if the cache is updating
}
// Directory describes a wrapped Directory
//
// This is a wrapped Directory which contains the upstream Fs
type Directory struct {
fs.Directory
f *Fs
}
// Object describes a wrapped Object
//
// This is a wrapped Object which contains the upstream Fs
type Object struct {
fs.Object
f *Fs
}
// Entry describe a warpped fs.DirEntry interface with the
// information of upstream Fs
type Entry interface {
fs.DirEntry
UpstreamFs() *Fs
}
// New creates a new Fs based on the
// string formatted `type:root_path(:ro/:nc)`
func New(remote, root string, cacheTime time.Duration) (*Fs, error) {
_, configName, fsPath, err := fs.ParseRemote(remote)
if err != nil {
return nil, err
}
rFs := &Fs{
writable: true,
creatable: true,
cacheExpiry: time.Now().Unix(),
cacheTime: cacheTime,
usage: &fs.Usage{},
}
if strings.HasSuffix(fsPath, ":ro") {
rFs.writable = false
rFs.creatable = false
fsPath = fsPath[0 : len(fsPath)-3]
} else if strings.HasSuffix(fsPath, ":nc") {
rFs.writable = true
rFs.creatable = false
fsPath = fsPath[0 : len(fsPath)-3]
}
var rootString = path.Join(fsPath, filepath.ToSlash(root))
if configName != "local" {
rootString = configName + ":" + rootString
}
myFs, err := cache.Get(rootString)
if err != nil {
return nil, err
}
rFs.Fs = myFs
return rFs, nil
}
// WrapDirectory wraps a fs.Directory to include the info
// of the upstream Fs
func (f *Fs) WrapDirectory(e fs.Directory) *Directory {
if e == nil {
return nil
}
return &Directory{
Directory: e,
f: f,
}
}
// WrapObject wraps a fs.Object to include the info
// of the upstream Fs
func (f *Fs) WrapObject(o fs.Object) *Object {
if o == nil {
return nil
}
return &Object{
Object: o,
f: f,
}
}
// WrapEntry wraps a fs.DirEntry to include the info
// of the upstream Fs
func (f *Fs) WrapEntry(e fs.DirEntry) (Entry, error) {
switch e.(type) {
case fs.Object:
return f.WrapObject(e.(fs.Object)), nil
case fs.Directory:
return f.WrapDirectory(e.(fs.Directory)), nil
default:
return nil, errors.Errorf("unknown object type %T", e)
}
}
// UpstreamFs get the upstream Fs the entry is stored in
func (e *Directory) UpstreamFs() *Fs {
return e.f
}
// UpstreamFs get the upstream Fs the entry is stored in
func (o *Object) UpstreamFs() *Fs {
return o.f
}
// IsCreatable return if the fs is allowed to create new objects
func (f *Fs) IsCreatable() bool {
return f.creatable
}
// IsWritable return if the fs is allowed to write
func (f *Fs) IsWritable() bool {
return f.writable
}
// Put in to the remote path with the modTime given of the given size
//
// May create the object even if it returns an error - if so
// will return the object and the error, otherwise will return
// nil and the error
func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
o, err := f.Fs.Put(ctx, in, src, options...)
if err != nil {
return o, err
}
f.cacheMutex.Lock()
defer f.cacheMutex.Unlock()
size := src.Size()
if f.usage.Used != nil {
*f.usage.Used += size
}
if f.usage.Free != nil {
*f.usage.Free -= size
}
return o, nil
}
// PutStream uploads to the remote path with the modTime given of indeterminate size
//
// May create the object even if it returns an error - if so
// will return the object and the error, otherwise will return
// nil and the error
func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
do := f.Features().PutStream
if do == nil {
return nil, fs.ErrorNotImplemented
}
o, err := do(ctx, in, src, options...)
if err != nil {
return o, err
}
f.cacheMutex.Lock()
defer f.cacheMutex.Unlock()
size := o.Size()
if f.usage.Used != nil {
*f.usage.Used += size
}
if f.usage.Free != nil {
*f.usage.Free -= size
}
return o, nil
}
// Update in to the object with the modTime given of the given size
//
// When called from outside a Fs by rclone, src.Size() will always be >= 0.
// But for unknown-sized objects (indicated by src.Size() == -1), Upload should either
// return an error or update the object properly (rather than e.g. calling panic).
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
size := o.Size()
err := o.Object.Update(ctx, in, src, options...)
if err != nil {
return err
}
o.f.cacheMutex.Lock()
defer o.f.cacheMutex.Unlock()
delta := o.Size() - size
if delta <= 0 {
return nil
}
if o.f.usage.Used != nil {
*o.f.usage.Used += size
}
if o.f.usage.Free != nil {
*o.f.usage.Free -= size
}
return nil
}
// About gets quota information from the Fs
func (f *Fs) About(ctx context.Context) (*fs.Usage, error) {
if atomic.LoadInt64(&f.cacheExpiry) <= time.Now().Unix() {
err := f.updateUsage()
if err != nil {
return nil, ErrUsageFieldNotSupported
}
}
f.cacheMutex.RLock()
defer f.cacheMutex.RUnlock()
return f.usage, nil
}
// GetFreeSpace get the free space of the fs
func (f *Fs) GetFreeSpace() (int64, error) {
if atomic.LoadInt64(&f.cacheExpiry) <= time.Now().Unix() {
err := f.updateUsage()
if err != nil {
return 0, ErrUsageFieldNotSupported
}
}
f.cacheMutex.RLock()
defer f.cacheMutex.RUnlock()
if f.usage.Free == nil {
return 0, ErrUsageFieldNotSupported
}
return *f.usage.Free, nil
}
// GetUsedSpace get the used space of the fs
func (f *Fs) GetUsedSpace() (int64, error) {
if atomic.LoadInt64(&f.cacheExpiry) <= time.Now().Unix() {
err := f.updateUsage()
if err != nil {
return 0, ErrUsageFieldNotSupported
}
}
f.cacheMutex.RLock()
defer f.cacheMutex.RUnlock()
if f.usage.Used == nil {
return 0, ErrUsageFieldNotSupported
}
return *f.usage.Used, nil
}
func (f *Fs) updateUsage() error {
if do := f.Fs.Features().About; do == nil {
return ErrUsageFieldNotSupported
}
if atomic.LoadUint32(&f.cacheState) == unInitilized {
f.cacheMutex.Lock()
defer f.cacheMutex.Unlock()
if !atomic.CompareAndSwapUint32(&f.cacheState, unInitilized, initilizing) {
return f.updateUsage()
}
return f.updateUsageCore(false)
}
if atomic.CompareAndSwapUint32(&f.cacheState, normal, updating) {
go f.updateUsageCore(true)
}
return nil
}
func (f *Fs) updateUsageCore(lock bool) error {
defer func() {
atomic.StoreInt64(&f.cacheExpiry, time.Now().Add(f.cacheTime).Unix())
atomic.StoreUint32(&f.cacheState, normal)
}()
if (lock) {
f.cacheMutex.Lock()
defer f.cacheMutex.Unlock()
}
// Run in background, should not be cancelled by user
ctx, _ := context.WithTimeout(context.Background(), 15*time.Second)
usage, err := f.Features().About(ctx)
if err != nil {
return err
}
f.usage = usage
return nil
}