mirror of
https://github.com/rclone/rclone.git
synced 2025-01-23 14:49:25 +01:00
drive: fix missing items when listing using --fast-list / ListR
This is caused by a bug in Google drive where, in some circumstances querying for "(A in parents) or (B in parents)" returns nothing whereas querying for "A in parents" and "B in parents" separately works fine. This has been reported here: https://issuetracker.google.com/issues/149522397 This workaround detects this condition by seeing if a listing for more than one directory at once returns nothing. If it does then it retries each one individually. This can potentially have a false positive if the user has multiple empty directories which are queried at once. The consequence of this will be that ListR is disabled for a while until the directories are found to be actually empty in which case it will be re-enabled. Fixes #3114 and Fixes #4289
This commit is contained in:
parent
6cd8d3c4a0
commit
6054476c9c
@ -24,6 +24,7 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"text/template"
|
"text/template"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -68,6 +69,8 @@ const (
|
|||||||
minChunkSize = 256 * fs.KibiByte
|
minChunkSize = 256 * fs.KibiByte
|
||||||
defaultChunkSize = 8 * fs.MebiByte
|
defaultChunkSize = 8 * fs.MebiByte
|
||||||
partialFields = "id,name,size,md5Checksum,trashed,modifiedTime,createdTime,mimeType,parents,webViewLink,shortcutDetails"
|
partialFields = "id,name,size,md5Checksum,trashed,modifiedTime,createdTime,mimeType,parents,webViewLink,shortcutDetails"
|
||||||
|
listRGrouping = 50 // number of IDs to search at once when using ListR
|
||||||
|
listRInputBuffer = 1000 // size of input buffer when using ListR
|
||||||
)
|
)
|
||||||
|
|
||||||
// Globals
|
// Globals
|
||||||
@ -558,6 +561,9 @@ type Fs struct {
|
|||||||
isTeamDrive bool // true if this is a team drive
|
isTeamDrive bool // true if this is a team drive
|
||||||
fileFields googleapi.Field // fields to fetch file info with
|
fileFields googleapi.Field // fields to fetch file info with
|
||||||
m configmap.Mapper
|
m configmap.Mapper
|
||||||
|
grouping int32 // number of IDs to search at once in ListR - read with atomic
|
||||||
|
listRmu *sync.Mutex // protects listRempties
|
||||||
|
listRempties map[string]struct{} // IDs of supposedly empty directories which triggered grouping disable
|
||||||
}
|
}
|
||||||
|
|
||||||
type baseObject struct {
|
type baseObject struct {
|
||||||
@ -1079,11 +1085,14 @@ func NewFs(name, path string, m configmap.Mapper) (fs.Fs, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
f := &Fs{
|
f := &Fs{
|
||||||
name: name,
|
name: name,
|
||||||
root: root,
|
root: root,
|
||||||
opt: *opt,
|
opt: *opt,
|
||||||
pacer: newPacer(opt),
|
pacer: newPacer(opt),
|
||||||
m: m,
|
m: m,
|
||||||
|
grouping: listRGrouping,
|
||||||
|
listRmu: new(sync.Mutex),
|
||||||
|
listRempties: make(map[string]struct{}),
|
||||||
}
|
}
|
||||||
f.isTeamDrive = opt.TeamDriveID != ""
|
f.isTeamDrive = opt.TeamDriveID != ""
|
||||||
f.fileFields = f.getFileFields()
|
f.fileFields = f.getFileFields()
|
||||||
@ -1634,15 +1643,17 @@ func (s listRSlices) Less(i, j int) bool {
|
|||||||
// In each cycle it will read up to grouping entries from the in channel without blocking.
|
// In each cycle it will read up to grouping entries from the in channel without blocking.
|
||||||
// If an error occurs it will be send to the out channel and then return. Once the in channel is closed,
|
// If an error occurs it will be send to the out channel and then return. Once the in channel is closed,
|
||||||
// nil is send to the out channel and the function returns.
|
// nil is send to the out channel and the function returns.
|
||||||
func (f *Fs) listRRunner(ctx context.Context, wg *sync.WaitGroup, in <-chan listREntry, out chan<- error, cb func(fs.DirEntry) error, grouping int) {
|
func (f *Fs) listRRunner(ctx context.Context, wg *sync.WaitGroup, in chan listREntry, out chan<- error, cb func(fs.DirEntry) error) {
|
||||||
var dirs []string
|
var dirs []string
|
||||||
var paths []string
|
var paths []string
|
||||||
|
var grouping int32
|
||||||
|
|
||||||
for dir := range in {
|
for dir := range in {
|
||||||
dirs = append(dirs[:0], dir.id)
|
dirs = append(dirs[:0], dir.id)
|
||||||
paths = append(paths[:0], dir.path)
|
paths = append(paths[:0], dir.path)
|
||||||
|
grouping = atomic.LoadInt32(&f.grouping)
|
||||||
waitloop:
|
waitloop:
|
||||||
for i := 1; i < grouping; i++ {
|
for i := int32(1); i < grouping; i++ {
|
||||||
select {
|
select {
|
||||||
case d, ok := <-in:
|
case d, ok := <-in:
|
||||||
if !ok {
|
if !ok {
|
||||||
@ -1655,6 +1666,7 @@ func (f *Fs) listRRunner(ctx context.Context, wg *sync.WaitGroup, in <-chan list
|
|||||||
}
|
}
|
||||||
listRSlices{dirs, paths}.Sort()
|
listRSlices{dirs, paths}.Sort()
|
||||||
var iErr error
|
var iErr error
|
||||||
|
foundItems := false
|
||||||
_, err := f.list(ctx, dirs, "", false, false, false, func(item *drive.File) bool {
|
_, err := f.list(ctx, dirs, "", false, false, false, func(item *drive.File) bool {
|
||||||
// shared with me items have no parents when at the root
|
// shared with me items have no parents when at the root
|
||||||
if f.opt.SharedWithMe && len(item.Parents) == 0 && len(paths) == 1 && paths[0] == "" {
|
if f.opt.SharedWithMe && len(item.Parents) == 0 && len(paths) == 1 && paths[0] == "" {
|
||||||
@ -1662,6 +1674,7 @@ func (f *Fs) listRRunner(ctx context.Context, wg *sync.WaitGroup, in <-chan list
|
|||||||
}
|
}
|
||||||
for _, parent := range item.Parents {
|
for _, parent := range item.Parents {
|
||||||
var i int
|
var i int
|
||||||
|
foundItems = true
|
||||||
earlyExit := false
|
earlyExit := false
|
||||||
// If only one item in paths then no need to search for the ID
|
// If only one item in paths then no need to search for the ID
|
||||||
// assuming google drive is doing its job properly.
|
// assuming google drive is doing its job properly.
|
||||||
@ -1702,6 +1715,53 @@ func (f *Fs) listRRunner(ctx context.Context, wg *sync.WaitGroup, in <-chan list
|
|||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
})
|
})
|
||||||
|
// Found no items in more than one directory. Retry these as
|
||||||
|
// individual directories This is to work around a bug in google
|
||||||
|
// drive where (A in parents) or (B in parents) returns nothing
|
||||||
|
// sometimes. See #3114, #4289 and
|
||||||
|
// https://issuetracker.google.com/issues/149522397
|
||||||
|
if len(dirs) > 1 && !foundItems {
|
||||||
|
if atomic.SwapInt32(&f.grouping, 1) != 1 {
|
||||||
|
fs.Logf(f, "Disabling ListR to work around bug in drive as multi listing (%d) returned no entries", len(dirs))
|
||||||
|
}
|
||||||
|
var recycled = make([]listREntry, len(dirs))
|
||||||
|
f.listRmu.Lock()
|
||||||
|
for i := range dirs {
|
||||||
|
recycled[i] = listREntry{id: dirs[i], path: paths[i]}
|
||||||
|
// Make a note of these dirs - if they all turn
|
||||||
|
// out to be empty then we can re-enable grouping
|
||||||
|
f.listRempties[dirs[i]] = struct{}{}
|
||||||
|
}
|
||||||
|
f.listRmu.Unlock()
|
||||||
|
// recycle these in the background so we don't deadlock
|
||||||
|
// the listR runners if they all get here
|
||||||
|
wg.Add(len(recycled))
|
||||||
|
go func() {
|
||||||
|
for _, entry := range recycled {
|
||||||
|
in <- entry
|
||||||
|
}
|
||||||
|
fs.Debugf(f, "Recycled %d entries", len(recycled))
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
// If using a grouping of 1 and dir was empty then check to see if it
|
||||||
|
// is part of the group that caused grouping to be disabled.
|
||||||
|
if grouping == 1 && len(dirs) == 1 && !foundItems {
|
||||||
|
f.listRmu.Lock()
|
||||||
|
if _, found := f.listRempties[dirs[0]]; found {
|
||||||
|
// Remove the ID
|
||||||
|
delete(f.listRempties, dirs[0])
|
||||||
|
// If no empties left => all the directories that
|
||||||
|
// triggered the grouping being set to 1 were actually
|
||||||
|
// empty so must have made a mistake
|
||||||
|
if len(f.listRempties) == 0 {
|
||||||
|
if atomic.SwapInt32(&f.grouping, listRGrouping) != listRGrouping {
|
||||||
|
fs.Logf(f, "Re-enabling ListR as previous detection was in error")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
f.listRmu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
for range dirs {
|
for range dirs {
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}
|
}
|
||||||
@ -1736,11 +1796,6 @@ func (f *Fs) listRRunner(ctx context.Context, wg *sync.WaitGroup, in <-chan list
|
|||||||
// Don't implement this unless you have a more efficient way
|
// Don't implement this unless you have a more efficient way
|
||||||
// of listing recursively that doing a directory traversal.
|
// of listing recursively that doing a directory traversal.
|
||||||
func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) {
|
func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) {
|
||||||
const (
|
|
||||||
grouping = 50
|
|
||||||
inputBuffer = 1000
|
|
||||||
)
|
|
||||||
|
|
||||||
err = f.dirCache.FindRoot(ctx, false)
|
err = f.dirCache.FindRoot(ctx, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -1753,7 +1808,7 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (
|
|||||||
|
|
||||||
mu := sync.Mutex{} // protects in and overflow
|
mu := sync.Mutex{} // protects in and overflow
|
||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
in := make(chan listREntry, inputBuffer)
|
in := make(chan listREntry, listRInputBuffer)
|
||||||
out := make(chan error, fs.Config.Checkers)
|
out := make(chan error, fs.Config.Checkers)
|
||||||
list := walk.NewListRHelper(callback)
|
list := walk.NewListRHelper(callback)
|
||||||
overflow := []listREntry{}
|
overflow := []listREntry{}
|
||||||
@ -1766,6 +1821,9 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (
|
|||||||
job := listREntry{actualID(d.ID()), d.Remote()}
|
job := listREntry{actualID(d.ID()), d.Remote()}
|
||||||
select {
|
select {
|
||||||
case in <- job:
|
case in <- job:
|
||||||
|
// Adding the wg after we've entered the item is
|
||||||
|
// safe here because we know when the callback
|
||||||
|
// is called we are holding a waitgroup.
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
default:
|
default:
|
||||||
overflow = append(overflow, job)
|
overflow = append(overflow, job)
|
||||||
@ -1779,7 +1837,7 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (
|
|||||||
in <- listREntry{directoryID, dir}
|
in <- listREntry{directoryID, dir}
|
||||||
|
|
||||||
for i := 0; i < fs.Config.Checkers; i++ {
|
for i := 0; i < fs.Config.Checkers; i++ {
|
||||||
go f.listRRunner(ctx, &wg, in, out, cb, grouping)
|
go f.listRRunner(ctx, &wg, in, out, cb)
|
||||||
}
|
}
|
||||||
go func() {
|
go func() {
|
||||||
// wait until the all directories are processed
|
// wait until the all directories are processed
|
||||||
@ -1789,8 +1847,8 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (
|
|||||||
mu.Lock()
|
mu.Lock()
|
||||||
l := len(overflow)
|
l := len(overflow)
|
||||||
// only fill half of the channel to prevent entries being put into overflow again
|
// only fill half of the channel to prevent entries being put into overflow again
|
||||||
if l > inputBuffer/2 {
|
if l > listRInputBuffer/2 {
|
||||||
l = inputBuffer / 2
|
l = listRInputBuffer / 2
|
||||||
}
|
}
|
||||||
wg.Add(l)
|
wg.Add(l)
|
||||||
for _, d := range overflow[:l] {
|
for _, d := range overflow[:l] {
|
||||||
|
Loading…
Reference in New Issue
Block a user