diff --git a/daemon/logging/trace/trace.go b/daemon/logging/trace/trace.go index 6db9281..1e1e5fe 100644 --- a/daemon/logging/trace/trace.go +++ b/daemon/logging/trace/trace.go @@ -94,10 +94,11 @@ import ( "context" "fmt" "strings" - "sync/atomic" "time" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/zrepl/zrepl/util/chainlock" ) var metrics struct { @@ -128,16 +129,18 @@ func RegisterMetrics(r prometheus.Registerer) { var taskNamer = newUniqueTaskNamer(metrics.uniqueConcurrentTaskNameBitvecLength) type traceNode struct { - id string - annotation string - parentTask *traceNode - activeChildTasks int32 // only for task nodes, insignificant for span nodes - parentSpan *traceNode - hasActiveChildSpan int32 + id string + annotation string + parentTask *traceNode + + mtx chainlock.L + + activeChildTasks int32 // only for task nodes, insignificant for span nodes + parentSpan *traceNode + activeChildSpan *traceNode // nil if task or span doesn't have an active child span startedAt time.Time endedAt time.Time - ended int32 } // Returned from WithTask or WithSpan. @@ -146,6 +149,9 @@ type traceNode struct { // Wrong call order / forgetting to call it will result in panics. type DoneFunc func() +var ErrTaskStillHasActiveChildTasks = fmt.Errorf("end task: task still has active child tasks") +var ErrParentTaskAlreadyEnded = fmt.Errorf("create task: parent task already ended") + // Start a new root task or create a child task of an existing task. // // This is required when starting a new goroutine and @@ -163,7 +169,7 @@ func WithTask(ctx context.Context, taskName string) (context.Context, DoneFunc) if nodeI != nil { node := nodeI.(*traceNode) if node.parentSpan != nil { - parentTask = node.parentSpan // FIXME review this + parentTask = node.parentTask } else { parentTask = node } @@ -172,20 +178,24 @@ func WithTask(ctx context.Context, taskName string) (context.Context, DoneFunc) taskName, taskNameDone := taskNamer.UniqueConcurrentTaskName(taskName) this := &traceNode{ - id: genID(), - annotation: taskName, - parentTask: parentTask, - activeChildTasks: 0, - hasActiveChildSpan: 0, - parentSpan: nil, + id: genID(), + annotation: taskName, + parentTask: parentTask, + activeChildTasks: 0, + parentSpan: nil, + activeChildSpan: nil, startedAt: time.Now(), - ended: 0, endedAt: time.Time{}, } if this.parentTask != nil { - atomic.AddInt32(&this.parentTask.activeChildTasks, 1) + this.parentTask.mtx.HoldWhile(func() { + if !this.parentTask.endedAt.IsZero() { + panic(ErrParentTaskAlreadyEnded) + } + this.parentTask.activeChildTasks++ + }) } ctx = context.WithValue(ctx, contextKeyTraceNode, this) @@ -195,19 +205,35 @@ func WithTask(ctx context.Context, taskName string) (context.Context, DoneFunc) metrics.activeTasks.Inc() endTaskFunc := func() { - if nc := atomic.LoadInt32(&this.activeChildTasks); nc != 0 { - panic(fmt.Sprintf("this task must have 0 active child tasks, got %v", nc)) - } - if !atomic.CompareAndSwapInt32(&this.ended, 0, 1) { - return - } - this.endedAt = time.Now() - - if this.parentTask != nil { - if atomic.AddInt32(&this.parentTask.activeChildTasks, -1) < 0 { - panic("parent task with negative activeChildTasks count") + // only hold locks while manipulating the tree + // (trace writer might block too long and unlike spans, tasks are updated concurrently) + alreadyEnded := func() (alreadyEnded bool) { + if this.parentTask != nil { + defer this.parentTask.mtx.Lock().Unlock() } + defer this.mtx.Lock().Unlock() + + if this.activeChildTasks != 0 { + panic(errors.Wrapf(ErrTaskStillHasActiveChildTasks, "end task: %v active child tasks", this.activeChildSpan)) + } + + // support idempotent task ends + if !this.endedAt.IsZero() { + return true + } + this.endedAt = time.Now() + + if this.parentTask != nil { + this.parentTask.activeChildTasks-- + if this.parentTask.activeChildTasks < 0 { + panic("impl error: parent task with negative activeChildTasks count") + } + } + return false + }() + if alreadyEnded { + return } chrometraceEndTask(this) @@ -220,6 +246,9 @@ func WithTask(ctx context.Context, taskName string) (context.Context, DoneFunc) return ctx, endTaskFunc } +var ErrAlreadyActiveChildSpan = fmt.Errorf("create child span: span already has an active child span") +var ErrSpanStillHasActiveChildSpan = fmt.Errorf("end span: span still has active child spans") + // Start a new span. // Important: ctx must have an active task (see WithTask) func WithSpan(ctx context.Context, annotation string) (context.Context, DoneFunc) { @@ -237,28 +266,43 @@ func WithSpan(ctx context.Context, annotation string) (context.Context, DoneFunc } this := &traceNode{ - id: genID(), - annotation: annotation, - parentTask: parentTask, - parentSpan: parentSpan, - hasActiveChildSpan: 0, + id: genID(), + annotation: annotation, + parentTask: parentTask, + parentSpan: parentSpan, + activeChildSpan: nil, startedAt: time.Now(), - ended: 0, endedAt: time.Time{}, } - if !atomic.CompareAndSwapInt32(&parentSpan.hasActiveChildSpan, 0, 1) { - panic("already has active child span") - } + parentSpan.mtx.HoldWhile(func() { + if parentSpan.activeChildSpan != nil { + panic(ErrAlreadyActiveChildSpan) + } + parentSpan.activeChildSpan = this + }) ctx = context.WithValue(ctx, contextKeyTraceNode, this) chrometraceBeginSpan(this) endTaskFunc := func() { - if !atomic.CompareAndSwapInt32(&parentSpan.hasActiveChildSpan, 1, 0) { - panic("impl error: hasActiveChildSpan should not change to 0 while we hold it") + + defer parentSpan.mtx.Lock().Unlock() + if parentSpan.activeChildSpan != this && this.endedAt.IsZero() { + panic("impl error: activeChildSpan should not change while != nil because there can only be one") } + + defer this.mtx.Lock().Unlock() + if this.activeChildSpan != nil { + panic(ErrSpanStillHasActiveChildSpan) + } + + if !this.endedAt.IsZero() { + return // support idempotent span ends + } + + parentSpan.activeChildSpan = nil this.endedAt = time.Now() chrometraceEndSpan(this) diff --git a/daemon/logging/trace/trace_test.go b/daemon/logging/trace/trace_test.go new file mode 100644 index 0000000..e826db3 --- /dev/null +++ b/daemon/logging/trace/trace_test.go @@ -0,0 +1,172 @@ +package trace + +import ( + "context" + "fmt" + "testing" + + "github.com/gitchander/permutation" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestRegularSpanUsage(t *testing.T) { + root, endRoot := WithTask(context.Background(), "root") + defer endRoot() + + s1, endS1 := WithSpan(root, "parent") + s2, endS2 := WithSpan(s1, "child") + _, endS3 := WithSpan(s2, "grand-child") + require.NotPanics(t, func() { endS3() }) + require.NotPanics(t, func() { endS2() }) + + // reuse + _, endS4 := WithSpan(s1, "child-2") + require.NotPanics(t, func() { endS4() }) + + // close parent + require.NotPanics(t, func() { endS1() }) +} + +func TestMultipleActiveChildSpansNotAllowed(t *testing.T) { + root, endRoot := WithTask(context.Background(), "root") + defer endRoot() + + s1, _ := WithSpan(root, "s1") + _, endS2 := WithSpan(s1, "s1-child1") + + require.PanicsWithValue(t, ErrAlreadyActiveChildSpan, func() { + _, _ = WithSpan(s1, "s1-child2") + }) + + endS2() + + require.NotPanics(t, func() { + _, _ = WithSpan(s1, "s1-child2") + }) +} + +func TestForkingChildSpansNotAllowed(t *testing.T) { + root, endRoot := WithTask(context.Background(), "root") + defer endRoot() + + s1, _ := WithSpan(root, "s1") + sc, endSC := WithSpan(s1, "s1-child") + _, _ = WithSpan(sc, "s1-child-child") + + require.PanicsWithValue(t, ErrSpanStillHasActiveChildSpan, func() { + endSC() + }) +} + +func TestRegularTaskUsage(t *testing.T) { + // assert concurrent activities on different tasks can end in any order + closeOrder := []int{0, 1, 2} + closeOrders := permutation.New(permutation.IntSlice(closeOrder)) + for closeOrders.Next() { + t.Run(fmt.Sprintf("%v", closeOrder), func(t *testing.T) { + root, endRoot := WithTask(context.Background(), "root") + defer endRoot() + + c1, endC1 := WithTask(root, "c1") + defer endC1() + c2, endC2 := WithTask(root, "c2") + defer endC2() + + // begin 3 concurrent activities + _, endAR := WithSpan(root, "aR") + _, endAC1 := WithSpan(c1, "aC1") + _, endAC2 := WithSpan(c2, "aC2") + + endFuncs := []DoneFunc{endAR, endAC1, endAC2} + for _, i := range closeOrder { + require.NotPanics(t, func() { + endFuncs[i]() + }, "%v", i) + } + }) + } +} + +func TestTaskEndWithActiveChildTaskNotAllowed(t *testing.T) { + root, _ := WithTask(context.Background(), "root") + c, endC := WithTask(root, "child") + _, _ = WithTask(c, "grand-child") + func() { + defer func() { + r := recover() + require.NotNil(t, r) + err, ok := r.(error) + require.True(t, ok) + require.Equal(t, ErrTaskStillHasActiveChildTasks, errors.Cause(err)) + }() + endC() + }() + +} + +func TestIdempotentEndTask(t *testing.T) { + _, end := WithTask(context.Background(), "root") + end() + require.NotPanics(t, func() { end() }) +} + +func TestCannotReuseEndedTask(t *testing.T) { + root, end := WithTask(context.Background(), "root") + end() + require.PanicsWithValue(t, ErrParentTaskAlreadyEnded, func() { WithTask(root, "child-after-parent-ended") }) +} + +func TestSpansPanicIfNoParentTask(t *testing.T) { + require.Panics(t, func() { WithSpan(context.Background(), "taskless-span") }) +} + +func TestIdempotentEndSpan(t *testing.T) { + root, _ := WithTask(context.Background(), "root") + _, end := WithSpan(root, "span") + end() + require.NotPanics(t, func() { end() }) +} + +func logAndGetTraceNode(t *testing.T, descr string, ctx context.Context) *traceNode { + n, ok := ctx.Value(contextKeyTraceNode).(*traceNode) + require.True(t, ok) + t.Logf("% 20s %p %#v", descr, n, n) + return n +} + +func TestWhiteboxHierachy(t *testing.T) { + root, e1 := WithTask(context.Background(), "root") + rootN := logAndGetTraceNode(t, "root", root) + assert.Nil(t, rootN.parentTask) + assert.Nil(t, rootN.parentSpan) + + child, e2 := WithSpan(root, "child") + childN := logAndGetTraceNode(t, "child", child) + assert.Equal(t, rootN, childN.parentTask) + assert.Equal(t, rootN, childN.parentSpan) + + grandchild, e3 := WithSpan(child, "grandchild") + grandchildN := logAndGetTraceNode(t, "grandchild", grandchild) + assert.Equal(t, rootN, grandchildN.parentTask) + assert.Equal(t, childN, grandchildN.parentSpan) + + gcTask, e4 := WithTask(grandchild, "grandchild-task") + gcTaskN := logAndGetTraceNode(t, "grandchild-task", gcTask) + assert.Equal(t, rootN, gcTaskN.parentTask) + assert.Nil(t, gcTaskN.parentSpan) + + // it is allowed that a child task outlives the _span_ in which it was created + // (albeit not its parent task) + e3() + e2() + gcTaskSpan, e5 := WithSpan(gcTask, "granschild-task-span") + gcTaskSpanN := logAndGetTraceNode(t, "granschild-task-span", gcTaskSpan) + assert.Equal(t, gcTaskN, gcTaskSpanN.parentTask) + assert.Equal(t, gcTaskN, gcTaskSpanN.parentSpan) + e5() + + e4() + e1() +} diff --git a/go.mod b/go.mod index 7184e0a..e6b4929 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.12 require ( github.com/fatih/color v1.7.0 github.com/gdamore/tcell v1.2.0 + github.com/gitchander/permutation v0.0.0-20181107151852-9e56b92e9909 github.com/go-logfmt/logfmt v0.4.0 github.com/go-sql-driver/mysql v1.4.1-0.20190907122137-b2c03bcae3d4 github.com/golang/protobuf v1.3.2 diff --git a/go.sum b/go.sum index 0466b3e..8cbdec3 100644 --- a/go.sum +++ b/go.sum @@ -40,6 +40,8 @@ github.com/gdamore/encoding v1.0.0 h1:+7OoQ1Bc6eTm5niUzBa0Ctsh6JbMW6Ra+YNuAtDBdk github.com/gdamore/encoding v1.0.0/go.mod h1:alR0ol34c49FCSBLjhosxzcPHQbf2trDkoo5dl+VrEg= github.com/gdamore/tcell v1.2.0 h1:ikixzsxc8K8o3V2/CEmyoEW8mJZaNYQQ3NP3VIQdUe4= github.com/gdamore/tcell v1.2.0/go.mod h1:Hjvr+Ofd+gLglo7RYKxxnzCBmev3BzsS67MebKS4zMM= +github.com/gitchander/permutation v0.0.0-20181107151852-9e56b92e9909 h1:9NC8seTx6/zRmMTAdsHj/uOMi0EGHGQtjyLafBjk77Q= +github.com/gitchander/permutation v0.0.0-20181107151852-9e56b92e9909/go.mod h1:lP+DW8LR6Rw3ru9Vo2/y/3iiLaLWmofYql/va+7zJOk= github.com/go-critic/go-critic v0.3.4/go.mod h1:AHR42Lk/E/aOznsrYdMYeIQS5RH10HZHSqP+rD6AJrc= github.com/go-critic/go-critic v0.3.5-0.20190526074819-1df300866540/go.mod h1:+sE8vrLDS2M0pZkBk0wy6+nLdKexVDrl/jBqQOTDThA= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=