replication & pruning: ditch replicated-property, use bookmark as cursor instead

A bookmark with a well-known name is used to track which version was
last successfully received by the receiver.
The createtxg that can be retrieved from the bookmark using `zfs get` is
used to set the Replicated attribute of each snap on the sender:
If the snap's CreateTXG > the cursor's, it is not yet replicated,
otherwise it has been.

There is an optional config option to change the behvior to
`CreateTXG >= the cursor's`, and the implementation defaults to that.

The reason: While things work just fine with `CreateTXG > the cursor's`,
ZFS does not provide size estimates in a `zfs send` dry run
(see acd2418).
However, to enable the use case of keeping the snapshot only around for
the replication, the config flag exists.
This commit is contained in:
Christian Schwarz 2018-09-05 18:24:15 -07:00
parent acd2418803
commit 975fdee217
9 changed files with 559 additions and 294 deletions

View File

@ -205,6 +205,7 @@ type PruningEnum struct {
type PruneKeepNotReplicated struct { type PruneKeepNotReplicated struct {
Type string `yaml:"type"` Type string `yaml:"type"`
KeepSnapshotAtCursor bool `yaml:"keep_snapshot_at_cursor,optional,default=true"`
} }
type PruneKeepLastN struct { type PruneKeepLastN struct {

View File

@ -9,13 +9,14 @@ import (
"github.com/zrepl/zrepl/pruning" "github.com/zrepl/zrepl/pruning"
"github.com/zrepl/zrepl/replication/pdu" "github.com/zrepl/zrepl/replication/pdu"
"net" "net"
"sort"
"sync" "sync"
"time" "time"
) )
// Try to keep it compatible with gitub.com/zrepl/zrepl/replication.Endpoint // Try to keep it compatible with gitub.com/zrepl/zrepl/replication.Endpoint
type History interface { type History interface {
SnapshotReplicationStatus(ctx context.Context, req *pdu.SnapshotReplicationStatusReq) (*pdu.SnapshotReplicationStatusRes, error) ReplicationCursor(ctx context.Context, req *pdu.ReplicationCursorReq) (*pdu.ReplicationCursorRes, error)
} }
type Target interface { type Target interface {
@ -47,6 +48,7 @@ type args struct {
receiver History receiver History
rules []pruning.KeepRule rules []pruning.KeepRule
retryWait time.Duration retryWait time.Duration
considerSnapAtCursorReplicated bool
} }
type Pruner struct { type Pruner struct {
@ -69,6 +71,7 @@ type PrunerFactory struct {
senderRules []pruning.KeepRule senderRules []pruning.KeepRule
receiverRules []pruning.KeepRule receiverRules []pruning.KeepRule
retryWait time.Duration retryWait time.Duration
considerSnapAtCursorReplicated bool
} }
func checkContainsKeep1(rules []pruning.KeepRule) error { func checkContainsKeep1(rules []pruning.KeepRule) error {
@ -95,14 +98,19 @@ func NewPrunerFactory(in config.PruningSenderReceiver) (*PrunerFactory, error) {
return nil, errors.Wrap(err, "cannot build sender pruning rules") return nil, errors.Wrap(err, "cannot build sender pruning rules")
} }
if err := checkContainsKeep1(keepRulesSender); err != nil { considerSnapAtCursorReplicated := false
return nil, err for _, r := range in.KeepSender {
knr, ok := r.Ret.(*config.PruneKeepNotReplicated)
if !ok {
continue
}
considerSnapAtCursorReplicated = considerSnapAtCursorReplicated || !knr.KeepSnapshotAtCursor
} }
f := &PrunerFactory{ f := &PrunerFactory{
keepRulesSender, keepRulesSender,
keepRulesReceiver, keepRulesReceiver,
10 * time.Second, //FIXME constant 10 * time.Second, //FIXME constant
considerSnapAtCursorReplicated,
} }
return f, nil return f, nil
} }
@ -115,6 +123,7 @@ func (f *PrunerFactory) BuildSenderPruner(ctx context.Context, target Target, re
receiver, receiver,
f.senderRules, f.senderRules,
f.retryWait, f.retryWait,
f.considerSnapAtCursorReplicated,
}, },
state: Plan, state: Plan,
} }
@ -129,6 +138,7 @@ func (f *PrunerFactory) BuildReceiverPruner(ctx context.Context, target Target,
receiver, receiver,
f.receiverRules, f.receiverRules,
f.retryWait, f.retryWait,
false, // senseless here anyways
}, },
state: Plan, state: Plan,
} }
@ -254,56 +264,66 @@ func statePlan(a *args, u updater) state {
} }
pfss := make([]*fs, len(tfss)) pfss := make([]*fs, len(tfss))
fsloop:
for i, tfs := range tfss { for i, tfs := range tfss {
l := GetLogger(ctx).WithField("fs", tfs.Path)
l.Debug("plan filesystem")
tfsvs, err := target.ListFilesystemVersions(ctx, tfs.Path) tfsvs, err := target.ListFilesystemVersions(ctx, tfs.Path)
if err != nil { if err != nil {
l.WithError(err).Error("cannot list filesystem versions")
return onErr(u, err) return onErr(u, err)
} }
rcReq := &pdu.ReplicationCursorReq{
Filesystem: tfs.Path,
Op: &pdu.ReplicationCursorReq_Get{},
}
rc, err := receiver.ReplicationCursor(ctx, rcReq)
if err != nil {
l.WithError(err).Error("cannot get replication cursor")
return onErr(u, err)
}
if rc.GetError() != "" {
l.WithField("reqErr", rc.GetError()).Error("cannot get replication cursor")
return onErr(u, fmt.Errorf("%s", rc.GetError()))
}
pfs := &fs{ pfs := &fs{
path: tfs.Path, path: tfs.Path,
snaps: make([]pruning.Snapshot, 0, len(tfsvs)), snaps: make([]pruning.Snapshot, 0, len(tfsvs)),
} }
pfss[i] = pfs
// scan from older to newer, all snapshots older than cursor are interpreted as replicated
sort.Slice(tfsvs, func(i, j int) bool {
return tfsvs[i].CreateTXG < tfsvs[j].CreateTXG
})
preCursor := true
for _, tfsv := range tfsvs { for _, tfsv := range tfsvs {
if tfsv.Type != pdu.FilesystemVersion_Snapshot { if tfsv.Type != pdu.FilesystemVersion_Snapshot {
continue continue
} }
creation, err := tfsv.CreationAsTime() creation, err := tfsv.CreationAsTime()
if err != nil { if err != nil {
return onErr(u, fmt.Errorf("%s%s has invalid creation date: %s", tfs, tfsv.RelName(), err)) pfs.err = fmt.Errorf("%s%s has invalid creation date: %s", tfs, tfsv.RelName(), err)
} l.WithError(pfs.err).Error("")
req := pdu.SnapshotReplicationStatusReq{ continue fsloop
Filesystem: tfs.Path,
Snapshot: tfsv.Name,
Op: pdu.SnapshotReplicationStatusReq_Get,
}
res, err := receiver.SnapshotReplicationStatus(ctx, &req)
if err != nil {
GetLogger(ctx).
WithField("req", req.String()).
WithError(err).Error("cannot get snapshot replication status")
}
if err != nil && shouldRetry(err) {
return onErr(u, err)
} else if err != nil {
pfs.err = err
pfs.snaps = nil
break
}
if res.Status == pdu.SnapshotReplicationStatusRes_Nonexistent {
GetLogger(ctx).
Debug("snapshot does not exist in history, assuming was replicated")
} }
atCursor := tfsv.Guid == rc.GetGuid()
preCursor = preCursor && !atCursor
pfs.snaps = append(pfs.snaps, snapshot{ pfs.snaps = append(pfs.snaps, snapshot{
replicated: !(res.Status != pdu.SnapshotReplicationStatusRes_Replicated), replicated: preCursor || (a.considerSnapAtCursorReplicated && atCursor),
date: creation, date: creation,
fsv: tfsv, fsv: tfsv,
}) })
} }
if preCursor {
pfss[i] = pfs pfs.err = fmt.Errorf("replication cursor not found in prune target filesystem versions")
l.WithError(pfs.err).Error("")
continue fsloop
}
} }
@ -324,7 +344,13 @@ func stateExec(a *args, u updater) state {
var pfs *fs var pfs *fs
state := u(func(pruner *Pruner) { state := u(func(pruner *Pruner) {
if len(pruner.prunePending) == 0 { if len(pruner.prunePending) == 0 {
pruner.state = Done nextState := Done
for _, pfs := range pruner.pruneCompleted {
if pfs.err != nil {
nextState = ErrPerm
}
}
pruner.state = nextState
return return
} }
pfs = pruner.prunePending[0] pfs = pruner.prunePending[0]

View File

@ -109,10 +109,7 @@ func (p *Sender) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshots
return doDestroySnapshots(ctx, dp, req.Snapshots) return doDestroySnapshots(ctx, dp, req.Snapshots)
} }
// Since replication always happens from sender to receiver, this method is only ipmlemented for the sender. func (p *Sender) ReplicationCursor(ctx context.Context, req *pdu.ReplicationCursorReq) (*pdu.ReplicationCursorRes, error) {
// If this method returns a *zfs.DatasetDoesNotExist as an error, it might be a good indicator
// that something is wrong with the pruning logic, which is the only consumer of this method.
func (p *Sender) SnapshotReplicationStatus(ctx context.Context, req *pdu.SnapshotReplicationStatusReq) (*pdu.SnapshotReplicationStatusRes, error) {
dp, err := zfs.NewDatasetPath(req.Filesystem) dp, err := zfs.NewDatasetPath(req.Filesystem)
if err != nil { if err != nil {
return nil, err return nil, err
@ -125,35 +122,25 @@ func (p *Sender) SnapshotReplicationStatus(ctx context.Context, req *pdu.Snapsho
return nil, replication.NewFilteredError(req.Filesystem) return nil, replication.NewFilteredError(req.Filesystem)
} }
version := zfs.FilesystemVersion{ switch op := req.Op.(type) {
Type: zfs.Snapshot, case *pdu.ReplicationCursorReq_Get:
Name: req.Snapshot, //FIXME validation cursor, err := zfs.ZFSGetReplicationCursor(dp)
}
var status pdu.SnapshotReplicationStatusRes_Status
switch req.Op {
case pdu.SnapshotReplicationStatusReq_Get:
replicated, err := zfs.ZFSGetReplicatedProperty(dp, &version)
if _, ok := err.(*zfs.DatasetDoesNotExist); ok {
status = pdu.SnapshotReplicationStatusRes_Nonexistent
} else if err != nil {
}
if replicated {
status = pdu.SnapshotReplicationStatusRes_Replicated
} else {
status = pdu.SnapshotReplicationStatusRes_NotReplicated
}
case pdu.SnapshotReplicationStatusReq_SetReplicated:
err = zfs.ZFSSetReplicatedProperty(dp, &version, true)
if err != nil { if err != nil {
return nil, err return nil, err
} }
status = pdu.SnapshotReplicationStatusRes_Replicated if cursor == nil {
default: return &pdu.ReplicationCursorRes{Result: &pdu.ReplicationCursorRes_Error{Error: "cursor does not exist"}}, nil
return nil, errors.Errorf("unknown opcode %v", req.Op) }
return &pdu.ReplicationCursorRes{Result: &pdu.ReplicationCursorRes_Guid{cursor.Guid}}, nil
case *pdu.ReplicationCursorReq_Set:
guid, err := zfs.ZFSSetReplicationCursor(dp, op.Set.Snapshot)
if err != nil {
return nil, err
}
return &pdu.ReplicationCursorRes{Result: &pdu.ReplicationCursorRes_Guid{Guid: guid}}, nil
default:
return nil, errors.Errorf("unknown op %T", op)
} }
return &pdu.SnapshotReplicationStatusRes{Status: status}, nil
} }
type FSFilter interface { type FSFilter interface {
@ -352,7 +339,7 @@ const (
RPCReceive = "Receive" RPCReceive = "Receive"
RPCSend = "Send" RPCSend = "Send"
RPCSDestroySnapshots = "DestroySnapshots" RPCSDestroySnapshots = "DestroySnapshots"
RPCSnapshotReplicationStatus = "SnapshotReplicationStatus" RPCReplicationCursor = "ReplicationCursor"
) )
// Remote implements an endpoint stub that uses streamrpc as a transport. // Remote implements an endpoint stub that uses streamrpc as a transport.
@ -578,18 +565,18 @@ func (a *Handler) Handle(ctx context.Context, endpoint string, reqStructured *by
} }
return bytes.NewBuffer(b), nil, nil return bytes.NewBuffer(b), nil, nil
case RPCSnapshotReplicationStatus: case RPCReplicationCursor:
sender, ok := a.ep.(replication.Sender) sender, ok := a.ep.(replication.Sender)
if !ok { if !ok {
goto Err goto Err
} }
var req pdu.SnapshotReplicationStatusReq var req pdu.ReplicationCursorReq
if err := proto.Unmarshal(reqStructured.Bytes(), &req); err != nil { if err := proto.Unmarshal(reqStructured.Bytes(), &req); err != nil {
return nil, nil, err return nil, nil, err
} }
res, err := sender.SnapshotReplicationStatus(ctx, &req) res, err := sender.ReplicationCursor(ctx, &req)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }

View File

@ -42,7 +42,7 @@ type Sender interface {
// any next call to the parent github.com/zrepl/zrepl/replication.Endpoint. // any next call to the parent github.com/zrepl/zrepl/replication.Endpoint.
// If the send request is for dry run the io.ReadCloser will be nil // If the send request is for dry run the io.ReadCloser will be nil
Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.ReadCloser, error) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.ReadCloser, error)
SnapshotReplicationStatus(ctx context.Context, r *pdu.SnapshotReplicationStatusReq) (*pdu.SnapshotReplicationStatusRes, error) ReplicationCursor(ctx context.Context, req *pdu.ReplicationCursorReq) (*pdu.ReplicationCursorRes, error)
} }
// A Sender is usually part of a github.com/zrepl/zrepl/replication.Endpoint. // A Sender is usually part of a github.com/zrepl/zrepl/replication.Endpoint.
@ -423,19 +423,22 @@ func (s *ReplicationStep) doMarkReplicated(ctx context.Context, sender Sender) S
return s.state return s.state
} }
log.Debug("mark snapshot as replicated") log.Debug("advance replication cursor")
req := pdu.SnapshotReplicationStatusReq{ req := &pdu.ReplicationCursorReq{
Filesystem: s.parent.fs, Filesystem: s.parent.fs,
Op: &pdu.ReplicationCursorReq_Set{
Set: &pdu.ReplicationCursorReq_SetOp{
Snapshot: s.to.GetName(), Snapshot: s.to.GetName(),
Op: pdu.SnapshotReplicationStatusReq_SetReplicated, },
},
} }
res, err := sender.SnapshotReplicationStatus(ctx, &req) res, err := sender.ReplicationCursor(ctx, req)
if err != nil { if err != nil {
log.WithError(err).Error("error marking snapshot as replicated") log.WithError(err).Error("error advancing replication cursor")
return updateStateError(err) return updateStateError(err)
} }
if res.Status != pdu.SnapshotReplicationStatusRes_Replicated { if res.GetError() != "" {
err := fmt.Errorf("sender did not report snapshot as replicated: %s", res.Status) err := fmt.Errorf("cannot advance replication cursor: %s", res.GetError())
log.Error(err.Error()) log.Error(err.Error())
return updateStateError(err) return updateStateError(err)
} }

