diff --git a/fs/march/march.go b/fs/march/march.go index 685c3c671..a7a86c726 100644 --- a/fs/march/march.go +++ b/fs/march/march.go @@ -2,10 +2,11 @@ package march import ( + "cmp" "context" "fmt" "path" - "sort" + "slices" "strings" "sync" @@ -14,9 +15,17 @@ import ( "github.com/rclone/rclone/fs/filter" "github.com/rclone/rclone/fs/list" "github.com/rclone/rclone/fs/walk" + "golang.org/x/sync/errgroup" "golang.org/x/text/unicode/norm" ) +// matchTransformFn converts a name into a form which is used for +// comparison in matchListings. +type matchTransformFn func(name string) string + +// list a directory into callback returning err +type listDirFn func(dir string, callback fs.ListRCallback) (err error) + // March holds the data used to traverse two Fs simultaneously, // calling Callback for each match type March struct { @@ -35,7 +44,6 @@ type March struct { srcListDir listDirFn // function to call to list a directory in the src dstListDir listDirFn // function to call to list a directory in the dst transforms []matchTransformFn - limiter chan struct{} // make sure we don't do too many operations at once } // Marcher is called on each match @@ -70,12 +78,19 @@ func (m *March) init(ctx context.Context) { if m.Fdst.Features().CaseInsensitive || ci.IgnoreCaseSync { m.transforms = append(m.transforms, strings.ToLower) } - // Limit parallelism for operations - m.limiter = make(chan struct{}, ci.Checkers) } -// list a directory into entries, err -type listDirFn func(dir string) (entries fs.DirEntries, err error) +// key turns a directory entry into a sort key using the defined transforms. +func (m *March) key(entry fs.DirEntry) string { + if entry == nil { + return "" + } + name := path.Base(entry.Remote()) + for _, transform := range m.transforms { + name = transform(name) + } + return name +} // makeListDir makes constructs a listing function for the given fs // and includeAll flags for marching through the file system. @@ -85,9 +100,9 @@ func (m *March) makeListDir(ctx context.Context, f fs.Fs, includeAll bool) listD fi := filter.GetConfig(ctx) if !(ci.UseListR && f.Features().ListR != nil) && // !--fast-list active and !(ci.NoTraverse && fi.HaveFilesFrom()) { // !(--files-from and --no-traverse) - return func(dir string) (entries fs.DirEntries, err error) { + return func(dir string, callback fs.ListRCallback) (err error) { dirCtx := filter.SetUseFilter(m.Ctx, f.Features().FilterAware && !includeAll) // make filter-aware backends constrain List - return list.DirSorted(dirCtx, f, includeAll, dir) + return list.DirSortedFn(dirCtx, f, includeAll, dir, callback, m.key) } } @@ -99,7 +114,7 @@ func (m *March) makeListDir(ctx context.Context, f fs.Fs, includeAll bool) listD dirs dirtree.DirTree dirsErr error ) - return func(dir string) (entries fs.DirEntries, err error) { + return func(dir string, callback fs.ListRCallback) (err error) { mu.Lock() defer mu.Unlock() if !started { @@ -108,15 +123,23 @@ func (m *March) makeListDir(ctx context.Context, f fs.Fs, includeAll bool) listD started = true } if dirsErr != nil { - return nil, dirsErr + return dirsErr } entries, ok := dirs[dir] if !ok { - err = fs.ErrorDirNotFound - } else { - delete(dirs, dir) + return fs.ErrorDirNotFound } - return entries, err + delete(dirs, dir) + + // We use a stable sort here just in case there are + // duplicates. Assuming the remote delivers the entries in a + // consistent order, this will give the best user experience + // in syncing as it will use the first entry for the sync + // comparison. + slices.SortStableFunc(entries, func(a, b fs.DirEntry) int { + return cmp.Compare(m.key(a), m.key(b)) + }) + return callback(entries) } } @@ -233,148 +256,95 @@ func (m *March) aborting() bool { return false } -// matchEntry is an entry plus transformed name -type matchEntry struct { - entry fs.DirEntry - leaf string - name string -} - -// matchEntries contains many matchEntry~s -type matchEntries []matchEntry - -// Len is part of sort.Interface. -func (es matchEntries) Len() int { return len(es) } - -// Swap is part of sort.Interface. -func (es matchEntries) Swap(i, j int) { es[i], es[j] = es[j], es[i] } - -// Less is part of sort.Interface. -// -// Compare in order (name, leaf, remote) -func (es matchEntries) Less(i, j int) bool { - ei, ej := &es[i], &es[j] - if ei.name == ej.name { - if ei.leaf == ej.leaf { - return fs.CompareDirEntries(ei.entry, ej.entry) < 0 - } - return ei.leaf < ej.leaf - } - return ei.name < ej.name -} - -// Sort the directory entries by (name, leaf, remote) -// -// We use a stable sort here just in case there are -// duplicates. Assuming the remote delivers the entries in a -// consistent order, this will give the best user experience -// in syncing as it will use the first entry for the sync -// comparison. -func (es matchEntries) sort() { - sort.Stable(es) -} - -// make a matchEntries from a newMatch entries -func newMatchEntries(entries fs.DirEntries, transforms []matchTransformFn) matchEntries { - es := make(matchEntries, len(entries)) - for i := range es { - es[i].entry = entries[i] - name := path.Base(entries[i].Remote()) - es[i].leaf = name - for _, transform := range transforms { - name = transform(name) - } - es[i].name = name - } - es.sort() - return es -} - -// matchPair is a matched pair of direntries returned by matchListings -type matchPair struct { - src, dst fs.DirEntry -} - -// matchTransformFn converts a name into a form which is used for -// comparison in matchListings. -type matchTransformFn func(name string) string - // Process the two listings, matching up the items in the two slices // using the transform function on each name first. // // Into srcOnly go Entries which only exist in the srcList // Into dstOnly go Entries which only exist in the dstList -// Into matches go matchPair's of src and dst which have the same name +// Into match go matchPair's of src and dst which have the same name // // This checks for duplicates and checks the list is sorted. -func matchListings(srcListEntries, dstListEntries fs.DirEntries, transforms []matchTransformFn) (srcOnly fs.DirEntries, dstOnly fs.DirEntries, matches []matchPair) { - srcList := newMatchEntries(srcListEntries, transforms) - dstList := newMatchEntries(dstListEntries, transforms) - - for iSrc, iDst := 0, 0; ; iSrc, iDst = iSrc+1, iDst+1 { - var src, dst fs.DirEntry - var srcName, dstName string - if iSrc < len(srcList) { - src = srcList[iSrc].entry - srcName = srcList[iSrc].name +func (m *March) matchListings(srcChan, dstChan <-chan fs.DirEntry, srcOnly, dstOnly func(fs.DirEntry), match func(dst, src fs.DirEntry)) error { + var ( + srcPrev, dstPrev fs.DirEntry + srcPrevName, dstPrevName string + src, dst fs.DirEntry + srcName, dstName string + ) + srcDone := func() { + srcPrevName = srcName + srcPrev = src + src = nil + srcName = "" + } + dstDone := func() { + dstPrevName = dstName + dstPrev = dst + dst = nil + dstName = "" + } + for { + if m.aborting() { + return m.Ctx.Err() } - if iDst < len(dstList) { - dst = dstList[iDst].entry - dstName = dstList[iDst].name + // Reload src and dst if needed - we set them to nil if used + if src == nil { + src = <-srcChan + srcName = m.key(src) + } + if dst == nil { + dst = <-dstChan + dstName = m.key(dst) } if src == nil && dst == nil { break } - if src != nil && iSrc > 0 { - prev := srcList[iSrc-1].entry - prevName := srcList[iSrc-1].name - if srcName == prevName && fs.DirEntryType(prev) == fs.DirEntryType(src) { + if src != nil && srcPrev != nil { + if srcName == srcPrevName && fs.DirEntryType(srcPrev) == fs.DirEntryType(src) { fs.Logf(src, "Duplicate %s found in source - ignoring", fs.DirEntryType(src)) - iDst-- // ignore the src and retry the dst + srcDone() // skip the src and retry the dst continue - } else if srcName < prevName { + } else if srcName < srcPrevName { // this should never happen since we sort the listings panic("Out of order listing in source") } } - if dst != nil && iDst > 0 { - prev := dstList[iDst-1].entry - prevName := dstList[iDst-1].name - if dstName == prevName && fs.DirEntryType(dst) == fs.DirEntryType(prev) { + if dst != nil && dstPrev != nil { + if dstName == dstPrevName && fs.DirEntryType(dst) == fs.DirEntryType(dstPrev) { fs.Logf(dst, "Duplicate %s found in destination - ignoring", fs.DirEntryType(dst)) - iSrc-- // ignore the dst and retry the src + dstDone() // skip the dst and retry the src continue - } else if dstName < prevName { + } else if dstName < dstPrevName { // this should never happen since we sort the listings panic("Out of order listing in destination") } } - if src != nil && dst != nil { + switch { + case src != nil && dst != nil: // we can't use CompareDirEntries because srcName, dstName could - // be different then src.Remote() or dst.Remote() + // be different from src.Remote() or dst.Remote() srcType := fs.DirEntryType(src) dstType := fs.DirEntryType(dst) if srcName > dstName || (srcName == dstName && srcType > dstType) { - src = nil - iSrc-- + dstOnly(dst) + dstDone() } else if srcName < dstName || (srcName == dstName && srcType < dstType) { - dst = nil - iDst-- + srcOnly(src) + srcDone() + } else { + match(dst, src) + dstDone() + srcDone() } - } - // Debugf(nil, "src = %v, dst = %v", src, dst) - switch { - case src == nil && dst == nil: - // do nothing case src == nil: - dstOnly = append(dstOnly, dst) + dstOnly(dst) + dstDone() case dst == nil: - srcOnly = append(srcOnly, src) - default: - matches = append(matches, matchPair{src: src, dst: dst}) + srcOnly(src) + srcDone() } } - return + return nil } // processJob processes a listDirJob listing the source and @@ -385,27 +355,125 @@ func matchListings(srcListEntries, dstListEntries fs.DirEntries, transforms []ma func (m *March) processJob(job listDirJob) ([]listDirJob, error) { var ( jobs []listDirJob - srcList, dstList fs.DirEntries + srcChan = make(chan fs.DirEntry, 100) + dstChan = make(chan fs.DirEntry, 100) srcListErr, dstListErr error wg sync.WaitGroup - mu sync.Mutex + ci = fs.GetConfig(m.Ctx) ) // List the src and dst directories if !job.noSrc { + srcChan := srcChan // duplicate this as we may override it later wg.Add(1) go func() { defer wg.Done() - srcList, srcListErr = m.srcListDir(job.srcRemote) + srcListErr = m.srcListDir(job.srcRemote, func(entries fs.DirEntries) error { + for _, entry := range entries { + srcChan <- entry + } + return nil + }) + close(srcChan) + }() + } else { + close(srcChan) + } + startedDst := false + if !m.NoTraverse && !job.noDst { + startedDst = true + wg.Add(1) + go func() { + defer wg.Done() + dstListErr = m.dstListDir(job.dstRemote, func(entries fs.DirEntries) error { + for _, entry := range entries { + dstChan <- entry + } + return nil + }) + close(dstChan) }() } - if !m.NoTraverse && !job.noDst { + // If NoTraverse is set, then try to find a matching object + // for each item in the srcList to head dst object + if m.NoTraverse && !m.NoCheckDest { + originalSrcChan := srcChan + srcChan = make(chan fs.DirEntry, 100) + ls, err := list.NewSorter(m.Ctx, list.SortToChan(dstChan), m.key) + if err != nil { + return nil, err + } + + startedDst = true wg.Add(1) go func() { defer wg.Done() - dstList, dstListErr = m.dstListDir(job.dstRemote) + defer ls.CleanUp() + + g, gCtx := errgroup.WithContext(m.Ctx) + g.SetLimit(ci.Checkers) + for src := range originalSrcChan { + srcChan <- src + if srcObj, ok := src.(fs.Object); ok { + g.Go(func() error { + leaf := path.Base(srcObj.Remote()) + dstObj, err := m.Fdst.NewObject(gCtx, path.Join(job.dstRemote, leaf)) + if err == nil { + _ = ls.Add(fs.DirEntries{dstObj}) // ignore errors + } + return nil // ignore errors + }) + } + } + dstListErr = g.Wait() + sendErr := ls.Send() + if dstListErr == nil { + dstListErr = sendErr + } + close(srcChan) + close(dstChan) }() } + if !startedDst { + close(dstChan) + } + + // Work out what to do and do it + err := m.matchListings(srcChan, dstChan, func(src fs.DirEntry) { + recurse := m.Callback.SrcOnly(src) + if recurse && job.srcDepth > 0 { + jobs = append(jobs, listDirJob{ + srcRemote: src.Remote(), + dstRemote: src.Remote(), + srcDepth: job.srcDepth - 1, + noDst: true, + }) + } + + }, func(dst fs.DirEntry) { + recurse := m.Callback.DstOnly(dst) + if recurse && job.dstDepth > 0 { + jobs = append(jobs, listDirJob{ + srcRemote: dst.Remote(), + dstRemote: dst.Remote(), + dstDepth: job.dstDepth - 1, + noSrc: true, + }) + } + }, func(dst, src fs.DirEntry) { + recurse := m.Callback.Match(m.Ctx, dst, src) + if recurse && job.srcDepth > 0 && job.dstDepth > 0 { + jobs = append(jobs, listDirJob{ + srcRemote: src.Remote(), + dstRemote: dst.Remote(), + srcDepth: job.srcDepth - 1, + dstDepth: job.dstDepth - 1, + }) + } + }) + if err != nil { + return nil, err + } // Wait for listings to complete and report errors wg.Wait() @@ -430,73 +498,5 @@ func (m *March) processJob(job listDirJob) ([]listDirJob, error) { return nil, dstListErr } - // If NoTraverse is set, then try to find a matching object - // for each item in the srcList to head dst object - if m.NoTraverse && !m.NoCheckDest { - for _, src := range srcList { - wg.Add(1) - m.limiter <- struct{}{} - go func(src fs.DirEntry) { - defer wg.Done() - if srcObj, ok := src.(fs.Object); ok { - leaf := path.Base(srcObj.Remote()) - dstObj, err := m.Fdst.NewObject(m.Ctx, path.Join(job.dstRemote, leaf)) - if err == nil { - mu.Lock() - dstList = append(dstList, dstObj) - mu.Unlock() - } - } - <-m.limiter - }(src) - } - wg.Wait() - } - - // Work out what to do and do it - srcOnly, dstOnly, matches := matchListings(srcList, dstList, m.transforms) - for _, src := range srcOnly { - if m.aborting() { - return nil, m.Ctx.Err() - } - recurse := m.Callback.SrcOnly(src) - if recurse && job.srcDepth > 0 { - jobs = append(jobs, listDirJob{ - srcRemote: src.Remote(), - dstRemote: src.Remote(), - srcDepth: job.srcDepth - 1, - noDst: true, - }) - } - - } - for _, dst := range dstOnly { - if m.aborting() { - return nil, m.Ctx.Err() - } - recurse := m.Callback.DstOnly(dst) - if recurse && job.dstDepth > 0 { - jobs = append(jobs, listDirJob{ - srcRemote: dst.Remote(), - dstRemote: dst.Remote(), - dstDepth: job.dstDepth - 1, - noSrc: true, - }) - } - } - for _, match := range matches { - if m.aborting() { - return nil, m.Ctx.Err() - } - recurse := m.Callback.Match(m.Ctx, match.dst, match.src) - if recurse && job.srcDepth > 0 && job.dstDepth > 0 { - jobs = append(jobs, listDirJob{ - srcRemote: match.src.Remote(), - dstRemote: match.dst.Remote(), - srcDepth: job.srcDepth - 1, - dstDepth: job.dstDepth - 1, - }) - } - } return jobs, nil } diff --git a/fs/march/march_test.go b/fs/march/march_test.go index 9906b8ee8..2d1d03e74 100644 --- a/fs/march/march_test.go +++ b/fs/march/march_test.go @@ -14,6 +14,8 @@ import ( "github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs/filter" "github.com/rclone/rclone/fs/fserrors" + "github.com/rclone/rclone/fs/list" + "github.com/rclone/rclone/fs/walk" "github.com/rclone/rclone/fstest" "github.com/rclone/rclone/fstest/mockdir" "github.com/rclone/rclone/fstest/mockobject" @@ -147,6 +149,8 @@ func TestMarch(t *testing.T) { dirDstOnly []string fileMatch []string dirMatch []string + noTraverse bool + fastList bool }{ { what: "source only", @@ -167,6 +171,45 @@ func TestMarch(t *testing.T) { fileDstOnly: []string{"dstOnly", "dstOnlyDir/sub"}, dirDstOnly: []string{"dstOnlyDir"}, }, + { + what: "no traverse source only", + fileSrcOnly: []string{"test", "test2", "test3", "sub dir/test4"}, + dirSrcOnly: []string{"sub dir"}, + noTraverse: true, + }, + { + what: "no traverse identical", + fileMatch: []string{"test", "test2", "sub dir/test3", "sub dir/sub sub dir/test4"}, + noTraverse: true, + }, + { + what: "no traverse typical sync", + fileSrcOnly: []string{"srcOnly", "srcOnlyDir/sub"}, + fileMatch: []string{"match", "matchDir/match file"}, + noTraverse: true, + }, + { + what: "fast list source only", + fileSrcOnly: []string{"test", "test2", "test3", "sub dir/test4"}, + dirSrcOnly: []string{"sub dir"}, + fastList: true, + }, + { + what: "fast list identical", + fileMatch: []string{"test", "test2", "sub dir/test3", "sub dir/sub sub dir/test4"}, + dirMatch: []string{"sub dir", "sub dir/sub sub dir"}, + fastList: true, + }, + { + what: "fast list typical sync", + fileSrcOnly: []string{"srcOnly", "srcOnlyDir/sub"}, + dirSrcOnly: []string{"srcOnlyDir"}, + fileMatch: []string{"match", "matchDir/match file"}, + dirMatch: []string{"matchDir"}, + fileDstOnly: []string{"dstOnly", "dstOnlyDir/sub"}, + dirDstOnly: []string{"dstOnlyDir"}, + fastList: true, + }, } { t.Run(fmt.Sprintf("TestMarch-%s", test.what), func(t *testing.T) { r := fstest.NewRun(t) @@ -187,18 +230,33 @@ func TestMarch(t *testing.T) { match = append(match, r.WriteBoth(ctx, f, "hello world", t1)) } + ctx, ci := fs.AddConfig(ctx) + ci.UseListR = test.fastList + + fi := filter.GetConfig(ctx) + + // Local backend doesn't implement ListR, so monkey patch it for this test + if test.fastList && r.Flocal.Features().ListR == nil { + r.Flocal.Features().ListR = func(ctx context.Context, dir string, callback fs.ListRCallback) error { + r.Flocal.Features().ListR = nil // disable ListR to avoid infinite recursion + return walk.ListR(ctx, r.Flocal, dir, true, -1, walk.ListAll, callback) + } + defer func() { + r.Flocal.Features().ListR = nil + }() + } + mt := &marchTester{ ctx: ctx, cancel: cancel, - noTraverse: false, + noTraverse: test.noTraverse, } - fi := filter.GetConfig(ctx) m := &March{ Ctx: ctx, Fdst: r.Fremote, Fsrc: r.Flocal, Dir: "", - NoTraverse: mt.noTraverse, + NoTraverse: test.noTraverse, Callback: mt, DstIncludeAll: fi.Opt.DeleteExcluded, } @@ -216,95 +274,9 @@ func TestMarch(t *testing.T) { } } -func TestMarchNoTraverse(t *testing.T) { - for _, test := range []struct { - what string - fileSrcOnly []string - dirSrcOnly []string - fileMatch []string - dirMatch []string - }{ - { - what: "source only", - fileSrcOnly: []string{"test", "test2", "test3", "sub dir/test4"}, - dirSrcOnly: []string{"sub dir"}, - }, - { - what: "identical", - fileMatch: []string{"test", "test2", "sub dir/test3", "sub dir/sub sub dir/test4"}, - }, - { - what: "typical sync", - fileSrcOnly: []string{"srcOnly", "srcOnlyDir/sub"}, - fileMatch: []string{"match", "matchDir/match file"}, - }, - } { - t.Run(fmt.Sprintf("TestMarch-%s", test.what), func(t *testing.T) { - r := fstest.NewRun(t) - - var srcOnly []fstest.Item - var match []fstest.Item - - ctx, cancel := context.WithCancel(context.Background()) - - for _, f := range test.fileSrcOnly { - srcOnly = append(srcOnly, r.WriteFile(f, "hello world", t1)) - } - for _, f := range test.fileMatch { - match = append(match, r.WriteBoth(ctx, f, "hello world", t1)) - } - - mt := &marchTester{ - ctx: ctx, - cancel: cancel, - noTraverse: true, - } - fi := filter.GetConfig(ctx) - m := &March{ - Ctx: ctx, - Fdst: r.Fremote, - Fsrc: r.Flocal, - Dir: "", - NoTraverse: mt.noTraverse, - Callback: mt, - DstIncludeAll: fi.Opt.DeleteExcluded, - } - - mt.processError(m.Run(ctx)) - mt.cancel() - err := mt.currentError() - require.NoError(t, err) - - precision := fs.GetModifyWindow(ctx, r.Fremote, r.Flocal) - fstest.CompareItems(t, mt.srcOnly, srcOnly, test.dirSrcOnly, precision, "srcOnly") - fstest.CompareItems(t, mt.match, match, test.dirMatch, precision, "match") - }) - } -} - -func TestNewMatchEntries(t *testing.T) { - var ( - a = mockobject.Object("path/a") - A = mockobject.Object("path/A") - B = mockobject.Object("path/B") - c = mockobject.Object("path/c") - ) - - es := newMatchEntries(fs.DirEntries{a, A, B, c}, nil) - assert.Equal(t, es, matchEntries{ - {name: "A", leaf: "A", entry: A}, - {name: "B", leaf: "B", entry: B}, - {name: "a", leaf: "a", entry: a}, - {name: "c", leaf: "c", entry: c}, - }) - - es = newMatchEntries(fs.DirEntries{a, A, B, c}, []matchTransformFn{strings.ToLower}) - assert.Equal(t, es, matchEntries{ - {name: "a", leaf: "A", entry: A}, - {name: "a", leaf: "a", entry: a}, - {name: "b", leaf: "B", entry: B}, - {name: "c", leaf: "c", entry: c}, - }) +// matchPair is a matched pair of direntries returned by matchListings +type matchPair struct { + src, dst fs.DirEntry } func TestMatchListings(t *testing.T) { @@ -414,11 +386,11 @@ func TestMatchListings(t *testing.T) { { what: "Case insensitive duplicate - transform to lower case", input: fs.DirEntries{ - a, a, - A, A, + a, A, + A, a, }, matches: []matchPair{ - {A, A}, + {a, A}, // the first duplicate will be returned with a stable sort }, transforms: []matchTransformFn{strings.ToLower}, }, @@ -507,22 +479,61 @@ func TestMatchListings(t *testing.T) { }, } { t.Run(fmt.Sprintf("TestMatchListings-%s", test.what), func(t *testing.T) { - var srcList, dstList fs.DirEntries - for i := 0; i < len(test.input); i += 2 { - src, dst := test.input[i], test.input[i+1] - if src != nil { - srcList = append(srcList, src) - } - if dst != nil { - dstList = append(dstList, dst) - } + ctx := context.Background() + var wg sync.WaitGroup + + // Skeleton March for testing + m := March{ + Ctx: context.Background(), + transforms: test.transforms, } - srcOnly, dstOnly, matches := matchListings(srcList, dstList, test.transforms) + + // Make a channel to send the source (0) or dest (1) using a list.Sorter + makeChan := func(offset int) <-chan fs.DirEntry { + out := make(chan fs.DirEntry) + ls, err := list.NewSorter(ctx, nil, list.SortToChan(out), m.key) + require.NoError(t, err) + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < len(test.input); i += 2 { + entry := test.input[i+offset] + if entry != nil { + require.NoError(t, ls.Add(fs.DirEntries{entry})) + } + } + require.NoError(t, ls.Send()) + ls.CleanUp() + close(out) + }() + return out + } + + var srcOnly fs.DirEntries + srcOnlyFn := func(entry fs.DirEntry) { + srcOnly = append(srcOnly, entry) + } + var dstOnly fs.DirEntries + dstOnlyFn := func(entry fs.DirEntry) { + dstOnly = append(dstOnly, entry) + } + var matches []matchPair + matchFn := func(dst, src fs.DirEntry) { + matches = append(matches, matchPair{dst: dst, src: src}) + } + + err := m.matchListings(makeChan(0), makeChan(1), srcOnlyFn, dstOnlyFn, matchFn) + require.NoError(t, err) + wg.Wait() assert.Equal(t, test.srcOnly, srcOnly, test.what, "srcOnly differ") assert.Equal(t, test.dstOnly, dstOnly, test.what, "dstOnly differ") assert.Equal(t, test.matches, matches, test.what, "matches differ") + // now swap src and dst - dstOnly, srcOnly, matches = matchListings(dstList, srcList, test.transforms) + srcOnly, dstOnly, matches = nil, nil, nil + err = m.matchListings(makeChan(0), makeChan(1), srcOnlyFn, dstOnlyFn, matchFn) + require.NoError(t, err) + wg.Wait() assert.Equal(t, test.srcOnly, srcOnly, test.what, "srcOnly differ") assert.Equal(t, test.dstOnly, dstOnly, test.what, "dstOnly differ") assert.Equal(t, test.matches, matches, test.what, "matches differ")