mirror of
https://github.com/zrepl/zrepl.git
synced 2024-11-22 16:34:32 +01:00
replication/driver: enforce ordering during initial replication in order to support encrypted send
fixes #277
This commit is contained in:
parent
1540a478b0
commit
d59b64df86
@ -146,6 +146,12 @@ type fs struct {
|
||||
|
||||
l *chainlock.L
|
||||
|
||||
// ordering relationship that must be maintained for initial replication
|
||||
initialRepOrd struct {
|
||||
parents, children []*fs
|
||||
parentDidUpdate chan struct{}
|
||||
}
|
||||
|
||||
planning struct {
|
||||
done bool
|
||||
err *timedError
|
||||
@ -296,6 +302,7 @@ func (a *attempt) do(ctx context.Context, prev *attempt) {
|
||||
fs: pfs,
|
||||
l: a.l,
|
||||
}
|
||||
fs.initialRepOrd.parentDidUpdate = make(chan struct{}, 1)
|
||||
a.fss = append(a.fss, fs)
|
||||
}
|
||||
|
||||
@ -354,6 +361,18 @@ func (a *attempt) do(ctx context.Context, prev *attempt) {
|
||||
}
|
||||
// invariant: prevs contains an entry for each unambiguous correspondence
|
||||
|
||||
// build up parent-child relationship (FIXME (O(n^2), but who's going to have that many filesystems...))
|
||||
for _, f1 := range a.fss {
|
||||
fs1 := f1.fs.ReportInfo().Name
|
||||
for _, f2 := range a.fss {
|
||||
fs2 := f2.fs.ReportInfo().Name
|
||||
if strings.HasPrefix(fs1, fs2) && fs1 != fs2 {
|
||||
f1.initialRepOrd.parents = append(f1.initialRepOrd.parents, f2)
|
||||
f2.initialRepOrd.children = append(f2.initialRepOrd.children, f1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
stepQueue := newStepQueue()
|
||||
defer stepQueue.Start(envconst.Int("ZREPL_REPLICATION_EXPERIMENTAL_REPLICATION_CONCURRENCY", 1))() // TODO parallel replication
|
||||
var fssesDone sync.WaitGroup
|
||||
@ -374,9 +393,27 @@ func (f *fs) debug(format string, args ...interface{}) {
|
||||
debugPrefix("fs=%s", f.fs.ReportInfo().Name)(format, args...)
|
||||
}
|
||||
|
||||
// wake up children that watch for f.{planning.{err,done},planned.{step,stepErr}}
|
||||
func (f *fs) initialRepOrdWakeupChildren() {
|
||||
var children []string
|
||||
for _, c := range f.initialRepOrd.children {
|
||||
// no locking required, c.fs does not change
|
||||
children = append(children, c.fs.ReportInfo().Name)
|
||||
}
|
||||
f.debug("wakeup children %s", children)
|
||||
for _, child := range f.initialRepOrd.children {
|
||||
select {
|
||||
// no locking required, child.initialRepOrd does not change
|
||||
case child.initialRepOrd.parentDidUpdate <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (f *fs) do(ctx context.Context, pq *stepQueue, prev *fs) {
|
||||
|
||||
defer f.l.Lock().Unlock()
|
||||
defer f.initialRepOrdWakeupChildren()
|
||||
|
||||
// get planned steps from replication logic
|
||||
var psteps []Step
|
||||
@ -390,7 +427,6 @@ func (f *fs) do(ctx context.Context, pq *stepQueue, prev *fs) {
|
||||
psteps, err = f.fs.PlanFS(ctx) // no shadow
|
||||
errTime = time.Now() // no shadow
|
||||
})
|
||||
f.planning.done = true
|
||||
if err != nil {
|
||||
f.planning.err = newTimedError(err, errTime)
|
||||
return
|
||||
@ -402,6 +438,8 @@ func (f *fs) do(ctx context.Context, pq *stepQueue, prev *fs) {
|
||||
}
|
||||
f.planned.steps = append(f.planned.steps, step)
|
||||
}
|
||||
// we're not done planning yet, f.planned.steps might still be changed by next block
|
||||
// => don't set f.planning.done just yet
|
||||
f.debug("initial len(fs.planned.steps) = %d", len(f.planned.steps))
|
||||
|
||||
// for not-first attempts, only allow fs.planned.steps
|
||||
@ -456,24 +494,91 @@ func (f *fs) do(ctx context.Context, pq *stepQueue, prev *fs) {
|
||||
}
|
||||
f.debug("post-prev-merge len(fs.planned.steps) = %d", len(f.planned.steps))
|
||||
|
||||
// now we are done planning (f.planned.steps won't change from now on)
|
||||
f.planning.done = true
|
||||
|
||||
// wait for parents' initial replication
|
||||
var parents []string
|
||||
for _, p := range f.initialRepOrd.parents {
|
||||
parents = append(parents, p.fs.ReportInfo().Name)
|
||||
}
|
||||
f.debug("wait for parents %s", parents)
|
||||
for {
|
||||
var initialReplicatingParentsWithErrors []string
|
||||
allParentsDidInitialReplication := true
|
||||
f.l.DropWhile(func() {
|
||||
for _, p := range f.initialRepOrd.parents {
|
||||
p.l.Lock()
|
||||
|
||||
parentDidInitialReplication :=
|
||||
// (get the preconditions that allow us to inspect p.planned)
|
||||
p.planning.done && p.planning.err == nil &&
|
||||
// if there are no steps to be done, the filesystem must exist on the receiving side
|
||||
// (otherwise we'd replicate it, and there would be a step for that)
|
||||
(len(p.planned.steps) == 0 ||
|
||||
// OR if it has completed at least one step
|
||||
// (remember that .step points to the next step to be done)
|
||||
// (TODO technically, we could make this step ready in the moment the recv-side
|
||||
// dataset exists, but we'd have to ask the receiver for that -> pool ListFilesystems RPC)
|
||||
(p.planned.step >= 1))
|
||||
|
||||
allParentsDidInitialReplication = allParentsDidInitialReplication && parentDidInitialReplication
|
||||
|
||||
if parentDidInitialReplication && (p.planning.err != nil || p.planned.stepErr != nil) {
|
||||
initialReplicatingParentsWithErrors = append(initialReplicatingParentsWithErrors, p.fs.ReportInfo().Name)
|
||||
}
|
||||
|
||||
p.l.Unlock()
|
||||
}
|
||||
})
|
||||
|
||||
if len(initialReplicatingParentsWithErrors) > 0 {
|
||||
f.planned.stepErr = newTimedError(fmt.Errorf("parent(s) failed during initial replication: %s", initialReplicatingParentsWithErrors), time.Now())
|
||||
return
|
||||
}
|
||||
|
||||
if allParentsDidInitialReplication {
|
||||
break // good to go
|
||||
}
|
||||
|
||||
// wait for wakeups from parents, then check again
|
||||
// lock must not be held while waiting in order for reporting to work
|
||||
f.l.DropWhile(func() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
f.planned.stepErr = newTimedError(ctx.Err(), time.Now())
|
||||
return
|
||||
case <-f.initialRepOrd.parentDidUpdate:
|
||||
// loop
|
||||
}
|
||||
})
|
||||
if f.planned.stepErr != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
f.debug("all parents ready, start replication %s", parents)
|
||||
|
||||
// do our steps
|
||||
for i, s := range f.planned.steps {
|
||||
var (
|
||||
err error
|
||||
errTime time.Time
|
||||
)
|
||||
// lock must not be held while executing step in order for reporting to work
|
||||
f.l.DropWhile(func() {
|
||||
// wait for parallel replication
|
||||
targetDate := s.step.TargetDate()
|
||||
defer pq.WaitReady(f, targetDate)()
|
||||
err = s.step.Step(ctx) // no shadow
|
||||
errTime = time.Now() // no shadow
|
||||
// do the step
|
||||
err, errTime = s.step.Step(ctx), time.Now() // no shadow
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
f.planned.stepErr = newTimedError(err, errTime)
|
||||
break
|
||||
}
|
||||
f.planned.step = i + 1 // fs.planned.step must be == len(fs.planned.steps) if all went OK
|
||||
|
||||
f.initialRepOrdWakeupChildren()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// caller must hold lock l
|
||||
|
@ -26,6 +26,6 @@ type debugFunc func(format string, args ...interface{})
|
||||
func debugPrefix(prefixFormat string, prefixFormatArgs ...interface{}) debugFunc {
|
||||
prefix := fmt.Sprintf(prefixFormat, prefixFormatArgs...)
|
||||
return func(format string, args ...interface{}) {
|
||||
debug("%s: %s", prefix, fmt.Sprintf(format, args))
|
||||
debug("%s: %s", prefix, fmt.Sprintf(format, args...))
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user