handle changes to placeholder state correctly

We assumed that `zfs recv -F FS` would basically replace FS inplace, leaving its children untouched.
That is in fact not the case, it only works if `zfs send -R` is set, which we don't do.

Thus, implement the required functionality manually.

This solves a `zfs recv` error that would occur when a filesystem previously created as placeholder on the receiving side becomes a non-placeholder filesystem (likely due to config change on the sending side):

  zfs send pool1/foo@1 | zfs recv -F pool1/bar
  cannot receive new filesystem stream:
  destination has snapshots (eg. pool1/bar)
  must destroy them to overwrite it
This commit is contained in:
Christian Schwarz 2019-03-13 18:33:20 +01:00
parent 1eb0f12a61
commit d50e553ebb
6 changed files with 196 additions and 116 deletions

View File

@ -48,9 +48,10 @@ func (s *Sender) ListFilesystems(ctx context.Context, r *pdu.ListFilesystemReq)
rfss[i] = &pdu.Filesystem{ rfss[i] = &pdu.Filesystem{
Path: fss[i].ToString(), Path: fss[i].ToString(),
// FIXME: not supporting ResumeToken yet // FIXME: not supporting ResumeToken yet
IsPlaceholder: false, // sender FSs are never placeholders
} }
} }
res := &pdu.ListFilesystemRes{Filesystems: rfss, Empty: len(rfss) == 0} res := &pdu.ListFilesystemRes{Filesystems: rfss}
return res, nil return res, nil
} }
@ -118,7 +119,7 @@ func (p *Sender) PingDataconn(ctx context.Context, req *pdu.PingReq) (*pdu.PingR
return p.Ping(ctx, req) return p.Ping(ctx, req)
} }
func (p *Sender) WaitForConnectivity(ctx context.Context) (error) { func (p *Sender) WaitForConnectivity(ctx context.Context) error {
return nil return nil
} }
@ -243,7 +244,7 @@ func (s *Receiver) ListFilesystems(ctx context.Context, req *pdu.ListFilesystemR
if err != nil { if err != nil {
return nil, err return nil, err
} }
// present without prefix, and only those that are not placeholders // present filesystem without the root_fs prefix
fss := make([]*pdu.Filesystem, 0, len(filtered)) fss := make([]*pdu.Filesystem, 0, len(filtered))
for _, a := range filtered { for _, a := range filtered {
ph, err := zfs.ZFSIsPlaceholderFilesystem(a) ph, err := zfs.ZFSIsPlaceholderFilesystem(a)
@ -254,21 +255,16 @@ func (s *Receiver) ListFilesystems(ctx context.Context, req *pdu.ListFilesystemR
Error("inconsistent placeholder property") Error("inconsistent placeholder property")
return nil, errors.New("server error: inconsistent placeholder property") // don't leak path return nil, errors.New("server error: inconsistent placeholder property") // don't leak path
} }
if ph {
getLogger(ctx).
WithField("fs", a.ToString()).
Debug("ignoring placeholder filesystem")
continue
}
getLogger(ctx). getLogger(ctx).
WithField("fs", a.ToString()). WithField("fs", a.ToString()).
Debug("non-placeholder filesystem") WithField("is_placeholder", ph).
Debug("filesystem")
a.TrimPrefix(root) a.TrimPrefix(root)
fss = append(fss, &pdu.Filesystem{Path: a.ToString()}) fss = append(fss, &pdu.Filesystem{Path: a.ToString(), IsPlaceholder: ph})
} }
if len(fss) == 0 { if len(fss) == 0 {
getLogger(ctx).Debug("no non-placeholder filesystems") getLogger(ctx).Debug("no filesystems found")
return &pdu.ListFilesystemRes{Empty: true}, nil return &pdu.ListFilesystemRes{}, nil
} }
return &pdu.ListFilesystemRes{Filesystems: fss}, nil return &pdu.ListFilesystemRes{Filesystems: fss}, nil
} }
@ -304,7 +300,7 @@ func (s *Receiver) PingDataconn(ctx context.Context, req *pdu.PingReq) (*pdu.Pin
return s.Ping(ctx, req) return s.Ping(ctx, req)
} }
func (s *Receiver) WaitForConnectivity(ctx context.Context) (error) { func (s *Receiver) WaitForConnectivity(ctx context.Context) error {
return nil return nil
} }
@ -316,7 +312,6 @@ func (s *Receiver) Send(ctx context.Context, req *pdu.SendReq) (*pdu.SendRes, zf
return nil, nil, fmt.Errorf("receiver does not implement Send()") return nil, nil, fmt.Errorf("receiver does not implement Send()")
} }
func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive zfs.StreamCopier) (*pdu.ReceiveRes, error) { func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive zfs.StreamCopier) (*pdu.ReceiveRes, error) {
getLogger(ctx).Debug("incoming Receive") getLogger(ctx).Debug("incoming Receive")
defer receive.Close() defer receive.Close()
@ -357,25 +352,27 @@ func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive zfs
return nil, err return nil, err
} }
needForceRecv := false var clearPlaceholderProperty bool
var recvOpts zfs.RecvOptions
props, err := zfs.ZFSGet(lp, []string{zfs.ZREPL_PLACEHOLDER_PROPERTY_NAME}) props, err := zfs.ZFSGet(lp, []string{zfs.ZREPL_PLACEHOLDER_PROPERTY_NAME})
if err == nil { if err == nil {
if isPlaceholder, _ := zfs.IsPlaceholder(lp, props.Get(zfs.ZREPL_PLACEHOLDER_PROPERTY_NAME)); isPlaceholder { if isPlaceholder, _ := zfs.IsPlaceholder(lp, props.Get(zfs.ZREPL_PLACEHOLDER_PROPERTY_NAME)); isPlaceholder {
needForceRecv = true recvOpts.RollbackAndForceRecv = true
clearPlaceholderProperty = true
}
}
if clearPlaceholderProperty {
if err := zfs.ZFSSetNoPlaceholder(lp); err != nil {
return nil, fmt.Errorf("cannot clear placeholder property for forced receive: %s", err)
} }
} }
args := make([]string, 0, 1) getLogger(ctx).WithField("opts", fmt.Sprintf("%#v", recvOpts)).Debug("start receive command")
if needForceRecv {
args = append(args, "-F")
}
getLogger(ctx).Debug("start receive command") if err := zfs.ZFSRecv(ctx, lp.ToString(), receive, recvOpts); err != nil {
if err := zfs.ZFSRecv(ctx, lp.ToString(), receive, args...); err != nil {
getLogger(ctx). getLogger(ctx).
WithError(err). WithError(err).
WithField("args", args). WithField("opts", recvOpts).
Error("zfs receive failed") Error("zfs receive failed")
return nil, err return nil, err
} }

View File

