mirror of
https://github.com/zrepl/zrepl.git
synced 2025-01-22 22:28:47 +01:00
c600cc1f60
We had too many spurious test failures in the past. But on a developer machine, the tests don't usually fail because the system isn't loaded as much. So, only disable test on CircleCI.
198 lines
5.9 KiB
Go
198 lines
5.9 KiB
Go
package driver
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"math"
|
|
"sort"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/montanaflynn/stats"
|
|
"github.com/stretchr/testify/assert"
|
|
|
|
"github.com/zrepl/zrepl/daemon/logging/trace"
|
|
"github.com/zrepl/zrepl/util/zreplcircleci"
|
|
)
|
|
|
|
func TestPqNotconcurrent(t *testing.T) {
|
|
zreplcircleci.SkipOnCircleCI(t, "because it relies on scheduler responsiveness < 500ms")
|
|
|
|
ctx, end := trace.WithTaskFromStack(context.Background())
|
|
defer end()
|
|
var ctr uint32
|
|
q := newStepQueue()
|
|
var wg sync.WaitGroup
|
|
wg.Add(4)
|
|
go func() {
|
|
ctx, end := trace.WithTaskFromStack(ctx)
|
|
defer end()
|
|
defer wg.Done()
|
|
defer q.WaitReady(ctx, "1", time.Unix(9999, 0))()
|
|
ret := atomic.AddUint32(&ctr, 1)
|
|
assert.Equal(t, uint32(1), ret)
|
|
time.Sleep(1 * time.Second)
|
|
}()
|
|
|
|
// give goroutine "1" 500ms to enter queue, get the active slot and enter time.Sleep
|
|
defer q.Start(1)()
|
|
time.Sleep(500 * time.Millisecond)
|
|
|
|
// while "1" is still running, queue in "2", "3" and "4"
|
|
go func() {
|
|
ctx, end := trace.WithTaskFromStack(ctx)
|
|
defer end()
|
|
defer wg.Done()
|
|
defer q.WaitReady(ctx, "2", time.Unix(2, 0))()
|
|
ret := atomic.AddUint32(&ctr, 1)
|
|
assert.Equal(t, uint32(2), ret)
|
|
}()
|
|
go func() {
|
|
ctx, end := trace.WithTaskFromStack(ctx)
|
|
defer end()
|
|
defer wg.Done()
|
|
defer q.WaitReady(ctx, "3", time.Unix(3, 0))()
|
|
ret := atomic.AddUint32(&ctr, 1)
|
|
assert.Equal(t, uint32(3), ret)
|
|
}()
|
|
go func() {
|
|
ctx, end := trace.WithTaskFromStack(ctx)
|
|
defer end()
|
|
defer wg.Done()
|
|
defer q.WaitReady(ctx, "4", time.Unix(4, 0))()
|
|
ret := atomic.AddUint32(&ctr, 1)
|
|
assert.Equal(t, uint32(4), ret)
|
|
}()
|
|
|
|
wg.Wait()
|
|
}
|
|
|
|
// record is one measurement taken by TestPqConcurrent: which filesystem
// goroutine performed which step, and when it was woken up.
type record struct {
	fs        int           // index of the filesystem goroutine
	step      int           // index of the step within that filesystem
	globalCtr uint32        // value drawn from the shared counter for this step
	wakeAt    time.Duration // relative to begin
}

