diff --git a/docs/content/docs.md b/docs/content/docs.md index cb65ed746..1299a20ef 100644 --- a/docs/content/docs.md +++ b/docs/content/docs.md @@ -1442,6 +1442,19 @@ backends and the VFS. There are individual flags for just enabling it for the VFS `--vfs-links` and the local backend `--local-links` if required. +### --list-cutoff N {#list-cutoff} + +When syncing rclone needs to sort directory entries before comparing +them. Below this threshold (1,000,000 by default), rclone will store +the directory entries in memory. 1,000,000 entries will take approx +1GB of RAM to store. Above this threshold rclone will store directory +entries on disk and sort them without using a lot of memory. + +Doing this is slightly less efficient than sorting them in memory and +will only work well for the bucket based backends (eg s3, b2, +azureblob, swift) but these are the only backends likely to have +millions of entries in a directory. + ### --log-file=FILE ### Log all of rclone's output to FILE. This is not active by default. diff --git a/docs/content/faq.md b/docs/content/faq.md index 050caf8ef..8d4a0b19f 100644 --- a/docs/content/faq.md +++ b/docs/content/faq.md @@ -233,12 +233,18 @@ value, say `export GOGC=20`. This will make the garbage collector work harder, reducing memory size at the expense of CPU usage. The most common cause of rclone using lots of memory is a single -directory with millions of files in. Rclone has to load this entirely -into memory as rclone objects. Each rclone object takes 0.5k-1k of -memory. There is +directory with millions of files in. + +Before v1.69 rclone had to load this entirely into memory as rclone +objects. Each rclone object takes 0.5k-1k of memory. There is [a workaround for this](https://github.com/rclone/rclone/wiki/Big-syncs-with-millions-of-files) which involves a bit of scripting. 
+However with rclone v1.69 and later rclone will automatically save +directory entries to disk when a directory with more than +[`--list-cutoff`](/docs/#list-cutoff) (1,000,000 by default) entries +is detected. + ### Rclone changes fullwidth Unicode punctuation marks in file names For example: On a Windows system, you have a file with name `Test:1.jpg`, diff --git a/fs/config.go b/fs/config.go index e83d2cac1..c06e6c3ec 100644 --- a/fs/config.go +++ b/fs/config.go @@ -277,6 +277,11 @@ var ConfigOptionsInfo = Options{{ Default: false, Help: "Use recursive list if available; uses more memory but fewer transactions", Groups: "Listing", +}, { + Name: "list_cutoff", + Default: 1_000_000, + Help: "To save memory, sort directory listings on disk above this threshold", + Groups: "Sync", }, { Name: "tpslimit", Default: 0.0, @@ -585,6 +590,7 @@ type ConfigInfo struct { Suffix string `config:"suffix"` SuffixKeepExtension bool `config:"suffix_keep_extension"` UseListR bool `config:"fast_list"` + ListCutoff int `config:"list_cutoff"` BufferSize SizeSuffix `config:"buffer_size"` BwLimit BwTimetable `config:"bwlimit"` BwLimitFile BwTimetable `config:"bwlimit_file"` diff --git a/fs/list/list.go b/fs/list/list.go index 30aab86ed..1e9093734 100644 --- a/fs/list/list.go +++ b/fs/list/list.go @@ -64,7 +64,7 @@ func DirSortedFn(ctx context.Context, f fs.Fs, includeAll bool, dir string, call fi := filter.GetConfig(ctx) // Sort the entries, in or out of memory - sorter, err := NewSorter(ctx, callback, keyFn) + sorter, err := NewSorter(ctx, f, callback, keyFn) if err != nil { return fmt.Errorf("failed to create directory sorter: %w", err) } diff --git a/fs/list/sorter.go b/fs/list/sorter.go index b38d66fa3..da239d550 100644 --- a/fs/list/sorter.go +++ b/fs/list/sorter.go @@ -3,16 +3,30 @@ package list import ( "cmp" "context" + "errors" + "fmt" "slices" + "strings" "sync" + "time" + "github.com/lanrat/extsort" "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/lib/errcount" + 
"golang.org/x/sync/errgroup" ) +// NewObjecter is the minimum facilities we need from the fs.Fs passed into NewSorter. +type NewObjecter interface { + // NewObject finds the Object at remote. If it can't be found + // it returns the error ErrorObjectNotFound. + NewObject(ctx context.Context, remote string) (fs.Object, error) +} + // Sorter implements an efficient mechanism for sorting list entries. // -// If there are a large number of entries, this may be done on disk -// instead of in memory. +// If there are a large number of entries (above `--list-cutoff`), +// this may be done on disk instead of in memory. // // Supply entries with the Add method, call Send at the end to deliver // the sorted entries and finalise with CleanUp regardless of whether @@ -21,11 +35,21 @@ import ( // Sorted entries are delivered to the callback supplied to NewSorter // when the Send method is called. type Sorter struct { - ctx context.Context - mu sync.Mutex - callback fs.ListRCallback - entries fs.DirEntries - keyFn KeyFn + ctx context.Context // context for everything + ci *fs.ConfigInfo // config we are using + cancel func() // cancel all background operations + mu sync.Mutex // protect the below + f NewObjecter // fs that we are listing + callback fs.ListRCallback // where to send the sorted entries to + entries fs.DirEntries // accumulated entries + keyFn KeyFn // transform an entry into a sort key + cutoff int // number of entries above which we start extsort + extSort bool // true if we are ext sorting + inputChan chan string // for sending data to the ext sort + outputChan chan string // for receiving data from the ext sort + errChan chan error // for getting errors from the ext sort + sorter *extsort.StringSorter // external string sort + errs *errcount.ErrCount // accumulate errors } // KeyFn turns an entry into a sort key @@ -39,17 +63,109 @@ func identityKeyFn(entry fs.DirEntry) string { // NewSorter creates a new Sorter with callback for sorted entries to // be 
delivered to. keyFn is used to process each entry to get a key // function, if nil then it will just use entry.Remote() -func NewSorter(ctx context.Context, callback fs.ListRCallback, keyFn KeyFn) (*Sorter, error) { +func NewSorter(ctx context.Context, f NewObjecter, callback fs.ListRCallback, keyFn KeyFn) (*Sorter, error) { + ci := fs.GetConfig(ctx) + ctx, cancel := context.WithCancel(ctx) if keyFn == nil { keyFn = identityKeyFn } return &Sorter{ ctx: ctx, + ci: ci, + cancel: cancel, + f: f, callback: callback, keyFn: keyFn, + cutoff: ci.ListCutoff, + errs: errcount.New(), }, nil } +// Turn a directory entry into a combined key and data for extsort +func (ls *Sorter) entryToKey(entry fs.DirEntry) string { + // To start with we just use the Remote to recover the object + // To make this more efficient we would serialize the object here + remote := entry.Remote() + remote = strings.TrimRight(remote, "/") + if _, isDir := entry.(fs.Directory); isDir { + remote += "/" + } + key := ls.keyFn(entry) + "\x00" + remote + return key +} + +// Turn an extsort key back into a directory entry +func (ls *Sorter) keyToEntry(ctx context.Context, key string) (entry fs.DirEntry, err error) { + null := strings.IndexRune(key, '\x00') + if null < 0 { + return nil, errors.New("sorter: failed to deserialize: missing null") + } + remote := key[null+1:] + if remote, isDir := strings.CutSuffix(remote, "/"); isDir { + // Is a directory + // + // Note this creates a very minimal directory entry which should be fine for the + // bucket based remotes this code will be run on. 
+ entry = fs.NewDir(remote, time.Time{}) + } else { + obj, err := ls.f.NewObject(ctx, remote) + if err != nil { + fs.Errorf(ls.f, "sorter: failed to re-create object %q: %v", remote, err) + return nil, fmt.Errorf("sorter: failed to re-create object: %w", err) + } + entry = obj + } + return entry, nil +} + +func (ls *Sorter) sendEntriesToExtSort(entries fs.DirEntries) (err error) { + for _, entry := range entries { + select { + case ls.inputChan <- ls.entryToKey(entry): + case err = <-ls.errChan: + if err != nil { + return err + } + } + } + select { + case err = <-ls.errChan: + default: + } + return err +} + +func (ls *Sorter) startExtSort() (err error) { + fs.Logf(ls.f, "Switching to on disk sorting as more than %d entries in one directory detected", ls.cutoff) + ls.inputChan = make(chan string, 100) + // Options to control the extsort + opt := extsort.Config{ + NumWorkers: 8, // small effect + ChanBuffSize: 1024, // small effect + SortedChanBuffSize: 1024, // makes a lot of difference + ChunkSize: 32 * 1024, // tuned for 50 char records (UUID sized) + // Defaults + // ChunkSize: int(1e6), // amount of records to store in each chunk which will be written to disk + // NumWorkers: 2, // maximum number of workers to use for parallel sorting + // ChanBuffSize: 1, // buffer size for merging chunks + // SortedChanBuffSize: 10, // buffer size for passing records to output + // TempFilesDir: "", // empty for use OS default ex: /tmp + } + ls.sorter, ls.outputChan, ls.errChan = extsort.Strings(ls.inputChan, &opt) + go ls.sorter.Sort(ls.ctx) + + // Show we are extsorting now + ls.extSort = true + + // Send the accumulated entries to the sorter + fs.Debugf(ls.f, "Sending accumulated directory entries to disk") + err = ls.sendEntriesToExtSort(ls.entries) + fs.Debugf(ls.f, "Done sending accumulated directory entries to disk") + clear(ls.entries) + ls.entries = nil + return err +} + // Add entries to the list sorter. // // Does not call the callback. 
@@ -58,15 +174,133 @@ func NewSorter(ctx context.Context, callback fs.ListRCallback, keyFn KeyFn) (*So func (ls *Sorter) Add(entries fs.DirEntries) error { ls.mu.Lock() defer ls.mu.Unlock() - ls.entries = append(ls.entries, entries...) + if ls.extSort { + err := ls.sendEntriesToExtSort(entries) + if err != nil { + return err + } + } else { + ls.entries = append(ls.entries, entries...) + if len(ls.entries) >= ls.cutoff { + err := ls.startExtSort() + if err != nil { + return err + } + } + } return nil } +// Number of entries to batch in list helper +const listHelperBatchSize = 100 + +// listHelper is used to turn keys into entries concurrently +type listHelper struct { + ls *Sorter // parent + keys []string // keys being built up + entries fs.DirEntries // entries processed concurrently as a batch + errs []error // errors processed concurrently +} + +// newListHelper makes a listHelper with buffers for listHelperBatchSize entries +func (ls *Sorter) newListHelper() *listHelper { + return &listHelper{ + ls: ls, + entries: make(fs.DirEntries, listHelperBatchSize), + errs: make([]error, listHelperBatchSize), + } +} + +// send sends the stored entries to the callback if there are >= max +// entries. 
+func (lh *listHelper) send(max int) (err error) { + if len(lh.keys) < max { + return nil + } + + // Turn this batch into objects in parallel + g, gCtx := errgroup.WithContext(lh.ls.ctx) + g.SetLimit(lh.ls.ci.Checkers) + for i, key := range lh.keys { + i, key := i, key // can remove when go1.22 is minimum version + g.Go(func() error { + lh.entries[i], lh.errs[i] = lh.ls.keyToEntry(gCtx, key) + return nil + }) + } + err = g.Wait() + if err != nil { + return err + } + + // Account errors and collect OK entries + toSend := lh.entries[:0] + for i := range lh.keys { + entry, err := lh.entries[i], lh.errs[i] + if err != nil { + lh.ls.errs.Add(err) + } else if entry != nil { + toSend = append(toSend, entry) + } + } + + // fmt.Println(lh.keys) + // fmt.Println(toSend) + err = lh.ls.callback(toSend) + + clear(lh.entries) + clear(lh.errs) + lh.keys = lh.keys[:0] + return err +} + +// Add an entry to the stored entries and send them once a full batch +// of listHelperBatchSize keys has accumulated +func (lh *listHelper) Add(key string) error { + lh.keys = append(lh.keys, key) + return lh.send(listHelperBatchSize) +} + +// Flush the stored entries (if any) sending them to the callback +func (lh *listHelper) Flush() error { + return lh.send(1) +} + // Send the sorted entries to the callback. 
-func (ls *Sorter) Send() error { +func (ls *Sorter) Send() (err error) { ls.mu.Lock() defer ls.mu.Unlock() + if ls.extSort { + close(ls.inputChan) + + list := ls.newListHelper() + + outer: + for { + select { + case key, ok := <-ls.outputChan: + if !ok { + break outer + } + err := list.Add(key) + if err != nil { + return err + } + case err := <-ls.errChan: + if err != nil { + return err + } + } + } + err = list.Flush() + if err != nil { + return err + } + return ls.errs.Err("sorter") + + } + // Sort the directory entries by Remote // // We use a stable sort here just in case there are @@ -90,7 +324,10 @@ func (ls *Sorter) CleanUp() { ls.mu.Lock() defer ls.mu.Unlock() + ls.cancel() + clear(ls.entries) ls.entries = nil + ls.extSort = false } // SortToChan makes a callback for the Sorter which sends the output diff --git a/fs/list/sorter_test.go b/fs/list/sorter_test.go index 992389887..7d34e0975 100644 --- a/fs/list/sorter_test.go +++ b/fs/list/sorter_test.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "slices" + "strings" "testing" "github.com/rclone/rclone/fs" @@ -22,7 +23,7 @@ func TestSorter(t *testing.T) { require.Equal(t, fs.DirEntries{oA, da}, entries) return nil } - ls, err := NewSorter(ctx, callback, nil) + ls, err := NewSorter(ctx, nil, callback, nil) require.NoError(t, err) assert.Equal(t, fmt.Sprintf("%p", callback), fmt.Sprintf("%p", ls.callback)) assert.Equal(t, fmt.Sprintf("%p", identityKeyFn), fmt.Sprintf("%p", ls.keyFn)) @@ -55,7 +56,7 @@ func TestSorterIdentity(t *testing.T) { assert.Equal(t, "a", entries[0].Remote()) return nil } - ls, err := NewSorter(ctx, callback, nil) + ls, err := NewSorter(ctx, nil, callback, nil) require.NoError(t, err) defer ls.CleanUp() @@ -86,7 +87,7 @@ func TestSorterKeyFn(t *testing.T) { assert.Equal(t, "z", entries[0].Remote()) return nil } - ls, err := NewSorter(ctx, callback, keyFn) + ls, err := NewSorter(ctx, nil, callback, keyFn) require.NoError(t, err) defer ls.CleanUp() @@ -102,3 +103,212 @@ func TestSorterKeyFn(t 
*testing.T) { err = ls.Send() require.NoError(t, err) } + +// testFs implements enough of the fs.Fs interface for Sorter +type testFs struct { + t *testing.T + entriesMap map[string]fs.DirEntry +} + +// NewObject finds the Object at remote. If it can't be found +// it returns the error ErrorObjectNotFound. +func (f *testFs) NewObject(ctx context.Context, remote string) (fs.Object, error) { + entry, ok := f.entriesMap[remote] + assert.True(f.t, ok, "entry not found") + if !ok { + return nil, fs.ErrorObjectNotFound + } + obj, ok := entry.(fs.Object) + assert.True(f.t, ok, "expected entry to be object: %#v", entry) + if !ok { + return nil, fs.ErrorObjectNotFound + } + return obj, nil +} + +// String outputs info about the Fs +func (f *testFs) String() string { + return "testFs" +} + +// used to sort the entries case insensitively +func keyCaseInsensitive(entry fs.DirEntry) string { + return strings.ToLower(entry.Remote()) +} + +// Test the external sorting +func testSorterExt(t *testing.T, cutoff, N int, wantExtSort bool, keyFn KeyFn) { + ctx := context.Background() + ctx, ci := fs.AddConfig(ctx) + ci.ListCutoff = cutoff + + // Make the directory entries + entriesMap := make(map[string]fs.DirEntry, N) + for i := 0; i < N; i++ { + remote := fmt.Sprintf("%010d", i) + prefix := "a" + if i%3 == 0 { + prefix = "A" + } + remote = prefix + remote + if i%2 == 0 { + entriesMap[remote] = mockobject.New(remote) + } else { + entriesMap[remote] = mockdir.New(remote) + } + } + assert.Equal(t, N, len(entriesMap)) + f := &testFs{t: t, entriesMap: entriesMap} + + // In the callback delete entries from the map when they are + // found + prevKey := "" + callback := func(entries fs.DirEntries) error { + for _, gotEntry := range entries { + remote := gotEntry.Remote() + key := remote + if keyFn != nil { + key = keyFn(gotEntry) + } + require.Less(t, prevKey, key, "Not sorted") + prevKey = key + wantEntry, ok := entriesMap[remote] + assert.True(t, ok, "Entry not found %q", remote) + _, 
wantDir := wantEntry.(fs.Directory) + _, gotDir := gotEntry.(fs.Directory) + _, wantObj := wantEntry.(fs.Object) + _, gotObj := gotEntry.(fs.Object) + require.True(t, (wantDir && gotDir) || (wantObj && gotObj), "Wrong types %#v, %#v", wantEntry, gotEntry) + delete(entriesMap, remote) + } + return nil + } + + ls, err := NewSorter(ctx, f, callback, keyFn) + require.NoError(t, err) + + // Send the entries in random (map) order + for _, entry := range entriesMap { + err = ls.Add(fs.DirEntries{entry}) + require.NoError(t, err) + } + + // Check we are extsorting if required + assert.Equal(t, wantExtSort, ls.extSort) + + // Test Send + err = ls.Send() + require.NoError(t, err) + + // All the entries should have been seen + assert.Equal(t, 0, len(entriesMap)) + + // Test Cleanup + ls.CleanUp() + assert.Equal(t, fs.DirEntries(nil), ls.entries) +} + +// Test the external sorting +func TestSorterExt(t *testing.T) { + for _, test := range []struct { + cutoff int + N int + wantExtSort bool + keyFn KeyFn + }{ + {cutoff: 1000, N: 100, wantExtSort: false}, + {cutoff: 100, N: 1000, wantExtSort: true}, + {cutoff: 1000, N: 100, wantExtSort: false, keyFn: keyCaseInsensitive}, + {cutoff: 100, N: 1000, wantExtSort: true, keyFn: keyCaseInsensitive}, + {cutoff: 100001, N: 100000, wantExtSort: false}, + {cutoff: 100000, N: 100001, wantExtSort: true}, + // {cutoff: 100_000, N: 1_000_000, wantExtSort: true}, + // {cutoff: 100_000, N: 10_000_000, wantExtSort: true}, + } { + t.Run(fmt.Sprintf("cutoff=%d,N=%d,wantExtSort=%v,keyFn=%v", test.cutoff, test.N, test.wantExtSort, test.keyFn != nil), func(t *testing.T) { + testSorterExt(t, test.cutoff, test.N, test.wantExtSort, test.keyFn) + }) + } +} + +// benchFs implements enough of the fs.Fs interface for Sorter +type benchFs struct{} + +// NewObject finds the Object at remote. If it can't be found +// it returns the error ErrorObjectNotFound. 
+func (benchFs) NewObject(ctx context.Context, remote string) (fs.Object, error) { + // Recreate the mock objects + return mockobject.New(remote), nil +} + +// String outputs info about the Fs +func (benchFs) String() string { + return "benchFs" +} + +func BenchmarkSorterExt(t *testing.B) { + const cutoff = 1000 + const N = 10_000_000 + + ctx := context.Background() + ctx, ci := fs.AddConfig(ctx) + ci.ListCutoff = cutoff + keyFn := keyCaseInsensitive + + // In the callback check entries are in order + prevKey := "" + entriesReceived := 0 + callback := func(entries fs.DirEntries) error { + for _, gotEntry := range entries { + remote := gotEntry.Remote() + key := remote + if keyFn != nil { + key = keyFn(gotEntry) + } + require.Less(t, prevKey, key, "Not sorted") + prevKey = key + entriesReceived++ + } + return nil + } + + f := benchFs{} + ls, err := NewSorter(ctx, f, callback, keyFn) + require.NoError(t, err) + + // Send the entries in reverse order in batches of 1000 like the backends do + var entries = make(fs.DirEntries, 0, 1000) + for i := N - 1; i >= 0; i-- { + remote := fmt.Sprintf("%050d", i) // UUID length plus a bit + prefix := "a" + if i%3 == 0 { + prefix = "A" + } + remote = prefix + remote + if i%2 == 0 { + entries = append(entries, mockobject.New(remote)) + } else { + entries = append(entries, mockdir.New(remote)) + } + if len(entries) > 1000 { + err = ls.Add(entries) + require.NoError(t, err) + entries = entries[:0] + } + } + err = ls.Add(entries) + require.NoError(t, err) + + // Check we are extsorting + assert.True(t, ls.extSort) + + // Test Send + err = ls.Send() + require.NoError(t, err) + + // All the entries should have been seen + assert.Equal(t, N, entriesReceived) + + // Cleanup + ls.CleanUp() +} diff --git a/fs/march/march.go b/fs/march/march.go index 9d1cf6824..218d8c548 100644 --- a/fs/march/march.go +++ b/fs/march/march.go @@ -399,7 +399,7 @@ func (m *March) processJob(job listDirJob) ([]listDirJob, error) { if m.NoTraverse && 
!m.NoCheckDest { originalSrcChan := srcChan srcChan = make(chan fs.DirEntry, 100) - ls, err := list.NewSorter(m.Ctx, list.SortToChan(dstChan), m.key) + ls, err := list.NewSorter(m.Ctx, m.Fdst, list.SortToChan(dstChan), m.key) if err != nil { return nil, err } diff --git a/go.mod b/go.mod index 57c94d8b5..4607bf76b 100644 --- a/go.mod +++ b/go.mod @@ -167,6 +167,7 @@ require ( github.com/klauspost/cpuid/v2 v2.2.8 // indirect github.com/kr/fs v0.1.0 // indirect github.com/kylelemons/godebug v1.1.0 // indirect + github.com/lanrat/extsort v1.0.2 // indirect github.com/lpar/date v1.0.0 // indirect github.com/lucasb-eyer/go-colorful v1.2.0 // indirect github.com/lufia/plan9stats v0.0.0-20231016141302-07b5767bb0ed // indirect diff --git a/go.sum b/go.sum index 9173d0e20..a87be2631 100644 --- a/go.sum +++ b/go.sum @@ -431,6 +431,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/lanrat/extsort v1.0.2 h1:p3MLVpQEPwEGPzeLBb+1eSErzRl6Bgjgr+qnIs2RxrU= +github.com/lanrat/extsort v1.0.2/go.mod h1:ivzsdLm8Tv+88qbdpMElV6Z15StlzPUtZSKsGb51hnQ= github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= github.com/lpar/date v1.0.0 h1:bq/zVqFTUmsxvd/CylidY4Udqpr9BOFrParoP6p0x/I=