[#347] zfscmd + zfs: define .Start() semantics, apply to call sites in pkg zfs

fixes #347
This commit is contained in:
Christian Schwarz 2020-08-29 19:18:00 +02:00
parent fecc9416ab
commit 0f3da73ef1
3 changed files with 41 additions and 8 deletions

View File

@ -6,6 +6,7 @@ import (
"fmt"
"strconv"
"strings"
"sync"
"time"
"github.com/pkg/errors"
@ -218,14 +219,23 @@ func ZFSListFilesystemVersions(ctx context.Context, fs *DatasetPath, options Lis
promTimer := prometheus.NewTimer(prom.ZFSListFilesystemVersionDuration.WithLabelValues(fs.ToString()))
defer promTimer.ObserveDuration()
// Note: we don't create a separate trace.Task here because our loop that consumes
// the goroutine's output doesn't use ctx.
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go ZFSListChan(ctx, listResults,
[]string{"name", "guid", "createtxg", "creation", "userrefs"},
fs,
"-r", "-d", "1",
"-t", options.typesFlagArgs(),
"-s", "createtxg", fs.ToString())
// make sure the goroutine doesn't outlive this function call
var wg sync.WaitGroup
wg.Add(1)
defer wg.Wait()
defer cancel() // on exit, cancel list process before waiting for it
go func() {
defer wg.Done()
ZFSListChan(ctx, listResults,
[]string{"name", "guid", "createtxg", "creation", "userrefs"},
fs,
"-r", "-d", "1",
"-t", options.typesFlagArgs(),
"-s", "createtxg", fs.ToString())
}()
res = make([]FilesystemVersion, 0)
for listResult := range listResults {

View File

@ -173,6 +173,8 @@ func ZFSList(ctx context.Context, properties []string, zfsArgs ...string) (res [
"-o", strings.Join(properties, ","))
args = append(args, zfsArgs...)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, args...)
stdout, stderrBuf, err := cmd.StdoutPipeWithErrorBuf()
if err != nil {
@ -182,6 +184,11 @@ func ZFSList(ctx context.Context, properties []string, zfsArgs ...string) (res [
if err = cmd.Start(); err != nil {
return
}
// in case we return early, we want to kill the zfs list process and wait for it to exit
defer func() {
_ = cmd.Wait()
}()
defer cancel()
s := bufio.NewScanner(stdout)
buf := make([]byte, 1024)
@ -244,6 +251,8 @@ func ZFSListChan(ctx context.Context, out chan ZFSListResult, properties []strin
}
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, args...)
stdout, stderrBuf, err := cmd.StdoutPipeWithErrorBuf()
if err != nil {
@ -259,6 +268,7 @@ func ZFSListChan(ctx context.Context, out chan ZFSListResult, properties []strin
// in which case we'll return an 'unexpected output' error and not the exit status
_ = cmd.Wait()
}()
defer cancel() // in case we return before our regular call to cmd.Wait(), kill the zfs list process
s := bufio.NewScanner(stdout)
buf := make([]byte, 1024) // max line length

View File

@ -79,6 +79,13 @@ func (c *Cmd) log() Logger {
return getLogger(c.ctx).WithField("cmd", c.String())
}
// Start the command.
//
// This creates a new trace.WithTask as a child task of the ctx passed to CommandContext.
// If the process is successfully started (err == nil), it is the CALLER'S RESPONSIBILITY to ensure that
// the spawned process does not outlive the ctx's trace.Task.
//
// If this method returns an error, the Cmd instance is invalid. Start must not be called repeatedly.
func (c *Cmd) Start() (err error) {
c.startPre(true)
err = c.cmd.Start()
@ -86,7 +93,9 @@ func (c *Cmd) Start() (err error) {
return err
}
// only call this after a successful call to .Start()
// Get the underlying os.Process.
//
// Only call this method after a successful call to .Start().
func (c *Cmd) Process() *os.Process {
if c.startedAt.IsZero() {
panic("calling Process() only allowed after successful call to Start()")
@ -94,6 +103,10 @@ func (c *Cmd) Process() *os.Process {
return c.cmd.Process
}
// Blocking wait for the process to exit.
// May be called concurrently and repeatly (exec.Cmd.Wait() semantics apply).
//
// Only call this method after a successful call to .Start().
func (c *Cmd) Wait() (err error) {
c.waitPre()
err = c.cmd.Wait()