From 37e21f767c4a873c56fd0f9d01b5c16fa14919f4 Mon Sep 17 00:00:00 2001 From: Max Sum Date: Sun, 1 Dec 2019 00:49:53 +0800 Subject: [PATCH] union: implement new policies Implement eplfs, eplus, eprand, lfs, lus, newest and rand. --- backend/union/policy/epall.go | 2 +- backend/union/policy/epff.go | 2 +- backend/union/policy/eplfs.go | 114 ++++++++++++++++++++++++++ backend/union/policy/eplus.go | 114 ++++++++++++++++++++++++++ backend/union/policy/eprand.go | 86 +++++++++++++++++++ backend/union/policy/lfs.go | 33 ++++++++ backend/union/policy/lus.go | 33 ++++++++ backend/union/policy/newest.go | 145 +++++++++++++++++++++++++++++++++ backend/union/policy/policy.go | 12 ++- backend/union/policy/rand.go | 86 +++++++++++++++++++ 10 files changed, 621 insertions(+), 6 deletions(-) create mode 100644 backend/union/policy/eplfs.go create mode 100644 backend/union/policy/eplus.go create mode 100644 backend/union/policy/eprand.go create mode 100644 backend/union/policy/lfs.go create mode 100644 backend/union/policy/lus.go create mode 100644 backend/union/policy/newest.go create mode 100644 backend/union/policy/rand.go diff --git a/backend/union/policy/epall.go b/backend/union/policy/epall.go index dc493151c..03efc40e7 100644 --- a/backend/union/policy/epall.go +++ b/backend/union/policy/epall.go @@ -27,7 +27,7 @@ func (p *EpAll) epall(ctx context.Context, upstreams []*upstream.Fs, path string wg.Add(1) i, u := i, u // Closure go func() { - if exists(ctx, u, path) { + if findEntry(ctx, u, path) != nil { ufs[i] = u } wg.Done() diff --git a/backend/union/policy/epff.go b/backend/union/policy/epff.go index 16589fe81..2efc424f3 100644 --- a/backend/union/policy/epff.go +++ b/backend/union/policy/epff.go @@ -20,7 +20,7 @@ func (p *EpFF) epff(ctx context.Context, upstreams []*upstream.Fs, path string) for _, u := range upstreams { u := u // Closure go func() { - if !exists(ctx, u, path) { + if findEntry(ctx, u, path) == nil { u = nil } ch <- u diff --git a/backend/union/policy/eplfs.go b/backend/union/policy/eplfs.go new file mode 100644 index 000000000..6776fd8bf --- /dev/null +++ b/backend/union/policy/eplfs.go @@ -0,0 +1,114 @@ +package policy + +import ( + "context" + "math" + + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/backend/union/upstream" +) + +func init(){ + registerPolicy("eplfs", &EpLfs{}) +} + +// EpLfs stands for existing path, least free space +// Of all the candidates on which the path exists choose the one with the least free space. +type EpLfs struct { + EpAll +} + +func (p *EpLfs) lfs(upstreams []*upstream.Fs) (*upstream.Fs, error) { + var minFreeSpace int64 = math.MaxInt64 + var lfsupstream *upstream.Fs + for _, u := range upstreams { + space, err := u.GetFreeSpace() + if err != nil { + return nil, err + } + if space < minFreeSpace { + minFreeSpace = space + lfsupstream = u + } + } + if lfsupstream == nil { + return nil, fs.ErrorObjectNotFound + } + return lfsupstream, nil +} + +func (p *EpLfs) lfsEntries(entries []upstream.Entry) (upstream.Entry, error) { + var minFreeSpace int64 + var lfsEntry upstream.Entry + for _, e := range entries { + space, err := e.UpstreamFs().GetFreeSpace() + if err != nil { + return nil, err + } + if space < minFreeSpace { + minFreeSpace = space + lfsEntry = e + } + } + return lfsEntry, nil +} + +// Action category policy, governing the modification of files and directories +func (p *EpLfs) Action(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + upstreams, err := p.EpAll.Action(ctx, upstreams, path) + if err != nil { + return nil, err + } + u, err := p.lfs(upstreams) + return []*upstream.Fs{u}, err +} + +// ActionEntries is ACTION category policy but receving a set of candidate entries +func (p *EpLfs) ActionEntries(entries ...upstream.Entry) ([]upstream.Entry, error) { + entries, err := p.EpAll.ActionEntries(entries...) + if err != nil { + return nil, err + } + e, err := p.lfsEntries(entries) + return []upstream.Entry{e}, err +} + +// Create category policy, governing the creation of files and directories +func (p *EpLfs) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + upstreams, err := p.EpAll.Create(ctx, upstreams, path) + if err != nil { + return nil, err + } + u, err := p.lfs(upstreams) + return []*upstream.Fs{u}, err +} + +// CreateEntries is CREATE category policy but receving a set of candidate entries +func (p *EpLfs) CreateEntries(entries ...upstream.Entry) ([]upstream.Entry, error) { + entries, err := p.EpAll.CreateEntries(entries...) + if err != nil { + return nil, err + } + e, err := p.lfsEntries(entries) + return []upstream.Entry{e}, err +} + +// Search category policy, governing the access to files and directories +func (p *EpLfs) Search(ctx context.Context, upstreams []*upstream.Fs, path string) (*upstream.Fs, error) { + if len(upstreams) == 0 { + return nil, fs.ErrorObjectNotFound + } + upstreams, err := p.epall(ctx, upstreams, path) + if err != nil { + return nil, err + } + return p.lfs(upstreams) +} + +// SearchEntries is SEARCH category policy but receving a set of candidate entries +func (p *EpLfs) SearchEntries(entries ...upstream.Entry) (upstream.Entry, error) { + if len(entries) == 0 { + return nil, fs.ErrorObjectNotFound + } + return p.lfsEntries(entries) +} \ No newline at end of file diff --git a/backend/union/policy/eplus.go b/backend/union/policy/eplus.go new file mode 100644 index 000000000..eca166940 --- /dev/null +++ b/backend/union/policy/eplus.go @@ -0,0 +1,114 @@ +package policy + +import ( + "context" + "math" + + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/backend/union/upstream" +) + +func init(){ + registerPolicy("eplus", &EpLus{}) +} + +// EpLus stands for existing path, least used space +// Of all the candidates on which the path exists choose the one with the least used space. +type EpLus struct { + EpAll +} + +func (p *EpLus) lus(upstreams []*upstream.Fs) (*upstream.Fs, error) { + var minUsedSpace int64 = math.MaxInt64 + var lusupstream *upstream.Fs + for _, u := range upstreams { + space, err := u.GetUsedSpace() + if err != nil { + return nil, err + } + if space < minUsedSpace { + minUsedSpace = space + lusupstream = u + } + } + if lusupstream == nil { + return nil, fs.ErrorObjectNotFound + } + return lusupstream, nil +} + +func (p *EpLus) lusEntries(entries []upstream.Entry) (upstream.Entry, error) { + var minUsedSpace int64 + var lusEntry upstream.Entry + for _, e := range entries { + space, err := e.UpstreamFs().GetFreeSpace() + if err != nil { + return nil, err + } + if space < minUsedSpace { + minUsedSpace = space + lusEntry = e + } + } + return lusEntry, nil +} + +// Action category policy, governing the modification of files and directories +func (p *EpLus) Action(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + upstreams, err := p.EpAll.Action(ctx, upstreams, path) + if err != nil { + return nil, err + } + u, err := p.lus(upstreams) + return []*upstream.Fs{u}, err +} + +// ActionEntries is ACTION category policy but receving a set of candidate entries +func (p *EpLus) ActionEntries(entries ...upstream.Entry) ([]upstream.Entry, error) { + entries, err := p.EpAll.ActionEntries(entries...) + if err != nil { + return nil, err + } + e, err := p.lusEntries(entries) + return []upstream.Entry{e}, err +} + +// Create category policy, governing the creation of files and directories +func (p *EpLus) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + upstreams, err := p.EpAll.Create(ctx, upstreams, path) + if err != nil { + return nil, err + } + u, err := p.lus(upstreams) + return []*upstream.Fs{u}, err +} + +// CreateEntries is CREATE category policy but receving a set of candidate entries +func (p *EpLus) CreateEntries(entries ...upstream.Entry) ([]upstream.Entry, error) { + entries, err := p.EpAll.CreateEntries(entries...) + if err != nil { + return nil, err + } + e, err := p.lusEntries(entries) + return []upstream.Entry{e}, err +} + +// Search category policy, governing the access to files and directories +func (p *EpLus) Search(ctx context.Context, upstreams []*upstream.Fs, path string) (*upstream.Fs, error) { + if len(upstreams) == 0 { + return nil, fs.ErrorObjectNotFound + } + upstreams, err := p.epall(ctx, upstreams, path) + if err != nil { + return nil, err + } + return p.lus(upstreams) +} + +// SearchEntries is SEARCH category policy but receving a set of candidate entries +func (p *EpLus) SearchEntries(entries ...upstream.Entry) (upstream.Entry, error) { + if len(entries) == 0 { + return nil, fs.ErrorObjectNotFound + } + return p.lusEntries(entries) +} \ No newline at end of file diff --git a/backend/union/policy/eprand.go b/backend/union/policy/eprand.go new file mode 100644 index 000000000..efecbb8f5 --- /dev/null +++ b/backend/union/policy/eprand.go @@ -0,0 +1,86 @@ +package policy + +import ( + "context" + "math/rand" + "time" + + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/backend/union/upstream" +) + +func init(){ + registerPolicy("eprand", &EpRand{}) +} + +// EpRand stands for existing path, random +// Calls epall and then randomizes. Returns one candidate. +type EpRand struct { + EpAll +} + +func (p *EpRand) rand(upstreams []*upstream.Fs) *upstream.Fs { + rand.Seed(time.Now().Unix()) + return upstreams[rand.Intn(len(upstreams))] +} + +func (p *EpRand) randEntries(entries []upstream.Entry) upstream.Entry { + rand.Seed(time.Now().Unix()) + return entries[rand.Intn(len(entries))] +} + +// Action category policy, governing the modification of files and directories +func (p *EpRand) Action(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + upstreams, err := p.EpAll.Action(ctx, upstreams, path) + if err != nil { + return nil, err + } + return []*upstream.Fs{p.rand(upstreams)}, nil +} + +// ActionEntries is ACTION category policy but receving a set of candidate entries +func (p *EpRand) ActionEntries(entries ...upstream.Entry) ([]upstream.Entry, error) { + entries, err := p.EpAll.ActionEntries(entries...) + if err != nil { + return nil, err + } + return []upstream.Entry{p.randEntries(entries)}, nil +} + +// Create category policy, governing the creation of files and directories +func (p *EpRand) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + upstreams, err := p.EpAll.Create(ctx, upstreams, path) + if err != nil { + return nil, err + } + return []*upstream.Fs{p.rand(upstreams)}, nil +} + +// CreateEntries is CREATE category policy but receving a set of candidate entries +func (p *EpRand) CreateEntries(entries ...upstream.Entry) ([]upstream.Entry, error) { + entries, err := p.EpAll.CreateEntries(entries...) + if err != nil { + return nil, err + } + return []upstream.Entry{p.randEntries(entries)}, nil +} + +// Search category policy, governing the access to files and directories +func (p *EpRand) Search(ctx context.Context, upstreams []*upstream.Fs, path string) (*upstream.Fs, error) { + if len(upstreams) == 0 { + return nil, fs.ErrorObjectNotFound + } + upstreams, err := p.epall(ctx, upstreams, path) + if err != nil { + return nil, err + } + return p.rand(upstreams), nil +} + +// SearchEntries is SEARCH category policy but receving a set of candidate entries +func (p *EpRand) SearchEntries(entries ...upstream.Entry) (upstream.Entry, error) { + if len(entries) == 0 { + return nil, fs.ErrorObjectNotFound + } + return p.randEntries(entries), nil +} \ No newline at end of file diff --git a/backend/union/policy/lfs.go b/backend/union/policy/lfs.go new file mode 100644 index 000000000..7e5cc99c2 --- /dev/null +++ b/backend/union/policy/lfs.go @@ -0,0 +1,33 @@ +package policy + +import ( + "context" + + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/backend/union/upstream" +) + +func init(){ + registerPolicy("lfs", &Lfs{}) +} + +// Lfs stands for least free space +// Search category: same as eplfs. +// Action category: same as eplfs. +// Create category: Pick the drive with the least free space. +type Lfs struct { + EpLfs +} + +// Create category policy, governing the creation of files and directories +func (p *Lfs) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + if len(upstreams) == 0 { + return nil, fs.ErrorObjectNotFound + } + upstreams = filterNC(upstreams) + if len(upstreams) == 0 { + return nil, fs.ErrorPermissionDenied + } + u, err := p.lfs(upstreams) + return []*upstream.Fs{u}, err +} diff --git a/backend/union/policy/lus.go b/backend/union/policy/lus.go new file mode 100644 index 000000000..edb6cef48 --- /dev/null +++ b/backend/union/policy/lus.go @@ -0,0 +1,33 @@ +package policy + +import ( + "context" + + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/backend/union/upstream" +) + +func init(){ + registerPolicy("lus", &Lus{}) +} + +// Lus stands for most free space +// Search category: same as eplus. +// Action category: same as eplus. +// Create category: Pick the drive with the least used space. +type Lus struct { + EpLus +} + +// Create category policy, governing the creation of files and directories +func (p *Lus) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + if len(upstreams) == 0 { + return nil, fs.ErrorObjectNotFound + } + upstreams = filterNC(upstreams) + if len(upstreams) == 0 { + return nil, fs.ErrorPermissionDenied + } + u, err := p.lus(upstreams) + return []*upstream.Fs{u}, err +} diff --git a/backend/union/policy/newest.go b/backend/union/policy/newest.go new file mode 100644 index 000000000..bb5425def --- /dev/null +++ b/backend/union/policy/newest.go @@ -0,0 +1,145 @@ +package policy + +import ( + "context" + "sync" + "time" + + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/backend/union/upstream" +) + +func init(){ + registerPolicy("newest", &Newest{}) +} + +// Newest policy picks the file / directory with the largest mtime +// It implies the existance of a path +type Newest struct { + EpAll +} + +func (p *Newest) newest(ctx context.Context, upstreams []*upstream.Fs, path string) (*upstream.Fs, error) { + var wg sync.WaitGroup + ufs := make([]*upstream.Fs, len(upstreams)) + mtimes := make([]time.Time, len(upstreams)) + for i, u := range upstreams { + wg.Add(1) + i, u := i, u // Closure + go func() { + defer wg.Done() + if e := findEntry(ctx, u, path); e != nil { + ufs[i] = u + mtimes[i] = e.ModTime(ctx) + } + }() + } + wg.Wait() + maxMtime := time.Time{} + var newestFs *upstream.Fs + for i, u := range ufs { + if u != nil && mtimes[i].After(maxMtime) { + maxMtime = mtimes[i] + newestFs = u + } + } + if newestFs == nil { + return nil, fs.ErrorObjectNotFound + } + return newestFs, nil +} + +func (p *Newest) newestEntries(entries []upstream.Entry) (upstream.Entry, error) { + var wg sync.WaitGroup + mtimes := make([]time.Time, len(entries)) + ctx, _ := context.WithTimeout(context.Background(), 5 * time.Second) + for i, e := range entries { + wg.Add(1) + i, e := i, e // Closure + go func() { + defer wg.Done() + mtimes[i] = e.ModTime(ctx) + }() + } + wg.Wait() + maxMtime := time.Time{} + var newestEntry upstream.Entry + for i, t := range mtimes { + if t.After(maxMtime) { + maxMtime = t + newestEntry = entries[i] + } + } + if newestEntry == nil { + return nil, fs.ErrorObjectNotFound + } + return newestEntry, nil +} + +// Action category policy, governing the modification of files and directories +func (p *Newest) Action(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + if len(upstreams) == 0 { + return nil, fs.ErrorObjectNotFound + } + upstreams = filterRO(upstreams) + if len(upstreams) == 0 { + return nil, fs.ErrorPermissionDenied + } + u, err := p.newest(ctx, upstreams, path) + return []*upstream.Fs{u}, err +} + +// ActionEntries is ACTION category policy but receving a set of candidate entries +func (p *Newest) ActionEntries(entries ...upstream.Entry) ([]upstream.Entry, error) { + if len(entries) == 0 { + return nil, fs.ErrorObjectNotFound + } + entries = filterROEntries(entries) + if len(entries) == 0 { + return nil, fs.ErrorPermissionDenied + } + e, err := p.newestEntries(entries) + return []upstream.Entry{e}, err +} + +// Create category policy, governing the creation of files and directories +func (p *Newest) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + if len(upstreams) == 0 { + return nil, fs.ErrorObjectNotFound + } + upstreams = filterNC(upstreams) + if len(upstreams) == 0 { + return nil, fs.ErrorPermissionDenied + } + u, err := p.newest(ctx, upstreams, path) + return []*upstream.Fs{u}, err +} + +// CreateEntries is CREATE category policy but receving a set of candidate entries +func (p *Newest) CreateEntries(entries ...upstream.Entry) ([]upstream.Entry, error) { + if len(entries) == 0 { + return nil, fs.ErrorObjectNotFound + } + entries = filterNCEntries(entries) + if len(entries) == 0 { + return nil, fs.ErrorPermissionDenied + } + e, err := p.newestEntries(entries) + return []upstream.Entry{e}, err +} + +// Search category policy, governing the access to files and directories +func (p *Newest) Search(ctx context.Context, upstreams []*upstream.Fs, path string) (*upstream.Fs, error) { + if len(upstreams) == 0 { + return nil, fs.ErrorObjectNotFound + } + return p.newest(ctx, upstreams, path) +} + +// SearchEntries is SEARCH category policy but receving a set of candidate entries +func (p *Newest) SearchEntries(entries ...upstream.Entry) (upstream.Entry, error) { + if len(entries) == 0 { + return nil, fs.ErrorObjectNotFound + } + return p.newestEntries(entries) +} \ No newline at end of file diff --git a/backend/union/policy/policy.go b/backend/union/policy/policy.go index 91651d497..f5ac2496f 100644 --- a/backend/union/policy/policy.go +++ b/backend/union/policy/policy.go @@ -3,8 +3,10 @@ package policy import ( "context" "strings" + "math/rand" "path" "path/filepath" + "time" "github.com/pkg/errors" "github.com/rclone/rclone/fs" @@ -100,11 +102,13 @@ func clean(absPath string) string { return cleanPath } -func exists(ctx context.Context, f fs.Fs, remote string) bool { +func findEntry(ctx context.Context, f fs.Fs, remote string) fs.DirEntry { remote = clean(remote) dir := parentDir(remote) if remote == dir { - return true + // random modtime for root + randomNow := time.Unix(time.Now().Unix() - rand.Int63n(10000), 0) + return fs.NewDir("", randomNow) } found := false entries, _ := f.List(ctx, dir); @@ -116,8 +120,8 @@ func exists(ctx context.Context, f fs.Fs, remote string) bool { found = (remote == eRemote) } if found { - break + return e } } - return found + return nil } diff --git a/backend/union/policy/rand.go b/backend/union/policy/rand.go new file mode 100644 index 000000000..c2b0f2a88 --- /dev/null +++ b/backend/union/policy/rand.go @@ -0,0 +1,86 @@ +package policy + +import ( + "context" + "math/rand" + "time" + + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/backend/union/upstream" +) + +func init(){ + registerPolicy("rand", &Rand{}) +} + +// Rand stands for random +// Calls all and then randomizes. Returns one candidate. +type Rand struct { + All +} + +func (p *Rand) rand(upstreams []*upstream.Fs) *upstream.Fs { + rand.Seed(time.Now().Unix()) + return upstreams[rand.Intn(len(upstreams))] +} + +func (p *Rand) randEntries(entries []upstream.Entry) upstream.Entry { + rand.Seed(time.Now().Unix()) + return entries[rand.Intn(len(entries))] +} + +// Action category policy, governing the modification of files and directories +func (p *Rand) Action(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + upstreams, err := p.All.Action(ctx, upstreams, path) + if err != nil { + return nil, err + } + return []*upstream.Fs{p.rand(upstreams)}, nil +} + +// ActionEntries is ACTION category policy but receving a set of candidate entries +func (p *Rand) ActionEntries(entries ...upstream.Entry) ([]upstream.Entry, error) { + entries, err := p.All.ActionEntries(entries...) + if err != nil { + return nil, err + } + return []upstream.Entry{p.randEntries(entries)}, nil +} + +// Create category policy, governing the creation of files and directories +func (p *Rand) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + upstreams, err := p.All.Create(ctx, upstreams, path) + if err != nil { + return nil, err + } + return []*upstream.Fs{p.rand(upstreams)}, nil +} + +// CreateEntries is CREATE category policy but receving a set of candidate entries +func (p *Rand) CreateEntries(entries ...upstream.Entry) ([]upstream.Entry, error) { + entries, err := p.All.CreateEntries(entries...) + if err != nil { + return nil, err + } + return []upstream.Entry{p.randEntries(entries)}, nil +} + +// Search category policy, governing the access to files and directories +func (p *Rand) Search(ctx context.Context, upstreams []*upstream.Fs, path string) (*upstream.Fs, error) { + if len(upstreams) == 0 { + return nil, fs.ErrorObjectNotFound + } + upstreams, err := p.epall(ctx, upstreams, path) + if err != nil { + return nil, err + } + return p.rand(upstreams), nil +} + +// SearchEntries is SEARCH category policy but receving a set of candidate entries +func (p *Rand) SearchEntries(entries ...upstream.Entry) (upstream.Entry, error) { + if len(entries) == 0 { + return nil, fs.ErrorObjectNotFound + } + return p.randEntries(entries), nil +} \ No newline at end of file