[#345] fix broken identification of parent-fs for initial replication ordering

fixup of 02807279
fixes #345

commit 02db5994fe (parent 1d7a84e8ae)
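For context: the ordering bug comes from comparing filesystem names with a plain string prefix test. strings.HasPrefix("pool/sender/aa", "pool/sender/a") is true even though sender/aa is a sibling, not a child, of sender/a, so its initial replication was wrongly ordered behind sender/a. The stand-alone sketch below (not zrepl code; the dataset names are made up for illustration) contrasts the old string-prefix check with a component-wise check like the one this commit switches to.

// Stand-alone sketch, not zrepl code: dataset names are hypothetical.
package main

import (
	"fmt"
	"strings"
)

// isChildByString mirrors the old driver check: raw string prefix plus inequality.
func isChildByString(child, parent string) bool {
	return strings.HasPrefix(child, parent) && child != parent
}

// isChildByComponents compares whole path components instead, which is what a
// dataset-path-aware comparison effectively does.
func isChildByComponents(child, parent string) bool {
	c := strings.Split(child, "/")
	p := strings.Split(parent, "/")
	if len(c) <= len(p) {
		return false
	}
	for i := range p {
		if c[i] != p[i] {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(isChildByString("pool/sender/aa", "pool/sender/a"))          // true: misidentified as child
	fmt.Println(isChildByComponents("pool/sender/aa", "pool/sender/a"))      // false: sibling, not child
	fmt.Println(isChildByComponents("pool/sender/a/child", "pool/sender/a")) // true: real child
}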
@@ -16,6 +16,7 @@ var Cases = []Case{BatchDestroy,
     ListFilesystemsNoFilter,
     ReceiveForceIntoEncryptedErr,
     ReceiveForceRollbackWorksUnencrypted,
+    ReplicationFailingInitialParentProhibitsChildReplication,
     ReplicationIncrementalCleansUpStaleAbstractionsWithCacheOnSecondReplication,
     ReplicationIncrementalCleansUpStaleAbstractionsWithoutCacheOnSecondReplication,
     ReplicationIncrementalDestroysStepHoldsIffIncrementalStepHoldsAreDisabledButStepHoldsExist,
@@ -30,6 +30,7 @@ import (
 type replicationInvocation struct {
     sjid, rjid        endpoint.JobID
     sfs               string
+    sfilter           *filters.DatasetMapFilter
     rfsRoot           string
     interceptSender   func(e *endpoint.Sender) logic.Sender
     interceptReceiver func(e *endpoint.Receiver) logic.Receiver
@@ -45,11 +46,16 @@ func (i replicationInvocation) Do(ctx *platformtest.Context) *report.Report {
         i.interceptReceiver = func(e *endpoint.Receiver) logic.Receiver { return e }
     }

-    sfilter := filters.NewDatasetMapFilter(1, true)
-    err := sfilter.Add(i.sfs, "ok")
-    require.NoError(ctx, err)
+    if i.sfs != "" && i.sfilter != nil || i.sfs == "" && i.sfilter == nil {
+        panic("either sfs or sfilter must be set")
+    }
+    if i.sfilter == nil {
+        i.sfilter = filters.NewDatasetMapFilter(1, true)
+        err := i.sfilter.Add(i.sfs, "ok")
+        require.NoError(ctx, err)
+    }
     sender := i.interceptSender(endpoint.NewSender(endpoint.SenderConfig{
-        FSF:     sfilter.AsFilter(),
+        FSF:     i.sfilter.AsFilter(),
         Encrypt: &zfs.NilBool{B: false},
         JobID:   i.sjid,
     }))
@@ -835,3 +841,76 @@ func ReplicationReceiverErrorWhileStillSending(ctx *platformtest.Context) {
     require.NotNil(ctx, afs.StepError)
     require.Contains(ctx, afs.StepError.Err, mockRecvErr.Error())
 }
+
+func ReplicationFailingInitialParentProhibitsChildReplication(ctx *platformtest.Context) {
+
+    platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, `
+        CREATEROOT
+        + "sender"
+        + "sender/a"
+        + "sender/a/child"
+        + "sender/aa"
+        + "receiver"
+        R zfs create -p "${ROOTDS}/receiver/${ROOTDS}"
+        R zfs snapshot -r ${ROOTDS}/sender@initial
+    `)
+
+    sjid := endpoint.MustMakeJobID("sender-job")
+    rjid := endpoint.MustMakeJobID("receiver-job")
+
+    fsA := ctx.RootDataset + "/sender/a"
+    fsAChild := ctx.RootDataset + "/sender/a/child"
+    fsAA := ctx.RootDataset + "/sender/aa"
+
+    sfilter := filters.NewDatasetMapFilter(3, true)
+    mustAddToSFilter := func(fs string) {
+        err := sfilter.Add(fs, "ok")
+        require.NoError(ctx, err)
+    }
+    mustAddToSFilter(fsA)
+    mustAddToSFilter(fsAChild)
+    mustAddToSFilter(fsAA)
+    rfsRoot := ctx.RootDataset + "/receiver"
+
+    mockRecvErr := fmt.Errorf("yifae4ohPhaquaes0hohghiep9oufie4roo7quoWooluaj2ee8")
+
+    rep := replicationInvocation{
+        sjid:      sjid,
+        rjid:      rjid,
+        sfilter:   sfilter,
+        rfsRoot:   rfsRoot,
+        guarantee: *pdu.ReplicationConfigProtectionWithKind(pdu.ReplicationGuaranteeKind_GuaranteeNothing),
+        interceptReceiver: func(r *endpoint.Receiver) logic.Receiver {
+            return &ErroringReceiver{recvErr: mockRecvErr, Receiver: r}
+        },
+    }
+
+    r := rep.Do(ctx)
+    ctx.Logf("\n%s", pretty.Sprint(r))
+
+    require.Len(ctx, r.Attempts, 1)
+    attempt := r.Attempts[0]
+    require.Nil(ctx, attempt.PlanError)
+    require.Len(ctx, attempt.Filesystems, 3)
+
+    fsByName := make(map[string]*report.FilesystemReport, len(attempt.Filesystems))
+    for _, fs := range attempt.Filesystems {
+        fsByName[fs.Info.Name] = fs
+    }
+
+    require.Contains(ctx, fsByName, fsA)
+    require.Contains(ctx, fsByName, fsAChild)
+    require.Contains(ctx, fsByName, fsAA)
+
+    checkFS := func(fs string, expectErrMsg string) {
+        rep := fsByName[fs]
+        require.Len(ctx, rep.Steps, 1)
+        require.Nil(ctx, rep.PlanError)
+        require.NotNil(ctx, rep.StepError)
+        require.Contains(ctx, rep.StepError.Err, expectErrMsg)
+    }
+
+    checkFS(fsA, mockRecvErr.Error())
+    checkFS(fsAChild, "parent(s) failed during initial replication")
+    checkFS(fsAA, mockRecvErr.Error()) // fsAA is not treated as a child of fsA
+}
@@ -2,7 +2,6 @@ package driver

 import (
     "context"
-    "errors"
     "fmt"
     "net"
     "sort"
@@ -10,7 +9,9 @@ import (
     "sync"
     "time"

+    "github.com/pkg/errors"
     "github.com/zrepl/zrepl/daemon/logging/trace"
+    "github.com/zrepl/zrepl/zfs"
     "google.golang.org/grpc/codes"
     "google.golang.org/grpc/status"

@@ -374,11 +375,28 @@ func (a *attempt) doGlobalPlanning(ctx context.Context, prev *attempt) map[*fs]*
     // invariant: prevs contains an entry for each unambiguous correspondence

     // build up parent-child relationship (FIXME (O(n^2), but who's going to have that many filesystems...))
+    mustDatasetPathOrPlanFail := func(fs string) *zfs.DatasetPath {
+        dp, err := zfs.NewDatasetPath(fs)
+        if err != nil {
+            now := time.Now()
+            a.planErr = newTimedError(errors.Wrapf(err, "%q", fs), now)
+            a.fss = nil
+            a.finishedAt = now
+            return nil
+        }
+        return dp
+    }
     for _, f1 := range a.fss {
-        fs1 := f1.fs.ReportInfo().Name
+        fs1 := mustDatasetPathOrPlanFail(f1.fs.ReportInfo().Name)
+        if fs1 == nil {
+            return nil
+        }
         for _, f2 := range a.fss {
-            fs2 := f2.fs.ReportInfo().Name
-            if strings.HasPrefix(fs1, fs2) && fs1 != fs2 {
+            fs2 := mustDatasetPathOrPlanFail(f2.fs.ReportInfo().Name)
+            if fs2 == nil {
+                return nil
+            }
+            if fs1.HasPrefix(fs2) && !fs1.Equal(fs2) {
                 f1.initialRepOrd.parents = append(f1.initialRepOrd.parents, f2)
                 f2.initialRepOrd.children = append(f2.initialRepOrd.children, f1)
             }
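A minimal usage sketch of the corrected comparison follows, assuming only the zfs.DatasetPath API that the hunk above itself uses (NewDatasetPath, HasPrefix, Equal); the surrounding planning loop and error-to-planErr handling are omitted, and the dataset names are illustrative.

// Sketch only; relies on the zfs.DatasetPath methods visible in the diff above.
package main

import (
	"fmt"

	"github.com/zrepl/zrepl/zfs"
)

func main() {
	mustPath := func(s string) *zfs.DatasetPath {
		p, err := zfs.NewDatasetPath(s)
		if err != nil {
			panic(err)
		}
		return p
	}
	a := mustPath("pool/sender/a")
	aa := mustPath("pool/sender/aa")
	child := mustPath("pool/sender/a/child")

	// fs2 is a parent of fs1 iff fs1 has fs2 as a dataset-path prefix and they differ,
	// i.e. the comparison is per path component, not per character.
	isParent := func(fs1, fs2 *zfs.DatasetPath) bool {
		return fs1.HasPrefix(fs2) && !fs1.Equal(fs2)
	}

	fmt.Println(isParent(child, a)) // true: pool/sender/a is a parent of pool/sender/a/child
	fmt.Println(isParent(aa, a))    // false: pool/sender/aa is a sibling of pool/sender/a
}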
|
Loading…
x
Reference in New Issue
Block a user