diff --git a/client/status.go b/client/status.go index 6fef777..25ff014 100644 --- a/client/status.go +++ b/client/status.go @@ -81,7 +81,7 @@ type tui struct { indent int lock sync.Mutex //For report and error - report map[string]job.Status + report map[string]*job.Status err error jobFilter string @@ -219,7 +219,7 @@ func runStatus(s *cli.Subcommand, args []string) error { defer termbox.Close() update := func() { - m := make(map[string]job.Status) + var m daemon.Status err2 := jsonRequestResponse(httpc, daemon.ControlJobEndpointStatus, struct{}{}, @@ -228,7 +228,7 @@ func runStatus(s *cli.Subcommand, args []string) error { t.lock.Lock() t.err = err2 - t.report = m + t.report = m.Jobs t.lock.Unlock() t.draw() } diff --git a/daemon/control.go b/daemon/control.go index f987ff9..7e84b6f 100644 --- a/daemon/control.go +++ b/daemon/control.go @@ -20,6 +20,7 @@ import ( "github.com/zrepl/zrepl/util/envconst" "github.com/zrepl/zrepl/version" "github.com/zrepl/zrepl/zfs" + "github.com/zrepl/zrepl/zfs/zfscmd" ) type controlJob struct { @@ -117,7 +118,9 @@ func (j *controlJob) Run(ctx context.Context) { mux.Handle(ControlJobEndpointStatus, // don't log requests to status endpoint, too spammy jsonResponder{log, func() (interface{}, error) { - s := j.jobs.status() + jobs := j.jobs.status() + globalZFS := zfscmd.GetReport() + s := Status{Jobs: jobs, Global: GlobalStatus{ZFSCmds: globalZFS}} return s, nil }}) diff --git a/daemon/daemon.go b/daemon/daemon.go index 265ddc4..dc2304f 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -20,6 +20,7 @@ import ( "github.com/zrepl/zrepl/daemon/logging" "github.com/zrepl/zrepl/logger" "github.com/zrepl/zrepl/version" + "github.com/zrepl/zrepl/zfs/zfscmd" ) func Run(conf *config.Config) error { @@ -81,6 +82,9 @@ func Run(conf *config.Config) error { jobs.start(ctx, job, true) } + // register global (=non job-local) metrics + zfscmd.RegisterMetrics(prometheus.DefaultRegisterer) + log.Info("starting daemon") // start regular jobs @@ -128,6 +132,15 @@ func (s *jobs) wait() <-chan struct{} { return ch } +type Status struct { + Jobs map[string]*job.Status + Global GlobalStatus +} + +type GlobalStatus struct { + ZFSCmds *zfscmd.Report +} + func (s *jobs) status() map[string]*job.Status { s.m.RLock() defer s.m.RUnlock() @@ -207,6 +220,7 @@ func (s *jobs) start(ctx context.Context, j job.Job, internal bool) { s.jobs[jobName] = j ctx = job.WithLogger(ctx, jobLog) + ctx = zfscmd.WithJobID(ctx, j.Name()) ctx, wakeup := wakeup.Context(ctx) ctx, resetFunc := reset.Context(ctx) s.wakeups[jobName] = wakeup diff --git a/daemon/logging/build_logging.go b/daemon/logging/build_logging.go index 7b46d59..1216a11 100644 --- a/daemon/logging/build_logging.go +++ b/daemon/logging/build_logging.go @@ -22,6 +22,7 @@ import ( "github.com/zrepl/zrepl/rpc/transportmux" "github.com/zrepl/zrepl/tlsconf" "github.com/zrepl/zrepl/transport" + "github.com/zrepl/zrepl/zfs/zfscmd" ) func OutletsFromConfig(in config.LoggingOutletEnumList) (*logger.Outlets, error) { @@ -79,6 +80,7 @@ const ( SubsysRPC Subsystem = "rpc" SubsysRPCControl Subsystem = "rpc.ctrl" SubsysRPCData Subsystem = "rpc.data" + SubsysZFSCmd Subsystem = "zfs.cmd" ) func WithSubsystemLoggers(ctx context.Context, log logger.Logger) context.Context { @@ -90,6 +92,7 @@ func WithSubsystemLoggers(ctx context.Context, log logger.Logger) context.Contex ctx = hooks.WithLogger(ctx, log.WithField(SubsysField, SubsysHooks)) ctx = transport.WithLogger(ctx, log.WithField(SubsysField, SubsysTransport)) ctx = transportmux.WithLogger(ctx, 
log.WithField(SubsysField, SubsysTransportMux)) + ctx = zfscmd.WithLogger(ctx, log.WithField(SubsysField, SubsysZFSCmd)) ctx = rpc.WithLoggers(ctx, rpc.Loggers{ General: log.WithField(SubsysField, SubsysRPC), diff --git a/zfs/encryption.go b/zfs/encryption.go index 65ef1ab..8da07fc 100644 --- a/zfs/encryption.go +++ b/zfs/encryption.go @@ -10,6 +10,7 @@ import ( "github.com/pkg/errors" "github.com/zrepl/zrepl/util/envconst" + "github.com/zrepl/zrepl/zfs/zfscmd" ) var encryptionCLISupport struct { @@ -21,7 +22,7 @@ var encryptionCLISupport struct { func EncryptionCLISupported(ctx context.Context) (bool, error) { encryptionCLISupport.once.Do(func() { // "feature discovery" - cmd := exec.CommandContext(ctx, "zfs", "load-key") + cmd := zfscmd.CommandContext(ctx, "zfs", "load-key") output, err := cmd.CombinedOutput() if ee, ok := err.(*exec.ExitError); !ok || ok && !ee.Exited() { encryptionCLISupport.err = errors.Wrap(err, "native encryption cli support feature check failed") diff --git a/zfs/holds.go b/zfs/holds.go index 89759a5..d5bb1f2 100644 --- a/zfs/holds.go +++ b/zfs/holds.go @@ -6,7 +6,6 @@ import ( "context" "fmt" "os" - "os/exec" "sort" "strconv" "strings" @@ -15,6 +14,7 @@ import ( "github.com/pkg/errors" "github.com/zrepl/zrepl/util/envconst" + "github.com/zrepl/zrepl/zfs/zfscmd" ) // no need for feature tests, holds have been around forever @@ -48,7 +48,7 @@ func ZFSHold(ctx context.Context, fs string, v ZFSSendArgVersion, tag string) er return err } fullPath := v.FullPath(fs) - output, err := exec.CommandContext(ctx, "zfs", "hold", tag, fullPath).CombinedOutput() + output, err := zfscmd.CommandContext(ctx, "zfs", "hold", tag, fullPath).CombinedOutput() if err != nil { if bytes.Contains(output, []byte("tag already exists on this dataset")) { goto success @@ -67,7 +67,7 @@ func ZFSHolds(ctx context.Context, fs, snap string) ([]string, error) { return nil, fmt.Errorf("`snap` must not be empty") } dp := fmt.Sprintf("%s@%s", fs, snap) - output, err := exec.CommandContext(ctx, "zfs", "holds", "-H", dp).CombinedOutput() + output, err := zfscmd.CommandContext(ctx, "zfs", "holds", "-H", dp).CombinedOutput() if err != nil { return nil, &ZFSError{output, errors.Wrap(err, "zfs holds failed")} } @@ -104,11 +104,13 @@ func ZFSRelease(ctx context.Context, tag string, snaps ...string) error { } args := []string{"release", tag} args = append(args, snaps[i:j]...) 
- output, err := exec.CommandContext(ctx, "zfs", args...).CombinedOutput() + output, err := zfscmd.CommandContext(ctx, "zfs", args...).CombinedOutput() if pe, ok := err.(*os.PathError); err != nil && ok && pe.Err == syscall.E2BIG { maxInvocationLen = maxInvocationLen / 2 continue } + // further error handling part of error scraper below + maxInvocationLen = maxInvocationLen + os.Getpagesize() i = j @@ -166,7 +168,7 @@ func doZFSReleaseAllOlderAndIncOrExcludingGUID(ctx context.Context, fs string, s return fmt.Errorf("`tag` must not be empty`") } - output, err := exec.CommandContext(ctx, + output, err := zfscmd.CommandContext(ctx, "zfs", "list", "-o", "type,name,createtxg,guid,userrefs", "-H", "-t", "snapshot,bookmark", "-r", "-d", "1", fs).CombinedOutput() if err != nil { diff --git a/zfs/placeholder.go b/zfs/placeholder.go index db00ab3..0ffffa8 100644 --- a/zfs/placeholder.go +++ b/zfs/placeholder.go @@ -1,12 +1,12 @@ package zfs import ( - "bytes" "context" "crypto/sha512" "encoding/hex" "fmt" - "os/exec" + + "github.com/zrepl/zrepl/zfs/zfscmd" ) const ( @@ -82,21 +82,15 @@ func ZFSCreatePlaceholderFilesystem(ctx context.Context, p *DatasetPath) (err er if p.Length() == 1 { return fmt.Errorf("cannot create %q: pools cannot be created with zfs create", p.ToString()) } - cmd := exec.CommandContext(ctx, ZFS_BINARY, "create", + cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, "create", "-o", fmt.Sprintf("%s=%s", PlaceholderPropertyName, placeholderPropertyOn), "-o", "mountpoint=none", p.ToString()) - stderr := bytes.NewBuffer(make([]byte, 0, 1024)) - cmd.Stderr = stderr - - if err = cmd.Start(); err != nil { - return err - } - - if err = cmd.Wait(); err != nil { + stdio, err := cmd.CombinedOutput() + if err != nil { err = &ZFSError{ - Stderr: stderr.Bytes(), + Stderr: stdio, WaitErr: err, } } diff --git a/zfs/resume_token.go b/zfs/resume_token.go index 91f9299..5eb60cf 100644 --- a/zfs/resume_token.go +++ b/zfs/resume_token.go @@ -13,6 +13,7 @@ import ( "github.com/pkg/errors" "github.com/zrepl/zrepl/util/envconst" + "github.com/zrepl/zrepl/zfs/zfscmd" ) // NOTE: Update ZFSSendARgs.Validate when changing fields (potentially SECURITY SENSITIVE) @@ -42,7 +43,7 @@ var resumeSendSupportedCheck struct { func ResumeSendSupported(ctx context.Context) (bool, error) { resumeSendSupportedCheck.once.Do(func() { // "feature discovery" - cmd := exec.CommandContext(ctx, "zfs", "send") + cmd := zfscmd.CommandContext(ctx, "zfs", "send") output, err := cmd.CombinedOutput() if ee, ok := err.(*exec.ExitError); !ok || ok && !ee.Exited() { resumeSendSupportedCheck.err = errors.Wrap(err, "resumable send cli support feature check failed") @@ -86,7 +87,7 @@ func ResumeRecvSupported(ctx context.Context, fs *DatasetPath) (bool, error) { } if !sup.flagSupport.checked { - output, err := exec.CommandContext(ctx, "zfs", "receive").CombinedOutput() + output, err := zfscmd.CommandContext(ctx, ZFS_BINARY, "receive").CombinedOutput() upgradeWhile(func() { sup.flagSupport.checked = true if ee, ok := err.(*exec.ExitError); err != nil && (!ok || ok && !ee.Exited()) { @@ -124,7 +125,7 @@ func ResumeRecvSupported(ctx context.Context, fs *DatasetPath) (bool, error) { if poolSup, ok = sup.poolSupported[pool]; !ok || // shadow (!poolSup.supported && time.Since(poolSup.lastCheck) > resumeRecvPoolSupportRecheckTimeout) { - output, err := exec.CommandContext(ctx, "zpool", "get", "-H", "-p", "-o", "value", "feature@extensible_dataset", pool).CombinedOutput() + output, err := zfscmd.CommandContext(ctx, "zpool", "get", "-H", "-p", "-o", 
"value", "feature@extensible_dataset", pool).CombinedOutput() if err != nil { debug("resume recv pool support check result: %#v", sup.flagSupport) poolSup.supported = false @@ -181,7 +182,7 @@ func ParseResumeToken(ctx context.Context, token string) (*ResumeToken, error) { // toname = pool1/test@b //cannot resume send: 'pool1/test@b' used in the initial send no longer exists - cmd := exec.CommandContext(ctx, ZFS_BINARY, "send", "-nvt", string(token)) + cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, "send", "-nvt", string(token)) output, err := cmd.CombinedOutput() if err != nil { if exitErr, ok := err.(*exec.ExitError); ok { diff --git a/zfs/versions_destroy.go b/zfs/versions_destroy.go index 7e6b513..ecc1908 100644 --- a/zfs/versions_destroy.go +++ b/zfs/versions_destroy.go @@ -11,6 +11,7 @@ import ( "syscall" "github.com/zrepl/zrepl/util/envconst" + "github.com/zrepl/zrepl/zfs/zfscmd" ) func ZFSDestroyFilesystemVersion(ctx context.Context, filesystem *DatasetPath, version *FilesystemVersion) (err error) { @@ -224,7 +225,7 @@ var batchDestroyFeatureCheck struct { func (d destroyerImpl) DestroySnapshotsCommaSyntaxSupported(ctx context.Context) (bool, error) { batchDestroyFeatureCheck.once.Do(func() { // "feature discovery" - cmd := exec.CommandContext(ctx, ZFS_BINARY, "destroy") + cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, "destroy") output, err := cmd.CombinedOutput() if _, ok := err.(*exec.ExitError); !ok { debug("destroy feature check failed: %T %s", err, err) diff --git a/zfs/zfs.go b/zfs/zfs.go index 338fcba..67ea375 100644 --- a/zfs/zfs.go +++ b/zfs/zfs.go @@ -21,6 +21,7 @@ import ( "github.com/zrepl/zrepl/util/circlog" "github.com/zrepl/zrepl/util/envconst" + "github.com/zrepl/zrepl/zfs/zfscmd" ) var ( @@ -172,13 +173,9 @@ func ZFSList(ctx context.Context, properties []string, zfsArgs ...string) (res [ "-o", strings.Join(properties, ",")) args = append(args, zfsArgs...) - cmd := exec.CommandContext(ctx, ZFS_BINARY, args...) - - var stdout io.Reader - stderr := bytes.NewBuffer(make([]byte, 0, 1024)) - cmd.Stderr = stderr - - if stdout, err = cmd.StdoutPipe(); err != nil { + cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, args...) + stdout, stderrBuf, err := cmd.StdoutPipeWithErrorBuf() + if err != nil { return } @@ -205,7 +202,7 @@ func ZFSList(ctx context.Context, properties []string, zfsArgs ...string) (res [ if waitErr := cmd.Wait(); waitErr != nil { err := &ZFSError{ - Stderr: stderr.Bytes(), + Stderr: stderrBuf.Bytes(), WaitErr: waitErr, } return nil, err @@ -244,15 +241,12 @@ func ZFSListChan(ctx context.Context, out chan ZFSListResult, properties []strin } } - cmd := exec.CommandContext(ctx, ZFS_BINARY, args...) - stdout, err := cmd.StdoutPipe() + cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, args...) 
+ stdout, stderrBuf, err := cmd.StdoutPipeWithErrorBuf() if err != nil { sendResult(nil, err) return } - // TODO bounded buffer - stderr := bytes.NewBuffer(make([]byte, 0, 1024)) - cmd.Stderr = stderr if err = cmd.Start(); err != nil { sendResult(nil, err) return @@ -280,7 +274,7 @@ func ZFSListChan(ctx context.Context, out chan ZFSListResult, properties []strin if err := cmd.Wait(); err != nil { if err, ok := err.(*exec.ExitError); ok { sendResult(nil, &ZFSError{ - Stderr: stderr.Bytes(), + Stderr: stderrBuf.Bytes(), WaitErr: err, }) } else { @@ -419,7 +413,7 @@ func pipeWithCapacityHint(capacity int) (r, w *os.File, err error) { } type sendStream struct { - cmd *exec.Cmd + cmd *zfscmd.Cmd kill context.CancelFunc closeMtx sync.Mutex @@ -836,7 +830,6 @@ func ZFSSend(ctx context.Context, sendArgs ZFSSendArgs) (*ReadCloserCopier, erro args = append(args, sargs...) ctx, cancel := context.WithCancel(ctx) - cmd := exec.CommandContext(ctx, ZFS_BINARY, args...) // setup stdout with an os.Pipe to control pipe buffer size stdoutReader, stdoutWriter, err := pipeWithCapacityHint(ZFSSendPipeCapacityHint) @@ -844,11 +837,14 @@ func ZFSSend(ctx context.Context, sendArgs ZFSSendArgs) (*ReadCloserCopier, erro cancel() return nil, err } - - cmd.Stdout = stdoutWriter - stderrBuf := circlog.MustNewCircularLog(zfsSendStderrCaptureMaxSize) - cmd.Stderr = stderrBuf + + cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, args...) + cmd.SetStdio(zfscmd.Stdio{ + Stdin: nil, + Stdout: stdoutWriter, + Stderr: stderrBuf, + }) if err := cmd.Start(); err != nil { cancel() @@ -856,6 +852,7 @@ func ZFSSend(ctx context.Context, sendArgs ZFSSendArgs) (*ReadCloserCopier, erro stdoutReader.Close() return nil, errors.Wrap(err, "cannot start zfs send command") } + // close our writing-end of the pipe so that we don't wait for ourselves when reading from the reading end stdoutWriter.Close() stream := &sendStream{ @@ -996,7 +993,7 @@ func ZFSSendDry(ctx context.Context, sendArgs ZFSSendArgs) (_ *DrySendInfo, err } args = append(args, sargs...) - cmd := exec.CommandContext(ctx, ZFS_BINARY, args...) + cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, args...) output, err := cmd.CombinedOutput() if err != nil { return nil, &ZFSError{output, err} @@ -1115,24 +1112,26 @@ func ZFSRecv(ctx context.Context, fs string, v *ZFSSendArgVersion, streamCopier ctx, cancelCmd := context.WithCancel(ctx) defer cancelCmd() - cmd := exec.CommandContext(ctx, ZFS_BINARY, args...) - - stderr := bytes.NewBuffer(make([]byte, 0, 1024)) - cmd.Stderr = stderr + cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, args...) // TODO report bug upstream // Setup an unused stdout buffer. 
// Otherwise, ZoL v0.6.5.9-1 3.16.0-4-amd64 writes the following error to stderr and exits with code 1 // cannot receive new filesystem stream: invalid backup stream stdout := bytes.NewBuffer(make([]byte, 0, 1024)) - cmd.Stdout = stdout + + stderr := bytes.NewBuffer(make([]byte, 0, 1024)) stdin, stdinWriter, err := pipeWithCapacityHint(ZFSRecvPipeCapacityHint) if err != nil { return err } - cmd.Stdin = stdin + cmd.SetStdio(zfscmd.Stdio{ + Stdin: stdin, + Stdout: stdout, + Stderr: stderr, + }) if err = cmd.Start(); err != nil { stdinWriter.Close() @@ -1142,7 +1141,7 @@ func ZFSRecv(ctx context.Context, fs string, v *ZFSSendArgVersion, streamCopier stdin.Close() defer stdinWriter.Close() - pid := cmd.Process.Pid + pid := cmd.Process().Pid debug := func(format string, args ...interface{}) { debug("recv: pid=%v: %s", pid, fmt.Sprintf(format, args...)) } @@ -1230,7 +1229,7 @@ func ZFSRecvClearResumeToken(ctx context.Context, fs string) (err error) { return err } - cmd := exec.CommandContext(ctx, ZFS_BINARY, "recv", "-A", fs) + cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, "recv", "-A", fs) o, err := cmd.CombinedOutput() if err != nil { if bytes.Contains(o, []byte("does not have any resumable receive state to abort")) { @@ -1280,18 +1279,11 @@ func zfsSet(ctx context.Context, path string, props *ZFSProperties) (err error) } args = append(args, path) - cmd := exec.CommandContext(ctx, ZFS_BINARY, args...) - - stderr := bytes.NewBuffer(make([]byte, 0, 1024)) - cmd.Stderr = stderr - - if err = cmd.Start(); err != nil { - return err - } - - if err = cmd.Wait(); err != nil { + cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, args...) + stdio, err := cmd.CombinedOutput() + if err != nil { err = &ZFSError{ - Stderr: stderr.Bytes(), + Stderr: stdio, WaitErr: err, } } @@ -1414,7 +1406,7 @@ func (s zfsPropertySource) zfsGetSourceFieldPrefixes() []string { func zfsGet(ctx context.Context, path string, props []string, allowedSources zfsPropertySource) (*ZFSProperties, error) { args := []string{"get", "-Hp", "-o", "property,value,source", strings.Join(props, ","), path} - cmd := exec.CommandContext(ctx, ZFS_BINARY, args...) + cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, args...) 
stdout, err := cmd.Output() if err != nil { if exitErr, ok := err.(*exec.ExitError); ok { @@ -1576,28 +1568,21 @@ func ZFSDestroy(ctx context.Context, arg string) (err error) { defer prometheus.NewTimer(prom.ZFSDestroyDuration.WithLabelValues(dstype, filesystem)) - cmd := exec.CommandContext(ctx, ZFS_BINARY, "destroy", arg) - - var stderr bytes.Buffer - cmd.Stderr = &stderr - - if err = cmd.Start(); err != nil { - return err - } - - if err = cmd.Wait(); err != nil { + cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, "destroy", arg) + stdio, err := cmd.CombinedOutput() + if err != nil { err = &ZFSError{ - Stderr: stderr.Bytes(), + Stderr: stdio, WaitErr: err, } - if destroyOneOrMoreSnapshotsNoneExistedErrorRegexp.Match(stderr.Bytes()) { + if destroyOneOrMoreSnapshotsNoneExistedErrorRegexp.Match(stdio) { err = &DatasetDoesNotExist{arg} - } else if match := destroyBookmarkDoesNotExist.FindStringSubmatch(stderr.String()); match != nil && match[1] == arg { + } else if match := destroyBookmarkDoesNotExist.FindStringSubmatch(string(stdio)); match != nil && match[1] == arg { err = &DatasetDoesNotExist{arg} - } else if dsNotExistErr := tryDatasetDoesNotExist(filesystem, stderr.Bytes()); dsNotExistErr != nil { + } else if dsNotExistErr := tryDatasetDoesNotExist(filesystem, stdio); dsNotExistErr != nil { err = dsNotExistErr - } else if dserr := tryParseDestroySnapshotsError(arg, stderr.Bytes()); dserr != nil { + } else if dserr := tryParseDestroySnapshotsError(arg, stdio); dserr != nil { err = dserr } @@ -1624,18 +1609,12 @@ func ZFSSnapshot(ctx context.Context, fs *DatasetPath, name string, recursive bo if err := EntityNamecheck(snapname, EntityTypeSnapshot); err != nil { return errors.Wrap(err, "zfs snapshot") } - cmd := exec.CommandContext(ctx, ZFS_BINARY, "snapshot", snapname) - stderr := bytes.NewBuffer(make([]byte, 0, 1024)) - cmd.Stderr = stderr - - if err = cmd.Start(); err != nil { - return err - } - - if err = cmd.Wait(); err != nil { + cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, "snapshot", snapname) + stdio, err := cmd.CombinedOutput() + if err != nil { err = &ZFSError{ - Stderr: stderr.Bytes(), + Stderr: stdio, WaitErr: err, } } @@ -1687,20 +1666,12 @@ func ZFSBookmark(ctx context.Context, fs string, v ZFSSendArgVersion, bookmark s debug("bookmark: %q %q", snapname, bookmarkname) - cmd := exec.CommandContext(ctx, ZFS_BINARY, "bookmark", snapname, bookmarkname) - - stderr := bytes.NewBuffer(make([]byte, 0, 1024)) - cmd.Stderr = stderr - - if err = cmd.Start(); err != nil { - return err - } - - if err = cmd.Wait(); err != nil { - - if ddne := tryDatasetDoesNotExist(snapname, stderr.Bytes()); err != nil { + cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, "bookmark", snapname, bookmarkname) + stdio, err := cmd.CombinedOutput() + if err != nil { + if ddne := tryDatasetDoesNotExist(snapname, stdio); err != nil { return ddne - } else if zfsBookmarkExistsRegex.Match(stderr.Bytes()) { + } else if zfsBookmarkExistsRegex.Match(stdio) { // check if this was idempotent bookGuid, err := ZFSGetGUID(ctx, fs, "#"+bookmark) @@ -1714,13 +1685,13 @@ func ZFSBookmark(ctx context.Context, fs string, v ZFSSendArgVersion, bookmark s } return &BookmarkExists{ fs: fs, bookmarkOrigin: v, bookmark: bookmark, - zfsMsg: stderr.String(), + zfsMsg: string(stdio), bookGuid: bookGuid, } } else { return &ZFSError{ - Stderr: stderr.Bytes(), + Stderr: stdio, WaitErr: err, } } @@ -1742,18 +1713,11 @@ func ZFSRollback(ctx context.Context, fs *DatasetPath, snapshot FilesystemVersio args = append(args, rollbackArgs...) 
args = append(args, snapabs) - cmd := exec.CommandContext(ctx, ZFS_BINARY, args...) - - stderr := bytes.NewBuffer(make([]byte, 0, 1024)) - cmd.Stderr = stderr - - if err = cmd.Start(); err != nil { - return err - } - - if err = cmd.Wait(); err != nil { + cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, args...) + stdio, err := cmd.CombinedOutput() + if err != nil { err = &ZFSError{ - Stderr: stderr.Bytes(), + Stderr: stdio, WaitErr: err, } } diff --git a/zfs/zfscmd/zfscmd-logging-scraper/README.md b/zfs/zfscmd/zfscmd-logging-scraper/README.md new file mode 100644 index 0000000..235b726 --- /dev/null +++ b/zfs/zfscmd/zfscmd-logging-scraper/README.md @@ -0,0 +1,11 @@ +The tool in this package (`go run . -h`) scrapes log lines produces by the `github.com/zrepl/zrepl/zfs/zfscmd` package +into a stream of JSON objects. + +The `analysis.ipynb` then runs some basic analysis on the collected log output. + +## Deps for the `scrape_graylog_csv.bash` script + +``` +pip install --upgrade git+https://github.com/lk-jeffpeck/csvfilter.git@ec433f14330fbbf5d41f56febfeedac22868a949 +``` + diff --git a/zfs/zfscmd/zfscmd-logging-scraper/analysis.ipynb b/zfs/zfscmd/zfscmd-logging-scraper/analysis.ipynb new file mode 100644 index 0000000..fb1236b --- /dev/null +++ b/zfs/zfscmd/zfscmd-logging-scraper/analysis.ipynb @@ -0,0 +1,260 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import pandas as pd\n", + "import seaborn as sns\n", + "import matplotlib.pyplot as plt\n", + "import re\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# ./parsed.json is the stdout of the scraper tool in this directory\n", + "df = pd.read_json(\"./parsed.json\", lines=True)\n", + "df" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def parse_ds(entity):\n", + " m = re.search(r\"(?P[^@#]*)([@#].+)?\", entity)\n", + " return m.group(\"dataset\")\n", + " \n", + "def parse_cmd(row):\n", + " cmd = row.Cmd\n", + " binary, verb, *tail = re.split(r\"\\s+\", cmd) # NOTE whitespace in dataset names => don't use comp\n", + " \n", + " dataset = None\n", + " if binary == \"zfs\":\n", + " if verb == \"send\": \n", + " if len(tail) == 0:\n", + " verb = \"send-feature-test\"\n", + " else:\n", + " dataset = parse_ds(tail[-1])\n", + " if \"-n\" in tail:\n", + " verb = \"send-dry\"\n", + " elif verb == \"recv\" or verb == \"receive\":\n", + " verb = \"receive\"\n", + " if len(tail) > 0:\n", + " dataset = parse_ds(tail[-1])\n", + " else:\n", + " verb = \"receive-CLI-test\"\n", + " elif verb == \"get\":\n", + " dataset = parse_ds(tail[-1])\n", + " elif verb == \"list\":\n", + " if \"-r\" in tail and \"-d\" in tail and \"1\" in tail:\n", + " dataset = parse_ds(tail[-1])\n", + " verb = \"list-single-dataset\"\n", + " else:\n", + " dataset = \"!ALL_POOLS!\"\n", + " verb = \"list-all-filesystems\"\n", + " elif verb == \"bookmark\":\n", + " dataset = parse_ds(tail[-2])\n", + " elif verb == \"hold\":\n", + " dataset = parse_ds(tail[-1])\n", + " elif verb == \"snapshot\":\n", + " dataset = parse_ds(tail[-1])\n", + " elif verb == \"release\":\n", + " dss = tail[-1].split(\",\")\n", + " if len(dss) > 1:\n", + " raise Exception(\"cannot handle batch-release\")\n", + " dataset = parse_ds(dss[0])\n", + " elif verb == \"holds\" and \"-H\" in tail:\n", + " dss = tail[-1].split(\",\")\n", + " if len(dss) > 1:\n", + " raise Exception(\"cannot handle 
batch-holds\")\n", + " dataset = parse_ds(dss[0])\n", + " elif verb == \"destroy\":\n", + " dss = tail[-1].split(\",\")\n", + " if len(dss) > 1:\n", + " raise Exception(\"cannot handle batch-holds\")\n", + " dataset = parse_ds(dss[0])\n", + " \n", + " return {'action':binary + \"-\" + verb, 'dataset': dataset }\n", + " \n", + " \n", + "res = df.apply(parse_cmd, axis='columns', result_type='expand')\n", + "res = pd.concat([df, res], axis='columns')\n", + "for cat in [\"action\", \"dataset\"]:\n", + " res[cat] = res[cat].astype('category')\n", + "\n", + "res[\"LogTimeUnix\"] = pd.to_datetime(res.LogTime)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "res[\"OtherTime\"] = res.TotalTime - res.Usertime - res.Systime\n", + "x = res.melt(id_vars=[\"action\", \"dataset\"], value_vars=[\"TotalTime\", \"OtherTime\", \"Usertime\", \"Systime\"])\n", + "x" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(\"commands with NaN values\")\n", + "set(x[x.isna().any(axis=1)].action.values)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# (~x.action.astype('str').isin([\"zfs-send\", \"zfs-recv\"]))\n", + "totaltimes = x[(x.variable == \"TotalTime\")].groupby([\"action\", \"dataset\"]).sum().reset_index()\n", + "display(totaltimes)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "totaltimes_by_action = totaltimes.groupby(\"action\").sum().sort_values(by=\"value\")\n", + "totaltimes_by_action.plot.barh()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "totaltimes.groupby(\"dataset\").sum().plot.barh(fontsize=5)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "most_expensive_action = totaltimes_by_action.idxmax().value\n", + "display(most_expensive_action)\n", + "most_expensive_action_by_dataset = totaltimes[totaltimes.action == most_expensive_action].groupby(\"dataset\").sum().sort_values(by=\"value\")\n", + "most_expensive_action_by_dataset.plot.barh(rot=50, fontsize=5, figsize=(10, 20))\n", + "plt.savefig('most-expensive-command.pdf')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "res" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# %matplotlib notebook \n", + "\n", + "# res.index = res.LogTimeUnix\n", + "\n", + "# resampled = res.pivot(columns='action', values='TotalTime').resample(\"1s\").sum()\n", + "# resampled.cumsum().plot()\n", + "# res[\"BeginTime\"] = res.LogTimeUnix.dt.total_seconds()\n", + "# holds = res[res.action == \"zfs-holds\"]\n", + "# sns.stripplot(x=\"LogTimeUnix\", y=\"action\", data=res)\n", + "# res[\"LogTimeUnix\"].resample(\"20min\").sum()\n", + "# res[res.action == \"zfs-holds\"].plot.scatter(x=\"LogTimeUnix\", y=\"TotalTime\")\n", + "\n", + "#res[res.action == \"zfs-holds\"].pivot(columns='action', values=['TotalTime', 'Systime', \"Usertime\"]).resample(\"1s\").sum().cumsum().plot()\n", + "pivoted = res.reset_index(drop=True).pivot_table(values=['TotalTime', 'Systime', \"Usertime\"], index=\"LogTimeUnix\", columns=\"action\")\n", + "pivoted" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + 
"source": [ + "pivoted.cumsum()[[(\"TotalTime\", \"zfs-holds\"),(\"Systime\", \"zfs-holds\"),(\"Usertime\", \"zfs-holds\")]].plot()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pivoted = res.reset_index(drop=True).pivot_table(values=['TotalTime'], index=\"LogTimeUnix\", columns=\"action\")\n", + "cum_invocation_counts_per_action = pivoted.isna().astype(int).cumsum()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "cum_invocation_counts_per_action" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# zfs-get as reference value\n", + "cum_invocation_counts_per_action[[(\"TotalTime\",\"zfs-holds\"),(\"TotalTime\",\"zfs-get\")]].plot()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.6" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/zfs/zfscmd/zfscmd-logging-scraper/scrape_graylog_csv.bash b/zfs/zfscmd/zfscmd-logging-scraper/scrape_graylog_csv.bash new file mode 100755 index 0000000..e348097 --- /dev/null +++ b/zfs/zfscmd/zfscmd-logging-scraper/scrape_graylog_csv.bash @@ -0,0 +1,16 @@ +#!/usr/bin/env bash + +# This script converts output that was produced by zrepl and captured by Graylog +# back to something that the scraper in this package's main can understand +# Intended for human syslog +# logging: +# - type: syslog +# level: debug +# format: human + + +csvfilter --skip 1 -f 0,2 -q '"' --out-quotechar=' ' /dev/stdin | sed -E 's/^\s*([^,]*), /\1 [LEVEL]/' | \ + go run . 
-v \ + --dateRE '^([^\[]+) (\[.*)' \ + --dateFormat '2006-01-02T15:04:05.999999999Z' + diff --git a/zfs/zfscmd/zfscmd-logging-scraper/zfscmd_logging_scraper.go b/zfs/zfscmd/zfscmd-logging-scraper/zfscmd_logging_scraper.go new file mode 100644 index 0000000..6d33f08 --- /dev/null +++ b/zfs/zfscmd/zfscmd-logging-scraper/zfscmd_logging_scraper.go @@ -0,0 +1,129 @@ +package main + +import ( + "bufio" + "encoding/json" + "fmt" + "os" + "regexp" + "strings" + "time" + + "github.com/go-logfmt/logfmt" + "github.com/pkg/errors" + "github.com/spf13/pflag" + + "github.com/zrepl/zrepl/daemon/logging" +) + +type RuntimeLine struct { + LogTime time.Time + Cmd string + TotalTime, Usertime, Systime time.Duration + Error string +} + +var humanFormatterLineRE = regexp.MustCompile(`^(\[[^\]]+\]){2}\[zfs.cmd\]:\s+command\s+exited\s+(with|without)\s+error\s+(.+)`) + +func parseSecs(s string) (time.Duration, error) { + d, err := time.ParseDuration(s + "s") + if err != nil { + return 0, errors.Wrapf(err, "parse duration %q", s) + } + return d, nil +} + +func parseHumanFormatterNodate(line string) (l RuntimeLine, err error) { + m := humanFormatterLineRE.FindStringSubmatch(line) + if m == nil { + return l, errors.New("human formatter regex does not match") + } + + d := logfmt.NewDecoder(strings.NewReader(m[3])) + for d.ScanRecord() { + for d.ScanKeyval() { + k := string(d.Key()) + v := string(d.Value()) + switch k { + case "cmd": + l.Cmd = v + case "total_time_s": + l.TotalTime, err = parseSecs(v) + case "usertime_s": + l.Usertime, err = parseSecs(v) + case "systemtime_s": + l.Systime, err = parseSecs(v) + case "err": + l.Error = v + case "invocation": + continue // pass + default: + return l, errors.Errorf("unknown key %q", k) + } + if err != nil { + return l, err + } + } + } + if d.Err() != nil { + return l, errors.Wrap(d.Err(), "decode key value pairs") + } + return l, nil +} + +func parseLogLine(line string) (l RuntimeLine, err error) { + m := dateRegex.FindStringSubmatch(line) + if len(m) != 3 { + return l, errors.Errorf("invalid date regex match %v", m) + } + dateTrimmed := strings.TrimSpace(m[1]) + date, err := time.Parse(dateFormat, dateTrimmed) + if err != nil { + panic(fmt.Sprintf("cannot parse date %q: %s", dateTrimmed, err)) + } + logLine := m[2] + + l, err = parseHumanFormatterNodate(strings.TrimSpace(logLine)) + l.LogTime = date + return l, err +} + +var verbose bool +var dateRegexArg string +var dateRegex = regexp.MustCompile(`^([^\[]+)(.*)`) +var dateFormat = logging.HumanFormatterDateFormat + +func main() { + + pflag.StringVarP(&dateRegexArg, "dateRE", "d", "", "date regex") + pflag.StringVar(&dateFormat, "dateFormat", logging.HumanFormatterDateFormat, "go date format") + pflag.BoolVarP(&verbose, "verbose", "v", false, "verbose") + pflag.Parse() + + if dateRegexArg != "" { + dateRegex = regexp.MustCompile(dateRegexArg) + } + + input := bufio.NewScanner(os.Stdin) + input.Split(bufio.ScanLines) + + enc := json.NewEncoder(os.Stdout) + for input.Scan() { + + l, err := parseLogLine(input.Text()) + if err != nil && verbose { + fmt.Fprintf(os.Stderr, "ignoring line after error %v\n", err) + fmt.Fprintf(os.Stderr, "offending line was: %s\n", input.Text()) + } + if err == nil { + if err := enc.Encode(l); err != nil { + panic(err) + } + } + } + + if input.Err() != nil { + panic(input.Err()) + } + +} diff --git a/zfs/zfscmd/zfscmd-logging-scraper/zfscmd_logging_scrapter_test.go b/zfs/zfscmd/zfscmd-logging-scraper/zfscmd_logging_scrapter_test.go new file mode 100644 index 0000000..2bea1f2 --- /dev/null +++ 
b/zfs/zfscmd/zfscmd-logging-scraper/zfscmd_logging_scrapter_test.go @@ -0,0 +1,86 @@ +package main + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/zrepl/zrepl/daemon/logging" +) + +func TestParseHumanFormatter(t *testing.T) { + + type testCase struct { + Name string + Input string + Expect *RuntimeLine + ExpectErr string + } + + secs := func(s string) time.Duration { + d, err := parseSecs(s) + require.NoError(t, err) + return d + } + + logTime, err := time.Parse(logging.HumanFormatterDateFormat, "2020-04-04T00:00:05+02:00") + require.NoError(t, err) + + tcs := []testCase{ + { + Name: "human-formatter-noerror", + Input: `2020-04-04T00:00:05+02:00 [DEBG][jobname][zfs.cmd]: command exited without error usertime_s="0.008445" cmd="zfs list -H -p -o name -r -t filesystem,volume" systemtime_s="0.033783" invocation="84" total_time_s="0.037828619"`, + Expect: &RuntimeLine{ + Cmd: "zfs list -H -p -o name -r -t filesystem,volume", + TotalTime: secs("0.037828619"), + Usertime: secs("0.008445"), + Systime: secs("0.033783"), + Error: "", + LogTime: logTime, + }, + }, + { + Name: "human-formatter-witherror", + Input: `2020-04-04T00:00:05+02:00 [DEBG][jobname][zfs.cmd]: command exited with error usertime_s="0.008445" cmd="zfs list -H -p -o name -r -t filesystem,volume" systemtime_s="0.033783" invocation="84" total_time_s="0.037828619" err="some error"`, + Expect: &RuntimeLine{ + Cmd: "zfs list -H -p -o name -r -t filesystem,volume", + TotalTime: secs("0.037828619"), + Usertime: secs("0.008445"), + Systime: secs("0.033783"), + Error: "some error", + LogTime: logTime, + }, + }, + { + Name: "from graylog", + Input: `2020-04-04T00:00:05+02:00 [DEBG][csnas][zfs.cmd]: command exited without error usertime_s="0" cmd="zfs send -i zroot/ezjail/synapse-12@zrepl_20200329_095518_000 zroot/ezjail/synapse-12@zrepl_20200329_102454_000" total_time_s="0.101598591" invocation="85" systemtime_s="0.041581"`, + Expect: &RuntimeLine{ + Cmd: "zfs send -i zroot/ezjail/synapse-12@zrepl_20200329_095518_000 zroot/ezjail/synapse-12@zrepl_20200329_102454_000", + TotalTime: secs("0.101598591"), + Systime: secs("0.041581"), + Usertime: secs("0"), + Error: "", + LogTime: logTime, + }, + }, + } + + for _, c := range tcs { + t.Run(c.Name, func(t *testing.T) { + l, err := parseLogLine(c.Input) + t.Logf("l=%v", l) + t.Logf("err=%T %v", err, err) + if (c.Expect != nil && c.ExpectErr != "") || (c.Expect == nil && c.ExpectErr == "") { + t.Fatal("bad test case", c) + } + if c.Expect != nil { + require.Equal(t, *c.Expect, l) + } + if c.ExpectErr != "" { + require.EqualError(t, err, c.ExpectErr) + } + }) + } + +} diff --git a/zfs/zfscmd/zfscmd.go b/zfs/zfscmd/zfscmd.go new file mode 100644 index 0000000..544817a --- /dev/null +++ b/zfs/zfscmd/zfscmd.go @@ -0,0 +1,143 @@ +// Package zfscmd provides a wrapper around packate os/exec. +// Functionality provided by the wrapper: +// - logging start and end of command execution +// - status report of active commands +// - prometheus metrics of runtimes +package zfscmd + +import ( + "context" + "io" + "os" + "os/exec" + "strings" + "sync" + "time" + + "github.com/zrepl/zrepl/util/circlog" +) + +type Cmd struct { + cmd *exec.Cmd + ctx context.Context + mtx sync.RWMutex + startedAt, waitReturnedAt time.Time +} + +func CommandContext(ctx context.Context, name string, arg ...string) *Cmd { + cmd := exec.CommandContext(ctx, name, arg...) 
+	return &Cmd{cmd: cmd, ctx: ctx}
+}
+
+// err.(*exec.ExitError).Stderr will NOT be set
+func (c *Cmd) CombinedOutput() (o []byte, err error) {
+	c.startPre()
+	c.startPost(nil)
+	c.waitPre()
+	o, err = c.cmd.CombinedOutput()
+	c.waitPost(err)
+	return
+}
+
+// err.(*exec.ExitError).Stderr will be set
+func (c *Cmd) Output() (o []byte, err error) {
+	c.startPre()
+	c.startPost(nil)
+	c.waitPre()
+	o, err = c.cmd.Output()
+	c.waitPost(err)
+	return
+}
+
+// Careful: err.(*exec.ExitError).Stderr will not be set, even if you don't open an StderrPipe
+func (c *Cmd) StdoutPipeWithErrorBuf() (p io.ReadCloser, errBuf *circlog.CircularLog, err error) {
+	p, err = c.cmd.StdoutPipe()
+	errBuf = circlog.MustNewCircularLog(1 << 15)
+	c.cmd.Stderr = errBuf
+	return p, errBuf, err
+}
+
+type Stdio struct {
+	Stdin  io.ReadCloser
+	Stdout io.Writer
+	Stderr io.Writer
+}
+
+func (c *Cmd) SetStdio(stdio Stdio) {
+	c.cmd.Stdin = stdio.Stdin
+	c.cmd.Stderr = stdio.Stderr
+	c.cmd.Stdout = stdio.Stdout
+}
+
+func (c *Cmd) String() string {
+	return strings.Join(c.cmd.Args, " ") // includes argv[0] if initialized with CommandContext, that's the only way we do it
+}
+
+func (c *Cmd) log() Logger {
+	return getLogger(c.ctx).WithField("cmd", c.String())
+}
+
+func (c *Cmd) Start() (err error) {
+	c.startPre()
+	err = c.cmd.Start()
+	c.startPost(err)
+	return err
+}
+
+// only call this after a successful call to .Start()
+func (c *Cmd) Process() *os.Process {
+	if c.startedAt.IsZero() {
+		panic("calling Process() only allowed after successful call to Start()")
+	}
+	return c.cmd.Process
+}
+
+func (c *Cmd) Wait() (err error) {
+	c.waitPre()
+	err = c.cmd.Wait()
+	if !c.waitReturnedAt.IsZero() {
+		// ignore duplicate waits
+		return err
+	}
+	c.waitPost(err)
+	return err
+}
+
+func (c *Cmd) startPre() {
+	startPreLogging(c, time.Now())
+}
+
+func (c *Cmd) startPost(err error) {
+	now := time.Now()
+
+	c.mtx.Lock()
+	c.startedAt = now
+	c.mtx.Unlock()
+
+	startPostReport(c, err, now)
+	startPostLogging(c, err, now)
+}
+
+func (c *Cmd) waitPre() {
+	waitPreLogging(c, time.Now())
+}
+
+func (c *Cmd) waitPost(err error) {
+	now := time.Now()
+
+	c.mtx.Lock()
+	c.waitReturnedAt = now
+	c.mtx.Unlock()
+
+	waitPostReport(c, now)
+	waitPostLogging(c, err, now)
+	waitPostPrometheus(c, err, now)
+}
+
+// returns 0 if the command did not yet finish
+func (c *Cmd) Runtime() time.Duration {
+	if c.waitReturnedAt.IsZero() {
+		return 0
+	}
+	return c.waitReturnedAt.Sub(c.startedAt)
+}
diff --git a/zfs/zfscmd/zfscmd_context.go b/zfs/zfscmd/zfscmd_context.go
new file mode 100644
index 0000000..b087652
--- /dev/null
+++ b/zfs/zfscmd/zfscmd_context.go
@@ -0,0 +1,39 @@
+package zfscmd
+
+import (
+	"context"
+
+	"github.com/zrepl/zrepl/logger"
+)
+
+type contextKey int
+
+const (
+	contextKeyLogger contextKey = iota
+	contextKeyJobID
+)
+
+type Logger = logger.Logger
+
+func WithJobID(ctx context.Context, jobID string) context.Context {
+	return context.WithValue(ctx, contextKeyJobID, jobID)
+}
+
+func getJobIDOrDefault(ctx context.Context, def string) string {
+	ret, ok := ctx.Value(contextKeyJobID).(string)
+	if !ok {
+		return def
+	}
+	return ret
+}
+
+func WithLogger(ctx context.Context, log Logger) context.Context {
+	return context.WithValue(ctx, contextKeyLogger, log)
+}
+
+func getLogger(ctx context.Context) Logger {
+	if l, ok := ctx.Value(contextKeyLogger).(Logger); ok {
+		return l
+	}
+	return logger.NewNullLogger()
+}
diff --git a/zfs/zfscmd/zfscmd_logging.go b/zfs/zfscmd/zfscmd_logging.go
new file mode 100644
index 0000000..200f0a8
--- /dev/null
+++ b/zfs/zfscmd/zfscmd_logging.go
@@ -0,0 +1,41 @@
+package zfscmd
+
+import (
+	"time"
+)
+
+// Implementation Note:
+//
+// Pre-events logged with debug
+// Post-events without error logged with info
+// Post-events with error _also_ logged with info
+// (Not all errors we observe at this layer are actual errors in higher-level layers)
+
+func startPreLogging(c *Cmd, now time.Time) {
+	c.log().Debug("starting command")
+}
+
+func startPostLogging(c *Cmd, err error, now time.Time) {
+	if err == nil {
+		c.log().Info("started command")
+	} else {
+		c.log().WithError(err).Error("cannot start command")
+	}
+}
+
+func waitPreLogging(c *Cmd, now time.Time) {
+	c.log().Debug("start waiting")
+}
+
+func waitPostLogging(c *Cmd, err error, now time.Time) {
+	log := c.log().
+		WithField("total_time_s", c.Runtime().Seconds()).
+		WithField("systemtime_s", c.cmd.ProcessState.SystemTime().Seconds()).
+		WithField("usertime_s", c.cmd.ProcessState.UserTime().Seconds())
+
+	if err == nil {
+		log.Info("command exited without error")
+	} else {
+		log.WithError(err).Info("command exited with error")
+	}
+}
diff --git a/zfs/zfscmd/zfscmd_platform_test.bash b/zfs/zfscmd/zfscmd_platform_test.bash
new file mode 100755
index 0000000..cc4ba3f
--- /dev/null
+++ b/zfs/zfscmd/zfscmd_platform_test.bash
@@ -0,0 +1,7 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+echo "to stderr" 1>&2
+echo "to stdout"
+
+exit "$1"
\ No newline at end of file
diff --git a/zfs/zfscmd/zfscmd_platform_test.go b/zfs/zfscmd/zfscmd_platform_test.go
new file mode 100644
index 0000000..982e32e
--- /dev/null
+++ b/zfs/zfscmd/zfscmd_platform_test.go
@@ -0,0 +1,60 @@
+package zfscmd
+
+import (
+	"bytes"
+	"io"
+	"os/exec"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+const testBin = "./zfscmd_platform_test.bash"
+
+func TestCmdStderrBehaviorOutput(t *testing.T) {
+
+	stdout, err := exec.Command(testBin, "0").Output()
+	require.NoError(t, err)
+	require.Equal(t, []byte("to stdout\n"), stdout)
+
+	stdout, err = exec.Command(testBin, "1").Output()
+	assert.Equal(t, []byte("to stdout\n"), stdout)
+	require.Error(t, err)
+	ee, ok := err.(*exec.ExitError)
+	require.True(t, ok)
+	require.Equal(t, ee.Stderr, []byte("to stderr\n"))
+}
+
+func TestCmdStderrBehaviorCombinedOutput(t *testing.T) {
+
+	stdio, err := exec.Command(testBin, "0").CombinedOutput()
+	require.NoError(t, err)
+	require.Equal(t, "to stderr\nto stdout\n", string(stdio))
+
+	stdio, err = exec.Command(testBin, "1").CombinedOutput()
+	require.Equal(t, "to stderr\nto stdout\n", string(stdio))
+	require.Error(t, err)
+	ee, ok := err.(*exec.ExitError)
+	require.True(t, ok)
+	require.Empty(t, ee.Stderr) // !!!! maybe not what one would expect
+}
+
+func TestCmdStderrBehaviorStdoutPipe(t *testing.T) {
+	cmd := exec.Command(testBin, "1")
+	stdoutPipe, err := cmd.StdoutPipe()
+	require.NoError(t, err)
+	err = cmd.Start()
+	require.NoError(t, err)
+	defer cmd.Wait()
+	var stdout bytes.Buffer
+	_, err = io.Copy(&stdout, stdoutPipe)
+	require.NoError(t, err)
+	require.Equal(t, "to stdout\n", stdout.String())
+
+	err = cmd.Wait()
+	require.Error(t, err)
+	ee, ok := err.(*exec.ExitError)
+	require.True(t, ok)
+	require.Empty(t, ee.Stderr) // !!!!! probably not what one would expect if we only redirect stdout
+}
diff --git a/zfs/zfscmd/zfscmd_prometheus.go b/zfs/zfscmd/zfscmd_prometheus.go
new file mode 100644
index 0000000..f68e998
--- /dev/null
+++ b/zfs/zfscmd/zfscmd_prometheus.go
@@ -0,0 +1,73 @@
+package zfscmd
+
+import (
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+var metrics struct {
+	totaltime  *prometheus.HistogramVec
+	systemtime *prometheus.HistogramVec
+	usertime   *prometheus.HistogramVec
+}
+
+var timeLabels = []string{"jobid", "zfsbinary", "zfsverb"}
+var timeBuckets = []float64{0.01, 0.1, 0.2, 0.5, 0.75, 1, 2, 5, 10, 60}
+
+func init() {
+	metrics.totaltime = prometheus.NewHistogramVec(prometheus.HistogramOpts{
+		Namespace: "zrepl",
+		Subsystem: "zfscmd",
+		Name:      "runtime",
+		Help:      "number of seconds that the command took from start until wait returned",
+		Buckets:   timeBuckets,
+	}, timeLabels)
+	metrics.systemtime = prometheus.NewHistogramVec(prometheus.HistogramOpts{
+		Namespace: "zrepl",
+		Subsystem: "zfscmd",
+		Name:      "systemtime",
+		Help:      "https://golang.org/pkg/os/#ProcessState.SystemTime",
+		Buckets:   timeBuckets,
+	}, timeLabels)
+	metrics.usertime = prometheus.NewHistogramVec(prometheus.HistogramOpts{
+		Namespace: "zrepl",
+		Subsystem: "zfscmd",
+		Name:      "usertime",
+		Help:      "https://golang.org/pkg/os/#ProcessState.UserTime",
+		Buckets:   timeBuckets,
+	}, timeLabels)
+
+}
+
+func RegisterMetrics(r prometheus.Registerer) {
+	r.MustRegister(metrics.totaltime)
+	r.MustRegister(metrics.systemtime)
+	r.MustRegister(metrics.usertime)
+}
+
+func waitPostPrometheus(c *Cmd, err error, now time.Time) {
+
+	if len(c.cmd.Args) < 2 {
+		getLogger(c.ctx).WithField("args", c.cmd.Args).
+			Warn("prometheus: cannot turn zfs command into metric")
+		return
+	}
+
+	// Note: do not start parsing other aspects
+	// of the ZFS command line. This is not the suitable layer
+	// for such a task.
+
+	jobid := getJobIDOrDefault(c.ctx, "_nojobid")
+
+	labelValues := []string{jobid, c.cmd.Args[0], c.cmd.Args[1]}
+
+	metrics.totaltime.
+		WithLabelValues(labelValues...).
+		Observe(c.Runtime().Seconds())
+	metrics.systemtime.WithLabelValues(labelValues...).
+		Observe(c.cmd.ProcessState.SystemTime().Seconds())
+	metrics.usertime.WithLabelValues(labelValues...).
+		Observe(c.cmd.ProcessState.UserTime().Seconds())
+
+}
diff --git a/zfs/zfscmd/zfscmd_report.go b/zfs/zfscmd/zfscmd_report.go
new file mode 100644
index 0000000..abd7b4f
--- /dev/null
+++ b/zfs/zfscmd/zfscmd_report.go
@@ -0,0 +1,68 @@
+package zfscmd
+
+import (
+	"fmt"
+	"sync"
+	"time"
+)
+
+type Report struct {
+	Active []ActiveCommand
+}
+
+type ActiveCommand struct {
+	Path      string
+	Args      []string
+	StartedAt time.Time
+}
+
+func GetReport() *Report {
+	active.mtx.RLock()
+	defer active.mtx.RUnlock()
+	var activeCommands []ActiveCommand
+	for c := range active.cmds {
+		c.mtx.RLock()
+		activeCommands = append(activeCommands, ActiveCommand{
+			Path:      c.cmd.Path,
+			Args:      c.cmd.Args,
+			StartedAt: c.startedAt,
+		})
+		c.mtx.RUnlock()
+	}
+	return &Report{
+		Active: activeCommands,
+	}
+}
+
+var active struct {
+	mtx  sync.RWMutex
+	cmds map[*Cmd]bool
+}
+
+func init() {
+	active.cmds = make(map[*Cmd]bool)
+}
+
+func startPostReport(c *Cmd, err error, now time.Time) {
+	if err != nil {
+		return
+	}
+
+	active.mtx.Lock()
+	prev := active.cmds[c]
+	if prev {
+		panic("impl error: duplicate active command")
+	}
+	active.cmds[c] = true
+	active.mtx.Unlock()
+}
+
+func waitPostReport(c *Cmd, now time.Time) {
+	active.mtx.Lock()
+	defer active.mtx.Unlock()
+	prev := active.cmds[c]
+	if !prev {
+		panic(fmt.Sprintf("impl error: onWaitDone must only be called on an active command: %s", c))
+	}
+	delete(active.cmds, c)
+}
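On the client side, `runStatus` now decodes the control-socket response into the new `daemon.Status` shape (`Jobs` plus `Global.ZFSCmds`) instead of a bare job map. Below is a minimal sketch of an out-of-tree consumer of that endpoint, assuming the response is marshaled by `encoding/json` with the default field names implied by the structs in `daemon/daemon.go` and `zfscmd_report.go`; only the fields it touches are mirrored here, and the program itself is hypothetical.

```go
// statusdump: hypothetical consumer of the new status shape, reading the
// JSON document from stdin and printing the globally active ZFS commands.
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"time"
)

// mirrors of the fields used below; names follow the structs in the patch
type activeCommand struct {
	Path      string
	Args      []string
	StartedAt time.Time
}

type status struct {
	Global struct {
		ZFSCmds struct {
			Active []activeCommand
		}
	}
}

func main() {
	var s status
	if err := json.NewDecoder(os.Stdin).Decode(&s); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	for _, c := range s.Global.ZFSCmds.Active {
		fmt.Printf("%s (running for %s): %v\n",
			c.Path, time.Since(c.StartedAt).Round(time.Millisecond), c.Args)
	}
}
```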
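`EncryptionCLISupported`, `ResumeSendSupported`, `ResumeRecvSupported` and `DestroySnapshotsCommaSyntaxSupported` all share the "feature discovery" idiom that this patch merely reroutes through zfscmd: run the verb without arguments, treat a clean or merely nonzero exit as "the CLI knows this verb", and treat anything else (binary missing, killed by signal) as a failed check. A standalone sketch of the idiom; the helper name `zfsFeatureProbe` is invented, and the real checks additionally grep the captured usage text for feature-specific flags.

```go
// featureprobe: the sync.Once-guarded "feature discovery" pattern in isolation.
package main

import (
	"context"
	"fmt"
	"os/exec"
	"sync"

	"github.com/pkg/errors"
)

var probe struct {
	once      sync.Once
	supported bool
	err       error
}

// zfsFeatureProbe runs `zfs <verb>` without arguments. Exiting with a nonzero
// status is fine (the CLI printed usage and bailed out); not being able to run
// or exit at all is a real error.
func zfsFeatureProbe(ctx context.Context, verb string) (bool, error) {
	probe.once.Do(func() {
		output, err := exec.CommandContext(ctx, "zfs", verb).CombinedOutput()
		if ee, ok := err.(*exec.ExitError); err != nil && (!ok || !ee.Exited()) {
			probe.err = errors.Wrap(err, "feature check failed")
			return
		}
		// the real checks grep `output` (the usage text) for the flag they
		// care about; this sketch only records that the verb was accepted
		_ = output
		probe.supported = true
	})
	return probe.supported, probe.err
}

func main() {
	ok, err := zfsFeatureProbe(context.Background(), "send")
	fmt.Println(ok, err)
}
```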
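The `zfs release` hunk keeps its argv-length adaptation: on `E2BIG` the invocation length limit is halved and the batch retried, and after every successful batch the limit grows again by one page size. The sketch below pulls that loop out into an invented `batchRun` helper; the length accounting is simplified compared to `maxInvocationLen` in `holds.go`.

```go
// batchargs: sketch of the E2BIG-adaptive batching used by ZFSRelease.
package main

import (
	"fmt"
	"os"
	"syscall"
)

// batchRun feeds items to run() in batches whose cumulative length stays below
// maxLen. On E2BIG the limit is halved and the same range retried; after a
// successful batch the limit is grown again by one page size.
func batchRun(items []string, maxLen int, run func(batch []string) error) error {
	for i := 0; i < len(items); {
		cur, j := 0, i
		for ; j < len(items) && cur+len(items[j])+1 <= maxLen; j++ {
			cur += len(items[j]) + 1
		}
		if j == i {
			return fmt.Errorf("single argument exceeds limit %d", maxLen)
		}
		err := run(items[i:j])
		if pe, ok := err.(*os.PathError); err != nil && ok && pe.Err == syscall.E2BIG {
			maxLen = maxLen / 2
			continue
		}
		if err != nil {
			return err
		}
		maxLen += os.Getpagesize()
		i = j
	}
	return nil
}

func main() {
	err := batchRun([]string{"pool/fs@a", "pool/fs@b", "pool/fs@c"}, 1<<10, func(batch []string) error {
		fmt.Println("zfs release zrepl_tag", batch)
		return nil // a real runner would exec the command here
	})
	fmt.Println(err)
}
```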
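`StdoutPipeWithErrorBuf` replaces the hand-rolled `StdoutPipe` plus unbounded stderr buffer in `ZFSList`/`ZFSListChan` (note the removed `// TODO bounded buffer`): stderr now lands in a bounded circular log that is only consulted when `Wait` fails. A sketch of the intended calling pattern, with an illustrative `listFilesystems` wrapper around `zfs list`.

```go
// listsketch: consuming StdoutPipeWithErrorBuf the way the updated ZFSList does.
package main

import (
	"bufio"
	"context"
	"fmt"

	"github.com/zrepl/zrepl/zfs/zfscmd"
)

func listFilesystems(ctx context.Context) ([]string, error) {
	cmd := zfscmd.CommandContext(ctx, "zfs", "list", "-H", "-p", "-o", "name")
	stdout, stderrBuf, err := cmd.StdoutPipeWithErrorBuf()
	if err != nil {
		return nil, err
	}
	if err := cmd.Start(); err != nil {
		return nil, err
	}

	var names []string
	s := bufio.NewScanner(stdout)
	for s.Scan() {
		names = append(names, s.Text())
	}

	if err := cmd.Wait(); err != nil {
		// stderr was captured in the bounded circular buffer
		return nil, fmt.Errorf("zfs list: %v: %s", err, stderrBuf.Bytes())
	}
	return names, s.Err()
}

func main() {
	names, err := listFilesystems(context.Background())
	fmt.Println(names, err)
}
```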
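`ZFSSend` and `ZFSRecv` now hand their explicitly created pipes to the wrapper through `SetStdio`. The sketch below reduces that wiring to a stand-in command (`cat` of a file) so it runs without ZFS; the detail worth keeping is the one the patch adds a comment for in `ZFSSend`: the parent must close its copy of the write end after `Start()`, otherwise the reader never sees EOF.

```go
// sendpipe: the SetStdio + os.Pipe wiring from ZFSSend, with `cat` standing in
// for `zfs send` and a small circular buffer capturing stderr.
package main

import (
	"context"
	"fmt"
	"io"
	"os"

	"github.com/zrepl/zrepl/util/circlog"
	"github.com/zrepl/zrepl/zfs/zfscmd"
)

func main() {
	ctx := context.Background()

	r, w, err := os.Pipe()
	if err != nil {
		panic(err)
	}
	stderrBuf := circlog.MustNewCircularLog(1 << 15)

	cmd := zfscmd.CommandContext(ctx, "cat", "/etc/hosts")
	cmd.SetStdio(zfscmd.Stdio{
		Stdin:  nil,
		Stdout: w,
		Stderr: stderrBuf,
	})

	if err := cmd.Start(); err != nil {
		panic(err)
	}
	// close our write end so the copy below terminates when the child exits
	w.Close()

	if _, err := io.Copy(os.Stdout, r); err != nil {
		panic(err)
	}
	if err := cmd.Wait(); err != nil {
		fmt.Fprintf(os.Stderr, "%v: %s\n", err, stderrBuf.Bytes())
	}
}
```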
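Most one-shot call sites (`ZFSSnapshot`, `ZFSDestroy`, `zfsSet`, `ZFSRollback`, `ZFSBookmark`) drop their `Start`/`Wait`/stderr-buffer boilerplate in favor of `CombinedOutput`, and the combined output goes into `ZFSError.Stderr`. As `zfscmd_platform_test.go` documents, that field then also contains stdout, which is why the error-scraping regexes in `ZFSDestroy` are simply pointed at `stdio`. A sketch of the pattern, with a local `zfsError` mirror so it stays self-contained:

```go
// oneshot: the CombinedOutput error path now used by the simple zfs wrappers.
package main

import (
	"context"
	"fmt"

	"github.com/zrepl/zrepl/zfs/zfscmd"
)

// zfsError mirrors zfs.ZFSError for the purpose of this sketch
type zfsError struct {
	Stderr  []byte // with CombinedOutput this also contains stdout
	WaitErr error
}

func (e *zfsError) Error() string {
	return fmt.Sprintf("zfs exited with error: %v\n%s", e.WaitErr, e.Stderr)
}

func snapshot(ctx context.Context, snapname string) error {
	stdio, err := zfscmd.CommandContext(ctx, "zfs", "snapshot", snapname).CombinedOutput()
	if err != nil {
		// error-classifying regexes would run against stdio here
		return &zfsError{Stderr: stdio, WaitErr: err}
	}
	return nil
}

func main() {
	fmt.Println(snapshot(context.Background(), "pool/fs@example"))
}
```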
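The scraper's `parseHumanFormatterNodate` leans on `github.com/go-logfmt/logfmt` to pull the key=value fields out of the suffix of a `[zfs.cmd]` log line. The decoding step in isolation, applied to a made-up sample record:

```go
// logfmtsketch: decoding the key=value tail of a zfscmd log line.
package main

import (
	"fmt"
	"strings"

	"github.com/go-logfmt/logfmt"
)

func main() {
	kv := `cmd="zfs list -H -p -o name" total_time_s="0.037828619" usertime_s="0.008445" systemtime_s="0.033783" invocation="84"`

	d := logfmt.NewDecoder(strings.NewReader(kv))
	for d.ScanRecord() {
		for d.ScanKeyval() {
			fmt.Printf("%s = %s\n", d.Key(), d.Value())
		}
	}
	if d.Err() != nil {
		panic(d.Err())
	}
}
```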
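`daemon.Run` registers the zfscmd histograms once per process, and `jobs.start` tags each job's context with `WithJobID`, which ends up as the `jobid` label (or `_nojobid`). Below is a sketch of how an embedding program would wire the same thing outside the daemon; the job name and the `zfs version` invocation are illustrative, and the metric name in the comment follows from the `Namespace`/`Subsystem`/`Name` fields in `zfscmd_prometheus.go`.

```go
// metricswiring: global metric registration plus per-job context tagging.
package main

import (
	"context"
	"fmt"

	"github.com/prometheus/client_golang/prometheus"

	"github.com/zrepl/zrepl/zfs/zfscmd"
)

func main() {
	// register global (= non job-local) metrics, as daemon.Run now does
	zfscmd.RegisterMetrics(prometheus.DefaultRegisterer)

	// every command run under this context carries jobid="demo"
	ctx := zfscmd.WithJobID(context.Background(), "demo")

	out, err := zfscmd.CommandContext(ctx, "zfs", "version").CombinedOutput()
	fmt.Println(string(out), err)

	// zrepl_zfscmd_runtime{jobid="demo",zfsbinary="zfs",zfsverb="version"}
	// is now observable through the default registry
}
```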
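`GetReport` snapshots the set of commands that have been started but not yet waited for; the control job serves it as `GlobalStatus.ZFSCmds` on the status endpoint. A small sketch that observes an in-flight command, with `sleep 1` standing in for a long-running zfs invocation:

```go
// reportsketch: observing an in-flight command through zfscmd.GetReport.
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/zrepl/zrepl/zfs/zfscmd"
)

func main() {
	cmd := zfscmd.CommandContext(context.Background(), "sleep", "1")
	if err := cmd.Start(); err != nil {
		panic(err)
	}

	time.Sleep(100 * time.Millisecond)
	for _, a := range zfscmd.GetReport().Active {
		fmt.Printf("active since %s: %v\n",
			time.Since(a.StartedAt).Round(time.Millisecond), a.Args)
	}

	if err := cmd.Wait(); err != nil {
		panic(err)
	}
}
```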