From bbdc6f5465782b38473842d011055137cdf8a412 Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Tue, 4 Jul 2023 20:21:48 +0200
Subject: [PATCH] fix handling of tentative cursor presence if protection
 strategy doesn't use it (#714)

Before this PR, we would panic in the `check` phase of `endpoint.Send()`'s
`TryBatchDestroy` call if the current protection strategy does NOT produce
a tentative replication cursor AND one of the following holds:

1. `FromVersion` is a tentative cursor bookmark.
2. `FromVersion` is a snapshot, and there exists a tentative cursor
   bookmark for that snapshot.
3. `FromVersion` is a bookmark != the tentative cursor bookmark, but there
   exists a tentative cursor bookmark for the same snapshot as the
   `FromVersion` bookmark.

In those cases, the `check` concluded that we would delete `FromVersion`.
It came to that conclusion because the tentative cursor isn't part of
`obsoleteAbs` if the protection strategy doesn't produce a tentative
replication cursor.

The scenarios above can happen if the user changes the protection strategy
from one "with tentative cursor" to one "without tentative replication
cursor" while there is a tentative replication cursor on disk.
The workaround was to rename the tentative cursor.

In all cases above, `TryBatchDestroy` would have destroyed the tentative
cursor.

In case 1, that would fail the `Send` step and potentially break
replication if the cursor is the last common bookmark; the `check`
conclusion was correct.

In cases 2 and 3, deleting the tentative cursor would have been fine
because `FromVersion` is a different entity than the tentative cursor;
destroying the tentative cursor would have been the right call.

The solution in this PR is as follows:
* add `FromVersion` to the `liveAbs` set of live abstractions
* rewrite the `check` closure to identify the concrete ZFS object by its
  full dataset path (`fullpath`) instead of using
  `zfs.FilesystemVersionEqualIdentity`, which matches by GUID only
* Holds have no dataset path and are never the `FromVersion`, so
  disregard them.

fixes #666
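
To illustrate why GUID-based identity was the wrong predicate here,
consider the following minimal, self-contained Go sketch. The types are
simplified stand-ins for illustration only, not zrepl's real
`zfs.FilesystemVersion` API: a bookmark shares the GUID of the snapshot it
was created from, so GUID equality conflates two distinct ZFS objects that
the full dataset path tells apart.

    package main

    import "fmt"

    // version is a simplified stand-in for zfs.FilesystemVersion.
    type version struct {
        isBookmark bool
        name       string
        guid       uint64
    }

    // fullPath mimics FilesystemVersion.FullPath: snapshots print as
    // "fs@name", bookmarks as "fs#name".
    func (v version) fullPath(fs string) string {
        if v.isBookmark {
            return fs + "#" + v.name
        }
        return fs + "@" + v.name
    }

    func main() {
        fs := "pool/sender"
        snap := version{isBookmark: false, name: "1", guid: 0xdead}
        // hypothetical tentative-cursor bookmark created from fs@1
        cursor := version{isBookmark: true, name: "tentative_cursor", guid: 0xdead}

        // Old check (GUID identity): true, so destroying the stale cursor
        // looks like destroying FromVersion, and check panics spuriously.
        fmt.Println(snap.guid == cursor.guid) // true

        // New check (full-path identity): the objects are distinct, so the
        // stale cursor can be destroyed without touching FromVersion.
        fmt.Println(snap.fullPath(fs) == cursor.fullPath(fs)) // false
    }

(This mirrors case 2 above; cases 1 and 3 follow the same
path-vs-GUID logic.)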
---
 endpoint/endpoint.go                  | 76 ++++++++++++++-------
 endpoint/endpoint_guarantees.go       |  6 ++
 endpoint/endpoint_zfs_abstraction.go  | 51 +++++++++++++-
 platformtest/tests/generated_cases.go |  1 +
 platformtest/tests/replication.go     | 95 ++++++++++++++++++++++++++-
 5 files changed, 200 insertions(+), 29 deletions(-)

diff --git a/endpoint/endpoint.go b/endpoint/endpoint.go
index dcc9d9f..1ae9dc9 100644
--- a/endpoint/endpoint.go
+++ b/endpoint/endpoint.go
@@ -7,6 +7,7 @@ import (
     "fmt"
     "io"
     "path"
+    "strings"
 
     "github.com/kr/pretty"
     "github.com/pkg/errors"
@@ -233,6 +234,21 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea
     //
     // Note further that a resuming send, due to the idempotent nature of func CreateReplicationCursor and HoldStep,
     // will never lose its step holds because we just (idempotently re-)created them above, before attempting the cleanup.
+    destroyTypes := AbstractionTypeSet{
+        AbstractionStepHold:                           true,
+        AbstractionTentativeReplicationCursorBookmark: true,
+    }
+    // The replication planner can also pick an endpoint ZFS abstraction as FromVersion.
+    // Keep it, so that the replication will succeed.
+    //
+    // NB: there is no abstraction for snapshots, so we only need to check bookmarks.
+    if sendArgs.FromVersion != nil && sendArgs.FromVersion.IsBookmark() {
+        dp, err := zfs.NewDatasetPath(sendArgs.FS)
+        if err != nil {
+            panic(err) // sendArgs is validated, this shouldn't happen
+        }
+        liveAbs = append(liveAbs, destroyTypes.ExtractBookmark(dp, sendArgs.FromVersion))
+    }
     func() {
         ctx, endSpan := trace.WithSpan(ctx, "cleanup-stale-abstractions")
         defer endSpan()
@@ -245,35 +261,45 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea
             return keep
         }
         check := func(obsoleteAbs []Abstraction) {
-            // last line of defense: check that we don't destroy the incremental `from` and `to`
-            // if we did that, we might be about to blow away the last common filesystem version between sender and receiver
-            mustLiveVersions := []zfs.FilesystemVersion{sendArgs.ToVersion}
-            if sendArgs.FromVersion != nil {
-                mustLiveVersions = append(mustLiveVersions, *sendArgs.FromVersion)
+            // Ensure that we don't delete `From` or `To`,
+            // regardless of whether they are in destroyTypes or not,
+            // and produce a nice error message if we would, to aid debugging the resulting panic.
+            //
+            // This is especially important for `From`: we could break incremental replication
+            // if we deleted the last common filesystem version between sender and receiver.
+            type Problem struct {
+                sendArgsWhat string
+                fullpath     string
+                obsoleteAbs  Abstraction
             }
-            for _, staleVersion := range obsoleteAbs {
-                for _, mustLiveVersion := range mustLiveVersions {
-                    isSendArg := zfs.FilesystemVersionEqualIdentity(mustLiveVersion, staleVersion.GetFilesystemVersion())
-                    stepHoldBasedGuaranteeStrategy := false
-                    k := replicationGuaranteeStrategy.Kind()
-                    switch k {
-                    case ReplicationGuaranteeKindResumability:
-                        stepHoldBasedGuaranteeStrategy = true
-                    case ReplicationGuaranteeKindIncremental:
-                    case ReplicationGuaranteeKindNone:
-                    default:
-                        panic(fmt.Sprintf("this is supposed to be an exhaustive match, got %v", k))
-                    }
-                    isSnapshot := mustLiveVersion.IsSnapshot()
-                    if isSendArg && (!isSnapshot || stepHoldBasedGuaranteeStrategy) {
-                        panic(fmt.Sprintf("impl error: %q would be destroyed because it is considered stale but it is part of of sendArgs=%s", mustLiveVersion.String(), pretty.Sprint(sendArgs)))
+            problems := make([]Problem, 0)
+            checkFullpaths := make(map[string]string, 2)
+            checkFullpaths["ToVersion"] = sendArgs.ToVersion.FullPath(sendArgs.FS)
+            if sendArgs.FromVersion != nil {
+                checkFullpaths["FromVersion"] = sendArgs.FromVersion.FullPath(sendArgs.FS)
+            }
+            for _, a := range obsoleteAbs {
+                for what, fullpath := range checkFullpaths {
+                    if a.GetFullPath() == fullpath && a.GetType().IsSnapshotOrBookmark() {
+                        problems = append(problems, Problem{
+                            sendArgsWhat: what,
+                            fullpath:     fullpath,
+                            obsoleteAbs:  a,
+                        })
                     }
                 }
             }
-        }
-        destroyTypes := AbstractionTypeSet{
-            AbstractionStepHold:                           true,
-            AbstractionTentativeReplicationCursorBookmark: true,
+            if len(problems) == 0 {
+                return
+            }
+            var msg strings.Builder
+            fmt.Fprintf(&msg, "cleaning up stale send abstractions would destroy send args:\n")
+            fmt.Fprintf(&msg, "  SendArgs: %s\n", pretty.Sprint(sendArgs))
+            for _, problem := range problems {
+                fmt.Fprintf(&msg, "would delete %s %s because it was deemed an obsolete abstraction: %s\n",
+                    problem.sendArgsWhat, problem.fullpath, problem.obsoleteAbs)
+            }
+            panic(msg.String())
         }
         abstractionsCacheSingleton.TryBatchDestroy(ctx, s.jobId, sendArgs.FS, destroyTypes, keep, check)
     }()
diff --git a/endpoint/endpoint_guarantees.go b/endpoint/endpoint_guarantees.go
index ce91550..8a125aa 100644
--- a/endpoint/endpoint_guarantees.go
+++ b/endpoint/endpoint_guarantees.go
@@ -89,6 +89,8 @@ func ReplicationGuaranteeFromKind(k ReplicationGuaranteeKind) ReplicationGuarant
 
 type ReplicationGuaranteeNone struct{}
 
+func (g ReplicationGuaranteeNone) String() string { return "none" }
+
 func (g ReplicationGuaranteeNone) Kind() ReplicationGuaranteeKind {
     return ReplicationGuaranteeKindNone
 }
@@ -107,6 +109,8 @@ func (g ReplicationGuaranteeNone) SenderPostRecvConfirmed(ctx context.Context, j
 
 type ReplicationGuaranteeIncremental struct{}
 
+func (g ReplicationGuaranteeIncremental) String() string { return "incremental" }
+
 func (g ReplicationGuaranteeIncremental) Kind() ReplicationGuaranteeKind {
     return ReplicationGuaranteeKindIncremental
 }
@@ -144,6 +148,8 @@ func (g ReplicationGuaranteeIncremental) SenderPostRecvConfirmed(ctx context.Con
 
 type ReplicationGuaranteeResumability struct{}
 
+func (g ReplicationGuaranteeResumability) String() string { return "resumability" }
+
 func (g ReplicationGuaranteeResumability) Kind() ReplicationGuaranteeKind {
     return ReplicationGuaranteeKindResumability
 }
diff --git a/endpoint/endpoint_zfs_abstraction.go b/endpoint/endpoint_zfs_abstraction.go
index a3a6981..934081c 100644
--- a/endpoint/endpoint_zfs_abstraction.go
+++ b/endpoint/endpoint_zfs_abstraction.go
@@ -31,7 +31,7 @@ const (
     AbstractionReplicationCursorBookmarkV2 AbstractionType = "replication-cursor-bookmark-v2"
 )
 
-var AbstractionTypesAll = map[AbstractionType]bool{
+var AbstractionTypesAll = AbstractionTypeSet{
     AbstractionStepHold:                           true,
     AbstractionLastReceivedHold:                   true,
     AbstractionTentativeReplicationCursorBookmark: true,
@@ -181,6 +181,38 @@ func (s AbstractionTypeSet) Validate() error {
     return nil
 }
 
+// Use the `BookmarkExtractor()` method of each abstraction type in this set
+// to try to extract an abstraction from the given FilesystemVersion.
+//
+// Abstraction types in this set that don't have a bookmark extractor are skipped.
+//
+// Panics if more than one abstraction type matches.
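+//
+// Illustrative usage (hypothetical; dp is the filesystem's *zfs.DatasetPath
+// and v is a bookmark zfs.FilesystemVersion):
+//
+//    abs := AbstractionTypesAll.ExtractBookmark(dp, &v)
+//    // abs == nil means v is not a bookmark-based zrepl abstraction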
+func (s AbstractionTypeSet) ExtractBookmark(dp *zfs.DatasetPath, v *zfs.FilesystemVersion) Abstraction {
+    matched := make(AbstractionTypeSet, 1)
+    var matchedAbs Abstraction
+    for absType := range s {
+        extractor := absType.BookmarkExtractor()
+        if extractor == nil {
+            continue
+        }
+        abstraction := extractor(dp, *v)
+        if abstraction != nil {
+            matched[absType] = true
+            matchedAbs = abstraction
+        }
+    }
+    if len(matched) == 0 {
+        return nil
+    }
+    if len(matched) == 1 {
+        if matchedAbs == nil {
+            panic("loop above should always set matchedAbs if there is a match")
+        }
+        return matchedAbs
+    }
+    panic(fmt.Sprintf("abstraction types extractors should not overlap: %s", matched))
+}
+
 type BookmarkExtractor func(fs *zfs.DatasetPath, v zfs.FilesystemVersion) Abstraction
 
 // returns nil if the abstraction type is not bookmark-based
@@ -238,6 +270,23 @@ func (t AbstractionType) BookmarkNamer() func(fs string, guid uint64, jobId JobI
     }
 }
 
+func (t AbstractionType) IsSnapshotOrBookmark() bool {
+    switch t {
+    case AbstractionTentativeReplicationCursorBookmark:
+        return true
+    case AbstractionReplicationCursorBookmarkV1:
+        return true
+    case AbstractionReplicationCursorBookmarkV2:
+        return true
+    case AbstractionStepHold:
+        return false
+    case AbstractionLastReceivedHold:
+        return false
+    default:
+        panic(fmt.Sprintf("unimpl: %q", t))
+    }
+}
+
 type ListZFSHoldsAndBookmarksQuery struct {
     FS ListZFSHoldsAndBookmarksQueryFilesystemFilter
     // What abstraction types should match (any contained in the set)
diff --git a/platformtest/tests/generated_cases.go b/platformtest/tests/generated_cases.go
index 16891aa..60443f6 100644
--- a/platformtest/tests/generated_cases.go
+++ b/platformtest/tests/generated_cases.go
@@ -20,6 +20,7 @@ var Cases = []Case{BatchDestroy,
     ReplicationIncrementalCleansUpStaleAbstractionsWithCacheOnSecondReplication,
     ReplicationIncrementalCleansUpStaleAbstractionsWithoutCacheOnSecondReplication,
     ReplicationIncrementalDestroysStepHoldsIffIncrementalStepHoldsAreDisabledButStepHoldsExist,
+    ReplicationIncrementalHandlesFromVersionEqTentativeCursorCorrectly,
     ReplicationIncrementalIsPossibleIfCommonSnapshotIsDestroyed,
     ReplicationInitialAll,
     ReplicationInitialFail,
diff --git a/platformtest/tests/replication.go b/platformtest/tests/replication.go
index 283737b..2c7a512 100644
--- a/platformtest/tests/replication.go
+++ b/platformtest/tests/replication.go
@@ -248,7 +248,10 @@ func implReplicationIncrementalCleansUpStaleAbstractions(ctx *platformtest.Conte
     require.NoError(ctx, err)
     snap2Hold, err := endpoint.HoldStep(ctx, sfs, snap2, jobId) // no shadow
     require.NoError(ctx, err)
-    return []endpoint.Abstraction{snap2Cursor, snap1Hold, snap2Hold}
+    // create artificial tentative cursor
+    snap3TentativeCursor, err := endpoint.CreateTentativeReplicationCursor(ctx, sfs, snap3, jobId)
+    require.NoError(ctx, err)
+    return []endpoint.Abstraction{snap2Cursor, snap1Hold, snap2Hold, snap3TentativeCursor}
 }
 createArtificalStaleAbstractions(sjid)
 ojidSendAbstractions := createArtificalStaleAbstractions(ojid)
@@ -333,21 +336,29 @@ func implReplicationIncrementalCleansUpStaleAbstractions(ctx *platformtest.Conte
     require.NoError(ctx, err)
     snap2OjidCursorName, err := endpoint.ReplicationCursorBookmarkName(sfs, snap2.Guid, ojid)
     require.NoError(ctx, err)
+    snap3SjidTentativeCursorName, err := endpoint.TentativeReplicationCursorBookmarkName(sfs, snap3.Guid, sjid)
+    require.NoError(ctx, err)
+    snap3OjidTentativeCursorName, err := endpoint.TentativeReplicationCursorBookmarkName(sfs, snap3.Guid, ojid)
+    require.NoError(ctx, err)
     var bmNames []string
     for _, bm := range sBms {
         bmNames = append(bmNames, bm.Name)
     }
     if invalidateCacheBeforeSecondReplication {
-        require.Len(ctx, sBms, 3)
+        require.Len(ctx, sBms, 4)
         require.Contains(ctx, bmNames, snap5SjidCursorName)
         require.Contains(ctx, bmNames, snap2OjidCursorName)
+        require.Contains(ctx, bmNames, snap3OjidTentativeCursorName)
         require.Contains(ctx, bmNames, "2")
     } else {
-        require.Len(ctx, sBms, 4)
+        require.Len(ctx, sBms, 6)
+        ctx.Logf("%s", pretty.Sprint(sBms))
         require.Contains(ctx, bmNames, snap5SjidCursorName)
         require.Contains(ctx, bmNames, snap2SjidCursorName)
         require.Contains(ctx, bmNames, snap2OjidCursorName)
+        require.Contains(ctx, bmNames, snap3SjidTentativeCursorName)
+        require.Contains(ctx, bmNames, snap3OjidTentativeCursorName)
         require.Contains(ctx, bmNames, "2")
     }
 }
@@ -370,6 +381,84 @@ func implReplicationIncrementalCleansUpStaleAbstractions(ctx *platformtest.Conte
 
 }
 
+func ReplicationIncrementalHandlesFromVersionEqTentativeCursorCorrectly(ctx *platformtest.Context) {
+
+    platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, `
+        CREATEROOT
+        +  "sender"
+        +  "sender@1"
+        +  "receiver"
+        R  zfs create -p "${ROOTDS}/receiver/${ROOTDS}"
+    `)
+
+    sjid := endpoint.MustMakeJobID("sender-job")
+    rjid := endpoint.MustMakeJobID("receiver-job")
+
+    sfs := ctx.RootDataset + "/sender"
+    rfsRoot := ctx.RootDataset + "/receiver"
+
+    rep := replicationInvocation{
+        sjid:    sjid,
+        rjid:    rjid,
+        sfs:     sfs,
+        rfsRoot: rfsRoot,
+        // It doesn't really matter what guarantee we use here, as the second replication will configure another.
+        // But, in the real world, the only way for a stale tentative cursor to appear is if the guarantee is set to
+        // incremental replication and we crash before converting the tentative cursor into a regular cursor.
+        guarantee: pdu.ReplicationConfigProtectionWithKind(pdu.ReplicationGuaranteeKind_GuaranteeIncrementalReplication),
+    }
+
+    // Do initial replication to set up the test.
+    rep1 := rep.Do(ctx)
+    ctx.Logf("\n%s", pretty.Sprint(rep1))
+    sfsDs := mustDatasetPath(sfs)
+    snap1_sender := mustGetFilesystemVersion(ctx, sfs+"@1")
+    snap1_replicationCursor_name, err := endpoint.ReplicationCursorBookmarkName(sfs, snap1_sender.Guid, sjid)
+    require.NoError(ctx, err)
+    snap1_replicationCursor := mustGetFilesystemVersion(ctx, sfs+"#"+snap1_replicationCursor_name)
+
+    // The second replication will be done with a guarantee kind that doesn't create tentative cursors by itself.
+    // So, it would generally be right to clean up any tentative cursors on sfs since they're stale abstractions.
+    // However, if the cursor is used as the `from` version in any send step, we must not destroy it, as that
+    // would break incremental replication.
+    // NB: we only need to test the first step, as all subsequent steps will be snapshot->snapshot.
+    rep.guarantee = pdu.ReplicationConfigProtectionWithKind(pdu.ReplicationGuaranteeKind_GuaranteeNothing)
+    // create the artificial tentative cursor
+    snap1_tentativeCursor, err := endpoint.CreateTentativeReplicationCursor(ctx, sfs, snap1_sender, sjid)
+    require.NoError(ctx, err)
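+    // Invalidate the cached abstractions for sfs so that the second replication's
+    // stale-abstraction cleanup sees the cursor we just created out-of-band.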
+    endpoint.AbstractionsCacheInvalidate(sfs)
+    // remove the other bookmarks of snap1, and snap1 itself, to force the replication planner to use the tentative cursor
+    err = zfs.ZFSDestroyFilesystemVersion(ctx, sfsDs, &snap1_sender)
+    require.NoError(ctx, err)
+    err = zfs.ZFSDestroyFilesystemVersion(ctx, sfsDs, &snap1_replicationCursor)
+    require.NoError(ctx, err)
+    versions, err := zfs.ZFSListFilesystemVersions(ctx, sfsDs, zfs.ListFilesystemVersionsOptions{})
+    require.NoError(ctx, err)
+    require.Len(ctx, versions, 1)
+    require.Equal(ctx, versions[0].Guid, snap1_tentativeCursor.GetFilesystemVersion().Guid)
+    // create another snapshot so that replication does one incremental step `tentative_cursor` -> `@2`
+    mustSnapshot(ctx, sfs+"@2")
+    mustGetFilesystemVersion(ctx, sfs+"@2")
+    // do the replication
+    rep2 := rep.Do(ctx)
+    ctx.Logf("\n%s", pretty.Sprint(rep2))
+
+    // Ensure that the tentative cursor was used.
+    require.Len(ctx, rep2.Attempts, 1)
+    require.Equal(ctx, rep2.Attempts[0].State, report.AttemptDone)
+    require.Len(ctx, rep2.Attempts[0].Filesystems, 1)
+    require.Nil(ctx, rep2.Attempts[0].Filesystems[0].Error())
+    require.Len(ctx, rep2.Attempts[0].Filesystems[0].Steps, 1)
+    require.EqualValues(ctx, rep2.Attempts[0].Filesystems[0].CurrentStep, 1)
+    require.Equal(ctx, rep2.Attempts[0].Filesystems[0].Steps[0].Info.From, snap1_tentativeCursor.GetFilesystemVersion().RelName())
+
+    // Ensure that the tentative cursor was destroyed as part of SendPost.
+    _, err = zfs.ZFSGetFilesystemVersion(ctx, snap1_tentativeCursor.GetFilesystemVersion().FullPath(sfs))
+    _, ok := err.(*zfs.DatasetDoesNotExist)
+    require.True(ctx, ok)
+}
+
 type PartialSender struct {
     *endpoint.Sender
     failAfterByteCount int64