// String formats the record on a single line: the integer fields are
// zero-padded to a width of eight, the wake-up time is printed in seconds
// with eight decimal places.
func (r record) String() string {
	const layout = "fs %08d step %08d globalCtr %08d wakeAt %2.8f"
	wakeSeconds := r.wakeAt.Seconds()
	return fmt.Sprintf(layout, r.fs, r.step, r.globalCtr, wakeSeconds)
}
|
|
|
|
// This tests uses stepPq concurrently, simulating the following scenario:
|
|
// Given a number of filesystems F, each filesystem has N steps to take.
|
|
// The number of concurrent steps is limited to C.
|
|
// The target date for each step is the step number N.
|
|
// Hence, there are always F filesystems runnable (calling WaitReady)
|
|
// The priority queue prioritizes steps with lower target data (= lower step number).
|
|
// Hence, all steps with lower numbers should be woken up before steps with higher numbers.
|
|
// However, scheduling is not 100% deterministic (runtime, OS scheduler, etc).
|
|
// Hence, perform some statistics on the wakeup times and assert that the mean wakeup
|
|
// times for each step are close together.
|
|
func TestPqConcurrent(t *testing.T) {
|
|
zreplcircleci.SkipOnCircleCI(t, "because it relies on scheduler responsiveness < 500ms")
|
|
|
|
ctx, end := trace.WithTaskFromStack(context.Background())
|
|
defer end()
|
|
|
|
q := newStepQueue()
|
|
var wg sync.WaitGroup
|
|
filesystems := 100
|
|
stepsPerFS := 20
|
|
sleepTimePerStep := 50 * time.Millisecond
|
|
wg.Add(filesystems)
|
|
var globalCtr uint32
|
|
|
|
begin := time.Now()
|
|
records := make(chan []record, filesystems)
|
|
for fs := 0; fs < filesystems; fs++ {
|
|
go func(fs int) {
|
|
ctx, end := trace.WithTaskFromStack(ctx)
|
|
defer end()
|
|
defer wg.Done()
|
|
recs := make([]record, 0)
|
|
for step := 0; step < stepsPerFS; step++ {
|
|
pos := atomic.AddUint32(&globalCtr, 1)
|
|
t := time.Unix(int64(step), 0)
|
|
done := q.WaitReady(ctx, fs, t)
|
|
wakeAt := time.Since(begin)
|
|
time.Sleep(sleepTimePerStep)
|
|
done()
|
|
recs = append(recs, record{fs, step, pos, wakeAt})
|
|
}
|
|
records <- recs
|
|
}(fs)
|
|
}
|
|
concurrency := 5
|
|
defer q.Start(concurrency)()
|
|
wg.Wait()
|
|
close(records)
|
|
t.Logf("loop done")
|
|
|
|
flattenedRecs := make([]record, 0)
|
|
for recs := range records {
|
|
flattenedRecs = append(flattenedRecs, recs...)
|
|
}
|
|
|
|
sort.Slice(flattenedRecs, func(i, j int) bool {
|
|
return flattenedRecs[i].globalCtr < flattenedRecs[j].globalCtr
|
|
})
|
|
|
|
wakeTimesByStep := map[int][]float64{}
|
|
for _, rec := range flattenedRecs {
|
|
wakeTimes, ok := wakeTimesByStep[rec.step]
|
|
if !ok {
|
|
wakeTimes = []float64{}
|
|
}
|
|
wakeTimes = append(wakeTimes, rec.wakeAt.Seconds())
|
|
wakeTimesByStep[rec.step] = wakeTimes
|
|
}
|
|
|
|
meansByStepId := make([]float64, stepsPerFS)
|
|
interQuartileRangesByStepIdx := make([]float64, stepsPerFS)
|
|
for step := 0; step < stepsPerFS; step++ {
|
|
t.Logf("step %d", step)
|
|
mean, _ := stats.Mean(wakeTimesByStep[step])
|
|
meansByStepId[step] = mean
|
|
t.Logf("\tmean: %v", mean)
|
|
median, _ := stats.Median(wakeTimesByStep[step])
|
|
t.Logf("\tmedian: %v", median)
|
|
midhinge, _ := stats.Midhinge(wakeTimesByStep[step])
|
|
t.Logf("\tmidhinge: %v", midhinge)
|
|
min, _ := stats.Min(wakeTimesByStep[step])
|
|
t.Logf("\tmin: %v", min)
|
|
max, _ := stats.Max(wakeTimesByStep[step])
|
|
t.Logf("\tmax: %v", max)
|
|
quartiles, _ := stats.Quartile(wakeTimesByStep[step])
|
|
t.Logf("\t%#v", quartiles)
|
|
interQuartileRange, _ := stats.InterQuartileRange(wakeTimesByStep[step])
|
|
t.Logf("\tinter-quartile range: %v", interQuartileRange)
|
|
interQuartileRangesByStepIdx[step] = interQuartileRange
|
|
}
|
|
|
|
iqrMean, _ := stats.Mean(interQuartileRangesByStepIdx)
|
|
t.Logf("inter-quartile-range mean: %v", iqrMean)
|
|
iqrDev, _ := stats.StandardDeviation(interQuartileRangesByStepIdx)
|
|
t.Logf("inter-quartile-range deviation: %v", iqrDev)
|
|
|
|
// each step should have the same "distribution" (=~ "spread")
|
|
assert.True(t, iqrDev < 0.01)
|
|
|
|
minTimeForAllStepsWithIdxI := sleepTimePerStep.Seconds() * float64(filesystems) / float64(concurrency)
|
|
t.Logf("minTimeForAllStepsWithIdxI = %11.8f", minTimeForAllStepsWithIdxI)
|
|
for i, mean := range meansByStepId {
|
|
// we can't just do (i + 0.5) * minTimeforAllStepsWithIdxI
|
|
// because this doesn't account for drift
|
|
idealMean := 0.5 * minTimeForAllStepsWithIdxI
|
|
if i > 0 {
|
|
previousMean := meansByStepId[i-1]
|
|
idealMean = previousMean + minTimeForAllStepsWithIdxI
|
|
}
|
|
deltaFromIdeal := idealMean - mean
|
|
t.Logf("step %02d delta from ideal mean wake time: %11.8f - %11.8f = %11.8f", i, idealMean, mean, deltaFromIdeal)
|
|
assert.True(t, math.Abs(deltaFromIdeal) < 0.05)
|
|
}
|
|
|
|
}
|