diff --git a/client/zfsabstractions_list.go b/client/zfsabstractions_list.go index 2f7a151..ed0c40f 100644 --- a/client/zfsabstractions_list.go +++ b/client/zfsabstractions_list.go @@ -44,10 +44,11 @@ func doZabsList(ctx context.Context, sc *cli.Subcommand, args []string) error { return errors.Wrap(err, "invalid filter specification on command line") } - abstractions, errors, err := endpoint.ListAbstractionsStreamed(ctx, q) + abstractions, errors, drainDone, err := endpoint.ListAbstractionsStreamed(ctx, q) if err != nil { return err // context clear by invocation of command } + defer drainDone() var line chainlock.L var wg sync.WaitGroup diff --git a/endpoint/endpoint_zfs_abstraction.go b/endpoint/endpoint_zfs_abstraction.go index 9bfa3be..c8682bb 100644 --- a/endpoint/endpoint_zfs_abstraction.go +++ b/endpoint/endpoint_zfs_abstraction.go @@ -516,10 +516,12 @@ func (e ListAbstractionsErrors) Error() string { } func ListAbstractions(ctx context.Context, query ListZFSHoldsAndBookmarksQuery) (out []Abstraction, outErrs []ListAbstractionsError, err error) { - outChan, outErrsChan, err := ListAbstractionsStreamed(ctx, query) + outChan, outErrsChan, drainDone, err := ListAbstractionsStreamed(ctx, query) if err != nil { return nil, nil, err } + defer drainDone() + var wg sync.WaitGroup wg.Add(1) go func() { @@ -540,19 +542,20 @@ func ListAbstractions(ctx context.Context, query ListZFSHoldsAndBookmarksQuery) } // if err != nil, the returned channels are both nil -// if err == nil, both channels must be fully drained by the caller to avoid leaking goroutines -func ListAbstractionsStreamed(ctx context.Context, query ListZFSHoldsAndBookmarksQuery) (<-chan Abstraction, <-chan ListAbstractionsError, error) { +// if err == nil, both channels must be fully drained by the caller to avoid leaking goroutines. +// After draining is done, the caller must call the returned drainDone func. +func ListAbstractionsStreamed(ctx context.Context, query ListZFSHoldsAndBookmarksQuery) (_ <-chan Abstraction, _ <-chan ListAbstractionsError, drainDone func(), _ error) { // impl note: structure the query processing in such a way that // a minimum amount of zfs shell-outs needs to be done if err := query.Validate(); err != nil { - return nil, nil, errors.Wrap(err, "validate query") + return nil, nil, nil, errors.Wrap(err, "validate query") } fss, err := query.FS.Filesystems(ctx) if err != nil { - return nil, nil, errors.Wrap(err, "list filesystems") + return nil, nil, nil, errors.Wrap(err, "list filesystems") } outErrs := make(chan ListAbstractionsError) @@ -574,7 +577,6 @@ func ListAbstractionsStreamed(ctx context.Context, query ListZFSHoldsAndBookmark sem := semaphore.New(int64(query.Concurrency)) ctx, endTask := trace.WithTask(ctx, "list-abstractions-streamed-producer") go func() { - defer endTask() defer close(out) defer close(outErrs) @@ -596,7 +598,11 @@ func ListAbstractionsStreamed(ctx context.Context, query ListZFSHoldsAndBookmark } }() - return out, outErrs, nil + drainDone = func() { + endTask() + } + + return out, outErrs, drainDone, nil } func listAbstractionsImplFS(ctx context.Context, fs string, query *ListZFSHoldsAndBookmarksQuery, emitCandidate putListAbstraction, errCb putListAbstractionErr) {