From 5b564a3e28d1858af0355298bd0c07cd48775840 Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Mon, 1 Jun 2020 14:39:59 +0200
Subject: [PATCH] [#288] option to disable step holds for incremental sends

This is a stop-gap solution until we re-write the pruner to support
rules for removing step holds.

Note that disabling step holds for incremental sends does not affect
zrepl's guarantee that incremental replication is always possible:

Suppose you yank the external drive during an incremental @from -> @to step:

* restarting that step or future incrementals @from -> @to_later will be
  possible because the replication cursor bookmark points to @from until
  the step is complete
* resuming @from -> @to will work as long as the pruner on your internal
  pool doesn't come around to destroy @to.
* if the pruner does destroy @to, the replication algorithm should
  determine that the resumable state on the receiving side is useless
  because @to no longer exists on the sending side, consequently clear
  it, and restart an incremental step @from -> @to_later

refs #288
---
 config/config.go                       |  7 +++-
 daemon/job/active.go                   |  7 ++--
 daemon/job/passive.go                  |  7 ++--
 daemon/job/snapjob.go                  |  2 ++
 docs/configuration/sendrecvoptions.rst | 13 +++++++
 endpoint/endpoint.go                   | 49 ++++++++++++++------------
 platformtest/tests/generated_cases.go  |  3 +-
 platformtest/tests/replication.go      | 45 ++++++++++++++---------
 8 files changed, 87 insertions(+), 46 deletions(-)

diff --git a/config/config.go b/config/config.go
index 81c3104..6ff2718 100644
--- a/config/config.go
+++ b/config/config.go
@@ -76,7 +76,12 @@ type SnapJob struct {
 }
 
 type SendOptions struct {
-    Encrypted bool `yaml:"encrypted"`
+    Encrypted bool                 `yaml:"encrypted"`
+    StepHolds SendOptionsStepHolds `yaml:"step_holds,optional"`
+}
+
+type SendOptionsStepHolds struct {
+    DisableIncremental bool `yaml:"disable_incremental,optional"`
 }
 
 var _ yaml.Defaulter = (*SendOptions)(nil)
diff --git a/daemon/job/active.go b/daemon/job/active.go
index a65a8a6..fb66f94 100644
--- a/daemon/job/active.go
+++ b/daemon/job/active.go
@@ -152,9 +152,10 @@ func modePushFromConfig(g *config.Global, in *config.PushJob, jobID endpoint.Job
 	}
 
 	m.senderConfig = &endpoint.SenderConfig{
-		FSF:     fsf,
-		Encrypt: &zfs.NilBool{B: in.Send.Encrypted},
-		JobID:   jobID,
+		FSF:                         fsf,
+		Encrypt:                     &zfs.NilBool{B: in.Send.Encrypted},
+		DisableIncrementalStepHolds: in.Send.StepHolds.DisableIncremental,
+		JobID:                       jobID,
 	}
 	m.plannerPolicy = &logic.PlannerPolicy{
 		EncryptedSend: logic.TriFromBool(in.Send.Encrypted),
diff --git a/daemon/job/passive.go b/daemon/job/passive.go
index 3d3f79c..3c1d32e 100644
--- a/daemon/job/passive.go
+++ b/daemon/job/passive.go
@@ -79,9 +79,10 @@ func modeSourceFromConfig(g *config.Global, in *config.SourceJob, jobID endpoint
 		return nil, errors.Wrap(err, "cannot build filesystem filter")
 	}
 	m.senderConfig = &endpoint.SenderConfig{
-		FSF:     fsf,
-		Encrypt: &zfs.NilBool{B: in.Send.Encrypted},
-		JobID:   jobID,
+		FSF:                         fsf,
+		Encrypt:                     &zfs.NilBool{B: in.Send.Encrypted},
+		DisableIncrementalStepHolds: in.Send.StepHolds.DisableIncremental,
+		JobID:                       jobID,
 	}
 
 	if m.snapper, err = snapper.FromConfig(g, fsf, in.Snapshotting); err != nil {
diff --git a/daemon/job/snapjob.go b/daemon/job/snapjob.go
index 3ce7ee4..24f6863 100644
--- a/daemon/job/snapjob.go
+++ b/daemon/job/snapjob.go
@@ -175,6 +175,8 @@ func (j *SnapJob) doPrune(ctx context.Context) {
 		FSF: j.fsfilter,
 		// FIXME encryption setting is irrelevant for SnapJob because the endpoint is only used as pruner.Target
 		Encrypt: &zfs.NilBool{B: true},
+		// FIXME DisableIncrementalStepHolds setting is irrelevant for SnapJob because the endpoint is only used as pruner.Target
+		DisableIncrementalStepHolds: false,
 	})
 	j.pruner = j.prunerFactory.BuildLocalPruner(ctx, sender, alwaysUpToDateReplicationCursorHistory{sender})
 	log.Info("start pruning")
diff --git a/docs/configuration/sendrecvoptions.rst b/docs/configuration/sendrecvoptions.rst
index 3e75d72..4dc786e 100644
--- a/docs/configuration/sendrecvoptions.rst
+++ b/docs/configuration/sendrecvoptions.rst
@@ -16,6 +16,8 @@ Send Options
      filesystems: ...
      send:
        encrypted: true
+       step_holds:
+         disable_incremental: false
      ...
 
 :ref:`Source` and :ref:`push` jobs have an optional ``send`` configuration section.
@@ -34,6 +36,17 @@ Filesystems matched by ``filesystems`` that are not encrypted are not sent and w
 
 If ``encryption=false``, zrepl expects that filesystems matching ``filesystems`` are not encrypted or have loaded encryption keys.
+
+``step_holds.disable_incremental`` option
+-----------------------------------------
+
+The ``step_holds.disable_incremental`` option controls whether the creation of :ref:`step holds ` should be disabled for incremental replication.
+The default value is ``false``.
+
+Disabling step holds has the disadvantage that steps :ref:`might not be resumable ` if interrupted.
+However, the advantage and :issue:`reason for existence <288>` of this flag is that it allows the pruner to delete snapshots of interrupted replication steps,
+which is useful if replication happens so rarely (or fails so frequently) that the amount of disk space exclusively referenced by the step's snapshots becomes intolerable.
+
 .. _job-recv-options:
 
 Recv Options
diff --git a/endpoint/endpoint.go b/endpoint/endpoint.go
index fa64621..084b76a 100644
--- a/endpoint/endpoint.go
+++ b/endpoint/endpoint.go
@@ -21,9 +21,10 @@ import (
 )
 
 type SenderConfig struct {
-	FSF     zfs.DatasetFilter
-	Encrypt *zfs.NilBool
-	JobID   JobID
+	FSF                         zfs.DatasetFilter
+	Encrypt                     *zfs.NilBool
+	DisableIncrementalStepHolds bool
+	JobID                       JobID
 }
 
 func (c *SenderConfig) Validate() error {
@@ -39,9 +40,10 @@
 
 // Sender implements replication.ReplicationEndpoint for a sending side
 type Sender struct {
-	FSFilter zfs.DatasetFilter
-	encrypt  *zfs.NilBool
-	jobId    JobID
+	FSFilter                    zfs.DatasetFilter
+	encrypt                     *zfs.NilBool
+	disableIncrementalStepHolds bool
+	jobId                       JobID
 }
 
 func NewSender(conf SenderConfig) *Sender {
@@ -49,9 +51,10 @@ func NewSender(conf SenderConfig) *Sender {
 		panic("invalid config" + err.Error())
 	}
 	return &Sender{
-		FSFilter: conf.FSF,
-		encrypt:  conf.Encrypt,
-		jobId:    conf.JobID,
+		FSFilter:                    conf.FSF,
+		encrypt:                     conf.Encrypt,
+		disableIncrementalStepHolds: conf.DisableIncrementalStepHolds,
+		jobId:                       conf.JobID,
 	}
 }
 
@@ -221,9 +224,11 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea
 		}
 	}
 
+	takeStepHolds := sendArgs.FromVersion == nil || !s.disableIncrementalStepHolds
+
 	var fromHold, toHold Abstraction
 	// make sure `From` doesn't go away in order to make this step resumable
-	if sendArgs.From != nil {
+	if sendArgs.From != nil && takeStepHolds {
 		fromHold, err = HoldStep(ctx, sendArgs.FS, *sendArgs.FromVersion, s.jobId) // no shadow
 		if err == zfs.ErrBookmarkCloningNotSupported {
 			getLogger(ctx).Debug("not creating step bookmark because ZFS does not support it")
@@ -232,10 +237,12 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea
 			return nil, nil, errors.Wrapf(err, "cannot hold `from` version %q before starting send", *sendArgs.FromVersion)
 		}
 	}
-	// make sure `To` doesn't go away in order to make this step resumable
-	toHold, err = HoldStep(ctx, sendArgs.FS, sendArgs.ToVersion, s.jobId)
-	if err != nil {
-		return nil, nil, errors.Wrapf(err, "cannot hold `to` version %q before starting send", sendArgs.ToVersion)
+	if takeStepHolds {
+		// make sure `To` doesn't go away in order to make this step resumable
+		toHold, err = HoldStep(ctx, sendArgs.FS, sendArgs.ToVersion, s.jobId)
+		if err != nil {
+			return nil, nil, errors.Wrapf(err, "cannot hold `to` version %q before starting send", sendArgs.ToVersion)
+		}
 	}
 
 	// cleanup the mess that _this function_ might have created in prior failed attempts:
@@ -254,11 +261,11 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea
 	//
 	// Note further that a resuming send, due to the idempotent nature of func CreateReplicationCursor and HoldStep,
 	// will never lose its step holds because we just (idempotently re-)created them above, before attempting the cleanup.
+	liveAbs := []Abstraction{fromHold, toHold, fromReplicationCursor}
 	func() {
 		ctx, endSpan := trace.WithSpan(ctx, "cleanup-stale-abstractions")
 		defer endSpan()
 
-		liveAbs := []Abstraction{fromHold, toHold, fromReplicationCursor}
 		keep := func(a Abstraction) (keep bool) {
 			keep = false
 			for _, k := range liveAbs {
@@ -283,13 +290,11 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea
 		}
 		sendAbstractionsCacheSingleton.TryBatchDestroy(ctx, s.jobId, sendArgs.FS, keep, check)
 	}()
-
-	if fromHold != nil {
-		sendAbstractionsCacheSingleton.Put(fromHold)
-	}
-	sendAbstractionsCacheSingleton.Put(toHold)
-	if fromReplicationCursor != nil {
-		sendAbstractionsCacheSingleton.Put(fromReplicationCursor)
+	// now add the newly created abstractions to the cleaned-up cache
+	for _, a := range liveAbs {
+		if a != nil {
+			sendAbstractionsCacheSingleton.Put(a)
+		}
 	}
 
 	sendStream, err := zfs.ZFSSend(ctx, sendArgs)
diff --git a/platformtest/tests/generated_cases.go b/platformtest/tests/generated_cases.go
index 311944d..5ba7687 100644
--- a/platformtest/tests/generated_cases.go
+++ b/platformtest/tests/generated_cases.go
@@ -19,7 +19,8 @@ var Cases = []Case{BatchDestroy,
 	ReplicationIncrementalCleansUpStaleAbstractionsWithCacheOnSecondReplication,
 	ReplicationIncrementalCleansUpStaleAbstractionsWithoutCacheOnSecondReplication,
 	ReplicationIncrementalIsPossibleIfCommonSnapshotIsDestroyed,
-	ReplicationIsResumableFullSend,
+	ReplicationIsResumableFullSend__DisableIncrementalStepHolds_False,
+	ReplicationIsResumableFullSend__DisableIncrementalStepHolds_True,
 	ResumableRecvAndTokenHandling,
 	ResumeTokenParsing,
 	SendArgsValidationEncryptedSendOfUnencryptedDatasetForbidden,
diff --git a/platformtest/tests/replication.go b/platformtest/tests/replication.go
index 8176b63..f0b511a 100644
--- a/platformtest/tests/replication.go
+++ b/platformtest/tests/replication.go
@@ -26,10 +26,11 @@ import (
 // of a new sender and receiver instance and one blocking invocation
 // of the replication engine without encryption
 type replicationInvocation struct {
-	sjid, rjid      endpoint.JobID
-	sfs             string
-	rfsRoot         string
-	interceptSender func(e *endpoint.Sender) logic.Sender
+	sjid, rjid                  endpoint.JobID
+	sfs                         string
+	rfsRoot                     string
+	interceptSender             func(e *endpoint.Sender) logic.Sender
+	disableIncrementalStepHolds bool
 }
 
 func (i replicationInvocation) Do(ctx *platformtest.Context) *report.Report {
@@ -42,9 +43,10 @@ func (i replicationInvocation) Do(ctx *platformtest.Context) *report.Report {
 	err := sfilter.Add(i.sfs, "ok")
 	require.NoError(ctx, err)
 	sender := i.interceptSender(endpoint.NewSender(endpoint.SenderConfig{
-		FSF:     sfilter.AsFilter(),
-		Encrypt: &zfs.NilBool{B: false},
-		JobID:   i.sjid,
+		FSF:                         sfilter.AsFilter(),
+		Encrypt:                     &zfs.NilBool{B: false},
+		DisableIncrementalStepHolds: i.disableIncrementalStepHolds,
+		JobID:                       i.sjid,
 	}))
 	receiver := endpoint.NewReceiver(endpoint.ReceiverConfig{
 		JobID: i.rjid,
@@ -86,10 +88,11 @@ func ReplicationIncrementalIsPossibleIfCommonSnapshotIsDestroyed(ctx *platformte
 	snap1 := fsversion(ctx, sfs, "@1")
 
 	rep := replicationInvocation{
-		sjid:    sjid,
-		rjid:    rjid,
-		sfs:     sfs,
-		rfsRoot: rfsRoot,
+		sjid:                        sjid,
+		rjid:                        rjid,
+		sfs:                         sfs,
+		rfsRoot:                     rfsRoot,
+		disableIncrementalStepHolds: false,
 	}
 	rfs := rep.ReceiveSideFilesystem()
@@ -149,10 +152,11 @@ func implReplicationIncrementalCleansUpStaleAbstractions(ctx *platformtest.Conte
 	rfsRoot := ctx.RootDataset + "/receiver"
 
 	rep := replicationInvocation{
-		sjid:    sjid,
-		rjid:    rjid,
-		sfs:     sfs,
-		rfsRoot: rfsRoot,
+		sjid:                        sjid,
+		rjid:                        rjid,
+		sfs:                         sfs,
+		rfsRoot:                     rfsRoot,
+		disableIncrementalStepHolds: false,
 	}
 	rfs := rep.ReceiveSideFilesystem()
@@ -322,7 +326,15 @@ func (s *PartialSender) Send(ctx context.Context, r *pdu.SendReq) (r1 *pdu.SendR
 	return r1, r2, r3
 }
 
-func ReplicationIsResumableFullSend(ctx *platformtest.Context) {
+func ReplicationIsResumableFullSend__DisableIncrementalStepHolds_False(ctx *platformtest.Context) {
+	implReplicationIsResumableFullSend(ctx, false)
+}
+
+func ReplicationIsResumableFullSend__DisableIncrementalStepHolds_True(ctx *platformtest.Context) {
+	implReplicationIsResumableFullSend(ctx, true)
+}
+
+func implReplicationIsResumableFullSend(ctx *platformtest.Context, disableIncrementalStepHolds bool) {
 	platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, `
 		CREATEROOT
@@ -353,6 +365,7 @@ func ReplicationIsResumableFullSend(ctx *platformtest.Context) {
 		interceptSender: func(e *endpoint.Sender) logic.Sender {
 			return &PartialSender{Sender: e, failAfterByteCount: 1 << 20}
 		},
+		disableIncrementalStepHolds: disableIncrementalStepHolds,
 	}
 	rfs := rep.ReceiveSideFilesystem()