mirror of
https://github.com/zrepl/zrepl.git
synced 2024-11-24 17:35:01 +01:00
daemon/job/active: push mode: awful hack for handling of concurrent snapshots + stale remote operation
We have the problem that there are legitimate use cases where a user does not want their machine to fill up with snapshots, even if it means unreplicated must be destroyed. This can be expressed by *not* configuring the keep rule `not_replicated` for the snapshot-creating side. This commit only addresses push mode because we don't support pruning in the source job. We adivse users in the docs to use push mode if they have above use case, so this is fine - at least for 0.1. Ideally, the replication.Replication would communicate to the pruner which snapshots are currently part of the replication plan, and then we'd need some conflict resolution to determine whether it's more important to destroy the snapshots or to replicate them (destroy should win?). However, we don't have the infrastructure for this yet (we could parse the replication report, but that's just ugly). And we want to get 0.1 out, so showtime for a dirty hack: We start replication, and ideally, replication and pruning is done before new snapshot have been taken. If so: great. However, what happens if snapshots have been taken and we are not done with replication and / or pruning? * If replicatoin is making progress according to its state, let it run. This covers the *important* situation of initial replication, where replication may easily take longer than a single snapshotting interval. * If replication is in an error state, cancel it through context cancellation. * As with the pruner below, the main problem here is that status output will only contain "context cancelled" after the cancellation, instead of showing the reason why it was cancelled. Not nice, but oh well, the logs provide enough detail for this niche situation... * If we are past replication, we're still pruning * Leave the local (send-side) pruning alone. Again, we only implement this hack for push, so we know sender is local, and it will only fail hard, not retry. * If the remote (receiver-side) pruner is in an error state, cancel it through context cancellation. * Otherwise, let it run. Note that every time we "let it run", we tolerate a temporary excess of snapshots, but given sufficiently aggressive timeouts and the assumption that the snapshot interval is much greater than the timeouts, this is not a significant problem in practice.
This commit is contained in:
parent
a85abe8bae
commit
aeb87ffbcf
@ -238,11 +238,11 @@ outer:
|
||||
}
|
||||
invocationCount++
|
||||
invLog := log.WithField("invocation", invocationCount)
|
||||
j.do(WithLogger(ctx, invLog))
|
||||
j.do(WithLogger(ctx, invLog), periodicDone)
|
||||
}
|
||||
}
|
||||
|
||||
func (j *ActiveSide) do(ctx context.Context) {
|
||||
func (j *ActiveSide) do(ctx context.Context, periodicWakeup <-chan struct{}) {
|
||||
|
||||
log := GetLogger(ctx)
|
||||
ctx = logging.WithSubsystemLoggers(ctx, log)
|
||||
@ -277,14 +277,129 @@ func (j *ActiveSide) do(ctx context.Context) {
|
||||
})
|
||||
|
||||
log.Info("start replication")
|
||||
tasks.replication.Drive(ctx, sender, receiver)
|
||||
replicationDone := make(chan struct{})
|
||||
replicationCtx, replicationCancel := context.WithCancel(ctx)
|
||||
defer replicationCancel()
|
||||
go func() {
|
||||
tasks.replication.Drive(replicationCtx, sender, receiver)
|
||||
close(replicationDone)
|
||||
}()
|
||||
outer:
|
||||
for {
|
||||
select {
|
||||
case <-replicationDone:
|
||||
// fine!
|
||||
break outer
|
||||
case <-periodicWakeup:
|
||||
// Replication took longer than the periodic interval.
|
||||
//
|
||||
// For pull jobs, this isn't so bad because nothing changes on the active side
|
||||
// if replication doesn't go forward.
|
||||
//
|
||||
// For push jobs, this means snapshots were taken.
|
||||
// We need to invoke the pruner now, because otherwise an infinitely stuck replication
|
||||
// will cause this side to fill up with snapshots.
|
||||
//
|
||||
// However, there are cases where replication progresses and just takes longer,
|
||||
// and we don't want these situations be interrupted by a prune, which will require
|
||||
// re-planning and starting over (think of initial replication as an example).
|
||||
//
|
||||
// Therefore, we prohibit pruning of snapshots that are part of the current replication plan.
|
||||
// If there is no such plan, we kill the replication.
|
||||
|
||||
tasks = j.updateTasks(func(tasks *activeSideTasks) {
|
||||
tasks.prunerSender = j.prunerFactory.BuildSenderPruner(ctx, sender, sender)
|
||||
tasks.prunerReceiver = j.prunerFactory.BuildReceiverPruner(ctx, receiver, sender)
|
||||
})
|
||||
if j.mode.Type() == TypePush {
|
||||
|
||||
rep := tasks.replication.Report()
|
||||
state, err := replication.StateString(rep.Status)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
switch state {
|
||||
case replication.Planning:
|
||||
fallthrough
|
||||
case replication.PlanningError:
|
||||
fallthrough
|
||||
case replication.WorkingWait:
|
||||
log.WithField("repl_state", state.String()).
|
||||
Info("cancelling replication after new snapshots invalidated its current state")
|
||||
replicationCancel()
|
||||
log.Info("waiting for replication to stop")
|
||||
<-replicationDone // no need to wait for ctx.Done, replication is already bound to global cancel
|
||||
break outer
|
||||
default:
|
||||
log.WithField("repl_state", state.String()).
|
||||
Warn("new snapshots while replication is running and making progress")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
var pruningWg sync.WaitGroup
|
||||
log.Info("start pruning sender")
|
||||
tasks.prunerSender.Prune()
|
||||
pruningWg.Add(1)
|
||||
go func() {
|
||||
defer pruningWg.Done()
|
||||
tasks := j.updateTasks(func(tasks *activeSideTasks) {
|
||||
tasks.prunerSender = j.prunerFactory.BuildSenderPruner(ctx, sender, sender)
|
||||
})
|
||||
tasks.prunerSender.Prune()
|
||||
// FIXME no need to do the cancellation dance with sender, we know it's local for push
|
||||
// FIXME and we don't worry about pull ATM
|
||||
}()
|
||||
log.Info("start pruning receiver")
|
||||
tasks.prunerReceiver.Prune()
|
||||
pruningWg.Add(1)
|
||||
go func() {
|
||||
defer pruningWg.Done()
|
||||
|
||||
receiverPrunerCtx, receiverPrunerCancel := context.WithCancel(ctx)
|
||||
defer receiverPrunerCancel()
|
||||
tasks := j.updateTasks(func(tasks *activeSideTasks) {
|
||||
tasks.prunerReceiver = j.prunerFactory.BuildReceiverPruner(receiverPrunerCtx, receiver, sender)
|
||||
})
|
||||
receiverPrunerDone := make(chan struct{})
|
||||
go func() {
|
||||
defer close(receiverPrunerDone)
|
||||
tasks.prunerReceiver.Prune()
|
||||
}()
|
||||
|
||||
outer:
|
||||
for {
|
||||
select {
|
||||
case <-receiverPrunerDone:
|
||||
// fine!
|
||||
break outer
|
||||
case <-periodicWakeup:
|
||||
// see comments for similar apporach with replication above
|
||||
if j.mode.Type() == TypePush {
|
||||
rep := tasks.prunerReceiver.Report()
|
||||
state, err := pruner.StateString(rep.State)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
switch state {
|
||||
case pruner.PlanWait:
|
||||
fallthrough
|
||||
case pruner.ExecWait:
|
||||
log.WithField("pruner_state", state.String()).
|
||||
Info("cancelling failing prune on receiver because new snapshots were taken on sender")
|
||||
receiverPrunerCancel()
|
||||
log.Info("waiting for receiver pruner to stop")
|
||||
<-receiverPrunerDone
|
||||
break outer
|
||||
default:
|
||||
log.WithField("pruner_state", state.String()).
|
||||
Warn("new snapshots while prune on receiver is still running")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}()
|
||||
|
||||
pruningWg.Wait() // if pruners handle ctx cancellation correctly, we don't need to wait for it here
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user