show expected size of current send

Needs to be changed to send sizes for all planned steps
This commit is contained in:
Anton Schirg 2018-08-29 23:29:45 +02:00
parent 6ca11a7391
commit 98f3f3dfd8
6 changed files with 152 additions and 75 deletions

View File

@ -177,13 +177,13 @@ func (t *tui) draw() {
} }
for _, fs := range rep.Completed { for _, fs := range rep.Completed {
printFilesystem(fs, t) printFilesystem(fs, t, false)
} }
if rep.Active != nil { if rep.Active != nil {
printFilesystem(rep.Active, t) printFilesystem(rep.Active, t, true)
} }
for _, fs := range rep.Pending { for _, fs := range rep.Pending {
printFilesystem(fs, t) printFilesystem(fs, t, false)
} }
} }
@ -202,43 +202,51 @@ func rightPad(str string, length int, pad string) string {
return str + times(pad, length-len(str)) return str + times(pad, length-len(str))
} }
func (t *tui) drawBar(name string, status string, total int, done int, bytes int64) { func (t *tui) drawBar(name string, status string, bytes int64, totalBytes int64) {
t.write(rightPad(name, 20, " ")) t.write(rightPad(name, 20, " "))
t.write(" ") t.write(" ")
t.write(rightPad(status, 20, " ")) t.write(rightPad(status, 20, " "))
if total > 0 { if totalBytes > 0 {
length := 50 length := 50
completedLength := length * done / total completedLength := int(int64(length) * bytes / totalBytes)
t.write(times("=", completedLength)) t.write(times("=", completedLength))
t.write(">") t.write(">")
t.write(times("-", length-completedLength)) t.write(times("-", length-completedLength))
t.write(" ") t.write(" ")
t.write(rightPad(ByteCountBinary(bytes), 8, " ")) t.write(rightPad(ByteCountBinary(bytes) + "/" + ByteCountBinary(bytes), 16, " "))
t.printf(" %d/%d", done, total)
} }
t.newline() t.newline()
} }
func printFilesystem(rep *fsrep.Report, t *tui) { func printFilesystem(rep *fsrep.Report, t *tui, versions bool) {
bytes := int64(0) bytes := int64(0)
totalBytes := int64(0)
for _, s := range rep.Pending { for _, s := range rep.Pending {
bytes += s.Bytes bytes += s.Bytes
totalBytes += s.ExpectedBytes
} }
for _, s := range rep.Completed { for _, s := range rep.Completed {
bytes += s.Bytes bytes += s.Bytes
totalBytes += s.ExpectedBytes
} }
t.drawBar(rep.Filesystem, rep.Status, len(rep.Completed)+len(rep.Pending), len(rep.Completed), bytes) t.drawBar(rep.Filesystem, rep.Status, bytes, totalBytes)
if rep.Problem != "" { if rep.Problem != "" {
t.addIndent(1) t.addIndent(1)
t.printf("Problem: %s", rep.Problem) t.printf("Problem: %s", rep.Problem)
t.newline() t.newline()
t.addIndent(-1) t.addIndent(-1)
} }
if versions {
vs := append(rep.Completed, rep.Pending...)
for _, v := range vs {
t.drawBar(" " + v.To, v.Status, v.Bytes, v.ExpectedBytes)
}
}
} }
func ByteCountBinary(b int64) string { func ByteCountBinary(b int64) string {

View File

@ -74,11 +74,17 @@ func (p *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea
if !pass { if !pass {
return nil, nil, replication.NewFilteredError(r.Filesystem) return nil, nil, replication.NewFilteredError(r.Filesystem)
} }
size, err := zfs.ZFSSendDry(r.Filesystem, r.From, r.To)
if err != nil {
return nil, nil, err
}
stream, err := zfs.ZFSSend(r.Filesystem, r.From, r.To) stream, err := zfs.ZFSSend(r.Filesystem, r.From, r.To)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
return &pdu.SendRes{}, stream, nil return &pdu.SendRes{ExpectedSize: size}, stream, nil
} }
func (p *Sender) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshotsReq) (*pdu.DestroySnapshotsRes, error) { func (p *Sender) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshotsReq) (*pdu.DestroySnapshotsRes, error) {

View File

@ -40,6 +40,7 @@ func getLogger(ctx context.Context) Logger {
type Sender interface { type Sender interface {
// If a non-nil io.ReadCloser is returned, it is guaranteed to be closed before // If a non-nil io.ReadCloser is returned, it is guaranteed to be closed before
// any next call to the parent github.com/zrepl/zrepl/replication.Endpoint. // any next call to the parent github.com/zrepl/zrepl/replication.Endpoint.
// returned int64 is the expected size of the stream
Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.ReadCloser, error) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.ReadCloser, error)
SnapshotReplicationStatus(ctx context.Context, r *pdu.SnapshotReplicationStatusReq) (*pdu.SnapshotReplicationStatusRes, error) SnapshotReplicationStatus(ctx context.Context, r *pdu.SnapshotReplicationStatusReq) (*pdu.SnapshotReplicationStatusRes, error)
} }
@ -58,6 +59,7 @@ type StepReport struct {
Status string Status string
Problem string Problem string
Bytes int64 Bytes int64
ExpectedBytes int64
} }
type Report struct { type Report struct {
@ -170,7 +172,9 @@ type ReplicationStep struct {
// both retry and permanent error // both retry and permanent error
err error err error
byteCounter *util.ByteCounterReader byteCounter *util.ByteCounterReader
expectedSize int64
} }
func (f *Replication) TakeStep(ctx context.Context, sender Sender, receiver Receiver) (post State, nextStepDate time.Time) { func (f *Replication) TakeStep(ctx context.Context, sender Sender, receiver Receiver) (post State, nextStepDate time.Time) {
@ -365,6 +369,7 @@ func (s *ReplicationStep) doReplication(ctx context.Context, sender Sender, rece
return updateStateError(err) return updateStateError(err)
} }
s.expectedSize = sres.ExpectedSize
s.byteCounter = util.NewByteCounterReader(sstream) s.byteCounter = util.NewByteCounterReader(sstream)
sstream = s.byteCounter sstream = s.byteCounter
@ -459,6 +464,7 @@ func (s *ReplicationStep) Report() *StepReport {
To: s.to.RelName(), To: s.to.RelName(),
Status: s.state.String(), Status: s.state.String(),
Bytes: bytes, Bytes: bytes,
ExpectedBytes: s.expectedSize,
} }
return &rep return &rep
} }

