diff --git a/backend/union/policy/all.go b/backend/union/policy/all.go new file mode 100644 index 000000000..42f7a263b --- /dev/null +++ b/backend/union/policy/all.go @@ -0,0 +1,44 @@ +package policy + +import ( + "context" + + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/backend/union/upstream" +) + +func init(){ + registerPolicy("all", &All{}) +} + +// All policy behaves the same as EpAll except for the CREATE category +// Action category: same as epall. +// Create category: apply to all branches. +// Search category: same as epall. +type All struct { + EpAll +} + +// Create category policy, governing the creation of files and directories +func (p *All) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + if len(upstreams) == 0 { + return nil, fs.ErrorObjectNotFound + } + upstreams = filterNC(upstreams) + if len(upstreams) == 0 { + return nil, fs.ErrorPermissionDenied + } + return upstreams, nil +} + +// CreateEntries is CREATE category policy but receving a set of candidate entries +func (p *All) CreateEntries(entries ...upstream.Entry) ([]upstream.Entry, error) { + if len(entries) == 0 { + return nil, fs.ErrorObjectNotFound + } + entries = filterNCEntries(entries) + if len(entries) == 0 { + return nil, fs.ErrorPermissionDenied + } + return entries, nil +} \ No newline at end of file diff --git a/backend/union/policy/epall.go b/backend/union/policy/epall.go new file mode 100644 index 000000000..e1c5f9d34 --- /dev/null +++ b/backend/union/policy/epall.go @@ -0,0 +1,72 @@ +package policy + +import ( + "context" + "sync" + + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/backend/union/upstream" +) + +func init(){ + registerPolicy("epall", &EpAll{}) +} + +// EpAll stands for existing path, All +// Action category: apply to all found. +// Create category: apply to all found. +// Search category: same as epff. +type EpAll struct { + EpFF +} + +func (p *EpAll) epall(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + var wg sync.WaitGroup + ufs := make([]*upstream.Fs, len(upstreams)) + for i, u := range upstreams { + wg.Add(1) + i, u := i, u // Closure + go func() { + if exists(ctx, u, path) { + ufs[i] = u + } + wg.Done() + }() + } + wg.Wait() + var results []*upstream.Fs + for _, f := range ufs { + if f != nil { + results = append(results, f) + } + } + if len(results) == 0 { + return nil, fs.ErrorObjectNotFound + } + return results, nil +} + +// Action category policy, governing the modification of files and directories +func (p *EpAll) Action(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + if len(upstreams) == 0 { + return nil, fs.ErrorObjectNotFound + } + upstreams = filterRO(upstreams) + if len(upstreams) == 0 { + return nil, fs.ErrorPermissionDenied + } + return p.epall(ctx, upstreams, path) +} + +// Create category policy, governing the creation of files and directories +func (p *EpAll) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + if len(upstreams) == 0 { + return nil, fs.ErrorObjectNotFound + } + upstreams = filterNC(upstreams) + if len(upstreams) == 0 { + return nil, fs.ErrorPermissionDenied + } + upstreams, err := p.epall(ctx, upstreams, path) + return upstreams, err +} \ No newline at end of file diff --git a/backend/union/policy/epff.go b/backend/union/policy/epff.go new file mode 100644 index 000000000..2da8b3f5a --- /dev/null +++ b/backend/union/policy/epff.go @@ -0,0 +1,112 @@ +package policy + +import ( + "context" + + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/backend/union/upstream" +) + +func init(){ + registerPolicy("epff", &EpFF{}) +} + +// EpFF stands for existing path, first found +// Given the order of the candidates, act on the first one found where the relative path exists. +type EpFF struct {} + +func (p *EpFF) epff(ctx context.Context, upstreams []*upstream.Fs, path string) (*upstream.Fs, error) { + ch := make(chan *upstream.Fs) + for _, r := range upstreams { + r := r // Closure + go func() { + if !exists(ctx, r, path) { + r = nil + } + ch <- r + }() + } + var r *upstream.Fs + for i := 0; i < len(upstreams); i++ { + r = <- ch + if r != nil { + // close remaining goroutines + go func(num int) { + defer close(ch) + for i := 0; i < num; i++ { + <- ch + } + }(len(upstreams) - 1 - i) + } + } + if r == nil { + return nil, fs.ErrorObjectNotFound + } + return r, nil +} + +// Action category policy, governing the modification of files and directories +func (p *EpFF) Action(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + if len(upstreams) == 0 { + return nil, fs.ErrorObjectNotFound + } + upstreams = filterRO(upstreams) + if len(upstreams) == 0 { + return nil, fs.ErrorPermissionDenied + } + r, err := p.epff(ctx, upstreams, path) + return []*upstream.Fs{r}, err +} + +// ActionEntries is ACTION category policy but receving a set of candidate entries +func (p *EpFF) ActionEntries(entries ...upstream.Entry) ([]upstream.Entry, error) { + if len(entries) == 0 { + return nil, fs.ErrorObjectNotFound + } + entries = filterROEntries(entries) + if len(entries) == 0 { + return nil, fs.ErrorPermissionDenied + } + return entries[:1], nil +} + +// Create category policy, governing the creation of files and directories +func (p *EpFF) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + if len(upstreams) == 0 { + return nil, fs.ErrorObjectNotFound + } + upstreams = filterNC(upstreams) + if len(upstreams) == 0 { + return nil, fs.ErrorPermissionDenied + } + r, err := p.epff(ctx, upstreams, path) + return []*upstream.Fs{r}, err +} + +// CreateEntries is CREATE category policy but receving a set of candidate entries +func (p *EpFF) CreateEntries(entries ...upstream.Entry) ([]upstream.Entry, error) { + if len(entries) == 0 { + return nil, fs.ErrorObjectNotFound + } + entries = filterNCEntries(entries) + if len(entries) == 0 { + return nil, fs.ErrorPermissionDenied + } + return entries[:1], nil +} + +// Search category policy, governing the access to files and directories +func (p *EpFF) Search(ctx context.Context, upstreams []*upstream.Fs, path string) (*upstream.Fs, error) { + if len(upstreams) == 0 { + return nil, fs.ErrorObjectNotFound + } + return p.epff(ctx, upstreams, path) +} + +// SearchEntries is SEARCH category policy but receving a set of candidate entries +func (p *EpFF) SearchEntries(entries ...upstream.Entry) (upstream.Entry, error) { + if len(entries) == 0 { + return nil, fs.ErrorObjectNotFound + } + return entries[0], nil +} \ No newline at end of file diff --git a/backend/union/policy/epmfs.go b/backend/union/policy/epmfs.go new file mode 100644 index 000000000..4693c8e9a --- /dev/null +++ b/backend/union/policy/epmfs.go @@ -0,0 +1,110 @@ +package policy + +import ( + "context" + + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/backend/union/upstream" +) + +func init(){ + registerPolicy("epmfs", &EpMfs{}) +} + +// EpMfs stands for existing path, most free space +// Of all the branches on which the path exists choose the drive with the most free space. +type EpMfs struct { + EpAll +} + +func (p *EpMfs) mfs(upstreams []*upstream.Fs) (*upstream.Fs, error) { + var maxFreeSpace int64 + var mfsupstream *upstream.Fs + for _, r := range upstreams { + space, err := r.GetFreeSpace() + if err != nil { + return nil, err + } + if maxFreeSpace < space { + maxFreeSpace = space + mfsupstream = r + } + } + return mfsupstream, nil +} + +func (p *EpMfs) mfsEntries(entries []upstream.Entry) (upstream.Entry, error) { + var maxFreeSpace int64 + var mfsEntry upstream.Entry + for _, e := range entries { + space, err := e.UpstreamFs().GetFreeSpace() + if err != nil { + return nil, err + } + if maxFreeSpace < space { + maxFreeSpace = space + mfsEntry = e + } + } + return mfsEntry, nil +} + +// Action category policy, governing the modification of files and directories +func (p *EpMfs) Action(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + upstreams, err := p.EpAll.Action(ctx, upstreams, path) + if err != nil { + return nil, err + } + r, err := p.mfs(upstreams) + return []*upstream.Fs{r}, err +} + +// ActionEntries is ACTION category policy but receving a set of candidate entries +func (p *EpMfs) ActionEntries(entries ...upstream.Entry) ([]upstream.Entry, error) { + entries, err := p.EpAll.ActionEntries(entries...) + if err != nil { + return nil, err + } + e, err := p.mfsEntries(entries) + return []upstream.Entry{e}, err +} + +// Create category policy, governing the creation of files and directories +func (p *EpMfs) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + upstreams, err := p.EpAll.Create(ctx, upstreams, path) + if err != nil { + return nil, err + } + r, err := p.mfs(upstreams) + return []*upstream.Fs{r}, err +} + +// CreateEntries is CREATE category policy but receving a set of candidate entries +func (p *EpMfs) CreateEntries(entries ...upstream.Entry) ([]upstream.Entry, error) { + entries, err := p.EpAll.CreateEntries(entries...) + if err != nil { + return nil, err + } + e, err := p.mfsEntries(entries) + return []upstream.Entry{e}, err +} + +// Search category policy, governing the access to files and directories +func (p *EpMfs) Search(ctx context.Context, upstreams []*upstream.Fs, path string) (*upstream.Fs, error) { + if len(upstreams) == 0 { + return nil, fs.ErrorObjectNotFound + } + upstreams, err := p.epall(ctx, upstreams, path) + if err != nil { + return nil, err + } + return p.mfs(upstreams) +} + +// SearchEntries is SEARCH category policy but receving a set of candidate entries +func (p *EpMfs) SearchEntries(entries ...upstream.Entry) (upstream.Entry, error) { + if len(entries) == 0 { + return nil, fs.ErrorObjectNotFound + } + return p.mfsEntries(entries) +} \ No newline at end of file diff --git a/backend/union/policy/ff.go b/backend/union/policy/ff.go new file mode 100644 index 000000000..87be6904a --- /dev/null +++ b/backend/union/policy/ff.go @@ -0,0 +1,32 @@ +package policy + +import ( + "context" + + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/backend/union/upstream" +) + +func init(){ + registerPolicy("ff", &FF{}) +} + +// FF stands for first found +// Search category: same as epff. +// Action category: same as epff. +// Create category: Given the order of the candiates, act on the first one found. +type FF struct { + EpFF +} + +// Create category policy, governing the creation of files and directories +func (p *FF) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + if len(upstreams) == 0 { + return nil, fs.ErrorObjectNotFound + } + upstreams = filterNC(upstreams) + if len(upstreams) == 0 { + return upstreams, fs.ErrorPermissionDenied + } + return upstreams[:1], nil +} \ No newline at end of file diff --git a/backend/union/policy/mfs.go b/backend/union/policy/mfs.go new file mode 100644 index 000000000..3143ca2ec --- /dev/null +++ b/backend/union/policy/mfs.go @@ -0,0 +1,31 @@ +package policy + +import ( + "context" + + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/backend/union/upstream" +) + +func init(){ + registerPolicy("mfs", &Mfs{}) +} + +// Mfs stands for most free space +// Of all the candidates on which the path exists choose the one with the most free space. +type Mfs struct { + EpMfs +} + +// Create category policy, governing the creation of files and directories +func (p *Mfs) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + if len(upstreams) == 0 { + return nil, fs.ErrorObjectNotFound + } + upstreams = filterNC(upstreams) + if len(upstreams) == 0 { + return nil, fs.ErrorPermissionDenied + } + r, err := p.mfs(upstreams) + return []*upstream.Fs{r}, err +} diff --git a/backend/union/policy/policy.go b/backend/union/policy/policy.go new file mode 100644 index 000000000..91651d497 --- /dev/null +++ b/backend/union/policy/policy.go @@ -0,0 +1,123 @@ +package policy + +import ( + "context" + "strings" + "path" + "path/filepath" + + "github.com/pkg/errors" + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/backend/union/upstream" +) + +var policies = make(map[string]Policy) + +// Policy is the interface of a set of defined behavior choosing +// the upstream Fs to operate on +type Policy interface { + // Action category policy, governing the modification of files and directories + Action(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) + + // Create category policy, governing the creation of files and directories + Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) + + // Search category policy, governing the access to files and directories + Search(ctx context.Context, upstreams []*upstream.Fs, path string) (*upstream.Fs, error) + + // ActionEntries is ACTION category policy but receving a set of candidate entries + ActionEntries(entries ...upstream.Entry) ([]upstream.Entry, error) + + // CreateEntries is CREATE category policy but receving a set of candidate entries + CreateEntries(entries ...upstream.Entry) ([]upstream.Entry, error) + + // SearchEntries is SEARCH category policy but receving a set of candidate entries + SearchEntries(entries ...upstream.Entry) (upstream.Entry, error) +} + +func registerPolicy(name string, p Policy) { + policies[strings.ToLower(name)] = p +} + +// Get a Policy from the list +func Get(name string) (Policy, error) { + p, ok := policies[strings.ToLower(name)] + if !ok { + return nil, errors.Errorf("didn't find policy called %q", name) + } + return p, nil +} + +func filterRO(ufs []*upstream.Fs) (wufs []*upstream.Fs) { + for _, u := range ufs { + if u.IsWritable() { + wufs = append(wufs, u) + } + } + return wufs +} + +func filterROEntries(ue []upstream.Entry) (wue []upstream.Entry) { + for _, e := range ue { + if e.UpstreamFs().IsWritable() { + wue = append(wue, e) + } + } + return wue +} + +func filterNC(ufs []*upstream.Fs) (wufs []*upstream.Fs) { + for _, u := range ufs { + if u.IsCreatable() { + wufs = append(wufs, u) + } + } + return wufs +} + +func filterNCEntries(ue []upstream.Entry) (wue []upstream.Entry) { + for _, e := range ue { + if e.UpstreamFs().IsCreatable() { + wue = append(wue, e) + } + } + return wue +} + +func parentDir(absPath string) string { + parent := path.Dir(strings.TrimRight(absPath, "/")) + if parent == "." { + parent = "" + } + return parent +} + +func clean(absPath string) string { + cleanPath := path.Clean(filepath.ToSlash(absPath)) + if cleanPath == "." { + cleanPath = "" + } + return cleanPath +} + +func exists(ctx context.Context, f fs.Fs, remote string) bool { + remote = clean(remote) + dir := parentDir(remote) + if remote == dir { + return true + } + found := false + entries, _ := f.List(ctx, dir); + for _, e := range entries { + eRemote := e.Remote() + if f.Features().CaseInsensitive { + found = strings.EqualFold(remote, eRemote) + } else { + found = (remote == eRemote) + } + if found { + break + } + } + return found +} diff --git a/backend/union/union.go b/backend/union/union.go index 8934a2526..2b7db71c2 100644 --- a/backend/union/union.go +++ b/backend/union/union.go @@ -2,16 +2,20 @@ package union import ( "context" + "bufio" "fmt" "io" "path" "path/filepath" "strings" + "sync" + "sync/atomic" "time" "github.com/pkg/errors" + "github.com/rclone/rclone/backend/union/upstream" + "github.com/rclone/rclone/backend/union/policy" "github.com/rclone/rclone/fs" - "github.com/rclone/rclone/fs/cache" "github.com/rclone/rclone/fs/config/configmap" "github.com/rclone/rclone/fs/config/configstruct" "github.com/rclone/rclone/fs/hash" @@ -21,12 +25,32 @@ import ( func init() { fsi := &fs.RegInfo{ Name: "union", - Description: "Union merges the contents of several remotes", + Description: "Union merges the contents of several upstream fs", NewFs: NewFs, Options: []fs.Option{{ - Name: "remotes", - Help: "List of space separated remotes.\nCan be 'remotea:test/dir remoteb:', '\"remotea:test/space dir\" remoteb:', etc.\nThe last remote is used to write to.", + Name: "upstreams", + Help: "List of space separated upstreams.\nCan be 'upstreama:test/dir upstreamb:', '\"remotea:test/space:ro dir\" remoteb:', etc.\n", Required: true, + }, { + Name: "action_policy", + Help: "Policy to choose upstream on ACTION class.", + Required: true, + Default: "epall", + }, { + Name: "create_policy", + Help: "Policy to choose upstream on CREATE class.", + Required: true, + Default: "epmfs", + }, { + Name: "search_policy", + Help: "Policy to choose upstream on SEARCH class.", + Required: true, + Default: "ff", + }, { + Name: "cache_time", + Help: "Cache time of usage and free space (in seconds)", + Required: true, + Default: 120, }}, } fs.Register(fsi) @@ -34,34 +58,75 @@ func init() { // Options defines the configuration for this backend type Options struct { - Remotes fs.SpaceSepList `config:"remotes"` + Upstreams fs.SpaceSepList `config:"upstreams"` + ActionPolicy string `config:"action_policy"` + CreatePolicy string `config:"create_policy"` + SearchPolicy string `config:"search_policy"` + CacheTime int `config:"cache_time"` } -// Fs represents a union of remotes +// Fs represents a union of upstreams type Fs struct { - name string // name of this remote - features *fs.Features // optional features - opt Options // options for this Fs - root string // the path we are working on - remotes []fs.Fs // slice of remotes - wr fs.Fs // writable remote - hashSet hash.Set // intersection of hash types + name string // name of this remote + features *fs.Features // optional features + opt Options // options for this Fs + root string // the path we are working on + upstreams []*upstream.Fs // slice of upstreams + hashSet hash.Set // intersection of hash types + actionPolicy policy.Policy // policy for ACTION + createPolicy policy.Policy // policy for CREATE + searchPolicy policy.Policy // policy for SEARCH } // Object describes a union Object // // This is a wrapped object which returns the Union Fs as its parent type Object struct { - fs.Object + *upstream.Object fs *Fs // what this object is part of + co []upstream.Entry } -// Wrap an existing object in the union Object -func (f *Fs) wrapObject(o fs.Object) *Object { - return &Object{ - Object: o, - fs: f, +// Directory describes a union Directory +// +// This is a wrapped object contains all candidates +type Directory struct { + *upstream.Directory + cd []upstream.Entry +} + +type entry interface { + upstream.Entry + candidates() []upstream.Entry +} + +// Wrap candidate objects in to an union Object +func (f *Fs) wrapEntries(entries ...upstream.Entry) (entry, error) { + e, err := f.searchEntries(entries...) + if err != nil { + return nil, err } + switch e.(type) { + case *upstream.Object: + return &Object { + Object: e.(*upstream.Object), + fs: f, + co: entries, + }, nil + case *upstream.Directory: + return &Directory { + Directory: e.(*upstream.Directory), + cd: entries, + }, nil + default: + return nil, errors.Errorf("unknown object type %T", e) + } +} + +// UnWrap returns the Object that this Object is wrapping or +// nil if it isn't wrapping anything +func (o *Object) UnWrap() *upstream.Object { + return o.Object } // Fs returns the union Fs as the parent @@ -69,6 +134,133 @@ func (o *Object) Fs() fs.Info { return o.fs } +func (o *Object) candidates() []upstream.Entry { + return o.co +} + +func (d *Directory) candidates() []upstream.Entry { + return d.cd +} + +// Update in to the object with the modTime given of the given size +// +// When called from outside a Fs by rclone, src.Size() will always be >= 0. +// But for unknown-sized objects (indicated by src.Size() == -1), Upload should either +// return an error or update the object properly (rather than e.g. calling panic). +func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { + f := o.Fs().(*Fs) + entries, err := f.actionEntries(o.candidates()...) + if err != nil { + return err + } + // Get mutliple reader + readers := make([]io.Reader, len(entries)) + writers := make([]io.Writer, len(entries)) + errs := make([]error, len(entries) + 1) + for i := range entries { + r, w := io.Pipe() + bw := bufio.NewWriter(w) + readers[i], writers[i] = r, bw + defer func () { + w.Close() + }() + } + go func() { + mw := io.MultiWriter(writers...) + _, errs[len(entries)] = io.Copy(mw, in) + for _, bw := range writers { + bw.(*bufio.Writer).Flush() + } + }() + // Multi-threading + multithread(len(entries), func(i int){ + if o, ok := entries[i].(*upstream.Object); ok { + errs[i] = o.Update(ctx, readers[i], src, options...) + } else { + errs[i] = fs.ErrorNotAFile + } + }) + // Get an object for future operation + for _, err := range errs { + if err != nil { + return err + } + } + return nil +} + +// Remove candidate objects selected by ACTION policy +func (o *Object) Remove(ctx context.Context) error { + f := o.Fs().(*Fs) + entries, err := f.actionEntries(o.candidates()...) + if err != nil { + return err + } + errs := make([]error, len(entries)) + multithread(len(entries), func(i int){ + if o, ok := entries[i].(*upstream.Object); ok { + errs[i] = o.Remove(ctx) + } else { + errs[i] = fs.ErrorNotAFile + } + }) + for _, err := range errs { + if err != nil { + return err + } + } + return nil +} +// SetModTime sets the metadata on the object to set the modification date +func (o *Object) SetModTime(ctx context.Context, t time.Time) error { + f := o.Fs().(*Fs) + entries, err := f.actionEntries(o.candidates()...) + if err != nil { + return err + } + var wg sync.WaitGroup + errs := make([]error, len(entries)) + multithread(len(entries), func(i int){ + if o, ok := entries[i].(*upstream.Object); ok { + errs[i] = o.SetModTime(ctx, t) + } else { + errs[i] = fs.ErrorNotAFile + } + }) + wg.Wait() + for _, err := range errs { + if err != nil { + return err + } + } + return nil +} + +// ModTime returns the modification date of the directory +// It returns the latest ModTime of all candidates +func (d *Directory) ModTime(ctx context.Context) (t time.Time) { + entries := d.candidates() + times := make([]time.Time, len(entries)) + multithread(len(entries), func(i int){ + times[i] = entries[i].ModTime(ctx) + }) + for _, ti := range times { + if t.Before(ti) { + t = ti + } + } + return t +} + +// Size returns the size of the directory +// It returns the sum of all candidates +func (d *Directory) Size() (s int64) { + for _, e := range d.candidates() { + s += e.Size() + } + return s +} + // Name of the remote (as passed into NewFs) func (f *Fs) Name() string { return f.name @@ -91,7 +283,20 @@ func (f *Fs) Features() *fs.Features { // Rmdir removes the root directory of the Fs object func (f *Fs) Rmdir(ctx context.Context, dir string) error { - return f.wr.Rmdir(ctx, dir) + upstreams, err := f.action(ctx, dir) + if err != nil { + return err + } + errs := make([]error, len(upstreams)) + multithread(len(upstreams), func(i int){ + errs[i] = upstreams[i].Rmdir(ctx, dir) + }) + for _, err := range errs { + if err != nil { + return err + } + } + return nil } // Hashes returns hash.HashNone to indicate remote hashing is unavailable @@ -101,7 +306,21 @@ func (f *Fs) Hashes() hash.Set { // Mkdir makes the root directory of the Fs object func (f *Fs) Mkdir(ctx context.Context, dir string) error { - return f.wr.Mkdir(ctx, dir) + parent := parentDir(dir) + upstreams, err := f.create(ctx, parent) + if err != nil { + return err + } + errs := make([]error, len(upstreams)) + multithread(len(upstreams), func(i int){ + errs[i] = upstreams[i].Mkdir(ctx, dir) + }) + for _, err := range errs { + if err != nil { + return err + } + } + return nil } // Purge all files in the root and the root directory @@ -111,7 +330,27 @@ func (f *Fs) Mkdir(ctx context.Context, dir string) error { // // Return an error if it doesn't exist func (f *Fs) Purge(ctx context.Context) error { - return f.wr.Features().Purge(ctx) + for _, r := range f.upstreams { + if r.Features().Purge == nil { + return fs.ErrorCantPurge + } + } + for _, r := range f.upstreams { + // Can't Purge if any read-only upstreams + if !r.IsWritable() { + return fs.ErrorPermissionDenied + } + } + errs := make([]error, len(f.upstreams)) + multithread(len(f.upstreams), func(i int){ + errs[i] = f.upstreams[i].Features().Purge(ctx) + }) + for _, err := range errs { + if err != nil { + return err + } + } + return nil } // Copy src to this remote using server side copy operations. @@ -124,15 +363,26 @@ func (f *Fs) Purge(ctx context.Context) error { // // If it isn't possible then return fs.ErrorCantCopy func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) { - if src.Fs() != f.wr { + srcObj, ok := src.(*Object) + if !ok { fs.Debugf(src, "Can't copy - not same remote type") return nil, fs.ErrorCantCopy } - o, err := f.wr.Features().Copy(ctx, src, remote) - if err != nil { + o := srcObj.UnWrap() + u := o.UpstreamFs() + do := u.Features().Copy + if do == nil { + return nil, fs.ErrorCantCopy + } + if !u.IsCreatable() { + return nil, fs.ErrorPermissionDenied + } + co, err := do(ctx, o, remote) + if err != nil || co == nil { return nil, err } - return f.wrapObject(o), nil + wo, err := f.wrapEntries(u.WrapObject(co)) + return wo.(*Object), err } // Move src to this remote using server side move operations. @@ -145,15 +395,52 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, // // If it isn't possible then return fs.ErrorCantMove func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object, error) { - if src.Fs() != f.wr { + o, ok := src.(*Object) + if !ok { fs.Debugf(src, "Can't move - not same remote type") return nil, fs.ErrorCantMove } - o, err := f.wr.Features().Move(ctx, src, remote) + entries, err := f.actionEntries(o.candidates()...) if err != nil { return nil, err } - return f.wrapObject(o), err + for _, e := range entries { + if e.UpstreamFs().Features().Move == nil { + return nil, fs.ErrorCantMove + } + } + objs := make([]*upstream.Object, len(entries)) + errs := make([]error, len(entries)) + multithread(len(entries), func(i int){ + u := entries[i].UpstreamFs() + o, ok := entries[i].(*upstream.Object); + if !ok { + errs[i] = fs.ErrorNotAFile + return + } + mo, err := u.Features().Move(ctx, o, remote) + if err != nil || mo == nil { + errs[i] = err + return + } + objs[i] = u.WrapObject(mo) + }) + var en []upstream.Entry + for _, o := range objs { + if o != nil { + en = append(en, o) + } + } + e, err := f.wrapEntries(en...) + if err != nil { + return nil, err + } + for _, err := range errs { + if err != nil { + return e.(*Object), err + } + } + return e.(*Object), nil } // DirMove moves src, srcRemote to this remote at dstRemote @@ -165,12 +452,31 @@ func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object, // // If destination exists then return fs.ErrorDirExists func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string) error { - srcFs, ok := src.(*Fs) + _, ok := src.(*Fs) if !ok { - fs.Debugf(srcFs, "Can't move directory - not same remote type") + fs.Debugf(src, "Can't move directory - not same remote type") return fs.ErrorCantDirMove } - return f.wr.Features().DirMove(ctx, srcFs.wr, srcRemote, dstRemote) + upstreams, err := f.action(ctx, srcRemote) + if err != nil { + return err + } + for _, u := range upstreams { + if u.Features().DirMove == nil { + return fs.ErrorCantDirMove + } + } + errs := make([]error, len(upstreams)) + multithread(len(upstreams), func(i int){ + u := upstreams[i] + errs[i] = u.Features().DirMove(ctx, u, srcRemote, dstRemote) + }) + for _, err := range errs { + if err != nil { + return err + } + } + return nil } // ChangeNotify calls the passed function with a path @@ -183,23 +489,23 @@ func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string // regularly. When the channel gets closed, the implementation // should stop polling and release resources. func (f *Fs) ChangeNotify(ctx context.Context, fn func(string, fs.EntryType), ch <-chan time.Duration) { - var remoteChans []chan time.Duration + var uChans []chan time.Duration - for _, remote := range f.remotes { - if ChangeNotify := remote.Features().ChangeNotify; ChangeNotify != nil { + for _, u := range f.upstreams { + if ChangeNotify := u.Features().ChangeNotify; ChangeNotify != nil { ch := make(chan time.Duration) - remoteChans = append(remoteChans, ch) + uChans = append(uChans, ch) ChangeNotify(ctx, fn, ch) } } go func() { for i := range ch { - for _, c := range remoteChans { + for _, c := range uChans { c <- i } } - for _, c := range remoteChans { + for _, c := range uChans { close(c) } }() @@ -208,29 +514,81 @@ func (f *Fs) ChangeNotify(ctx context.Context, fn func(string, fs.EntryType), ch // DirCacheFlush resets the directory cache - used in testing // as an optional interface func (f *Fs) DirCacheFlush() { - for _, remote := range f.remotes { - if DirCacheFlush := remote.Features().DirCacheFlush; DirCacheFlush != nil { - DirCacheFlush() + multithread(len(f.upstreams), func(i int){ + do := f.upstreams[i].Features().DirCacheFlush; + if do != nil { + do() + } + }) +} + +func (f *Fs) put(ctx context.Context, in io.Reader, src fs.ObjectInfo, stream bool, options ...fs.OpenOption) (fs.Object, error) { + srcPath := src.Remote() + u, err := f.search(ctx, srcPath) + upstreams := []*upstream.Fs{u} + if err != nil || len(upstreams) == 0 { + upstreams, err = f.create(ctx, parentDir(srcPath)) + if err != nil { + return nil, err } } -} - -// PutStream uploads to the remote path with the modTime given of indeterminate size -// -// May create the object even if it returns an error - if so -// will return the object and the error, otherwise will return -// nil and the error -func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { - o, err := f.wr.Features().PutStream(ctx, in, src, options...) - if err != nil { - return nil, err + // Get mutliple reader + readers := make([]*io.PipeReader, len(upstreams)) + writers := make([]*io.PipeWriter, len(upstreams)) + errs := make([]error, len(upstreams) + 1) + for i := range upstreams { + r, w := io.Pipe() + readers[i], writers[i] = r, w } - return f.wrapObject(o), err -} - -// About gets quota information from the Fs -func (f *Fs) About(ctx context.Context) (*fs.Usage, error) { - return f.wr.Features().About(ctx) + go func() { + bufw := make([]io.Writer, len(writers)) + for i, w := range writers { + bufw[i] = bufio.NewWriter(w) + } + mw := io.MultiWriter(bufw...) + _, err := io.Copy(mw, in) + if err != nil { + errs[len(upstreams)] = err + } + for _, bw := range bufw { + bw.(*bufio.Writer).Flush() + } + }() + // Multi-threading + var wg sync.WaitGroup + objs := make([]upstream.Entry, len(upstreams)) + for i, u := range upstreams { + wg.Add(1) + i, u := i, u // Closure + go func() { + defer wg.Done() + var o fs.Object + var err error + if stream { + o, err = u.Features().PutStream(ctx, readers[i], src, options...) + } else { + o, err = u.Put(ctx, readers[i], src, options...) + } + if err != nil { + errs[i] = err + return + } + objs[i] = u.WrapObject(o) + }() + } + wg.Wait() + for _, w := range writers { + w.Close() + } + + // Get an object for future operation + e, _ := f.wrapEntries(objs...) + for _, err := range errs { + if err != nil { + return e.(*Object), err + } + } + return e.(*Object), nil } // Put in to the remote path with the modTime given of the given size @@ -239,11 +597,65 @@ func (f *Fs) About(ctx context.Context) (*fs.Usage, error) { // will return the object and the error, otherwise will return // nil and the error func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { - o, err := f.wr.Put(ctx, in, src, options...) - if err != nil { - return nil, err + return f.put(ctx, in, src, false, options...) +} + +// PutStream uploads to the remote path with the modTime given of indeterminate size +// +// May create the object even if it returns an error - if so +// will return the object and the error, otherwise will return +// nil and the error +func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { + return f.put(ctx, in, src, true, options...) +} + +// About gets quota information from the Fs +func (f *Fs) About(ctx context.Context) (*fs.Usage, error) { + usage := &fs.Usage{ + Total: new(int64), + Used: new(int64), + Trashed: new(int64), + Other: new(int64), + Free: new(int64), + Objects: new(int64), } - return f.wrapObject(o), err + for _, u := range f.upstreams { + usg, err := u.About(ctx) + if err != nil { + return nil, err + } + if usg.Total != nil && usage.Total != nil { + *usage.Total += *usg.Total + } else { + usage.Total = nil + } + if usg.Used != nil && usage.Used != nil { + *usage.Used += *usg.Used + } else { + usage.Used = nil + } + if usg.Trashed != nil && usage.Trashed != nil { + *usage.Trashed += *usg.Trashed + } else { + usage.Trashed = nil + } + if usg.Other != nil && usage.Other != nil { + *usage.Other += *usg.Other + } else { + usage.Other = nil + } + if usg.Free != nil && usage.Free != nil { + *usage.Free += *usg.Free + } else { + usage.Free = nil + } + if usg.Objects != nil && usage.Objects != nil { + *usage.Objects += *usg.Objects + } else { + usage.Objects = nil + } + } + return usage, nil } // List the objects and directories in dir into entries. The @@ -256,60 +668,132 @@ func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options . // This should return ErrDirNotFound if the directory isn't // found. func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) { - set := make(map[string]fs.DirEntry) - found := false - for _, remote := range f.remotes { - var remoteEntries, err = remote.List(ctx, dir) - if err == fs.ErrorDirNotFound { - continue + var found int32 + entriess := make([][]upstream.Entry, len(f.upstreams)) + errs := make([]error, len(f.upstreams)) + multithread(len(f.upstreams), func(i int){ + u := f.upstreams[i] + entries, err := u.List(ctx, dir) + if err != nil && err != fs.ErrorDirNotFound { + errs[i] = err + return } - if err != nil { - return nil, errors.Wrapf(err, "List failed on %v", remote) + atomic.StoreInt32(&found, 1) + uEntries := make([]upstream.Entry, len(entries)) + for j, e := range entries { + uEntries[j], _ = u.WrapEntry(e) } - found = true - for _, remoteEntry := range remoteEntries { - set[remoteEntry.Remote()] = remoteEntry - } - } - if !found { + entriess[i] = uEntries + }) + if found == 0 { return nil, fs.ErrorDirNotFound } - for _, entry := range set { - if o, ok := entry.(fs.Object); ok { - entry = f.wrapObject(o) + entries, err = f.mergeDirEntries(entriess) + if err != nil { + return entries, err + } + for _, err := range errs { + if err != nil { + return entries, err } - entries = append(entries, entry) } return entries, nil } -// NewObject creates a new remote union file object based on the first Object it finds (reverse remote order) -func (f *Fs) NewObject(ctx context.Context, path string) (fs.Object, error) { - for i := range f.remotes { - var remote = f.remotes[len(f.remotes)-i-1] - var obj, err = remote.NewObject(ctx, path) - if err == fs.ErrorObjectNotFound { - continue +// NewObject creates a new remote union file object +func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) { + objs := make([]*upstream.Object, len(f.upstreams)) + errs := make([]error, len(f.upstreams)) + multithread(len(f.upstreams), func(i int){ + u := f.upstreams[i] + o, err := u.NewObject(ctx, remote) + if err != nil && err != fs.ErrorObjectNotFound { + errs[i] = err + return } - if err != nil { - return nil, errors.Wrapf(err, "NewObject failed on %v", remote) + objs[i] = u.WrapObject(o) + }) + var entries []upstream.Entry + for _, o := range objs { + if o != nil { + entries = append(entries, o) } - return f.wrapObject(obj), nil } - return nil, fs.ErrorObjectNotFound + if len(entries) == 0 { + return nil, fs.ErrorObjectNotFound + } + e, err := f.wrapEntries(entries...) + if err != nil { + return nil, err + } + for _, err := range errs { + if err != nil { + return e.(*Object), err + } + } + return e.(*Object), nil } -// Precision is the greatest Precision of all remotes +// Precision is the greatest Precision of all upstreams func (f *Fs) Precision() time.Duration { var greatestPrecision time.Duration - for _, remote := range f.remotes { - if remote.Precision() > greatestPrecision { - greatestPrecision = remote.Precision() + for _, u := range f.upstreams { + if u.Precision() > greatestPrecision { + greatestPrecision = u.Precision() } } return greatestPrecision } +func (f *Fs) action(ctx context.Context, path string) ([]*upstream.Fs, error) { + return f.actionPolicy.Action(ctx, f.upstreams, path) +} + +func (f *Fs) actionEntries(entries ...upstream.Entry) ([]upstream.Entry, error) { + return f.actionPolicy.ActionEntries(entries...) +} + +func (f *Fs) create(ctx context.Context, path string) ([]*upstream.Fs, error) { + return f.createPolicy.Create(ctx, f.upstreams, path) +} + +func (f *Fs) createEntries(entries ...upstream.Entry) ([]upstream.Entry, error) { + return f.createPolicy.CreateEntries(entries...) +} + +func (f *Fs) search(ctx context.Context, path string) (*upstream.Fs, error) { + return f.searchPolicy.Search(ctx, f.upstreams, path) +} + +func (f *Fs) searchEntries(entries ...upstream.Entry) (upstream.Entry, error) { + return f.searchPolicy.SearchEntries(entries...) +} + +func (f *Fs) mergeDirEntries(entriess [][]upstream.Entry) (fs.DirEntries, error) { + entryMap := make(map[string]([]upstream.Entry)) + for _, en := range entriess { + if en == nil { + continue + } + for _, entry := range en { + remote := entry.Remote() + if f.Features().CaseInsensitive { + remote = strings.ToLower(remote) + } + entryMap[remote] = append(entryMap[remote], entry) + } + } + var entries fs.DirEntries + for path := range entryMap { + e, err := f.wrapEntries(entryMap[path]...) + if err != nil { + return nil, err + } + entries = append(entries, e) + } + return entries, nil +} + // NewFs constructs an Fs from the path. // // The returned Fs is the actual Fs, referenced by remote in the config @@ -320,51 +804,46 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { if err != nil { return nil, err } - if len(opt.Remotes) == 0 { - return nil, errors.New("union can't point to an empty remote - check the value of the remotes setting") + if len(opt.Upstreams) == 0 { + return nil, errors.New("union can't point to an empty upstream - check the value of the upstreams setting") } - if len(opt.Remotes) == 1 { - return nil, errors.New("union can't point to a single remote - check the value of the remotes setting") + if len(opt.Upstreams) == 1 { + return nil, errors.New("union can't point to a single upstream - check the value of the upstreams setting") } - for _, remote := range opt.Remotes { - if strings.HasPrefix(remote, name+":") { - return nil, errors.New("can't point union remote at itself - check the value of the remote setting") + for _, u := range opt.Upstreams { + if strings.HasPrefix(u, name+":") { + return nil, errors.New("can't point union remote at itself - check the value of the upstreams setting") } } - var remotes []fs.Fs - for i := range opt.Remotes { + var upstreams []*upstream.Fs + for i := range opt.Upstreams { // Last remote first so we return the correct (last) matching fs in case of fs.ErrorIsFile - var remote = opt.Remotes[len(opt.Remotes)-i-1] - _, configName, fsPath, err := fs.ParseRemote(remote) + var u = opt.Upstreams[len(opt.Upstreams)-i-1] + uFs, err := upstream.New(u, root, time.Duration(opt.CacheTime) * time.Second) if err != nil { return nil, err } - var rootString = path.Join(fsPath, filepath.ToSlash(root)) - if configName != "local" { - rootString = configName + ":" + rootString - } - myFs, err := cache.Get(rootString) - if err != nil { - if err == fs.ErrorIsFile { - return myFs, err - } - return nil, err - } - remotes = append(remotes, myFs) - } - - // Reverse the remotes again so they are in the order as before - for i, j := 0, len(remotes)-1; i < j; i, j = i+1, j-1 { - remotes[i], remotes[j] = remotes[j], remotes[i] + upstreams = append(upstreams, uFs) } f := &Fs{ - name: name, - root: root, - opt: *opt, - remotes: remotes, - wr: remotes[len(remotes)-1], + name: name, + root: root, + opt: *opt, + upstreams: upstreams, + } + f.actionPolicy, err = policy.Get(opt.ActionPolicy) + if err != nil { + return nil, err + } + f.createPolicy, err = policy.Get(opt.CreatePolicy) + if err != nil { + return nil, err + } + f.searchPolicy, err = policy.Get(opt.SearchPolicy) + if err != nil { + return nil, err } var features = (&fs.Features{ CaseInsensitive: true, @@ -376,9 +855,14 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { SetTier: true, GetTier: true, }).Fill(f) - features = features.Mask(f.wr) // mask the features just on the writable fs + for _, f := range upstreams { + if !f.IsWritable() { + continue + } + features = features.Mask(f) // Mask all writable upstream fs + } - // Really need the union of all remotes for these, so + // Really need the union of all upstreams for these, so // re-instate and calculate separately. features.ChangeNotify = f.ChangeNotify features.DirCacheFlush = f.DirCacheFlush @@ -388,12 +872,12 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { // Clear ChangeNotify and DirCacheFlush if all are nil clearChangeNotify := true clearDirCacheFlush := true - for _, remote := range f.remotes { - remoteFeatures := remote.Features() - if remoteFeatures.ChangeNotify != nil { + for _, u := range f.upstreams { + uFeatures := u.Features() + if uFeatures.ChangeNotify != nil { clearChangeNotify = false } - if remoteFeatures.DirCacheFlush != nil { + if uFeatures.DirCacheFlush != nil { clearDirCacheFlush = false } } @@ -407,15 +891,36 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { f.features = features // Get common intersection of hashes - hashSet := f.remotes[0].Hashes() - for _, remote := range f.remotes[1:] { - hashSet = hashSet.Overlap(remote.Hashes()) + hashSet := f.upstreams[0].Hashes() + for _, u := range f.upstreams[1:] { + hashSet = hashSet.Overlap(u.Hashes()) } f.hashSet = hashSet return f, nil } +func parentDir(absPath string) string { + parent := path.Dir(strings.TrimRight(filepath.ToSlash(absPath), "/")) + if parent == "." { + parent = "" + } + return parent +} + +func multithread(num int, fn func(int)) { + var wg sync.WaitGroup + for i := 0; i < num; i++ { + wg.Add(1) + i := i + go func() { + defer wg.Done() + fn(i) + }() + } + wg.Wait() +} + // Check the interfaces are satisfied var ( _ fs.Fs = (*Fs)(nil) diff --git a/backend/union/upstream/upstream.go b/backend/union/upstream/upstream.go new file mode 100644 index 000000000..fa2cc2ab7 --- /dev/null +++ b/backend/union/upstream/upstream.go @@ -0,0 +1,311 @@ +package upstream + +import ( + "context" + "io" + "path" + "path/filepath" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/pkg/errors" + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/cache" +) + +var ( + // ErrUsageFieldNotSupported stats the usage field is not supported by the backend + ErrUsageFieldNotSupported = errors.New("this usage field is not supported") +) + +const ( + unInitilized uint32 = iota + initilizing + normal + updating +) + +// Fs is a wrap of any fs and its configs +type Fs struct { + fs.Fs + writable bool + creatable bool + usage *fs.Usage // Cache the usage + cacheMutex sync.RWMutex + cacheExpiry int64 // usage cache expiry time + cacheTime time.Duration // cache duration + cacheState uint32 // if the cache is updating +} + +// Directory describes a wrapped Directory +// +// This is a wrapped Directory which contains the upstream Fs +type Directory struct { + fs.Directory + f *Fs +} + +// Object describes a wrapped Object +// +// This is a wrapped Object which contains the upstream Fs +type Object struct { + fs.Object + f *Fs +} + +// Entry describe a warpped fs.DirEntry interface with the +// information of upstream Fs +type Entry interface { + fs.DirEntry + UpstreamFs() *Fs +} + +// New creates a new Fs based on the +// string formatted `type:root_path(:ro/:nc)` +func New(remote, root string, cacheTime time.Duration) (*Fs, error) { + _, configName, fsPath, err := fs.ParseRemote(remote) + if err != nil { + return nil, err + } + rFs := &Fs{ + writable: true, + creatable: true, + cacheExpiry: time.Now().Unix(), + cacheTime: cacheTime, + usage: &fs.Usage{}, + } + if strings.HasSuffix(fsPath, ":ro") { + rFs.writable = false + rFs.creatable = false + fsPath = fsPath[0 : len(fsPath)-3] + } else if strings.HasSuffix(fsPath, ":nc") { + rFs.writable = true + rFs.creatable = false + fsPath = fsPath[0 : len(fsPath)-3] + } + var rootString = path.Join(fsPath, filepath.ToSlash(root)) + if configName != "local" { + rootString = configName + ":" + rootString + } + myFs, err := cache.Get(rootString) + if err != nil { + return nil, err + } + rFs.Fs = myFs + return rFs, nil +} + +// WrapDirectory wraps a fs.Directory to include the info +// of the upstream Fs +func (f *Fs) WrapDirectory(e fs.Directory) *Directory { + if e == nil { + return nil + } + return &Directory{ + Directory: e, + f: f, + } +} + +// WrapObject wraps a fs.Object to include the info +// of the upstream Fs +func (f *Fs) WrapObject(o fs.Object) *Object { + if o == nil { + return nil + } + return &Object{ + Object: o, + f: f, + } +} + +// WrapEntry wraps a fs.DirEntry to include the info +// of the upstream Fs +func (f *Fs) WrapEntry(e fs.DirEntry) (Entry, error) { + switch e.(type) { + case fs.Object: + return f.WrapObject(e.(fs.Object)), nil + case fs.Directory: + return f.WrapDirectory(e.(fs.Directory)), nil + default: + return nil, errors.Errorf("unknown object type %T", e) + } +} + +// UpstreamFs get the upstream Fs the entry is stored in +func (e *Directory) UpstreamFs() *Fs { + return e.f +} + +// UpstreamFs get the upstream Fs the entry is stored in +func (o *Object) UpstreamFs() *Fs { + return o.f +} + +// IsCreatable return if the fs is allowed to create new objects +func (f *Fs) IsCreatable() bool { + return f.creatable +} + +// IsWritable return if the fs is allowed to write +func (f *Fs) IsWritable() bool { + return f.writable +} + +// Put in to the remote path with the modTime given of the given size +// +// May create the object even if it returns an error - if so +// will return the object and the error, otherwise will return +// nil and the error +func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { + o, err := f.Fs.Put(ctx, in, src, options...) + if err != nil { + return o, err + } + f.cacheMutex.Lock() + defer f.cacheMutex.Unlock() + size := src.Size() + if f.usage.Used != nil { + *f.usage.Used += size + } + if f.usage.Free != nil { + *f.usage.Free -= size + } + return o, nil +} + +// PutStream uploads to the remote path with the modTime given of indeterminate size +// +// May create the object even if it returns an error - if so +// will return the object and the error, otherwise will return +// nil and the error +func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { + do := f.Features().PutStream + if do == nil { + return nil, fs.ErrorNotImplemented + } + o, err := do(ctx, in, src, options...) + if err != nil { + return o, err + } + f.cacheMutex.Lock() + defer f.cacheMutex.Unlock() + size := o.Size() + if f.usage.Used != nil { + *f.usage.Used += size + } + if f.usage.Free != nil { + *f.usage.Free -= size + } + return o, nil +} + +// Update in to the object with the modTime given of the given size +// +// When called from outside a Fs by rclone, src.Size() will always be >= 0. +// But for unknown-sized objects (indicated by src.Size() == -1), Upload should either +// return an error or update the object properly (rather than e.g. calling panic). +func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { + size := o.Size() + err := o.Object.Update(ctx, in, src, options...) + if err != nil { + return err + } + o.f.cacheMutex.Lock() + defer o.f.cacheMutex.Unlock() + delta := o.Size() - size + if delta <= 0 { + return nil + } + if o.f.usage.Used != nil { + *o.f.usage.Used += size + } + if o.f.usage.Free != nil { + *o.f.usage.Free -= size + } + return nil +} + +// About gets quota information from the Fs +func (f *Fs) About(ctx context.Context) (*fs.Usage, error) { + if atomic.LoadInt64(&f.cacheExpiry) <= time.Now().Unix() { + err := f.updateUsage() + if err != nil { + return nil, ErrUsageFieldNotSupported + } + } + f.cacheMutex.RLock() + defer f.cacheMutex.RUnlock() + return f.usage, nil +} + +// GetFreeSpace get the free space of the fs +func (f *Fs) GetFreeSpace() (int64, error) { + if atomic.LoadInt64(&f.cacheExpiry) <= time.Now().Unix() { + err := f.updateUsage() + if err != nil { + return 0, ErrUsageFieldNotSupported + } + } + f.cacheMutex.RLock() + defer f.cacheMutex.RUnlock() + if f.usage.Free == nil { + return 0, ErrUsageFieldNotSupported + } + return *f.usage.Free, nil +} + +// GetUsedSpace get the used space of the fs +func (f *Fs) GetUsedSpace() (int64, error) { + if atomic.LoadInt64(&f.cacheExpiry) <= time.Now().Unix() { + err := f.updateUsage() + if err != nil { + return 0, ErrUsageFieldNotSupported + } + } + f.cacheMutex.RLock() + defer f.cacheMutex.RUnlock() + if f.usage.Used == nil { + return 0, ErrUsageFieldNotSupported + } + return *f.usage.Used, nil +} + +func (f *Fs) updateUsage() error { + if do := f.Fs.Features().About; do == nil { + return ErrUsageFieldNotSupported + } + if atomic.LoadUint32(&f.cacheState) == unInitilized { + f.cacheMutex.Lock() + defer f.cacheMutex.Unlock() + if !atomic.CompareAndSwapUint32(&f.cacheState, unInitilized, initilizing) { + return f.updateUsage() + } + return f.updateUsageCore(false) + } + if atomic.CompareAndSwapUint32(&f.cacheState, normal, updating) { + go f.updateUsageCore(true) + } + return nil +} + +func (f *Fs) updateUsageCore(lock bool) error { + defer func() { + atomic.StoreInt64(&f.cacheExpiry, time.Now().Add(f.cacheTime).Unix()) + atomic.StoreUint32(&f.cacheState, normal) + }() + if (lock) { + f.cacheMutex.Lock() + defer f.cacheMutex.Unlock() + } + // Run in background, should not be cancelled by user + ctx, _ := context.WithTimeout(context.Background(), 15*time.Second) + usage, err := f.Features().About(ctx) + if err != nil { + return err + } + f.usage = usage + return nil +} \ No newline at end of file