View File

@ -59,10 +59,9 @@ func IncrementalPath(receiver, sender []*FilesystemVersion) (incPath []*Filesyst
for mrcaRcv >= 0 && mrcaSnd >= 0 { for mrcaRcv >= 0 && mrcaSnd >= 0 {
if receiver[mrcaRcv].Guid == sender[mrcaSnd].Guid { if receiver[mrcaRcv].Guid == sender[mrcaSnd].Guid {
if mrcaSnd-1 >= 0 && sender[mrcaSnd-1].Guid == sender[mrcaSnd].Guid && sender[mrcaSnd-1].Type == FilesystemVersion_Bookmark { // Since we arrive from the end of the array, and because we defined bookmark < snapshot,
// prefer bookmarks over snapshots as the snapshot might go away sooner // this condition will match snapshot first, which is what we want because it gives us
mrcaSnd -= 1 // size estimation
}
break break
} }
receiverCreation, err := receiver[mrcaRcv].CreationAsTime() receiverCreation, err := receiver[mrcaRcv].CreationAsTime()

View File

@ -38,56 +38,7 @@ func (x FilesystemVersion_VersionType) String() string {
return proto.EnumName(FilesystemVersion_VersionType_name, int32(x)) return proto.EnumName(FilesystemVersion_VersionType_name, int32(x))
} }
func (FilesystemVersion_VersionType) EnumDescriptor() ([]byte, []int) { func (FilesystemVersion_VersionType) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_pdu_b3a98b3542e9fb4e, []int{5, 0} return fileDescriptor_pdu_cbdc4740ab26577c, []int{5, 0}
}
type SnapshotReplicationStatusReq_Op int32
const (
SnapshotReplicationStatusReq_Get SnapshotReplicationStatusReq_Op = 0
SnapshotReplicationStatusReq_SetReplicated SnapshotReplicationStatusReq_Op = 1
)
var SnapshotReplicationStatusReq_Op_name = map[int32]string{
0: "Get",
1: "SetReplicated",
}
var SnapshotReplicationStatusReq_Op_value = map[string]int32{
"Get": 0,
"SetReplicated": 1,
}
func (x SnapshotReplicationStatusReq_Op) String() string {
return proto.EnumName(SnapshotReplicationStatusReq_Op_name, int32(x))
}
func (SnapshotReplicationStatusReq_Op) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_pdu_b3a98b3542e9fb4e, []int{14, 0}
}
type SnapshotReplicationStatusRes_Status int32
const (
SnapshotReplicationStatusRes_Nonexistent SnapshotReplicationStatusRes_Status = 0
SnapshotReplicationStatusRes_NotReplicated SnapshotReplicationStatusRes_Status = 1
SnapshotReplicationStatusRes_Replicated SnapshotReplicationStatusRes_Status = 2
)
var SnapshotReplicationStatusRes_Status_name = map[int32]string{
0: "Nonexistent",
1: "NotReplicated",
2: "Replicated",
}
var SnapshotReplicationStatusRes_Status_value = map[string]int32{
"Nonexistent": 0,
"NotReplicated": 1,
"Replicated": 2,
}
func (x SnapshotReplicationStatusRes_Status) String() string {
return proto.EnumName(SnapshotReplicationStatusRes_Status_name, int32(x))
}
func (SnapshotReplicationStatusRes_Status) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_pdu_b3a98b3542e9fb4e, []int{15, 0}
} }
type ListFilesystemReq struct { type ListFilesystemReq struct {
@ -100,7 +51,7 @@ func (m *ListFilesystemReq) Reset() { *m = ListFilesystemReq{} }
func (m *ListFilesystemReq) String() string { return proto.CompactTextString(m) } func (m *ListFilesystemReq) String() string { return proto.CompactTextString(m) }
func (*ListFilesystemReq) ProtoMessage() {} func (*ListFilesystemReq) ProtoMessage() {}
func (*ListFilesystemReq) Descriptor() ([]byte, []int) { func (*ListFilesystemReq) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_b3a98b3542e9fb4e, []int{0} return fileDescriptor_pdu_cbdc4740ab26577c, []int{0}
} }
func (m *ListFilesystemReq) XXX_Unmarshal(b []byte) error { func (m *ListFilesystemReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ListFilesystemReq.Unmarshal(m, b) return xxx_messageInfo_ListFilesystemReq.Unmarshal(m, b)
@ -131,7 +82,7 @@ func (m *ListFilesystemRes) Reset() { *m = ListFilesystemRes{} }
func (m *ListFilesystemRes) String() string { return proto.CompactTextString(m) } func (m *ListFilesystemRes) String() string { return proto.CompactTextString(m) }
func (*ListFilesystemRes) ProtoMessage() {} func (*ListFilesystemRes) ProtoMessage() {}
func (*ListFilesystemRes) Descriptor() ([]byte, []int) { func (*ListFilesystemRes) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_b3a98b3542e9fb4e, []int{1} return fileDescriptor_pdu_cbdc4740ab26577c, []int{1}
} }
func (m *ListFilesystemRes) XXX_Unmarshal(b []byte) error { func (m *ListFilesystemRes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ListFilesystemRes.Unmarshal(m, b) return xxx_messageInfo_ListFilesystemRes.Unmarshal(m, b)
@ -170,7 +121,7 @@ func (m *Filesystem) Reset() { *m = Filesystem{} }
func (m *Filesystem) String() string { return proto.CompactTextString(m) } func (m *Filesystem) String() string { return proto.CompactTextString(m) }
func (*Filesystem) ProtoMessage() {} func (*Filesystem) ProtoMessage() {}
func (*Filesystem) Descriptor() ([]byte, []int) { func (*Filesystem) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_b3a98b3542e9fb4e, []int{2} return fileDescriptor_pdu_cbdc4740ab26577c, []int{2}
} }
func (m *Filesystem) XXX_Unmarshal(b []byte) error { func (m *Filesystem) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Filesystem.Unmarshal(m, b) return xxx_messageInfo_Filesystem.Unmarshal(m, b)
@ -215,7 +166,7 @@ func (m *ListFilesystemVersionsReq) Reset() { *m = ListFilesystemVersion
func (m *ListFilesystemVersionsReq) String() string { return proto.CompactTextString(m) } func (m *ListFilesystemVersionsReq) String() string { return proto.CompactTextString(m) }
func (*ListFilesystemVersionsReq) ProtoMessage() {} func (*ListFilesystemVersionsReq) ProtoMessage() {}
func (*ListFilesystemVersionsReq) Descriptor() ([]byte, []int) { func (*ListFilesystemVersionsReq) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_b3a98b3542e9fb4e, []int{3} return fileDescriptor_pdu_cbdc4740ab26577c, []int{3}
} }
func (m *ListFilesystemVersionsReq) XXX_Unmarshal(b []byte) error { func (m *ListFilesystemVersionsReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ListFilesystemVersionsReq.Unmarshal(m, b) return xxx_messageInfo_ListFilesystemVersionsReq.Unmarshal(m, b)
@ -253,7 +204,7 @@ func (m *ListFilesystemVersionsRes) Reset() { *m = ListFilesystemVersion
func (m *ListFilesystemVersionsRes) String() string { return proto.CompactTextString(m) } func (m *ListFilesystemVersionsRes) String() string { return proto.CompactTextString(m) }
func (*ListFilesystemVersionsRes) ProtoMessage() {} func (*ListFilesystemVersionsRes) ProtoMessage() {}
func (*ListFilesystemVersionsRes) Descriptor() ([]byte, []int) { func (*ListFilesystemVersionsRes) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_b3a98b3542e9fb4e, []int{4} return fileDescriptor_pdu_cbdc4740ab26577c, []int{4}
} }
func (m *ListFilesystemVersionsRes) XXX_Unmarshal(b []byte) error { func (m *ListFilesystemVersionsRes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ListFilesystemVersionsRes.Unmarshal(m, b) return xxx_messageInfo_ListFilesystemVersionsRes.Unmarshal(m, b)
@ -295,7 +246,7 @@ func (m *FilesystemVersion) Reset() { *m = FilesystemVersion{} }
func (m *FilesystemVersion) String() string { return proto.CompactTextString(m) } func (m *FilesystemVersion) String() string { return proto.CompactTextString(m) }
func (*FilesystemVersion) ProtoMessage() {} func (*FilesystemVersion) ProtoMessage() {}
func (*FilesystemVersion) Descriptor() ([]byte, []int) { func (*FilesystemVersion) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_b3a98b3542e9fb4e, []int{5} return fileDescriptor_pdu_cbdc4740ab26577c, []int{5}
} }
func (m *FilesystemVersion) XXX_Unmarshal(b []byte) error { func (m *FilesystemVersion) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_FilesystemVersion.Unmarshal(m, b) return xxx_messageInfo_FilesystemVersion.Unmarshal(m, b)
@ -375,7 +326,7 @@ func (m *SendReq) Reset() { *m = SendReq{} }
func (m *SendReq) String() string { return proto.CompactTextString(m) } func (m *SendReq) String() string { return proto.CompactTextString(m) }
func (*SendReq) ProtoMessage() {} func (*SendReq) ProtoMessage() {}
func (*SendReq) Descriptor() ([]byte, []int) { func (*SendReq) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_b3a98b3542e9fb4e, []int{6} return fileDescriptor_pdu_cbdc4740ab26577c, []int{6}
} }
func (m *SendReq) XXX_Unmarshal(b []byte) error { func (m *SendReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_SendReq.Unmarshal(m, b) return xxx_messageInfo_SendReq.Unmarshal(m, b)
@ -456,7 +407,7 @@ func (m *Property) Reset() { *m = Property{} }
func (m *Property) String() string { return proto.CompactTextString(m) } func (m *Property) String() string { return proto.CompactTextString(m) }
func (*Property) ProtoMessage() {} func (*Property) ProtoMessage() {}
func (*Property) Descriptor() ([]byte, []int) { func (*Property) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_b3a98b3542e9fb4e, []int{7} return fileDescriptor_pdu_cbdc4740ab26577c, []int{7}
} }
func (m *Property) XXX_Unmarshal(b []byte) error { func (m *Property) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Property.Unmarshal(m, b) return xxx_messageInfo_Property.Unmarshal(m, b)
@ -506,7 +457,7 @@ func (m *SendRes) Reset() { *m = SendRes{} }
func (m *SendRes) String() string { return proto.CompactTextString(m) } func (m *SendRes) String() string { return proto.CompactTextString(m) }
func (*SendRes) ProtoMessage() {} func (*SendRes) ProtoMessage() {}
func (*SendRes) Descriptor() ([]byte, []int) { func (*SendRes) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_b3a98b3542e9fb4e, []int{8} return fileDescriptor_pdu_cbdc4740ab26577c, []int{8}
} }
func (m *SendRes) XXX_Unmarshal(b []byte) error { func (m *SendRes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_SendRes.Unmarshal(m, b) return xxx_messageInfo_SendRes.Unmarshal(m, b)
@ -560,7 +511,7 @@ func (m *ReceiveReq) Reset() { *m = ReceiveReq{} }
func (m *ReceiveReq) String() string { return proto.CompactTextString(m) } func (m *ReceiveReq) String() string { return proto.CompactTextString(m) }
func (*ReceiveReq) ProtoMessage() {} func (*ReceiveReq) ProtoMessage() {}
func (*ReceiveReq) Descriptor() ([]byte, []int) { func (*ReceiveReq) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_b3a98b3542e9fb4e, []int{9} return fileDescriptor_pdu_cbdc4740ab26577c, []int{9}
} }
func (m *ReceiveReq) XXX_Unmarshal(b []byte) error { func (m *ReceiveReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ReceiveReq.Unmarshal(m, b) return xxx_messageInfo_ReceiveReq.Unmarshal(m, b)
@ -604,7 +555,7 @@ func (m *ReceiveRes) Reset() { *m = ReceiveRes{} }
func (m *ReceiveRes) String() string { return proto.CompactTextString(m) } func (m *ReceiveRes) String() string { return proto.CompactTextString(m) }
func (*ReceiveRes) ProtoMessage() {} func (*ReceiveRes) ProtoMessage() {}
func (*ReceiveRes) Descriptor() ([]byte, []int) { func (*ReceiveRes) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_b3a98b3542e9fb4e, []int{10} return fileDescriptor_pdu_cbdc4740ab26577c, []int{10}
} }
func (m *ReceiveRes) XXX_Unmarshal(b []byte) error { func (m *ReceiveRes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ReceiveRes.Unmarshal(m, b) return xxx_messageInfo_ReceiveRes.Unmarshal(m, b)
@ -637,7 +588,7 @@ func (m *DestroySnapshotsReq) Reset() { *m = DestroySnapshotsReq{} }
func (m *DestroySnapshotsReq) String() string { return proto.CompactTextString(m) } func (m *DestroySnapshotsReq) String() string { return proto.CompactTextString(m) }
func (*DestroySnapshotsReq) ProtoMessage() {} func (*DestroySnapshotsReq) ProtoMessage() {}
func (*DestroySnapshotsReq) Descriptor() ([]byte, []int) { func (*DestroySnapshotsReq) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_b3a98b3542e9fb4e, []int{11} return fileDescriptor_pdu_cbdc4740ab26577c, []int{11}
} }
func (m *DestroySnapshotsReq) XXX_Unmarshal(b []byte) error { func (m *DestroySnapshotsReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_DestroySnapshotsReq.Unmarshal(m, b) return xxx_messageInfo_DestroySnapshotsReq.Unmarshal(m, b)
@ -683,7 +634,7 @@ func (m *DestroySnapshotRes) Reset() { *m = DestroySnapshotRes{} }
func (m *DestroySnapshotRes) String() string { return proto.CompactTextString(m) } func (m *DestroySnapshotRes) String() string { return proto.CompactTextString(m) }
func (*DestroySnapshotRes) ProtoMessage() {} func (*DestroySnapshotRes) ProtoMessage() {}
func (*DestroySnapshotRes) Descriptor() ([]byte, []int) { func (*DestroySnapshotRes) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_b3a98b3542e9fb4e, []int{12} return fileDescriptor_pdu_cbdc4740ab26577c, []int{12}
} }
func (m *DestroySnapshotRes) XXX_Unmarshal(b []byte) error { func (m *DestroySnapshotRes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_DestroySnapshotRes.Unmarshal(m, b) return xxx_messageInfo_DestroySnapshotRes.Unmarshal(m, b)
@ -728,7 +679,7 @@ func (m *DestroySnapshotsRes) Reset() { *m = DestroySnapshotsRes{} }
func (m *DestroySnapshotsRes) String() string { return proto.CompactTextString(m) } func (m *DestroySnapshotsRes) String() string { return proto.CompactTextString(m) }
func (*DestroySnapshotsRes) ProtoMessage() {} func (*DestroySnapshotsRes) ProtoMessage() {}
func (*DestroySnapshotsRes) Descriptor() ([]byte, []int) { func (*DestroySnapshotsRes) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_b3a98b3542e9fb4e, []int{13} return fileDescriptor_pdu_cbdc4740ab26577c, []int{13}
} }
func (m *DestroySnapshotsRes) XXX_Unmarshal(b []byte) error { func (m *DestroySnapshotsRes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_DestroySnapshotsRes.Unmarshal(m, b) return xxx_messageInfo_DestroySnapshotsRes.Unmarshal(m, b)
@ -755,96 +706,361 @@ func (m *DestroySnapshotsRes) GetResults() []*DestroySnapshotRes {
return nil return nil
} }
type SnapshotReplicationStatusReq struct { type ReplicationCursorReq struct {
Filesystem string `protobuf:"bytes,1,opt,name=Filesystem,proto3" json:"Filesystem,omitempty"` Filesystem string `protobuf:"bytes,1,opt,name=Filesystem,proto3" json:"Filesystem,omitempty"`
Snapshot string `protobuf:"bytes,2,opt,name=Snapshot,proto3" json:"Snapshot,omitempty"` // Types that are valid to be assigned to Op:
Op SnapshotReplicationStatusReq_Op `protobuf:"varint,3,opt,name=op,proto3,enum=pdu.SnapshotReplicationStatusReq_Op" json:"op,omitempty"` // *ReplicationCursorReq_Get
// *ReplicationCursorReq_Set
Op isReplicationCursorReq_Op `protobuf_oneof:"op"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_sizecache int32 `json:"-"`
} }
func (m *SnapshotReplicationStatusReq) Reset() { *m = SnapshotReplicationStatusReq{} } func (m *ReplicationCursorReq) Reset() { *m = ReplicationCursorReq{} }
func (m *SnapshotReplicationStatusReq) String() string { return proto.CompactTextString(m) } func (m *ReplicationCursorReq) String() string { return proto.CompactTextString(m) }
func (*SnapshotReplicationStatusReq) ProtoMessage() {} func (*ReplicationCursorReq) ProtoMessage() {}
func (*SnapshotReplicationStatusReq) Descriptor() ([]byte, []int) { func (*ReplicationCursorReq) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_b3a98b3542e9fb4e, []int{14} return fileDescriptor_pdu_cbdc4740ab26577c, []int{14}
} }
func (m *SnapshotReplicationStatusReq) XXX_Unmarshal(b []byte) error { func (m *ReplicationCursorReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_SnapshotReplicationStatusReq.Unmarshal(m, b) return xxx_messageInfo_ReplicationCursorReq.Unmarshal(m, b)
} }
func (m *SnapshotReplicationStatusReq) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { func (m *ReplicationCursorReq) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_SnapshotReplicationStatusReq.Marshal(b, m, deterministic) return xxx_messageInfo_ReplicationCursorReq.Marshal(b, m, deterministic)
} }
func (dst *SnapshotReplicationStatusReq) XXX_Merge(src proto.Message) { func (dst *ReplicationCursorReq) XXX_Merge(src proto.Message) {
xxx_messageInfo_SnapshotReplicationStatusReq.Merge(dst, src) xxx_messageInfo_ReplicationCursorReq.Merge(dst, src)
} }
func (m *SnapshotReplicationStatusReq) XXX_Size() int { func (m *ReplicationCursorReq) XXX_Size() int {
return xxx_messageInfo_SnapshotReplicationStatusReq.Size(m) return xxx_messageInfo_ReplicationCursorReq.Size(m)
} }
func (m *SnapshotReplicationStatusReq) XXX_DiscardUnknown() { func (m *ReplicationCursorReq) XXX_DiscardUnknown() {
xxx_messageInfo_SnapshotReplicationStatusReq.DiscardUnknown(m) xxx_messageInfo_ReplicationCursorReq.DiscardUnknown(m)
} }
var xxx_messageInfo_SnapshotReplicationStatusReq proto.InternalMessageInfo var xxx_messageInfo_ReplicationCursorReq proto.InternalMessageInfo
func (m *SnapshotReplicationStatusReq) GetFilesystem() string { func (m *ReplicationCursorReq) GetFilesystem() string {
if m != nil { if m != nil {
return m.Filesystem return m.Filesystem
} }
return "" return ""
} }
func (m *SnapshotReplicationStatusReq) GetSnapshot() string { type isReplicationCursorReq_Op interface {
isReplicationCursorReq_Op()
}
type ReplicationCursorReq_Get struct {
Get *ReplicationCursorReq_GetOp `protobuf:"bytes,2,opt,name=get,proto3,oneof"`
}
type ReplicationCursorReq_Set struct {
Set *ReplicationCursorReq_SetOp `protobuf:"bytes,3,opt,name=set,proto3,oneof"`
}
func (*ReplicationCursorReq_Get) isReplicationCursorReq_Op() {}
func (*ReplicationCursorReq_Set) isReplicationCursorReq_Op() {}
func (m *ReplicationCursorReq) GetOp() isReplicationCursorReq_Op {
if m != nil {
return m.Op
}
return nil
}
func (m *ReplicationCursorReq) GetGet() *ReplicationCursorReq_GetOp {
if x, ok := m.GetOp().(*ReplicationCursorReq_Get); ok {
return x.Get
}
return nil
}
func (m *ReplicationCursorReq) GetSet() *ReplicationCursorReq_SetOp {
if x, ok := m.GetOp().(*ReplicationCursorReq_Set); ok {
return x.Set
}
return nil
}
// XXX_OneofFuncs is for the internal use of the proto package.
func (*ReplicationCursorReq) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) {
return _ReplicationCursorReq_OneofMarshaler, _ReplicationCursorReq_OneofUnmarshaler, _ReplicationCursorReq_OneofSizer, []interface{}{
(*ReplicationCursorReq_Get)(nil),
(*ReplicationCursorReq_Set)(nil),
}
}
func _ReplicationCursorReq_OneofMarshaler(msg proto.Message, b *proto.Buffer) error {
m := msg.(*ReplicationCursorReq)
// op
switch x := m.Op.(type) {
case *ReplicationCursorReq_Get:
b.EncodeVarint(2<<3 | proto.WireBytes)
if err := b.EncodeMessage(x.Get); err != nil {
return err
}
case *ReplicationCursorReq_Set:
b.EncodeVarint(3<<3 | proto.WireBytes)
if err := b.EncodeMessage(x.Set); err != nil {
return err
}
case nil:
default:
return fmt.Errorf("ReplicationCursorReq.Op has unexpected type %T", x)
}
return nil
}
func _ReplicationCursorReq_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) {
m := msg.(*ReplicationCursorReq)
switch tag {
case 2: // op.get
if wire != proto.WireBytes {
return true, proto.ErrInternalBadWireType
}
msg := new(ReplicationCursorReq_GetOp)
err := b.DecodeMessage(msg)
m.Op = &ReplicationCursorReq_Get{msg}
return true, err
case 3: // op.set
if wire != proto.WireBytes {
return true, proto.ErrInternalBadWireType
}
msg := new(ReplicationCursorReq_SetOp)
err := b.DecodeMessage(msg)
m.Op = &ReplicationCursorReq_Set{msg}
return true, err
default:
return false, nil
}
}
func _ReplicationCursorReq_OneofSizer(msg proto.Message) (n int) {
m := msg.(*ReplicationCursorReq)
// op
switch x := m.Op.(type) {
case *ReplicationCursorReq_Get:
s := proto.Size(x.Get)
n += 1 // tag and wire
n += proto.SizeVarint(uint64(s))
n += s
case *ReplicationCursorReq_Set:
s := proto.Size(x.Set)
n += 1 // tag and wire
n += proto.SizeVarint(uint64(s))
n += s
case nil:
default:
panic(fmt.Sprintf("proto: unexpected type %T in oneof", x))
}
return n
}
type ReplicationCursorReq_GetOp struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *ReplicationCursorReq_GetOp) Reset() { *m = ReplicationCursorReq_GetOp{} }
func (m *ReplicationCursorReq_GetOp) String() string { return proto.CompactTextString(m) }
func (*ReplicationCursorReq_GetOp) ProtoMessage() {}
func (*ReplicationCursorReq_GetOp) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_cbdc4740ab26577c, []int{14, 0}
}
func (m *ReplicationCursorReq_GetOp) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ReplicationCursorReq_GetOp.Unmarshal(m, b)
}
func (m *ReplicationCursorReq_GetOp) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_ReplicationCursorReq_GetOp.Marshal(b, m, deterministic)
}
func (dst *ReplicationCursorReq_GetOp) XXX_Merge(src proto.Message) {
xxx_messageInfo_ReplicationCursorReq_GetOp.Merge(dst, src)
}
func (m *ReplicationCursorReq_GetOp) XXX_Size() int {
return xxx_messageInfo_ReplicationCursorReq_GetOp.Size(m)
}
func (m *ReplicationCursorReq_GetOp) XXX_DiscardUnknown() {
xxx_messageInfo_ReplicationCursorReq_GetOp.DiscardUnknown(m)
}
var xxx_messageInfo_ReplicationCursorReq_GetOp proto.InternalMessageInfo
type ReplicationCursorReq_SetOp struct {
Snapshot string `protobuf:"bytes,2,opt,name=Snapshot,proto3" json:"Snapshot,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *ReplicationCursorReq_SetOp) Reset() { *m = ReplicationCursorReq_SetOp{} }
func (m *ReplicationCursorReq_SetOp) String() string { return proto.CompactTextString(m) }
func (*ReplicationCursorReq_SetOp) ProtoMessage() {}
func (*ReplicationCursorReq_SetOp) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_cbdc4740ab26577c, []int{14, 1}
}
func (m *ReplicationCursorReq_SetOp) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ReplicationCursorReq_SetOp.Unmarshal(m, b)
}
func (m *ReplicationCursorReq_SetOp) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_ReplicationCursorReq_SetOp.Marshal(b, m, deterministic)
}
func (dst *ReplicationCursorReq_SetOp) XXX_Merge(src proto.Message) {
xxx_messageInfo_ReplicationCursorReq_SetOp.Merge(dst, src)
}
func (m *ReplicationCursorReq_SetOp) XXX_Size() int {
return xxx_messageInfo_ReplicationCursorReq_SetOp.Size(m)
}
func (m *ReplicationCursorReq_SetOp) XXX_DiscardUnknown() {
xxx_messageInfo_ReplicationCursorReq_SetOp.DiscardUnknown(m)
}
var xxx_messageInfo_ReplicationCursorReq_SetOp proto.InternalMessageInfo
func (m *ReplicationCursorReq_SetOp) GetSnapshot() string {
if m != nil { if m != nil {
return m.Snapshot return m.Snapshot
} }
return "" return ""
} }
func (m *SnapshotReplicationStatusReq) GetOp() SnapshotReplicationStatusReq_Op { type ReplicationCursorRes struct {
if m != nil { // Types that are valid to be assigned to Result:
return m.Op // *ReplicationCursorRes_Guid
} // *ReplicationCursorRes_Error
return SnapshotReplicationStatusReq_Get Result isReplicationCursorRes_Result `protobuf_oneof:"Result"`
}
type SnapshotReplicationStatusRes struct {
Status SnapshotReplicationStatusRes_Status `protobuf:"varint,1,opt,name=status,proto3,enum=pdu.SnapshotReplicationStatusRes_Status" json:"status,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_sizecache int32 `json:"-"`
} }
func (m *SnapshotReplicationStatusRes) Reset() { *m = SnapshotReplicationStatusRes{} } func (m *ReplicationCursorRes) Reset() { *m = ReplicationCursorRes{} }
func (m *SnapshotReplicationStatusRes) String() string { return proto.CompactTextString(m) } func (m *ReplicationCursorRes) String() string { return proto.CompactTextString(m) }
func (*SnapshotReplicationStatusRes) ProtoMessage() {} func (*ReplicationCursorRes) ProtoMessage() {}
func (*SnapshotReplicationStatusRes) Descriptor() ([]byte, []int) { func (*ReplicationCursorRes) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_b3a98b3542e9fb4e, []int{15} return fileDescriptor_pdu_cbdc4740ab26577c, []int{15}
} }
func (m *SnapshotReplicationStatusRes) XXX_Unmarshal(b []byte) error { func (m *ReplicationCursorRes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_SnapshotReplicationStatusRes.Unmarshal(m, b) return xxx_messageInfo_ReplicationCursorRes.Unmarshal(m, b)
} }
func (m *SnapshotReplicationStatusRes) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { func (m *ReplicationCursorRes) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_SnapshotReplicationStatusRes.Marshal(b, m, deterministic) return xxx_messageInfo_ReplicationCursorRes.Marshal(b, m, deterministic)
} }
func (dst *SnapshotReplicationStatusRes) XXX_Merge(src proto.Message) { func (dst *ReplicationCursorRes) XXX_Merge(src proto.Message) {
xxx_messageInfo_SnapshotReplicationStatusRes.Merge(dst, src) xxx_messageInfo_ReplicationCursorRes.Merge(dst, src)
} }
func (m *SnapshotReplicationStatusRes) XXX_Size() int { func (m *ReplicationCursorRes) XXX_Size() int {
return xxx_messageInfo_SnapshotReplicationStatusRes.Size(m) return xxx_messageInfo_ReplicationCursorRes.Size(m)
} }
func (m *SnapshotReplicationStatusRes) XXX_DiscardUnknown() { func (m *ReplicationCursorRes) XXX_DiscardUnknown() {
xxx_messageInfo_SnapshotReplicationStatusRes.DiscardUnknown(m) xxx_messageInfo_ReplicationCursorRes.DiscardUnknown(m)
} }
var xxx_messageInfo_SnapshotReplicationStatusRes proto.InternalMessageInfo var xxx_messageInfo_ReplicationCursorRes proto.InternalMessageInfo
func (m *SnapshotReplicationStatusRes) GetStatus() SnapshotReplicationStatusRes_Status { type isReplicationCursorRes_Result interface {
isReplicationCursorRes_Result()
}
type ReplicationCursorRes_Guid struct {
Guid uint64 `protobuf:"varint,1,opt,name=Guid,proto3,oneof"`
}
type ReplicationCursorRes_Error struct {
Error string `protobuf:"bytes,2,opt,name=Error,proto3,oneof"`
}
func (*ReplicationCursorRes_Guid) isReplicationCursorRes_Result() {}
func (*ReplicationCursorRes_Error) isReplicationCursorRes_Result() {}
func (m *ReplicationCursorRes) GetResult() isReplicationCursorRes_Result {
if m != nil { if m != nil {
return m.Status return m.Result
} }
return SnapshotReplicationStatusRes_Nonexistent return nil
}
func (m *ReplicationCursorRes) GetGuid() uint64 {
if x, ok := m.GetResult().(*ReplicationCursorRes_Guid); ok {
return x.Guid
}
return 0
}
func (m *ReplicationCursorRes) GetError() string {
if x, ok := m.GetResult().(*ReplicationCursorRes_Error); ok {
return x.Error
}
return ""
}
// XXX_OneofFuncs is for the internal use of the proto package.
func (*ReplicationCursorRes) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) {
return _ReplicationCursorRes_OneofMarshaler, _ReplicationCursorRes_OneofUnmarshaler, _ReplicationCursorRes_OneofSizer, []interface{}{
(*ReplicationCursorRes_Guid)(nil),
(*ReplicationCursorRes_Error)(nil),
}
}
func _ReplicationCursorRes_OneofMarshaler(msg proto.Message, b *proto.Buffer) error {
m := msg.(*ReplicationCursorRes)
// Result
switch x := m.Result.(type) {
case *ReplicationCursorRes_Guid:
b.EncodeVarint(1<<3 | proto.WireVarint)
b.EncodeVarint(uint64(x.Guid))
case *ReplicationCursorRes_Error:
b.EncodeVarint(2<<3 | proto.WireBytes)
b.EncodeStringBytes(x.Error)
case nil:
default:
return fmt.Errorf("ReplicationCursorRes.Result has unexpected type %T", x)
}
return nil
}
func _ReplicationCursorRes_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) {
m := msg.(*ReplicationCursorRes)
switch tag {
case 1: // Result.Guid
if wire != proto.WireVarint {
return true, proto.ErrInternalBadWireType
}
x, err := b.DecodeVarint()
m.Result = &ReplicationCursorRes_Guid{x}
return true, err
case 2: // Result.Error
if wire != proto.WireBytes {
return true, proto.ErrInternalBadWireType
}
x, err := b.DecodeStringBytes()
m.Result = &ReplicationCursorRes_Error{x}
return true, err
default:
return false, nil
}
}
func _ReplicationCursorRes_OneofSizer(msg proto.Message) (n int) {
m := msg.(*ReplicationCursorRes)
// Result
switch x := m.Result.(type) {
case *ReplicationCursorRes_Guid:
n += 1 // tag and wire
n += proto.SizeVarint(uint64(x.Guid))
case *ReplicationCursorRes_Error:
n += 1 // tag and wire
n += proto.SizeVarint(uint64(len(x.Error)))
n += len(x.Error)
case nil:
default:
panic(fmt.Sprintf("proto: unexpected type %T in oneof", x))
}
return n
} }
func init() { func init() {
@ -862,57 +1078,57 @@ func init() {
proto.RegisterType((*DestroySnapshotsReq)(nil), "pdu.DestroySnapshotsReq") proto.RegisterType((*DestroySnapshotsReq)(nil), "pdu.DestroySnapshotsReq")
proto.RegisterType((*DestroySnapshotRes)(nil), "pdu.DestroySnapshotRes") proto.RegisterType((*DestroySnapshotRes)(nil), "pdu.DestroySnapshotRes")
proto.RegisterType((*DestroySnapshotsRes)(nil), "pdu.DestroySnapshotsRes") proto.RegisterType((*DestroySnapshotsRes)(nil), "pdu.DestroySnapshotsRes")
proto.RegisterType((*SnapshotReplicationStatusReq)(nil), "pdu.SnapshotReplicationStatusReq") proto.RegisterType((*ReplicationCursorReq)(nil), "pdu.ReplicationCursorReq")
proto.RegisterType((*SnapshotReplicationStatusRes)(nil), "pdu.SnapshotReplicationStatusRes") proto.RegisterType((*ReplicationCursorReq_GetOp)(nil), "pdu.ReplicationCursorReq.GetOp")
proto.RegisterType((*ReplicationCursorReq_SetOp)(nil), "pdu.ReplicationCursorReq.SetOp")
proto.RegisterType((*ReplicationCursorRes)(nil), "pdu.ReplicationCursorRes")
proto.RegisterEnum("pdu.FilesystemVersion_VersionType", FilesystemVersion_VersionType_name, FilesystemVersion_VersionType_value) proto.RegisterEnum("pdu.FilesystemVersion_VersionType", FilesystemVersion_VersionType_name, FilesystemVersion_VersionType_value)
proto.RegisterEnum("pdu.SnapshotReplicationStatusReq_Op", SnapshotReplicationStatusReq_Op_name, SnapshotReplicationStatusReq_Op_value)
proto.RegisterEnum("pdu.SnapshotReplicationStatusRes_Status", SnapshotReplicationStatusRes_Status_name, SnapshotReplicationStatusRes_Status_value)
} }
func init() { proto.RegisterFile("pdu.proto", fileDescriptor_pdu_b3a98b3542e9fb4e) } func init() { proto.RegisterFile("pdu.proto", fileDescriptor_pdu_cbdc4740ab26577c) }
var fileDescriptor_pdu_b3a98b3542e9fb4e = []byte{ var fileDescriptor_pdu_cbdc4740ab26577c = []byte{
// 666 bytes of a gzipped FileDescriptorProto // 657 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x55, 0xcf, 0x6e, 0xd3, 0x4e, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x55, 0xcb, 0x6e, 0xdb, 0x3a,
0x10, 0xfe, 0xd9, 0x49, 0xf3, 0x67, 0xd2, 0xa6, 0xe9, 0xb6, 0xea, 0xcf, 0x54, 0x15, 0x8a, 0x56, 0x10, 0xb5, 0x6c, 0xd9, 0x96, 0xc7, 0xb9, 0x79, 0x30, 0x41, 0xae, 0x6e, 0x70, 0x71, 0xaf, 0xc1,
0x1c, 0x02, 0x12, 0x91, 0x08, 0x15, 0x17, 0x38, 0xa0, 0xfe, 0xe5, 0x80, 0xda, 0x6a, 0x13, 0xaa, 0x6e, 0xdc, 0x02, 0x35, 0x50, 0x27, 0xe8, 0xa6, 0x3b, 0xe7, 0xe5, 0x45, 0x91, 0x04, 0xb4, 0x1b,
0x9e, 0x90, 0x4c, 0x3d, 0x52, 0xad, 0x24, 0xde, 0xed, 0xee, 0x1a, 0x35, 0x3c, 0x00, 0x8f, 0xc1, 0x64, 0x55, 0x40, 0x8d, 0x06, 0x8d, 0xe0, 0x07, 0x15, 0x92, 0x2a, 0xea, 0x7e, 0x40, 0xff, 0xa9,
0x43, 0x70, 0xe3, 0x4d, 0x78, 0x1c, 0xe4, 0x89, 0xed, 0xb8, 0x49, 0x09, 0x39, 0x65, 0xbe, 0x6f, 0xff, 0xd1, 0x45, 0x3f, 0xa7, 0xe0, 0x58, 0x92, 0x15, 0xdb, 0x0d, 0xbc, 0x32, 0xcf, 0xf0, 0x70,
0x66, 0x67, 0xbe, 0x99, 0xdd, 0x71, 0xa0, 0xae, 0x82, 0xb8, 0xab, 0xb4, 0xb4, 0x92, 0x95, 0x54, 0xe6, 0xcc, 0xa1, 0x86, 0x86, 0x46, 0x1c, 0x26, 0x9d, 0x58, 0x49, 0x23, 0x59, 0x25, 0x0e, 0x13,
0x10, 0xf3, 0x6d, 0xd8, 0xfa, 0x18, 0x1a, 0x7b, 0x1a, 0x8e, 0xd0, 0x4c, 0x8c, 0xc5, 0xb1, 0xc0, 0xbe, 0x0f, 0x7b, 0xef, 0x23, 0x6d, 0x2e, 0xa2, 0x31, 0xea, 0x99, 0x36, 0x38, 0x11, 0xf8, 0xc8,
0x3b, 0x7e, 0xba, 0x48, 0x1a, 0xf6, 0x0a, 0x1a, 0x33, 0xc2, 0x78, 0x4e, 0xbb, 0xd4, 0x69, 0xf4, 0x2f, 0x56, 0x83, 0x9a, 0xbd, 0x81, 0xe6, 0x22, 0xa0, 0x7d, 0xa7, 0x55, 0x69, 0x37, 0xbb, 0x3b,
0x36, 0xbb, 0x49, 0xbe, 0x42, 0x60, 0x31, 0x86, 0x1f, 0x02, 0xcc, 0x20, 0x63, 0x50, 0xbe, 0xf4, 0x1d, 0x9b, 0xaf, 0x40, 0x2c, 0x72, 0x78, 0x0f, 0x60, 0x01, 0x19, 0x03, 0xf7, 0x26, 0x30, 0x0f,
0xed, 0xad, 0xe7, 0xb4, 0x9d, 0x4e, 0x5d, 0x90, 0xcd, 0xda, 0xd0, 0x10, 0x68, 0xe2, 0x31, 0x0e, 0xbe, 0xd3, 0x72, 0xda, 0x0d, 0x41, 0x6b, 0xd6, 0x82, 0xa6, 0x40, 0x9d, 0x4c, 0x70, 0x28, 0x47,
0xe4, 0x10, 0x23, 0xcf, 0x25, 0x57, 0x91, 0xe2, 0x6f, 0xe1, 0xc9, 0x43, 0x2d, 0x57, 0xa8, 0x4d, 0x38, 0xf5, 0xcb, 0xb4, 0x55, 0x0c, 0xf1, 0x77, 0xf0, 0xcf, 0x53, 0x2d, 0xb7, 0xa8, 0x74, 0x24,
0x28, 0x23, 0x23, 0xf0, 0x8e, 0x3d, 0x2d, 0x16, 0x48, 0x13, 0x17, 0x18, 0x7e, 0xf1, 0xf7, 0xc3, 0xa7, 0x5a, 0xe0, 0x23, 0xfb, 0xaf, 0x58, 0x20, 0x4d, 0x5c, 0x88, 0xf0, 0xeb, 0x3f, 0x1f, 0xd6,
0x86, 0xf5, 0xa0, 0x96, 0xc1, 0xb4, 0x9b, 0xdd, 0xb9, 0x6e, 0x52, 0xb7, 0xc8, 0xe3, 0xf8, 0x6f, 0xac, 0x0b, 0x5e, 0x06, 0xd3, 0x6e, 0x0e, 0x97, 0xba, 0x49, 0xb7, 0x45, 0xce, 0xe3, 0xbf, 0x1c,
0x07, 0xb6, 0x16, 0xfc, 0xec, 0x0d, 0x94, 0x07, 0x13, 0x85, 0x24, 0xa0, 0xd9, 0xe3, 0x8f, 0x67, 0xd8, 0x5b, 0xd9, 0x67, 0x6f, 0xc1, 0x1d, 0xce, 0x62, 0x24, 0x01, 0xdb, 0x5d, 0xbe, 0x3e, 0x4b,
0xe9, 0xa6, 0xbf, 0x49, 0xa4, 0xa0, 0xf8, 0x64, 0x22, 0xe7, 0xfe, 0x18, 0xd3, 0xb6, 0xc9, 0x4e, 0x27, 0xfd, 0xb5, 0x4c, 0x41, 0x7c, 0xeb, 0xc8, 0x55, 0x30, 0xc1, 0xb4, 0x6d, 0x5a, 0xdb, 0xd8,
0xb8, 0xb3, 0x38, 0x0c, 0xbc, 0x52, 0xdb, 0xe9, 0x94, 0x05, 0xd9, 0x6c, 0x1f, 0xea, 0x47, 0x1a, 0x65, 0x12, 0x85, 0x7e, 0xa5, 0xe5, 0xb4, 0x5d, 0x41, 0x6b, 0xf6, 0x2f, 0x34, 0x4e, 0x15, 0x06,
0x7d, 0x8b, 0x83, 0xeb, 0x33, 0xaf, 0x4c, 0x8e, 0x19, 0xc1, 0xf6, 0xa0, 0x46, 0x20, 0x94, 0x91, 0x06, 0x87, 0x77, 0x97, 0xbe, 0x4b, 0x1b, 0x8b, 0x00, 0x3b, 0x02, 0x8f, 0x40, 0x24, 0xa7, 0x7e,
0xb7, 0x46, 0x99, 0x72, 0xcc, 0x9f, 0x43, 0xa3, 0x50, 0x96, 0xad, 0x43, 0xad, 0x1f, 0xf9, 0xca, 0x95, 0x32, 0xe5, 0x98, 0xbf, 0x84, 0x66, 0xa1, 0x2c, 0xdb, 0x02, 0x6f, 0x30, 0x0d, 0x62, 0xfd,
0xdc, 0x4a, 0xdb, 0xfa, 0x2f, 0x41, 0x87, 0x52, 0x0e, 0xc7, 0xbe, 0x1e, 0xb6, 0x1c, 0xfe, 0xcb, 0x20, 0xcd, 0x6e, 0xc9, 0xa2, 0x9e, 0x94, 0xa3, 0x49, 0xa0, 0x46, 0xbb, 0x0e, 0xff, 0xe1, 0x40,
0x81, 0x6a, 0x1f, 0xa3, 0x60, 0x85, 0xb9, 0x26, 0x22, 0x4f, 0xb5, 0x1c, 0x67, 0xc2, 0x13, 0x9b, 0x7d, 0x80, 0xd3, 0x70, 0x03, 0x5f, 0xad, 0xc8, 0x0b, 0x25, 0x27, 0x99, 0x70, 0xbb, 0x66, 0xdb,
0x35, 0xc1, 0x1d, 0x48, 0x92, 0x5d, 0x17, 0xee, 0x40, 0xce, 0x5f, 0x6d, 0x79, 0xe1, 0x6a, 0x49, 0x50, 0x1e, 0x4a, 0x92, 0xdd, 0x10, 0xe5, 0xa1, 0x5c, 0xbe, 0x5a, 0x77, 0xe5, 0x6a, 0x49, 0xb8,
0xb8, 0x1c, 0x2b, 0x8d, 0xc6, 0x90, 0xf0, 0x9a, 0xc8, 0x31, 0xdb, 0x81, 0xb5, 0x63, 0x0c, 0x62, 0x9c, 0xc4, 0x0a, 0xb5, 0x26, 0xe1, 0x9e, 0xc8, 0x31, 0x3b, 0x80, 0xea, 0x19, 0x86, 0x49, 0xec,
0xe5, 0x55, 0xc8, 0x31, 0x05, 0x6c, 0x17, 0x2a, 0xc7, 0x7a, 0x22, 0xe2, 0xc8, 0xab, 0x12, 0x9d, 0xd7, 0x68, 0x63, 0x0e, 0xd8, 0x21, 0xd4, 0xce, 0xd4, 0x4c, 0x24, 0x53, 0xbf, 0x4e, 0xe1, 0x14,
0x22, 0x7e, 0x00, 0xb5, 0x4b, 0x2d, 0x15, 0x6a, 0x3b, 0xc9, 0x87, 0xea, 0x14, 0x86, 0xba, 0x03, 0xf1, 0x13, 0xf0, 0x6e, 0x94, 0x8c, 0x51, 0x99, 0x59, 0x6e, 0xaa, 0x53, 0x30, 0xf5, 0x00, 0xaa,
0x6b, 0x57, 0xfe, 0x28, 0xce, 0x26, 0x3d, 0x05, 0xfc, 0x7b, 0xde, 0xb1, 0x61, 0x1d, 0xd8, 0xfc, 0xb7, 0xc1, 0x38, 0xc9, 0x9c, 0x9e, 0x03, 0xfe, 0x3d, 0xef, 0x58, 0xb3, 0x36, 0xec, 0x7c, 0xd0,
0x64, 0x30, 0x28, 0x2a, 0x76, 0xa8, 0xc4, 0x3c, 0xcd, 0x38, 0xac, 0x9f, 0xdc, 0x2b, 0xbc, 0xb1, 0x18, 0x16, 0x15, 0x3b, 0x54, 0x62, 0x39, 0xcc, 0x38, 0x6c, 0x9d, 0x7f, 0x8d, 0xf1, 0xde, 0x60,
0x18, 0xf4, 0xc3, 0x6f, 0xd3, 0x94, 0x25, 0xf1, 0x80, 0x63, 0x2f, 0x01, 0x52, 0x3d, 0x21, 0x1a, 0x38, 0x88, 0xbe, 0xcd, 0x53, 0x56, 0xc4, 0x93, 0x18, 0x7b, 0x0d, 0x90, 0xea, 0x89, 0x50, 0xfb,
0xaf, 0x44, 0x8f, 0x6b, 0x83, 0x9e, 0x45, 0x26, 0x53, 0x14, 0x02, 0xf8, 0x35, 0x80, 0xc0, 0x1b, 0x15, 0xfa, 0xb8, 0xfe, 0xa2, 0xcf, 0x22, 0x93, 0x29, 0x0a, 0x04, 0x7e, 0x07, 0x20, 0xf0, 0x1e,
0x0c, 0xbf, 0xe2, 0x2a, 0xc3, 0x7f, 0x01, 0xad, 0xa3, 0x11, 0xfa, 0x7a, 0x7e, 0x71, 0x6a, 0x62, 0xa3, 0x2f, 0xb8, 0x89, 0xf9, 0xaf, 0x60, 0xf7, 0x74, 0x8c, 0x81, 0x5a, 0x1e, 0x1c, 0x4f, 0xac,
0x81, 0xe7, 0xeb, 0x85, 0xcc, 0x86, 0x0f, 0x61, 0xfb, 0x18, 0x8d, 0xd5, 0x72, 0x92, 0xbd, 0x82, 0xc4, 0xf9, 0x56, 0x21, 0xb3, 0xe6, 0x23, 0xd8, 0x3f, 0x43, 0x6d, 0x94, 0x9c, 0x65, 0x5f, 0xc1,
0x55, 0xb6, 0x88, 0x1d, 0x40, 0x3d, 0x8f, 0xf7, 0xdc, 0xa5, 0x9b, 0x32, 0x0b, 0xe4, 0x9f, 0x81, 0x26, 0x53, 0xc4, 0x4e, 0xa0, 0x91, 0xf3, 0xfd, 0xf2, 0xb3, 0x93, 0xb2, 0x20, 0xf2, 0x8f, 0xc0,
0xcd, 0x15, 0x4b, 0x97, 0x2e, 0x83, 0x54, 0x69, 0xc9, 0xd2, 0x65, 0x71, 0xc9, 0xed, 0x9d, 0x68, 0x96, 0x8a, 0xa5, 0x43, 0x97, 0x41, 0xaa, 0xf4, 0xcc, 0xd0, 0x65, 0x3c, 0x7b, 0x7b, 0xe7, 0x4a,
0x2d, 0x75, 0x76, 0x7b, 0x04, 0xf8, 0x87, 0xc7, 0x9a, 0x49, 0x3e, 0x53, 0xd5, 0x64, 0x00, 0x23, 0x49, 0x95, 0xdd, 0x1e, 0x01, 0xde, 0x5f, 0xd7, 0x8c, 0x7d, 0xa6, 0xea, 0xd6, 0x80, 0xb1, 0xc9,
0x9b, 0x2d, 0xf5, 0xff, 0x94, 0x7f, 0x51, 0x8a, 0xc8, 0xe2, 0xf8, 0x4f, 0x07, 0xf6, 0x67, 0x0e, 0x86, 0xfa, 0x6f, 0xca, 0xbf, 0x2a, 0x45, 0x64, 0x3c, 0xfe, 0xd3, 0x81, 0x03, 0x81, 0xf1, 0x38,
0x35, 0x0a, 0x6f, 0x68, 0x79, 0xfa, 0xd6, 0xb7, 0xf1, 0x4a, 0x03, 0xda, 0x2b, 0x34, 0x35, 0xd5, 0xba, 0xa7, 0xa1, 0x39, 0x4d, 0x94, 0x96, 0x6a, 0x13, 0x63, 0x8e, 0xa1, 0xf2, 0x19, 0x0d, 0xc9,
0x38, 0x13, 0x7f, 0x00, 0xae, 0x54, 0xb4, 0x16, 0xcd, 0xde, 0x33, 0x92, 0xb2, 0xac, 0x54, 0xf7, 0x6a, 0x76, 0xff, 0xa7, 0x3a, 0xeb, 0xf2, 0x74, 0x2e, 0xd1, 0x5c, 0xc7, 0xfd, 0x92, 0xb0, 0x6c,
0x42, 0x09, 0x57, 0x2a, 0xde, 0x06, 0xf7, 0x42, 0xb1, 0x2a, 0x94, 0xce, 0x30, 0xd9, 0xd4, 0x2d, 0x7b, 0x48, 0xa3, 0xa1, 0x41, 0x79, 0xf6, 0xd0, 0x20, 0x3b, 0xa4, 0xd1, 0x1c, 0xd5, 0xa1, 0x4a,
0xd8, 0xe8, 0x63, 0x7e, 0x00, 0x83, 0x96, 0xc3, 0x7f, 0x2c, 0x17, 0x6d, 0xd8, 0x7b, 0xa8, 0x18, 0x49, 0x8e, 0x5e, 0x40, 0x95, 0x36, 0xec, 0xf0, 0xe4, 0x46, 0xce, 0x7d, 0xc9, 0x71, 0xcf, 0x85,
0x02, 0xe9, 0x67, 0xa9, 0xf3, 0xaf, 0xe2, 0xa6, 0x9b, 0x5a, 0xe9, 0x39, 0xfe, 0x0e, 0x2a, 0x53, 0xb2, 0x8c, 0xf9, 0xd5, 0xda, 0xae, 0xec, 0x68, 0xcd, 0x5f, 0x18, 0xdb, 0x8f, 0xdb, 0x2f, 0xa5,
0x86, 0x6d, 0x42, 0xe3, 0x5c, 0x46, 0x78, 0x1f, 0x1a, 0x8b, 0x51, 0x2a, 0xe8, 0x5c, 0x3e, 0x10, 0x6f, 0xcc, 0xe1, 0x13, 0x93, 0xfb, 0xa5, 0xd4, 0xe6, 0x9e, 0x07, 0xb5, 0xb9, 0x4f, 0x9f, 0x6a,
0xc4, 0x9a, 0xc9, 0x53, 0xcb, 0xb1, 0xfb, 0xa5, 0x42, 0xff, 0x32, 0xaf, 0xff, 0x04, 0x00, 0x00, 0xf4, 0xb7, 0x71, 0xfc, 0x3b, 0x00, 0x00, 0xff, 0xff, 0x02, 0x35, 0xe7, 0x43, 0x43, 0x06, 0x00,
0xff, 0xff, 0x3a, 0x00, 0x8b, 0x6e, 0x72, 0x06, 0x00, 0x00, 0x00,
} }

View File

@ -98,21 +98,21 @@ message DestroySnapshotsRes {
repeated DestroySnapshotRes Results = 1; repeated DestroySnapshotRes Results = 1;
} }
message SnapshotReplicationStatusReq { message ReplicationCursorReq {
string Filesystem = 1; string Filesystem = 1;
message GetOp {}
message SetOp {
string Snapshot = 2; string Snapshot = 2;
enum Op {
Get = 0;
SetReplicated = 1;
} }
Op op = 3; oneof op {
GetOp get = 2;
SetOp set = 3;
}
} }
message SnapshotReplicationStatusRes { message ReplicationCursorRes {
enum Status { oneof Result {
Nonexistent = 0; uint64 Guid = 1;
NotReplicated = 1; string Error = 2;
Replicated = 2;
} }
Status status = 1;
} }

View File

@ -1,25 +1,58 @@
package zfs package zfs
const ReplicatedProperty = "zrepl:replicated" import (
"fmt"
"github.com/pkg/errors"
"strconv"
)
// May return *DatasetDoesNotExist as an error const ReplicationCursorBookmarkName = "zrepl_replication_cursor"
func ZFSGetReplicatedProperty(fs *DatasetPath, v *FilesystemVersion) (replicated bool, err error) {
props, err := zfsGet(v.ToAbsPath(fs), []string{ReplicatedProperty}) // may return nil for both values, indicating there is no cursor
func ZFSGetReplicationCursor(fs *DatasetPath) (*FilesystemVersion, error) {
versions, err := ZFSListFilesystemVersions(fs, nil)
if err != nil { if err != nil {
return false, err return nil, err
} }
if props.Get(ReplicatedProperty) == "yes" { for _, v := range versions {
return true, nil if v.Type == Bookmark && v.Name == ReplicationCursorBookmarkName {
return &v, nil
} }
return false, nil }
return nil, nil
} }
func ZFSSetReplicatedProperty(fs *DatasetPath, v *FilesystemVersion, replicated bool) error { func ZFSSetReplicationCursor(fs *DatasetPath, snapname string) (guid uint64, err error) {
val := "no" snapPath := fmt.Sprintf("%s@%s", fs.ToString(), snapname)
if replicated { propsSnap, err := zfsGet(snapPath, []string{"createtxg", "guid"})
val = "yes" if err != nil {
return 0, err
} }
props := NewZFSProperties() snapGuid, err := strconv.ParseUint(propsSnap.Get("guid"), 10, 64)
props.Set(ReplicatedProperty, val) bookmarkPath := fmt.Sprintf("%s#%s", fs.ToString(), ReplicationCursorBookmarkName)
return zfsSet(v.ToAbsPath(fs), props) propsBookmark, err := zfsGet(bookmarkPath, []string{"createtxg"})
_, bookmarkNotExistErr := err.(*DatasetDoesNotExist)
if err != nil && !bookmarkNotExistErr {
return 0, err
}
if err == nil {
bookmarkTxg, err := strconv.ParseUint(propsBookmark.Get("createtxg"), 10, 64)
if err != nil {
return 0, errors.Wrap(err, "cannot parse bookmark createtxg")
}
snapTxg, err := strconv.ParseUint(propsSnap.Get("createtxg"), 10, 64)
if err != nil {
return 0, errors.Wrap(err, "cannot parse snapshot createtxg")
}
if snapTxg < bookmarkTxg {
return 0, errors.New("replication cursor can only be advanced, not set back")
}
if err := ZFSDestroy(bookmarkPath); err != nil { // FIXME make safer by using new temporary bookmark, then rename, possible with channel programs
return 0, err
}
}
if err := ZFSBookmark(fs, snapname, ReplicationCursorBookmarkName); err != nil {
return 0, err
}
return snapGuid, nil
} }

View File

@ -490,7 +490,7 @@ func ZFSGet(fs *DatasetPath, props []string) (*ZFSProperties, error) {
return zfsGet(fs.ToString(), props) return zfsGet(fs.ToString(), props)
} }
var zfsGetDatasetDoesNotExistRegexp = regexp.MustCompile(`^cannot open '(\S+)': dataset does not exist`) var zfsGetDatasetDoesNotExistRegexp = regexp.MustCompile(`^cannot open '(\S+)': (dataset does not exist|no such pool or dataset)`)
type DatasetDoesNotExist struct { type DatasetDoesNotExist struct {
Path string Path string