From 0f3da73ef11316f8cdd46e937cac21e373a86c83 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Sat, 29 Aug 2020 19:18:00 +0200 Subject: [PATCH] [#347] zfscmd + zfs: define .Start() semantics, apply to call sites in pkg zfs fixes #347 --- zfs/versions.go | 24 +++++++++++++++++------- zfs/zfs.go | 10 ++++++++++ zfs/zfscmd/zfscmd.go | 15 ++++++++++++++- 3 files changed, 41 insertions(+), 8 deletions(-) diff --git a/zfs/versions.go b/zfs/versions.go index 604f5ec..bf45f10 100644 --- a/zfs/versions.go +++ b/zfs/versions.go @@ -6,6 +6,7 @@ import ( "fmt" "strconv" "strings" + "sync" "time" "github.com/pkg/errors" @@ -218,14 +219,23 @@ func ZFSListFilesystemVersions(ctx context.Context, fs *DatasetPath, options Lis promTimer := prometheus.NewTimer(prom.ZFSListFilesystemVersionDuration.WithLabelValues(fs.ToString())) defer promTimer.ObserveDuration() + // Note: we don't create a separate trace.Task here because our loop that consumes + // the goroutine's output doesn't use ctx. ctx, cancel := context.WithCancel(ctx) - defer cancel() - go ZFSListChan(ctx, listResults, - []string{"name", "guid", "createtxg", "creation", "userrefs"}, - fs, - "-r", "-d", "1", - "-t", options.typesFlagArgs(), - "-s", "createtxg", fs.ToString()) + // make sure the goroutine doesn't outlive this function call + var wg sync.WaitGroup + wg.Add(1) + defer wg.Wait() + defer cancel() // on exit, cancel list process before waiting for it + go func() { + defer wg.Done() + ZFSListChan(ctx, listResults, + []string{"name", "guid", "createtxg", "creation", "userrefs"}, + fs, + "-r", "-d", "1", + "-t", options.typesFlagArgs(), + "-s", "createtxg", fs.ToString()) + }() res = make([]FilesystemVersion, 0) for listResult := range listResults { diff --git a/zfs/zfs.go b/zfs/zfs.go index f7c3193..443c444 100644 --- a/zfs/zfs.go +++ b/zfs/zfs.go @@ -173,6 +173,8 @@ func ZFSList(ctx context.Context, properties []string, zfsArgs ...string) (res [ "-o", strings.Join(properties, ",")) args = append(args, zfsArgs...) + ctx, cancel := context.WithCancel(ctx) + defer cancel() cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, args...) stdout, stderrBuf, err := cmd.StdoutPipeWithErrorBuf() if err != nil { @@ -182,6 +184,11 @@ func ZFSList(ctx context.Context, properties []string, zfsArgs ...string) (res [ if err = cmd.Start(); err != nil { return } + // in case we return early, we want to kill the zfs list process and wait for it to exit + defer func() { + _ = cmd.Wait() + }() + defer cancel() s := bufio.NewScanner(stdout) buf := make([]byte, 1024) @@ -244,6 +251,8 @@ func ZFSListChan(ctx context.Context, out chan ZFSListResult, properties []strin } } + ctx, cancel := context.WithCancel(ctx) + defer cancel() cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, args...) stdout, stderrBuf, err := cmd.StdoutPipeWithErrorBuf() if err != nil { @@ -259,6 +268,7 @@ func ZFSListChan(ctx context.Context, out chan ZFSListResult, properties []strin // in which case we'll return an 'unexpected output' error and not the exit status _ = cmd.Wait() }() + defer cancel() // in case we return before our regular call to cmd.Wait(), kill the zfs list process s := bufio.NewScanner(stdout) buf := make([]byte, 1024) // max line length diff --git a/zfs/zfscmd/zfscmd.go b/zfs/zfscmd/zfscmd.go index 9834fa2..d07c605 100644 --- a/zfs/zfscmd/zfscmd.go +++ b/zfs/zfscmd/zfscmd.go @@ -79,6 +79,13 @@ func (c *Cmd) log() Logger { return getLogger(c.ctx).WithField("cmd", c.String()) } +// Start the command. +// +// This creates a new trace.WithTask as a child task of the ctx passed to CommandContext. +// If the process is successfully started (err == nil), it is the CALLER'S RESPONSIBILITY to ensure that +// the spawned process does not outlive the ctx's trace.Task. +// +// If this method returns an error, the Cmd instance is invalid. Start must not be called repeatedly. func (c *Cmd) Start() (err error) { c.startPre(true) err = c.cmd.Start() @@ -86,7 +93,9 @@ func (c *Cmd) Start() (err error) { return err } -// only call this after a successful call to .Start() +// Get the underlying os.Process. +// +// Only call this method after a successful call to .Start(). func (c *Cmd) Process() *os.Process { if c.startedAt.IsZero() { panic("calling Process() only allowed after successful call to Start()") @@ -94,6 +103,10 @@ func (c *Cmd) Process() *os.Process { return c.cmd.Process } +// Blocking wait for the process to exit. +// May be called concurrently and repeatly (exec.Cmd.Wait() semantics apply). +// +// Only call this method after a successful call to .Start(). func (c *Cmd) Wait() (err error) { c.waitPre() err = c.cmd.Wait()