From b87a9c8354a18d9bd4e3d71c371567330951da84 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Mon, 17 Apr 2023 19:32:15 +0100 Subject: [PATCH] walk: do filtering in parallel to speed up `rclone delete` - FIXME okish See: https://forum.rclone.org/t/rclone-delete-on-s3-very-slow/37543 --- fs/walk/walk.go | 50 ++++++++++++++++++++++++++++++++++--------------- 1 file changed, 35 insertions(+), 15 deletions(-) diff --git a/fs/walk/walk.go b/fs/walk/walk.go index 01251ea1b..d0a67a794 100644 --- a/fs/walk/walk.go +++ b/fs/walk/walk.go @@ -15,6 +15,7 @@ import ( "github.com/rclone/rclone/fs/dirtree" "github.com/rclone/rclone/fs/filter" "github.com/rclone/rclone/fs/list" + "golang.org/x/sync/errgroup" ) // ErrorSkipDir is used as a return value from Walk to indicate that the @@ -294,6 +295,27 @@ func listR(ctx context.Context, f fs.Fs, path string, includeAll bool, listType if synthesizeDirs { dm = newDirMap(path) } + // nil the entry out if not included + filterEntry := func(ctx context.Context, entry *fs.DirEntry) (err error) { + var include bool + switch x := (*entry).(type) { + case fs.Object: + include = fi.IncludeObject(ctx, x) + case fs.Directory: + include, err = includeDirectory(x.Remote()) + if err != nil { + return err + } + default: + return fmt.Errorf("unknown object type %T", entry) + } + if !include { + fs.Debugf(entry, "Excluded from sync (and deletion)") + *entry = nil + } + return nil + } + ci := fs.GetConfig(ctx) var mu sync.Mutex err := doListR(ctx, path, func(entries fs.DirEntries) (err error) { if synthesizeDirs { @@ -304,24 +326,22 @@ func listR(ctx context.Context, f fs.Fs, path string, includeAll bool, listType } listType.Filter(&entries) if !includeAll { + g, gCtx := errgroup.WithContext(ctx) + g.SetLimit(ci.Checkers) + for i := range entries { + entry := &entries[i] + g.Go(func() error { + return filterEntry(gCtx, entry) + }) + } + err = g.Wait() + if err != nil { + return err + } filteredEntries := entries[:0] for _, entry := range entries { - var include bool - switch x := entry.(type) { - case fs.Object: - include = fi.IncludeObject(ctx, x) - case fs.Directory: - include, err = includeDirectory(x.Remote()) - if err != nil { - return err - } - default: - return fmt.Errorf("unknown object type %T", entry) - } - if include { + if entry != nil { filteredEntries = append(filteredEntries, entry) - } else { - fs.Debugf(entry, "Excluded from sync (and deletion)") } } entries = filteredEntries