From 5615f4929ae77cd308d486d0ea9019457372c582 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 5 Sep 2024 23:26:42 +0200 Subject: [PATCH] fix: replication of placeholder filesystems (#744) fixes https://github.com/zrepl/zrepl/issues/742 Before this PR, when chaining replication from A => B => C, if B had placeholders and the `filesystems` included these placeholders, we'd incorrectly fail the planning phase with error `sender does not have any versions`. The non-placeholder child filesystems of these placeholders would then fail to replicate because of the initial-replication-dependency-tracking that we do, i.e., their parent failed to replicate initially, hence they fail to replicate as well (`parent(s) failed during initial replication`). We can do better than that because we have the information whether a sender-side filesystem is a placeholder. This PR makes the planner act on that information. The outcome is that placeholders are replicated as placeholders (albeit the receiver remains in control of how these placeholders are created, i.e., `recv.placeholders`). The mechanism to do it is: 1. Don't plan any replication steps for filesystems that are placeholders on the sender. 2. Ensure that, if a receiving-side filesystem exists, it is indeed a placeholder. Check (2) may seem overly restrictive, but the goal here is not just to mirror all non-placeholder filesystems, but also to mirror the hierarchy. 
Testing performed: - [x] confirm with issue reporter that this PR fixes their issue - [x] add a regression test that fails without the changes in this PR --- endpoint/endpoint.go | 27 +++++-- platformtest/tests/generated_cases.go | 1 + platformtest/tests/replication.go | 97 ++++++++++++++++++++++++ replication/driver/replication_driver.go | 42 +++++----- replication/logic/replication_logic.go | 16 ++++ 5 files changed, 159 insertions(+), 24 deletions(-) diff --git a/endpoint/endpoint.go b/endpoint/endpoint.go index 1ae9dc9..5240443 100644 --- a/endpoint/endpoint.go +++ b/endpoint/endpoint.go @@ -104,13 +104,28 @@ func (s *Sender) ListFilesystems(ctx context.Context, r *pdu.ListFilesystemReq) if err != nil { return nil, err } - rfss := make([]*pdu.Filesystem, len(fss)) - for i := range fss { - rfss[i] = &pdu.Filesystem{ - Path: fss[i].ToString(), - // ResumeToken does not make sense from Sender - IsPlaceholder: false, // sender FSs are never placeholders + rfss := make([]*pdu.Filesystem, 0, len(fss)) + for _, a := range fss { + // TODO: dedup code with Receiver.ListFilesystems + l := getLogger(ctx).WithField("fs", a) + ph, err := zfs.ZFSGetFilesystemPlaceholderState(ctx, a) + if err != nil { + l.WithError(err).Error("error getting placeholder state") + return nil, errors.Wrapf(err, "cannot get placeholder state for fs %q", a) } + l.WithField("placeholder_state", fmt.Sprintf("%#v", ph)).Debug("placeholder state") + if !ph.FSExists { + l.Error("inconsistent placeholder state: filesystem must exists") + err := errors.Errorf("inconsistent placeholder state: filesystem %q must exist in this context", a.ToString()) + return nil, err + } + + fs := &pdu.Filesystem{ + Path: a.ToString(), + // ResumeToken does not make sense from Sender + IsPlaceholder: ph.IsPlaceholder, + } + rfss = append(rfss, fs) } res := &pdu.ListFilesystemRes{Filesystems: rfss} return res, nil diff --git a/platformtest/tests/generated_cases.go b/platformtest/tests/generated_cases.go index 
60443f6..7a9fa6e 100644 --- a/platformtest/tests/generated_cases.go +++ b/platformtest/tests/generated_cases.go @@ -28,6 +28,7 @@ var Cases = []Case{BatchDestroy, ReplicationIsResumableFullSend__both_GuaranteeResumability, ReplicationIsResumableFullSend__initial_GuaranteeIncrementalReplication_incremental_GuaranteeIncrementalReplication, ReplicationIsResumableFullSend__initial_GuaranteeResumability_incremental_GuaranteeIncrementalReplication, + ReplicationOfPlaceholderFilesystemsInChainedReplicationScenario, ReplicationPlaceholderEncryption__EncryptOnReceiverUseCase__WorksIfConfiguredWithInherit, ReplicationPlaceholderEncryption__UnspecifiedIsOkForClientIdentityPlaceholder, ReplicationPlaceholderEncryption__UnspecifiedLeadsToFailureAtRuntimeWhenCreatingPlaceholders, diff --git a/platformtest/tests/replication.go b/platformtest/tests/replication.go index 2c7a512..1bbdd44 100644 --- a/platformtest/tests/replication.go +++ b/platformtest/tests/replication.go @@ -1483,3 +1483,100 @@ func ReplicationInitialFail(ctx *platformtest.Context) { require.NotNil(ctx, report.Attempts[0].Filesystems[0].PlanError) require.Contains(ctx, report.Attempts[0].Filesystems[0].PlanError.Err, "automatic conflict resolution for initial replication is disabled in config") } + +// https://github.com/zrepl/zrepl/issues/742 +func ReplicationOfPlaceholderFilesystemsInChainedReplicationScenario(ctx *platformtest.Context) { + + // + // Setup datasets + // + platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, ` + CREATEROOT + + "host1" + + "host1/a" + + "host1/a/b" + + "host1/a/b@1" + + "host2" + + "host2/sink" + + "host3" + + "host3/sink" + `) + + // Replicate host1/a to host2/sink + host1_a := ctx.RootDataset + "/host1/a" + host1_b := ctx.RootDataset + "/host1/a/b" + + host2_sink := ctx.RootDataset + "/host2/sink" + host2_a := host2_sink + "/" + host1_a + host2_b := host2_sink + "/" + host1_b + + host3_sink := ctx.RootDataset + "/host3/sink" + host3_a := host3_sink + "/" + host2_a + 
host3_b := host3_sink + "/" + host2_b + + type job struct { + sender, receiver endpoint.JobID + receiver_root string + sender_filesystems_filter map[string]bool + } + h1_to_h2 := job{ + sender: endpoint.MustMakeJobID("h1-to-h2-sender"), + receiver: endpoint.MustMakeJobID("h1-to-h2-receiver"), + sender_filesystems_filter: map[string]bool{ + // omit host1_a so that it becomes a placeholder on host2 + host1_b: true, + }, + receiver_root: host2_sink, + } + h2_to_h3 := job{ + sender: endpoint.MustMakeJobID("h2-to-h3-sender"), + receiver: endpoint.MustMakeJobID("h2-to-h3-receiver"), + sender_filesystems_filter: map[string]bool{ + host2_sink: false, + host2_sink + "<": true, + }, + receiver_root: host3_sink, + } + + do_repl := func(j job) { + + sfilter := filters.NewDatasetMapFilter(len(j.sender_filesystems_filter), true) + + for lhs, rhs := range j.sender_filesystems_filter { + var err error + if rhs { + err = sfilter.Add(lhs, filters.MapFilterResultOk) + } else { + err = sfilter.Add(lhs, filters.MapFilterResultOmit) + } + require.NoError(ctx, err) + } + + rep := replicationInvocation{ + sjid: j.sender, + rjid: j.receiver, + sfilter: sfilter, + rfsRoot: j.receiver_root, + guarantee: pdu.ReplicationConfigProtectionWithKind(pdu.ReplicationGuaranteeKind_GuaranteeResumability), + receiverConfigHook: func(rc *endpoint.ReceiverConfig) { + rc.PlaceholderEncryption = endpoint.PlaceholderCreationEncryptionPropertyOff + }, + } + + r := rep.Do(ctx) + ctx.Logf("\n%s", pretty.Sprint(r)) + } + + do_repl(h1_to_h2) + do_repl(h2_to_h3) + + // assert that the replication worked + mustGetFilesystemVersion(ctx, host3_b+"@1") + + // assert placeholder status + for _, fs := range []string{host2_a, host3_a} { + st, err := zfs.ZFSGetFilesystemPlaceholderState(ctx, mustDatasetPath(fs)) + require.NoError(ctx, err) + require.True(ctx, st.IsPlaceholder) + } +} diff --git a/replication/driver/replication_driver.go b/replication/driver/replication_driver.go index 0d85501..aa8adbd 100644 --- 
a/replication/driver/replication_driver.go +++ b/replication/driver/replication_driver.go @@ -529,27 +529,31 @@ func (f *fs) do(ctx context.Context, pq *stepQueue, prev *fs) { // find the highest of the previously uncompleted steps for which we can also find a step // in our current plan prevUncompleted := prev.planned.steps[prev.planned.step:] - var target struct{ prev, cur int } - target.prev = -1 - target.cur = -1 - out: - for p := len(prevUncompleted) - 1; p >= 0; p-- { - for q := len(f.planned.steps) - 1; q >= 0; q-- { - if prevUncompleted[p].step.TargetEquals(f.planned.steps[q].step) { - target.prev = p - target.cur = q - break out + if len(prevUncompleted) == 0 || len(f.planned.steps) == 0 { + f.debug("no steps planned in previous attempt or this attempt, no correlation necessary len(prevUncompleted)=%d len(f.planned.steps)=%d", len(prevUncompleted), len(f.planned.steps)) + } else { + var target struct{ prev, cur int } + target.prev = -1 + target.cur = -1 + out: + for p := len(prevUncompleted) - 1; p >= 0; p-- { + for q := len(f.planned.steps) - 1; q >= 0; q-- { + if prevUncompleted[p].step.TargetEquals(f.planned.steps[q].step) { + target.prev = p + target.cur = q + break out + } } } - } - if target.prev == -1 || target.cur == -1 { - f.debug("no correlation possible between previous attempt and this attempt's plan") - f.planning.err = newTimedError(fmt.Errorf("cannot correlate previously failed attempt to current plan"), time.Now()) - return - } + if target.prev == -1 || target.cur == -1 { + f.debug("no correlation possible between previous attempt and this attempt's plan") + f.planning.err = newTimedError(fmt.Errorf("cannot correlate previously failed attempt to current plan"), time.Now()) + return + } - f.planned.steps = f.planned.steps[0:target.cur] - f.debug("found correlation, new steps are len(fs.planned.steps) = %d", len(f.planned.steps)) + f.planned.steps = f.planned.steps[0:target.cur] + f.debug("found correlation, new steps are 
len(fs.planned.steps) = %d", len(f.planned.steps)) + } } else { f.debug("previous attempt does not exist or did not finish planning, no correlation possible, taking this attempt's plan as is") } @@ -600,6 +604,8 @@ func (f *fs) do(ctx context.Context, pq *stepQueue, prev *fs) { f.debug("parentHasNoSteps=%v parentFirstStepIsIncremental=%v parentHasTakenAtLeastOneSuccessfulStep=%v", parentHasNoSteps, parentFirstStepIsIncremental, parentHasTakenAtLeastOneSuccessfulStep) + // If the parent is a placeholder on the sender, `parentHasNoSteps` is true because we plan no steps for sender placeholders. + // The receiver will create the necessary placeholders when they start receiving the first non-placeholder child filesystem. parentPresentOnReceiver := parentHasNoSteps || parentFirstStepIsIncremental || parentHasTakenAtLeastOneSuccessfulStep allParentsPresentOnReceiver = allParentsPresentOnReceiver && parentPresentOnReceiver // no shadow diff --git a/replication/logic/replication_logic.go b/replication/logic/replication_logic.go index 9d187a9..41a1e3b 100644 --- a/replication/logic/replication_logic.go +++ b/replication/logic/replication_logic.go @@ -333,6 +333,22 @@ func (fs *Filesystem) doPlanning(ctx context.Context) ([]*Step, error) { log(ctx).Debug("assessing filesystem") + if fs.senderFS.IsPlaceholder { + log(ctx).Debug("sender filesystem is placeholder") + if fs.receiverFS != nil { + if fs.receiverFS.IsPlaceholder { + // all good, fall through + log(ctx).Debug("receiver filesystem is placeholder") + } else { + err := fmt.Errorf("sender filesystem is placeholder, but receiver filesystem is not") + log(ctx).Error(err.Error()) + return nil, err + } + } + log(ctx).Debug("no steps required for replicating placeholders, the endpoint.Receiver will create a placeholder when we receive the first non-placeholder child filesystem") + return nil, nil + } + sfsvsres, err := fs.sender.ListFilesystemVersions(ctx, &pdu.ListFilesystemVersionsReq{Filesystem: fs.Path}) if err != nil { 
log(ctx).WithError(err).Error("cannot get remote filesystem versions")