mirror of
https://github.com/zrepl/zrepl.git
synced 2025-02-16 10:29:54 +01:00
[#388] trace: make WithTaskGroup actually concurrent
This commit is contained in:
parent
61acc7494a
commit
0d96627ffb
@ -43,15 +43,19 @@ func WithTaskAndSpan(ctx context.Context, task string, span string) (context.Con
|
|||||||
}
|
}
|
||||||
|
|
||||||
// create a span during which several child tasks are spawned using the `add` function
|
// create a span during which several child tasks are spawned using the `add` function
|
||||||
|
//
|
||||||
|
// IMPORTANT FOR USERS: Caller must ensure that the capturing behavior is correct, the Go linter doesn't catch this.
|
||||||
func WithTaskGroup(ctx context.Context, taskGroup string) (_ context.Context, add func(f func(context.Context)), waitEnd DoneFunc) {
|
func WithTaskGroup(ctx context.Context, taskGroup string) (_ context.Context, add func(f func(context.Context)), waitEnd DoneFunc) {
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
ctx, endSpan := WithSpan(ctx, taskGroup)
|
ctx, endSpan := WithSpan(ctx, taskGroup)
|
||||||
add = func(f func(context.Context)) {
|
add = func(f func(context.Context)) {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
defer wg.Done()
|
go func() {
|
||||||
ctx, endTask := WithTask(ctx, taskGroup)
|
defer wg.Done()
|
||||||
defer endTask()
|
ctx, endTask := WithTask(ctx, taskGroup)
|
||||||
f(ctx)
|
defer endTask()
|
||||||
|
f(ctx)
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
waitEnd = func() {
|
waitEnd = func() {
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
@ -1,7 +1,10 @@
|
|||||||
package trace
|
package trace
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
)
|
)
|
||||||
@ -14,3 +17,46 @@ func TestGetCallerOrPanic(t *testing.T) {
|
|||||||
// zrepl prefix is stripped
|
// zrepl prefix is stripped
|
||||||
assert.Equal(t, "daemon/logging/trace.TestGetCallerOrPanic", ret)
|
assert.Equal(t, "daemon/logging/trace.TestGetCallerOrPanic", ret)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWithTaskGroupRunTasksConcurrently(t *testing.T) {
|
||||||
|
|
||||||
|
// spawn a task group where each task waits for the other to start
|
||||||
|
// => without concurrency, they would hang
|
||||||
|
|
||||||
|
rootCtx, endRoot := WithTaskFromStack(context.Background())
|
||||||
|
defer endRoot()
|
||||||
|
|
||||||
|
_, add, waitEnd := WithTaskGroup(rootCtx, "test-task-group")
|
||||||
|
|
||||||
|
schedulerTimeout := 2 * time.Second
|
||||||
|
timeout := time.After(schedulerTimeout)
|
||||||
|
var hadTimeout uint32
|
||||||
|
started0, started1 := make(chan struct{}), make(chan struct{})
|
||||||
|
for i := 0; i < 2; i++ {
|
||||||
|
i := i // capture by copy
|
||||||
|
add(func(ctx context.Context) {
|
||||||
|
switch i {
|
||||||
|
case 0:
|
||||||
|
close(started0)
|
||||||
|
select {
|
||||||
|
case <-started1:
|
||||||
|
case <-timeout:
|
||||||
|
atomic.AddUint32(&hadTimeout, 1)
|
||||||
|
}
|
||||||
|
case 1:
|
||||||
|
close(started1)
|
||||||
|
select {
|
||||||
|
case <-started0:
|
||||||
|
case <-timeout:
|
||||||
|
atomic.AddUint32(&hadTimeout, 1)
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
panic("unreachable")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
waitEnd()
|
||||||
|
assert.Zero(t, hadTimeout, "either bad impl or scheduler timeout (which is %v)", schedulerTimeout)
|
||||||
|
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user