zrepl/zfs/zfscmd/zfscmd.go
Christian Schwarz 10a14a8c50 [#307] add package trace, integrate it with logging, and adopt it throughout zrepl
package trace:

- introduce the concept of tasks and spans, tracked as linked list within ctx
    - see package-level docs for an overview of the concepts
    - **main feature 1**: unique stack of task and span IDs
        - makes it easy to follow a series of log entries in concurrent code
    - **main feature 2**: ability to produce a chrome://tracing-compatible trace file
        - either via an env variable or a `zrepl pprof` subcommand
        - this is not a CPU profile, we already have go pprof for that
        - but it is very useful to visually inspect where the
          replication / snapshotter / pruner spends its time
          ( fixes #307 )

usage in package daemon/logging:

- goal: every log entry should have a trace field with the ID stack from package trace

- make `logging.GetLogger(ctx, Subsys)` the authoritative `logger.Logger` factory function
    - the context carries a linked list of injected fields which
      `logging.GetLogger` adds to the logger it returns
    - `logging.GetLogger` also uses package `trace` to get the
      task-and-span-stack and injects it into the returned logger's fields
2020-05-19 11:30:02 +02:00

197 lines
4.2 KiB
Go

// Package zfscmd provides a wrapper around package os/exec.
// Functionality provided by the wrapper:
// - logging start and end of command execution
// - status report of active commands
// - prometheus metrics of runtimes
package zfscmd
import (
"context"
"io"
"os"
"os/exec"
"strings"
"sync"
"time"
"github.com/zrepl/zrepl/daemon/logging/trace"
"github.com/zrepl/zrepl/util/circlog"
)
// Cmd wraps an *exec.Cmd together with the context it was created with and
// the bookkeeping (timestamps, trace span callback) maintained by the
// start/wait hooks of this package.
type Cmd struct {
	// cmd is the wrapped command.
	cmd *exec.Cmd
	// ctx is the context passed to CommandContext; startPre replaces it
	// with the context returned by package trace.
	ctx context.Context
	// mtx is held by the start/wait hooks while they update the timestamps below.
	mtx sync.RWMutex
	// startedAt is set by startPost.
	startedAt time.Time
	// waitStartedAt is set by the first call to waitPre.
	waitStartedAt time.Time
	// waitReturnedAt is set by the first call to waitPost.
	waitReturnedAt time.Time
	// waitReturnEndSpanCb is assigned in startPre and invoked at the end of waitPost.
	waitReturnEndSpanCb trace.DoneFunc
}
// CommandContext creates a Cmd that wraps exec.CommandContext(ctx, name, arg...)
// and remembers ctx for use by the logging and tracing hooks.
func CommandContext(ctx context.Context, name string, arg ...string) *Cmd {
	return &Cmd{
		cmd: exec.CommandContext(ctx, name, arg...),
		ctx: ctx,
	}
}
// CombinedOutput runs the command through exec.Cmd.CombinedOutput, invoking
// the start and wait hooks before and after it.
//
// Note: err.(*exec.ExitError).Stderr will NOT be set.
func (c *Cmd) CombinedOutput() (o []byte, err error) {
	// The command is run as a span (not a new task) of the current context.
	c.startPre(false)
	c.startPost(nil)
	c.waitPre()

	out, runErr := c.cmd.CombinedOutput()

	c.waitPost(runErr)
	return out, runErr
}
// Output runs the command through exec.Cmd.Output, invoking the start and
// wait hooks before and after it.
//
// Note: err.(*exec.ExitError).Stderr will be set.
func (c *Cmd) Output() (o []byte, err error) {
	// The command is run as a span (not a new task) of the current context.
	c.startPre(false)
	c.startPost(nil)
	c.waitPre()

	out, runErr := c.cmd.Output()

	c.waitPost(runErr)
	return out, runErr
}
// StdoutPipeWithErrorBuf returns the command's stdout pipe and installs a
// freshly created circular log (size argument 1<<15) as the command's stderr.
// The circular log is installed and returned even if opening the pipe failed.
//
// Careful: err.(*exec.ExitError).Stderr will not be set, even if you don't open an StderrPipe
func (c *Cmd) StdoutPipeWithErrorBuf() (p io.ReadCloser, errBuf *circlog.CircularLog, err error) {
	stdout, pipeErr := c.cmd.StdoutPipe()

	stderrLog := circlog.MustNewCircularLog(1 << 15)
	c.cmd.Stderr = stderrLog

	return stdout, stderrLog, pipeErr
}
// Stdio bundles the three standard streams that SetStdio assigns to a command.
type Stdio struct {
	// Stdin is assigned to the command's Stdin.
	Stdin io.ReadCloser
	// Stdout is assigned to the command's Stdout.
	Stdout io.Writer
	// Stderr is assigned to the command's Stderr.
	Stderr io.Writer
}
// SetStdio assigns all three streams of stdio to the wrapped command,
// replacing whatever was configured before.
func (c *Cmd) SetStdio(stdio Stdio) {
	target := c.cmd
	target.Stdin = stdio.Stdin
	target.Stdout = stdio.Stdout
	target.Stderr = stdio.Stderr
}
// String returns the command's arguments joined by single spaces.
func (c *Cmd) String() string {
	// Args includes argv[0] if the command was initialized with
	// CommandContext, which is the only way we do it.
	args := c.cmd.Args
	return strings.Join(args, " ")
}
// log returns the logger obtained from c.ctx with the field "cmd" set to
// the command's string representation.
func (c *Cmd) log() Logger {
	base := getLogger(c.ctx)
	return base.WithField("cmd", c.String())
}
// Start starts the wrapped command, invoking the start hooks around it.
// The command gets its own trace task and span (startPre(true)).
// The error of exec.Cmd.Start is passed to startPost and returned unchanged.
func (c *Cmd) Start() (err error) {
	c.startPre(true)

	startErr := c.cmd.Start()

	c.startPost(startErr)
	return startErr
}
// Process returns the wrapped command's *os.Process.
//
// Only call this after a successful call to .Start().
// It panics if the start hooks have not recorded a start time yet.
func (c *Cmd) Process() *os.Process {
	// startedAt is written under c.mtx in startPost, so read it under the
	// read lock as well. The lock is released before panicking / returning.
	c.mtx.RLock()
	started := !c.startedAt.IsZero()
	c.mtx.RUnlock()

	if !started {
		panic("calling Process() only allowed after successful call to Start()")
	}
	return c.cmd.Process
}
// Wait waits for the wrapped command via exec.Cmd.Wait, invoking the wait
// hooks before and after it. The error of exec.Cmd.Wait is passed to
// waitPost and returned unchanged.
func (c *Cmd) Wait() (err error) {
	c.waitPre()

	waitErr := c.cmd.Wait()

	c.waitPost(waitErr)
	return waitErr
}
// startPre opens the trace span for the command and emits the pre-start log.
// With newTask set, the command gets its own task plus span; otherwise only a
// span within the current context. In both cases c.ctx is replaced by the
// derived context and the span's done-callback is stored for waitPost.
func (c *Cmd) startPre(newTask bool) {
	spanName := c.String()
	switch {
	case newTask:
		// The task name is the constant "zfscmd" to avoid an explosion of
		// tasks with name c.String(); only the span carries the command line.
		c.ctx, c.waitReturnEndSpanCb = trace.WithTaskAndSpan(c.ctx, "zfscmd", spanName)
	default:
		c.ctx, c.waitReturnEndSpanCb = trace.WithSpan(c.ctx, spanName)
	}
	startPreLogging(c, time.Now())
}
// startPost records the start time and forwards err and that timestamp to
// the report and logging hooks. The start time is recorded regardless of
// whether err is nil.
func (c *Cmd) startPost(err error) {
	startedAt := time.Now()

	// Publish the timestamp under the lock, then release it before calling hooks.
	func() {
		c.mtx.Lock()
		defer c.mtx.Unlock()
		c.startedAt = startedAt
	}()

	startPostReport(c, err, startedAt)
	startPostLogging(c, err, startedAt)
}
// waitPre records the time at which waiting began and emits the pre-wait log.
// Only the first call has an effect; duplicate waits are ignored.
func (c *Cmd) waitPre() {
	now := time.Now()

	c.mtx.Lock()
	firstWait := c.waitStartedAt.IsZero()
	if firstWait {
		c.waitStartedAt = now
	}
	c.mtx.Unlock()

	if !firstWait {
		// ignore duplicate waits
		return
	}
	waitPreLogging(c, now)
}
// usage holds the timing figures of a finished command, in seconds.
// waitPost sets system_secs and user_secs to -1 when no process state is
// available.
type usage struct {
	// total_secs is the value of Cmd.Runtime() in seconds.
	total_secs float64
	// system_secs is the process's system CPU time in seconds, or -1.
	system_secs float64
	// user_secs is the process's user CPU time in seconds, or -1.
	user_secs float64
}
// waitPost records the time at which waiting returned, derives the usage
// figures from the process state, forwards them to the report, logging and
// prometheus hooks, and finally ends the trace span opened in startPre.
// Only the first call has an effect; duplicate waits are ignored.
func (c *Cmd) waitPost(err error) {
	now := time.Now()

	c.mtx.Lock()
	firstReturn := c.waitReturnedAt.IsZero()
	if firstReturn {
		c.waitReturnedAt = now
	}
	c.mtx.Unlock()

	if !firstReturn {
		// ignore duplicate waits
		return
	}

	// Pick the process state: from the command on success, from the
	// ExitError on a non-zero exit, and none for any other error.
	var state *os.ProcessState
	switch e := err.(type) {
	case nil:
		state = c.cmd.ProcessState
	case *exec.ExitError:
		state = e.ProcessState
	}

	// build usage; CPU times stay at -1 if no process state is available
	u := usage{
		total_secs:  c.Runtime().Seconds(),
		system_secs: -1,
		user_secs:   -1,
	}
	if state != nil {
		u.system_secs = state.SystemTime().Seconds()
		u.user_secs = state.UserTime().Seconds()
	}

	waitPostReport(c, u, now)
	waitPostLogging(c, u, err, now)
	waitPostPrometheus(c, u, err, now)

	// must be last because c.ctx might be used by other waitPost calls
	c.waitReturnEndSpanCb()
}
// Runtime returns the duration between the recorded start time and the time
// at which waiting for the command returned.
// It returns 0 if the command did not yet finish, i.e. as long as waitPost
// has not recorded a return time.
func (c *Cmd) Runtime() time.Duration {
	// Both timestamps are written under c.mtx by startPost / waitPost, so
	// take the read lock to get a consistent, race-free view of them.
	// waitPost calls Runtime only after releasing the write lock.
	c.mtx.RLock()
	defer c.mtx.RUnlock()

	if c.waitReturnedAt.IsZero() {
		return 0
	}
	return c.waitReturnedAt.Sub(c.startedAt)
}