zfs: introduce pkg zfs/zfscmd for command logging, status, prometheus metrics

refs #196
Christian Schwarz
2020-03-27 12:35:57 +01:00
parent 9568e46f05
commit 4e13ea46de
17 changed files with 511 additions and 119 deletions

View File

@ -81,7 +81,7 @@ type tui struct {
indent int indent int
lock sync.Mutex //For report and error lock sync.Mutex //For report and error
report map[string]job.Status report map[string]*job.Status
err error err error
jobFilter string jobFilter string
@ -219,7 +219,7 @@ func runStatus(s *cli.Subcommand, args []string) error {
defer termbox.Close() defer termbox.Close()
update := func() { update := func() {
m := make(map[string]job.Status) var m daemon.Status
err2 := jsonRequestResponse(httpc, daemon.ControlJobEndpointStatus, err2 := jsonRequestResponse(httpc, daemon.ControlJobEndpointStatus,
struct{}{}, struct{}{},
@ -228,7 +228,7 @@ func runStatus(s *cli.Subcommand, args []string) error {
t.lock.Lock() t.lock.Lock()
t.err = err2 t.err = err2
t.report = m t.report = m.Jobs
t.lock.Unlock() t.lock.Unlock()
t.draw() t.draw()
} }

View File

@ -20,6 +20,7 @@ import (
"github.com/zrepl/zrepl/util/envconst" "github.com/zrepl/zrepl/util/envconst"
"github.com/zrepl/zrepl/version" "github.com/zrepl/zrepl/version"
"github.com/zrepl/zrepl/zfs" "github.com/zrepl/zrepl/zfs"
"github.com/zrepl/zrepl/zfs/zfscmd"
) )
type controlJob struct { type controlJob struct {
@ -117,7 +118,9 @@ func (j *controlJob) Run(ctx context.Context) {
mux.Handle(ControlJobEndpointStatus, mux.Handle(ControlJobEndpointStatus,
// don't log requests to status endpoint, too spammy // don't log requests to status endpoint, too spammy
jsonResponder{log, func() (interface{}, error) { jsonResponder{log, func() (interface{}, error) {
s := j.jobs.status() jobs := j.jobs.status()
globalZFS := zfscmd.GetReport()
s := Status{Jobs: jobs, Global: GlobalStatus{ZFSCmds: globalZFS}}
return s, nil return s, nil
}}) }})

View File

@ -20,6 +20,7 @@ import (
"github.com/zrepl/zrepl/daemon/logging" "github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/logger" "github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/version" "github.com/zrepl/zrepl/version"
"github.com/zrepl/zrepl/zfs/zfscmd"
) )
func Run(conf *config.Config) error { func Run(conf *config.Config) error {
@ -81,6 +82,9 @@ func Run(conf *config.Config) error {
jobs.start(ctx, job, true) jobs.start(ctx, job, true)
} }
// register global (=non job-local) metrics
zfscmd.RegisterMetrics(prometheus.DefaultRegisterer)
log.Info("starting daemon") log.Info("starting daemon")
// start regular jobs // start regular jobs
@ -128,6 +132,15 @@ func (s *jobs) wait() <-chan struct{} {
return ch return ch
} }
type Status struct {
Jobs map[string]*job.Status
Global GlobalStatus
}
type GlobalStatus struct {
ZFSCmds *zfscmd.Report
}
func (s *jobs) status() map[string]*job.Status { func (s *jobs) status() map[string]*job.Status {
s.m.RLock() s.m.RLock()
defer s.m.RUnlock() defer s.m.RUnlock()
@ -207,6 +220,7 @@ func (s *jobs) start(ctx context.Context, j job.Job, internal bool) {
s.jobs[jobName] = j s.jobs[jobName] = j
ctx = job.WithLogger(ctx, jobLog) ctx = job.WithLogger(ctx, jobLog)
ctx = zfscmd.WithJobID(ctx, j.Name())
ctx, wakeup := wakeup.Context(ctx) ctx, wakeup := wakeup.Context(ctx)
ctx, resetFunc := reset.Context(ctx) ctx, resetFunc := reset.Context(ctx)
s.wakeups[jobName] = wakeup s.wakeups[jobName] = wakeup

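The status endpoint's response shape changes with this commit: clients receive an envelope instead of the bare jobs map. A minimal sketch of a consumer, assuming plain JSON decoding (the real client goes through jsonRequestResponse, see the client/status.go hunk above); decodeStatus is a hypothetical helper:

package main

import (
"encoding/json"
"io"

"github.com/zrepl/zrepl/daemon"
)

// decodeStatus decodes the envelope that ControlJobEndpointStatus
// returns as of this commit.
func decodeStatus(r io.Reader) (*daemon.Status, error) {
var s daemon.Status
if err := json.NewDecoder(r).Decode(&s); err != nil {
return nil, err
}
// s.Jobs is the per-job status map (previously the entire response),
// s.Global.ZFSCmds lists the zfs commands currently in flight.
return &s, nil
}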
View File

@ -22,6 +22,7 @@ import (
"github.com/zrepl/zrepl/rpc/transportmux" "github.com/zrepl/zrepl/rpc/transportmux"
"github.com/zrepl/zrepl/tlsconf" "github.com/zrepl/zrepl/tlsconf"
"github.com/zrepl/zrepl/transport" "github.com/zrepl/zrepl/transport"
"github.com/zrepl/zrepl/zfs/zfscmd"
) )
func OutletsFromConfig(in config.LoggingOutletEnumList) (*logger.Outlets, error) { func OutletsFromConfig(in config.LoggingOutletEnumList) (*logger.Outlets, error) {
@ -79,6 +80,7 @@ const (
SubsysRPC Subsystem = "rpc" SubsysRPC Subsystem = "rpc"
SubsysRPCControl Subsystem = "rpc.ctrl" SubsysRPCControl Subsystem = "rpc.ctrl"
SubsysRPCData Subsystem = "rpc.data" SubsysRPCData Subsystem = "rpc.data"
SubsysZFSCmd Subsystem = "zfs.cmd"
) )
func WithSubsystemLoggers(ctx context.Context, log logger.Logger) context.Context { func WithSubsystemLoggers(ctx context.Context, log logger.Logger) context.Context {
@ -90,6 +92,7 @@ func WithSubsystemLoggers(ctx context.Context, log logger.Logger) context.Contex
ctx = hooks.WithLogger(ctx, log.WithField(SubsysField, SubsysHooks)) ctx = hooks.WithLogger(ctx, log.WithField(SubsysField, SubsysHooks))
ctx = transport.WithLogger(ctx, log.WithField(SubsysField, SubsysTransport)) ctx = transport.WithLogger(ctx, log.WithField(SubsysField, SubsysTransport))
ctx = transportmux.WithLogger(ctx, log.WithField(SubsysField, SubsysTransportMux)) ctx = transportmux.WithLogger(ctx, log.WithField(SubsysField, SubsysTransportMux))
ctx = zfscmd.WithLogger(ctx, log.WithField(SubsysField, SubsysZFSCmd))
ctx = rpc.WithLoggers(ctx, ctx = rpc.WithLoggers(ctx,
rpc.Loggers{ rpc.Loggers{
General: log.WithField(SubsysField, SubsysRPC), General: log.WithField(SubsysField, SubsysRPC),

View File

@ -10,6 +10,7 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/zrepl/zrepl/util/envconst" "github.com/zrepl/zrepl/util/envconst"
"github.com/zrepl/zrepl/zfs/zfscmd"
) )
var encryptionCLISupport struct { var encryptionCLISupport struct {
@ -21,7 +22,7 @@ var encryptionCLISupport struct {
func EncryptionCLISupported(ctx context.Context) (bool, error) { func EncryptionCLISupported(ctx context.Context) (bool, error) {
encryptionCLISupport.once.Do(func() { encryptionCLISupport.once.Do(func() {
// "feature discovery" // "feature discovery"
cmd := exec.CommandContext(ctx, "zfs", "load-key") cmd := zfscmd.CommandContext(ctx, "zfs", "load-key")
output, err := cmd.CombinedOutput() output, err := cmd.CombinedOutput()
if ee, ok := err.(*exec.ExitError); !ok || ok && !ee.Exited() { if ee, ok := err.(*exec.ExitError); !ok || ok && !ee.Exited() {
encryptionCLISupport.err = errors.Wrap(err, "native encryption cli support feature check failed") encryptionCLISupport.err = errors.Wrap(err, "native encryption cli support feature check failed")

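This is the feature-discovery idiom reused throughout the commit (resumable send/recv, batch destroy): invoke the subcommand without arguments and use exec.ExitError.Exited() to distinguish "ran and printed usage" from "could not run at all". A sketch of the idiom; the usage-scraping predicate at the end is an assumption, since each check scrapes for different text:

package main

import (
"context"
"os/exec"
"strings"

"github.com/pkg/errors"

"github.com/zrepl/zrepl/zfs/zfscmd"
)

// supportsLoadKey sketches the check. "zfs load-key" without arguments
// always fails in practice; what matters is *how* it failed: an
// *exec.ExitError with Exited() means zfs ran and printed its usage.
func supportsLoadKey(ctx context.Context) (bool, error) {
cmd := zfscmd.CommandContext(ctx, "zfs", "load-key")
output, err := cmd.CombinedOutput()
ee, ok := err.(*exec.ExitError)
if !ok || !ee.Exited() {
// binary missing, killed by signal, etc.: cannot determine support
return false, errors.Wrap(err, "feature check failed")
}
// assumption: an unsupported verb makes zfs print "unrecognized command"
return !strings.Contains(string(output), "unrecognized command"), nil
}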
View File

@ -6,7 +6,6 @@ import (
"context" "context"
"fmt" "fmt"
"os" "os"
"os/exec"
"sort" "sort"
"strconv" "strconv"
"strings" "strings"
@ -15,6 +14,7 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/zrepl/zrepl/util/envconst" "github.com/zrepl/zrepl/util/envconst"
"github.com/zrepl/zrepl/zfs/zfscmd"
) )
// no need for feature tests, holds have been around forever // no need for feature tests, holds have been around forever
@ -48,7 +48,7 @@ func ZFSHold(ctx context.Context, fs string, v ZFSSendArgVersion, tag string) er
return err return err
} }
fullPath := v.FullPath(fs) fullPath := v.FullPath(fs)
output, err := exec.CommandContext(ctx, "zfs", "hold", tag, fullPath).CombinedOutput() output, err := zfscmd.CommandContext(ctx, "zfs", "hold", tag, fullPath).CombinedOutput()
if err != nil { if err != nil {
if bytes.Contains(output, []byte("tag already exists on this dataset")) { if bytes.Contains(output, []byte("tag already exists on this dataset")) {
goto success goto success
@ -67,7 +67,7 @@ func ZFSHolds(ctx context.Context, fs, snap string) ([]string, error) {
return nil, fmt.Errorf("`snap` must not be empty") return nil, fmt.Errorf("`snap` must not be empty")
} }
dp := fmt.Sprintf("%s@%s", fs, snap) dp := fmt.Sprintf("%s@%s", fs, snap)
output, err := exec.CommandContext(ctx, "zfs", "holds", "-H", dp).CombinedOutput() output, err := zfscmd.CommandContext(ctx, "zfs", "holds", "-H", dp).CombinedOutput()
if err != nil { if err != nil {
return nil, &ZFSError{output, errors.Wrap(err, "zfs holds failed")} return nil, &ZFSError{output, errors.Wrap(err, "zfs holds failed")}
} }
@ -104,11 +104,13 @@ func ZFSRelease(ctx context.Context, tag string, snaps ...string) error {
} }
args := []string{"release", tag} args := []string{"release", tag}
args = append(args, snaps[i:j]...) args = append(args, snaps[i:j]...)
output, err := exec.CommandContext(ctx, "zfs", args...).CombinedOutput() output, err := zfscmd.CommandContext(ctx, "zfs", args...).CombinedOutput()
if pe, ok := err.(*os.PathError); err != nil && ok && pe.Err == syscall.E2BIG { if pe, ok := err.(*os.PathError); err != nil && ok && pe.Err == syscall.E2BIG {
maxInvocationLen = maxInvocationLen / 2 maxInvocationLen = maxInvocationLen / 2
continue continue
} }
// further error handling is part of the error scraper below
maxInvocationLen = maxInvocationLen + os.Getpagesize() maxInvocationLen = maxInvocationLen + os.Getpagesize()
i = j i = j
@ -166,7 +168,7 @@ func doZFSReleaseAllOlderAndIncOrExcludingGUID(ctx context.Context, fs string, s
return fmt.Errorf("`tag` must not be empty`") return fmt.Errorf("`tag` must not be empty`")
} }
output, err := exec.CommandContext(ctx, output, err := zfscmd.CommandContext(ctx,
"zfs", "list", "-o", "type,name,createtxg,guid,userrefs", "zfs", "list", "-o", "type,name,createtxg,guid,userrefs",
"-H", "-t", "snapshot,bookmark", "-r", "-d", "1", fs).CombinedOutput() "-H", "-t", "snapshot,bookmark", "-r", "-d", "1", fs).CombinedOutput()
if err != nil { if err != nil {

View File

@ -1,12 +1,12 @@
package zfs package zfs
import ( import (
"bytes"
"context" "context"
"crypto/sha512" "crypto/sha512"
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"os/exec"
"github.com/zrepl/zrepl/zfs/zfscmd"
) )
const ( const (
@ -82,21 +82,15 @@ func ZFSCreatePlaceholderFilesystem(ctx context.Context, p *DatasetPath) (err er
if p.Length() == 1 { if p.Length() == 1 {
return fmt.Errorf("cannot create %q: pools cannot be created with zfs create", p.ToString()) return fmt.Errorf("cannot create %q: pools cannot be created with zfs create", p.ToString())
} }
cmd := exec.CommandContext(ctx, ZFS_BINARY, "create", cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, "create",
"-o", fmt.Sprintf("%s=%s", PlaceholderPropertyName, placeholderPropertyOn), "-o", fmt.Sprintf("%s=%s", PlaceholderPropertyName, placeholderPropertyOn),
"-o", "mountpoint=none", "-o", "mountpoint=none",
p.ToString()) p.ToString())
stderr := bytes.NewBuffer(make([]byte, 0, 1024)) stdio, err := cmd.CombinedOutput()
cmd.Stderr = stderr if err != nil {
if err = cmd.Start(); err != nil {
return err
}
if err = cmd.Wait(); err != nil {
err = &ZFSError{ err = &ZFSError{
Stderr: stderr.Bytes(), Stderr: stdio,
WaitErr: err, WaitErr: err,
} }
} }

View File

@ -13,6 +13,7 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/zrepl/zrepl/util/envconst" "github.com/zrepl/zrepl/util/envconst"
"github.com/zrepl/zrepl/zfs/zfscmd"
) )
// NOTE: Update ZFSSendARgs.Validate when changing fields (potentially SECURITY SENSITIVE) // NOTE: Update ZFSSendARgs.Validate when changing fields (potentially SECURITY SENSITIVE)
@ -42,7 +43,7 @@ var resumeSendSupportedCheck struct {
func ResumeSendSupported(ctx context.Context) (bool, error) { func ResumeSendSupported(ctx context.Context) (bool, error) {
resumeSendSupportedCheck.once.Do(func() { resumeSendSupportedCheck.once.Do(func() {
// "feature discovery" // "feature discovery"
cmd := exec.CommandContext(ctx, "zfs", "send") cmd := zfscmd.CommandContext(ctx, "zfs", "send")
output, err := cmd.CombinedOutput() output, err := cmd.CombinedOutput()
if ee, ok := err.(*exec.ExitError); !ok || ok && !ee.Exited() { if ee, ok := err.(*exec.ExitError); !ok || ok && !ee.Exited() {
resumeSendSupportedCheck.err = errors.Wrap(err, "resumable send cli support feature check failed") resumeSendSupportedCheck.err = errors.Wrap(err, "resumable send cli support feature check failed")
@ -86,7 +87,7 @@ func ResumeRecvSupported(ctx context.Context, fs *DatasetPath) (bool, error) {
} }
if !sup.flagSupport.checked { if !sup.flagSupport.checked {
output, err := exec.CommandContext(ctx, "zfs", "receive").CombinedOutput() output, err := zfscmd.CommandContext(ctx, ZFS_BINARY, "receive").CombinedOutput()
upgradeWhile(func() { upgradeWhile(func() {
sup.flagSupport.checked = true sup.flagSupport.checked = true
if ee, ok := err.(*exec.ExitError); err != nil && (!ok || ok && !ee.Exited()) { if ee, ok := err.(*exec.ExitError); err != nil && (!ok || ok && !ee.Exited()) {
@ -124,7 +125,7 @@ func ResumeRecvSupported(ctx context.Context, fs *DatasetPath) (bool, error) {
if poolSup, ok = sup.poolSupported[pool]; !ok || // shadow if poolSup, ok = sup.poolSupported[pool]; !ok || // shadow
(!poolSup.supported && time.Since(poolSup.lastCheck) > resumeRecvPoolSupportRecheckTimeout) { (!poolSup.supported && time.Since(poolSup.lastCheck) > resumeRecvPoolSupportRecheckTimeout) {
output, err := exec.CommandContext(ctx, "zpool", "get", "-H", "-p", "-o", "value", "feature@extensible_dataset", pool).CombinedOutput() output, err := zfscmd.CommandContext(ctx, "zpool", "get", "-H", "-p", "-o", "value", "feature@extensible_dataset", pool).CombinedOutput()
if err != nil { if err != nil {
debug("resume recv pool support check result: %#v", sup.flagSupport) debug("resume recv pool support check result: %#v", sup.flagSupport)
poolSup.supported = false poolSup.supported = false
@ -181,7 +182,7 @@ func ParseResumeToken(ctx context.Context, token string) (*ResumeToken, error) {
// toname = pool1/test@b // toname = pool1/test@b
//cannot resume send: 'pool1/test@b' used in the initial send no longer exists //cannot resume send: 'pool1/test@b' used in the initial send no longer exists
cmd := exec.CommandContext(ctx, ZFS_BINARY, "send", "-nvt", string(token)) cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, "send", "-nvt", string(token))
output, err := cmd.CombinedOutput() output, err := cmd.CombinedOutput()
if err != nil { if err != nil {
if exitErr, ok := err.(*exec.ExitError); ok { if exitErr, ok := err.(*exec.ExitError); ok {

View File

@ -11,6 +11,7 @@ import (
"syscall" "syscall"
"github.com/zrepl/zrepl/util/envconst" "github.com/zrepl/zrepl/util/envconst"
"github.com/zrepl/zrepl/zfs/zfscmd"
) )
func ZFSDestroyFilesystemVersion(ctx context.Context, filesystem *DatasetPath, version *FilesystemVersion) (err error) { func ZFSDestroyFilesystemVersion(ctx context.Context, filesystem *DatasetPath, version *FilesystemVersion) (err error) {
@ -224,7 +225,7 @@ var batchDestroyFeatureCheck struct {
func (d destroyerImpl) DestroySnapshotsCommaSyntaxSupported(ctx context.Context) (bool, error) { func (d destroyerImpl) DestroySnapshotsCommaSyntaxSupported(ctx context.Context) (bool, error) {
batchDestroyFeatureCheck.once.Do(func() { batchDestroyFeatureCheck.once.Do(func() {
// "feature discovery" // "feature discovery"
cmd := exec.CommandContext(ctx, ZFS_BINARY, "destroy") cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, "destroy")
output, err := cmd.CombinedOutput() output, err := cmd.CombinedOutput()
if _, ok := err.(*exec.ExitError); !ok { if _, ok := err.(*exec.ExitError); !ok {
debug("destroy feature check failed: %T %s", err, err) debug("destroy feature check failed: %T %s", err, err)

View File

@ -21,6 +21,7 @@ import (
"github.com/zrepl/zrepl/util/circlog" "github.com/zrepl/zrepl/util/circlog"
"github.com/zrepl/zrepl/util/envconst" "github.com/zrepl/zrepl/util/envconst"
"github.com/zrepl/zrepl/zfs/zfscmd"
) )
var ( var (
@ -172,13 +173,9 @@ func ZFSList(ctx context.Context, properties []string, zfsArgs ...string) (res [
"-o", strings.Join(properties, ",")) "-o", strings.Join(properties, ","))
args = append(args, zfsArgs...) args = append(args, zfsArgs...)
cmd := exec.CommandContext(ctx, ZFS_BINARY, args...) cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, args...)
stdout, stderrBuf, err := cmd.StdoutPipeWithErrorBuf()
var stdout io.Reader if err != nil {
stderr := bytes.NewBuffer(make([]byte, 0, 1024))
cmd.Stderr = stderr
if stdout, err = cmd.StdoutPipe(); err != nil {
return return
} }
@ -205,7 +202,7 @@ func ZFSList(ctx context.Context, properties []string, zfsArgs ...string) (res [
if waitErr := cmd.Wait(); waitErr != nil { if waitErr := cmd.Wait(); waitErr != nil {
err := &ZFSError{ err := &ZFSError{
Stderr: stderr.Bytes(), Stderr: stderrBuf.Bytes(),
WaitErr: waitErr, WaitErr: waitErr,
} }
return nil, err return nil, err
@ -244,15 +241,12 @@ func ZFSListChan(ctx context.Context, out chan ZFSListResult, properties []strin
} }
} }
cmd := exec.CommandContext(ctx, ZFS_BINARY, args...) cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, args...)
stdout, err := cmd.StdoutPipe() stdout, stderrBuf, err := cmd.StdoutPipeWithErrorBuf()
if err != nil { if err != nil {
sendResult(nil, err) sendResult(nil, err)
return return
} }
// TODO bounded buffer
stderr := bytes.NewBuffer(make([]byte, 0, 1024))
cmd.Stderr = stderr
if err = cmd.Start(); err != nil { if err = cmd.Start(); err != nil {
sendResult(nil, err) sendResult(nil, err)
return return
@ -280,7 +274,7 @@ func ZFSListChan(ctx context.Context, out chan ZFSListResult, properties []strin
if err := cmd.Wait(); err != nil { if err := cmd.Wait(); err != nil {
if err, ok := err.(*exec.ExitError); ok { if err, ok := err.(*exec.ExitError); ok {
sendResult(nil, &ZFSError{ sendResult(nil, &ZFSError{
Stderr: stderr.Bytes(), Stderr: stderrBuf.Bytes(),
WaitErr: err, WaitErr: err,
}) })
} else { } else {
@ -419,7 +413,7 @@ func pipeWithCapacityHint(capacity int) (r, w *os.File, err error) {
} }
type sendStream struct { type sendStream struct {
cmd *exec.Cmd cmd *zfscmd.Cmd
kill context.CancelFunc kill context.CancelFunc
closeMtx sync.Mutex closeMtx sync.Mutex
@ -836,7 +830,6 @@ func ZFSSend(ctx context.Context, sendArgs ZFSSendArgs) (*ReadCloserCopier, erro
args = append(args, sargs...) args = append(args, sargs...)
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
cmd := exec.CommandContext(ctx, ZFS_BINARY, args...)
// setup stdout with an os.Pipe to control pipe buffer size // setup stdout with an os.Pipe to control pipe buffer size
stdoutReader, stdoutWriter, err := pipeWithCapacityHint(ZFSSendPipeCapacityHint) stdoutReader, stdoutWriter, err := pipeWithCapacityHint(ZFSSendPipeCapacityHint)
@ -844,11 +837,14 @@ func ZFSSend(ctx context.Context, sendArgs ZFSSendArgs) (*ReadCloserCopier, erro
cancel() cancel()
return nil, err return nil, err
} }
cmd.Stdout = stdoutWriter
stderrBuf := circlog.MustNewCircularLog(zfsSendStderrCaptureMaxSize) stderrBuf := circlog.MustNewCircularLog(zfsSendStderrCaptureMaxSize)
cmd.Stderr = stderrBuf
cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, args...)
cmd.SetStdio(zfscmd.Stdio{
Stdin: nil,
Stdout: stdoutWriter,
Stderr: stderrBuf,
})
if err := cmd.Start(); err != nil { if err := cmd.Start(); err != nil {
cancel() cancel()
@ -856,6 +852,7 @@ func ZFSSend(ctx context.Context, sendArgs ZFSSendArgs) (*ReadCloserCopier, erro
stdoutReader.Close() stdoutReader.Close()
return nil, errors.Wrap(err, "cannot start zfs send command") return nil, errors.Wrap(err, "cannot start zfs send command")
} }
// close our writing-end of the pipe so that we don't wait for ourselves when reading from the reading end
stdoutWriter.Close() stdoutWriter.Close()
stream := &sendStream{ stream := &sendStream{
@ -996,7 +993,7 @@ func ZFSSendDry(ctx context.Context, sendArgs ZFSSendArgs) (_ *DrySendInfo, err
} }
args = append(args, sargs...) args = append(args, sargs...)
cmd := exec.CommandContext(ctx, ZFS_BINARY, args...) cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, args...)
output, err := cmd.CombinedOutput() output, err := cmd.CombinedOutput()
if err != nil { if err != nil {
return nil, &ZFSError{output, err} return nil, &ZFSError{output, err}
@ -1115,24 +1112,26 @@ func ZFSRecv(ctx context.Context, fs string, v *ZFSSendArgVersion, streamCopier
ctx, cancelCmd := context.WithCancel(ctx) ctx, cancelCmd := context.WithCancel(ctx)
defer cancelCmd() defer cancelCmd()
cmd := exec.CommandContext(ctx, ZFS_BINARY, args...) cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, args...)
stderr := bytes.NewBuffer(make([]byte, 0, 1024))
cmd.Stderr = stderr
// TODO report bug upstream // TODO report bug upstream
// Setup an unused stdout buffer. // Setup an unused stdout buffer.
// Otherwise, ZoL v0.6.5.9-1 3.16.0-4-amd64 writes the following error to stderr and exits with code 1 // Otherwise, ZoL v0.6.5.9-1 3.16.0-4-amd64 writes the following error to stderr and exits with code 1
// cannot receive new filesystem stream: invalid backup stream // cannot receive new filesystem stream: invalid backup stream
stdout := bytes.NewBuffer(make([]byte, 0, 1024)) stdout := bytes.NewBuffer(make([]byte, 0, 1024))
cmd.Stdout = stdout
stderr := bytes.NewBuffer(make([]byte, 0, 1024))
stdin, stdinWriter, err := pipeWithCapacityHint(ZFSRecvPipeCapacityHint) stdin, stdinWriter, err := pipeWithCapacityHint(ZFSRecvPipeCapacityHint)
if err != nil { if err != nil {
return err return err
} }
cmd.Stdin = stdin cmd.SetStdio(zfscmd.Stdio{
Stdin: stdin,
Stdout: stdout,
Stderr: stderr,
})
if err = cmd.Start(); err != nil { if err = cmd.Start(); err != nil {
stdinWriter.Close() stdinWriter.Close()
@ -1142,7 +1141,7 @@ func ZFSRecv(ctx context.Context, fs string, v *ZFSSendArgVersion, streamCopier
stdin.Close() stdin.Close()
defer stdinWriter.Close() defer stdinWriter.Close()
pid := cmd.Process.Pid pid := cmd.Process().Pid
debug := func(format string, args ...interface{}) { debug := func(format string, args ...interface{}) {
debug("recv: pid=%v: %s", pid, fmt.Sprintf(format, args...)) debug("recv: pid=%v: %s", pid, fmt.Sprintf(format, args...))
} }
@ -1230,7 +1229,7 @@ func ZFSRecvClearResumeToken(ctx context.Context, fs string) (err error) {
return err return err
} }
cmd := exec.CommandContext(ctx, ZFS_BINARY, "recv", "-A", fs) cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, "recv", "-A", fs)
o, err := cmd.CombinedOutput() o, err := cmd.CombinedOutput()
if err != nil { if err != nil {
if bytes.Contains(o, []byte("does not have any resumable receive state to abort")) { if bytes.Contains(o, []byte("does not have any resumable receive state to abort")) {
@ -1280,18 +1279,11 @@ func zfsSet(ctx context.Context, path string, props *ZFSProperties) (err error)
} }
args = append(args, path) args = append(args, path)
cmd := exec.CommandContext(ctx, ZFS_BINARY, args...) cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, args...)
stdio, err := cmd.CombinedOutput()
stderr := bytes.NewBuffer(make([]byte, 0, 1024)) if err != nil {
cmd.Stderr = stderr
if err = cmd.Start(); err != nil {
return err
}
if err = cmd.Wait(); err != nil {
err = &ZFSError{ err = &ZFSError{
Stderr: stderr.Bytes(), Stderr: stdio,
WaitErr: err, WaitErr: err,
} }
} }
@ -1414,7 +1406,7 @@ func (s zfsPropertySource) zfsGetSourceFieldPrefixes() []string {
func zfsGet(ctx context.Context, path string, props []string, allowedSources zfsPropertySource) (*ZFSProperties, error) { func zfsGet(ctx context.Context, path string, props []string, allowedSources zfsPropertySource) (*ZFSProperties, error) {
args := []string{"get", "-Hp", "-o", "property,value,source", strings.Join(props, ","), path} args := []string{"get", "-Hp", "-o", "property,value,source", strings.Join(props, ","), path}
cmd := exec.CommandContext(ctx, ZFS_BINARY, args...) cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, args...)
stdout, err := cmd.Output() stdout, err := cmd.Output()
if err != nil { if err != nil {
if exitErr, ok := err.(*exec.ExitError); ok { if exitErr, ok := err.(*exec.ExitError); ok {
@ -1576,28 +1568,21 @@ func ZFSDestroy(ctx context.Context, arg string) (err error) {
defer prometheus.NewTimer(prom.ZFSDestroyDuration.WithLabelValues(dstype, filesystem)) defer prometheus.NewTimer(prom.ZFSDestroyDuration.WithLabelValues(dstype, filesystem))
cmd := exec.CommandContext(ctx, ZFS_BINARY, "destroy", arg) cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, "destroy", arg)
stdio, err := cmd.CombinedOutput()
var stderr bytes.Buffer if err != nil {
cmd.Stderr = &stderr
if err = cmd.Start(); err != nil {
return err
}
if err = cmd.Wait(); err != nil {
err = &ZFSError{ err = &ZFSError{
Stderr: stderr.Bytes(), Stderr: stdio,
WaitErr: err, WaitErr: err,
} }
if destroyOneOrMoreSnapshotsNoneExistedErrorRegexp.Match(stderr.Bytes()) { if destroyOneOrMoreSnapshotsNoneExistedErrorRegexp.Match(stdio) {
err = &DatasetDoesNotExist{arg} err = &DatasetDoesNotExist{arg}
} else if match := destroyBookmarkDoesNotExist.FindStringSubmatch(stderr.String()); match != nil && match[1] == arg { } else if match := destroyBookmarkDoesNotExist.FindStringSubmatch(string(stdio)); match != nil && match[1] == arg {
err = &DatasetDoesNotExist{arg} err = &DatasetDoesNotExist{arg}
} else if dsNotExistErr := tryDatasetDoesNotExist(filesystem, stderr.Bytes()); dsNotExistErr != nil { } else if dsNotExistErr := tryDatasetDoesNotExist(filesystem, stdio); dsNotExistErr != nil {
err = dsNotExistErr err = dsNotExistErr
} else if dserr := tryParseDestroySnapshotsError(arg, stderr.Bytes()); dserr != nil { } else if dserr := tryParseDestroySnapshotsError(arg, stdio); dserr != nil {
err = dserr err = dserr
} }
@ -1624,18 +1609,12 @@ func ZFSSnapshot(ctx context.Context, fs *DatasetPath, name string, recursive bo
if err := EntityNamecheck(snapname, EntityTypeSnapshot); err != nil { if err := EntityNamecheck(snapname, EntityTypeSnapshot); err != nil {
return errors.Wrap(err, "zfs snapshot") return errors.Wrap(err, "zfs snapshot")
} }
cmd := exec.CommandContext(ctx, ZFS_BINARY, "snapshot", snapname)
stderr := bytes.NewBuffer(make([]byte, 0, 1024)) cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, "snapshot", snapname)
cmd.Stderr = stderr stdio, err := cmd.CombinedOutput()
if err != nil {
if err = cmd.Start(); err != nil {
return err
}
if err = cmd.Wait(); err != nil {
err = &ZFSError{ err = &ZFSError{
Stderr: stderr.Bytes(), Stderr: stdio,
WaitErr: err, WaitErr: err,
} }
} }
@ -1687,20 +1666,12 @@ func ZFSBookmark(ctx context.Context, fs string, v ZFSSendArgVersion, bookmark s
debug("bookmark: %q %q", snapname, bookmarkname) debug("bookmark: %q %q", snapname, bookmarkname)
cmd := exec.CommandContext(ctx, ZFS_BINARY, "bookmark", snapname, bookmarkname) cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, "bookmark", snapname, bookmarkname)
stdio, err := cmd.CombinedOutput()
stderr := bytes.NewBuffer(make([]byte, 0, 1024)) if err != nil {
cmd.Stderr = stderr if ddne := tryDatasetDoesNotExist(snapname, stdio); ddne != nil {
if err = cmd.Start(); err != nil {
return err
}
if err = cmd.Wait(); err != nil {
if ddne := tryDatasetDoesNotExist(snapname, stderr.Bytes()); err != nil {
return ddne return ddne
} else if zfsBookmarkExistsRegex.Match(stderr.Bytes()) { } else if zfsBookmarkExistsRegex.Match(stdio) {
// check if this was idempotent // check if this was idempotent
bookGuid, err := ZFSGetGUID(ctx, fs, "#"+bookmark) bookGuid, err := ZFSGetGUID(ctx, fs, "#"+bookmark)
@ -1714,13 +1685,13 @@ func ZFSBookmark(ctx context.Context, fs string, v ZFSSendArgVersion, bookmark s
} }
return &BookmarkExists{ return &BookmarkExists{
fs: fs, bookmarkOrigin: v, bookmark: bookmark, fs: fs, bookmarkOrigin: v, bookmark: bookmark,
zfsMsg: stderr.String(), zfsMsg: string(stdio),
bookGuid: bookGuid, bookGuid: bookGuid,
} }
} else { } else {
return &ZFSError{ return &ZFSError{
Stderr: stderr.Bytes(), Stderr: stdio,
WaitErr: err, WaitErr: err,
} }
} }
@ -1742,18 +1713,11 @@ func ZFSRollback(ctx context.Context, fs *DatasetPath, snapshot FilesystemVersio
args = append(args, rollbackArgs...) args = append(args, rollbackArgs...)
args = append(args, snapabs) args = append(args, snapabs)
cmd := exec.CommandContext(ctx, ZFS_BINARY, args...) cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, args...)
stdio, err := cmd.CombinedOutput()
stderr := bytes.NewBuffer(make([]byte, 0, 1024)) if err != nil {
cmd.Stderr = stderr
if err = cmd.Start(); err != nil {
return err
}
if err = cmd.Wait(); err != nil {
err = &ZFSError{ err = &ZFSError{
Stderr: stderr.Bytes(), Stderr: stdio,
WaitErr: err, WaitErr: err,
} }
} }

122
zfs/zfscmd/zfscmd.go Normal file
View File

@ -0,0 +1,122 @@
// Package zfscmd provides a wrapper around package os/exec.
// Functionality provided by the wrapper:
// - logging start and end of command execution
// - status report of active commands
// - prometheus metrics of runtimes
package zfscmd
import (
"context"
"io"
"os"
"os/exec"
"strings"
"sync"
"time"
"github.com/zrepl/zrepl/util/circlog"
)
type Cmd struct {
cmd *exec.Cmd
ctx context.Context
mtx sync.RWMutex
startedAt, waitReturnedAt time.Time
}
func CommandContext(ctx context.Context, name string, arg ...string) *Cmd {
cmd := exec.CommandContext(ctx, name, arg...)
return &Cmd{cmd: cmd, ctx: ctx}
}
// err.(*exec.ExitError).Stderr will NOT be set
func (c *Cmd) CombinedOutput() (o []byte, err error) {
o, err = c.cmd.CombinedOutput()
return
}
// err.(*exec.ExitError).Stderr will be set
func (c *Cmd) Output() (o []byte, err error) {
o, err = c.cmd.Output()
return
}
// Careful: err.(*exec.ExitError).Stderr will not be set, even if you don't open a StderrPipe
func (c *Cmd) StdoutPipeWithErrorBuf() (p io.ReadCloser, errBuf *circlog.CircularLog, err error) {
p, err = c.cmd.StdoutPipe()
errBuf = circlog.MustNewCircularLog(1 << 15)
c.cmd.Stderr = errBuf
return p, errBuf, err
}
type Stdio struct {
Stdin io.ReadCloser
Stdout io.Writer
Stderr io.Writer
}
func (c *Cmd) SetStdio(stdio Stdio) {
c.cmd.Stdin = stdio.Stdin
c.cmd.Stderr = stdio.Stderr
c.cmd.Stdout = stdio.Stdout
}
func (c *Cmd) String() string {
return strings.Join(c.cmd.Args, " ") // includes argv[0] if initialized with CommandContext, that's the only way we do it
}
func (c *Cmd) log() Logger {
return getLogger(c.ctx).WithField("cmd", c.String())
}
func (c *Cmd) Start() (err error) {
startPreLogging(c, time.Now())
err = c.cmd.Start()
now := time.Now()
c.mtx.Lock()
c.startedAt = now
c.mtx.Unlock()
startPostReport(c, err, now)
startPostLogging(c, err, now)
return err
}
// only call this after a successful call to .Start()
func (c *Cmd) Process() *os.Process {
if c.startedAt.IsZero() {
panic("calling Process() only allowed after successful call to Start()")
}
return c.cmd.Process
}
func (c *Cmd) Wait() (err error) {
waitPreLogging(c, time.Now())
err = c.cmd.Wait()
now := time.Now()
c.mtx.Lock()
if !c.waitReturnedAt.IsZero() {
// ignore duplicate waits (check under the mutex to avoid racing a concurrent Wait)
c.mtx.Unlock()
return
}
c.waitReturnedAt = now
c.mtx.Unlock()
waitPostReport(c, now)
waitPostLogging(c, err, now)
waitPostPrometheus(c, err, now)
return err
}
// returns 0 if the command did not yet finish
func (c *Cmd) Runtime() time.Duration {
if c.waitReturnedAt.IsZero() {
return 0
}
return c.waitReturnedAt.Sub(c.startedAt)
}

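A minimal end-to-end sketch of the wrapper's Start/Wait path at a hypothetical call site. Note that only this path triggers the logging, report, and metrics hooks; CombinedOutput and Output above delegate straight to os/exec:

package main

import (
"context"
"fmt"
"io"
"io/ioutil"

"github.com/zrepl/zrepl/zfs/zfscmd"
)

func runList(ctx context.Context) error {
cmd := zfscmd.CommandContext(ctx, "zfs", "list", "-H")
stdout, stderrBuf, err := cmd.StdoutPipeWithErrorBuf()
if err != nil {
return err
}
if err := cmd.Start(); err != nil { // logs start, registers command in the report
return err
}
if _, err := io.Copy(ioutil.Discard, stdout); err != nil { // consume stdout
return err
}
if err := cmd.Wait(); err != nil { // logs runtime, observes metrics
return fmt.Errorf("zfs list: %s (stderr: %q)", err, stderrBuf.Bytes())
}
return nil
}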
View File

@ -0,0 +1,39 @@
package zfscmd
import (
"context"
"github.com/zrepl/zrepl/logger"
)
type contextKey int
const (
contextKeyLogger contextKey = iota
contextKeyJobID
)
type Logger = logger.Logger
func WithJobID(ctx context.Context, jobID string) context.Context {
return context.WithValue(ctx, contextKeyJobID, jobID)
}
func getJobIDOrDefault(ctx context.Context, def string) string {
ret, ok := ctx.Value(contextKeyJobID).(string)
if !ok {
return def
}
return ret
}
func WithLogger(ctx context.Context, log Logger) context.Context {
return context.WithValue(ctx, contextKeyLogger, log)
}
func getLogger(ctx context.Context) Logger {
if l, ok := ctx.Value(contextKeyLogger).(Logger); ok {
return l
}
return logger.NewNullLogger()
}

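For reference, a sketch of the context plumbing the daemon performs (see the daemon.go and logging.go hunks above); the job name and field values are illustrative:

package main

import (
"context"

"github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/zfs/zfscmd"
)

func setupCtx() context.Context {
ctx := context.Background()
log := logger.NewNullLogger() // stand-in; the daemon passes its real logger
ctx = zfscmd.WithLogger(ctx, log.WithField("subsystem", "zfs.cmd"))
ctx = zfscmd.WithJobID(ctx, "prod-to-backups") // illustrative job name
// every zfscmd.CommandContext(ctx, ...) from here on logs through log
// and is attributed to jobid="prod-to-backups" in the metrics
return ctx
}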
View File

@ -0,0 +1,40 @@
package zfscmd
import (
"time"
)
// Implementation Note:
//
// Pre-events are logged at debug level.
// Post-events without error are logged at info level.
// Post-events with error are logged at error level.
func startPreLogging(c *Cmd, now time.Time) {
c.log().Debug("starting command")
}
func startPostLogging(c *Cmd, err error, now time.Time) {
if err == nil {
c.log().Info("started command")
} else {
c.log().WithError(err).Error("cannot start command")
}
}
func waitPreLogging(c *Cmd, now time.Time) {
c.log().Debug("start waiting")
}
func waitPostLogging(c *Cmd, err error, now time.Time) {
log := c.log().
WithField("total_time_s", c.Runtime().Seconds()).
WithField("systemtime_s", c.cmd.ProcessState.SystemTime().Seconds()).
WithField("usertime_s", c.cmd.ProcessState.UserTime().Seconds())
if err == nil {
log.Info("command exited without error")
} else {
log.WithError(err).Error("command exited with error")
}
}

View File

@ -0,0 +1,7 @@
#!/usr/bin/env bash
set -euo pipefail
echo "to stderr" 1>&2
echo "to stdout"
exit "$1"

View File

@ -0,0 +1,60 @@
package zfscmd
import (
"bytes"
"io"
"os/exec"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const testBin = "./zfscmd_platform_test.bash"
func TestCmdStderrBehaviorOutput(t *testing.T) {
stdout, err := exec.Command(testBin, "0").Output()
require.NoError(t, err)
require.Equal(t, []byte("to stdout\n"), stdout)
stdout, err = exec.Command(testBin, "1").Output()
assert.Equal(t, []byte("to stdout\n"), stdout)
require.Error(t, err)
ee, ok := err.(*exec.ExitError)
require.True(t, ok)
require.Equal(t, ee.Stderr, []byte("to stderr\n"))
}
func TestCmdStderrBehaviorCombinedOutput(t *testing.T) {
stdio, err := exec.Command(testBin, "0").CombinedOutput()
require.NoError(t, err)
require.Equal(t, "to stderr\nto stdout\n", string(stdio))
stdio, err = exec.Command(testBin, "1").CombinedOutput()
require.Equal(t, "to stderr\nto stdout\n", string(stdio))
require.Error(t, err)
ee, ok := err.(*exec.ExitError)
require.True(t, ok)
require.Empty(t, ee.Stderr) // !!!! maybe not what one would expect
}
func TestCmdStderrBehaviorStdoutPipe(t *testing.T) {
cmd := exec.Command(testBin, "1")
stdoutPipe, err := cmd.StdoutPipe()
require.NoError(t, err)
err = cmd.Start()
require.NoError(t, err)
defer cmd.Wait()
var stdout bytes.Buffer
_, err = io.Copy(&stdout, stdoutPipe)
require.NoError(t, err)
require.Equal(t, "to stdout\n", stdout.String())
err = cmd.Wait()
require.Error(t, err)
ee, ok := err.(*exec.ExitError)
require.True(t, ok)
require.Empty(t, ee.Stderr) // !!!!! probably not what one would expect if we only redirect stdout
}

View File

@ -0,0 +1,73 @@
package zfscmd
import (
"time"
"github.com/prometheus/client_golang/prometheus"
)
var metrics struct {
totaltime *prometheus.HistogramVec
systemtime *prometheus.HistogramVec
usertime *prometheus.HistogramVec
}
var timeLabels = []string{"jobid", "zfsbinary", "zfsverb"}
var timeBuckets = []float64{0.01, 0.1, 0.2, 0.5, 0.75, 1, 2, 5, 10, 60}
func init() {
metrics.totaltime = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "zrepl",
Subsystem: "zfscmd",
Name: "runtime",
Help: "number of seconds that the command took from start until wait returned",
Buckets: timeBuckets,
}, timeLabels)
metrics.systemtime = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "zrepl",
Subsystem: "zfscmd",
Name: "systemtime",
Help: "https://golang.org/pkg/os/#ProcessState.SystemTime",
Buckets: timeBuckets,
}, timeLabels)
metrics.usertime = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "zrepl",
Subsystem: "zfscmd",
Name: "usertime",
Help: "https://golang.org/pkg/os/#ProcessState.UserTime",
Buckets: timeBuckets,
}, timeLabels)
}
func RegisterMetrics(r prometheus.Registerer) {
r.MustRegister(metrics.totaltime)
r.MustRegister(metrics.systemtime)
r.MustRegister(metrics.usertime)
}
func waitPostPrometheus(c *Cmd, err error, now time.Time) {
if len(c.cmd.Args) < 2 {
getLogger(c.ctx).WithField("args", c.cmd.Args).
Warn("prometheus: cannot turn zfs command into metric")
return
}
// Note: do not start parsing other aspects
// of the ZFS command line. This is not the right layer
// for such a task.
jobid := getJobIDOrDefault(c.ctx, "_nojobid")
labelValues := []string{jobid, c.cmd.Args[0], c.cmd.Args[1]}
metrics.totaltime.
WithLabelValues(labelValues...).
Observe(c.Runtime().Seconds())
metrics.systemtime.WithLabelValues(labelValues...).
Observe(c.cmd.ProcessState.SystemTime().Seconds())
metrics.usertime.WithLabelValues(labelValues...).
Observe(c.cmd.ProcessState.UserTime().Seconds())
}

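Prometheus derives the metric family names from Namespace/Subsystem/Name, so the histograms above are exported as zrepl_zfscmd_runtime, zrepl_zfscmd_systemtime, and zrepl_zfscmd_usertime. A sketch of registration and the resulting series (the registry here is hypothetical; the daemon registers against prometheus.DefaultRegisterer, see daemon.go above):

package main

import (
"github.com/prometheus/client_golang/prometheus"

"github.com/zrepl/zrepl/zfs/zfscmd"
)

func main() {
reg := prometheus.NewRegistry() // hypothetical; the daemon uses DefaultRegisterer
zfscmd.RegisterMetrics(reg)
// after e.g. `zfs list ...` ran through the wrapper, scraping yields
// series like:
//   zrepl_zfscmd_runtime_bucket{jobid="_nojobid",zfsbinary="zfs",zfsverb="list",le="0.5"}
//   zrepl_zfscmd_runtime_count{jobid="_nojobid",zfsbinary="zfs",zfsverb="list"}
}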
View File

@ -0,0 +1,68 @@
package zfscmd
import (
"fmt"
"sync"
"time"
)
type Report struct {
Active []ActiveCommand
}
type ActiveCommand struct {
Path string
Args []string
StartedAt time.Time
}
func GetReport() *Report {
active.mtx.RLock()
defer active.mtx.RUnlock()
var activeCommands []ActiveCommand
for c := range active.cmds {
c.mtx.RLock()
activeCommands = append(activeCommands, ActiveCommand{
Path: c.cmd.Path,
Args: c.cmd.Args,
StartedAt: c.startedAt,
})
c.mtx.RUnlock()
}
return &Report{
Active: activeCommands,
}
}
var active struct {
mtx sync.RWMutex
cmds map[*Cmd]bool
}
func init() {
active.cmds = make(map[*Cmd]bool)
}
func startPostReport(c *Cmd, err error, now time.Time) {
if err != nil {
return
}
active.mtx.Lock()
prev := active.cmds[c]
if prev {
panic("impl error: duplicate active command")
}
active.cmds[c] = true
active.mtx.Unlock()
}
func waitPostReport(c *Cmd, now time.Time) {
active.mtx.Lock()
defer active.mtx.Unlock()
prev := active.cmds[c]
if !prev {
panic(fmt.Sprintf("impl error: waitPostReport must only be called on an active command: %s", c))
}
delete(active.cmds, c)
}