fix active child tasks panic with endpoint.ListAbstractionsStreamed

The goroutine that does endTask() for
"list-abstractions-streamed-producer" can be preempted
after it has closed the out and outErrs channels,
but before it calls endTask().
If the parent ("handler") then gets scheduled
and ends itself, it will observe an active child task
"list-abstractions-streamed-producer".

This is easy to demo by injecting a sleep here:

  --- a/endpoint/endpoint_zfs_abstraction.go
  +++ b/endpoint/endpoint_zfs_abstraction.go
  @@ -575,6 +576,7 @@ func ListAbstractionsStreamed(ctx context.Context, query ListZFSHoldsAndBookmark
          ctx, endTask := trace.WithTask(ctx, "list-abstractions-streamed-producer")
          go func() {
                  defer endTask()
  +               defer time.Sleep(10 * time.Second)
                  defer close(out)
                  defer close(outErrs)

fixes https://github.com/zrepl/zrepl/issues/607
This commit is contained in:
Christian Schwarz 2022-07-17 21:33:07 +02:00
parent 02b215128e
commit 193abbe6b1
2 changed files with 15 additions and 8 deletions

View File

@@ -44,10 +44,11 @@ func doZabsList(ctx context.Context, sc *cli.Subcommand, args []string) error {
return errors.Wrap(err, "invalid filter specification on command line") return errors.Wrap(err, "invalid filter specification on command line")
} }
abstractions, errors, err := endpoint.ListAbstractionsStreamed(ctx, q) abstractions, errors, drainDone, err := endpoint.ListAbstractionsStreamed(ctx, q)
if err != nil { if err != nil {
return err // context clear by invocation of command return err // context clear by invocation of command
} }
defer drainDone()
var line chainlock.L var line chainlock.L
var wg sync.WaitGroup var wg sync.WaitGroup

View File

@@ -516,10 +516,12 @@ func (e ListAbstractionsErrors) Error() string {
} }
func ListAbstractions(ctx context.Context, query ListZFSHoldsAndBookmarksQuery) (out []Abstraction, outErrs []ListAbstractionsError, err error) { func ListAbstractions(ctx context.Context, query ListZFSHoldsAndBookmarksQuery) (out []Abstraction, outErrs []ListAbstractionsError, err error) {
outChan, outErrsChan, err := ListAbstractionsStreamed(ctx, query) outChan, outErrsChan, drainDone, err := ListAbstractionsStreamed(ctx, query)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
defer drainDone()
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(1) wg.Add(1)
go func() { go func() {
@@ -540,19 +542,20 @@ func ListAbstractions(ctx context.Context, query ListZFSHoldsAndBookmarksQuery)
} }
// if err != nil, the returned channels are both nil // if err != nil, the returned channels are both nil
// if err == nil, both channels must be fully drained by the caller to avoid leaking goroutines // if err == nil, both channels must be fully drained by the caller to avoid leaking goroutines.
func ListAbstractionsStreamed(ctx context.Context, query ListZFSHoldsAndBookmarksQuery) (<-chan Abstraction, <-chan ListAbstractionsError, error) { // After draining is done, the caller must call the returned drainDone func.
func ListAbstractionsStreamed(ctx context.Context, query ListZFSHoldsAndBookmarksQuery) (_ <-chan Abstraction, _ <-chan ListAbstractionsError, drainDone func(), _ error) {
// impl note: structure the query processing in such a way that // impl note: structure the query processing in such a way that
// a minimum amount of zfs shell-outs needs to be done // a minimum amount of zfs shell-outs needs to be done
if err := query.Validate(); err != nil { if err := query.Validate(); err != nil {
return nil, nil, errors.Wrap(err, "validate query") return nil, nil, nil, errors.Wrap(err, "validate query")
} }
fss, err := query.FS.Filesystems(ctx) fss, err := query.FS.Filesystems(ctx)
if err != nil { if err != nil {
return nil, nil, errors.Wrap(err, "list filesystems") return nil, nil, nil, errors.Wrap(err, "list filesystems")
} }
outErrs := make(chan ListAbstractionsError) outErrs := make(chan ListAbstractionsError)
@@ -574,7 +577,6 @@ func ListAbstractionsStreamed(ctx context.Context, query ListZFSHoldsAndBookmark
sem := semaphore.New(int64(query.Concurrency)) sem := semaphore.New(int64(query.Concurrency))
ctx, endTask := trace.WithTask(ctx, "list-abstractions-streamed-producer") ctx, endTask := trace.WithTask(ctx, "list-abstractions-streamed-producer")
go func() { go func() {
defer endTask()
defer close(out) defer close(out)
defer close(outErrs) defer close(outErrs)
@@ -596,7 +598,11 @@ func ListAbstractionsStreamed(ctx context.Context, query ListZFSHoldsAndBookmark
} }
}() }()
return out, outErrs, nil drainDone = func() {
endTask()
}
return out, outErrs, drainDone, nil
} }
func listAbstractionsImplFS(ctx context.Context, fs string, query *ListZFSHoldsAndBookmarksQuery, emitCandidate putListAbstraction, errCb putListAbstractionErr) { func listAbstractionsImplFS(ctx context.Context, fs string, query *ListZFSHoldsAndBookmarksQuery, emitCandidate putListAbstraction, errCb putListAbstractionErr) {