[WIP] factor out trace functionality into separate package and add Go docs
parent 1ae087bfcf
commit fc9dbdf449

@@ -7,9 +7,9 @@ import (
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/zrepl/zrepl/daemon/logging/trace"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/logging"
)
var rootArgs struct {

@@ -99,7 +99,7 @@ func (s *Subcommand) Config() *config.Config {
func (s *Subcommand) run(cmd *cobra.Command, args []string) {
s.tryParseConfig()
ctx := context.Background()
endTask := logging.WithTaskFromStackUpdateCtx(&ctx)
endTask := trace.WithTaskFromStackUpdateCtx(&ctx)
defer endTask()
err := s.Run(ctx, s, args)
endTask()

@@ -12,6 +12,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/zrepl/zrepl/daemon/logging/trace"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/job"

@@ -84,7 +85,7 @@ func Run(ctx context.Context, conf *config.Config) error {
// register global (=non job-local) metrics
zfscmd.RegisterMetrics(prometheus.DefaultRegisterer)
logging.RegisterMetrics(prometheus.DefaultRegisterer)
trace.RegisterMetrics(prometheus.DefaultRegisterer)
log.Info("starting daemon")

@@ -10,6 +10,7 @@ import (
"text/template"
"github.com/stretchr/testify/require"
"github.com/zrepl/zrepl/daemon/logging/trace"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/hooks"

@@ -70,7 +71,7 @@ func curry(f comparisonAssertionFunc, expected interface{}, right bool) (ret val
}
func TestHooks(t *testing.T) {
ctx, end := logging.WithTaskFromStack(context.Background())
ctx, end := trace.WithTaskFromStack(context.Background())
defer end()
testFSName := "testpool/testdataset"

@@ -9,12 +9,12 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/zrepl/zrepl/daemon/logging/trace"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/filters"
"github.com/zrepl/zrepl/daemon/job/reset"
"github.com/zrepl/zrepl/daemon/job/wakeup"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/daemon/pruner"
"github.com/zrepl/zrepl/daemon/snapper"
"github.com/zrepl/zrepl/endpoint"

@@ -377,7 +377,7 @@ func (j *ActiveSide) SenderConfig() *endpoint.SenderConfig {
}
func (j *ActiveSide) Run(ctx context.Context) {
ctx, endTask := logging.WithTaskAndSpan(ctx, "active-side-job", j.Name())
ctx, endTask := trace.WithTaskAndSpan(ctx, "active-side-job", j.Name())
defer endTask()
log := GetLogger(ctx)

@@ -386,7 +386,7 @@ func (j *ActiveSide) Run(ctx context.Context) {
periodicDone := make(chan struct{})
ctx, cancel := context.WithCancel(ctx)
defer cancel()
periodicCtx, endTask := logging.WithTask(ctx, "periodic")
periodicCtx, endTask := trace.WithTask(ctx, "periodic")
defer endTask()
go j.mode.RunPeriodic(periodicCtx, periodicDone)

@@ -404,7 +404,7 @@ outer:
case <-periodicDone:
}
invocationCount++
invocationCtx, endSpan := logging.WithSpan(ctx, fmt.Sprintf("invocation-%d", invocationCount))
invocationCtx, endSpan := trace.WithSpan(ctx, fmt.Sprintf("invocation-%d", invocationCount))
j.do(invocationCtx)
endSpan()
}

@@ -435,7 +435,7 @@ func (j *ActiveSide) do(ctx context.Context) {
return
default:
}
ctx, endSpan := logging.WithSpan(ctx, "replication")
ctx, endSpan := trace.WithSpan(ctx, "replication")
ctx, repCancel := context.WithCancel(ctx)
var repWait driver.WaitFunc
j.updateTasks(func(tasks *activeSideTasks) {

@@ -459,7 +459,7 @@ func (j *ActiveSide) do(ctx context.Context) {
return
default:
}
ctx, endSpan := logging.WithSpan(ctx, "prune_sender")
ctx, endSpan := trace.WithSpan(ctx, "prune_sender")
ctx, senderCancel := context.WithCancel(ctx)
tasks := j.updateTasks(func(tasks *activeSideTasks) {
tasks.prunerSender = j.prunerFactory.BuildSenderPruner(ctx, sender, sender)

@@ -478,7 +478,7 @@ func (j *ActiveSide) do(ctx context.Context) {
return
default:
}
ctx, endSpan := logging.WithSpan(ctx, "prune_recever")
ctx, endSpan := trace.WithSpan(ctx, "prune_recever")
ctx, receiverCancel := context.WithCancel(ctx)
tasks := j.updateTasks(func(tasks *activeSideTasks) {
tasks.prunerReceiver = j.prunerFactory.BuildReceiverPruner(ctx, receiver, sender)

@@ -6,6 +6,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/zrepl/zrepl/daemon/logging/trace"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/filters"

@@ -164,12 +165,12 @@ func (j *PassiveSide) SenderConfig() *endpoint.SenderConfig {
func (*PassiveSide) RegisterMetrics(registerer prometheus.Registerer) {}
func (j *PassiveSide) Run(ctx context.Context) {
ctx, endTask := logging.WithTaskAndSpan(ctx, "passive-side-job", j.Name())
ctx, endTask := trace.WithTaskAndSpan(ctx, "passive-side-job", j.Name())
defer endTask()
log := GetLogger(ctx)
defer log.Info("job exiting")
{
ctx, endTask := logging.WithTask(ctx, "periodic") // shadowing
ctx, endTask := trace.WithTask(ctx, "periodic") // shadowing
defer endTask()
ctx, cancel := context.WithCancel(ctx)
defer cancel()

@@ -182,10 +183,11 @@ func (j *PassiveSide) Run(ctx context.Context) {
}
ctxInterceptor := func(handlerCtx context.Context, info rpc.HandlerContextInterceptorData, handler func(ctx context.Context)) {
// the handlerCtx is clean => need to inherit logging config from job context
// the handlerCtx is clean => need to inherit logging and tracing config from job context
handlerCtx = logging.WithInherit(handlerCtx, ctx)
handlerCtx = trace.WithInherit(handlerCtx, ctx)
handlerCtx, endTask := logging.WithTaskAndSpan(handlerCtx, "handler", fmt.Sprintf("job=%q client=%q method=%q", j.Name(), info.ClientIdentity(), info.FullMethod()))
handlerCtx, endTask := trace.WithTaskAndSpan(handlerCtx, "handler", fmt.Sprintf("job=%q client=%q method=%q", j.Name(), info.ClientIdentity(), info.FullMethod()))
defer endTask()
handler(handlerCtx)
}

@@ -7,11 +7,11 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/zrepl/zrepl/daemon/logging/trace"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/filters"
"github.com/zrepl/zrepl/daemon/job/wakeup"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/daemon/pruner"
"github.com/zrepl/zrepl/daemon/snapper"
"github.com/zrepl/zrepl/endpoint"

@@ -90,7 +90,7 @@ func (j *SnapJob) OwnedDatasetSubtreeRoot() (rfs *zfs.DatasetPath, ok bool) {
func (j *SnapJob) SenderConfig() *endpoint.SenderConfig { return nil }
func (j *SnapJob) Run(ctx context.Context) {
ctx, endTask := logging.WithTaskAndSpan(ctx, "snap-job", j.Name())
ctx, endTask := trace.WithTaskAndSpan(ctx, "snap-job", j.Name())
defer endTask()
log := GetLogger(ctx)

@@ -99,7 +99,7 @@ func (j *SnapJob) Run(ctx context.Context) {
periodicDone := make(chan struct{})
ctx, cancel := context.WithCancel(ctx)
defer cancel()
periodicCtx, endTask := logging.WithTask(ctx, "snapshotting")
periodicCtx, endTask := trace.WithTask(ctx, "snapshotting")
defer endTask()
go j.snapper.Run(periodicCtx, periodicDone)

@@ -117,7 +117,7 @@ outer:
}
invocationCount++
invocationCtx, endSpan := logging.WithSpan(ctx, fmt.Sprintf("invocation-%d", invocationCount))
invocationCtx, endSpan := trace.WithSpan(ctx, fmt.Sprintf("invocation-%d", invocationCount))
j.doPrune(invocationCtx)
endSpan()
}

@@ -167,7 +167,7 @@ func (h alwaysUpToDateReplicationCursorHistory) ListFilesystems(ctx context.Cont
}
func (j *SnapJob) doPrune(ctx context.Context) {
ctx, endSpan := logging.WithSpan(ctx, "snap-job-do-prune")
ctx, endSpan := trace.WithSpan(ctx, "snap-job-do-prune")
defer endSpan()
log := GetLogger(ctx)
sender := endpoint.NewSender(endpoint.SenderConfig{

@@ -9,8 +9,8 @@ import (
"github.com/mattn/go-isatty"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/logging/trace"
"github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/tlsconf"
)

@@ -157,13 +157,7 @@ func getLoggerImpl(ctx context.Context, subsys Subsystem, panicIfEnded bool) log
l = l.WithField(SubsysField, subsys)
if nI := ctx.Value(contextKeyTraceNode); nI != nil {
n := nI.(*traceNode)
_, spanStack := currentTaskNameAndSpanStack(n)
l = l.WithField(SpanField, spanStack)
} else {
l = l.WithField(SpanField, "NOSPAN")
}
l = l.WithField(SpanField, trace.GetSpanStackOrDefault(ctx, "NOSPAN"))
fields := make(logger.Fields)
iterInjectedFields(ctx, func(field string, value interface{}) {

@@ -1,15 +0,0 @@
package logging
import (
"fmt"
"os"
)
const chrometraceEnableDebug = false
func chrometraceDebug(format string, args ...interface{}) {
if !chrometraceEnableDebug {
return
}
fmt.Fprintf(os.Stderr, format+"\n", args...)
}

@@ -5,13 +5,11 @@ import "context"
type contextKey int
const (
contextKeyTraceNode contextKey = 1 + iota
contextKeyLoggers
contextKeyLoggers contextKey = 1 + iota
contextKeyInjectedField
)
var contextKeys = []contextKey{
contextKeyTraceNode,
contextKeyLoggers,
contextKeyInjectedField,
}

@@ -1,20 +1,103 @@
package logging
// package trace provides activity tracing via ctx through Tasks and Spans
//
// Basic Concepts
//
// Tracing can be used to identify where a piece of code spends its time.
//
// The Go standard library provides package runtime/trace which is useful to identify CPU bottlenecks or
// to understand what happens inside the Go runtime.
// However, it is not ideal for application level tracing, in particular if those traces should be understandable
// to tech-savvy users (albeit not developers).
//
// This package provides the concept of Tasks and Spans to express what activity is happening within an application:
//
// - Neither task nor span is really tangible but instead contained within the context.Context tree
// - Tasks represent concurrent activity (i.e. goroutines).
// - Spans represent a semantic stack trace within a task.
//
// As a consequence, whenever a context is propagated across a goroutine boundary, you need to create a child task:
//
//   go func(ctx context.Context) {
//     ctx, endTask = WithTask(ctx, "what-happens-inside-the-child-task")
//     defer endTask()
//     // ...
//   }(ctx)
//
// Within the task, you can open up a hierarchy of spans.
// In contrast to tasks, which can have multiple concurrently running child tasks,
// spans must nest and not cross the goroutine boundary.
//
//   ctx, endSpan = WithSpan(ctx, "copy-dir")
//   defer endSpan()
//   for _, f := range dir.Files() {
//     func() {
//       ctx, endSpan := WithSpan(ctx, fmt.Sprintf("copy-file %q", f))
//       defer endSpan()
//       b, _ := ioutil.ReadFile(f)
//       _ = ioutil.WriteFile(f + ".copy", b, 0600)
//     }()
//   }
//
// In combination:
//   ctx, endTask = WithTask(ctx, "copy-dirs")
//   defer endTask()
//   for i := range dirs {
//     go func(dir string) {
//       ctx, endTask := WithTask(ctx, "copy-dir")
//       defer endTask()
//       for _, f := range filesIn(dir) {
//         func() {
//           ctx, endSpan := WithSpan(ctx, fmt.Sprintf("copy-file %q", f))
//           defer endSpan()
//           b, _ := ioutil.ReadFile(f)
//           _ = ioutil.WriteFile(f + ".copy", b, 0600)
//         }()
//       }
//     }()
//   }
//
// Note that a span ends at the time you call endSpan - not before and not after that.
// If you violate the stack-like nesting of spans by forgetting an endSpan() invocation,
// the out-of-order endSpan() will panic.
//
// A similar rule applies to the endTask closure returned by WithTask:
// If a task has live child tasks at the time you call endTask(), the call will panic.
//
// Recovering from endSpan() or endTask() panics will corrupt the trace stack and lead to corrupt tracefile output.
//
//
// Best Practices For Naming Tasks And Spans
//
// Tasks should always have string constants as names, and must not contain the `#` character. Why?
// First, the visualization by chrome://tracing draws a horizontal bar for each task in the trace.
// Also, the package appends `#NUM` for each concurrently running instance of a task name.
// Note that the `#NUM` suffix will be reused if a task has ended, in order to avoid an
// infinite number of horizontal bars in the visualization.
//
//
// Chrome-compatible Tracefile Support
//
// The activity trace generated by usage of WithTask and WithSpan can be rendered to a JSON output file
// that can be loaded into chrome://tracing .
// Apart from function GetSpanStackOrDefault, this is the main benefit of this package.
//
// First, there is a convenience environment variable 'ZREPL_ACTIVITY_TRACE' that can be set to an output path.
// From process start onward, a trace is written to that path.
//
// More consumers can attach to the activity trace through the ChrometraceClientWebsocketHandler websocket handler.
//
// If a write error is encountered with any consumer (including the env-var based one), the consumer is closed and
// will not receive further trace output.
package trace
import (
"context"
"encoding/base64"
"fmt"
"math/rand"
"os"
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/zrepl/zrepl/util/envconst"
)
var metrics struct {

@@ -25,13 +108,13 @@ var metrics struct {
func init() {
metrics.activeTasks = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "zrepl",
Subsystem: "logging",
Subsystem: "trace",
Name: "active_tasks",
Help: "number of active (tracing-level) tasks in the daemon",
})
metrics.uniqueConcurrentTaskNameBitvecLength = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "zrepl",
Subsystem: "logging",
Subsystem: "trace",
Name: "unique_concurrent_task_name_bitvec_length",
Help: "length of the bitvec used to find unique names for concurrent tasks",
}, []string{"task_name"})

@@ -42,75 +125,27 @@ func RegisterMetrics(r prometheus.Registerer) {
r.MustRegister(metrics.uniqueConcurrentTaskNameBitvecLength)
}
type DoneFunc func()
func getMyCallerOrPanic() string {
pc, _, _, ok := runtime.Caller(2)
if !ok {
panic("cannot get caller")
}
details := runtime.FuncForPC(pc)
if ok && details != nil {
const prefix = "github.com/zrepl/zrepl"
return strings.TrimPrefix(strings.TrimPrefix(details.Name(), prefix), "/")
}
return ""
}
// use like this:
//
// defer WithSpanFromStackUpdateCtx(&existingCtx)()
//
//
func WithSpanFromStackUpdateCtx(ctx *context.Context) DoneFunc {
childSpanCtx, end := WithSpan(*ctx, getMyCallerOrPanic())
*ctx = childSpanCtx
return end
}
// derive task name from call stack (caller's name)
func WithTaskFromStack(ctx context.Context) (context.Context, DoneFunc) {
return WithTask(ctx, getMyCallerOrPanic())
}
// derive task name from call stack (caller's name) and update *ctx
// to point to be the child task ctx
func WithTaskFromStackUpdateCtx(ctx *context.Context) DoneFunc {
child, end := WithTask(*ctx, getMyCallerOrPanic())
*ctx = child
return end
}
// create a task and a span within it in one call
func WithTaskAndSpan(ctx context.Context, task string, span string) (context.Context, DoneFunc) {
ctx, endTask := WithTask(ctx, task)
ctx, endSpan := WithSpan(ctx, fmt.Sprintf("%s %s", task, span))
return ctx, func() {
endSpan()
endTask()
}
}
// create a span during which several child tasks are spawned using the `add` function
func WithTaskGroup(ctx context.Context, taskGroup string) (_ context.Context, add func(f func(context.Context)), waitEnd func()) {
var wg sync.WaitGroup
ctx, endSpan := WithSpan(ctx, taskGroup)
add = func(f func(context.Context)) {
wg.Add(1)
defer wg.Done()
ctx, endTask := WithTask(ctx, taskGroup)
defer endTask()
f(ctx)
}
waitEnd = func() {
wg.Wait()
endSpan()
}
return ctx, add, waitEnd
}
var taskNamer = newUniqueTaskNamer(metrics.uniqueConcurrentTaskNameBitvecLength)
type traceNode struct {
id string
annotation string
parentTask *traceNode
activeChildTasks int32 // only for task nodes, insignificant for span nodes
parentSpan *traceNode
hasActiveChildSpan int32
startedAt time.Time
endedAt time.Time
ended int32
}
// Returned from WithTask or WithSpan.
// Must be called once the task or span ends.
// See package-level docs for nesting rules.
// Wrong call order / forgetting to call it will result in panics.
type DoneFunc func()
// Start a new root task or create a child task of an existing task.
//
// This is required when starting a new goroutine and

@@ -137,7 +172,7 @@ func WithTask(ctx context.Context, taskName string) (context.Context, DoneFunc)
taskName, taskNameDone := taskNamer.UniqueConcurrentTaskName(taskName)
this := &traceNode{
id: newTraceNodeId(),
id: genID(),
annotation: taskName,
parentTask: parentTask,
activeChildTasks: 0,

@@ -185,9 +220,8 @@ func WithTask(ctx context.Context, taskName string) (context.Context, DoneFunc)
return ctx, endTaskFunc
}
// ctx must have an active task (see WithTask)
//
// spans must nest (stack-like), otherwise, this function panics
// Start a new span.
// Important: ctx must have an active task (see WithTask)
func WithSpan(ctx context.Context, annotation string) (context.Context, DoneFunc) {
var parentSpan, parentTask *traceNode
nodeI := ctx.Value(contextKeyTraceNode)

@@ -203,7 +237,7 @@ func WithSpan(ctx context.Context, annotation string) (context.Context, DoneFunc
}
this := &traceNode{
id: newTraceNodeId(),
id: genID(),
annotation: annotation,
parentTask: parentTask,
parentSpan: parentSpan,

@@ -233,19 +267,6 @@ func WithSpan(ctx context.Context, annotation string) (context.Context, DoneFunc
return ctx, endTaskFunc
}
type traceNode struct {
id string
annotation string
parentTask *traceNode
activeChildTasks int32 // only for task nodes, insignificant for span nodes
parentSpan *traceNode
hasActiveChildSpan int32
startedAt time.Time
endedAt time.Time
ended int32
}
func currentTaskNameAndSpanStack(this *traceNode) (taskName string, spanIdStack string) {
task := this.parentTask

@@ -279,38 +300,12 @@ func currentTaskNameAndSpanStack(this *traceNode) (taskName string, spanIdStack
return task.annotation, spanIdStack
}
var traceNodeIdPRNG = rand.New(rand.NewSource(1))
func init() {
traceNodeIdPRNG.Seed(time.Now().UnixNano())
traceNodeIdPRNG.Seed(int64(os.Getpid()))
}
var traceNodeIdBytes = envconst.Int("ZREPL_LOGGING_TRACE_ID_BYTES", 3)
func init() {
if traceNodeIdBytes < 1 {
panic("trace node id byte length must be at least 1")
func GetSpanStackOrDefault(ctx context.Context, def string) string {
if nI := ctx.Value(contextKeyTraceNode); nI != nil {
n := nI.(*traceNode)
_, spanStack := currentTaskNameAndSpanStack(n)
return spanStack
} else {
return def
}
}
func newTraceNodeId() string {
var out strings.Builder
enc := base64.NewEncoder(base64.RawStdEncoding, &out)
buf := make([]byte, traceNodeIdBytes)
for i := 0; i < len(buf); {
n, err := traceNodeIdPRNG.Read(buf[i:])
if err != nil {
panic(err)
}
i += n
}
n, err := enc.Write(buf[:])
if err != nil || n != len(buf) {
panic(err)
}
if err := enc.Close(); err != nil {
panic(err)
}
return out.String()
}
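
Editor's note: the package documentation above describes the API in prose; the following is a minimal, hedged sketch (not part of the commit) of how a caller outside this diff might use it. The package path and the functions WithTask, WithSpan, and DoneFunc are taken from the diff; the "main"/"worker"/"step" names and the program itself are illustrative only. Per the docs above, starting the process with ZREPL_ACTIVITY_TRACE set to a file path should write a chrome://tracing-compatible JSON trace of these tasks and spans to that file.

package main

import (
	"context"
	"fmt"
	"sync"

	"github.com/zrepl/zrepl/daemon/logging/trace"
)

func main() {
	// Root task for the main goroutine.
	ctx, endTask := trace.WithTask(context.Background(), "main")
	defer endTask()

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// Child task: required because the context crosses a goroutine boundary.
			ctx, endTask := trace.WithTask(ctx, "worker")
			defer endTask()
			// Spans nest stack-like within the task.
			_, endSpan := trace.WithSpan(ctx, fmt.Sprintf("step-%d", i))
			endSpan()
		}(i)
	}
	// All child tasks must have ended before main's deferred endTask runs,
	// otherwise endTask panics (see the nesting rules above).
	wg.Wait()
}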

@@ -1,4 +1,4 @@
package logging
package trace
// The functions in this file are concerned with the generation
// of trace files based on the information from WithTask and WithSpan.

@@ -134,7 +134,7 @@ func init() {
chrometraceConsumers.write = make(chan []byte)
go func() {
kickConsumer := func(c chrometraceConsumerRegistration, err error) {
chrometraceDebug("chrometrace kicking consumer %#v after error %v", c, err)
debug("chrometrace kicking consumer %#v after error %v", c, err)
select {
case c.errored <- err:
default:

@@ -145,7 +145,7 @@ func init() {
for {
select {
case reg := <-chrometraceConsumers.register:
chrometraceDebug("registered chrometrace consumer %#v", reg)
debug("registered chrometrace consumer %#v", reg)
chrometraceConsumers.consumers[reg] = true
n, err := reg.w.Write([]byte("[\n"))
if err != nil {

@@ -156,12 +156,12 @@ func init() {
// successfully registered
case buf := <-chrometraceConsumers.write:
chrometraceDebug("chrometrace write request: %s", string(buf))
debug("chrometrace write request: %s", string(buf))
var r bytes.Reader
for c := range chrometraceConsumers.consumers {
r.Reset(buf)
n, err := io.Copy(c.w, &r)
chrometraceDebug("chrometrace wrote n=%v bytes to consumer %#v", n, c)
debug("chrometrace wrote n=%v bytes to consumer %#v", n, c)
if err != nil {
kickConsumer(c, err)
}

daemon/logging/trace/trace_context.go (new file, 27 lines)
@@ -0,0 +1,27 @@
package trace
import "context"
type contextKey int
const (
contextKeyTraceNode contextKey = 1 + iota
)
var contextKeys = []contextKey{
contextKeyTraceNode,
}
// WithInherit inherits the task hierarchy from inheritFrom into ctx.
// The returned context is a child of ctx, but its task and span are those of inheritFrom.
//
// Note that in most use cases, callers most likely want to call WithTask since they will most likely
// be in some sort of connection handler context.
func WithInherit(ctx, inheritFrom context.Context) context.Context {
for _, k := range contextKeys {
if v := inheritFrom.Value(k); v != nil {
ctx = context.WithValue(ctx, k, v) // no shadow
}
}
return ctx
}
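
Editor's note: a condensed sketch (not part of the commit) of the pattern WithInherit serves, modeled on the passive-side ctxInterceptor shown earlier in this diff; the package name, function signature, and the "illustrative-method" span name are assumptions for illustration.

package example

import (
	"context"

	"github.com/zrepl/zrepl/daemon/logging/trace"
)

// interceptor mirrors the passive-side ctxInterceptor: handlerCtx is a "clean" per-request
// context from the RPC layer (cancellation only), while jobCtx carries the job's task/span
// hierarchy. WithInherit grafts that hierarchy onto handlerCtx, and a fresh task+span is
// then opened for the handler itself.
func interceptor(jobCtx, handlerCtx context.Context, handler func(ctx context.Context)) {
	handlerCtx = trace.WithInherit(handlerCtx, jobCtx)
	handlerCtx, endTask := trace.WithTaskAndSpan(handlerCtx, "handler", "illustrative-method")
	defer endTask()
	handler(handlerCtx)
}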

daemon/logging/trace/trace_convenience.go (new file, 74 lines)
@@ -0,0 +1,74 @@
package trace
import (
"context"
"fmt"
"runtime"
"strings"
"sync"
)
// use like this:
//
// defer WithSpanFromStackUpdateCtx(&existingCtx)()
//
//
func WithSpanFromStackUpdateCtx(ctx *context.Context) DoneFunc {
childSpanCtx, end := WithSpan(*ctx, getMyCallerOrPanic())
*ctx = childSpanCtx
return end
}
// derive task name from call stack (caller's name)
func WithTaskFromStack(ctx context.Context) (context.Context, DoneFunc) {
return WithTask(ctx, getMyCallerOrPanic())
}
// derive task name from call stack (caller's name) and update *ctx
// to point to be the child task ctx
func WithTaskFromStackUpdateCtx(ctx *context.Context) DoneFunc {
child, end := WithTask(*ctx, getMyCallerOrPanic())
*ctx = child
return end
}
// create a task and a span within it in one call
func WithTaskAndSpan(ctx context.Context, task string, span string) (context.Context, DoneFunc) {
ctx, endTask := WithTask(ctx, task)
ctx, endSpan := WithSpan(ctx, fmt.Sprintf("%s %s", task, span))
return ctx, func() {
endSpan()
endTask()
}
}
// create a span during which several child tasks are spawned using the `add` function
func WithTaskGroup(ctx context.Context, taskGroup string) (_ context.Context, add func(f func(context.Context)), waitEnd DoneFunc) {
var wg sync.WaitGroup
ctx, endSpan := WithSpan(ctx, taskGroup)
add = func(f func(context.Context)) {
wg.Add(1)
defer wg.Done()
ctx, endTask := WithTask(ctx, taskGroup)
defer endTask()
f(ctx)
}
waitEnd = func() {
wg.Wait()
endSpan()
}
return ctx, add, waitEnd
}
func getMyCallerOrPanic() string {
pc, _, _, ok := runtime.Caller(2)
if !ok {
panic("cannot get caller")
}
details := runtime.FuncForPC(pc)
if ok && details != nil {
const prefix = "github.com/zrepl/zrepl"
return strings.TrimPrefix(strings.TrimPrefix(details.Name(), prefix), "/")
}
return ""
}
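
Editor's note: a minimal sketch (not part of the commit) of the one-liner idiom that the remaining hunks of this commit apply at RPC entry points. Passing &ctx lets the helper both derive the span name from the caller and swap the caller's ctx for the span's child context before the function body runs; only the deferred DoneFunc executes at return. The package name and handleRequest function are illustrative assumptions.

package example

import (
	"context"

	"github.com/zrepl/zrepl/daemon/logging/trace"
)

// handleRequest assumes its ctx already carries an active task (see WithTask);
// WithSpan would panic otherwise.
func handleRequest(ctx context.Context) error {
	// WithSpanFromStackUpdateCtx(&ctx) runs immediately at the defer statement:
	// it opens a span named after this function (via the call stack) and replaces
	// ctx with the span's context. The returned DoneFunc is what actually gets
	// deferred, ending the span when handleRequest returns.
	defer trace.WithSpanFromStackUpdateCtx(&ctx)()

	// ... work with ctx; nested calls see this function as their parent span ...
	_ = ctx
	return nil
}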

@@ -1,4 +1,4 @@
package logging
package trace
import (
"testing"

@@ -12,5 +12,5 @@ func TestGetCallerOrPanic(t *testing.T) {
}
ret := withStackFromCtxMock()
// zrepl prefix is stripped
assert.Equal(t, "daemon/logging.TestGetCallerOrPanic", ret)
assert.Equal(t, "daemon/logging/trace.TestGetCallerOrPanic", ret)
}

daemon/logging/trace/trace_debug.go (new file, 15 lines)
@@ -0,0 +1,15 @@
package trace
import (
"fmt"
"os"
)
const debugEnabled = false
func debug(format string, args ...interface{}) {
if !debugEnabled {
return
}
fmt.Fprintf(os.Stderr, format+"\n", args...)
}

daemon/logging/trace/trace_genID.go (new file, 47 lines)
@@ -0,0 +1,47 @@
package trace
import (
"encoding/base64"
"math/rand"
"os"
"strings"
"time"
"github.com/zrepl/zrepl/util/envconst"
)
var genIdPRNG = rand.New(rand.NewSource(1))
func init() {
genIdPRNG.Seed(time.Now().UnixNano())
genIdPRNG.Seed(int64(os.Getpid()))
}
var genIdNumBytes = envconst.Int("ZREPL_TRACE_ID_NUM_BYTES", 3)
func init() {
if genIdNumBytes < 1 {
panic("trace node id byte length must be at least 1")
}
}
func genID() string {
var out strings.Builder
enc := base64.NewEncoder(base64.RawStdEncoding, &out)
buf := make([]byte, genIdNumBytes)
for i := 0; i < len(buf); {
n, err := genIdPRNG.Read(buf[i:])
if err != nil {
panic(err)
}
i += n
}
n, err := enc.Write(buf[:])
if err != nil || n != len(buf) {
panic(err)
}
if err := enc.Close(); err != nil {
panic(err)
}
return out.String()
}

@@ -1,4 +1,4 @@
package logging
package trace
import (
"fmt"

@@ -1,4 +1,4 @@
package logging
package trace
import (
"fmt"

@@ -8,10 +8,10 @@ import (
"net"
"net/http/pprof"
"github.com/zrepl/zrepl/daemon/logging/trace"
"golang.org/x/net/websocket"
"github.com/zrepl/zrepl/daemon/job"
"github.com/zrepl/zrepl/daemon/logging"
)
type pprofServer struct {

@@ -67,7 +67,7 @@ outer:
mux.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile))
mux.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
mux.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
mux.Handle("/debug/zrepl/activity-trace", websocket.Handler(logging.ChrometraceClientWebsocketHandler))
mux.Handle("/debug/zrepl/activity-trace", websocket.Handler(trace.ChrometraceClientWebsocketHandler))
go func() {
err := http.Serve(s.listener, mux)
if ctx.Err() != nil {

@@ -8,6 +8,7 @@ import (
"time"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/daemon/logging/trace"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/filters"

@@ -133,7 +134,7 @@ func PeriodicFromConfig(g *config.Global, fsf *filters.DatasetMapFilter, in *con
}
func (s *Snapper) Run(ctx context.Context, snapshotsTaken chan<- struct{}) {
defer logging.WithSpanFromStackUpdateCtx(&ctx)()
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
getLogger(ctx).Debug("start")
defer getLogger(ctx).Debug("stop")

@@ -10,8 +10,8 @@ import (
"sync"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/daemon/logging/trace"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/replication/logic/pdu"
"github.com/zrepl/zrepl/util/chainedio"
"github.com/zrepl/zrepl/util/chainlock"

@@ -74,7 +74,7 @@ func (s *Sender) filterCheckFS(fs string) (*zfs.DatasetPath, error) {
}
func (s *Sender) ListFilesystems(ctx context.Context, r *pdu.ListFilesystemReq) (*pdu.ListFilesystemRes, error) {
defer logging.WithSpanFromStackUpdateCtx(&ctx)()
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
fss, err := zfs.ZFSListMapping(ctx, s.FSFilter)
if err != nil {

@@ -98,7 +98,7 @@ func (s *Sender) ListFilesystems(ctx context.Context, r *pdu.ListFilesystemReq)
}
func (s *Sender) ListFilesystemVersions(ctx context.Context, r *pdu.ListFilesystemVersionsReq) (*pdu.ListFilesystemVersionsRes, error) {
defer logging.WithSpanFromStackUpdateCtx(&ctx)()
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
lp, err := s.filterCheckFS(r.GetFilesystem())
if err != nil {

@@ -118,7 +118,7 @@ func (s *Sender) ListFilesystemVersions(ctx context.Context, r *pdu.ListFilesyst
}
func (p *Sender) HintMostRecentCommonAncestor(ctx context.Context, r *pdu.HintMostRecentCommonAncestorReq) (*pdu.HintMostRecentCommonAncestorRes, error) {
defer logging.WithSpanFromStackUpdateCtx(&ctx)()
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
fsp, err := p.filterCheckFS(r.GetFilesystem())
if err != nil {

@@ -251,7 +251,7 @@ func sendArgsFromPDUAndValidateExistsAndGetVersion(ctx context.Context, fs strin
}
func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.ReadCloser, error) {
defer logging.WithSpanFromStackUpdateCtx(&ctx)()
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
_, err := s.filterCheckFS(r.Filesystem)
if err != nil {

@@ -357,7 +357,7 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea
}
func (p *Sender) SendCompleted(ctx context.Context, r *pdu.SendCompletedReq) (*pdu.SendCompletedRes, error) {
defer logging.WithSpanFromStackUpdateCtx(&ctx)()
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
orig := r.GetOriginalReq() // may be nil, always use proto getters
fsp, err := p.filterCheckFS(orig.GetFilesystem())

@@ -412,7 +412,7 @@ func (p *Sender) SendCompleted(ctx context.Context, r *pdu.SendCompletedReq) (*p
wg.Add(2)
go func() {
defer wg.Done()
ctx, endTask := logging.WithTask(ctx, "release-step-hold-to")
ctx, endTask := trace.WithTask(ctx, "release-step-hold-to")
defer endTask()
log(ctx).Debug("release step-hold of or step-bookmark on `to`")

@@ -426,7 +426,7 @@ func (p *Sender) SendCompleted(ctx context.Context, r *pdu.SendCompletedReq) (*p
}()
go func() {
defer wg.Done()
ctx, endTask := logging.WithTask(ctx, "release-step-hold-from")
ctx, endTask := trace.WithTask(ctx, "release-step-hold-from")
defer endTask()
if from == nil {

@@ -458,7 +458,7 @@ func (p *Sender) SendCompleted(ctx context.Context, r *pdu.SendCompletedReq) (*p
}
func (p *Sender) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshotsReq) (*pdu.DestroySnapshotsRes, error) {
defer logging.WithSpanFromStackUpdateCtx(&ctx)()
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
dp, err := p.filterCheckFS(req.Filesystem)
if err != nil {

@@ -468,7 +468,7 @@ func (p *Sender) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshots
}
func (p *Sender) Ping(ctx context.Context, req *pdu.PingReq) (*pdu.PingRes, error) {
defer logging.WithSpanFromStackUpdateCtx(&ctx)()
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
res := pdu.PingRes{
Echo: req.GetMessage(),

@@ -477,19 +477,19 @@ func (p *Sender) Ping(ctx context.Context, req *pdu.PingReq) (*pdu.PingRes, erro
}
func (p *Sender) PingDataconn(ctx context.Context, req *pdu.PingReq) (*pdu.PingRes, error) {
defer logging.WithSpanFromStackUpdateCtx(&ctx)()
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
return p.Ping(ctx, req)
}
func (p *Sender) WaitForConnectivity(ctx context.Context) error {
defer logging.WithSpanFromStackUpdateCtx(&ctx)()
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
return nil
}
func (p *Sender) ReplicationCursor(ctx context.Context, req *pdu.ReplicationCursorReq) (*pdu.ReplicationCursorRes, error) {
defer logging.WithSpanFromStackUpdateCtx(&ctx)()
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
dp, err := p.filterCheckFS(req.Filesystem)
if err != nil {

@@ -621,7 +621,7 @@ func (f subroot) MapToLocal(fs string) (*zfs.DatasetPath, error) {
}
func (s *Receiver) ListFilesystems(ctx context.Context, req *pdu.ListFilesystemReq) (*pdu.ListFilesystemRes, error) {
defer logging.WithSpanFromStackUpdateCtx(&ctx)()
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
root := s.clientRootFromCtx(ctx)
filtered, err := zfs.ZFSListMapping(ctx, subroot{root})

@@ -673,7 +673,7 @@ func (s *Receiver) ListFilesystems(ctx context.Context, req *pdu.ListFilesystemR
}
func (s *Receiver) ListFilesystemVersions(ctx context.Context, req *pdu.ListFilesystemVersionsReq) (*pdu.ListFilesystemVersionsRes, error) {
defer logging.WithSpanFromStackUpdateCtx(&ctx)()
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
root := s.clientRootFromCtx(ctx)
lp, err := subroot{root}.MapToLocal(req.GetFilesystem())

@@ -696,7 +696,7 @@ func (s *Receiver) ListFilesystemVersions(ctx context.Context, req *pdu.ListFile
}
func (s *Receiver) Ping(ctx context.Context, req *pdu.PingReq) (*pdu.PingRes, error) {
defer logging.WithSpanFromStackUpdateCtx(&ctx)()
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
res := pdu.PingRes{
Echo: req.GetMessage(),

@@ -705,29 +705,29 @@ func (s *Receiver) Ping(ctx context.Context, req *pdu.PingReq) (*pdu.PingRes, er
}
func (s *Receiver) PingDataconn(ctx context.Context, req *pdu.PingReq) (*pdu.PingRes, error) {
defer logging.WithSpanFromStackUpdateCtx(&ctx)()
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
return s.Ping(ctx, req)
}
func (s *Receiver) WaitForConnectivity(ctx context.Context) error {
defer logging.WithSpanFromStackUpdateCtx(&ctx)()
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
return nil
}
func (s *Receiver) ReplicationCursor(ctx context.Context, _ *pdu.ReplicationCursorReq) (*pdu.ReplicationCursorRes, error) {
defer logging.WithSpanFromStackUpdateCtx(&ctx)()
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
return nil, fmt.Errorf("ReplicationCursor not implemented for Receiver")
}
func (s *Receiver) Send(ctx context.Context, req *pdu.SendReq) (*pdu.SendRes, io.ReadCloser, error) {
defer logging.WithSpanFromStackUpdateCtx(&ctx)()
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
return nil, nil, fmt.Errorf("receiver does not implement Send()")
}
var maxConcurrentZFSRecvSemaphore = semaphore.New(envconst.Int64("ZREPL_ENDPOINT_MAX_CONCURRENT_RECV", 10))
func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive io.ReadCloser) (*pdu.ReceiveRes, error) {
defer logging.WithSpanFromStackUpdateCtx(&ctx)()
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
getLogger(ctx).Debug("incoming Receive")
defer receive.Close()

@@ -957,7 +957,7 @@ func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive io.
}
func (s *Receiver) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshotsReq) (*pdu.DestroySnapshotsRes, error) {
defer logging.WithSpanFromStackUpdateCtx(&ctx)()
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
root := s.clientRootFromCtx(ctx)
lp, err := subroot{root}.MapToLocal(req.Filesystem)

@@ -968,7 +968,7 @@ func (s *Receiver) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapsho
}
func (p *Receiver) HintMostRecentCommonAncestor(ctx context.Context, r *pdu.HintMostRecentCommonAncestorReq) (*pdu.HintMostRecentCommonAncestorRes, error) {
defer logging.WithSpanFromStackUpdateCtx(&ctx)()
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
// we don't move last-received-hold as part of this hint
// because that wouldn't give us any benefit wrt resumability.
//

@@ -978,7 +978,7 @@ func (p *Receiver) HintMostRecentCommonAncestor(ctx context.Context, r *pdu.Hint
}
func (p *Receiver) SendCompleted(ctx context.Context, _ *pdu.SendCompletedReq) (*pdu.SendCompletedRes, error) {
defer logging.WithSpanFromStackUpdateCtx(&ctx)()
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
return &pdu.SendCompletedRes{}, nil
}

@@ -11,6 +11,7 @@ import (
"github.com/fatih/color"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/daemon/logging/trace"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/logging"

@@ -69,7 +70,7 @@ func doMain() error {
panic(err)
}
ctx := context.Background()
defer logging.WithTaskFromStackUpdateCtx(&ctx)()
defer trace.WithTaskFromStackUpdateCtx(&ctx)()
ctx = platformtest.WithLogger(ctx, logger)
ex := platformtest.NewEx(logger)

@@ -10,10 +10,10 @@ import (
"sync/atomic"
"time"
"github.com/zrepl/zrepl/daemon/logging/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/replication/report"
"github.com/zrepl/zrepl/util/chainlock"
"github.com/zrepl/zrepl/util/envconst"

@@ -297,7 +297,7 @@ func (a *attempt) do(ctx context.Context, prev *attempt) {
// if no error occurs, returns a map that maps this attempt's a.fss to `prev`'s a.fss
func (a *attempt) doGlobalPlanning(ctx context.Context, prev *attempt) map[*fs]*fs {
ctx, endSpan := logging.WithSpan(ctx, "plan")
ctx, endSpan := trace.WithSpan(ctx, "plan")
defer endSpan()
pfss, err := a.planner.Plan(ctx)
errTime := time.Now()

@@ -389,7 +389,7 @@ func (a *attempt) doGlobalPlanning(ctx context.Context, prev *attempt) map[*fs]*
}
func (a *attempt) doFilesystems(ctx context.Context, prevs map[*fs]*fs) {
ctx, endSpan := logging.WithSpan(ctx, "do-repl")
ctx, endSpan := trace.WithSpan(ctx, "do-repl")
defer endSpan()
defer a.l.Lock().Unlock()

@@ -402,7 +402,7 @@ func (a *attempt) doFilesystems(ctx context.Context, prevs map[*fs]*fs) {
go func(f *fs) {
defer fssesDone.Done()
// avoid explosion of tasks with name f.report().Info.Name
ctx, endTask := logging.WithTaskAndSpan(ctx, "repl-fs", f.report().Info.Name)
ctx, endTask := trace.WithTaskAndSpan(ctx, "repl-fs", f.report().Info.Name)
defer endTask()
f.do(ctx, stepQueue, prevs[f])
}(f)

@@ -610,7 +610,7 @@ func (f *fs) do(ctx context.Context, pq *stepQueue, prev *fs) {
targetDate := s.step.TargetDate()
defer pq.WaitReady(ctx, f, targetDate)()
// do the step
ctx, endSpan := logging.WithSpan(ctx, fmt.Sprintf("%#v", s.step.ReportInfo()))
ctx, endSpan := trace.WithSpan(ctx, fmt.Sprintf("%#v", s.step.ReportInfo()))
defer endSpan()
err, errTime = s.step.Step(ctx), time.Now() // no shadow
})

@@ -10,8 +10,8 @@ import (
"time"
"github.com/stretchr/testify/require"
"github.com/zrepl/zrepl/daemon/logging/trace"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/replication/report"
"github.com/stretchr/testify/assert"

@@ -150,7 +150,7 @@ func (f *mockStep) ReportInfo() *report.StepInfo {
func TestReplication(t *testing.T) {
ctx := context.Background()
defer logging.WithTaskFromStackUpdateCtx(&ctx)()
defer trace.WithTaskFromStackUpdateCtx(&ctx)()
mp := &mockPlanner{}
getReport, wait := Do(ctx, mp)

@@ -5,7 +5,7 @@ import (
"context"
"time"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/daemon/logging/trace"
"github.com/zrepl/zrepl/util/chainlock"
)

@@ -158,7 +158,7 @@ func (q *stepQueue) sendAndWaitForWakeup(ident interface{}, targetDate time.Time
// Wait for the ident with targetDate to be selected to run.
func (q *stepQueue) WaitReady(ctx context.Context, ident interface{}, targetDate time.Time) StepCompletedFunc {
defer logging.WithSpanFromStackUpdateCtx(&ctx)()
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
if targetDate.IsZero() {
panic("targetDate of zero is reserved for marking Done")
}

@@ -12,21 +12,20 @@ import (
"github.com/montanaflynn/stats"
"github.com/stretchr/testify/assert"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/daemon/logging/trace"
)
// FIXME: this test relies on timing and is thus rather flaky
// (relies on scheduler responsiveness of < 500ms)
func TestPqNotconcurrent(t *testing.T) {
ctx, end := logging.WithTaskFromStack(context.Background())
ctx, end := trace.WithTaskFromStack(context.Background())
defer end()
var ctr uint32
q := newStepQueue()
var wg sync.WaitGroup
wg.Add(4)
go func() {
ctx, end := logging.WithTaskFromStack(ctx)
ctx, end := trace.WithTaskFromStack(ctx)
defer end()
defer wg.Done()
defer q.WaitReady(ctx, "1", time.Unix(9999, 0))()

@@ -41,7 +40,7 @@ func TestPqNotconcurrent(t *testing.T) {
// while "1" is still running, queue in "2", "3" and "4"
go func() {
ctx, end := logging.WithTaskFromStack(ctx)
ctx, end := trace.WithTaskFromStack(ctx)
defer end()
defer wg.Done()
defer q.WaitReady(ctx, "2", time.Unix(2, 0))()

@@ -49,7 +48,7 @@ func TestPqNotconcurrent(t *testing.T) {
assert.Equal(t, uint32(2), ret)
}()
go func() {
ctx, end := logging.WithTaskFromStack(ctx)
ctx, end := trace.WithTaskFromStack(ctx)
defer end()
defer wg.Done()
defer q.WaitReady(ctx, "3", time.Unix(3, 0))()

@@ -57,7 +56,7 @@ func TestPqNotconcurrent(t *testing.T) {
assert.Equal(t, uint32(3), ret)
}()
go func() {
ctx, end := logging.WithTaskFromStack(ctx)
ctx, end := trace.WithTaskFromStack(ctx)
defer end()
defer wg.Done()
defer q.WaitReady(ctx, "4", time.Unix(4, 0))()

@@ -90,7 +89,7 @@ func (r record) String() string {
// Hence, perform some statistics on the wakeup times and assert that the mean wakeup
// times for each step are close together.
func TestPqConcurrent(t *testing.T) {
ctx, end := logging.WithTaskFromStack(context.Background())
ctx, end := trace.WithTaskFromStack(context.Background())
defer end()
q := newStepQueue()

@@ -105,7 +104,7 @@ func TestPqConcurrent(t *testing.T) {
records := make(chan []record, filesystems)
for fs := 0; fs < filesystems; fs++ {
go func(fs int) {
ctx, end := logging.WithTaskFromStack(ctx)
ctx, end := trace.WithTaskFromStack(ctx)
defer end()
defer wg.Done()
recs := make([]record, 0)

@@ -9,8 +9,8 @@ import (
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/zrepl/zrepl/daemon/logging/trace"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/replication/driver"
. "github.com/zrepl/zrepl/replication/logic/diff"

@@ -82,7 +82,7 @@ func (p *Planner) WaitForConnectivity(ctx context.Context) error {
var wg sync.WaitGroup
doPing := func(endpoint Endpoint, errOut *error) {
defer wg.Done()
ctx, endTask := logging.WithTaskFromStack(ctx)
ctx, endTask := trace.WithTaskFromStack(ctx)
defer endTask()
err := endpoint.WaitForConnectivity(ctx)
if err != nil {

@@ -375,7 +375,7 @@ func (fs *Filesystem) doPlanning(ctx context.Context) ([]*Step, error) {
var wg sync.WaitGroup
doHint := func(ep Endpoint, name string) {
defer wg.Done()
ctx, endTask := logging.WithTask(ctx, "hint-mrca-"+name)
ctx, endTask := trace.WithTask(ctx, "hint-mrca-"+name)
defer endTask()
log := log(ctx).WithField("to_side", name).

@@ -537,7 +537,7 @@ func (fs *Filesystem) doPlanning(ctx context.Context) ([]*Step, error) {
log(ctx).Debug("compute send size estimate")
errs := make(chan error, len(steps))
fanOutCtx, fanOutCancel := context.WithCancel(ctx)
_, fanOutAdd, fanOutWait := logging.WithTaskGroup(fanOutCtx, "compute-size-estimate")
_, fanOutAdd, fanOutWait := trace.WithTaskGroup(fanOutCtx, "compute-size-estimate")
defer fanOutCancel()
for _, step := range steps {
step := step // local copy that is moved into the closure

@@ -10,11 +10,11 @@ import (
"sync/atomic"
"time"
"github.com/zrepl/zrepl/daemon/logging/trace"
"google.golang.org/grpc"
"github.com/google/uuid"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/replication/logic"
"github.com/zrepl/zrepl/replication/logic/pdu"
"github.com/zrepl/zrepl/rpc/dataconn"

@@ -84,7 +84,7 @@ func (c *Client) Close() {
// callers must ensure that the returned io.ReadCloser is closed
// TODO expose dataClient interface to the outside world
func (c *Client) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.ReadCloser, error) {
ctx, endSpan := logging.WithSpan(ctx, "rpc.client.Send")
ctx, endSpan := trace.WithSpan(ctx, "rpc.client.Send")
defer endSpan()
// TODO the returned sendStream may return a read error created by the remote side

@@ -101,56 +101,56 @@ func (c *Client) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea
}
func (c *Client) Receive(ctx context.Context, req *pdu.ReceiveReq, stream io.ReadCloser) (*pdu.ReceiveRes, error) {
ctx, endSpan := logging.WithSpan(ctx, "rpc.client.Receive")
ctx, endSpan := trace.WithSpan(ctx, "rpc.client.Receive")
defer endSpan()
return c.dataClient.ReqRecv(ctx, req, stream)
}
func (c *Client) ListFilesystems(ctx context.Context, in *pdu.ListFilesystemReq) (*pdu.ListFilesystemRes, error) {
ctx, endSpan := logging.WithSpan(ctx, "rpc.client.ListFilesystems")
ctx, endSpan := trace.WithSpan(ctx, "rpc.client.ListFilesystems")
defer endSpan()
return c.controlClient.ListFilesystems(ctx, in)
}
func (c *Client) ListFilesystemVersions(ctx context.Context, in *pdu.ListFilesystemVersionsReq) (*pdu.ListFilesystemVersionsRes, error) {
ctx, endSpan := logging.WithSpan(ctx, "rpc.client.ListFilesystemVersions")
ctx, endSpan := trace.WithSpan(ctx, "rpc.client.ListFilesystemVersions")
defer endSpan()
return c.controlClient.ListFilesystemVersions(ctx, in)
}
func (c *Client) DestroySnapshots(ctx context.Context, in *pdu.DestroySnapshotsReq) (*pdu.DestroySnapshotsRes, error) {
ctx, endSpan := logging.WithSpan(ctx, "rpc.client.DestroySnapshots")
ctx, endSpan := trace.WithSpan(ctx, "rpc.client.DestroySnapshots")
defer endSpan()
return c.controlClient.DestroySnapshots(ctx, in)
}
func (c *Client) ReplicationCursor(ctx context.Context, in *pdu.ReplicationCursorReq) (*pdu.ReplicationCursorRes, error) {
ctx, endSpan := logging.WithSpan(ctx, "rpc.client.ReplicationCursor")
ctx, endSpan := trace.WithSpan(ctx, "rpc.client.ReplicationCursor")
defer endSpan()
return c.controlClient.ReplicationCursor(ctx, in)
}
func (c *Client) SendCompleted(ctx context.Context, in *pdu.SendCompletedReq) (*pdu.SendCompletedRes, error) {
ctx, endSpan := logging.WithSpan(ctx, "rpc.client.SendCompleted")
ctx, endSpan := trace.WithSpan(ctx, "rpc.client.SendCompleted")
defer endSpan()
return c.controlClient.SendCompleted(ctx, in)
}
func (c *Client) HintMostRecentCommonAncestor(ctx context.Context, in *pdu.HintMostRecentCommonAncestorReq) (*pdu.HintMostRecentCommonAncestorRes, error) {
ctx, endSpan := logging.WithSpan(ctx, "rpc.client.HintMostRecentCommonAncestor")
ctx, endSpan := trace.WithSpan(ctx, "rpc.client.HintMostRecentCommonAncestor")
defer endSpan()
return c.controlClient.HintMostRecentCommonAncestor(ctx, in)
}
func (c *Client) WaitForConnectivity(ctx context.Context) error {
ctx, endSpan := logging.WithSpan(ctx, "rpc.client.WaitForConnectivity")
ctx, endSpan := trace.WithSpan(ctx, "rpc.client.WaitForConnectivity")
defer endSpan()
ctx, cancel := context.WithCancel(ctx)

@@ -3,9 +3,8 @@ package semaphore
import (
"context"
"github.com/zrepl/zrepl/daemon/logging/trace"
wsemaphore "golang.org/x/sync/semaphore"
"github.com/zrepl/zrepl/daemon/logging"
)
type S struct {

@@ -23,7 +22,7 @@ type AcquireGuard struct {
// The returned AcquireGuard is not goroutine-safe.
func (s *S) Acquire(ctx context.Context) (*AcquireGuard, error) {
defer logging.WithSpanFromStackUpdateCtx(&ctx)()
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
if err := s.ws.Acquire(ctx, 1); err != nil {
return nil, err
} else if err := ctx.Err(); err != nil {

@@ -9,8 +9,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/daemon/logging/trace"
)
func TestSemaphore(t *testing.T) {

@@ -27,13 +26,13 @@ func TestSemaphore(t *testing.T) {
}
ctx := context.Background()
defer logging.WithTaskFromStackUpdateCtx(&ctx)()
defer trace.WithTaskFromStackUpdateCtx(&ctx)()
var wg sync.WaitGroup
wg.Add(numGoroutines)
for i := 0; i < numGoroutines; i++ {
go func() {
ctx, end := logging.WithTaskFromStack(ctx)
ctx, end := trace.WithTaskFromStack(ctx)
defer end()
defer wg.Done()
res, err := sem.Acquire(ctx)

@@ -14,7 +14,7 @@ import (
"sync"
"time"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/daemon/logging/trace"
"github.com/zrepl/zrepl/util/circlog"
)

@@ -23,7 +23,7 @@ type Cmd struct {
ctx context.Context
mtx sync.RWMutex
startedAt, waitStartedAt, waitReturnedAt time.Time
waitReturnEndSpanCb logging.DoneFunc
waitReturnEndSpanCb trace.DoneFunc
}
func CommandContext(ctx context.Context, name string, arg ...string) *Cmd {

@@ -104,9 +104,9 @@ func (c *Cmd) Wait() (err error) {
func (c *Cmd) startPre(newTask bool) {
if newTask {
// avoid explosion of tasks with name c.String()
c.ctx, c.waitReturnEndSpanCb = logging.WithTaskAndSpan(c.ctx, "zfscmd", c.String())
c.ctx, c.waitReturnEndSpanCb = trace.WithTaskAndSpan(c.ctx, "zfscmd", c.String())
} else {
c.ctx, c.waitReturnEndSpanCb = logging.WithSpan(c.ctx, c.String())
c.ctx, c.waitReturnEndSpanCb = trace.WithSpan(c.ctx, c.String())
}
startPreLogging(c, time.Now())
}