Files
zrepl/daemon/job/trigger/trigger_test.go
Christian Schwarz 5976264bce WIP
2021-04-16 10:10:16 +02:00

206 lines
4.8 KiB
Go

package trigger_test
import (
"context"
"net/http"
"sync"
"testing"
"github.com/stretchr/testify/require"
"github.com/zrepl/zrepl/daemon/job/trigger"
"github.com/zrepl/zrepl/daemon/logging/trace"
"github.com/zrepl/zrepl/replication"
"github.com/zrepl/zrepl/replication/driver"
"github.com/zrepl/zrepl/replication/logic"
)
func TestBasics(t *testing.T) {
var wg sync.WaitGroup
defer wg.Wait()
tr := trigger.New()
triggered := make(chan int)
waitForTriggerError := make(chan error)
waitForResetCallToBeMadeByMainGoroutine := make(chan struct{})
postResetAssertionsDone := make(chan struct{})
taskCtx := context.Background()
taskCtx, cancelTaskCtx := context.WithCancel(taskCtx)
wg.Add(1)
go func() {
defer wg.Done()
taskCtx := context.WithValue(taskCtx, "mykey", "myvalue")
triggers := 0
outer:
for {
invocationCtx, err := tr.WaitForTrigger(taskCtx)
if err != nil {
waitForTriggerError <- err
return
}
require.Equal(t, invocationCtx.Value("mykey"), "myvalue")
triggers++
triggered <- triggers
switch triggers {
case 1:
continue outer
case 2:
<-waitForResetCallToBeMadeByMainGoroutine
require.Equal(t, context.Canceled, invocationCtx.Err(), "Reset() cancels invocation context")
require.Nil(t, taskCtx.Err(), "Reset() does not cancel task context")
close(postResetAssertionsDone)
}
}
}()
t.Logf("trigger 1")
_, err := tr.Trigger()
require.NoError(t, err)
v := <-triggered
require.Equal(t, 1, v)
t.Logf("trigger 2")
triggerResponse, err := tr.Trigger()
require.NoError(t, err)
v = <-triggered
require.Equal(t, 2, v)
t.Logf("reset")
resetResponse, err := tr.Reset(trigger.ResetRequest{InvocationId: triggerResponse.InvocationId})
require.NoError(t, err)
t.Logf("reset response: %#v", resetResponse)
close(waitForResetCallToBeMadeByMainGoroutine)
<-postResetAssertionsDone
t.Logf("cancel the context")
cancelTaskCtx()
wfte := <-waitForTriggerError
require.Equal(t, taskCtx.Err(), wfte)
}
type PushJob struct {
snap *Snapshotter
repl *ReplicationAndTriggerRemotePruningSequence
}
func (j *PushJob) Handle(w http.ResponseWriter, r *http.Request) {
panic("unimplemented")
}
func (j *PushJob) HandleTrigger(w http.ResponseWriter, r *http.Request) {
panic("unimplemented")
}
func (j *PushJob) Run(ctx context.Context) {
}
type SnapshotSequence struct {
}
type ReplicationAndTriggerRemotePruningSequence struct {
}
func (s ReplicationAndTriggerRemotePruningSequence) Run(ctx context.Context) {
j.mode.ConnectEndpoints(ctx, j.connecter)
defer j.mode.DisconnectEndpoints()
sender, receiver := j.mode.SenderReceiver()
{
select {
case <-ctx.Done():
return
default:
}
ctx, endSpan := trace.WithSpan(ctx, "replication")
ctx, repCancel := context.WithCancel(ctx)
var repWait driver.WaitFunc
j.updateTasks(func(tasks *activeSideTasks) {
// reset it
*tasks = activeSideTasks{}
tasks.replicationCancel = func() { repCancel(); endSpan() }
tasks.replicationReport, repWait = replication.Do(
ctx, j.replicationDriverConfig, logic.NewPlanner(j.promRepStateSecs, j.promBytesReplicated, sender, receiver, j.mode.PlannerPolicy()),
)
tasks.state = ActiveSideReplicating
})
GetLogger(ctx).Info("start replication")
repWait(true) // wait blocking
repCancel() // always cancel to free up context resources
replicationReport := j.tasks.replicationReport()
j.promReplicationErrors.Set(float64(replicationReport.GetFailedFilesystemsCountInLatestAttempt()))
j.updateTasks(func(tasks *activeSideTasks) {
tasks.replicationDone = replicationReport
})
endSpan()
}
{
select {
case <-ctx.Done():
return
default:
}
ctx, endSpan := trace.WithSpan(ctx, "prune_sender")
ctx, senderCancel := context.WithCancel(ctx)
tasks := j.updateTasks(func(tasks *activeSideTasks) {
tasks.prunerSender = j.prunerFactory.BuildSenderPruner(ctx, sender, sender)
tasks.prunerSenderCancel = func() { senderCancel(); endSpan() }
tasks.state = ActiveSidePruneSender
})
GetLogger(ctx).Info("start pruning sender")
tasks.prunerSender.Prune()
GetLogger(ctx).Info("finished pruning sender")
senderCancel()
endSpan()
}
{
select {
case <-ctx.Done():
return
default:
}
ctx, endSpan := trace.WithSpan(ctx, "prune_recever")
ctx, receiverCancel := context.WithCancel(ctx)
tasks := j.updateTasks(func(tasks *activeSideTasks) {
tasks.prunerReceiver = j.prunerFactory.BuildReceiverPruner(ctx, receiver, sender)
tasks.prunerReceiverCancel = func() { receiverCancel(); endSpan() }
tasks.state = ActiveSidePruneReceiver
})
GetLogger(ctx).Info("start pruning receiver")
tasks.prunerReceiver.Prune()
GetLogger(ctx).Info("finished pruning receiver")
receiverCancel()
endSpan()
}
j.updateTasks(func(tasks *activeSideTasks) {
tasks.state = ActiveSideDone
})
}
func TestUseCase(t *testing.T) {
var as ActiveSide
as.
}