zrepl/client/zfsabstractions_list.go
Christian Schwarz 193abbe6b1 fix active child tasks panic with endpoint.ListAbstractionsStreamed
The goroutine that does endTask() for
"list-abstractions-streamed-producer" can be preempted
after it has closed the out and outErrs channels,
but before it calls endTask().
If the parent ("handler") then gets scheduled
and ends itself, it will observe an active child task
"list-abstractions-streamed-producer".

This is easy to demo by injecting a sleep here:

  --- a/endpoint/endpoint_zfs_abstraction.go
  +++ b/endpoint/endpoint_zfs_abstraction.go
  @@ -575,6 +576,7 @@ func ListAbstractionsStreamed(ctx context.Context, query ListZFSHoldsAndBookmark
          ctx, endTask := trace.WithTask(ctx, "list-abstractions-streamed-producer")
          go func() {
                  defer endTask()
  +               defer time.Sleep(10 * time.Second)
                  defer close(out)
                  defer close(outErrs)

fixes https://github.com/zrepl/zrepl/issues/607
2022-07-17 21:44:03 +02:00

101 lines
2.1 KiB
Go

package client
import (
"context"
"encoding/json"
"fmt"
"os"
"sync"
"github.com/fatih/color"
"github.com/pkg/errors"
"github.com/spf13/pflag"
"github.com/zrepl/zrepl/cli"
"github.com/zrepl/zrepl/endpoint"
"github.com/zrepl/zrepl/util/chainlock"
)
// zabsListFlags holds the command-line flag values of the `list` subcommand.
// The fields are bound to flags in zabsCmdList.SetupFlags and read by doZabsList.
var zabsListFlags struct {
// Filter selects which abstractions are listed; doZabsList turns it into
// a query via Filter.Query().
Filter zabsFilterFlags
// Json is bound to the --json flag ("emit JSON"); when set, doZabsList
// prints each abstraction JSON-encoded instead of with fmt.Println.
Json bool
}
// zabsCmdList describes the `list` subcommand, which is executed by
// doZabsList. Its flags are stored in zabsListFlags: the abstraction filter
// flags (registered under the "list" verb) and the boolean --json switch.
var zabsCmdList = &cli.Subcommand{
	Use:             "list",
	Short:           "list zrepl ZFS abstractions",
	NoRequireConfig: true,
	SetupFlags: func(fs *pflag.FlagSet) {
		// Filter flags first, then the output-format switch.
		zabsListFlags.Filter.registerZabsFilterFlags(fs, "list")
		fs.BoolVar(&zabsListFlags.Json, "json", false, "emit JSON")
	},
	Run: doZabsList,
}
// doZabsList implements the `list` subcommand. It streams the ZFS
// abstractions selected by zabsListFlags.Filter to stdout (JSON-encoded if
// zabsListFlags.Json is set, otherwise via fmt.Println) and prints every
// listing error to stderr in red.
//
// It returns an error if positional arguments were given, if the filter flags
// do not form a valid query, if endpoint.ListAbstractionsStreamed fails to
// start, or if at least one listing error was received. In the last case the
// returned error has an empty message because the summary has already been
// written to stderr.
//
// A failure to JSON-encode an abstraction to stdout panics.
func doZabsList(ctx context.Context, sc *cli.Subcommand, args []string) error {
	if len(args) > 0 {
		return errors.New("this subcommand takes no positional arguments")
	}

	q, err := zabsListFlags.Filter.Query()
	if err != nil {
		return errors.Wrap(err, "invalid filter specification on command line")
	}

	// The error channel is deliberately not named `errors`, so that the
	// imported errors package stays usable for the rest of the function.
	abstractions, listErrs, drainDone, err := endpoint.ListAbstractionsStreamed(ctx, q)
	if err != nil {
		return err // context clear by invocation of command
	}
	defer drainDone()

	// line is held while printing a single item so that output of the two
	// printer goroutines below does not interleave.
	var line chainlock.L
	var wg sync.WaitGroup
	// Deferred calls run in LIFO order, so on every return path both printer
	// goroutines have finished before drainDone is invoked.
	defer wg.Wait()

	// print results
	wg.Add(1)
	go func() {
		defer wg.Done()
		enc := json.NewEncoder(os.Stdout)
		// Loop-invariant encoder configuration; the encoder is only used
		// when JSON output was requested.
		enc.SetIndent("", " ")
		for a := range abstractions {
			func() {
				defer line.Lock().Unlock()
				if !zabsListFlags.Json {
					fmt.Println(a)
					return
				}
				if err := enc.Encode(a); err != nil {
					panic(err)
				}
				fmt.Println()
			}()
		}
	}()

	// print errors to stderr
	errorColor := color.New(color.FgRed)
	var errorsSlice []endpoint.ListAbstractionsError
	wg.Add(1)
	go func() {
		defer wg.Done()
		for listErr := range listErrs {
			func() {
				defer line.Lock().Unlock()
				errorsSlice = append(errorsSlice, listErr)
				errorColor.Fprintf(os.Stderr, "%s\n", listErr)
			}()
		}
	}()

	// Both channels must be fully consumed before errorsSlice is inspected.
	wg.Wait()
	if len(errorsSlice) == 0 {
		return nil
	}

	// NOTE(review): no trailing newline is written here and the returned error
	// is empty; presumably the caller prints the error followed by a newline,
	// which terminates this line - confirm against the cli package.
	errorColor.Add(color.Bold).Fprintf(os.Stderr, "there were errors in listing the abstractions")
	return fmt.Errorf("")
}