diff --git a/config/config.go b/config/config.go
index 81c3104..6ff2718 100644
--- a/config/config.go
+++ b/config/config.go
@@ -76,7 +76,12 @@ type SnapJob struct {
 }
 
 type SendOptions struct {
-	Encrypted bool `yaml:"encrypted"`
+	Encrypted bool                 `yaml:"encrypted"`
+	StepHolds SendOptionsStepHolds `yaml:"step_holds,optional"`
+}
+
+type SendOptionsStepHolds struct {
+	DisableIncremental bool `yaml:"disable_incremental,optional"`
 }
 
 var _ yaml.Defaulter = (*SendOptions)(nil)
diff --git a/daemon/job/active.go b/daemon/job/active.go
index a65a8a6..fb66f94 100644
--- a/daemon/job/active.go
+++ b/daemon/job/active.go
@@ -152,9 +152,10 @@ func modePushFromConfig(g *config.Global, in *config.PushJob, jobID endpoint.Job
 	}
 
 	m.senderConfig = &endpoint.SenderConfig{
-		FSF:     fsf,
-		Encrypt: &zfs.NilBool{B: in.Send.Encrypted},
-		JobID:   jobID,
+		FSF:                         fsf,
+		Encrypt:                     &zfs.NilBool{B: in.Send.Encrypted},
+		DisableIncrementalStepHolds: in.Send.StepHolds.DisableIncremental,
+		JobID:                       jobID,
 	}
 	m.plannerPolicy = &logic.PlannerPolicy{
 		EncryptedSend: logic.TriFromBool(in.Send.Encrypted),
diff --git a/daemon/job/passive.go b/daemon/job/passive.go
index 3d3f79c..3c1d32e 100644
--- a/daemon/job/passive.go
+++ b/daemon/job/passive.go
@@ -79,9 +79,10 @@ func modeSourceFromConfig(g *config.Global, in *config.SourceJob, jobID endpoint
 		return nil, errors.Wrap(err, "cannot build filesystem filter")
 	}
 	m.senderConfig = &endpoint.SenderConfig{
-		FSF:     fsf,
-		Encrypt: &zfs.NilBool{B: in.Send.Encrypted},
-		JobID:   jobID,
+		FSF:                         fsf,
+		Encrypt:                     &zfs.NilBool{B: in.Send.Encrypted},
+		DisableIncrementalStepHolds: in.Send.StepHolds.DisableIncremental,
+		JobID:                       jobID,
 	}
 
 	if m.snapper, err = snapper.FromConfig(g, fsf, in.Snapshotting); err != nil {
diff --git a/daemon/job/snapjob.go b/daemon/job/snapjob.go
index 3ce7ee4..24f6863 100644
--- a/daemon/job/snapjob.go
+++ b/daemon/job/snapjob.go
@@ -175,6 +175,8 @@ func (j *SnapJob) doPrune(ctx context.Context) {
 		FSF: j.fsfilter,
 		// FIXME encryption setting is irrelevant for SnapJob because the endpoint is only used as pruner.Target
 		Encrypt: &zfs.NilBool{B: true},
+		// FIXME DisableIncrementalStepHolds setting is irrelevant for SnapJob because the endpoint is only used as pruner.Target
+		DisableIncrementalStepHolds: false,
 	})
 	j.pruner = j.prunerFactory.BuildLocalPruner(ctx, sender, alwaysUpToDateReplicationCursorHistory{sender})
 	log.Info("start pruning")
diff --git a/docs/configuration/sendrecvoptions.rst b/docs/configuration/sendrecvoptions.rst
index 3e75d72..25e2880 100644
--- a/docs/configuration/sendrecvoptions.rst
+++ b/docs/configuration/sendrecvoptions.rst
@@ -16,6 +16,8 @@ Send Options
    filesystems: ...
    send:
      encrypted: true
+     step_holds:
+       disable_incremental: false
    ...
 
 :ref:`Source` and :ref:`push` jobs have an optional ``send`` configuration section.
@@ -34,6 +36,23 @@
 Filesystems matched by ``filesystems`` that are not encrypted are not sent and will cause error log messages.
 
 If ``encryption=false``, zrepl expects that filesystems matching ``filesystems`` are not encrypted or have loaded encryption keys.
+
+``step_holds.disable_incremental`` option
+-----------------------------------------
+
+The ``step_holds.disable_incremental`` option controls whether the creation of :ref:`step holds ` is disabled for incremental replication.
+The default value is ``false``.
+
+Disabling step holds has the disadvantage that steps :ref:`might not be resumable ` if interrupted.
+Non-resumability means that replication progress is no longer monotonic, which might result in a replication setup that never makes progress if mid-step interruptions are too frequent (e.g. due to recurring network outages).
+
+However, the advantage and :issue:`reason for existence <288>` of this flag is that it allows the pruner to delete snapshots of interrupted replication steps,
+which is useful if replication happens so rarely (or fails so frequently) that the amount of disk space exclusively referenced by the step's snapshots becomes intolerable.
+
+.. NOTE::
+
+   When setting this flag to ``true``, existing step holds for the job will be destroyed on the next replication attempt.
+
 .. _job-recv-options:
 
 Recv Options
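For context, the new knob slots into a job definition as follows. This YAML is a hypothetical sketch for illustration: the job name, connection details, filesystem paths, and the snapshotting/pruning sections are invented, and only the ``send`` block reflects the docs change above:

    jobs:
      - name: push-to-backups   # hypothetical job name
        type: push
        connect:
          type: tcp
          address: "backup-host.example.com:8888"
        filesystems:
          "zroot/app<": true
        snapshotting:
          type: periodic
          prefix: zrepl_
          interval: 10m
        send:
          encrypted: true
          step_holds:
            disable_incremental: true   # default is false
        pruning:
          keep_sender:
            - type: last_n
              count: 10
          keep_receiver:
            - type: last_n
              count: 10

With ``disable_incremental: true``, the pruner on this job may destroy snapshots of interrupted incremental steps instead of waiting for the step to complete.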
diff --git a/endpoint/endpoint.go b/endpoint/endpoint.go
index ee72a08..96ad595 100644
--- a/endpoint/endpoint.go
+++ b/endpoint/endpoint.go
@@ -21,9 +21,10 @@ import (
 )
 
 type SenderConfig struct {
-	FSF     zfs.DatasetFilter
-	Encrypt *zfs.NilBool
-	JobID   JobID
+	FSF                         zfs.DatasetFilter
+	Encrypt                     *zfs.NilBool
+	DisableIncrementalStepHolds bool
+	JobID                       JobID
 }
 
 func (c *SenderConfig) Validate() error {
@@ -39,9 +40,10 @@ func (c *SenderConfig) Validate() error {
 
 // Sender implements replication.ReplicationEndpoint for a sending side
 type Sender struct {
-	FSFilter zfs.DatasetFilter
-	encrypt  *zfs.NilBool
-	jobId    JobID
+	FSFilter                    zfs.DatasetFilter
+	encrypt                     *zfs.NilBool
+	disableIncrementalStepHolds bool
+	jobId                       JobID
 }
 
 func NewSender(conf SenderConfig) *Sender {
@@ -49,9 +51,10 @@ func NewSender(conf SenderConfig) *Sender {
 		panic("invalid config" + err.Error())
 	}
 	return &Sender{
-		FSFilter: conf.FSF,
-		encrypt:  conf.Encrypt,
-		jobId:    conf.JobID,
+		FSFilter:                    conf.FSF,
+		encrypt:                     conf.Encrypt,
+		disableIncrementalStepHolds: conf.DisableIncrementalStepHolds,
+		jobId:                       conf.JobID,
 	}
 }
 
@@ -221,9 +224,11 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea
 		}
 	}
 
+	takeStepHolds := sendArgs.FromVersion == nil || !s.disableIncrementalStepHolds
+
 	var fromHold, toHold Abstraction
 	// make sure `From` doesn't go away in order to make this step resumable
-	if sendArgs.From != nil {
+	if sendArgs.From != nil && takeStepHolds {
 		fromHold, err = HoldStep(ctx, sendArgs.FS, *sendArgs.FromVersion, s.jobId) // no shadow
 		if err == zfs.ErrBookmarkCloningNotSupported {
 			getLogger(ctx).Debug("not creating step bookmark because ZFS does not support it")
@@ -232,10 +237,12 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea
 			return nil, nil, errors.Wrapf(err, "cannot hold `from` version %q before starting send", *sendArgs.FromVersion)
 		}
 	}
-	// make sure `To` doesn't go away in order to make this step resumable
-	toHold, err = HoldStep(ctx, sendArgs.FS, sendArgs.ToVersion, s.jobId)
-	if err != nil {
-		return nil, nil, errors.Wrapf(err, "cannot hold `to` version %q before starting send", sendArgs.ToVersion)
+	if takeStepHolds {
+		// make sure `To` doesn't go away in order to make this step resumable
+		toHold, err = HoldStep(ctx, sendArgs.FS, sendArgs.ToVersion, s.jobId)
+		if err != nil {
+			return nil, nil, errors.Wrapf(err, "cannot hold `to` version %q before starting send", sendArgs.ToVersion)
+		}
 	}
 
 	// cleanup the mess that _this function_ might have created in prior failed attempts:
@@ -254,11 +261,11 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea
 	//
 	// Note further that a resuming send, due to the idempotent nature of func CreateReplicationCursor and HoldStep,
 	// will never lose its step holds because we just (idempotently re-)created them above, before attempting the cleanup.
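The behavioral core of the endpoint.go change is the ``takeStepHolds`` predicate: a full send (``sendArgs.FromVersion == nil``) always takes step holds so that it stays resumable, while an incremental send takes them only if the new option is left at its default. The following standalone sketch (simplified names, not the actual zrepl types) enumerates the four cases:

    package main

    import "fmt"

    // takeStepHolds mirrors the predicate introduced in Sender.Send above:
    // full sends (fromVersion == nil) always take step holds so that they
    // remain resumable; incremental sends take them only while the new
    // config option is left at its default (false).
    func takeStepHolds(isFullSend, disableIncrementalStepHolds bool) bool {
    	return isFullSend || !disableIncrementalStepHolds
    }

    func main() {
    	for _, full := range []bool{true, false} {
    		for _, disabled := range []bool{true, false} {
    			fmt.Printf("fullSend=%-5v disableIncremental=%-5v -> takeStepHolds=%v\n",
    				full, disabled, takeStepHolds(full, disabled))
    		}
    	}
    }

Note that only the incremental case is ever suppressed: an interrupted full send would otherwise have to restart from byte zero, which is exactly the situation step holds exist to prevent.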
+	liveAbs := []Abstraction{fromHold, toHold, fromReplicationCursor}
 	func() {
 		ctx, endSpan := trace.WithSpan(ctx, "cleanup-stale-abstractions")
 		defer endSpan()
 
-		liveAbs := []Abstraction{fromHold, toHold, fromReplicationCursor}
 		keep := func(a Abstraction) (keep bool) {
 			keep = false
 			for _, k := range liveAbs {
@@ -275,7 +282,10 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea
 		}
 		for _, staleVersion := range obsoleteAbs {
 			for _, mustLiveVersion := range mustLiveVersions {
-				if zfs.FilesystemVersionEqualIdentity(mustLiveVersion, staleVersion.GetFilesystemVersion()) {
+				isSendArg := zfs.FilesystemVersionEqualIdentity(mustLiveVersion, staleVersion.GetFilesystemVersion())
+				isStepHoldWeMightHaveCreatedWithCurrentValueOf_takeStepHolds :=
+					takeStepHolds && staleVersion.GetType() == AbstractionStepHold
+				if isSendArg && isStepHoldWeMightHaveCreatedWithCurrentValueOf_takeStepHolds {
 					panic(fmt.Sprintf("impl error: %q would be destroyed because it is considered stale but it is part of of sendArgs=%s", mustLiveVersion.String(), pretty.Sprint(sendArgs)))
 				}
 			}
@@ -283,13 +293,11 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea
 		}
 		sendAbstractionsCacheSingleton.TryBatchDestroy(ctx, s.jobId, sendArgs.FS, keep, check)
 	}()
-
-	if fromHold != nil {
-		sendAbstractionsCacheSingleton.Put(fromHold)
-	}
-	sendAbstractionsCacheSingleton.Put(toHold)
-	if fromReplicationCursor != nil {
-		sendAbstractionsCacheSingleton.Put(fromReplicationCursor)
+	// now add the newly created abstractions to the cleaned-up cache
+	for _, a := range liveAbs {
+		if a != nil {
+			sendAbstractionsCacheSingleton.Put(a)
+		}
 	}
 
 	sendStream, err := zfs.ZFSSend(ctx, sendArgs)
diff --git a/platformtest/tests/generated_cases.go b/platformtest/tests/generated_cases.go
index 311944d..3a4396d 100644
--- a/platformtest/tests/generated_cases.go
+++ b/platformtest/tests/generated_cases.go
@@ -18,8 +18,10 @@ var Cases = []Case{BatchDestroy,
 	ReceiveForceRollbackWorksUnencrypted,
 	ReplicationIncrementalCleansUpStaleAbstractionsWithCacheOnSecondReplication,
 	ReplicationIncrementalCleansUpStaleAbstractionsWithoutCacheOnSecondReplication,
+	ReplicationIncrementalDestroysStepHoldsIffIncrementalStepHoldsAreDisabledButStepHoldsExist,
 	ReplicationIncrementalIsPossibleIfCommonSnapshotIsDestroyed,
-	ReplicationIsResumableFullSend,
+	ReplicationIsResumableFullSend__DisableIncrementalStepHolds_False,
+	ReplicationIsResumableFullSend__DisableIncrementalStepHolds_True,
 	ResumableRecvAndTokenHandling,
 	ResumeTokenParsing,
 	SendArgsValidationEncryptedSendOfUnencryptedDatasetForbidden,
diff --git a/platformtest/tests/replication.go b/platformtest/tests/replication.go
index 8176b63..23cb4a4 100644
--- a/platformtest/tests/replication.go
+++ b/platformtest/tests/replication.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"io"
 	"path"
+	"sort"
 
 	"github.com/kr/pretty"
 	"github.com/stretchr/testify/require"
@@ -26,10 +27,11 @@ import (
 // of a new sender and receiver instance and one blocking invocation
 // of the replication engine without encryption
 type replicationInvocation struct {
-	sjid, rjid      endpoint.JobID
-	sfs             string
-	rfsRoot         string
-	interceptSender func(e *endpoint.Sender) logic.Sender
+	sjid, rjid                  endpoint.JobID
+	sfs                         string
+	rfsRoot                     string
+	interceptSender             func(e *endpoint.Sender) logic.Sender
+	disableIncrementalStepHolds bool
 }
 
 func (i replicationInvocation) Do(ctx *platformtest.Context) *report.Report {
@@ -42,9 +44,10 @@ func (i replicationInvocation) Do(ctx *platformtest.Context) *report.Report {
 	err := sfilter.Add(i.sfs, "ok")
 	require.NoError(ctx, err)
 	sender := i.interceptSender(endpoint.NewSender(endpoint.SenderConfig{
-		FSF:     sfilter.AsFilter(),
-		Encrypt: &zfs.NilBool{B: false},
-		JobID:   i.sjid,
+		FSF:                         sfilter.AsFilter(),
+		Encrypt:                     &zfs.NilBool{B: false},
+		DisableIncrementalStepHolds: i.disableIncrementalStepHolds,
+		JobID:                       i.sjid,
 	}))
 	receiver := endpoint.NewReceiver(endpoint.ReceiverConfig{
 		JobID: i.rjid,
@@ -86,10 +89,11 @@ func ReplicationIncrementalIsPossibleIfCommonSnapshotIsDestroyed(ctx *platformte
 	snap1 := fsversion(ctx, sfs, "@1")
 
 	rep := replicationInvocation{
-		sjid:    sjid,
-		rjid:    rjid,
-		sfs:     sfs,
-		rfsRoot: rfsRoot,
+		sjid:                        sjid,
+		rjid:                        rjid,
+		sfs:                         sfs,
+		rfsRoot:                     rfsRoot,
+		disableIncrementalStepHolds: false,
 	}
 	rfs := rep.ReceiveSideFilesystem()
@@ -149,10 +153,11 @@ func implReplicationIncrementalCleansUpStaleAbstractions(ctx *platformtest.Conte
 	rfsRoot := ctx.RootDataset + "/receiver"
 
 	rep := replicationInvocation{
-		sjid:    sjid,
-		rjid:    rjid,
-		sfs:     sfs,
-		rfsRoot: rfsRoot,
+		sjid:                        sjid,
+		rjid:                        rjid,
+		sfs:                         sfs,
+		rfsRoot:                     rfsRoot,
+		disableIncrementalStepHolds: false,
 	}
 	rfs := rep.ReceiveSideFilesystem()
@@ -178,7 +183,7 @@ func implReplicationIncrementalCleansUpStaleAbstractions(ctx *platformtest.Conte
 	require.NoError(ctx, err)
 	require.Contains(ctx, holds, expectRjidHoldTag)
 
-	// create artifical stale replication cursors
+	// create artificial stale replication cursors & step holds
 	createArtificalStaleAbstractions := func(jobId endpoint.JobID) []endpoint.Abstraction {
 		snap2Cursor, err := endpoint.CreateReplicationCursor(ctx, sfs, snap2, jobId) // no shadow
 		require.NoError(ctx, err)
@@ -322,7 +327,15 @@ func (s *PartialSender) Send(ctx context.Context, r *pdu.SendReq) (r1 *pdu.SendR
 	return r1, r2, r3
 }
 
-func ReplicationIsResumableFullSend(ctx *platformtest.Context) {
+func ReplicationIsResumableFullSend__DisableIncrementalStepHolds_False(ctx *platformtest.Context) {
+	implReplicationIsResumableFullSend(ctx, false)
+}
+
+func ReplicationIsResumableFullSend__DisableIncrementalStepHolds_True(ctx *platformtest.Context) {
+	implReplicationIsResumableFullSend(ctx, true)
+}
+
+func implReplicationIsResumableFullSend(ctx *platformtest.Context, disableIncrementalStepHolds bool) {
 	platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, `
 		CREATEROOT
 		+  "sender"
@@ -353,6 +366,7 @@ func ReplicationIsResumableFullSend(ctx *platformtest.Context) {
 		interceptSender: func(e *endpoint.Sender) logic.Sender {
 			return &PartialSender{Sender: e, failAfterByteCount: 1 << 20}
 		},
+		disableIncrementalStepHolds: disableIncrementalStepHolds,
 	}
 	rfs := rep.ReceiveSideFilesystem()
@@ -394,3 +408,146 @@
 
 	_ = fsversion(ctx, rfs, "@3")
 }
+
+func ReplicationIncrementalDestroysStepHoldsIffIncrementalStepHoldsAreDisabledButStepHoldsExist(ctx *platformtest.Context) {
+
+	platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, `
+		CREATEROOT
+		+  "sender"
+		+  "receiver"
+		R  zfs create -p "${ROOTDS}/receiver/${ROOTDS}"
+	`)
+
+	sjid := endpoint.MustMakeJobID("sender-job")
+	rjid := endpoint.MustMakeJobID("receiver-job")
+
+	sfs := ctx.RootDataset + "/sender"
+	rfsRoot := ctx.RootDataset + "/receiver"
+
+	// fully replicate snapshot @1
+	{
+		mustSnapshot(ctx, sfs+"@1")
+		rep := replicationInvocation{
+			sjid:                        sjid,
+			rjid:                        rjid,
+			sfs:                         sfs,
+			rfsRoot:                     rfsRoot,
+			disableIncrementalStepHolds: false,
+		}
+		rfs := rep.ReceiveSideFilesystem()
+		report := rep.Do(ctx)
+		ctx.Logf("\n%s", pretty.Sprint(report))
+		// assert this worked (not the main subject of the test)
+		_ = fsversion(ctx, rfs, "@1")
+	}
+
+	// create a large snapshot @2
+	{
+		sfsmp, err := zfs.ZFSGetMountpoint(ctx, sfs)
+		require.NoError(ctx, err)
+		require.True(ctx, sfsmp.Mounted)
+		writeDummyData(path.Join(sfsmp.Mountpoint, "dummy.data"), 1<<22)
+		mustSnapshot(ctx, sfs+"@2")
+	}
+	snap2sfs := fsversion(ctx, sfs, "@2")
+
+	// partially replicate snapshot @2 with step holds enabled
+	// to set up a situation where step holds exist
+	{
+		rep := replicationInvocation{
+			sjid:                        sjid,
+			rjid:                        rjid,
+			sfs:                         sfs,
+			rfsRoot:                     rfsRoot,
+			disableIncrementalStepHolds: false, // !
+			interceptSender: func(e *endpoint.Sender) logic.Sender {
+				return &PartialSender{Sender: e, failAfterByteCount: 1 << 20}
+			},
+		}
+		rfs := rep.ReceiveSideFilesystem()
+		report := rep.Do(ctx)
+		ctx.Logf("\n%s", pretty.Sprint(report))
+		// assert the receive was only partial
+		_, err := zfs.ZFSGetFilesystemVersion(ctx, rfs+"@2")
+		ctx.Logf("%T %s", err, err)
+		_, notFullyReceived := err.(*zfs.DatasetDoesNotExist)
+		require.True(ctx, notFullyReceived)
+		// assert step holds are in place
+		abs, absErrs, err := endpoint.ListAbstractions(ctx, endpoint.ListZFSHoldsAndBookmarksQuery{
+			FS: endpoint.ListZFSHoldsAndBookmarksQueryFilesystemFilter{
+				FS: &sfs,
+			},
+			Concurrency: 1,
+			JobID:       &sjid,
+			What:        endpoint.AbstractionTypeSet{endpoint.AbstractionStepHold: true},
+		})
+		require.NoError(ctx, err)
+		require.Empty(ctx, absErrs)
+		require.Len(ctx, abs, 2)
+		sort.Slice(abs, func(i, j int) bool {
+			return abs[i].GetCreateTXG() < abs[j].GetCreateTXG()
+		})
+		require.True(ctx, zfs.FilesystemVersionEqualIdentity(abs[0].GetFilesystemVersion(), fsversion(ctx, sfs, "@1")))
+		require.True(ctx, zfs.FilesystemVersionEqualIdentity(abs[1].GetFilesystemVersion(), fsversion(ctx, sfs, "@2")))
+	}
+
+	//
+	// end of test setup
+	//
+
+	// retry replication with incremental step holds disabled
+	// - replication should not fail due to holds-related problems
+	// - intermediate attempts should fail because the partial sender stops mid-stream
+	// - the partial sender sends roughly 1/4th of the stream per attempt, thus expect
+	//   successful replication within 5 more attempts
+	rep := replicationInvocation{
+		sjid:                        sjid,
+		rjid:                        rjid,
+		sfs:                         sfs,
+		rfsRoot:                     rfsRoot,
+		disableIncrementalStepHolds: true, // !
+		interceptSender: func(e *endpoint.Sender) logic.Sender {
+			return &PartialSender{Sender: e, failAfterByteCount: 1 << 20}
+		},
+	}
+	rfs := rep.ReceiveSideFilesystem()
+	for i := 0; ; i++ {
+		require.True(ctx, i < 5)
+		report := rep.Do(ctx)
+		ctx.Logf("retry run=%v\n%s", i, pretty.Sprint(report))
+		_, err := zfs.ZFSGetFilesystemVersion(ctx, rfs+"@2")
+		if err == nil {
+			break
+		}
+	}
+
+	// assert replication worked
+	fsversion(ctx, rfs, "@2")
+
+	// assert no step holds exist
+	abs, absErrs, err := endpoint.ListAbstractions(ctx, endpoint.ListZFSHoldsAndBookmarksQuery{
+		FS: endpoint.ListZFSHoldsAndBookmarksQueryFilesystemFilter{
+			FS: &sfs,
+		},
+		Concurrency: 1,
+		JobID:       &sjid,
+		What:        endpoint.AbstractionTypeSet{endpoint.AbstractionStepHold: true},
+	})
+	require.NoError(ctx, err)
+	require.Empty(ctx, absErrs)
+	require.Len(ctx, abs, 0)
+
+	// assert that the replication cursor bookmark exists
+	abs, absErrs, err = endpoint.ListAbstractions(ctx, endpoint.ListZFSHoldsAndBookmarksQuery{
+		FS: endpoint.ListZFSHoldsAndBookmarksQueryFilesystemFilter{
+			FS: &sfs,
+		},
+		Concurrency: 1,
+		JobID:       &sjid,
+		What:        endpoint.AbstractionTypeSet{endpoint.AbstractionReplicationCursorBookmarkV2: true},
+	})
+	require.NoError(ctx, err)
+	require.Empty(ctx, absErrs)
+	require.Len(ctx, abs, 1)
+	require.True(ctx, zfs.FilesystemVersionEqualIdentity(abs[0].GetFilesystemVersion(), snap2sfs))
+}
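As a sanity check on the retry bound asserted by ``require.True(ctx, i < 5)`` above: the test writes ``1<<22`` bytes of dummy data and ``PartialSender`` aborts each attempt after ``1<<20`` bytes, so a resumable stream advances by roughly a quarter per attempt. A small sketch of that arithmetic, ignoring send-stream framing overhead (an assumption, not something the test states):

    package main

    import "fmt"

    func main() {
    	const streamBytes = 1 << 22     // size of the dummy data written by the test
    	const perAttemptBytes = 1 << 20 // PartialSender.failAfterByteCount

    	// With resume support, every failed attempt still advances the stream
    	// by perAttemptBytes, so the attempt count is the ceiling of the ratio.
    	attempts := (streamBytes + perAttemptBytes - 1) / perAttemptBytes
    	fmt.Printf("expect a full receive after about %d attempts (the test allows 5)\n", attempts)
    }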