diff --git a/endpoint/endpoint.go b/endpoint/endpoint.go
index 1ae9dc9..5240443 100644
--- a/endpoint/endpoint.go
+++ b/endpoint/endpoint.go
@@ -104,13 +104,28 @@ func (s *Sender) ListFilesystems(ctx context.Context, r *pdu.ListFilesystemReq)
 	if err != nil {
 		return nil, err
 	}
-	rfss := make([]*pdu.Filesystem, len(fss))
-	for i := range fss {
-		rfss[i] = &pdu.Filesystem{
-			Path: fss[i].ToString(),
-			// ResumeToken does not make sense from Sender
-			IsPlaceholder: false, // sender FSs are never placeholders
+	rfss := make([]*pdu.Filesystem, 0, len(fss))
+	for _, a := range fss {
+		// TODO: dedup code with Receiver.ListFilesystems
+		l := getLogger(ctx).WithField("fs", a)
+		ph, err := zfs.ZFSGetFilesystemPlaceholderState(ctx, a)
+		if err != nil {
+			l.WithError(err).Error("error getting placeholder state")
+			return nil, errors.Wrapf(err, "cannot get placeholder state for fs %q", a)
 		}
+		l.WithField("placeholder_state", fmt.Sprintf("%#v", ph)).Debug("placeholder state")
+		if !ph.FSExists {
+			l.Error("inconsistent placeholder state: filesystem must exist")
+			err := errors.Errorf("inconsistent placeholder state: filesystem %q must exist in this context", a.ToString())
+			return nil, err
+		}
+
+		fs := &pdu.Filesystem{
+			Path: a.ToString(),
+			// ResumeToken does not make sense from Sender
+			IsPlaceholder: ph.IsPlaceholder,
+		}
+		rfss = append(rfss, fs)
 	}
 	res := &pdu.ListFilesystemRes{Filesystems: rfss}
 	return res, nil
diff --git a/platformtest/tests/generated_cases.go b/platformtest/tests/generated_cases.go
index 60443f6..7a9fa6e 100644
--- a/platformtest/tests/generated_cases.go
+++ b/platformtest/tests/generated_cases.go
@@ -28,6 +28,7 @@ var Cases = []Case{BatchDestroy,
 	ReplicationIsResumableFullSend__both_GuaranteeResumability,
 	ReplicationIsResumableFullSend__initial_GuaranteeIncrementalReplication_incremental_GuaranteeIncrementalReplication,
 	ReplicationIsResumableFullSend__initial_GuaranteeResumability_incremental_GuaranteeIncrementalReplication,
+	ReplicationOfPlaceholderFilesystemsInChainedReplicationScenario,
 	ReplicationPlaceholderEncryption__EncryptOnReceiverUseCase__WorksIfConfiguredWithInherit,
 	ReplicationPlaceholderEncryption__UnspecifiedIsOkForClientIdentityPlaceholder,
 	ReplicationPlaceholderEncryption__UnspecifiedLeadsToFailureAtRuntimeWhenCreatingPlaceholders,
diff --git a/platformtest/tests/replication.go b/platformtest/tests/replication.go
index 2c7a512..1bbdd44 100644
--- a/platformtest/tests/replication.go
+++ b/platformtest/tests/replication.go
@@ -1483,3 +1483,100 @@ func ReplicationInitialFail(ctx *platformtest.Context) {
 	require.NotNil(ctx, report.Attempts[0].Filesystems[0].PlanError)
 	require.Contains(ctx, report.Attempts[0].Filesystems[0].PlanError.Err, "automatic conflict resolution for initial replication is disabled in config")
 }
+
+// https://github.com/zrepl/zrepl/issues/742
+func ReplicationOfPlaceholderFilesystemsInChainedReplicationScenario(ctx *platformtest.Context) {
+
+	//
+	// Setup datasets
+	//
+	platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, `
+		CREATEROOT
+		+ "host1"
+		+ "host1/a"
+		+ "host1/a/b"
+		+ "host1/a/b@1"
+		+ "host2"
+		+ "host2/sink"
+		+ "host3"
+		+ "host3/sink"
+	`)
+
+	// Replicate host1/a to host2/sink
+	host1_a := ctx.RootDataset + "/host1/a"
+	host1_b := ctx.RootDataset + "/host1/a/b"
+
+	host2_sink := ctx.RootDataset + "/host2/sink"
+	host2_a := host2_sink + "/" + host1_a
+	host2_b := host2_sink + "/" + host1_b
+
+	host3_sink := ctx.RootDataset + "/host3/sink"
+	host3_a := host3_sink + "/" + host2_a
+	host3_b := host3_sink + "/" + host2_b
+
+	type job struct {
+		sender, receiver          endpoint.JobID
+		receiver_root             string
+		sender_filesystems_filter map[string]bool
+	}
+	h1_to_h2 := job{
+		sender:   endpoint.MustMakeJobID("h1-to-h2-sender"),
+		receiver: endpoint.MustMakeJobID("h1-to-h2-receiver"),
+		sender_filesystems_filter: map[string]bool{
+			// omit host1_a so that it becomes a placeholder on host2
+			host1_b: true,
+		},
+		receiver_root: host2_sink,
+	}
+	h2_to_h3 := job{
+		sender:   endpoint.MustMakeJobID("h2-to-h3-sender"),
+		receiver: endpoint.MustMakeJobID("h2-to-h3-receiver"),
+		sender_filesystems_filter: map[string]bool{
+			host2_sink:       false,
+			host2_sink + "<": true,
+		},
+		receiver_root: host3_sink,
+	}
+
+	do_repl := func(j job) {
+
+		sfilter := filters.NewDatasetMapFilter(len(j.sender_filesystems_filter), true)
+
+		for lhs, rhs := range j.sender_filesystems_filter {
+			var err error
+			if rhs {
+				err = sfilter.Add(lhs, filters.MapFilterResultOk)
+			} else {
+				err = sfilter.Add(lhs, filters.MapFilterResultOmit)
+			}
+			require.NoError(ctx, err)
+		}
+
+		rep := replicationInvocation{
+			sjid:      j.sender,
+			rjid:      j.receiver,
+			sfilter:   sfilter,
+			rfsRoot:   j.receiver_root,
+			guarantee: pdu.ReplicationConfigProtectionWithKind(pdu.ReplicationGuaranteeKind_GuaranteeResumability),
+			receiverConfigHook: func(rc *endpoint.ReceiverConfig) {
+				rc.PlaceholderEncryption = endpoint.PlaceholderCreationEncryptionPropertyOff
+			},
+		}
+
+		r := rep.Do(ctx)
+		ctx.Logf("\n%s", pretty.Sprint(r))
+	}
+
+	do_repl(h1_to_h2)
+	do_repl(h2_to_h3)
+
+	// assert that the replication worked
+	mustGetFilesystemVersion(ctx, host3_b+"@1")
+
+	// assert placeholder status
+	for _, fs := range []string{host2_a, host3_a} {
+		st, err := zfs.ZFSGetFilesystemPlaceholderState(ctx, mustDatasetPath(fs))
+		require.NoError(ctx, err)
+		require.True(ctx, st.IsPlaceholder)
+	}
+}
diff --git a/replication/driver/replication_driver.go b/replication/driver/replication_driver.go
index 0d85501..aa8adbd 100644
--- a/replication/driver/replication_driver.go
+++ b/replication/driver/replication_driver.go
@@ -529,27 +529,31 @@ func (f *fs) do(ctx context.Context, pq *stepQueue, prev *fs) {
 		// find the highest of the previously uncompleted steps for which we can also find a step
 		// in our current plan
 		prevUncompleted := prev.planned.steps[prev.planned.step:]
-		var target struct{ prev, cur int }
-		target.prev = -1
-		target.cur = -1
-	out:
-		for p := len(prevUncompleted) - 1; p >= 0; p-- {
-			for q := len(f.planned.steps) - 1; q >= 0; q-- {
-				if prevUncompleted[p].step.TargetEquals(f.planned.steps[q].step) {
-					target.prev = p
-					target.cur = q
-					break out
+		if len(prevUncompleted) == 0 || len(f.planned.steps) == 0 {
+			f.debug("no steps planned in previous attempt or this attempt, no correlation necessary len(prevUncompleted)=%d len(f.planned.steps)=%d", len(prevUncompleted), len(f.planned.steps))
+		} else {
+			var target struct{ prev, cur int }
+			target.prev = -1
+			target.cur = -1
+		out:
+			for p := len(prevUncompleted) - 1; p >= 0; p-- {
+				for q := len(f.planned.steps) - 1; q >= 0; q-- {
+					if prevUncompleted[p].step.TargetEquals(f.planned.steps[q].step) {
+						target.prev = p
+						target.cur = q
+						break out
+					}
 				}
 			}
-		}
-		if target.prev == -1 || target.cur == -1 {
-			f.debug("no correlation possible between previous attempt and this attempt's plan")
-			f.planning.err = newTimedError(fmt.Errorf("cannot correlate previously failed attempt to current plan"), time.Now())
-			return
-		}
+			if target.prev == -1 || target.cur == -1 {
+				f.debug("no correlation possible between previous attempt and this attempt's plan")
attempt's plan") + f.planning.err = newTimedError(fmt.Errorf("cannot correlate previously failed attempt to current plan"), time.Now()) + return + } - f.planned.steps = f.planned.steps[0:target.cur] - f.debug("found correlation, new steps are len(fs.planned.steps) = %d", len(f.planned.steps)) + f.planned.steps = f.planned.steps[0:target.cur] + f.debug("found correlation, new steps are len(fs.planned.steps) = %d", len(f.planned.steps)) + } } else { f.debug("previous attempt does not exist or did not finish planning, no correlation possible, taking this attempt's plan as is") } @@ -600,6 +604,8 @@ func (f *fs) do(ctx context.Context, pq *stepQueue, prev *fs) { f.debug("parentHasNoSteps=%v parentFirstStepIsIncremental=%v parentHasTakenAtLeastOneSuccessfulStep=%v", parentHasNoSteps, parentFirstStepIsIncremental, parentHasTakenAtLeastOneSuccessfulStep) + // If the parent is a placeholder on the sender, `parentHasNoSteps` is true because we plan no steps for sender placeholders. + // The receiver will create the necessary placeholders when they start receiving the first non-placeholder child filesystem. parentPresentOnReceiver := parentHasNoSteps || parentFirstStepIsIncremental || parentHasTakenAtLeastOneSuccessfulStep allParentsPresentOnReceiver = allParentsPresentOnReceiver && parentPresentOnReceiver // no shadow diff --git a/replication/logic/replication_logic.go b/replication/logic/replication_logic.go index 9d187a9..41a1e3b 100644 --- a/replication/logic/replication_logic.go +++ b/replication/logic/replication_logic.go @@ -333,6 +333,22 @@ func (fs *Filesystem) doPlanning(ctx context.Context) ([]*Step, error) { log(ctx).Debug("assessing filesystem") + if fs.senderFS.IsPlaceholder { + log(ctx).Debug("sender filesystem is placeholder") + if fs.receiverFS != nil { + if fs.receiverFS.IsPlaceholder { + // all good, fall through + log(ctx).Debug("receiver filesystem is placeholder") + } else { + err := fmt.Errorf("sender filesystem is placeholder, but receiver filesystem is not") + log(ctx).Error(err.Error()) + return nil, err + } + } + log(ctx).Debug("no steps required for replicating placeholders, the endpoint.Receiver will create a placeholder when we receive the first non-placeholder child filesystem") + return nil, nil + } + sfsvsres, err := fs.sender.ListFilesystemVersions(ctx, &pdu.ListFilesystemVersionsReq{Filesystem: fs.Path}) if err != nil { log(ctx).WithError(err).Error("cannot get remote filesystem versions")