feat: local: list objects in parallel controlled by the --checkers option -- fixes #6632

Signed-off-by: Anagh Kumar Baranwal <6824881+darthShadow@users.noreply.github.com>
This commit is contained in:
Anagh Kumar Baranwal 2023-08-01 18:42:00 +05:30
parent 25703ad20e
commit 050c7159de
No known key found for this signature in database

View File

@ -28,6 +28,7 @@ import (
"github.com/rclone/rclone/lib/encoder" "github.com/rclone/rclone/lib/encoder"
"github.com/rclone/rclone/lib/file" "github.com/rclone/rclone/lib/file"
"github.com/rclone/rclone/lib/readers" "github.com/rclone/rclone/lib/readers"
"golang.org/x/sync/errgroup"
"golang.org/x/text/unicode/norm" "golang.org/x/text/unicode/norm"
) )
@ -486,6 +487,7 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e
for { for {
var fis []os.FileInfo var fis []os.FileInfo
if useReadDir { if useReadDir {
// Windows and Plan9 read the directory entries with the stat information in which // Windows and Plan9 read the directory entries with the stat information in which
// shouldn't fail because of unreadable entries. // shouldn't fail because of unreadable entries.
@ -495,92 +497,145 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e
} }
} else { } else {
// For other OSes we read the names only (which shouldn't fail) then stat the // For other OSes we read the names only (which shouldn't fail) then stat the
// individual ourselves so we can log errors but not fail the directory read. // individual ourselves, so we can log errors but not fail the directory read.
var names []string var names []string
names, err = fd.Readdirnames(1024) names, err = fd.Readdirnames(1024)
if err == io.EOF && len(names) == 0 { if err == io.EOF && len(names) == 0 {
break break
} }
fis = make([]os.FileInfo, len(names))
g, gCtx := errgroup.WithContext(ctx)
g.SetLimit(fs.GetConfig(ctx).Checkers)
if err == nil { if err == nil {
for _, name := range names { for i, name := range names {
namepath := filepath.Join(fsDirPath, name) i, name := i, name // https://golang.org/doc/faq#closures_and_goroutines
fi, fierr := os.Lstat(namepath) g.Go(func() error {
if os.IsNotExist(fierr) { // No point in continuing if context has been cancelled
// skip entry removed by a concurrent goroutine if gCtx.Err() != nil {
continue return nil
}
if fierr != nil {
// Don't report errors on any file names that are excluded
if useFilter {
newRemote := f.cleanRemote(dir, name)
if !filter.IncludeRemote(newRemote) {
continue
}
} }
fierr = fmt.Errorf("failed to get info about directory entry %q: %w", namepath, fierr)
fs.Errorf(dir, "%v", fierr) var err error
_ = accounting.Stats(ctx).Error(fserrors.NoRetryError(fierr)) // fail the sync namepath := filepath.Join(fsDirPath, name)
continue fi, fierr := os.Lstat(namepath)
} if os.IsNotExist(fierr) {
fis = append(fis, fi) // skip entry removed by a concurrent goroutine
return nil
}
if fierr != nil {
if useFilter {
newRemote := f.cleanRemote(dir, name)
if !filter.IncludeRemote(newRemote) {
return nil
}
}
err = fmt.Errorf("failed to get info about directory entry %q: %w", namepath, fierr)
fs.Errorf(dir, "%v", err)
_ = accounting.Stats(gCtx).Error(fserrors.NoRetryError(err)) // fail the sync
return nil
}
fis[i] = fi
return nil
})
} }
} }
err = g.Wait()
} }
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to read directory entry: %w", err) return nil, fmt.Errorf("failed to read directory entry: %w", err)
} }
for _, fi := range fis { loopEntries := make(fs.DirEntries, len(fis))
name := fi.Name()
mode := fi.Mode() g, gCtx := errgroup.WithContext(ctx)
newRemote := f.cleanRemote(dir, name) g.SetLimit(fs.GetConfig(ctx).Checkers)
// Follow symlinks if required for i, fi := range fis {
if f.opt.FollowSymlinks && (mode&os.ModeSymlink) != 0 { if fi == nil {
localPath := filepath.Join(fsDirPath, name) continue
fi, err = os.Stat(localPath)
// Quietly skip errors on excluded files and directories
if err != nil && useFilter && !filter.IncludeRemote(newRemote) {
continue
}
if os.IsNotExist(err) || isCircularSymlinkError(err) {
// Skip bad symlinks and circular symlinks
err = fserrors.NoRetryError(fmt.Errorf("symlink: %w", err))
fs.Errorf(newRemote, "Listing error: %v", err)
err = accounting.Stats(ctx).Error(err)
continue
}
if err != nil {
return nil, err
}
mode = fi.Mode()
} }
if fi.IsDir() { i, fi := i, fi // https://golang.org/doc/faq#closures_and_goroutines
// Ignore directories which are symlinks. These are junction points under windows which g.Go(func() error {
// are kind of a souped up symlink. Unix doesn't have directories which are symlinks. // No point in continuing if context has been cancelled
if (mode&os.ModeSymlink) == 0 && f.dev == readDevice(fi, f.opt.OneFileSystem) { if gCtx.Err() != nil {
d := fs.NewDir(newRemote, fi.ModTime()) return nil
entries = append(entries, d)
} }
} else {
// Check whether this link should be translated var err error
if f.opt.TranslateSymlinks && fi.Mode()&os.ModeSymlink != 0 { name := fi.Name()
newRemote += linkSuffix mode := fi.Mode()
newRemote := f.cleanRemote(dir, name)
// Follow symlinks if required
if f.opt.FollowSymlinks && (mode&os.ModeSymlink) != 0 {
localPath := filepath.Join(fsDirPath, name)
fi, err = os.Stat(localPath)
if err != nil {
// Quietly skip errors on excluded files and directories
if useFilter && !filter.IncludeRemote(newRemote) {
return nil
}
if os.IsNotExist(err) || isCircularSymlinkError(err) {
// Skip bad symlinks and circular symlinks
err = fserrors.NoRetryError(fmt.Errorf("symlink: %w", err))
fs.Errorf(newRemote, "Listing error: %v", err)
_ = accounting.Stats(gCtx).Error(err)
return nil
}
return err
}
mode = fi.Mode()
} }
// Don't include non directory if not included
// we leave directory filtering to the layer above // No point in continuing if context has been cancelled
if useFilter && !filter.IncludeRemote(newRemote) { if gCtx.Err() != nil {
continue return nil
} }
fso, err := f.newObjectWithInfo(newRemote, fi)
if err != nil { if fi.IsDir() {
return nil, err // Ignore directories which are symlinks. These are junction points under windows which
} // are kind of a souped up symlink. Unix doesn't have directories which are symlinks.
if fso.Storable() { if (mode&os.ModeSymlink) == 0 && f.dev == readDevice(fi, f.opt.OneFileSystem) {
entries = append(entries, fso) d := fs.NewDir(newRemote, fi.ModTime())
loopEntries[i] = d
return nil
}
} else {
// Check whether this link should be translated
if f.opt.TranslateSymlinks && fi.Mode()&os.ModeSymlink != 0 {
newRemote += linkSuffix
}
// Don't include non directory if not included
// we leave directory filtering to the layer above
if useFilter && !filter.IncludeRemote(newRemote) {
return nil
}
fso, err := f.newObjectWithInfo(newRemote, fi)
if err != nil {
return err
}
if fso.Storable() {
loopEntries[i] = fso
return nil
}
} }
return nil
})
}
err = g.Wait()
if err != nil {
return nil, err
}
for _, entry := range loopEntries {
if entry == nil {
continue
} }
entries = append(entries, entry)
} }
} }
return entries, nil return entries, nil
} }