zfs: introduce pkg zfs/zfscmd for command logging, status, prometheus metrics

refs #196
This commit is contained in:
Christian Schwarz
2020-03-27 12:35:57 +01:00
parent 9568e46f05
commit 1336c91865
22 changed files with 1035 additions and 119 deletions

View File

@ -21,6 +21,7 @@ import (
"github.com/zrepl/zrepl/util/circlog"
"github.com/zrepl/zrepl/util/envconst"
"github.com/zrepl/zrepl/zfs/zfscmd"
)
var (
@ -172,13 +173,9 @@ func ZFSList(ctx context.Context, properties []string, zfsArgs ...string) (res [
"-o", strings.Join(properties, ","))
args = append(args, zfsArgs...)
cmd := exec.CommandContext(ctx, ZFS_BINARY, args...)
var stdout io.Reader
stderr := bytes.NewBuffer(make([]byte, 0, 1024))
cmd.Stderr = stderr
if stdout, err = cmd.StdoutPipe(); err != nil {
cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, args...)
stdout, stderrBuf, err := cmd.StdoutPipeWithErrorBuf()
if err != nil {
return
}
@ -205,7 +202,7 @@ func ZFSList(ctx context.Context, properties []string, zfsArgs ...string) (res [
if waitErr := cmd.Wait(); waitErr != nil {
err := &ZFSError{
Stderr: stderr.Bytes(),
Stderr: stderrBuf.Bytes(),
WaitErr: waitErr,
}
return nil, err
@ -244,15 +241,12 @@ func ZFSListChan(ctx context.Context, out chan ZFSListResult, properties []strin
}
}
cmd := exec.CommandContext(ctx, ZFS_BINARY, args...)
stdout, err := cmd.StdoutPipe()
cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, args...)
stdout, stderrBuf, err := cmd.StdoutPipeWithErrorBuf()
if err != nil {
sendResult(nil, err)
return
}
// TODO bounded buffer
stderr := bytes.NewBuffer(make([]byte, 0, 1024))
cmd.Stderr = stderr
if err = cmd.Start(); err != nil {
sendResult(nil, err)
return
@ -280,7 +274,7 @@ func ZFSListChan(ctx context.Context, out chan ZFSListResult, properties []strin
if err := cmd.Wait(); err != nil {
if err, ok := err.(*exec.ExitError); ok {
sendResult(nil, &ZFSError{
Stderr: stderr.Bytes(),
Stderr: stderrBuf.Bytes(),
WaitErr: err,
})
} else {
@ -419,7 +413,7 @@ func pipeWithCapacityHint(capacity int) (r, w *os.File, err error) {
}
type sendStream struct {
cmd *exec.Cmd
cmd *zfscmd.Cmd
kill context.CancelFunc
closeMtx sync.Mutex
@ -836,7 +830,6 @@ func ZFSSend(ctx context.Context, sendArgs ZFSSendArgs) (*ReadCloserCopier, erro
args = append(args, sargs...)
ctx, cancel := context.WithCancel(ctx)
cmd := exec.CommandContext(ctx, ZFS_BINARY, args...)
// setup stdout with an os.Pipe to control pipe buffer size
stdoutReader, stdoutWriter, err := pipeWithCapacityHint(ZFSSendPipeCapacityHint)
@ -844,11 +837,14 @@ func ZFSSend(ctx context.Context, sendArgs ZFSSendArgs) (*ReadCloserCopier, erro
cancel()
return nil, err
}
cmd.Stdout = stdoutWriter
stderrBuf := circlog.MustNewCircularLog(zfsSendStderrCaptureMaxSize)
cmd.Stderr = stderrBuf
cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, args...)
cmd.SetStdio(zfscmd.Stdio{
Stdin: nil,
Stdout: stdoutWriter,
Stderr: stderrBuf,
})
if err := cmd.Start(); err != nil {
cancel()
@ -856,6 +852,7 @@ func ZFSSend(ctx context.Context, sendArgs ZFSSendArgs) (*ReadCloserCopier, erro
stdoutReader.Close()
return nil, errors.Wrap(err, "cannot start zfs send command")
}
// close our writing-end of the pipe so that we don't wait for ourselves when reading from the reading end
stdoutWriter.Close()
stream := &sendStream{
@ -996,7 +993,7 @@ func ZFSSendDry(ctx context.Context, sendArgs ZFSSendArgs) (_ *DrySendInfo, err
}
args = append(args, sargs...)
cmd := exec.CommandContext(ctx, ZFS_BINARY, args...)
cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, args...)
output, err := cmd.CombinedOutput()
if err != nil {
return nil, &ZFSError{output, err}
@ -1115,24 +1112,26 @@ func ZFSRecv(ctx context.Context, fs string, v *ZFSSendArgVersion, streamCopier
ctx, cancelCmd := context.WithCancel(ctx)
defer cancelCmd()
cmd := exec.CommandContext(ctx, ZFS_BINARY, args...)
stderr := bytes.NewBuffer(make([]byte, 0, 1024))
cmd.Stderr = stderr
cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, args...)
// TODO report bug upstream
// Setup an unused stdout buffer.
// Otherwise, ZoL v0.6.5.9-1 3.16.0-4-amd64 writes the following error to stderr and exits with code 1
// cannot receive new filesystem stream: invalid backup stream
stdout := bytes.NewBuffer(make([]byte, 0, 1024))
cmd.Stdout = stdout
stderr := bytes.NewBuffer(make([]byte, 0, 1024))
stdin, stdinWriter, err := pipeWithCapacityHint(ZFSRecvPipeCapacityHint)
if err != nil {
return err
}
cmd.Stdin = stdin
cmd.SetStdio(zfscmd.Stdio{
Stdin: stdin,
Stdout: stdout,
Stderr: stderr,
})
if err = cmd.Start(); err != nil {
stdinWriter.Close()
@ -1142,7 +1141,7 @@ func ZFSRecv(ctx context.Context, fs string, v *ZFSSendArgVersion, streamCopier
stdin.Close()
defer stdinWriter.Close()
pid := cmd.Process.Pid
pid := cmd.Process().Pid
debug := func(format string, args ...interface{}) {
debug("recv: pid=%v: %s", pid, fmt.Sprintf(format, args...))
}
@ -1230,7 +1229,7 @@ func ZFSRecvClearResumeToken(ctx context.Context, fs string) (err error) {
return err
}
cmd := exec.CommandContext(ctx, ZFS_BINARY, "recv", "-A", fs)
cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, "recv", "-A", fs)
o, err := cmd.CombinedOutput()
if err != nil {
if bytes.Contains(o, []byte("does not have any resumable receive state to abort")) {
@ -1280,18 +1279,11 @@ func zfsSet(ctx context.Context, path string, props *ZFSProperties) (err error)
}
args = append(args, path)
cmd := exec.CommandContext(ctx, ZFS_BINARY, args...)
stderr := bytes.NewBuffer(make([]byte, 0, 1024))
cmd.Stderr = stderr
if err = cmd.Start(); err != nil {
return err
}
if err = cmd.Wait(); err != nil {
cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, args...)
stdio, err := cmd.CombinedOutput()
if err != nil {
err = &ZFSError{
Stderr: stderr.Bytes(),
Stderr: stdio,
WaitErr: err,
}
}
@ -1414,7 +1406,7 @@ func (s zfsPropertySource) zfsGetSourceFieldPrefixes() []string {
func zfsGet(ctx context.Context, path string, props []string, allowedSources zfsPropertySource) (*ZFSProperties, error) {
args := []string{"get", "-Hp", "-o", "property,value,source", strings.Join(props, ","), path}
cmd := exec.CommandContext(ctx, ZFS_BINARY, args...)
cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, args...)
stdout, err := cmd.Output()
if err != nil {
if exitErr, ok := err.(*exec.ExitError); ok {
@ -1576,28 +1568,21 @@ func ZFSDestroy(ctx context.Context, arg string) (err error) {
defer prometheus.NewTimer(prom.ZFSDestroyDuration.WithLabelValues(dstype, filesystem))
cmd := exec.CommandContext(ctx, ZFS_BINARY, "destroy", arg)
var stderr bytes.Buffer
cmd.Stderr = &stderr
if err = cmd.Start(); err != nil {
return err
}
if err = cmd.Wait(); err != nil {
cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, "destroy", arg)
stdio, err := cmd.CombinedOutput()
if err != nil {
err = &ZFSError{
Stderr: stderr.Bytes(),
Stderr: stdio,
WaitErr: err,
}
if destroyOneOrMoreSnapshotsNoneExistedErrorRegexp.Match(stderr.Bytes()) {
if destroyOneOrMoreSnapshotsNoneExistedErrorRegexp.Match(stdio) {
err = &DatasetDoesNotExist{arg}
} else if match := destroyBookmarkDoesNotExist.FindStringSubmatch(stderr.String()); match != nil && match[1] == arg {
} else if match := destroyBookmarkDoesNotExist.FindStringSubmatch(string(stdio)); match != nil && match[1] == arg {
err = &DatasetDoesNotExist{arg}
} else if dsNotExistErr := tryDatasetDoesNotExist(filesystem, stderr.Bytes()); dsNotExistErr != nil {
} else if dsNotExistErr := tryDatasetDoesNotExist(filesystem, stdio); dsNotExistErr != nil {
err = dsNotExistErr
} else if dserr := tryParseDestroySnapshotsError(arg, stderr.Bytes()); dserr != nil {
} else if dserr := tryParseDestroySnapshotsError(arg, stdio); dserr != nil {
err = dserr
}
@ -1624,18 +1609,12 @@ func ZFSSnapshot(ctx context.Context, fs *DatasetPath, name string, recursive bo
if err := EntityNamecheck(snapname, EntityTypeSnapshot); err != nil {
return errors.Wrap(err, "zfs snapshot")
}
cmd := exec.CommandContext(ctx, ZFS_BINARY, "snapshot", snapname)
stderr := bytes.NewBuffer(make([]byte, 0, 1024))
cmd.Stderr = stderr
if err = cmd.Start(); err != nil {
return err
}
if err = cmd.Wait(); err != nil {
cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, "snapshot", snapname)
stdio, err := cmd.CombinedOutput()
if err != nil {
err = &ZFSError{
Stderr: stderr.Bytes(),
Stderr: stdio,
WaitErr: err,
}
}
@ -1687,20 +1666,12 @@ func ZFSBookmark(ctx context.Context, fs string, v ZFSSendArgVersion, bookmark s
debug("bookmark: %q %q", snapname, bookmarkname)
cmd := exec.CommandContext(ctx, ZFS_BINARY, "bookmark", snapname, bookmarkname)
stderr := bytes.NewBuffer(make([]byte, 0, 1024))
cmd.Stderr = stderr
if err = cmd.Start(); err != nil {
return err
}
if err = cmd.Wait(); err != nil {
if ddne := tryDatasetDoesNotExist(snapname, stderr.Bytes()); err != nil {
cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, "bookmark", snapname, bookmarkname)
stdio, err := cmd.CombinedOutput()
if err != nil {
if ddne := tryDatasetDoesNotExist(snapname, stdio); err != nil {
return ddne
} else if zfsBookmarkExistsRegex.Match(stderr.Bytes()) {
} else if zfsBookmarkExistsRegex.Match(stdio) {
// check if this was idempotent
bookGuid, err := ZFSGetGUID(ctx, fs, "#"+bookmark)
@ -1714,13 +1685,13 @@ func ZFSBookmark(ctx context.Context, fs string, v ZFSSendArgVersion, bookmark s
}
return &BookmarkExists{
fs: fs, bookmarkOrigin: v, bookmark: bookmark,
zfsMsg: stderr.String(),
zfsMsg: string(stdio),
bookGuid: bookGuid,
}
} else {
return &ZFSError{
Stderr: stderr.Bytes(),
Stderr: stdio,
WaitErr: err,
}
}
@ -1742,18 +1713,11 @@ func ZFSRollback(ctx context.Context, fs *DatasetPath, snapshot FilesystemVersio
args = append(args, rollbackArgs...)
args = append(args, snapabs)
cmd := exec.CommandContext(ctx, ZFS_BINARY, args...)
stderr := bytes.NewBuffer(make([]byte, 0, 1024))
cmd.Stderr = stderr
if err = cmd.Start(); err != nil {
return err
}
if err = cmd.Wait(); err != nil {
cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, args...)
stdio, err := cmd.CombinedOutput()
if err != nil {
err = &ZFSError{
Stderr: stderr.Bytes(),
Stderr: stdio,
WaitErr: err,
}
}