From 2642c6430378299c91d585f517f4a072e5ac4786 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Sun, 1 May 2022 14:46:38 +0200 Subject: [PATCH] make initial replication policy configurable (most_recent, all, fail) Config: ``` - type: push ... conflict_resolution: initial_replication: most_recent | all | fail ``` The ``initial_replication`` option determines which snapshots zrepl replicates if the filesystem has not been replicated before. If ``most_recent`` (the default), the initial replication will only transfer the most recent snapshot, while ignoring previous snapshots. If all snapshots should be replicated, specify ``all``. Use ``fail`` to make replication of the filesystem fail in case there is no corresponding filesystem on the receiver. Code-Level Changes, apart from the obvious: - Rework IncrementalPath()'s return signature. Now returns an error for initial replications as well. - Rename & rework its consumer, resolveConflict(). Co-authored-by: Graham Christensen Fixes https://github.com/zrepl/zrepl/issues/550 Fixes https://github.com/zrepl/zrepl/issues/187 Closes https://github.com/zrepl/zrepl/pull/592 --- config/config.go | 17 ++-- daemon/job/active.go | 12 +++ docs/configuration.rst | 1 + docs/configuration/conflict_resolution.rst | 36 ++++++++ endpoint/endpoint_zfs_abstraction.go | 13 ++- platformtest/tests/generated_cases.go | 3 + platformtest/tests/replication.go | 83 +++++++++++++++++ replication/logic/diff/diff.go | 34 +++++-- replication/logic/diff/diff_test.go | 35 ++++++-- ...initialreplicationautoresolution_enumer.go | 62 +++++++++++++ replication/logic/replication_logic.go | 90 ++++++++++++------- replication/logic/replication_logic_policy.go | 69 +++++++++++++- 12 files changed, 394 insertions(+), 61 deletions(-) create mode 100644 docs/configuration/conflict_resolution.rst create mode 100644 replication/logic/initialreplicationautoresolution_enumer.go diff --git a/config/config.go b/config/config.go index 196ed52..17263be 100644 --- 
a/config/config.go +++ b/config/config.go @@ -55,12 +55,17 @@ func (j JobEnum) Name() string { } type ActiveJob struct { - Type string `yaml:"type"` - Name string `yaml:"name"` - Connect ConnectEnum `yaml:"connect"` - Pruning PruningSenderReceiver `yaml:"pruning"` - Debug JobDebugSettings `yaml:"debug,optional"` - Replication *Replication `yaml:"replication,optional,fromdefaults"` + Type string `yaml:"type"` + Name string `yaml:"name"` + Connect ConnectEnum `yaml:"connect"` + Pruning PruningSenderReceiver `yaml:"pruning"` + Debug JobDebugSettings `yaml:"debug,optional"` + Replication *Replication `yaml:"replication,optional,fromdefaults"` + ConflictResolution *ConflictResolution `yaml:"conflict_resolution,optional,fromdefaults"` +} + +type ConflictResolution struct { + InitialReplication string `yaml:"initial_replication,optional,default=most_recent"` } type PassiveJob struct { diff --git a/daemon/job/active.go b/daemon/job/active.go index ebececf..b540789 100644 --- a/daemon/job/active.go +++ b/daemon/job/active.go @@ -162,8 +162,14 @@ func modePushFromConfig(g *config.Global, in *config.PushJob, jobID endpoint.Job return nil, errors.Wrap(err, "field `replication`") } + conflictResolution, err := logic.ConflictResolutionFromConfig(in.ConflictResolution) + if err != nil { + return nil, errors.Wrap(err, "field `conflict_resolution`") + } + m.plannerPolicy = &logic.PlannerPolicy{ EncryptedSend: logic.TriFromBool(in.Send.Encrypted), + ConflictResolution: conflictResolution, ReplicationConfig: replicationConfig, SizeEstimationConcurrency: in.Replication.Concurrency.SizeEstimates, } @@ -261,8 +267,14 @@ func modePullFromConfig(g *config.Global, in *config.PullJob, jobID endpoint.Job return nil, errors.Wrap(err, "field `replication`") } + conflictResolution, err := logic.ConflictResolutionFromConfig(in.ConflictResolution) + if err != nil { + return nil, errors.Wrap(err, "field `conflict_resolution`") + } + m.plannerPolicy = &logic.PlannerPolicy{ EncryptedSend: 
logic.DontCare, + ConflictResolution: conflictResolution, ReplicationConfig: replicationConfig, SizeEstimationConcurrency: in.Replication.Concurrency.SizeEstimates, } diff --git a/docs/configuration.rst b/docs/configuration.rst index 51fd5c4..f25878f 100644 --- a/docs/configuration.rst +++ b/docs/configuration.rst @@ -13,6 +13,7 @@ Configuration configuration/filter_syntax configuration/sendrecvoptions configuration/replication + configuration/conflict_resolution configuration/snapshotting configuration/prune configuration/logging diff --git a/docs/configuration/conflict_resolution.rst b/docs/configuration/conflict_resolution.rst new file mode 100644 index 0000000..526e37d --- /dev/null +++ b/docs/configuration/conflict_resolution.rst @@ -0,0 +1,36 @@ +.. include:: ../global.rst.inc + + +Conflict Resolution Options +=========================== + + +:: + + jobs: + - type: push + filesystems: ... + conflict_resolution: + initial_replication: most_recent | all | fail # default: most_recent + + ... + +.. _conflict_resolution-initial_replication-option-send_all_snapshots: + + +``initial_replication`` option +------------------------------ + +The ``initial_replication`` option determines how many snapshots zrepl replicates if the filesystem has not been replicated before. +If ``most_recent`` (the default), the initial replication will only transfer the most recent snapshot, while ignoring previous snapshots. +If all snapshots should be replicated, specify ``all``. +Use ``fail`` to make replication of the filesystem fail in case there is no corresponding filesystem on the receiver. + +For example, suppose there are snapshots ``tank@1``, ``tank@2``, ``tank@3`` on a sender. +Then ``most_recent`` will replicate just ``@3``, but ``all`` will replicate ``@1``, ``@2``, and ``@3``. + +If initial replication is interrupted, and there is at least one (maybe partial) snapshot on the receiver, zrepl will always resume in **incremental mode**. 
+And that is regardless of where the initial replication was interrupted. + +For example, if ``initial_replication: all`` and the transfer of ``@1`` is interrupted, zrepl would retry/resume at ``@1``. +And even if the user changes the config to ``initial_replication: most_recent`` before resuming, **incremental mode** will still resume at ``@1``. diff --git a/endpoint/endpoint_zfs_abstraction.go b/endpoint/endpoint_zfs_abstraction.go index e7928ac..9bfa3be 100644 --- a/endpoint/endpoint_zfs_abstraction.go +++ b/endpoint/endpoint_zfs_abstraction.go @@ -813,7 +813,7 @@ func listStaleFiltering(abs []Abstraction, sinceBound *CreateTXGRangeBound) *Sta l := by[k] if k.Type == AbstractionStepHold { - // all older than the most recent cursor are stale, others are always live + // all step holds older than the most recent cursor are stale, others are always live // if we don't have a replication cursor yet, use untilBound = nil // to consider all steps stale (...at first) @@ -821,11 +821,10 @@ func listStaleFiltering(abs []Abstraction, sinceBound *CreateTXGRangeBound) *Sta { sfnsc := stepFirstNotStaleCandidates[k.fsAndJobId] - // if there's a replication cursor, use it as a cutoff between live and stale - // if there's none, we are in initial replication and only need to keep - // the most recent step hold live, since that's what our initial replication strategy - // uses (both initially and on resume) - // (FIXME hardcoded replication strategy) + // If there's a replication cursor, use it as a cutoff between live and stale + // for both cursors and holds. + // If there's no cursor, we are in initial replication and only need to keep + // the most recent step hold live, which hold the .To of the initial send step. 
if sfnsc.cursor != nil { untilBound = &CreateTXGRangeBound{ CreateTXG: (*sfnsc.cursor).GetCreateTXG(), @@ -858,7 +857,7 @@ func listStaleFiltering(abs []Abstraction, sinceBound *CreateTXGRangeBound) *Sta } } else if k.Type == AbstractionReplicationCursorBookmarkV2 || k.Type == AbstractionLastReceivedHold { - // all but the most recent are stale by definition (we always _move_ them) + // all cursors but the most recent cursor are stale by definition (we always _move_ them) // NOTE: must not use firstNotStale in this branch, not computed for these types // sort descending (highest createtxg first), then cut off diff --git a/platformtest/tests/generated_cases.go b/platformtest/tests/generated_cases.go index c5aeebe..83f9eda 100644 --- a/platformtest/tests/generated_cases.go +++ b/platformtest/tests/generated_cases.go @@ -21,6 +21,9 @@ var Cases = []Case{BatchDestroy, ReplicationIncrementalCleansUpStaleAbstractionsWithoutCacheOnSecondReplication, ReplicationIncrementalDestroysStepHoldsIffIncrementalStepHoldsAreDisabledButStepHoldsExist, ReplicationIncrementalIsPossibleIfCommonSnapshotIsDestroyed, + ReplicationInitialAll, + ReplicationInitialFail, + ReplicationInitialMostRecent, ReplicationIsResumableFullSend__both_GuaranteeResumability, ReplicationIsResumableFullSend__initial_GuaranteeIncrementalReplication_incremental_GuaranteeIncrementalReplication, ReplicationIsResumableFullSend__initial_GuaranteeResumability_incremental_GuaranteeIncrementalReplication, diff --git a/platformtest/tests/replication.go b/platformtest/tests/replication.go index 12addc2..0173b1e 100644 --- a/platformtest/tests/replication.go +++ b/platformtest/tests/replication.go @@ -44,6 +44,7 @@ type replicationInvocation struct { guarantee *pdu.ReplicationConfigProtection senderConfigHook func(*endpoint.SenderConfig) receiverConfigHook func(*endpoint.ReceiverConfig) + plannerPolicyHook func(*logic.PlannerPolicy) } func (i replicationInvocation) Do(ctx *platformtest.Context) *report.Report { @@ -95,8 
+96,14 @@ func (i replicationInvocation) Do(ctx *platformtest.Context) *report.Report { ReplicationConfig: &pdu.ReplicationConfig{ Protection: i.guarantee, }, + ConflictResolution: &logic.ConflictResolution{ + InitialReplication: logic.InitialReplicationAutoResolutionMostRecent, + }, SizeEstimationConcurrency: 1, } + if i.plannerPolicyHook != nil { + i.plannerPolicyHook(&plannerPolicy) + } report, wait := replication.Do( ctx, @@ -1306,3 +1313,79 @@ func ReplicationPlaceholderEncryption__EncryptOnReceiverUseCase__WorksIfConfigur require.NoError(ctx, err) require.Equal(ctx, rfsRoot, props.Get("encryptionroot")) } + +func replicationInitialImpl(ctx *platformtest.Context, iras logic.InitialReplicationAutoResolution, expectExactRfsSnaps []string) *report.Report { + // reverse order for snap names to expose sorting assumptions + platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, ` + CREATEROOT + + "sender" + + "sender@3" + + "sender@2" + + "sender@1" + + "receiver" + R zfs create -p "${ROOTDS}/receiver/${ROOTDS}" + `) + + sjid := endpoint.MustMakeJobID("sender-job") + rjid := endpoint.MustMakeJobID("receiver-job") + + sfs := ctx.RootDataset + "/sender" + rfsRoot := ctx.RootDataset + "/receiver" + + rep := replicationInvocation{ + sjid: sjid, + rjid: rjid, + sfs: sfs, + rfsRoot: rfsRoot, + guarantee: pdu.ReplicationConfigProtectionWithKind(pdu.ReplicationGuaranteeKind_GuaranteeResumability), + plannerPolicyHook: func(pp *logic.PlannerPolicy) { + pp.ConflictResolution.InitialReplication = iras + }, + } + rfs := rep.ReceiveSideFilesystem() + + // first replication + report := rep.Do(ctx) + ctx.Logf("\n%s", pretty.Sprint(report)) + + versions, err := zfs.ZFSListFilesystemVersions(ctx, mustDatasetPath(rfs), zfs.ListFilesystemVersionsOptions{Types: zfs.Snapshots}) + if _, ok := err.(*zfs.DatasetDoesNotExist); ok { + versions = nil + } else { + require.NoError(ctx, err) + } + + bySnapName := make(map[string]int) + for _, v := range versions { + 
bySnapName[v.GetName()] += 1 + } + for _, v := range expectExactRfsSnaps { + bySnapName[v] -= 1 + } + + for _, v := range bySnapName { + if v != 0 { + ctx.Logf("unexpected snaps:\n%#v", bySnapName) + ctx.FailNow() + } + } + + return report +} + +func ReplicationInitialAll(ctx *platformtest.Context) { + replicationInitialImpl(ctx, logic.InitialReplicationAutoResolutionAll, []string{"3", "2", "1"}) +} + +func ReplicationInitialMostRecent(ctx *platformtest.Context) { + replicationInitialImpl(ctx, logic.InitialReplicationAutoResolutionMostRecent, []string{"1"}) +} + +func ReplicationInitialFail(ctx *platformtest.Context) { + report := replicationInitialImpl(ctx, logic.InitialReplicationAutoResolutionFail, []string{}) + require.Len(ctx, report.Attempts, 1) + require.Nil(ctx, report.Attempts[0].PlanError) + require.Len(ctx, report.Attempts[0].Filesystems, 1) + require.NotNil(ctx, report.Attempts[0].Filesystems[0].PlanError) + require.Contains(ctx, report.Attempts[0].Filesystems[0].PlanError.Err, "automatic conflict resolution for initial replication is disabled in config") +} diff --git a/replication/logic/diff/diff.go b/replication/logic/diff/diff.go index b3c4c22..477a334 100644 --- a/replication/logic/diff/diff.go +++ b/replication/logic/diff/diff.go @@ -49,6 +49,23 @@ func (c *ConflictDiverged) Error() string { return buf.String() } +type ConflictNoSenderSnapshots struct{} + +func (c *ConflictNoSenderSnapshots) Error() string { + return "no snapshots available on sender side" +} + +type ConflictMostRecentSnapshotAlreadyPresent struct { + SortedSenderVersions, SortedReceiverVersions []*FilesystemVersion + CommonAncestor *FilesystemVersion +} + +func (c *ConflictMostRecentSnapshotAlreadyPresent) Error() string { + var buf strings.Builder + fmt.Fprintf(&buf, "the most recent sender snapshot is already present on the receiver (guid=%v, name=%q)", c.CommonAncestor.GetGuid(), c.CommonAncestor.RelName()) + return buf.String() +} + func 
SortVersionListByCreateTXGThenBookmarkLTSnapshot(fsvslice []*FilesystemVersion) []*FilesystemVersion { lesser := func(s []*FilesystemVersion) func(i, j int) bool { return func(i, j int) bool { @@ -71,7 +88,6 @@ func SortVersionListByCreateTXGThenBookmarkLTSnapshot(fsvslice []*FilesystemVers return sorted } -// conflict may be a *ConflictDiverged or a *ConflictNoCommonAncestor func IncrementalPath(receiver, sender []*FilesystemVersion) (incPath []*FilesystemVersion, conflict error) { receiver = SortVersionListByCreateTXGThenBookmarkLTSnapshot(receiver) @@ -98,9 +114,13 @@ findCandidate: // handle failure cases if !mrcaCandidate.found { - return nil, &ConflictNoCommonAncestor{ - SortedSenderVersions: sender, - SortedReceiverVersions: receiver, + if len(sender) == 0 { + return nil, &ConflictNoSenderSnapshots{} + } else { + return nil, &ConflictNoCommonAncestor{ + SortedSenderVersions: sender, + SortedReceiverVersions: receiver, + } } } else if mrcaCandidate.r != len(receiver)-1 { return nil, &ConflictDiverged{ @@ -125,7 +145,11 @@ findCandidate: } if len(incPath) == 1 { // nothing to do - incPath = incPath[1:] + return nil, &ConflictMostRecentSnapshotAlreadyPresent{ + SortedSenderVersions: sender, + SortedReceiverVersions: receiver, + CommonAncestor: sender[mrcaCandidate.s], + } } return incPath, nil } diff --git a/replication/logic/diff/diff_test.go b/replication/logic/diff/diff_test.go index 6058648..d8ea0c5 100644 --- a/replication/logic/diff/diff_test.go +++ b/replication/logic/diff/diff_test.go @@ -96,12 +96,37 @@ func TestIncrementalPath_SnapshotsOnly(t *testing.T) { assert.Equal(t, l("@c,3", "@d,4"), path) }) - // sender with earlier but also current version as sender is not a conflict + // nothing to do if fully shared history + doTest(l("@a,1", "@b,2"), l("@a,1", "@b,2"), func(incpath []*FilesystemVersion, conflict error) { + assert.Nil(t, incpath) + assert.NotNil(t, conflict) + _, ok := conflict.(*ConflictMostRecentSnapshotAlreadyPresent) + assert.True(t, 
ok) + }) + + // ...but it's sufficient if the most recent snapshot is present doTest(l("@c,3"), l("@a,1", "@b,2", "@c,3"), func(path []*FilesystemVersion, conflict error) { - t.Logf("path: %#v", path) - t.Logf("conflict: %#v", conflict) - assert.Empty(t, path) - assert.Nil(t, conflict) + assert.Nil(t, path) + _, ok := conflict.(*ConflictMostRecentSnapshotAlreadyPresent) + assert.True(t, ok) + }) + + // no sender snapshots errors: empty receiver + doTest(l(), l(), func(incpath []*FilesystemVersion, conflict error) { + assert.Nil(t, incpath) + assert.NotNil(t, conflict) + t.Logf("%T", conflict) + _, ok := conflict.(*ConflictNoSenderSnapshots) + assert.True(t, ok) + }) + + // no sender snapshots errors: snapshots on receiver + doTest(l("@a,1"), l(), func(incpath []*FilesystemVersion, conflict error) { + assert.Nil(t, incpath) + assert.NotNil(t, conflict) + t.Logf("%T", conflict) + _, ok := conflict.(*ConflictNoSenderSnapshots) + assert.True(t, ok) }) } diff --git a/replication/logic/initialreplicationautoresolution_enumer.go b/replication/logic/initialreplicationautoresolution_enumer.go new file mode 100644 index 0000000..f543f71 --- /dev/null +++ b/replication/logic/initialreplicationautoresolution_enumer.go @@ -0,0 +1,62 @@ +// Code generated by "enumer -type=InitialReplicationAutoResolution -trimprefix=InitialReplicationAutoResolution"; DO NOT EDIT. 
+ +// +package logic + +import ( + "fmt" +) + +const ( + _InitialReplicationAutoResolutionName_0 = "MostRecentAll" + _InitialReplicationAutoResolutionName_1 = "Fail" +) + +var ( + _InitialReplicationAutoResolutionIndex_0 = [...]uint8{0, 10, 13} + _InitialReplicationAutoResolutionIndex_1 = [...]uint8{0, 4} +) + +func (i InitialReplicationAutoResolution) String() string { + switch { + case 1 <= i && i <= 2: + i -= 1 + return _InitialReplicationAutoResolutionName_0[_InitialReplicationAutoResolutionIndex_0[i]:_InitialReplicationAutoResolutionIndex_0[i+1]] + case i == 4: + return _InitialReplicationAutoResolutionName_1 + default: + return fmt.Sprintf("InitialReplicationAutoResolution(%d)", i) + } +} + +var _InitialReplicationAutoResolutionValues = []InitialReplicationAutoResolution{1, 2, 4} + +var _InitialReplicationAutoResolutionNameToValueMap = map[string]InitialReplicationAutoResolution{ + _InitialReplicationAutoResolutionName_0[0:10]: 1, + _InitialReplicationAutoResolutionName_0[10:13]: 2, + _InitialReplicationAutoResolutionName_1[0:4]: 4, +} + +// InitialReplicationAutoResolutionString retrieves an enum value from the enum constants string name. +// Throws an error if the param is not part of the enum. +func InitialReplicationAutoResolutionString(s string) (InitialReplicationAutoResolution, error) { + if val, ok := _InitialReplicationAutoResolutionNameToValueMap[s]; ok { + return val, nil + } + return 0, fmt.Errorf("%s does not belong to InitialReplicationAutoResolution values", s) +} + +// InitialReplicationAutoResolutionValues returns all values of the enum +func InitialReplicationAutoResolutionValues() []InitialReplicationAutoResolution { + return _InitialReplicationAutoResolutionValues +} + +// IsAInitialReplicationAutoResolution returns "true" if the value is listed in the enum definition. 
"false" otherwise +func (i InitialReplicationAutoResolution) IsAInitialReplicationAutoResolution() bool { + for _, v := range _InitialReplicationAutoResolutionValues { + if i == v { + return true + } + } + return false +} diff --git a/replication/logic/replication_logic.go b/replication/logic/replication_logic.go index 6e67182..36fedb3 100644 --- a/replication/logic/replication_logic.go +++ b/replication/logic/replication_logic.go @@ -235,25 +235,55 @@ func NewPlanner(secsPerState *prometheus.HistogramVec, bytesReplicated *promethe promBytesReplicated: bytesReplicated, } } -func resolveConflict(conflict error) (path []*pdu.FilesystemVersion, msg string) { + +func tryAutoresolveConflict(conflict error, policy ConflictResolution) (path []*pdu.FilesystemVersion, reason error) { + + if _, ok := conflict.(*ConflictMostRecentSnapshotAlreadyPresent); ok { + // replicatoin is a no-op + return nil, nil + } + if noCommonAncestor, ok := conflict.(*ConflictNoCommonAncestor); ok { if len(noCommonAncestor.SortedReceiverVersions) == 0 { - // TODO this is hard-coded replication policy: most recent snapshot as source - // NOTE: Keep in sync with listStaleFiltering, it depends on this hard-coded assumption - var mostRecentSnap *pdu.FilesystemVersion - for n := len(noCommonAncestor.SortedSenderVersions) - 1; n >= 0; n-- { - if noCommonAncestor.SortedSenderVersions[n].Type == pdu.FilesystemVersion_Snapshot { - mostRecentSnap = noCommonAncestor.SortedSenderVersions[n] - break + + if len(noCommonAncestor.SortedSenderVersions) == 0 { + return nil, fmt.Errorf("no snapshots available on sender side") + } + + switch policy.InitialReplication { + + case InitialReplicationAutoResolutionMostRecent: + + var mostRecentSnap *pdu.FilesystemVersion + for n := len(noCommonAncestor.SortedSenderVersions) - 1; n >= 0; n-- { + if noCommonAncestor.SortedSenderVersions[n].Type == pdu.FilesystemVersion_Snapshot { + mostRecentSnap = noCommonAncestor.SortedSenderVersions[n] + break + } } + return 
[]*pdu.FilesystemVersion{nil, mostRecentSnap}, nil + + case InitialReplicationAutoResolutionAll: + + path = append(path, nil) + + for n := 0; n < len(noCommonAncestor.SortedSenderVersions); n++ { + if noCommonAncestor.SortedSenderVersions[n].Type == pdu.FilesystemVersion_Snapshot { + path = append(path, noCommonAncestor.SortedSenderVersions[n]) + } + } + return path, nil + + case InitialReplicationAutoResolutionFail: + + return nil, fmt.Errorf("automatic conflict resolution for initial replication is disabled in config") + + default: + panic(fmt.Sprintf("unimplemented: %#v", policy.InitialReplication)) } - if mostRecentSnap == nil { - return nil, "no snapshots available on sender side" - } - return []*pdu.FilesystemVersion{mostRecentSnap}, fmt.Sprintf("start replication at most recent snapshot %s", mostRecentSnap.RelName()) } } - return nil, "no automated way to handle conflict type" + return nil, conflict } func (p *Planner) doPlanning(ctx context.Context) ([]*Filesystem, error) { @@ -456,39 +486,31 @@ func (fs *Filesystem) doPlanning(ctx context.Context) ([]*Step, error) { } else { // resumeToken == nil path, conflict := IncrementalPath(rfsvs, sfsvs) if conflict != nil { - var msg string - path, msg = resolveConflict(conflict) // no shadowing allowed! 
- if path != nil { - log(ctx).WithField("conflict", conflict).Info("conflict") - log(ctx).WithField("resolution", msg).Info("automatically resolved") + updPath, updConflict := tryAutoresolveConflict(conflict, *fs.policy.ConflictResolution) + if updConflict == nil { + log(ctx).WithField("conflict", conflict).Info("conflict automatically resolved") + } else { - log(ctx).WithField("conflict", conflict).Error("conflict") - log(ctx).WithField("problem", msg).Error("cannot resolve conflict") + log(ctx).WithField("conflict", conflict).Error("cannot resolve conflict") } + path, conflict = updPath, updConflict } - if len(path) == 0 { + if conflict != nil { return nil, conflict } - - steps = make([]*Step, 0, len(path)) // shadow - if len(path) == 1 { - steps = append(steps, &Step{ - parent: fs, - sender: fs.sender, - receiver: fs.receiver, - - from: nil, - to: path[0], - encrypt: fs.policy.EncryptedSend, - }) + if len(path) == 0 { + steps = nil + } else if len(path) == 1 { + panic(fmt.Sprintf("len(path) must be two for incremental repl, and initial repl must start with nil, got path[0]=%#v", path[0])) } else { + steps = make([]*Step, 0, len(path)) // shadow for i := 0; i < len(path)-1; i++ { steps = append(steps, &Step{ parent: fs, sender: fs.sender, receiver: fs.receiver, - from: path[i], + from: path[i], // nil in case of initial repl to: path[i+1], encrypt: fs.policy.EncryptedSend, }) diff --git a/replication/logic/replication_logic_policy.go b/replication/logic/replication_logic_policy.go index 052d41d..8084399 100644 --- a/replication/logic/replication_logic_policy.go +++ b/replication/logic/replication_logic_policy.go @@ -1,6 +1,9 @@ package logic import ( + "fmt" + "strings" + "github.com/go-playground/validator" "github.com/pkg/errors" @@ -8,16 +11,74 @@ import ( "github.com/zrepl/zrepl/replication/logic/pdu" ) +//go:generate enumer -type=InitialReplicationAutoResolution -trimprefix=InitialReplicationAutoResolution +type InitialReplicationAutoResolution uint32 + 
+const ( + InitialReplicationAutoResolutionMostRecent InitialReplicationAutoResolution = 1 << iota + InitialReplicationAutoResolutionAll + InitialReplicationAutoResolutionFail +) + +var initialReplicationAutoResolutionConfigMap = map[InitialReplicationAutoResolution]string{ + InitialReplicationAutoResolutionMostRecent: "most_recent", + InitialReplicationAutoResolutionAll: "all", + InitialReplicationAutoResolutionFail: "fail", +} + +func InitialReplicationAutoResolutionFromConfig(in string) (InitialReplicationAutoResolution, error) { + for v, s := range initialReplicationAutoResolutionConfigMap { + if s == in { + return v, nil + } + } + l := make([]string, 0, len(initialReplicationAutoResolutionConfigMap)) + for _, v := range InitialReplicationAutoResolutionValues() { + l = append(l, initialReplicationAutoResolutionConfigMap[v]) + } + return 0, fmt.Errorf("invalid value %q, must be one of %s", in, strings.Join(l, ", ")) +} + +type ConflictResolution struct { + InitialReplication InitialReplicationAutoResolution +} + +func (c *ConflictResolution) Validate() error { + if !c.InitialReplication.IsAInitialReplicationAutoResolution() { + return errors.Errorf("must be one of %s", InitialReplicationAutoResolutionValues()) + } + return nil +} + +func ConflictResolutionFromConfig(in *config.ConflictResolution) (*ConflictResolution, error) { + + initialReplication, err := InitialReplicationAutoResolutionFromConfig(in.InitialReplication) + if err != nil { + return nil, errors.Errorf("field `initial_replication` is invalid: %q is not one of %v", in.InitialReplication, InitialReplicationAutoResolutionValues()) + } + + return &ConflictResolution{ + InitialReplication: initialReplication, + }, nil +} + type PlannerPolicy struct { - EncryptedSend tri // all sends must be encrypted (send -w, and encryption!=off) - ReplicationConfig *pdu.ReplicationConfig - SizeEstimationConcurrency int `validate:"gte=1"` + EncryptedSend tri // all sends must be encrypted (send -w, and 
encryption!=off) + ConflictResolution *ConflictResolution `validate:"ne=nil"` + ReplicationConfig *pdu.ReplicationConfig `validate:"ne=nil"` + SizeEstimationConcurrency int `validate:"gte=1"` } var validate = validator.New() func (p PlannerPolicy) Validate() error { - return validate.Struct(p) + if err := validate.Struct(p); err != nil { + return err + } + if err := p.ConflictResolution.Validate(); err != nil { + return err + } + return nil } func ReplicationConfigFromConfig(in *config.Replication) (*pdu.ReplicationConfig, error) {