diff --git a/daemon/job/active.go b/daemon/job/active.go
index 2822eba..482b368 100644
--- a/daemon/job/active.go
+++ b/daemon/job/active.go
@@ -238,11 +238,11 @@ outer:
 		}
 		invocationCount++
 		invLog := log.WithField("invocation", invocationCount)
-		j.do(WithLogger(ctx, invLog), periodicDone)
+		j.do(WithLogger(ctx, invLog))
 	}
 }
 
-func (j *ActiveSide) do(ctx context.Context, periodicWakeup <-chan struct{}) {
+func (j *ActiveSide) do(ctx context.Context) {
 
 	log := GetLogger(ctx)
 	ctx = logging.WithSubsystemLoggers(ctx, log)
@@ -277,129 +277,14 @@ func (j *ActiveSide) do(ctx context.Context, periodicWakeup <-chan struct{}) {
 	})
 
 	log.Info("start replication")
-	replicationDone := make(chan struct{})
-	replicationCtx, replicationCancel := context.WithCancel(ctx)
-	defer replicationCancel()
-	go func() {
-		tasks.replication.Drive(replicationCtx, sender, receiver)
-		close(replicationDone)
-	}()
-outer:
-	for {
-		select {
-		case <-replicationDone:
-			// fine!
-			break outer
-		case <-periodicWakeup:
-			// Replication took longer than the periodic interval.
-			//
-			// For pull jobs, this isn't so bad because nothing changes on the active side
-			// if replication doesn't go forward.
-			//
-			// For push jobs, this means snapshots were taken.
-			// We need to invoke the pruner now, because otherwise an infinitely stuck replication
-			// will cause this side to fill up with snapshots.
-			//
-			// However, there are cases where replication progresses and just takes longer,
-			// and we don't want these situations be interrupted by a prune, which will require
-			// re-planning and starting over (think of initial replication as an example).
-			//
-			// Therefore, we prohibit pruning of snapshots that are part of the current replication plan.
-			// If there is no such plan, we kill the replication.
+	tasks.replication.Drive(ctx, sender, receiver)
 
-			if j.mode.Type() == TypePush {
-
-				rep := tasks.replication.Report()
-				state, err := replication.StateString(rep.Status)
-				if err != nil {
-					panic(err)
-				}
-
-				switch state {
-				case replication.Planning:
-					fallthrough
-				case replication.PlanningError:
-					fallthrough
-				case replication.WorkingWait:
-					log.WithField("repl_state", state.String()).
-						Info("cancelling replication after new snapshots invalidated its current state")
-					replicationCancel()
-					log.Info("waiting for replication to stop")
-					<-replicationDone // no need to wait for ctx.Done, replication is already bound to global cancel
-					break outer
-				default:
-					log.WithField("repl_state", state.String()).
-						Warn("new snapshots while replication is running and making progress")
-				}
-
-			}
-
-		}
-	}
-
-
-	var pruningWg sync.WaitGroup
+	tasks = j.updateTasks(func(tasks *activeSideTasks) {
+		tasks.prunerSender = j.prunerFactory.BuildSenderPruner(ctx, sender, sender)
+		tasks.prunerReceiver = j.prunerFactory.BuildReceiverPruner(ctx, receiver, sender)
+	})
 	log.Info("start pruning sender")
-	pruningWg.Add(1)
-	go func() {
-		defer pruningWg.Done()
-		tasks := j.updateTasks(func(tasks *activeSideTasks) {
-			tasks.prunerSender = j.prunerFactory.BuildSenderPruner(ctx, sender, sender)
-		})
-		tasks.prunerSender.Prune()
-		// FIXME no need to do the cancellation dance with sender, we know it's local for push
-		// FIXME and we don't worry about pull ATM
-	}()
+	tasks.prunerSender.Prune()
 	log.Info("start pruning receiver")
-	pruningWg.Add(1)
-	go func() {
-		defer pruningWg.Done()
-
-		receiverPrunerCtx, receiverPrunerCancel := context.WithCancel(ctx)
-		defer receiverPrunerCancel()
-		tasks := j.updateTasks(func(tasks *activeSideTasks) {
-			tasks.prunerReceiver = j.prunerFactory.BuildReceiverPruner(receiverPrunerCtx, receiver, sender)
-		})
-		receiverPrunerDone := make(chan struct{})
-		go func() {
-			defer close(receiverPrunerDone)
-			tasks.prunerReceiver.Prune()
-		}()
-
-	outer:
-		for {
-			select {
-			case <-receiverPrunerDone:
-				// fine!
-				break outer
-			case <-periodicWakeup:
-				// see comments for similar apporach with replication above
-				if j.mode.Type() == TypePush {
-					rep := tasks.prunerReceiver.Report()
-					state, err := pruner.StateString(rep.State)
-					if err != nil {
-						panic(err)
-					}
-					switch state {
-					case pruner.PlanWait:
-						fallthrough
-					case pruner.ExecWait:
-						log.WithField("pruner_state", state.String()).
-							Info("cancelling failing prune on receiver because new snapshots were taken on sender")
-						receiverPrunerCancel()
-						log.Info("waiting for receiver pruner to stop")
-						<-receiverPrunerDone
-						break outer
-					default:
-						log.WithField("pruner_state", state.String()).
-							Warn("new snapshots while prune on receiver is still running")
-					}
-				}
-			}
-		}
-
-	}()
-
-	pruningWg.Wait() // if pruners handle ctx cancellation correctly, we don't need to wait for it here
-
+	tasks.prunerReceiver.Prune()
 }
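For orientation, the body of ActiveSide.do after the second hunk is applied reads as shown below. This is a minimal sketch assembled only from the `+` and surrounding context lines of the diff; the declarations it relies on (activeSideTasks, j.updateTasks, j.prunerFactory, and the sender/receiver endpoints) live in unchanged parts of daemon/job/active.go and are assumed here rather than shown, so this is an excerpt, not a self-contained program:

	// Post-patch control flow (sketch): replication and both pruners run
	// sequentially on the invocation goroutine; there is no periodic wakeup
	// channel and no goroutine/select/cancel dance around them anymore.
	log.Info("start replication")
	tasks.replication.Drive(ctx, sender, receiver)

	tasks = j.updateTasks(func(tasks *activeSideTasks) {
		tasks.prunerSender = j.prunerFactory.BuildSenderPruner(ctx, sender, sender)
		tasks.prunerReceiver = j.prunerFactory.BuildReceiverPruner(ctx, receiver, sender)
	})
	log.Info("start pruning sender")
	tasks.prunerSender.Prune()
	log.Info("start pruning receiver")
	tasks.prunerReceiver.Prune()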