mirror of
https://github.com/zrepl/zrepl.git
synced 2024-11-22 08:23:50 +01:00
[#321] endpoint: ListAbstractions: acutally emit one Abstraction per matching hold
This commit is contained in:
parent
6e927f20f9
commit
b056e7b2b9
@ -25,6 +25,10 @@ func init() {
|
|||||||
|
|
||||||
var sendAbstractionsCacheSingleton = newSendAbstractionsCache()
|
var sendAbstractionsCacheSingleton = newSendAbstractionsCache()
|
||||||
|
|
||||||
|
func SendAbstractionsCacheInvalidate(fs string) {
|
||||||
|
sendAbstractionsCacheSingleton.InvalidateFSCache(fs)
|
||||||
|
}
|
||||||
|
|
||||||
type sendAbstractionsCacheDidLoadFSState int
|
type sendAbstractionsCacheDidLoadFSState int
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -614,23 +614,23 @@ func listAbstractionsImplFS(ctx context.Context, fs string, query *ListZFSHoldsA
|
|||||||
panic("implementation error: extractors misconfigured for " + at)
|
panic("implementation error: extractors misconfigured for " + at)
|
||||||
}
|
}
|
||||||
for _, v := range fsvs {
|
for _, v := range fsvs {
|
||||||
var a Abstraction
|
|
||||||
if v.Type == zfs.Bookmark && bmE != nil {
|
if v.Type == zfs.Bookmark && bmE != nil {
|
||||||
a = bmE(fsp, v)
|
if a := bmE(fsp, v); a != nil {
|
||||||
|
emitCandidate(a)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if v.Type == zfs.Snapshot && holdE != nil && query.CreateTXG.Contains(v.GetCreateTXG()) && (!v.UserRefs.Valid || v.UserRefs.Value > 0) {
|
if v.Type == zfs.Snapshot && holdE != nil && query.CreateTXG.Contains(v.GetCreateTXG()) && (!v.UserRefs.Valid || v.UserRefs.Value > 0) { // FIXME review v.UserRefsValid
|
||||||
holds, err := zfs.ZFSHolds(ctx, fsp.ToString(), v.Name)
|
holds, err := zfs.ZFSHolds(ctx, fsp.ToString(), v.Name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errCb(err, v.ToAbsPath(fsp), "get hold on snap")
|
errCb(err, v.ToAbsPath(fsp), "get hold on snap")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
for _, tag := range holds {
|
for _, tag := range holds {
|
||||||
a = holdE(fsp, v, tag)
|
if a := holdE(fsp, v, tag); a != nil {
|
||||||
|
emitCandidate(a)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if a != nil {
|
|
||||||
emitCandidate(a)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -183,22 +183,22 @@ func LastReceivedHoldTag(jobID JobID) (string, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func lastReceivedHoldImpl(jobid string) (string, error) {
|
func lastReceivedHoldImpl(jobid string) (string, error) {
|
||||||
tag := fmt.Sprintf("zrepl_last_received_J_%s", jobid)
|
tag := fmt.Sprintf("%s%s", ReplicationCursorBookmarkNamePrefix, jobid)
|
||||||
if err := zfs.ValidHoldTag(tag); err != nil {
|
if err := zfs.ValidHoldTag(tag); err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
return tag, nil
|
return tag, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func MoveLastReceivedHold(ctx context.Context, fs string, to zfs.FilesystemVersion, jobID JobID) error {
|
func CreateLastReceivedHold(ctx context.Context, fs string, to zfs.FilesystemVersion, jobID JobID) (Abstraction, error) {
|
||||||
|
|
||||||
if !to.IsSnapshot() {
|
if !to.IsSnapshot() {
|
||||||
return errors.Errorf("last-received-hold: target must be a snapshot: %s", to.FullPath(fs))
|
return nil, errors.Errorf("last-received-hold: target must be a snapshot: %s", to.FullPath(fs))
|
||||||
}
|
}
|
||||||
|
|
||||||
tag, err := LastReceivedHoldTag(jobID)
|
tag, err := LastReceivedHoldTag(jobID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "last-received-hold: hold tag")
|
return nil, errors.Wrap(err, "last-received-hold: hold tag")
|
||||||
}
|
}
|
||||||
|
|
||||||
// we never want to be without a hold
|
// we never want to be without a hold
|
||||||
@ -206,7 +206,23 @@ func MoveLastReceivedHold(ctx context.Context, fs string, to zfs.FilesystemVersi
|
|||||||
|
|
||||||
err = zfs.ZFSHold(ctx, fs, to, tag)
|
err = zfs.ZFSHold(ctx, fs, to, tag)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "last-received-hold: hold newly received")
|
return nil, errors.Wrap(err, "last-received-hold: hold newly received")
|
||||||
|
}
|
||||||
|
|
||||||
|
return &holdBasedAbstraction{
|
||||||
|
Type: AbstractionLastReceivedHold,
|
||||||
|
FS: fs,
|
||||||
|
FilesystemVersion: to,
|
||||||
|
JobID: jobID,
|
||||||
|
Tag: tag,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func MoveLastReceivedHold(ctx context.Context, fs string, to zfs.FilesystemVersion, jobID JobID) error {
|
||||||
|
|
||||||
|
_, err := CreateLastReceivedHold(ctx, fs, to, jobID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
q := ListZFSHoldsAndBookmarksQuery{
|
q := ListZFSHoldsAndBookmarksQuery{
|
||||||
|
@ -13,6 +13,9 @@ var Cases = []Case{BatchDestroy,
|
|||||||
ListFilesystemVersionsUserrefs,
|
ListFilesystemVersionsUserrefs,
|
||||||
ListFilesystemVersionsZeroExistIsNotAnError,
|
ListFilesystemVersionsZeroExistIsNotAnError,
|
||||||
ListFilesystemsNoFilter,
|
ListFilesystemsNoFilter,
|
||||||
|
ReceiveForceIntoEncryptedErr,
|
||||||
|
ReplicationIncrementalCleansUpStaleAbstractionsWithCacheOnSecondReplication,
|
||||||
|
ReplicationIncrementalCleansUpStaleAbstractionsWithoutCacheOnSecondReplication,
|
||||||
ReplicationIncrementalIsPossibleIfCommonSnapshotIsDestroyed,
|
ReplicationIncrementalIsPossibleIfCommonSnapshotIsDestroyed,
|
||||||
ReplicationIsResumableFullSend,
|
ReplicationIsResumableFullSend,
|
||||||
ResumableRecvAndTokenHandling,
|
ResumableRecvAndTokenHandling,
|
||||||
|
45
platformtest/tests/recvForceIntoEncryptedErr.go
Normal file
45
platformtest/tests/recvForceIntoEncryptedErr.go
Normal file
@ -0,0 +1,45 @@
|
|||||||
|
package tests
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"github.com/zrepl/zrepl/platformtest"
|
||||||
|
"github.com/zrepl/zrepl/zfs"
|
||||||
|
)
|
||||||
|
|
||||||
|
func ReceiveForceIntoEncryptedErr(ctx *platformtest.Context) {
|
||||||
|
platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, `
|
||||||
|
DESTROYROOT
|
||||||
|
CREATEROOT
|
||||||
|
+ "foo bar" encrypted
|
||||||
|
+ "sender" encrypted
|
||||||
|
+ "sender@1"
|
||||||
|
`)
|
||||||
|
|
||||||
|
rfs := fmt.Sprintf("%s/foo bar", ctx.RootDataset)
|
||||||
|
sfs := fmt.Sprintf("%s/sender", ctx.RootDataset)
|
||||||
|
sfsSnap1 := sendArgVersion(ctx, sfs, "@1")
|
||||||
|
|
||||||
|
sendArgs, err := zfs.ZFSSendArgsUnvalidated{
|
||||||
|
FS: sfs,
|
||||||
|
Encrypted: &zfs.NilBool{B: false},
|
||||||
|
From: nil,
|
||||||
|
To: &sfsSnap1,
|
||||||
|
ResumeToken: "",
|
||||||
|
}.Validate(ctx)
|
||||||
|
require.NoError(ctx, err)
|
||||||
|
|
||||||
|
sendStream, err := zfs.ZFSSend(ctx, sendArgs)
|
||||||
|
require.NoError(ctx, err)
|
||||||
|
|
||||||
|
recvOpts := zfs.RecvOptions{
|
||||||
|
RollbackAndForceRecv: true,
|
||||||
|
SavePartialRecvState: false,
|
||||||
|
}
|
||||||
|
err = zfs.ZFSRecv(ctx, rfs, &zfs.ZFSSendArgVersion{RelName: "@1", GUID: sfsSnap1.GUID}, sendStream, recvOpts)
|
||||||
|
require.Error(ctx, err)
|
||||||
|
re, ok := err.(*zfs.RecvDestroyOrOverwriteEncryptedErr)
|
||||||
|
require.True(ctx, ok)
|
||||||
|
require.Contains(ctx, re.Error(), "zfs receive -F cannot be used to destroy an encrypted filesystem or overwrite an unencrypted one with an encrypted on")
|
||||||
|
}
|
@ -120,6 +120,195 @@ func ReplicationIncrementalIsPossibleIfCommonSnapshotIsDestroyed(ctx *platformte
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func ReplicationIncrementalCleansUpStaleAbstractionsWithCacheOnSecondReplication(ctx *platformtest.Context) {
|
||||||
|
implReplicationIncrementalCleansUpStaleAbstractions(ctx, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
func ReplicationIncrementalCleansUpStaleAbstractionsWithoutCacheOnSecondReplication(ctx *platformtest.Context) {
|
||||||
|
implReplicationIncrementalCleansUpStaleAbstractions(ctx, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
func implReplicationIncrementalCleansUpStaleAbstractions(ctx *platformtest.Context, invalidateCacheBeforeSecondReplication bool) {
|
||||||
|
|
||||||
|
platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, `
|
||||||
|
CREATEROOT
|
||||||
|
+ "sender"
|
||||||
|
+ "sender@1"
|
||||||
|
+ "sender@2"
|
||||||
|
+ "sender#2" "sender@2"
|
||||||
|
+ "sender@3"
|
||||||
|
+ "receiver"
|
||||||
|
R zfs create -p "${ROOTDS}/receiver/${ROOTDS}"
|
||||||
|
`)
|
||||||
|
|
||||||
|
sjid := endpoint.MustMakeJobID("sender-job")
|
||||||
|
ojid := endpoint.MustMakeJobID("other-job")
|
||||||
|
rjid := endpoint.MustMakeJobID("receiver-job")
|
||||||
|
|
||||||
|
sfs := ctx.RootDataset + "/sender"
|
||||||
|
rfsRoot := ctx.RootDataset + "/receiver"
|
||||||
|
|
||||||
|
rep := replicationInvocation{
|
||||||
|
sjid: sjid,
|
||||||
|
rjid: rjid,
|
||||||
|
sfs: sfs,
|
||||||
|
rfsRoot: rfsRoot,
|
||||||
|
}
|
||||||
|
rfs := rep.ReceiveSideFilesystem()
|
||||||
|
|
||||||
|
// first replication
|
||||||
|
report := rep.Do(ctx)
|
||||||
|
ctx.Logf("\n%s", pretty.Sprint(report))
|
||||||
|
|
||||||
|
// assert most recent send-side version @3 exists on receiver (=replication succeeded)
|
||||||
|
rSnap3 := fsversion(ctx, rfs, "@3")
|
||||||
|
// assert the source-side versions not managed by zrepl still exist
|
||||||
|
snap1 := fsversion(ctx, sfs, "@1")
|
||||||
|
snap2 := fsversion(ctx, sfs, "@2")
|
||||||
|
_ = fsversion(ctx, sfs, "#2") // non-replicationc-cursor bookmarks should not be affected
|
||||||
|
snap3 := fsversion(ctx, sfs, "@3")
|
||||||
|
// assert a replication cursor is in place
|
||||||
|
snap3CursorName, err := endpoint.ReplicationCursorBookmarkName(sfs, snap3.Guid, sjid)
|
||||||
|
require.NoError(ctx, err)
|
||||||
|
_ = fsversion(ctx, sfs, "#"+snap3CursorName)
|
||||||
|
// assert a last-received hold is in place
|
||||||
|
expectRjidHoldTag, err := endpoint.LastReceivedHoldTag(rjid)
|
||||||
|
require.NoError(ctx, err)
|
||||||
|
holds, err := zfs.ZFSHolds(ctx, rfs, rSnap3.Name)
|
||||||
|
require.NoError(ctx, err)
|
||||||
|
require.Contains(ctx, holds, expectRjidHoldTag)
|
||||||
|
|
||||||
|
// create artifical stale replication cursors
|
||||||
|
createArtificalStaleAbstractions := func(jobId endpoint.JobID) []endpoint.Abstraction {
|
||||||
|
snap2Cursor, err := endpoint.CreateReplicationCursor(ctx, sfs, snap2, jobId) // no shadow
|
||||||
|
require.NoError(ctx, err)
|
||||||
|
// create artifical stale step holds jobId
|
||||||
|
snap1Hold, err := endpoint.HoldStep(ctx, sfs, snap1, jobId) // no shadow
|
||||||
|
require.NoError(ctx, err)
|
||||||
|
snap2Hold, err := endpoint.HoldStep(ctx, sfs, snap2, jobId) // no shadow
|
||||||
|
require.NoError(ctx, err)
|
||||||
|
return []endpoint.Abstraction{snap2Cursor, snap1Hold, snap2Hold}
|
||||||
|
}
|
||||||
|
createArtificalStaleAbstractions(sjid)
|
||||||
|
ojidSendAbstractions := createArtificalStaleAbstractions(ojid)
|
||||||
|
|
||||||
|
snap3ojidLastReceivedHold, err := endpoint.CreateLastReceivedHold(ctx, rfs, fsversion(ctx, rfs, "@3"), ojid)
|
||||||
|
require.NoError(ctx, err)
|
||||||
|
require.True(ctx, zfs.FilesystemVersionEqualIdentity(fsversion(ctx, rfs, "@3"), snap3ojidLastReceivedHold.GetFilesystemVersion()))
|
||||||
|
|
||||||
|
// take another 2 snapshots
|
||||||
|
mustSnapshot(ctx, sfs+"@4")
|
||||||
|
mustSnapshot(ctx, sfs+"@5")
|
||||||
|
snap5 := fsversion(ctx, sfs, "@5")
|
||||||
|
|
||||||
|
if invalidateCacheBeforeSecondReplication {
|
||||||
|
endpoint.SendAbstractionsCacheInvalidate(sfs)
|
||||||
|
}
|
||||||
|
|
||||||
|
// do another replication
|
||||||
|
// - ojid's abstractions should not be affected on either side
|
||||||
|
// - stale abstractions of sjid and rjid should be cleaned up
|
||||||
|
// - 1 replication cursors and 1 last-received hold should be present
|
||||||
|
|
||||||
|
checkOjidAbstractionsExist := func() {
|
||||||
|
var expectedOjidAbstractions []endpoint.Abstraction
|
||||||
|
expectedOjidAbstractions = append(expectedOjidAbstractions, ojidSendAbstractions...)
|
||||||
|
expectedOjidAbstractions = append(expectedOjidAbstractions, snap3ojidLastReceivedHold)
|
||||||
|
|
||||||
|
sfsAndRfsFilter := filters.NewDatasetMapFilter(2, true)
|
||||||
|
require.NoError(ctx, sfsAndRfsFilter.Add(sfs, "ok"))
|
||||||
|
require.NoError(ctx, sfsAndRfsFilter.Add(rfs, "ok"))
|
||||||
|
rAbs, rAbsErrs, err := endpoint.ListAbstractions(ctx, endpoint.ListZFSHoldsAndBookmarksQuery{
|
||||||
|
FS: endpoint.ListZFSHoldsAndBookmarksQueryFilesystemFilter{Filter: sfsAndRfsFilter},
|
||||||
|
JobID: &ojid,
|
||||||
|
What: endpoint.AbstractionTypesAll,
|
||||||
|
Concurrency: 1,
|
||||||
|
})
|
||||||
|
require.NoError(ctx, err)
|
||||||
|
require.Len(ctx, rAbsErrs, 0)
|
||||||
|
ctx.Logf("rAbs=%s", rAbs)
|
||||||
|
ctx.Logf("expectedOjidAbstractions=%s", expectedOjidAbstractions)
|
||||||
|
require.Equal(ctx, len(expectedOjidAbstractions), len(rAbs))
|
||||||
|
for _, ea := range expectedOjidAbstractions {
|
||||||
|
ctx.Logf("looking for %s %#v", ea, ea.GetFilesystemVersion())
|
||||||
|
found := false
|
||||||
|
for _, a := range rAbs {
|
||||||
|
eq := endpoint.AbstractionEquals(ea, a)
|
||||||
|
ctx.Logf("comp=%v for %s %#v", eq, a, a.GetFilesystemVersion())
|
||||||
|
found = found || eq
|
||||||
|
}
|
||||||
|
require.True(ctx, found, "%s", ea)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
checkOjidAbstractionsExist()
|
||||||
|
|
||||||
|
report = rep.Do(ctx)
|
||||||
|
ctx.Logf("\n%s", pretty.Sprint(report))
|
||||||
|
|
||||||
|
checkOjidAbstractionsExist()
|
||||||
|
|
||||||
|
_ = fsversion(ctx, sfs, "@1")
|
||||||
|
_ = fsversion(ctx, sfs, "@2")
|
||||||
|
_ = fsversion(ctx, sfs, "#2")
|
||||||
|
_ = fsversion(ctx, sfs, "@3")
|
||||||
|
_ = fsversion(ctx, sfs, "@4")
|
||||||
|
_ = fsversion(ctx, sfs, "@5")
|
||||||
|
|
||||||
|
_ = fsversion(ctx, rfs, "@3")
|
||||||
|
_ = fsversion(ctx, rfs, "@4")
|
||||||
|
_ = fsversion(ctx, rfs, "@5")
|
||||||
|
|
||||||
|
// check bookmark situation
|
||||||
|
{
|
||||||
|
sBms, err := zfs.ZFSListFilesystemVersions(ctx, mustDatasetPath(sfs), zfs.ListFilesystemVersionsOptions{
|
||||||
|
Types: zfs.Bookmarks,
|
||||||
|
})
|
||||||
|
ctx.Logf("sbms=%s", sBms)
|
||||||
|
require.NoError(ctx, err)
|
||||||
|
|
||||||
|
snap5SjidCursorName, err := endpoint.ReplicationCursorBookmarkName(sfs, snap5.Guid, sjid)
|
||||||
|
require.NoError(ctx, err)
|
||||||
|
snap2SjidCursorName, err := endpoint.ReplicationCursorBookmarkName(sfs, snap2.Guid, sjid)
|
||||||
|
require.NoError(ctx, err)
|
||||||
|
snap2OjidCursorName, err := endpoint.ReplicationCursorBookmarkName(sfs, snap2.Guid, ojid)
|
||||||
|
require.NoError(ctx, err)
|
||||||
|
var bmNames []string
|
||||||
|
for _, bm := range sBms {
|
||||||
|
bmNames = append(bmNames, bm.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
if invalidateCacheBeforeSecondReplication {
|
||||||
|
require.Len(ctx, sBms, 3)
|
||||||
|
require.Contains(ctx, bmNames, snap5SjidCursorName)
|
||||||
|
require.Contains(ctx, bmNames, snap2OjidCursorName)
|
||||||
|
require.Contains(ctx, bmNames, "2")
|
||||||
|
} else {
|
||||||
|
require.Len(ctx, sBms, 4)
|
||||||
|
require.Contains(ctx, bmNames, snap5SjidCursorName)
|
||||||
|
require.Contains(ctx, bmNames, snap2SjidCursorName)
|
||||||
|
require.Contains(ctx, bmNames, snap2OjidCursorName)
|
||||||
|
require.Contains(ctx, bmNames, "2")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// check last-received hold moved
|
||||||
|
{
|
||||||
|
rAbs, rAbsErrs, err := endpoint.ListAbstractions(ctx, endpoint.ListZFSHoldsAndBookmarksQuery{
|
||||||
|
FS: endpoint.ListZFSHoldsAndBookmarksQueryFilesystemFilter{FS: &rfs},
|
||||||
|
JobID: &rjid,
|
||||||
|
What: endpoint.AbstractionTypesAll,
|
||||||
|
Concurrency: 1,
|
||||||
|
})
|
||||||
|
require.NoError(ctx, err)
|
||||||
|
require.Len(ctx, rAbsErrs, 0)
|
||||||
|
require.Len(ctx, rAbs, 1)
|
||||||
|
require.Equal(ctx, rAbs[0].GetType(), endpoint.AbstractionLastReceivedHold)
|
||||||
|
require.Equal(ctx, *rAbs[0].GetJobID(), rjid)
|
||||||
|
require.Equal(ctx, rAbs[0].GetFilesystemVersion().GetGuid(), snap5.GetGuid())
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
type PartialSender struct {
|
type PartialSender struct {
|
||||||
*endpoint.Sender
|
*endpoint.Sender
|
||||||
failAfterByteCount int64
|
failAfterByteCount int64
|
||||||
|
Loading…
Reference in New Issue
Block a user