2020-05-24 18:18:02 +02:00
|
|
|
package tests
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"fmt"
|
|
|
|
"io"
|
2020-07-26 12:24:05 +02:00
|
|
|
"os"
|
2020-05-24 18:18:02 +02:00
|
|
|
"path"
|
2020-06-01 14:39:59 +02:00
|
|
|
"sort"
|
2021-08-22 20:11:15 +02:00
|
|
|
"strings"
|
2021-02-28 23:33:28 +01:00
|
|
|
"time"
|
2020-05-24 18:18:02 +02:00
|
|
|
|
|
|
|
"github.com/kr/pretty"
|
|
|
|
"github.com/stretchr/testify/require"
|
2020-08-31 16:04:00 +02:00
|
|
|
|
2024-10-18 19:21:17 +02:00
|
|
|
"github.com/zrepl/zrepl/internal/daemon/filters"
|
|
|
|
"github.com/zrepl/zrepl/internal/endpoint"
|
|
|
|
"github.com/zrepl/zrepl/internal/platformtest"
|
|
|
|
"github.com/zrepl/zrepl/internal/replication"
|
|
|
|
"github.com/zrepl/zrepl/internal/replication/driver"
|
|
|
|
"github.com/zrepl/zrepl/internal/replication/logic"
|
|
|
|
"github.com/zrepl/zrepl/internal/replication/logic/pdu"
|
|
|
|
"github.com/zrepl/zrepl/internal/replication/report"
|
|
|
|
"github.com/zrepl/zrepl/internal/util/bandwidthlimit"
|
|
|
|
"github.com/zrepl/zrepl/internal/util/limitio"
|
|
|
|
"github.com/zrepl/zrepl/internal/util/nodefault"
|
|
|
|
"github.com/zrepl/zrepl/internal/zfs"
|
|
|
|
zfsprop "github.com/zrepl/zrepl/internal/zfs/property"
|
2020-05-24 18:18:02 +02:00
|
|
|
)
|
|
|
|
|
|
|
|
// mimics the replication invocations of an active-side job
|
|
|
|
// for a single sender-receiver filesystem pair
|
|
|
|
//
|
|
|
|
// each invocation of method Do results in the construction
|
|
|
|
// of a new sender and receiver instance and one blocking invocation
|
|
|
|
// of the replication engine without encryption
|
|
|
|
type replicationInvocation struct {
|
rework resume token validation to allow resuming from raw sends of unencrypted datasets
Before this change, resuming from an unencrypted dataset with
send.raw=true specified wouldn't work with zrepl due to overly
restrictive resume token checking.
An initial PR to fix this was made in https://github.com/zrepl/zrepl/pull/503
but it didn't address the core of the problem.
The core of the problem was that zrepl assumed that if a resume token
contained `rawok=true, compressok=true`, the resulting send would be
encrypted. But if the sender dataset was unencrypted, such a resume would
actually result in an unencrypted send.
Which could be totally legitimate but zrepl failed to recognize that.
BACKGROUND
==========
The following snippets of OpenZFS code are insightful regarding how the
various ${X}ok values in the resume token are handled:
- https://github.com/openzfs/zfs/blob/6c3c5fcfbe27d9193cd131753cc7e47ee2784621/module/zfs/dmu_send.c#L1947-L2012
- https://github.com/openzfs/zfs/blob/6c3c5fcfbe27d9193cd131753cc7e47ee2784621/module/zfs/dmu_recv.c#L877-L891
- https://github.com/openzfs/zfs/blob/6c3c5fc/lib/libzfs/libzfs_sendrecv.c#L1663-L1672
Basically, some zfs send flags make the DMU send code set some DMU send
stream featureflags, although it's not a pure mapping, i.e, which DMU
send stream flags are used depends somewhat on the dataset (e.g., is it
encrypted or not, or, does it use zstd or not).
Then, the receiver looks at some (but not all) feature flags and maps
them to ${X}ok dataset zap attributes.
These are funnelled back to the sender 1:1 through the resume_token.
And the sender turns them into lzc flags.
As an example, let's look at zfs send --raw.
if the sender requests a raw send on an unencrypted dataset, the send
stream (and hence the resume token) will not have the raw stream
featureflag set, and hence the resume token will not have the rawok
field set. Instead, it will have compressok, embedok, and depending
on whether large blocks are present in the dataset, largeblockok set.
WHAT'S ZREPL'S ROLE IN THIS?
============================
zrepl provides a virtual encrypted sendflag that is like `raw`,
but further ensures that we only send encrypted datasets.
For any other resume token stuff, it shoudn't do any checking,
because it's a futile effort to keep up with ZFS send/recv features
that are orthogonal to encryption.
CHANGES MADE IN THIS COMMIT
===========================
- Rip out a bunch of needless checking that zrepl would do during
planning. These checks were there to give better error messages,
but actually, the error messages created by the endpoint.Sender.Send
RPC upon send args validation failure are good enough.
- Add platformtests to validate all combinations of
(Unencrypted/Encrypted FS) x (send.encrypted = true | false) x (send.raw = true | false)
for cases both non-resuming and resuming send.
Additional manual testing done:
1. With zrepl 0.5, setup with unencrypted dataset, send.raw=true specified, no send.encrypted specified.
2. Observe that regular non-resuming send works, but resuming doesn't work.
3. Upgrade zrepl to this change.
4. Observe that both regular and resuming send works.
closes https://github.com/zrepl/zrepl/pull/613
2022-07-10 14:56:35 +02:00
|
|
|
sjid, rjid endpoint.JobID
|
|
|
|
sfs string
|
|
|
|
sfilter *filters.DatasetMapFilter
|
|
|
|
rfsRoot string
|
|
|
|
interceptSender func(e *endpoint.Sender) logic.Sender
|
|
|
|
interceptReceiver func(e *endpoint.Receiver) logic.Receiver
|
|
|
|
guarantee *pdu.ReplicationConfigProtection
|
|
|
|
senderConfigHook func(*endpoint.SenderConfig)
|
|
|
|
receiverConfigHook func(*endpoint.ReceiverConfig)
|
|
|
|
plannerPolicyHook func(*logic.PlannerPolicy)
|
|
|
|
skipSendArgsValidation bool
|
2020-05-24 18:18:02 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func (i replicationInvocation) Do(ctx *platformtest.Context) *report.Report {
|
|
|
|
|
|
|
|
if i.interceptSender == nil {
|
|
|
|
i.interceptSender = func(e *endpoint.Sender) logic.Sender { return e }
|
|
|
|
}
|
2020-07-26 12:24:05 +02:00
|
|
|
if i.interceptReceiver == nil {
|
|
|
|
i.interceptReceiver = func(e *endpoint.Receiver) logic.Receiver { return e }
|
|
|
|
}
|
2020-05-24 18:18:02 +02:00
|
|
|
|
2020-07-26 17:58:20 +02:00
|
|
|
if i.sfs != "" && i.sfilter != nil || i.sfs == "" && i.sfilter == nil {
|
|
|
|
panic("either sfs or sfilter must be set")
|
|
|
|
}
|
|
|
|
if i.sfilter == nil {
|
|
|
|
i.sfilter = filters.NewDatasetMapFilter(1, true)
|
|
|
|
err := i.sfilter.Add(i.sfs, "ok")
|
|
|
|
require.NoError(ctx, err)
|
|
|
|
}
|
2020-09-07 01:20:57 +02:00
|
|
|
|
|
|
|
senderConfig := endpoint.SenderConfig{
|
2021-09-19 19:24:20 +02:00
|
|
|
FSF: i.sfilter.AsFilter(),
|
|
|
|
Encrypt: &nodefault.Bool{B: false},
|
|
|
|
JobID: i.sjid,
|
|
|
|
BandwidthLimit: bandwidthlimit.NoLimitConfig(),
|
2020-09-07 01:20:57 +02:00
|
|
|
}
|
|
|
|
if i.senderConfigHook != nil {
|
|
|
|
i.senderConfigHook(&senderConfig)
|
|
|
|
}
|
|
|
|
|
|
|
|
receiverConfig := endpoint.ReceiverConfig{
|
2020-05-24 18:18:02 +02:00
|
|
|
JobID: i.rjid,
|
|
|
|
AppendClientIdentity: false,
|
|
|
|
RootWithoutClientComponent: mustDatasetPath(i.rfsRoot),
|
2021-09-19 19:24:20 +02:00
|
|
|
BandwidthLimit: bandwidthlimit.NoLimitConfig(),
|
2021-11-21 21:16:37 +01:00
|
|
|
PlaceholderEncryption: endpoint.PlaceholderCreationEncryptionPropertyUnspecified,
|
2020-09-07 01:20:57 +02:00
|
|
|
}
|
|
|
|
if i.receiverConfigHook != nil {
|
|
|
|
i.receiverConfigHook(&receiverConfig)
|
|
|
|
}
|
|
|
|
|
|
|
|
require.Equal(ctx, senderConfig.JobID, i.sjid)
|
|
|
|
require.Equal(ctx, receiverConfig.JobID, i.rjid)
|
|
|
|
|
|
|
|
sender := i.interceptSender(endpoint.NewSender(senderConfig))
|
|
|
|
receiver := i.interceptReceiver(endpoint.NewReceiver(receiverConfig))
|
2020-05-24 18:18:02 +02:00
|
|
|
plannerPolicy := logic.PlannerPolicy{
|
2021-01-24 23:31:45 +01:00
|
|
|
ReplicationConfig: &pdu.ReplicationConfig{
|
|
|
|
Protection: i.guarantee,
|
2020-06-27 23:53:33 +02:00
|
|
|
},
|
2022-05-01 14:46:38 +02:00
|
|
|
ConflictResolution: &logic.ConflictResolution{
|
|
|
|
InitialReplication: logic.InitialReplicationAutoResolutionMostRecent,
|
|
|
|
},
|
2021-08-22 20:30:34 +02:00
|
|
|
SizeEstimationConcurrency: 1,
|
2020-05-24 18:18:02 +02:00
|
|
|
}
|
2022-05-01 14:46:38 +02:00
|
|
|
if i.plannerPolicyHook != nil {
|
|
|
|
i.plannerPolicyHook(&plannerPolicy)
|
|
|
|
}
|
2020-05-24 18:18:02 +02:00
|
|
|
|
rework resume token validation to allow resuming from raw sends of unencrypted datasets
Before this change, resuming from an unencrypted dataset with
send.raw=true specified wouldn't work with zrepl due to overly
restrictive resume token checking.
An initial PR to fix this was made in https://github.com/zrepl/zrepl/pull/503
but it didn't address the core of the problem.
The core of the problem was that zrepl assumed that if a resume token
contained `rawok=true, compressok=true`, the resulting send would be
encrypted. But if the sender dataset was unencrypted, such a resume would
actually result in an unencrypted send.
Which could be totally legitimate but zrepl failed to recognize that.
BACKGROUND
==========
The following snippets of OpenZFS code are insightful regarding how the
various ${X}ok values in the resume token are handled:
- https://github.com/openzfs/zfs/blob/6c3c5fcfbe27d9193cd131753cc7e47ee2784621/module/zfs/dmu_send.c#L1947-L2012
- https://github.com/openzfs/zfs/blob/6c3c5fcfbe27d9193cd131753cc7e47ee2784621/module/zfs/dmu_recv.c#L877-L891
- https://github.com/openzfs/zfs/blob/6c3c5fc/lib/libzfs/libzfs_sendrecv.c#L1663-L1672
Basically, some zfs send flags make the DMU send code set some DMU send
stream featureflags, although it's not a pure mapping, i.e, which DMU
send stream flags are used depends somewhat on the dataset (e.g., is it
encrypted or not, or, does it use zstd or not).
Then, the receiver looks at some (but not all) feature flags and maps
them to ${X}ok dataset zap attributes.
These are funnelled back to the sender 1:1 through the resume_token.
And the sender turns them into lzc flags.
As an example, let's look at zfs send --raw.
if the sender requests a raw send on an unencrypted dataset, the send
stream (and hence the resume token) will not have the raw stream
featureflag set, and hence the resume token will not have the rawok
field set. Instead, it will have compressok, embedok, and depending
on whether large blocks are present in the dataset, largeblockok set.
WHAT'S ZREPL'S ROLE IN THIS?
============================
zrepl provides a virtual encrypted sendflag that is like `raw`,
but further ensures that we only send encrypted datasets.
For any other resume token stuff, it shoudn't do any checking,
because it's a futile effort to keep up with ZFS send/recv features
that are orthogonal to encryption.
CHANGES MADE IN THIS COMMIT
===========================
- Rip out a bunch of needless checking that zrepl would do during
planning. These checks were there to give better error messages,
but actually, the error messages created by the endpoint.Sender.Send
RPC upon send args validation failure are good enough.
- Add platformtests to validate all combinations of
(Unencrypted/Encrypted FS) x (send.encrypted = true | false) x (send.raw = true | false)
for cases both non-resuming and resuming send.
Additional manual testing done:
1. With zrepl 0.5, setup with unencrypted dataset, send.raw=true specified, no send.encrypted specified.
2. Observe that regular non-resuming send works, but resuming doesn't work.
3. Upgrade zrepl to this change.
4. Observe that both regular and resuming send works.
closes https://github.com/zrepl/zrepl/pull/613
2022-07-10 14:56:35 +02:00
|
|
|
var doCtx context.Context = ctx
|
|
|
|
if i.skipSendArgsValidation {
|
|
|
|
doCtx = zfs.ZFSSendArgsSkipValidation(ctx)
|
|
|
|
}
|
|
|
|
|
2020-05-24 18:18:02 +02:00
|
|
|
report, wait := replication.Do(
|
rework resume token validation to allow resuming from raw sends of unencrypted datasets
Before this change, resuming from an unencrypted dataset with
send.raw=true specified wouldn't work with zrepl due to overly
restrictive resume token checking.
An initial PR to fix this was made in https://github.com/zrepl/zrepl/pull/503
but it didn't address the core of the problem.
The core of the problem was that zrepl assumed that if a resume token
contained `rawok=true, compressok=true`, the resulting send would be
encrypted. But if the sender dataset was unencrypted, such a resume would
actually result in an unencrypted send.
Which could be totally legitimate but zrepl failed to recognize that.
BACKGROUND
==========
The following snippets of OpenZFS code are insightful regarding how the
various ${X}ok values in the resume token are handled:
- https://github.com/openzfs/zfs/blob/6c3c5fcfbe27d9193cd131753cc7e47ee2784621/module/zfs/dmu_send.c#L1947-L2012
- https://github.com/openzfs/zfs/blob/6c3c5fcfbe27d9193cd131753cc7e47ee2784621/module/zfs/dmu_recv.c#L877-L891
- https://github.com/openzfs/zfs/blob/6c3c5fc/lib/libzfs/libzfs_sendrecv.c#L1663-L1672
Basically, some zfs send flags make the DMU send code set some DMU send
stream featureflags, although it's not a pure mapping, i.e, which DMU
send stream flags are used depends somewhat on the dataset (e.g., is it
encrypted or not, or, does it use zstd or not).
Then, the receiver looks at some (but not all) feature flags and maps
them to ${X}ok dataset zap attributes.
These are funnelled back to the sender 1:1 through the resume_token.
And the sender turns them into lzc flags.
As an example, let's look at zfs send --raw.
if the sender requests a raw send on an unencrypted dataset, the send
stream (and hence the resume token) will not have the raw stream
featureflag set, and hence the resume token will not have the rawok
field set. Instead, it will have compressok, embedok, and depending
on whether large blocks are present in the dataset, largeblockok set.
WHAT'S ZREPL'S ROLE IN THIS?
============================
zrepl provides a virtual encrypted sendflag that is like `raw`,
but further ensures that we only send encrypted datasets.
For any other resume token stuff, it shoudn't do any checking,
because it's a futile effort to keep up with ZFS send/recv features
that are orthogonal to encryption.
CHANGES MADE IN THIS COMMIT
===========================
- Rip out a bunch of needless checking that zrepl would do during
planning. These checks were there to give better error messages,
but actually, the error messages created by the endpoint.Sender.Send
RPC upon send args validation failure are good enough.
- Add platformtests to validate all combinations of
(Unencrypted/Encrypted FS) x (send.encrypted = true | false) x (send.raw = true | false)
for cases both non-resuming and resuming send.
Additional manual testing done:
1. With zrepl 0.5, setup with unencrypted dataset, send.raw=true specified, no send.encrypted specified.
2. Observe that regular non-resuming send works, but resuming doesn't work.
3. Upgrade zrepl to this change.
4. Observe that both regular and resuming send works.
closes https://github.com/zrepl/zrepl/pull/613
2022-07-10 14:56:35 +02:00
|
|
|
doCtx,
|
2021-02-28 23:33:28 +01:00
|
|
|
driver.Config{
|
|
|
|
MaxAttempts: 1,
|
|
|
|
StepQueueConcurrency: 1,
|
|
|
|
ReconnectHardFailTimeout: 1 * time.Second,
|
|
|
|
},
|
2020-05-24 18:18:02 +02:00
|
|
|
logic.NewPlanner(nil, nil, sender, receiver, plannerPolicy),
|
|
|
|
)
|
|
|
|
wait(true)
|
|
|
|
return report()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (i replicationInvocation) ReceiveSideFilesystem() string {
|
|
|
|
return path.Join(i.rfsRoot, i.sfs)
|
|
|
|
}
|
|
|
|
|
|
|
|
func ReplicationIncrementalIsPossibleIfCommonSnapshotIsDestroyed(ctx *platformtest.Context) {
|
|
|
|
|
|
|
|
platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, `
|
|
|
|
CREATEROOT
|
|
|
|
+ "sender"
|
|
|
|
+ "sender@1"
|
|
|
|
+ "receiver"
|
|
|
|
R zfs create -p "${ROOTDS}/receiver/${ROOTDS}"
|
|
|
|
`)
|
|
|
|
|
|
|
|
sjid := endpoint.MustMakeJobID("sender-job")
|
|
|
|
rjid := endpoint.MustMakeJobID("receiver-job")
|
|
|
|
|
|
|
|
sfs := ctx.RootDataset + "/sender"
|
|
|
|
rfsRoot := ctx.RootDataset + "/receiver"
|
|
|
|
snap1 := fsversion(ctx, sfs, "@1")
|
|
|
|
|
|
|
|
rep := replicationInvocation{
|
2020-06-27 23:53:33 +02:00
|
|
|
sjid: sjid,
|
|
|
|
rjid: rjid,
|
|
|
|
sfs: sfs,
|
|
|
|
rfsRoot: rfsRoot,
|
2021-01-24 23:31:45 +01:00
|
|
|
guarantee: pdu.ReplicationConfigProtectionWithKind(pdu.ReplicationGuaranteeKind_GuaranteeResumability),
|
2020-05-24 18:18:02 +02:00
|
|
|
}
|
|
|
|
rfs := rep.ReceiveSideFilesystem()
|
|
|
|
|
|
|
|
// first replication
|
|
|
|
report := rep.Do(ctx)
|
|
|
|
ctx.Logf("\n%s", pretty.Sprint(report))
|
|
|
|
|
|
|
|
// assert @1 exists on receiver
|
|
|
|
_ = fsversion(ctx, rfs, "@1")
|
|
|
|
|
|
|
|
// cut off the common base between sender and receiver
|
|
|
|
// (replication engine guarantees resumability through bookmarks)
|
|
|
|
err := zfs.ZFSDestroy(ctx, snap1.FullPath(sfs))
|
|
|
|
require.NoError(ctx, err)
|
|
|
|
|
|
|
|
// assert that the replication cursor has been created
|
|
|
|
snap1CursorName, err := endpoint.ReplicationCursorBookmarkName(sfs, snap1.Guid, sjid)
|
|
|
|
require.NoError(ctx, err)
|
|
|
|
snap1CursorInfo, err := zfs.ZFSGetFilesystemVersion(ctx, sfs+"#"+snap1CursorName)
|
|
|
|
require.NoError(ctx, err)
|
|
|
|
require.True(ctx, snap1CursorInfo.IsBookmark())
|
|
|
|
|
|
|
|
// second replication of a new snapshot, should use the cursor
|
|
|
|
mustSnapshot(ctx, sfs+"@2")
|
|
|
|
report = rep.Do(ctx)
|
|
|
|
ctx.Logf("\n%s", pretty.Sprint(report))
|
|
|
|
_ = fsversion(ctx, rfs, "@2")
|
|
|
|
|
|
|
|
}
|
|
|
|
|
2020-05-20 13:10:37 +02:00
|
|
|
func ReplicationIncrementalCleansUpStaleAbstractionsWithCacheOnSecondReplication(ctx *platformtest.Context) {
|
|
|
|
implReplicationIncrementalCleansUpStaleAbstractions(ctx, true)
|
|
|
|
}
|
|
|
|
|
|
|
|
func ReplicationIncrementalCleansUpStaleAbstractionsWithoutCacheOnSecondReplication(ctx *platformtest.Context) {
|
|
|
|
implReplicationIncrementalCleansUpStaleAbstractions(ctx, false)
|
|
|
|
}
|
|
|
|
|
|
|
|
func implReplicationIncrementalCleansUpStaleAbstractions(ctx *platformtest.Context, invalidateCacheBeforeSecondReplication bool) {
|
|
|
|
|
|
|
|
platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, `
|
|
|
|
CREATEROOT
|
|
|
|
+ "sender"
|
|
|
|
+ "sender@1"
|
|
|
|
+ "sender@2"
|
|
|
|
+ "sender#2" "sender@2"
|
|
|
|
+ "sender@3"
|
|
|
|
+ "receiver"
|
|
|
|
R zfs create -p "${ROOTDS}/receiver/${ROOTDS}"
|
|
|
|
`)
|
|
|
|
|
|
|
|
sjid := endpoint.MustMakeJobID("sender-job")
|
|
|
|
ojid := endpoint.MustMakeJobID("other-job")
|
|
|
|
rjid := endpoint.MustMakeJobID("receiver-job")
|
|
|
|
|
|
|
|
sfs := ctx.RootDataset + "/sender"
|
|
|
|
rfsRoot := ctx.RootDataset + "/receiver"
|
|
|
|
|
|
|
|
rep := replicationInvocation{
|
2020-06-27 23:53:33 +02:00
|
|
|
sjid: sjid,
|
|
|
|
rjid: rjid,
|
|
|
|
sfs: sfs,
|
|
|
|
rfsRoot: rfsRoot,
|
2021-01-24 23:31:45 +01:00
|
|
|
guarantee: pdu.ReplicationConfigProtectionWithKind(pdu.ReplicationGuaranteeKind_GuaranteeResumability),
|
2020-05-20 13:10:37 +02:00
|
|
|
}
|
|
|
|
rfs := rep.ReceiveSideFilesystem()
|
|
|
|
|
|
|
|
// first replication
|
|
|
|
report := rep.Do(ctx)
|
|
|
|
ctx.Logf("\n%s", pretty.Sprint(report))
|
|
|
|
|
|
|
|
// assert most recent send-side version @3 exists on receiver (=replication succeeded)
|
|
|
|
rSnap3 := fsversion(ctx, rfs, "@3")
|
|
|
|
// assert the source-side versions not managed by zrepl still exist
|
|
|
|
snap1 := fsversion(ctx, sfs, "@1")
|
|
|
|
snap2 := fsversion(ctx, sfs, "@2")
|
|
|
|
_ = fsversion(ctx, sfs, "#2") // non-replication-cursor bookmarks should not be affected
|
|
|
|
snap3 := fsversion(ctx, sfs, "@3")
|
|
|
|
// assert a replication cursor is in place
|
|
|
|
snap3CursorName, err := endpoint.ReplicationCursorBookmarkName(sfs, snap3.Guid, sjid)
|
|
|
|
require.NoError(ctx, err)
|
|
|
|
_ = fsversion(ctx, sfs, "#"+snap3CursorName)
|
|
|
|
// assert a last-received hold is in place
|
|
|
|
expectRjidHoldTag, err := endpoint.LastReceivedHoldTag(rjid)
|
|
|
|
require.NoError(ctx, err)
|
|
|
|
holds, err := zfs.ZFSHolds(ctx, rfs, rSnap3.Name)
|
|
|
|
require.NoError(ctx, err)
|
|
|
|
require.Contains(ctx, holds, expectRjidHoldTag)
|
|
|
|
|
2020-06-01 14:39:59 +02:00
|
|
|
// create artificial stale replication cursors & step holds
|
2020-05-20 13:10:37 +02:00
|
|
|
createArtificalStaleAbstractions := func(jobId endpoint.JobID) []endpoint.Abstraction {
|
|
|
|
snap2Cursor, err := endpoint.CreateReplicationCursor(ctx, sfs, snap2, jobId) // no shadow
|
|
|
|
require.NoError(ctx, err)
|
|
|
|
// create artificial stale step holds for jobId
|
|
|
|
snap1Hold, err := endpoint.HoldStep(ctx, sfs, snap1, jobId) // no shadow
|
|
|
|
require.NoError(ctx, err)
|
|
|
|
snap2Hold, err := endpoint.HoldStep(ctx, sfs, snap2, jobId) // no shadow
|
|
|
|
require.NoError(ctx, err)
|
fix handling of tentative cursor presence if protection strategy doesn't use it (#714)
Before this PR, we would panic in the `check` phase of `endpoint.Send()`'s `TryBatchDestroy` call in the following cases: the current protection strategy does NOT produce a tentative replication cursor AND
* `FromVersion` is a tentative cursor bookmark
* `FromVersion` is a snapshot, and there exists a tentative cursor bookmark for that snapshot
* `FromVersion` is a bookmark != tentative cursor bookmark, but there exists a tentative cursor bookmark for the same snapshot as the `FromVersion` bookmark
In those cases, the `check` concluded that we would delete `FromVersion`.
It came to that conclusion because the tentative cursor isn't part of `obsoleteAbs` if the protection strategy doesn't produce a tentative replication cursor.
The scenarios above can happen if the user changes the protection strategy from "with tentative cursor" to one "without tentative replication cursor", while there is a tentative replication cursor on disk.
The workaround was to rename the tentative cursor.
In all cases above, `TryBatchDestroy` would have destroyed the tentative cursor.
In case 1, that would fail the `Send` step and potentially break replication if the cursor is the last common bookmark. The `check` conclusion was correct.
In cases 2 and 3, deleting the tentative cursor would have been fine because `FromVersion` was a different entity than the tentative cursor. So, destroying the tentative cursor would be the right call.
The solution in this PR is as follows:
* add the `FromVersion` to the `liveAbs` set of live abstractions
* rewrite the `check` closure to use the full dataset path (`fullpath`) to identify the concrete ZFS object instead of the `zfs.FilesystemVersionEqualIdentity`, which is only identified by matching GUID.
* Holds have no dataset path and are not the `FromVersion` in any case, so disregard them.
fixes #666
2023-07-04 20:21:48 +02:00
|
|
|
// create artificial tentative cursor
|
|
|
|
snap3TentativeCursor, err := endpoint.CreateTentativeReplicationCursor(ctx, sfs, snap3, jobId)
|
|
|
|
require.NoError(ctx, err)
|
|
|
|
return []endpoint.Abstraction{snap2Cursor, snap1Hold, snap2Hold, snap3TentativeCursor}
|
2020-05-20 13:10:37 +02:00
|
|
|
}
|
|
|
|
createArtificalStaleAbstractions(sjid)
|
|
|
|
ojidSendAbstractions := createArtificalStaleAbstractions(ojid)
|
|
|
|
|
|
|
|
snap3ojidLastReceivedHold, err := endpoint.CreateLastReceivedHold(ctx, rfs, fsversion(ctx, rfs, "@3"), ojid)
|
|
|
|
require.NoError(ctx, err)
|
|
|
|
require.True(ctx, zfs.FilesystemVersionEqualIdentity(fsversion(ctx, rfs, "@3"), snap3ojidLastReceivedHold.GetFilesystemVersion()))
|
|
|
|
|
|
|
|
// take another 2 snapshots
|
|
|
|
mustSnapshot(ctx, sfs+"@4")
|
|
|
|
mustSnapshot(ctx, sfs+"@5")
|
|
|
|
snap5 := fsversion(ctx, sfs, "@5")
|
|
|
|
|
|
|
|
if invalidateCacheBeforeSecondReplication {
|
2020-06-27 23:53:33 +02:00
|
|
|
endpoint.AbstractionsCacheInvalidate(sfs)
|
2020-05-20 13:10:37 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
// do another replication
|
|
|
|
// - ojid's abstractions should not be affected on either side
|
|
|
|
// - stale abstractions of sjid and rjid should be cleaned up
|
|
|
|
// - 1 replication cursor and 1 last-received hold should be present
|
|
|
|
|
|
|
|
checkOjidAbstractionsExist := func() {
|
|
|
|
var expectedOjidAbstractions []endpoint.Abstraction
|
|
|
|
expectedOjidAbstractions = append(expectedOjidAbstractions, ojidSendAbstractions...)
|
|
|
|
expectedOjidAbstractions = append(expectedOjidAbstractions, snap3ojidLastReceivedHold)
|
|
|
|
|
|
|
|
sfsAndRfsFilter := filters.NewDatasetMapFilter(2, true)
|
|
|
|
require.NoError(ctx, sfsAndRfsFilter.Add(sfs, "ok"))
|
|
|
|
require.NoError(ctx, sfsAndRfsFilter.Add(rfs, "ok"))
|
|
|
|
rAbs, rAbsErrs, err := endpoint.ListAbstractions(ctx, endpoint.ListZFSHoldsAndBookmarksQuery{
|
|
|
|
FS: endpoint.ListZFSHoldsAndBookmarksQueryFilesystemFilter{Filter: sfsAndRfsFilter},
|
|
|
|
JobID: &ojid,
|
|
|
|
What: endpoint.AbstractionTypesAll,
|
|
|
|
Concurrency: 1,
|
|
|
|
})
|
|
|
|
require.NoError(ctx, err)
|
|
|
|
require.Len(ctx, rAbsErrs, 0)
|
|
|
|
ctx.Logf("rAbs=%s", rAbs)
|
|
|
|
ctx.Logf("expectedOjidAbstractions=%s", expectedOjidAbstractions)
|
|
|
|
require.Equal(ctx, len(expectedOjidAbstractions), len(rAbs))
|
|
|
|
for _, ea := range expectedOjidAbstractions {
|
|
|
|
ctx.Logf("looking for %s %#v", ea, ea.GetFilesystemVersion())
|
|
|
|
found := false
|
|
|
|
for _, a := range rAbs {
|
|
|
|
eq := endpoint.AbstractionEquals(ea, a)
|
|
|
|
ctx.Logf("comp=%v for %s %#v", eq, a, a.GetFilesystemVersion())
|
|
|
|
found = found || eq
|
|
|
|
}
|
|
|
|
require.True(ctx, found, "%s", ea)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
checkOjidAbstractionsExist()
|
|
|
|
|
|
|
|
report = rep.Do(ctx)
|
|
|
|
ctx.Logf("\n%s", pretty.Sprint(report))
|
|
|
|
|
|
|
|
checkOjidAbstractionsExist()
|
|
|
|
|
|
|
|
_ = fsversion(ctx, sfs, "@1")
|
|
|
|
_ = fsversion(ctx, sfs, "@2")
|
|
|
|
_ = fsversion(ctx, sfs, "#2")
|
|
|
|
_ = fsversion(ctx, sfs, "@3")
|
|
|
|
_ = fsversion(ctx, sfs, "@4")
|
|
|
|
_ = fsversion(ctx, sfs, "@5")
|
|
|
|
|
|
|
|
_ = fsversion(ctx, rfs, "@3")
|
|
|
|
_ = fsversion(ctx, rfs, "@4")
|
|
|
|
_ = fsversion(ctx, rfs, "@5")
|
|
|
|
|
|
|
|
// check bookmark situation
|
|
|
|
{
|
|
|
|
sBms, err := zfs.ZFSListFilesystemVersions(ctx, mustDatasetPath(sfs), zfs.ListFilesystemVersionsOptions{
|
|
|
|
Types: zfs.Bookmarks,
|
|
|
|
})
|
|
|
|
ctx.Logf("sbms=%s", sBms)
|
|
|
|
require.NoError(ctx, err)
|
|
|
|
|
|
|
|
snap5SjidCursorName, err := endpoint.ReplicationCursorBookmarkName(sfs, snap5.Guid, sjid)
|
|
|
|
require.NoError(ctx, err)
|
|
|
|
snap2SjidCursorName, err := endpoint.ReplicationCursorBookmarkName(sfs, snap2.Guid, sjid)
|
|
|
|
require.NoError(ctx, err)
|
|
|
|
snap2OjidCursorName, err := endpoint.ReplicationCursorBookmarkName(sfs, snap2.Guid, ojid)
|
|
|
|
require.NoError(ctx, err)
|
fix handling of tentative cursor presence if protection strategy doesn't use it (#714)
Before this PR, we would panic in the `check` phase of `endpoint.Send()`'s `TryBatchDestroy` call in the following cases: the current protection strategy does NOT produce a tentative replication cursor AND
* `FromVersion` is a tentative cursor bookmark
* `FromVersion` is a snapshot, and there exists a tentative cursor bookmark for that snapshot
* `FromVersion` is a bookmark != tentative cursor bookmark, but there exists a tentative cursor bookmark for the same snapshot as the `FromVersion` bookmark
In those cases, the `check` concluded that we would delete `FromVersion`.
It came to that conclusion because the tentative cursor isn't part of `obsoleteAbs` if the protection strategy doesn't produce a tentative replication cursor.
The scenarios above can happen if the user changes the protection strategy from "with tentative cursor" to one "without tentative replication cursor", while there is a tentative replication cursor on disk.
The workaround was to rename the tentative cursor.
In all cases above, `TryBatchDestroy` would have destroyed the tentative cursor.
In case 1, that would fail the `Send` step and potentially break replication if the cursor is the last common bookmark. The `check` conclusion was correct.
In cases 2 and 3, deleting the tentative cursor would have been fine because `FromVersion` was a different entity than the tentative cursor. So, destroying the tentative cursor would be the right call.
The solution in this PR is as follows:
* add the `FromVersion` to the `liveAbs` set of live abstractions
* rewrite the `check` closure to use the full dataset path (`fullpath`) to identify the concrete ZFS object instead of the `zfs.FilesystemVersionEqualIdentity`, which is only identified by matching GUID.
* Holds have no dataset path and are not the `FromVersion` in any case, so disregard them.
fixes #666
2023-07-04 20:21:48 +02:00
|
|
|
snap3SjidTentativeCursorName, err := endpoint.TentativeReplicationCursorBookmarkName(sfs, snap3.Guid, sjid)
|
|
|
|
require.NoError(ctx, err)
|
|
|
|
snap3OjidTentativeCursorName, err := endpoint.TentativeReplicationCursorBookmarkName(sfs, snap3.Guid, ojid)
|
|
|
|
require.NoError(ctx, err)
|
2020-05-20 13:10:37 +02:00
|
|
|
var bmNames []string
|
|
|
|
for _, bm := range sBms {
|
|
|
|
bmNames = append(bmNames, bm.Name)
|
|
|
|
}
|
|
|
|
|
|
|
|
if invalidateCacheBeforeSecondReplication {
|
fix handling of tentative cursor presence if protection strategy doesn't use it (#714)
Before this PR, we would panic in the `check` phase of `endpoint.Send()`'s `TryBatchDestroy` call in the following cases: the current protection strategy does NOT produce a tentative replication cursor AND
* `FromVersion` is a tentative cursor bookmark
* `FromVersion` is a snapshot, and there exists a tentative cursor bookmark for that snapshot
* `FromVersion` is a bookmark != tentative cursor bookmark, but there exists a tentative cursor bookmark for the same snapshot as the `FromVersion` bookmark
In those cases, the `check` concluded that we would delete `FromVersion`.
It came to that conclusion because the tentative cursor isn't part of `obsoleteAbs` if the protection strategy doesn't produce a tentative replication cursor.
The scenarios above can happen if the user changes the protection strategy from "with tentative cursor" to one "without tentative replication cursor", while there is a tentative replication cursor on disk.
The workaround was to rename the tentative cursor.
In all cases above, `TryBatchDestroy` would have destroyed the tentative cursor.
In case 1, that would fail the `Send` step and potentially break replication if the cursor is the last common bookmark. The `check` conclusion was correct.
In cases 2 and 3, deleting the tentative cursor would have been fine because `FromVersion` was a different entity than the tentative cursor. So, destroying the tentative cursor would be the right call.
The solution in this PR is as follows:
* add the `FromVersion` to the `liveAbs` set of live abstractions
* rewrite the `check` closure to use the full dataset path (`fullpath`) to identify the concrete ZFS object instead of the `zfs.FilesystemVersionEqualIdentity`, which is only identified by matching GUID.
* Holds have no dataset path and are not the `FromVersion` in any case, so disregard them.
fixes #666
2023-07-04 20:21:48 +02:00
|
|
|
require.Len(ctx, sBms, 4)
|
2020-05-20 13:10:37 +02:00
|
|
|
require.Contains(ctx, bmNames, snap5SjidCursorName)
|
|
|
|
require.Contains(ctx, bmNames, snap2OjidCursorName)
|
fix handling of tentative cursor presence if protection strategy doesn't use it (#714)
Before this PR, we would panic in the `check` phase of `endpoint.Send()`'s `TryBatchDestroy` call in the following cases: the current protection strategy does NOT produce a tentative replication cursor AND
* `FromVersion` is a tentative cursor bookmark
* `FromVersion` is a snapshot, and there exists a tentative cursor bookmark for that snapshot
* `FromVersion` is a bookmark != tentative cursor bookmark, but there exists a tentative cursor bookmark for the same snapshot as the `FromVersion` bookmark
In those cases, the `check` concluded that we would delete `FromVersion`.
It came to that conclusion because the tentative cursor isn't part of `obsoleteAbs` if the protection strategy doesn't produce a tentative replication cursor.
The scenarios above can happen if the user changes the protection strategy from "with tentative cursor" to one "without tentative replication cursor", while there is a tentative replication cursor on disk.
The workaround was to rename the tentative cursor.
In all cases above, `TryBatchDestroy` would have destroyed the tentative cursor.
In case 1, that would fail the `Send` step and potentially break replication if the cursor is the last common bookmark. The `check` conclusion was correct.
In cases 2 and 3, deleting the tentative cursor would have been fine because `FromVersion` was a different entity than the tentative cursor. So, destroying the tentative cursor would be the right call.
The solution in this PR is as follows:
* add the `FromVersion` to the `liveAbs` set of live abstractions
* rewrite the `check` closure to use the full dataset path (`fullpath`) to identify the concrete ZFS object instead of the `zfs.FilesystemVersionEqualIdentity`, which is only identified by matching GUID.
* Holds have no dataset path and are not the `FromVersion` in any case, so disregard them.
fixes #666
2023-07-04 20:21:48 +02:00
|
|
|
require.Contains(ctx, bmNames, snap3OjidTentativeCursorName)
|
2020-05-20 13:10:37 +02:00
|
|
|
require.Contains(ctx, bmNames, "2")
|
|
|
|
} else {
|
fix handling of tentative cursor presence if protection strategy doesn't use it (#714)
Before this PR, we would panic in the `check` phase of `endpoint.Send()`'s `TryBatchDestroy` call in the following cases: the current protection strategy does NOT produce a tentative replication cursor AND
* `FromVersion` is a tentative cursor bookmark
* `FromVersion` is a snapshot, and there exists a tentative cursor bookmark for that snapshot
* `FromVersion` is a bookmark != tentative cursor bookmark, but there exists a tentative cursor bookmark for the same snapshot as the `FromVersion` bookmark
In those cases, the `check` concluded that we would delete `FromVersion`.
It came to that conclusion because the tentative cursor isn't part of `obsoleteAbs` if the protection strategy doesn't produce a tentative replication cursor.
The scenarios above can happen if the user changes the protection strategy from "with tentative cursor" to one "without tentative replication cursor", while there is a tentative replication cursor on disk.
The workaround was to rename the tentative cursor.
In all cases above, `TryBatchDestroy` would have destroyed the tentative cursor.
In case 1, that would fail the `Send` step and potentially break replication if the cursor is the last common bookmark. The `check` conclusion was correct.
In cases 2 and 3, deleting the tentative cursor would have been fine because `FromVersion` was a different entity than the tentative cursor. So, destroying the tentative cursor would be the right call.
The solution in this PR is as follows:
* add the `FromVersion` to the `liveAbs` set of live abstractions
* rewrite the `check` closure to use the full dataset path (`fullpath`) to identify the concrete ZFS object instead of the `zfs.FilesystemVersionEqualIdentity`, which is only identified by matching GUID.
* Holds have no dataset path and are not the `FromVersion` in any case, so disregard them.
fixes #666
2023-07-04 20:21:48 +02:00
|
|
|
require.Len(ctx, sBms, 6)
|
|
|
|
ctx.Logf("%s", pretty.Sprint(sBms))
|
2020-05-20 13:10:37 +02:00
|
|
|
require.Contains(ctx, bmNames, snap5SjidCursorName)
|
|
|
|
require.Contains(ctx, bmNames, snap2SjidCursorName)
|
|
|
|
require.Contains(ctx, bmNames, snap2OjidCursorName)
|
fix handling of tentative cursor presence if protection strategy doesn't use it (#714)
Before this PR, we would panic in the `check` phase of `endpoint.Send()`'s `TryBatchDestroy` call in the following cases: the current protection strategy does NOT produce a tentative replication cursor AND
* `FromVersion` is a tentative cursor bookmark
* `FromVersion` is a snapshot, and there exists a tentative cursor bookmark for that snapshot
* `FromVersion` is a bookmark != tentative cursor bookmark, but there exists a tentative cursor bookmark for the same snapshot as the `FromVersion` bookmark
In those cases, the `check` concluded that we would delete `FromVersion`.
It came to that conclusion because the tentative cursor isn't part of `obsoleteAbs` if the protection strategy doesn't produce a tentative replication cursor.
The scenarios above can happen if the user changes the protection strategy from "with tentative cursor" to one "without tentative replication cursor", while there is a tentative replication cursor on disk.
The workaround was to rename the tentative cursor.
In all cases above, `TryBatchDestroy` would have destroyed the tentative cursor.
In case 1, that would fail the `Send` step and potentially break replication if the cursor is the last common bookmark. The `check` conclusion was correct.
In cases 2 and 3, deleting the tentative cursor would have been fine because `FromVersion` was a different entity than the tentative cursor. So, destroying the tentative cursor would be the right call.
The solution in this PR is as follows:
* add the `FromVersion` to the `liveAbs` set of live abstractions
* rewrite the `check` closure to use the full dataset path (`fullpath`) to identify the concrete ZFS object instead of the `zfs.FilesystemVersionEqualIdentity`, which is only identified by matching GUID.
* Holds have no dataset path and are not the `FromVersion` in any case, so disregard them.
fixes #666
2023-07-04 20:21:48 +02:00
|
|
|
require.Contains(ctx, bmNames, snap3SjidTentativeCursorName)
|
|
|
|
require.Contains(ctx, bmNames, snap3OjidTentativeCursorName)
|
2020-05-20 13:10:37 +02:00
|
|
|
require.Contains(ctx, bmNames, "2")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// check last-received hold moved
|
|
|
|
{
|
|
|
|
rAbs, rAbsErrs, err := endpoint.ListAbstractions(ctx, endpoint.ListZFSHoldsAndBookmarksQuery{
|
|
|
|
FS: endpoint.ListZFSHoldsAndBookmarksQueryFilesystemFilter{FS: &rfs},
|
|
|
|
JobID: &rjid,
|
|
|
|
What: endpoint.AbstractionTypesAll,
|
|
|
|
Concurrency: 1,
|
|
|
|
})
|
|
|
|
require.NoError(ctx, err)
|
|
|
|
require.Len(ctx, rAbsErrs, 0)
|
|
|
|
require.Len(ctx, rAbs, 1)
|
|
|
|
require.Equal(ctx, rAbs[0].GetType(), endpoint.AbstractionLastReceivedHold)
|
|
|
|
require.Equal(ctx, *rAbs[0].GetJobID(), rjid)
|
|
|
|
require.Equal(ctx, rAbs[0].GetFilesystemVersion().GetGuid(), snap5.GetGuid())
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
fix handling of tentative cursor presence if protection strategy doesn't use it (#714)
Before this PR, we would panic in the `check` phase of `endpoint.Send()`'s `TryBatchDestroy` call in the following cases: the current protection strategy does NOT produce a tentative replication cursor AND
* `FromVersion` is a tentative cursor bookmark
* `FromVersion` is a snapshot, and there exists a tentative cursor bookmark for that snapshot
* `FromVersion` is a bookmark != tentative cursor bookmark, but there exists a tentative cursor bookmark for the same snapshot as the `FromVersion` bookmark
In those cases, the `check` concluded that we would delete `FromVersion`.
It came to that conclusion because the tentative cursor isn't part of `obsoleteAbs` if the protection strategy doesn't produce a tentative replication cursor.
The scenarios above can happen if the user changes the protection strategy from "with tentative cursor" to one "without tentative replication cursor", while there is a tentative replication cursor on disk.
The workaround was to rename the tentative cursor.
In all cases above, `TryBatchDestroy` would have destroyed the tentative cursor.
In case 1, that would fail the `Send` step and potentially break replication if the cursor is the last common bookmark. The `check` conclusion was correct.
In cases 2 and 3, deleting the tentative cursor would have been fine because `FromVersion` was a different entity than the tentative cursor. So, destroying the tentative cursor would be the right call.
The solution in this PR is as follows:
* add the `FromVersion` to the `liveAbs` set of live abstractions
* rewrite the `check` closure to use the full dataset path (`fullpath`) to identify the concrete ZFS object instead of the `zfs.FilesystemVersionEqualIdentity`, which is only identified by matching GUID.
* Holds have no dataset path and are not the `FromVersion` in any case, so disregard them.
fixes #666
2023-07-04 20:21:48 +02:00
|
|
|
func ReplicationIncrementalHandlesFromVersionEqTentativeCursorCorrectly(ctx *platformtest.Context) {
|
|
|
|
|
|
|
|
platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, `
|
|
|
|
CREATEROOT
|
|
|
|
+ "sender"
|
|
|
|
+ "sender@1"
|
|
|
|
+ "receiver"
|
|
|
|
R zfs create -p "${ROOTDS}/receiver/${ROOTDS}"
|
|
|
|
`)
|
|
|
|
|
|
|
|
sjid := endpoint.MustMakeJobID("sender-job")
|
|
|
|
rjid := endpoint.MustMakeJobID("receiver-job")
|
|
|
|
|
|
|
|
sfs := ctx.RootDataset + "/sender"
|
|
|
|
rfsRoot := ctx.RootDataset + "/receiver"
|
|
|
|
|
|
|
|
rep := replicationInvocation{
|
|
|
|
sjid: sjid,
|
|
|
|
rjid: rjid,
|
|
|
|
sfs: sfs,
|
|
|
|
rfsRoot: rfsRoot,
|
|
|
|
// It doesn't really matter what guarantee we use here, as the second replication will configure another.
|
|
|
|
// But, in the real world, the only way for a stale tentative cursor to appear is if the guarantee is set to
|
|
|
|
// incremental replication and we crash before converting the tentative cursor into a regular cursor.
|
|
|
|
guarantee: pdu.ReplicationConfigProtectionWithKind(pdu.ReplicationGuaranteeKind_GuaranteeIncrementalReplication),
|
|
|
|
}
|
|
|
|
|
|
|
|
// Do initial replication to set up the test.
|
|
|
|
rep1 := rep.Do(ctx)
|
|
|
|
ctx.Logf("\n%s", pretty.Sprint(rep1))
|
|
|
|
sfsDs := mustDatasetPath(sfs)
|
|
|
|
snap1_sender := mustGetFilesystemVersion(ctx, sfs+"@1")
|
|
|
|
snap1_replicationCursor_name, err := endpoint.ReplicationCursorBookmarkName(sfs, snap1_sender.Guid, sjid)
|
|
|
|
require.NoError(ctx, err)
|
|
|
|
snap1_replicationCursor := mustGetFilesystemVersion(ctx, sfs+"#"+snap1_replicationCursor_name)
|
|
|
|
|
|
|
|
// The second replication will be done with a guarantee kind that doesn't create tentative cursors by itself.
|
|
|
|
// So, it would generally be right to clean up any tentative cursors on sfs since they're stale abstractions.
|
|
|
|
// However, if the cursor is used as the `from` version in any send step, we must not destroy it, as that
|
|
|
|
// would break incremental replication.
|
|
|
|
// NB: we only need to test the first step as all subsequent steps will be snapshot->snapshot.
|
|
|
|
rep.guarantee = pdu.ReplicationConfigProtectionWithKind(pdu.ReplicationGuaranteeKind_GuaranteeNothing)
|
|
|
|
// create the artificial cursor
|
|
|
|
snap1_tentativeCursor, err := endpoint.CreateTentativeReplicationCursor(ctx, sfs, snap1_sender, sjid)
|
|
|
|
require.NoError(ctx, err)
|
|
|
|
endpoint.AbstractionsCacheInvalidate(sfs)
|
|
|
|
// remove other bookmarks of snap1, and snap1 itself, to force the replication planner to use the tentative cursor
|
|
|
|
err = zfs.ZFSDestroyFilesystemVersion(ctx, sfsDs, &snap1_sender)
|
|
|
|
require.NoError(ctx, err)
|
|
|
|
err = zfs.ZFSDestroyFilesystemVersion(ctx, sfsDs, &snap1_replicationCursor)
|
|
|
|
require.NoError(ctx, err)
|
|
|
|
versions, err := zfs.ZFSListFilesystemVersions(ctx, sfsDs, zfs.ListFilesystemVersionsOptions{})
|
|
|
|
require.NoError(ctx, err)
|
|
|
|
require.Len(ctx, versions, 1)
|
|
|
|
require.Equal(ctx, versions[0].Guid, snap1_tentativeCursor.GetFilesystemVersion().Guid)
|
|
|
|
// create another snapshot so that replication does one incremental step `tentative_cursor` -> `@2`
|
|
|
|
mustSnapshot(ctx, sfs+"@2")
|
|
|
|
mustGetFilesystemVersion(ctx, sfs+"@2")
|
|
|
|
// do the replication
|
|
|
|
rep2 := rep.Do(ctx)
|
|
|
|
ctx.Logf("\n%s", pretty.Sprint(rep2))
|
|
|
|
|
|
|
|
// Ensure that the tentative cursor was used.
|
|
|
|
require.Len(ctx, rep2.Attempts, 1)
|
|
|
|
require.Equal(ctx, rep2.Attempts[0].State, report.AttemptDone)
|
|
|
|
require.Len(ctx, rep2.Attempts[0].Filesystems, 1)
|
|
|
|
require.Nil(ctx, rep2.Attempts[0].Filesystems[0].Error())
|
|
|
|
require.Len(ctx, rep2.Attempts[0].Filesystems[0].Steps, 1)
|
|
|
|
require.EqualValues(ctx, rep2.Attempts[0].Filesystems[0].CurrentStep, 1)
|
|
|
|
require.Len(ctx, rep2.Attempts[0].Filesystems[0].Steps, 1)
|
|
|
|
require.Equal(ctx, rep2.Attempts[0].Filesystems[0].Steps[0].Info.From, snap1_tentativeCursor.GetFilesystemVersion().RelName())
|
|
|
|
|
|
|
|
// Ensure that the tentative cursor was destroyed as part of SendPost.
|
|
|
|
_, err = zfs.ZFSGetFilesystemVersion(ctx, snap1_replicationCursor.FullPath(sfs))
|
|
|
|
_, ok := err.(*zfs.DatasetDoesNotExist)
|
|
|
|
require.True(ctx, ok)
|
|
|
|
}
|
|
|
|
|
2020-05-24 18:18:02 +02:00
|
|
|
type PartialSender struct {
|
|
|
|
*endpoint.Sender
|
|
|
|
failAfterByteCount int64
|
|
|
|
}
|
|
|
|
|
|
|
|
var _ logic.Sender = (*PartialSender)(nil)
|
|
|
|
|
|
|
|
func (s *PartialSender) Send(ctx context.Context, r *pdu.SendReq) (r1 *pdu.SendRes, r2 io.ReadCloser, r3 error) {
|
|
|
|
r1, r2, r3 = s.Sender.Send(ctx, r)
|
|
|
|
r2 = limitio.ReadCloser(r2, s.failAfterByteCount)
|
|
|
|
return r1, r2, r3
|
|
|
|
}
|
|
|
|
|
2020-06-27 23:53:33 +02:00
|
|
|
func ReplicationIsResumableFullSend__both_GuaranteeResumability(ctx *platformtest.Context) {
|
|
|
|
|
|
|
|
setup := replicationIsResumableFullSendSetup{
|
2021-01-24 23:31:45 +01:00
|
|
|
protection: &pdu.ReplicationConfigProtection{
|
2020-06-27 23:53:33 +02:00
|
|
|
Initial: pdu.ReplicationGuaranteeKind_GuaranteeResumability,
|
|
|
|
Incremental: pdu.ReplicationGuaranteeKind_GuaranteeResumability,
|
|
|
|
},
|
|
|
|
expectDatasetIsBusyErrorWhenDestroySnapshotWhilePartiallyReplicated: true,
|
|
|
|
expectAllThreeSnapshotsToThreeBePresentAfterLoop: true,
|
|
|
|
expectNoSnapshotsOnReceiverAfterLoop: false,
|
|
|
|
}
|
|
|
|
|
|
|
|
implReplicationIsResumableFullSend(ctx, setup)
|
2020-06-01 14:39:59 +02:00
|
|
|
}
|
|
|
|
|
2020-06-27 23:53:33 +02:00
|
|
|
func ReplicationIsResumableFullSend__initial_GuaranteeResumability_incremental_GuaranteeIncrementalReplication(ctx *platformtest.Context) {
|
|
|
|
|
|
|
|
setup := replicationIsResumableFullSendSetup{
|
2021-01-24 23:31:45 +01:00
|
|
|
protection: &pdu.ReplicationConfigProtection{
|
2020-06-27 23:53:33 +02:00
|
|
|
Initial: pdu.ReplicationGuaranteeKind_GuaranteeResumability,
|
|
|
|
Incremental: pdu.ReplicationGuaranteeKind_GuaranteeIncrementalReplication,
|
|
|
|
},
|
|
|
|
expectDatasetIsBusyErrorWhenDestroySnapshotWhilePartiallyReplicated: true,
|
|
|
|
expectAllThreeSnapshotsToThreeBePresentAfterLoop: true,
|
|
|
|
expectNoSnapshotsOnReceiverAfterLoop: false,
|
|
|
|
}
|
|
|
|
|
|
|
|
implReplicationIsResumableFullSend(ctx, setup)
|
2020-06-01 14:39:59 +02:00
|
|
|
}
|
|
|
|
|
2020-06-27 23:53:33 +02:00
|
|
|
func ReplicationIsResumableFullSend__initial_GuaranteeIncrementalReplication_incremental_GuaranteeIncrementalReplication(ctx *platformtest.Context) {
|
|
|
|
|
|
|
|
setup := replicationIsResumableFullSendSetup{
|
2021-01-24 23:31:45 +01:00
|
|
|
protection: &pdu.ReplicationConfigProtection{
|
2020-06-27 23:53:33 +02:00
|
|
|
Initial: pdu.ReplicationGuaranteeKind_GuaranteeIncrementalReplication,
|
|
|
|
Incremental: pdu.ReplicationGuaranteeKind_GuaranteeIncrementalReplication,
|
|
|
|
},
|
|
|
|
expectDatasetIsBusyErrorWhenDestroySnapshotWhilePartiallyReplicated: false,
|
|
|
|
expectAllThreeSnapshotsToThreeBePresentAfterLoop: false,
|
|
|
|
expectNoSnapshotsOnReceiverAfterLoop: true,
|
|
|
|
}
|
|
|
|
|
|
|
|
implReplicationIsResumableFullSend(ctx, setup)
|
|
|
|
}
|
|
|
|
|
|
|
|
type replicationIsResumableFullSendSetup struct {
|
2021-01-24 23:31:45 +01:00
|
|
|
protection *pdu.ReplicationConfigProtection
|
2020-06-27 23:53:33 +02:00
|
|
|
expectDatasetIsBusyErrorWhenDestroySnapshotWhilePartiallyReplicated bool
|
|
|
|
expectAllThreeSnapshotsToThreeBePresentAfterLoop bool
|
|
|
|
expectNoSnapshotsOnReceiverAfterLoop bool
|
|
|
|
}
|
|
|
|
|
|
|
|
func implReplicationIsResumableFullSend(ctx *platformtest.Context, setup replicationIsResumableFullSendSetup) {
|
2020-05-24 18:18:02 +02:00
|
|
|
|
|
|
|
platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, `
|
|
|
|
CREATEROOT
|
|
|
|
+ "sender"
|
|
|
|
+ "receiver"
|
|
|
|
R zfs create -p "${ROOTDS}/receiver/${ROOTDS}"
|
|
|
|
`)
|
|
|
|
|
|
|
|
sjid := endpoint.MustMakeJobID("sender-job")
|
|
|
|
rjid := endpoint.MustMakeJobID("receiver-job")
|
|
|
|
|
|
|
|
sfs := ctx.RootDataset + "/sender"
|
|
|
|
rfsRoot := ctx.RootDataset + "/receiver"
|
|
|
|
|
|
|
|
sfsmp, err := zfs.ZFSGetMountpoint(ctx, sfs)
|
|
|
|
require.NoError(ctx, err)
|
|
|
|
require.True(ctx, sfsmp.Mounted)
|
|
|
|
|
|
|
|
writeDummyData(path.Join(sfsmp.Mountpoint, "dummy.data"), 1<<22)
|
|
|
|
mustSnapshot(ctx, sfs+"@1")
|
|
|
|
snap1 := fsversion(ctx, sfs, "@1")
|
|
|
|
|
|
|
|
rep := replicationInvocation{
|
|
|
|
sjid: sjid,
|
|
|
|
rjid: rjid,
|
|
|
|
sfs: sfs,
|
|
|
|
rfsRoot: rfsRoot,
|
|
|
|
interceptSender: func(e *endpoint.Sender) logic.Sender {
|
|
|
|
return &PartialSender{Sender: e, failAfterByteCount: 1 << 20}
|
|
|
|
},
|
2020-06-27 23:53:33 +02:00
|
|
|
guarantee: setup.protection,
|
2020-05-24 18:18:02 +02:00
|
|
|
}
|
2020-06-27 23:53:33 +02:00
|
|
|
|
2020-05-24 18:18:02 +02:00
|
|
|
rfs := rep.ReceiveSideFilesystem()
|
|
|
|
|
|
|
|
for i := 2; i < 10; i++ {
|
|
|
|
report := rep.Do(ctx)
|
|
|
|
ctx.Logf("\n%s", pretty.Sprint(report))
|
|
|
|
|
|
|
|
// always attempt to destroy the incremental source
|
|
|
|
err := zfs.ZFSDestroy(ctx, snap1.FullPath(sfs))
|
|
|
|
if i < 4 {
|
|
|
|
// we configured the PartialSender to fail after 1<<20 bytes
|
|
|
|
// and we wrote dummy data 1<<22 bytes, thus at least
|
|
|
|
// for the first 4 times this should not be possible
|
|
|
|
// due to step holds
|
2020-06-27 23:53:33 +02:00
|
|
|
if setup.expectDatasetIsBusyErrorWhenDestroySnapshotWhilePartiallyReplicated {
|
|
|
|
ctx.Logf("i=%v", i)
|
|
|
|
require.Error(ctx, err)
|
|
|
|
require.Contains(ctx, err.Error(), "dataset is busy")
|
|
|
|
}
|
2020-05-24 18:18:02 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
// and create some additional snapshots that could
|
|
|
|
// confuse a naive implementation that doesn't take into
|
|
|
|
// account resume state when planning replication
|
|
|
|
if i == 2 || i == 3 {
|
|
|
|
// no significant size to avoid making this test run longer than necessary
|
|
|
|
mustSnapshot(ctx, fmt.Sprintf("%s@%d", sfs, i))
|
|
|
|
}
|
|
|
|
|
|
|
|
require.Len(ctx, report.Attempts, 1)
|
|
|
|
require.Nil(ctx, report.Attempts[0].PlanError)
|
|
|
|
require.Len(ctx, report.Attempts[0].Filesystems, 1)
|
|
|
|
if len(report.Attempts[0].Filesystems[0].Steps) == 0 {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-06-27 23:53:33 +02:00
|
|
|
if setup.expectAllThreeSnapshotsToThreeBePresentAfterLoop {
|
|
|
|
// make sure all the filesystem versions we created
|
|
|
|
// were replicated by the replication loop
|
|
|
|
_ = fsversion(ctx, rfs, "@1")
|
|
|
|
_ = fsversion(ctx, rfs, "@2")
|
|
|
|
_ = fsversion(ctx, rfs, "@3")
|
|
|
|
}
|
|
|
|
|
|
|
|
if setup.expectNoSnapshotsOnReceiverAfterLoop {
|
|
|
|
versions, err := zfs.ZFSListFilesystemVersions(ctx, mustDatasetPath(rfs), zfs.ListFilesystemVersionsOptions{})
|
|
|
|
require.NoError(ctx, err)
|
|
|
|
require.Empty(ctx, versions)
|
|
|
|
}
|
2020-05-24 18:18:02 +02:00
|
|
|
|
|
|
|
}
|
2020-06-01 14:39:59 +02:00
|
|
|
|
|
|
|
func ReplicationIncrementalDestroysStepHoldsIffIncrementalStepHoldsAreDisabledButStepHoldsExist(ctx *platformtest.Context) {
|
|
|
|
|
|
|
|
platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, `
|
|
|
|
CREATEROOT
|
|
|
|
+ "sender"
|
|
|
|
+ "receiver"
|
|
|
|
R zfs create -p "${ROOTDS}/receiver/${ROOTDS}"
|
|
|
|
`)
|
|
|
|
|
|
|
|
sjid := endpoint.MustMakeJobID("sender-job")
|
|
|
|
rjid := endpoint.MustMakeJobID("receiver-job")
|
|
|
|
|
|
|
|
sfs := ctx.RootDataset + "/sender"
|
|
|
|
rfsRoot := ctx.RootDataset + "/receiver"
|
|
|
|
|
|
|
|
// fully replicate snapshots @1
|
|
|
|
{
|
|
|
|
mustSnapshot(ctx, sfs+"@1")
|
|
|
|
rep := replicationInvocation{
|
2020-06-27 23:53:33 +02:00
|
|
|
sjid: sjid,
|
|
|
|
rjid: rjid,
|
|
|
|
sfs: sfs,
|
|
|
|
rfsRoot: rfsRoot,
|
2021-01-24 23:31:45 +01:00
|
|
|
guarantee: pdu.ReplicationConfigProtectionWithKind(pdu.ReplicationGuaranteeKind_GuaranteeResumability),
|
2020-06-01 14:39:59 +02:00
|
|
|
}
|
|
|
|
rfs := rep.ReceiveSideFilesystem()
|
|
|
|
report := rep.Do(ctx)
|
|
|
|
ctx.Logf("\n%s", pretty.Sprint(report))
|
|
|
|
// assert this worked (not the main subject of the test)
|
|
|
|
_ = fsversion(ctx, rfs, "@1")
|
|
|
|
}
|
|
|
|
|
|
|
|
// create a large snapshot @2
|
|
|
|
{
|
|
|
|
sfsmp, err := zfs.ZFSGetMountpoint(ctx, sfs)
|
|
|
|
require.NoError(ctx, err)
|
|
|
|
require.True(ctx, sfsmp.Mounted)
|
|
|
|
writeDummyData(path.Join(sfsmp.Mountpoint, "dummy.data"), 1<<22)
|
|
|
|
mustSnapshot(ctx, sfs+"@2")
|
|
|
|
}
|
|
|
|
snap2sfs := fsversion(ctx, sfs, "@2")
|
|
|
|
|
|
|
|
// partially replicate snapshots @2 with step holds enabled
|
|
|
|
// to effect a step-holds situation
|
|
|
|
{
|
|
|
|
rep := replicationInvocation{
|
2020-06-27 23:53:33 +02:00
|
|
|
sjid: sjid,
|
|
|
|
rjid: rjid,
|
|
|
|
sfs: sfs,
|
|
|
|
rfsRoot: rfsRoot,
|
2021-01-24 23:31:45 +01:00
|
|
|
guarantee: pdu.ReplicationConfigProtectionWithKind(pdu.ReplicationGuaranteeKind_GuaranteeResumability), // !
|
2020-06-01 14:39:59 +02:00
|
|
|
interceptSender: func(e *endpoint.Sender) logic.Sender {
|
|
|
|
return &PartialSender{Sender: e, failAfterByteCount: 1 << 20}
|
|
|
|
},
|
|
|
|
}
|
|
|
|
rfs := rep.ReceiveSideFilesystem()
|
|
|
|
report := rep.Do(ctx)
|
|
|
|
ctx.Logf("\n%s", pretty.Sprint(report))
|
|
|
|
// assert this partial receive worked
|
|
|
|
_, err := zfs.ZFSGetFilesystemVersion(ctx, rfs+"@2")
|
|
|
|
ctx.Logf("%T %s", err, err)
|
|
|
|
_, notFullyReceived := err.(*zfs.DatasetDoesNotExist)
|
|
|
|
require.True(ctx, notFullyReceived)
|
|
|
|
// assert step holds are in place
|
|
|
|
abs, absErrs, err := endpoint.ListAbstractions(ctx, endpoint.ListZFSHoldsAndBookmarksQuery{
|
|
|
|
FS: endpoint.ListZFSHoldsAndBookmarksQueryFilesystemFilter{
|
|
|
|
FS: &sfs,
|
|
|
|
},
|
|
|
|
Concurrency: 1,
|
|
|
|
JobID: &sjid,
|
|
|
|
What: endpoint.AbstractionTypeSet{endpoint.AbstractionStepHold: true},
|
|
|
|
})
|
|
|
|
require.NoError(ctx, err)
|
|
|
|
require.Empty(ctx, absErrs)
|
|
|
|
require.Len(ctx, abs, 2)
|
|
|
|
sort.Slice(abs, func(i, j int) bool {
|
|
|
|
return abs[i].GetCreateTXG() < abs[j].GetCreateTXG()
|
|
|
|
})
|
|
|
|
require.True(ctx, zfs.FilesystemVersionEqualIdentity(abs[0].GetFilesystemVersion(), fsversion(ctx, sfs, "@1")))
|
|
|
|
require.True(ctx, zfs.FilesystemVersionEqualIdentity(abs[1].GetFilesystemVersion(), fsversion(ctx, sfs, "@2")))
|
|
|
|
}
|
|
|
|
|
|
|
|
//
|
|
|
|
// end of test setup
|
|
|
|
//
|
|
|
|
|
2020-06-27 23:53:33 +02:00
|
|
|
// retry replication with incremental step holds disabled (set to bookmarks-only in this case)
|
2020-06-01 14:39:59 +02:00
|
|
|
// - replication should not fail due to holds-related stuff
|
|
|
|
// - replication should fail intermittently due to partial sender being fully read
|
|
|
|
// - the partial sender is 1/4th the length of the stream, thus expect
|
|
|
|
// successful replication after 5 more attempts
|
|
|
|
rep := replicationInvocation{
|
2020-06-27 23:53:33 +02:00
|
|
|
sjid: sjid,
|
|
|
|
rjid: rjid,
|
|
|
|
sfs: sfs,
|
|
|
|
rfsRoot: rfsRoot,
|
2021-01-24 23:31:45 +01:00
|
|
|
guarantee: pdu.ReplicationConfigProtectionWithKind(pdu.ReplicationGuaranteeKind_GuaranteeIncrementalReplication), // !
|
2020-06-01 14:39:59 +02:00
|
|
|
interceptSender: func(e *endpoint.Sender) logic.Sender {
|
|
|
|
return &PartialSender{Sender: e, failAfterByteCount: 1 << 20}
|
|
|
|
},
|
|
|
|
}
|
|
|
|
rfs := rep.ReceiveSideFilesystem()
|
|
|
|
for i := 0; ; i++ {
|
|
|
|
require.True(ctx, i < 5)
|
|
|
|
report := rep.Do(ctx)
|
|
|
|
ctx.Logf("retry run=%v\n%s", i, pretty.Sprint(report))
|
|
|
|
_, err := zfs.ZFSGetFilesystemVersion(ctx, rfs+"@2")
|
|
|
|
if err == nil {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// assert replication worked
|
|
|
|
fsversion(ctx, rfs, "@2")
|
|
|
|
|
|
|
|
// assert no step holds exist
|
|
|
|
abs, absErrs, err := endpoint.ListAbstractions(ctx, endpoint.ListZFSHoldsAndBookmarksQuery{
|
|
|
|
FS: endpoint.ListZFSHoldsAndBookmarksQueryFilesystemFilter{
|
|
|
|
FS: &sfs,
|
|
|
|
},
|
|
|
|
Concurrency: 1,
|
|
|
|
JobID: &sjid,
|
|
|
|
What: endpoint.AbstractionTypeSet{endpoint.AbstractionStepHold: true},
|
|
|
|
})
|
|
|
|
require.NoError(ctx, err)
|
|
|
|
require.Empty(ctx, absErrs)
|
|
|
|
require.Len(ctx, abs, 0)
|
|
|
|
|
|
|
|
// assert that the replication cursor bookmark exists
|
|
|
|
abs, absErrs, err = endpoint.ListAbstractions(ctx, endpoint.ListZFSHoldsAndBookmarksQuery{
|
|
|
|
FS: endpoint.ListZFSHoldsAndBookmarksQueryFilesystemFilter{
|
|
|
|
FS: &sfs,
|
|
|
|
},
|
|
|
|
Concurrency: 1,
|
|
|
|
JobID: &sjid,
|
|
|
|
What: endpoint.AbstractionTypeSet{endpoint.AbstractionReplicationCursorBookmarkV2: true},
|
|
|
|
})
|
|
|
|
require.NoError(ctx, err)
|
|
|
|
require.Empty(ctx, absErrs)
|
|
|
|
require.Len(ctx, abs, 1)
|
|
|
|
require.True(ctx, zfs.FilesystemVersionEqualIdentity(abs[0].GetFilesystemVersion(), snap2sfs))
|
|
|
|
}
|
2020-06-27 23:53:33 +02:00
|
|
|
|
|
|
|
func ReplicationStepCompletedLostBehavior__GuaranteeResumability(ctx *platformtest.Context) {
|
|
|
|
scenario := replicationStepCompletedLostBehavior_impl(ctx, pdu.ReplicationGuaranteeKind_GuaranteeResumability)
|
|
|
|
|
|
|
|
require.Error(ctx, scenario.deleteSfs1Err, "protected by holds")
|
|
|
|
require.Contains(ctx, scenario.deleteSfs1Err.Error(), "dataset is busy")
|
|
|
|
|
|
|
|
require.Error(ctx, scenario.deleteSfs2Err, "protected by holds")
|
|
|
|
require.Contains(ctx, scenario.deleteSfs2Err.Error(), "dataset is busy")
|
|
|
|
|
|
|
|
require.Nil(ctx, scenario.finalReport.Error())
|
|
|
|
_ = fsversion(ctx, scenario.rfs, "@3") // @3 ade it to the other side
|
|
|
|
}
|
|
|
|
|
|
|
|
func ReplicationStepCompletedLostBehavior__GuaranteeIncrementalReplication(ctx *platformtest.Context) {
|
|
|
|
scenario := replicationStepCompletedLostBehavior_impl(ctx, pdu.ReplicationGuaranteeKind_GuaranteeIncrementalReplication)
|
|
|
|
|
|
|
|
require.NoError(ctx, scenario.deleteSfs1Err, "not protected by holds")
|
|
|
|
require.NoError(ctx, scenario.deleteSfs2Err, "not protected by holds")
|
|
|
|
|
|
|
|
// step bookmarks should protect against loss of StepCompleted message
|
|
|
|
require.Nil(ctx, scenario.finalReport.Error())
|
|
|
|
_ = fsversion(ctx, scenario.rfs, "@3") // @3 ade it to the other side
|
|
|
|
}
|
|
|
|
|
|
|
|
type FailSendCompletedSender struct {
|
|
|
|
*endpoint.Sender
|
|
|
|
}
|
|
|
|
|
|
|
|
var _ logic.Sender = (*FailSendCompletedSender)(nil)
|
|
|
|
|
|
|
|
func (p *FailSendCompletedSender) SendCompleted(ctx context.Context, r *pdu.SendCompletedReq) (*pdu.SendCompletedRes, error) {
|
|
|
|
return nil, fmt.Errorf("[mock] SendCompleted not delivered to actual endpoint")
|
|
|
|
}
|
|
|
|
|
|
|
|
type replicationStepCompletedLost_scenario struct {
|
|
|
|
rfs string
|
|
|
|
deleteSfs1Err, deleteSfs2Err error
|
|
|
|
finalReport *report.FilesystemReport
|
|
|
|
}
|
|
|
|
|
|
|
|
func replicationStepCompletedLostBehavior_impl(ctx *platformtest.Context, guaranteeKind pdu.ReplicationGuaranteeKind) *replicationStepCompletedLost_scenario {
|
|
|
|
|
|
|
|
platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, `
|
|
|
|
CREATEROOT
|
|
|
|
+ "sender"
|
|
|
|
+ "receiver"
|
|
|
|
R zfs create -p "${ROOTDS}/receiver/${ROOTDS}"
|
|
|
|
`)
|
|
|
|
|
|
|
|
sjid := endpoint.MustMakeJobID("sender-job")
|
|
|
|
rjid := endpoint.MustMakeJobID("receiver-job")
|
|
|
|
|
|
|
|
sfs := ctx.RootDataset + "/sender"
|
|
|
|
rfsRoot := ctx.RootDataset + "/receiver"
|
|
|
|
|
|
|
|
// fully replicate snapshots @1
|
|
|
|
{
|
|
|
|
mustSnapshot(ctx, sfs+"@1")
|
|
|
|
rep := replicationInvocation{
|
|
|
|
sjid: sjid,
|
|
|
|
rjid: rjid,
|
|
|
|
sfs: sfs,
|
|
|
|
rfsRoot: rfsRoot,
|
2021-01-24 23:31:45 +01:00
|
|
|
guarantee: pdu.ReplicationConfigProtectionWithKind(guaranteeKind),
|
2020-06-27 23:53:33 +02:00
|
|
|
}
|
|
|
|
rfs := rep.ReceiveSideFilesystem()
|
|
|
|
report := rep.Do(ctx)
|
|
|
|
ctx.Logf("\n%s", pretty.Sprint(report))
|
|
|
|
// assert this worked (not the main subject of the test)
|
|
|
|
_ = fsversion(ctx, rfs, "@1")
|
|
|
|
}
|
|
|
|
|
|
|
|
// create a second snapshot @2
|
|
|
|
mustSnapshot(ctx, sfs+"@2")
|
|
|
|
|
|
|
|
// fake loss of stepcompleted message
|
|
|
|
rep := replicationInvocation{
|
|
|
|
sjid: sjid,
|
|
|
|
rjid: rjid,
|
|
|
|
sfs: sfs,
|
|
|
|
rfsRoot: rfsRoot,
|
2021-01-24 23:31:45 +01:00
|
|
|
guarantee: pdu.ReplicationConfigProtectionWithKind(guaranteeKind),
|
2020-06-27 23:53:33 +02:00
|
|
|
interceptSender: func(e *endpoint.Sender) logic.Sender {
|
|
|
|
return &FailSendCompletedSender{e}
|
|
|
|
},
|
|
|
|
}
|
|
|
|
rfs := rep.ReceiveSideFilesystem()
|
|
|
|
report := rep.Do(ctx)
|
|
|
|
ctx.Logf("\n%s", pretty.Sprint(report))
|
|
|
|
|
|
|
|
// assert the replication worked
|
|
|
|
_ = fsversion(ctx, rfs, "@2")
|
|
|
|
// and that we hold it using a last-received-hold
|
|
|
|
abs, absErrs, err := endpoint.ListAbstractions(ctx, endpoint.ListZFSHoldsAndBookmarksQuery{
|
|
|
|
FS: endpoint.ListZFSHoldsAndBookmarksQueryFilesystemFilter{
|
|
|
|
FS: &rfs,
|
|
|
|
},
|
|
|
|
Concurrency: 1,
|
|
|
|
JobID: &rjid,
|
|
|
|
What: endpoint.AbstractionTypeSet{endpoint.AbstractionLastReceivedHold: true},
|
|
|
|
})
|
|
|
|
require.NoError(ctx, err)
|
|
|
|
require.Empty(ctx, absErrs)
|
|
|
|
require.Len(ctx, abs, 1)
|
|
|
|
require.True(ctx, zfs.FilesystemVersionEqualIdentity(abs[0].GetFilesystemVersion(), fsversion(ctx, rfs, "@2")))
|
|
|
|
|
|
|
|
// now try to delete @2 on the sender, this should work because don't have step holds on it
|
|
|
|
deleteSfs2Err := zfs.ZFSDestroy(ctx, sfs+"@2")
|
|
|
|
// defer check to caller
|
|
|
|
|
|
|
|
// and create a new snapshot on the sender
|
|
|
|
mustSnapshot(ctx, sfs+"@3")
|
|
|
|
|
|
|
|
// now we have: sender @1, @3
|
|
|
|
// recver @1, @2
|
|
|
|
|
|
|
|
// delete @1 on both sides to demonstrate that, if we didn't have bookmarks, we would be out of sync
|
|
|
|
deleteSfs1Err := zfs.ZFSDestroy(ctx, sfs+"@1")
|
|
|
|
// defer check to caller
|
|
|
|
err = zfs.ZFSDestroy(ctx, rfs+"@1")
|
|
|
|
require.NoError(ctx, err)
|
|
|
|
|
|
|
|
// attempt replication and return the filesystem report report
|
|
|
|
{
|
|
|
|
rep := replicationInvocation{
|
|
|
|
sjid: sjid,
|
|
|
|
rjid: rjid,
|
|
|
|
sfs: sfs,
|
|
|
|
rfsRoot: rfsRoot,
|
2021-01-24 23:31:45 +01:00
|
|
|
guarantee: pdu.ReplicationConfigProtectionWithKind(guaranteeKind),
|
2020-06-27 23:53:33 +02:00
|
|
|
}
|
|
|
|
report := rep.Do(ctx)
|
|
|
|
ctx.Logf("expecting failure:\n%s", pretty.Sprint(report))
|
|
|
|
require.Len(ctx, report.Attempts, 1)
|
|
|
|
require.Len(ctx, report.Attempts[0].Filesystems, 1)
|
|
|
|
return &replicationStepCompletedLost_scenario{
|
|
|
|
rfs: rfs,
|
|
|
|
deleteSfs1Err: deleteSfs1Err,
|
|
|
|
deleteSfs2Err: deleteSfs2Err,
|
|
|
|
finalReport: report.Attempts[0].Filesystems[0],
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
2020-07-26 12:24:05 +02:00
|
|
|
|
|
|
|
type ErroringReceiver struct {
|
|
|
|
recvErr error
|
|
|
|
*endpoint.Receiver
|
|
|
|
}
|
|
|
|
|
|
|
|
func (r *ErroringReceiver) Receive(ctx context.Context, req *pdu.ReceiveReq, stream io.ReadCloser) (*pdu.ReceiveRes, error) {
|
|
|
|
return nil, r.recvErr
|
|
|
|
}
|
|
|
|
|
|
|
|
type NeverEndingSender struct {
|
|
|
|
*endpoint.Sender
|
|
|
|
}
|
|
|
|
|
2021-08-16 10:11:37 +02:00
|
|
|
func (s *NeverEndingSender) SendDry(ctx context.Context, req *pdu.SendReq) (r *pdu.SendRes, err error) {
|
|
|
|
r, _, err = s.sendImpl(ctx, req, true)
|
|
|
|
return r, err
|
|
|
|
}
|
|
|
|
|
2020-07-26 12:24:05 +02:00
|
|
|
func (s *NeverEndingSender) Send(ctx context.Context, req *pdu.SendReq) (r *pdu.SendRes, stream io.ReadCloser, _ error) {
|
2021-08-16 10:11:37 +02:00
|
|
|
return s.sendImpl(ctx, req, false)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *NeverEndingSender) sendImpl(ctx context.Context, req *pdu.SendReq, dry bool) (r *pdu.SendRes, stream io.ReadCloser, _ error) {
|
2020-07-26 12:24:05 +02:00
|
|
|
stream = nil
|
|
|
|
r = &pdu.SendRes{
|
|
|
|
UsedResumeToken: false,
|
|
|
|
ExpectedSize: 1 << 30,
|
|
|
|
}
|
2021-08-16 10:11:37 +02:00
|
|
|
if dry {
|
2020-07-26 12:24:05 +02:00
|
|
|
return r, stream, nil
|
|
|
|
}
|
|
|
|
dz, err := os.Open("/dev/zero")
|
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
return r, dz, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func ReplicationReceiverErrorWhileStillSending(ctx *platformtest.Context) {
|
|
|
|
|
|
|
|
platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, `
|
|
|
|
CREATEROOT
|
|
|
|
+ "sender"
|
|
|
|
+ "sender@1"
|
|
|
|
+ "receiver"
|
|
|
|
R zfs create -p "${ROOTDS}/receiver/${ROOTDS}"
|
|
|
|
`)
|
|
|
|
|
|
|
|
sjid := endpoint.MustMakeJobID("sender-job")
|
|
|
|
rjid := endpoint.MustMakeJobID("receiver-job")
|
|
|
|
|
|
|
|
sfs := ctx.RootDataset + "/sender"
|
|
|
|
rfsRoot := ctx.RootDataset + "/receiver"
|
|
|
|
|
|
|
|
mockRecvErr := fmt.Errorf("YiezahK3thie8ahKiel5sah2uugei2ize1yi8feivuu7musoat")
|
|
|
|
|
|
|
|
rep := replicationInvocation{
|
|
|
|
sjid: sjid,
|
|
|
|
rjid: rjid,
|
|
|
|
sfs: sfs,
|
|
|
|
rfsRoot: rfsRoot,
|
2021-01-24 23:31:45 +01:00
|
|
|
guarantee: pdu.ReplicationConfigProtectionWithKind(pdu.ReplicationGuaranteeKind_GuaranteeNothing),
|
2020-07-26 12:24:05 +02:00
|
|
|
interceptReceiver: func(r *endpoint.Receiver) logic.Receiver {
|
|
|
|
return &ErroringReceiver{recvErr: mockRecvErr, Receiver: r}
|
|
|
|
},
|
|
|
|
interceptSender: func(s *endpoint.Sender) logic.Sender {
|
|
|
|
return &NeverEndingSender{s}
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
|
|
|
// first replication
|
|
|
|
report := rep.Do(ctx)
|
|
|
|
ctx.Logf("\n%s", pretty.Sprint(report))
|
|
|
|
|
|
|
|
require.Len(ctx, report.Attempts, 1)
|
|
|
|
attempt := report.Attempts[0]
|
|
|
|
require.Nil(ctx, attempt.PlanError)
|
|
|
|
require.Len(ctx, attempt.Filesystems, 1)
|
|
|
|
afs := attempt.Filesystems[0]
|
|
|
|
require.Nil(ctx, afs.PlanError)
|
|
|
|
require.Len(ctx, afs.Steps, 1)
|
|
|
|
require.Nil(ctx, afs.PlanError)
|
|
|
|
require.NotNil(ctx, afs.StepError)
|
|
|
|
require.Contains(ctx, afs.StepError.Err, mockRecvErr.Error())
|
|
|
|
}
|
2020-07-26 17:58:20 +02:00
|
|
|
|
|
|
|
func ReplicationFailingInitialParentProhibitsChildReplication(ctx *platformtest.Context) {
|
|
|
|
|
|
|
|
platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, `
|
|
|
|
CREATEROOT
|
|
|
|
+ "sender"
|
|
|
|
+ "sender/a"
|
|
|
|
+ "sender/a/child"
|
|
|
|
+ "sender/aa"
|
|
|
|
+ "receiver"
|
|
|
|
R zfs create -p "${ROOTDS}/receiver/${ROOTDS}"
|
|
|
|
R zfs snapshot -r ${ROOTDS}/sender@initial
|
|
|
|
`)
|
|
|
|
|
|
|
|
sjid := endpoint.MustMakeJobID("sender-job")
|
|
|
|
rjid := endpoint.MustMakeJobID("receiver-job")
|
|
|
|
|
|
|
|
fsA := ctx.RootDataset + "/sender/a"
|
|
|
|
fsAChild := ctx.RootDataset + "/sender/a/child"
|
|
|
|
fsAA := ctx.RootDataset + "/sender/aa"
|
|
|
|
|
|
|
|
sfilter := filters.NewDatasetMapFilter(3, true)
|
2021-11-21 21:16:37 +01:00
|
|
|
mustAddToSFilter(ctx, sfilter, fsA)
|
|
|
|
mustAddToSFilter(ctx, sfilter, fsAChild)
|
|
|
|
mustAddToSFilter(ctx, sfilter, fsAA)
|
2020-07-26 17:58:20 +02:00
|
|
|
rfsRoot := ctx.RootDataset + "/receiver"
|
|
|
|
|
|
|
|
mockRecvErr := fmt.Errorf("yifae4ohPhaquaes0hohghiep9oufie4roo7quoWooluaj2ee8")
|
|
|
|
|
|
|
|
rep := replicationInvocation{
|
|
|
|
sjid: sjid,
|
|
|
|
rjid: rjid,
|
|
|
|
sfilter: sfilter,
|
|
|
|
rfsRoot: rfsRoot,
|
2021-01-24 23:31:45 +01:00
|
|
|
guarantee: pdu.ReplicationConfigProtectionWithKind(pdu.ReplicationGuaranteeKind_GuaranteeNothing),
|
2020-07-26 17:58:20 +02:00
|
|
|
interceptReceiver: func(r *endpoint.Receiver) logic.Receiver {
|
|
|
|
return &ErroringReceiver{recvErr: mockRecvErr, Receiver: r}
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
|
|
|
r := rep.Do(ctx)
|
|
|
|
ctx.Logf("\n%s", pretty.Sprint(r))
|
|
|
|
|
|
|
|
require.Len(ctx, r.Attempts, 1)
|
|
|
|
attempt := r.Attempts[0]
|
|
|
|
require.Nil(ctx, attempt.PlanError)
|
|
|
|
require.Len(ctx, attempt.Filesystems, 3)
|
|
|
|
|
|
|
|
fsByName := make(map[string]*report.FilesystemReport, len(attempt.Filesystems))
|
|
|
|
for _, fs := range attempt.Filesystems {
|
|
|
|
fsByName[fs.Info.Name] = fs
|
|
|
|
}
|
|
|
|
|
|
|
|
require.Contains(ctx, fsByName, fsA)
|
2020-09-07 01:20:57 +02:00
|
|
|
require.Contains(ctx, fsByName, fsAChild)
|
2020-07-26 17:58:20 +02:00
|
|
|
require.Contains(ctx, fsByName, fsAA)
|
|
|
|
|
|
|
|
checkFS := func(fs string, expectErrMsg string) {
|
|
|
|
rep := fsByName[fs]
|
|
|
|
require.Len(ctx, rep.Steps, 1)
|
|
|
|
require.Nil(ctx, rep.PlanError)
|
|
|
|
require.NotNil(ctx, rep.StepError)
|
|
|
|
require.Contains(ctx, rep.StepError.Err, expectErrMsg)
|
|
|
|
}
|
|
|
|
|
|
|
|
checkFS(fsA, mockRecvErr.Error())
|
|
|
|
checkFS(fsAChild, "parent(s) failed during initial replication")
|
|
|
|
checkFS(fsAA, mockRecvErr.Error()) // fsAA is not treated as a child of fsA
|
|
|
|
}
|
2020-09-07 01:20:57 +02:00
|
|
|
|
|
|
|
func ReplicationPropertyReplicationWorks(ctx *platformtest.Context) {
|
|
|
|
|
|
|
|
platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, `
|
|
|
|
CREATEROOT
|
|
|
|
+ "sender"
|
|
|
|
+ "sender/a"
|
|
|
|
+ "sender/a@1"
|
|
|
|
+ "sender/a/child"
|
|
|
|
+ "sender/a/child@1"
|
|
|
|
+ "receiver"
|
2021-11-21 21:16:37 +01:00
|
|
|
R zfs create -p "${ROOTDS}/receiver/${ROOTDS}/sender"
|
2020-09-07 01:20:57 +02:00
|
|
|
`)
|
|
|
|
|
|
|
|
sjid := endpoint.MustMakeJobID("sender-job")
|
|
|
|
rjid := endpoint.MustMakeJobID("receiver-job")
|
|
|
|
|
|
|
|
fsA := ctx.RootDataset + "/sender/a"
|
|
|
|
fsAChild := ctx.RootDataset + "/sender/a/child"
|
|
|
|
|
|
|
|
sfilter := filters.NewDatasetMapFilter(2, true)
|
2021-11-21 21:16:37 +01:00
|
|
|
mustAddToSFilter(ctx, sfilter, fsA)
|
|
|
|
mustAddToSFilter(ctx, sfilter, fsAChild)
|
2020-09-07 01:20:57 +02:00
|
|
|
rfsRoot := ctx.RootDataset + "/receiver"
|
|
|
|
|
|
|
|
type testPropExpectation struct {
|
|
|
|
Exists bool
|
|
|
|
Source zfs.PropertySource
|
|
|
|
ExpectSpecialValue string
|
|
|
|
}
|
|
|
|
type testProp struct {
|
|
|
|
Name string
|
|
|
|
SetOnSender map[string]bool
|
|
|
|
ExpectReceiver map[string]testPropExpectation
|
|
|
|
}
|
|
|
|
testProps := []testProp{
|
|
|
|
{
|
|
|
|
Name: "zrepl:ignored",
|
|
|
|
SetOnSender: map[string]bool{fsA: true, fsAChild: true},
|
|
|
|
ExpectReceiver: map[string]testPropExpectation{
|
|
|
|
fsA: {
|
|
|
|
Exists: false,
|
|
|
|
},
|
|
|
|
fsAChild: {
|
|
|
|
Exists: false,
|
|
|
|
},
|
|
|
|
},
|
|
|
|
},
|
|
|
|
{
|
|
|
|
Name: "zrepl:replicate",
|
|
|
|
SetOnSender: map[string]bool{fsA: true, fsAChild: true},
|
|
|
|
ExpectReceiver: map[string]testPropExpectation{
|
|
|
|
fsA: {Exists: true, Source: zfs.SourceReceived},
|
|
|
|
fsAChild: {Exists: true, Source: zfs.SourceReceived},
|
|
|
|
},
|
|
|
|
},
|
|
|
|
{
|
|
|
|
Name: "zrepl:overridden",
|
|
|
|
SetOnSender: map[string]bool{fsA: false, fsAChild: true},
|
|
|
|
ExpectReceiver: map[string]testPropExpectation{
|
|
|
|
fsA: {Exists: true, Source: zfs.SourceLocal, ExpectSpecialValue: "overridden value"},
|
|
|
|
fsAChild: {Exists: true, Source: zfs.SourceLocal, ExpectSpecialValue: "overridden value"},
|
|
|
|
},
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, prop := range testProps {
|
|
|
|
for fs := range prop.SetOnSender {
|
|
|
|
err := zfs.ZFSSet(ctx, mustDatasetPath(fs), map[string]string{prop.Name: prop.Name})
|
|
|
|
require.NoError(ctx, err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
rep := replicationInvocation{
|
|
|
|
sjid: sjid,
|
|
|
|
rjid: rjid,
|
|
|
|
sfilter: sfilter,
|
|
|
|
rfsRoot: rfsRoot,
|
|
|
|
guarantee: pdu.ReplicationConfigProtectionWithKind(pdu.ReplicationGuaranteeKind_GuaranteeNothing),
|
|
|
|
receiverConfigHook: func(c *endpoint.ReceiverConfig) {
|
|
|
|
c.InheritProperties = []zfsprop.Property{"zrepl:ignored"}
|
|
|
|
c.OverrideProperties = map[zfsprop.Property]string{
|
|
|
|
"zrepl:overridden": "overridden value",
|
|
|
|
}
|
|
|
|
},
|
|
|
|
senderConfigHook: func(c *endpoint.SenderConfig) {
|
|
|
|
c.SendProperties = true // TODO: do another tier with SendBackupProperties
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
|
|
|
r := rep.Do(ctx)
|
|
|
|
ctx.Logf("\n%s", pretty.Sprint(r))
|
|
|
|
|
|
|
|
require.Len(ctx, r.Attempts, 1)
|
|
|
|
attempt := r.Attempts[0]
|
|
|
|
require.Nil(ctx, attempt.PlanError)
|
|
|
|
require.Len(ctx, attempt.Filesystems, 2)
|
|
|
|
|
|
|
|
fsByName := make(map[string]*report.FilesystemReport, len(attempt.Filesystems))
|
|
|
|
for _, fs := range attempt.Filesystems {
|
|
|
|
fsByName[fs.Info.Name] = fs
|
|
|
|
}
|
|
|
|
|
|
|
|
require.Contains(ctx, fsByName, fsA)
|
|
|
|
require.Contains(ctx, fsByName, fsAChild)
|
|
|
|
require.Len(ctx, fsByName, 2)
|
|
|
|
|
|
|
|
requireFSReportSucceeded := func(fs string) *zfs.DatasetPath {
|
|
|
|
rep := fsByName[fs]
|
|
|
|
require.Len(ctx, rep.Steps, 1)
|
|
|
|
require.Nil(ctx, rep.PlanError)
|
2021-08-22 20:11:15 +02:00
|
|
|
if rep.StepError != nil && strings.Contains(rep.StepError.Error(), "invalid option 'x'") {
|
|
|
|
ctx.SkipNow() // XXX feature detection
|
|
|
|
}
|
2020-09-07 01:20:57 +02:00
|
|
|
require.Nil(ctx, rep.StepError)
|
|
|
|
require.Len(ctx, rep.Steps, 1)
|
|
|
|
require.Equal(ctx, 1, rep.CurrentStep)
|
|
|
|
return mustDatasetPath(path.Join(rfsRoot, fs))
|
|
|
|
}
|
|
|
|
|
|
|
|
rfsA := requireFSReportSucceeded(fsA)
|
|
|
|
rfsAChild := requireFSReportSucceeded(fsAChild)
|
|
|
|
|
|
|
|
rfsmap := map[string]*zfs.DatasetPath{
|
|
|
|
fsA: rfsA,
|
|
|
|
fsAChild: rfsAChild,
|
|
|
|
}
|
|
|
|
|
|
|
|
for fs, rfs := range rfsmap {
|
|
|
|
for _, tp := range testProps {
|
|
|
|
r_, err := zfs.ZFSGet(ctx, rfs, []string{tp.Name})
|
|
|
|
require.NoError(ctx, err)
|
|
|
|
r := r_.GetDetails(tp.Name)
|
|
|
|
|
|
|
|
expect := tp.ExpectReceiver[fs]
|
|
|
|
|
|
|
|
if !expect.Exists {
|
|
|
|
require.Equal(ctx, zfs.SourceNone, r.Source)
|
|
|
|
} else {
|
|
|
|
require.Equal(ctx, expect.Source, r.Source)
|
|
|
|
|
|
|
|
if expect.ExpectSpecialValue != "" {
|
|
|
|
require.Equal(ctx, expect.ExpectSpecialValue, r.Value)
|
|
|
|
} else {
|
|
|
|
require.Equal(ctx, tp.Name, r.Value)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2021-11-21 21:16:37 +01:00
|
|
|
|
|
|
|
func ReplicationPlaceholderEncryption__UnspecifiedLeadsToFailureAtRuntimeWhenCreatingPlaceholders(ctx *platformtest.Context) {
|
|
|
|
|
|
|
|
platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, `
|
|
|
|
CREATEROOT
|
|
|
|
+ "sender"
|
|
|
|
+ "sender/a"
|
|
|
|
+ "sender/a/child"
|
|
|
|
+ "receiver"
|
|
|
|
R zfs snapshot -r ${ROOTDS}/sender@initial
|
|
|
|
`)
|
|
|
|
|
|
|
|
sjid := endpoint.MustMakeJobID("sender-job")
|
|
|
|
rjid := endpoint.MustMakeJobID("receiver-job")
|
|
|
|
|
|
|
|
childfs := ctx.RootDataset + "/sender/a/child"
|
|
|
|
|
|
|
|
sfilter := filters.NewDatasetMapFilter(3, true)
|
|
|
|
|
|
|
|
mustAddToSFilter(ctx, sfilter, childfs)
|
|
|
|
rfsRoot := ctx.RootDataset + "/receiver"
|
|
|
|
|
|
|
|
rep := replicationInvocation{
|
|
|
|
sjid: sjid,
|
|
|
|
rjid: rjid,
|
|
|
|
sfilter: sfilter,
|
|
|
|
rfsRoot: rfsRoot,
|
|
|
|
guarantee: pdu.ReplicationConfigProtectionWithKind(pdu.ReplicationGuaranteeKind_GuaranteeResumability),
|
|
|
|
receiverConfigHook: func(rc *endpoint.ReceiverConfig) {
|
|
|
|
rc.PlaceholderEncryption = endpoint.PlaceholderCreationEncryptionPropertyUnspecified
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
|
|
|
r := rep.Do(ctx)
|
|
|
|
ctx.Logf("\n%s", pretty.Sprint(r))
|
|
|
|
|
|
|
|
require.Len(ctx, r.Attempts, 1)
|
|
|
|
attempt := r.Attempts[0]
|
|
|
|
require.Nil(ctx, attempt.PlanError)
|
|
|
|
require.Len(ctx, attempt.Filesystems, 1)
|
|
|
|
|
|
|
|
afs := attempt.Filesystems[0]
|
|
|
|
require.Equal(ctx, childfs, afs.Info.Name)
|
|
|
|
|
|
|
|
require.Equal(ctx, 1, len(afs.Steps))
|
|
|
|
require.Equal(ctx, 0, afs.CurrentStep)
|
|
|
|
|
|
|
|
require.Equal(ctx, report.FilesystemSteppingErrored, afs.State)
|
|
|
|
|
|
|
|
childfsFirstComponent := strings.Split(childfs, "/")[0]
|
|
|
|
require.Contains(ctx, afs.StepError.Err, "cannot create placeholder filesystem "+rfsRoot+"/"+childfsFirstComponent+": placeholder filesystem encryption handling is unspecified in receiver config")
|
|
|
|
}
|
|
|
|
|
|
|
|
type ClientIdentityReceiver struct {
|
|
|
|
clientIdentity string
|
|
|
|
*endpoint.Receiver
|
|
|
|
}
|
|
|
|
|
|
|
|
func (r *ClientIdentityReceiver) Receive(ctx context.Context, req *pdu.ReceiveReq, stream io.ReadCloser) (*pdu.ReceiveRes, error) {
|
|
|
|
ctx = context.WithValue(ctx, endpoint.ClientIdentityKey, r.clientIdentity)
|
|
|
|
return r.Receiver.Receive(ctx, req, stream)
|
|
|
|
}
|
|
|
|
|
|
|
|
func ReplicationPlaceholderEncryption__UnspecifiedIsOkForClientIdentityPlaceholder(ctx *platformtest.Context) {
|
|
|
|
platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, `
|
|
|
|
CREATEROOT
|
|
|
|
+ "receiver"
|
|
|
|
`)
|
|
|
|
|
|
|
|
sjid := endpoint.MustMakeJobID("sender-job")
|
|
|
|
rjid := endpoint.MustMakeJobID("receiver-job")
|
|
|
|
|
|
|
|
sfilter := filters.NewDatasetMapFilter(1, true)
|
|
|
|
|
|
|
|
// hacky...
|
|
|
|
comps := strings.Split(ctx.RootDataset, "/")
|
|
|
|
require.GreaterOrEqual(ctx, len(comps), 2)
|
|
|
|
pool := comps[0]
|
|
|
|
require.Contains(ctx, pool, "zreplplatformtest", "don't want to cause accidents")
|
|
|
|
poolchild := pool + "/" + comps[1]
|
|
|
|
|
|
|
|
err := zfs.ZFSSnapshot(ctx, mustDatasetPath(pool), "testsnap", false)
|
|
|
|
require.NoError(ctx, err)
|
|
|
|
|
|
|
|
err = zfs.ZFSSnapshot(ctx, mustDatasetPath(poolchild), "testsnap", false)
|
|
|
|
require.NoError(ctx, err)
|
|
|
|
|
|
|
|
mustAddToSFilter(ctx, sfilter, pool)
|
|
|
|
mustAddToSFilter(ctx, sfilter, poolchild)
|
|
|
|
|
|
|
|
clientIdentity := "testclientid"
|
|
|
|
|
|
|
|
rfsRoot := ctx.RootDataset + "/receiver"
|
|
|
|
|
|
|
|
rep := replicationInvocation{
|
|
|
|
sjid: sjid,
|
|
|
|
rjid: rjid,
|
|
|
|
sfilter: sfilter,
|
|
|
|
rfsRoot: rfsRoot,
|
|
|
|
guarantee: pdu.ReplicationConfigProtectionWithKind(pdu.ReplicationGuaranteeKind_GuaranteeResumability),
|
|
|
|
receiverConfigHook: func(rc *endpoint.ReceiverConfig) {
|
|
|
|
rc.PlaceholderEncryption = endpoint.PlaceholderCreationEncryptionPropertyUnspecified
|
|
|
|
rc.AppendClientIdentity = true
|
|
|
|
},
|
|
|
|
interceptReceiver: func(r *endpoint.Receiver) logic.Receiver {
|
|
|
|
r.Test_OverrideClientIdentityFunc = func() string { return clientIdentity }
|
|
|
|
return r
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
|
|
|
r := rep.Do(ctx)
|
|
|
|
ctx.Logf("\n%s", pretty.Sprint(r))
|
|
|
|
|
|
|
|
require.Len(ctx, r.Attempts, 1)
|
|
|
|
attempt := r.Attempts[0]
|
|
|
|
require.Nil(ctx, attempt.PlanError)
|
|
|
|
require.Len(ctx, attempt.Filesystems, 2)
|
|
|
|
|
|
|
|
filesystemsByName := make(map[string]*report.FilesystemReport)
|
|
|
|
for _, fs := range attempt.Filesystems {
|
|
|
|
filesystemsByName[fs.Info.Name] = fs
|
|
|
|
}
|
|
|
|
require.Len(ctx, filesystemsByName, len(attempt.Filesystems))
|
|
|
|
|
|
|
|
afs, ok := filesystemsByName[pool]
|
|
|
|
require.True(ctx, ok)
|
|
|
|
require.Nil(ctx, afs.PlanError)
|
|
|
|
require.Nil(ctx, afs.StepError)
|
|
|
|
require.Equal(ctx, report.FilesystemDone, afs.State)
|
|
|
|
|
|
|
|
afs, ok = filesystemsByName[poolchild]
|
|
|
|
require.True(ctx, ok)
|
|
|
|
require.Nil(ctx, afs.PlanError)
|
|
|
|
require.Nil(ctx, afs.StepError)
|
|
|
|
require.Equal(ctx, report.FilesystemDone, afs.State)
|
|
|
|
|
|
|
|
mustGetFilesystemVersion(ctx, rfsRoot+"/"+clientIdentity+"/"+pool+"@testsnap")
|
|
|
|
mustGetFilesystemVersion(ctx, rfsRoot+"/"+clientIdentity+"/"+poolchild+"@testsnap")
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
func replicationPlaceholderEncryption__EncryptOnReceiverUseCase__impl(ctx *platformtest.Context, placeholderEncryption endpoint.PlaceholderCreationEncryptionProperty) {
|
|
|
|
|
|
|
|
platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, `
|
|
|
|
CREATEROOT
|
|
|
|
+ "sender"
|
|
|
|
+ "sender/a"
|
|
|
|
+ "sender/a/child"
|
|
|
|
+ "receiver" encrypted
|
|
|
|
R zfs snapshot -r ${ROOTDS}/sender@initial
|
|
|
|
`)
|
|
|
|
|
|
|
|
sjid := endpoint.MustMakeJobID("sender-job")
|
|
|
|
rjid := endpoint.MustMakeJobID("receiver-job")
|
|
|
|
|
|
|
|
childfs := ctx.RootDataset + "/sender/a/child"
|
|
|
|
|
|
|
|
sfilter := filters.NewDatasetMapFilter(3, true)
|
|
|
|
|
|
|
|
mustAddToSFilter(ctx, sfilter, childfs)
|
|
|
|
rfsRoot := ctx.RootDataset + "/receiver"
|
|
|
|
|
|
|
|
rep := replicationInvocation{
|
|
|
|
sjid: sjid,
|
|
|
|
rjid: rjid,
|
|
|
|
sfilter: sfilter,
|
|
|
|
rfsRoot: rfsRoot,
|
|
|
|
guarantee: pdu.ReplicationConfigProtectionWithKind(pdu.ReplicationGuaranteeKind_GuaranteeResumability),
|
|
|
|
receiverConfigHook: func(rc *endpoint.ReceiverConfig) {
|
|
|
|
rc.PlaceholderEncryption = placeholderEncryption
|
|
|
|
rc.AppendClientIdentity = false
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
|
|
|
r := rep.Do(ctx)
|
|
|
|
ctx.Logf("\n%s", pretty.Sprint(r))
|
|
|
|
|
|
|
|
require.Len(ctx, r.Attempts, 1)
|
|
|
|
attempt := r.Attempts[0]
|
|
|
|
require.Equal(ctx, report.AttemptDone, attempt.State)
|
|
|
|
require.Len(ctx, attempt.Filesystems, 1)
|
|
|
|
afs := attempt.Filesystems[0]
|
|
|
|
require.Equal(ctx, childfs, afs.Info.Name)
|
|
|
|
|
|
|
|
require.Equal(ctx, 1, len(afs.Steps))
|
|
|
|
|
|
|
|
rfs := mustDatasetPath(rfsRoot + "/" + childfs)
|
|
|
|
mustGetFilesystemVersion(ctx, rfs.ToString()+"@initial")
|
|
|
|
}
|
|
|
|
|
|
|
|
func ReplicationPlaceholderEncryption__EncryptOnReceiverUseCase__WorksIfConfiguredWithInherit(ctx *platformtest.Context) {
|
|
|
|
placeholderEncryption := endpoint.PlaceholderCreationEncryptionPropertyInherit
|
|
|
|
|
|
|
|
replicationPlaceholderEncryption__EncryptOnReceiverUseCase__impl(ctx, placeholderEncryption)
|
|
|
|
childfs := ctx.RootDataset + "/sender/a/child"
|
|
|
|
rfsRoot := ctx.RootDataset + "/receiver"
|
|
|
|
rfs := mustDatasetPath(rfsRoot + "/" + childfs)
|
|
|
|
|
|
|
|
// The leaf child dataset should be inhering from rfsRoot.
|
|
|
|
// If we had replicated with PlaceholderCreationEncryptionPropertyOff
|
|
|
|
// then it would be unencrypted and inherit from the placeholder.
|
|
|
|
props, err := zfs.ZFSGet(ctx, rfs, []string{"encryptionroot"})
|
|
|
|
require.NoError(ctx, err)
|
|
|
|
require.Equal(ctx, rfsRoot, props.Get("encryptionroot"))
|
|
|
|
}
|
2022-05-01 14:46:38 +02:00
|
|
|
|
|
|
|
func replicationInitialImpl(ctx *platformtest.Context, iras logic.InitialReplicationAutoResolution, expectExactRfsSnaps []string) *report.Report {
|
|
|
|
// reverse order for snap names to expose sorting assumptions
|
|
|
|
platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, `
|
|
|
|
CREATEROOT
|
|
|
|
+ "sender"
|
|
|
|
+ "sender@3"
|
|
|
|
+ "sender@2"
|
|
|
|
+ "sender@1"
|
|
|
|
+ "receiver"
|
|
|
|
R zfs create -p "${ROOTDS}/receiver/${ROOTDS}"
|
|
|
|
`)
|
|
|
|
|
|
|
|
sjid := endpoint.MustMakeJobID("sender-job")
|
|
|
|
rjid := endpoint.MustMakeJobID("receiver-job")
|
|
|
|
|
|
|
|
sfs := ctx.RootDataset + "/sender"
|
|
|
|
rfsRoot := ctx.RootDataset + "/receiver"
|
|
|
|
|
|
|
|
rep := replicationInvocation{
|
|
|
|
sjid: sjid,
|
|
|
|
rjid: rjid,
|
|
|
|
sfs: sfs,
|
|
|
|
rfsRoot: rfsRoot,
|
|
|
|
guarantee: pdu.ReplicationConfigProtectionWithKind(pdu.ReplicationGuaranteeKind_GuaranteeResumability),
|
|
|
|
plannerPolicyHook: func(pp *logic.PlannerPolicy) {
|
|
|
|
pp.ConflictResolution.InitialReplication = iras
|
|
|
|
},
|
|
|
|
}
|
|
|
|
rfs := rep.ReceiveSideFilesystem()
|
|
|
|
|
|
|
|
// first replication
|
|
|
|
report := rep.Do(ctx)
|
|
|
|
ctx.Logf("\n%s", pretty.Sprint(report))
|
|
|
|
|
|
|
|
versions, err := zfs.ZFSListFilesystemVersions(ctx, mustDatasetPath(rfs), zfs.ListFilesystemVersionsOptions{Types: zfs.Snapshots})
|
|
|
|
if _, ok := err.(*zfs.DatasetDoesNotExist); ok {
|
|
|
|
versions = nil
|
|
|
|
} else {
|
|
|
|
require.NoError(ctx, err)
|
|
|
|
}
|
|
|
|
|
|
|
|
bySnapName := make(map[string]int)
|
|
|
|
for _, v := range versions {
|
|
|
|
bySnapName[v.GetName()] += 1
|
|
|
|
}
|
|
|
|
for _, v := range expectExactRfsSnaps {
|
|
|
|
bySnapName[v] -= 1
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, v := range bySnapName {
|
|
|
|
if v != 0 {
|
|
|
|
ctx.Logf("unexpected snaps:\n%#v", bySnapName)
|
|
|
|
ctx.FailNow()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return report
|
|
|
|
}
|
|
|
|
|
|
|
|
func ReplicationInitialAll(ctx *platformtest.Context) {
|
|
|
|
replicationInitialImpl(ctx, logic.InitialReplicationAutoResolutionAll, []string{"3", "2", "1"})
|
|
|
|
}
|
|
|
|
|
|
|
|
func ReplicationInitialMostRecent(ctx *platformtest.Context) {
|
|
|
|
replicationInitialImpl(ctx, logic.InitialReplicationAutoResolutionMostRecent, []string{"1"})
|
|
|
|
}
|
|
|
|
|
|
|
|
func ReplicationInitialFail(ctx *platformtest.Context) {
|
|
|
|
report := replicationInitialImpl(ctx, logic.InitialReplicationAutoResolutionFail, []string{})
|
|
|
|
require.Len(ctx, report.Attempts, 1)
|
|
|
|
require.Nil(ctx, report.Attempts[0].PlanError)
|
|
|
|
require.Len(ctx, report.Attempts[0].Filesystems, 1)
|
|
|
|
require.NotNil(ctx, report.Attempts[0].Filesystems[0].PlanError)
|
|
|
|
require.Contains(ctx, report.Attempts[0].Filesystems[0].PlanError.Err, "automatic conflict resolution for initial replication is disabled in config")
|
|
|
|
}
|
fix: replication of placeholder filesystems (#744)
fixes https://github.com/zrepl/zrepl/issues/742
Before this PR, when chaining replication from
A => B => C, if B had placeholders and the `filesystems`
included these placeholders, we'd incorrectly
fail the planning phase with error
`sender does not have any versions`.
The non-placeholder child filesystems of these placeholders
would then fail to replicate because of the
initial-replication-dependency-tracking that we do, i.e.,
their parent failed to initially replicate, hence
they fail to replicate as well
(`parent(s) failed during initial replication`).
We can do better than that because we have the information
whether a sender-side filesystem is a placeholder.
This PR makes the planner act on that information.
The outcome is that placeholders are replicated as
placeholders (albeit the receiver remains in control
of how these placeholders are created, i.e., `recv.placeholders`)
The mechanism to do it is:
1. Don't plan any replication steps for filesystems that
are placeholders on the sender.
2. Ensure that, if a receiving-side filesystem exists, it
is indeed a placeholder.
Check (2) may seem overly restrictive, but, the goal here
is not just to mirror all non-placeholder filesystems, but
also to mirror the hierarchy.
Testing performed:
- [x] confirm with issue reporter that this PR fixes their issue
- [x] add a regression test that fails without the changes in this PR
2024-09-05 23:26:42 +02:00
|
|
|
|
|
|
|
// https://github.com/zrepl/zrepl/issues/742
|
|
|
|
func ReplicationOfPlaceholderFilesystemsInChainedReplicationScenario(ctx *platformtest.Context) {
|
|
|
|
|
|
|
|
//
|
|
|
|
// Setup datasets
|
|
|
|
//
|
|
|
|
platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, `
|
|
|
|
CREATEROOT
|
|
|
|
+ "host1"
|
|
|
|
+ "host1/a"
|
|
|
|
+ "host1/a/b"
|
|
|
|
+ "host1/a/b@1"
|
|
|
|
+ "host2"
|
|
|
|
+ "host2/sink"
|
|
|
|
+ "host3"
|
|
|
|
+ "host3/sink"
|
|
|
|
`)
|
|
|
|
|
|
|
|
// Replicate host1/a to host2/sink
|
|
|
|
host1_a := ctx.RootDataset + "/host1/a"
|
|
|
|
host1_b := ctx.RootDataset + "/host1/a/b"
|
|
|
|
|
|
|
|
host2_sink := ctx.RootDataset + "/host2/sink"
|
|
|
|
host2_a := host2_sink + "/" + host1_a
|
|
|
|
host2_b := host2_sink + "/" + host1_b
|
|
|
|
|
|
|
|
host3_sink := ctx.RootDataset + "/host3/sink"
|
|
|
|
host3_a := host3_sink + "/" + host2_a
|
|
|
|
host3_b := host3_sink + "/" + host2_b
|
|
|
|
|
|
|
|
type job struct {
|
|
|
|
sender, receiver endpoint.JobID
|
|
|
|
receiver_root string
|
|
|
|
sender_filesystems_filter map[string]bool
|
|
|
|
}
|
|
|
|
h1_to_h2 := job{
|
|
|
|
sender: endpoint.MustMakeJobID("h1-to-h2-sender"),
|
|
|
|
receiver: endpoint.MustMakeJobID("h1-to-h2-receiver"),
|
|
|
|
sender_filesystems_filter: map[string]bool{
|
|
|
|
// omit host1_a so that it becomes a placeholder on host2
|
|
|
|
host1_b: true,
|
|
|
|
},
|
|
|
|
receiver_root: host2_sink,
|
|
|
|
}
|
|
|
|
h2_to_h3 := job{
|
|
|
|
sender: endpoint.MustMakeJobID("h2-to-h3-sender"),
|
|
|
|
receiver: endpoint.MustMakeJobID("h2-to-h3-receiver"),
|
|
|
|
sender_filesystems_filter: map[string]bool{
|
|
|
|
host2_sink: false,
|
|
|
|
host2_sink + "<": true,
|
|
|
|
},
|
|
|
|
receiver_root: host3_sink,
|
|
|
|
}
|
|
|
|
|
|
|
|
do_repl := func(j job) {
|
|
|
|
|
|
|
|
sfilter := filters.NewDatasetMapFilter(len(j.sender_filesystems_filter), true)
|
|
|
|
|
|
|
|
for lhs, rhs := range j.sender_filesystems_filter {
|
|
|
|
var err error
|
|
|
|
if rhs {
|
|
|
|
err = sfilter.Add(lhs, filters.MapFilterResultOk)
|
|
|
|
} else {
|
|
|
|
err = sfilter.Add(lhs, filters.MapFilterResultOmit)
|
|
|
|
}
|
|
|
|
require.NoError(ctx, err)
|
|
|
|
}
|
|
|
|
|
|
|
|
rep := replicationInvocation{
|
|
|
|
sjid: j.sender,
|
|
|
|
rjid: j.receiver,
|
|
|
|
sfilter: sfilter,
|
|
|
|
rfsRoot: j.receiver_root,
|
|
|
|
guarantee: pdu.ReplicationConfigProtectionWithKind(pdu.ReplicationGuaranteeKind_GuaranteeResumability),
|
|
|
|
receiverConfigHook: func(rc *endpoint.ReceiverConfig) {
|
|
|
|
rc.PlaceholderEncryption = endpoint.PlaceholderCreationEncryptionPropertyOff
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
|
|
|
r := rep.Do(ctx)
|
|
|
|
ctx.Logf("\n%s", pretty.Sprint(r))
|
|
|
|
}
|
|
|
|
|
|
|
|
do_repl(h1_to_h2)
|
|
|
|
do_repl(h2_to_h3)
|
|
|
|
|
|
|
|
// assert that the replication worked
|
|
|
|
mustGetFilesystemVersion(ctx, host3_b+"@1")
|
|
|
|
|
|
|
|
// assert placeholder status
|
|
|
|
for _, fs := range []string{host2_a, host3_a} {
|
|
|
|
st, err := zfs.ZFSGetFilesystemPlaceholderState(ctx, mustDatasetPath(fs))
|
|
|
|
require.NoError(ctx, err)
|
|
|
|
require.True(ctx, st.IsPlaceholder)
|
|
|
|
}
|
|
|
|
}
|