[#277] replication/driver: enforce ordering during initial replication in order to support encrypted send

fixes #277
Christian Schwarz 2020-04-07 23:45:20 +02:00
parent b4abebce00
commit 0280727985
3 changed files with 137 additions and 8 deletions
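
Context for the diff below: raw encrypted sends cannot be received under a dataset that does not yet exist on the receiving side, so initial replication has to proceed parent-before-child. The commit enforces this by making each filesystem's replication goroutine wait for its parents. A minimal standalone sketch of that idea (hypothetical dataset names, not zrepl code):

package main

import (
    "fmt"
    "sync"
)

func main() {
    parentExistsOnReceiver := make(chan struct{}) // closed once "tank" is received

    var wg sync.WaitGroup
    wg.Add(2)

    go func() { // parent: "tank"
        defer wg.Done()
        fmt.Println("initial replication: tank")
        close(parentExistsOnReceiver) // wake up waiting children
    }()

    go func() { // child: "tank/home"
        defer wg.Done()
        <-parentExistsOnReceiver // parent-before-child ordering
        fmt.Println("initial replication: tank/home")
    }()

    wg.Wait()
}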

View File

@@ -146,6 +146,12 @@ type fs struct {
     l *chainlock.L
+    // ordering relationship that must be maintained for initial replication
+    initialRepOrd struct {
+        parents, children []*fs
+        parentDidUpdate   chan struct{}
+    }
+
     planning struct {
         done bool
         err  *timedError
@@ -296,6 +302,7 @@ func (a *attempt) do(ctx context.Context, prev *attempt) {
             fs: pfs,
             l:  a.l,
         }
+        fs.initialRepOrd.parentDidUpdate = make(chan struct{}, 1)
         a.fss = append(a.fss, fs)
     }
@@ -354,6 +361,18 @@ func (a *attempt) do(ctx context.Context, prev *attempt) {
     }
     // invariant: prevs contains an entry for each unambiguous correspondence
+
+    // build up parent-child relationship (FIXME (O(n^2), but who's going to have that many filesystems...))
+    for _, f1 := range a.fss {
+        fs1 := f1.fs.ReportInfo().Name
+        for _, f2 := range a.fss {
+            fs2 := f2.fs.ReportInfo().Name
+            if strings.HasPrefix(fs1, fs2) && fs1 != fs2 {
+                f1.initialRepOrd.parents = append(f1.initialRepOrd.parents, f2)
+                f2.initialRepOrd.children = append(f2.initialRepOrd.children, f1)
+            }
+        }
+    }
+
     stepQueue := newStepQueue()
     defer stepQueue.Start(envconst.Int("ZREPL_REPLICATION_EXPERIMENTAL_REPLICATION_CONCURRENCY", 1))() // TODO parallel replication
     var fssesDone sync.WaitGroup
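
The nested loop above derives the ordering relation purely from dataset names: fs2 is recorded as a parent of fs1 whenever fs2 is a proper prefix of fs1. A runnable sketch of that rule with hypothetical names (not zrepl code):

package main

import (
    "fmt"
    "strings"
)

func main() {
    names := []string{"tank", "tank/home", "tank/home/alice"}
    for _, child := range names {
        for _, parent := range names {
            if strings.HasPrefix(child, parent) && child != parent {
                fmt.Printf("%s replicates after %s\n", child, parent)
            }
        }
    }
    // Note: a bare prefix check would also relate "tank/home" to a
    // sibling named "tank/home2"; that only adds an extra ordering
    // constraint, it cannot create a cycle.
}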
@@ -374,9 +393,27 @@ func (f *fs) debug(format string, args ...interface{}) {
     debugPrefix("fs=%s", f.fs.ReportInfo().Name)(format, args...)
 }
+
+// wake up children that watch for f.{planning.{err,done},planned.{step,stepErr}}
+func (f *fs) initialRepOrdWakeupChildren() {
+    var children []string
+    for _, c := range f.initialRepOrd.children {
+        // no locking required, c.fs does not change
+        children = append(children, c.fs.ReportInfo().Name)
+    }
+    f.debug("wakeup children %s", children)
+    for _, child := range f.initialRepOrd.children {
+        select {
+        // no locking required, child.initialRepOrd does not change
+        case child.initialRepOrd.parentDidUpdate <- struct{}{}:
+        default:
+        }
+    }
+}
 
 func (f *fs) do(ctx context.Context, pq *stepQueue, prev *fs) {
     defer f.l.Lock().Unlock()
+    defer f.initialRepOrdWakeupChildren()
 
     // get planned steps from replication logic
     var psteps []Step
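
The parentDidUpdate channel written above has capacity 1 and is sent to with a non-blocking select, so any burst of parent updates coalesces into a single pending wakeup; a waiter wakes once and then re-checks its condition. A standalone sketch of this idiom (not zrepl code):

package main

import (
    "fmt"
    "time"
)

func main() {
    // capacity 1: many notifications coalesce into one pending wakeup
    parentDidUpdate := make(chan struct{}, 1)

    notify := func() {
        select {
        case parentDidUpdate <- struct{}{}: // wakeup now pending
        default: // a wakeup is already pending, drop this one
        }
    }

    for i := 0; i < 3; i++ {
        notify() // three updates, at most one buffered
    }

    <-parentDidUpdate // waiter wakes once, then re-checks its condition
    fmt.Println("woke up, re-checking parents")

    select {
    case <-parentDidUpdate:
        fmt.Println("unreachable: the two extra notifications were coalesced")
    case <-time.After(10 * time.Millisecond):
        fmt.Println("no further wakeups pending")
    }
}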
@@ -390,7 +427,6 @@ func (f *fs) do(ctx context.Context, pq *stepQueue, prev *fs) {
         psteps, err = f.fs.PlanFS(ctx) // no shadow
         errTime = time.Now()           // no shadow
     })
-    f.planning.done = true
     if err != nil {
         f.planning.err = newTimedError(err, errTime)
         return
@@ -402,6 +438,8 @@ func (f *fs) do(ctx context.Context, pq *stepQueue, prev *fs) {
         }
         f.planned.steps = append(f.planned.steps, step)
     }
+    // we're not done planning yet, f.planned.steps might still be changed by next block
+    // => don't set f.planning.done just yet
     f.debug("initial len(fs.planned.steps) = %d", len(f.planned.steps))
 
     // for not-first attempts, only allow fs.planned.steps
@@ -456,24 +494,110 @@ func (f *fs) do(ctx context.Context, pq *stepQueue, prev *fs) {
     }
     f.debug("post-prev-merge len(fs.planned.steps) = %d", len(f.planned.steps))
 
+    // now we are done planning (f.planned.steps won't change from now on)
+    f.planning.done = true
+
+    // wait for parents' initial replication
+    var parents []string
+    for _, p := range f.initialRepOrd.parents {
+        parents = append(parents, p.fs.ReportInfo().Name)
+    }
+    f.debug("wait for parents %s", parents)
+    for {
+        var initialReplicatingParentsWithErrors []string
+        allParentsPresentOnReceiver := true
+        f.l.DropWhile(func() {
+            for _, p := range f.initialRepOrd.parents {
+                p.l.HoldWhile(func() {
+                    // (get the preconditions that allow us to inspect p.planned)
+                    parentHasPlanningDone := p.planning.done && p.planning.err == nil
+                    if !parentHasPlanningDone {
+                        // if the parent couldn't be planned, we cannot know whether it needs initial replication
+                        // or incremental replication => be conservative and assume it was initial replication
+                        allParentsPresentOnReceiver = false
+                        if p.planning.err != nil {
+                            initialReplicatingParentsWithErrors = append(initialReplicatingParentsWithErrors, p.fs.ReportInfo().Name)
+                        }
+                        return
+                    }
+                    // now allowed to inspect p.planned
+
+                    // if there are no steps to be done, the filesystem must exist on the receiving side
+                    // (otherwise we'd replicate it, and there would be a step for that)
+                    // (FIXME hardcoded initial replication policy, assuming the policy will always do _some_ initial replication)
+                    parentHasNoSteps := len(p.planned.steps) == 0
+
+                    // OR if it has completed at least one step
+                    // (remember that .step points to the next step to be done)
+                    // (TODO technically, we could make this step ready in the moment the recv-side
+                    //  dataset exists, i.e. after the first few megabytes of transferred data, but we'd have to ask the receiver for that -> poll ListFilesystems RPC)
+                    parentHasTakenAtLeastOneSuccessfulStep := !parentHasNoSteps && p.planned.step >= 1
+
+                    parentFirstStepIsIncremental := // no need to lock for .report() because step.l == its fs.l
+                        len(p.planned.steps) > 0 && p.planned.steps[0].report().IsIncremental()
+
+                    f.debug("parentHasNoSteps=%v parentFirstStepIsIncremental=%v parentHasTakenAtLeastOneSuccessfulStep=%v",
+                        parentHasNoSteps, parentFirstStepIsIncremental, parentHasTakenAtLeastOneSuccessfulStep)
+
+                    parentPresentOnReceiver := parentHasNoSteps || parentFirstStepIsIncremental || parentHasTakenAtLeastOneSuccessfulStep
+                    allParentsPresentOnReceiver = allParentsPresentOnReceiver && parentPresentOnReceiver // no shadow
+
+                    if !parentPresentOnReceiver && p.planned.stepErr != nil {
+                        initialReplicatingParentsWithErrors = append(initialReplicatingParentsWithErrors, p.fs.ReportInfo().Name)
+                    }
+                })
+            }
+        })
+        if len(initialReplicatingParentsWithErrors) > 0 {
+            f.planned.stepErr = newTimedError(fmt.Errorf("parent(s) failed during initial replication: %s", initialReplicatingParentsWithErrors), time.Now())
+            return
+        }
+        if allParentsPresentOnReceiver {
+            break // good to go
+        }
+
+        // wait for wakeups from parents, then check again
+        // lock must not be held while waiting in order for reporting to work
+        f.l.DropWhile(func() {
+            select {
+            case <-ctx.Done():
+                f.planned.stepErr = newTimedError(ctx.Err(), time.Now())
+                return
+            case <-f.initialRepOrd.parentDidUpdate:
+                // loop
+            }
+        })
+        if f.planned.stepErr != nil {
+            return
+        }
+    }
+    f.debug("all parents ready, start replication %s", parents)
+
     // do our steps
     for i, s := range f.planned.steps {
         var (
             err     error
             errTime time.Time
         )
         // lock must not be held while executing step in order for reporting to work
         f.l.DropWhile(func() {
             // wait for parallel replication
             targetDate := s.step.TargetDate()
             defer pq.WaitReady(f, targetDate)()
-            err = s.step.Step(ctx) // no shadow
-            errTime = time.Now()   // no shadow
+            // do the step
+            err, errTime = s.step.Step(ctx), time.Now() // no shadow
         })
 
         if err != nil {
             f.planned.stepErr = newTimedError(err, errTime)
             break
         }
         f.planned.step = i + 1 // fs.planned.step must be == len(fs.planned.steps) if all went OK
+        f.initialRepOrdWakeupChildren()
     }
 }
 
 // caller must hold lock l
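
Note how the wait loop above never blocks while holding f.l: the lock is dropped both while inspecting parents and while waiting on parentDidUpdate, and the readiness condition is recomputed from scratch after every wakeup. A compressed sketch of that check/drop/wait/re-check shape using a plain sync.Mutex (the real code uses chainlock; names are hypothetical):

package main

import (
    "fmt"
    "sync"
)

// condWait re-checks ready() in a loop, dropping mu while blocked on
// wakeup so other goroutines (e.g. a status reporter) can take the lock
// in the meantime. Sketch only; not the zrepl implementation.
func condWait(mu *sync.Mutex, wakeup <-chan struct{}, ready func() bool) {
    mu.Lock()
    defer mu.Unlock()
    for !ready() {
        mu.Unlock() // lock must not be held while waiting
        <-wakeup
        mu.Lock() // re-acquire, then re-check from scratch
    }
}

func main() {
    var (
        mu     sync.Mutex
        wakeup = make(chan struct{}, 1)
        done   bool
    )
    go func() {
        mu.Lock()
        done = true
        mu.Unlock()
        wakeup <- struct{}{} // buffered, never blocks here
    }()
    condWait(&mu, wakeup, func() bool { return done })
    fmt.Println("parents ready, start replication")
}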

View File

@@ -26,6 +26,6 @@ type debugFunc func(format string, args ...interface{})
 func debugPrefix(prefixFormat string, prefixFormatArgs ...interface{}) debugFunc {
     prefix := fmt.Sprintf(prefixFormat, prefixFormatArgs...)
     return func(format string, args ...interface{}) {
-        debug("%s: %s", prefix, fmt.Sprintf(format, args))
+        debug("%s: %s", prefix, fmt.Sprintf(format, args...))
     }
 }
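
This file's one-line change fixes a classic variadic bug: fmt.Sprintf(format, args) passes the []interface{} slice as a single operand, while args... expands it into the operand list. A quick illustration (hypothetical format string, not zrepl code):

package main

import "fmt"

func main() {
    args := []interface{}{"tank/home", 3}

    // slice passed as a single operand: the first verb prints the whole
    // slice and the second verb reports a missing operand
    broken := fmt.Sprintf("fs=%s steps=%d", args)
    fmt.Println(broken) // fs=[tank/home 3] steps=%!d(MISSING)

    // slice expanded into the operand list, as the fixed line does
    fixed := fmt.Sprintf("fs=%s steps=%d", args...)
    fmt.Println(fixed) // fs=tank/home steps=3
}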

View File

@@ -40,3 +40,8 @@ func (l *L) DropWhile(f func()) {
     defer l.Unlock().Lock()
     f()
 }
+
+func (l *L) HoldWhile(f func()) {
+    defer l.Lock().Unlock()
+    f()
+}
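
For context: chainlock's Lock and Unlock return the lock itself, which is what makes the one-liners in this diff work, including the new HoldWhile (the inverse of DropWhile: it takes the lock around f instead of releasing it). A minimal re-implementation of the chaining pattern (sketch under that assumption, not the actual package):

package main

import (
    "fmt"
    "sync"
)

// L wraps a mutex whose Lock/Unlock return the receiver, so calls chain:
// defer l.Lock().Unlock() acquires now and releases when the caller returns.
type L struct{ mtx sync.Mutex }

func (l *L) Lock() *L   { l.mtx.Lock(); return l }
func (l *L) Unlock() *L { l.mtx.Unlock(); return l }

// DropWhile releases the (held) lock around f, re-acquiring afterwards.
func (l *L) DropWhile(f func()) {
    defer l.Unlock().Lock()
    f()
}

// HoldWhile acquires the (not held) lock around f, releasing afterwards.
func (l *L) HoldWhile(f func()) {
    defer l.Lock().Unlock()
    f()
}

func main() {
    l := new(L)
    defer l.Lock().Unlock() // hold for the rest of main
    l.DropWhile(func() { fmt.Println("lock dropped here") })
    fmt.Println("lock held again")
}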