mirror of
https://github.com/rclone/rclone.git
synced 2025-01-22 06:09:21 +01:00
walk: do filtering in parallel to speed up rclone delete
- FIXME okish
See: https://forum.rclone.org/t/rclone-delete-on-s3-very-slow/37543
This commit is contained in:
parent
9a9ef040e3
commit
b87a9c8354
@ -15,6 +15,7 @@ import (
|
|||||||
"github.com/rclone/rclone/fs/dirtree"
|
"github.com/rclone/rclone/fs/dirtree"
|
||||||
"github.com/rclone/rclone/fs/filter"
|
"github.com/rclone/rclone/fs/filter"
|
||||||
"github.com/rclone/rclone/fs/list"
|
"github.com/rclone/rclone/fs/list"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ErrorSkipDir is used as a return value from Walk to indicate that the
|
// ErrorSkipDir is used as a return value from Walk to indicate that the
|
||||||
@ -294,6 +295,27 @@ func listR(ctx context.Context, f fs.Fs, path string, includeAll bool, listType
|
|||||||
if synthesizeDirs {
|
if synthesizeDirs {
|
||||||
dm = newDirMap(path)
|
dm = newDirMap(path)
|
||||||
}
|
}
|
||||||
|
// nil the entry out if not included
|
||||||
|
filterEntry := func(ctx context.Context, entry *fs.DirEntry) (err error) {
|
||||||
|
var include bool
|
||||||
|
switch x := (*entry).(type) {
|
||||||
|
case fs.Object:
|
||||||
|
include = fi.IncludeObject(ctx, x)
|
||||||
|
case fs.Directory:
|
||||||
|
include, err = includeDirectory(x.Remote())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unknown object type %T", entry)
|
||||||
|
}
|
||||||
|
if !include {
|
||||||
|
fs.Debugf(entry, "Excluded from sync (and deletion)")
|
||||||
|
*entry = nil
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
ci := fs.GetConfig(ctx)
|
||||||
var mu sync.Mutex
|
var mu sync.Mutex
|
||||||
err := doListR(ctx, path, func(entries fs.DirEntries) (err error) {
|
err := doListR(ctx, path, func(entries fs.DirEntries) (err error) {
|
||||||
if synthesizeDirs {
|
if synthesizeDirs {
|
||||||
@ -304,24 +326,22 @@ func listR(ctx context.Context, f fs.Fs, path string, includeAll bool, listType
|
|||||||
}
|
}
|
||||||
listType.Filter(&entries)
|
listType.Filter(&entries)
|
||||||
if !includeAll {
|
if !includeAll {
|
||||||
|
g, gCtx := errgroup.WithContext(ctx)
|
||||||
|
g.SetLimit(ci.Checkers)
|
||||||
|
for i := range entries {
|
||||||
|
entry := &entries[i]
|
||||||
|
g.Go(func() error {
|
||||||
|
return filterEntry(gCtx, entry)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
err = g.Wait()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
filteredEntries := entries[:0]
|
filteredEntries := entries[:0]
|
||||||
for _, entry := range entries {
|
for _, entry := range entries {
|
||||||
var include bool
|
if entry != nil {
|
||||||
switch x := entry.(type) {
|
|
||||||
case fs.Object:
|
|
||||||
include = fi.IncludeObject(ctx, x)
|
|
||||||
case fs.Directory:
|
|
||||||
include, err = includeDirectory(x.Remote())
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
return fmt.Errorf("unknown object type %T", entry)
|
|
||||||
}
|
|
||||||
if include {
|
|
||||||
filteredEntries = append(filteredEntries, entry)
|
filteredEntries = append(filteredEntries, entry)
|
||||||
} else {
|
|
||||||
fs.Debugf(entry, "Excluded from sync (and deletion)")
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
entries = filteredEntries
|
entries = filteredEntries
|
||||||
|
Loading…
Reference in New Issue
Block a user