mirror of https://github.com/zrepl/zrepl.git
[#277] replication/driver: enforce ordering during initial replication in order to support encrypted send
fixes #277
parent b4abebce00
commit 0280727985
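
In outline: each filesystem now waits until all of its parents (related by dataset-name prefix) are known to exist on the receiving side before it starts its own initial replication, and parents notify waiting children through a per-filesystem wakeup channel of capacity 1. The following is a minimal, self-contained sketch of that protocol; the dataset names, the node type and its fields are illustrative stand-ins, not zrepl's actual types.

package main

import (
	"fmt"
	"strings"
	"sync"
)

// node stands in for zrepl's *fs; all names and fields here are illustrative.
type node struct {
	name              string
	parents, children []*node
	parentDidUpdate   chan struct{} // capacity 1, coalesces wakeups

	mu                sync.Mutex
	presentOnReceiver bool
}

// wakeupChildren mirrors initialRepOrdWakeupChildren: a non-blocking send,
// so a slow child never stalls its parent and repeated signals coalesce.
func (n *node) wakeupChildren() {
	for _, c := range n.children {
		select {
		case c.parentDidUpdate <- struct{}{}:
		default: // a wakeup is already pending, which is enough
		}
	}
}

// replicate waits until every parent is present on the receiver, then "sends".
func (n *node) replicate(wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		allParentsPresent := true
		for _, p := range n.parents {
			p.mu.Lock()
			allParentsPresent = allParentsPresent && p.presentOnReceiver
			p.mu.Unlock()
		}
		if allParentsPresent {
			break
		}
		<-n.parentDidUpdate // a parent made progress; re-check
	}
	fmt.Println("initial replication of", n.name) // stands in for the real send
	n.mu.Lock()
	n.presentOnReceiver = true
	n.mu.Unlock()
	n.wakeupChildren()
}

func main() {
	names := []string{"zroot/data", "zroot/data/vm", "zroot/data/vm/disk0"}
	var nodes []*node
	for _, name := range names {
		nodes = append(nodes, &node{name: name, parentDidUpdate: make(chan struct{}, 1)})
	}
	// same prefix-based relation as the O(n^2) loop in the diff
	for _, a := range nodes {
		for _, b := range nodes {
			if strings.HasPrefix(a.name, b.name) && a.name != b.name {
				a.parents = append(a.parents, b)
				b.children = append(b.children, a)
			}
		}
	}
	var wg sync.WaitGroup
	for _, n := range nodes {
		wg.Add(1)
		go n.replicate(&wg)
	}
	wg.Wait()
}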
@@ -146,6 +146,12 @@ type fs struct {
 
 	l *chainlock.L
 
+	// ordering relationship that must be maintained for initial replication
+	initialRepOrd struct {
+		parents, children []*fs
+		parentDidUpdate   chan struct{}
+	}
+
 	planning struct {
 		done bool
 		err  *timedError
@@ -296,6 +302,7 @@ func (a *attempt) do(ctx context.Context, prev *attempt) {
 			fs: pfs,
 			l:  a.l,
 		}
+		fs.initialRepOrd.parentDidUpdate = make(chan struct{}, 1)
 		a.fss = append(a.fss, fs)
 	}
 
@@ -354,6 +361,18 @@ func (a *attempt) do(ctx context.Context, prev *attempt) {
 	}
 	// invariant: prevs contains an entry for each unambiguous correspondence
 
+	// build up parent-child relationship (FIXME (O(n^2), but who's going to have that many filesystems...))
+	for _, f1 := range a.fss {
+		fs1 := f1.fs.ReportInfo().Name
+		for _, f2 := range a.fss {
+			fs2 := f2.fs.ReportInfo().Name
+			if strings.HasPrefix(fs1, fs2) && fs1 != fs2 {
+				f1.initialRepOrd.parents = append(f1.initialRepOrd.parents, f2)
+				f2.initialRepOrd.children = append(f2.initialRepOrd.children, f1)
+			}
+		}
+	}
+
 	stepQueue := newStepQueue()
 	defer stepQueue.Start(envconst.Int("ZREPL_REPLICATION_EXPERIMENTAL_REPLICATION_CONCURRENCY", 1))() // TODO parallel replication
 	var fssesDone sync.WaitGroup
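
A worked example of the relation the prefix loop above establishes, with hypothetical dataset names; note that it links a filesystem to every ancestor, not only its direct parent.

package main

import (
	"fmt"
	"strings"
)

func main() {
	names := []string{"tank/srv", "tank/srv/www", "tank/srv/www/logs"}
	for _, child := range names {
		for _, parent := range names {
			if strings.HasPrefix(child, parent) && child != parent {
				fmt.Printf("%s waits for %s\n", child, parent)
			}
		}
	}
	// Output:
	// tank/srv/www waits for tank/srv
	// tank/srv/www/logs waits for tank/srv
	// tank/srv/www/logs waits for tank/srv/www
}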
@@ -374,9 +393,27 @@ func (f *fs) debug(format string, args ...interface{}) {
 	debugPrefix("fs=%s", f.fs.ReportInfo().Name)(format, args...)
 }
 
+// wake up children that watch for f.{planning.{err,done},planned.{step,stepErr}}
+func (f *fs) initialRepOrdWakeupChildren() {
+	var children []string
+	for _, c := range f.initialRepOrd.children {
+		// no locking required, c.fs does not change
+		children = append(children, c.fs.ReportInfo().Name)
+	}
+	f.debug("wakeup children %s", children)
+	for _, child := range f.initialRepOrd.children {
+		select {
+		// no locking required, child.initialRepOrd does not change
+		case child.initialRepOrd.parentDidUpdate <- struct{}{}:
+		default:
+		}
+	}
+}
+
 func (f *fs) do(ctx context.Context, pq *stepQueue, prev *fs) {
 
 	defer f.l.Lock().Unlock()
+	defer f.initialRepOrdWakeupChildren()
 
 	// get planned steps from replication logic
 	var psteps []Step
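
The wakeup above never blocks the parent: the send goes through a select with a default case, and because parentDidUpdate has capacity 1 (see the make(chan struct{}, 1) hunk earlier), repeated signals coalesce into a single pending token that the waiting child consumes before re-checking its parents. A small standalone illustration of that coalescing behaviour:

package main

import "fmt"

func main() {
	wakeup := make(chan struct{}, 1) // same shape as fs.initialRepOrd.parentDidUpdate
	notify := func() {
		select {
		case wakeup <- struct{}{}: // first signal gets buffered
		default: // later signals are dropped; one pending token is enough
		}
	}
	for i := 0; i < 5; i++ {
		notify() // e.g. several parents finishing in quick succession
	}
	fmt.Println("pending wakeups:", len(wakeup)) // prints: pending wakeups: 1
}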
@@ -390,7 +427,6 @@ func (f *fs) do(ctx context.Context, pq *stepQueue, prev *fs) {
 		psteps, err = f.fs.PlanFS(ctx) // no shadow
 		errTime = time.Now()           // no shadow
 	})
-	f.planning.done = true
 	if err != nil {
 		f.planning.err = newTimedError(err, errTime)
 		return
@@ -402,6 +438,8 @@ func (f *fs) do(ctx context.Context, pq *stepQueue, prev *fs) {
 		}
 		f.planned.steps = append(f.planned.steps, step)
 	}
+	// we're not done planning yet, f.planned.steps might still be changed by next block
+	// => don't set f.planning.done just yet
 	f.debug("initial len(fs.planned.steps) = %d", len(f.planned.steps))
 
 	// for not-first attempts, only allow fs.planned.steps
@@ -456,24 +494,110 @@ func (f *fs) do(ctx context.Context, pq *stepQueue, prev *fs) {
 	}
 	f.debug("post-prev-merge len(fs.planned.steps) = %d", len(f.planned.steps))
 
+	// now we are done planning (f.planned.steps won't change from now on)
+	f.planning.done = true
+
+	// wait for parents' initial replication
+	var parents []string
+	for _, p := range f.initialRepOrd.parents {
+		parents = append(parents, p.fs.ReportInfo().Name)
+	}
+	f.debug("wait for parents %s", parents)
+	for {
+		var initialReplicatingParentsWithErrors []string
+		allParentsPresentOnReceiver := true
+		f.l.DropWhile(func() {
+			for _, p := range f.initialRepOrd.parents {
+				p.l.HoldWhile(func() {
+					// (get the preconditions that allow us to inspect p.planned)
+					parentHasPlanningDone := p.planning.done && p.planning.err == nil
+					if !parentHasPlanningDone {
+						// if the parent couldn't be planned, we cannot know whether it needs initial replication
+						// or incremental replication => be conservative and assume it was initial replication
+						allParentsPresentOnReceiver = false
+						if p.planning.err != nil {
+							initialReplicatingParentsWithErrors = append(initialReplicatingParentsWithErrors, p.fs.ReportInfo().Name)
+						}
+						return
+					}
+					// now allowed to inspect p.planned
+
+					// if there are no steps to be done, the filesystem must exist on the receiving side
+					// (otherwise we'd replicate it, and there would be a step for that)
+					// (FIXME hardcoded initial replication policy, assuming the policy will always do _some_ initial replication)
+					parentHasNoSteps := len(p.planned.steps) == 0
+
+					// OR if it has completed at least one step
+					// (remember that .step points to the next step to be done)
+					// (TODO technically, we could make this step ready in the moment the recv-side
+					// dataset exists, i.e. after the first few megabytes of transferred data, but we'd have to ask the receiver for that -> poll ListFilesystems RPC)
+					parentHasTakenAtLeastOneSuccessfulStep := !parentHasNoSteps && p.planned.step >= 1
+
+					parentFirstStepIsIncremental := // no need to lock for .report() because step.l == it's fs.l
+						len(p.planned.steps) > 0 && p.planned.steps[0].report().IsIncremental()
+
+					f.debug("parentHasNoSteps=%v parentFirstStepIsIncremental=%v parentHasTakenAtLeastOneSuccessfulStep=%v",
+						parentHasNoSteps, parentFirstStepIsIncremental, parentHasTakenAtLeastOneSuccessfulStep)
+
+					parentPresentOnReceiver := parentHasNoSteps || parentFirstStepIsIncremental || parentHasTakenAtLeastOneSuccessfulStep
+
+					allParentsPresentOnReceiver = allParentsPresentOnReceiver && parentPresentOnReceiver // no shadow
+
+					if !parentPresentOnReceiver && p.planned.stepErr != nil {
+						initialReplicatingParentsWithErrors = append(initialReplicatingParentsWithErrors, p.fs.ReportInfo().Name)
+					}
+
+				})
+			}
+		})
+
+		if len(initialReplicatingParentsWithErrors) > 0 {
+			f.planned.stepErr = newTimedError(fmt.Errorf("parent(s) failed during initial replication: %s", initialReplicatingParentsWithErrors), time.Now())
+			return
+		}
+
+		if allParentsPresentOnReceiver {
+			break // good to go
+		}
+
+		// wait for wakeups from parents, then check again
+		// lock must not be held while waiting in order for reporting to work
+		f.l.DropWhile(func() {
+			select {
+			case <-ctx.Done():
+				f.planned.stepErr = newTimedError(ctx.Err(), time.Now())
+				return
+			case <-f.initialRepOrd.parentDidUpdate:
+				// loop
+			}
+		})
+		if f.planned.stepErr != nil {
+			return
+		}
+	}
+
+	f.debug("all parents ready, start replication %s", parents)
+
+	// do our steps
 	for i, s := range f.planned.steps {
-		var (
-			err     error
-			errTime time.Time
-		)
 		// lock must not be held while executing step in order for reporting to work
 		f.l.DropWhile(func() {
+
+			// wait for parallel replication
 			targetDate := s.step.TargetDate()
 			defer pq.WaitReady(f, targetDate)()
-			err = s.step.Step(ctx) // no shadow
-			errTime = time.Now()   // no shadow
+			// do the step
+			err, errTime = s.step.Step(ctx), time.Now() // no shadow
 		})
 
 		if err != nil {
 			f.planned.stepErr = newTimedError(err, errTime)
 			break
 		}
 		f.planned.step = i + 1 // fs.planned.step must be == len(fs.planned.steps) if all went OK
+
+		f.initialRepOrdWakeupChildren()
 	}
 
 }
 
 // caller must hold lock l
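
The per-parent check inside the wait loop can be read as a pure predicate over the parent's planner state. The sketch below is only a restatement of those conditions with plain values in place of zrepl's types:

package main

import "fmt"

// parentPresentOnReceiver is a hypothetical restatement of the per-parent
// check in the wait loop, using plain values instead of zrepl's planner state.
func parentPresentOnReceiver(planningDone bool, planningErr error, numSteps, nextStep int, firstStepIncremental bool) bool {
	if !planningDone || planningErr != nil {
		// cannot tell whether the parent still needs initial replication,
		// so conservatively assume it does
		return false
	}
	hasNoSteps := numSteps == 0                         // nothing to replicate: already on the receiver
	tookAtLeastOneStep := numSteps > 0 && nextStep >= 1 // .step points at the next step to be done
	return hasNoSteps || firstStepIncremental || tookAtLeastOneStep
}

func main() {
	fmt.Println(parentPresentOnReceiver(true, nil, 0, 0, false))  // true: no steps planned
	fmt.Println(parentPresentOnReceiver(true, nil, 3, 0, true))   // true: first step is incremental
	fmt.Println(parentPresentOnReceiver(true, nil, 3, 1, false))  // true: at least one step already done
	fmt.Println(parentPresentOnReceiver(true, nil, 3, 0, false))  // false: initial replication still pending
	fmt.Println(parentPresentOnReceiver(false, nil, 0, 0, false)) // false: planning not finished yet
}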
@@ -26,6 +26,6 @@ type debugFunc func(format string, args ...interface{})
 func debugPrefix(prefixFormat string, prefixFormatArgs ...interface{}) debugFunc {
 	prefix := fmt.Sprintf(prefixFormat, prefixFormatArgs...)
 	return func(format string, args ...interface{}) {
-		debug("%s: %s", prefix, fmt.Sprintf(format, args))
+		debug("%s: %s", prefix, fmt.Sprintf(format, args...))
 	}
 }
@@ -40,3 +40,8 @@ func (l *L) DropWhile(f func()) {
 	defer l.Unlock().Lock()
 	f()
 }
+
+func (l *L) HoldWhile(f func()) {
+	defer l.Lock().Unlock()
+	f()
+}
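
HoldWhile is the counterpart to the existing DropWhile: one temporarily acquires the lock, the other temporarily releases it. The stand-in below assumes nothing beyond what the diff shows (Lock and Unlock return the receiver so the calls chain, and in the driver every fs shares one such lock, handed out as l: a.l); the real chainlock package may differ in detail.

package main

import (
	"fmt"
	"sync"
)

// L is a minimal stand-in for chainlock.L, just enough to show how the
// chained calls compose.
type L struct{ mtx sync.Mutex }

func (l *L) Lock() *L   { l.mtx.Lock(); return l }
func (l *L) Unlock() *L { l.mtx.Unlock(); return l }

// DropWhile runs f with the currently held lock temporarily released.
func (l *L) DropWhile(f func()) {
	defer l.Unlock().Lock() // unlock now, re-lock when f returns
	f()
}

// HoldWhile runs f with the lock temporarily acquired.
func (l *L) HoldWhile(f func()) {
	defer l.Lock().Unlock() // lock now, unlock when f returns
	f()
}

func main() {
	var shared L                 // the driver shares one lock across all fs (l: a.l)
	defer shared.Lock().Unlock() // hold it for the whole function, like f.do does
	shared.DropWhile(func() {    // release it while waiting on or inspecting peers
		shared.HoldWhile(func() { // briefly re-acquire to read another fs's state
			fmt.Println("inspecting peer state under the shared lock")
		})
	})
}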