diff --git a/fs/march/march.go b/fs/march/march.go index e3ffd58c7..67a5b5bfa 100644 --- a/fs/march/march.go +++ b/fs/march/march.go @@ -16,7 +16,6 @@ import ( "github.com/rclone/rclone/fs/list" "github.com/rclone/rclone/fs/walk" "github.com/rclone/rclone/lib/transform" - "golang.org/x/sync/errgroup" "golang.org/x/text/unicode/norm" ) @@ -291,6 +290,7 @@ func (m *March) matchListings(srcChan, dstChan <-chan fs.DirEntry, srcOnly, dstO srcPrev, dstPrev fs.DirEntry srcPrevName, dstPrevName string src, dst fs.DirEntry + srcHasMore, dstHasMore = true, true srcName, dstName string ) srcDone := func() { @@ -311,14 +311,14 @@ func (m *March) matchListings(srcChan, dstChan <-chan fs.DirEntry, srcOnly, dstO } // Reload src and dst if needed - we set them to nil if used if src == nil { - src = <-srcChan + src, srcHasMore = <-srcChan srcName = m.srcKey(src) } if dst == nil { - dst = <-dstChan + dst, dstHasMore = <-dstChan dstName = m.dstKey(dst) } - if src == nil && dst == nil { + if !srcHasMore && !dstHasMore { break } if src != nil && srcPrev != nil { @@ -419,38 +419,65 @@ func (m *March) processJob(job listDirJob) ([]listDirJob, error) { // If NoTraverse is set, then try to find a matching object // for each item in the srcList to head dst object if m.NoTraverse && !m.NoCheckDest { + startedDst = true + workers := ci.Checkers originalSrcChan := srcChan srcChan = make(chan fs.DirEntry, 100) - ls, err := list.NewSorter(m.Ctx, m.Fdst, list.SortToChan(dstChan), m.dstKey) - if err != nil { - return nil, err + + type matchTask struct { + src fs.DirEntry // src object to find in destination + dstMatch chan<- fs.DirEntry // channel to receive matching dst object or nil + } + matchTasks := make(chan matchTask, workers) + dstMatches := make(chan (<-chan fs.DirEntry), workers) + + // Create the tasks from the originalSrcChan. These are put into matchTasks for + // processing and dstMatches so they can be retrieved in order. + go func() { + for src := range originalSrcChan { + srcChan <- src + dstMatch := make(chan fs.DirEntry, 1) + matchTasks <- matchTask{ + src: src, + dstMatch: dstMatch, + } + dstMatches <- dstMatch + } + close(matchTasks) + }() + + // Get the tasks from the queue and find a matching object. + var workerWg sync.WaitGroup + for range workers { + workerWg.Add(1) + go func() { + defer workerWg.Done() + for t := range matchTasks { + leaf := path.Base(t.src.Remote()) + dst, err := m.Fdst.NewObject(m.Ctx, path.Join(job.dstRemote, leaf)) + if err != nil { + dst = nil + } + t.dstMatch <- dst + } + }() } - startedDst = true + // Close dstResults when all the workers have finished + go func() { + workerWg.Wait() + close(dstMatches) + }() + + // Read the matches in order and send them to dstChan if found. wg.Add(1) go func() { defer wg.Done() - defer ls.CleanUp() - - g, gCtx := errgroup.WithContext(m.Ctx) - g.SetLimit(ci.Checkers) - for src := range originalSrcChan { - srcChan <- src - if srcObj, ok := src.(fs.Object); ok { - g.Go(func() error { - leaf := path.Base(srcObj.Remote()) - dstObj, err := m.Fdst.NewObject(gCtx, path.Join(job.dstRemote, leaf)) - if err == nil { - _ = ls.Add(fs.DirEntries{dstObj}) // ignore errors - } - return nil // ignore errors - }) - } - } - dstListErr = g.Wait() - sendErr := ls.Send() - if dstListErr == nil { - dstListErr = sendErr + for dstMatch := range dstMatches { + dst := <-dstMatch + // Note that dst may be nil here + // We send these on so we don't deadlock the reader + dstChan <- dst } close(srcChan) close(dstChan) diff --git a/fs/sync/sync_test.go b/fs/sync/sync_test.go index a57fb2ebe..9adf5b11a 100644 --- a/fs/sync/sync_test.go +++ b/fs/sync/sync_test.go @@ -216,6 +216,35 @@ func TestCopyNoTraverse(t *testing.T) { r.CheckRemoteItems(t, file1) } +func TestCopyNoTraverseDeadlock(t *testing.T) { + r := fstest.NewRun(t) + if !r.Fremote.Features().IsLocal { + t.Skip("Only runs on local") + } + const nFiles = 200 + t1 := fstest.Time("2001-02-03T04:05:06.499999999Z") + + // Create lots of source files. + items := make([]fstest.Item, nFiles) + for i := range items { + name := fmt.Sprintf("file%d.txt", i) + items[i] = r.WriteFile(name, fmt.Sprintf("content%d", i), t1) + } + r.CheckLocalItems(t, items...) + + // Set --no-traverse + ctx, ci := fs.AddConfig(context.Background()) + ci.NoTraverse = true + + // Initial copy to establish destination. + require.NoError(t, CopyDir(ctx, r.Fremote, r.Flocal, false)) + r.CheckRemoteItems(t, items...) + + // Second copy which shouldn't deadlock + require.NoError(t, CopyDir(ctx, r.Flocal, r.Fremote, false)) + r.CheckRemoteItems(t, items...) +} + // Now with --check-first func TestCopyCheckFirst(t *testing.T) { ctx := context.Background()