From 5976264bce05f04699a15d264b608320cd003096 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 16 Apr 2021 10:10:16 +0200 Subject: [PATCH] WIP --- daemon/job/active.go | 19 +++-- daemon/job/trigger/trigger.go | 7 +- daemon/job/trigger/trigger_test.go | 127 ++++++++++++++++++++++++++++- 3 files changed, 140 insertions(+), 13 deletions(-) diff --git a/daemon/job/active.go b/daemon/job/active.go index 0462f85..81aa681 100644 --- a/daemon/job/active.go +++ b/daemon/job/active.go @@ -58,7 +58,7 @@ const ( ActiveSideDone // also errors ) -type activeSideTasks struct { +type activeSideReplicationAndTriggerRemotePruneSequence struct { state ActiveSideState // valid for state ActiveSideReplicating, ActiveSidePruneSender, ActiveSidePruneReceiver, ActiveSideDone @@ -67,10 +67,8 @@ type activeSideTasks struct { replicationDone *report.Report // valid for state ActiveSidePruneSender, ActiveSidePruneReceiver, ActiveSideDone - prunerSender, prunerReceiver *pruner.Pruner - - // valid for state ActiveSidePruneReceiver, ActiveSideDone - prunerSenderCancel, prunerReceiverCancel context.CancelFunc + pruneRemote *pruner.Pruner + pruneRemoteCancel context.CancelFunc } func (a *ActiveSide) updateTasks(u func(*activeSideTasks)) activeSideTasks { @@ -427,6 +425,13 @@ func (j *ActiveSide) Run(ctx context.Context) { defer log.Info("job exiting") + type Activity interface { + Trigger() (interface{}, error) + } + + var periodicActivity Activity + var replicationActivity Activity + periodicDone := make(chan struct{}) ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -610,6 +615,10 @@ func (j *ActiveSide) Trigger(req ActiveSideTriggerRequest) (*ActiveSideSignalRes return &ActiveSideSignalResponse{InvocationId: invocationId}, nil } +type ReplicationPlusRemotePruneSequence struct { + +} + func (j *ActiveSide) do(ctx context.Context) { j.mode.ConnectEndpoints(ctx, j.connecter) diff --git a/daemon/job/trigger/trigger.go b/daemon/job/trigger/trigger.go index 5aea02b..66b87bf 100644 --- a/daemon/job/trigger/trigger.go +++ b/daemon/job/trigger/trigger.go @@ -7,7 +7,7 @@ // Poll(PollRequest, chan PollResponse), // Reset(ResetRequest, chan (ResetResponse, error)), // } -// +// // enum State { // Running{ // invocationId: u32, @@ -17,7 +17,7 @@ // nextInvocationId: u32, // } // } -// +// // for msg := <- t.internalMsgs { // match (msg, state) { // ... @@ -30,9 +30,6 @@ import ( "fmt" "math" "sync" - - "github.com/zrepl/zrepl/daemon/logging" - "github.com/zrepl/zrepl/logger" ) type T struct { diff --git a/daemon/job/trigger/trigger_test.go b/daemon/job/trigger/trigger_test.go index 0542530..fe55a14 100644 --- a/daemon/job/trigger/trigger_test.go +++ b/daemon/job/trigger/trigger_test.go @@ -1,11 +1,17 @@ -package trigger +package trigger_test import ( "context" + "net/http" "sync" "testing" "github.com/stretchr/testify/require" + "github.com/zrepl/zrepl/daemon/job/trigger" + "github.com/zrepl/zrepl/daemon/logging/trace" + "github.com/zrepl/zrepl/replication" + "github.com/zrepl/zrepl/replication/driver" + "github.com/zrepl/zrepl/replication/logic" ) func TestBasics(t *testing.T) { @@ -13,7 +19,7 @@ func TestBasics(t *testing.T) { var wg sync.WaitGroup defer wg.Wait() - tr := New() + tr := trigger.New() triggered := make(chan int) waitForTriggerError := make(chan error) @@ -70,7 +76,7 @@ func TestBasics(t *testing.T) { require.Equal(t, 2, v) t.Logf("reset") - resetResponse, err := tr.Reset(ResetRequest{InvocationId: triggerResponse.InvocationId}) + resetResponse, err := tr.Reset(trigger.ResetRequest{InvocationId: triggerResponse.InvocationId}) require.NoError(t, err) t.Logf("reset response: %#v", resetResponse) close(waitForResetCallToBeMadeByMainGoroutine) @@ -82,3 +88,118 @@ func TestBasics(t *testing.T) { require.Equal(t, taskCtx.Err(), wfte) } + +type PushJob struct { + snap *Snapshotter + repl *ReplicationAndTriggerRemotePruningSequence +} + +func (j *PushJob) Handle(w http.ResponseWriter, r *http.Request) { + panic("unimplemented") +} + +func (j *PushJob) HandleTrigger(w http.ResponseWriter, r *http.Request) { + panic("unimplemented") +} + +func (j *PushJob) Run(ctx context.Context) { + +} + +type SnapshotSequence struct { + +} + +type ReplicationAndTriggerRemotePruningSequence struct { + +} + +func (s ReplicationAndTriggerRemotePruningSequence) Run(ctx context.Context) { + + j.mode.ConnectEndpoints(ctx, j.connecter) + defer j.mode.DisconnectEndpoints() + + sender, receiver := j.mode.SenderReceiver() + + { + select { + case <-ctx.Done(): + return + default: + } + ctx, endSpan := trace.WithSpan(ctx, "replication") + ctx, repCancel := context.WithCancel(ctx) + var repWait driver.WaitFunc + j.updateTasks(func(tasks *activeSideTasks) { + // reset it + *tasks = activeSideTasks{} + tasks.replicationCancel = func() { repCancel(); endSpan() } + tasks.replicationReport, repWait = replication.Do( + ctx, j.replicationDriverConfig, logic.NewPlanner(j.promRepStateSecs, j.promBytesReplicated, sender, receiver, j.mode.PlannerPolicy()), + ) + tasks.state = ActiveSideReplicating + }) + GetLogger(ctx).Info("start replication") + repWait(true) // wait blocking + repCancel() // always cancel to free up context resources + replicationReport := j.tasks.replicationReport() + j.promReplicationErrors.Set(float64(replicationReport.GetFailedFilesystemsCountInLatestAttempt())) + j.updateTasks(func(tasks *activeSideTasks) { + tasks.replicationDone = replicationReport + }) + endSpan() + } + + { + select { + case <-ctx.Done(): + return + default: + } + ctx, endSpan := trace.WithSpan(ctx, "prune_sender") + ctx, senderCancel := context.WithCancel(ctx) + tasks := j.updateTasks(func(tasks *activeSideTasks) { + tasks.prunerSender = j.prunerFactory.BuildSenderPruner(ctx, sender, sender) + tasks.prunerSenderCancel = func() { senderCancel(); endSpan() } + tasks.state = ActiveSidePruneSender + }) + GetLogger(ctx).Info("start pruning sender") + tasks.prunerSender.Prune() + GetLogger(ctx).Info("finished pruning sender") + senderCancel() + endSpan() + } + { + select { + case <-ctx.Done(): + return + default: + } + ctx, endSpan := trace.WithSpan(ctx, "prune_recever") + ctx, receiverCancel := context.WithCancel(ctx) + tasks := j.updateTasks(func(tasks *activeSideTasks) { + tasks.prunerReceiver = j.prunerFactory.BuildReceiverPruner(ctx, receiver, sender) + tasks.prunerReceiverCancel = func() { receiverCancel(); endSpan() } + tasks.state = ActiveSidePruneReceiver + }) + GetLogger(ctx).Info("start pruning receiver") + tasks.prunerReceiver.Prune() + GetLogger(ctx).Info("finished pruning receiver") + receiverCancel() + endSpan() + } + + j.updateTasks(func(tasks *activeSideTasks) { + tasks.state = ActiveSideDone + }) + +} + + +func TestUseCase(t *testing.T) { + + var as ActiveSide + + as. + +}