rclone/fs/walk.go
Nick Craig-Wood 5250fcdf08 core: fix data race in walk
This was detected by the race detector when the client of Walk() sorted entries.
2017-06-15 16:46:32 +01:00

417 lines
11 KiB
Go

// Walking directories
package fs
import (
"bytes"
"fmt"
"path"
"sort"
"strings"
"sync"
"github.com/pkg/errors"
)
// ErrorSkipDir is used as a return value from Walk to indicate that the
// directory named in the call is to be skipped. It is not returned as
// an error by any function.
var ErrorSkipDir = errors.New("skip this directory")
// ErrorCantListR is returned by WalkR if the underlying Fs isn't
// capable of doing a recursive listing.
var ErrorCantListR = errors.New("recursive directory listing not available")
// WalkFunc is the type of the function called for directory
// visited by Walk. The path argument contains remote path to the directory.
//
// If there was a problem walking to directory named by path, the
// incoming error will describe the problem and the function can
// decide how to handle that error (and Walk will not descend into
// that directory). If an error is returned, processing stops. The
// sole exception is when the function returns the special value
// ErrorSkipDir. If the function returns ErrorSkipDir, Walk skips the
// directory's contents entirely.
type WalkFunc func(path string, entries DirEntries, err error) error
// Walk lists the directory.
//
// If includeAll is not set it will use the filters defined.
//
// If maxLevel is < 0 then it will recurse indefinitely, else it will
// only do maxLevel levels.
//
// It calls fn for each tranche of DirEntries read.
//
// Note that fn will not be called concurrently whereas the directory
// listing will proceed concurrently.
//
// Parent directories are always listed before their children
//
// This is implemented by WalkR if Config.UseRecursiveListing is true
// and f supports it and level > 1, or WalkN otherwise.
//
// NB (f, path) to be replaced by fs.Dir at some point
func Walk(f Fs, path string, includeAll bool, maxLevel int, fn WalkFunc) error {
if (maxLevel < 0 || maxLevel > 1) && Config.UseListR && f.Features().ListR != nil {
return WalkR(f, path, includeAll, maxLevel, fn)
}
return WalkN(f, path, includeAll, maxLevel, fn)
}
// WalkN lists the directory.
//
// It implements Walk using non recursive directory listing.
func WalkN(f Fs, path string, includeAll bool, maxLevel int, fn WalkFunc) error {
return walk(f, path, includeAll, maxLevel, fn, ListDirSorted)
}
// WalkR lists the directory.
//
// It implements Walk using recursive directory listing if
// available, or returns ErrorCantListR if not.
func WalkR(f Fs, path string, includeAll bool, maxLevel int, fn WalkFunc) error {
listR := f.Features().ListR
if listR == nil {
return ErrorCantListR
}
return walkR(f, path, includeAll, maxLevel, fn, listR)
}
type listDirFunc func(fs Fs, includeAll bool, dir string) (entries DirEntries, err error)
func walk(f Fs, path string, includeAll bool, maxLevel int, fn WalkFunc, listDir listDirFunc) error {
var (
wg sync.WaitGroup // sync closing of go routines
traversing sync.WaitGroup // running directory traversals
doClose sync.Once // close the channel once
mu sync.Mutex // stop fn being called concurrently
)
// listJob describe a directory listing that needs to be done
type listJob struct {
remote string
depth int
}
in := make(chan listJob, Config.Checkers)
errs := make(chan error, 1)
quit := make(chan struct{})
closeQuit := func() {
doClose.Do(func() {
close(quit)
go func() {
for _ = range in {
traversing.Done()
}
}()
})
}
for i := 0; i < Config.Checkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case job, ok := <-in:
if !ok {
return
}
entries, err := listDir(f, includeAll, job.remote)
var jobs []listJob
if err == nil && job.depth != 0 {
entries.ForDir(func(dir *Dir) {
// Recurse for the directory
jobs = append(jobs, listJob{
remote: dir.Remote(),
depth: job.depth - 1,
})
})
}
mu.Lock()
err = fn(job.remote, entries, err)
mu.Unlock()
// NB once we have passed entries to fn we mustn't touch it again
if err != nil && err != ErrorSkipDir {
traversing.Done()
Stats.Error()
Errorf(job.remote, "error listing: %v", err)
closeQuit()
// Send error to error channel if space
select {
case errs <- err:
default:
}
continue
}
if err == nil && len(jobs) > 0 {
traversing.Add(len(jobs))
go func() {
// Now we have traversed this directory, send these
// jobs off for traversal in the background
for _, newJob := range jobs {
in <- newJob
}
}()
}
traversing.Done()
case <-quit:
return
}
}
}()
}
// Start the process
traversing.Add(1)
in <- listJob{
remote: path,
depth: maxLevel - 1,
}
traversing.Wait()
close(in)
wg.Wait()
close(errs)
// return the first error returned or nil
return <-errs
}
// DirTree is a map of directories to entries
type DirTree map[string]DirEntries
// parentDir finds the parent directory of path
func parentDir(entryPath string) string {
dirPath := path.Dir(entryPath)
if dirPath == "." {
dirPath = ""
}
return dirPath
}
// add an entry to the tree
func (dt DirTree) add(entry BasicInfo) {
dirPath := parentDir(entry.Remote())
dt[dirPath] = append(dt[dirPath], entry)
}
// add a directory entry to the tree
func (dt DirTree) addDir(entry BasicInfo) {
dt.add(entry)
// create the directory itself if it doesn't exist already
dirPath := entry.Remote()
if _, ok := dt[dirPath]; !ok {
dt[dirPath] = nil
}
}
// check that dirPath has a *Dir in its parent
func (dt DirTree) checkParent(root, dirPath string) {
if dirPath == root {
return
}
parentPath := parentDir(dirPath)
entries := dt[parentPath]
for _, entry := range entries {
if entry.Remote() == dirPath {
return
}
}
dt[parentPath] = append(entries, &Dir{
Name: dirPath,
})
dt.checkParent(root, parentPath)
}
// check every directory in the tree has *Dir in its parent
func (dt DirTree) checkParents(root string) {
for dirPath := range dt {
dt.checkParent(root, dirPath)
}
}
// Sort sorts all the Entries
func (dt DirTree) Sort() {
for _, entries := range dt {
sort.Sort(entries)
}
}
// Dirs returns the directories in sorted order
func (dt DirTree) Dirs() (dirNames []string) {
for dirPath := range dt {
dirNames = append(dirNames, dirPath)
}
sort.Strings(dirNames)
return dirNames
}
// String emits a simple representation of the DirTree
func (dt DirTree) String() string {
out := new(bytes.Buffer)
for _, dir := range dt.Dirs() {
fmt.Fprintf(out, "%s/\n", dir)
for _, entry := range dt[dir] {
flag := ""
if _, ok := entry.(*Dir); ok {
flag = "/"
}
fmt.Fprintf(out, " %s%s\n", path.Base(entry.Remote()), flag)
}
}
return out.String()
}
func walkRDirTree(f Fs, path string, includeAll bool, maxLevel int, listR ListRFn) (DirTree, error) {
dirs := make(DirTree)
var mu sync.Mutex
err := listR(path, func(entries DirEntries) error {
mu.Lock()
defer mu.Unlock()
for _, entry := range entries {
slashes := strings.Count(entry.Remote(), "/")
switch x := entry.(type) {
case Object:
// Make sure we don't delete excluded files if not required
if includeAll || Config.Filter.IncludeObject(x) {
if maxLevel < 0 || slashes <= maxLevel-1 {
dirs.add(x)
} else {
// Make sure we include any parent directories of excluded objects
dirPath := x.Remote()
for ; slashes > maxLevel-1; slashes-- {
dirPath = parentDir(dirPath)
}
dirs.checkParent(path, dirPath)
}
} else {
Debugf(x, "Excluded from sync (and deletion)")
}
case *Dir:
if includeAll || Config.Filter.IncludeDirectory(x.Remote()) {
if maxLevel < 0 || slashes <= maxLevel-1 {
if slashes == maxLevel-1 {
// Just add the object if at maxLevel
dirs.add(x)
} else {
dirs.addDir(x)
}
}
} else {
Debugf(x, "Excluded from sync (and deletion)")
}
}
}
return nil
})
if err != nil {
return nil, err
}
dirs.checkParents(path)
if len(dirs) == 0 {
dirs[path] = nil
}
dirs.Sort()
return dirs, nil
}
// NewDirTree returns a DirTree filled with the directory listing
// using the parameters supplied. This will return ErrorCantListR for
// remotes which don't support ListR.
func NewDirTree(f Fs, path string, includeAll bool, maxLevel int) (DirTree, error) {
listR := f.Features().ListR
if listR == nil {
return nil, ErrorCantListR
}
return walkRDirTree(f, path, includeAll, maxLevel, listR)
}
func walkR(f Fs, path string, includeAll bool, maxLevel int, fn WalkFunc, listR ListRFn) error {
dirs, err := walkRDirTree(f, path, includeAll, maxLevel, listR)
if err != nil {
return err
}
skipping := false
skipPrefix := ""
emptyDir := DirEntries{}
for _, dirPath := range dirs.Dirs() {
if skipping {
// Skip over directories as required
if strings.HasPrefix(dirPath, skipPrefix) {
continue
}
skipping = false
}
entries := dirs[dirPath]
if entries == nil {
entries = emptyDir
}
sort.Sort(entries)
err = fn(dirPath, entries, nil)
if err == ErrorSkipDir {
skipping = true
skipPrefix = dirPath
if skipPrefix != "" {
skipPrefix += "/"
}
} else if err != nil {
return err
}
}
return nil
}
// WalkGetAll runs Walk getting all the results
func WalkGetAll(f Fs, path string, includeAll bool, maxLevel int) (objs []Object, dirs []*Dir, err error) {
err = Walk(f, path, includeAll, maxLevel, func(dirPath string, entries DirEntries, err error) error {
if err != nil {
return err
}
for _, entry := range entries {
switch x := entry.(type) {
case Object:
objs = append(objs, x)
case *Dir:
dirs = append(dirs, x)
}
}
return nil
})
return
}
// ListRHelper is used in the implementation of ListR to accumulate DirEntries
type ListRHelper struct {
callback ListRCallback
entries DirEntries
}
// NewListRHelper should be called from ListR with the callback passed in
func NewListRHelper(callback ListRCallback) *ListRHelper {
return &ListRHelper{
callback: callback,
}
}
// send sends the stored entries to the callback if there are >= max
// entries.
func (lh *ListRHelper) send(max int) (err error) {
if len(lh.entries) >= max {
err = lh.callback(lh.entries)
lh.entries = lh.entries[:0]
}
return err
}
// Add an entry to the stored entries and send them if there are more
// than a certain amount
func (lh *ListRHelper) Add(entry BasicInfo) error {
if entry == nil {
return nil
}
lh.entries = append(lh.entries, entry)
return lh.send(100)
}
// Flush the stored entries (if any) sending them to the callback
func (lh *ListRHelper) Flush() error {
return lh.send(1)
}