mirror of
https://github.com/rclone/rclone.git
synced 2025-01-05 05:49:33 +01:00
sync: implement --list-cutoff to allow on disk sorting for reduced memory use
Before this change, rclone had to load an entire directory into RAM in order to sort it so it could be synced. With directories with millions of entries, this used too much memory. This fixes the probem by using an on disk sort when there are more than --list-cutoff entries in a directory. Fixes #7974
This commit is contained in:
parent
d523f00d31
commit
59a5530ce7
@ -1442,6 +1442,19 @@ backends and the VFS. There are individual flags for just enabling it
|
|||||||
for the VFS `--vfs-links` and the local backend `--local-links` if
|
for the VFS `--vfs-links` and the local backend `--local-links` if
|
||||||
required.
|
required.
|
||||||
|
|
||||||
|
### --list-cutoff N {#list-cutoff}
|
||||||
|
|
||||||
|
When syncing rclone needs to sort directory entries before comparing
|
||||||
|
them. Below this threshold (1,000,000) by default, rclone will store
|
||||||
|
the directory entries in memory. 1,000,000 entries will take approx
|
||||||
|
1GB of RAM to store. Above this threshold rclone will store directory
|
||||||
|
entries on disk and sort them without using a lot of memory.
|
||||||
|
|
||||||
|
Doing this is slightly less efficient then sorting them in memory and
|
||||||
|
will only work well for the bucket based backends (eg s3, b2,
|
||||||
|
azureblob, swift) but these are the only backends likely to have
|
||||||
|
millions of entries in a directory.
|
||||||
|
|
||||||
### --log-file=FILE ###
|
### --log-file=FILE ###
|
||||||
|
|
||||||
Log all of rclone's output to FILE. This is not active by default.
|
Log all of rclone's output to FILE. This is not active by default.
|
||||||
|
@ -233,12 +233,18 @@ value, say `export GOGC=20`. This will make the garbage collector
|
|||||||
work harder, reducing memory size at the expense of CPU usage.
|
work harder, reducing memory size at the expense of CPU usage.
|
||||||
|
|
||||||
The most common cause of rclone using lots of memory is a single
|
The most common cause of rclone using lots of memory is a single
|
||||||
directory with millions of files in. Rclone has to load this entirely
|
directory with millions of files in.
|
||||||
into memory as rclone objects. Each rclone object takes 0.5k-1k of
|
|
||||||
memory. There is
|
Before rclone v1.69 has to load this entirely into memory as rclone
|
||||||
|
objects. Each rclone object takes 0.5k-1k of memory. There is
|
||||||
[a workaround for this](https://github.com/rclone/rclone/wiki/Big-syncs-with-millions-of-files)
|
[a workaround for this](https://github.com/rclone/rclone/wiki/Big-syncs-with-millions-of-files)
|
||||||
which involves a bit of scripting.
|
which involves a bit of scripting.
|
||||||
|
|
||||||
|
However with rclone v1.69 and later rclone will automatically save
|
||||||
|
directory entries to disk when a directory with more than
|
||||||
|
[`--list-cutoff`](/docs/#list-cutoff) (1,000,000 by default) entries
|
||||||
|
is detected.
|
||||||
|
|
||||||
### Rclone changes fullwidth Unicode punctuation marks in file names
|
### Rclone changes fullwidth Unicode punctuation marks in file names
|
||||||
|
|
||||||
For example: On a Windows system, you have a file with name `Test:1.jpg`,
|
For example: On a Windows system, you have a file with name `Test:1.jpg`,
|
||||||
|
@ -277,6 +277,11 @@ var ConfigOptionsInfo = Options{{
|
|||||||
Default: false,
|
Default: false,
|
||||||
Help: "Use recursive list if available; uses more memory but fewer transactions",
|
Help: "Use recursive list if available; uses more memory but fewer transactions",
|
||||||
Groups: "Listing",
|
Groups: "Listing",
|
||||||
|
}, {
|
||||||
|
Name: "list_cutoff",
|
||||||
|
Default: 1_000_000,
|
||||||
|
Help: "To save memory, sort directory listings on disk above this threshold",
|
||||||
|
Groups: "Sync",
|
||||||
}, {
|
}, {
|
||||||
Name: "tpslimit",
|
Name: "tpslimit",
|
||||||
Default: 0.0,
|
Default: 0.0,
|
||||||
@ -585,6 +590,7 @@ type ConfigInfo struct {
|
|||||||
Suffix string `config:"suffix"`
|
Suffix string `config:"suffix"`
|
||||||
SuffixKeepExtension bool `config:"suffix_keep_extension"`
|
SuffixKeepExtension bool `config:"suffix_keep_extension"`
|
||||||
UseListR bool `config:"fast_list"`
|
UseListR bool `config:"fast_list"`
|
||||||
|
ListCutoff int `config:"list_cutoff"`
|
||||||
BufferSize SizeSuffix `config:"buffer_size"`
|
BufferSize SizeSuffix `config:"buffer_size"`
|
||||||
BwLimit BwTimetable `config:"bwlimit"`
|
BwLimit BwTimetable `config:"bwlimit"`
|
||||||
BwLimitFile BwTimetable `config:"bwlimit_file"`
|
BwLimitFile BwTimetable `config:"bwlimit_file"`
|
||||||
|
@ -64,7 +64,7 @@ func DirSortedFn(ctx context.Context, f fs.Fs, includeAll bool, dir string, call
|
|||||||
fi := filter.GetConfig(ctx)
|
fi := filter.GetConfig(ctx)
|
||||||
|
|
||||||
// Sort the entries, in or out of memory
|
// Sort the entries, in or out of memory
|
||||||
sorter, err := NewSorter(ctx, callback, keyFn)
|
sorter, err := NewSorter(ctx, f, callback, keyFn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to create directory sorter: %w", err)
|
return fmt.Errorf("failed to create directory sorter: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -3,16 +3,30 @@ package list
|
|||||||
import (
|
import (
|
||||||
"cmp"
|
"cmp"
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
"slices"
|
"slices"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/lanrat/extsort"
|
||||||
"github.com/rclone/rclone/fs"
|
"github.com/rclone/rclone/fs"
|
||||||
|
"github.com/rclone/rclone/lib/errcount"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// NewObjecter is the minimum facilities we need from the fs.Fs passed into NewSorter.
|
||||||
|
type NewObjecter interface {
|
||||||
|
// NewObject finds the Object at remote. If it can't be found
|
||||||
|
// it returns the error ErrorObjectNotFound.
|
||||||
|
NewObject(ctx context.Context, remote string) (fs.Object, error)
|
||||||
|
}
|
||||||
|
|
||||||
// Sorter implements an efficient mechanism for sorting list entries.
|
// Sorter implements an efficient mechanism for sorting list entries.
|
||||||
//
|
//
|
||||||
// If there are a large number of entries, this may be done on disk
|
// If there are a large number of entries (above `--list-cutoff`),
|
||||||
// instead of in memory.
|
// this may be done on disk instead of in memory.
|
||||||
//
|
//
|
||||||
// Supply entries with the Add method, call Send at the end to deliver
|
// Supply entries with the Add method, call Send at the end to deliver
|
||||||
// the sorted entries and finalise with CleanUp regardless of whether
|
// the sorted entries and finalise with CleanUp regardless of whether
|
||||||
@ -21,11 +35,21 @@ import (
|
|||||||
// Sorted entries are delivered to the callback supplied to NewSorter
|
// Sorted entries are delivered to the callback supplied to NewSorter
|
||||||
// when the Send method is called.
|
// when the Send method is called.
|
||||||
type Sorter struct {
|
type Sorter struct {
|
||||||
ctx context.Context
|
ctx context.Context // context for everything
|
||||||
mu sync.Mutex
|
ci *fs.ConfigInfo // config we are using
|
||||||
callback fs.ListRCallback
|
cancel func() // cancel all background operations
|
||||||
entries fs.DirEntries
|
mu sync.Mutex // protect the below
|
||||||
keyFn KeyFn
|
f NewObjecter // fs that we are listing
|
||||||
|
callback fs.ListRCallback // where to send the sorted entries to
|
||||||
|
entries fs.DirEntries // accumulated entries
|
||||||
|
keyFn KeyFn // transform an entry into a sort key
|
||||||
|
cutoff int // number of entries above which we start extsort
|
||||||
|
extSort bool // true if we are ext sorting
|
||||||
|
inputChan chan string // for sending data to the ext sort
|
||||||
|
outputChan chan string // for receiving data from the ext sort
|
||||||
|
errChan chan error // for getting errors from the ext sort
|
||||||
|
sorter *extsort.StringSorter // external string sort
|
||||||
|
errs *errcount.ErrCount // accumulate errors
|
||||||
}
|
}
|
||||||
|
|
||||||
// KeyFn turns an entry into a sort key
|
// KeyFn turns an entry into a sort key
|
||||||
@ -39,17 +63,109 @@ func identityKeyFn(entry fs.DirEntry) string {
|
|||||||
// NewSorter creates a new Sorter with callback for sorted entries to
|
// NewSorter creates a new Sorter with callback for sorted entries to
|
||||||
// be delivered to. keyFn is used to process each entry to get a key
|
// be delivered to. keyFn is used to process each entry to get a key
|
||||||
// function, if nil then it will just use entry.Remote()
|
// function, if nil then it will just use entry.Remote()
|
||||||
func NewSorter(ctx context.Context, callback fs.ListRCallback, keyFn KeyFn) (*Sorter, error) {
|
func NewSorter(ctx context.Context, f NewObjecter, callback fs.ListRCallback, keyFn KeyFn) (*Sorter, error) {
|
||||||
|
ci := fs.GetConfig(ctx)
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
if keyFn == nil {
|
if keyFn == nil {
|
||||||
keyFn = identityKeyFn
|
keyFn = identityKeyFn
|
||||||
}
|
}
|
||||||
return &Sorter{
|
return &Sorter{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
|
ci: ci,
|
||||||
|
cancel: cancel,
|
||||||
|
f: f,
|
||||||
callback: callback,
|
callback: callback,
|
||||||
keyFn: keyFn,
|
keyFn: keyFn,
|
||||||
|
cutoff: ci.ListCutoff,
|
||||||
|
errs: errcount.New(),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Turn a directory entry into a combined key and data for extsort
|
||||||
|
func (ls *Sorter) entryToKey(entry fs.DirEntry) string {
|
||||||
|
// To start with we just use the Remote to recover the object
|
||||||
|
// To make more efficient we would serialize the object here
|
||||||
|
remote := entry.Remote()
|
||||||
|
remote = strings.TrimRight(remote, "/")
|
||||||
|
if _, isDir := entry.(fs.Directory); isDir {
|
||||||
|
remote += "/"
|
||||||
|
}
|
||||||
|
key := ls.keyFn(entry) + "\x00" + remote
|
||||||
|
return key
|
||||||
|
}
|
||||||
|
|
||||||
|
// Turn an exsort key back into a directory entry
|
||||||
|
func (ls *Sorter) keyToEntry(ctx context.Context, key string) (entry fs.DirEntry, err error) {
|
||||||
|
null := strings.IndexRune(key, '\x00')
|
||||||
|
if null < 0 {
|
||||||
|
return nil, errors.New("sorter: failed to deserialize: missing null")
|
||||||
|
}
|
||||||
|
remote := key[null+1:]
|
||||||
|
if remote, isDir := strings.CutSuffix(remote, "/"); isDir {
|
||||||
|
// Is a directory
|
||||||
|
//
|
||||||
|
// Note this creates a very minimal directory entry which should be fine for the
|
||||||
|
// bucket based remotes this code will be run on.
|
||||||
|
entry = fs.NewDir(remote, time.Time{})
|
||||||
|
} else {
|
||||||
|
obj, err := ls.f.NewObject(ctx, remote)
|
||||||
|
if err != nil {
|
||||||
|
fs.Errorf(ls.f, "sorter: failed to re-create object %q: %v", remote, err)
|
||||||
|
return nil, fmt.Errorf("sorter: failed to re-create object: %w", err)
|
||||||
|
}
|
||||||
|
entry = obj
|
||||||
|
}
|
||||||
|
return entry, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ls *Sorter) sendEntriesToExtSort(entries fs.DirEntries) (err error) {
|
||||||
|
for _, entry := range entries {
|
||||||
|
select {
|
||||||
|
case ls.inputChan <- ls.entryToKey(entry):
|
||||||
|
case err = <-ls.errChan:
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case err = <-ls.errChan:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ls *Sorter) startExtSort() (err error) {
|
||||||
|
fs.Logf(ls.f, "Switching to on disk sorting as more than %d entries in one directory detected", ls.cutoff)
|
||||||
|
ls.inputChan = make(chan string, 100)
|
||||||
|
// Options to control the extsort
|
||||||
|
opt := extsort.Config{
|
||||||
|
NumWorkers: 8, // small effect
|
||||||
|
ChanBuffSize: 1024, // small effect
|
||||||
|
SortedChanBuffSize: 1024, // makes a lot of difference
|
||||||
|
ChunkSize: 32 * 1024, // tuned for 50 char records (UUID sized)
|
||||||
|
// Defaults
|
||||||
|
// ChunkSize: int(1e6), // amount of records to store in each chunk which will be written to disk
|
||||||
|
// NumWorkers: 2, // maximum number of workers to use for parallel sorting
|
||||||
|
// ChanBuffSize: 1, // buffer size for merging chunks
|
||||||
|
// SortedChanBuffSize: 10, // buffer size for passing records to output
|
||||||
|
// TempFilesDir: "", // empty for use OS default ex: /tmp
|
||||||
|
}
|
||||||
|
ls.sorter, ls.outputChan, ls.errChan = extsort.Strings(ls.inputChan, &opt)
|
||||||
|
go ls.sorter.Sort(ls.ctx)
|
||||||
|
|
||||||
|
// Show we are extsorting now
|
||||||
|
ls.extSort = true
|
||||||
|
|
||||||
|
// Send the accumulated entries to the sorter
|
||||||
|
fs.Debugf(ls.f, "Sending accumulated directory entries to disk")
|
||||||
|
err = ls.sendEntriesToExtSort(ls.entries)
|
||||||
|
fs.Debugf(ls.f, "Done sending accumulated directory entries to disk")
|
||||||
|
clear(ls.entries)
|
||||||
|
ls.entries = nil
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// Add entries to the list sorter.
|
// Add entries to the list sorter.
|
||||||
//
|
//
|
||||||
// Does not call the callback.
|
// Does not call the callback.
|
||||||
@ -58,15 +174,133 @@ func NewSorter(ctx context.Context, callback fs.ListRCallback, keyFn KeyFn) (*So
|
|||||||
func (ls *Sorter) Add(entries fs.DirEntries) error {
|
func (ls *Sorter) Add(entries fs.DirEntries) error {
|
||||||
ls.mu.Lock()
|
ls.mu.Lock()
|
||||||
defer ls.mu.Unlock()
|
defer ls.mu.Unlock()
|
||||||
ls.entries = append(ls.entries, entries...)
|
if ls.extSort {
|
||||||
|
err := ls.sendEntriesToExtSort(entries)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ls.entries = append(ls.entries, entries...)
|
||||||
|
if len(ls.entries) >= ls.cutoff {
|
||||||
|
err := ls.startExtSort()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Number of entries to batch in list helper
|
||||||
|
const listHelperBatchSize = 100
|
||||||
|
|
||||||
|
// listHelper is used to turn keys into entries concurrently
|
||||||
|
type listHelper struct {
|
||||||
|
ls *Sorter // parent
|
||||||
|
keys []string // keys being built up
|
||||||
|
entries fs.DirEntries // entries processed concurrently as a batch
|
||||||
|
errs []error // errors processed concurrently
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewlistHelper should be with the callback passed in
|
||||||
|
func (ls *Sorter) newListHelper() *listHelper {
|
||||||
|
return &listHelper{
|
||||||
|
ls: ls,
|
||||||
|
entries: make(fs.DirEntries, listHelperBatchSize),
|
||||||
|
errs: make([]error, listHelperBatchSize),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// send sends the stored entries to the callback if there are >= max
|
||||||
|
// entries.
|
||||||
|
func (lh *listHelper) send(max int) (err error) {
|
||||||
|
if len(lh.keys) < max {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Turn this batch into objects in parallel
|
||||||
|
g, gCtx := errgroup.WithContext(lh.ls.ctx)
|
||||||
|
g.SetLimit(lh.ls.ci.Checkers)
|
||||||
|
for i, key := range lh.keys {
|
||||||
|
i, key := i, key // can remove when go1.22 is minimum version
|
||||||
|
g.Go(func() error {
|
||||||
|
lh.entries[i], lh.errs[i] = lh.ls.keyToEntry(gCtx, key)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
err = g.Wait()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Account errors and collect OK entries
|
||||||
|
toSend := lh.entries[:0]
|
||||||
|
for i := range lh.keys {
|
||||||
|
entry, err := lh.entries[i], lh.errs[i]
|
||||||
|
if err != nil {
|
||||||
|
lh.ls.errs.Add(err)
|
||||||
|
} else if entry != nil {
|
||||||
|
toSend = append(toSend, entry)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// fmt.Println(lh.keys)
|
||||||
|
// fmt.Println(toSend)
|
||||||
|
err = lh.ls.callback(toSend)
|
||||||
|
|
||||||
|
clear(lh.entries)
|
||||||
|
clear(lh.errs)
|
||||||
|
lh.keys = lh.keys[:0]
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add an entry to the stored entries and send them if there are more
|
||||||
|
// than a certain amount
|
||||||
|
func (lh *listHelper) Add(key string) error {
|
||||||
|
lh.keys = append(lh.keys, key)
|
||||||
|
return lh.send(100)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Flush the stored entries (if any) sending them to the callback
|
||||||
|
func (lh *listHelper) Flush() error {
|
||||||
|
return lh.send(1)
|
||||||
|
}
|
||||||
|
|
||||||
// Send the sorted entries to the callback.
|
// Send the sorted entries to the callback.
|
||||||
func (ls *Sorter) Send() error {
|
func (ls *Sorter) Send() (err error) {
|
||||||
ls.mu.Lock()
|
ls.mu.Lock()
|
||||||
defer ls.mu.Unlock()
|
defer ls.mu.Unlock()
|
||||||
|
|
||||||
|
if ls.extSort {
|
||||||
|
close(ls.inputChan)
|
||||||
|
|
||||||
|
list := ls.newListHelper()
|
||||||
|
|
||||||
|
outer:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case key, ok := <-ls.outputChan:
|
||||||
|
if !ok {
|
||||||
|
break outer
|
||||||
|
}
|
||||||
|
err := list.Add(key)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
case err := <-ls.errChan:
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
err = list.Flush()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return ls.errs.Err("sorter")
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
// Sort the directory entries by Remote
|
// Sort the directory entries by Remote
|
||||||
//
|
//
|
||||||
// We use a stable sort here just in case there are
|
// We use a stable sort here just in case there are
|
||||||
@ -90,7 +324,10 @@ func (ls *Sorter) CleanUp() {
|
|||||||
ls.mu.Lock()
|
ls.mu.Lock()
|
||||||
defer ls.mu.Unlock()
|
defer ls.mu.Unlock()
|
||||||
|
|
||||||
|
ls.cancel()
|
||||||
|
clear(ls.entries)
|
||||||
ls.entries = nil
|
ls.entries = nil
|
||||||
|
ls.extSort = false
|
||||||
}
|
}
|
||||||
|
|
||||||
// SortToChan makes a callback for the Sorter which sends the output
|
// SortToChan makes a callback for the Sorter which sends the output
|
||||||
|
@ -5,6 +5,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"slices"
|
"slices"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/rclone/rclone/fs"
|
"github.com/rclone/rclone/fs"
|
||||||
@ -22,7 +23,7 @@ func TestSorter(t *testing.T) {
|
|||||||
require.Equal(t, fs.DirEntries{oA, da}, entries)
|
require.Equal(t, fs.DirEntries{oA, da}, entries)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
ls, err := NewSorter(ctx, callback, nil)
|
ls, err := NewSorter(ctx, nil, callback, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.Equal(t, fmt.Sprintf("%p", callback), fmt.Sprintf("%p", ls.callback))
|
assert.Equal(t, fmt.Sprintf("%p", callback), fmt.Sprintf("%p", ls.callback))
|
||||||
assert.Equal(t, fmt.Sprintf("%p", identityKeyFn), fmt.Sprintf("%p", ls.keyFn))
|
assert.Equal(t, fmt.Sprintf("%p", identityKeyFn), fmt.Sprintf("%p", ls.keyFn))
|
||||||
@ -55,7 +56,7 @@ func TestSorterIdentity(t *testing.T) {
|
|||||||
assert.Equal(t, "a", entries[0].Remote())
|
assert.Equal(t, "a", entries[0].Remote())
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
ls, err := NewSorter(ctx, callback, nil)
|
ls, err := NewSorter(ctx, nil, callback, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer ls.CleanUp()
|
defer ls.CleanUp()
|
||||||
|
|
||||||
@ -86,7 +87,7 @@ func TestSorterKeyFn(t *testing.T) {
|
|||||||
assert.Equal(t, "z", entries[0].Remote())
|
assert.Equal(t, "z", entries[0].Remote())
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
ls, err := NewSorter(ctx, callback, keyFn)
|
ls, err := NewSorter(ctx, nil, callback, keyFn)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer ls.CleanUp()
|
defer ls.CleanUp()
|
||||||
|
|
||||||
@ -102,3 +103,212 @@ func TestSorterKeyFn(t *testing.T) {
|
|||||||
err = ls.Send()
|
err = ls.Send()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// testFs implements enough of the fs.Fs interface for Sorter
|
||||||
|
type testFs struct {
|
||||||
|
t *testing.T
|
||||||
|
entriesMap map[string]fs.DirEntry
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewObject finds the Object at remote. If it can't be found
|
||||||
|
// it returns the error ErrorObjectNotFound.
|
||||||
|
func (f *testFs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
|
||||||
|
entry, ok := f.entriesMap[remote]
|
||||||
|
assert.True(f.t, ok, "entry not found")
|
||||||
|
if !ok {
|
||||||
|
return nil, fs.ErrorObjectNotFound
|
||||||
|
}
|
||||||
|
obj, ok := entry.(fs.Object)
|
||||||
|
assert.True(f.t, ok, "expected entry to be object: %#v", entry)
|
||||||
|
if !ok {
|
||||||
|
return nil, fs.ErrorObjectNotFound
|
||||||
|
}
|
||||||
|
return obj, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// String outputs info about the Fs
|
||||||
|
func (f *testFs) String() string {
|
||||||
|
return "testFs"
|
||||||
|
}
|
||||||
|
|
||||||
|
// used to sort the entries case insensitively
|
||||||
|
func keyCaseInsensitive(entry fs.DirEntry) string {
|
||||||
|
return strings.ToLower(entry.Remote())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test the external sorting
|
||||||
|
func testSorterExt(t *testing.T, cutoff, N int, wantExtSort bool, keyFn KeyFn) {
|
||||||
|
ctx := context.Background()
|
||||||
|
ctx, ci := fs.AddConfig(ctx)
|
||||||
|
ci.ListCutoff = cutoff
|
||||||
|
|
||||||
|
// Make the directory entries
|
||||||
|
entriesMap := make(map[string]fs.DirEntry, N)
|
||||||
|
for i := 0; i < N; i++ {
|
||||||
|
remote := fmt.Sprintf("%010d", i)
|
||||||
|
prefix := "a"
|
||||||
|
if i%3 == 0 {
|
||||||
|
prefix = "A"
|
||||||
|
}
|
||||||
|
remote = prefix + remote
|
||||||
|
if i%2 == 0 {
|
||||||
|
entriesMap[remote] = mockobject.New(remote)
|
||||||
|
} else {
|
||||||
|
entriesMap[remote] = mockdir.New(remote)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assert.Equal(t, N, len(entriesMap))
|
||||||
|
f := &testFs{t: t, entriesMap: entriesMap}
|
||||||
|
|
||||||
|
// In the callback delete entries from the map when they are
|
||||||
|
// found
|
||||||
|
prevKey := ""
|
||||||
|
callback := func(entries fs.DirEntries) error {
|
||||||
|
for _, gotEntry := range entries {
|
||||||
|
remote := gotEntry.Remote()
|
||||||
|
key := remote
|
||||||
|
if keyFn != nil {
|
||||||
|
key = keyFn(gotEntry)
|
||||||
|
}
|
||||||
|
require.Less(t, prevKey, key, "Not sorted")
|
||||||
|
prevKey = key
|
||||||
|
wantEntry, ok := entriesMap[remote]
|
||||||
|
assert.True(t, ok, "Entry not found %q", remote)
|
||||||
|
_, wantDir := wantEntry.(fs.Directory)
|
||||||
|
_, gotDir := wantEntry.(fs.Directory)
|
||||||
|
_, wantObj := wantEntry.(fs.Object)
|
||||||
|
_, gotObj := wantEntry.(fs.Object)
|
||||||
|
require.True(t, (wantDir && gotDir) || (wantObj && gotObj), "Wrong types %#v, %#v", wantEntry, gotEntry)
|
||||||
|
delete(entriesMap, remote)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
ls, err := NewSorter(ctx, f, callback, keyFn)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Send the entries in random (map) order
|
||||||
|
for _, entry := range entriesMap {
|
||||||
|
err = ls.Add(fs.DirEntries{entry})
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check we are extsorting if required
|
||||||
|
assert.Equal(t, wantExtSort, ls.extSort)
|
||||||
|
|
||||||
|
// Test Send
|
||||||
|
err = ls.Send()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// All the entries should have been seen
|
||||||
|
assert.Equal(t, 0, len(entriesMap))
|
||||||
|
|
||||||
|
// Test Cleanup
|
||||||
|
ls.CleanUp()
|
||||||
|
assert.Equal(t, fs.DirEntries(nil), ls.entries)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test the external sorting
|
||||||
|
func TestSorterExt(t *testing.T) {
|
||||||
|
for _, test := range []struct {
|
||||||
|
cutoff int
|
||||||
|
N int
|
||||||
|
wantExtSort bool
|
||||||
|
keyFn KeyFn
|
||||||
|
}{
|
||||||
|
{cutoff: 1000, N: 100, wantExtSort: false},
|
||||||
|
{cutoff: 100, N: 1000, wantExtSort: true},
|
||||||
|
{cutoff: 1000, N: 100, wantExtSort: false, keyFn: keyCaseInsensitive},
|
||||||
|
{cutoff: 100, N: 1000, wantExtSort: true, keyFn: keyCaseInsensitive},
|
||||||
|
{cutoff: 100001, N: 100000, wantExtSort: false},
|
||||||
|
{cutoff: 100000, N: 100001, wantExtSort: true},
|
||||||
|
// {cutoff: 100_000, N: 1_000_000, wantExtSort: true},
|
||||||
|
// {cutoff: 100_000, N: 10_000_000, wantExtSort: true},
|
||||||
|
} {
|
||||||
|
t.Run(fmt.Sprintf("cutoff=%d,N=%d,wantExtSort=%v,keyFn=%v", test.cutoff, test.N, test.wantExtSort, test.keyFn != nil), func(t *testing.T) {
|
||||||
|
testSorterExt(t, test.cutoff, test.N, test.wantExtSort, test.keyFn)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// benchFs implements enough of the fs.Fs interface for Sorter
|
||||||
|
type benchFs struct{}
|
||||||
|
|
||||||
|
// NewObject finds the Object at remote. If it can't be found
|
||||||
|
// it returns the error ErrorObjectNotFound.
|
||||||
|
func (benchFs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
|
||||||
|
// Recreate the mock objects
|
||||||
|
return mockobject.New(remote), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// String outputs info about the Fs
|
||||||
|
func (benchFs) String() string {
|
||||||
|
return "benchFs"
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkSorterExt(t *testing.B) {
|
||||||
|
const cutoff = 1000
|
||||||
|
const N = 10_000_000
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
ctx, ci := fs.AddConfig(ctx)
|
||||||
|
ci.ListCutoff = cutoff
|
||||||
|
keyFn := keyCaseInsensitive
|
||||||
|
|
||||||
|
// In the callback check entries are in order
|
||||||
|
prevKey := ""
|
||||||
|
entriesReceived := 0
|
||||||
|
callback := func(entries fs.DirEntries) error {
|
||||||
|
for _, gotEntry := range entries {
|
||||||
|
remote := gotEntry.Remote()
|
||||||
|
key := remote
|
||||||
|
if keyFn != nil {
|
||||||
|
key = keyFn(gotEntry)
|
||||||
|
}
|
||||||
|
require.Less(t, prevKey, key, "Not sorted")
|
||||||
|
prevKey = key
|
||||||
|
entriesReceived++
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
f := benchFs{}
|
||||||
|
ls, err := NewSorter(ctx, f, callback, keyFn)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Send the entries in reverse order in batches of 1000 like the backends do
|
||||||
|
var entries = make(fs.DirEntries, 0, 1000)
|
||||||
|
for i := N - 1; i >= 0; i-- {
|
||||||
|
remote := fmt.Sprintf("%050d", i) // UUID length plus a bit
|
||||||
|
prefix := "a"
|
||||||
|
if i%3 == 0 {
|
||||||
|
prefix = "A"
|
||||||
|
}
|
||||||
|
remote = prefix + remote
|
||||||
|
if i%2 == 0 {
|
||||||
|
entries = append(entries, mockobject.New(remote))
|
||||||
|
} else {
|
||||||
|
entries = append(entries, mockdir.New(remote))
|
||||||
|
}
|
||||||
|
if len(entries) > 1000 {
|
||||||
|
err = ls.Add(entries)
|
||||||
|
require.NoError(t, err)
|
||||||
|
entries = entries[:0]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
err = ls.Add(entries)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Check we are extsorting
|
||||||
|
assert.True(t, ls.extSort)
|
||||||
|
|
||||||
|
// Test Send
|
||||||
|
err = ls.Send()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// All the entries should have been seen
|
||||||
|
assert.Equal(t, N, entriesReceived)
|
||||||
|
|
||||||
|
// Cleanup
|
||||||
|
ls.CleanUp()
|
||||||
|
}
|
||||||
|
@ -399,7 +399,7 @@ func (m *March) processJob(job listDirJob) ([]listDirJob, error) {
|
|||||||
if m.NoTraverse && !m.NoCheckDest {
|
if m.NoTraverse && !m.NoCheckDest {
|
||||||
originalSrcChan := srcChan
|
originalSrcChan := srcChan
|
||||||
srcChan = make(chan fs.DirEntry, 100)
|
srcChan = make(chan fs.DirEntry, 100)
|
||||||
ls, err := list.NewSorter(m.Ctx, list.SortToChan(dstChan), m.key)
|
ls, err := list.NewSorter(m.Ctx, m.Fdst, list.SortToChan(dstChan), m.key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
1
go.mod
1
go.mod
@ -167,6 +167,7 @@ require (
|
|||||||
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
|
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
|
||||||
github.com/kr/fs v0.1.0 // indirect
|
github.com/kr/fs v0.1.0 // indirect
|
||||||
github.com/kylelemons/godebug v1.1.0 // indirect
|
github.com/kylelemons/godebug v1.1.0 // indirect
|
||||||
|
github.com/lanrat/extsort v1.0.2 // indirect
|
||||||
github.com/lpar/date v1.0.0 // indirect
|
github.com/lpar/date v1.0.0 // indirect
|
||||||
github.com/lucasb-eyer/go-colorful v1.2.0 // indirect
|
github.com/lucasb-eyer/go-colorful v1.2.0 // indirect
|
||||||
github.com/lufia/plan9stats v0.0.0-20231016141302-07b5767bb0ed // indirect
|
github.com/lufia/plan9stats v0.0.0-20231016141302-07b5767bb0ed // indirect
|
||||||
|
2
go.sum
2
go.sum
@ -431,6 +431,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
|||||||
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
||||||
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
|
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
|
||||||
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
|
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
|
||||||
|
github.com/lanrat/extsort v1.0.2 h1:p3MLVpQEPwEGPzeLBb+1eSErzRl6Bgjgr+qnIs2RxrU=
|
||||||
|
github.com/lanrat/extsort v1.0.2/go.mod h1:ivzsdLm8Tv+88qbdpMElV6Z15StlzPUtZSKsGb51hnQ=
|
||||||
github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
|
github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
|
||||||
github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
|
github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
|
||||||
github.com/lpar/date v1.0.0 h1:bq/zVqFTUmsxvd/CylidY4Udqpr9BOFrParoP6p0x/I=
|
github.com/lpar/date v1.0.0 h1:bq/zVqFTUmsxvd/CylidY4Udqpr9BOFrParoP6p0x/I=
|
||||||
|
Loading…
Reference in New Issue
Block a user