[#288] option to disable step holds for incremental sends

This is a stop-gap solution until we rewrite the pruner to support
rules for removing step holds.

Note that disabling step holds for incremental sends does not affect
zrepl's guarantee that incremental replication is always possible:

Suppose you yank the external drive during an incremental @from -> @to step:

* restarting that step, or future incrementals @from -> @to_later, will be possible
  because the replication cursor bookmark points to @from until the step is complete
* resuming @from -> @to will work as long as the pruner on your internal pool doesn't come around to destroy @to.
    * in that case, the replication algorithm should determine that the resumable state
      on the receiving side is useless because @to no longer exists on the sending side,
      and consequently clear it and restart an incremental step @from -> @to_later
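
In code, the decision boils down to a single predicate. Here is a minimal, runnable sketch of that predicate, with the hold-taking reduced to booleans (the real condition lives in (*Sender).Send, see the endpoint diff below):

    package main

    import "fmt"

    // takeStepHolds mirrors the condition this commit introduces:
    // step holds are skipped only for incremental sends (a from-version exists)
    // and only if the user opted out via step_holds.disable_incremental.
    func takeStepHolds(isIncremental, disableIncrementalStepHolds bool) bool {
        return !isIncremental || !disableIncrementalStepHolds
    }

    func main() {
        fmt.Println(takeStepHolds(false, true)) // full send: always hold -> true
        fmt.Println(takeStepHolds(true, false)) // incremental, option unset -> true
        fmt.Println(takeStepHolds(true, true))  // incremental, option set -> false
    }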

refs #288
Christian Schwarz, 2020-06-01 14:39:59 +02:00
parent b6b2d7b615
commit 5b564a3e28
8 changed files with 87 additions and 46 deletions


@@ -76,7 +76,12 @@ type SnapJob struct {
 }
 
 type SendOptions struct {
 	Encrypted bool                 `yaml:"encrypted"`
+	StepHolds SendOptionsStepHolds `yaml:"step_holds,optional"`
+}
+
+type SendOptionsStepHolds struct {
+	DisableIncremental bool `yaml:"disable_incremental,optional"`
 }
 
 var _ yaml.Defaulter = (*SendOptions)(nil)
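
To illustrate the YAML shape these structs accept, here is a self-contained sketch that unmarshals the new keys. It substitutes gopkg.in/yaml.v2 for zrepl's yaml-config fork (which is what actually understands the ",optional" tag suffix), so the tags are simplified:

    package main

    import (
        "fmt"

        yaml "gopkg.in/yaml.v2" // stand-in for zrepl's yaml-config fork
    )

    // mirror of the new config structs, with the ",optional" suffix dropped
    type sendOptions struct {
        Encrypted bool                 `yaml:"encrypted"`
        StepHolds sendOptionsStepHolds `yaml:"step_holds"`
    }

    type sendOptionsStepHolds struct {
        DisableIncremental bool `yaml:"disable_incremental"`
    }

    func main() {
        doc := "encrypted: true\nstep_holds:\n  disable_incremental: true\n"
        var opts sendOptions
        if err := yaml.Unmarshal([]byte(doc), &opts); err != nil {
            panic(err)
        }
        fmt.Printf("%+v\n", opts) // {Encrypted:true StepHolds:{DisableIncremental:true}}
    }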


@@ -152,9 +152,10 @@ func modePushFromConfig(g *config.Global, in *config.PushJob, jobID endpoint.Job
 	}
 	m.senderConfig = &endpoint.SenderConfig{
-		FSF:     fsf,
-		Encrypt: &zfs.NilBool{B: in.Send.Encrypted},
-		JobID:   jobID,
+		FSF:                         fsf,
+		Encrypt:                     &zfs.NilBool{B: in.Send.Encrypted},
+		DisableIncrementalStepHolds: in.Send.StepHolds.DisableIncremental,
+		JobID:                       jobID,
 	}
 	m.plannerPolicy = &logic.PlannerPolicy{
 		EncryptedSend: logic.TriFromBool(in.Send.Encrypted),


@@ -79,9 +79,10 @@ func modeSourceFromConfig(g *config.Global, in *config.SourceJob, jobID endpoint
 		return nil, errors.Wrap(err, "cannot build filesystem filter")
 	}
 	m.senderConfig = &endpoint.SenderConfig{
-		FSF:     fsf,
-		Encrypt: &zfs.NilBool{B: in.Send.Encrypted},
-		JobID:   jobID,
+		FSF:                         fsf,
+		Encrypt:                     &zfs.NilBool{B: in.Send.Encrypted},
+		DisableIncrementalStepHolds: in.Send.StepHolds.DisableIncremental,
+		JobID:                       jobID,
 	}
 	if m.snapper, err = snapper.FromConfig(g, fsf, in.Snapshotting); err != nil {


@@ -175,6 +175,8 @@ func (j *SnapJob) doPrune(ctx context.Context) {
 		FSF: j.fsfilter,
 		// FIXME encryption setting is irrelevant for SnapJob because the endpoint is only used as pruner.Target
 		Encrypt: &zfs.NilBool{B: true},
+		// FIXME DisableIncrementalStepHolds setting is irrelevant for SnapJob because the endpoint is only used as pruner.Target
+		DisableIncrementalStepHolds: false,
 	})
 	j.pruner = j.prunerFactory.BuildLocalPruner(ctx, sender, alwaysUpToDateReplicationCursorHistory{sender})
 	log.Info("start pruning")


@@ -16,6 +16,8 @@ Send Options
      filesystems: ...
      send:
        encrypted: true
+       step_holds:
+         disable_incremental: false
      ...
 
 :ref:`Source<job-source>` and :ref:`push<job-push>` jobs have an optional ``send`` configuration section.
@@ -34,6 +36,17 @@ Filesystems matched by ``filesystems`` that are not encrypted are not sent and w
 
 If ``encryption=false``, zrepl expects that filesystems matching ``filesystems`` are not encrypted or have loaded encryption keys.
 
+``step_holds.disable_incremental`` option
+-----------------------------------------
+
+The ``step_holds.disable_incremental`` variable controls whether the creation of :ref:`step holds <step-holds-and-bookmarks>` should be disabled for incremental replication.
+The default value is ``false``.
+Disabling step holds has the disadvantage that steps :ref:`might not be resumable <step-holds-and-bookmarks>` if interrupted.
+However, the advantage and :issue:`reason for existence <288>` of this flag is that it allows the pruner to delete snapshots of interrupted replication steps,
+which is useful if replication happens so rarely (or fails so frequently) that the amount of disk space exclusively referenced by the step's snapshots becomes intolerable.
+
 .. _job-recv-options:
 
 Recv Options


@@ -21,9 +21,10 @@ import (
 )
 
 type SenderConfig struct {
-	FSF     zfs.DatasetFilter
-	Encrypt *zfs.NilBool
-	JobID   JobID
+	FSF                         zfs.DatasetFilter
+	Encrypt                     *zfs.NilBool
+	DisableIncrementalStepHolds bool
+	JobID                       JobID
 }
 
 func (c *SenderConfig) Validate() error {
@@ -39,9 +40,10 @@ func (c *SenderConfig) Validate() error {
 
 // Sender implements replication.ReplicationEndpoint for a sending side
 type Sender struct {
-	FSFilter zfs.DatasetFilter
-	encrypt  *zfs.NilBool
-	jobId    JobID
+	FSFilter                    zfs.DatasetFilter
+	encrypt                     *zfs.NilBool
+	disableIncrementalStepHolds bool
+	jobId                       JobID
 }
 
 func NewSender(conf SenderConfig) *Sender {
@@ -49,9 +51,10 @@ func NewSender(conf SenderConfig) *Sender {
 		panic("invalid config" + err.Error())
 	}
 	return &Sender{
-		FSFilter: conf.FSF,
-		encrypt:  conf.Encrypt,
-		jobId:    conf.JobID,
+		FSFilter:                    conf.FSF,
+		encrypt:                     conf.Encrypt,
+		disableIncrementalStepHolds: conf.DisableIncrementalStepHolds,
+		jobId:                       conf.JobID,
 	}
 }
@@ -221,9 +224,11 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea
 		}
 	}
 
+	takeStepHolds := sendArgs.FromVersion == nil || !s.disableIncrementalStepHolds
+
 	var fromHold, toHold Abstraction
 	// make sure `From` doesn't go away in order to make this step resumable
-	if sendArgs.From != nil {
+	if sendArgs.From != nil && takeStepHolds {
 		fromHold, err = HoldStep(ctx, sendArgs.FS, *sendArgs.FromVersion, s.jobId) // no shadow
 		if err == zfs.ErrBookmarkCloningNotSupported {
 			getLogger(ctx).Debug("not creating step bookmark because ZFS does not support it")
@@ -232,10 +237,12 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea
 			return nil, nil, errors.Wrapf(err, "cannot hold `from` version %q before starting send", *sendArgs.FromVersion)
 		}
 	}
-	// make sure `To` doesn't go away in order to make this step resumable
-	toHold, err = HoldStep(ctx, sendArgs.FS, sendArgs.ToVersion, s.jobId)
-	if err != nil {
-		return nil, nil, errors.Wrapf(err, "cannot hold `to` version %q before starting send", sendArgs.ToVersion)
+	if takeStepHolds {
+		// make sure `To` doesn't go away in order to make this step resumable
+		toHold, err = HoldStep(ctx, sendArgs.FS, sendArgs.ToVersion, s.jobId)
+		if err != nil {
+			return nil, nil, errors.Wrapf(err, "cannot hold `to` version %q before starting send", sendArgs.ToVersion)
+		}
 	}
 
 	// cleanup the mess that _this function_ might have created in prior failed attempts:
@@ -254,11 +261,11 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea
 	//
 	// Note further that a resuming send, due to the idempotent nature of func CreateReplicationCursor and HoldStep,
 	// will never lose its step holds because we just (idempotently re-)created them above, before attempting the cleanup.
+	liveAbs := []Abstraction{fromHold, toHold, fromReplicationCursor}
 	func() {
 		ctx, endSpan := trace.WithSpan(ctx, "cleanup-stale-abstractions")
 		defer endSpan()
-		liveAbs := []Abstraction{fromHold, toHold, fromReplicationCursor}
 		keep := func(a Abstraction) (keep bool) {
 			keep = false
 			for _, k := range liveAbs {
@@ -283,13 +290,11 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea
 		}
 		sendAbstractionsCacheSingleton.TryBatchDestroy(ctx, s.jobId, sendArgs.FS, keep, check)
 	}()
-	if fromHold != nil {
-		sendAbstractionsCacheSingleton.Put(fromHold)
-	}
-	sendAbstractionsCacheSingleton.Put(toHold)
-	if fromReplicationCursor != nil {
-		sendAbstractionsCacheSingleton.Put(fromReplicationCursor)
+	// now add the newly created abstractions to the cleaned-up cache
+	for _, a := range liveAbs {
+		if a != nil {
+			sendAbstractionsCacheSingleton.Put(a)
+		}
 	}
 
 	sendStream, err := zfs.ZFSSend(ctx, sendArgs)
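
The last two hunks hoist liveAbs out of the cleanup closure so that a single nil-filtering loop can re-populate the cache, replacing three hand-written nil checks. A self-contained sketch of that pattern, with Abstraction and the cache reduced to stand-in types (names hypothetical):

    package main

    import "fmt"

    // abstraction and cache are simplified stand-ins for zrepl's endpoint types.
    type abstraction interface{ String() string }

    type hold string

    func (h hold) String() string { return string(h) }

    type cache struct{ entries []abstraction }

    func (c *cache) Put(a abstraction) { c.entries = append(c.entries, a) }

    func main() {
        var fromHold, toHold, fromReplicationCursor abstraction
        toHold = hold("step hold on @to")
        fromReplicationCursor = hold("replication cursor #from")
        // fromHold stays nil, e.g. for a full send, or because step holds
        // were disabled for incremental sends by the new option

        // collect possibly-nil abstractions once, then cache only the non-nil ones
        liveAbs := []abstraction{fromHold, toHold, fromReplicationCursor}
        c := &cache{}
        for _, a := range liveAbs {
            if a != nil {
                c.Put(a)
            }
        }
        fmt.Println(c.entries) // [step hold on @to replication cursor #from]
    }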


@@ -19,7 +19,8 @@ var Cases = []Case{BatchDestroy,
 	ReplicationIncrementalCleansUpStaleAbstractionsWithCacheOnSecondReplication,
 	ReplicationIncrementalCleansUpStaleAbstractionsWithoutCacheOnSecondReplication,
 	ReplicationIncrementalIsPossibleIfCommonSnapshotIsDestroyed,
-	ReplicationIsResumableFullSend,
+	ReplicationIsResumableFullSend__DisableIncrementalStepHolds_False,
+	ReplicationIsResumableFullSend__DisableIncrementalStepHolds_True,
 	ResumableRecvAndTokenHandling,
 	ResumeTokenParsing,
 	SendArgsValidationEncryptedSendOfUnencryptedDatasetForbidden,


@@ -26,10 +26,11 @@ import (
 // of a new sender and receiver instance and one blocking invocation
 // of the replication engine without encryption
 type replicationInvocation struct {
-	sjid, rjid      endpoint.JobID
-	sfs             string
-	rfsRoot         string
-	interceptSender func(e *endpoint.Sender) logic.Sender
+	sjid, rjid                  endpoint.JobID
+	sfs                         string
+	rfsRoot                     string
+	interceptSender             func(e *endpoint.Sender) logic.Sender
+	disableIncrementalStepHolds bool
 }
 
 func (i replicationInvocation) Do(ctx *platformtest.Context) *report.Report {
@@ -42,9 +43,10 @@ func (i replicationInvocation) Do(ctx *platformtest.Context) *report.Report {
 	err := sfilter.Add(i.sfs, "ok")
 	require.NoError(ctx, err)
 	sender := i.interceptSender(endpoint.NewSender(endpoint.SenderConfig{
-		FSF:     sfilter.AsFilter(),
-		Encrypt: &zfs.NilBool{B: false},
-		JobID:   i.sjid,
+		FSF:                         sfilter.AsFilter(),
+		Encrypt:                     &zfs.NilBool{B: false},
+		DisableIncrementalStepHolds: i.disableIncrementalStepHolds,
+		JobID:                       i.sjid,
 	}))
 	receiver := endpoint.NewReceiver(endpoint.ReceiverConfig{
 		JobID: i.rjid,
@@ -86,10 +88,11 @@ func ReplicationIncrementalIsPossibleIfCommonSnapshotIsDestroyed(ctx *platformte
 	snap1 := fsversion(ctx, sfs, "@1")
 
 	rep := replicationInvocation{
-		sjid:    sjid,
-		rjid:    rjid,
-		sfs:     sfs,
-		rfsRoot: rfsRoot,
+		sjid:                        sjid,
+		rjid:                        rjid,
+		sfs:                         sfs,
+		rfsRoot:                     rfsRoot,
+		disableIncrementalStepHolds: false,
 	}
 	rfs := rep.ReceiveSideFilesystem()
@@ -149,10 +152,11 @@ func implReplicationIncrementalCleansUpStaleAbstractions(ctx *platformtest.Conte
 	rfsRoot := ctx.RootDataset + "/receiver"
 
 	rep := replicationInvocation{
-		sjid:    sjid,
-		rjid:    rjid,
-		sfs:     sfs,
-		rfsRoot: rfsRoot,
+		sjid:                        sjid,
+		rjid:                        rjid,
+		sfs:                         sfs,
+		rfsRoot:                     rfsRoot,
+		disableIncrementalStepHolds: false,
 	}
 	rfs := rep.ReceiveSideFilesystem()
@@ -322,7 +326,15 @@ func (s *PartialSender) Send(ctx context.Context, r *pdu.SendReq) (r1 *pdu.SendR
 	return r1, r2, r3
 }
 
-func ReplicationIsResumableFullSend(ctx *platformtest.Context) {
+func ReplicationIsResumableFullSend__DisableIncrementalStepHolds_False(ctx *platformtest.Context) {
+	implReplicationIsResumableFullSend(ctx, false)
+}
+
+func ReplicationIsResumableFullSend__DisableIncrementalStepHolds_True(ctx *platformtest.Context) {
+	implReplicationIsResumableFullSend(ctx, true)
+}
+
+func implReplicationIsResumableFullSend(ctx *platformtest.Context, disableIncrementalStepHolds bool) {
 	platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, `
 		CREATEROOT
@@ -353,6 +365,7 @@ func ReplicationIsResumableFullSend(ctx *platformtest.Context) {
 		interceptSender: func(e *endpoint.Sender) logic.Sender {
 			return &PartialSender{Sender: e, failAfterByteCount: 1 << 20}
 		},
+		disableIncrementalStepHolds: disableIncrementalStepHolds,
 	}
 	rfs := rep.ReceiveSideFilesystem()
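
The test diff uses a small pattern worth naming: because each platformtest case is presumably identified by its function name in the Cases list, every boolean variant gets a thin exported wrapper that fixes the knob and delegates to a shared impl function. A minimal, runnable sketch of that pattern (the context is simplified to a string here; the real cases take a *platformtest.Context):

    package main

    import "fmt"

    // Case mimics the shape of a platformtest case: a named function taking a context.
    type Case func(ctx string)

    // shared implementation, parametrized over the new option
    func implReplicationIsResumableFullSend(ctx string, disableIncrementalStepHolds bool) {
        fmt.Printf("%s: resumable-full-send test, disableIncrementalStepHolds=%v\n",
            ctx, disableIncrementalStepHolds)
    }

    // thin wrappers fix the boolean so each variant keeps its own case name
    func ReplicationIsResumableFullSend__DisableIncrementalStepHolds_False(ctx string) {
        implReplicationIsResumableFullSend(ctx, false)
    }

    func ReplicationIsResumableFullSend__DisableIncrementalStepHolds_True(ctx string) {
        implReplicationIsResumableFullSend(ctx, true)
    }

    func main() {
        cases := []Case{
            ReplicationIsResumableFullSend__DisableIncrementalStepHolds_False,
            ReplicationIsResumableFullSend__DisableIncrementalStepHolds_True,
        }
        for _, c := range cases {
            c("ctx")
        }
    }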