2020-03-27 12:35:57 +01:00
|
|
|
// Package zfscmd provides a wrapper around package os/exec.
|
|
|
|
// Functionality provided by the wrapper:
|
|
|
|
// - logging start and end of command execution
|
|
|
|
// - status report of active commands
|
|
|
|
// - prometheus metrics of runtimes
|
|
|
|
package zfscmd
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"io"
|
|
|
|
"os"
|
|
|
|
"os/exec"
|
|
|
|
"strings"
|
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
|
2024-10-18 19:21:17 +02:00
|
|
|
"github.com/zrepl/zrepl/internal/daemon/logging/trace"
|
|
|
|
"github.com/zrepl/zrepl/internal/util/circlog"
|
2020-03-27 12:35:57 +01:00
|
|
|
)
|
|
|
|
|
|
|
|
type Cmd struct {
|
2020-04-21 11:57:56 +02:00
|
|
|
cmd *exec.Cmd
|
|
|
|
ctx context.Context
|
|
|
|
mtx sync.RWMutex
|
|
|
|
startedAt, waitStartedAt, waitReturnedAt time.Time
|
2020-04-11 15:49:41 +02:00
|
|
|
waitReturnEndSpanCb trace.DoneFunc
|
2020-03-27 12:35:57 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
func CommandContext(ctx context.Context, name string, arg ...string) *Cmd {
|
|
|
|
cmd := exec.CommandContext(ctx, name, arg...)
|
|
|
|
return &Cmd{cmd: cmd, ctx: ctx}
|
|
|
|
}
|
|
|
|
|
|
|
|
// err.(*exec.ExitError).Stderr will NOT be set
|
|
|
|
func (c *Cmd) CombinedOutput() (o []byte, err error) {
|
2020-04-11 15:49:41 +02:00
|
|
|
c.startPre(false)
|
2020-03-27 12:35:57 +01:00
|
|
|
c.startPost(nil)
|
|
|
|
c.waitPre()
|
|
|
|
o, err = c.cmd.CombinedOutput()
|
|
|
|
c.waitPost(err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// err.(*exec.ExitError).Stderr will be set
|
|
|
|
func (c *Cmd) Output() (o []byte, err error) {
|
2020-04-11 15:49:41 +02:00
|
|
|
c.startPre(false)
|
2020-03-27 12:35:57 +01:00
|
|
|
c.startPost(nil)
|
|
|
|
c.waitPre()
|
|
|
|
o, err = c.cmd.Output()
|
|
|
|
c.waitPost(err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Careful: err.(*exec.ExitError).Stderr will not be set, even if you don't open an StderrPipe
|
|
|
|
func (c *Cmd) StdoutPipeWithErrorBuf() (p io.ReadCloser, errBuf *circlog.CircularLog, err error) {
|
|
|
|
p, err = c.cmd.StdoutPipe()
|
|
|
|
errBuf = circlog.MustNewCircularLog(1 << 15)
|
|
|
|
c.cmd.Stderr = errBuf
|
|
|
|
return p, errBuf, err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Stdio bundles the three standard streams that SetStdio assigns to the
// underlying command.
type Stdio struct {
	Stdin          io.ReadCloser
	Stdout, Stderr io.Writer
}
|
|
|
|
|
|
|
|
func (c *Cmd) SetStdio(stdio Stdio) {
|
|
|
|
c.cmd.Stdin = stdio.Stdin
|
|
|
|
c.cmd.Stderr = stdio.Stderr
|
|
|
|
c.cmd.Stdout = stdio.Stdout
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *Cmd) String() string {
|
|
|
|
return strings.Join(c.cmd.Args, " ") // includes argv[0] if initialized with CommandContext, that's the only way we o it
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *Cmd) log() Logger {
|
|
|
|
return getLogger(c.ctx).WithField("cmd", c.String())
|
|
|
|
}
|
|
|
|
|
2020-08-29 19:18:00 +02:00
|
|
|
// Start the command.
|
|
|
|
//
|
|
|
|
// This creates a new trace.WithTask as a child task of the ctx passed to CommandContext.
|
|
|
|
// If the process is successfully started (err == nil), it is the CALLER'S RESPONSIBILITY to ensure that
|
|
|
|
// the spawned process does not outlive the ctx's trace.Task.
|
|
|
|
//
|
|
|
|
// If this method returns an error, the Cmd instance is invalid. Start must not be called repeatedly.
|
2020-03-27 12:35:57 +01:00
|
|
|
func (c *Cmd) Start() (err error) {
|
2020-04-11 15:49:41 +02:00
|
|
|
c.startPre(true)
|
2020-03-27 12:35:57 +01:00
|
|
|
err = c.cmd.Start()
|
|
|
|
c.startPost(err)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2020-08-29 19:18:00 +02:00
|
|
|
// Get the underlying os.Process.
|
|
|
|
//
|
|
|
|
// Only call this method after a successful call to .Start().
|
2020-03-27 12:35:57 +01:00
|
|
|
func (c *Cmd) Process() *os.Process {
|
|
|
|
if c.startedAt.IsZero() {
|
|
|
|
panic("calling Process() only allowed after successful call to Start()")
|
|
|
|
}
|
|
|
|
return c.cmd.Process
|
|
|
|
}
|
|
|
|
|
2020-08-29 19:18:00 +02:00
|
|
|
// Blocking wait for the process to exit.
|
|
|
|
// May be called concurrently and repeatly (exec.Cmd.Wait() semantics apply).
|
|
|
|
//
|
|
|
|
// Only call this method after a successful call to .Start().
|
2020-03-27 12:35:57 +01:00
|
|
|
func (c *Cmd) Wait() (err error) {
|
|
|
|
c.waitPre()
|
|
|
|
err = c.cmd.Wait()
|
|
|
|
c.waitPost(err)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2020-04-11 15:49:41 +02:00
|
|
|
func (c *Cmd) startPre(newTask bool) {
|
|
|
|
if newTask {
|
|
|
|
// avoid explosion of tasks with name c.String()
|
|
|
|
c.ctx, c.waitReturnEndSpanCb = trace.WithTaskAndSpan(c.ctx, "zfscmd", c.String())
|
|
|
|
} else {
|
|
|
|
c.ctx, c.waitReturnEndSpanCb = trace.WithSpan(c.ctx, c.String())
|
|
|
|
}
|
2020-03-27 12:35:57 +01:00
|
|
|
startPreLogging(c, time.Now())
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *Cmd) startPost(err error) {
|
|
|
|
|
2020-08-29 19:18:00 +02:00
|
|
|
now := time.Now()
|
2020-03-27 12:35:57 +01:00
|
|
|
c.startedAt = now
|
|
|
|
|
|
|
|
startPostReport(c, err, now)
|
|
|
|
startPostLogging(c, err, now)
|
2020-08-29 19:18:00 +02:00
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
c.waitReturnEndSpanCb()
|
|
|
|
}
|
2020-03-27 12:35:57 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
func (c *Cmd) waitPre() {
|
2020-04-21 11:57:56 +02:00
|
|
|
now := time.Now()
|
|
|
|
|
|
|
|
// ignore duplicate waits
|
|
|
|
c.mtx.Lock()
|
|
|
|
// ignore duplicate waits
|
|
|
|
if !c.waitStartedAt.IsZero() {
|
|
|
|
c.mtx.Unlock()
|
|
|
|
return
|
|
|
|
}
|
|
|
|
c.waitStartedAt = now
|
|
|
|
c.mtx.Unlock()
|
|
|
|
|
|
|
|
waitPreLogging(c, now)
|
2020-03-27 12:35:57 +01:00
|
|
|
}
|
|
|
|
|
zfscmd: fix crash in zfscmd_prometheus.go due to incorrectly extracted ProcessState
fixup of 96e188d7c4837ff1188ab923daeb0a0f4cce31ae
refs #196
refs #301
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x10 pc=0x9a472a]
goroutine 15826 [running]:
os.(*ProcessState).systemTime(...)
/home/cs/go1.13/src/os/exec_unix.go:98
os.(*ProcessState).SystemTime(...)
/home/cs/go1.13/src/os/exec.go:141
github.com/zrepl/zrepl/zfs/zfscmd.waitPostPrometheus(0xc000c04800, 0xe21ce0, 0xc000068270, 0xbf9f80d88107e861, 0x19bae710e6, 0x13a8b60)
/home/cs/zrepl/zrepl/zfs/zfscmd/zfscmd_prometheus.go:69 +0x22a
github.com/zrepl/zrepl/zfs/zfscmd.(*Cmd).waitPost(0xc000c04800, 0xe21ce0, 0xc000068270)
/home/cs/zrepl/zrepl/zfs/zfscmd/zfscmd.go:155 +0x18a
github.com/zrepl/zrepl/zfs/zfscmd.(*Cmd).CombinedOutput(0xc000c04800, 0xc0004b8270, 0xd02eea, 0x3, 0xc0001f6c40, 0x3)
/home/cs/zrepl/zrepl/zfs/zfscmd/zfscmd.go:40 +0xb3
github.com/zrepl/zrepl/zfs.ZFSRelease(0xe36aa0, 0xc0004b8270, 0xc0009a3a40, 0x13, 0xc0004a5d00, 0x1, 0x1, 0xed62eb221, 0x13a8b60)
/home/cs/zrepl/zrepl/zfs/holds.go:102 +0x2a7
github.com/zrepl/zrepl/endpoint.ReleaseStep(0xe36aa0, 0xc0004b8270, 0xc0004befc0, 0xe, 0xd08482, 0x8, 0xc0001cb02f, 0x2, 0x1eeea3bff89dc90b, 0x134d6, ...)
/home/cs/zrepl/zrepl/endpoint/endpoint_zfs_abstraction_step.go:130 +0x367
github.com/zrepl/zrepl/endpoint.(*Sender).SendCompleted.func2(0xc000459190, 0xc000390e30, 0xc00041fd80, 0xc0004befc0, 0xe, 0xd08482, 0x8, 0xc0001cb02f, 0x2, 0x1eeea3bff89dc90b, ...)
/home/cs/zrepl/zrepl/endpoint/endpoint.go:419 +0x1c3
created by github.com/zrepl/zrepl/endpoint.(*Sender).SendCompleted
/home/cs/zrepl/zrepl/endpoint/endpoint.go:413 +0x776
2020-04-20 13:57:19 +02:00
|
|
|
// usage holds the time consumption of a command, in seconds, as assembled by
// waitPost. system_secs and user_secs are -1 when waitPost has no
// os.ProcessState to read them from.
type usage struct {
	total_secs  float64
	system_secs float64
	user_secs   float64
}
|
|
|
|
|
2020-03-27 12:35:57 +01:00
|
|
|
func (c *Cmd) waitPost(err error) {
|
|
|
|
now := time.Now()
|
|
|
|
|
|
|
|
c.mtx.Lock()
|
2020-04-21 11:57:56 +02:00
|
|
|
// ignore duplicate waits
|
|
|
|
if !c.waitReturnedAt.IsZero() {
|
|
|
|
c.mtx.Unlock()
|
|
|
|
return
|
|
|
|
}
|
2020-03-27 12:35:57 +01:00
|
|
|
c.waitReturnedAt = now
|
|
|
|
c.mtx.Unlock()
|
|
|
|
|
zfscmd: fix crash in zfscmd_prometheus.go due to incorrectly extracted ProcessState
fixup of 96e188d7c4837ff1188ab923daeb0a0f4cce31ae
refs #196
refs #301
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x10 pc=0x9a472a]
goroutine 15826 [running]:
os.(*ProcessState).systemTime(...)
/home/cs/go1.13/src/os/exec_unix.go:98
os.(*ProcessState).SystemTime(...)
/home/cs/go1.13/src/os/exec.go:141
github.com/zrepl/zrepl/zfs/zfscmd.waitPostPrometheus(0xc000c04800, 0xe21ce0, 0xc000068270, 0xbf9f80d88107e861, 0x19bae710e6, 0x13a8b60)
/home/cs/zrepl/zrepl/zfs/zfscmd/zfscmd_prometheus.go:69 +0x22a
github.com/zrepl/zrepl/zfs/zfscmd.(*Cmd).waitPost(0xc000c04800, 0xe21ce0, 0xc000068270)
/home/cs/zrepl/zrepl/zfs/zfscmd/zfscmd.go:155 +0x18a
github.com/zrepl/zrepl/zfs/zfscmd.(*Cmd).CombinedOutput(0xc000c04800, 0xc0004b8270, 0xd02eea, 0x3, 0xc0001f6c40, 0x3)
/home/cs/zrepl/zrepl/zfs/zfscmd/zfscmd.go:40 +0xb3
github.com/zrepl/zrepl/zfs.ZFSRelease(0xe36aa0, 0xc0004b8270, 0xc0009a3a40, 0x13, 0xc0004a5d00, 0x1, 0x1, 0xed62eb221, 0x13a8b60)
/home/cs/zrepl/zrepl/zfs/holds.go:102 +0x2a7
github.com/zrepl/zrepl/endpoint.ReleaseStep(0xe36aa0, 0xc0004b8270, 0xc0004befc0, 0xe, 0xd08482, 0x8, 0xc0001cb02f, 0x2, 0x1eeea3bff89dc90b, 0x134d6, ...)
/home/cs/zrepl/zrepl/endpoint/endpoint_zfs_abstraction_step.go:130 +0x367
github.com/zrepl/zrepl/endpoint.(*Sender).SendCompleted.func2(0xc000459190, 0xc000390e30, 0xc00041fd80, 0xc0004befc0, 0xe, 0xd08482, 0x8, 0xc0001cb02f, 0x2, 0x1eeea3bff89dc90b, ...)
/home/cs/zrepl/zrepl/endpoint/endpoint.go:419 +0x1c3
created by github.com/zrepl/zrepl/endpoint.(*Sender).SendCompleted
/home/cs/zrepl/zrepl/endpoint/endpoint.go:413 +0x776
2020-04-20 13:57:19 +02:00
|
|
|
// build usage
|
|
|
|
var u usage
|
|
|
|
{
|
|
|
|
var s *os.ProcessState
|
|
|
|
if err == nil {
|
|
|
|
s = c.cmd.ProcessState
|
|
|
|
} else if ee, ok := err.(*exec.ExitError); ok {
|
|
|
|
s = ee.ProcessState
|
|
|
|
}
|
|
|
|
|
|
|
|
if s == nil {
|
|
|
|
u = usage{
|
|
|
|
total_secs: c.Runtime().Seconds(),
|
|
|
|
system_secs: -1,
|
|
|
|
user_secs: -1,
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
u = usage{
|
|
|
|
total_secs: c.Runtime().Seconds(),
|
|
|
|
system_secs: s.SystemTime().Seconds(),
|
|
|
|
user_secs: s.UserTime().Seconds(),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
waitPostReport(c, u, now)
|
|
|
|
waitPostLogging(c, u, err, now)
|
|
|
|
waitPostPrometheus(c, u, err, now)
|
2020-04-11 15:49:41 +02:00
|
|
|
|
|
|
|
// must be last because c.ctx might be used by other waitPost calls
|
|
|
|
c.waitReturnEndSpanCb()
|
2020-03-27 12:35:57 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
// returns 0 if the command did not yet finish
|
|
|
|
func (c *Cmd) Runtime() time.Duration {
|
|
|
|
if c.waitReturnedAt.IsZero() {
|
|
|
|
return 0
|
|
|
|
}
|
|
|
|
return c.waitReturnedAt.Sub(c.startedAt)
|
|
|
|
}
|
zfs: rewrite SendStream, fix bug in Close() on FreeBSD, add platformtests
This commit was motivated by https://github.com/zrepl/zrepl/issues/495
where, on FreeBSD with OpenZFS 2.0, a SendStream.Close() call might wait indefinitely for `zfs send` to exit.
The reason is that, due to the refactoring done for redacted send & recv
(https://github.com/openzfs/zfs/commit/30af21b02569ac192f52ce6e6511015f8a8d5729),
the `dump_bytes` function, which writes to the pipe, executes in a separate thread (synctask taskq) iff not `HAVE_LARGE_STACKS`.
The `zfs send` process/thread waits for that taskq thread using an uninterruptible primitive.
So when we SIGKILL `zfs send`, that signal doesn't reach the right thread to interrupt the pipe write.
Theoretically this affects both Linux and FreeBSD, but most Linux users `HAVE_LARGE_STACKS` and since https://github.com/openzfs/zfs/pull/12350/files OpenZFS on FreeBSD `HAVE_LARGE_STACKS` as well.
However, at least until FreeBSD 13.1, possibly for the entire 13 lifecycle, we're going to have to live with that oddity.
Measures taken in this commit:
- Report the behavior as an upstream bug https://github.com/openzfs/zfs/issues/12500
- Change SendStream code so that it closes zrepl's read-end of the pipe (see comment in code)
- Clean up and make explicit SendStream's state handling
- Write extensive platformtests for SendStream
- They pass on my Linux install and on FreeBSD 12
- FreeBSD 13 still needs testing.
fixes https://github.com/zrepl/zrepl/issues/495
2021-08-20 17:43:28 +02:00
|
|
|
|
|
|
|
func (c *Cmd) TestOnly_ExecCmd() *exec.Cmd {
|
|
|
|
return c.cmd
|
|
|
|
}
|