@ -43,7 +43,7 @@ func (x FilesystemVersion_VersionType) String() string {
return proto.EnumName(FilesystemVersion_VersionType_name, int32(x)) return proto.EnumName(FilesystemVersion_VersionType_name, int32(x))
} }
func (FilesystemVersion_VersionType) EnumDescriptor() ([]byte, []int) { func (FilesystemVersion_VersionType) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_pdu_759e477f62b58e58, []int{5, 0} return fileDescriptor_pdu_83b7e2a28d820622, []int{5, 0}
} }
type ListFilesystemReq struct { type ListFilesystemReq struct {
@ -56,7 +56,7 @@ func (m *ListFilesystemReq) Reset() { *m = ListFilesystemReq{} }
func (m *ListFilesystemReq) String() string { return proto.CompactTextString(m) } func (m *ListFilesystemReq) String() string { return proto.CompactTextString(m) }
func (*ListFilesystemReq) ProtoMessage() {} func (*ListFilesystemReq) ProtoMessage() {}
func (*ListFilesystemReq) Descriptor() ([]byte, []int) { func (*ListFilesystemReq) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_759e477f62b58e58, []int{0} return fileDescriptor_pdu_83b7e2a28d820622, []int{0}
} }
func (m *ListFilesystemReq) XXX_Unmarshal(b []byte) error { func (m *ListFilesystemReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ListFilesystemReq.Unmarshal(m, b) return xxx_messageInfo_ListFilesystemReq.Unmarshal(m, b)
@ -78,7 +78,6 @@ var xxx_messageInfo_ListFilesystemReq proto.InternalMessageInfo
type ListFilesystemRes struct { type ListFilesystemRes struct {
Filesystems []*Filesystem `protobuf:"bytes,1,rep,name=Filesystems,proto3" json:"Filesystems,omitempty"` Filesystems []*Filesystem `protobuf:"bytes,1,rep,name=Filesystems,proto3" json:"Filesystems,omitempty"`
Empty bool `protobuf:"varint,2,opt,name=Empty,proto3" json:"Empty,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_sizecache int32 `json:"-"`
@ -88,7 +87,7 @@ func (m *ListFilesystemRes) Reset() { *m = ListFilesystemRes{} }
func (m *ListFilesystemRes) String() string { return proto.CompactTextString(m) } func (m *ListFilesystemRes) String() string { return proto.CompactTextString(m) }
func (*ListFilesystemRes) ProtoMessage() {} func (*ListFilesystemRes) ProtoMessage() {}
func (*ListFilesystemRes) Descriptor() ([]byte, []int) { func (*ListFilesystemRes) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_759e477f62b58e58, []int{1} return fileDescriptor_pdu_83b7e2a28d820622, []int{1}
} }
func (m *ListFilesystemRes) XXX_Unmarshal(b []byte) error { func (m *ListFilesystemRes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ListFilesystemRes.Unmarshal(m, b) return xxx_messageInfo_ListFilesystemRes.Unmarshal(m, b)
@ -115,16 +114,10 @@ func (m *ListFilesystemRes) GetFilesystems() []*Filesystem {
return nil return nil
} }
func (m *ListFilesystemRes) GetEmpty() bool {
if m != nil {
return m.Empty
}
return false
}
type Filesystem struct { type Filesystem struct {
Path string `protobuf:"bytes,1,opt,name=Path,proto3" json:"Path,omitempty"` Path string `protobuf:"bytes,1,opt,name=Path,proto3" json:"Path,omitempty"`
ResumeToken string `protobuf:"bytes,2,opt,name=ResumeToken,proto3" json:"ResumeToken,omitempty"` ResumeToken string `protobuf:"bytes,2,opt,name=ResumeToken,proto3" json:"ResumeToken,omitempty"`
IsPlaceholder bool `protobuf:"varint,3,opt,name=IsPlaceholder,proto3" json:"IsPlaceholder,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_sizecache int32 `json:"-"`
@ -134,7 +127,7 @@ func (m *Filesystem) Reset() { *m = Filesystem{} }
func (m *Filesystem) String() string { return proto.CompactTextString(m) } func (m *Filesystem) String() string { return proto.CompactTextString(m) }
func (*Filesystem) ProtoMessage() {} func (*Filesystem) ProtoMessage() {}
func (*Filesystem) Descriptor() ([]byte, []int) { func (*Filesystem) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_759e477f62b58e58, []int{2} return fileDescriptor_pdu_83b7e2a28d820622, []int{2}
} }
func (m *Filesystem) XXX_Unmarshal(b []byte) error { func (m *Filesystem) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Filesystem.Unmarshal(m, b) return xxx_messageInfo_Filesystem.Unmarshal(m, b)
@ -168,6 +161,13 @@ func (m *Filesystem) GetResumeToken() string {
return "" return ""
} }
func (m *Filesystem) GetIsPlaceholder() bool {
if m != nil {
return m.IsPlaceholder
}
return false
}
type ListFilesystemVersionsReq struct { type ListFilesystemVersionsReq struct {
Filesystem string `protobuf:"bytes,1,opt,name=Filesystem,proto3" json:"Filesystem,omitempty"` Filesystem string `protobuf:"bytes,1,opt,name=Filesystem,proto3" json:"Filesystem,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
@ -179,7 +179,7 @@ func (m *ListFilesystemVersionsReq) Reset() { *m = ListFilesystemVersion
func (m *ListFilesystemVersionsReq) String() string { return proto.CompactTextString(m) } func (m *ListFilesystemVersionsReq) String() string { return proto.CompactTextString(m) }
func (*ListFilesystemVersionsReq) ProtoMessage() {} func (*ListFilesystemVersionsReq) ProtoMessage() {}
func (*ListFilesystemVersionsReq) Descriptor() ([]byte, []int) { func (*ListFilesystemVersionsReq) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_759e477f62b58e58, []int{3} return fileDescriptor_pdu_83b7e2a28d820622, []int{3}
} }
func (m *ListFilesystemVersionsReq) XXX_Unmarshal(b []byte) error { func (m *ListFilesystemVersionsReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ListFilesystemVersionsReq.Unmarshal(m, b) return xxx_messageInfo_ListFilesystemVersionsReq.Unmarshal(m, b)
@ -217,7 +217,7 @@ func (m *ListFilesystemVersionsRes) Reset() { *m = ListFilesystemVersion
func (m *ListFilesystemVersionsRes) String() string { return proto.CompactTextString(m) } func (m *ListFilesystemVersionsRes) String() string { return proto.CompactTextString(m) }
func (*ListFilesystemVersionsRes) ProtoMessage() {} func (*ListFilesystemVersionsRes) ProtoMessage() {}
func (*ListFilesystemVersionsRes) Descriptor() ([]byte, []int) { func (*ListFilesystemVersionsRes) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_759e477f62b58e58, []int{4} return fileDescriptor_pdu_83b7e2a28d820622, []int{4}
} }
func (m *ListFilesystemVersionsRes) XXX_Unmarshal(b []byte) error { func (m *ListFilesystemVersionsRes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ListFilesystemVersionsRes.Unmarshal(m, b) return xxx_messageInfo_ListFilesystemVersionsRes.Unmarshal(m, b)
@ -259,7 +259,7 @@ func (m *FilesystemVersion) Reset() { *m = FilesystemVersion{} }
func (m *FilesystemVersion) String() string { return proto.CompactTextString(m) } func (m *FilesystemVersion) String() string { return proto.CompactTextString(m) }
func (*FilesystemVersion) ProtoMessage() {} func (*FilesystemVersion) ProtoMessage() {}
func (*FilesystemVersion) Descriptor() ([]byte, []int) { func (*FilesystemVersion) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_759e477f62b58e58, []int{5} return fileDescriptor_pdu_83b7e2a28d820622, []int{5}
} }
func (m *FilesystemVersion) XXX_Unmarshal(b []byte) error { func (m *FilesystemVersion) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_FilesystemVersion.Unmarshal(m, b) return xxx_messageInfo_FilesystemVersion.Unmarshal(m, b)
@ -339,7 +339,7 @@ func (m *SendReq) Reset() { *m = SendReq{} }
func (m *SendReq) String() string { return proto.CompactTextString(m) } func (m *SendReq) String() string { return proto.CompactTextString(m) }
func (*SendReq) ProtoMessage() {} func (*SendReq) ProtoMessage() {}
func (*SendReq) Descriptor() ([]byte, []int) { func (*SendReq) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_759e477f62b58e58, []int{6} return fileDescriptor_pdu_83b7e2a28d820622, []int{6}
} }
func (m *SendReq) XXX_Unmarshal(b []byte) error { func (m *SendReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_SendReq.Unmarshal(m, b) return xxx_messageInfo_SendReq.Unmarshal(m, b)
@ -420,7 +420,7 @@ func (m *Property) Reset() { *m = Property{} }
func (m *Property) String() string { return proto.CompactTextString(m) } func (m *Property) String() string { return proto.CompactTextString(m) }
func (*Property) ProtoMessage() {} func (*Property) ProtoMessage() {}
func (*Property) Descriptor() ([]byte, []int) { func (*Property) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_759e477f62b58e58, []int{7} return fileDescriptor_pdu_83b7e2a28d820622, []int{7}
} }
func (m *Property) XXX_Unmarshal(b []byte) error { func (m *Property) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Property.Unmarshal(m, b) return xxx_messageInfo_Property.Unmarshal(m, b)
@ -470,7 +470,7 @@ func (m *SendRes) Reset() { *m = SendRes{} }
func (m *SendRes) String() string { return proto.CompactTextString(m) } func (m *SendRes) String() string { return proto.CompactTextString(m) }
func (*SendRes) ProtoMessage() {} func (*SendRes) ProtoMessage() {}
func (*SendRes) Descriptor() ([]byte, []int) { func (*SendRes) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_759e477f62b58e58, []int{8} return fileDescriptor_pdu_83b7e2a28d820622, []int{8}
} }
func (m *SendRes) XXX_Unmarshal(b []byte) error { func (m *SendRes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_SendRes.Unmarshal(m, b) return xxx_messageInfo_SendRes.Unmarshal(m, b)
@ -524,7 +524,7 @@ func (m *ReceiveReq) Reset() { *m = ReceiveReq{} }
func (m *ReceiveReq) String() string { return proto.CompactTextString(m) } func (m *ReceiveReq) String() string { return proto.CompactTextString(m) }
func (*ReceiveReq) ProtoMessage() {} func (*ReceiveReq) ProtoMessage() {}
func (*ReceiveReq) Descriptor() ([]byte, []int) { func (*ReceiveReq) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_759e477f62b58e58, []int{9} return fileDescriptor_pdu_83b7e2a28d820622, []int{9}
} }
func (m *ReceiveReq) XXX_Unmarshal(b []byte) error { func (m *ReceiveReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ReceiveReq.Unmarshal(m, b) return xxx_messageInfo_ReceiveReq.Unmarshal(m, b)
@ -568,7 +568,7 @@ func (m *ReceiveRes) Reset() { *m = ReceiveRes{} }
func (m *ReceiveRes) String() string { return proto.CompactTextString(m) } func (m *ReceiveRes) String() string { return proto.CompactTextString(m) }
func (*ReceiveRes) ProtoMessage() {} func (*ReceiveRes) ProtoMessage() {}
func (*ReceiveRes) Descriptor() ([]byte, []int) { func (*ReceiveRes) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_759e477f62b58e58, []int{10} return fileDescriptor_pdu_83b7e2a28d820622, []int{10}
} }
func (m *ReceiveRes) XXX_Unmarshal(b []byte) error { func (m *ReceiveRes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ReceiveRes.Unmarshal(m, b) return xxx_messageInfo_ReceiveRes.Unmarshal(m, b)
@ -601,7 +601,7 @@ func (m *DestroySnapshotsReq) Reset() { *m = DestroySnapshotsReq{} }
func (m *DestroySnapshotsReq) String() string { return proto.CompactTextString(m) } func (m *DestroySnapshotsReq) String() string { return proto.CompactTextString(m) }
func (*DestroySnapshotsReq) ProtoMessage() {} func (*DestroySnapshotsReq) ProtoMessage() {}
func (*DestroySnapshotsReq) Descriptor() ([]byte, []int) { func (*DestroySnapshotsReq) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_759e477f62b58e58, []int{11} return fileDescriptor_pdu_83b7e2a28d820622, []int{11}
} }
func (m *DestroySnapshotsReq) XXX_Unmarshal(b []byte) error { func (m *DestroySnapshotsReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_DestroySnapshotsReq.Unmarshal(m, b) return xxx_messageInfo_DestroySnapshotsReq.Unmarshal(m, b)
@ -647,7 +647,7 @@ func (m *DestroySnapshotRes) Reset() { *m = DestroySnapshotRes{} }
func (m *DestroySnapshotRes) String() string { return proto.CompactTextString(m) } func (m *DestroySnapshotRes) String() string { return proto.CompactTextString(m) }
func (*DestroySnapshotRes) ProtoMessage() {} func (*DestroySnapshotRes) ProtoMessage() {}
func (*DestroySnapshotRes) Descriptor() ([]byte, []int) { func (*DestroySnapshotRes) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_759e477f62b58e58, []int{12} return fileDescriptor_pdu_83b7e2a28d820622, []int{12}
} }
func (m *DestroySnapshotRes) XXX_Unmarshal(b []byte) error { func (m *DestroySnapshotRes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_DestroySnapshotRes.Unmarshal(m, b) return xxx_messageInfo_DestroySnapshotRes.Unmarshal(m, b)
@ -692,7 +692,7 @@ func (m *DestroySnapshotsRes) Reset() { *m = DestroySnapshotsRes{} }
func (m *DestroySnapshotsRes) String() string { return proto.CompactTextString(m) } func (m *DestroySnapshotsRes) String() string { return proto.CompactTextString(m) }
func (*DestroySnapshotsRes) ProtoMessage() {} func (*DestroySnapshotsRes) ProtoMessage() {}
func (*DestroySnapshotsRes) Descriptor() ([]byte, []int) { func (*DestroySnapshotsRes) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_759e477f62b58e58, []int{13} return fileDescriptor_pdu_83b7e2a28d820622, []int{13}
} }
func (m *DestroySnapshotsRes) XXX_Unmarshal(b []byte) error { func (m *DestroySnapshotsRes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_DestroySnapshotsRes.Unmarshal(m, b) return xxx_messageInfo_DestroySnapshotsRes.Unmarshal(m, b)
@ -734,7 +734,7 @@ func (m *ReplicationCursorReq) Reset() { *m = ReplicationCursorReq{} }
func (m *ReplicationCursorReq) String() string { return proto.CompactTextString(m) } func (m *ReplicationCursorReq) String() string { return proto.CompactTextString(m) }
func (*ReplicationCursorReq) ProtoMessage() {} func (*ReplicationCursorReq) ProtoMessage() {}
func (*ReplicationCursorReq) Descriptor() ([]byte, []int) { func (*ReplicationCursorReq) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_759e477f62b58e58, []int{14} return fileDescriptor_pdu_83b7e2a28d820622, []int{14}
} }
func (m *ReplicationCursorReq) XXX_Unmarshal(b []byte) error { func (m *ReplicationCursorReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ReplicationCursorReq.Unmarshal(m, b) return xxx_messageInfo_ReplicationCursorReq.Unmarshal(m, b)
@ -882,7 +882,7 @@ func (m *ReplicationCursorReq_GetOp) Reset() { *m = ReplicationCursorReq
func (m *ReplicationCursorReq_GetOp) String() string { return proto.CompactTextString(m) } func (m *ReplicationCursorReq_GetOp) String() string { return proto.CompactTextString(m) }
func (*ReplicationCursorReq_GetOp) ProtoMessage() {} func (*ReplicationCursorReq_GetOp) ProtoMessage() {}
func (*ReplicationCursorReq_GetOp) Descriptor() ([]byte, []int) { func (*ReplicationCursorReq_GetOp) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_759e477f62b58e58, []int{14, 0} return fileDescriptor_pdu_83b7e2a28d820622, []int{14, 0}
} }
func (m *ReplicationCursorReq_GetOp) XXX_Unmarshal(b []byte) error { func (m *ReplicationCursorReq_GetOp) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ReplicationCursorReq_GetOp.Unmarshal(m, b) return xxx_messageInfo_ReplicationCursorReq_GetOp.Unmarshal(m, b)
@ -913,7 +913,7 @@ func (m *ReplicationCursorReq_SetOp) Reset() { *m = ReplicationCursorReq
func (m *ReplicationCursorReq_SetOp) String() string { return proto.CompactTextString(m) } func (m *ReplicationCursorReq_SetOp) String() string { return proto.CompactTextString(m) }
func (*ReplicationCursorReq_SetOp) ProtoMessage() {} func (*ReplicationCursorReq_SetOp) ProtoMessage() {}
func (*ReplicationCursorReq_SetOp) Descriptor() ([]byte, []int) { func (*ReplicationCursorReq_SetOp) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_759e477f62b58e58, []int{14, 1} return fileDescriptor_pdu_83b7e2a28d820622, []int{14, 1}
} }
func (m *ReplicationCursorReq_SetOp) XXX_Unmarshal(b []byte) error { func (m *ReplicationCursorReq_SetOp) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ReplicationCursorReq_SetOp.Unmarshal(m, b) return xxx_messageInfo_ReplicationCursorReq_SetOp.Unmarshal(m, b)
@ -954,7 +954,7 @@ func (m *ReplicationCursorRes) Reset() { *m = ReplicationCursorRes{} }
func (m *ReplicationCursorRes) String() string { return proto.CompactTextString(m) } func (m *ReplicationCursorRes) String() string { return proto.CompactTextString(m) }
func (*ReplicationCursorRes) ProtoMessage() {} func (*ReplicationCursorRes) ProtoMessage() {}
func (*ReplicationCursorRes) Descriptor() ([]byte, []int) { func (*ReplicationCursorRes) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_759e477f62b58e58, []int{15} return fileDescriptor_pdu_83b7e2a28d820622, []int{15}
} }
func (m *ReplicationCursorRes) XXX_Unmarshal(b []byte) error { func (m *ReplicationCursorRes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ReplicationCursorRes.Unmarshal(m, b) return xxx_messageInfo_ReplicationCursorRes.Unmarshal(m, b)
@ -1090,7 +1090,7 @@ func (m *PingReq) Reset() { *m = PingReq{} }
func (m *PingReq) String() string { return proto.CompactTextString(m) } func (m *PingReq) String() string { return proto.CompactTextString(m) }
func (*PingReq) ProtoMessage() {} func (*PingReq) ProtoMessage() {}
func (*PingReq) Descriptor() ([]byte, []int) { func (*PingReq) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_759e477f62b58e58, []int{16} return fileDescriptor_pdu_83b7e2a28d820622, []int{16}
} }
func (m *PingReq) XXX_Unmarshal(b []byte) error { func (m *PingReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_PingReq.Unmarshal(m, b) return xxx_messageInfo_PingReq.Unmarshal(m, b)
@ -1129,7 +1129,7 @@ func (m *PingRes) Reset() { *m = PingRes{} }
func (m *PingRes) String() string { return proto.CompactTextString(m) } func (m *PingRes) String() string { return proto.CompactTextString(m) }
func (*PingRes) ProtoMessage() {} func (*PingRes) ProtoMessage() {}
func (*PingRes) Descriptor() ([]byte, []int) { func (*PingRes) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_759e477f62b58e58, []int{17} return fileDescriptor_pdu_83b7e2a28d820622, []int{17}
} }
func (m *PingRes) XXX_Unmarshal(b []byte) error { func (m *PingRes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_PingRes.Unmarshal(m, b) return xxx_messageInfo_PingRes.Unmarshal(m, b)
@ -1384,57 +1384,58 @@ var _Replication_serviceDesc = grpc.ServiceDesc{
Metadata: "pdu.proto", Metadata: "pdu.proto",
} }
func init() { proto.RegisterFile("pdu.proto", fileDescriptor_pdu_759e477f62b58e58) } func init() { proto.RegisterFile("pdu.proto", fileDescriptor_pdu_83b7e2a28d820622) }
var fileDescriptor_pdu_759e477f62b58e58 = []byte{ var fileDescriptor_pdu_83b7e2a28d820622 = []byte{
// 779 bytes of a gzipped FileDescriptorProto // 785 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x55, 0x51, 0x8f, 0xe2, 0x36, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x55, 0xd1, 0x8e, 0xe3, 0x34,
0x10, 0xde, 0x40, 0x80, 0x30, 0xac, 0xee, 0x58, 0x2f, 0x3d, 0xa5, 0x69, 0x7b, 0x42, 0xbe, 0x17, 0x14, 0x9d, 0xb4, 0x69, 0x9b, 0xde, 0x0e, 0xbb, 0x1d, 0x4f, 0x59, 0x85, 0x00, 0xab, 0xca, 0xcb,
0xae, 0x52, 0xd3, 0x8a, 0xf6, 0xa5, 0xaa, 0x54, 0xa9, 0x2c, 0x7b, 0xbb, 0x52, 0xdb, 0x2b, 0x32, 0x43, 0x17, 0x89, 0x80, 0x0a, 0x2f, 0x08, 0x09, 0x89, 0x4e, 0x67, 0x67, 0x10, 0xb0, 0x54, 0x6e,
0xf4, 0xb4, 0xba, 0xb7, 0x14, 0x46, 0x6c, 0xb4, 0x80, 0x73, 0xb6, 0x53, 0x1d, 0x7d, 0xec, 0xbf, 0x59, 0xad, 0xf6, 0x2d, 0x34, 0x57, 0x6d, 0x34, 0x6d, 0x9d, 0xb5, 0x13, 0xb4, 0xe5, 0x91, 0xbf,
0xda, 0xff, 0xd0, 0xc7, 0xfe, 0xa0, 0x93, 0x4d, 0x1c, 0xb2, 0x24, 0x48, 0x3c, 0xc5, 0xdf, 0xe7, 0x9a, 0x7f, 0xe0, 0x91, 0x0f, 0x42, 0x76, 0xe3, 0x34, 0x6d, 0x52, 0xa9, 0x4f, 0xf1, 0x39, 0xf7,
0xcf, 0xe3, 0x99, 0xf1, 0xcc, 0x04, 0xda, 0xc9, 0x22, 0x0d, 0x13, 0xc1, 0x15, 0xa7, 0x97, 0x70, 0xda, 0x3e, 0xf7, 0xd8, 0xd7, 0x81, 0x76, 0x1c, 0xa6, 0x7e, 0x2c, 0x78, 0xc2, 0xe9, 0x35, 0x5c,
0xf1, 0x5b, 0x2c, 0xd5, 0x9b, 0x78, 0x85, 0x72, 0x2b, 0x15, 0xae, 0x19, 0x7e, 0xa0, 0x77, 0x65, 0xfd, 0x1a, 0xc9, 0xe4, 0x55, 0xb4, 0x42, 0xb9, 0x95, 0x09, 0xae, 0x19, 0xbe, 0xa7, 0xa3, 0x32,
0x52, 0x92, 0x6f, 0xa0, 0xb3, 0x27, 0xa4, 0xef, 0xf4, 0xeb, 0x83, 0xce, 0xb0, 0x13, 0x16, 0x44, 0x29, 0xc9, 0x57, 0xd0, 0xd9, 0x13, 0xd2, 0xb5, 0xfa, 0xf5, 0x41, 0x67, 0xd8, 0xf1, 0x0b, 0x49,
0xc5, 0x7d, 0xd2, 0x83, 0xc6, 0xf5, 0x3a, 0x51, 0x5b, 0xbf, 0xd6, 0x77, 0x06, 0x1e, 0xdb, 0x01, 0xc5, 0x38, 0x5d, 0x02, 0xec, 0x21, 0x21, 0x60, 0x4f, 0x82, 0x64, 0xe9, 0x5a, 0x7d, 0x6b, 0xd0,
0x3a, 0x02, 0xd8, 0x8b, 0x08, 0x01, 0x77, 0x12, 0xa9, 0x7b, 0xdf, 0xe9, 0x3b, 0x83, 0x36, 0x33, 0x66, 0x7a, 0x4c, 0xfa, 0xd0, 0x61, 0x28, 0xd3, 0x35, 0xce, 0xf8, 0x03, 0x6e, 0xdc, 0x9a, 0x0e,
0x6b, 0xd2, 0x87, 0x0e, 0x43, 0x99, 0xae, 0x71, 0xc6, 0x1f, 0x70, 0x63, 0x4e, 0xb7, 0x59, 0x91, 0x15, 0x29, 0xf2, 0x05, 0x7c, 0xf4, 0xb3, 0x9c, 0xac, 0x82, 0x39, 0x2e, 0xf9, 0x2a, 0x44, 0xe1,
0xa2, 0x3f, 0xc1, 0xe7, 0x4f, 0xbd, 0x7b, 0x87, 0x42, 0xc6, 0x7c, 0x23, 0x19, 0x7e, 0x20, 0x2f, 0xd6, 0xfb, 0xd6, 0xc0, 0x61, 0x87, 0x24, 0xfd, 0x01, 0x3e, 0x39, 0x54, 0xfb, 0x06, 0x85, 0x8c,
0x8b, 0x17, 0x64, 0x86, 0x0b, 0x0c, 0xfd, 0xf5, 0xf8, 0x61, 0x49, 0x42, 0xf0, 0x2c, 0xcc, 0xe2, 0xf8, 0x46, 0x32, 0x7c, 0x4f, 0x9e, 0x17, 0x65, 0x64, 0xdb, 0x17, 0x18, 0xfa, 0xcb, 0xe9, 0xc9,
0x23, 0x61, 0x49, 0xc9, 0x72, 0x0d, 0xfd, 0xdf, 0x81, 0x8b, 0xd2, 0x3e, 0x19, 0x82, 0x3b, 0xdb, 0x92, 0xf8, 0xe0, 0x18, 0x98, 0xd5, 0x4b, 0xfc, 0x52, 0x26, 0xcb, 0x73, 0xe8, 0x7f, 0x16, 0x5c,
0x26, 0x68, 0x2e, 0x7f, 0x36, 0x7c, 0x59, 0xb6, 0x10, 0x66, 0x5f, 0xad, 0x62, 0x46, 0xab, 0x33, 0x95, 0xe2, 0x64, 0x08, 0xf6, 0x6c, 0x1b, 0xa3, 0xde, 0xfc, 0xc9, 0xf0, 0x79, 0x79, 0x05, 0x3f,
0xf1, 0x36, 0x5a, 0x63, 0x16, 0xae, 0x59, 0x6b, 0xee, 0x26, 0x8d, 0x17, 0x7e, 0xbd, 0xef, 0x0c, 0xfb, 0xaa, 0x2c, 0xa6, 0x73, 0x95, 0x5f, 0xaf, 0x83, 0x35, 0x66, 0xa6, 0xe8, 0xb1, 0xe2, 0xee,
0x5c, 0x66, 0xd6, 0xe4, 0x4b, 0x68, 0x5f, 0x09, 0x8c, 0x14, 0xce, 0xee, 0x6e, 0x7c, 0xd7, 0x6c, 0xd2, 0x28, 0xd4, 0x26, 0xd8, 0x4c, 0x8f, 0xc9, 0x67, 0xd0, 0xbe, 0x11, 0x18, 0x24, 0x38, 0x7b,
0xec, 0x09, 0x12, 0x80, 0x67, 0x40, 0xcc, 0x37, 0x7e, 0xc3, 0x58, 0xca, 0x31, 0x7d, 0x0d, 0x9d, 0x7b, 0xe7, 0xda, 0x3a, 0xb0, 0x27, 0x88, 0x07, 0x8e, 0x06, 0x11, 0xdf, 0xb8, 0x0d, 0xbd, 0x52,
0xc2, 0xb5, 0xe4, 0x1c, 0xbc, 0xe9, 0x26, 0x4a, 0xe4, 0x3d, 0x57, 0xdd, 0x33, 0x8d, 0x46, 0x9c, 0x8e, 0xe9, 0x4b, 0xe8, 0x14, 0xb6, 0x25, 0x97, 0xe0, 0x4c, 0x37, 0x41, 0x2c, 0x97, 0x3c, 0xe9,
0x3f, 0xac, 0x23, 0xf1, 0xd0, 0x75, 0xe8, 0xa3, 0x03, 0xad, 0x29, 0x6e, 0x16, 0x27, 0xe4, 0x53, 0x5e, 0x28, 0x34, 0xe2, 0xfc, 0x61, 0x1d, 0x88, 0x87, 0xae, 0x45, 0x1f, 0x2d, 0x68, 0x4d, 0x71,
0x3b, 0xf9, 0x46, 0xf0, 0xb5, 0x75, 0x5c, 0xaf, 0xc9, 0x33, 0xa8, 0xcd, 0xb8, 0x71, 0xbb, 0xcd, 0x13, 0x9e, 0xe1, 0xa7, 0x12, 0xf9, 0x4a, 0xf0, 0xb5, 0x11, 0xae, 0xc6, 0xe4, 0x09, 0xd4, 0x66,
0x6a, 0x33, 0x7e, 0xf8, 0xa4, 0x6e, 0xe9, 0x49, 0x8d, 0xe3, 0x7c, 0x9d, 0x08, 0x94, 0xd2, 0x38, 0x5c, 0xcb, 0x6e, 0xb3, 0xda, 0x8c, 0x1f, 0x1f, 0xbc, 0x5d, 0x3e, 0x78, 0x25, 0x9c, 0xaf, 0x63,
0xee, 0xb1, 0x1c, 0xeb, 0x42, 0x1a, 0xe3, 0x22, 0x4d, 0xfc, 0xe6, 0xae, 0x90, 0x0c, 0x20, 0x2f, 0x81, 0x52, 0x6a, 0xe1, 0x0e, 0xcb, 0x31, 0xe9, 0x41, 0x63, 0x8c, 0x61, 0x1a, 0xbb, 0x4d, 0x1d,
0xa0, 0x39, 0x16, 0x5b, 0x96, 0x6e, 0xfc, 0x96, 0xa1, 0x33, 0x44, 0x7f, 0x00, 0x6f, 0x22, 0x78, 0xd8, 0x01, 0xf2, 0x0c, 0x9a, 0x63, 0xb1, 0x65, 0xe9, 0xc6, 0x6d, 0x69, 0x3a, 0x43, 0xf4, 0x3b,
0x82, 0x42, 0x6d, 0xf3, 0xa4, 0x3a, 0x85, 0xa4, 0xf6, 0xa0, 0xf1, 0x2e, 0x5a, 0xa5, 0x36, 0xd3, 0x70, 0x26, 0x82, 0xc7, 0x28, 0x92, 0x6d, 0x6e, 0xaa, 0x55, 0x30, 0xb5, 0x07, 0x8d, 0x37, 0xc1,
0x3b, 0x40, 0xff, 0xcd, 0x23, 0x96, 0x64, 0x00, 0xcf, 0xff, 0x94, 0xb8, 0x38, 0x2c, 0x42, 0x8f, 0x2a, 0x35, 0x4e, 0xef, 0x00, 0xfd, 0x27, 0xaf, 0x58, 0x92, 0x01, 0x3c, 0xfd, 0x43, 0x62, 0x78,
0x1d, 0xd2, 0x84, 0xc2, 0xf9, 0xf5, 0xc7, 0x04, 0xe7, 0x0a, 0x17, 0xd3, 0xf8, 0x1f, 0x34, 0x11, 0x7c, 0x55, 0x1d, 0x76, 0x4c, 0x13, 0x0a, 0x97, 0xb7, 0x1f, 0x62, 0x9c, 0x27, 0x18, 0x4e, 0xa3,
0xd7, 0xd9, 0x13, 0x8e, 0xbc, 0x06, 0xc8, 0xfc, 0x89, 0x51, 0xfa, 0xae, 0x29, 0xaa, 0x76, 0x68, 0xbf, 0x51, 0x57, 0x5c, 0x67, 0x07, 0x1c, 0x79, 0x09, 0x90, 0xe9, 0x89, 0x50, 0xba, 0xb6, 0xbe,
0x5d, 0x64, 0x85, 0x4d, 0x7a, 0x07, 0xc0, 0x70, 0x8e, 0xf1, 0xdf, 0x78, 0x4a, 0xe2, 0xbf, 0x86, 0x54, 0x6d, 0xdf, 0x48, 0x64, 0x85, 0x20, 0x7d, 0x0b, 0xc0, 0x70, 0x8e, 0xd1, 0x5f, 0x78, 0x8e,
0xee, 0xd5, 0x0a, 0x23, 0x51, 0xf6, 0xb3, 0xc4, 0xd3, 0xf3, 0x82, 0x65, 0x49, 0x97, 0x70, 0x39, 0xf1, 0x5f, 0x42, 0xf7, 0x66, 0x85, 0x81, 0x28, 0xeb, 0x2c, 0xf1, 0xf4, 0xb2, 0xb0, 0xb2, 0xa4,
0x46, 0xa9, 0x04, 0xdf, 0xda, 0x0a, 0x38, 0xa5, 0x73, 0xc8, 0x77, 0xd0, 0xce, 0xf5, 0x7e, 0xed, 0x0b, 0xb8, 0x1e, 0xa3, 0x4c, 0x04, 0xdf, 0x9a, 0x1b, 0x70, 0x4e, 0xe7, 0x90, 0x6f, 0xa0, 0x9d,
0x68, 0x77, 0xec, 0x45, 0xf4, 0x3d, 0x90, 0x83, 0x8b, 0xb2, 0x26, 0xb3, 0xd0, 0xdc, 0x72, 0xa4, 0xe7, 0xbb, 0xb5, 0x93, 0xdd, 0xb1, 0x4f, 0xa2, 0xef, 0x80, 0x1c, 0x6d, 0x94, 0x35, 0x99, 0x81,
0xc9, 0xac, 0xc6, 0x0c, 0x12, 0x21, 0xb8, 0xb0, 0x2f, 0x66, 0x00, 0x1d, 0x57, 0x05, 0xa1, 0x87, 0x7a, 0x97, 0x13, 0x4d, 0x66, 0x72, 0xd4, 0x89, 0xdd, 0x0a, 0xc1, 0x85, 0x39, 0x31, 0x0d, 0xe8,
0x54, 0x4b, 0x07, 0xbe, 0x52, 0xb6, 0x81, 0x2f, 0xc3, 0xb2, 0x0b, 0xcc, 0x6a, 0xe8, 0x7f, 0x0e, 0xb8, 0xaa, 0x08, 0xf5, 0x68, 0xb5, 0x54, 0xe1, 0xab, 0xc4, 0x34, 0xf0, 0xb5, 0x5f, 0x96, 0xc0,
0xf4, 0x18, 0x26, 0xab, 0x78, 0x6e, 0x9a, 0xe4, 0x2a, 0x15, 0x92, 0x8b, 0x53, 0x92, 0xf1, 0x2d, 0x4c, 0x0e, 0xfd, 0xd7, 0x82, 0x1e, 0xc3, 0x78, 0x15, 0xcd, 0x75, 0x93, 0xdc, 0xa4, 0x42, 0x72,
0xd4, 0x97, 0xa8, 0x8c, 0x4b, 0x9d, 0xe1, 0x17, 0x61, 0x95, 0x8d, 0xf0, 0x06, 0xd5, 0x1f, 0xc9, 0x71, 0x8e, 0x19, 0x5f, 0x43, 0x7d, 0x81, 0x89, 0x96, 0xd4, 0x19, 0x7e, 0xea, 0x57, 0xad, 0xe1,
0xed, 0x19, 0xd3, 0x4a, 0x7d, 0x40, 0xa2, 0x32, 0x25, 0x72, 0xf4, 0xc0, 0xd4, 0x1e, 0x90, 0xa8, 0xdf, 0x61, 0xf2, 0x7b, 0x7c, 0x7f, 0xc1, 0x54, 0xa6, 0x9a, 0x20, 0x31, 0xd1, 0x57, 0xe4, 0xe4,
0x82, 0x16, 0x34, 0x8c, 0x81, 0xe0, 0x15, 0x34, 0xcc, 0x86, 0x6e, 0x92, 0x3c, 0x71, 0xbb, 0x5c, 0x84, 0xa9, 0x99, 0x20, 0x31, 0xf1, 0x5a, 0xd0, 0xd0, 0x0b, 0x78, 0x2f, 0xa0, 0xa1, 0x03, 0xaa,
0xe4, 0x78, 0xe4, 0x42, 0x8d, 0x27, 0x74, 0x56, 0x19, 0x8d, 0x6e, 0xa1, 0xdd, 0x24, 0xd1, 0x71, 0x49, 0x72, 0xe3, 0x76, 0x5e, 0xe4, 0x78, 0x64, 0x43, 0x8d, 0xc7, 0x74, 0x56, 0x59, 0x8d, 0x6a,
0xb8, 0xb7, 0x67, 0xf9, 0x2c, 0xf1, 0xde, 0x72, 0x85, 0x1f, 0x63, 0xb9, 0xb3, 0xe7, 0xdd, 0x9e, 0xa1, 0xdd, 0x4b, 0xa2, 0xea, 0xb0, 0xef, 0x2f, 0xf2, 0xb7, 0xc4, 0x79, 0xcd, 0x13, 0xfc, 0x10,
0xb1, 0x9c, 0x19, 0x79, 0xd0, 0xdc, 0x65, 0x89, 0xbe, 0x82, 0xd6, 0x24, 0xde, 0x2c, 0x75, 0x5a, 0xc9, 0xdd, 0x7a, 0xce, 0xfd, 0x05, 0xcb, 0x99, 0x91, 0x03, 0xcd, 0x9d, 0x4b, 0xf4, 0x05, 0xb4,
0x7c, 0x68, 0xfd, 0x8e, 0x52, 0x46, 0x4b, 0xdb, 0x54, 0x16, 0xd2, 0xaf, 0xac, 0x48, 0xea, 0xb6, 0x26, 0xd1, 0x66, 0xa1, 0x6c, 0x71, 0xa1, 0xf5, 0x1b, 0x4a, 0x19, 0x2c, 0x4c, 0x53, 0x19, 0x48,
0xbb, 0x9e, 0xdf, 0x73, 0xdb, 0x76, 0x7a, 0x3d, 0x7c, 0xac, 0xe9, 0x19, 0x90, 0xbb, 0x46, 0x02, 0x3f, 0x37, 0x49, 0x52, 0xb5, 0xdd, 0xed, 0x7c, 0xc9, 0x4d, 0xdb, 0xa9, 0xf1, 0xf0, 0xb1, 0xa6,
0x70, 0xb5, 0x9c, 0x78, 0x61, 0x66, 0x3a, 0xb0, 0x2b, 0x49, 0x7e, 0x84, 0xe7, 0x4f, 0x47, 0xb4, 0xde, 0x80, 0x5c, 0x1a, 0xf1, 0xc0, 0x56, 0xe9, 0xc4, 0xf1, 0xb3, 0xa5, 0x3d, 0x33, 0x92, 0xe4,
0x24, 0x24, 0x2c, 0xfd, 0xa4, 0x82, 0x32, 0x27, 0xc9, 0x04, 0x5e, 0x54, 0x4f, 0x77, 0x12, 0x84, 0x7b, 0x78, 0x7a, 0xf8, 0x44, 0x4b, 0x42, 0xfc, 0xd2, 0x4f, 0xcb, 0x2b, 0x73, 0x92, 0x4c, 0xe0,
0x47, 0xff, 0x19, 0xc1, 0xf1, 0x3d, 0x49, 0x7e, 0x86, 0xee, 0x61, 0x9d, 0x91, 0x5e, 0x58, 0xd1, 0x59, 0xf5, 0xeb, 0x4e, 0x3c, 0xff, 0xe4, 0x3f, 0xc3, 0x3b, 0x1d, 0x93, 0xe4, 0x47, 0xe8, 0x1e,
0x3f, 0x41, 0x15, 0x2b, 0xc9, 0x2f, 0x70, 0x51, 0x7a, 0x12, 0xf2, 0x59, 0xe5, 0xfb, 0x07, 0x95, 0xdf, 0x33, 0xd2, 0xf3, 0x2b, 0xfa, 0xc7, 0xab, 0x62, 0x25, 0xf9, 0x09, 0xae, 0x4a, 0x47, 0x42,
0xb4, 0x1c, 0x35, 0xde, 0xd7, 0x93, 0x45, 0xfa, 0x57, 0xd3, 0xfc, 0xb0, 0xbf, 0xff, 0x14, 0x00, 0x3e, 0xae, 0x3c, 0x7f, 0xaf, 0x92, 0x96, 0xa3, 0xc6, 0xbb, 0x7a, 0x1c, 0xa6, 0x7f, 0x36, 0xf5,
0x00, 0xff, 0xff, 0xaf, 0xa9, 0xda, 0x41, 0xbd, 0x07, 0x00, 0x00, 0x0f, 0xfc, 0xdb, 0xff, 0x03, 0x00, 0x00, 0xff, 0xff, 0x37, 0x0e, 0xf2, 0xe4, 0xcd, 0x07, 0x00,
0x00,
} }

View File

@ -14,12 +14,12 @@ message ListFilesystemReq {}
message ListFilesystemRes { message ListFilesystemRes {
repeated Filesystem Filesystems = 1; repeated Filesystem Filesystems = 1;
bool Empty = 2;
} }
message Filesystem { message Filesystem {
string Path = 1; string Path = 1;
string ResumeToken = 2; string ResumeToken = 2;
bool IsPlaceholder = 3;
} }
message ListFilesystemVersionsReq { message ListFilesystemVersionsReq {

View File

@ -108,7 +108,7 @@ type Filesystem struct {
receiver Receiver receiver Receiver
Path string // compat Path string // compat
receiverFSExists bool // compat receiverFS *pdu.Filesystem
promBytesReplicated prometheus.Counter // compat promBytesReplicated prometheus.Counter // compat
} }
@ -237,10 +237,10 @@ func (p *Planner) doPlanning(ctx context.Context) ([]*Filesystem, error) {
q := make([]*Filesystem, 0, len(sfss)) q := make([]*Filesystem, 0, len(sfss))
for _, fs := range sfss { for _, fs := range sfss {
receiverFSExists := false var receiverFS *pdu.Filesystem
for _, rfs := range rfss { for _, rfs := range rfss {
if rfs.Path == fs.Path { if rfs.Path == fs.Path {
receiverFSExists = true receiverFS = rfs
} }
} }
@ -250,7 +250,7 @@ func (p *Planner) doPlanning(ctx context.Context) ([]*Filesystem, error) {
sender: p.sender, sender: p.sender,
receiver: p.receiver, receiver: p.receiver,
Path: fs.Path, Path: fs.Path,
receiverFSExists: receiverFSExists, receiverFS: receiverFS,
promBytesReplicated: ctr, promBytesReplicated: ctr,
}) })
} }
@ -278,7 +278,7 @@ func (fs *Filesystem) doPlanning(ctx context.Context) ([]*Step, error) {
} }
var rfsvs []*pdu.FilesystemVersion var rfsvs []*pdu.FilesystemVersion
if fs.receiverFSExists { if fs.receiverFS != nil && !fs.receiverFS.GetIsPlaceholder() {
rfsvsres, err := fs.receiver.ListFilesystemVersions(ctx, &pdu.ListFilesystemVersionsReq{Filesystem: fs.Path}) rfsvsres, err := fs.receiver.ListFilesystemVersions(ctx, &pdu.ListFilesystemVersionsReq{Filesystem: fs.Path})
if err != nil { if err != nil {
log.WithError(err).Error("receiver error") log.WithError(err).Error("receiver error")
@ -301,7 +301,7 @@ func (fs *Filesystem) doPlanning(ctx context.Context) ([]*Step, error) {
log.WithField("problem", msg).Error("cannot resolve conflict") log.WithField("problem", msg).Error("cannot resolve conflict")
} }
} }
if path == nil { if len(path) == 0 {
return nil, conflict return nil, conflict
} }

View File

@ -105,3 +105,9 @@ func ZFSCreatePlaceholderFilesystem(p *DatasetPath) (err error) {
return return
} }
func ZFSSetNoPlaceholder(p *DatasetPath) error {
props := NewZFSProperties()
props.Set(ZREPL_PLACEHOLDER_PROPERTY_NAME, "off")
return zfsSet(p.ToString(), props)
}

View File

@ -9,6 +9,7 @@ import (
"io" "io"
"os" "os"
"os/exec" "os/exec"
"sort"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -691,17 +692,62 @@ type StreamCopier interface {
Close() error Close() error
} }
type RecvOptions struct {
// Rollback to the oldest snapshot, destroy it, then perform `recv -F`.
// Note that this doesn't change property values, i.e. an existing local property value will be kept.
RollbackAndForceRecv bool
}
func ZFSRecv(ctx context.Context, fs string, streamCopier StreamCopier, additionalArgs ...string) (err error) { func ZFSRecv(ctx context.Context, fs string, streamCopier StreamCopier, opts RecvOptions) (err error) {
if err := validateZFSFilesystem(fs); err != nil { if err := validateZFSFilesystem(fs); err != nil {
return err return err
} }
fsdp, err := NewDatasetPath(fs)
if err != nil {
return err
}
if opts.RollbackAndForceRecv {
// destroy all snapshots before `recv -F` because `recv -F`
// does not perform a rollback unless `send -R` was used (which we assume hasn't been the case)
var snaps []FilesystemVersion
{
vs, err := ZFSListFilesystemVersions(fsdp, nil)
if err != nil {
err = fmt.Errorf("cannot list versions to rollback is required: %s", err)
}
for _, v := range vs {
if v.Type == Snapshot {
snaps = append(snaps, v)
}
}
sort.Slice(snaps, func(i, j int) bool {
return snaps[i].CreateTXG < snaps[j].CreateTXG
})
}
// bookmarks are rolled back automatically
if len(snaps) > 0 {
// use rollback to efficiently destroy all but the earliest snapshot
// then destroy that earliest snapshot
// afterwards, `recv -F` will work
rollbackTarget := snaps[0]
rollbackTargetAbs := rollbackTarget.ToAbsPath(fsdp)
debug("recv: rollback to %q", rollbackTargetAbs)
if err := ZFSRollback(fsdp, rollbackTarget, "-r"); err != nil {
return fmt.Errorf("cannot rollback %s to %s for forced receive: %s", fsdp.ToString(), rollbackTarget, err)
}
debug("recv: destroy %q", rollbackTargetAbs)
if err := ZFSDestroy(rollbackTargetAbs); err != nil {
return fmt.Errorf("cannot destroy %s for forced receive: %s", rollbackTargetAbs, err)
}
}
}
args := make([]string, 0) args := make([]string, 0)
args = append(args, "recv") args = append(args, "recv")
if len(args) > 0 { if opts.RollbackAndForceRecv {
args = append(args, additionalArgs...) args = append(args, "-F")
} }
args = append(args, fs) args = append(args, fs)
@ -1038,3 +1084,33 @@ func ZFSBookmark(fs *DatasetPath, snapshot, bookmark string) (err error) {
return return
} }
func ZFSRollback(fs *DatasetPath, snapshot FilesystemVersion, rollbackArgs ...string) (err error) {
snapabs := snapshot.ToAbsPath(fs)
if snapshot.Type != Snapshot {
return fmt.Errorf("can only rollback to snapshots, got %s", snapabs)
}
args := []string{"rollback"}
args = append(args, rollbackArgs...)
args = append(args, snapabs)
cmd := exec.Command(ZFS_BINARY, args...)
stderr := bytes.NewBuffer(make([]byte, 0, 1024))
cmd.Stderr = stderr
if err = cmd.Start(); err != nil {
return err
}
if err = cmd.Wait(); err != nil {
err = &ZFSError{
Stderr: stderr.Bytes(),
WaitErr: err,
}
}
return err
}