From a0e3dc7040f8a4a81f6e2cc810fb361288d83071 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Sun, 26 Jul 2020 12:24:05 +0200 Subject: [PATCH] [#348] replication: add platformtest to check behavior on recv fail while still sending Regression test for #348 --- platformtest/tests/generated_cases.go | 1 + platformtest/tests/replication.go | 96 +++++++++++++++++++++++++-- 2 files changed, 90 insertions(+), 7 deletions(-) diff --git a/platformtest/tests/generated_cases.go b/platformtest/tests/generated_cases.go index 73c5b4e..7a69669 100644 --- a/platformtest/tests/generated_cases.go +++ b/platformtest/tests/generated_cases.go @@ -23,6 +23,7 @@ var Cases = []Case{BatchDestroy, ReplicationIsResumableFullSend__both_GuaranteeResumability, ReplicationIsResumableFullSend__initial_GuaranteeIncrementalReplication_incremental_GuaranteeIncrementalReplication, ReplicationIsResumableFullSend__initial_GuaranteeResumability_incremental_GuaranteeIncrementalReplication, + ReplicationReceiverErrorWhileStillSending, ReplicationStepCompletedLostBehavior__GuaranteeIncrementalReplication, ReplicationStepCompletedLostBehavior__GuaranteeResumability, ResumableRecvAndTokenHandling, diff --git a/platformtest/tests/replication.go b/platformtest/tests/replication.go index 7cd0faa..3bae6f4 100644 --- a/platformtest/tests/replication.go +++ b/platformtest/tests/replication.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "os" "path" "sort" @@ -27,11 +28,12 @@ import ( // of a new sender and receiver instance and one blocking invocation // of the replication engine without encryption type replicationInvocation struct { - sjid, rjid endpoint.JobID - sfs string - rfsRoot string - interceptSender func(e *endpoint.Sender) logic.Sender - guarantee pdu.ReplicationConfigProtection + sjid, rjid endpoint.JobID + sfs string + rfsRoot string + interceptSender func(e *endpoint.Sender) logic.Sender + interceptReceiver func(e *endpoint.Receiver) logic.Receiver + guarantee pdu.ReplicationConfigProtection } func (i replicationInvocation) Do(ctx *platformtest.Context) *report.Report { @@ -39,6 +41,9 @@ func (i replicationInvocation) Do(ctx *platformtest.Context) *report.Report { if i.interceptSender == nil { i.interceptSender = func(e *endpoint.Sender) logic.Sender { return e } } + if i.interceptReceiver == nil { + i.interceptReceiver = func(e *endpoint.Receiver) logic.Receiver { return e } + } sfilter := filters.NewDatasetMapFilter(1, true) err := sfilter.Add(i.sfs, "ok") @@ -48,11 +53,11 @@ func (i replicationInvocation) Do(ctx *platformtest.Context) *report.Report { Encrypt: &zfs.NilBool{B: false}, JobID: i.sjid, })) - receiver := endpoint.NewReceiver(endpoint.ReceiverConfig{ + receiver := i.interceptReceiver(endpoint.NewReceiver(endpoint.ReceiverConfig{ JobID: i.rjid, AppendClientIdentity: false, RootWithoutClientComponent: mustDatasetPath(i.rfsRoot), - }) + })) plannerPolicy := logic.PlannerPolicy{ EncryptedSend: logic.TriFromBool(false), ReplicationConfig: pdu.ReplicationConfig{ @@ -753,3 +758,80 @@ func replicationStepCompletedLostBehavior_impl(ctx *platformtest.Context, guaran } } + +type ErroringReceiver struct { + recvErr error + *endpoint.Receiver +} + +func (r *ErroringReceiver) Receive(ctx context.Context, req *pdu.ReceiveReq, stream io.ReadCloser) (*pdu.ReceiveRes, error) { + return nil, r.recvErr +} + +type NeverEndingSender struct { + *endpoint.Sender +} + +func (s *NeverEndingSender) Send(ctx context.Context, req *pdu.SendReq) (r *pdu.SendRes, stream io.ReadCloser, _ error) { + stream = nil + r = &pdu.SendRes{ + UsedResumeToken: false, + ExpectedSize: 1 << 30, + } + if req.DryRun { + return r, stream, nil + } + dz, err := os.Open("/dev/zero") + if err != nil { + panic(err) + } + return r, dz, nil +} + +func ReplicationReceiverErrorWhileStillSending(ctx *platformtest.Context) { + + platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, ` + CREATEROOT + + "sender" + + "sender@1" + + "receiver" + R zfs create -p "${ROOTDS}/receiver/${ROOTDS}" + `) + + sjid := endpoint.MustMakeJobID("sender-job") + rjid := endpoint.MustMakeJobID("receiver-job") + + sfs := ctx.RootDataset + "/sender" + rfsRoot := ctx.RootDataset + "/receiver" + + mockRecvErr := fmt.Errorf("YiezahK3thie8ahKiel5sah2uugei2ize1yi8feivuu7musoat") + + rep := replicationInvocation{ + sjid: sjid, + rjid: rjid, + sfs: sfs, + rfsRoot: rfsRoot, + guarantee: *pdu.ReplicationConfigProtectionWithKind(pdu.ReplicationGuaranteeKind_GuaranteeNothing), + interceptReceiver: func(r *endpoint.Receiver) logic.Receiver { + return &ErroringReceiver{recvErr: mockRecvErr, Receiver: r} + }, + interceptSender: func(s *endpoint.Sender) logic.Sender { + return &NeverEndingSender{s} + }, + } + + // first replication + report := rep.Do(ctx) + ctx.Logf("\n%s", pretty.Sprint(report)) + + require.Len(ctx, report.Attempts, 1) + attempt := report.Attempts[0] + require.Nil(ctx, attempt.PlanError) + require.Len(ctx, attempt.Filesystems, 1) + afs := attempt.Filesystems[0] + require.Nil(ctx, afs.PlanError) + require.Len(ctx, afs.Steps, 1) + require.Nil(ctx, afs.PlanError) + require.NotNil(ctx, afs.StepError) + require.Contains(ctx, afs.StepError.Err, mockRecvErr.Error()) +}