From 193abbe6b117a91f5348789a2e92bfedb340f921 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Sun, 17 Jul 2022 21:33:07 +0200 Subject: [PATCH] fix active child tasks panic with endpoint.ListAbstractionsStreamed The goroutine that does endTask() for "list-abstractions-streamed-producer" can be preempted after it has closed the out and outErrs channel, but before it calls endTask(). If the parent ("handler") then gets scheduled and and ends itself, it will observe an active child task "list-abstractions-streamed-producer". This is easy to demo by injecting a sleep here: --- a/endpoint/endpoint_zfs_abstraction.go +++ b/endpoint/endpoint_zfs_abstraction.go @@ -575,6 +576,7 @@ func ListAbstractionsStreamed(ctx context.Context, query ListZFSHoldsAndBookmark ctx, endTask := trace.WithTask(ctx, "list-abstractions-streamed-producer") go func() { defer endTask() + defer time.Sleep(10 * time.Second) defer close(out) defer close(outErrs) fixes https://github.com/zrepl/zrepl/issues/607 --- client/zfsabstractions_list.go | 3 ++- endpoint/endpoint_zfs_abstraction.go | 20 +++++++++++++------- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/client/zfsabstractions_list.go b/client/zfsabstractions_list.go index 2f7a151..ed0c40f 100644 --- a/client/zfsabstractions_list.go +++ b/client/zfsabstractions_list.go @@ -44,10 +44,11 @@ func doZabsList(ctx context.Context, sc *cli.Subcommand, args []string) error { return errors.Wrap(err, "invalid filter specification on command line") } - abstractions, errors, err := endpoint.ListAbstractionsStreamed(ctx, q) + abstractions, errors, drainDone, err := endpoint.ListAbstractionsStreamed(ctx, q) if err != nil { return err // context clear by invocation of command } + defer drainDone() var line chainlock.L var wg sync.WaitGroup diff --git a/endpoint/endpoint_zfs_abstraction.go b/endpoint/endpoint_zfs_abstraction.go index 9bfa3be..c8682bb 100644 --- a/endpoint/endpoint_zfs_abstraction.go +++ b/endpoint/endpoint_zfs_abstraction.go @@ -516,10 +516,12 @@ func (e ListAbstractionsErrors) Error() string { } func ListAbstractions(ctx context.Context, query ListZFSHoldsAndBookmarksQuery) (out []Abstraction, outErrs []ListAbstractionsError, err error) { - outChan, outErrsChan, err := ListAbstractionsStreamed(ctx, query) + outChan, outErrsChan, drainDone, err := ListAbstractionsStreamed(ctx, query) if err != nil { return nil, nil, err } + defer drainDone() + var wg sync.WaitGroup wg.Add(1) go func() { @@ -540,19 +542,20 @@ func ListAbstractions(ctx context.Context, query ListZFSHoldsAndBookmarksQuery) } // if err != nil, the returned channels are both nil -// if err == nil, both channels must be fully drained by the caller to avoid leaking goroutines -func ListAbstractionsStreamed(ctx context.Context, query ListZFSHoldsAndBookmarksQuery) (<-chan Abstraction, <-chan ListAbstractionsError, error) { +// if err == nil, both channels must be fully drained by the caller to avoid leaking goroutines. +// After draining is done, the caller must call the returned drainDone func. +func ListAbstractionsStreamed(ctx context.Context, query ListZFSHoldsAndBookmarksQuery) (_ <-chan Abstraction, _ <-chan ListAbstractionsError, drainDone func(), _ error) { // impl note: structure the query processing in such a way that // a minimum amount of zfs shell-outs needs to be done if err := query.Validate(); err != nil { - return nil, nil, errors.Wrap(err, "validate query") + return nil, nil, nil, errors.Wrap(err, "validate query") } fss, err := query.FS.Filesystems(ctx) if err != nil { - return nil, nil, errors.Wrap(err, "list filesystems") + return nil, nil, nil, errors.Wrap(err, "list filesystems") } outErrs := make(chan ListAbstractionsError) @@ -574,7 +577,6 @@ func ListAbstractionsStreamed(ctx context.Context, query ListZFSHoldsAndBookmark sem := semaphore.New(int64(query.Concurrency)) ctx, endTask := trace.WithTask(ctx, "list-abstractions-streamed-producer") go func() { - defer endTask() defer close(out) defer close(outErrs) @@ -596,7 +598,11 @@ func ListAbstractionsStreamed(ctx context.Context, query ListZFSHoldsAndBookmark } }() - return out, outErrs, nil + drainDone = func() { + endTask() + } + + return out, outErrs, drainDone, nil } func listAbstractionsImplFS(ctx context.Context, fs string, query *ListZFSHoldsAndBookmarksQuery, emitCandidate putListAbstraction, errCb putListAbstractionErr) {