add option to disable step holds for incremental sends

This is a stop-gap solution until we re-write the pruner to support
rules for removing step holds.

Note that disabling step holds for incremental sends does not affect
zrepl's guarantee that incremental replication is always possible:

Suppose you yank the external drive during an incremental @from -> @to step:

* restarting that step or future incrementals @from -> @to_later` will be possible
  because the replication cursor bookmark points to @from until the step is complete
* resuming @from -> @to will work as long as the pruner on your internal pool doesn't come around to destroy @to.
    * in that case, the replication algorithm should determine that the resumable state
      on the receiving side isuseless because @to no longer exists on the sending side,
      and consequently clear it, and restart an incremental step @from -> @to_later

refs #288
This commit is contained in:
Christian Schwarz 2020-06-01 14:39:59 +02:00
parent 1b39e9d03c
commit 1c270b7e39
8 changed files with 243 additions and 48 deletions

View File

@ -76,7 +76,12 @@ type SnapJob struct {
}
type SendOptions struct {
Encrypted bool `yaml:"encrypted"`
Encrypted bool `yaml:"encrypted"`
StepHolds SendOptionsStepHolds `yaml:"step_holds,optional"`
}
type SendOptionsStepHolds struct {
DisableIncremental bool `yaml:"disable_incremental,optional"`
}
var _ yaml.Defaulter = (*SendOptions)(nil)

View File

@ -152,9 +152,10 @@ func modePushFromConfig(g *config.Global, in *config.PushJob, jobID endpoint.Job
}
m.senderConfig = &endpoint.SenderConfig{
FSF: fsf,
Encrypt: &zfs.NilBool{B: in.Send.Encrypted},
JobID: jobID,
FSF: fsf,
Encrypt: &zfs.NilBool{B: in.Send.Encrypted},
DisableIncrementalStepHolds: in.Send.StepHolds.DisableIncremental,
JobID: jobID,
}
m.plannerPolicy = &logic.PlannerPolicy{
EncryptedSend: logic.TriFromBool(in.Send.Encrypted),

View File

@ -79,9 +79,10 @@ func modeSourceFromConfig(g *config.Global, in *config.SourceJob, jobID endpoint
return nil, errors.Wrap(err, "cannot build filesystem filter")
}
m.senderConfig = &endpoint.SenderConfig{
FSF: fsf,
Encrypt: &zfs.NilBool{B: in.Send.Encrypted},
JobID: jobID,
FSF: fsf,
Encrypt: &zfs.NilBool{B: in.Send.Encrypted},
DisableIncrementalStepHolds: in.Send.StepHolds.DisableIncremental,
JobID: jobID,
}
if m.snapper, err = snapper.FromConfig(g, fsf, in.Snapshotting); err != nil {

View File

@ -175,6 +175,8 @@ func (j *SnapJob) doPrune(ctx context.Context) {
FSF: j.fsfilter,
// FIXME encryption setting is irrelevant for SnapJob because the endpoint is only used as pruner.Target
Encrypt: &zfs.NilBool{B: true},
// FIXME DisableIncrementalStepHolds setting is irrelevant for SnapJob because the endpoint is only used as pruner.Target
DisableIncrementalStepHolds: false,
})
j.pruner = j.prunerFactory.BuildLocalPruner(ctx, sender, alwaysUpToDateReplicationCursorHistory{sender})
log.Info("start pruning")

View File

@ -16,6 +16,8 @@ Send Options
filesystems: ...
send:
encrypted: true
step_holds:
disable_incremental: false
...
:ref:`Source<job-source>` and :ref:`push<job-push>` jobs have an optional ``send`` configuration section.
@ -34,6 +36,23 @@ Filesystems matched by ``filesystems`` that are not encrypted are not sent and w
If ``encryption=false``, zrepl expects that filesystems matching ``filesystems`` are not encrypted or have loaded encryption keys.
``step_holds.disable_incremental`` option
-----------------------------------------
The ``step_holds.disable_incremental`` variable controls whether the creation of :ref:`step holds <step-holds-and-bookmarks>` should be disabled for incremental replication.
The default value is ``false``.
Disabling step holds has the disadvantage that steps :ref:`might not be resumable <step-holds-and-bookmarks>` if interrupted.
Non-resumability means that replication progress is no longer monotonic which might result in a replication setup that never makes progress if mid-step interruptions are too frequent (e.g. frequent network outages).
However, the advantage and :issue:`reason for existence <288>` of this flag is that it allows the pruner to delete snapshots of interrupted replication steps
which is useful if replication happens so rarely (or fails so frequently) that the amount of disk space exclusively referenced by the step's snapshots becomes intolerable.
.. NOTE::
When setting this flag to ``true``, existing step holds for the job will be destroyed on the next replication attempt.
.. _job-recv-options:
Recv Options

View File

@ -21,9 +21,10 @@ import (
)
type SenderConfig struct {
FSF zfs.DatasetFilter
Encrypt *zfs.NilBool
JobID JobID
FSF zfs.DatasetFilter
Encrypt *zfs.NilBool
DisableIncrementalStepHolds bool
JobID JobID
}
func (c *SenderConfig) Validate() error {
@ -39,9 +40,10 @@ func (c *SenderConfig) Validate() error {
// Sender implements replication.ReplicationEndpoint for a sending side
type Sender struct {
FSFilter zfs.DatasetFilter
encrypt *zfs.NilBool
jobId JobID
FSFilter zfs.DatasetFilter
encrypt *zfs.NilBool
disableIncrementalStepHolds bool
jobId JobID
}
func NewSender(conf SenderConfig) *Sender {
@ -49,9 +51,10 @@ func NewSender(conf SenderConfig) *Sender {
panic("invalid config" + err.Error())
}
return &Sender{
FSFilter: conf.FSF,
encrypt: conf.Encrypt,
jobId: conf.JobID,
FSFilter: conf.FSF,
encrypt: conf.Encrypt,
disableIncrementalStepHolds: conf.DisableIncrementalStepHolds,
jobId: conf.JobID,
}
}
@ -221,9 +224,11 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea
}
}
takeStepHolds := sendArgs.FromVersion == nil || !s.disableIncrementalStepHolds
var fromHold, toHold Abstraction
// make sure `From` doesn't go away in order to make this step resumable
if sendArgs.From != nil {
if sendArgs.From != nil && takeStepHolds {
fromHold, err = HoldStep(ctx, sendArgs.FS, *sendArgs.FromVersion, s.jobId) // no shadow
if err == zfs.ErrBookmarkCloningNotSupported {
getLogger(ctx).Debug("not creating step bookmark because ZFS does not support it")
@ -232,10 +237,12 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea
return nil, nil, errors.Wrapf(err, "cannot hold `from` version %q before starting send", *sendArgs.FromVersion)
}
}
// make sure `To` doesn't go away in order to make this step resumable
toHold, err = HoldStep(ctx, sendArgs.FS, sendArgs.ToVersion, s.jobId)
if err != nil {
return nil, nil, errors.Wrapf(err, "cannot hold `to` version %q before starting send", sendArgs.ToVersion)
if takeStepHolds {
// make sure `To` doesn't go away in order to make this step resumable
toHold, err = HoldStep(ctx, sendArgs.FS, sendArgs.ToVersion, s.jobId)
if err != nil {
return nil, nil, errors.Wrapf(err, "cannot hold `to` version %q before starting send", sendArgs.ToVersion)
}
}
// cleanup the mess that _this function_ might have created in prior failed attempts:
@ -254,11 +261,11 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea
//
// Note further that a resuming send, due to the idempotent nature of func CreateReplicationCursor and HoldStep,
// will never lose its step holds because we just (idempotently re-)created them above, before attempting the cleanup.
liveAbs := []Abstraction{fromHold, toHold, fromReplicationCursor}
func() {
ctx, endSpan := trace.WithSpan(ctx, "cleanup-stale-abstractions")
defer endSpan()
liveAbs := []Abstraction{fromHold, toHold, fromReplicationCursor}
keep := func(a Abstraction) (keep bool) {
keep = false
for _, k := range liveAbs {
@ -275,7 +282,10 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea
}
for _, staleVersion := range obsoleteAbs {
for _, mustLiveVersion := range mustLiveVersions {
if zfs.FilesystemVersionEqualIdentity(mustLiveVersion, staleVersion.GetFilesystemVersion()) {
isSendArg := zfs.FilesystemVersionEqualIdentity(mustLiveVersion, staleVersion.GetFilesystemVersion())
isStepHoldWeMightHaveCreatedWithCurrentValueOf_takeStepHolds :=
takeStepHolds && staleVersion.GetType() == AbstractionStepHold
if isSendArg && isStepHoldWeMightHaveCreatedWithCurrentValueOf_takeStepHolds {
panic(fmt.Sprintf("impl error: %q would be destroyed because it is considered stale but it is part of of sendArgs=%s", mustLiveVersion.String(), pretty.Sprint(sendArgs)))
}
}
@ -283,13 +293,11 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea
}
sendAbstractionsCacheSingleton.TryBatchDestroy(ctx, s.jobId, sendArgs.FS, keep, check)
}()
if fromHold != nil {
sendAbstractionsCacheSingleton.Put(fromHold)
}
sendAbstractionsCacheSingleton.Put(toHold)
if fromReplicationCursor != nil {
sendAbstractionsCacheSingleton.Put(fromReplicationCursor)
// now add the newly created abstractions to the cleaned-up cache
for _, a := range liveAbs {
if a != nil {
sendAbstractionsCacheSingleton.Put(a)
}
}
sendStream, err := zfs.ZFSSend(ctx, sendArgs)

View File

@ -18,8 +18,10 @@ var Cases = []Case{BatchDestroy,
ReceiveForceRollbackWorksUnencrypted,
ReplicationIncrementalCleansUpStaleAbstractionsWithCacheOnSecondReplication,
ReplicationIncrementalCleansUpStaleAbstractionsWithoutCacheOnSecondReplication,
ReplicationIncrementalDestroysStepHoldsIffIncrementalStepHoldsAreDisabledButStepHoldsExist,
ReplicationIncrementalIsPossibleIfCommonSnapshotIsDestroyed,
ReplicationIsResumableFullSend,
ReplicationIsResumableFullSend__DisableIncrementalStepHolds_False,
ReplicationIsResumableFullSend__DisableIncrementalStepHolds_True,
ResumableRecvAndTokenHandling,
ResumeTokenParsing,
SendArgsValidationEncryptedSendOfUnencryptedDatasetForbidden,

View File

@ -5,6 +5,7 @@ import (
"fmt"
"io"
"path"
"sort"
"github.com/kr/pretty"
"github.com/stretchr/testify/require"
@ -26,10 +27,11 @@ import (
// of a new sender and receiver instance and one blocking invocation
// of the replication engine without encryption
type replicationInvocation struct {
sjid, rjid endpoint.JobID
sfs string
rfsRoot string
interceptSender func(e *endpoint.Sender) logic.Sender
sjid, rjid endpoint.JobID
sfs string
rfsRoot string
interceptSender func(e *endpoint.Sender) logic.Sender
disableIncrementalStepHolds bool
}
func (i replicationInvocation) Do(ctx *platformtest.Context) *report.Report {
@ -42,9 +44,10 @@ func (i replicationInvocation) Do(ctx *platformtest.Context) *report.Report {
err := sfilter.Add(i.sfs, "ok")
require.NoError(ctx, err)
sender := i.interceptSender(endpoint.NewSender(endpoint.SenderConfig{
FSF: sfilter.AsFilter(),
Encrypt: &zfs.NilBool{B: false},
JobID: i.sjid,
FSF: sfilter.AsFilter(),
Encrypt: &zfs.NilBool{B: false},
DisableIncrementalStepHolds: i.disableIncrementalStepHolds,
JobID: i.sjid,
}))
receiver := endpoint.NewReceiver(endpoint.ReceiverConfig{
JobID: i.rjid,
@ -86,10 +89,11 @@ func ReplicationIncrementalIsPossibleIfCommonSnapshotIsDestroyed(ctx *platformte
snap1 := fsversion(ctx, sfs, "@1")
rep := replicationInvocation{
sjid: sjid,
rjid: rjid,
sfs: sfs,
rfsRoot: rfsRoot,
sjid: sjid,
rjid: rjid,
sfs: sfs,
rfsRoot: rfsRoot,
disableIncrementalStepHolds: false,
}
rfs := rep.ReceiveSideFilesystem()
@ -149,10 +153,11 @@ func implReplicationIncrementalCleansUpStaleAbstractions(ctx *platformtest.Conte
rfsRoot := ctx.RootDataset + "/receiver"
rep := replicationInvocation{
sjid: sjid,
rjid: rjid,
sfs: sfs,
rfsRoot: rfsRoot,
sjid: sjid,
rjid: rjid,
sfs: sfs,
rfsRoot: rfsRoot,
disableIncrementalStepHolds: false,
}
rfs := rep.ReceiveSideFilesystem()
@ -178,7 +183,7 @@ func implReplicationIncrementalCleansUpStaleAbstractions(ctx *platformtest.Conte
require.NoError(ctx, err)
require.Contains(ctx, holds, expectRjidHoldTag)
// create artifical stale replication cursors
// create artifical stale replication cursors & step holds
createArtificalStaleAbstractions := func(jobId endpoint.JobID) []endpoint.Abstraction {
snap2Cursor, err := endpoint.CreateReplicationCursor(ctx, sfs, snap2, jobId) // no shadow
require.NoError(ctx, err)
@ -322,7 +327,15 @@ func (s *PartialSender) Send(ctx context.Context, r *pdu.SendReq) (r1 *pdu.SendR
return r1, r2, r3
}
func ReplicationIsResumableFullSend(ctx *platformtest.Context) {
func ReplicationIsResumableFullSend__DisableIncrementalStepHolds_False(ctx *platformtest.Context) {
implReplicationIsResumableFullSend(ctx, false)
}
func ReplicationIsResumableFullSend__DisableIncrementalStepHolds_True(ctx *platformtest.Context) {
implReplicationIsResumableFullSend(ctx, true)
}
func implReplicationIsResumableFullSend(ctx *platformtest.Context, disableIncrementalStepHolds bool) {
platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, `
CREATEROOT
@ -353,6 +366,7 @@ func ReplicationIsResumableFullSend(ctx *platformtest.Context) {
interceptSender: func(e *endpoint.Sender) logic.Sender {
return &PartialSender{Sender: e, failAfterByteCount: 1 << 20}
},
disableIncrementalStepHolds: disableIncrementalStepHolds,
}
rfs := rep.ReceiveSideFilesystem()
@ -394,3 +408,146 @@ func ReplicationIsResumableFullSend(ctx *platformtest.Context) {
_ = fsversion(ctx, rfs, "@3")
}
func ReplicationIncrementalDestroysStepHoldsIffIncrementalStepHoldsAreDisabledButStepHoldsExist(ctx *platformtest.Context) {
platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, `
CREATEROOT
+ "sender"
+ "receiver"
R zfs create -p "${ROOTDS}/receiver/${ROOTDS}"
`)
sjid := endpoint.MustMakeJobID("sender-job")
rjid := endpoint.MustMakeJobID("receiver-job")
sfs := ctx.RootDataset + "/sender"
rfsRoot := ctx.RootDataset + "/receiver"
// fully replicate snapshots @1
{
mustSnapshot(ctx, sfs+"@1")
rep := replicationInvocation{
sjid: sjid,
rjid: rjid,
sfs: sfs,
rfsRoot: rfsRoot,
disableIncrementalStepHolds: false,
}
rfs := rep.ReceiveSideFilesystem()
report := rep.Do(ctx)
ctx.Logf("\n%s", pretty.Sprint(report))
// assert this worked (not the main subject of the test)
_ = fsversion(ctx, rfs, "@1")
}
// create a large snapshot @2
{
sfsmp, err := zfs.ZFSGetMountpoint(ctx, sfs)
require.NoError(ctx, err)
require.True(ctx, sfsmp.Mounted)
writeDummyData(path.Join(sfsmp.Mountpoint, "dummy.data"), 1<<22)
mustSnapshot(ctx, sfs+"@2")
}
snap2sfs := fsversion(ctx, sfs, "@2")
// partially replicate snapshots @2 with step holds enabled
// to effect a step-holds situation
{
rep := replicationInvocation{
sjid: sjid,
rjid: rjid,
sfs: sfs,
rfsRoot: rfsRoot,
disableIncrementalStepHolds: false, // !
interceptSender: func(e *endpoint.Sender) logic.Sender {
return &PartialSender{Sender: e, failAfterByteCount: 1 << 20}
},
}
rfs := rep.ReceiveSideFilesystem()
report := rep.Do(ctx)
ctx.Logf("\n%s", pretty.Sprint(report))
// assert this partial receive worked
_, err := zfs.ZFSGetFilesystemVersion(ctx, rfs+"@2")
ctx.Logf("%T %s", err, err)
_, notFullyReceived := err.(*zfs.DatasetDoesNotExist)
require.True(ctx, notFullyReceived)
// assert step holds are in place
abs, absErrs, err := endpoint.ListAbstractions(ctx, endpoint.ListZFSHoldsAndBookmarksQuery{
FS: endpoint.ListZFSHoldsAndBookmarksQueryFilesystemFilter{
FS: &sfs,
},
Concurrency: 1,
JobID: &sjid,
What: endpoint.AbstractionTypeSet{endpoint.AbstractionStepHold: true},
})
require.NoError(ctx, err)
require.Empty(ctx, absErrs)
require.Len(ctx, abs, 2)
sort.Slice(abs, func(i, j int) bool {
return abs[i].GetCreateTXG() < abs[j].GetCreateTXG()
})
require.True(ctx, zfs.FilesystemVersionEqualIdentity(abs[0].GetFilesystemVersion(), fsversion(ctx, sfs, "@1")))
require.True(ctx, zfs.FilesystemVersionEqualIdentity(abs[1].GetFilesystemVersion(), fsversion(ctx, sfs, "@2")))
}
//
// end of test setup
//
// retry replication with incremental step holds disabled
// - replication should not fail due to holds-related stuff
// - replication should fail intermittently due to partial sender being fully read
// - the partial sender is 1/4th the length of the stream, thus expect
// successful replication after 5 more attempts
rep := replicationInvocation{
sjid: sjid,
rjid: rjid,
sfs: sfs,
rfsRoot: rfsRoot,
disableIncrementalStepHolds: true, // !
interceptSender: func(e *endpoint.Sender) logic.Sender {
return &PartialSender{Sender: e, failAfterByteCount: 1 << 20}
},
}
rfs := rep.ReceiveSideFilesystem()
for i := 0; ; i++ {
require.True(ctx, i < 5)
report := rep.Do(ctx)
ctx.Logf("retry run=%v\n%s", i, pretty.Sprint(report))
_, err := zfs.ZFSGetFilesystemVersion(ctx, rfs+"@2")
if err == nil {
break
}
}
// assert replication worked
fsversion(ctx, rfs, "@2")
// assert no step holds exist
abs, absErrs, err := endpoint.ListAbstractions(ctx, endpoint.ListZFSHoldsAndBookmarksQuery{
FS: endpoint.ListZFSHoldsAndBookmarksQueryFilesystemFilter{
FS: &sfs,
},
Concurrency: 1,
JobID: &sjid,
What: endpoint.AbstractionTypeSet{endpoint.AbstractionStepHold: true},
})
require.NoError(ctx, err)
require.Empty(ctx, absErrs)
require.Len(ctx, abs, 0)
// assert that the replication cursor bookmark exists
abs, absErrs, err = endpoint.ListAbstractions(ctx, endpoint.ListZFSHoldsAndBookmarksQuery{
FS: endpoint.ListZFSHoldsAndBookmarksQueryFilesystemFilter{
FS: &sfs,
},
Concurrency: 1,
JobID: &sjid,
What: endpoint.AbstractionTypeSet{endpoint.AbstractionReplicationCursorBookmarkV2: true},
})
require.NoError(ctx, err)
require.Empty(ctx, absErrs)
require.Len(ctx, abs, 1)
require.True(ctx, zfs.FilesystemVersionEqualIdentity(abs[0].GetFilesystemVersion(), snap2sfs))
}