diff --git a/daemon/job/active.go b/daemon/job/active.go
index 482b368..2822eba 100644
--- a/daemon/job/active.go
+++ b/daemon/job/active.go
@@ -238,11 +238,11 @@ outer:
         }
         invocationCount++
         invLog := log.WithField("invocation", invocationCount)
-        j.do(WithLogger(ctx, invLog))
+        j.do(WithLogger(ctx, invLog), periodicDone)
     }
 }
 
-func (j *ActiveSide) do(ctx context.Context) {
+func (j *ActiveSide) do(ctx context.Context, periodicWakeup <-chan struct{}) {
 
     log := GetLogger(ctx)
     ctx = logging.WithSubsystemLoggers(ctx, log)
@@ -277,14 +277,129 @@ func (j *ActiveSide) do(ctx context.Context) {
     })
 
     log.Info("start replication")
-    tasks.replication.Drive(ctx, sender, receiver)
+    replicationDone := make(chan struct{})
+    replicationCtx, replicationCancel := context.WithCancel(ctx)
+    defer replicationCancel()
+    go func() {
+        tasks.replication.Drive(replicationCtx, sender, receiver)
+        close(replicationDone)
+    }()
+outer:
+    for {
+        select {
+        case <-replicationDone:
+            // fine!
+            break outer
+        case <-periodicWakeup:
+            // Replication took longer than the periodic interval.
+            //
+            // For pull jobs, this isn't so bad because nothing changes on the active side
+            // if replication doesn't go forward.
+            //
+            // For push jobs, this means snapshots were taken.
+            // We need to invoke the pruner now, because otherwise an infinitely stuck replication
+            // will cause this side to fill up with snapshots.
+            //
+            // However, there are cases where replication progresses and just takes longer,
+            // and we don't want these situations to be interrupted by a prune, which would require
+            // re-planning and starting over (think of initial replication as an example).
+            //
+            // Therefore, we prohibit pruning of snapshots that are part of the current replication plan.
+            // If there is no such plan, we kill the replication.
 
-    tasks = j.updateTasks(func(tasks *activeSideTasks) {
-        tasks.prunerSender = j.prunerFactory.BuildSenderPruner(ctx, sender, sender)
-        tasks.prunerReceiver = j.prunerFactory.BuildReceiverPruner(ctx, receiver, sender)
-    })
+            if j.mode.Type() == TypePush {
+
+                rep := tasks.replication.Report()
+                state, err := replication.StateString(rep.Status)
+                if err != nil {
+                    panic(err)
+                }
+
+                switch state {
+                case replication.Planning:
+                    fallthrough
+                case replication.PlanningError:
+                    fallthrough
+                case replication.WorkingWait:
+                    log.WithField("repl_state", state.String()).
+                        Info("cancelling replication after new snapshots invalidated its current state")
+                    replicationCancel()
+                    log.Info("waiting for replication to stop")
+                    <-replicationDone // no need to wait for ctx.Done, replication is already bound to global cancel
+                    break outer
+                default:
+                    log.WithField("repl_state", state.String()).
+                        Warn("new snapshots while replication is running and making progress")
+                }
+
+            }
+
+        }
+    }
+
+    var pruningWg sync.WaitGroup
     log.Info("start pruning sender")
-    tasks.prunerSender.Prune()
+    pruningWg.Add(1)
+    go func() {
+        defer pruningWg.Done()
+        tasks := j.updateTasks(func(tasks *activeSideTasks) {
+            tasks.prunerSender = j.prunerFactory.BuildSenderPruner(ctx, sender, sender)
+        })
+        tasks.prunerSender.Prune()
+        // FIXME no need to do the cancellation dance with sender, we know it's local for push
+        // FIXME and we don't worry about pull ATM
+    }()
     log.Info("start pruning receiver")
-    tasks.prunerReceiver.Prune()
+    pruningWg.Add(1)
+    go func() {
+        defer pruningWg.Done()
+
+        receiverPrunerCtx, receiverPrunerCancel := context.WithCancel(ctx)
+        defer receiverPrunerCancel()
+        tasks := j.updateTasks(func(tasks *activeSideTasks) {
+            tasks.prunerReceiver = j.prunerFactory.BuildReceiverPruner(receiverPrunerCtx, receiver, sender)
+        })
+        receiverPrunerDone := make(chan struct{})
+        go func() {
+            defer close(receiverPrunerDone)
+            tasks.prunerReceiver.Prune()
+        }()
+
+    outer:
+        for {
+            select {
+            case <-receiverPrunerDone:
+                // fine!
+                break outer
+            case <-periodicWakeup:
+                // see comments for the similar approach with replication above
+                if j.mode.Type() == TypePush {
+                    rep := tasks.prunerReceiver.Report()
+                    state, err := pruner.StateString(rep.State)
+                    if err != nil {
+                        panic(err)
+                    }
+                    switch state {
+                    case pruner.PlanWait:
+                        fallthrough
+                    case pruner.ExecWait:
+                        log.WithField("pruner_state", state.String()).
+                            Info("cancelling failing prune on receiver because new snapshots were taken on sender")
+                        receiverPrunerCancel()
+                        log.Info("waiting for receiver pruner to stop")
+                        <-receiverPrunerDone
+                        break outer
+                    default:
+                        log.WithField("pruner_state", state.String()).
+                            Warn("new snapshots while prune on receiver is still running")
+                    }
+                }
+            }
+        }
+
+    }()
+
+    pruningWg.Wait() // if pruners handle ctx cancellation correctly, we don't need to wait for it here
 }