mirror of
https://github.com/zrepl/zrepl.git
synced 2024-11-21 16:03:32 +01:00
fix handling of tenative cursor presence if protection strategy doesn't use it (#714)
Before this PR, we would panic in the `check` phase of `endpoint.Send()`'s `TryBatchDestroy` call in the following cases: the current protection strategy does NOT produce a tentative replication cursor AND * `FromVersion` is a tentative cursor bookmark * `FromVersion` is a snapshot, and there exists a tentative cursor bookmark for that snapshot * `FromVersion` is a bookmark != tentative cursor bookmark, but there exists a tentative cursor bookmark for the same snapshot as the `FromVersion` bookmark In those cases, the `check` concluded that we would delete `FromVersion`. It came to that conclusion because the tentative cursor isn't part of `obsoleteAbs` if the protection strategy doesn't produce a tentative replication cursor. The scenarios above can happen if the user changes the protection strategy from "with tentative cursor" to one "without tentative replication cursor", while there is a tentative replication cursor on disk. The workaround was to rename the tentative cursor. In all cases above, `TryBatchDestroy` would have destroyed the tentative cursor. In case 1, that would fail the `Send` step and potentially break replication if the cursor is the last common bookmark. The `check` conclusion was correct. In cases 2 and 3, deleting the tentative cursor would have been fine because `FromVersion` was a different entity than the tentative cursor. So, destroying the tentative cursor would be the right call. The solution in this PR is as follows: * add the `FromVersion` to the `liveAbs` set of live abstractions * rewrite the `check` closure to use the full dataset path (`fullpath`) to identify the concrete ZFS object instead of the `zfs.FilesystemVersionEqualIdentity`, which is only identified by matching GUID. * Holds have no dataset path and are not the `FromVersion` in any case, so disregard them. fixes #666
This commit is contained in:
parent
bc5e1ede04
commit
bbdc6f5465
@ -7,6 +7,7 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"path"
|
||||
"strings"
|
||||
|
||||
"github.com/kr/pretty"
|
||||
"github.com/pkg/errors"
|
||||
@ -233,6 +234,21 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea
|
||||
//
|
||||
// Note further that a resuming send, due to the idempotent nature of func CreateReplicationCursor and HoldStep,
|
||||
// will never lose its step holds because we just (idempotently re-)created them above, before attempting the cleanup.
|
||||
destroyTypes := AbstractionTypeSet{
|
||||
AbstractionStepHold: true,
|
||||
AbstractionTentativeReplicationCursorBookmark: true,
|
||||
}
|
||||
// The replication planner can also pick an endpoint zfs abstraction as FromVersion.
|
||||
// Keep it, so that the replication will succeed.
|
||||
//
|
||||
// NB: there is no abstraction for snapshots, so, we only need to check bookmarks.
|
||||
if sendArgs.FromVersion != nil && sendArgs.FromVersion.IsBookmark() {
|
||||
dp, err := zfs.NewDatasetPath(sendArgs.FS)
|
||||
if err != nil {
|
||||
panic(err) // sendArgs is validated, this shouldn't happen
|
||||
}
|
||||
liveAbs = append(liveAbs, destroyTypes.ExtractBookmark(dp, sendArgs.FromVersion))
|
||||
}
|
||||
func() {
|
||||
ctx, endSpan := trace.WithSpan(ctx, "cleanup-stale-abstractions")
|
||||
defer endSpan()
|
||||
@ -245,35 +261,45 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea
|
||||
return keep
|
||||
}
|
||||
check := func(obsoleteAbs []Abstraction) {
|
||||
// last line of defense: check that we don't destroy the incremental `from` and `to`
|
||||
// if we did that, we might be about to blow away the last common filesystem version between sender and receiver
|
||||
mustLiveVersions := []zfs.FilesystemVersion{sendArgs.ToVersion}
|
||||
if sendArgs.FromVersion != nil {
|
||||
mustLiveVersions = append(mustLiveVersions, *sendArgs.FromVersion)
|
||||
// Ensure that we don't delete `From` or `To`.
|
||||
// Regardless of whether they are in AbstractionTypeSet or not.
|
||||
// And produce a nice error message in case we do, to aid debugging the resulting panic.
|
||||
//
|
||||
// This is especially important for `From`. We could break incremental replication
|
||||
// if we deleted the last common filesystem version between sender and receiver.
|
||||
type Problem struct {
|
||||
sendArgsWhat string
|
||||
fullpath string
|
||||
obsoleteAbs Abstraction
|
||||
}
|
||||
for _, staleVersion := range obsoleteAbs {
|
||||
for _, mustLiveVersion := range mustLiveVersions {
|
||||
isSendArg := zfs.FilesystemVersionEqualIdentity(mustLiveVersion, staleVersion.GetFilesystemVersion())
|
||||
stepHoldBasedGuaranteeStrategy := false
|
||||
k := replicationGuaranteeStrategy.Kind()
|
||||
switch k {
|
||||
case ReplicationGuaranteeKindResumability:
|
||||
stepHoldBasedGuaranteeStrategy = true
|
||||
case ReplicationGuaranteeKindIncremental:
|
||||
case ReplicationGuaranteeKindNone:
|
||||
default:
|
||||
panic(fmt.Sprintf("this is supposed to be an exhaustive match, got %v", k))
|
||||
}
|
||||
isSnapshot := mustLiveVersion.IsSnapshot()
|
||||
if isSendArg && (!isSnapshot || stepHoldBasedGuaranteeStrategy) {
|
||||
panic(fmt.Sprintf("impl error: %q would be destroyed because it is considered stale but it is part of of sendArgs=%s", mustLiveVersion.String(), pretty.Sprint(sendArgs)))
|
||||
problems := make([]Problem, 0)
|
||||
checkFullpaths := make(map[string]string, 2)
|
||||
checkFullpaths["ToVersion"] = sendArgs.ToVersion.FullPath(sendArgs.FS)
|
||||
if sendArgs.FromVersion != nil {
|
||||
checkFullpaths["FromVersion"] = sendArgs.FromVersion.FullPath(sendArgs.FS)
|
||||
}
|
||||
for _, a := range obsoleteAbs {
|
||||
for what, fullpath := range checkFullpaths {
|
||||
if a.GetFullPath() == fullpath && a.GetType().IsSnapshotOrBookmark() {
|
||||
problems = append(problems, Problem{
|
||||
sendArgsWhat: what,
|
||||
fullpath: fullpath,
|
||||
obsoleteAbs: a,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
destroyTypes := AbstractionTypeSet{
|
||||
AbstractionStepHold: true,
|
||||
AbstractionTentativeReplicationCursorBookmark: true,
|
||||
if len(problems) == 0 {
|
||||
return
|
||||
}
|
||||
var msg strings.Builder
|
||||
fmt.Fprintf(&msg, "cleaning up send stale would destroy send args:\n")
|
||||
fmt.Fprintf(&msg, " SendArgs: %s\n", pretty.Sprint(sendArgs))
|
||||
for _, check := range problems {
|
||||
fmt.Fprintf(&msg, "would delete %s %s because it was deemed an obsolete abstraction: %s\n",
|
||||
check.sendArgsWhat, check.fullpath, check.obsoleteAbs)
|
||||
}
|
||||
panic(msg.String())
|
||||
}
|
||||
abstractionsCacheSingleton.TryBatchDestroy(ctx, s.jobId, sendArgs.FS, destroyTypes, keep, check)
|
||||
}()
|
||||
|
@ -89,6 +89,8 @@ func ReplicationGuaranteeFromKind(k ReplicationGuaranteeKind) ReplicationGuarant
|
||||
|
||||
type ReplicationGuaranteeNone struct{}
|
||||
|
||||
func (g ReplicationGuaranteeNone) String() string { return "none" }
|
||||
|
||||
func (g ReplicationGuaranteeNone) Kind() ReplicationGuaranteeKind {
|
||||
return ReplicationGuaranteeKindNone
|
||||
}
|
||||
@ -107,6 +109,8 @@ func (g ReplicationGuaranteeNone) SenderPostRecvConfirmed(ctx context.Context, j
|
||||
|
||||
type ReplicationGuaranteeIncremental struct{}
|
||||
|
||||
func (g ReplicationGuaranteeIncremental) String() string { return "incremental" }
|
||||
|
||||
func (g ReplicationGuaranteeIncremental) Kind() ReplicationGuaranteeKind {
|
||||
return ReplicationGuaranteeKindIncremental
|
||||
}
|
||||
@ -144,6 +148,8 @@ func (g ReplicationGuaranteeIncremental) SenderPostRecvConfirmed(ctx context.Con
|
||||
|
||||
type ReplicationGuaranteeResumability struct{}
|
||||
|
||||
func (g ReplicationGuaranteeResumability) String() string { return "resumability" }
|
||||
|
||||
func (g ReplicationGuaranteeResumability) Kind() ReplicationGuaranteeKind {
|
||||
return ReplicationGuaranteeKindResumability
|
||||
}
|
||||
|
@ -31,7 +31,7 @@ const (
|
||||
AbstractionReplicationCursorBookmarkV2 AbstractionType = "replication-cursor-bookmark-v2"
|
||||
)
|
||||
|
||||
var AbstractionTypesAll = map[AbstractionType]bool{
|
||||
var AbstractionTypesAll = AbstractionTypeSet{
|
||||
AbstractionStepHold: true,
|
||||
AbstractionLastReceivedHold: true,
|
||||
AbstractionTentativeReplicationCursorBookmark: true,
|
||||
@ -181,6 +181,38 @@ func (s AbstractionTypeSet) Validate() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Use the `BookmarkExtractor()` method of each abstraction type in this set
|
||||
// to try extract an abstraction from the given FilesystemVersion.
|
||||
//
|
||||
// Abstraction types in this set that don't have a bookmark extractor are skipped.
|
||||
//
|
||||
// Panics if more than one abstraction type matches.
|
||||
func (s AbstractionTypeSet) ExtractBookmark(dp *zfs.DatasetPath, v *zfs.FilesystemVersion) Abstraction {
|
||||
matched := make(AbstractionTypeSet, 1)
|
||||
var matchedAbs Abstraction
|
||||
for absType := range s {
|
||||
extractor := absType.BookmarkExtractor()
|
||||
if extractor == nil {
|
||||
continue
|
||||
}
|
||||
abstraction := extractor(dp, *v)
|
||||
if abstraction != nil {
|
||||
matched[absType] = true
|
||||
matchedAbs = abstraction
|
||||
}
|
||||
}
|
||||
if len(matched) == 0 {
|
||||
return nil
|
||||
}
|
||||
if len(matched) == 1 {
|
||||
if matchedAbs == nil {
|
||||
panic("loop above should always set matchedAbs if there is a match")
|
||||
}
|
||||
return matchedAbs
|
||||
}
|
||||
panic(fmt.Sprintf("abstraction types extractors should not overlap: %s", matched))
|
||||
}
|
||||
|
||||
type BookmarkExtractor func(fs *zfs.DatasetPath, v zfs.FilesystemVersion) Abstraction
|
||||
|
||||
// returns nil if the abstraction type is not bookmark-based
|
||||
@ -238,6 +270,23 @@ func (t AbstractionType) BookmarkNamer() func(fs string, guid uint64, jobId JobI
|
||||
}
|
||||
}
|
||||
|
||||
func (t AbstractionType) IsSnapshotOrBookmark() bool {
|
||||
switch t {
|
||||
case AbstractionTentativeReplicationCursorBookmark:
|
||||
return true
|
||||
case AbstractionReplicationCursorBookmarkV1:
|
||||
return true
|
||||
case AbstractionReplicationCursorBookmarkV2:
|
||||
return true
|
||||
case AbstractionStepHold:
|
||||
return false
|
||||
case AbstractionLastReceivedHold:
|
||||
return false
|
||||
default:
|
||||
panic(fmt.Sprintf("unimpl: %q", t))
|
||||
}
|
||||
}
|
||||
|
||||
type ListZFSHoldsAndBookmarksQuery struct {
|
||||
FS ListZFSHoldsAndBookmarksQueryFilesystemFilter
|
||||
// What abstraction types should match (any contained in the set)
|
||||
|
@ -20,6 +20,7 @@ var Cases = []Case{BatchDestroy,
|
||||
ReplicationIncrementalCleansUpStaleAbstractionsWithCacheOnSecondReplication,
|
||||
ReplicationIncrementalCleansUpStaleAbstractionsWithoutCacheOnSecondReplication,
|
||||
ReplicationIncrementalDestroysStepHoldsIffIncrementalStepHoldsAreDisabledButStepHoldsExist,
|
||||
ReplicationIncrementalHandlesFromVersionEqTentativeCursorCorrectly,
|
||||
ReplicationIncrementalIsPossibleIfCommonSnapshotIsDestroyed,
|
||||
ReplicationInitialAll,
|
||||
ReplicationInitialFail,
|
||||
|
@ -248,7 +248,10 @@ func implReplicationIncrementalCleansUpStaleAbstractions(ctx *platformtest.Conte
|
||||
require.NoError(ctx, err)
|
||||
snap2Hold, err := endpoint.HoldStep(ctx, sfs, snap2, jobId) // no shadow
|
||||
require.NoError(ctx, err)
|
||||
return []endpoint.Abstraction{snap2Cursor, snap1Hold, snap2Hold}
|
||||
// create artificial tentative cursor
|
||||
snap3TentativeCursor, err := endpoint.CreateTentativeReplicationCursor(ctx, sfs, snap3, jobId)
|
||||
require.NoError(ctx, err)
|
||||
return []endpoint.Abstraction{snap2Cursor, snap1Hold, snap2Hold, snap3TentativeCursor}
|
||||
}
|
||||
createArtificalStaleAbstractions(sjid)
|
||||
ojidSendAbstractions := createArtificalStaleAbstractions(ojid)
|
||||
@ -333,21 +336,29 @@ func implReplicationIncrementalCleansUpStaleAbstractions(ctx *platformtest.Conte
|
||||
require.NoError(ctx, err)
|
||||
snap2OjidCursorName, err := endpoint.ReplicationCursorBookmarkName(sfs, snap2.Guid, ojid)
|
||||
require.NoError(ctx, err)
|
||||
snap3SjidTentativeCursorName, err := endpoint.TentativeReplicationCursorBookmarkName(sfs, snap3.Guid, sjid)
|
||||
require.NoError(ctx, err)
|
||||
snap3OjidTentativeCursorName, err := endpoint.TentativeReplicationCursorBookmarkName(sfs, snap3.Guid, ojid)
|
||||
require.NoError(ctx, err)
|
||||
var bmNames []string
|
||||
for _, bm := range sBms {
|
||||
bmNames = append(bmNames, bm.Name)
|
||||
}
|
||||
|
||||
if invalidateCacheBeforeSecondReplication {
|
||||
require.Len(ctx, sBms, 3)
|
||||
require.Len(ctx, sBms, 4)
|
||||
require.Contains(ctx, bmNames, snap5SjidCursorName)
|
||||
require.Contains(ctx, bmNames, snap2OjidCursorName)
|
||||
require.Contains(ctx, bmNames, snap3OjidTentativeCursorName)
|
||||
require.Contains(ctx, bmNames, "2")
|
||||
} else {
|
||||
require.Len(ctx, sBms, 4)
|
||||
require.Len(ctx, sBms, 6)
|
||||
ctx.Logf("%s", pretty.Sprint(sBms))
|
||||
require.Contains(ctx, bmNames, snap5SjidCursorName)
|
||||
require.Contains(ctx, bmNames, snap2SjidCursorName)
|
||||
require.Contains(ctx, bmNames, snap2OjidCursorName)
|
||||
require.Contains(ctx, bmNames, snap3SjidTentativeCursorName)
|
||||
require.Contains(ctx, bmNames, snap3OjidTentativeCursorName)
|
||||
require.Contains(ctx, bmNames, "2")
|
||||
}
|
||||
}
|
||||
@ -370,6 +381,84 @@ func implReplicationIncrementalCleansUpStaleAbstractions(ctx *platformtest.Conte
|
||||
|
||||
}
|
||||
|
||||
func ReplicationIncrementalHandlesFromVersionEqTentativeCursorCorrectly(ctx *platformtest.Context) {
|
||||
|
||||
platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, `
|
||||
CREATEROOT
|
||||
+ "sender"
|
||||
+ "sender@1"
|
||||
+ "receiver"
|
||||
R zfs create -p "${ROOTDS}/receiver/${ROOTDS}"
|
||||
`)
|
||||
|
||||
sjid := endpoint.MustMakeJobID("sender-job")
|
||||
rjid := endpoint.MustMakeJobID("receiver-job")
|
||||
|
||||
sfs := ctx.RootDataset + "/sender"
|
||||
rfsRoot := ctx.RootDataset + "/receiver"
|
||||
|
||||
rep := replicationInvocation{
|
||||
sjid: sjid,
|
||||
rjid: rjid,
|
||||
sfs: sfs,
|
||||
rfsRoot: rfsRoot,
|
||||
// It doesn't really matter what guarantee we use here, as the second replication will configure another.
|
||||
// But, in the real world, the only way for a stale tentative cursor to appear is if the guarantee is set to
|
||||
// incremental replication and we crash before converting the tentative cursor into a regular cursor.
|
||||
guarantee: pdu.ReplicationConfigProtectionWithKind(pdu.ReplicationGuaranteeKind_GuaranteeIncrementalReplication),
|
||||
}
|
||||
|
||||
// Do initial replication to set up the test.
|
||||
rep1 := rep.Do(ctx)
|
||||
ctx.Logf("\n%s", pretty.Sprint(rep1))
|
||||
sfsDs := mustDatasetPath(sfs)
|
||||
snap1_sender := mustGetFilesystemVersion(ctx, sfs+"@1")
|
||||
snap1_replicationCursor_name, err := endpoint.ReplicationCursorBookmarkName(sfs, snap1_sender.Guid, sjid)
|
||||
require.NoError(ctx, err)
|
||||
snap1_replicationCursor := mustGetFilesystemVersion(ctx, sfs+"#"+snap1_replicationCursor_name)
|
||||
|
||||
// The second replication will be done with a guarantee kind that doesn't create tentative cursors by itself.
|
||||
// So, it would generally be right to clean up any tentative cursors on sfs since they're stale abstractions.
|
||||
// However, if the cursor is used as the `from` version in any send step, we must not destroy it, as that
|
||||
// would break incremental replication.
|
||||
// NB: we only need to test the first step as all subsequent steps will be snapshot->snapshot.
|
||||
rep.guarantee = pdu.ReplicationConfigProtectionWithKind(pdu.ReplicationGuaranteeKind_GuaranteeNothing)
|
||||
// create the artificial cursor
|
||||
snap1_tentativeCursor, err := endpoint.CreateTentativeReplicationCursor(ctx, sfs, snap1_sender, sjid)
|
||||
require.NoError(ctx, err)
|
||||
endpoint.AbstractionsCacheInvalidate(sfs)
|
||||
// remove other bookmarks of snap1, and snap1 itself, to force the replication planner to use the tentative cursor
|
||||
err = zfs.ZFSDestroyFilesystemVersion(ctx, sfsDs, &snap1_sender)
|
||||
require.NoError(ctx, err)
|
||||
err = zfs.ZFSDestroyFilesystemVersion(ctx, sfsDs, &snap1_replicationCursor)
|
||||
require.NoError(ctx, err)
|
||||
versions, err := zfs.ZFSListFilesystemVersions(ctx, sfsDs, zfs.ListFilesystemVersionsOptions{})
|
||||
require.NoError(ctx, err)
|
||||
require.Len(ctx, versions, 1)
|
||||
require.Equal(ctx, versions[0].Guid, snap1_tentativeCursor.GetFilesystemVersion().Guid)
|
||||
// create another snapshot so that replication does one incremental step `tentative_cursor` -> `@2`
|
||||
mustSnapshot(ctx, sfs+"@2")
|
||||
mustGetFilesystemVersion(ctx, sfs+"@2")
|
||||
// do the replication
|
||||
rep2 := rep.Do(ctx)
|
||||
ctx.Logf("\n%s", pretty.Sprint(rep2))
|
||||
|
||||
// Ensure that the tentative cursor was used.
|
||||
require.Len(ctx, rep2.Attempts, 1)
|
||||
require.Equal(ctx, rep2.Attempts[0].State, report.AttemptDone)
|
||||
require.Len(ctx, rep2.Attempts[0].Filesystems, 1)
|
||||
require.Nil(ctx, rep2.Attempts[0].Filesystems[0].Error())
|
||||
require.Len(ctx, rep2.Attempts[0].Filesystems[0].Steps, 1)
|
||||
require.EqualValues(ctx, rep2.Attempts[0].Filesystems[0].CurrentStep, 1)
|
||||
require.Len(ctx, rep2.Attempts[0].Filesystems[0].Steps, 1)
|
||||
require.Equal(ctx, rep2.Attempts[0].Filesystems[0].Steps[0].Info.From, snap1_tentativeCursor.GetFilesystemVersion().RelName())
|
||||
|
||||
// Ensure that the tentative cursor was destroyed as part of SendPost.
|
||||
_, err = zfs.ZFSGetFilesystemVersion(ctx, snap1_replicationCursor.FullPath(sfs))
|
||||
_, ok := err.(*zfs.DatasetDoesNotExist)
|
||||
require.True(ctx, ok)
|
||||
}
|
||||
|
||||
type PartialSender struct {
|
||||
*endpoint.Sender
|
||||
failAfterByteCount int64
|
||||
|
Loading…
Reference in New Issue
Block a user