From 0d96627ffba8350ebac14df3ac126c66eb1a9ed8 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Sun, 22 Nov 2020 15:14:26 +0100 Subject: [PATCH] [#388] trace: make WithTaskGroup actually concurrent --- daemon/logging/trace/trace_convenience.go | 12 +++-- .../logging/trace/trace_convenience_test.go | 46 +++++++++++++++++++ 2 files changed, 54 insertions(+), 4 deletions(-) diff --git a/daemon/logging/trace/trace_convenience.go b/daemon/logging/trace/trace_convenience.go index f14ce53..c6e1888 100644 --- a/daemon/logging/trace/trace_convenience.go +++ b/daemon/logging/trace/trace_convenience.go @@ -43,15 +43,19 @@ func WithTaskAndSpan(ctx context.Context, task string, span string) (context.Con } // create a span during which several child tasks are spawned using the `add` function +// +// IMPORTANT FOR USERS: Caller must ensure that the capturing behavior is correct, the Go linter doesn't catch this. func WithTaskGroup(ctx context.Context, taskGroup string) (_ context.Context, add func(f func(context.Context)), waitEnd DoneFunc) { var wg sync.WaitGroup ctx, endSpan := WithSpan(ctx, taskGroup) add = func(f func(context.Context)) { wg.Add(1) - defer wg.Done() - ctx, endTask := WithTask(ctx, taskGroup) - defer endTask() - f(ctx) + go func() { + defer wg.Done() + ctx, endTask := WithTask(ctx, taskGroup) + defer endTask() + f(ctx) + }() } waitEnd = func() { wg.Wait() diff --git a/daemon/logging/trace/trace_convenience_test.go b/daemon/logging/trace/trace_convenience_test.go index 3f27bfd..ce2a07b 100644 --- a/daemon/logging/trace/trace_convenience_test.go +++ b/daemon/logging/trace/trace_convenience_test.go @@ -1,7 +1,10 @@ package trace import ( + "context" + "sync/atomic" "testing" + "time" "github.com/stretchr/testify/assert" ) @@ -14,3 +17,46 @@ func TestGetCallerOrPanic(t *testing.T) { // zrepl prefix is stripped assert.Equal(t, "daemon/logging/trace.TestGetCallerOrPanic", ret) } + +func TestWithTaskGroupRunTasksConcurrently(t *testing.T) { + + // spawn a task group where each task waits for the other to start + // => without concurrency, they would hang + + rootCtx, endRoot := WithTaskFromStack(context.Background()) + defer endRoot() + + _, add, waitEnd := WithTaskGroup(rootCtx, "test-task-group") + + schedulerTimeout := 2 * time.Second + timeout := time.After(schedulerTimeout) + var hadTimeout uint32 + started0, started1 := make(chan struct{}), make(chan struct{}) + for i := 0; i < 2; i++ { + i := i // capture by copy + add(func(ctx context.Context) { + switch i { + case 0: + close(started0) + select { + case <-started1: + case <-timeout: + atomic.AddUint32(&hadTimeout, 1) + } + case 1: + close(started1) + select { + case <-started0: + case <-timeout: + atomic.AddUint32(&hadTimeout, 1) + } + default: + panic("unreachable") + } + }) + } + + waitEnd() + assert.Zero(t, hadTimeout, "either bad impl or scheduler timeout (which is %v)", schedulerTimeout) + +}