[#307] add package trace, integrate it with logging, and adopt it throughout zrepl

package trace:

- introduce the concept of tasks and spans, tracked as a linked list within ctx
    - see the package-level docs for an overview of the concepts
      (a short usage sketch follows below)
    - **main feature 1**: unique stack of task and span IDs
        - makes it easy to follow a series of log entries in concurrent code
    - **main feature 2**: ability to produce a chrome://tracing-compatible trace file
        - either via an env variable or the `zrepl pprof` subcommand
        - this is not a CPU profile; we already have Go pprof for that
        - but it is very useful to visually inspect where the
          replication / snapshotter / pruner spends its time
          (fixes #307)
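
For illustration, a minimal usage sketch of the new API (hypothetical example
code, not part of the diff; nesting rules as documented in package trace):

    package example

    import (
        "context"
        "fmt"

        "github.com/zrepl/zrepl/daemon/logging/trace"
    )

    // copyDirs opens one task for the overall activity and one span per
    // stack-like step within it; endSpan/endTask must be called in LIFO order.
    func copyDirs(ctx context.Context, dirs []string) {
        ctx, endTask := trace.WithTask(ctx, "copy-dirs")
        defer endTask()
        for _, d := range dirs {
            func() {
                ctx, endSpan := trace.WithSpan(ctx, fmt.Sprintf("copy-dir %q", d))
                defer endSpan()
                _ = ctx // ... do the per-directory work with this ctx ...
            }()
        }
    }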

usage in package daemon/logging:

- goal: every log entry should have a trace field with the ID stack from package trace

- make `logging.GetLogger(ctx, Subsys)` the authoritative `logger.Logger` factory function
    - the context carries a linked list of injected fields, which
      `logging.GetLogger` adds to the logger it returns
    - `logging.GetLogger` also uses package `trace` to get the
      task-and-span stack and injects it into the returned logger's fields
      (see the sketch below)
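
A hedged sketch of what a call site looks like under this scheme (the function
and the injected field value are made up; the API is the one introduced above):

    package example

    import (
        "context"

        "github.com/zrepl/zrepl/daemon/logging"
    )

    func doStep(ctx context.Context) {
        // assumes ctx already carries the subsystem loggers (logging.WithLoggers)
        // and a task/span from package trace, as set up by the daemon
        ctx = logging.WithInjectedField(ctx, "step", "example")
        log := logging.GetLogger(ctx, logging.SubsysJob)
        // every entry emitted via log carries the subsystem, the trace ID stack,
        // and the injected "step" field
        log.Info("starting step")
    }
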
Author: Christian Schwarz, 2020-04-11 15:49:41 +02:00
Parent: bcb5965617
Commit: 10a14a8c50
75 changed files with 1934 additions and 462 deletions


@ -1,11 +1,13 @@
package cli
import (
"context"
"fmt"
"os"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/zrepl/zrepl/daemon/logging/trace"
"github.com/zrepl/zrepl/config"
)
@ -75,7 +77,7 @@ type Subcommand struct {
Short string
Example string
NoRequireConfig bool
Run func(subcommand *Subcommand, args []string) error
Run func(ctx context.Context, subcommand *Subcommand, args []string) error
SetupFlags func(f *pflag.FlagSet)
SetupSubcommands func() []*Subcommand
@ -96,7 +98,11 @@ func (s *Subcommand) Config() *config.Config {
func (s *Subcommand) run(cmd *cobra.Command, args []string) {
s.tryParseConfig()
err := s.Run(s, args)
ctx := context.Background()
endTask := trace.WithTaskFromStackUpdateCtx(&ctx)
defer endTask()
err := s.Run(ctx, s, args)
endTask()
if err != nil {
fmt.Fprintf(os.Stderr, "%s\n", err)
os.Exit(1)


@ -1,6 +1,7 @@
package client
import (
"context"
"encoding/json"
"fmt"
"os"
@ -29,7 +30,7 @@ var ConfigcheckCmd = &cli.Subcommand{
f.StringVar(&configcheckArgs.format, "format", "", "dump parsed config object [pretty|yaml|json]")
f.StringVar(&configcheckArgs.what, "what", "all", "what to print [all|config|jobs|logging]")
},
Run: func(subcommand *cli.Subcommand, args []string) error {
Run: func(ctx context.Context, subcommand *cli.Subcommand, args []string) error {
formatMap := map[string]func(interface{}){
"": func(i interface{}) {},
"pretty": func(i interface{}) {


@ -48,14 +48,13 @@ var migratePlaceholder0_1Args struct {
dryRun bool
}
func doMigratePlaceholder0_1(sc *cli.Subcommand, args []string) error {
func doMigratePlaceholder0_1(ctx context.Context, sc *cli.Subcommand, args []string) error {
if len(args) != 0 {
return fmt.Errorf("migration does not take arguments, got %v", args)
}
cfg := sc.Config()
ctx := context.Background()
allFSS, err := zfs.ZFSListMapping(ctx, zfs.NoFilter())
if err != nil {
return errors.Wrap(err, "cannot list filesystems")
@ -124,7 +123,7 @@ var fail = color.New(color.FgRed)
var migrateReplicationCursorSkipSentinel = fmt.Errorf("skipping this filesystem")
func doMigrateReplicationCursor(sc *cli.Subcommand, args []string) error {
func doMigrateReplicationCursor(ctx context.Context, sc *cli.Subcommand, args []string) error {
if len(args) != 0 {
return fmt.Errorf("migration does not take arguments, got %v", args)
}
@ -137,8 +136,6 @@ func doMigrateReplicationCursor(sc *cli.Subcommand, args []string) error {
return fmt.Errorf("exiting migration after error")
}
ctx := context.Background()
v1cursorJobs := make([]job.Job, 0, len(cfg.Jobs))
for i, j := range cfg.Jobs {
if jobs[i].Name() != j.Name() {


@ -1,67 +1,10 @@
package client
import (
"errors"
"log"
"os"
"github.com/zrepl/zrepl/cli"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon"
)
var pprofArgs struct {
daemon.PprofServerControlMsg
}
import "github.com/zrepl/zrepl/cli"
var PprofCmd = &cli.Subcommand{
Use: "pprof off | [on TCP_LISTEN_ADDRESS]",
Short: "start a http server exposing go-tool-compatible profiling endpoints at TCP_LISTEN_ADDRESS",
Run: func(subcommand *cli.Subcommand, args []string) error {
if len(args) < 1 {
goto enargs
}
switch args[0] {
case "on":
pprofArgs.Run = true
if len(args) != 2 {
return errors.New("must specify TCP_LISTEN_ADDRESS as second positional argument")
}
pprofArgs.HttpListenAddress = args[1]
case "off":
if len(args) != 1 {
goto enargs
}
pprofArgs.Run = false
}
RunPProf(subcommand.Config())
return nil
enargs:
return errors.New("invalid number of positional arguments")
Use: "pprof",
SetupSubcommands: func() []*cli.Subcommand {
return []*cli.Subcommand{PprofListenCmd, pprofActivityTraceCmd}
},
}
func RunPProf(conf *config.Config) {
log := log.New(os.Stderr, "", 0)
die := func() {
log.Printf("exiting after error")
os.Exit(1)
}
log.Printf("connecting to zrepl daemon")
httpc, err := controlHttpClient(conf.Global.Control.SockPath)
if err != nil {
log.Printf("error creating http client: %s", err)
die()
}
err = jsonRequestResponse(httpc, daemon.ControlJobEndpointPProf, pprofArgs.PprofServerControlMsg, struct{}{})
if err != nil {
log.Printf("error sending control message: %s", err)
die()
}
log.Printf("finished")
}


@ -0,0 +1,44 @@
package client
import (
"context"
"io"
"log"
"os"
"golang.org/x/net/websocket"
"github.com/zrepl/zrepl/cli"
)
var pprofActivityTraceCmd = &cli.Subcommand{
Use: "activity-trace ZREPL_PPROF_HOST:ZREPL_PPROF_PORT",
Short: "attach to zrepl daemon with activated pprof listener and dump an activity-trace to stdout",
Run: runPProfActivityTrace,
}
func runPProfActivityTrace(ctx context.Context, subcommand *cli.Subcommand, args []string) error {
log := log.New(os.Stderr, "", 0)
die := func() {
log.Printf("exiting after error")
os.Exit(1)
}
if len(args) != 1 {
log.Printf("exactly one positional argument is required")
die()
}
url := "ws://" + args[0] + "/debug/zrepl/activity-trace" // FIXME don't repeat that
log.Printf("attaching to activity trace stream %s", url)
ws, err := websocket.Dial(url, "", url)
if err != nil {
log.Printf("error: %s", err)
die()
}
_, err = io.Copy(os.Stdout, ws)
return err
}

client/pprof_listen.go (new file, 68 lines)

@ -0,0 +1,68 @@
package client
import (
"context"
"errors"
"log"
"os"
"github.com/zrepl/zrepl/cli"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon"
)
var pprofListenCmd struct {
daemon.PprofServerControlMsg
}
var PprofListenCmd = &cli.Subcommand{
Use: "listen off | [on TCP_LISTEN_ADDRESS]",
Short: "start a http server exposing go-tool-compatible profiling endpoints at TCP_LISTEN_ADDRESS",
Run: func(ctx context.Context, subcommand *cli.Subcommand, args []string) error {
if len(args) < 1 {
goto enargs
}
switch args[0] {
case "on":
pprofListenCmd.Run = true
if len(args) != 2 {
return errors.New("must specify TCP_LISTEN_ADDRESS as second positional argument")
}
pprofListenCmd.HttpListenAddress = args[1]
case "off":
if len(args) != 1 {
goto enargs
}
pprofListenCmd.Run = false
}
RunPProf(subcommand.Config())
return nil
enargs:
return errors.New("invalid number of positional arguments")
},
}
func RunPProf(conf *config.Config) {
log := log.New(os.Stderr, "", 0)
die := func() {
log.Printf("exiting after error")
os.Exit(1)
}
log.Printf("connecting to zrepl daemon")
httpc, err := controlHttpClient(conf.Global.Control.SockPath)
if err != nil {
log.Printf("error creating http client: %s", err)
die()
}
err = jsonRequestResponse(httpc, daemon.ControlJobEndpointPProf, pprofListenCmd.PprofServerControlMsg, struct{}{})
if err != nil {
log.Printf("error sending control message: %s", err)
die()
}
log.Printf("finished")
}


@ -1,6 +1,8 @@
package client
import (
"context"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/cli"
@ -11,7 +13,7 @@ import (
var SignalCmd = &cli.Subcommand{
Use: "signal [wakeup|reset] JOB",
Short: "wake up a job from wait state or abort its current invocation",
Run: func(subcommand *cli.Subcommand, args []string) error {
Run: func(ctx context.Context, subcommand *cli.Subcommand, args []string) error {
return runSignalCmd(subcommand.Config(), args)
},
}


@ -1,6 +1,7 @@
package client
import (
"context"
"fmt"
"io"
"math"
@ -180,7 +181,7 @@ var StatusCmd = &cli.Subcommand{
Run: runStatus,
}
func runStatus(s *cli.Subcommand, args []string) error {
func runStatus(ctx context.Context, s *cli.Subcommand, args []string) error {
httpc, err := controlHttpClient(s.Config().Global.Control.SockPath)
if err != nil {
return err


@ -17,7 +17,7 @@ import (
var StdinserverCmd = &cli.Subcommand{
Use: "stdinserver CLIENT_IDENTITY",
Short: "stdinserver transport mode (started from authorized_keys file as forced command)",
Run: func(subcommand *cli.Subcommand, args []string) error {
Run: func(ctx context.Context, subcommand *cli.Subcommand, args []string) error {
return runStdinserver(subcommand.Config(), args)
},
}


@ -39,7 +39,7 @@ var testFilter = &cli.Subcommand{
Run: runTestFilterCmd,
}
func runTestFilterCmd(subcommand *cli.Subcommand, args []string) error {
func runTestFilterCmd(ctx context.Context, subcommand *cli.Subcommand, args []string) error {
if testFilterArgs.job == "" {
return fmt.Errorf("must specify --job flag")
@ -49,7 +49,6 @@ func runTestFilterCmd(subcommand *cli.Subcommand, args []string) error {
}
conf := subcommand.Config()
ctx := context.Background()
var confFilter config.FilesystemsFilter
job, err := conf.Job(testFilterArgs.job)
@ -137,10 +136,9 @@ var testPlaceholder = &cli.Subcommand{
Run: runTestPlaceholder,
}
func runTestPlaceholder(subcommand *cli.Subcommand, args []string) error {
func runTestPlaceholder(ctx context.Context, subcommand *cli.Subcommand, args []string) error {
var checkDPs []*zfs.DatasetPath
ctx := context.Background()
// all actions first
if testPlaceholderArgs.all {
@ -197,11 +195,11 @@ var testDecodeResumeToken = &cli.Subcommand{
Run: runTestDecodeResumeTokenCmd,
}
func runTestDecodeResumeTokenCmd(subcommand *cli.Subcommand, args []string) error {
func runTestDecodeResumeTokenCmd(ctx context.Context, subcommand *cli.Subcommand, args []string) error {
if testDecodeResumeTokenArgs.token == "" {
return fmt.Errorf("token argument must be specified")
}
token, err := zfs.ParseResumeToken(context.Background(), testDecodeResumeTokenArgs.token)
token, err := zfs.ParseResumeToken(ctx, testDecodeResumeTokenArgs.token)
if err != nil {
return err
}


@ -1,6 +1,7 @@
package client
import (
"context"
"fmt"
"os"
@ -25,7 +26,7 @@ var VersionCmd = &cli.Subcommand{
SetupFlags: func(f *pflag.FlagSet) {
f.StringVar(&versionArgs.Show, "show", "", "version info to show (client|daemon)")
},
Run: func(subcommand *cli.Subcommand, args []string) error {
Run: func(ctx context.Context, subcommand *cli.Subcommand, args []string) error {
versionArgs.Config = subcommand.Config()
versionArgs.ConfigErr = subcommand.ConfigParsingError()
return runVersionCmd()


@ -28,7 +28,7 @@ var zabsCmdCreateStepHold = &cli.Subcommand{
},
}
func doZabsCreateStep(sc *cli.Subcommand, args []string) error {
func doZabsCreateStep(ctx context.Context, sc *cli.Subcommand, args []string) error {
if len(args) > 0 {
return errors.New("subcommand takes no arguments")
}
@ -44,8 +44,6 @@ func doZabsCreateStep(sc *cli.Subcommand, args []string) error {
return errors.Errorf("jobid must be set")
}
ctx := context.Background()
v, err := zfs.ZFSGetFilesystemVersion(ctx, f.target)
if err != nil {
return errors.Wrapf(err, "get info about target %q", f.target)


@ -32,9 +32,8 @@ var zabsCmdList = &cli.Subcommand{
},
}
func doZabsList(sc *cli.Subcommand, args []string) error {
func doZabsList(ctx context.Context, sc *cli.Subcommand, args []string) error {
var err error
ctx := context.Background()
if len(args) > 0 {
return errors.New("this subcommand takes no positional arguments")


@ -43,9 +43,8 @@ var zabsCmdReleaseStale = &cli.Subcommand{
SetupFlags: registerZabsReleaseFlags,
}
func doZabsReleaseAll(sc *cli.Subcommand, args []string) error {
func doZabsReleaseAll(ctx context.Context, sc *cli.Subcommand, args []string) error {
var err error
ctx := context.Background()
if len(args) > 0 {
return errors.New("this subcommand takes no positional arguments")
@ -68,10 +67,9 @@ func doZabsReleaseAll(sc *cli.Subcommand, args []string) error {
return doZabsRelease_Common(ctx, abstractions)
}
func doZabsReleaseStale(sc *cli.Subcommand, args []string) error {
func doZabsReleaseStale(ctx context.Context, sc *cli.Subcommand, args []string) error {
var err error
ctx := context.Background()
if len(args) > 0 {
return errors.New("this subcommand takes no positional arguments")


@ -12,6 +12,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/zrepl/zrepl/daemon/logging/trace"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/job"
@ -23,9 +24,8 @@ import (
"github.com/zrepl/zrepl/zfs/zfscmd"
)
func Run(conf *config.Config) error {
ctx, cancel := context.WithCancel(context.Background())
func Run(ctx context.Context, conf *config.Config) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
sigChan := make(chan os.Signal, 1)
@ -39,6 +39,7 @@ func Run(conf *config.Config) error {
if err != nil {
return errors.Wrap(err, "cannot build logging from config")
}
outlets.Add(newPrometheusLogOutlet(), logger.Debug)
confJobs, err := job.JobsFromConfig(conf)
if err != nil {
@ -48,14 +49,23 @@ func Run(conf *config.Config) error {
log := logger.NewLogger(outlets, 1*time.Second)
log.Info(version.NewZreplVersionInformation().String())
ctx = logging.WithLoggers(ctx, logging.SubsystemLoggersWithUniversalLogger(log))
trace.RegisterCallback(trace.Callback{
OnBegin: func(ctx context.Context) { logging.GetLogger(ctx, logging.SubsysTraceData).Debug("begin span") },
OnEnd: func(ctx context.Context, spanInfo trace.SpanInfo) {
logging.
GetLogger(ctx, logging.SubsysTraceData).
WithField("duration_s", spanInfo.EndedAt().Sub(spanInfo.StartedAt()).Seconds()).
Debug("finished span " + spanInfo.TaskAndSpanStack(trace.SpanStackKindAnnotation))
},
})
for _, job := range confJobs {
if IsInternalJobName(job.Name()) {
panic(fmt.Sprintf("internal job name used for config job '%s'", job.Name())) //FIXME
}
}
ctx = job.WithLogger(ctx, log)
jobs := newJobs()
// start control socket
@ -84,6 +94,7 @@ func Run(conf *config.Config) error {
// register global (=non job-local) metrics
zfscmd.RegisterMetrics(prometheus.DefaultRegisterer)
trace.RegisterMetrics(prometheus.DefaultRegisterer)
log.Info("starting daemon")
@ -98,6 +109,8 @@ func Run(conf *config.Config) error {
case <-ctx.Done():
log.WithError(ctx.Err()).Info("context finished")
}
log.Info("waiting for jobs to finish")
<-jobs.wait()
log.Info("daemon exiting")
return nil
}
@ -120,14 +133,11 @@ func newJobs() *jobs {
}
}
const (
logJobField string = "job"
)
func (s *jobs) wait() <-chan struct{} {
ch := make(chan struct{})
go func() {
s.wg.Wait()
close(ch)
}()
return ch
}
@ -202,9 +212,8 @@ func (s *jobs) start(ctx context.Context, j job.Job, internal bool) {
s.m.Lock()
defer s.m.Unlock()
jobLog := job.GetLogger(ctx).
WithField(logJobField, j.Name()).
WithOutlet(newPrometheusLogOutlet(j.Name()), logger.Debug)
ctx = logging.WithInjectedField(ctx, logging.JobField, j.Name())
jobName := j.Name()
if !internal && IsInternalJobName(jobName) {
panic(fmt.Sprintf("internal job name used for non-internal job %s", jobName))
@ -219,7 +228,6 @@ func (s *jobs) start(ctx context.Context, j job.Job, internal bool) {
j.RegisterMetrics(prometheus.DefaultRegisterer)
s.jobs[jobName] = j
ctx = job.WithLogger(ctx, jobLog)
ctx = zfscmd.WithJobID(ctx, j.Name())
ctx, wakeup := wakeup.Context(ctx)
ctx, resetFunc := reset.Context(ctx)
@ -229,8 +237,8 @@ func (s *jobs) start(ctx context.Context, j job.Job, internal bool) {
s.wg.Add(1)
go func() {
defer s.wg.Done()
jobLog.Info("starting job")
defer jobLog.Info("job exited")
job.GetLogger(ctx).Info("starting job")
defer job.GetLogger(ctx).Info("job exited")
j.Run(ctx)
}()
}


@ -6,29 +6,17 @@ import (
"context"
"sync"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/util/envconst"
)
type contextKey int
const (
contextKeyLog contextKey = 0
)
type Logger = logger.Logger
func WithLogger(ctx context.Context, log Logger) context.Context {
return context.WithValue(ctx, contextKeyLog, log)
}
func GetLogger(ctx context.Context) Logger { return getLogger(ctx) }
func getLogger(ctx context.Context) Logger {
if log, ok := ctx.Value(contextKeyLog).(Logger); ok {
return log
}
return logger.NewNullLogger()
return logging.GetLogger(ctx, logging.SubsysHooks)
}
const MAX_HOOK_LOG_SIZE_DEFAULT int = 1 << 20


@ -10,9 +10,11 @@ import (
"text/template"
"github.com/stretchr/testify/require"
"github.com/zrepl/zrepl/daemon/logging/trace"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/hooks"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/zfs"
)
@ -69,6 +71,9 @@ func curry(f comparisonAssertionFunc, expected interface{}, right bool) (ret val
}
func TestHooks(t *testing.T) {
ctx, end := trace.WithTaskFromStack(context.Background())
defer end()
testFSName := "testpool/testdataset"
testSnapshotName := "testsnap"
@ -418,9 +423,8 @@ jobs:
cbReached = false
ctx := context.Background()
if testing.Verbose() && !tt.SuppressOutput {
ctx = hooks.WithLogger(ctx, log)
ctx = logging.WithLoggers(ctx, logging.SubsystemLoggersWithUniversalLogger(log))
}
plan.Run(ctx, false)
report := plan.Report()


@ -8,12 +8,13 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/zrepl/zrepl/daemon/logging/trace"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/filters"
"github.com/zrepl/zrepl/daemon/job/reset"
"github.com/zrepl/zrepl/daemon/job/wakeup"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/daemon/pruner"
"github.com/zrepl/zrepl/daemon/snapper"
"github.com/zrepl/zrepl/endpoint"
@ -79,7 +80,7 @@ func (a *ActiveSide) updateTasks(u func(*activeSideTasks)) activeSideTasks {
}
type activeMode interface {
ConnectEndpoints(rpcLoggers rpc.Loggers, connecter transport.Connecter)
ConnectEndpoints(ctx context.Context, connecter transport.Connecter)
DisconnectEndpoints()
SenderReceiver() (logic.Sender, logic.Receiver)
Type() Type
@ -98,14 +99,14 @@ type modePush struct {
snapper *snapper.PeriodicOrManual
}
func (m *modePush) ConnectEndpoints(loggers rpc.Loggers, connecter transport.Connecter) {
func (m *modePush) ConnectEndpoints(ctx context.Context, connecter transport.Connecter) {
m.setupMtx.Lock()
defer m.setupMtx.Unlock()
if m.receiver != nil || m.sender != nil {
panic("inconsistent use of ConnectEndpoints and DisconnectEndpoints")
}
m.sender = endpoint.NewSender(*m.senderConfig)
m.receiver = rpc.NewClient(connecter, loggers)
m.receiver = rpc.NewClient(connecter, rpc.GetLoggersOrPanic(ctx))
}
func (m *modePush) DisconnectEndpoints() {
@ -176,14 +177,14 @@ type modePull struct {
interval config.PositiveDurationOrManual
}
func (m *modePull) ConnectEndpoints(loggers rpc.Loggers, connecter transport.Connecter) {
func (m *modePull) ConnectEndpoints(ctx context.Context, connecter transport.Connecter) {
m.setupMtx.Lock()
defer m.setupMtx.Unlock()
if m.receiver != nil || m.sender != nil {
panic("inconsistent use of ConnectEndpoints and DisconnectEndpoints")
}
m.receiver = endpoint.NewReceiver(m.receiverConfig)
m.sender = rpc.NewClient(connecter, loggers)
m.sender = rpc.NewClient(connecter, rpc.GetLoggersOrPanic(ctx))
}
func (m *modePull) DisconnectEndpoints() {
@ -376,15 +377,18 @@ func (j *ActiveSide) SenderConfig() *endpoint.SenderConfig {
}
func (j *ActiveSide) Run(ctx context.Context) {
ctx, endTask := trace.WithTaskAndSpan(ctx, "active-side-job", j.Name())
defer endTask()
log := GetLogger(ctx)
ctx = logging.WithSubsystemLoggers(ctx, log)
defer log.Info("job exiting")
periodicDone := make(chan struct{})
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go j.mode.RunPeriodic(ctx, periodicDone)
periodicCtx, endTask := trace.WithTask(ctx, "periodic")
defer endTask()
go j.mode.RunPeriodic(periodicCtx, periodicDone)
invocationCount := 0
outer:
@ -400,17 +404,15 @@ outer:
case <-periodicDone:
}
invocationCount++
invLog := log.WithField("invocation", invocationCount)
j.do(WithLogger(ctx, invLog))
invocationCtx, endSpan := trace.WithSpan(ctx, fmt.Sprintf("invocation-%d", invocationCount))
j.do(invocationCtx)
endSpan()
}
}
func (j *ActiveSide) do(ctx context.Context) {
log := GetLogger(ctx)
ctx = logging.WithSubsystemLoggers(ctx, log)
loggers := rpc.GetLoggersOrPanic(ctx) // filled by WithSubsystemLoggers
j.mode.ConnectEndpoints(loggers, j.connecter)
j.mode.ConnectEndpoints(ctx, j.connecter)
defer j.mode.DisconnectEndpoints()
// allow cancellation of an invocation (this function)
@ -433,20 +435,22 @@ func (j *ActiveSide) do(ctx context.Context) {
return
default:
}
ctx, endSpan := trace.WithSpan(ctx, "replication")
ctx, repCancel := context.WithCancel(ctx)
var repWait driver.WaitFunc
j.updateTasks(func(tasks *activeSideTasks) {
// reset it
*tasks = activeSideTasks{}
tasks.replicationCancel = repCancel
tasks.replicationCancel = func() { repCancel(); endSpan() }
tasks.replicationReport, repWait = replication.Do(
ctx, logic.NewPlanner(j.promRepStateSecs, j.promBytesReplicated, sender, receiver, j.mode.PlannerPolicy()),
)
tasks.state = ActiveSideReplicating
})
log.Info("start replication")
GetLogger(ctx).Info("start replication")
repWait(true) // wait blocking
repCancel() // always cancel to free up context resources
endSpan()
}
{
@ -455,16 +459,18 @@ func (j *ActiveSide) do(ctx context.Context) {
return
default:
}
ctx, endSpan := trace.WithSpan(ctx, "prune_sender")
ctx, senderCancel := context.WithCancel(ctx)
tasks := j.updateTasks(func(tasks *activeSideTasks) {
tasks.prunerSender = j.prunerFactory.BuildSenderPruner(ctx, sender, sender)
tasks.prunerSenderCancel = senderCancel
tasks.prunerSenderCancel = func() { senderCancel(); endSpan() }
tasks.state = ActiveSidePruneSender
})
log.Info("start pruning sender")
GetLogger(ctx).Info("start pruning sender")
tasks.prunerSender.Prune()
log.Info("finished pruning sender")
GetLogger(ctx).Info("finished pruning sender")
senderCancel()
endSpan()
}
{
select {
@ -472,16 +478,18 @@ func (j *ActiveSide) do(ctx context.Context) {
return
default:
}
ctx, endSpan := trace.WithSpan(ctx, "prune_receiver")
ctx, receiverCancel := context.WithCancel(ctx)
tasks := j.updateTasks(func(tasks *activeSideTasks) {
tasks.prunerReceiver = j.prunerFactory.BuildReceiverPruner(ctx, receiver, sender)
tasks.prunerReceiverCancel = receiverCancel
tasks.prunerReceiverCancel = func() { receiverCancel(); endSpan() }
tasks.state = ActiveSidePruneReceiver
})
log.Info("start pruning receiver")
GetLogger(ctx).Info("start pruning receiver")
tasks.prunerReceiver.Prune()
log.Info("finished pruning receiver")
GetLogger(ctx).Info("finished pruning receiver")
receiverCancel()
endSpan()
}
j.updateTasks(func(tasks *activeSideTasks) {


@ -7,6 +7,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/endpoint"
"github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/zfs"
@ -14,21 +15,8 @@ import (
type Logger = logger.Logger
type contextKey int
const (
contextKeyLog contextKey = iota
)
func GetLogger(ctx context.Context) Logger {
if l, ok := ctx.Value(contextKeyLog).(Logger); ok {
return l
}
return logger.NewNullLogger()
}
func WithLogger(ctx context.Context, l Logger) context.Context {
return context.WithValue(ctx, contextKeyLog, l)
return logging.GetLogger(ctx, logging.SubsysJob)
}
type Job interface {


@ -6,6 +6,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/zrepl/zrepl/daemon/logging/trace"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/filters"
@ -164,12 +165,14 @@ func (j *PassiveSide) SenderConfig() *endpoint.SenderConfig {
func (*PassiveSide) RegisterMetrics(registerer prometheus.Registerer) {}
func (j *PassiveSide) Run(ctx context.Context) {
ctx, endTask := trace.WithTaskAndSpan(ctx, "passive-side-job", j.Name())
defer endTask()
log := GetLogger(ctx)
defer log.Info("job exiting")
ctx = logging.WithSubsystemLoggers(ctx, log)
{
ctx, cancel := context.WithCancel(ctx) // shadowing
ctx, endTask := trace.WithTask(ctx, "periodic") // shadowing
defer endTask()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go j.mode.RunPeriodic(ctx)
}
@ -179,8 +182,14 @@ func (j *PassiveSide) Run(ctx context.Context) {
panic(fmt.Sprintf("implementation error: j.mode.Handler() returned nil: %#v", j))
}
ctxInterceptor := func(handlerCtx context.Context) context.Context {
return logging.WithSubsystemLoggers(handlerCtx, log)
ctxInterceptor := func(handlerCtx context.Context, info rpc.HandlerContextInterceptorData, handler func(ctx context.Context)) {
// the handlerCtx is clean => need to inherit logging and tracing config from job context
handlerCtx = logging.WithInherit(handlerCtx, ctx)
handlerCtx = trace.WithInherit(handlerCtx, ctx)
handlerCtx, endTask := trace.WithTaskAndSpan(handlerCtx, "handler", fmt.Sprintf("job=%q client=%q method=%q", j.Name(), info.ClientIdentity(), info.FullMethod()))
defer endTask()
handler(handlerCtx)
}
rpcLoggers := rpc.GetLoggersOrPanic(ctx) // WithSubsystemLoggers above


@ -2,15 +2,16 @@ package job
import (
"context"
"fmt"
"sort"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/zrepl/zrepl/daemon/logging/trace"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/filters"
"github.com/zrepl/zrepl/daemon/job/wakeup"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/daemon/pruner"
"github.com/zrepl/zrepl/daemon/snapper"
"github.com/zrepl/zrepl/endpoint"
@ -89,15 +90,18 @@ func (j *SnapJob) OwnedDatasetSubtreeRoot() (rfs *zfs.DatasetPath, ok bool) {
func (j *SnapJob) SenderConfig() *endpoint.SenderConfig { return nil }
func (j *SnapJob) Run(ctx context.Context) {
ctx, endTask := trace.WithTaskAndSpan(ctx, "snap-job", j.Name())
defer endTask()
log := GetLogger(ctx)
ctx = logging.WithSubsystemLoggers(ctx, log)
defer log.Info("job exiting")
periodicDone := make(chan struct{})
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go j.snapper.Run(ctx, periodicDone)
periodicCtx, endTask := trace.WithTask(ctx, "snapshotting")
defer endTask()
go j.snapper.Run(periodicCtx, periodicDone)
invocationCount := 0
outer:
@ -112,8 +116,10 @@ outer:
case <-periodicDone:
}
invocationCount++
invLog := log.WithField("invocation", invocationCount)
j.doPrune(WithLogger(ctx, invLog))
invocationCtx, endSpan := trace.WithSpan(ctx, fmt.Sprintf("invocation-%d", invocationCount))
j.doPrune(invocationCtx)
endSpan()
}
}
@ -161,8 +167,9 @@ func (h alwaysUpToDateReplicationCursorHistory) ListFilesystems(ctx context.Cont
}
func (j *SnapJob) doPrune(ctx context.Context) {
ctx, endSpan := trace.WithSpan(ctx, "snap-job-do-prune")
defer endSpan()
log := GetLogger(ctx)
ctx = logging.WithSubsystemLoggers(ctx, log)
sender := endpoint.NewSender(endpoint.SenderConfig{
JobID: j.name,
FSF: j.fsfilter,


@ -9,20 +9,10 @@ import (
"github.com/mattn/go-isatty"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/hooks"
"github.com/zrepl/zrepl/daemon/pruner"
"github.com/zrepl/zrepl/daemon/snapper"
"github.com/zrepl/zrepl/endpoint"
"github.com/zrepl/zrepl/daemon/logging/trace"
"github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/replication/driver"
"github.com/zrepl/zrepl/replication/logic"
"github.com/zrepl/zrepl/rpc"
"github.com/zrepl/zrepl/rpc/transportmux"
"github.com/zrepl/zrepl/tlsconf"
"github.com/zrepl/zrepl/transport"
"github.com/zrepl/zrepl/zfs/zfscmd"
)
func OutletsFromConfig(in config.LoggingOutletEnumList) (*logger.Outlets, error) {
@ -70,6 +60,8 @@ func OutletsFromConfig(in config.LoggingOutletEnumList) (*logger.Outlets, error)
type Subsystem string
const (
SubsysMeta Subsystem = "meta"
SubsysJob Subsystem = "job"
SubsysReplication Subsystem = "repl"
SubsysEndpoint Subsystem = "endpoint"
SubsysPruning Subsystem = "pruning"
@ -81,30 +73,103 @@ const (
SubsysRPCControl Subsystem = "rpc.ctrl"
SubsysRPCData Subsystem = "rpc.data"
SubsysZFSCmd Subsystem = "zfs.cmd"
SubsysTraceData Subsystem = "trace.data"
SubsysPlatformtest Subsystem = "platformtest"
)
func WithSubsystemLoggers(ctx context.Context, log logger.Logger) context.Context {
ctx = logic.WithLogger(ctx, log.WithField(SubsysField, SubsysReplication))
ctx = driver.WithLogger(ctx, log.WithField(SubsysField, SubsysReplication))
ctx = endpoint.WithLogger(ctx, log.WithField(SubsysField, SubsysEndpoint))
ctx = pruner.WithLogger(ctx, log.WithField(SubsysField, SubsysPruning))
ctx = snapper.WithLogger(ctx, log.WithField(SubsysField, SubsysSnapshot))
ctx = hooks.WithLogger(ctx, log.WithField(SubsysField, SubsysHooks))
ctx = transport.WithLogger(ctx, log.WithField(SubsysField, SubsysTransport))
ctx = transportmux.WithLogger(ctx, log.WithField(SubsysField, SubsysTransportMux))
ctx = zfscmd.WithLogger(ctx, log.WithField(SubsysField, SubsysZFSCmd))
ctx = rpc.WithLoggers(ctx,
rpc.Loggers{
General: log.WithField(SubsysField, SubsysRPC),
Control: log.WithField(SubsysField, SubsysRPCControl),
Data: log.WithField(SubsysField, SubsysRPCData),
},
)
return ctx
var AllSubsystems = []Subsystem{
SubsysMeta,
SubsysJob,
SubsysReplication,
SubsysEndpoint,
SubsysPruning,
SubsysSnapshot,
SubsysHooks,
SubsysTransport,
SubsysTransportMux,
SubsysRPC,
SubsysRPCControl,
SubsysRPCData,
SubsysZFSCmd,
SubsysTraceData,
SubsysPlatformtest,
}
func LogSubsystem(log logger.Logger, subsys Subsystem) logger.Logger {
return log.ReplaceField(SubsysField, subsys)
type injectedField struct {
field string
value interface{}
parent *injectedField
}
func WithInjectedField(ctx context.Context, field string, value interface{}) context.Context {
var parent *injectedField
parentI := ctx.Value(contextKeyInjectedField)
if parentI != nil {
parent = parentI.(*injectedField)
}
// TODO sanity-check `field` now
this := &injectedField{field, value, parent}
return context.WithValue(ctx, contextKeyInjectedField, this)
}
func iterInjectedFields(ctx context.Context, cb func(field string, value interface{})) {
injI := ctx.Value(contextKeyInjectedField)
if injI == nil {
return
}
inj := injI.(*injectedField)
for ; inj != nil; inj = inj.parent {
cb(inj.field, inj.value)
}
}
type SubsystemLoggers map[Subsystem]logger.Logger
func SubsystemLoggersWithUniversalLogger(l logger.Logger) SubsystemLoggers {
loggers := make(SubsystemLoggers)
for _, s := range AllSubsystems {
loggers[s] = l
}
return loggers
}
func WithLoggers(ctx context.Context, loggers SubsystemLoggers) context.Context {
return context.WithValue(ctx, contextKeyLoggers, loggers)
}
func GetLoggers(ctx context.Context) SubsystemLoggers {
loggers, ok := ctx.Value(contextKeyLoggers).(SubsystemLoggers)
if !ok {
return nil
}
return loggers
}
func GetLogger(ctx context.Context, subsys Subsystem) logger.Logger {
return getLoggerImpl(ctx, subsys, true)
}
func getLoggerImpl(ctx context.Context, subsys Subsystem, panicIfEnded bool) logger.Logger {
loggers, ok := ctx.Value(contextKeyLoggers).(SubsystemLoggers)
if !ok || loggers == nil {
return logger.NewNullLogger()
}
l, ok := loggers[subsys]
if !ok {
return logger.NewNullLogger()
}
l = l.WithField(SubsysField, subsys)
l = l.WithField(SpanField, trace.GetSpanStackOrDefault(ctx, *trace.StackKindId, "NOSPAN"))
fields := make(logger.Fields)
iterInjectedFields(ctx, func(field string, value interface{}) {
fields[field] = value
})
l = l.WithFields(fields)
return l
}
func parseLogFormat(i interface{}) (f EntryFormatter, err error) {


@ -0,0 +1,24 @@
package logging
import "context"
type contextKey int
const (
contextKeyLoggers contextKey = 1 + iota
contextKeyInjectedField
)
var contextKeys = []contextKey{
contextKeyLoggers,
contextKeyInjectedField,
}
func WithInherit(ctx, inheritFrom context.Context) context.Context {
for _, k := range contextKeys {
if v := inheritFrom.Value(k); v != nil {
ctx = context.WithValue(ctx, k, v) // no shadow
}
}
return ctx
}


@ -22,6 +22,7 @@ const (
const (
JobField string = "job"
SubsysField string = "subsystem"
SpanField string = "span"
)
type MetadataFlags int64
@ -85,7 +86,7 @@ func (f *HumanFormatter) Format(e *logger.Entry) (out []byte, err error) {
fmt.Fprintf(&line, "[%s]", col.Sprint(e.Level.Short()))
}
prefixFields := []string{JobField, SubsysField}
prefixFields := []string{JobField, SubsysField, SpanField}
prefixed := make(map[string]bool, len(prefixFields)+2)
for _, field := range prefixFields {
val, ok := e.Fields[field]
@ -174,8 +175,8 @@ func (f *LogfmtFormatter) Format(e *logger.Entry) ([]byte, error) {
}
// at least try and put job and task in front
prefixed := make(map[string]bool, 2)
prefix := []string{JobField, SubsysField}
prefixed := make(map[string]bool, 3)
prefix := []string{JobField, SubsysField, SpanField}
for _, pf := range prefix {
v, ok := e.Fields[pf]
if !ok {


@ -0,0 +1,386 @@
// Package trace provides activity tracing via ctx through Tasks and Spans.
//
// Basic Concepts
//
// Tracing can be used to identify where a piece of code spends its time.
//
// The Go standard library provides package runtime/trace which is useful to identify CPU bottlenecks or
// to understand what happens inside the Go runtime.
// However, it is not ideal for application level tracing, in particular if those traces should be understandable
// to tech-savvy users (albeit not developers).
//
// This package provides the concept of Tasks and Spans to express what activity is happening within an application:
//
// - Neither task nor span is really tangible but instead contained within the context.Context tree
// - Tasks represent concurrent activity (i.e. goroutines).
// - Spans represent a semantic stack trace within a task.
//
// As a consequence, whenever a context is propagated across a goroutine boundary, you need to create a child task:
//
// go func(ctx context.Context) {
// ctx, endTask := WithTask(ctx, "what-happens-inside-the-child-task")
// defer endTask()
// // ...
// }(ctx)
//
// Within the task, you can open up a hierarchy of spans.
// In contrast to tasks, which can have multiple concurrently running child tasks,
// spans must nest and not cross the goroutine boundary.
//
// ctx, endSpan := WithSpan(ctx, "copy-dir")
// defer endSpan()
// for _, f := range dir.Files() {
// func() {
// ctx, endSpan := WithSpan(ctx, fmt.Sprintf("copy-file %q", f))
// defer endSpan()
// b, _ := ioutil.ReadFile(f)
// _ = ioutil.WriteFile(f + ".copy", b, 0600)
// }()
// }
//
// In combination:
// ctx, endTask := WithTask(ctx, "copy-dirs")
// defer endTask()
// for i := range dirs {
// go func(dir string) {
// ctx, endTask := WithTask(ctx, "copy-dir")
// defer endTask()
// for _, f := range filesIn(dir) {
// func() {
// ctx, endSpan := WithSpan(ctx, fmt.Sprintf("copy-file %q", f))
// defer endSpan()
// b, _ := ioutil.ReadFile(f)
// _ = ioutil.WriteFile(f + ".copy", b, 0600)
// }()
// }
// }()
// }
//
// Note that a span ends at the time you call endSpan - not before and not after that.
// If you violate the stack-like nesting of spans by forgetting an endSpan() invocation,
// the out-of-order endSpan() will panic.
//
// A similar rule applies to the endTask closure returned by WithTask:
// If a task has live child tasks at the time you call endTask(), the call will panic.
//
// Recovering from endSpan() or endTask() panics will corrupt the trace stack and lead to corrupt tracefile output.
//
//
// Best Practices For Naming Tasks And Spans
//
// Tasks should always have string constants as names, and the names must not contain the `#` character. Why?
// First, the visualization in chrome://tracing draws a horizontal bar for each task in the trace.
// Second, the package appends `#NUM` to distinguish concurrently running instances of a task name.
// Note that the `#NUM` suffix will be reused once a task has ended, in order to avoid an
// infinite number of horizontal bars in the visualization.
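//
// For example (illustrative names):
//
//   WithTask(ctx, "prune-sender")                  // good: constant name
//   WithTask(ctx, fmt.Sprintf("prune %s", fsName)) // bad: unbounded set of task names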
//
//
// Chrome-compatible Tracefile Support
//
// The activity trace generated by usage of WithTask and WithSpan can be rendered to a JSON output file
// that can be loaded into chrome://tracing .
// Apart from function GetSpanStackOrDefault, this is the main benefit of this package.
//
// First, there is a convenience environment variable 'ZREPL_ACTIVITY_TRACE' that can be set to an output path.
// From process start onward, a trace is written to that path.
//
// More consumers can attach to the activity trace through the ChrometraceClientWebsocketHandler websocket handler.
//
// If a write error is encountered with any consumer (including the env-var based one), the consumer is closed and
// will not receive further trace output.
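//
// For illustration, two ways to obtain a trace (paths and addresses below are examples):
// writing a trace for the entire daemon lifetime to a file
//
//   ZREPL_ACTIVITY_TRACE=/tmp/zrepl-activity-trace.json zrepl daemon
//
// or attaching to a running daemon whose pprof listener was enabled beforehand
//
//   zrepl pprof listen on 127.0.0.1:8080
//   zrepl pprof activity-trace 127.0.0.1:8080 > /tmp/zrepl-activity-trace.json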
package trace
import (
"context"
"fmt"
"strings"
"time"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/zrepl/zrepl/util/chainlock"
)
var metrics struct {
activeTasks prometheus.Gauge
}
var taskNamer *uniqueConcurrentTaskNamer = newUniqueTaskNamer()
func init() {
metrics.activeTasks = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "zrepl",
Subsystem: "trace",
Name: "active_tasks",
Help: "number of active (tracing-level) tasks in the daemon",
})
}
func RegisterMetrics(r prometheus.Registerer) {
r.MustRegister(metrics.activeTasks)
}
type traceNode struct {
id string
annotation string
parentTask *traceNode
mtx chainlock.L
activeChildTasks int32 // only for task nodes, insignificant for span nodes
parentSpan *traceNode
activeChildSpan *traceNode // nil if task or span doesn't have an active child span
startedAt time.Time
endedAt time.Time
}
func (s *traceNode) StartedAt() time.Time { return s.startedAt }
func (s *traceNode) EndedAt() time.Time { return s.endedAt }
// Returned from WithTask or WithSpan.
// Must be called once the task or span ends.
// See package-level docs for nesting rules.
// Wrong call order / forgetting to call it will result in panics.
type DoneFunc func()
var ErrTaskStillHasActiveChildTasks = fmt.Errorf("end task: task still has active child tasks")
// Start a new root task or create a child task of an existing task.
//
// This is required when starting a new goroutine and
// passing an existing task context to it.
//
// taskName should be a constant and must not contain '#'
//
// The implementation ensures that,
// if multiple tasks with the same name exist simultaneously,
// a unique suffix is appended to uniquely identify the task opened with this function.
func WithTask(ctx context.Context, taskName string) (context.Context, DoneFunc) {
var parentTask *traceNode
nodeI := ctx.Value(contextKeyTraceNode)
if nodeI != nil {
node := nodeI.(*traceNode)
if node.parentSpan != nil {
parentTask = node.parentTask
} else {
parentTask = node
}
}
// find the first ancestor that hasn't ended yet (nil if need be)
if parentTask != nil {
parentTask.mtx.Lock()
}
for parentTask != nil && !parentTask.endedAt.IsZero() {
thisParent := parentTask
parentTask = parentTask.parentTask
// lock parent first such that it isn't modified by other callers to thisParent
if parentTask != nil {
parentTask.mtx.Lock()
}
thisParent.mtx.Unlock()
}
// invariant: either parentTask != nil and we hold the lock on parentTask, or parentTask is nil
taskName, taskNameDone := taskNamer.UniqueConcurrentTaskName(taskName)
this := &traceNode{
id: genID(),
annotation: taskName,
parentTask: parentTask,
activeChildTasks: 0,
parentSpan: nil,
activeChildSpan: nil,
startedAt: time.Now(),
endedAt: time.Time{},
}
if parentTask != nil {
this.parentTask.activeChildTasks++
parentTask.mtx.Unlock()
}
ctx = context.WithValue(ctx, contextKeyTraceNode, this)
chrometraceBeginTask(this)
metrics.activeTasks.Inc()
endTaskFunc := func() {
// only hold locks while manipulating the tree
// (trace writer might block too long and unlike spans, tasks are updated concurrently)
alreadyEnded := func() (alreadyEnded bool) {
if this.parentTask != nil {
defer this.parentTask.mtx.Lock().Unlock()
}
defer this.mtx.Lock().Unlock()
if this.activeChildTasks != 0 {
panic(errors.Wrapf(ErrTaskStillHasActiveChildTasks, "end task: %v active child tasks", this.activeChildTasks))
}
// support idempotent task ends
if !this.endedAt.IsZero() {
return true
}
this.endedAt = time.Now()
if this.parentTask != nil {
this.parentTask.activeChildTasks--
if this.parentTask.activeChildTasks < 0 {
panic("impl error: parent task with negative activeChildTasks count")
}
}
return false
}()
if alreadyEnded {
return
}
chrometraceEndTask(this)
metrics.activeTasks.Dec()
taskNameDone()
}
return ctx, endTaskFunc
}
var ErrAlreadyActiveChildSpan = fmt.Errorf("create child span: span already has an active child span")
var ErrSpanStillHasActiveChildSpan = fmt.Errorf("end span: span still has active child spans")
// Start a new span.
// Important: ctx must have an active task (see WithTask)
func WithSpan(ctx context.Context, annotation string) (context.Context, DoneFunc) {
var parentSpan, parentTask *traceNode
nodeI := ctx.Value(contextKeyTraceNode)
if nodeI != nil {
parentSpan = nodeI.(*traceNode)
if parentSpan.parentSpan == nil {
parentTask = parentSpan
} else {
parentTask = parentSpan.parentTask
}
} else {
panic("must be called from within a task")
}
this := &traceNode{
id: genID(),
annotation: annotation,
parentTask: parentTask,
parentSpan: parentSpan,
activeChildSpan: nil,
startedAt: time.Now(),
endedAt: time.Time{},
}
parentSpan.mtx.HoldWhile(func() {
if parentSpan.activeChildSpan != nil {
panic(ErrAlreadyActiveChildSpan)
}
parentSpan.activeChildSpan = this
})
ctx = context.WithValue(ctx, contextKeyTraceNode, this)
chrometraceBeginSpan(this)
callbackEndSpan := callbackBeginSpan(ctx)
endTaskFunc := func() {
defer parentSpan.mtx.Lock().Unlock()
if parentSpan.activeChildSpan != this && this.endedAt.IsZero() {
panic("impl error: activeChildSpan should not change while != nil because there can only be one")
}
defer this.mtx.Lock().Unlock()
if this.activeChildSpan != nil {
panic(ErrSpanStillHasActiveChildSpan)
}
if !this.endedAt.IsZero() {
return // support idempotent span ends
}
parentSpan.activeChildSpan = nil
this.endedAt = time.Now()
chrometraceEndSpan(this)
callbackEndSpan(this)
}
return ctx, endTaskFunc
}
type StackKind struct {
symbolizeTask func(t *traceNode) string
symbolizeSpan func(s *traceNode) string
}
var (
StackKindId = &StackKind{
symbolizeTask: func(t *traceNode) string { return t.id },
symbolizeSpan: func(s *traceNode) string { return s.id },
}
SpanStackKindCombined = &StackKind{
symbolizeTask: func(t *traceNode) string { return fmt.Sprintf("(%s %q)", t.id, t.annotation) },
symbolizeSpan: func(s *traceNode) string { return fmt.Sprintf("(%s %q)", s.id, s.annotation) },
}
SpanStackKindAnnotation = &StackKind{
symbolizeTask: func(t *traceNode) string { return t.annotation },
symbolizeSpan: func(s *traceNode) string { return s.annotation },
}
)
func (n *traceNode) task() *traceNode {
task := n.parentTask
if n.parentSpan == nil {
task = n
}
return task
}
func (n *traceNode) TaskName() string {
task := n.task()
return task.annotation
}
func (this *traceNode) TaskAndSpanStack(kind *StackKind) (spanIdStack string) {
task := this.task()
var spansInTask []*traceNode
for s := this; s != nil; s = s.parentSpan {
spansInTask = append(spansInTask, s)
}
var tasks []*traceNode
for t := task; t != nil; t = t.parentTask {
tasks = append(tasks, t)
}
var taskIdsRev []string
for i := len(tasks) - 1; i >= 0; i-- {
taskIdsRev = append(taskIdsRev, kind.symbolizeTask(tasks[i]))
}
var spanIdsRev []string
for i := len(spansInTask) - 1; i >= 0; i-- {
spanIdsRev = append(spanIdsRev, kind.symbolizeSpan(spansInTask[i]))
}
taskStack := strings.Join(taskIdsRev, "$")
return fmt.Sprintf("%s$%s", taskStack, strings.Join(spanIdsRev, "."))
}
func GetSpanStackOrDefault(ctx context.Context, kind StackKind, def string) string {
if nI := ctx.Value(contextKeyTraceNode); nI != nil {
n := nI.(*traceNode)
return n.TaskAndSpanStack(&kind)
} else {
return def
}
}


@ -0,0 +1,54 @@
package trace
import (
"context"
"time"
"github.com/zrepl/zrepl/util/chainlock"
)
type SpanInfo interface {
StartedAt() time.Time
EndedAt() time.Time
TaskAndSpanStack(kind *StackKind) string
}
type Callback struct {
OnBegin func(ctx context.Context)
OnEnd func(ctx context.Context, spanInfo SpanInfo)
}
var callbacks struct {
mtx chainlock.L
cs []Callback
}
func RegisterCallback(c Callback) {
callbacks.mtx.HoldWhile(func() {
callbacks.cs = append(callbacks.cs, c)
})
}
func callbackBeginSpan(ctx context.Context) func(SpanInfo) {
// capture the current state of callbacks into a local variable
// this is safe because the slice is append-only and immutable
// (it is important that a callback registered _after_ callbackBeginSpan is called does not get called on OnEnd)
var cbs []Callback
callbacks.mtx.HoldWhile(func() {
cbs = callbacks.cs
})
for _, cb := range cbs {
if cb.OnBegin != nil {
cb.OnBegin(ctx)
}
}
return func(spanInfo SpanInfo) {
for _, cb := range cbs {
if cb.OnEnd != nil {
cb.OnEnd(ctx, spanInfo)
}
}
}
}


@ -0,0 +1,230 @@
package trace
// The functions in this file are concerned with the generation
// of trace files based on the information from WithTask and WithSpan.
//
// The emitted trace files are open-ended array of JSON objects
// that follow the Chrome trace file format:
// https://docs.google.com/document/d/1CvAClvFfyA5R-PhYUmn5OOQtYMH4h6I0nSsKchNAySU/preview
//
// The emitted JSON can be loaded into Chrome's chrome://tracing view.
//
// The trace file can be written to a file whose path is specified via an env variable,
// and be written to web sockets established on ChrometraceHttpHandler
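//
// For illustration, a single begin-span event as emitted by chrometraceWrite
// (values are made up; see chrometraceEvent below for the field set):
//
//   {"name":"copy-dir","ph":"B","ts":1586612981000000,"pid":"\"myhost\"","tid":"copy-dirs"},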
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"io"
"os"
"sync"
"sync/atomic"
"golang.org/x/net/websocket"
"github.com/zrepl/zrepl/util/envconst"
)
var chrometracePID string
func init() {
var err error
chrometracePID, err = os.Hostname()
if err != nil {
panic(err)
}
chrometracePID = fmt.Sprintf("%q", chrometracePID)
}
type chrometraceEvent struct {
Cat string `json:"cat,omitempty"`
Name string `json:"name"`
Stack []string `json:"stack,omitempty"`
Phase string `json:"ph"`
TimestampUnixMicroseconds int64 `json:"ts"`
DurationMicroseconds int64 `json:"dur,omitempty"`
Pid string `json:"pid"`
Tid string `json:"tid"`
Id string `json:"id,omitempty"`
}
func chrometraceBeginSpan(s *traceNode) {
taskName := s.TaskName()
chrometraceWrite(chrometraceEvent{
Name: s.annotation,
Phase: "B",
TimestampUnixMicroseconds: s.startedAt.UnixNano() / 1000,
Pid: chrometracePID,
Tid: taskName,
})
}
func chrometraceEndSpan(s *traceNode) {
taskName := s.TaskName()
chrometraceWrite(chrometraceEvent{
Name: s.annotation,
Phase: "E",
TimestampUnixMicroseconds: s.endedAt.UnixNano() / 1000,
Pid: chrometracePID,
Tid: taskName,
})
}
var chrometraceFlowId uint64
func chrometraceBeginTask(s *traceNode) {
chrometraceBeginSpan(s)
if s.parentTask == nil {
return
}
// beginning of a task that has a parent
// => use flow events to link parent and child
flowId := atomic.AddUint64(&chrometraceFlowId, 1)
flowIdStr := fmt.Sprintf("%x", flowId)
parentTask := s.parentTask.TaskName()
chrometraceWrite(chrometraceEvent{
Cat: "task", // seems to be necessary, otherwise the GUI shows some `indexOf` JS error
Name: "child-task",
Phase: "s",
TimestampUnixMicroseconds: s.startedAt.UnixNano() / 1000, // yes, the child's timestamp (=> from-point of the flow line is at right x-position of parent's bar)
Pid: chrometracePID,
Tid: parentTask,
Id: flowIdStr,
})
childTask := s.TaskName()
if parentTask == childTask {
panic(parentTask)
}
chrometraceWrite(chrometraceEvent{
Cat: "task", // seems to be necessary, otherwise the GUI shows some `indexOf` JS error
Name: "child-task",
Phase: "f",
TimestampUnixMicroseconds: s.startedAt.UnixNano() / 1000,
Pid: chrometracePID,
Tid: childTask,
Id: flowIdStr,
})
}
func chrometraceEndTask(s *traceNode) {
chrometraceEndSpan(s)
}
type chrometraceConsumerRegistration struct {
w io.Writer
// errored must have capacity 1, the writer thread will send to it non-blocking, then close it
errored chan error
}
var chrometraceConsumers struct {
register chan chrometraceConsumerRegistration
consumers map[chrometraceConsumerRegistration]bool
write chan []byte
}
func init() {
chrometraceConsumers.register = make(chan chrometraceConsumerRegistration)
chrometraceConsumers.consumers = make(map[chrometraceConsumerRegistration]bool)
chrometraceConsumers.write = make(chan []byte)
go func() {
kickConsumer := func(c chrometraceConsumerRegistration, err error) {
debug("chrometrace kicking consumer %#v after error %v", c, err)
select {
case c.errored <- err:
default:
}
close(c.errored)
delete(chrometraceConsumers.consumers, c)
}
for {
select {
case reg := <-chrometraceConsumers.register:
debug("registered chrometrace consumer %#v", reg)
chrometraceConsumers.consumers[reg] = true
n, err := reg.w.Write([]byte("[\n"))
if err != nil {
kickConsumer(reg, err)
} else if n != 2 {
kickConsumer(reg, fmt.Errorf("short write: %v", n))
}
// successfully registered
case buf := <-chrometraceConsumers.write:
debug("chrometrace write request: %s", string(buf))
var r bytes.Reader
for c := range chrometraceConsumers.consumers {
r.Reset(buf)
n, err := io.Copy(c.w, &r)
debug("chrometrace wrote n=%v bytes to consumer %#v", n, c)
if err != nil {
kickConsumer(c, err)
}
}
}
}
}()
}
func chrometraceWrite(i interface{}) {
var buf bytes.Buffer
err := json.NewEncoder(&buf).Encode(i)
if err != nil {
panic(err)
}
buf.WriteString(",")
chrometraceConsumers.write <- buf.Bytes()
}
func ChrometraceClientWebsocketHandler(conn *websocket.Conn) {
defer conn.Close()
var wg sync.WaitGroup
defer wg.Wait()
wg.Add(1)
go func() {
defer wg.Done()
r := bufio.NewReader(conn)
_, _, _ = r.ReadLine() // ignore errors
conn.Close()
}()
errored := make(chan error, 1)
chrometraceConsumers.register <- chrometraceConsumerRegistration{
w: conn,
errored: errored,
}
wg.Add(1)
go func() {
defer wg.Done()
<-errored
conn.Close()
}()
}
var chrometraceFileConsumerPath = envconst.String("ZREPL_ACTIVITY_TRACE", "")
func init() {
if chrometraceFileConsumerPath != "" {
var err error
f, err := os.Create(chrometraceFileConsumerPath)
if err != nil {
panic(err)
}
errored := make(chan error, 1)
chrometraceConsumers.register <- chrometraceConsumerRegistration{
w: f,
errored: errored,
}
go func() {
<-errored
f.Close()
}()
}
}


@ -0,0 +1,27 @@
package trace
import "context"
type contextKey int
const (
contextKeyTraceNode contextKey = 1 + iota
)
var contextKeys = []contextKey{
contextKeyTraceNode,
}
// WithInherit inherits the task hierarchy from inheritFrom into ctx.
// The returned context is a child of ctx, but its task and span are those of inheritFrom.
//
// Note that in most use cases, callers most likely want to call WithTask since it will most likely
// be in some sort of connection handler context.
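//
// For illustration, the pattern used by the passive-side job's RPC handler in this
// commit (identifiers shortened; jobCtx is the long-lived job context):
//
//   handlerCtx = logging.WithInherit(handlerCtx, jobCtx)
//   handlerCtx = trace.WithInherit(handlerCtx, jobCtx)
//   handlerCtx, endTask := trace.WithTaskAndSpan(handlerCtx, "handler", "job=... client=... method=...")
//   defer endTask()
//   handler(handlerCtx)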
func WithInherit(ctx, inheritFrom context.Context) context.Context {
for _, k := range contextKeys {
if v := inheritFrom.Value(k); v != nil {
ctx = context.WithValue(ctx, k, v) // no shadow
}
}
return ctx
}


@ -0,0 +1,74 @@
package trace
import (
"context"
"fmt"
"runtime"
"strings"
"sync"
)
// use like this:
//
// defer WithSpanFromStackUpdateCtx(&existingCtx)()
//
//
func WithSpanFromStackUpdateCtx(ctx *context.Context) DoneFunc {
childSpanCtx, end := WithSpan(*ctx, getMyCallerOrPanic())
*ctx = childSpanCtx
return end
}
// derive task name from call stack (caller's name)
func WithTaskFromStack(ctx context.Context) (context.Context, DoneFunc) {
return WithTask(ctx, getMyCallerOrPanic())
}
// derive task name from call stack (caller's name) and update *ctx
// to point to the child task's ctx
func WithTaskFromStackUpdateCtx(ctx *context.Context) DoneFunc {
child, end := WithTask(*ctx, getMyCallerOrPanic())
*ctx = child
return end
}
// create a task and a span within it in one call
func WithTaskAndSpan(ctx context.Context, task string, span string) (context.Context, DoneFunc) {
ctx, endTask := WithTask(ctx, task)
ctx, endSpan := WithSpan(ctx, fmt.Sprintf("%s %s", task, span))
return ctx, func() {
endSpan()
endTask()
}
}
// create a span during which several child tasks are spawned using the `add` function
func WithTaskGroup(ctx context.Context, taskGroup string) (_ context.Context, add func(f func(context.Context)), waitEnd DoneFunc) {
var wg sync.WaitGroup
ctx, endSpan := WithSpan(ctx, taskGroup)
add = func(f func(context.Context)) {
wg.Add(1)
defer wg.Done()
ctx, endTask := WithTask(ctx, taskGroup)
defer endTask()
f(ctx)
}
waitEnd = func() {
wg.Wait()
endSpan()
}
return ctx, add, waitEnd
}
func getMyCallerOrPanic() string {
pc, _, _, ok := runtime.Caller(2)
if !ok {
panic("cannot get caller")
}
details := runtime.FuncForPC(pc)
if ok && details != nil {
const prefix = "github.com/zrepl/zrepl"
return strings.TrimPrefix(strings.TrimPrefix(details.Name(), prefix), "/")
}
return ""
}


@ -0,0 +1,16 @@
package trace
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestGetCallerOrPanic(t *testing.T) {
withStackFromCtxMock := func() string {
return getMyCallerOrPanic()
}
ret := withStackFromCtxMock()
// zrepl prefix is stripped
assert.Equal(t, "daemon/logging/trace.TestGetCallerOrPanic", ret)
}


@ -0,0 +1,15 @@
package trace
import (
"fmt"
"os"
)
const debugEnabled = false
func debug(format string, args ...interface{}) {
if !debugEnabled {
return
}
fmt.Fprintf(os.Stderr, format+"\n", args...)
}


@ -0,0 +1,47 @@
package trace
import (
"encoding/base64"
"math/rand"
"os"
"strings"
"time"
"github.com/zrepl/zrepl/util/envconst"
)
var genIdPRNG = rand.New(rand.NewSource(1))
func init() {
genIdPRNG.Seed(time.Now().UnixNano())
genIdPRNG.Seed(int64(os.Getpid()))
}
var genIdNumBytes = envconst.Int("ZREPL_TRACE_ID_NUM_BYTES", 3)
func init() {
if genIdNumBytes < 1 {
panic("trace node id byte length must be at least 1")
}
}
func genID() string {
var out strings.Builder
enc := base64.NewEncoder(base64.RawStdEncoding, &out)
buf := make([]byte, genIdNumBytes)
for i := 0; i < len(buf); {
n, err := genIdPRNG.Read(buf[i:])
if err != nil {
panic(err)
}
i += n
}
n, err := enc.Write(buf[:])
if err != nil || n != len(buf) {
panic(err)
}
if err := enc.Close(); err != nil {
panic(err)
}
return out.String()
}


@ -0,0 +1,205 @@
package trace
import (
"context"
"fmt"
"testing"
"github.com/gitchander/permutation"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestRegularSpanUsage(t *testing.T) {
root, endRoot := WithTask(context.Background(), "root")
defer endRoot()
s1, endS1 := WithSpan(root, "parent")
s2, endS2 := WithSpan(s1, "child")
_, endS3 := WithSpan(s2, "grand-child")
require.NotPanics(t, func() { endS3() })
require.NotPanics(t, func() { endS2() })
// reuse
_, endS4 := WithSpan(s1, "child-2")
require.NotPanics(t, func() { endS4() })
// close parent
require.NotPanics(t, func() { endS1() })
}
func TestMultipleActiveChildSpansNotAllowed(t *testing.T) {
root, endRoot := WithTask(context.Background(), "root")
defer endRoot()
s1, _ := WithSpan(root, "s1")
_, endS2 := WithSpan(s1, "s1-child1")
require.PanicsWithValue(t, ErrAlreadyActiveChildSpan, func() {
_, _ = WithSpan(s1, "s1-child2")
})
endS2()
require.NotPanics(t, func() {
_, _ = WithSpan(s1, "s1-child2")
})
}
func TestForkingChildSpansNotAllowed(t *testing.T) {
root, endRoot := WithTask(context.Background(), "root")
defer endRoot()
s1, _ := WithSpan(root, "s1")
sc, endSC := WithSpan(s1, "s1-child")
_, _ = WithSpan(sc, "s1-child-child")
require.PanicsWithValue(t, ErrSpanStillHasActiveChildSpan, func() {
endSC()
})
}
func TestRegularTaskUsage(t *testing.T) {
// assert concurrent activities on different tasks can end in any order
closeOrder := []int{0, 1, 2}
closeOrders := permutation.New(permutation.IntSlice(closeOrder))
for closeOrders.Next() {
t.Run(fmt.Sprintf("%v", closeOrder), func(t *testing.T) {
root, endRoot := WithTask(context.Background(), "root")
defer endRoot()
c1, endC1 := WithTask(root, "c1")
defer endC1()
c2, endC2 := WithTask(root, "c2")
defer endC2()
// begin 3 concurrent activities
_, endAR := WithSpan(root, "aR")
_, endAC1 := WithSpan(c1, "aC1")
_, endAC2 := WithSpan(c2, "aC2")
endFuncs := []DoneFunc{endAR, endAC1, endAC2}
for _, i := range closeOrder {
require.NotPanics(t, func() {
endFuncs[i]()
}, "%v", i)
}
})
}
}
func TestTaskEndWithActiveChildTaskNotAllowed(t *testing.T) {
root, _ := WithTask(context.Background(), "root")
c, endC := WithTask(root, "child")
_, _ = WithTask(c, "grand-child")
func() {
defer func() {
r := recover()
require.NotNil(t, r)
err, ok := r.(error)
require.True(t, ok)
require.Equal(t, ErrTaskStillHasActiveChildTasks, errors.Cause(err))
}()
endC()
}()
}
func TestIdempotentEndTask(t *testing.T) {
_, end := WithTask(context.Background(), "root")
end()
require.NotPanics(t, func() { end() })
}
func TestSpansPanicIfNoParentTask(t *testing.T) {
require.Panics(t, func() { WithSpan(context.Background(), "taskless-span") })
}
func TestIdempotentEndSpan(t *testing.T) {
root, _ := WithTask(context.Background(), "root")
_, end := WithSpan(root, "span")
end()
require.NotPanics(t, func() { end() })
}
func logAndGetTraceNode(t *testing.T, descr string, ctx context.Context) *traceNode {
n, ok := ctx.Value(contextKeyTraceNode).(*traceNode)
require.True(t, ok)
t.Logf("% 20s %p %#v", descr, n, n)
return n
}
func TestWhiteboxHierarchy(t *testing.T) {
root, e1 := WithTask(context.Background(), "root")
rootN := logAndGetTraceNode(t, "root", root)
assert.Nil(t, rootN.parentTask)
assert.Nil(t, rootN.parentSpan)
child, e2 := WithSpan(root, "child")
childN := logAndGetTraceNode(t, "child", child)
assert.Equal(t, rootN, childN.parentTask)
assert.Equal(t, rootN, childN.parentSpan)
grandchild, e3 := WithSpan(child, "grandchild")
grandchildN := logAndGetTraceNode(t, "grandchild", grandchild)
assert.Equal(t, rootN, grandchildN.parentTask)
assert.Equal(t, childN, grandchildN.parentSpan)
gcTask, e4 := WithTask(grandchild, "grandchild-task")
gcTaskN := logAndGetTraceNode(t, "grandchild-task", gcTask)
assert.Equal(t, rootN, gcTaskN.parentTask)
assert.Nil(t, gcTaskN.parentSpan)
// it is allowed that a child task outlives the _span_ in which it was created
// (albeit not its parent task)
e3()
e2()
gcTaskSpan, e5 := WithSpan(gcTask, "grandchild-task-span")
gcTaskSpanN := logAndGetTraceNode(t, "grandchild-task-span", gcTaskSpan)
assert.Equal(t, gcTaskN, gcTaskSpanN.parentTask)
assert.Equal(t, gcTaskN, gcTaskSpanN.parentSpan)
e5()
e4()
e1()
}
func TestOrphanTasksBecomeNewRootWhenAllAncestorsAreDead(t *testing.T) {
parent, e1 := WithTask(context.Background(), "parent")
_ = logAndGetTraceNode(t, "parent-task", parent)
e1()
var child context.Context
var e2 DoneFunc
require.NotPanics(t, func() {
child, e2 = WithTask(parent, "child")
})
childN := logAndGetTraceNode(t, "child-task", child)
assert.Nil(t, childN.parentTask)
grandchild, _ := WithTask(child, "grandchild")
grandchildN := logAndGetTraceNode(t, "grandchild-task", grandchild)
assert.Equal(t, childN, grandchildN.parentTask)
require.Panics(t, func() { e2() }, "if the parent was alive when the child task was created, wrong termination order is still a panicable offense")
}
func TestOrphanTaskBecomesChildToNearestLiveAncestor(t *testing.T) {
parent, e1 := WithTask(context.Background(), "parent")
parentN := logAndGetTraceNode(t, "parent-task", parent)
child, e2 := WithTask(parent, "child")
childN := logAndGetTraceNode(t, "child-task", child)
assert.Equal(t, parentN, childN.parentTask)
e2()
grandchild, e3 := WithTask(child, "grandchild")
grandchildN := logAndGetTraceNode(t, "grandchild-task", grandchild)
assert.Equal(t, parentN, grandchildN.parentTask) // child is already dead
require.NotPanics(t, func() {
e3()
e1()
})
}
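The rest of this commit uses a one-line call-site idiom built on the API exercised above. A minimal sketch of that idiom (illustration only, not an excerpt from the commit; doSomething is a hypothetical function in a package that imports context and this trace package):

func doSomething(ctx context.Context) {
	// WithSpanFromStackUpdateCtx presumably names the span after the calling
	// function (cf. TestGetCallerOrPanic above), swaps the derived context
	// into ctx via the pointer, and returns the DoneFunc that ends the span;
	// that is what the defer-and-call pattern relies on.
	defer trace.WithSpanFromStackUpdateCtx(&ctx)()
	// ... work with ctx; loggers resolved from it now carry this span in the ID stack ...
}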

View File

@ -0,0 +1,53 @@
package trace
import (
"fmt"
"strings"
"sync"
"github.com/willf/bitset"
)
type uniqueConcurrentTaskNamer struct {
mtx sync.Mutex
active map[string]*bitset.BitSet
}
// bitvecLengthGauge may be nil
func newUniqueTaskNamer() *uniqueConcurrentTaskNamer {
return &uniqueConcurrentTaskNamer{
active: make(map[string]*bitset.BitSet),
}
}
// UniqueConcurrentTaskName appends `#%d` to `name`; until `done` is called,
// the same `#%d` suffix is guaranteed not to be handed out again for that `name`
func (namer *uniqueConcurrentTaskNamer) UniqueConcurrentTaskName(name string) (uniqueName string, done func()) {
if strings.Contains(name, "#") {
panic(name)
}
namer.mtx.Lock()
act, ok := namer.active[name]
if !ok {
act = bitset.New(64) // FIXME magic const
namer.active[name] = act
}
id, ok := act.NextClear(0)
if !ok {
// if !ok, all bits in the set are 1, so act.Len() is the index of the next free bit
id = act.Len()
// FIXME unbounded growth without reclamation
}
act.Set(id)
namer.mtx.Unlock()
return fmt.Sprintf("%s#%d", name, id), func() {
namer.mtx.Lock()
defer namer.mtx.Unlock()
act, ok := namer.active[name]
if !ok {
panic("must be initialized upon entry")
}
act.Clear(id)
}
}
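A short usage sketch of the namer (illustration only; the suffix values assume a freshly created namer, where bitset.New(64) starts with all bits clear, as the bitset test in the next file exercises):

namer := newUniqueTaskNamer()
n1, done1 := namer.UniqueConcurrentTaskName("prune") // "prune#0"
n2, done2 := namer.UniqueConcurrentTaskName("prune") // "prune#1", because #0 is still active
done1()                                              // suffix #0 becomes reusable
n3, done3 := namer.UniqueConcurrentTaskName("prune") // "prune#0" again
done2()
done3()
_, _, _ = n1, n2, n3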

View File

@ -0,0 +1,58 @@
package trace
import (
"fmt"
"sync"
"sync/atomic"
"testing"
"github.com/stretchr/testify/require"
"github.com/willf/bitset"
)
func TestBitsetFeaturesForUniqueConcurrentTaskNamer(t *testing.T) {
var b bitset.BitSet
require.Equal(t, uint(0), b.Len())
require.Equal(t, uint(0), b.Count())
b.Set(0)
require.Equal(t, uint(1), b.Len())
require.Equal(t, uint(1), b.Count())
b.Set(8)
require.Equal(t, uint(9), b.Len())
require.Equal(t, uint(2), b.Count())
b.Set(1)
require.Equal(t, uint(9), b.Len())
require.Equal(t, uint(3), b.Count())
}
func TestUniqueConcurrentTaskNamer(t *testing.T) {
namer := newUniqueTaskNamer()
var wg sync.WaitGroup
const N = 8128
const Q = 23
var fails uint32
var m sync.Map
wg.Add(N)
for i := 0; i < N; i++ {
go func(i int) {
defer wg.Done()
name := fmt.Sprintf("%d", i/Q)
uniqueName, done := namer.UniqueConcurrentTaskName(name)
act, _ := m.LoadOrStore(uniqueName, i)
if act.(int) != i {
atomic.AddUint32(&fails, 1)
}
m.Delete(uniqueName)
done()
}(i)
}
wg.Wait()
require.Equal(t, uint32(0), fails)
}

View File

@ -1,6 +1,8 @@
package daemon
import (
"context"
"github.com/zrepl/zrepl/cli"
"github.com/zrepl/zrepl/logger"
)
@ -10,7 +12,7 @@ type Logger = logger.Logger
var DaemonCmd = &cli.Subcommand{
Use: "daemon",
Short: "run the zrepl daemon",
Run: func(subcommand *cli.Subcommand, args []string) error {
return Run(subcommand.Config())
Run: func(ctx context.Context, subcommand *cli.Subcommand, args []string) error {
return Run(ctx, subcommand.Config())
},
}

View File

@ -9,7 +9,10 @@ import (
"net/http/pprof"
"github.com/prometheus/client_golang/prometheus/promhttp"
"golang.org/x/net/websocket"
"github.com/zrepl/zrepl/daemon/job"
"github.com/zrepl/zrepl/daemon/logging/trace"
)
type pprofServer struct {
@ -66,6 +69,7 @@ outer:
mux.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
mux.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
mux.Handle("/metrics", promhttp.Handler())
mux.Handle("/debug/zrepl/activity-trace", websocket.Handler(trace.ChrometraceClientWebsocketHandler))
go func() {
err := http.Serve(s.listener, mux)
if ctx.Err() != nil {

View File

@ -10,6 +10,7 @@ import (
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/job"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/endpoint"
"github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/rpc/dataconn/frameconn"
@ -86,16 +87,19 @@ func (j *prometheusJob) Run(ctx context.Context) {
}
type prometheusJobOutlet struct {
jobName string
}
var _ logger.Outlet = prometheusJobOutlet{}
func newPrometheusLogOutlet(jobName string) prometheusJobOutlet {
return prometheusJobOutlet{jobName}
func newPrometheusLogOutlet() prometheusJobOutlet {
return prometheusJobOutlet{}
}
func (o prometheusJobOutlet) WriteEntry(entry logger.Entry) error {
prom.taskLogEntries.WithLabelValues(o.jobName, entry.Level.String()).Inc()
jobFieldVal, ok := entry.Fields[logging.JobField].(string)
if !ok {
jobFieldVal = "_nojobid"
}
prom.taskLogEntries.WithLabelValues(jobFieldVal, entry.Level.String()).Inc()
return nil
}

View File

@ -12,6 +12,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/pruning"
"github.com/zrepl/zrepl/replication/logic/pdu"
@ -35,17 +36,13 @@ type Logger = logger.Logger
type contextKey int
const contextKeyLogger contextKey = 0
func WithLogger(ctx context.Context, log Logger) context.Context {
return context.WithValue(ctx, contextKeyLogger, log)
}
const (
contextKeyPruneSide contextKey = 1 + iota
)
func GetLogger(ctx context.Context) Logger {
if l, ok := ctx.Value(contextKeyLogger).(Logger); ok {
return l
}
return logger.NewNullLogger()
pruneSide := ctx.Value(contextKeyPruneSide).(string)
return logging.GetLogger(ctx, logging.SubsysPruning).WithField("prune_side", pruneSide)
}
type args struct {
@ -138,7 +135,7 @@ func NewPrunerFactory(in config.PruningSenderReceiver, promPruneSecs *prometheus
func (f *PrunerFactory) BuildSenderPruner(ctx context.Context, target Target, receiver History) *Pruner {
p := &Pruner{
args: args{
WithLogger(ctx, GetLogger(ctx).WithField("prune_side", "sender")),
context.WithValue(ctx, contextKeyPruneSide, "sender"),
target,
receiver,
f.senderRules,
@ -154,7 +151,7 @@ func (f *PrunerFactory) BuildSenderPruner(ctx context.Context, target Target, re
func (f *PrunerFactory) BuildReceiverPruner(ctx context.Context, target Target, receiver History) *Pruner {
p := &Pruner{
args: args{
WithLogger(ctx, GetLogger(ctx).WithField("prune_side", "receiver")),
context.WithValue(ctx, contextKeyPruneSide, "receiver"),
target,
receiver,
f.receiverRules,
@ -170,7 +167,7 @@ func (f *PrunerFactory) BuildReceiverPruner(ctx context.Context, target Target,
func (f *LocalPrunerFactory) BuildLocalPruner(ctx context.Context, target Target, receiver History) *Pruner {
p := &Pruner{
args: args{
ctx,
context.WithValue(ctx, contextKeyPruneSide, "local"),
target,
receiver,
f.keepRules,

View File

@ -8,10 +8,12 @@ import (
"time"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/daemon/logging/trace"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/filters"
"github.com/zrepl/zrepl/daemon/hooks"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/util/envconst"
"github.com/zrepl/zrepl/zfs"
@ -45,7 +47,6 @@ type snapProgress struct {
type args struct {
ctx context.Context
log Logger
prefix string
interval time.Duration
fsf *filters.DatasetMapFilter
@ -102,23 +103,10 @@ func (s State) sf() state {
type updater func(u func(*Snapper)) State
type state func(a args, u updater) state
type contextKey int
const (
contextKeyLog contextKey = 0
)
type Logger = logger.Logger
func WithLogger(ctx context.Context, log Logger) context.Context {
return context.WithValue(ctx, contextKeyLog, log)
}
func getLogger(ctx context.Context) Logger {
if log, ok := ctx.Value(contextKeyLog).(Logger); ok {
return log
}
return logger.NewNullLogger()
return logging.GetLogger(ctx, logging.SubsysSnapshot)
}
func PeriodicFromConfig(g *config.Global, fsf *filters.DatasetMapFilter, in *config.SnapshottingPeriodic) (*Snapper, error) {
@ -146,13 +134,12 @@ func PeriodicFromConfig(g *config.Global, fsf *filters.DatasetMapFilter, in *con
}
func (s *Snapper) Run(ctx context.Context, snapshotsTaken chan<- struct{}) {
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
getLogger(ctx).Debug("start")
defer getLogger(ctx).Debug("stop")
s.args.snapshotsTaken = snapshotsTaken
s.args.ctx = ctx
s.args.log = getLogger(ctx)
s.args.dryRun = false // for future expansion
u := func(u func(*Snapper)) State {
@ -190,7 +177,7 @@ func onErr(err error, u updater) state {
case Snapshotting:
s.state = ErrorWait
}
s.args.log.WithError(err).WithField("pre_state", preState).WithField("post_state", s.state).Error("snapshotting error")
getLogger(s.args.ctx).WithError(err).WithField("pre_state", preState).WithField("post_state", s.state).Error("snapshotting error")
}).sf()
}
@ -209,7 +196,7 @@ func syncUp(a args, u updater) state {
if err != nil {
return onErr(err, u)
}
syncPoint, err := findSyncPoint(a.log, fss, a.prefix, a.interval)
syncPoint, err := findSyncPoint(a.ctx, fss, a.prefix, a.interval)
if err != nil {
return onErr(err, u)
}
@ -266,18 +253,18 @@ func snapshot(a args, u updater) state {
suffix := time.Now().In(time.UTC).Format("20060102_150405_000")
snapname := fmt.Sprintf("%s%s", a.prefix, suffix)
l := a.log.
WithField("fs", fs.ToString()).
WithField("snap", snapname)
ctx := logging.WithInjectedField(a.ctx, "fs", fs.ToString())
ctx = logging.WithInjectedField(ctx, "snap", snapname)
hookEnvExtra := hooks.Env{
hooks.EnvFS: fs.ToString(),
hooks.EnvSnapshot: snapname,
}
jobCallback := hooks.NewCallbackHookForFilesystem("snapshot", fs, func(_ context.Context) (err error) {
jobCallback := hooks.NewCallbackHookForFilesystem("snapshot", fs, func(ctx context.Context) (err error) {
l := getLogger(ctx)
l.Debug("create snapshot")
err = zfs.ZFSSnapshot(a.ctx, fs, snapname, false) // TODO propagate context to ZFSSnapshot
err = zfs.ZFSSnapshot(ctx, fs, snapname, false) // TODO propagate context to ZFSSnapshot
if err != nil {
l.WithError(err).Error("cannot create snapshot")
}
@ -290,7 +277,7 @@ func snapshot(a args, u updater) state {
{
filteredHooks, err := a.hooks.CopyFilteredForFilesystem(fs)
if err != nil {
l.WithError(err).Error("unexpected filter error")
getLogger(ctx).WithError(err).Error("unexpected filter error")
fsHadErr = true
goto updateFSState
}
@ -303,7 +290,7 @@ func snapshot(a args, u updater) state {
plan, planErr = hooks.NewPlan(&filteredHooks, hooks.PhaseSnapshot, jobCallback, hookEnvExtra)
if planErr != nil {
fsHadErr = true
l.WithError(planErr).Error("cannot create job hook plan")
getLogger(ctx).WithError(planErr).Error("cannot create job hook plan")
goto updateFSState
}
}
@ -314,15 +301,14 @@ func snapshot(a args, u updater) state {
progress.state = SnapStarted
})
{
l := hooks.GetLogger(a.ctx).WithField("fs", fs.ToString()).WithField("snap", snapname)
l.WithField("report", plan.Report().String()).Debug("begin run job plan")
plan.Run(hooks.WithLogger(a.ctx, l), a.dryRun)
getLogger(ctx).WithField("report", plan.Report().String()).Debug("begin run job plan")
plan.Run(ctx, a.dryRun)
planReport = plan.Report()
fsHadErr = planReport.HadError() // not just fatal errors
if fsHadErr {
l.WithField("report", planReport.String()).Error("end run job plan with error")
getLogger(ctx).WithField("report", planReport.String()).Error("end run job plan with error")
} else {
l.WithField("report", planReport.String()).Info("end run job plan successful")
getLogger(ctx).WithField("report", planReport.String()).Info("end run job plan successful")
}
}
@ -342,7 +328,7 @@ func snapshot(a args, u updater) state {
case a.snapshotsTaken <- struct{}{}:
default:
if a.snapshotsTaken != nil {
a.log.Warn("callback channel is full, discarding snapshot update event")
getLogger(a.ctx).Warn("callback channel is full, discarding snapshot update event")
}
}
@ -355,7 +341,7 @@ func snapshot(a args, u updater) state {
break
}
}
a.log.WithField("hook", h.String()).WithField("hook_number", hookIdx+1).Warn("hook did not match any snapshotted filesystems")
getLogger(a.ctx).WithField("hook", h.String()).WithField("hook_number", hookIdx+1).Warn("hook did not match any snapshotted filesystems")
}
}
@ -376,7 +362,7 @@ func wait(a args, u updater) state {
lastTick := snapper.lastInvocation
snapper.sleepUntil = lastTick.Add(a.interval)
sleepUntil = snapper.sleepUntil
log := a.log.WithField("sleep_until", sleepUntil).WithField("duration", a.interval)
log := getLogger(a.ctx).WithField("sleep_until", sleepUntil).WithField("duration", a.interval)
logFunc := log.Debug
if snapper.state == ErrorWait || snapper.state == SyncUpErrWait {
logFunc = log.Error
@ -404,7 +390,7 @@ func listFSes(ctx context.Context, mf *filters.DatasetMapFilter) (fss []*zfs.Dat
var syncUpWarnNoSnapshotUntilSyncupMinDuration = envconst.Duration("ZREPL_SNAPPER_SYNCUP_WARN_MIN_DURATION", 1*time.Second)
// see docs/snapshotting.rst
func findSyncPoint(log Logger, fss []*zfs.DatasetPath, prefix string, interval time.Duration) (syncPoint time.Time, err error) {
func findSyncPoint(ctx context.Context, fss []*zfs.DatasetPath, prefix string, interval time.Duration) (syncPoint time.Time, err error) {
const (
prioHasVersions int = iota
@ -426,10 +412,10 @@ func findSyncPoint(log Logger, fss []*zfs.DatasetPath, prefix string, interval t
now := time.Now()
log.Debug("examine filesystem state to find sync point")
getLogger(ctx).Debug("examine filesystem state to find sync point")
for _, d := range fss {
l := log.WithField("fs", d.ToString())
syncPoint, err := findSyncPointFSNextOptimalSnapshotTime(l, now, interval, prefix, d)
ctx := logging.WithInjectedField(ctx, "fs", d.ToString())
syncPoint, err := findSyncPointFSNextOptimalSnapshotTime(ctx, now, interval, prefix, d)
if err == findSyncPointFSNoFilesystemVersionsErr {
snaptimes = append(snaptimes, snapTime{
ds: d,
@ -438,9 +424,9 @@ func findSyncPoint(log Logger, fss []*zfs.DatasetPath, prefix string, interval t
})
} else if err != nil {
hardErrs++
l.WithError(err).Error("cannot determine optimal sync point for this filesystem")
getLogger(ctx).WithError(err).Error("cannot determine optimal sync point for this filesystem")
} else {
l.WithField("syncPoint", syncPoint).Debug("found optimal sync point for this filesystem")
getLogger(ctx).WithField("syncPoint", syncPoint).Debug("found optimal sync point for this filesystem")
snaptimes = append(snaptimes, snapTime{
ds: d,
prio: prioHasVersions,
@ -467,7 +453,7 @@ func findSyncPoint(log Logger, fss []*zfs.DatasetPath, prefix string, interval t
})
winnerSyncPoint := snaptimes[0].time
l := log.WithField("syncPoint", winnerSyncPoint.String())
l := getLogger(ctx).WithField("syncPoint", winnerSyncPoint.String())
l.Info("determined sync point")
if winnerSyncPoint.Sub(now) > syncUpWarnNoSnapshotUntilSyncupMinDuration {
for _, st := range snaptimes {
@ -483,9 +469,9 @@ func findSyncPoint(log Logger, fss []*zfs.DatasetPath, prefix string, interval t
var findSyncPointFSNoFilesystemVersionsErr = fmt.Errorf("no filesystem versions")
func findSyncPointFSNextOptimalSnapshotTime(l Logger, now time.Time, interval time.Duration, prefix string, d *zfs.DatasetPath) (time.Time, error) {
func findSyncPointFSNextOptimalSnapshotTime(ctx context.Context, now time.Time, interval time.Duration, prefix string, d *zfs.DatasetPath) (time.Time, error) {
fsvs, err := zfs.ZFSListFilesystemVersions(context.TODO(), d, zfs.ListFilesystemVersionsOptions{
fsvs, err := zfs.ZFSListFilesystemVersions(ctx, d, zfs.ListFilesystemVersionsOptions{
Types: zfs.Snapshots,
ShortnamePrefix: prefix,
})
@ -502,7 +488,7 @@ func findSyncPointFSNextOptimalSnapshotTime(l Logger, now time.Time, interval ti
})
latest := fsvs[len(fsvs)-1]
l.WithField("creation", latest.Creation).Debug("found latest snapshot")
getLogger(ctx).WithField("creation", latest.Creation).Debug("found latest snapshot")
since := now.Sub(latest.Creation)
if since < 0 {

View File

@ -3,25 +3,18 @@ package endpoint
import (
"context"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/logger"
)
type contextKey int
const (
contextKeyLogger contextKey = iota
ClientIdentityKey
ClientIdentityKey contextKey = iota
)
type Logger = logger.Logger
func WithLogger(ctx context.Context, log Logger) context.Context {
return context.WithValue(ctx, contextKeyLogger, log)
}
func getLogger(ctx context.Context) Logger {
if l, ok := ctx.Value(contextKeyLogger).(Logger); ok {
return l
}
return logger.NewNullLogger()
return logging.GetLogger(ctx, logging.SubsysEndpoint)
}
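The preceding files all follow the same conversion: per-package logger getters delegate to logging.GetLogger, and call sites attach structured fields by injecting them into the context rather than into a stored logger, so any logger resolved later from that context picks them up. A minimal sketch of the pattern (illustration only; the field values are made up):

ctx = logging.WithInjectedField(ctx, "fs", "tank/data")    // hypothetical dataset name
ctx = logging.WithInjectedField(ctx, "snap", "zrepl_2020") // hypothetical snapshot name
getLogger(ctx).Debug("create snapshot")                    // the entry carries fs, snap and the trace ID stack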

View File

@ -10,6 +10,7 @@ import (
"sync"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/daemon/logging/trace"
"github.com/zrepl/zrepl/replication/logic/pdu"
"github.com/zrepl/zrepl/util/chainedio"
@ -73,6 +74,8 @@ func (s *Sender) filterCheckFS(fs string) (*zfs.DatasetPath, error) {
}
func (s *Sender) ListFilesystems(ctx context.Context, r *pdu.ListFilesystemReq) (*pdu.ListFilesystemRes, error) {
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
fss, err := zfs.ZFSListMapping(ctx, s.FSFilter)
if err != nil {
return nil, err
@ -95,6 +98,8 @@ func (s *Sender) ListFilesystems(ctx context.Context, r *pdu.ListFilesystemReq)
}
func (s *Sender) ListFilesystemVersions(ctx context.Context, r *pdu.ListFilesystemVersionsReq) (*pdu.ListFilesystemVersionsRes, error) {
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
lp, err := s.filterCheckFS(r.GetFilesystem())
if err != nil {
return nil, err
@ -113,6 +118,7 @@ func (s *Sender) ListFilesystemVersions(ctx context.Context, r *pdu.ListFilesyst
}
func (p *Sender) HintMostRecentCommonAncestor(ctx context.Context, r *pdu.HintMostRecentCommonAncestorReq) (*pdu.HintMostRecentCommonAncestorRes, error) {
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
fsp, err := p.filterCheckFS(r.GetFilesystem())
if err != nil {
@ -245,6 +251,7 @@ func sendArgsFromPDUAndValidateExistsAndGetVersion(ctx context.Context, fs strin
}
func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.ReadCloser, error) {
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
_, err := s.filterCheckFS(r.Filesystem)
if err != nil {
@ -350,6 +357,7 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea
}
func (p *Sender) SendCompleted(ctx context.Context, r *pdu.SendCompletedReq) (*pdu.SendCompletedRes, error) {
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
orig := r.GetOriginalReq() // may be nil, always use proto getters
fsp, err := p.filterCheckFS(orig.GetFilesystem())
@ -371,27 +379,30 @@ func (p *Sender) SendCompleted(ctx context.Context, r *pdu.SendCompletedReq) (*p
return nil, errors.Wrap(err, "validate `to` exists")
}
log := func(ctx context.Context) Logger {
log := getLogger(ctx).WithField("to_guid", to.Guid).
WithField("fs", fs).
WithField("to", to.RelName)
if from != nil {
log = log.WithField("from", from.RelName).WithField("from_guid", from.Guid)
}
return log
}
log.Debug("move replication cursor to most recent common version")
log(ctx).Debug("move replication cursor to most recent common version")
destroyedCursors, err := MoveReplicationCursor(ctx, fs, to, p.jobId)
if err != nil {
if err == zfs.ErrBookmarkCloningNotSupported {
log.Debug("not setting replication cursor, bookmark cloning not supported")
log(ctx).Debug("not setting replication cursor, bookmark cloning not supported")
} else {
msg := "cannot move replication cursor, keeping hold on `to` until successful"
log.WithError(err).Error(msg)
log(ctx).WithError(err).Error(msg)
err = errors.Wrap(err, msg)
// it is correct to not release the hold if we can't move the cursor!
return &pdu.SendCompletedRes{}, err
}
} else {
log.Info("successfully moved replication cursor")
log(ctx).Info("successfully moved replication cursor")
}
// kick off releasing of step holds / bookmarks
@ -401,21 +412,27 @@ func (p *Sender) SendCompleted(ctx context.Context, r *pdu.SendCompletedReq) (*p
wg.Add(2)
go func() {
defer wg.Done()
log.Debug("release step-hold of or step-bookmark on `to`")
ctx, endTask := trace.WithTask(ctx, "release-step-hold-to")
defer endTask()
log(ctx).Debug("release step-hold of or step-bookmark on `to`")
err = ReleaseStep(ctx, fs, to, p.jobId)
if err != nil {
log.WithError(err).Error("cannot release step-holds on or destroy step-bookmark of `to`")
log(ctx).WithError(err).Error("cannot release step-holds on or destroy step-bookmark of `to`")
} else {
log.Info("successfully released step-holds on or destroyed step-bookmark of `to`")
log(ctx).Info("successfully released step-holds on or destroyed step-bookmark of `to`")
}
}()
go func() {
defer wg.Done()
ctx, endTask := trace.WithTask(ctx, "release-step-hold-from")
defer endTask()
if from == nil {
return
}
log.Debug("release step-hold of or step-bookmark on `from`")
log(ctx).Debug("release step-hold of or step-bookmark on `from`")
err := ReleaseStep(ctx, fs, *from, p.jobId)
if err != nil {
if dne, ok := err.(*zfs.DatasetDoesNotExist); ok {
@ -424,15 +441,15 @@ func (p *Sender) SendCompleted(ctx context.Context, r *pdu.SendCompletedReq) (*p
// In that case, nonexistence of `from` is not an error, otherwise it is.
for _, c := range destroyedCursors {
if c.GetFullPath() == dne.Path {
log.Info("`from` was a replication cursor and has already been destroyed")
log(ctx).Info("`from` was a replication cursor and has already been destroyed")
return
}
}
// fallthrough
}
log.WithError(err).Error("cannot release step-holds on or destroy step-bookmark of `from`")
log(ctx).WithError(err).Error("cannot release step-holds on or destroy step-bookmark of `from`")
} else {
log.Info("successfully released step-holds on or destroyed step-bookmark of `from`")
log(ctx).Info("successfully released step-holds on or destroyed step-bookmark of `from`")
}
}()
wg.Wait()
@ -441,6 +458,8 @@ func (p *Sender) SendCompleted(ctx context.Context, r *pdu.SendCompletedReq) (*p
}
func (p *Sender) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshotsReq) (*pdu.DestroySnapshotsRes, error) {
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
dp, err := p.filterCheckFS(req.Filesystem)
if err != nil {
return nil, err
@ -449,6 +468,8 @@ func (p *Sender) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshots
}
func (p *Sender) Ping(ctx context.Context, req *pdu.PingReq) (*pdu.PingRes, error) {
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
res := pdu.PingRes{
Echo: req.GetMessage(),
}
@ -456,14 +477,20 @@ func (p *Sender) Ping(ctx context.Context, req *pdu.PingReq) (*pdu.PingRes, erro
}
func (p *Sender) PingDataconn(ctx context.Context, req *pdu.PingReq) (*pdu.PingRes, error) {
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
return p.Ping(ctx, req)
}
func (p *Sender) WaitForConnectivity(ctx context.Context) error {
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
return nil
}
func (p *Sender) ReplicationCursor(ctx context.Context, req *pdu.ReplicationCursorReq) (*pdu.ReplicationCursorRes, error) {
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
dp, err := p.filterCheckFS(req.Filesystem)
if err != nil {
return nil, err
@ -594,6 +621,8 @@ func (f subroot) MapToLocal(fs string) (*zfs.DatasetPath, error) {
}
func (s *Receiver) ListFilesystems(ctx context.Context, req *pdu.ListFilesystemReq) (*pdu.ListFilesystemRes, error) {
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
root := s.clientRootFromCtx(ctx)
filtered, err := zfs.ZFSListMapping(ctx, subroot{root})
if err != nil {
@ -644,6 +673,8 @@ func (s *Receiver) ListFilesystems(ctx context.Context, req *pdu.ListFilesystemR
}
func (s *Receiver) ListFilesystemVersions(ctx context.Context, req *pdu.ListFilesystemVersionsReq) (*pdu.ListFilesystemVersionsRes, error) {
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
root := s.clientRootFromCtx(ctx)
lp, err := subroot{root}.MapToLocal(req.GetFilesystem())
if err != nil {
@ -665,6 +696,8 @@ func (s *Receiver) ListFilesystemVersions(ctx context.Context, req *pdu.ListFile
}
func (s *Receiver) Ping(ctx context.Context, req *pdu.PingReq) (*pdu.PingRes, error) {
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
res := pdu.PingRes{
Echo: req.GetMessage(),
}
@ -672,24 +705,30 @@ func (s *Receiver) Ping(ctx context.Context, req *pdu.PingReq) (*pdu.PingRes, er
}
func (s *Receiver) PingDataconn(ctx context.Context, req *pdu.PingReq) (*pdu.PingRes, error) {
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
return s.Ping(ctx, req)
}
func (s *Receiver) WaitForConnectivity(ctx context.Context) error {
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
return nil
}
func (s *Receiver) ReplicationCursor(context.Context, *pdu.ReplicationCursorReq) (*pdu.ReplicationCursorRes, error) {
func (s *Receiver) ReplicationCursor(ctx context.Context, _ *pdu.ReplicationCursorReq) (*pdu.ReplicationCursorRes, error) {
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
return nil, fmt.Errorf("ReplicationCursor not implemented for Receiver")
}
func (s *Receiver) Send(ctx context.Context, req *pdu.SendReq) (*pdu.SendRes, io.ReadCloser, error) {
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
return nil, nil, fmt.Errorf("receiver does not implement Send()")
}
var maxConcurrentZFSRecvSemaphore = semaphore.New(envconst.Int64("ZREPL_ENDPOINT_MAX_CONCURRENT_RECV", 10))
func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive io.ReadCloser) (*pdu.ReceiveRes, error) {
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
getLogger(ctx).Debug("incoming Receive")
defer receive.Close()
@ -918,6 +957,8 @@ func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive io.
}
func (s *Receiver) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshotsReq) (*pdu.DestroySnapshotsRes, error) {
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
root := s.clientRootFromCtx(ctx)
lp, err := subroot{root}.MapToLocal(req.Filesystem)
if err != nil {
@ -927,6 +968,7 @@ func (s *Receiver) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapsho
}
func (p *Receiver) HintMostRecentCommonAncestor(ctx context.Context, r *pdu.HintMostRecentCommonAncestorReq) (*pdu.HintMostRecentCommonAncestorRes, error) {
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
// we don't move last-received-hold as part of this hint
// because that wouldn't give us any benefit wrt resumability.
//
@ -935,7 +977,9 @@ func (p *Receiver) HintMostRecentCommonAncestor(ctx context.Context, r *pdu.Hint
return &pdu.HintMostRecentCommonAncestorRes{}, nil
}
func (p *Receiver) SendCompleted(context.Context, *pdu.SendCompletedReq) (*pdu.SendCompletedRes, error) {
func (p *Receiver) SendCompleted(ctx context.Context, _ *pdu.SendCompletedReq) (*pdu.SendCompletedRes, error) {
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
return &pdu.SendCompletedRes{}, nil
}
@ -957,7 +1001,7 @@ func doDestroySnapshots(ctx context.Context, lp *zfs.DatasetPath, snaps []*pdu.F
ErrOut: &errs[i],
}
}
zfs.ZFSDestroyFilesystemVersions(reqs)
zfs.ZFSDestroyFilesystemVersions(ctx, reqs)
for i := range reqs {
if errs[i] != nil {
if de, ok := errs[i].(*zfs.DestroySnapshotsError); ok && len(de.Reason) == 1 {

View File

@ -11,6 +11,7 @@ import (
"github.com/pkg/errors"
"github.com/zrepl/zrepl/daemon/logging/trace"
"github.com/zrepl/zrepl/util/envconst"
"github.com/zrepl/zrepl/util/semaphore"
"github.com/zrepl/zrepl/zfs"
@ -529,15 +530,16 @@ func ListAbstractionsStreamed(ctx context.Context, query ListZFSHoldsAndBookmark
}
sem := semaphore.New(int64(query.Concurrency))
ctx, endTask := trace.WithTask(ctx, "list-abstractions-streamed-producer")
go func() {
defer endTask()
defer close(out)
defer close(outErrs)
var wg sync.WaitGroup
defer wg.Wait()
_, add, wait := trace.WithTaskGroup(ctx, "list-abstractions-impl-fs")
defer wait()
for i := range fss {
wg.Add(1)
go func(i int) {
defer wg.Done()
add(func(ctx context.Context) {
g, err := sem.Acquire(ctx)
if err != nil {
errCb(err, fss[i], err.Error())
@ -547,7 +549,7 @@ func ListAbstractionsStreamed(ctx context.Context, query ListZFSHoldsAndBookmark
defer g.Release()
listAbstractionsImplFS(ctx, fss[i], &query, emitAbstraction, errCb)
}()
}(i)
})
}
}()
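Judging by the sync.WaitGroup code it replaces here (and in the size-estimate fan-out further below), trace.WithTaskGroup returns an add function that runs each callback in its own goroutine as a separate child task, and a wait function that blocks until all callbacks have returned. A sketch of that usage (illustration only; the group name is made up):

_, add, wait := trace.WithTaskGroup(ctx, "example-group")
for i := 0; i < 3; i++ {
	i := i // capture the loop variable for the goroutine started by add
	add(func(ctx context.Context) {
		// per-item work; ctx carries this callback's own task
		_ = i
	})
}
wait() // returns once all callbacks have finished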

go.mod
View File

@ -5,6 +5,7 @@ go 1.12
require (
github.com/fatih/color v1.7.0
github.com/gdamore/tcell v1.2.0
github.com/gitchander/permutation v0.0.0-20181107151852-9e56b92e9909
github.com/go-logfmt/logfmt v0.4.0
github.com/go-sql-driver/mysql v1.4.1-0.20190907122137-b2c03bcae3d4
github.com/golang/protobuf v1.3.2
@ -23,10 +24,12 @@ require (
github.com/pkg/profile v1.2.1
github.com/problame/go-netssh v0.0.0-20191209123953-18d8aa6923c7
github.com/prometheus/client_golang v1.2.1
github.com/prometheus/common v0.7.0
github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44 // go1.12 thinks it needs this
github.com/spf13/cobra v0.0.2
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.4.0
github.com/willf/bitset v1.1.10
github.com/yudai/gojsondiff v0.0.0-20170107030110-7b1b7adf999d
github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 // go1.12 thinks it needs this
github.com/zrepl/yaml-config v0.0.0-20191220194647-cbb6b0cf4bdd

go.sum
View File

@ -7,8 +7,10 @@ github.com/OpenPeeDeeP/depguard v0.0.0-20181229194401-1f388ab2d810/go.mod h1:7/4
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4 h1:Hs82Z41s6SdL1CELW+XaDYmOH4hkBN4/N9og/AsOv7E=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alvaroloes/enumer v1.1.1/go.mod h1:FxrjvuXoDAx9isTJrv4c+T410zFi0DtXIT0m65DJ+Wo=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
@ -38,6 +40,8 @@ github.com/gdamore/encoding v1.0.0 h1:+7OoQ1Bc6eTm5niUzBa0Ctsh6JbMW6Ra+YNuAtDBdk
github.com/gdamore/encoding v1.0.0/go.mod h1:alR0ol34c49FCSBLjhosxzcPHQbf2trDkoo5dl+VrEg=
github.com/gdamore/tcell v1.2.0 h1:ikixzsxc8K8o3V2/CEmyoEW8mJZaNYQQ3NP3VIQdUe4=
github.com/gdamore/tcell v1.2.0/go.mod h1:Hjvr+Ofd+gLglo7RYKxxnzCBmev3BzsS67MebKS4zMM=
github.com/gitchander/permutation v0.0.0-20181107151852-9e56b92e9909 h1:9NC8seTx6/zRmMTAdsHj/uOMi0EGHGQtjyLafBjk77Q=
github.com/gitchander/permutation v0.0.0-20181107151852-9e56b92e9909/go.mod h1:lP+DW8LR6Rw3ru9Vo2/y/3iiLaLWmofYql/va+7zJOk=
github.com/go-critic/go-critic v0.3.4/go.mod h1:AHR42Lk/E/aOznsrYdMYeIQS5RH10HZHSqP+rD6AJrc=
github.com/go-critic/go-critic v0.3.5-0.20190526074819-1df300866540/go.mod h1:+sE8vrLDS2M0pZkBk0wy6+nLdKexVDrl/jBqQOTDThA=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
@ -242,6 +246,7 @@ github.com/shurcooL/go-goon v0.0.0-20170922171312-37c2f522c041/go.mod h1:N5mDOms
github.com/sirupsen/logrus v1.0.5/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sourcegraph/go-diff v0.5.1/go.mod h1:j2dHj3m8aZgQO8lMTcTnBcXkRRRqi34cd2MNlA9u1mE=
github.com/spf13/afero v1.1.0/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
@ -277,6 +282,8 @@ github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyC
github.com/valyala/fasthttp v1.2.0/go.mod h1:4vX61m6KN+xDduDNwXrhIAVZaZaZiQ1luJk8LWSxF3s=
github.com/valyala/quicktemplate v1.1.1/go.mod h1:EH+4AkTd43SvgIbQHYu59/cJyxDoOVRUAfrukLPuGJ4=
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
github.com/willf/bitset v1.1.10 h1:NotGKqX0KwQ72NUzqrjZq5ipPNDQex9lo3WpaS8L2sc=
github.com/willf/bitset v1.1.10/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
github.com/yudai/gojsondiff v0.0.0-20170107030110-7b1b7adf999d h1:yJIizrfO599ot2kQ6Af1enICnwBD3XoxgX3MrMwot2M=
github.com/yudai/gojsondiff v0.0.0-20170107030110-7b1b7adf999d/go.mod h1:AY32+k2cwILAkW1fbgxQ5mUmMiZFgLIV+FBNExI05xg=
@ -366,6 +373,7 @@ google.golang.org/grpc v1.16.0/go.mod h1:0JHn/cJsOMiMfNA9+DeHDlAU7KAAB5GDlYFpa9M
google.golang.org/grpc v1.17.0 h1:TRJYBgMclJvGYn2rIMjj+h9KtMt5r1Ij7ODVRIZkwhk=
google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs=
gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

View File

@ -11,6 +11,7 @@ import (
"github.com/fatih/color"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/daemon/logging/trace"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/logging"
@ -68,7 +69,9 @@ func doMain() error {
logger.Error(err.Error())
panic(err)
}
ctx := platformtest.WithLogger(context.Background(), logger)
ctx := context.Background()
defer trace.WithTaskFromStackUpdateCtx(&ctx)()
ctx = logging.WithLoggers(ctx, logging.SubsystemLoggersWithUniversalLogger(logger))
ex := platformtest.NewEx(logger)
type invocation struct {

View File

@ -3,22 +3,12 @@ package platformtest
import (
"context"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/logger"
)
type Logger = logger.Logger
type contextKey int
const (
contextKeyLogger contextKey = iota
)
func WithLogger(ctx context.Context, logger Logger) context.Context {
ctx = context.WithValue(ctx, contextKeyLogger, logger)
return ctx
}
func GetLog(ctx context.Context) Logger {
return ctx.Value(contextKeyLogger).(Logger)
return logging.GetLogger(ctx, logging.SubsysPlatformtest)
}

View File

@ -32,7 +32,7 @@ func BatchDestroy(ctx *platformtest.Context) {
Name: "2",
},
}
zfs.ZFSDestroyFilesystemVersions(reqs)
zfs.ZFSDestroyFilesystemVersions(ctx, reqs)
if *reqs[0].ErrOut != nil {
panic("expecting no error")
}

View File

@ -10,6 +10,7 @@ import (
"sync"
"time"
"github.com/zrepl/zrepl/daemon/logging/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@ -287,6 +288,17 @@ func Do(ctx context.Context, planner Planner) (ReportFunc, WaitFunc) {
}
func (a *attempt) do(ctx context.Context, prev *attempt) {
prevs := a.doGlobalPlanning(ctx, prev)
if prevs == nil {
return
}
a.doFilesystems(ctx, prevs)
}
// if no error occurs, returns a map that maps this attempt's a.fss to `prev`'s a.fss
func (a *attempt) doGlobalPlanning(ctx context.Context, prev *attempt) map[*fs]*fs {
ctx, endSpan := trace.WithSpan(ctx, "plan")
defer endSpan()
pfss, err := a.planner.Plan(ctx)
errTime := time.Now()
defer a.l.Lock().Unlock()
@ -294,7 +306,7 @@ func (a *attempt) do(ctx context.Context, prev *attempt) {
a.planErr = newTimedError(err, errTime)
a.fss = nil
a.finishedAt = time.Now()
return
return nil
}
for _, pfs := range pfss {
@ -351,7 +363,7 @@ func (a *attempt) do(ctx context.Context, prev *attempt) {
a.planErr = newTimedError(errors.New(msg.String()), now)
a.fss = nil
a.finishedAt = now
return
return nil
}
for cur, fss := range prevFSs {
if len(fss) > 0 {
@ -373,6 +385,15 @@ func (a *attempt) do(ctx context.Context, prev *attempt) {
}
}
return prevs
}
func (a *attempt) doFilesystems(ctx context.Context, prevs map[*fs]*fs) {
ctx, endSpan := trace.WithSpan(ctx, "do-repl")
defer endSpan()
defer a.l.Lock().Unlock()
stepQueue := newStepQueue()
defer stepQueue.Start(envconst.Int("ZREPL_REPLICATION_EXPERIMENTAL_REPLICATION_CONCURRENCY", 1))() // TODO parallel replication
var fssesDone sync.WaitGroup
@ -380,6 +401,9 @@ func (a *attempt) do(ctx context.Context, prev *attempt) {
fssesDone.Add(1)
go func(f *fs) {
defer fssesDone.Done()
// use a fixed task name and put f.report().Info.Name into the span, to avoid an explosion of distinct task names
ctx, endTask := trace.WithTaskAndSpan(ctx, "repl-fs", f.report().Info.Name)
defer endTask()
f.do(ctx, stepQueue, prevs[f])
}(f)
}
@ -423,7 +447,7 @@ func (f *fs) do(ctx context.Context, pq *stepQueue, prev *fs) {
// TODO hacky
// choose target time that is earlier than any snapshot, so fs planning is always prioritized
targetDate := time.Unix(0, 0)
defer pq.WaitReady(f, targetDate)()
defer pq.WaitReady(ctx, f, targetDate)()
psteps, err = f.fs.PlanFS(ctx) // no shadow
errTime = time.Now() // no shadow
})
@ -584,8 +608,10 @@ func (f *fs) do(ctx context.Context, pq *stepQueue, prev *fs) {
f.l.DropWhile(func() {
// wait for parallel replication
targetDate := s.step.TargetDate()
defer pq.WaitReady(f, targetDate)()
defer pq.WaitReady(ctx, f, targetDate)()
// do the step
ctx, endSpan := trace.WithSpan(ctx, fmt.Sprintf("%#v", s.step.ReportInfo()))
defer endSpan()
err, errTime = s.step.Step(ctx), time.Now() // no shadow
})

View File

@ -3,23 +3,10 @@ package driver
import (
"context"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/logger"
)
type Logger = logger.Logger
type contextKey int
const contextKeyLogger contextKey = iota + 1
func getLog(ctx context.Context) Logger {
l, ok := ctx.Value(contextKeyLogger).(Logger)
if !ok {
l = logger.NewNullLogger()
}
return l
}
func WithLogger(ctx context.Context, log Logger) context.Context {
return context.WithValue(ctx, contextKeyLogger, log)
func getLog(ctx context.Context) logger.Logger {
return logging.GetLogger(ctx, logging.SubsysReplication)
}

View File

@ -10,6 +10,7 @@ import (
"time"
"github.com/stretchr/testify/require"
"github.com/zrepl/zrepl/daemon/logging/trace"
"github.com/zrepl/zrepl/replication/report"
@ -149,6 +150,7 @@ func (f *mockStep) ReportInfo() *report.StepInfo {
func TestReplication(t *testing.T) {
ctx := context.Background()
defer trace.WithTaskFromStackUpdateCtx(&ctx)()
mp := &mockPlanner{}
getReport, wait := Do(ctx, mp)

View File

@ -2,8 +2,10 @@ package driver
import (
"container/heap"
"context"
"time"
"github.com/zrepl/zrepl/daemon/logging/trace"
"github.com/zrepl/zrepl/util/chainlock"
)
@ -155,7 +157,8 @@ func (q *stepQueue) sendAndWaitForWakeup(ident interface{}, targetDate time.Time
}
// Wait for the ident with targetDate to be selected to run.
func (q *stepQueue) WaitReady(ident interface{}, targetDate time.Time) StepCompletedFunc {
func (q *stepQueue) WaitReady(ctx context.Context, ident interface{}, targetDate time.Time) StepCompletedFunc {
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
if targetDate.IsZero() {
panic("targetDate of zero is reserved for marking Done")
}

View File

@ -1,6 +1,7 @@
package driver
import (
"context"
"fmt"
"math"
"sort"
@ -11,18 +12,23 @@ import (
"github.com/montanaflynn/stats"
"github.com/stretchr/testify/assert"
"github.com/zrepl/zrepl/daemon/logging/trace"
)
// FIXME: this test relies on timing and is thus rather flaky
// (relies on scheduler responsiveness of < 500ms)
func TestPqNotconcurrent(t *testing.T) {
ctx, end := trace.WithTaskFromStack(context.Background())
defer end()
var ctr uint32
q := newStepQueue()
var wg sync.WaitGroup
wg.Add(4)
go func() {
ctx, end := trace.WithTaskFromStack(ctx)
defer end()
defer wg.Done()
defer q.WaitReady("1", time.Unix(9999, 0))()
defer q.WaitReady(ctx, "1", time.Unix(9999, 0))()
ret := atomic.AddUint32(&ctr, 1)
assert.Equal(t, uint32(1), ret)
time.Sleep(1 * time.Second)
@ -34,20 +40,26 @@ func TestPqNotconcurrent(t *testing.T) {
// while "1" is still running, queue in "2", "3" and "4"
go func() {
ctx, end := trace.WithTaskFromStack(ctx)
defer end()
defer wg.Done()
defer q.WaitReady("2", time.Unix(2, 0))()
defer q.WaitReady(ctx, "2", time.Unix(2, 0))()
ret := atomic.AddUint32(&ctr, 1)
assert.Equal(t, uint32(2), ret)
}()
go func() {
ctx, end := trace.WithTaskFromStack(ctx)
defer end()
defer wg.Done()
defer q.WaitReady("3", time.Unix(3, 0))()
defer q.WaitReady(ctx, "3", time.Unix(3, 0))()
ret := atomic.AddUint32(&ctr, 1)
assert.Equal(t, uint32(3), ret)
}()
go func() {
ctx, end := trace.WithTaskFromStack(ctx)
defer end()
defer wg.Done()
defer q.WaitReady("4", time.Unix(4, 0))()
defer q.WaitReady(ctx, "4", time.Unix(4, 0))()
ret := atomic.AddUint32(&ctr, 1)
assert.Equal(t, uint32(4), ret)
}()
@ -77,6 +89,8 @@ func (r record) String() string {
// Hence, perform some statistics on the wakeup times and assert that the mean wakeup
// times for each step are close together.
func TestPqConcurrent(t *testing.T) {
ctx, end := trace.WithTaskFromStack(context.Background())
defer end()
q := newStepQueue()
var wg sync.WaitGroup
@ -90,12 +104,14 @@ func TestPqConcurrent(t *testing.T) {
records := make(chan []record, filesystems)
for fs := 0; fs < filesystems; fs++ {
go func(fs int) {
ctx, end := trace.WithTaskFromStack(ctx)
defer end()
defer wg.Done()
recs := make([]record, 0)
for step := 0; step < stepsPerFS; step++ {
pos := atomic.AddUint32(&globalCtr, 1)
t := time.Unix(int64(step), 0)
done := q.WaitReady(fs, t)
done := q.WaitReady(ctx, fs, t)
wakeAt := time.Since(begin)
time.Sleep(sleepTimePerStep)
done()

View File

@ -9,7 +9,9 @@ import (
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/zrepl/zrepl/daemon/logging/trace"
"github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/replication/driver"
. "github.com/zrepl/zrepl/replication/logic/diff"
"github.com/zrepl/zrepl/replication/logic/pdu"
@ -80,6 +82,8 @@ func (p *Planner) WaitForConnectivity(ctx context.Context) error {
var wg sync.WaitGroup
doPing := func(endpoint Endpoint, errOut *error) {
defer wg.Done()
ctx, endTask := trace.WithTaskFromStack(ctx)
defer endTask()
err := endpoint.WaitForConnectivity(ctx)
if err != nil {
*errOut = err
@ -303,9 +307,11 @@ func (p *Planner) doPlanning(ctx context.Context) ([]*Filesystem, error) {
func (fs *Filesystem) doPlanning(ctx context.Context) ([]*Step, error) {
log := getLogger(ctx).WithField("filesystem", fs.Path)
log := func(ctx context.Context) logger.Logger {
return getLogger(ctx).WithField("filesystem", fs.Path)
}
log.Debug("assessing filesystem")
log(ctx).Debug("assessing filesystem")
if fs.policy.EncryptedSend == True && !fs.senderFS.GetIsEncrypted() {
return nil, fmt.Errorf("sender filesystem is not encrypted but policy mandates encrypted send")
@ -313,14 +319,14 @@ func (fs *Filesystem) doPlanning(ctx context.Context) ([]*Step, error) {
sfsvsres, err := fs.sender.ListFilesystemVersions(ctx, &pdu.ListFilesystemVersionsReq{Filesystem: fs.Path})
if err != nil {
log.WithError(err).Error("cannot get remote filesystem versions")
log(ctx).WithError(err).Error("cannot get remote filesystem versions")
return nil, err
}
sfsvs := sfsvsres.GetVersions()
if len(sfsvs) < 1 {
err := errors.New("sender does not have any versions")
log.Error(err.Error())
log(ctx).Error(err.Error())
return nil, err
}
@ -328,7 +334,7 @@ func (fs *Filesystem) doPlanning(ctx context.Context) ([]*Step, error) {
if fs.receiverFS != nil && !fs.receiverFS.GetIsPlaceholder() {
rfsvsres, err := fs.receiver.ListFilesystemVersions(ctx, &pdu.ListFilesystemVersionsReq{Filesystem: fs.Path})
if err != nil {
log.WithError(err).Error("receiver error")
log(ctx).WithError(err).Error("receiver error")
return nil, err
}
rfsvs = rfsvsres.GetVersions()
@ -340,17 +346,17 @@ func (fs *Filesystem) doPlanning(ctx context.Context) ([]*Step, error) {
var resumeTokenRaw string
if fs.receiverFS != nil && fs.receiverFS.ResumeToken != "" {
resumeTokenRaw = fs.receiverFS.ResumeToken // shadow
log.WithField("receiverFS.ResumeToken", resumeTokenRaw).Debug("decode receiver fs resume token")
log(ctx).WithField("receiverFS.ResumeToken", resumeTokenRaw).Debug("decode receiver fs resume token")
resumeToken, err = zfs.ParseResumeToken(ctx, resumeTokenRaw) // shadow
if err != nil {
// TODO in theory, we could do replication without resume token, but that would mean that
// we need to discard the resumable state on the receiver's side.
// Would be easy by setting UsedResumeToken=false in the RecvReq ...
// FIXME / CHECK semantics UsedResumeToken if SendReq.ResumeToken == ""
log.WithError(err).Error("cannot decode resume token, aborting")
log(ctx).WithError(err).Error("cannot decode resume token, aborting")
return nil, err
}
log.WithField("token", resumeToken).Debug("decode resume token")
log(ctx).WithField("token", resumeToken).Debug("decode resume token")
}
// give both sides a hint about how far prior replication attempts got
@ -369,7 +375,10 @@ func (fs *Filesystem) doPlanning(ctx context.Context) ([]*Step, error) {
var wg sync.WaitGroup
doHint := func(ep Endpoint, name string) {
defer wg.Done()
log := log.WithField("to_side", name).
ctx, endTask := trace.WithTask(ctx, "hint-mrca-"+name)
defer endTask()
log := log(ctx).WithField("to_side", name).
WithField("sender_mrca", sender_mrca.String())
log.Debug("hint most recent common ancestor")
hint := &pdu.HintMostRecentCommonAncestorReq{
@ -428,7 +437,7 @@ func (fs *Filesystem) doPlanning(ctx context.Context) ([]*Step, error) {
encryptionMatches = true
}
log.WithField("fromVersion", fromVersion).
log(ctx).WithField("fromVersion", fromVersion).
WithField("toVersion", toVersion).
WithField("encryptionMatches", encryptionMatches).
Debug("result of resume-token-matching to sender's versions")
@ -484,11 +493,11 @@ func (fs *Filesystem) doPlanning(ctx context.Context) ([]*Step, error) {
var msg string
path, msg = resolveConflict(conflict) // no shadowing allowed!
if path != nil {
log.WithField("conflict", conflict).Info("conflict")
log.WithField("resolution", msg).Info("automatically resolved")
log(ctx).WithField("conflict", conflict).Info("conflict")
log(ctx).WithField("resolution", msg).Info("automatically resolved")
} else {
log.WithField("conflict", conflict).Error("conflict")
log.WithField("problem", msg).Error("cannot resolve conflict")
log(ctx).WithField("conflict", conflict).Error("conflict")
log(ctx).WithField("problem", msg).Error("cannot resolve conflict")
}
}
if len(path) == 0 {
@ -522,37 +531,35 @@ func (fs *Filesystem) doPlanning(ctx context.Context) ([]*Step, error) {
}
if len(steps) == 0 {
log.Info("planning determined that no replication steps are required")
log(ctx).Info("planning determined that no replication steps are required")
}
log.Debug("compute send size estimate")
log(ctx).Debug("compute send size estimate")
errs := make(chan error, len(steps))
var wg sync.WaitGroup
fanOutCtx, fanOutCancel := context.WithCancel(ctx)
_, fanOutAdd, fanOutWait := trace.WithTaskGroup(fanOutCtx, "compute-size-estimate")
defer fanOutCancel()
for _, step := range steps {
wg.Add(1)
go func(step *Step) {
defer wg.Done()
step := step // local copy that is moved into the closure
fanOutAdd(func(ctx context.Context) {
// TODO instead of the semaphore, rely on resource-exhaustion signaled by the remote endpoint to limit size-estimate requests
// Send is handled over rpc/dataconn ATM, which doesn't support the resource exhaustion status codes that gRPC defines
guard, err := fs.sizeEstimateRequestSem.Acquire(fanOutCtx)
guard, err := fs.sizeEstimateRequestSem.Acquire(ctx)
if err != nil {
fanOutCancel()
return
}
defer guard.Release()
err = step.updateSizeEstimate(fanOutCtx)
err = step.updateSizeEstimate(ctx)
if err != nil {
log.WithError(err).WithField("step", step).Error("error computing size estimate")
log(ctx).WithError(err).WithField("step", step).Error("error computing size estimate")
fanOutCancel()
}
errs <- err
}(step)
})
}
wg.Wait()
fanOutWait()
close(errs)
var significantErr error = nil
for err := range errs {
@ -566,7 +573,7 @@ func (fs *Filesystem) doPlanning(ctx context.Context) ([]*Step, error) {
return nil, significantErr
}
log.Debug("filesystem planning finished")
log(ctx).Debug("filesystem planning finished")
return steps, nil
}

View File

@ -3,26 +3,10 @@ package logic
import (
"context"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/logger"
)
type contextKey int
const (
contextKeyLog contextKey = iota
)
type Logger = logger.Logger
func WithLogger(ctx context.Context, l Logger) context.Context {
ctx = context.WithValue(ctx, contextKeyLog, l)
return ctx
}
func getLogger(ctx context.Context) Logger {
l, ok := ctx.Value(contextKeyLog).(Logger)
if !ok {
l = logger.NewNullLogger()
}
return l
func getLogger(ctx context.Context) logger.Logger {
return logging.GetLogger(ctx, logging.SubsysReplication)
}

View File

@ -143,7 +143,6 @@ func (c *Client) ReqSend(ctx context.Context, req *pdu.SendReq) (*pdu.SendRes, i
}
func (c *Client) ReqRecv(ctx context.Context, req *pdu.ReceiveReq, stream io.ReadCloser) (*pdu.ReceiveRes, error) {
defer c.log.Debug("ReqRecv returns")
conn, err := c.getWire(ctx)
if err != nil {

View File

@ -33,16 +33,33 @@ type Handler interface {
type Logger = logger.Logger
type ContextInterceptorData interface {
FullMethod() string
ClientIdentity() string
}
type ContextInterceptor = func(ctx context.Context, data ContextInterceptorData, handler func(ctx context.Context))
type Server struct {
h Handler
wi WireInterceptor
ci ContextInterceptor
log Logger
}
func NewServer(wi WireInterceptor, logger Logger, handler Handler) *Server {
var noopContextInterceptor = func(ctx context.Context, _ ContextInterceptorData, handler func(context.Context)) {
handler(ctx)
}
// wi and ci may be nil
func NewServer(wi WireInterceptor, ci ContextInterceptor, logger Logger, handler Handler) *Server {
if ci == nil {
ci = noopContextInterceptor
}
return &Server{
h: handler,
wi: wi,
ci: ci,
log: logger,
}
}
@ -93,6 +110,14 @@ func (s *Server) Serve(ctx context.Context, l transport.AuthenticatedListener) {
}
}
type contextInterceptorData struct {
fullMethod string
clientIdentity string
}
func (d contextInterceptorData) FullMethod() string { return d.fullMethod }
func (d contextInterceptorData) ClientIdentity() string { return d.clientIdentity }
func (s *Server) serveConn(nc *transport.AuthConn) {
s.log.Debug("serveConn begin")
defer s.log.Debug("serveConn done")
@ -117,6 +142,17 @@ func (s *Server) serveConn(nc *transport.AuthConn) {
}
endpoint := string(header)
data := contextInterceptorData{
fullMethod: endpoint,
clientIdentity: nc.ClientIdentity(),
}
s.ci(ctx, data, func(ctx context.Context) {
s.serveConnRequest(ctx, endpoint, c)
})
}
func (s *Server) serveConnRequest(ctx context.Context, endpoint string, c *stream.Conn) {
reqStructured, err := c.ReadStreamedMessage(ctx, RequestStructuredMaxSize, ReqStructured)
if err != nil {
s.log.WithError(err).Error("error reading structured part")
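The ContextInterceptor hook added above lets the caller wrap every request before it reaches the handler; given this commit's purpose, the natural use is to open a trace task and inject log fields per request (the actual wiring elsewhere in the commit is not shown in this excerpt). A hypothetical caller-side sketch; wi, log and handler stand for values the caller already has:

ci := func(ctx context.Context, data dataconn.ContextInterceptorData, next func(ctx context.Context)) {
	ctx, endTask := trace.WithTask(ctx, "dataconn-request") // hypothetical task name
	defer endTask()
	ctx = logging.WithInjectedField(ctx, "method", data.FullMethod())
	ctx = logging.WithInjectedField(ctx, "client_identity", data.ClientIdentity())
	next(ctx)
}
srv := dataconn.NewServer(wi, ci, log, handler)
_ = srv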

View File

@ -112,7 +112,7 @@ func server() {
orDie(err)
l := tcpListener{nl.(*net.TCPListener), "fakeclientidentity"}
srv := dataconn.NewServer(nil, logger.NewStderrDebugLogger(), devNullHandler{})
srv := dataconn.NewServer(nil, nil, logger.NewStderrDebugLogger(), devNullHandler{})
ctx := context.Background()

View File

@ -7,6 +7,7 @@ import (
"io"
"net"
"strings"
"sync"
"sync/atomic"
"unicode/utf8"
@ -80,9 +81,13 @@ func doWriteStream(ctx context.Context, c *heartbeatconn.Conn, stream io.Reader,
err error
}
var wg sync.WaitGroup
defer wg.Wait()
reads := make(chan read, 5)
var stopReading uint32
wg.Add(1)
go func() {
defer wg.Done()
defer close(reads)
for atomic.LoadUint32(&stopReading) == 0 {
buffer := bufpool.Get(1 << FramePayloadShift)

View File

@ -99,10 +99,23 @@ func (*transportCredentials) OverrideServerName(string) error {
panic("not implemented")
}
type ContextInterceptor = func(ctx context.Context) context.Context
type ContextInterceptorData interface {
FullMethod() string
ClientIdentity() string
}
func NewInterceptors(logger Logger, clientIdentityKey interface{}, ctxInterceptor ContextInterceptor) (unary grpc.UnaryServerInterceptor, stream grpc.StreamServerInterceptor) {
unary = func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
type contextInterceptorData struct {
fullMethod string
clientIdentity string
}
func (d contextInterceptorData) FullMethod() string { return d.fullMethod }
func (d contextInterceptorData) ClientIdentity() string { return d.clientIdentity }
type Interceptor = func(ctx context.Context, data ContextInterceptorData, handler func(ctx context.Context))
func NewInterceptors(logger Logger, clientIdentityKey interface{}, interceptor Interceptor) (unary grpc.UnaryServerInterceptor, stream grpc.StreamServerInterceptor) {
unary = func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
logger.WithField("fullMethod", info.FullMethod).Debug("request")
p, ok := peer.FromContext(ctx)
if !ok {
@ -115,10 +128,18 @@ func NewInterceptors(logger Logger, clientIdentityKey interface{}, ctxIntercepto
}
logger.WithField("peer_client_identity", a.clientIdentity).Debug("peer client identity")
ctx = context.WithValue(ctx, clientIdentityKey, a.clientIdentity)
if ctxInterceptor != nil {
ctx = ctxInterceptor(ctx)
data := contextInterceptorData{
fullMethod: info.FullMethod,
clientIdentity: a.clientIdentity,
}
return handler(ctx, req)
var (
resp interface{}
err error
)
interceptor(ctx, data, func(ctx context.Context) {
resp, err = handler(ctx, req) // no-shadow
})
return resp, err
}
stream = func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
panic("unimplemented")

View File

@ -50,7 +50,7 @@ func ClientConn(cn transport.Connecter, log Logger) *grpc.ClientConn {
}
// NewServer is a convenience interface around the TransportCredentials and Interceptors interface.
func NewServer(authListener transport.AuthenticatedListener, clientIdentityKey interface{}, logger grpcclientidentity.Logger, ctxInterceptor grpcclientidentity.ContextInterceptor) (srv *grpc.Server, serve func() error) {
func NewServer(authListener transport.AuthenticatedListener, clientIdentityKey interface{}, logger grpcclientidentity.Logger, ctxInterceptor grpcclientidentity.Interceptor) (srv *grpc.Server, serve func() error) {
ka := grpc.KeepaliveParams(keepalive.ServerParameters{
Time: StartKeepalivesAfterInactivityDuration,
Timeout: KeepalivePeerTimeout,
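
For context, a hedged sketch of how a caller might wire the new NewServer signature; the no-op interceptor and the function name are made up for illustration, only the types come from this diff:

    package example // illustrative sketch

    import (
        "context"

        "github.com/zrepl/zrepl/endpoint"
        "github.com/zrepl/zrepl/rpc/grpcclientidentity"
        "github.com/zrepl/zrepl/rpc/grpcclientidentity/grpchelper"
        "github.com/zrepl/zrepl/transport"
    )

    // passthrough satisfies the new Interceptor contract without adding tracing.
    var passthrough grpcclientidentity.Interceptor = func(
        ctx context.Context,
        _ grpcclientidentity.ContextInterceptorData,
        handler func(ctx context.Context),
    ) {
        handler(ctx)
    }

    func startServer(l transport.AuthenticatedListener, log grpcclientidentity.Logger) func() error {
        srv, serve := grpchelper.NewServer(l, endpoint.ClientIdentityKey, log, passthrough)
        _ = srv // the caller registers its gRPC services on srv before invoking serve()
        return serve
    }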

View File

@ -10,6 +10,7 @@ import (
"sync/atomic"
"time"
"github.com/zrepl/zrepl/daemon/logging/trace"
"google.golang.org/grpc"
"github.com/google/uuid"
@ -83,6 +84,9 @@ func (c *Client) Close() {
// callers must ensure that the returned io.ReadCloser is closed
// TODO expose dataClient interface to the outside world
func (c *Client) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.ReadCloser, error) {
ctx, endSpan := trace.WithSpan(ctx, "rpc.client.Send")
defer endSpan()
// TODO the returned sendStream may return a read error created by the remote side
res, stream, err := c.dataClient.ReqSend(ctx, r)
if err != nil {
@ -97,34 +101,58 @@ func (c *Client) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea
}
func (c *Client) Receive(ctx context.Context, req *pdu.ReceiveReq, stream io.ReadCloser) (*pdu.ReceiveRes, error) {
ctx, endSpan := trace.WithSpan(ctx, "rpc.client.Receive")
defer endSpan()
return c.dataClient.ReqRecv(ctx, req, stream)
}
func (c *Client) ListFilesystems(ctx context.Context, in *pdu.ListFilesystemReq) (*pdu.ListFilesystemRes, error) {
ctx, endSpan := trace.WithSpan(ctx, "rpc.client.ListFilesystems")
defer endSpan()
return c.controlClient.ListFilesystems(ctx, in)
}
func (c *Client) ListFilesystemVersions(ctx context.Context, in *pdu.ListFilesystemVersionsReq) (*pdu.ListFilesystemVersionsRes, error) {
ctx, endSpan := trace.WithSpan(ctx, "rpc.client.ListFilesystemVersions")
defer endSpan()
return c.controlClient.ListFilesystemVersions(ctx, in)
}
func (c *Client) DestroySnapshots(ctx context.Context, in *pdu.DestroySnapshotsReq) (*pdu.DestroySnapshotsRes, error) {
ctx, endSpan := trace.WithSpan(ctx, "rpc.client.DestroySnapshots")
defer endSpan()
return c.controlClient.DestroySnapshots(ctx, in)
}
func (c *Client) ReplicationCursor(ctx context.Context, in *pdu.ReplicationCursorReq) (*pdu.ReplicationCursorRes, error) {
ctx, endSpan := trace.WithSpan(ctx, "rpc.client.ReplicationCursor")
defer endSpan()
return c.controlClient.ReplicationCursor(ctx, in)
}
func (c *Client) SendCompleted(ctx context.Context, in *pdu.SendCompletedReq) (*pdu.SendCompletedRes, error) {
ctx, endSpan := trace.WithSpan(ctx, "rpc.client.SendCompleted")
defer endSpan()
return c.controlClient.SendCompleted(ctx, in)
}
func (c *Client) HintMostRecentCommonAncestor(ctx context.Context, in *pdu.HintMostRecentCommonAncestorReq) (*pdu.HintMostRecentCommonAncestorRes, error) {
ctx, endSpan := trace.WithSpan(ctx, "rpc.client.HintMostRecentCommonAncestor")
defer endSpan()
return c.controlClient.HintMostRecentCommonAncestor(ctx, in)
}
func (c *Client) WaitForConnectivity(ctx context.Context) error {
ctx, endSpan := trace.WithSpan(ctx, "rpc.client.WaitForConnectivity")
defer endSpan()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
msg := uuid.New().String()
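
Every exported client method above now opens a span named after the RPC before delegating. A minimal sketch of that per-method pattern, applicable to any ctx-taking method (uses the trace.WithSpan signature visible in this diff; the function and span names are placeholders):

    package example // illustrative sketch

    import (
        "context"

        "github.com/zrepl/zrepl/daemon/logging/trace"
    )

    // doRPC shows the per-method pattern used by the rpc.Client methods above:
    // open a span named after the call, end it when the method returns.
    func doRPC(ctx context.Context) error {
        ctx, endSpan := trace.WithSpan(ctx, "rpc.client.DoRPC")
        defer endSpan()
        return work(ctx)
    }

    func work(ctx context.Context) error { return nil } // stand-in for the actual call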

View File

@ -3,17 +3,12 @@ package rpc
import (
"context"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/logger"
)
type Logger = logger.Logger
type contextKey int
const (
contextKeyLoggers contextKey = iota
)
/// All fields must be non-nil
type Loggers struct {
General Logger
@ -21,11 +16,10 @@ type Loggers struct {
Data Logger
}
func WithLoggers(ctx context.Context, loggers Loggers) context.Context {
ctx = context.WithValue(ctx, contextKeyLoggers, loggers)
return ctx
}
func GetLoggersOrPanic(ctx context.Context) Loggers {
return ctx.Value(contextKeyLoggers).(Loggers)
return Loggers{
General: logging.GetLogger(ctx, logging.SubsysRPC),
Control: logging.GetLogger(ctx, logging.SubsysRPCControl),
Data: logging.GetLogger(ctx, logging.SubsysRPCData),
}
}
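
GetLoggersOrPanic no longer reads loggers injected via WithLoggers; it derives them from the context through logging.GetLogger. A hedged usage sketch of the resulting pattern (the log message is made up; GetLogger and the subsystem constant are as shown above):

    package example // illustrative sketch

    import (
        "context"

        "github.com/zrepl/zrepl/daemon/logging"
    )

    // No logger is threaded through explicitly anymore: any function whose ctx
    // went through the daemon's logging setup can derive its subsystem logger
    // on demand.
    func logSomething(ctx context.Context) {
        log := logging.GetLogger(ctx, logging.SubsysRPC)
        log.Debug("rpc control connection established")
    }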

View File

@ -7,6 +7,7 @@ import (
"github.com/zrepl/zrepl/endpoint"
"github.com/zrepl/zrepl/replication/logic/pdu"
"github.com/zrepl/zrepl/rpc/dataconn"
"github.com/zrepl/zrepl/rpc/grpcclientidentity"
"github.com/zrepl/zrepl/rpc/grpcclientidentity/grpchelper"
"github.com/zrepl/zrepl/rpc/versionhandshake"
"github.com/zrepl/zrepl/transport"
@ -30,7 +31,20 @@ type Server struct {
dataServerServe serveFunc
}
type HandlerContextInterceptor func(ctx context.Context) context.Context
type HandlerContextInterceptorData interface {
FullMethod() string
ClientIdentity() string
}
type interceptorData struct {
prefixMethod string
wrapped HandlerContextInterceptorData
}
func (d interceptorData) ClientIdentity() string { return d.wrapped.ClientIdentity() }
func (d interceptorData) FullMethod() string { return d.prefixMethod + d.wrapped.FullMethod() }
type HandlerContextInterceptor func(ctx context.Context, data HandlerContextInterceptorData, handler func(ctx context.Context))
// config must be valid (use its Validate function).
func NewServer(handler Handler, loggers Loggers, ctxInterceptor HandlerContextInterceptor) *Server {
@ -38,7 +52,10 @@ func NewServer(handler Handler, loggers Loggers, ctxInterceptor HandlerContextIn
// setup control server
controlServerServe := func(ctx context.Context, controlListener transport.AuthenticatedListener, errOut chan<- error) {
controlServer, serve := grpchelper.NewServer(controlListener, endpoint.ClientIdentityKey, loggers.Control, ctxInterceptor)
var controlCtxInterceptor grpcclientidentity.Interceptor = func(ctx context.Context, data grpcclientidentity.ContextInterceptorData, handler func(ctx context.Context)) {
ctxInterceptor(ctx, interceptorData{"control://", data}, handler)
}
controlServer, serve := grpchelper.NewServer(controlListener, endpoint.ClientIdentityKey, loggers.Control, controlCtxInterceptor)
pdu.RegisterReplicationServer(controlServer, handler)
// give time for graceful stop until deadline expires, then hard stop
@ -59,12 +76,12 @@ func NewServer(handler Handler, loggers Loggers, ctxInterceptor HandlerContextIn
dataServerClientIdentitySetter := func(ctx context.Context, wire *transport.AuthConn) (context.Context, *transport.AuthConn) {
ci := wire.ClientIdentity()
ctx = context.WithValue(ctx, endpoint.ClientIdentityKey, ci)
if ctxInterceptor != nil {
ctx = ctxInterceptor(ctx) // SHADOWING
}
return ctx, wire
}
dataServer := dataconn.NewServer(dataServerClientIdentitySetter, loggers.Data, handler)
var dataCtxInterceptor dataconn.ContextInterceptor = func(ctx context.Context, data dataconn.ContextInterceptorData, handler func(ctx context.Context)) {
ctxInterceptor(ctx, interceptorData{"data://", data}, handler)
}
dataServer := dataconn.NewServer(dataServerClientIdentitySetter, dataCtxInterceptor, loggers.Data, handler)
dataServerServe := func(ctx context.Context, dataListener transport.AuthenticatedListener, errOut chan<- error) {
dataServer.Serve(ctx, dataListener)
errOut <- nil // TODO bad design of dataServer?
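
The control and data transports share one HandlerContextInterceptor; the interceptorData wrapper above only prefixes FullMethod with "control://" or "data://" so trace and log output can tell the two apart. A standalone illustration of that composition (method and client identity strings are made up; this is not zrepl code):

    package main

    import "fmt"

    type methodData interface {
        FullMethod() string
        ClientIdentity() string
    }

    type plainData struct{ method, client string }

    func (d plainData) FullMethod() string     { return d.method }
    func (d plainData) ClientIdentity() string { return d.client }

    // prefixed mirrors the interceptorData wrapper in the hunk above.
    type prefixed struct {
        prefix  string
        wrapped methodData
    }

    func (d prefixed) FullMethod() string     { return d.prefix + d.wrapped.FullMethod() }
    func (d prefixed) ClientIdentity() string { return d.wrapped.ClientIdentity() }

    func main() {
        d := prefixed{"control://", plainData{"/Replication/ListFilesystems", "backup-host"}}
        fmt.Println(d.FullMethod(), d.ClientIdentity())
        // prints: control:///Replication/ListFilesystems backup-host
    }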

View File

@ -15,27 +15,15 @@ import (
"net"
"time"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/transport"
)
type contextKey int
const (
contextKeyLog contextKey = 1 + iota
)
type Logger = logger.Logger
func WithLogger(ctx context.Context, log Logger) context.Context {
return context.WithValue(ctx, contextKeyLog, log)
}
func getLog(ctx context.Context) Logger {
if l, ok := ctx.Value(contextKeyLog).(Logger); ok {
return l
}
return logger.NewNullLogger()
return logging.GetLogger(ctx, logging.SubsysTransportMux)
}
type acceptRes struct {

View File

@ -8,6 +8,7 @@ import (
"syscall"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/rpc/dataconn/timeoutconn"
"github.com/zrepl/zrepl/zfs"
@ -63,19 +64,8 @@ func ValidateClientIdentity(in string) error {
return nil
}
type contextKey int
const contextKeyLog contextKey = 0
type Logger = logger.Logger
func WithLogger(ctx context.Context, log Logger) context.Context {
return context.WithValue(ctx, contextKeyLog, log)
}
func GetLogger(ctx context.Context) Logger {
if log, ok := ctx.Value(contextKeyLog).(Logger); ok {
return log
}
return logger.NewNullLogger()
return logging.GetLogger(ctx, logging.SubsysTransport)
}

View File

@ -3,6 +3,7 @@ package semaphore
import (
"context"
"github.com/zrepl/zrepl/daemon/logging/trace"
wsemaphore "golang.org/x/sync/semaphore"
)
@ -21,6 +22,7 @@ type AcquireGuard struct {
// The returned AcquireGuard is not goroutine-safe.
func (s *S) Acquire(ctx context.Context) (*AcquireGuard, error) {
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
if err := s.ws.Acquire(ctx, 1); err != nil {
return nil, err
} else if err := ctx.Err(); err != nil {
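
WithSpanFromStackUpdateCtx names the span after its caller and replaces ctx through the pointer, which is what makes the one-line defer idiom above possible. A hedged sketch of the same idiom in another function (assuming the signature as used above; the function name is a placeholder):

    package example // illustrative sketch

    import (
        "context"

        "github.com/zrepl/zrepl/daemon/logging/trace"
    )

    func acquireSomething(ctx context.Context) error {
        // opens a span named after this function and ends it on return;
        // note the pointer: ctx is replaced with the span-carrying context.
        defer trace.WithSpanFromStackUpdateCtx(&ctx)()
        // ... work with the span-carrying ctx ...
        _ = ctx
        return nil
    }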

View File

@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/zrepl/zrepl/daemon/logging/trace"
)
func TestSemaphore(t *testing.T) {
@ -24,12 +25,17 @@ func TestSemaphore(t *testing.T) {
beforeT, afterT uint32
}
ctx := context.Background()
defer trace.WithTaskFromStackUpdateCtx(&ctx)()
var wg sync.WaitGroup
wg.Add(numGoroutines)
for i := 0; i < numGoroutines; i++ {
go func() {
ctx, end := trace.WithTaskFromStack(ctx)
defer end()
defer wg.Done()
res, err := sem.Acquire(context.Background())
res, err := sem.Acquire(ctx)
require.NoError(t, err)
defer res.Release()
if time.Since(begin) > sleepTime {
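
As in the test above, each goroutine must run in its own trace task and pass the derived ctx onward instead of context.Background(). A condensed sketch of that per-goroutine pattern (trace.WithTaskFromStack as used above; the helper itself is made up):

    package example // illustrative sketch

    import (
        "context"
        "sync"

        "github.com/zrepl/zrepl/daemon/logging/trace"
    )

    // fanOut gives each goroutine its own trace task, so its spans and log
    // lines carry a distinct ID stack.
    func fanOut(ctx context.Context, n int, work func(ctx context.Context)) {
        var wg sync.WaitGroup
        wg.Add(n)
        for i := 0; i < n; i++ {
            go func() {
                defer wg.Done()
                ctx, end := trace.WithTaskFromStack(ctx)
                defer end()
                work(ctx)
            }()
        }
        wg.Wait()
    }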

View File

@ -38,8 +38,8 @@ func (o *DestroySnapOp) String() string {
return fmt.Sprintf("destroy operation %s@%s", o.Filesystem, o.Name)
}
func ZFSDestroyFilesystemVersions(reqs []*DestroySnapOp) {
doDestroy(context.TODO(), reqs, destroyerSingleton)
func ZFSDestroyFilesystemVersions(ctx context.Context, reqs []*DestroySnapOp) {
doDestroy(ctx, reqs, destroyerSingleton)
}
func setDestroySnapOpErr(b []*DestroySnapOp, err error) {
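
With the signature change above, call sites thread their span-carrying ctx through instead of the package falling back to context.TODO(). A hedged call-site sketch (the wrapper function is made up; the zfs types and new signature are as shown above):

    package example // illustrative sketch

    import (
        "context"

        "github.com/zrepl/zrepl/zfs"
    )

    // pruneSnapshots forwards its ctx so the trace task/span stack reaches the
    // zfs command layer.
    func pruneSnapshots(ctx context.Context, reqs []*zfs.DestroySnapOp) {
        zfs.ZFSDestroyFilesystemVersions(ctx, reqs)
    }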

View File

@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/require"
)
// FIXME make this a platformtest
func TestZFSListHandlesProducesZFSErrorOnNonZeroExit(t *testing.T) {
t.SkipNow() // FIXME ZFS_BINARY does not work if tests run in parallel

View File

@ -23,7 +23,7 @@ type RuntimeLine struct {
Error string
}
var humanFormatterLineRE = regexp.MustCompile(`^(\[[^\]]+\]){2}\[zfs.cmd\]:\s+command\s+exited\s+(with|without)\s+error\s+(.+)`)
var humanFormatterLineRE = regexp.MustCompile(`^(\[[^\]]+\]){2}\[zfs.cmd\]\[[^\]]+\]:\s+command\s+exited\s+(with|without)\s+error\s+(.+)`)
func parseSecs(s string) (time.Duration, error) {
d, err := time.ParseDuration(s + "s")

View File

@ -30,7 +30,7 @@ func TestParseHumanFormatter(t *testing.T) {
tcs := []testCase{
{
Name: "human-formatter-noerror",
Input: `2020-04-04T00:00:05+02:00 [DEBG][jobname][zfs.cmd]: command exited without error usertime_s="0.008445" cmd="zfs list -H -p -o name -r -t filesystem,volume" systemtime_s="0.033783" invocation="84" total_time_s="0.037828619"`,
Input: `2020-04-04T00:00:05+02:00 [DEBG][jobname][zfs.cmd][task$stack$span.stack]: command exited without error usertime_s="0.008445" cmd="zfs list -H -p -o name -r -t filesystem,volume" systemtime_s="0.033783" invocation="84" total_time_s="0.037828619"`,
Expect: &RuntimeLine{
Cmd: "zfs list -H -p -o name -r -t filesystem,volume",
TotalTime: secs("0.037828619"),
@ -42,7 +42,7 @@ func TestParseHumanFormatter(t *testing.T) {
},
{
Name: "human-formatter-witherror",
Input: `2020-04-04T00:00:05+02:00 [DEBG][jobname][zfs.cmd]: command exited with error usertime_s="0.008445" cmd="zfs list -H -p -o name -r -t filesystem,volume" systemtime_s="0.033783" invocation="84" total_time_s="0.037828619" err="some error"`,
Input: `2020-04-04T00:00:05+02:00 [DEBG][jobname][zfs.cmd][task$stack$span.stack]: command exited with error usertime_s="0.008445" cmd="zfs list -H -p -o name -r -t filesystem,volume" systemtime_s="0.033783" invocation="84" total_time_s="0.037828619" err="some error"`,
Expect: &RuntimeLine{
Cmd: "zfs list -H -p -o name -r -t filesystem,volume",
TotalTime: secs("0.037828619"),
@ -54,7 +54,7 @@ func TestParseHumanFormatter(t *testing.T) {
},
{
Name: "from graylog",
Input: `2020-04-04T00:00:05+02:00 [DEBG][csnas][zfs.cmd]: command exited without error usertime_s="0" cmd="zfs send -i zroot/ezjail/synapse-12@zrepl_20200329_095518_000 zroot/ezjail/synapse-12@zrepl_20200329_102454_000" total_time_s="0.101598591" invocation="85" systemtime_s="0.041581"`,
Input: `2020-04-04T00:00:05+02:00 [DEBG][csnas][zfs.cmd][task$stack$span.stack]: command exited without error usertime_s="0" cmd="zfs send -i zroot/ezjail/synapse-12@zrepl_20200329_095518_000 zroot/ezjail/synapse-12@zrepl_20200329_102454_000" total_time_s="0.101598591" invocation="85" systemtime_s="0.041581"`,
Expect: &RuntimeLine{
Cmd: "zfs send -i zroot/ezjail/synapse-12@zrepl_20200329_095518_000 zroot/ezjail/synapse-12@zrepl_20200329_102454_000",
TotalTime: secs("0.101598591"),
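
The extra \[[^\]]+\] in the humanFormatterLineRE updated in the previous file accepts the new trace-ID field between the subsystem and the colon, as exercised by the inputs above. A standalone check of that pattern (go regexp only; the line is abbreviated and timestamp handling, which the surrounding parser does, is omitted here):

    package main

    import (
        "fmt"
        "regexp"
    )

    // Same pattern as the updated humanFormatterLineRE.
    var lineRE = regexp.MustCompile(`^(\[[^\]]+\]){2}\[zfs.cmd\]\[[^\]]+\]:\s+command\s+exited\s+(with|without)\s+error\s+(.+)`)

    func main() {
        // Bracketed part of a log line in the shape of the test inputs above.
        line := `[DEBG][jobname][zfs.cmd][task$stack$span.stack]: command exited without error cmd="zfs list" total_time_s="0.03"`
        m := lineRE.FindStringSubmatch(line)
        fmt.Println(m != nil)      // true: the new trace field is accepted
        fmt.Println(m[2], "error") // prints: without error
    }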

View File

@ -14,6 +14,7 @@ import (
"sync"
"time"
"github.com/zrepl/zrepl/daemon/logging/trace"
"github.com/zrepl/zrepl/util/circlog"
)
@ -22,6 +23,7 @@ type Cmd struct {
ctx context.Context
mtx sync.RWMutex
startedAt, waitStartedAt, waitReturnedAt time.Time
waitReturnEndSpanCb trace.DoneFunc
}
func CommandContext(ctx context.Context, name string, arg ...string) *Cmd {
@ -31,7 +33,7 @@ func CommandContext(ctx context.Context, name string, arg ...string) *Cmd {
// err.(*exec.ExitError).Stderr will NOT be set
func (c *Cmd) CombinedOutput() (o []byte, err error) {
c.startPre()
c.startPre(false)
c.startPost(nil)
c.waitPre()
o, err = c.cmd.CombinedOutput()
@ -41,7 +43,7 @@ func (c *Cmd) CombinedOutput() (o []byte, err error) {
// err.(*exec.ExitError).Stderr will be set
func (c *Cmd) Output() (o []byte, err error) {
c.startPre()
c.startPre(false)
c.startPost(nil)
c.waitPre()
o, err = c.cmd.Output()
@ -78,7 +80,7 @@ func (c *Cmd) log() Logger {
}
func (c *Cmd) Start() (err error) {
c.startPre()
c.startPre(true)
err = c.cmd.Start()
c.startPost(err)
return err
@ -95,15 +97,17 @@ func (c *Cmd) Process() *os.Process {
func (c *Cmd) Wait() (err error) {
c.waitPre()
err = c.cmd.Wait()
if !c.waitReturnedAt.IsZero() {
// ignore duplicate waits
return err
}
c.waitPost(err)
return err
}
func (c *Cmd) startPre() {
func (c *Cmd) startPre(newTask bool) {
if newTask {
// avoid explosion of tasks with name c.String()
c.ctx, c.waitReturnEndSpanCb = trace.WithTaskAndSpan(c.ctx, "zfscmd", c.String())
} else {
c.ctx, c.waitReturnEndSpanCb = trace.WithSpan(c.ctx, c.String())
}
startPreLogging(c, time.Now())
}
@ -178,6 +182,9 @@ func (c *Cmd) waitPost(err error) {
waitPostReport(c, u, now)
waitPostLogging(c, u, err, now)
waitPostPrometheus(c, u, err, now)
// must be last because c.ctx might be used by other waitPost calls
c.waitReturnEndSpanCb()
}
// returns 0 if the command did not yet finish
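
startPre now distinguishes between opening a whole new trace task (Start/Wait, where the command can outlive the caller's span) and a plain span inside the current task (the synchronous Output/CombinedOutput paths), and waitPost ends it last because logging and metrics still read c.ctx. A hedged caller-side sketch of the synchronous path (the import path and wrapper function are assumptions; CommandContext and CombinedOutput are as shown above):

    package example // illustrative sketch

    import (
        "context"

        "github.com/zrepl/zrepl/zfs/zfscmd"
    )

    // listFilesystems takes the synchronous path: CombinedOutput goes through
    // startPre(false), i.e. it opens a span inside the caller's current task,
    // and waitPost ends that span after logging and metrics have run.
    func listFilesystems(ctx context.Context) ([]byte, error) {
        cmd := zfscmd.CommandContext(ctx, "zfs", "list", "-H", "-o", "name")
        return cmd.CombinedOutput()
    }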

View File

@ -3,14 +3,14 @@ package zfscmd
import (
"context"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/logger"
)
type contextKey int
const (
contextKeyLogger contextKey = iota
contextKeyJobID
contextKeyJobID contextKey = 1 + iota
)
type Logger = logger.Logger
@ -27,13 +27,6 @@ func getJobIDOrDefault(ctx context.Context, def string) string {
return ret
}
func WithLogger(ctx context.Context, log Logger) context.Context {
return context.WithValue(ctx, contextKeyLogger, log)
}
func getLogger(ctx context.Context) Logger {
if l, ok := ctx.Value(contextKeyLogger).(Logger); ok {
return l
}
return logger.NewNullLogger()
return logging.GetLogger(ctx, logging.SubsysZFSCmd)
}

View File

@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/zrepl/zrepl/util/circlog"
)