rework size estimation & dry sends

- use control connection (gRPC)
- use uint64 everywhere => fixes https://github.com/zrepl/zrepl/issues/463
- [BREAK] bump protocol version

closes https://github.com/zrepl/zrepl/pull/518
fixes https://github.com/zrepl/zrepl/issues/463
This commit is contained in:
Christian Schwarz 2021-08-16 10:11:37 +02:00
parent a8e92971d0
commit 4f9b63aa09
18 changed files with 320 additions and 232 deletions

View File

@ -2,14 +2,22 @@ package viewmodel
import (
"fmt"
"math"
)
func ByteCountBinaryUint(b uint64) string {
if b > math.MaxInt64 {
panic(b)
}
return ByteCountBinary(int64(b))
}
func ByteCountBinary(b int64) string {
const unit = 1024
if b < unit {
return fmt.Sprintf("%d B", b)
}
div, exp := int64(unit), 0
div, exp := unit, 0
for n := b / unit; n >= unit; n /= unit {
div *= unit
exp++

View File

@ -4,7 +4,7 @@ import "time"
type byteProgressMeasurement struct {
time time.Time
val int64
val uint64
}
type bytesProgressHistory struct {
@ -13,7 +13,7 @@ type bytesProgressHistory struct {
lastChange time.Time
}
func (p *bytesProgressHistory) Update(currentVal int64) (bytesPerSecondAvg int64, changeCount int) {
func (p *bytesProgressHistory) Update(currentVal uint64) (bytesPerSecondAvg int64, changeCount int) {
if p.last == nil {
p.last = &byteProgressMeasurement{
@ -33,7 +33,12 @@ func (p *bytesProgressHistory) Update(currentVal int64) (bytesPerSecondAvg int64
return 0, 0
}
deltaV := currentVal - p.last.val
var deltaV int64
if currentVal >= p.last.val {
deltaV = int64(currentVal - p.last.val)
} else {
deltaV = -int64(p.last.val - currentVal)
}
deltaT := time.Since(p.last.time)
rate := float64(deltaV) / deltaT.Seconds()

View File

@ -230,7 +230,7 @@ func printFilesystemStatus(t *stringbuilder.B, rep *report.FilesystemReport, act
status := fmt.Sprintf("%s (step %d/%d, %s/%s)%s",
strings.ToUpper(string(rep.State)),
rep.CurrentStep, len(rep.Steps),
ByteCountBinary(replicated), ByteCountBinary(expected),
ByteCountBinaryUint(replicated), ByteCountBinaryUint(expected),
sizeEstimationImpreciseNotice,
)
@ -358,12 +358,12 @@ func renderReplicationReport(t *stringbuilder.B, rep *report.Report, history *by
rate, changeCount := history.Update(replicated)
eta := time.Duration(0)
if rate > 0 {
eta = time.Duration((expected-replicated)/rate) * time.Second
eta = time.Duration((float64(expected)-float64(replicated))/float64(rate)) * time.Second
}
t.Write("Progress: ")
t.DrawBar(50, replicated, expected, changeCount)
t.Write(fmt.Sprintf(" %s / %s @ %s/s", ByteCountBinary(replicated), ByteCountBinary(expected), ByteCountBinary(rate)))
t.Write(fmt.Sprintf(" %s / %s @ %s/s", ByteCountBinaryUint(replicated), ByteCountBinaryUint(expected), ByteCountBinary(rate)))
if eta != 0 {
t.Write(fmt.Sprintf(" (%s remaining)", humanizeDuration(eta)))
}

View File

@ -99,11 +99,11 @@ func RightPad(str string, length int, pad string) string {
}
// changeCount = 0 indicates stall / no progress
func (w *B) DrawBar(length int, bytes, totalBytes int64, changeCount int) {
func (w *B) DrawBar(length int, bytes, totalBytes uint64, changeCount int) {
const arrowPositions = `>\|/`
var completedLength int
if totalBytes > 0 {
completedLength = int(int64(length) * bytes / totalBytes)
completedLength = int(uint64(length) * bytes / totalBytes)
if completedLength > length {
completedLength = length
}

View File

@ -159,12 +159,11 @@ func sendArgsFromPDUAndValidateExistsAndGetVersion(ctx context.Context, fs strin
return version, nil
}
func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.ReadCloser, error) {
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
func (s *Sender) sendMakeArgs(ctx context.Context, r *pdu.SendReq) (sendArgs zfs.ZFSSendArgsValidated, _ error) {
_, err := s.filterCheckFS(r.Filesystem)
if err != nil {
return nil, nil, err
return sendArgs, err
}
switch r.Encrypted {
case pdu.Tri_DontCare:
@ -172,16 +171,16 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea
// ok, fallthrough outer
case pdu.Tri_False:
if s.config.Encrypt.B {
return nil, nil, errors.New("only encrypted sends allowed (send -w + encryption!= off), but unencrypted send requested")
return sendArgs, errors.New("only encrypted sends allowed (send -w + encryption!= off), but unencrypted send requested")
}
// fallthrough outer
case pdu.Tri_True:
if !s.config.Encrypt.B {
return nil, nil, errors.New("only unencrypted sends allowed, but encrypted send requested")
return sendArgs, errors.New("only unencrypted sends allowed, but encrypted send requested")
}
// fallthrough outer
default:
return nil, nil, fmt.Errorf("unknown pdu.Tri variant %q", r.Encrypted)
return sendArgs, fmt.Errorf("unknown pdu.Tri variant %q", r.Encrypted)
}
sendArgsUnvalidated := zfs.ZFSSendArgsUnvalidated{
@ -201,30 +200,19 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea
},
}
sendArgs, err := sendArgsUnvalidated.Validate(ctx)
sendArgs, err = sendArgsUnvalidated.Validate(ctx)
if err != nil {
return nil, nil, errors.Wrap(err, "validate send arguments")
return sendArgs, errors.Wrap(err, "validate send arguments")
}
return sendArgs, nil
}
si, err := zfs.ZFSSendDry(ctx, sendArgs)
func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.ReadCloser, error) {
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
sendArgs, err := s.sendMakeArgs(ctx, r)
if err != nil {
return nil, nil, errors.Wrap(err, "zfs send dry failed")
}
// From now on, assume that sendArgs has been validated by ZFSSendDry
// (because validation involves shelling out, it's actually a little expensive)
var expSize int64 = 0 // protocol says 0 means no estimate
if si.SizeEstimate != -1 { // but si returns -1 for no size estimate
expSize = si.SizeEstimate
}
res := &pdu.SendRes{
ExpectedSize: expSize,
UsedResumeToken: r.ResumeToken != "",
}
if r.DryRun {
return res, nil, nil
return nil, nil, err
}
// create holds or bookmarks of `From` and `To` to guarantee one of the following:
@ -322,9 +310,37 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea
// apply rate limit
sendStream = s.bwLimit.WrapReadCloser(sendStream)
res := &pdu.SendRes{
ExpectedSize: 0,
UsedResumeToken: r.ResumeToken != "",
}
return res, sendStream, nil
}
func (s *Sender) SendDry(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, error) {
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
sendArgs, err := s.sendMakeArgs(ctx, r)
if err != nil {
return nil, err
}
si, err := zfs.ZFSSendDry(ctx, sendArgs)
if err != nil {
return nil, errors.Wrap(err, "zfs send dry failed")
}
// From now on, assume that sendArgs has been validated by ZFSSendDry
// (because validation involves shelling out, it's actually a little expensive)
res := &pdu.SendRes{
ExpectedSize: si.SizeEstimate,
UsedResumeToken: r.ResumeToken != "",
}
return res, nil
}
func (p *Sender) SendCompleted(ctx context.Context, r *pdu.SendCompletedReq) (*pdu.SendCompletedRes, error) {
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
@ -693,6 +709,11 @@ func (s *Receiver) Send(ctx context.Context, req *pdu.SendReq) (*pdu.SendRes, io
return nil, nil, fmt.Errorf("receiver does not implement Send()")
}
func (s *Receiver) SendDry(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, error) {
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
return nil, fmt.Errorf("receiver does not implement SendDry()")
}
func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive io.ReadCloser) (*pdu.ReceiveRes, error) {
defer trace.WithSpanFromStackUpdateCtx(&ctx)()

2
go.mod
View File

@ -8,7 +8,6 @@ require (
github.com/gdamore/tcell/v2 v2.2.0
github.com/gitchander/permutation v0.0.0-20181107151852-9e56b92e9909
github.com/go-logfmt/logfmt v0.4.0
github.com/go-playground/universal-translator v0.17.0 // indirect
github.com/go-playground/validator v9.31.0+incompatible
github.com/go-playground/validator/v10 v10.4.1
github.com/go-sql-driver/mysql v1.4.1-0.20190907122137-b2c03bcae3d4
@ -44,7 +43,6 @@ require (
golang.org/x/net v0.0.0-20210119194325-5f4716e94777
golang.org/x/sync v0.0.0-20190423024810-112230192c58
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c
golang.org/x/text v0.3.5 // indirect
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135
gonum.org/v1/gonum v0.7.0 // indirect
google.golang.org/genproto v0.0.0-20210122163508-8081c04a3579 // indirect

View File

@ -809,13 +809,22 @@ type NeverEndingSender struct {
*endpoint.Sender
}
func (s *NeverEndingSender) SendDry(ctx context.Context, req *pdu.SendReq) (r *pdu.SendRes, err error) {
r, _, err = s.sendImpl(ctx, req, true)
return r, err
}
func (s *NeverEndingSender) Send(ctx context.Context, req *pdu.SendReq) (r *pdu.SendRes, stream io.ReadCloser, _ error) {
return s.sendImpl(ctx, req, false)
}
func (s *NeverEndingSender) sendImpl(ctx context.Context, req *pdu.SendReq, dry bool) (r *pdu.SendRes, stream io.ReadCloser, _ error) {
stream = nil
r = &pdu.SendRes{
UsedResumeToken: false,
ExpectedSize: 1 << 30,
}
if req.DryRun {
if dry {
return r, stream, nil
}
dz, err := os.Open("/dev/zero")

View File

@ -518,8 +518,7 @@ type SendReq struct {
// encoded in the ResumeToken. Otherwise, the Sender MUST return an error.
ResumeToken string `protobuf:"bytes,4,opt,name=ResumeToken,proto3" json:"ResumeToken,omitempty"`
Encrypted Tri `protobuf:"varint,5,opt,name=Encrypted,proto3,enum=Tri" json:"Encrypted,omitempty"`
DryRun bool `protobuf:"varint,6,opt,name=DryRun,proto3" json:"DryRun,omitempty"`
ReplicationConfig *ReplicationConfig `protobuf:"bytes,7,opt,name=ReplicationConfig,proto3" json:"ReplicationConfig,omitempty"`
ReplicationConfig *ReplicationConfig `protobuf:"bytes,6,opt,name=ReplicationConfig,proto3" json:"ReplicationConfig,omitempty"`
}
func (x *SendReq) Reset() {
@ -589,13 +588,6 @@ func (x *SendReq) GetEncrypted() Tri {
return Tri_DontCare
}
func (x *SendReq) GetDryRun() bool {
if x != nil {
return x.DryRun
}
return false
}
func (x *SendReq) GetReplicationConfig() *ReplicationConfig {
if x != nil {
return x.ReplicationConfig
@ -766,12 +758,11 @@ type SendRes struct {
unknownFields protoimpl.UnknownFields
// Whether the resume token provided in the request has been used or not.
// If the SendReq.ResumeToken == "", this field has no meaning.
UsedResumeToken bool `protobuf:"varint,2,opt,name=UsedResumeToken,proto3" json:"UsedResumeToken,omitempty"`
// If the SendReq.ResumeToken == "", this field MUST be false.
UsedResumeToken bool `protobuf:"varint,1,opt,name=UsedResumeToken,proto3" json:"UsedResumeToken,omitempty"`
// Expected stream size determined by dry run, not exact.
// 0 indicates that for the given SendReq, no size estimate could be made.
ExpectedSize int64 `protobuf:"varint,3,opt,name=ExpectedSize,proto3" json:"ExpectedSize,omitempty"`
Properties []*Property `protobuf:"bytes,4,rep,name=Properties,proto3" json:"Properties,omitempty"`
ExpectedSize uint64 `protobuf:"varint,2,opt,name=ExpectedSize,proto3" json:"ExpectedSize,omitempty"`
}
func (x *SendRes) Reset() {
@ -813,20 +804,13 @@ func (x *SendRes) GetUsedResumeToken() bool {
return false
}
func (x *SendRes) GetExpectedSize() int64 {
func (x *SendRes) GetExpectedSize() uint64 {
if x != nil {
return x.ExpectedSize
}
return 0
}
func (x *SendRes) GetProperties() []*Property {
if x != nil {
return x.Properties
}
return nil
}
type SendCompletedReq struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@ -1443,7 +1427,7 @@ var file_pdu_proto_rawDesc = []byte{
0x72, 0x65, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x29, 0x0a, 0x0b, 0x56, 0x65, 0x72, 0x73, 0x69,
0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0c, 0x0a, 0x08, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68,
0x6f, 0x74, 0x10, 0x00, 0x12, 0x0c, 0x0a, 0x08, 0x42, 0x6f, 0x6f, 0x6b, 0x6d, 0x61, 0x72, 0x6b,
0x10, 0x01, 0x22, 0x95, 0x02, 0x0a, 0x07, 0x53, 0x65, 0x6e, 0x64, 0x52, 0x65, 0x71, 0x12, 0x1e,
0x10, 0x01, 0x22, 0xfd, 0x01, 0x0a, 0x07, 0x53, 0x65, 0x6e, 0x64, 0x52, 0x65, 0x71, 0x12, 0x1e,
0x0a, 0x0a, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x18, 0x01, 0x20, 0x01,
0x28, 0x09, 0x52, 0x0a, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x12, 0x26,
0x0a, 0x04, 0x46, 0x72, 0x6f, 0x6d, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x46,
@ -1455,122 +1439,119 @@ var file_pdu_proto_rawDesc = []byte{
0x0b, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x22, 0x0a, 0x09,
0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0e, 0x32,
0x04, 0x2e, 0x54, 0x72, 0x69, 0x52, 0x09, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64,
0x12, 0x16, 0x0a, 0x06, 0x44, 0x72, 0x79, 0x52, 0x75, 0x6e, 0x18, 0x06, 0x20, 0x01, 0x28, 0x08,
0x52, 0x06, 0x44, 0x72, 0x79, 0x52, 0x75, 0x6e, 0x12, 0x40, 0x0a, 0x11, 0x52, 0x65, 0x70, 0x6c,
0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x07, 0x20,
0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f,
0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x11, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61,
0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x22, 0x51, 0x0a, 0x11, 0x52, 0x65,
0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12,
0x3c, 0x0a, 0x0a, 0x70, 0x72, 0x6f, 0x74, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20,
0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f,
0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x50, 0x72, 0x6f, 0x74, 0x65, 0x63, 0x74, 0x69, 0x6f,
0x6e, 0x52, 0x0a, 0x70, 0x72, 0x6f, 0x74, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x8f, 0x01,
0x0a, 0x1b, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e,
0x66, 0x69, 0x67, 0x50, 0x72, 0x6f, 0x74, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x33, 0x0a,
0x07, 0x49, 0x6e, 0x69, 0x74, 0x69, 0x61, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x19,
0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x47, 0x75, 0x61, 0x72,
0x61, 0x6e, 0x74, 0x65, 0x65, 0x4b, 0x69, 0x6e, 0x64, 0x52, 0x07, 0x49, 0x6e, 0x69, 0x74, 0x69,
0x61, 0x6c, 0x12, 0x3b, 0x0a, 0x0b, 0x49, 0x6e, 0x63, 0x72, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x61,
0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x19, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63,
0x61, 0x74, 0x69, 0x6f, 0x6e, 0x47, 0x75, 0x61, 0x72, 0x61, 0x6e, 0x74, 0x65, 0x65, 0x4b, 0x69,
0x6e, 0x64, 0x52, 0x0b, 0x49, 0x6e, 0x63, 0x72, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x61, 0x6c, 0x22,
0x34, 0x0a, 0x08, 0x50, 0x72, 0x6f, 0x70, 0x65, 0x72, 0x74, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x4e,
0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x4e, 0x61, 0x6d, 0x65, 0x12,
0x14, 0x0a, 0x05, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05,
0x56, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x82, 0x01, 0x0a, 0x07, 0x53, 0x65, 0x6e, 0x64, 0x52, 0x65,
0x73, 0x12, 0x28, 0x0a, 0x0f, 0x55, 0x73, 0x65, 0x64, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x54,
0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0f, 0x55, 0x73, 0x65, 0x64,
0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x22, 0x0a, 0x0c, 0x45,
0x78, 0x70, 0x65, 0x63, 0x74, 0x65, 0x64, 0x53, 0x69, 0x7a, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28,
0x03, 0x52, 0x0c, 0x45, 0x78, 0x70, 0x65, 0x63, 0x74, 0x65, 0x64, 0x53, 0x69, 0x7a, 0x65, 0x12,
0x29, 0x0a, 0x0a, 0x50, 0x72, 0x6f, 0x70, 0x65, 0x72, 0x74, 0x69, 0x65, 0x73, 0x18, 0x04, 0x20,
0x03, 0x28, 0x0b, 0x32, 0x09, 0x2e, 0x50, 0x72, 0x6f, 0x70, 0x65, 0x72, 0x74, 0x79, 0x52, 0x0a,
0x50, 0x72, 0x6f, 0x70, 0x65, 0x72, 0x74, 0x69, 0x65, 0x73, 0x22, 0x3e, 0x0a, 0x10, 0x53, 0x65,
0x6e, 0x64, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x52, 0x65, 0x71, 0x12, 0x2a,
0x0a, 0x0b, 0x4f, 0x72, 0x69, 0x67, 0x69, 0x6e, 0x61, 0x6c, 0x52, 0x65, 0x71, 0x18, 0x02, 0x20,
0x01, 0x28, 0x0b, 0x32, 0x08, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x52, 0x65, 0x71, 0x52, 0x0b, 0x4f,
0x72, 0x69, 0x67, 0x69, 0x6e, 0x61, 0x6c, 0x52, 0x65, 0x71, 0x22, 0x12, 0x0a, 0x10, 0x53, 0x65,
0x6e, 0x64, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x52, 0x65, 0x73, 0x22, 0xbe,
0x01, 0x0a, 0x0a, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x52, 0x65, 0x71, 0x12, 0x1e, 0x0a,
0x0a, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28,
0x09, 0x52, 0x0a, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x12, 0x22, 0x0a,
0x02, 0x54, 0x6f, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x46, 0x69, 0x6c, 0x65,
0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x02, 0x54,
0x6f, 0x12, 0x2a, 0x0a, 0x10, 0x43, 0x6c, 0x65, 0x61, 0x72, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65,
0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x10, 0x43, 0x6c, 0x65,
0x61, 0x72, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x40, 0x0a,
0x12, 0x40, 0x0a, 0x11, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43,
0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x52, 0x65,
0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52,
0x11, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66,
0x69, 0x67, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69,
0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x11, 0x52, 0x65,
0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x22,
0x0c, 0x0a, 0x0a, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x52, 0x65, 0x73, 0x22, 0x67, 0x0a,
0x13, 0x44, 0x65, 0x73, 0x74, 0x72, 0x6f, 0x79, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74,
0x73, 0x52, 0x65, 0x71, 0x12, 0x1e, 0x0a, 0x0a, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74,
0x65, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x79,
0x73, 0x74, 0x65, 0x6d, 0x12, 0x30, 0x0a, 0x09, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74,
0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x79,
0x73, 0x74, 0x65, 0x6d, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x53, 0x6e, 0x61,
0x70, 0x73, 0x68, 0x6f, 0x74, 0x73, 0x22, 0x5a, 0x0a, 0x12, 0x44, 0x65, 0x73, 0x74, 0x72, 0x6f,
0x79, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x52, 0x65, 0x73, 0x12, 0x2e, 0x0a, 0x08,
0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12,
0x2e, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x56, 0x65, 0x72, 0x73, 0x69,
0x6f, 0x6e, 0x52, 0x08, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x12, 0x14, 0x0a, 0x05,
0x45, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x45, 0x72, 0x72,
0x6f, 0x72, 0x22, 0x44, 0x0a, 0x13, 0x44, 0x65, 0x73, 0x74, 0x72, 0x6f, 0x79, 0x53, 0x6e, 0x61,
0x70, 0x73, 0x68, 0x6f, 0x74, 0x73, 0x52, 0x65, 0x73, 0x12, 0x2d, 0x0a, 0x07, 0x52, 0x65, 0x73,
0x75, 0x6c, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x44, 0x65, 0x73,
0x74, 0x72, 0x6f, 0x79, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x52, 0x65, 0x73, 0x52,
0x07, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x22, 0x36, 0x0a, 0x14, 0x52, 0x65, 0x70, 0x6c,
0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x52, 0x65, 0x71,
0x69, 0x67, 0x22, 0x51, 0x0a, 0x11, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f,
0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x3c, 0x0a, 0x0a, 0x70, 0x72, 0x6f, 0x74, 0x65,
0x63, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x52, 0x65,
0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x50,
0x72, 0x6f, 0x74, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x0a, 0x70, 0x72, 0x6f, 0x74, 0x65,
0x63, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x8f, 0x01, 0x0a, 0x1b, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63,
0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x50, 0x72, 0x6f, 0x74, 0x65,
0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x33, 0x0a, 0x07, 0x49, 0x6e, 0x69, 0x74, 0x69, 0x61, 0x6c,
0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x19, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61,
0x74, 0x69, 0x6f, 0x6e, 0x47, 0x75, 0x61, 0x72, 0x61, 0x6e, 0x74, 0x65, 0x65, 0x4b, 0x69, 0x6e,
0x64, 0x52, 0x07, 0x49, 0x6e, 0x69, 0x74, 0x69, 0x61, 0x6c, 0x12, 0x3b, 0x0a, 0x0b, 0x49, 0x6e,
0x63, 0x72, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x61, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32,
0x19, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x47, 0x75, 0x61,
0x72, 0x61, 0x6e, 0x74, 0x65, 0x65, 0x4b, 0x69, 0x6e, 0x64, 0x52, 0x0b, 0x49, 0x6e, 0x63, 0x72,
0x65, 0x6d, 0x65, 0x6e, 0x74, 0x61, 0x6c, 0x22, 0x34, 0x0a, 0x08, 0x50, 0x72, 0x6f, 0x70, 0x65,
0x72, 0x74, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28,
0x09, 0x52, 0x04, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x56, 0x61, 0x6c, 0x75, 0x65,
0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x57, 0x0a,
0x07, 0x53, 0x65, 0x6e, 0x64, 0x52, 0x65, 0x73, 0x12, 0x28, 0x0a, 0x0f, 0x55, 0x73, 0x65, 0x64,
0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28,
0x08, 0x52, 0x0f, 0x55, 0x73, 0x65, 0x64, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x54, 0x6f, 0x6b,
0x65, 0x6e, 0x12, 0x22, 0x0a, 0x0c, 0x45, 0x78, 0x70, 0x65, 0x63, 0x74, 0x65, 0x64, 0x53, 0x69,
0x7a, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0c, 0x45, 0x78, 0x70, 0x65, 0x63, 0x74,
0x65, 0x64, 0x53, 0x69, 0x7a, 0x65, 0x22, 0x3e, 0x0a, 0x10, 0x53, 0x65, 0x6e, 0x64, 0x43, 0x6f,
0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x52, 0x65, 0x71, 0x12, 0x2a, 0x0a, 0x0b, 0x4f, 0x72,
0x69, 0x67, 0x69, 0x6e, 0x61, 0x6c, 0x52, 0x65, 0x71, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32,
0x08, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x52, 0x65, 0x71, 0x52, 0x0b, 0x4f, 0x72, 0x69, 0x67, 0x69,
0x6e, 0x61, 0x6c, 0x52, 0x65, 0x71, 0x22, 0x12, 0x0a, 0x10, 0x53, 0x65, 0x6e, 0x64, 0x43, 0x6f,
0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x52, 0x65, 0x73, 0x22, 0xbe, 0x01, 0x0a, 0x0a, 0x52,
0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x52, 0x65, 0x71, 0x12, 0x1e, 0x0a, 0x0a, 0x46, 0x69, 0x6c,
0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x46,
0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x12, 0x22, 0x0a, 0x02, 0x54, 0x6f, 0x18,
0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74,
0x65, 0x6d, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x02, 0x54, 0x6f, 0x12, 0x2a, 0x0a,
0x10, 0x43, 0x6c, 0x65, 0x61, 0x72, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x54, 0x6f, 0x6b, 0x65,
0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x10, 0x43, 0x6c, 0x65, 0x61, 0x72, 0x52, 0x65,
0x73, 0x75, 0x6d, 0x65, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x40, 0x0a, 0x11, 0x52, 0x65, 0x70,
0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x04,
0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69,
0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x11, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63,
0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x22, 0x0c, 0x0a, 0x0a, 0x52,
0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x52, 0x65, 0x73, 0x22, 0x67, 0x0a, 0x13, 0x44, 0x65, 0x73,
0x74, 0x72, 0x6f, 0x79, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x73, 0x52, 0x65, 0x71,
0x12, 0x1e, 0x0a, 0x0a, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x18, 0x01,
0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d,
0x22, 0x54, 0x0a, 0x14, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43,
0x75, 0x72, 0x73, 0x6f, 0x72, 0x52, 0x65, 0x73, 0x12, 0x14, 0x0a, 0x04, 0x47, 0x75, 0x69, 0x64,
0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x48, 0x00, 0x52, 0x04, 0x47, 0x75, 0x69, 0x64, 0x12, 0x1c,
0x0a, 0x08, 0x4e, 0x6f, 0x74, 0x65, 0x78, 0x69, 0x73, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08,
0x48, 0x00, 0x52, 0x08, 0x4e, 0x6f, 0x74, 0x65, 0x78, 0x69, 0x73, 0x74, 0x42, 0x08, 0x0a, 0x06,
0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x22, 0x23, 0x0a, 0x07, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65,
0x71, 0x12, 0x18, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01,
0x28, 0x09, 0x52, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x1d, 0x0a, 0x07, 0x50,
0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x12, 0x12, 0x0a, 0x04, 0x45, 0x63, 0x68, 0x6f, 0x18, 0x01,
0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x45, 0x63, 0x68, 0x6f, 0x2a, 0x28, 0x0a, 0x03, 0x54, 0x72,
0x69, 0x12, 0x0c, 0x0a, 0x08, 0x44, 0x6f, 0x6e, 0x74, 0x43, 0x61, 0x72, 0x65, 0x10, 0x00, 0x12,
0x09, 0x0a, 0x05, 0x46, 0x61, 0x6c, 0x73, 0x65, 0x10, 0x01, 0x12, 0x08, 0x0a, 0x04, 0x54, 0x72,
0x75, 0x65, 0x10, 0x02, 0x2a, 0x86, 0x01, 0x0a, 0x18, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61,
0x74, 0x69, 0x6f, 0x6e, 0x47, 0x75, 0x61, 0x72, 0x61, 0x6e, 0x74, 0x65, 0x65, 0x4b, 0x69, 0x6e,
0x64, 0x12, 0x14, 0x0a, 0x10, 0x47, 0x75, 0x61, 0x72, 0x61, 0x6e, 0x74, 0x65, 0x65, 0x49, 0x6e,
0x76, 0x61, 0x6c, 0x69, 0x64, 0x10, 0x00, 0x12, 0x19, 0x0a, 0x15, 0x47, 0x75, 0x61, 0x72, 0x61,
0x6e, 0x74, 0x65, 0x65, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79,
0x10, 0x01, 0x12, 0x23, 0x0a, 0x1f, 0x47, 0x75, 0x61, 0x72, 0x61, 0x6e, 0x74, 0x65, 0x65, 0x49,
0x6e, 0x63, 0x72, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x61, 0x6c, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63,
0x61, 0x74, 0x69, 0x6f, 0x6e, 0x10, 0x02, 0x12, 0x14, 0x0a, 0x10, 0x47, 0x75, 0x61, 0x72, 0x61,
0x6e, 0x74, 0x65, 0x65, 0x4e, 0x6f, 0x74, 0x68, 0x69, 0x6e, 0x67, 0x10, 0x03, 0x32, 0xf0, 0x02,
0x0a, 0x0b, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1a, 0x0a,
0x04, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x08, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x1a,
0x08, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x12, 0x39, 0x0a, 0x0f, 0x4c, 0x69, 0x73,
0x74, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x73, 0x12, 0x12, 0x2e, 0x4c,
0x69, 0x73, 0x74, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x52, 0x65, 0x71,
0x1a, 0x12, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74, 0x65,
0x6d, 0x52, 0x65, 0x73, 0x12, 0x50, 0x0a, 0x16, 0x4c, 0x69, 0x73, 0x74, 0x46, 0x69, 0x6c, 0x65,
0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x1a,
0x2e, 0x4c, 0x69, 0x73, 0x74, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x56,
0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x1a, 0x1a, 0x2e, 0x4c, 0x69, 0x73,
0x12, 0x30, 0x0a, 0x09, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x73, 0x18, 0x02, 0x20,
0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d,
0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f,
0x74, 0x73, 0x22, 0x5a, 0x0a, 0x12, 0x44, 0x65, 0x73, 0x74, 0x72, 0x6f, 0x79, 0x53, 0x6e, 0x61,
0x70, 0x73, 0x68, 0x6f, 0x74, 0x52, 0x65, 0x73, 0x12, 0x2e, 0x0a, 0x08, 0x53, 0x6e, 0x61, 0x70,
0x73, 0x68, 0x6f, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x46, 0x69, 0x6c,
0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x08,
0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x45, 0x72, 0x72, 0x6f,
0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x22, 0x44,
0x0a, 0x13, 0x44, 0x65, 0x73, 0x74, 0x72, 0x6f, 0x79, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f,
0x74, 0x73, 0x52, 0x65, 0x73, 0x12, 0x2d, 0x0a, 0x07, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73,
0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x44, 0x65, 0x73, 0x74, 0x72, 0x6f, 0x79,
0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x52, 0x65, 0x73, 0x52, 0x07, 0x52, 0x65, 0x73,
0x75, 0x6c, 0x74, 0x73, 0x22, 0x36, 0x0a, 0x14, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74,
0x69, 0x6f, 0x6e, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x52, 0x65, 0x71, 0x12, 0x1e, 0x0a, 0x0a,
0x46, 0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
0x52, 0x0a, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x22, 0x54, 0x0a, 0x14,
0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x75, 0x72, 0x73, 0x6f,
0x72, 0x52, 0x65, 0x73, 0x12, 0x14, 0x0a, 0x04, 0x47, 0x75, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01,
0x28, 0x04, 0x48, 0x00, 0x52, 0x04, 0x47, 0x75, 0x69, 0x64, 0x12, 0x1c, 0x0a, 0x08, 0x4e, 0x6f,
0x74, 0x65, 0x78, 0x69, 0x73, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x48, 0x00, 0x52, 0x08,
0x4e, 0x6f, 0x74, 0x65, 0x78, 0x69, 0x73, 0x74, 0x42, 0x08, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75,
0x6c, 0x74, 0x22, 0x23, 0x0a, 0x07, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x12, 0x18, 0x0a,
0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07,
0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x1d, 0x0a, 0x07, 0x50, 0x69, 0x6e, 0x67, 0x52,
0x65, 0x73, 0x12, 0x12, 0x0a, 0x04, 0x45, 0x63, 0x68, 0x6f, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
0x52, 0x04, 0x45, 0x63, 0x68, 0x6f, 0x2a, 0x28, 0x0a, 0x03, 0x54, 0x72, 0x69, 0x12, 0x0c, 0x0a,
0x08, 0x44, 0x6f, 0x6e, 0x74, 0x43, 0x61, 0x72, 0x65, 0x10, 0x00, 0x12, 0x09, 0x0a, 0x05, 0x46,
0x61, 0x6c, 0x73, 0x65, 0x10, 0x01, 0x12, 0x08, 0x0a, 0x04, 0x54, 0x72, 0x75, 0x65, 0x10, 0x02,
0x2a, 0x86, 0x01, 0x0a, 0x18, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e,
0x47, 0x75, 0x61, 0x72, 0x61, 0x6e, 0x74, 0x65, 0x65, 0x4b, 0x69, 0x6e, 0x64, 0x12, 0x14, 0x0a,
0x10, 0x47, 0x75, 0x61, 0x72, 0x61, 0x6e, 0x74, 0x65, 0x65, 0x49, 0x6e, 0x76, 0x61, 0x6c, 0x69,
0x64, 0x10, 0x00, 0x12, 0x19, 0x0a, 0x15, 0x47, 0x75, 0x61, 0x72, 0x61, 0x6e, 0x74, 0x65, 0x65,
0x52, 0x65, 0x73, 0x75, 0x6d, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79, 0x10, 0x01, 0x12, 0x23,
0x0a, 0x1f, 0x47, 0x75, 0x61, 0x72, 0x61, 0x6e, 0x74, 0x65, 0x65, 0x49, 0x6e, 0x63, 0x72, 0x65,
0x6d, 0x65, 0x6e, 0x74, 0x61, 0x6c, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f,
0x6e, 0x10, 0x02, 0x12, 0x14, 0x0a, 0x10, 0x47, 0x75, 0x61, 0x72, 0x61, 0x6e, 0x74, 0x65, 0x65,
0x4e, 0x6f, 0x74, 0x68, 0x69, 0x6e, 0x67, 0x10, 0x03, 0x32, 0x8f, 0x03, 0x0a, 0x0b, 0x52, 0x65,
0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1a, 0x0a, 0x04, 0x50, 0x69, 0x6e,
0x67, 0x12, 0x08, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x1a, 0x08, 0x2e, 0x50, 0x69,
0x6e, 0x67, 0x52, 0x65, 0x73, 0x12, 0x39, 0x0a, 0x0f, 0x4c, 0x69, 0x73, 0x74, 0x46, 0x69, 0x6c,
0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x73, 0x12, 0x12, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x46,
0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x52, 0x65, 0x71, 0x1a, 0x12, 0x2e, 0x4c,
0x69, 0x73, 0x74, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x52, 0x65, 0x73,
0x12, 0x50, 0x0a, 0x16, 0x4c, 0x69, 0x73, 0x74, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74,
0x65, 0x6d, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x1a, 0x2e, 0x4c, 0x69, 0x73,
0x74, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x56, 0x65, 0x72, 0x73, 0x69,
0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x12, 0x3e, 0x0a, 0x10, 0x44, 0x65, 0x73, 0x74, 0x72, 0x6f,
0x79, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x73, 0x12, 0x14, 0x2e, 0x44, 0x65, 0x73,
0x74, 0x72, 0x6f, 0x79, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x73, 0x52, 0x65, 0x71,
0x1a, 0x14, 0x2e, 0x44, 0x65, 0x73, 0x74, 0x72, 0x6f, 0x79, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68,
0x6f, 0x74, 0x73, 0x52, 0x65, 0x73, 0x12, 0x41, 0x0a, 0x11, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63,
0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x12, 0x15, 0x2e, 0x52, 0x65,
0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x52,
0x65, 0x71, 0x1a, 0x15, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e,
0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x52, 0x65, 0x73, 0x12, 0x35, 0x0a, 0x0d, 0x53, 0x65, 0x6e,
0x64, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x12, 0x11, 0x2e, 0x53, 0x65, 0x6e,
0x64, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x52, 0x65, 0x71, 0x1a, 0x11, 0x2e,
0x53, 0x65, 0x6e, 0x64, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x52, 0x65, 0x73,
0x42, 0x07, 0x5a, 0x05, 0x2e, 0x3b, 0x70, 0x64, 0x75, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x33,
0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x1a, 0x1a, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x46, 0x69, 0x6c,
0x65, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x52,
0x65, 0x73, 0x12, 0x3e, 0x0a, 0x10, 0x44, 0x65, 0x73, 0x74, 0x72, 0x6f, 0x79, 0x53, 0x6e, 0x61,
0x70, 0x73, 0x68, 0x6f, 0x74, 0x73, 0x12, 0x14, 0x2e, 0x44, 0x65, 0x73, 0x74, 0x72, 0x6f, 0x79,
0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x73, 0x52, 0x65, 0x71, 0x1a, 0x14, 0x2e, 0x44,
0x65, 0x73, 0x74, 0x72, 0x6f, 0x79, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x73, 0x52,
0x65, 0x73, 0x12, 0x41, 0x0a, 0x11, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f,
0x6e, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x12, 0x15, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63,
0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x52, 0x65, 0x71, 0x1a, 0x15,
0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x75, 0x72, 0x73,
0x6f, 0x72, 0x52, 0x65, 0x73, 0x12, 0x1d, 0x0a, 0x07, 0x53, 0x65, 0x6e, 0x64, 0x44, 0x72, 0x79,
0x12, 0x08, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x52, 0x65, 0x71, 0x1a, 0x08, 0x2e, 0x53, 0x65, 0x6e,
0x64, 0x52, 0x65, 0x73, 0x12, 0x35, 0x0a, 0x0d, 0x53, 0x65, 0x6e, 0x64, 0x43, 0x6f, 0x6d, 0x70,
0x6c, 0x65, 0x74, 0x65, 0x64, 0x12, 0x11, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x43, 0x6f, 0x6d, 0x70,
0x6c, 0x65, 0x74, 0x65, 0x64, 0x52, 0x65, 0x71, 0x1a, 0x11, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x43,
0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x52, 0x65, 0x73, 0x42, 0x07, 0x5a, 0x05, 0x2e,
0x3b, 0x70, 0x64, 0x75, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@ -1625,30 +1606,31 @@ var file_pdu_proto_depIdxs = []int32{
11, // 7: ReplicationConfig.protection:type_name -> ReplicationConfigProtection
1, // 8: ReplicationConfigProtection.Initial:type_name -> ReplicationGuaranteeKind
1, // 9: ReplicationConfigProtection.Incremental:type_name -> ReplicationGuaranteeKind
12, // 10: SendRes.Properties:type_name -> Property
9, // 11: SendCompletedReq.OriginalReq:type_name -> SendReq
8, // 12: ReceiveReq.To:type_name -> FilesystemVersion
10, // 13: ReceiveReq.ReplicationConfig:type_name -> ReplicationConfig
8, // 14: DestroySnapshotsReq.Snapshots:type_name -> FilesystemVersion
8, // 15: DestroySnapshotRes.Snapshot:type_name -> FilesystemVersion
19, // 16: DestroySnapshotsRes.Results:type_name -> DestroySnapshotRes
23, // 17: Replication.Ping:input_type -> PingReq
3, // 18: Replication.ListFilesystems:input_type -> ListFilesystemReq
6, // 19: Replication.ListFilesystemVersions:input_type -> ListFilesystemVersionsReq
18, // 20: Replication.DestroySnapshots:input_type -> DestroySnapshotsReq
21, // 21: Replication.ReplicationCursor:input_type -> ReplicationCursorReq
9, // 10: SendCompletedReq.OriginalReq:type_name -> SendReq
8, // 11: ReceiveReq.To:type_name -> FilesystemVersion
10, // 12: ReceiveReq.ReplicationConfig:type_name -> ReplicationConfig
8, // 13: DestroySnapshotsReq.Snapshots:type_name -> FilesystemVersion
8, // 14: DestroySnapshotRes.Snapshot:type_name -> FilesystemVersion
19, // 15: DestroySnapshotsRes.Results:type_name -> DestroySnapshotRes
23, // 16: Replication.Ping:input_type -> PingReq
3, // 17: Replication.ListFilesystems:input_type -> ListFilesystemReq
6, // 18: Replication.ListFilesystemVersions:input_type -> ListFilesystemVersionsReq
18, // 19: Replication.DestroySnapshots:input_type -> DestroySnapshotsReq
21, // 20: Replication.ReplicationCursor:input_type -> ReplicationCursorReq
9, // 21: Replication.SendDry:input_type -> SendReq
14, // 22: Replication.SendCompleted:input_type -> SendCompletedReq
24, // 23: Replication.Ping:output_type -> PingRes
4, // 24: Replication.ListFilesystems:output_type -> ListFilesystemRes
7, // 25: Replication.ListFilesystemVersions:output_type -> ListFilesystemVersionsRes
20, // 26: Replication.DestroySnapshots:output_type -> DestroySnapshotsRes
22, // 27: Replication.ReplicationCursor:output_type -> ReplicationCursorRes
15, // 28: Replication.SendCompleted:output_type -> SendCompletedRes
23, // [23:29] is the sub-list for method output_type
17, // [17:23] is the sub-list for method input_type
17, // [17:17] is the sub-list for extension type_name
17, // [17:17] is the sub-list for extension extendee
0, // [0:17] is the sub-list for field type_name
13, // 28: Replication.SendDry:output_type -> SendRes
15, // 29: Replication.SendCompleted:output_type -> SendCompletedRes
23, // [23:30] is the sub-list for method output_type
16, // [16:23] is the sub-list for method input_type
16, // [16:16] is the sub-list for extension type_name
16, // [16:16] is the sub-list for extension extendee
0, // [0:16] is the sub-list for field type_name
}
func init() { file_pdu_proto_init() }

View File

@ -8,6 +8,7 @@ service Replication {
returns (ListFilesystemVersionsRes);
rpc DestroySnapshots(DestroySnapshotsReq) returns (DestroySnapshotsRes);
rpc ReplicationCursor(ReplicationCursorReq) returns (ReplicationCursorRes);
rpc SendDry(SendReq) returns (SendRes);
rpc SendCompleted(SendCompletedReq) returns (SendCompletedRes);
// for Send and Recv, see package rpc
}
@ -60,9 +61,7 @@ message SendReq {
string ResumeToken = 4;
Tri Encrypted = 5;
bool DryRun = 6;
ReplicationConfig ReplicationConfig = 7;
ReplicationConfig ReplicationConfig = 6;
}
message ReplicationConfig {
@ -89,14 +88,12 @@ message Property {
message SendRes {
// Whether the resume token provided in the request has been used or not.
// If the SendReq.ResumeToken == "", this field has no meaning.
bool UsedResumeToken = 2;
// If the SendReq.ResumeToken == "", this field MUST be false.
bool UsedResumeToken = 1;
// Expected stream size determined by dry run, not exact.
// 0 indicates that for the given SendReq, no size estimate could be made.
int64 ExpectedSize = 3;
repeated Property Properties = 4;
uint64 ExpectedSize = 2;
}
message SendCompletedReq {

View File

@ -23,6 +23,7 @@ type ReplicationClient interface {
ListFilesystemVersions(ctx context.Context, in *ListFilesystemVersionsReq, opts ...grpc.CallOption) (*ListFilesystemVersionsRes, error)
DestroySnapshots(ctx context.Context, in *DestroySnapshotsReq, opts ...grpc.CallOption) (*DestroySnapshotsRes, error)
ReplicationCursor(ctx context.Context, in *ReplicationCursorReq, opts ...grpc.CallOption) (*ReplicationCursorRes, error)
SendDry(ctx context.Context, in *SendReq, opts ...grpc.CallOption) (*SendRes, error)
SendCompleted(ctx context.Context, in *SendCompletedReq, opts ...grpc.CallOption) (*SendCompletedRes, error)
}
@ -79,6 +80,15 @@ func (c *replicationClient) ReplicationCursor(ctx context.Context, in *Replicati
return out, nil
}
func (c *replicationClient) SendDry(ctx context.Context, in *SendReq, opts ...grpc.CallOption) (*SendRes, error) {
out := new(SendRes)
err := c.cc.Invoke(ctx, "/Replication/SendDry", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *replicationClient) SendCompleted(ctx context.Context, in *SendCompletedReq, opts ...grpc.CallOption) (*SendCompletedRes, error) {
out := new(SendCompletedRes)
err := c.cc.Invoke(ctx, "/Replication/SendCompleted", in, out, opts...)
@ -97,6 +107,7 @@ type ReplicationServer interface {
ListFilesystemVersions(context.Context, *ListFilesystemVersionsReq) (*ListFilesystemVersionsRes, error)
DestroySnapshots(context.Context, *DestroySnapshotsReq) (*DestroySnapshotsRes, error)
ReplicationCursor(context.Context, *ReplicationCursorReq) (*ReplicationCursorRes, error)
SendDry(context.Context, *SendReq) (*SendRes, error)
SendCompleted(context.Context, *SendCompletedReq) (*SendCompletedRes, error)
mustEmbedUnimplementedReplicationServer()
}
@ -120,6 +131,9 @@ func (UnimplementedReplicationServer) DestroySnapshots(context.Context, *Destroy
func (UnimplementedReplicationServer) ReplicationCursor(context.Context, *ReplicationCursorReq) (*ReplicationCursorRes, error) {
return nil, status.Errorf(codes.Unimplemented, "method ReplicationCursor not implemented")
}
func (UnimplementedReplicationServer) SendDry(context.Context, *SendReq) (*SendRes, error) {
return nil, status.Errorf(codes.Unimplemented, "method SendDry not implemented")
}
func (UnimplementedReplicationServer) SendCompleted(context.Context, *SendCompletedReq) (*SendCompletedRes, error) {
return nil, status.Errorf(codes.Unimplemented, "method SendCompleted not implemented")
}
@ -226,6 +240,24 @@ func _Replication_ReplicationCursor_Handler(srv interface{}, ctx context.Context
return interceptor(ctx, in, info, handler)
}
func _Replication_SendDry_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(SendReq)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ReplicationServer).SendDry(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/Replication/SendDry",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ReplicationServer).SendDry(ctx, req.(*SendReq))
}
return interceptor(ctx, in, info, handler)
}
func _Replication_SendCompleted_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(SendCompletedReq)
if err := dec(in); err != nil {
@ -271,6 +303,10 @@ var Replication_ServiceDesc = grpc.ServiceDesc{
MethodName: "ReplicationCursor",
Handler: _Replication_ReplicationCursor_Handler,
},
{
MethodName: "SendDry",
Handler: _Replication_SendDry_Handler,
},
{
MethodName: "SendCompleted",
Handler: _Replication_SendCompleted_Handler,

View File

@ -41,6 +41,7 @@ type Sender interface {
// any next call to the parent github.com/zrepl/zrepl/replication.Endpoint.
// If the send request is for dry run the io.ReadCloser will be nil
Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.ReadCloser, error)
SendDry(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, error)
SendCompleted(ctx context.Context, r *pdu.SendCompletedReq) (*pdu.SendCompletedRes, error)
ReplicationCursor(ctx context.Context, req *pdu.ReplicationCursorReq) (*pdu.ReplicationCursorRes, error)
}
@ -158,7 +159,7 @@ type Step struct {
encrypt tri
resumeToken string // empty means no resume token shall be used
expectedSize int64 // 0 means no size estimate present / possible
expectedSize uint64 // 0 means no size estimate present / possible
// byteCounter is nil initially, and set later in Step.doReplication
// => concurrent read of that pointer from Step.ReportInfo must be protected
@ -189,7 +190,7 @@ func (s *Step) Step(ctx context.Context) error {
func (s *Step) ReportInfo() *report.StepInfo {
// get current byteCounter value
var byteCounter int64
var byteCounter uint64
s.byteCounterMtx.Lock()
if s.byteCounter != nil {
byteCounter = s.byteCounter.Count()
@ -546,10 +547,10 @@ func (s *Step) updateSizeEstimate(ctx context.Context) error {
log := getLogger(ctx)
sr := s.buildSendRequest(true)
sr := s.buildSendRequest()
log.Debug("initiate dry run send request")
sres, _, err := s.sender.Send(ctx, sr)
sres, err := s.sender.SendDry(ctx, sr)
if err != nil {
log.WithError(err).Error("dry run send request failed")
return err
@ -563,7 +564,7 @@ func (s *Step) updateSizeEstimate(ctx context.Context) error {
return nil
}
func (s *Step) buildSendRequest(dryRun bool) (sr *pdu.SendReq) {
func (s *Step) buildSendRequest() (sr *pdu.SendReq) {
fs := s.parent.Path
sr = &pdu.SendReq{
Filesystem: fs,
@ -571,7 +572,6 @@ func (s *Step) buildSendRequest(dryRun bool) (sr *pdu.SendReq) {
To: s.to,
Encrypted: s.encrypt.ToPDU(),
ResumeToken: s.resumeToken,
DryRun: dryRun,
ReplicationConfig: s.parent.policy.ReplicationConfig,
}
return sr
@ -582,7 +582,7 @@ func (s *Step) doReplication(ctx context.Context) error {
fs := s.parent.Path
log := getLogger(ctx).WithField("filesystem", fs)
sr := s.buildSendRequest(false)
sr := s.buildSendRequest()
log.Debug("initiate send request")
sres, stream, err := s.sender.Send(ctx, sr)

View File

@ -97,11 +97,11 @@ type StepInfo struct {
From, To string
Resumed bool
Encrypted EncryptedEnum
BytesExpected int64
BytesReplicated int64
BytesExpected uint64
BytesReplicated uint64
}
func (a *AttemptReport) BytesSum() (expected, replicated int64, containsInvalidSizeEstimates bool) {
func (a *AttemptReport) BytesSum() (expected, replicated uint64, containsInvalidSizeEstimates bool) {
for _, fs := range a.Filesystems {
e, r, fsContainsInvalidEstimate := fs.BytesSum()
containsInvalidSizeEstimates = containsInvalidSizeEstimates || fsContainsInvalidEstimate
@ -111,7 +111,7 @@ func (a *AttemptReport) BytesSum() (expected, replicated int64, containsInvalidS
return expected, replicated, containsInvalidSizeEstimates
}
func (f *FilesystemReport) BytesSum() (expected, replicated int64, containsInvalidSizeEstimates bool) {
func (f *FilesystemReport) BytesSum() (expected, replicated uint64, containsInvalidSizeEstimates bool) {
for _, step := range f.Steps {
expected += step.Info.BytesExpected
replicated += step.Info.BytesReplicated

View File

@ -114,12 +114,6 @@ func (c *Client) ReqSend(ctx context.Context, req *pdu.SendReq) (*pdu.SendRes, i
if err != nil {
return nil, nil, err
}
putWireOnReturn := true
defer func() {
if putWireOnReturn {
c.putWire(conn)
}
}()
if err := c.send(ctx, conn, EndpointSend, req, nil); err != nil {
return nil, nil, err
@ -131,14 +125,10 @@ func (c *Client) ReqSend(ctx context.Context, req *pdu.SendReq) (*pdu.SendRes, i
}
var stream io.ReadCloser
if !req.DryRun {
putWireOnReturn = false
stream, err = conn.ReadStream(ZFSStream, true) // no shadow
if err != nil {
return nil, nil, err
}
stream, err = conn.ReadStream(ZFSStream, true) // no shadow
if err != nil {
return nil, nil, err
}
return &res, stream, nil
}

View File

@ -108,6 +108,13 @@ func (c *Client) Receive(ctx context.Context, req *pdu.ReceiveReq, stream io.Rea
return c.dataClient.ReqRecv(ctx, req, stream)
}
func (c *Client) SendDry(ctx context.Context, in *pdu.SendReq) (*pdu.SendRes, error) {
ctx, endSpan := trace.WithSpan(ctx, "rpc.client.SendDry")
defer endSpan()
return c.controlClient.SendDry(ctx, in)
}
func (c *Client) ListFilesystems(ctx context.Context, in *pdu.ListFilesystemReq) (*pdu.ListFilesystemRes, error) {
ctx, endSpan := trace.WithSpan(ctx, "rpc.client.ListFilesystems")
defer endSpan()

View File

@ -152,7 +152,7 @@ func (m *HandshakeMessage) DecodeReader(r io.Reader, maxLen int) error {
func DoHandshakeCurrentVersion(conn net.Conn, deadline time.Time) *HandshakeError {
// current protocol version is hardcoded here
return DoHandshakeVersion(conn, deadline, 5)
return DoHandshakeVersion(conn, deadline, 6)
}
const HandshakeMessageMaxLen = 16 * 4096

View File

@ -9,7 +9,7 @@ import (
// its interface and counting the bytes written to during copying.
type ReadCloser interface {
io.ReadCloser
Count() int64
Count() uint64
}
// NewReadCloser wraps rc.
@ -19,11 +19,11 @@ func NewReadCloser(rc io.ReadCloser) ReadCloser {
type readCloser struct {
rc io.ReadCloser
count int64
count uint64
}
func (r *readCloser) Count() int64 {
return atomic.LoadInt64(&r.count)
func (r *readCloser) Count() uint64 {
return atomic.LoadUint64(&r.count)
}
var _ io.ReadCloser = &readCloser{}
@ -34,6 +34,9 @@ func (r *readCloser) Close() error {
func (r *readCloser) Read(p []byte) (int, error) {
n, err := r.rc.Read(p)
atomic.AddInt64(&r.count, int64(n))
if n < 0 {
panic("expecting n >= 0")
}
atomic.AddUint64(&r.count, uint64(n))
return n, err
}

View File

@ -73,6 +73,24 @@ func Int64(varname string, def int64) (d int64) {
return d
}
func Uint64(varname string, def uint64) (d uint64) {
var err error
if v, ok := cache.Load(varname); ok {
return v.(uint64)
}
e := os.Getenv(varname)
if e == "" {
d = def
} else {
d, err = strconv.ParseUint(e, 10, 64)
if err != nil {
panic(err)
}
}
cache.Store(varname, d)
return d
}
func Bool(varname string, def bool) (d bool) {
var err error
if v, ok := cache.Load(varname); ok {

View File

@ -7,6 +7,7 @@ import (
"encoding/json"
"fmt"
"io"
"math"
"os"
"os/exec"
"regexp"
@ -960,7 +961,7 @@ type DrySendInfo struct {
Type DrySendType
Filesystem string // parsed from To field
From, To string // direct copy from ZFS output
SizeEstimate int64 // -1 if size estimate is not possible
SizeEstimate uint64 // 0 if size estimate is not possible
}
var (
@ -1033,11 +1034,10 @@ func (s *DrySendInfo) unmarshalInfoLine(l string) (regexMatched bool, err error)
// see https://github.com/zrepl/zrepl/issues/289
fields["size"] = "0"
}
s.SizeEstimate, err = strconv.ParseInt(fields["size"], 10, 64)
s.SizeEstimate, err = strconv.ParseUint(fields["size"], 10, 64)
if err != nil {
return true, fmt.Errorf("cannot not parse size: %s", err)
}
return true, nil
}
@ -1047,6 +1047,7 @@ func ZFSSendDry(ctx context.Context, sendArgs ZFSSendArgsValidated) (_ *DrySendI
if sendArgs.From != nil && strings.Contains(sendArgs.From.RelName, "#") {
/* TODO:
* XXX feature check & support this as well
* ZFS at the time of writing does not support dry-run send because size-estimation
* uses fromSnap's deadlist. However, for a bookmark, that deadlist no longer exists.
* Redacted send & recv will bring this functionality, see
@ -1065,7 +1066,7 @@ func ZFSSendDry(ctx context.Context, sendArgs ZFSSendArgsValidated) (_ *DrySendI
Filesystem: sendArgs.FS,
From: fromAbs,
To: toAbs,
SizeEstimate: -1}, nil
SizeEstimate: 0}, nil
}
args := make([]string, 0)
@ -1085,6 +1086,19 @@ func ZFSSendDry(ctx context.Context, sendArgs ZFSSendArgsValidated) (_ *DrySendI
if err := si.unmarshalZFSOutput(output); err != nil {
return nil, fmt.Errorf("could not parse zfs send -n output: %s", err)
}
// There is a bug in OpenZFS where it estimates the size incorrectly.
// - zrepl: https://github.com/zrepl/zrepl/issues/463
// - resulting upstream bug: https://github.com/openzfs/zfs/issues/12265
//
// The wrong estimates are easy to detect because they are absurdly large.
// NB: we're doing the workaround for this late so that the test cases are not affected.
sizeEstimateThreshold := envconst.Uint64("ZREPL_ZFS_SEND_SIZE_ESTIMATE_INCORRECT_THRESHOLD", math.MaxInt64)
if sizeEstimateThreshold != 0 && si.SizeEstimate >= sizeEstimateThreshold {
debug("size estimate exceeds threshold %v, working around it: %#v %q", sizeEstimateThreshold, si, args)
si.SizeEstimate = 0
}
return &si, nil
}