From 0280727985a632949c183f8b74e06a7dad04b4bc Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Tue, 7 Apr 2020 23:45:20 +0200
Subject: [PATCH] [#277] replication/driver: enforce ordering during initial
 replication in order to support encrypted send

fixes #277
---
 replication/driver/replication_driver.go       | 138 +++++++++++++++++-
 .../driver/replication_driver_debug.go         |   2 +-
 util/chainlock/chainlock.go                    |   5 +
 3 files changed, 137 insertions(+), 8 deletions(-)

diff --git a/replication/driver/replication_driver.go b/replication/driver/replication_driver.go
index 626508e..1e33b3d 100644
--- a/replication/driver/replication_driver.go
+++ b/replication/driver/replication_driver.go
@@ -146,6 +146,12 @@ type fs struct {
 
 	l *chainlock.L
 
+	// ordering relationship that must be maintained for initial replication
+	initialRepOrd struct {
+		parents, children []*fs
+		parentDidUpdate   chan struct{}
+	}
+
 	planning struct {
 		done bool
 		err  *timedError
@@ -296,6 +302,7 @@ func (a *attempt) do(ctx context.Context, prev *attempt) {
 			fs: pfs,
 			l:  a.l,
 		}
+		fs.initialRepOrd.parentDidUpdate = make(chan struct{}, 1)
 		a.fss = append(a.fss, fs)
 	}
 
@@ -354,6 +361,18 @@ func (a *attempt) do(ctx context.Context, prev *attempt) {
 	}
 	// invariant: prevs contains an entry for each unambiguous correspondence
 
+	// build up parent-child relationship (FIXME: O(n^2), but who's going to have that many filesystems...)
+	for _, f1 := range a.fss {
+		fs1 := f1.fs.ReportInfo().Name
+		for _, f2 := range a.fss {
+			fs2 := f2.fs.ReportInfo().Name
+			if strings.HasPrefix(fs1, fs2+"/") { // require a "/" path boundary, a bare prefix match would treat sibling zroot/ab as a child of zroot/a
+				f1.initialRepOrd.parents = append(f1.initialRepOrd.parents, f2)
+				f2.initialRepOrd.children = append(f2.initialRepOrd.children, f1)
+			}
+		}
+	}
+
 	stepQueue := newStepQueue()
 	defer stepQueue.Start(envconst.Int("ZREPL_REPLICATION_EXPERIMENTAL_REPLICATION_CONCURRENCY", 1))() // TODO parallel replication
 	var fssesDone sync.WaitGroup
@@ -374,9 +393,27 @@ func (f *fs) debug(format string, args ...interface{}) {
 	debugPrefix("fs=%s", f.fs.ReportInfo().Name)(format, args...)
 }
 
+// wake up children that watch for f.{planning.{err,done},planned.{step,stepErr}}
+func (f *fs) initialRepOrdWakeupChildren() {
+	var children []string
+	for _, c := range f.initialRepOrd.children {
+		// no locking required, c.fs does not change
+		children = append(children, c.fs.ReportInfo().Name)
+	}
+	f.debug("wakeup children %s", children)
+	for _, child := range f.initialRepOrd.children {
+		select {
+		// no locking required, child.initialRepOrd does not change
+		case child.initialRepOrd.parentDidUpdate <- struct{}{}:
+		default:
+		}
+	}
+}
+
 func (f *fs) do(ctx context.Context, pq *stepQueue, prev *fs) {
 	defer f.l.Lock().Unlock()
 
+	defer f.initialRepOrdWakeupChildren()
 
 	// get planned steps from replication logic
 	var psteps []Step
@@ -390,7 +427,6 @@ func (f *fs) do(ctx context.Context, pq *stepQueue, prev *fs) {
 		psteps, err = f.fs.PlanFS(ctx) // no shadow
 		errTime = time.Now()           // no shadow
 	})
-	f.planning.done = true
 	if err != nil {
 		f.planning.err = newTimedError(err, errTime)
 		return
@@ -402,6 +438,8 @@ func (f *fs) do(ctx context.Context, pq *stepQueue, prev *fs) {
 		}
 		f.planned.steps = append(f.planned.steps, step)
 	}
+	// we're not done planning yet, f.planned.steps might still be changed by the next block
+	// => don't set f.planning.done just yet
 	f.debug("initial len(fs.planned.steps) = %d", len(f.planned.steps))
 
 	// for not-first attempts, only allow fs.planned.steps
@@ -456,24 +494,111 @@ func (f *fs) do(ctx context.Context, pq *stepQueue, prev *fs) {
 	}
 	f.debug("post-prev-merge len(fs.planned.steps) = %d", len(f.planned.steps))
 
+	// now we are done planning (f.planned.steps won't change from now on)
+	f.planning.done = true
+
+	// wait for parents' initial replication
+	var parents []string
+	for _, p := range f.initialRepOrd.parents {
+		parents = append(parents, p.fs.ReportInfo().Name)
+	}
+	f.debug("wait for parents %s", parents)
+	for {
+		var initialReplicatingParentsWithErrors []string
+		allParentsPresentOnReceiver := true
+		f.l.DropWhile(func() {
+			for _, p := range f.initialRepOrd.parents {
+				p.l.HoldWhile(func() {
+					// (get the preconditions that allow us to inspect p.planned)
+					parentHasPlanningDone := p.planning.done && p.planning.err == nil
+					if !parentHasPlanningDone {
+						// if the parent couldn't be planned, we cannot know whether it needs initial replication
+						// or incremental replication => be conservative and assume it was initial replication
+						allParentsPresentOnReceiver = false
+						if p.planning.err != nil {
+							initialReplicatingParentsWithErrors = append(initialReplicatingParentsWithErrors, p.fs.ReportInfo().Name)
+						}
+						return
+					}
+					// now allowed to inspect p.planned
+
+					// if there are no steps to be done, the filesystem must exist on the receiving side
+					// (otherwise we'd replicate it, and there would be a step for that)
+					// (FIXME: hardcoded initial replication policy, assuming the policy will always do _some_ initial replication)
+					parentHasNoSteps := len(p.planned.steps) == 0
+
+					// OR if it has completed at least one step
+					// (remember that .step points to the next step to be done)
+					// (TODO technically, we could make this step ready the moment the recv-side
+					//  dataset exists, i.e. after the first few megabytes of transferred data, but we'd have to ask the receiver for that -> poll ListFilesystems RPC)
+					parentHasTakenAtLeastOneSuccessfulStep := !parentHasNoSteps && p.planned.step >= 1
+
+					// no need to lock for .report() because step.l == its fs's l
+					parentFirstStepIsIncremental := len(p.planned.steps) > 0 && p.planned.steps[0].report().IsIncremental()
+
+					f.debug("parentHasNoSteps=%v parentFirstStepIsIncremental=%v parentHasTakenAtLeastOneSuccessfulStep=%v",
+						parentHasNoSteps, parentFirstStepIsIncremental, parentHasTakenAtLeastOneSuccessfulStep)
+
+					parentPresentOnReceiver := parentHasNoSteps || parentFirstStepIsIncremental || parentHasTakenAtLeastOneSuccessfulStep
+
+					allParentsPresentOnReceiver = allParentsPresentOnReceiver && parentPresentOnReceiver // no shadow
+
+					if !parentPresentOnReceiver && p.planned.stepErr != nil {
+						initialReplicatingParentsWithErrors = append(initialReplicatingParentsWithErrors, p.fs.ReportInfo().Name)
+					}
+				})
+			}
+		})
+
+		if len(initialReplicatingParentsWithErrors) > 0 {
+			f.planned.stepErr = newTimedError(fmt.Errorf("parent(s) failed during initial replication: %s", initialReplicatingParentsWithErrors), time.Now())
+			return
+		}
+
+		if allParentsPresentOnReceiver {
+			break // good to go
+		}
+
+		// wait for wakeups from parents, then check again
+		// lock must not be held while waiting in order for reporting to work
+		// (collect ctx.Err() in a local: the lock is dropped inside DropWhile, so f.planned must not be written there)
+		var ctxErr error
+		f.l.DropWhile(func() {
+			select {
+			case <-ctx.Done():
+				ctxErr = ctx.Err()
+			case <-f.initialRepOrd.parentDidUpdate:
+				// loop
+			}
+		})
+		if ctxErr != nil {
+			f.planned.stepErr = newTimedError(ctxErr, time.Now())
+			return
+		}
+	}
+
+	f.debug("all parents ready, start replication %s", parents)
+
+	// do our steps
 	for i, s := range f.planned.steps {
-		var (
-			err     error
-			errTime time.Time
-		)
 		// lock must not be held while executing step in order for reporting to work
 		f.l.DropWhile(func() {
+			// wait for parallel replication
 			targetDate := s.step.TargetDate()
 			defer pq.WaitReady(f, targetDate)()
-			err = s.step.Step(ctx) // no shadow
-			errTime = time.Now()   // no shadow
+			// do the step
+			err, errTime = s.step.Step(ctx), time.Now() // no shadow
 		})
+
 		if err != nil {
 			f.planned.stepErr = newTimedError(err, errTime)
 			break
 		}
 		f.planned.step = i + 1 // fs.planned.step must be == len(fs.planned.steps) if all went OK
+
+		f.initialRepOrdWakeupChildren()
 	}
+
 }
 
 // caller must hold lock l
diff --git a/replication/driver/replication_driver_debug.go b/replication/driver/replication_driver_debug.go
index 6b96f09..536379a 100644
--- a/replication/driver/replication_driver_debug.go
+++ b/replication/driver/replication_driver_debug.go
@@ -26,6 +26,6 @@ type debugFunc func(format string, args ...interface{})
 func debugPrefix(prefixFormat string, prefixFormatArgs ...interface{}) debugFunc {
 	prefix := fmt.Sprintf(prefixFormat, prefixFormatArgs...)
 	return func(format string, args ...interface{}) {
-		debug("%s: %s", prefix, fmt.Sprintf(format, args))
+		debug("%s: %s", prefix, fmt.Sprintf(format, args...))
 	}
 }
diff --git a/util/chainlock/chainlock.go b/util/chainlock/chainlock.go
index ef1d971..0e5fe99 100644
--- a/util/chainlock/chainlock.go
+++ b/util/chainlock/chainlock.go
@@ -40,3 +40,8 @@ func (l *L) DropWhile(f func()) {
 	defer l.Unlock().Lock()
 	f()
 }
+
+func (l *L) HoldWhile(f func()) {
+	defer l.Lock().Unlock()
+	f()
+}
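
Note on the parent/child computation in attempt.do: ZFS encodes the dataset hierarchy in the name, so ancestry reduces to a prefix match that must stop at a "/" component boundary. A standalone sketch of why the boundary matters; the isAncestor helper and the dataset names are illustrative, not part of the patch:

package main

import (
	"fmt"
	"strings"
)

// isAncestor reports whether a is an ancestor of b in the ZFS dataset
// hierarchy. A bare strings.HasPrefix(b, a) would also match sibling
// datasets whose names merely share a string prefix, so the prefix
// must end at a "/" component boundary.
func isAncestor(a, b string) bool {
	return strings.HasPrefix(b, a+"/")
}

func main() {
	fmt.Println(isAncestor("zroot/a", "zroot/a/b")) // true: real ancestor
	fmt.Println(isAncestor("zroot/a", "zroot/ab"))  // false: sibling with shared name prefix
	fmt.Println(isAncestor("zroot/a", "zroot/a"))   // false: a dataset is not its own ancestor
}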
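
Note on the wakeup mechanism: parentDidUpdate is created with capacity 1 and written with a non-blocking send, so any number of parent state changes coalesce into at most one pending wakeup and a parent never blocks on a slow child. A self-contained sketch of the pattern; the notify helper and the timing are illustrative, not taken from the driver:

package main

import (
	"fmt"
	"time"
)

func main() {
	// capacity 1: at most one wakeup can be pending at a time
	didUpdate := make(chan struct{}, 1)

	// notify never blocks: if a wakeup is already pending, the send
	// would block, so the default branch drops the duplicate.
	notify := func() {
		select {
		case didUpdate <- struct{}{}:
		default:
		}
	}

	// five state changes in a row ...
	for i := 0; i < 5; i++ {
		notify()
	}

	// ... are observed as a single wakeup. That is safe because the
	// waiter re-checks its full condition after every wakeup, so a
	// coalesced signal loses no information.
	wakeups := 0
	for {
		select {
		case <-didUpdate:
			wakeups++
		case <-time.After(100 * time.Millisecond):
			fmt.Println("wakeups observed:", wakeups) // prints: wakeups observed: 1
			return
		}
	}
}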
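
Note on chainlock: Lock and Unlock return the receiver, so a scoped acquire or release chains into a single defer, and the new HoldWhile is the mirror image of the existing DropWhile. A runnable sketch; the L type below is a minimal stand-in for util/chainlock.L, and the call shape mimics the wait loop above (the driver shares one lock, a.l, across all fs instances, so the wait loop must DropWhile its own hold before HoldWhile can take the same lock to inspect a parent; the mutex is not reentrant):

package main

import (
	"fmt"
	"sync"
)

// L is a minimal stand-in for util/chainlock.L: Lock and Unlock return
// the receiver, so a scoped acquire or release reads as a single defer.
type L struct{ mtx sync.Mutex }

func (l *L) Lock() *L   { l.mtx.Lock(); return l }
func (l *L) Unlock() *L { l.mtx.Unlock(); return l }

// DropWhile releases l for the duration of f; the caller must hold l.
// l.Unlock() runs immediately; .Lock() on its result is what is deferred.
func (l *L) DropWhile(f func()) {
	defer l.Unlock().Lock()
	f()
}

// HoldWhile acquires l for the duration of f; the caller must not hold l.
func (l *L) HoldWhile(f func()) {
	defer l.Lock().Unlock()
	f()
}

func main() {
	var shared L // stands in for the single a.l shared by all fs

	defer shared.Lock().Unlock() // hold the lock, like fs.do does
	fmt.Println("holding lock")

	// shaped like the wait loop above: release our own hold so that
	// reporting (and other filesystems) can make progress, then take
	// the lock again briefly to inspect shared state. HoldWhile without
	// the surrounding DropWhile would deadlock on the non-reentrant mutex.
	shared.DropWhile(func() {
		shared.HoldWhile(func() {
			fmt.Println("inspecting under re-acquired lock")
		})
	})
	fmt.Println("hold re-established after DropWhile")
}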