This commit is contained in:
Christian Schwarz 2021-04-16 10:10:16 +02:00
parent f28676b8d7
commit 5976264bce
3 changed files with 140 additions and 13 deletions

View File

@ -58,7 +58,7 @@ const (
ActiveSideDone // also errors
)
type activeSideTasks struct {
type activeSideReplicationAndTriggerRemotePruneSequence struct {
state ActiveSideState
// valid for state ActiveSideReplicating, ActiveSidePruneSender, ActiveSidePruneReceiver, ActiveSideDone
@ -67,10 +67,8 @@ type activeSideTasks struct {
replicationDone *report.Report
// valid for state ActiveSidePruneSender, ActiveSidePruneReceiver, ActiveSideDone
prunerSender, prunerReceiver *pruner.Pruner
// valid for state ActiveSidePruneReceiver, ActiveSideDone
prunerSenderCancel, prunerReceiverCancel context.CancelFunc
pruneRemote *pruner.Pruner
pruneRemoteCancel context.CancelFunc
}
func (a *ActiveSide) updateTasks(u func(*activeSideTasks)) activeSideTasks {
@ -427,6 +425,13 @@ func (j *ActiveSide) Run(ctx context.Context) {
defer log.Info("job exiting")
type Activity interface {
Trigger() (interface{}, error)
}
var periodicActivity Activity
var replicationActivity Activity
periodicDone := make(chan struct{})
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@ -610,6 +615,10 @@ func (j *ActiveSide) Trigger(req ActiveSideTriggerRequest) (*ActiveSideSignalRes
return &ActiveSideSignalResponse{InvocationId: invocationId}, nil
}
type ReplicationPlusRemotePruneSequence struct {
}
func (j *ActiveSide) do(ctx context.Context) {
j.mode.ConnectEndpoints(ctx, j.connecter)

View File

@ -7,7 +7,7 @@
// Poll(PollRequest, chan PollResponse),
// Reset(ResetRequest, chan (ResetResponse, error)),
// }
//
//
// enum State {
// Running{
// invocationId: u32,
@ -17,7 +17,7 @@
// nextInvocationId: u32,
// }
// }
//
//
// for msg := <- t.internalMsgs {
// match (msg, state) {
// ...
@ -30,9 +30,6 @@ import (
"fmt"
"math"
"sync"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/logger"
)
type T struct {

View File

@ -1,11 +1,17 @@
package trigger
package trigger_test
import (
"context"
"net/http"
"sync"
"testing"
"github.com/stretchr/testify/require"
"github.com/zrepl/zrepl/daemon/job/trigger"
"github.com/zrepl/zrepl/daemon/logging/trace"
"github.com/zrepl/zrepl/replication"
"github.com/zrepl/zrepl/replication/driver"
"github.com/zrepl/zrepl/replication/logic"
)
func TestBasics(t *testing.T) {
@ -13,7 +19,7 @@ func TestBasics(t *testing.T) {
var wg sync.WaitGroup
defer wg.Wait()
tr := New()
tr := trigger.New()
triggered := make(chan int)
waitForTriggerError := make(chan error)
@ -70,7 +76,7 @@ func TestBasics(t *testing.T) {
require.Equal(t, 2, v)
t.Logf("reset")
resetResponse, err := tr.Reset(ResetRequest{InvocationId: triggerResponse.InvocationId})
resetResponse, err := tr.Reset(trigger.ResetRequest{InvocationId: triggerResponse.InvocationId})
require.NoError(t, err)
t.Logf("reset response: %#v", resetResponse)
close(waitForResetCallToBeMadeByMainGoroutine)
@ -82,3 +88,118 @@ func TestBasics(t *testing.T) {
require.Equal(t, taskCtx.Err(), wfte)
}
type PushJob struct {
snap *Snapshotter
repl *ReplicationAndTriggerRemotePruningSequence
}
func (j *PushJob) Handle(w http.ResponseWriter, r *http.Request) {
panic("unimplemented")
}
func (j *PushJob) HandleTrigger(w http.ResponseWriter, r *http.Request) {
panic("unimplemented")
}
func (j *PushJob) Run(ctx context.Context) {
}
type SnapshotSequence struct {
}
type ReplicationAndTriggerRemotePruningSequence struct {
}
func (s ReplicationAndTriggerRemotePruningSequence) Run(ctx context.Context) {
j.mode.ConnectEndpoints(ctx, j.connecter)
defer j.mode.DisconnectEndpoints()
sender, receiver := j.mode.SenderReceiver()
{
select {
case <-ctx.Done():
return
default:
}
ctx, endSpan := trace.WithSpan(ctx, "replication")
ctx, repCancel := context.WithCancel(ctx)
var repWait driver.WaitFunc
j.updateTasks(func(tasks *activeSideTasks) {
// reset it
*tasks = activeSideTasks{}
tasks.replicationCancel = func() { repCancel(); endSpan() }
tasks.replicationReport, repWait = replication.Do(
ctx, j.replicationDriverConfig, logic.NewPlanner(j.promRepStateSecs, j.promBytesReplicated, sender, receiver, j.mode.PlannerPolicy()),
)
tasks.state = ActiveSideReplicating
})
GetLogger(ctx).Info("start replication")
repWait(true) // wait blocking
repCancel() // always cancel to free up context resources
replicationReport := j.tasks.replicationReport()
j.promReplicationErrors.Set(float64(replicationReport.GetFailedFilesystemsCountInLatestAttempt()))
j.updateTasks(func(tasks *activeSideTasks) {
tasks.replicationDone = replicationReport
})
endSpan()
}
{
select {
case <-ctx.Done():
return
default:
}
ctx, endSpan := trace.WithSpan(ctx, "prune_sender")
ctx, senderCancel := context.WithCancel(ctx)
tasks := j.updateTasks(func(tasks *activeSideTasks) {
tasks.prunerSender = j.prunerFactory.BuildSenderPruner(ctx, sender, sender)
tasks.prunerSenderCancel = func() { senderCancel(); endSpan() }
tasks.state = ActiveSidePruneSender
})
GetLogger(ctx).Info("start pruning sender")
tasks.prunerSender.Prune()
GetLogger(ctx).Info("finished pruning sender")
senderCancel()
endSpan()
}
{
select {
case <-ctx.Done():
return
default:
}
ctx, endSpan := trace.WithSpan(ctx, "prune_recever")
ctx, receiverCancel := context.WithCancel(ctx)
tasks := j.updateTasks(func(tasks *activeSideTasks) {
tasks.prunerReceiver = j.prunerFactory.BuildReceiverPruner(ctx, receiver, sender)
tasks.prunerReceiverCancel = func() { receiverCancel(); endSpan() }
tasks.state = ActiveSidePruneReceiver
})
GetLogger(ctx).Info("start pruning receiver")
tasks.prunerReceiver.Prune()
GetLogger(ctx).Info("finished pruning receiver")
receiverCancel()
endSpan()
}
j.updateTasks(func(tasks *activeSideTasks) {
tasks.state = ActiveSideDone
})
}
func TestUseCase(t *testing.T) {
var as ActiveSide
as.
}