View File

@ -38,7 +38,7 @@ func (x FilesystemVersion_VersionType) String() string {
return proto.EnumName(FilesystemVersion_VersionType_name, int32(x)) return proto.EnumName(FilesystemVersion_VersionType_name, int32(x))
} }
func (FilesystemVersion_VersionType) EnumDescriptor() ([]byte, []int) { func (FilesystemVersion_VersionType) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_pdu_e1dccbd3b8cde5a3, []int{5, 0} return fileDescriptor_pdu_cc43fc3f9f8b50e6, []int{5, 0}
} }
type SnapshotReplicationStatusReq_Op int32 type SnapshotReplicationStatusReq_Op int32
@ -61,7 +61,7 @@ func (x SnapshotReplicationStatusReq_Op) String() string {
return proto.EnumName(SnapshotReplicationStatusReq_Op_name, int32(x)) return proto.EnumName(SnapshotReplicationStatusReq_Op_name, int32(x))
} }
func (SnapshotReplicationStatusReq_Op) EnumDescriptor() ([]byte, []int) { func (SnapshotReplicationStatusReq_Op) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_pdu_e1dccbd3b8cde5a3, []int{14, 0} return fileDescriptor_pdu_cc43fc3f9f8b50e6, []int{14, 0}
} }
type ListFilesystemReq struct { type ListFilesystemReq struct {
@ -74,7 +74,7 @@ func (m *ListFilesystemReq) Reset() { *m = ListFilesystemReq{} }
func (m *ListFilesystemReq) String() string { return proto.CompactTextString(m) } func (m *ListFilesystemReq) String() string { return proto.CompactTextString(m) }
func (*ListFilesystemReq) ProtoMessage() {} func (*ListFilesystemReq) ProtoMessage() {}
func (*ListFilesystemReq) Descriptor() ([]byte, []int) { func (*ListFilesystemReq) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_e1dccbd3b8cde5a3, []int{0} return fileDescriptor_pdu_cc43fc3f9f8b50e6, []int{0}
} }
func (m *ListFilesystemReq) XXX_Unmarshal(b []byte) error { func (m *ListFilesystemReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ListFilesystemReq.Unmarshal(m, b) return xxx_messageInfo_ListFilesystemReq.Unmarshal(m, b)
@ -105,7 +105,7 @@ func (m *ListFilesystemRes) Reset() { *m = ListFilesystemRes{} }
func (m *ListFilesystemRes) String() string { return proto.CompactTextString(m) } func (m *ListFilesystemRes) String() string { return proto.CompactTextString(m) }
func (*ListFilesystemRes) ProtoMessage() {} func (*ListFilesystemRes) ProtoMessage() {}
func (*ListFilesystemRes) Descriptor() ([]byte, []int) { func (*ListFilesystemRes) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_e1dccbd3b8cde5a3, []int{1} return fileDescriptor_pdu_cc43fc3f9f8b50e6, []int{1}
} }
func (m *ListFilesystemRes) XXX_Unmarshal(b []byte) error { func (m *ListFilesystemRes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ListFilesystemRes.Unmarshal(m, b) return xxx_messageInfo_ListFilesystemRes.Unmarshal(m, b)
@ -144,7 +144,7 @@ func (m *Filesystem) Reset() { *m = Filesystem{} }
func (m *Filesystem) String() string { return proto.CompactTextString(m) } func (m *Filesystem) String() string { return proto.CompactTextString(m) }
func (*Filesystem) ProtoMessage() {} func (*Filesystem) ProtoMessage() {}
func (*Filesystem) Descriptor() ([]byte, []int) { func (*Filesystem) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_e1dccbd3b8cde5a3, []int{2} return fileDescriptor_pdu_cc43fc3f9f8b50e6, []int{2}
} }
func (m *Filesystem) XXX_Unmarshal(b []byte) error { func (m *Filesystem) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Filesystem.Unmarshal(m, b) return xxx_messageInfo_Filesystem.Unmarshal(m, b)
@ -189,7 +189,7 @@ func (m *ListFilesystemVersionsReq) Reset() { *m = ListFilesystemVersion
func (m *ListFilesystemVersionsReq) String() string { return proto.CompactTextString(m) } func (m *ListFilesystemVersionsReq) String() string { return proto.CompactTextString(m) }
func (*ListFilesystemVersionsReq) ProtoMessage() {} func (*ListFilesystemVersionsReq) ProtoMessage() {}
func (*ListFilesystemVersionsReq) Descriptor() ([]byte, []int) { func (*ListFilesystemVersionsReq) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_e1dccbd3b8cde5a3, []int{3} return fileDescriptor_pdu_cc43fc3f9f8b50e6, []int{3}
} }
func (m *ListFilesystemVersionsReq) XXX_Unmarshal(b []byte) error { func (m *ListFilesystemVersionsReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ListFilesystemVersionsReq.Unmarshal(m, b) return xxx_messageInfo_ListFilesystemVersionsReq.Unmarshal(m, b)
@ -227,7 +227,7 @@ func (m *ListFilesystemVersionsRes) Reset() { *m = ListFilesystemVersion
func (m *ListFilesystemVersionsRes) String() string { return proto.CompactTextString(m) } func (m *ListFilesystemVersionsRes) String() string { return proto.CompactTextString(m) }
func (*ListFilesystemVersionsRes) ProtoMessage() {} func (*ListFilesystemVersionsRes) ProtoMessage() {}
func (*ListFilesystemVersionsRes) Descriptor() ([]byte, []int) { func (*ListFilesystemVersionsRes) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_e1dccbd3b8cde5a3, []int{4} return fileDescriptor_pdu_cc43fc3f9f8b50e6, []int{4}
} }
func (m *ListFilesystemVersionsRes) XXX_Unmarshal(b []byte) error { func (m *ListFilesystemVersionsRes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ListFilesystemVersionsRes.Unmarshal(m, b) return xxx_messageInfo_ListFilesystemVersionsRes.Unmarshal(m, b)
@ -269,7 +269,7 @@ func (m *FilesystemVersion) Reset() { *m = FilesystemVersion{} }
func (m *FilesystemVersion) String() string { return proto.CompactTextString(m) } func (m *FilesystemVersion) String() string { return proto.CompactTextString(m) }
func (*FilesystemVersion) ProtoMessage() {} func (*FilesystemVersion) ProtoMessage() {}
func (*FilesystemVersion) Descriptor() ([]byte, []int) { func (*FilesystemVersion) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_e1dccbd3b8cde5a3, []int{5} return fileDescriptor_pdu_cc43fc3f9f8b50e6, []int{5}
} }
func (m *FilesystemVersion) XXX_Unmarshal(b []byte) error { func (m *FilesystemVersion) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_FilesystemVersion.Unmarshal(m, b) return xxx_messageInfo_FilesystemVersion.Unmarshal(m, b)
@ -348,7 +348,7 @@ func (m *SendReq) Reset() { *m = SendReq{} }
func (m *SendReq) String() string { return proto.CompactTextString(m) } func (m *SendReq) String() string { return proto.CompactTextString(m) }
func (*SendReq) ProtoMessage() {} func (*SendReq) ProtoMessage() {}
func (*SendReq) Descriptor() ([]byte, []int) { func (*SendReq) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_e1dccbd3b8cde5a3, []int{6} return fileDescriptor_pdu_cc43fc3f9f8b50e6, []int{6}
} }
func (m *SendReq) XXX_Unmarshal(b []byte) error { func (m *SendReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_SendReq.Unmarshal(m, b) return xxx_messageInfo_SendReq.Unmarshal(m, b)
@ -422,7 +422,7 @@ func (m *Property) Reset() { *m = Property{} }
func (m *Property) String() string { return proto.CompactTextString(m) } func (m *Property) String() string { return proto.CompactTextString(m) }
func (*Property) ProtoMessage() {} func (*Property) ProtoMessage() {}
func (*Property) Descriptor() ([]byte, []int) { func (*Property) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_e1dccbd3b8cde5a3, []int{7} return fileDescriptor_pdu_cc43fc3f9f8b50e6, []int{7}
} }
func (m *Property) XXX_Unmarshal(b []byte) error { func (m *Property) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Property.Unmarshal(m, b) return xxx_messageInfo_Property.Unmarshal(m, b)
@ -459,7 +459,9 @@ func (m *Property) GetValue() string {
type SendRes struct { type SendRes struct {
// Whether the resume token provided in the request has been used or not. // Whether the resume token provided in the request has been used or not.
UsedResumeToken bool `protobuf:"varint,1,opt,name=UsedResumeToken,proto3" json:"UsedResumeToken,omitempty"` UsedResumeToken bool `protobuf:"varint,1,opt,name=UsedResumeToken,proto3" json:"UsedResumeToken,omitempty"`
Properties []*Property `protobuf:"bytes,2,rep,name=Properties,proto3" json:"Properties,omitempty"` // Expected stream size determined by dry run, not exact
ExpectedSize int64 `protobuf:"varint,2,opt,name=ExpectedSize,proto3" json:"ExpectedSize,omitempty"`
Properties []*Property `protobuf:"bytes,3,rep,name=Properties,proto3" json:"Properties,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_sizecache int32 `json:"-"`
@ -469,7 +471,7 @@ func (m *SendRes) Reset() { *m = SendRes{} }
func (m *SendRes) String() string { return proto.CompactTextString(m) } func (m *SendRes) String() string { return proto.CompactTextString(m) }
func (*SendRes) ProtoMessage() {} func (*SendRes) ProtoMessage() {}
func (*SendRes) Descriptor() ([]byte, []int) { func (*SendRes) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_e1dccbd3b8cde5a3, []int{8} return fileDescriptor_pdu_cc43fc3f9f8b50e6, []int{8}
} }
func (m *SendRes) XXX_Unmarshal(b []byte) error { func (m *SendRes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_SendRes.Unmarshal(m, b) return xxx_messageInfo_SendRes.Unmarshal(m, b)
@ -496,6 +498,13 @@ func (m *SendRes) GetUsedResumeToken() bool {
return false return false
} }
func (m *SendRes) GetExpectedSize() int64 {
if m != nil {
return m.ExpectedSize
}
return 0
}
func (m *SendRes) GetProperties() []*Property { func (m *SendRes) GetProperties() []*Property {
if m != nil { if m != nil {
return m.Properties return m.Properties
@ -516,7 +525,7 @@ func (m *ReceiveReq) Reset() { *m = ReceiveReq{} }
func (m *ReceiveReq) String() string { return proto.CompactTextString(m) } func (m *ReceiveReq) String() string { return proto.CompactTextString(m) }
func (*ReceiveReq) ProtoMessage() {} func (*ReceiveReq) ProtoMessage() {}
func (*ReceiveReq) Descriptor() ([]byte, []int) { func (*ReceiveReq) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_e1dccbd3b8cde5a3, []int{9} return fileDescriptor_pdu_cc43fc3f9f8b50e6, []int{9}
} }
func (m *ReceiveReq) XXX_Unmarshal(b []byte) error { func (m *ReceiveReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ReceiveReq.Unmarshal(m, b) return xxx_messageInfo_ReceiveReq.Unmarshal(m, b)
@ -560,7 +569,7 @@ func (m *ReceiveRes) Reset() { *m = ReceiveRes{} }
func (m *ReceiveRes) String() string { return proto.CompactTextString(m) } func (m *ReceiveRes) String() string { return proto.CompactTextString(m) }
func (*ReceiveRes) ProtoMessage() {} func (*ReceiveRes) ProtoMessage() {}
func (*ReceiveRes) Descriptor() ([]byte, []int) { func (*ReceiveRes) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_e1dccbd3b8cde5a3, []int{10} return fileDescriptor_pdu_cc43fc3f9f8b50e6, []int{10}
} }
func (m *ReceiveRes) XXX_Unmarshal(b []byte) error { func (m *ReceiveRes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ReceiveRes.Unmarshal(m, b) return xxx_messageInfo_ReceiveRes.Unmarshal(m, b)
@ -593,7 +602,7 @@ func (m *DestroySnapshotsReq) Reset() { *m = DestroySnapshotsReq{} }
func (m *DestroySnapshotsReq) String() string { return proto.CompactTextString(m) } func (m *DestroySnapshotsReq) String() string { return proto.CompactTextString(m) }
func (*DestroySnapshotsReq) ProtoMessage() {} func (*DestroySnapshotsReq) ProtoMessage() {}
func (*DestroySnapshotsReq) Descriptor() ([]byte, []int) { func (*DestroySnapshotsReq) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_e1dccbd3b8cde5a3, []int{11} return fileDescriptor_pdu_cc43fc3f9f8b50e6, []int{11}
} }
func (m *DestroySnapshotsReq) XXX_Unmarshal(b []byte) error { func (m *DestroySnapshotsReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_DestroySnapshotsReq.Unmarshal(m, b) return xxx_messageInfo_DestroySnapshotsReq.Unmarshal(m, b)
@ -639,7 +648,7 @@ func (m *DestroySnapshotRes) Reset() { *m = DestroySnapshotRes{} }
func (m *DestroySnapshotRes) String() string { return proto.CompactTextString(m) } func (m *DestroySnapshotRes) String() string { return proto.CompactTextString(m) }
func (*DestroySnapshotRes) ProtoMessage() {} func (*DestroySnapshotRes) ProtoMessage() {}
func (*DestroySnapshotRes) Descriptor() ([]byte, []int) { func (*DestroySnapshotRes) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_e1dccbd3b8cde5a3, []int{12} return fileDescriptor_pdu_cc43fc3f9f8b50e6, []int{12}
} }
func (m *DestroySnapshotRes) XXX_Unmarshal(b []byte) error { func (m *DestroySnapshotRes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_DestroySnapshotRes.Unmarshal(m, b) return xxx_messageInfo_DestroySnapshotRes.Unmarshal(m, b)
@ -684,7 +693,7 @@ func (m *DestroySnapshotsRes) Reset() { *m = DestroySnapshotsRes{} }
func (m *DestroySnapshotsRes) String() string { return proto.CompactTextString(m) } func (m *DestroySnapshotsRes) String() string { return proto.CompactTextString(m) }
func (*DestroySnapshotsRes) ProtoMessage() {} func (*DestroySnapshotsRes) ProtoMessage() {}
func (*DestroySnapshotsRes) Descriptor() ([]byte, []int) { func (*DestroySnapshotsRes) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_e1dccbd3b8cde5a3, []int{13} return fileDescriptor_pdu_cc43fc3f9f8b50e6, []int{13}
} }
func (m *DestroySnapshotsRes) XXX_Unmarshal(b []byte) error { func (m *DestroySnapshotsRes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_DestroySnapshotsRes.Unmarshal(m, b) return xxx_messageInfo_DestroySnapshotsRes.Unmarshal(m, b)
@ -724,7 +733,7 @@ func (m *SnapshotReplicationStatusReq) Reset() { *m = SnapshotReplicatio
func (m *SnapshotReplicationStatusReq) String() string { return proto.CompactTextString(m) } func (m *SnapshotReplicationStatusReq) String() string { return proto.CompactTextString(m) }
func (*SnapshotReplicationStatusReq) ProtoMessage() {} func (*SnapshotReplicationStatusReq) ProtoMessage() {}
func (*SnapshotReplicationStatusReq) Descriptor() ([]byte, []int) { func (*SnapshotReplicationStatusReq) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_e1dccbd3b8cde5a3, []int{14} return fileDescriptor_pdu_cc43fc3f9f8b50e6, []int{14}
} }
func (m *SnapshotReplicationStatusReq) XXX_Unmarshal(b []byte) error { func (m *SnapshotReplicationStatusReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_SnapshotReplicationStatusReq.Unmarshal(m, b) return xxx_messageInfo_SnapshotReplicationStatusReq.Unmarshal(m, b)
@ -776,7 +785,7 @@ func (m *SnapshotReplicationStatusRes) Reset() { *m = SnapshotReplicatio
func (m *SnapshotReplicationStatusRes) String() string { return proto.CompactTextString(m) } func (m *SnapshotReplicationStatusRes) String() string { return proto.CompactTextString(m) }
func (*SnapshotReplicationStatusRes) ProtoMessage() {} func (*SnapshotReplicationStatusRes) ProtoMessage() {}
func (*SnapshotReplicationStatusRes) Descriptor() ([]byte, []int) { func (*SnapshotReplicationStatusRes) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_e1dccbd3b8cde5a3, []int{15} return fileDescriptor_pdu_cc43fc3f9f8b50e6, []int{15}
} }
func (m *SnapshotReplicationStatusRes) XXX_Unmarshal(b []byte) error { func (m *SnapshotReplicationStatusRes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_SnapshotReplicationStatusRes.Unmarshal(m, b) return xxx_messageInfo_SnapshotReplicationStatusRes.Unmarshal(m, b)
@ -824,46 +833,47 @@ func init() {
proto.RegisterEnum("pdu.SnapshotReplicationStatusReq_Op", SnapshotReplicationStatusReq_Op_name, SnapshotReplicationStatusReq_Op_value) proto.RegisterEnum("pdu.SnapshotReplicationStatusReq_Op", SnapshotReplicationStatusReq_Op_name, SnapshotReplicationStatusReq_Op_value)
} }
func init() { proto.RegisterFile("pdu.proto", fileDescriptor_pdu_e1dccbd3b8cde5a3) } func init() { proto.RegisterFile("pdu.proto", fileDescriptor_pdu_cc43fc3f9f8b50e6) }
var fileDescriptor_pdu_e1dccbd3b8cde5a3 = []byte{ var fileDescriptor_pdu_cc43fc3f9f8b50e6 = []byte{
// 595 bytes of a gzipped FileDescriptorProto // 616 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x54, 0xdd, 0x6e, 0x12, 0x41, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x54, 0xdb, 0x6e, 0x13, 0x3d,
0x14, 0x76, 0x17, 0x5a, 0x96, 0x43, 0x4b, 0x61, 0xda, 0xe8, 0xda, 0x34, 0x86, 0x4c, 0xbc, 0x40, 0x10, 0xfe, 0x77, 0x37, 0x6d, 0x37, 0x93, 0x1e, 0x52, 0xb7, 0xfa, 0x59, 0xaa, 0x0a, 0x45, 0x16,
0x13, 0x49, 0x44, 0xe2, 0x8d, 0x89, 0x17, 0x6d, 0x05, 0x2f, 0x8c, 0x34, 0x03, 0x36, 0xbd, 0x32, 0x17, 0x01, 0x89, 0x48, 0x84, 0x8a, 0x1b, 0x24, 0x2e, 0x7a, 0xe4, 0x02, 0xd1, 0xca, 0x09, 0x55,
0xd9, 0x76, 0x4f, 0xd2, 0x0d, 0x2c, 0x33, 0x9d, 0xd9, 0x35, 0xe1, 0x71, 0x7c, 0x05, 0x9f, 0xc6, 0xaf, 0x90, 0x96, 0xee, 0x48, 0x5d, 0x25, 0x1b, 0xbb, 0xb6, 0x17, 0x11, 0x1e, 0x80, 0xf7, 0xe0,
0xc7, 0x31, 0x33, 0xec, 0xcf, 0x08, 0x2d, 0xe1, 0x8a, 0xf9, 0xce, 0xf9, 0x38, 0xe7, 0x3b, 0x7f, 0x15, 0x78, 0x1a, 0x1e, 0x07, 0xd9, 0xd9, 0x83, 0x9b, 0x94, 0x28, 0x57, 0xeb, 0xf9, 0xe6, 0xdb,
0x0b, 0x75, 0x11, 0xa6, 0x3d, 0x21, 0x79, 0xc2, 0x49, 0x45, 0x84, 0x29, 0x3d, 0x86, 0xf6, 0xb7, 0x99, 0x6f, 0x66, 0x3c, 0x86, 0xa6, 0x48, 0xf2, 0x9e, 0x90, 0x5c, 0x73, 0x12, 0x88, 0x24, 0xa7,
0x48, 0x25, 0xc3, 0x68, 0x8e, 0x6a, 0xa9, 0x12, 0x8c, 0x19, 0x3e, 0xd0, 0xe1, 0xa6, 0x51, 0x91, 0x7b, 0xb0, 0xfb, 0x31, 0x55, 0xfa, 0x3c, 0x1d, 0xa3, 0x9a, 0x2a, 0x8d, 0x19, 0xc3, 0x7b, 0x7a,
0xf7, 0xd0, 0x28, 0x0d, 0xca, 0x77, 0x3a, 0x95, 0x6e, 0xa3, 0x7f, 0xd4, 0xd3, 0xf1, 0x2c, 0xa2, 0xbe, 0x08, 0x2a, 0xf2, 0x1a, 0x5a, 0x35, 0xa0, 0x22, 0xaf, 0x13, 0x74, 0x5b, 0xfd, 0x9d, 0x9e,
0xcd, 0xa1, 0xe7, 0x00, 0x25, 0x24, 0x04, 0xaa, 0x57, 0x41, 0x72, 0xef, 0x3b, 0x1d, 0xa7, 0x5b, 0x89, 0xe7, 0x10, 0x5d, 0x0e, 0x3d, 0x06, 0xa8, 0x4d, 0x42, 0xa0, 0x71, 0x15, 0xeb, 0xbb, 0xc8,
0x67, 0xe6, 0x4d, 0x3a, 0xd0, 0x60, 0xa8, 0xd2, 0x18, 0xa7, 0x7c, 0x86, 0x0b, 0xdf, 0x35, 0x2e, 0xeb, 0x78, 0xdd, 0x26, 0xb3, 0x67, 0xd2, 0x81, 0x16, 0x43, 0x95, 0x67, 0x38, 0xe4, 0x23, 0x9c,
0xdb, 0x44, 0x3f, 0xc1, 0xcb, 0xff, 0xb5, 0x5c, 0xa3, 0x54, 0x11, 0x5f, 0x28, 0x86, 0x0f, 0xe4, 0x44, 0xbe, 0x75, 0xb9, 0x10, 0x7d, 0x07, 0x4f, 0x1f, 0x6a, 0xb9, 0x46, 0xa9, 0x52, 0x3e, 0x51,
0x95, 0x9d, 0x20, 0x0b, 0x6c, 0x59, 0xe8, 0xf8, 0xe9, 0x3f, 0x2b, 0xd2, 0x07, 0x2f, 0x87, 0x59, 0x0c, 0xef, 0xc9, 0x33, 0x37, 0x41, 0x11, 0xd8, 0x41, 0xe8, 0xe5, 0xbf, 0x7f, 0x56, 0xa4, 0x0f,
0x35, 0xcf, 0xd7, 0xaa, 0xc9, 0xdc, 0xac, 0xe0, 0xd1, 0xbf, 0x0e, 0xb4, 0x37, 0xfc, 0xe4, 0x23, 0x61, 0x69, 0x16, 0xd5, 0xfc, 0x3f, 0x57, 0x4d, 0xe1, 0x66, 0x15, 0x8f, 0xfe, 0xf1, 0x60, 0x77,
0x54, 0xa7, 0x4b, 0x81, 0x46, 0x40, 0xb3, 0x4f, 0x1f, 0x8f, 0xd2, 0xcb, 0x7e, 0x35, 0x93, 0x19, 0xc1, 0x4f, 0xde, 0x42, 0x63, 0x38, 0x15, 0x68, 0x05, 0x6c, 0xf7, 0xe9, 0xe3, 0x51, 0x7a, 0xc5,
0xbe, 0xee, 0xc8, 0xf7, 0x20, 0xc6, 0xac, 0x6c, 0xf3, 0xd6, 0xb6, 0x51, 0x1a, 0x85, 0x7e, 0xa5, 0xd7, 0x30, 0x99, 0xe5, 0x9b, 0x8e, 0x7c, 0x8a, 0x33, 0x2c, 0xca, 0xb6, 0x67, 0x83, 0x5d, 0xe4,
0xe3, 0x74, 0xab, 0xcc, 0xbc, 0xc9, 0x19, 0xd4, 0x2f, 0x24, 0x06, 0x09, 0x4e, 0x6f, 0x46, 0x7e, 0x69, 0x12, 0x05, 0x1d, 0xaf, 0xdb, 0x60, 0xf6, 0x4c, 0x0e, 0xa1, 0x79, 0x22, 0x31, 0xd6, 0x38,
0xd5, 0x38, 0x4a, 0x03, 0x39, 0x05, 0xcf, 0x80, 0x88, 0x2f, 0xfc, 0x3d, 0x13, 0xa9, 0xc0, 0xf4, 0xbc, 0xb9, 0x88, 0x1a, 0xd6, 0x51, 0x03, 0xe4, 0x00, 0x42, 0x6b, 0xa4, 0x7c, 0x12, 0xad, 0xd9,
0x0d, 0x34, 0xac, 0xb4, 0xe4, 0x00, 0xbc, 0xc9, 0x22, 0x10, 0xea, 0x9e, 0x27, 0xad, 0x67, 0x1a, 0x48, 0x95, 0x4d, 0x5f, 0x40, 0xcb, 0x49, 0x4b, 0x36, 0x21, 0x1c, 0x4c, 0x62, 0xa1, 0xee, 0xb8,
0x9d, 0x73, 0x3e, 0x8b, 0x03, 0x39, 0x6b, 0x39, 0xf4, 0xb7, 0x03, 0xb5, 0x09, 0x2e, 0xc2, 0x1d, 0x6e, 0xff, 0x67, 0xac, 0x63, 0xce, 0x47, 0x59, 0x2c, 0x47, 0x6d, 0x8f, 0xfe, 0xf2, 0x60, 0x63,
0xfa, 0xaa, 0x45, 0x0e, 0x25, 0x8f, 0x73, 0xe1, 0xfa, 0x4d, 0x9a, 0xe0, 0x4e, 0xb9, 0x91, 0x5d, 0x80, 0x93, 0x64, 0x85, 0xbe, 0x1a, 0x91, 0xe7, 0x92, 0x67, 0xa5, 0x70, 0x73, 0x26, 0xdb, 0xe0,
0x67, 0xee, 0x94, 0xaf, 0x8f, 0xb6, 0xba, 0x31, 0x5a, 0x23, 0x9c, 0xc7, 0x42, 0xa2, 0x52, 0x46, 0x0f, 0xb9, 0x95, 0xdd, 0x64, 0xfe, 0x90, 0xcf, 0x8f, 0xb6, 0xb1, 0x30, 0x5a, 0x2b, 0x9c, 0x67,
0xb8, 0xc7, 0x0a, 0x4c, 0x4e, 0x60, 0xef, 0x12, 0xc3, 0x54, 0xf8, 0xfb, 0xc6, 0xb1, 0x02, 0x74, 0x42, 0xa2, 0x52, 0x56, 0x78, 0xc8, 0x2a, 0x9b, 0xec, 0xc3, 0xda, 0x29, 0x26, 0xb9, 0x88, 0xd6,
0x00, 0xde, 0x95, 0xe4, 0x02, 0x65, 0xb2, 0x2c, 0x9a, 0xe7, 0x58, 0xcd, 0x3b, 0x81, 0xbd, 0xeb, 0xad, 0x63, 0x66, 0xd0, 0x23, 0x08, 0xaf, 0x24, 0x17, 0x28, 0xf5, 0xb4, 0x6a, 0x9e, 0xe7, 0x34,
0x60, 0x9e, 0xe6, 0x1d, 0x5d, 0x01, 0x7a, 0x9b, 0x17, 0xa6, 0x48, 0x17, 0x8e, 0x7e, 0x28, 0x0c, 0x6f, 0x1f, 0xd6, 0xae, 0xe3, 0x71, 0x5e, 0x76, 0x74, 0x66, 0xd0, 0x9f, 0x55, 0x65, 0x8a, 0x74,
0x6d, 0x61, 0x8e, 0x49, 0xb0, 0x6e, 0x26, 0xef, 0x00, 0xb2, 0x54, 0x11, 0x2a, 0xdf, 0x35, 0xfb, 0x61, 0xe7, 0xb3, 0xc2, 0xc4, 0x55, 0xe6, 0xd9, 0x0c, 0xf3, 0x30, 0xa1, 0xb0, 0x79, 0xf6, 0x5d,
0x71, 0x68, 0x26, 0x9b, 0x2b, 0x60, 0x16, 0x81, 0xde, 0x00, 0x30, 0xbc, 0xc3, 0xe8, 0x17, 0xee, 0xe0, 0xad, 0xc6, 0x64, 0x90, 0xfe, 0x98, 0x85, 0x0c, 0xd8, 0x03, 0x8c, 0xbc, 0x02, 0x28, 0xf4,
0xd2, 0xbf, 0xb7, 0xd0, 0xba, 0x98, 0x63, 0x20, 0xd7, 0x77, 0xdf, 0x63, 0x1b, 0x76, 0x7a, 0x60, 0xa4, 0xa8, 0xa2, 0xc0, 0x5e, 0xa2, 0x2d, 0x3b, 0xfe, 0x52, 0x26, 0x73, 0x08, 0xf4, 0x06, 0x80,
0x45, 0x56, 0x74, 0x06, 0xc7, 0x97, 0xa8, 0x12, 0xc9, 0x97, 0xf9, 0x20, 0x77, 0x39, 0x04, 0x32, 0xe1, 0x2d, 0xa6, 0xdf, 0x70, 0x95, 0x26, 0xbf, 0x84, 0xf6, 0xc9, 0x18, 0x63, 0x39, 0xbf, 0x20,
0x80, 0x7a, 0xc1, 0xcf, 0x8a, 0x79, 0x6a, 0xd9, 0x4b, 0x22, 0xfd, 0x09, 0x64, 0x2d, 0x59, 0x76, 0x21, 0x5b, 0xc0, 0xe9, 0xa6, 0x13, 0x59, 0xd1, 0x11, 0xec, 0x9d, 0xa2, 0xd2, 0x92, 0x4f, 0xcb,
0x37, 0x39, 0x34, 0x99, 0xb6, 0xdc, 0x4d, 0xce, 0xd3, 0x83, 0xf9, 0x22, 0x25, 0x97, 0xf9, 0x60, 0x69, 0xaf, 0xb2, 0x2d, 0xe4, 0x08, 0x9a, 0x15, 0x3f, 0xf2, 0x97, 0x6e, 0x44, 0x4d, 0xa4, 0x5f,
0x0c, 0xa0, 0x5f, 0x1f, 0x2b, 0x46, 0x7f, 0x69, 0x6a, 0xba, 0x01, 0xf3, 0x24, 0xbf, 0xcb, 0x17, 0x80, 0xcc, 0x25, 0x2b, 0x96, 0xab, 0x34, 0x6d, 0xa6, 0x25, 0xcb, 0x55, 0xf2, 0xcc, 0xf4, 0xce,
0x26, 0xfe, 0xa6, 0x14, 0x96, 0xf3, 0xe8, 0x1f, 0x07, 0xce, 0x4a, 0x87, 0x98, 0x47, 0x77, 0x66, 0xa4, 0xe4, 0xb2, 0x9c, 0x9e, 0x35, 0xe8, 0x87, 0xc7, 0x8a, 0x31, 0xcf, 0xd1, 0x86, 0x69, 0xc0,
0xff, 0x27, 0x49, 0x90, 0xa4, 0x3b, 0x35, 0xe8, 0xd4, 0x2a, 0x6a, 0xa5, 0xb1, 0x14, 0x3f, 0x00, 0x58, 0x97, 0xcb, 0xfb, 0xc4, 0xc6, 0x5f, 0x94, 0xc2, 0x4a, 0x1e, 0xfd, 0xed, 0xc1, 0x61, 0xed,
0x97, 0x0b, 0xb3, 0xd9, 0xcd, 0xfe, 0x6b, 0x23, 0x65, 0x5b, 0xaa, 0xde, 0x58, 0x30, 0x97, 0x0b, 0x10, 0xe3, 0xf4, 0xd6, 0x2e, 0xc9, 0x40, 0xc7, 0x3a, 0x5f, 0xa9, 0x41, 0x07, 0x4e, 0x51, 0x33,
0xda, 0x01, 0x77, 0x2c, 0x48, 0x0d, 0x2a, 0x23, 0xd4, 0xc7, 0xd6, 0x86, 0xc3, 0x09, 0x16, 0x7f, 0x8d, 0xb5, 0xf8, 0x23, 0xf0, 0xb9, 0xb0, 0xd7, 0x7f, 0xbb, 0xff, 0xdc, 0x4a, 0x59, 0x96, 0xaa,
0xc0, 0xb0, 0xe5, 0xd0, 0xcf, 0x5b, 0x35, 0x2b, 0xad, 0xb9, 0xe4, 0x67, 0x7b, 0x6a, 0x59, 0x6e, 0x77, 0x29, 0x98, 0xcf, 0x05, 0xed, 0x80, 0x7f, 0x29, 0xc8, 0x06, 0x04, 0x17, 0x68, 0x36, 0x72,
0xf7, 0xcd, 0x77, 0xfc, 0xc3, 0xbf, 0x00, 0x00, 0x00, 0xff, 0xff, 0xf6, 0x1a, 0xe7, 0xf6, 0xd4, 0x17, 0xb6, 0x06, 0x58, 0xfd, 0x80, 0x49, 0xdb, 0xa3, 0xef, 0x97, 0x6a, 0x56, 0x46, 0x73, 0xcd,
0x05, 0x00, 0x00, 0x2f, 0xee, 0xb2, 0x83, 0x7c, 0x5d, 0xb7, 0x8f, 0xfd, 0x9b, 0xbf, 0x01, 0x00, 0x00, 0xff, 0xff,
0x47, 0xbd, 0x08, 0x65, 0xf9, 0x05, 0x00, 0x00,
} }

View File

@ -63,7 +63,10 @@ message SendRes {
// Whether the resume token provided in the request has been used or not. // Whether the resume token provided in the request has been used or not.
bool UsedResumeToken = 1; bool UsedResumeToken = 1;
repeated Property Properties = 2; // Expected stream size determined by dry run, not exact
int64 ExpectedSize = 2;
repeated Property Properties = 3;
} }
message ReceiveReq { message ReceiveReq {

View File

@ -15,6 +15,7 @@ import (
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/zrepl/zrepl/util" "github.com/zrepl/zrepl/util"
"regexp" "regexp"
"strconv"
) )
type DatasetPath struct { type DatasetPath struct {
@ -308,6 +309,49 @@ func ZFSSend(fs string, from, to string) (stream io.ReadCloser, err error) {
return return
} }
func ZFSSendDry(fs string, from, to string) (size int64, err error) {
fromV, err := absVersion(fs, from)
if err != nil {
return 0, err
}
toV := ""
if to != "" {
toV, err = absVersion(fs, to)
if err != nil {
return 0, err
}
}
args := make([]string, 0)
args = append(args, "send", "-n", "-v", "-P")
if toV == "" { // Initial
args = append(args, fromV)
} else {
args = append(args, "-i", fromV, toV)
}
cmd := exec.Command(ZFS_BINARY, args...)
output, err := cmd.CombinedOutput()
if err != nil {
return 0, err
}
o := string(output)
lines := strings.Split(o, "\n")
if len(lines) < 2 {
return 0, errors.New("zfs send -n did not return the expected number of lines")
}
fields := strings.Fields(lines[1])
if len(fields) != 2 {
return 0, errors.New("zfs send -n returned unexpexted output")
}
size, err = strconv.ParseInt(fields[1], 10, 64)
return size, err
}
func ZFSRecv(fs string, stream io.Reader, additionalArgs ...string) (err error) { func ZFSRecv(fs string, stream io.Reader, additionalArgs ...string) (err error) {
if err := validateZFSFilesystem(fs); err != nil { if err := validateZFSFilesystem(fs); err != nil {