Mirror of https://github.com/zrepl/zrepl.git
fixup "replication/driver: enforce ordering during initial replication in order to support encrypted send": correctly propagate non-inital parent failures
@@ -505,30 +505,49 @@ func (f *fs) do(ctx context.Context, pq *stepQueue, prev *fs) {
 	f.debug("wait for parents %s", parents)
 	for {
 		var initialReplicatingParentsWithErrors []string
-		allParentsDidInitialReplication := true
+		allParentsPresentOnReceiver := true
 		f.l.DropWhile(func() {
 			for _, p := range f.initialRepOrd.parents {
-				p.l.Lock()
+				p.l.HoldWhile(func() {
 
-				parentDidInitialReplication :=
 					// (get the preconditions that allow us to inspect p.planned)
-					p.planning.done && p.planning.err == nil &&
-						// if there are no steps to be done, the filesystem must exist on the receiving side
-						// (otherwise we'd replicate it, and there would be a step for that)
-						(len(p.planned.steps) == 0 ||
-							// OR if it has completed at least one step
-							// (remember that .step points to the next step to be done)
-							// (TODO technically, we could make this step ready in the moment the recv-side
-							// dataset exists, but we'd have to ask the receiver for that -> pool ListFilesystems RPC)
-							(p.planned.step >= 1))
-
-				allParentsDidInitialReplication = allParentsDidInitialReplication && parentDidInitialReplication
-
-				if parentDidInitialReplication && (p.planning.err != nil || p.planned.stepErr != nil) {
-					initialReplicatingParentsWithErrors = append(initialReplicatingParentsWithErrors, p.fs.ReportInfo().Name)
-				}
-
-				p.l.Unlock()
-
+					parentHasPlanningDone := p.planning.done && p.planning.err == nil
+					if !parentHasPlanningDone {
+						// if the parent couldn't be planned, we cannot know whether it needs initial replication
+						// or incremental replication => be conservative and assume it was initial replication
+						allParentsPresentOnReceiver = false
+						if p.planning.err != nil {
+							initialReplicatingParentsWithErrors = append(initialReplicatingParentsWithErrors, p.fs.ReportInfo().Name)
+						}
+						return
+					}
+					// now allowed to inspect p.planned
+
+					// if there are no steps to be done, the filesystem must exist on the receiving side
+					// (otherwise we'd replicate it, and there would be a step for that)
+					// (FIXME hardcoded initial replication policy, assuming the policy will always do _some_ initial replication)
+					parentHasNoSteps := len(p.planned.steps) == 0
+
+					// OR if it has completed at least one step
+					// (remember that .step points to the next step to be done)
+					// (TODO technically, we could make this step ready in the moment the recv-side
+					// dataset exists, i.e. after the first few megabytes of transferred data, but we'd have to ask the receiver for that -> poll ListFilesystems RPC)
+					parentHasTakenAtLeastOneSuccessfulStep := !parentHasNoSteps && p.planned.step >= 1
+
+					parentFirstStepIsIncremental := // no need to lock for .report() because step.l == it's fs.l
+						len(p.planned.steps) > 0 && p.planned.steps[0].report().IsIncremental()
+
+					f.debug("parentHasNoSteps=%v parentFirstStepIsIncremental=%v parentHasTakenAtLeastOneSuccessfulStep=%v",
+						parentHasNoSteps, parentFirstStepIsIncremental, parentHasTakenAtLeastOneSuccessfulStep)
+
+					parentPresentOnReceiver := parentHasNoSteps || parentFirstStepIsIncremental || parentHasTakenAtLeastOneSuccessfulStep
+
+					allParentsPresentOnReceiver = allParentsPresentOnReceiver && parentPresentOnReceiver // no shadow
+
+					if !parentPresentOnReceiver && p.planned.stepErr != nil {
+						initialReplicatingParentsWithErrors = append(initialReplicatingParentsWithErrors, p.fs.ReportInfo().Name)
+					}
+
+				})
 			}
 		})
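The gate is renamed from allParentsDidInitialReplication to allParentsPresentOnReceiver, which matches what is actually being decided: whether each parent dataset can be assumed to exist on the receiving side. Compared to the old single-expression predicate, the rewrite adds the incremental-first-step case and propagates planning errors even when the plan could not be inspected. A minimal, self-contained Go sketch of the per-parent decision (the parentState struct and its field names are illustrative stand-ins for p.planning/p.planned, not the driver's real types):

package main

import "fmt"

// Hypothetical flattened view of the per-parent state the loop above
// inspects; the real driver reads these values from p.planning and
// p.planned while holding p.l via HoldWhile.
type parentState struct {
	planningDone         bool
	planningErr          error
	numSteps             int  // len(p.planned.steps)
	nextStep             int  // p.planned.step, the next step to be done
	firstStepIncremental bool // p.planned.steps[0].report().IsIncremental()
}

// presentOnReceiver mirrors the patched loop body: a parent is assumed
// to exist on the receiving side if it needs no steps at all, if its
// first step is incremental (an incremental send implies the dataset
// already exists), or if it has completed at least one step.
func presentOnReceiver(p parentState) bool {
	if !p.planningDone || p.planningErr != nil {
		// plan not inspectable => conservatively assume the parent
		// still needs initial replication
		return false
	}
	hasNoSteps := p.numSteps == 0
	firstStepIsIncremental := p.numSteps > 0 && p.firstStepIncremental
	tookAtLeastOneStep := !hasNoSteps && p.nextStep >= 1
	return hasNoSteps || firstStepIsIncremental || tookAtLeastOneStep
}

func main() {
	fmt.Println(presentOnReceiver(parentState{planningDone: true}))                                          // true: no steps planned
	fmt.Println(presentOnReceiver(parentState{planningDone: true, numSteps: 3, firstStepIncremental: true})) // true: incremental first step
	fmt.Println(presentOnReceiver(parentState{planningDone: true, numSteps: 3}))                             // false: initial send not yet started
}

Note also that the closure now contains an early return in the not-yet-planned branch; switching from a manual Lock/Unlock pair to HoldWhile (added below) guarantees that p.l is released on that path as well.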
@@ -537,7 +556,7 @@ func (f *fs) do(ctx context.Context, pq *stepQueue, prev *fs) {
 			return
 		}
 
-		if allParentsDidInitialReplication {
+		if allParentsPresentOnReceiver {
 			break // good to go
 		}
 
@@ -40,3 +40,8 @@ func (l *L) DropWhile(f func()) {
 	defer l.Unlock().Lock()
 	f()
 }
+
+func (l *L) HoldWhile(f func()) {
+	defer l.Lock().Unlock()
+	f()
+}
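HoldWhile is the counterpart to the existing DropWhile, and both rely on Lock and Unlock returning the receiver: in a defer statement, the receiver chain before the final call is evaluated immediately and only the last call is deferred, so defer l.Lock().Unlock() locks now and unlocks on return. A runnable sketch under that assumption (the mutex field name is invented for illustration; zrepl's actual L type may differ):

package main

import (
	"fmt"
	"sync"
)

// L is a minimal chainable mutex in the style of the type patched above.
type L struct {
	mtx sync.Mutex
}

func (l *L) Lock() *L   { l.mtx.Lock(); return l }
func (l *L) Unlock() *L { l.mtx.Unlock(); return l }

// DropWhile runs f with the lock released; the caller must hold l.
// defer l.Unlock().Lock() unlocks immediately (the receiver chain is
// evaluated at defer time) and re-locks when DropWhile returns.
func (l *L) DropWhile(f func()) {
	defer l.Unlock().Lock()
	f()
}

// HoldWhile runs f with the lock held; the caller must NOT hold l.
// defer l.Lock().Unlock() locks immediately and unlocks on return,
// covering every exit path out of f, including early returns.
func (l *L) HoldWhile(f func()) {
	defer l.Lock().Unlock()
	f()
}

func main() {
	var l L
	l.HoldWhile(func() { fmt.Println("critical section: l is held") })

	l.Lock()
	l.DropWhile(func() { fmt.Println("l temporarily released") })
	l.Unlock()
}

This is what lets the driver replace the manual p.l.Lock()/p.l.Unlock() pair with p.l.HoldWhile(func() { ... }) in the first hunk: the deferred Unlock also fires on the new early return inside the closure.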