From b056e7b2b90a99731ec1f144441f2478d78f81ec Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 20 May 2020 13:10:37 +0200 Subject: [PATCH] [#321] endpoint: ListAbstractions: acutally emit one Abstraction per matching hold --- endpoint/endpoint_send_abstractions_cache.go | 4 + endpoint/endpoint_zfs_abstraction.go | 14 +- ...straction_cursor_and_last_received_hold.go | 26 ++- platformtest/tests/generated_cases.go | 3 + .../tests/recvForceIntoEncryptedErr.go | 45 +++++ platformtest/tests/replication.go | 189 ++++++++++++++++++ 6 files changed, 269 insertions(+), 12 deletions(-) create mode 100644 platformtest/tests/recvForceIntoEncryptedErr.go diff --git a/endpoint/endpoint_send_abstractions_cache.go b/endpoint/endpoint_send_abstractions_cache.go index beb8e40..94f4695 100644 --- a/endpoint/endpoint_send_abstractions_cache.go +++ b/endpoint/endpoint_send_abstractions_cache.go @@ -25,6 +25,10 @@ func init() { var sendAbstractionsCacheSingleton = newSendAbstractionsCache() +func SendAbstractionsCacheInvalidate(fs string) { + sendAbstractionsCacheSingleton.InvalidateFSCache(fs) +} + type sendAbstractionsCacheDidLoadFSState int const ( diff --git a/endpoint/endpoint_zfs_abstraction.go b/endpoint/endpoint_zfs_abstraction.go index 7e4bb5a..0662f9a 100644 --- a/endpoint/endpoint_zfs_abstraction.go +++ b/endpoint/endpoint_zfs_abstraction.go @@ -614,23 +614,23 @@ func listAbstractionsImplFS(ctx context.Context, fs string, query *ListZFSHoldsA panic("implementation error: extractors misconfigured for " + at) } for _, v := range fsvs { - var a Abstraction if v.Type == zfs.Bookmark && bmE != nil { - a = bmE(fsp, v) + if a := bmE(fsp, v); a != nil { + emitCandidate(a) + } } - if v.Type == zfs.Snapshot && holdE != nil && query.CreateTXG.Contains(v.GetCreateTXG()) && (!v.UserRefs.Valid || v.UserRefs.Value > 0) { + if v.Type == zfs.Snapshot && holdE != nil && query.CreateTXG.Contains(v.GetCreateTXG()) && (!v.UserRefs.Valid || v.UserRefs.Value > 0) { // FIXME review v.UserRefsValid holds, err := zfs.ZFSHolds(ctx, fsp.ToString(), v.Name) if err != nil { errCb(err, v.ToAbsPath(fsp), "get hold on snap") continue } for _, tag := range holds { - a = holdE(fsp, v, tag) + if a := holdE(fsp, v, tag); a != nil { + emitCandidate(a) + } } } - if a != nil { - emitCandidate(a) - } } } } diff --git a/endpoint/endpoint_zfs_abstraction_cursor_and_last_received_hold.go b/endpoint/endpoint_zfs_abstraction_cursor_and_last_received_hold.go index 81870c6..ec21301 100644 --- a/endpoint/endpoint_zfs_abstraction_cursor_and_last_received_hold.go +++ b/endpoint/endpoint_zfs_abstraction_cursor_and_last_received_hold.go @@ -183,22 +183,22 @@ func LastReceivedHoldTag(jobID JobID) (string, error) { } func lastReceivedHoldImpl(jobid string) (string, error) { - tag := fmt.Sprintf("zrepl_last_received_J_%s", jobid) + tag := fmt.Sprintf("%s%s", ReplicationCursorBookmarkNamePrefix, jobid) if err := zfs.ValidHoldTag(tag); err != nil { return "", err } return tag, nil } -func MoveLastReceivedHold(ctx context.Context, fs string, to zfs.FilesystemVersion, jobID JobID) error { +func CreateLastReceivedHold(ctx context.Context, fs string, to zfs.FilesystemVersion, jobID JobID) (Abstraction, error) { if !to.IsSnapshot() { - return errors.Errorf("last-received-hold: target must be a snapshot: %s", to.FullPath(fs)) + return nil, errors.Errorf("last-received-hold: target must be a snapshot: %s", to.FullPath(fs)) } tag, err := LastReceivedHoldTag(jobID) if err != nil { - return errors.Wrap(err, "last-received-hold: hold tag") + return nil, errors.Wrap(err, "last-received-hold: hold tag") } // we never want to be without a hold @@ -206,7 +206,23 @@ func MoveLastReceivedHold(ctx context.Context, fs string, to zfs.FilesystemVersi err = zfs.ZFSHold(ctx, fs, to, tag) if err != nil { - return errors.Wrap(err, "last-received-hold: hold newly received") + return nil, errors.Wrap(err, "last-received-hold: hold newly received") + } + + return &holdBasedAbstraction{ + Type: AbstractionLastReceivedHold, + FS: fs, + FilesystemVersion: to, + JobID: jobID, + Tag: tag, + }, nil +} + +func MoveLastReceivedHold(ctx context.Context, fs string, to zfs.FilesystemVersion, jobID JobID) error { + + _, err := CreateLastReceivedHold(ctx, fs, to, jobID) + if err != nil { + return err } q := ListZFSHoldsAndBookmarksQuery{ diff --git a/platformtest/tests/generated_cases.go b/platformtest/tests/generated_cases.go index a50a640..01ed559 100644 --- a/platformtest/tests/generated_cases.go +++ b/platformtest/tests/generated_cases.go @@ -13,6 +13,9 @@ var Cases = []Case{BatchDestroy, ListFilesystemVersionsUserrefs, ListFilesystemVersionsZeroExistIsNotAnError, ListFilesystemsNoFilter, + ReceiveForceIntoEncryptedErr, + ReplicationIncrementalCleansUpStaleAbstractionsWithCacheOnSecondReplication, + ReplicationIncrementalCleansUpStaleAbstractionsWithoutCacheOnSecondReplication, ReplicationIncrementalIsPossibleIfCommonSnapshotIsDestroyed, ReplicationIsResumableFullSend, ResumableRecvAndTokenHandling, diff --git a/platformtest/tests/recvForceIntoEncryptedErr.go b/platformtest/tests/recvForceIntoEncryptedErr.go new file mode 100644 index 0000000..b4ab7fb --- /dev/null +++ b/platformtest/tests/recvForceIntoEncryptedErr.go @@ -0,0 +1,45 @@ +package tests + +import ( + "fmt" + + "github.com/stretchr/testify/require" + "github.com/zrepl/zrepl/platformtest" + "github.com/zrepl/zrepl/zfs" +) + +func ReceiveForceIntoEncryptedErr(ctx *platformtest.Context) { + platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, ` + DESTROYROOT + CREATEROOT + + "foo bar" encrypted + + "sender" encrypted + + "sender@1" + `) + + rfs := fmt.Sprintf("%s/foo bar", ctx.RootDataset) + sfs := fmt.Sprintf("%s/sender", ctx.RootDataset) + sfsSnap1 := sendArgVersion(ctx, sfs, "@1") + + sendArgs, err := zfs.ZFSSendArgsUnvalidated{ + FS: sfs, + Encrypted: &zfs.NilBool{B: false}, + From: nil, + To: &sfsSnap1, + ResumeToken: "", + }.Validate(ctx) + require.NoError(ctx, err) + + sendStream, err := zfs.ZFSSend(ctx, sendArgs) + require.NoError(ctx, err) + + recvOpts := zfs.RecvOptions{ + RollbackAndForceRecv: true, + SavePartialRecvState: false, + } + err = zfs.ZFSRecv(ctx, rfs, &zfs.ZFSSendArgVersion{RelName: "@1", GUID: sfsSnap1.GUID}, sendStream, recvOpts) + require.Error(ctx, err) + re, ok := err.(*zfs.RecvDestroyOrOverwriteEncryptedErr) + require.True(ctx, ok) + require.Contains(ctx, re.Error(), "zfs receive -F cannot be used to destroy an encrypted filesystem or overwrite an unencrypted one with an encrypted on") +} diff --git a/platformtest/tests/replication.go b/platformtest/tests/replication.go index f7f5be2..8176b63 100644 --- a/platformtest/tests/replication.go +++ b/platformtest/tests/replication.go @@ -120,6 +120,195 @@ func ReplicationIncrementalIsPossibleIfCommonSnapshotIsDestroyed(ctx *platformte } +func ReplicationIncrementalCleansUpStaleAbstractionsWithCacheOnSecondReplication(ctx *platformtest.Context) { + implReplicationIncrementalCleansUpStaleAbstractions(ctx, true) +} + +func ReplicationIncrementalCleansUpStaleAbstractionsWithoutCacheOnSecondReplication(ctx *platformtest.Context) { + implReplicationIncrementalCleansUpStaleAbstractions(ctx, false) +} + +func implReplicationIncrementalCleansUpStaleAbstractions(ctx *platformtest.Context, invalidateCacheBeforeSecondReplication bool) { + + platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, ` + CREATEROOT + + "sender" + + "sender@1" + + "sender@2" + + "sender#2" "sender@2" + + "sender@3" + + "receiver" + R zfs create -p "${ROOTDS}/receiver/${ROOTDS}" + `) + + sjid := endpoint.MustMakeJobID("sender-job") + ojid := endpoint.MustMakeJobID("other-job") + rjid := endpoint.MustMakeJobID("receiver-job") + + sfs := ctx.RootDataset + "/sender" + rfsRoot := ctx.RootDataset + "/receiver" + + rep := replicationInvocation{ + sjid: sjid, + rjid: rjid, + sfs: sfs, + rfsRoot: rfsRoot, + } + rfs := rep.ReceiveSideFilesystem() + + // first replication + report := rep.Do(ctx) + ctx.Logf("\n%s", pretty.Sprint(report)) + + // assert most recent send-side version @3 exists on receiver (=replication succeeded) + rSnap3 := fsversion(ctx, rfs, "@3") + // assert the source-side versions not managed by zrepl still exist + snap1 := fsversion(ctx, sfs, "@1") + snap2 := fsversion(ctx, sfs, "@2") + _ = fsversion(ctx, sfs, "#2") // non-replicationc-cursor bookmarks should not be affected + snap3 := fsversion(ctx, sfs, "@3") + // assert a replication cursor is in place + snap3CursorName, err := endpoint.ReplicationCursorBookmarkName(sfs, snap3.Guid, sjid) + require.NoError(ctx, err) + _ = fsversion(ctx, sfs, "#"+snap3CursorName) + // assert a last-received hold is in place + expectRjidHoldTag, err := endpoint.LastReceivedHoldTag(rjid) + require.NoError(ctx, err) + holds, err := zfs.ZFSHolds(ctx, rfs, rSnap3.Name) + require.NoError(ctx, err) + require.Contains(ctx, holds, expectRjidHoldTag) + + // create artifical stale replication cursors + createArtificalStaleAbstractions := func(jobId endpoint.JobID) []endpoint.Abstraction { + snap2Cursor, err := endpoint.CreateReplicationCursor(ctx, sfs, snap2, jobId) // no shadow + require.NoError(ctx, err) + // create artifical stale step holds jobId + snap1Hold, err := endpoint.HoldStep(ctx, sfs, snap1, jobId) // no shadow + require.NoError(ctx, err) + snap2Hold, err := endpoint.HoldStep(ctx, sfs, snap2, jobId) // no shadow + require.NoError(ctx, err) + return []endpoint.Abstraction{snap2Cursor, snap1Hold, snap2Hold} + } + createArtificalStaleAbstractions(sjid) + ojidSendAbstractions := createArtificalStaleAbstractions(ojid) + + snap3ojidLastReceivedHold, err := endpoint.CreateLastReceivedHold(ctx, rfs, fsversion(ctx, rfs, "@3"), ojid) + require.NoError(ctx, err) + require.True(ctx, zfs.FilesystemVersionEqualIdentity(fsversion(ctx, rfs, "@3"), snap3ojidLastReceivedHold.GetFilesystemVersion())) + + // take another 2 snapshots + mustSnapshot(ctx, sfs+"@4") + mustSnapshot(ctx, sfs+"@5") + snap5 := fsversion(ctx, sfs, "@5") + + if invalidateCacheBeforeSecondReplication { + endpoint.SendAbstractionsCacheInvalidate(sfs) + } + + // do another replication + // - ojid's abstractions should not be affected on either side + // - stale abstractions of sjid and rjid should be cleaned up + // - 1 replication cursors and 1 last-received hold should be present + + checkOjidAbstractionsExist := func() { + var expectedOjidAbstractions []endpoint.Abstraction + expectedOjidAbstractions = append(expectedOjidAbstractions, ojidSendAbstractions...) + expectedOjidAbstractions = append(expectedOjidAbstractions, snap3ojidLastReceivedHold) + + sfsAndRfsFilter := filters.NewDatasetMapFilter(2, true) + require.NoError(ctx, sfsAndRfsFilter.Add(sfs, "ok")) + require.NoError(ctx, sfsAndRfsFilter.Add(rfs, "ok")) + rAbs, rAbsErrs, err := endpoint.ListAbstractions(ctx, endpoint.ListZFSHoldsAndBookmarksQuery{ + FS: endpoint.ListZFSHoldsAndBookmarksQueryFilesystemFilter{Filter: sfsAndRfsFilter}, + JobID: &ojid, + What: endpoint.AbstractionTypesAll, + Concurrency: 1, + }) + require.NoError(ctx, err) + require.Len(ctx, rAbsErrs, 0) + ctx.Logf("rAbs=%s", rAbs) + ctx.Logf("expectedOjidAbstractions=%s", expectedOjidAbstractions) + require.Equal(ctx, len(expectedOjidAbstractions), len(rAbs)) + for _, ea := range expectedOjidAbstractions { + ctx.Logf("looking for %s %#v", ea, ea.GetFilesystemVersion()) + found := false + for _, a := range rAbs { + eq := endpoint.AbstractionEquals(ea, a) + ctx.Logf("comp=%v for %s %#v", eq, a, a.GetFilesystemVersion()) + found = found || eq + } + require.True(ctx, found, "%s", ea) + } + } + checkOjidAbstractionsExist() + + report = rep.Do(ctx) + ctx.Logf("\n%s", pretty.Sprint(report)) + + checkOjidAbstractionsExist() + + _ = fsversion(ctx, sfs, "@1") + _ = fsversion(ctx, sfs, "@2") + _ = fsversion(ctx, sfs, "#2") + _ = fsversion(ctx, sfs, "@3") + _ = fsversion(ctx, sfs, "@4") + _ = fsversion(ctx, sfs, "@5") + + _ = fsversion(ctx, rfs, "@3") + _ = fsversion(ctx, rfs, "@4") + _ = fsversion(ctx, rfs, "@5") + + // check bookmark situation + { + sBms, err := zfs.ZFSListFilesystemVersions(ctx, mustDatasetPath(sfs), zfs.ListFilesystemVersionsOptions{ + Types: zfs.Bookmarks, + }) + ctx.Logf("sbms=%s", sBms) + require.NoError(ctx, err) + + snap5SjidCursorName, err := endpoint.ReplicationCursorBookmarkName(sfs, snap5.Guid, sjid) + require.NoError(ctx, err) + snap2SjidCursorName, err := endpoint.ReplicationCursorBookmarkName(sfs, snap2.Guid, sjid) + require.NoError(ctx, err) + snap2OjidCursorName, err := endpoint.ReplicationCursorBookmarkName(sfs, snap2.Guid, ojid) + require.NoError(ctx, err) + var bmNames []string + for _, bm := range sBms { + bmNames = append(bmNames, bm.Name) + } + + if invalidateCacheBeforeSecondReplication { + require.Len(ctx, sBms, 3) + require.Contains(ctx, bmNames, snap5SjidCursorName) + require.Contains(ctx, bmNames, snap2OjidCursorName) + require.Contains(ctx, bmNames, "2") + } else { + require.Len(ctx, sBms, 4) + require.Contains(ctx, bmNames, snap5SjidCursorName) + require.Contains(ctx, bmNames, snap2SjidCursorName) + require.Contains(ctx, bmNames, snap2OjidCursorName) + require.Contains(ctx, bmNames, "2") + } + } + + // check last-received hold moved + { + rAbs, rAbsErrs, err := endpoint.ListAbstractions(ctx, endpoint.ListZFSHoldsAndBookmarksQuery{ + FS: endpoint.ListZFSHoldsAndBookmarksQueryFilesystemFilter{FS: &rfs}, + JobID: &rjid, + What: endpoint.AbstractionTypesAll, + Concurrency: 1, + }) + require.NoError(ctx, err) + require.Len(ctx, rAbsErrs, 0) + require.Len(ctx, rAbs, 1) + require.Equal(ctx, rAbs[0].GetType(), endpoint.AbstractionLastReceivedHold) + require.Equal(ctx, *rAbs[0].GetJobID(), rjid) + require.Equal(ctx, rAbs[0].GetFilesystemVersion().GetGuid(), snap5.GetGuid()) + } + +} + type PartialSender struct { *endpoint.Sender failAfterByteCount int64