[#348] replication: add platformtest to check behavior on recv fail while still sending

Regression test for #348
This commit is contained in:
Christian Schwarz 2020-07-26 12:24:05 +02:00
parent 43495d70c7
commit a0e3dc7040
2 changed files with 90 additions and 7 deletions

View File

@ -23,6 +23,7 @@ var Cases = []Case{BatchDestroy,
ReplicationIsResumableFullSend__both_GuaranteeResumability,
ReplicationIsResumableFullSend__initial_GuaranteeIncrementalReplication_incremental_GuaranteeIncrementalReplication,
ReplicationIsResumableFullSend__initial_GuaranteeResumability_incremental_GuaranteeIncrementalReplication,
ReplicationReceiverErrorWhileStillSending,
ReplicationStepCompletedLostBehavior__GuaranteeIncrementalReplication,
ReplicationStepCompletedLostBehavior__GuaranteeResumability,
ResumableRecvAndTokenHandling,

View File

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"os"
"path"
"sort"
@ -27,11 +28,12 @@ import (
// of a new sender and receiver instance and one blocking invocation
// of the replication engine without encryption
type replicationInvocation struct {
sjid, rjid endpoint.JobID
sfs string
rfsRoot string
interceptSender func(e *endpoint.Sender) logic.Sender
guarantee pdu.ReplicationConfigProtection
sjid, rjid endpoint.JobID
sfs string
rfsRoot string
interceptSender func(e *endpoint.Sender) logic.Sender
interceptReceiver func(e *endpoint.Receiver) logic.Receiver
guarantee pdu.ReplicationConfigProtection
}
func (i replicationInvocation) Do(ctx *platformtest.Context) *report.Report {
@ -39,6 +41,9 @@ func (i replicationInvocation) Do(ctx *platformtest.Context) *report.Report {
if i.interceptSender == nil {
i.interceptSender = func(e *endpoint.Sender) logic.Sender { return e }
}
if i.interceptReceiver == nil {
i.interceptReceiver = func(e *endpoint.Receiver) logic.Receiver { return e }
}
sfilter := filters.NewDatasetMapFilter(1, true)
err := sfilter.Add(i.sfs, "ok")
@ -48,11 +53,11 @@ func (i replicationInvocation) Do(ctx *platformtest.Context) *report.Report {
Encrypt: &zfs.NilBool{B: false},
JobID: i.sjid,
}))
receiver := endpoint.NewReceiver(endpoint.ReceiverConfig{
receiver := i.interceptReceiver(endpoint.NewReceiver(endpoint.ReceiverConfig{
JobID: i.rjid,
AppendClientIdentity: false,
RootWithoutClientComponent: mustDatasetPath(i.rfsRoot),
})
}))
plannerPolicy := logic.PlannerPolicy{
EncryptedSend: logic.TriFromBool(false),
ReplicationConfig: pdu.ReplicationConfig{
@ -753,3 +758,80 @@ func replicationStepCompletedLostBehavior_impl(ctx *platformtest.Context, guaran
}
}
type ErroringReceiver struct {
recvErr error
*endpoint.Receiver
}
func (r *ErroringReceiver) Receive(ctx context.Context, req *pdu.ReceiveReq, stream io.ReadCloser) (*pdu.ReceiveRes, error) {
return nil, r.recvErr
}
type NeverEndingSender struct {
*endpoint.Sender
}
func (s *NeverEndingSender) Send(ctx context.Context, req *pdu.SendReq) (r *pdu.SendRes, stream io.ReadCloser, _ error) {
stream = nil
r = &pdu.SendRes{
UsedResumeToken: false,
ExpectedSize: 1 << 30,
}
if req.DryRun {
return r, stream, nil
}
dz, err := os.Open("/dev/zero")
if err != nil {
panic(err)
}
return r, dz, nil
}
func ReplicationReceiverErrorWhileStillSending(ctx *platformtest.Context) {
platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, `
CREATEROOT
+ "sender"
+ "sender@1"
+ "receiver"
R zfs create -p "${ROOTDS}/receiver/${ROOTDS}"
`)
sjid := endpoint.MustMakeJobID("sender-job")
rjid := endpoint.MustMakeJobID("receiver-job")
sfs := ctx.RootDataset + "/sender"
rfsRoot := ctx.RootDataset + "/receiver"
mockRecvErr := fmt.Errorf("YiezahK3thie8ahKiel5sah2uugei2ize1yi8feivuu7musoat")
rep := replicationInvocation{
sjid: sjid,
rjid: rjid,
sfs: sfs,
rfsRoot: rfsRoot,
guarantee: *pdu.ReplicationConfigProtectionWithKind(pdu.ReplicationGuaranteeKind_GuaranteeNothing),
interceptReceiver: func(r *endpoint.Receiver) logic.Receiver {
return &ErroringReceiver{recvErr: mockRecvErr, Receiver: r}
},
interceptSender: func(s *endpoint.Sender) logic.Sender {
return &NeverEndingSender{s}
},
}
// first replication
report := rep.Do(ctx)
ctx.Logf("\n%s", pretty.Sprint(report))
require.Len(ctx, report.Attempts, 1)
attempt := report.Attempts[0]
require.Nil(ctx, attempt.PlanError)
require.Len(ctx, attempt.Filesystems, 1)
afs := attempt.Filesystems[0]
require.Nil(ctx, afs.PlanError)
require.Len(ctx, afs.Steps, 1)
require.Nil(ctx, afs.PlanError)
require.NotNil(ctx, afs.StepError)
require.Contains(ctx, afs.StepError.Err, mockRecvErr.Error())
}