From 4f9b63aa09d5010992f6c3e71a84be64cdcff176 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Mon, 16 Aug 2021 10:11:37 +0200 Subject: [PATCH] rework size estimation & dry sends - use control connection (gRPC) - use uint64 everywhere => fixes https://github.com/zrepl/zrepl/issues/463 - [BREAK] bump protocol version closes https://github.com/zrepl/zrepl/pull/518 fixes https://github.com/zrepl/zrepl/issues/463 --- client/status/viewmodel/bytecountbinary.go | 10 +- .../status/viewmodel/bytesprogresshistory.go | 11 +- client/status/viewmodel/render.go | 6 +- .../viewmodel/stringbuilder/stringbuilder.go | 4 +- endpoint/endpoint.go | 73 +++-- go.mod | 2 - platformtest/tests/replication.go | 11 +- replication/logic/pdu/pdu.pb.go | 286 ++++++++---------- replication/logic/pdu/pdu.proto | 13 +- replication/logic/pdu/pdu_grpc.pb.go | 36 +++ replication/logic/replication_logic.go | 14 +- replication/report/replication_report.go | 8 +- rpc/dataconn/dataconn_client.go | 16 +- rpc/rpc_client.go | 7 + rpc/versionhandshake/versionhandshake.go | 2 +- util/bytecounter/bytecounter_readcloser.go | 13 +- util/envconst/envconst.go | 18 ++ zfs/zfs.go | 22 +- 18 files changed, 320 insertions(+), 232 deletions(-) diff --git a/client/status/viewmodel/bytecountbinary.go b/client/status/viewmodel/bytecountbinary.go index 204072d..d89198f 100644 --- a/client/status/viewmodel/bytecountbinary.go +++ b/client/status/viewmodel/bytecountbinary.go @@ -2,14 +2,22 @@ package viewmodel import ( "fmt" + "math" ) +func ByteCountBinaryUint(b uint64) string { + if b > math.MaxInt64 { + panic(b) + } + return ByteCountBinary(int64(b)) +} + func ByteCountBinary(b int64) string { const unit = 1024 if b < unit { return fmt.Sprintf("%d B", b) } - div, exp := int64(unit), 0 + div, exp := unit, 0 for n := b / unit; n >= unit; n /= unit { div *= unit exp++ diff --git a/client/status/viewmodel/bytesprogresshistory.go b/client/status/viewmodel/bytesprogresshistory.go index 57a10d6..6125495 100644 --- a/client/status/viewmodel/bytesprogresshistory.go +++ b/client/status/viewmodel/bytesprogresshistory.go @@ -4,7 +4,7 @@ import "time" type byteProgressMeasurement struct { time time.Time - val int64 + val uint64 } type bytesProgressHistory struct { @@ -13,7 +13,7 @@ type bytesProgressHistory struct { lastChange time.Time } -func (p *bytesProgressHistory) Update(currentVal int64) (bytesPerSecondAvg int64, changeCount int) { +func (p *bytesProgressHistory) Update(currentVal uint64) (bytesPerSecondAvg int64, changeCount int) { if p.last == nil { p.last = &byteProgressMeasurement{ @@ -33,7 +33,12 @@ func (p *bytesProgressHistory) Update(currentVal int64) (bytesPerSecondAvg int64 return 0, 0 } - deltaV := currentVal - p.last.val + var deltaV int64 + if currentVal >= p.last.val { + deltaV = int64(currentVal - p.last.val) + } else { + deltaV = -int64(p.last.val - currentVal) + } deltaT := time.Since(p.last.time) rate := float64(deltaV) / deltaT.Seconds() diff --git a/client/status/viewmodel/render.go b/client/status/viewmodel/render.go index 7e29e6c..b83d014 100644 --- a/client/status/viewmodel/render.go +++ b/client/status/viewmodel/render.go @@ -230,7 +230,7 @@ func printFilesystemStatus(t *stringbuilder.B, rep *report.FilesystemReport, act status := fmt.Sprintf("%s (step %d/%d, %s/%s)%s", strings.ToUpper(string(rep.State)), rep.CurrentStep, len(rep.Steps), - ByteCountBinary(replicated), ByteCountBinary(expected), + ByteCountBinaryUint(replicated), ByteCountBinaryUint(expected), sizeEstimationImpreciseNotice, ) @@ -358,12 +358,12 @@ func renderReplicationReport(t *stringbuilder.B, rep *report.Report, history *by rate, changeCount := history.Update(replicated) eta := time.Duration(0) if rate > 0 { - eta = time.Duration((expected-replicated)/rate) * time.Second + eta = time.Duration((float64(expected)-float64(replicated))/float64(rate)) * time.Second } t.Write("Progress: ") t.DrawBar(50, replicated, expected, changeCount) - t.Write(fmt.Sprintf(" %s / %s @ %s/s", ByteCountBinary(replicated), ByteCountBinary(expected), ByteCountBinary(rate))) + t.Write(fmt.Sprintf(" %s / %s @ %s/s", ByteCountBinaryUint(replicated), ByteCountBinaryUint(expected), ByteCountBinary(rate))) if eta != 0 { t.Write(fmt.Sprintf(" (%s remaining)", humanizeDuration(eta))) } diff --git a/client/status/viewmodel/stringbuilder/stringbuilder.go b/client/status/viewmodel/stringbuilder/stringbuilder.go index 9d32f4d..5e30519 100644 --- a/client/status/viewmodel/stringbuilder/stringbuilder.go +++ b/client/status/viewmodel/stringbuilder/stringbuilder.go @@ -99,11 +99,11 @@ func RightPad(str string, length int, pad string) string { } // changeCount = 0 indicates stall / no progress -func (w *B) DrawBar(length int, bytes, totalBytes int64, changeCount int) { +func (w *B) DrawBar(length int, bytes, totalBytes uint64, changeCount int) { const arrowPositions = `>\|/` var completedLength int if totalBytes > 0 { - completedLength = int(int64(length) * bytes / totalBytes) + completedLength = int(uint64(length) * bytes / totalBytes) if completedLength > length { completedLength = length } diff --git a/endpoint/endpoint.go b/endpoint/endpoint.go index 6681bca..bc348c7 100644 --- a/endpoint/endpoint.go +++ b/endpoint/endpoint.go @@ -159,12 +159,11 @@ func sendArgsFromPDUAndValidateExistsAndGetVersion(ctx context.Context, fs strin return version, nil } -func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.ReadCloser, error) { - defer trace.WithSpanFromStackUpdateCtx(&ctx)() +func (s *Sender) sendMakeArgs(ctx context.Context, r *pdu.SendReq) (sendArgs zfs.ZFSSendArgsValidated, _ error) { _, err := s.filterCheckFS(r.Filesystem) if err != nil { - return nil, nil, err + return sendArgs, err } switch r.Encrypted { case pdu.Tri_DontCare: @@ -172,16 +171,16 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea // ok, fallthrough outer case pdu.Tri_False: if s.config.Encrypt.B { - return nil, nil, errors.New("only encrypted sends allowed (send -w + encryption!= off), but unencrypted send requested") + return sendArgs, errors.New("only encrypted sends allowed (send -w + encryption!= off), but unencrypted send requested") } // fallthrough outer case pdu.Tri_True: if !s.config.Encrypt.B { - return nil, nil, errors.New("only unencrypted sends allowed, but encrypted send requested") + return sendArgs, errors.New("only unencrypted sends allowed, but encrypted send requested") } // fallthrough outer default: - return nil, nil, fmt.Errorf("unknown pdu.Tri variant %q", r.Encrypted) + return sendArgs, fmt.Errorf("unknown pdu.Tri variant %q", r.Encrypted) } sendArgsUnvalidated := zfs.ZFSSendArgsUnvalidated{ @@ -201,30 +200,19 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea }, } - sendArgs, err := sendArgsUnvalidated.Validate(ctx) + sendArgs, err = sendArgsUnvalidated.Validate(ctx) if err != nil { - return nil, nil, errors.Wrap(err, "validate send arguments") + return sendArgs, errors.Wrap(err, "validate send arguments") } + return sendArgs, nil +} - si, err := zfs.ZFSSendDry(ctx, sendArgs) +func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.ReadCloser, error) { + defer trace.WithSpanFromStackUpdateCtx(&ctx)() + + sendArgs, err := s.sendMakeArgs(ctx, r) if err != nil { - return nil, nil, errors.Wrap(err, "zfs send dry failed") - } - - // From now on, assume that sendArgs has been validated by ZFSSendDry - // (because validation involves shelling out, it's actually a little expensive) - - var expSize int64 = 0 // protocol says 0 means no estimate - if si.SizeEstimate != -1 { // but si returns -1 for no size estimate - expSize = si.SizeEstimate - } - res := &pdu.SendRes{ - ExpectedSize: expSize, - UsedResumeToken: r.ResumeToken != "", - } - - if r.DryRun { - return res, nil, nil + return nil, nil, err } // create holds or bookmarks of `From` and `To` to guarantee one of the following: @@ -322,9 +310,37 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea // apply rate limit sendStream = s.bwLimit.WrapReadCloser(sendStream) + res := &pdu.SendRes{ + ExpectedSize: 0, + UsedResumeToken: r.ResumeToken != "", + } + return res, sendStream, nil } +func (s *Sender) SendDry(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, error) { + defer trace.WithSpanFromStackUpdateCtx(&ctx)() + + sendArgs, err := s.sendMakeArgs(ctx, r) + if err != nil { + return nil, err + } + + si, err := zfs.ZFSSendDry(ctx, sendArgs) + if err != nil { + return nil, errors.Wrap(err, "zfs send dry failed") + } + + // From now on, assume that sendArgs has been validated by ZFSSendDry + // (because validation involves shelling out, it's actually a little expensive) + + res := &pdu.SendRes{ + ExpectedSize: si.SizeEstimate, + UsedResumeToken: r.ResumeToken != "", + } + return res, nil +} + func (p *Sender) SendCompleted(ctx context.Context, r *pdu.SendCompletedReq) (*pdu.SendCompletedRes, error) { defer trace.WithSpanFromStackUpdateCtx(&ctx)() @@ -693,6 +709,11 @@ func (s *Receiver) Send(ctx context.Context, req *pdu.SendReq) (*pdu.SendRes, io return nil, nil, fmt.Errorf("receiver does not implement Send()") } +func (s *Receiver) SendDry(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, error) { + defer trace.WithSpanFromStackUpdateCtx(&ctx)() + return nil, fmt.Errorf("receiver does not implement SendDry()") +} + func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive io.ReadCloser) (*pdu.ReceiveRes, error) { defer trace.WithSpanFromStackUpdateCtx(&ctx)() diff --git a/go.mod b/go.mod index 1bd33c9..e2eb7f1 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,6 @@ require ( github.com/gdamore/tcell/v2 v2.2.0 github.com/gitchander/permutation v0.0.0-20181107151852-9e56b92e9909 github.com/go-logfmt/logfmt v0.4.0 - github.com/go-playground/universal-translator v0.17.0 // indirect github.com/go-playground/validator v9.31.0+incompatible github.com/go-playground/validator/v10 v10.4.1 github.com/go-sql-driver/mysql v1.4.1-0.20190907122137-b2c03bcae3d4 @@ -44,7 +43,6 @@ require ( golang.org/x/net v0.0.0-20210119194325-5f4716e94777 golang.org/x/sync v0.0.0-20190423024810-112230192c58 golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c - golang.org/x/text v0.3.5 // indirect golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135 gonum.org/v1/gonum v0.7.0 // indirect google.golang.org/genproto v0.0.0-20210122163508-8081c04a3579 // indirect diff --git a/platformtest/tests/replication.go b/platformtest/tests/replication.go index 972e8a4..88cec64 100644 --- a/platformtest/tests/replication.go +++ b/platformtest/tests/replication.go @@ -809,13 +809,22 @@ type NeverEndingSender struct { *endpoint.Sender } +func (s *NeverEndingSender) SendDry(ctx context.Context, req *pdu.SendReq) (r *pdu.SendRes, err error) { + r, _, err = s.sendImpl(ctx, req, true) + return r, err +} + func (s *NeverEndingSender) Send(ctx context.Context, req *pdu.SendReq) (r *pdu.SendRes, stream io.ReadCloser, _ error) { + return s.sendImpl(ctx, req, false) +} + +func (s *NeverEndingSender) sendImpl(ctx context.Context, req *pdu.SendReq, dry bool) (r *pdu.SendRes, stream io.ReadCloser, _ error) { stream = nil r = &pdu.SendRes{ UsedResumeToken: false, ExpectedSize: 1 << 30, } - if req.DryRun { + if dry { return r, stream, nil } dz, err := os.Open("/dev/zero") diff --git a/replication/logic/pdu/pdu.pb.go b/replication/logic/pdu/pdu.pb.go index 85783a3..534a40b 100644 --- a/replication/logic/pdu/pdu.pb.go +++ b/replication/logic/pdu/pdu.pb.go @@ -518,8 +518,7 @@ type SendReq struct { // encoded in the ResumeToken. Otherwise, the Sender MUST return an error. ResumeToken string `protobuf:"bytes,4,opt,name=ResumeToken,proto3" json:"ResumeToken,omitempty"` Encrypted Tri `protobuf:"varint,5,opt,name=Encrypted,proto3,enum=Tri" json:"Encrypted,omitempty"` - DryRun bool `protobuf:"varint,6,opt,name=DryRun,proto3" json:"DryRun,omitempty"` - ReplicationConfig *ReplicationConfig `protobuf:"bytes,7,opt,name=ReplicationConfig,proto3" json:"ReplicationConfig,omitempty"` + ReplicationConfig *ReplicationConfig `protobuf:"bytes,6,opt,name=ReplicationConfig,proto3" json:"ReplicationConfig,omitempty"` } func (x *SendReq) Reset() { @@ -589,13 +588,6 @@ func (x *SendReq) GetEncrypted() Tri { return Tri_DontCare } -func (x *SendReq) GetDryRun() bool { - if x != nil { - return x.DryRun - } - return false -} - func (x *SendReq) GetReplicationConfig() *ReplicationConfig { if x != nil { return x.ReplicationConfig @@ -766,12 +758,11 @@ type SendRes struct { unknownFields protoimpl.UnknownFields // Whether the resume token provided in the request has been used or not. - // If the SendReq.ResumeToken == "", this field has no meaning. - UsedResumeToken bool `protobuf:"varint,2,opt,name=UsedResumeToken,proto3" json:"UsedResumeToken,omitempty"` + // If the SendReq.ResumeToken == "", this field MUST be false. + UsedResumeToken bool `protobuf:"varint,1,opt,name=UsedResumeToken,proto3" json:"UsedResumeToken,omitempty"` // Expected stream size determined by dry run, not exact. // 0 indicates that for the given SendReq, no size estimate could be made. - ExpectedSize int64 `protobuf:"varint,3,opt,name=ExpectedSize,proto3" json:"ExpectedSize,omitempty"` - Properties []*Property `protobuf:"bytes,4,rep,name=Properties,proto3" json:"Properties,omitempty"` + ExpectedSize uint64 `protobuf:"varint,2,opt,name=ExpectedSize,proto3" json:"ExpectedSize,omitempty"` } func (x *SendRes) Reset() { @@ -813,20 +804,13 @@ func (x *SendRes) GetUsedResumeToken() bool { return false } -func (x *SendRes) GetExpectedSize() int64 { +func (x *SendRes) GetExpectedSize() uint64 { if x != nil { return x.ExpectedSize } return 0 } -func (x *SendRes) GetProperties() []*Property { - if x != nil { - return x.Properties - } - return nil -} - type SendCompletedReq struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -1443,7 +1427,7 @@ var file_pdu_proto_rawDesc = []byte{ 0x72, 0x65, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x29, 0x0a, 0x0b, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0c, 0x0a, 0x08, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x10, 0x00, 0x12, 0x0c, 0x0a, 0x08, 0x42, 0x6f, 0x6f, 0x6b, 0x6d, 0x61, 0x72, 0x6b, - 0x10, 0x01, 0x22, 0x95, 0x02, 0x0a, 0x07, 0x53, 0x65, 0x6e, 0x64, 0x52, 0x65, 0x71, 0x12, 0x1e, + 0x10, 0x01, 0x22, 0xfd, 0x01, 0x0a, 0x07, 0x53, 0x65, 0x6e, 0x64, 0x52, 0x65, 0x71, 0x12, 0x1e, 0x0a, 0x0a, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x12, 0x26, 0x0a, 0x04, 0x46, 0x72, 0x6f, 0x6d, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x46, @@ -1455,122 +1439,119 @@ var file_pdu_proto_rawDesc = []byte{ 0x0b, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x22, 0x0a, 0x09, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x04, 0x2e, 0x54, 0x72, 0x69, 0x52, 0x09, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, - 0x12, 0x16, 0x0a, 0x06, 0x44, 0x72, 0x79, 0x52, 0x75, 0x6e, 0x18, 0x06, 0x20, 0x01, 0x28, 0x08, - 0x52, 0x06, 0x44, 0x72, 0x79, 0x52, 0x75, 0x6e, 0x12, 0x40, 0x0a, 0x11, 0x52, 0x65, 0x70, 0x6c, - 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x07, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, - 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x11, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, - 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x22, 0x51, 0x0a, 0x11, 0x52, 0x65, - 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, - 0x3c, 0x0a, 0x0a, 0x70, 0x72, 0x6f, 0x74, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, - 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x50, 0x72, 0x6f, 0x74, 0x65, 0x63, 0x74, 0x69, 0x6f, - 0x6e, 0x52, 0x0a, 0x70, 0x72, 0x6f, 0x74, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x8f, 0x01, - 0x0a, 0x1b, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, - 0x66, 0x69, 0x67, 0x50, 0x72, 0x6f, 0x74, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x33, 0x0a, - 0x07, 0x49, 0x6e, 0x69, 0x74, 0x69, 0x61, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x19, - 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x47, 0x75, 0x61, 0x72, - 0x61, 0x6e, 0x74, 0x65, 0x65, 0x4b, 0x69, 0x6e, 0x64, 0x52, 0x07, 0x49, 0x6e, 0x69, 0x74, 0x69, - 0x61, 0x6c, 0x12, 0x3b, 0x0a, 0x0b, 0x49, 0x6e, 0x63, 0x72, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x61, - 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x19, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, - 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x47, 0x75, 0x61, 0x72, 0x61, 0x6e, 0x74, 0x65, 0x65, 0x4b, 0x69, - 0x6e, 0x64, 0x52, 0x0b, 0x49, 0x6e, 0x63, 0x72, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x61, 0x6c, 0x22, - 0x34, 0x0a, 0x08, 0x50, 0x72, 0x6f, 0x70, 0x65, 0x72, 0x74, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x4e, - 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x4e, 0x61, 0x6d, 0x65, 0x12, - 0x14, 0x0a, 0x05, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, - 0x56, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x82, 0x01, 0x0a, 0x07, 0x53, 0x65, 0x6e, 0x64, 0x52, 0x65, - 0x73, 0x12, 0x28, 0x0a, 0x0f, 0x55, 0x73, 0x65, 0x64, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x54, - 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0f, 0x55, 0x73, 0x65, 0x64, - 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x22, 0x0a, 0x0c, 0x45, - 0x78, 0x70, 0x65, 0x63, 0x74, 0x65, 0x64, 0x53, 0x69, 0x7a, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, - 0x03, 0x52, 0x0c, 0x45, 0x78, 0x70, 0x65, 0x63, 0x74, 0x65, 0x64, 0x53, 0x69, 0x7a, 0x65, 0x12, - 0x29, 0x0a, 0x0a, 0x50, 0x72, 0x6f, 0x70, 0x65, 0x72, 0x74, 0x69, 0x65, 0x73, 0x18, 0x04, 0x20, - 0x03, 0x28, 0x0b, 0x32, 0x09, 0x2e, 0x50, 0x72, 0x6f, 0x70, 0x65, 0x72, 0x74, 0x79, 0x52, 0x0a, - 0x50, 0x72, 0x6f, 0x70, 0x65, 0x72, 0x74, 0x69, 0x65, 0x73, 0x22, 0x3e, 0x0a, 0x10, 0x53, 0x65, - 0x6e, 0x64, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x52, 0x65, 0x71, 0x12, 0x2a, - 0x0a, 0x0b, 0x4f, 0x72, 0x69, 0x67, 0x69, 0x6e, 0x61, 0x6c, 0x52, 0x65, 0x71, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x08, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x52, 0x65, 0x71, 0x52, 0x0b, 0x4f, - 0x72, 0x69, 0x67, 0x69, 0x6e, 0x61, 0x6c, 0x52, 0x65, 0x71, 0x22, 0x12, 0x0a, 0x10, 0x53, 0x65, - 0x6e, 0x64, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x52, 0x65, 0x73, 0x22, 0xbe, - 0x01, 0x0a, 0x0a, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x52, 0x65, 0x71, 0x12, 0x1e, 0x0a, - 0x0a, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x0a, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x12, 0x22, 0x0a, - 0x02, 0x54, 0x6f, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x46, 0x69, 0x6c, 0x65, - 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x02, 0x54, - 0x6f, 0x12, 0x2a, 0x0a, 0x10, 0x43, 0x6c, 0x65, 0x61, 0x72, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, - 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x10, 0x43, 0x6c, 0x65, - 0x61, 0x72, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x40, 0x0a, + 0x12, 0x40, 0x0a, 0x11, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, + 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x52, 0x65, + 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x11, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, - 0x69, 0x67, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, - 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x11, 0x52, 0x65, - 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x22, - 0x0c, 0x0a, 0x0a, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x52, 0x65, 0x73, 0x22, 0x67, 0x0a, - 0x13, 0x44, 0x65, 0x73, 0x74, 0x72, 0x6f, 0x79, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, - 0x73, 0x52, 0x65, 0x71, 0x12, 0x1e, 0x0a, 0x0a, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74, - 0x65, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x79, - 0x73, 0x74, 0x65, 0x6d, 0x12, 0x30, 0x0a, 0x09, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, - 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x79, - 0x73, 0x74, 0x65, 0x6d, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x53, 0x6e, 0x61, - 0x70, 0x73, 0x68, 0x6f, 0x74, 0x73, 0x22, 0x5a, 0x0a, 0x12, 0x44, 0x65, 0x73, 0x74, 0x72, 0x6f, - 0x79, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x52, 0x65, 0x73, 0x12, 0x2e, 0x0a, 0x08, - 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, - 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x56, 0x65, 0x72, 0x73, 0x69, - 0x6f, 0x6e, 0x52, 0x08, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x12, 0x14, 0x0a, 0x05, - 0x45, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x45, 0x72, 0x72, - 0x6f, 0x72, 0x22, 0x44, 0x0a, 0x13, 0x44, 0x65, 0x73, 0x74, 0x72, 0x6f, 0x79, 0x53, 0x6e, 0x61, - 0x70, 0x73, 0x68, 0x6f, 0x74, 0x73, 0x52, 0x65, 0x73, 0x12, 0x2d, 0x0a, 0x07, 0x52, 0x65, 0x73, - 0x75, 0x6c, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x44, 0x65, 0x73, - 0x74, 0x72, 0x6f, 0x79, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x52, 0x65, 0x73, 0x52, - 0x07, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x22, 0x36, 0x0a, 0x14, 0x52, 0x65, 0x70, 0x6c, - 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x52, 0x65, 0x71, + 0x69, 0x67, 0x22, 0x51, 0x0a, 0x11, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x3c, 0x0a, 0x0a, 0x70, 0x72, 0x6f, 0x74, 0x65, + 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x52, 0x65, + 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x50, + 0x72, 0x6f, 0x74, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x0a, 0x70, 0x72, 0x6f, 0x74, 0x65, + 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x8f, 0x01, 0x0a, 0x1b, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, + 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x50, 0x72, 0x6f, 0x74, 0x65, + 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x33, 0x0a, 0x07, 0x49, 0x6e, 0x69, 0x74, 0x69, 0x61, 0x6c, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x19, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, + 0x74, 0x69, 0x6f, 0x6e, 0x47, 0x75, 0x61, 0x72, 0x61, 0x6e, 0x74, 0x65, 0x65, 0x4b, 0x69, 0x6e, + 0x64, 0x52, 0x07, 0x49, 0x6e, 0x69, 0x74, 0x69, 0x61, 0x6c, 0x12, 0x3b, 0x0a, 0x0b, 0x49, 0x6e, + 0x63, 0x72, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x61, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, + 0x19, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x47, 0x75, 0x61, + 0x72, 0x61, 0x6e, 0x74, 0x65, 0x65, 0x4b, 0x69, 0x6e, 0x64, 0x52, 0x0b, 0x49, 0x6e, 0x63, 0x72, + 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x61, 0x6c, 0x22, 0x34, 0x0a, 0x08, 0x50, 0x72, 0x6f, 0x70, 0x65, + 0x72, 0x74, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x04, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x56, 0x61, 0x6c, 0x75, 0x65, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x57, 0x0a, + 0x07, 0x53, 0x65, 0x6e, 0x64, 0x52, 0x65, 0x73, 0x12, 0x28, 0x0a, 0x0f, 0x55, 0x73, 0x65, 0x64, + 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x08, 0x52, 0x0f, 0x55, 0x73, 0x65, 0x64, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x54, 0x6f, 0x6b, + 0x65, 0x6e, 0x12, 0x22, 0x0a, 0x0c, 0x45, 0x78, 0x70, 0x65, 0x63, 0x74, 0x65, 0x64, 0x53, 0x69, + 0x7a, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0c, 0x45, 0x78, 0x70, 0x65, 0x63, 0x74, + 0x65, 0x64, 0x53, 0x69, 0x7a, 0x65, 0x22, 0x3e, 0x0a, 0x10, 0x53, 0x65, 0x6e, 0x64, 0x43, 0x6f, + 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x52, 0x65, 0x71, 0x12, 0x2a, 0x0a, 0x0b, 0x4f, 0x72, + 0x69, 0x67, 0x69, 0x6e, 0x61, 0x6c, 0x52, 0x65, 0x71, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x08, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x52, 0x65, 0x71, 0x52, 0x0b, 0x4f, 0x72, 0x69, 0x67, 0x69, + 0x6e, 0x61, 0x6c, 0x52, 0x65, 0x71, 0x22, 0x12, 0x0a, 0x10, 0x53, 0x65, 0x6e, 0x64, 0x43, 0x6f, + 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x52, 0x65, 0x73, 0x22, 0xbe, 0x01, 0x0a, 0x0a, 0x52, + 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x52, 0x65, 0x71, 0x12, 0x1e, 0x0a, 0x0a, 0x46, 0x69, 0x6c, + 0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x46, + 0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x12, 0x22, 0x0a, 0x02, 0x54, 0x6f, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74, + 0x65, 0x6d, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x02, 0x54, 0x6f, 0x12, 0x2a, 0x0a, + 0x10, 0x43, 0x6c, 0x65, 0x61, 0x72, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x54, 0x6f, 0x6b, 0x65, + 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x10, 0x43, 0x6c, 0x65, 0x61, 0x72, 0x52, 0x65, + 0x73, 0x75, 0x6d, 0x65, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x40, 0x0a, 0x11, 0x52, 0x65, 0x70, + 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x04, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, + 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x11, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, + 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x22, 0x0c, 0x0a, 0x0a, 0x52, + 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x52, 0x65, 0x73, 0x22, 0x67, 0x0a, 0x13, 0x44, 0x65, 0x73, + 0x74, 0x72, 0x6f, 0x79, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x73, 0x52, 0x65, 0x71, 0x12, 0x1e, 0x0a, 0x0a, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, - 0x22, 0x54, 0x0a, 0x14, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, - 0x75, 0x72, 0x73, 0x6f, 0x72, 0x52, 0x65, 0x73, 0x12, 0x14, 0x0a, 0x04, 0x47, 0x75, 0x69, 0x64, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x48, 0x00, 0x52, 0x04, 0x47, 0x75, 0x69, 0x64, 0x12, 0x1c, - 0x0a, 0x08, 0x4e, 0x6f, 0x74, 0x65, 0x78, 0x69, 0x73, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, - 0x48, 0x00, 0x52, 0x08, 0x4e, 0x6f, 0x74, 0x65, 0x78, 0x69, 0x73, 0x74, 0x42, 0x08, 0x0a, 0x06, - 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x22, 0x23, 0x0a, 0x07, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, - 0x71, 0x12, 0x18, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x1d, 0x0a, 0x07, 0x50, - 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x12, 0x12, 0x0a, 0x04, 0x45, 0x63, 0x68, 0x6f, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x45, 0x63, 0x68, 0x6f, 0x2a, 0x28, 0x0a, 0x03, 0x54, 0x72, - 0x69, 0x12, 0x0c, 0x0a, 0x08, 0x44, 0x6f, 0x6e, 0x74, 0x43, 0x61, 0x72, 0x65, 0x10, 0x00, 0x12, - 0x09, 0x0a, 0x05, 0x46, 0x61, 0x6c, 0x73, 0x65, 0x10, 0x01, 0x12, 0x08, 0x0a, 0x04, 0x54, 0x72, - 0x75, 0x65, 0x10, 0x02, 0x2a, 0x86, 0x01, 0x0a, 0x18, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, - 0x74, 0x69, 0x6f, 0x6e, 0x47, 0x75, 0x61, 0x72, 0x61, 0x6e, 0x74, 0x65, 0x65, 0x4b, 0x69, 0x6e, - 0x64, 0x12, 0x14, 0x0a, 0x10, 0x47, 0x75, 0x61, 0x72, 0x61, 0x6e, 0x74, 0x65, 0x65, 0x49, 0x6e, - 0x76, 0x61, 0x6c, 0x69, 0x64, 0x10, 0x00, 0x12, 0x19, 0x0a, 0x15, 0x47, 0x75, 0x61, 0x72, 0x61, - 0x6e, 0x74, 0x65, 0x65, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79, - 0x10, 0x01, 0x12, 0x23, 0x0a, 0x1f, 0x47, 0x75, 0x61, 0x72, 0x61, 0x6e, 0x74, 0x65, 0x65, 0x49, - 0x6e, 0x63, 0x72, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x61, 0x6c, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, - 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x10, 0x02, 0x12, 0x14, 0x0a, 0x10, 0x47, 0x75, 0x61, 0x72, 0x61, - 0x6e, 0x74, 0x65, 0x65, 0x4e, 0x6f, 0x74, 0x68, 0x69, 0x6e, 0x67, 0x10, 0x03, 0x32, 0xf0, 0x02, - 0x0a, 0x0b, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1a, 0x0a, - 0x04, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x08, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x1a, - 0x08, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x12, 0x39, 0x0a, 0x0f, 0x4c, 0x69, 0x73, - 0x74, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x73, 0x12, 0x12, 0x2e, 0x4c, - 0x69, 0x73, 0x74, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x52, 0x65, 0x71, - 0x1a, 0x12, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74, 0x65, - 0x6d, 0x52, 0x65, 0x73, 0x12, 0x50, 0x0a, 0x16, 0x4c, 0x69, 0x73, 0x74, 0x46, 0x69, 0x6c, 0x65, - 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x1a, - 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x56, - 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x1a, 0x1a, 0x2e, 0x4c, 0x69, 0x73, + 0x12, 0x30, 0x0a, 0x09, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x73, 0x18, 0x02, 0x20, + 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, + 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, + 0x74, 0x73, 0x22, 0x5a, 0x0a, 0x12, 0x44, 0x65, 0x73, 0x74, 0x72, 0x6f, 0x79, 0x53, 0x6e, 0x61, + 0x70, 0x73, 0x68, 0x6f, 0x74, 0x52, 0x65, 0x73, 0x12, 0x2e, 0x0a, 0x08, 0x53, 0x6e, 0x61, 0x70, + 0x73, 0x68, 0x6f, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x46, 0x69, 0x6c, + 0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x08, + 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x45, 0x72, 0x72, 0x6f, + 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x22, 0x44, + 0x0a, 0x13, 0x44, 0x65, 0x73, 0x74, 0x72, 0x6f, 0x79, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, + 0x74, 0x73, 0x52, 0x65, 0x73, 0x12, 0x2d, 0x0a, 0x07, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, + 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x44, 0x65, 0x73, 0x74, 0x72, 0x6f, 0x79, + 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x52, 0x65, 0x73, 0x52, 0x07, 0x52, 0x65, 0x73, + 0x75, 0x6c, 0x74, 0x73, 0x22, 0x36, 0x0a, 0x14, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x52, 0x65, 0x71, 0x12, 0x1e, 0x0a, 0x0a, + 0x46, 0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x0a, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x22, 0x54, 0x0a, 0x14, + 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x75, 0x72, 0x73, 0x6f, + 0x72, 0x52, 0x65, 0x73, 0x12, 0x14, 0x0a, 0x04, 0x47, 0x75, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x04, 0x48, 0x00, 0x52, 0x04, 0x47, 0x75, 0x69, 0x64, 0x12, 0x1c, 0x0a, 0x08, 0x4e, 0x6f, + 0x74, 0x65, 0x78, 0x69, 0x73, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x48, 0x00, 0x52, 0x08, + 0x4e, 0x6f, 0x74, 0x65, 0x78, 0x69, 0x73, 0x74, 0x42, 0x08, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, + 0x6c, 0x74, 0x22, 0x23, 0x0a, 0x07, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x12, 0x18, 0x0a, + 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, + 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x1d, 0x0a, 0x07, 0x50, 0x69, 0x6e, 0x67, 0x52, + 0x65, 0x73, 0x12, 0x12, 0x0a, 0x04, 0x45, 0x63, 0x68, 0x6f, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x04, 0x45, 0x63, 0x68, 0x6f, 0x2a, 0x28, 0x0a, 0x03, 0x54, 0x72, 0x69, 0x12, 0x0c, 0x0a, + 0x08, 0x44, 0x6f, 0x6e, 0x74, 0x43, 0x61, 0x72, 0x65, 0x10, 0x00, 0x12, 0x09, 0x0a, 0x05, 0x46, + 0x61, 0x6c, 0x73, 0x65, 0x10, 0x01, 0x12, 0x08, 0x0a, 0x04, 0x54, 0x72, 0x75, 0x65, 0x10, 0x02, + 0x2a, 0x86, 0x01, 0x0a, 0x18, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x47, 0x75, 0x61, 0x72, 0x61, 0x6e, 0x74, 0x65, 0x65, 0x4b, 0x69, 0x6e, 0x64, 0x12, 0x14, 0x0a, + 0x10, 0x47, 0x75, 0x61, 0x72, 0x61, 0x6e, 0x74, 0x65, 0x65, 0x49, 0x6e, 0x76, 0x61, 0x6c, 0x69, + 0x64, 0x10, 0x00, 0x12, 0x19, 0x0a, 0x15, 0x47, 0x75, 0x61, 0x72, 0x61, 0x6e, 0x74, 0x65, 0x65, + 0x52, 0x65, 0x73, 0x75, 0x6d, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79, 0x10, 0x01, 0x12, 0x23, + 0x0a, 0x1f, 0x47, 0x75, 0x61, 0x72, 0x61, 0x6e, 0x74, 0x65, 0x65, 0x49, 0x6e, 0x63, 0x72, 0x65, + 0x6d, 0x65, 0x6e, 0x74, 0x61, 0x6c, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x10, 0x02, 0x12, 0x14, 0x0a, 0x10, 0x47, 0x75, 0x61, 0x72, 0x61, 0x6e, 0x74, 0x65, 0x65, + 0x4e, 0x6f, 0x74, 0x68, 0x69, 0x6e, 0x67, 0x10, 0x03, 0x32, 0x8f, 0x03, 0x0a, 0x0b, 0x52, 0x65, + 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1a, 0x0a, 0x04, 0x50, 0x69, 0x6e, + 0x67, 0x12, 0x08, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x1a, 0x08, 0x2e, 0x50, 0x69, + 0x6e, 0x67, 0x52, 0x65, 0x73, 0x12, 0x39, 0x0a, 0x0f, 0x4c, 0x69, 0x73, 0x74, 0x46, 0x69, 0x6c, + 0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x73, 0x12, 0x12, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x46, + 0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x52, 0x65, 0x71, 0x1a, 0x12, 0x2e, 0x4c, + 0x69, 0x73, 0x74, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x52, 0x65, 0x73, + 0x12, 0x50, 0x0a, 0x16, 0x4c, 0x69, 0x73, 0x74, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74, + 0x65, 0x6d, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x1a, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x56, 0x65, 0x72, 0x73, 0x69, - 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x12, 0x3e, 0x0a, 0x10, 0x44, 0x65, 0x73, 0x74, 0x72, 0x6f, - 0x79, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x73, 0x12, 0x14, 0x2e, 0x44, 0x65, 0x73, - 0x74, 0x72, 0x6f, 0x79, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x73, 0x52, 0x65, 0x71, - 0x1a, 0x14, 0x2e, 0x44, 0x65, 0x73, 0x74, 0x72, 0x6f, 0x79, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, - 0x6f, 0x74, 0x73, 0x52, 0x65, 0x73, 0x12, 0x41, 0x0a, 0x11, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, - 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x12, 0x15, 0x2e, 0x52, 0x65, - 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x52, - 0x65, 0x71, 0x1a, 0x15, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, - 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x52, 0x65, 0x73, 0x12, 0x35, 0x0a, 0x0d, 0x53, 0x65, 0x6e, - 0x64, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x12, 0x11, 0x2e, 0x53, 0x65, 0x6e, - 0x64, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x52, 0x65, 0x71, 0x1a, 0x11, 0x2e, - 0x53, 0x65, 0x6e, 0x64, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x52, 0x65, 0x73, - 0x42, 0x07, 0x5a, 0x05, 0x2e, 0x3b, 0x70, 0x64, 0x75, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x33, + 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x1a, 0x1a, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x46, 0x69, 0x6c, + 0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x52, + 0x65, 0x73, 0x12, 0x3e, 0x0a, 0x10, 0x44, 0x65, 0x73, 0x74, 0x72, 0x6f, 0x79, 0x53, 0x6e, 0x61, + 0x70, 0x73, 0x68, 0x6f, 0x74, 0x73, 0x12, 0x14, 0x2e, 0x44, 0x65, 0x73, 0x74, 0x72, 0x6f, 0x79, + 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x73, 0x52, 0x65, 0x71, 0x1a, 0x14, 0x2e, 0x44, + 0x65, 0x73, 0x74, 0x72, 0x6f, 0x79, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x73, 0x52, + 0x65, 0x73, 0x12, 0x41, 0x0a, 0x11, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x12, 0x15, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, + 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x52, 0x65, 0x71, 0x1a, 0x15, + 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x75, 0x72, 0x73, + 0x6f, 0x72, 0x52, 0x65, 0x73, 0x12, 0x1d, 0x0a, 0x07, 0x53, 0x65, 0x6e, 0x64, 0x44, 0x72, 0x79, + 0x12, 0x08, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x52, 0x65, 0x71, 0x1a, 0x08, 0x2e, 0x53, 0x65, 0x6e, + 0x64, 0x52, 0x65, 0x73, 0x12, 0x35, 0x0a, 0x0d, 0x53, 0x65, 0x6e, 0x64, 0x43, 0x6f, 0x6d, 0x70, + 0x6c, 0x65, 0x74, 0x65, 0x64, 0x12, 0x11, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x43, 0x6f, 0x6d, 0x70, + 0x6c, 0x65, 0x74, 0x65, 0x64, 0x52, 0x65, 0x71, 0x1a, 0x11, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x43, + 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x52, 0x65, 0x73, 0x42, 0x07, 0x5a, 0x05, 0x2e, + 0x3b, 0x70, 0x64, 0x75, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1625,30 +1606,31 @@ var file_pdu_proto_depIdxs = []int32{ 11, // 7: ReplicationConfig.protection:type_name -> ReplicationConfigProtection 1, // 8: ReplicationConfigProtection.Initial:type_name -> ReplicationGuaranteeKind 1, // 9: ReplicationConfigProtection.Incremental:type_name -> ReplicationGuaranteeKind - 12, // 10: SendRes.Properties:type_name -> Property - 9, // 11: SendCompletedReq.OriginalReq:type_name -> SendReq - 8, // 12: ReceiveReq.To:type_name -> FilesystemVersion - 10, // 13: ReceiveReq.ReplicationConfig:type_name -> ReplicationConfig - 8, // 14: DestroySnapshotsReq.Snapshots:type_name -> FilesystemVersion - 8, // 15: DestroySnapshotRes.Snapshot:type_name -> FilesystemVersion - 19, // 16: DestroySnapshotsRes.Results:type_name -> DestroySnapshotRes - 23, // 17: Replication.Ping:input_type -> PingReq - 3, // 18: Replication.ListFilesystems:input_type -> ListFilesystemReq - 6, // 19: Replication.ListFilesystemVersions:input_type -> ListFilesystemVersionsReq - 18, // 20: Replication.DestroySnapshots:input_type -> DestroySnapshotsReq - 21, // 21: Replication.ReplicationCursor:input_type -> ReplicationCursorReq + 9, // 10: SendCompletedReq.OriginalReq:type_name -> SendReq + 8, // 11: ReceiveReq.To:type_name -> FilesystemVersion + 10, // 12: ReceiveReq.ReplicationConfig:type_name -> ReplicationConfig + 8, // 13: DestroySnapshotsReq.Snapshots:type_name -> FilesystemVersion + 8, // 14: DestroySnapshotRes.Snapshot:type_name -> FilesystemVersion + 19, // 15: DestroySnapshotsRes.Results:type_name -> DestroySnapshotRes + 23, // 16: Replication.Ping:input_type -> PingReq + 3, // 17: Replication.ListFilesystems:input_type -> ListFilesystemReq + 6, // 18: Replication.ListFilesystemVersions:input_type -> ListFilesystemVersionsReq + 18, // 19: Replication.DestroySnapshots:input_type -> DestroySnapshotsReq + 21, // 20: Replication.ReplicationCursor:input_type -> ReplicationCursorReq + 9, // 21: Replication.SendDry:input_type -> SendReq 14, // 22: Replication.SendCompleted:input_type -> SendCompletedReq 24, // 23: Replication.Ping:output_type -> PingRes 4, // 24: Replication.ListFilesystems:output_type -> ListFilesystemRes 7, // 25: Replication.ListFilesystemVersions:output_type -> ListFilesystemVersionsRes 20, // 26: Replication.DestroySnapshots:output_type -> DestroySnapshotsRes 22, // 27: Replication.ReplicationCursor:output_type -> ReplicationCursorRes - 15, // 28: Replication.SendCompleted:output_type -> SendCompletedRes - 23, // [23:29] is the sub-list for method output_type - 17, // [17:23] is the sub-list for method input_type - 17, // [17:17] is the sub-list for extension type_name - 17, // [17:17] is the sub-list for extension extendee - 0, // [0:17] is the sub-list for field type_name + 13, // 28: Replication.SendDry:output_type -> SendRes + 15, // 29: Replication.SendCompleted:output_type -> SendCompletedRes + 23, // [23:30] is the sub-list for method output_type + 16, // [16:23] is the sub-list for method input_type + 16, // [16:16] is the sub-list for extension type_name + 16, // [16:16] is the sub-list for extension extendee + 0, // [0:16] is the sub-list for field type_name } func init() { file_pdu_proto_init() } diff --git a/replication/logic/pdu/pdu.proto b/replication/logic/pdu/pdu.proto index b420524..6eb8cd6 100644 --- a/replication/logic/pdu/pdu.proto +++ b/replication/logic/pdu/pdu.proto @@ -8,6 +8,7 @@ service Replication { returns (ListFilesystemVersionsRes); rpc DestroySnapshots(DestroySnapshotsReq) returns (DestroySnapshotsRes); rpc ReplicationCursor(ReplicationCursorReq) returns (ReplicationCursorRes); + rpc SendDry(SendReq) returns (SendRes); rpc SendCompleted(SendCompletedReq) returns (SendCompletedRes); // for Send and Recv, see package rpc } @@ -60,9 +61,7 @@ message SendReq { string ResumeToken = 4; Tri Encrypted = 5; - bool DryRun = 6; - - ReplicationConfig ReplicationConfig = 7; + ReplicationConfig ReplicationConfig = 6; } message ReplicationConfig { @@ -89,14 +88,12 @@ message Property { message SendRes { // Whether the resume token provided in the request has been used or not. - // If the SendReq.ResumeToken == "", this field has no meaning. - bool UsedResumeToken = 2; + // If the SendReq.ResumeToken == "", this field MUST be false. + bool UsedResumeToken = 1; // Expected stream size determined by dry run, not exact. // 0 indicates that for the given SendReq, no size estimate could be made. - int64 ExpectedSize = 3; - - repeated Property Properties = 4; + uint64 ExpectedSize = 2; } message SendCompletedReq { diff --git a/replication/logic/pdu/pdu_grpc.pb.go b/replication/logic/pdu/pdu_grpc.pb.go index 848c92f..235e614 100644 --- a/replication/logic/pdu/pdu_grpc.pb.go +++ b/replication/logic/pdu/pdu_grpc.pb.go @@ -23,6 +23,7 @@ type ReplicationClient interface { ListFilesystemVersions(ctx context.Context, in *ListFilesystemVersionsReq, opts ...grpc.CallOption) (*ListFilesystemVersionsRes, error) DestroySnapshots(ctx context.Context, in *DestroySnapshotsReq, opts ...grpc.CallOption) (*DestroySnapshotsRes, error) ReplicationCursor(ctx context.Context, in *ReplicationCursorReq, opts ...grpc.CallOption) (*ReplicationCursorRes, error) + SendDry(ctx context.Context, in *SendReq, opts ...grpc.CallOption) (*SendRes, error) SendCompleted(ctx context.Context, in *SendCompletedReq, opts ...grpc.CallOption) (*SendCompletedRes, error) } @@ -79,6 +80,15 @@ func (c *replicationClient) ReplicationCursor(ctx context.Context, in *Replicati return out, nil } +func (c *replicationClient) SendDry(ctx context.Context, in *SendReq, opts ...grpc.CallOption) (*SendRes, error) { + out := new(SendRes) + err := c.cc.Invoke(ctx, "/Replication/SendDry", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *replicationClient) SendCompleted(ctx context.Context, in *SendCompletedReq, opts ...grpc.CallOption) (*SendCompletedRes, error) { out := new(SendCompletedRes) err := c.cc.Invoke(ctx, "/Replication/SendCompleted", in, out, opts...) @@ -97,6 +107,7 @@ type ReplicationServer interface { ListFilesystemVersions(context.Context, *ListFilesystemVersionsReq) (*ListFilesystemVersionsRes, error) DestroySnapshots(context.Context, *DestroySnapshotsReq) (*DestroySnapshotsRes, error) ReplicationCursor(context.Context, *ReplicationCursorReq) (*ReplicationCursorRes, error) + SendDry(context.Context, *SendReq) (*SendRes, error) SendCompleted(context.Context, *SendCompletedReq) (*SendCompletedRes, error) mustEmbedUnimplementedReplicationServer() } @@ -120,6 +131,9 @@ func (UnimplementedReplicationServer) DestroySnapshots(context.Context, *Destroy func (UnimplementedReplicationServer) ReplicationCursor(context.Context, *ReplicationCursorReq) (*ReplicationCursorRes, error) { return nil, status.Errorf(codes.Unimplemented, "method ReplicationCursor not implemented") } +func (UnimplementedReplicationServer) SendDry(context.Context, *SendReq) (*SendRes, error) { + return nil, status.Errorf(codes.Unimplemented, "method SendDry not implemented") +} func (UnimplementedReplicationServer) SendCompleted(context.Context, *SendCompletedReq) (*SendCompletedRes, error) { return nil, status.Errorf(codes.Unimplemented, "method SendCompleted not implemented") } @@ -226,6 +240,24 @@ func _Replication_ReplicationCursor_Handler(srv interface{}, ctx context.Context return interceptor(ctx, in, info, handler) } +func _Replication_SendDry_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(SendReq) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ReplicationServer).SendDry(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/Replication/SendDry", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ReplicationServer).SendDry(ctx, req.(*SendReq)) + } + return interceptor(ctx, in, info, handler) +} + func _Replication_SendCompleted_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(SendCompletedReq) if err := dec(in); err != nil { @@ -271,6 +303,10 @@ var Replication_ServiceDesc = grpc.ServiceDesc{ MethodName: "ReplicationCursor", Handler: _Replication_ReplicationCursor_Handler, }, + { + MethodName: "SendDry", + Handler: _Replication_SendDry_Handler, + }, { MethodName: "SendCompleted", Handler: _Replication_SendCompleted_Handler, diff --git a/replication/logic/replication_logic.go b/replication/logic/replication_logic.go index cc5bbe7..6e67182 100644 --- a/replication/logic/replication_logic.go +++ b/replication/logic/replication_logic.go @@ -41,6 +41,7 @@ type Sender interface { // any next call to the parent github.com/zrepl/zrepl/replication.Endpoint. // If the send request is for dry run the io.ReadCloser will be nil Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.ReadCloser, error) + SendDry(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, error) SendCompleted(ctx context.Context, r *pdu.SendCompletedReq) (*pdu.SendCompletedRes, error) ReplicationCursor(ctx context.Context, req *pdu.ReplicationCursorReq) (*pdu.ReplicationCursorRes, error) } @@ -158,7 +159,7 @@ type Step struct { encrypt tri resumeToken string // empty means no resume token shall be used - expectedSize int64 // 0 means no size estimate present / possible + expectedSize uint64 // 0 means no size estimate present / possible // byteCounter is nil initially, and set later in Step.doReplication // => concurrent read of that pointer from Step.ReportInfo must be protected @@ -189,7 +190,7 @@ func (s *Step) Step(ctx context.Context) error { func (s *Step) ReportInfo() *report.StepInfo { // get current byteCounter value - var byteCounter int64 + var byteCounter uint64 s.byteCounterMtx.Lock() if s.byteCounter != nil { byteCounter = s.byteCounter.Count() @@ -546,10 +547,10 @@ func (s *Step) updateSizeEstimate(ctx context.Context) error { log := getLogger(ctx) - sr := s.buildSendRequest(true) + sr := s.buildSendRequest() log.Debug("initiate dry run send request") - sres, _, err := s.sender.Send(ctx, sr) + sres, err := s.sender.SendDry(ctx, sr) if err != nil { log.WithError(err).Error("dry run send request failed") return err @@ -563,7 +564,7 @@ func (s *Step) updateSizeEstimate(ctx context.Context) error { return nil } -func (s *Step) buildSendRequest(dryRun bool) (sr *pdu.SendReq) { +func (s *Step) buildSendRequest() (sr *pdu.SendReq) { fs := s.parent.Path sr = &pdu.SendReq{ Filesystem: fs, @@ -571,7 +572,6 @@ func (s *Step) buildSendRequest(dryRun bool) (sr *pdu.SendReq) { To: s.to, Encrypted: s.encrypt.ToPDU(), ResumeToken: s.resumeToken, - DryRun: dryRun, ReplicationConfig: s.parent.policy.ReplicationConfig, } return sr @@ -582,7 +582,7 @@ func (s *Step) doReplication(ctx context.Context) error { fs := s.parent.Path log := getLogger(ctx).WithField("filesystem", fs) - sr := s.buildSendRequest(false) + sr := s.buildSendRequest() log.Debug("initiate send request") sres, stream, err := s.sender.Send(ctx, sr) diff --git a/replication/report/replication_report.go b/replication/report/replication_report.go index bc3d0ae..bb05085 100644 --- a/replication/report/replication_report.go +++ b/replication/report/replication_report.go @@ -97,11 +97,11 @@ type StepInfo struct { From, To string Resumed bool Encrypted EncryptedEnum - BytesExpected int64 - BytesReplicated int64 + BytesExpected uint64 + BytesReplicated uint64 } -func (a *AttemptReport) BytesSum() (expected, replicated int64, containsInvalidSizeEstimates bool) { +func (a *AttemptReport) BytesSum() (expected, replicated uint64, containsInvalidSizeEstimates bool) { for _, fs := range a.Filesystems { e, r, fsContainsInvalidEstimate := fs.BytesSum() containsInvalidSizeEstimates = containsInvalidSizeEstimates || fsContainsInvalidEstimate @@ -111,7 +111,7 @@ func (a *AttemptReport) BytesSum() (expected, replicated int64, containsInvalidS return expected, replicated, containsInvalidSizeEstimates } -func (f *FilesystemReport) BytesSum() (expected, replicated int64, containsInvalidSizeEstimates bool) { +func (f *FilesystemReport) BytesSum() (expected, replicated uint64, containsInvalidSizeEstimates bool) { for _, step := range f.Steps { expected += step.Info.BytesExpected replicated += step.Info.BytesReplicated diff --git a/rpc/dataconn/dataconn_client.go b/rpc/dataconn/dataconn_client.go index f590e92..76914db 100644 --- a/rpc/dataconn/dataconn_client.go +++ b/rpc/dataconn/dataconn_client.go @@ -114,12 +114,6 @@ func (c *Client) ReqSend(ctx context.Context, req *pdu.SendReq) (*pdu.SendRes, i if err != nil { return nil, nil, err } - putWireOnReturn := true - defer func() { - if putWireOnReturn { - c.putWire(conn) - } - }() if err := c.send(ctx, conn, EndpointSend, req, nil); err != nil { return nil, nil, err @@ -131,14 +125,10 @@ func (c *Client) ReqSend(ctx context.Context, req *pdu.SendReq) (*pdu.SendRes, i } var stream io.ReadCloser - if !req.DryRun { - putWireOnReturn = false - stream, err = conn.ReadStream(ZFSStream, true) // no shadow - if err != nil { - return nil, nil, err - } + stream, err = conn.ReadStream(ZFSStream, true) // no shadow + if err != nil { + return nil, nil, err } - return &res, stream, nil } diff --git a/rpc/rpc_client.go b/rpc/rpc_client.go index 841a6dd..4c8f81e 100644 --- a/rpc/rpc_client.go +++ b/rpc/rpc_client.go @@ -108,6 +108,13 @@ func (c *Client) Receive(ctx context.Context, req *pdu.ReceiveReq, stream io.Rea return c.dataClient.ReqRecv(ctx, req, stream) } +func (c *Client) SendDry(ctx context.Context, in *pdu.SendReq) (*pdu.SendRes, error) { + ctx, endSpan := trace.WithSpan(ctx, "rpc.client.SendDry") + defer endSpan() + + return c.controlClient.SendDry(ctx, in) +} + func (c *Client) ListFilesystems(ctx context.Context, in *pdu.ListFilesystemReq) (*pdu.ListFilesystemRes, error) { ctx, endSpan := trace.WithSpan(ctx, "rpc.client.ListFilesystems") defer endSpan() diff --git a/rpc/versionhandshake/versionhandshake.go b/rpc/versionhandshake/versionhandshake.go index 749535a..341b5cc 100644 --- a/rpc/versionhandshake/versionhandshake.go +++ b/rpc/versionhandshake/versionhandshake.go @@ -152,7 +152,7 @@ func (m *HandshakeMessage) DecodeReader(r io.Reader, maxLen int) error { func DoHandshakeCurrentVersion(conn net.Conn, deadline time.Time) *HandshakeError { // current protocol version is hardcoded here - return DoHandshakeVersion(conn, deadline, 5) + return DoHandshakeVersion(conn, deadline, 6) } const HandshakeMessageMaxLen = 16 * 4096 diff --git a/util/bytecounter/bytecounter_readcloser.go b/util/bytecounter/bytecounter_readcloser.go index 0ddd238..12e542a 100644 --- a/util/bytecounter/bytecounter_readcloser.go +++ b/util/bytecounter/bytecounter_readcloser.go @@ -9,7 +9,7 @@ import ( // its interface and counting the bytes written to during copying. type ReadCloser interface { io.ReadCloser - Count() int64 + Count() uint64 } // NewReadCloser wraps rc. @@ -19,11 +19,11 @@ func NewReadCloser(rc io.ReadCloser) ReadCloser { type readCloser struct { rc io.ReadCloser - count int64 + count uint64 } -func (r *readCloser) Count() int64 { - return atomic.LoadInt64(&r.count) +func (r *readCloser) Count() uint64 { + return atomic.LoadUint64(&r.count) } var _ io.ReadCloser = &readCloser{} @@ -34,6 +34,9 @@ func (r *readCloser) Close() error { func (r *readCloser) Read(p []byte) (int, error) { n, err := r.rc.Read(p) - atomic.AddInt64(&r.count, int64(n)) + if n < 0 { + panic("expecting n >= 0") + } + atomic.AddUint64(&r.count, uint64(n)) return n, err } diff --git a/util/envconst/envconst.go b/util/envconst/envconst.go index 1dfc6eb..5858b98 100644 --- a/util/envconst/envconst.go +++ b/util/envconst/envconst.go @@ -73,6 +73,24 @@ func Int64(varname string, def int64) (d int64) { return d } +func Uint64(varname string, def uint64) (d uint64) { + var err error + if v, ok := cache.Load(varname); ok { + return v.(uint64) + } + e := os.Getenv(varname) + if e == "" { + d = def + } else { + d, err = strconv.ParseUint(e, 10, 64) + if err != nil { + panic(err) + } + } + cache.Store(varname, d) + return d +} + func Bool(varname string, def bool) (d bool) { var err error if v, ok := cache.Load(varname); ok { diff --git a/zfs/zfs.go b/zfs/zfs.go index 141ea5e..bf82547 100644 --- a/zfs/zfs.go +++ b/zfs/zfs.go @@ -7,6 +7,7 @@ import ( "encoding/json" "fmt" "io" + "math" "os" "os/exec" "regexp" @@ -960,7 +961,7 @@ type DrySendInfo struct { Type DrySendType Filesystem string // parsed from To field From, To string // direct copy from ZFS output - SizeEstimate int64 // -1 if size estimate is not possible + SizeEstimate uint64 // 0 if size estimate is not possible } var ( @@ -1033,11 +1034,10 @@ func (s *DrySendInfo) unmarshalInfoLine(l string) (regexMatched bool, err error) // see https://github.com/zrepl/zrepl/issues/289 fields["size"] = "0" } - s.SizeEstimate, err = strconv.ParseInt(fields["size"], 10, 64) + s.SizeEstimate, err = strconv.ParseUint(fields["size"], 10, 64) if err != nil { return true, fmt.Errorf("cannot not parse size: %s", err) } - return true, nil } @@ -1047,6 +1047,7 @@ func ZFSSendDry(ctx context.Context, sendArgs ZFSSendArgsValidated) (_ *DrySendI if sendArgs.From != nil && strings.Contains(sendArgs.From.RelName, "#") { /* TODO: + * XXX feature check & support this as well * ZFS at the time of writing does not support dry-run send because size-estimation * uses fromSnap's deadlist. However, for a bookmark, that deadlist no longer exists. * Redacted send & recv will bring this functionality, see @@ -1065,7 +1066,7 @@ func ZFSSendDry(ctx context.Context, sendArgs ZFSSendArgsValidated) (_ *DrySendI Filesystem: sendArgs.FS, From: fromAbs, To: toAbs, - SizeEstimate: -1}, nil + SizeEstimate: 0}, nil } args := make([]string, 0) @@ -1085,6 +1086,19 @@ func ZFSSendDry(ctx context.Context, sendArgs ZFSSendArgsValidated) (_ *DrySendI if err := si.unmarshalZFSOutput(output); err != nil { return nil, fmt.Errorf("could not parse zfs send -n output: %s", err) } + + // There is a bug in OpenZFS where it estimates the size incorrectly. + // - zrepl: https://github.com/zrepl/zrepl/issues/463 + // - resulting upstream bug: https://github.com/openzfs/zfs/issues/12265 + // + // The wrong estimates are easy to detect because they are absurdly large. + // NB: we're doing the workaround for this late so that the test cases are not affected. + sizeEstimateThreshold := envconst.Uint64("ZREPL_ZFS_SEND_SIZE_ESTIMATE_INCORRECT_THRESHOLD", math.MaxInt64) + if sizeEstimateThreshold != 0 && si.SizeEstimate >= sizeEstimateThreshold { + debug("size estimate exceeds threshold %v, working around it: %#v %q", sizeEstimateThreshold, si, args) + si.SizeEstimate = 0 + } + return &si, nil }