From 975fdee217edccff693e997f3731a59f416826ea Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 5 Sep 2018 18:24:15 -0700 Subject: [PATCH] replication & pruning: ditch replicated-property, use bookmark as cursor instead A bookmark with a well-known name is used to track which version was last successfully received by the receiver. The createtxg that can be retrieved from the bookmark using `zfs get` is used to set the Replicated attribute of each snap on the sender: If the snap's CreateTXG > the cursor's, it is not yet replicated, otherwise it has been. There is an optional config option to change the behvior to `CreateTXG >= the cursor's`, and the implementation defaults to that. The reason: While things work just fine with `CreateTXG > the cursor's`, ZFS does not provide size estimates in a `zfs send` dry run (see acd2418). However, to enable the use case of keeping the snapshot only around for the replication, the config flag exists. --- config/config.go | 1 + daemon/pruner/pruner.go | 106 +++--- endpoint/endpoint.go | 61 ++-- replication/fsrep/fsfsm.go | 21 +- replication/internal/diff/diff.go | 7 +- replication/pdu/pdu.pb.go | 568 +++++++++++++++++++++--------- replication/pdu/pdu.proto | 24 +- zfs/replication_history.go | 63 +++- zfs/zfs.go | 2 +- 9 files changed, 559 insertions(+), 294 deletions(-) diff --git a/config/config.go b/config/config.go index 32b3b4d..08262a9 100644 --- a/config/config.go +++ b/config/config.go @@ -205,6 +205,7 @@ type PruningEnum struct { type PruneKeepNotReplicated struct { Type string `yaml:"type"` + KeepSnapshotAtCursor bool `yaml:"keep_snapshot_at_cursor,optional,default=true"` } type PruneKeepLastN struct { diff --git a/daemon/pruner/pruner.go b/daemon/pruner/pruner.go index f8e3ab2..e3d5c8c 100644 --- a/daemon/pruner/pruner.go +++ b/daemon/pruner/pruner.go @@ -9,13 +9,14 @@ import ( "github.com/zrepl/zrepl/pruning" "github.com/zrepl/zrepl/replication/pdu" "net" + "sort" "sync" "time" ) // Try to keep it compatible with gitub.com/zrepl/zrepl/replication.Endpoint type History interface { - SnapshotReplicationStatus(ctx context.Context, req *pdu.SnapshotReplicationStatusReq) (*pdu.SnapshotReplicationStatusRes, error) + ReplicationCursor(ctx context.Context, req *pdu.ReplicationCursorReq) (*pdu.ReplicationCursorRes, error) } type Target interface { @@ -42,11 +43,12 @@ func GetLogger(ctx context.Context) Logger { } type args struct { - ctx context.Context - target Target - receiver History - rules []pruning.KeepRule - retryWait time.Duration + ctx context.Context + target Target + receiver History + rules []pruning.KeepRule + retryWait time.Duration + considerSnapAtCursorReplicated bool } type Pruner struct { @@ -66,9 +68,10 @@ type Pruner struct { } type PrunerFactory struct { - senderRules []pruning.KeepRule - receiverRules []pruning.KeepRule - retryWait time.Duration + senderRules []pruning.KeepRule + receiverRules []pruning.KeepRule + retryWait time.Duration + considerSnapAtCursorReplicated bool } func checkContainsKeep1(rules []pruning.KeepRule) error { @@ -95,14 +98,19 @@ func NewPrunerFactory(in config.PruningSenderReceiver) (*PrunerFactory, error) { return nil, errors.Wrap(err, "cannot build sender pruning rules") } - if err := checkContainsKeep1(keepRulesSender); err != nil { - return nil, err + considerSnapAtCursorReplicated := false + for _, r := range in.KeepSender { + knr, ok := r.Ret.(*config.PruneKeepNotReplicated) + if !ok { + continue + } + considerSnapAtCursorReplicated = considerSnapAtCursorReplicated || !knr.KeepSnapshotAtCursor } - f := &PrunerFactory{ keepRulesSender, keepRulesReceiver, 10 * time.Second, //FIXME constant + considerSnapAtCursorReplicated, } return f, nil } @@ -115,6 +123,7 @@ func (f *PrunerFactory) BuildSenderPruner(ctx context.Context, target Target, re receiver, f.senderRules, f.retryWait, + f.considerSnapAtCursorReplicated, }, state: Plan, } @@ -129,6 +138,7 @@ func (f *PrunerFactory) BuildReceiverPruner(ctx context.Context, target Target, receiver, f.receiverRules, f.retryWait, + false, // senseless here anyways }, state: Plan, } @@ -254,56 +264,66 @@ func statePlan(a *args, u updater) state { } pfss := make([]*fs, len(tfss)) +fsloop: for i, tfs := range tfss { + + l := GetLogger(ctx).WithField("fs", tfs.Path) + l.Debug("plan filesystem") + tfsvs, err := target.ListFilesystemVersions(ctx, tfs.Path) if err != nil { + l.WithError(err).Error("cannot list filesystem versions") return onErr(u, err) } + rcReq := &pdu.ReplicationCursorReq{ + Filesystem: tfs.Path, + Op: &pdu.ReplicationCursorReq_Get{}, + } + rc, err := receiver.ReplicationCursor(ctx, rcReq) + if err != nil { + l.WithError(err).Error("cannot get replication cursor") + return onErr(u, err) + } + if rc.GetError() != "" { + l.WithField("reqErr", rc.GetError()).Error("cannot get replication cursor") + return onErr(u, fmt.Errorf("%s", rc.GetError())) + } + pfs := &fs{ path: tfs.Path, snaps: make([]pruning.Snapshot, 0, len(tfsvs)), } + pfss[i] = pfs + // scan from older to newer, all snapshots older than cursor are interpreted as replicated + sort.Slice(tfsvs, func(i, j int) bool { + return tfsvs[i].CreateTXG < tfsvs[j].CreateTXG + }) + preCursor := true for _, tfsv := range tfsvs { if tfsv.Type != pdu.FilesystemVersion_Snapshot { continue } creation, err := tfsv.CreationAsTime() if err != nil { - return onErr(u, fmt.Errorf("%s%s has invalid creation date: %s", tfs, tfsv.RelName(), err)) - } - req := pdu.SnapshotReplicationStatusReq{ - Filesystem: tfs.Path, - Snapshot: tfsv.Name, - Op: pdu.SnapshotReplicationStatusReq_Get, - } - res, err := receiver.SnapshotReplicationStatus(ctx, &req) - if err != nil { - GetLogger(ctx). - WithField("req", req.String()). - WithError(err).Error("cannot get snapshot replication status") - } - if err != nil && shouldRetry(err) { - return onErr(u, err) - } else if err != nil { - pfs.err = err - pfs.snaps = nil - break - } - if res.Status == pdu.SnapshotReplicationStatusRes_Nonexistent { - GetLogger(ctx). - Debug("snapshot does not exist in history, assuming was replicated") + pfs.err = fmt.Errorf("%s%s has invalid creation date: %s", tfs, tfsv.RelName(), err) + l.WithError(pfs.err).Error("") + continue fsloop } + atCursor := tfsv.Guid == rc.GetGuid() + preCursor = preCursor && !atCursor pfs.snaps = append(pfs.snaps, snapshot{ - replicated: !(res.Status != pdu.SnapshotReplicationStatusRes_Replicated), + replicated: preCursor || (a.considerSnapAtCursorReplicated && atCursor), date: creation, fsv: tfsv, }) - } - - pfss[i] = pfs + if preCursor { + pfs.err = fmt.Errorf("replication cursor not found in prune target filesystem versions") + l.WithError(pfs.err).Error("") + continue fsloop + } } @@ -324,7 +344,13 @@ func stateExec(a *args, u updater) state { var pfs *fs state := u(func(pruner *Pruner) { if len(pruner.prunePending) == 0 { - pruner.state = Done + nextState := Done + for _, pfs := range pruner.pruneCompleted { + if pfs.err != nil { + nextState = ErrPerm + } + } + pruner.state = nextState return } pfs = pruner.prunePending[0] diff --git a/endpoint/endpoint.go b/endpoint/endpoint.go index c6c5b37..bd3e2f7 100644 --- a/endpoint/endpoint.go +++ b/endpoint/endpoint.go @@ -109,10 +109,7 @@ func (p *Sender) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshots return doDestroySnapshots(ctx, dp, req.Snapshots) } -// Since replication always happens from sender to receiver, this method is only ipmlemented for the sender. -// If this method returns a *zfs.DatasetDoesNotExist as an error, it might be a good indicator -// that something is wrong with the pruning logic, which is the only consumer of this method. -func (p *Sender) SnapshotReplicationStatus(ctx context.Context, req *pdu.SnapshotReplicationStatusReq) (*pdu.SnapshotReplicationStatusRes, error) { +func (p *Sender) ReplicationCursor(ctx context.Context, req *pdu.ReplicationCursorReq) (*pdu.ReplicationCursorRes, error) { dp, err := zfs.NewDatasetPath(req.Filesystem) if err != nil { return nil, err @@ -125,35 +122,25 @@ func (p *Sender) SnapshotReplicationStatus(ctx context.Context, req *pdu.Snapsho return nil, replication.NewFilteredError(req.Filesystem) } - version := zfs.FilesystemVersion{ - Type: zfs.Snapshot, - Name: req.Snapshot, //FIXME validation - } - - var status pdu.SnapshotReplicationStatusRes_Status - switch req.Op { - case pdu.SnapshotReplicationStatusReq_Get: - replicated, err := zfs.ZFSGetReplicatedProperty(dp, &version) - if _, ok := err.(*zfs.DatasetDoesNotExist); ok { - status = pdu.SnapshotReplicationStatusRes_Nonexistent - } else if err != nil { - - } - if replicated { - status = pdu.SnapshotReplicationStatusRes_Replicated - } else { - status = pdu.SnapshotReplicationStatusRes_NotReplicated - } - case pdu.SnapshotReplicationStatusReq_SetReplicated: - err = zfs.ZFSSetReplicatedProperty(dp, &version, true) + switch op := req.Op.(type) { + case *pdu.ReplicationCursorReq_Get: + cursor, err := zfs.ZFSGetReplicationCursor(dp) if err != nil { return nil, err } - status = pdu.SnapshotReplicationStatusRes_Replicated + if cursor == nil { + return &pdu.ReplicationCursorRes{Result: &pdu.ReplicationCursorRes_Error{Error: "cursor does not exist"}}, nil + } + return &pdu.ReplicationCursorRes{Result: &pdu.ReplicationCursorRes_Guid{cursor.Guid}}, nil + case *pdu.ReplicationCursorReq_Set: + guid, err := zfs.ZFSSetReplicationCursor(dp, op.Set.Snapshot) + if err != nil { + return nil, err + } + return &pdu.ReplicationCursorRes{Result: &pdu.ReplicationCursorRes_Guid{Guid: guid}}, nil default: - return nil, errors.Errorf("unknown opcode %v", req.Op) + return nil, errors.Errorf("unknown op %T", op) } - return &pdu.SnapshotReplicationStatusRes{Status: status}, nil } type FSFilter interface { @@ -347,12 +334,12 @@ func doDestroySnapshots(ctx context.Context, lp *zfs.DatasetPath, snaps []*pdu.F // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-= const ( - RPCListFilesystems = "ListFilesystems" - RPCListFilesystemVersions = "ListFilesystemVersions" - RPCReceive = "Receive" - RPCSend = "Send" - RPCSDestroySnapshots = "DestroySnapshots" - RPCSnapshotReplicationStatus = "SnapshotReplicationStatus" + RPCListFilesystems = "ListFilesystems" + RPCListFilesystemVersions = "ListFilesystemVersions" + RPCReceive = "Receive" + RPCSend = "Send" + RPCSDestroySnapshots = "DestroySnapshots" + RPCReplicationCursor = "ReplicationCursor" ) // Remote implements an endpoint stub that uses streamrpc as a transport. @@ -578,18 +565,18 @@ func (a *Handler) Handle(ctx context.Context, endpoint string, reqStructured *by } return bytes.NewBuffer(b), nil, nil - case RPCSnapshotReplicationStatus: + case RPCReplicationCursor: sender, ok := a.ep.(replication.Sender) if !ok { goto Err } - var req pdu.SnapshotReplicationStatusReq + var req pdu.ReplicationCursorReq if err := proto.Unmarshal(reqStructured.Bytes(), &req); err != nil { return nil, nil, err } - res, err := sender.SnapshotReplicationStatus(ctx, &req) + res, err := sender.ReplicationCursor(ctx, &req) if err != nil { return nil, nil, err } diff --git a/replication/fsrep/fsfsm.go b/replication/fsrep/fsfsm.go index 2b992c7..31dcdc1 100644 --- a/replication/fsrep/fsfsm.go +++ b/replication/fsrep/fsfsm.go @@ -42,7 +42,7 @@ type Sender interface { // any next call to the parent github.com/zrepl/zrepl/replication.Endpoint. // If the send request is for dry run the io.ReadCloser will be nil Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.ReadCloser, error) - SnapshotReplicationStatus(ctx context.Context, r *pdu.SnapshotReplicationStatusReq) (*pdu.SnapshotReplicationStatusRes, error) + ReplicationCursor(ctx context.Context, req *pdu.ReplicationCursorReq) (*pdu.ReplicationCursorRes, error) } // A Sender is usually part of a github.com/zrepl/zrepl/replication.Endpoint. @@ -423,19 +423,22 @@ func (s *ReplicationStep) doMarkReplicated(ctx context.Context, sender Sender) S return s.state } - log.Debug("mark snapshot as replicated") - req := pdu.SnapshotReplicationStatusReq{ + log.Debug("advance replication cursor") + req := &pdu.ReplicationCursorReq{ Filesystem: s.parent.fs, - Snapshot: s.to.GetName(), - Op: pdu.SnapshotReplicationStatusReq_SetReplicated, + Op: &pdu.ReplicationCursorReq_Set{ + Set: &pdu.ReplicationCursorReq_SetOp{ + Snapshot: s.to.GetName(), + }, + }, } - res, err := sender.SnapshotReplicationStatus(ctx, &req) + res, err := sender.ReplicationCursor(ctx, req) if err != nil { - log.WithError(err).Error("error marking snapshot as replicated") + log.WithError(err).Error("error advancing replication cursor") return updateStateError(err) } - if res.Status != pdu.SnapshotReplicationStatusRes_Replicated { - err := fmt.Errorf("sender did not report snapshot as replicated: %s", res.Status) + if res.GetError() != "" { + err := fmt.Errorf("cannot advance replication cursor: %s", res.GetError()) log.Error(err.Error()) return updateStateError(err) } diff --git a/replication/internal/diff/diff.go b/replication/internal/diff/diff.go index c76f5de..6af5246 100644 --- a/replication/internal/diff/diff.go +++ b/replication/internal/diff/diff.go @@ -59,10 +59,9 @@ func IncrementalPath(receiver, sender []*FilesystemVersion) (incPath []*Filesyst for mrcaRcv >= 0 && mrcaSnd >= 0 { if receiver[mrcaRcv].Guid == sender[mrcaSnd].Guid { - if mrcaSnd-1 >= 0 && sender[mrcaSnd-1].Guid == sender[mrcaSnd].Guid && sender[mrcaSnd-1].Type == FilesystemVersion_Bookmark { - // prefer bookmarks over snapshots as the snapshot might go away sooner - mrcaSnd -= 1 - } + // Since we arrive from the end of the array, and because we defined bookmark < snapshot, + // this condition will match snapshot first, which is what we want because it gives us + // size estimation break } receiverCreation, err := receiver[mrcaRcv].CreationAsTime() diff --git a/replication/pdu/pdu.pb.go b/replication/pdu/pdu.pb.go index 52f596e..1171e07 100644 --- a/replication/pdu/pdu.pb.go +++ b/replication/pdu/pdu.pb.go @@ -38,56 +38,7 @@ func (x FilesystemVersion_VersionType) String() string { return proto.EnumName(FilesystemVersion_VersionType_name, int32(x)) } func (FilesystemVersion_VersionType) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_pdu_b3a98b3542e9fb4e, []int{5, 0} -} - -type SnapshotReplicationStatusReq_Op int32 - -const ( - SnapshotReplicationStatusReq_Get SnapshotReplicationStatusReq_Op = 0 - SnapshotReplicationStatusReq_SetReplicated SnapshotReplicationStatusReq_Op = 1 -) - -var SnapshotReplicationStatusReq_Op_name = map[int32]string{ - 0: "Get", - 1: "SetReplicated", -} -var SnapshotReplicationStatusReq_Op_value = map[string]int32{ - "Get": 0, - "SetReplicated": 1, -} - -func (x SnapshotReplicationStatusReq_Op) String() string { - return proto.EnumName(SnapshotReplicationStatusReq_Op_name, int32(x)) -} -func (SnapshotReplicationStatusReq_Op) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_pdu_b3a98b3542e9fb4e, []int{14, 0} -} - -type SnapshotReplicationStatusRes_Status int32 - -const ( - SnapshotReplicationStatusRes_Nonexistent SnapshotReplicationStatusRes_Status = 0 - SnapshotReplicationStatusRes_NotReplicated SnapshotReplicationStatusRes_Status = 1 - SnapshotReplicationStatusRes_Replicated SnapshotReplicationStatusRes_Status = 2 -) - -var SnapshotReplicationStatusRes_Status_name = map[int32]string{ - 0: "Nonexistent", - 1: "NotReplicated", - 2: "Replicated", -} -var SnapshotReplicationStatusRes_Status_value = map[string]int32{ - "Nonexistent": 0, - "NotReplicated": 1, - "Replicated": 2, -} - -func (x SnapshotReplicationStatusRes_Status) String() string { - return proto.EnumName(SnapshotReplicationStatusRes_Status_name, int32(x)) -} -func (SnapshotReplicationStatusRes_Status) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_pdu_b3a98b3542e9fb4e, []int{15, 0} + return fileDescriptor_pdu_cbdc4740ab26577c, []int{5, 0} } type ListFilesystemReq struct { @@ -100,7 +51,7 @@ func (m *ListFilesystemReq) Reset() { *m = ListFilesystemReq{} } func (m *ListFilesystemReq) String() string { return proto.CompactTextString(m) } func (*ListFilesystemReq) ProtoMessage() {} func (*ListFilesystemReq) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_b3a98b3542e9fb4e, []int{0} + return fileDescriptor_pdu_cbdc4740ab26577c, []int{0} } func (m *ListFilesystemReq) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ListFilesystemReq.Unmarshal(m, b) @@ -131,7 +82,7 @@ func (m *ListFilesystemRes) Reset() { *m = ListFilesystemRes{} } func (m *ListFilesystemRes) String() string { return proto.CompactTextString(m) } func (*ListFilesystemRes) ProtoMessage() {} func (*ListFilesystemRes) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_b3a98b3542e9fb4e, []int{1} + return fileDescriptor_pdu_cbdc4740ab26577c, []int{1} } func (m *ListFilesystemRes) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ListFilesystemRes.Unmarshal(m, b) @@ -170,7 +121,7 @@ func (m *Filesystem) Reset() { *m = Filesystem{} } func (m *Filesystem) String() string { return proto.CompactTextString(m) } func (*Filesystem) ProtoMessage() {} func (*Filesystem) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_b3a98b3542e9fb4e, []int{2} + return fileDescriptor_pdu_cbdc4740ab26577c, []int{2} } func (m *Filesystem) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_Filesystem.Unmarshal(m, b) @@ -215,7 +166,7 @@ func (m *ListFilesystemVersionsReq) Reset() { *m = ListFilesystemVersion func (m *ListFilesystemVersionsReq) String() string { return proto.CompactTextString(m) } func (*ListFilesystemVersionsReq) ProtoMessage() {} func (*ListFilesystemVersionsReq) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_b3a98b3542e9fb4e, []int{3} + return fileDescriptor_pdu_cbdc4740ab26577c, []int{3} } func (m *ListFilesystemVersionsReq) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ListFilesystemVersionsReq.Unmarshal(m, b) @@ -253,7 +204,7 @@ func (m *ListFilesystemVersionsRes) Reset() { *m = ListFilesystemVersion func (m *ListFilesystemVersionsRes) String() string { return proto.CompactTextString(m) } func (*ListFilesystemVersionsRes) ProtoMessage() {} func (*ListFilesystemVersionsRes) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_b3a98b3542e9fb4e, []int{4} + return fileDescriptor_pdu_cbdc4740ab26577c, []int{4} } func (m *ListFilesystemVersionsRes) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ListFilesystemVersionsRes.Unmarshal(m, b) @@ -295,7 +246,7 @@ func (m *FilesystemVersion) Reset() { *m = FilesystemVersion{} } func (m *FilesystemVersion) String() string { return proto.CompactTextString(m) } func (*FilesystemVersion) ProtoMessage() {} func (*FilesystemVersion) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_b3a98b3542e9fb4e, []int{5} + return fileDescriptor_pdu_cbdc4740ab26577c, []int{5} } func (m *FilesystemVersion) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_FilesystemVersion.Unmarshal(m, b) @@ -375,7 +326,7 @@ func (m *SendReq) Reset() { *m = SendReq{} } func (m *SendReq) String() string { return proto.CompactTextString(m) } func (*SendReq) ProtoMessage() {} func (*SendReq) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_b3a98b3542e9fb4e, []int{6} + return fileDescriptor_pdu_cbdc4740ab26577c, []int{6} } func (m *SendReq) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_SendReq.Unmarshal(m, b) @@ -456,7 +407,7 @@ func (m *Property) Reset() { *m = Property{} } func (m *Property) String() string { return proto.CompactTextString(m) } func (*Property) ProtoMessage() {} func (*Property) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_b3a98b3542e9fb4e, []int{7} + return fileDescriptor_pdu_cbdc4740ab26577c, []int{7} } func (m *Property) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_Property.Unmarshal(m, b) @@ -506,7 +457,7 @@ func (m *SendRes) Reset() { *m = SendRes{} } func (m *SendRes) String() string { return proto.CompactTextString(m) } func (*SendRes) ProtoMessage() {} func (*SendRes) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_b3a98b3542e9fb4e, []int{8} + return fileDescriptor_pdu_cbdc4740ab26577c, []int{8} } func (m *SendRes) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_SendRes.Unmarshal(m, b) @@ -560,7 +511,7 @@ func (m *ReceiveReq) Reset() { *m = ReceiveReq{} } func (m *ReceiveReq) String() string { return proto.CompactTextString(m) } func (*ReceiveReq) ProtoMessage() {} func (*ReceiveReq) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_b3a98b3542e9fb4e, []int{9} + return fileDescriptor_pdu_cbdc4740ab26577c, []int{9} } func (m *ReceiveReq) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ReceiveReq.Unmarshal(m, b) @@ -604,7 +555,7 @@ func (m *ReceiveRes) Reset() { *m = ReceiveRes{} } func (m *ReceiveRes) String() string { return proto.CompactTextString(m) } func (*ReceiveRes) ProtoMessage() {} func (*ReceiveRes) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_b3a98b3542e9fb4e, []int{10} + return fileDescriptor_pdu_cbdc4740ab26577c, []int{10} } func (m *ReceiveRes) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ReceiveRes.Unmarshal(m, b) @@ -637,7 +588,7 @@ func (m *DestroySnapshotsReq) Reset() { *m = DestroySnapshotsReq{} } func (m *DestroySnapshotsReq) String() string { return proto.CompactTextString(m) } func (*DestroySnapshotsReq) ProtoMessage() {} func (*DestroySnapshotsReq) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_b3a98b3542e9fb4e, []int{11} + return fileDescriptor_pdu_cbdc4740ab26577c, []int{11} } func (m *DestroySnapshotsReq) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_DestroySnapshotsReq.Unmarshal(m, b) @@ -683,7 +634,7 @@ func (m *DestroySnapshotRes) Reset() { *m = DestroySnapshotRes{} } func (m *DestroySnapshotRes) String() string { return proto.CompactTextString(m) } func (*DestroySnapshotRes) ProtoMessage() {} func (*DestroySnapshotRes) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_b3a98b3542e9fb4e, []int{12} + return fileDescriptor_pdu_cbdc4740ab26577c, []int{12} } func (m *DestroySnapshotRes) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_DestroySnapshotRes.Unmarshal(m, b) @@ -728,7 +679,7 @@ func (m *DestroySnapshotsRes) Reset() { *m = DestroySnapshotsRes{} } func (m *DestroySnapshotsRes) String() string { return proto.CompactTextString(m) } func (*DestroySnapshotsRes) ProtoMessage() {} func (*DestroySnapshotsRes) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_b3a98b3542e9fb4e, []int{13} + return fileDescriptor_pdu_cbdc4740ab26577c, []int{13} } func (m *DestroySnapshotsRes) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_DestroySnapshotsRes.Unmarshal(m, b) @@ -755,96 +706,361 @@ func (m *DestroySnapshotsRes) GetResults() []*DestroySnapshotRes { return nil } -type SnapshotReplicationStatusReq struct { - Filesystem string `protobuf:"bytes,1,opt,name=Filesystem,proto3" json:"Filesystem,omitempty"` - Snapshot string `protobuf:"bytes,2,opt,name=Snapshot,proto3" json:"Snapshot,omitempty"` - Op SnapshotReplicationStatusReq_Op `protobuf:"varint,3,opt,name=op,proto3,enum=pdu.SnapshotReplicationStatusReq_Op" json:"op,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` +type ReplicationCursorReq struct { + Filesystem string `protobuf:"bytes,1,opt,name=Filesystem,proto3" json:"Filesystem,omitempty"` + // Types that are valid to be assigned to Op: + // *ReplicationCursorReq_Get + // *ReplicationCursorReq_Set + Op isReplicationCursorReq_Op `protobuf_oneof:"op"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } -func (m *SnapshotReplicationStatusReq) Reset() { *m = SnapshotReplicationStatusReq{} } -func (m *SnapshotReplicationStatusReq) String() string { return proto.CompactTextString(m) } -func (*SnapshotReplicationStatusReq) ProtoMessage() {} -func (*SnapshotReplicationStatusReq) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_b3a98b3542e9fb4e, []int{14} +func (m *ReplicationCursorReq) Reset() { *m = ReplicationCursorReq{} } +func (m *ReplicationCursorReq) String() string { return proto.CompactTextString(m) } +func (*ReplicationCursorReq) ProtoMessage() {} +func (*ReplicationCursorReq) Descriptor() ([]byte, []int) { + return fileDescriptor_pdu_cbdc4740ab26577c, []int{14} } -func (m *SnapshotReplicationStatusReq) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_SnapshotReplicationStatusReq.Unmarshal(m, b) +func (m *ReplicationCursorReq) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ReplicationCursorReq.Unmarshal(m, b) } -func (m *SnapshotReplicationStatusReq) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_SnapshotReplicationStatusReq.Marshal(b, m, deterministic) +func (m *ReplicationCursorReq) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ReplicationCursorReq.Marshal(b, m, deterministic) } -func (dst *SnapshotReplicationStatusReq) XXX_Merge(src proto.Message) { - xxx_messageInfo_SnapshotReplicationStatusReq.Merge(dst, src) +func (dst *ReplicationCursorReq) XXX_Merge(src proto.Message) { + xxx_messageInfo_ReplicationCursorReq.Merge(dst, src) } -func (m *SnapshotReplicationStatusReq) XXX_Size() int { - return xxx_messageInfo_SnapshotReplicationStatusReq.Size(m) +func (m *ReplicationCursorReq) XXX_Size() int { + return xxx_messageInfo_ReplicationCursorReq.Size(m) } -func (m *SnapshotReplicationStatusReq) XXX_DiscardUnknown() { - xxx_messageInfo_SnapshotReplicationStatusReq.DiscardUnknown(m) +func (m *ReplicationCursorReq) XXX_DiscardUnknown() { + xxx_messageInfo_ReplicationCursorReq.DiscardUnknown(m) } -var xxx_messageInfo_SnapshotReplicationStatusReq proto.InternalMessageInfo +var xxx_messageInfo_ReplicationCursorReq proto.InternalMessageInfo -func (m *SnapshotReplicationStatusReq) GetFilesystem() string { +func (m *ReplicationCursorReq) GetFilesystem() string { if m != nil { return m.Filesystem } return "" } -func (m *SnapshotReplicationStatusReq) GetSnapshot() string { +type isReplicationCursorReq_Op interface { + isReplicationCursorReq_Op() +} + +type ReplicationCursorReq_Get struct { + Get *ReplicationCursorReq_GetOp `protobuf:"bytes,2,opt,name=get,proto3,oneof"` +} + +type ReplicationCursorReq_Set struct { + Set *ReplicationCursorReq_SetOp `protobuf:"bytes,3,opt,name=set,proto3,oneof"` +} + +func (*ReplicationCursorReq_Get) isReplicationCursorReq_Op() {} + +func (*ReplicationCursorReq_Set) isReplicationCursorReq_Op() {} + +func (m *ReplicationCursorReq) GetOp() isReplicationCursorReq_Op { + if m != nil { + return m.Op + } + return nil +} + +func (m *ReplicationCursorReq) GetGet() *ReplicationCursorReq_GetOp { + if x, ok := m.GetOp().(*ReplicationCursorReq_Get); ok { + return x.Get + } + return nil +} + +func (m *ReplicationCursorReq) GetSet() *ReplicationCursorReq_SetOp { + if x, ok := m.GetOp().(*ReplicationCursorReq_Set); ok { + return x.Set + } + return nil +} + +// XXX_OneofFuncs is for the internal use of the proto package. +func (*ReplicationCursorReq) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) { + return _ReplicationCursorReq_OneofMarshaler, _ReplicationCursorReq_OneofUnmarshaler, _ReplicationCursorReq_OneofSizer, []interface{}{ + (*ReplicationCursorReq_Get)(nil), + (*ReplicationCursorReq_Set)(nil), + } +} + +func _ReplicationCursorReq_OneofMarshaler(msg proto.Message, b *proto.Buffer) error { + m := msg.(*ReplicationCursorReq) + // op + switch x := m.Op.(type) { + case *ReplicationCursorReq_Get: + b.EncodeVarint(2<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.Get); err != nil { + return err + } + case *ReplicationCursorReq_Set: + b.EncodeVarint(3<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.Set); err != nil { + return err + } + case nil: + default: + return fmt.Errorf("ReplicationCursorReq.Op has unexpected type %T", x) + } + return nil +} + +func _ReplicationCursorReq_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) { + m := msg.(*ReplicationCursorReq) + switch tag { + case 2: // op.get + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(ReplicationCursorReq_GetOp) + err := b.DecodeMessage(msg) + m.Op = &ReplicationCursorReq_Get{msg} + return true, err + case 3: // op.set + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(ReplicationCursorReq_SetOp) + err := b.DecodeMessage(msg) + m.Op = &ReplicationCursorReq_Set{msg} + return true, err + default: + return false, nil + } +} + +func _ReplicationCursorReq_OneofSizer(msg proto.Message) (n int) { + m := msg.(*ReplicationCursorReq) + // op + switch x := m.Op.(type) { + case *ReplicationCursorReq_Get: + s := proto.Size(x.Get) + n += 1 // tag and wire + n += proto.SizeVarint(uint64(s)) + n += s + case *ReplicationCursorReq_Set: + s := proto.Size(x.Set) + n += 1 // tag and wire + n += proto.SizeVarint(uint64(s)) + n += s + case nil: + default: + panic(fmt.Sprintf("proto: unexpected type %T in oneof", x)) + } + return n +} + +type ReplicationCursorReq_GetOp struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ReplicationCursorReq_GetOp) Reset() { *m = ReplicationCursorReq_GetOp{} } +func (m *ReplicationCursorReq_GetOp) String() string { return proto.CompactTextString(m) } +func (*ReplicationCursorReq_GetOp) ProtoMessage() {} +func (*ReplicationCursorReq_GetOp) Descriptor() ([]byte, []int) { + return fileDescriptor_pdu_cbdc4740ab26577c, []int{14, 0} +} +func (m *ReplicationCursorReq_GetOp) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ReplicationCursorReq_GetOp.Unmarshal(m, b) +} +func (m *ReplicationCursorReq_GetOp) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ReplicationCursorReq_GetOp.Marshal(b, m, deterministic) +} +func (dst *ReplicationCursorReq_GetOp) XXX_Merge(src proto.Message) { + xxx_messageInfo_ReplicationCursorReq_GetOp.Merge(dst, src) +} +func (m *ReplicationCursorReq_GetOp) XXX_Size() int { + return xxx_messageInfo_ReplicationCursorReq_GetOp.Size(m) +} +func (m *ReplicationCursorReq_GetOp) XXX_DiscardUnknown() { + xxx_messageInfo_ReplicationCursorReq_GetOp.DiscardUnknown(m) +} + +var xxx_messageInfo_ReplicationCursorReq_GetOp proto.InternalMessageInfo + +type ReplicationCursorReq_SetOp struct { + Snapshot string `protobuf:"bytes,2,opt,name=Snapshot,proto3" json:"Snapshot,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ReplicationCursorReq_SetOp) Reset() { *m = ReplicationCursorReq_SetOp{} } +func (m *ReplicationCursorReq_SetOp) String() string { return proto.CompactTextString(m) } +func (*ReplicationCursorReq_SetOp) ProtoMessage() {} +func (*ReplicationCursorReq_SetOp) Descriptor() ([]byte, []int) { + return fileDescriptor_pdu_cbdc4740ab26577c, []int{14, 1} +} +func (m *ReplicationCursorReq_SetOp) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ReplicationCursorReq_SetOp.Unmarshal(m, b) +} +func (m *ReplicationCursorReq_SetOp) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ReplicationCursorReq_SetOp.Marshal(b, m, deterministic) +} +func (dst *ReplicationCursorReq_SetOp) XXX_Merge(src proto.Message) { + xxx_messageInfo_ReplicationCursorReq_SetOp.Merge(dst, src) +} +func (m *ReplicationCursorReq_SetOp) XXX_Size() int { + return xxx_messageInfo_ReplicationCursorReq_SetOp.Size(m) +} +func (m *ReplicationCursorReq_SetOp) XXX_DiscardUnknown() { + xxx_messageInfo_ReplicationCursorReq_SetOp.DiscardUnknown(m) +} + +var xxx_messageInfo_ReplicationCursorReq_SetOp proto.InternalMessageInfo + +func (m *ReplicationCursorReq_SetOp) GetSnapshot() string { if m != nil { return m.Snapshot } return "" } -func (m *SnapshotReplicationStatusReq) GetOp() SnapshotReplicationStatusReq_Op { +type ReplicationCursorRes struct { + // Types that are valid to be assigned to Result: + // *ReplicationCursorRes_Guid + // *ReplicationCursorRes_Error + Result isReplicationCursorRes_Result `protobuf_oneof:"Result"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ReplicationCursorRes) Reset() { *m = ReplicationCursorRes{} } +func (m *ReplicationCursorRes) String() string { return proto.CompactTextString(m) } +func (*ReplicationCursorRes) ProtoMessage() {} +func (*ReplicationCursorRes) Descriptor() ([]byte, []int) { + return fileDescriptor_pdu_cbdc4740ab26577c, []int{15} +} +func (m *ReplicationCursorRes) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ReplicationCursorRes.Unmarshal(m, b) +} +func (m *ReplicationCursorRes) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ReplicationCursorRes.Marshal(b, m, deterministic) +} +func (dst *ReplicationCursorRes) XXX_Merge(src proto.Message) { + xxx_messageInfo_ReplicationCursorRes.Merge(dst, src) +} +func (m *ReplicationCursorRes) XXX_Size() int { + return xxx_messageInfo_ReplicationCursorRes.Size(m) +} +func (m *ReplicationCursorRes) XXX_DiscardUnknown() { + xxx_messageInfo_ReplicationCursorRes.DiscardUnknown(m) +} + +var xxx_messageInfo_ReplicationCursorRes proto.InternalMessageInfo + +type isReplicationCursorRes_Result interface { + isReplicationCursorRes_Result() +} + +type ReplicationCursorRes_Guid struct { + Guid uint64 `protobuf:"varint,1,opt,name=Guid,proto3,oneof"` +} + +type ReplicationCursorRes_Error struct { + Error string `protobuf:"bytes,2,opt,name=Error,proto3,oneof"` +} + +func (*ReplicationCursorRes_Guid) isReplicationCursorRes_Result() {} + +func (*ReplicationCursorRes_Error) isReplicationCursorRes_Result() {} + +func (m *ReplicationCursorRes) GetResult() isReplicationCursorRes_Result { if m != nil { - return m.Op + return m.Result } - return SnapshotReplicationStatusReq_Get + return nil } -type SnapshotReplicationStatusRes struct { - Status SnapshotReplicationStatusRes_Status `protobuf:"varint,1,opt,name=status,proto3,enum=pdu.SnapshotReplicationStatusRes_Status" json:"status,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *SnapshotReplicationStatusRes) Reset() { *m = SnapshotReplicationStatusRes{} } -func (m *SnapshotReplicationStatusRes) String() string { return proto.CompactTextString(m) } -func (*SnapshotReplicationStatusRes) ProtoMessage() {} -func (*SnapshotReplicationStatusRes) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_b3a98b3542e9fb4e, []int{15} -} -func (m *SnapshotReplicationStatusRes) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_SnapshotReplicationStatusRes.Unmarshal(m, b) -} -func (m *SnapshotReplicationStatusRes) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_SnapshotReplicationStatusRes.Marshal(b, m, deterministic) -} -func (dst *SnapshotReplicationStatusRes) XXX_Merge(src proto.Message) { - xxx_messageInfo_SnapshotReplicationStatusRes.Merge(dst, src) -} -func (m *SnapshotReplicationStatusRes) XXX_Size() int { - return xxx_messageInfo_SnapshotReplicationStatusRes.Size(m) -} -func (m *SnapshotReplicationStatusRes) XXX_DiscardUnknown() { - xxx_messageInfo_SnapshotReplicationStatusRes.DiscardUnknown(m) -} - -var xxx_messageInfo_SnapshotReplicationStatusRes proto.InternalMessageInfo - -func (m *SnapshotReplicationStatusRes) GetStatus() SnapshotReplicationStatusRes_Status { - if m != nil { - return m.Status +func (m *ReplicationCursorRes) GetGuid() uint64 { + if x, ok := m.GetResult().(*ReplicationCursorRes_Guid); ok { + return x.Guid } - return SnapshotReplicationStatusRes_Nonexistent + return 0 +} + +func (m *ReplicationCursorRes) GetError() string { + if x, ok := m.GetResult().(*ReplicationCursorRes_Error); ok { + return x.Error + } + return "" +} + +// XXX_OneofFuncs is for the internal use of the proto package. +func (*ReplicationCursorRes) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) { + return _ReplicationCursorRes_OneofMarshaler, _ReplicationCursorRes_OneofUnmarshaler, _ReplicationCursorRes_OneofSizer, []interface{}{ + (*ReplicationCursorRes_Guid)(nil), + (*ReplicationCursorRes_Error)(nil), + } +} + +func _ReplicationCursorRes_OneofMarshaler(msg proto.Message, b *proto.Buffer) error { + m := msg.(*ReplicationCursorRes) + // Result + switch x := m.Result.(type) { + case *ReplicationCursorRes_Guid: + b.EncodeVarint(1<<3 | proto.WireVarint) + b.EncodeVarint(uint64(x.Guid)) + case *ReplicationCursorRes_Error: + b.EncodeVarint(2<<3 | proto.WireBytes) + b.EncodeStringBytes(x.Error) + case nil: + default: + return fmt.Errorf("ReplicationCursorRes.Result has unexpected type %T", x) + } + return nil +} + +func _ReplicationCursorRes_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) { + m := msg.(*ReplicationCursorRes) + switch tag { + case 1: // Result.Guid + if wire != proto.WireVarint { + return true, proto.ErrInternalBadWireType + } + x, err := b.DecodeVarint() + m.Result = &ReplicationCursorRes_Guid{x} + return true, err + case 2: // Result.Error + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + x, err := b.DecodeStringBytes() + m.Result = &ReplicationCursorRes_Error{x} + return true, err + default: + return false, nil + } +} + +func _ReplicationCursorRes_OneofSizer(msg proto.Message) (n int) { + m := msg.(*ReplicationCursorRes) + // Result + switch x := m.Result.(type) { + case *ReplicationCursorRes_Guid: + n += 1 // tag and wire + n += proto.SizeVarint(uint64(x.Guid)) + case *ReplicationCursorRes_Error: + n += 1 // tag and wire + n += proto.SizeVarint(uint64(len(x.Error))) + n += len(x.Error) + case nil: + default: + panic(fmt.Sprintf("proto: unexpected type %T in oneof", x)) + } + return n } func init() { @@ -862,57 +1078,57 @@ func init() { proto.RegisterType((*DestroySnapshotsReq)(nil), "pdu.DestroySnapshotsReq") proto.RegisterType((*DestroySnapshotRes)(nil), "pdu.DestroySnapshotRes") proto.RegisterType((*DestroySnapshotsRes)(nil), "pdu.DestroySnapshotsRes") - proto.RegisterType((*SnapshotReplicationStatusReq)(nil), "pdu.SnapshotReplicationStatusReq") - proto.RegisterType((*SnapshotReplicationStatusRes)(nil), "pdu.SnapshotReplicationStatusRes") + proto.RegisterType((*ReplicationCursorReq)(nil), "pdu.ReplicationCursorReq") + proto.RegisterType((*ReplicationCursorReq_GetOp)(nil), "pdu.ReplicationCursorReq.GetOp") + proto.RegisterType((*ReplicationCursorReq_SetOp)(nil), "pdu.ReplicationCursorReq.SetOp") + proto.RegisterType((*ReplicationCursorRes)(nil), "pdu.ReplicationCursorRes") proto.RegisterEnum("pdu.FilesystemVersion_VersionType", FilesystemVersion_VersionType_name, FilesystemVersion_VersionType_value) - proto.RegisterEnum("pdu.SnapshotReplicationStatusReq_Op", SnapshotReplicationStatusReq_Op_name, SnapshotReplicationStatusReq_Op_value) - proto.RegisterEnum("pdu.SnapshotReplicationStatusRes_Status", SnapshotReplicationStatusRes_Status_name, SnapshotReplicationStatusRes_Status_value) } -func init() { proto.RegisterFile("pdu.proto", fileDescriptor_pdu_b3a98b3542e9fb4e) } +func init() { proto.RegisterFile("pdu.proto", fileDescriptor_pdu_cbdc4740ab26577c) } -var fileDescriptor_pdu_b3a98b3542e9fb4e = []byte{ - // 666 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x55, 0xcf, 0x6e, 0xd3, 0x4e, - 0x10, 0xfe, 0xd9, 0x49, 0xf3, 0x67, 0xd2, 0xa6, 0xe9, 0xb6, 0xea, 0xcf, 0x54, 0x15, 0x8a, 0x56, - 0x1c, 0x02, 0x12, 0x91, 0x08, 0x15, 0x17, 0x38, 0xa0, 0xfe, 0xe5, 0x80, 0xda, 0x6a, 0x13, 0xaa, - 0x9e, 0x90, 0x4c, 0x3d, 0x52, 0xad, 0x24, 0xde, 0xed, 0xee, 0x1a, 0x35, 0x3c, 0x00, 0x8f, 0xc1, - 0x43, 0x70, 0xe3, 0x4d, 0x78, 0x1c, 0xe4, 0x89, 0xed, 0xb8, 0x49, 0x09, 0x39, 0x65, 0xbe, 0x6f, - 0x66, 0x67, 0xbe, 0x99, 0xdd, 0x71, 0xa0, 0xae, 0x82, 0xb8, 0xab, 0xb4, 0xb4, 0x92, 0x95, 0x54, - 0x10, 0xf3, 0x6d, 0xd8, 0xfa, 0x18, 0x1a, 0x7b, 0x1a, 0x8e, 0xd0, 0x4c, 0x8c, 0xc5, 0xb1, 0xc0, - 0x3b, 0x7e, 0xba, 0x48, 0x1a, 0xf6, 0x0a, 0x1a, 0x33, 0xc2, 0x78, 0x4e, 0xbb, 0xd4, 0x69, 0xf4, - 0x36, 0xbb, 0x49, 0xbe, 0x42, 0x60, 0x31, 0x86, 0x1f, 0x02, 0xcc, 0x20, 0x63, 0x50, 0xbe, 0xf4, - 0xed, 0xad, 0xe7, 0xb4, 0x9d, 0x4e, 0x5d, 0x90, 0xcd, 0xda, 0xd0, 0x10, 0x68, 0xe2, 0x31, 0x0e, - 0xe4, 0x10, 0x23, 0xcf, 0x25, 0x57, 0x91, 0xe2, 0x6f, 0xe1, 0xc9, 0x43, 0x2d, 0x57, 0xa8, 0x4d, - 0x28, 0x23, 0x23, 0xf0, 0x8e, 0x3d, 0x2d, 0x16, 0x48, 0x13, 0x17, 0x18, 0x7e, 0xf1, 0xf7, 0xc3, - 0x86, 0xf5, 0xa0, 0x96, 0xc1, 0xb4, 0x9b, 0xdd, 0xb9, 0x6e, 0x52, 0xb7, 0xc8, 0xe3, 0xf8, 0x6f, - 0x07, 0xb6, 0x16, 0xfc, 0xec, 0x0d, 0x94, 0x07, 0x13, 0x85, 0x24, 0xa0, 0xd9, 0xe3, 0x8f, 0x67, - 0xe9, 0xa6, 0xbf, 0x49, 0xa4, 0xa0, 0xf8, 0x64, 0x22, 0xe7, 0xfe, 0x18, 0xd3, 0xb6, 0xc9, 0x4e, - 0xb8, 0xb3, 0x38, 0x0c, 0xbc, 0x52, 0xdb, 0xe9, 0x94, 0x05, 0xd9, 0x6c, 0x1f, 0xea, 0x47, 0x1a, - 0x7d, 0x8b, 0x83, 0xeb, 0x33, 0xaf, 0x4c, 0x8e, 0x19, 0xc1, 0xf6, 0xa0, 0x46, 0x20, 0x94, 0x91, - 0xb7, 0x46, 0x99, 0x72, 0xcc, 0x9f, 0x43, 0xa3, 0x50, 0x96, 0xad, 0x43, 0xad, 0x1f, 0xf9, 0xca, - 0xdc, 0x4a, 0xdb, 0xfa, 0x2f, 0x41, 0x87, 0x52, 0x0e, 0xc7, 0xbe, 0x1e, 0xb6, 0x1c, 0xfe, 0xcb, - 0x81, 0x6a, 0x1f, 0xa3, 0x60, 0x85, 0xb9, 0x26, 0x22, 0x4f, 0xb5, 0x1c, 0x67, 0xc2, 0x13, 0x9b, - 0x35, 0xc1, 0x1d, 0x48, 0x92, 0x5d, 0x17, 0xee, 0x40, 0xce, 0x5f, 0x6d, 0x79, 0xe1, 0x6a, 0x49, - 0xb8, 0x1c, 0x2b, 0x8d, 0xc6, 0x90, 0xf0, 0x9a, 0xc8, 0x31, 0xdb, 0x81, 0xb5, 0x63, 0x0c, 0x62, - 0xe5, 0x55, 0xc8, 0x31, 0x05, 0x6c, 0x17, 0x2a, 0xc7, 0x7a, 0x22, 0xe2, 0xc8, 0xab, 0x12, 0x9d, - 0x22, 0x7e, 0x00, 0xb5, 0x4b, 0x2d, 0x15, 0x6a, 0x3b, 0xc9, 0x87, 0xea, 0x14, 0x86, 0xba, 0x03, - 0x6b, 0x57, 0xfe, 0x28, 0xce, 0x26, 0x3d, 0x05, 0xfc, 0x7b, 0xde, 0xb1, 0x61, 0x1d, 0xd8, 0xfc, - 0x64, 0x30, 0x28, 0x2a, 0x76, 0xa8, 0xc4, 0x3c, 0xcd, 0x38, 0xac, 0x9f, 0xdc, 0x2b, 0xbc, 0xb1, - 0x18, 0xf4, 0xc3, 0x6f, 0xd3, 0x94, 0x25, 0xf1, 0x80, 0x63, 0x2f, 0x01, 0x52, 0x3d, 0x21, 0x1a, - 0xaf, 0x44, 0x8f, 0x6b, 0x83, 0x9e, 0x45, 0x26, 0x53, 0x14, 0x02, 0xf8, 0x35, 0x80, 0xc0, 0x1b, - 0x0c, 0xbf, 0xe2, 0x2a, 0xc3, 0x7f, 0x01, 0xad, 0xa3, 0x11, 0xfa, 0x7a, 0x7e, 0x71, 0x6a, 0x62, - 0x81, 0xe7, 0xeb, 0x85, 0xcc, 0x86, 0x0f, 0x61, 0xfb, 0x18, 0x8d, 0xd5, 0x72, 0x92, 0xbd, 0x82, - 0x55, 0xb6, 0x88, 0x1d, 0x40, 0x3d, 0x8f, 0xf7, 0xdc, 0xa5, 0x9b, 0x32, 0x0b, 0xe4, 0x9f, 0x81, - 0xcd, 0x15, 0x4b, 0x97, 0x2e, 0x83, 0x54, 0x69, 0xc9, 0xd2, 0x65, 0x71, 0xc9, 0xed, 0x9d, 0x68, - 0x2d, 0x75, 0x76, 0x7b, 0x04, 0xf8, 0x87, 0xc7, 0x9a, 0x49, 0x3e, 0x53, 0xd5, 0x64, 0x00, 0x23, - 0x9b, 0x2d, 0xf5, 0xff, 0x94, 0x7f, 0x51, 0x8a, 0xc8, 0xe2, 0xf8, 0x4f, 0x07, 0xf6, 0x67, 0x0e, - 0x35, 0x0a, 0x6f, 0x68, 0x79, 0xfa, 0xd6, 0xb7, 0xf1, 0x4a, 0x03, 0xda, 0x2b, 0x34, 0x35, 0xd5, - 0x38, 0x13, 0x7f, 0x00, 0xae, 0x54, 0xb4, 0x16, 0xcd, 0xde, 0x33, 0x92, 0xb2, 0xac, 0x54, 0xf7, - 0x42, 0x09, 0x57, 0x2a, 0xde, 0x06, 0xf7, 0x42, 0xb1, 0x2a, 0x94, 0xce, 0x30, 0xd9, 0xd4, 0x2d, - 0xd8, 0xe8, 0x63, 0x7e, 0x00, 0x83, 0x96, 0xc3, 0x7f, 0x2c, 0x17, 0x6d, 0xd8, 0x7b, 0xa8, 0x18, - 0x02, 0xe9, 0x67, 0xa9, 0xf3, 0xaf, 0xe2, 0xa6, 0x9b, 0x5a, 0xe9, 0x39, 0xfe, 0x0e, 0x2a, 0x53, - 0x86, 0x6d, 0x42, 0xe3, 0x5c, 0x46, 0x78, 0x1f, 0x1a, 0x8b, 0x51, 0x2a, 0xe8, 0x5c, 0x3e, 0x10, - 0xc4, 0x9a, 0xc9, 0x53, 0xcb, 0xb1, 0xfb, 0xa5, 0x42, 0xff, 0x32, 0xaf, 0xff, 0x04, 0x00, 0x00, - 0xff, 0xff, 0x3a, 0x00, 0x8b, 0x6e, 0x72, 0x06, 0x00, 0x00, +var fileDescriptor_pdu_cbdc4740ab26577c = []byte{ + // 657 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x55, 0xcb, 0x6e, 0xdb, 0x3a, + 0x10, 0xb5, 0x6c, 0xd9, 0x96, 0xc7, 0xb9, 0x79, 0x30, 0x41, 0xae, 0x6e, 0x70, 0x71, 0xaf, 0xc1, + 0x6e, 0xdc, 0x02, 0x35, 0x50, 0x27, 0xe8, 0xa6, 0x3b, 0xe7, 0xe5, 0x45, 0x91, 0x04, 0xb4, 0x1b, + 0x64, 0x55, 0x40, 0x8d, 0x06, 0x8d, 0xe0, 0x07, 0x15, 0x92, 0x2a, 0xea, 0x7e, 0x40, 0xff, 0xa9, + 0xff, 0xd1, 0x45, 0x3f, 0xa7, 0xe0, 0x58, 0x92, 0x15, 0xdb, 0x0d, 0xbc, 0x32, 0xcf, 0xf0, 0x70, + 0xe6, 0xcc, 0xa1, 0x86, 0x86, 0x46, 0x1c, 0x26, 0x9d, 0x58, 0x49, 0x23, 0x59, 0x25, 0x0e, 0x13, + 0xbe, 0x0f, 0x7b, 0xef, 0x23, 0x6d, 0x2e, 0xa2, 0x31, 0xea, 0x99, 0x36, 0x38, 0x11, 0xf8, 0xc8, + 0x2f, 0x56, 0x83, 0x9a, 0xbd, 0x81, 0xe6, 0x22, 0xa0, 0x7d, 0xa7, 0x55, 0x69, 0x37, 0xbb, 0x3b, + 0x1d, 0x9b, 0xaf, 0x40, 0x2c, 0x72, 0x78, 0x0f, 0x60, 0x01, 0x19, 0x03, 0xf7, 0x26, 0x30, 0x0f, + 0xbe, 0xd3, 0x72, 0xda, 0x0d, 0x41, 0x6b, 0xd6, 0x82, 0xa6, 0x40, 0x9d, 0x4c, 0x70, 0x28, 0x47, + 0x38, 0xf5, 0xcb, 0xb4, 0x55, 0x0c, 0xf1, 0x77, 0xf0, 0xcf, 0x53, 0x2d, 0xb7, 0xa8, 0x74, 0x24, + 0xa7, 0x5a, 0xe0, 0x23, 0xfb, 0xaf, 0x58, 0x20, 0x4d, 0x5c, 0x88, 0xf0, 0xeb, 0x3f, 0x1f, 0xd6, + 0xac, 0x0b, 0x5e, 0x06, 0xd3, 0x6e, 0x0e, 0x97, 0xba, 0x49, 0xb7, 0x45, 0xce, 0xe3, 0xbf, 0x1c, + 0xd8, 0x5b, 0xd9, 0x67, 0x6f, 0xc1, 0x1d, 0xce, 0x62, 0x24, 0x01, 0xdb, 0x5d, 0xbe, 0x3e, 0x4b, + 0x27, 0xfd, 0xb5, 0x4c, 0x41, 0x7c, 0xeb, 0xc8, 0x55, 0x30, 0xc1, 0xb4, 0x6d, 0x5a, 0xdb, 0xd8, + 0x65, 0x12, 0x85, 0x7e, 0xa5, 0xe5, 0xb4, 0x5d, 0x41, 0x6b, 0xf6, 0x2f, 0x34, 0x4e, 0x15, 0x06, + 0x06, 0x87, 0x77, 0x97, 0xbe, 0x4b, 0x1b, 0x8b, 0x00, 0x3b, 0x02, 0x8f, 0x40, 0x24, 0xa7, 0x7e, + 0x95, 0x32, 0xe5, 0x98, 0xbf, 0x84, 0x66, 0xa1, 0x2c, 0xdb, 0x02, 0x6f, 0x30, 0x0d, 0x62, 0xfd, + 0x20, 0xcd, 0x6e, 0xc9, 0xa2, 0x9e, 0x94, 0xa3, 0x49, 0xa0, 0x46, 0xbb, 0x0e, 0xff, 0xe1, 0x40, + 0x7d, 0x80, 0xd3, 0x70, 0x03, 0x5f, 0xad, 0xc8, 0x0b, 0x25, 0x27, 0x99, 0x70, 0xbb, 0x66, 0xdb, + 0x50, 0x1e, 0x4a, 0x92, 0xdd, 0x10, 0xe5, 0xa1, 0x5c, 0xbe, 0x5a, 0x77, 0xe5, 0x6a, 0x49, 0xb8, + 0x9c, 0xc4, 0x0a, 0xb5, 0x26, 0xe1, 0x9e, 0xc8, 0x31, 0x3b, 0x80, 0xea, 0x19, 0x86, 0x49, 0xec, + 0xd7, 0x68, 0x63, 0x0e, 0xd8, 0x21, 0xd4, 0xce, 0xd4, 0x4c, 0x24, 0x53, 0xbf, 0x4e, 0xe1, 0x14, + 0xf1, 0x13, 0xf0, 0x6e, 0x94, 0x8c, 0x51, 0x99, 0x59, 0x6e, 0xaa, 0x53, 0x30, 0xf5, 0x00, 0xaa, + 0xb7, 0xc1, 0x38, 0xc9, 0x9c, 0x9e, 0x03, 0xfe, 0x3d, 0xef, 0x58, 0xb3, 0x36, 0xec, 0x7c, 0xd0, + 0x18, 0x16, 0x15, 0x3b, 0x54, 0x62, 0x39, 0xcc, 0x38, 0x6c, 0x9d, 0x7f, 0x8d, 0xf1, 0xde, 0x60, + 0x38, 0x88, 0xbe, 0xcd, 0x53, 0x56, 0xc4, 0x93, 0x18, 0x7b, 0x0d, 0x90, 0xea, 0x89, 0x50, 0xfb, + 0x15, 0xfa, 0xb8, 0xfe, 0xa2, 0xcf, 0x22, 0x93, 0x29, 0x0a, 0x04, 0x7e, 0x07, 0x20, 0xf0, 0x1e, + 0xa3, 0x2f, 0xb8, 0x89, 0xf9, 0xaf, 0x60, 0xf7, 0x74, 0x8c, 0x81, 0x5a, 0x1e, 0x1c, 0x4f, 0xac, + 0xc4, 0xf9, 0x56, 0x21, 0xb3, 0xe6, 0x23, 0xd8, 0x3f, 0x43, 0x6d, 0x94, 0x9c, 0x65, 0x5f, 0xc1, + 0x26, 0x53, 0xc4, 0x4e, 0xa0, 0x91, 0xf3, 0xfd, 0xf2, 0xb3, 0x93, 0xb2, 0x20, 0xf2, 0x8f, 0xc0, + 0x96, 0x8a, 0xa5, 0x43, 0x97, 0x41, 0xaa, 0xf4, 0xcc, 0xd0, 0x65, 0x3c, 0x7b, 0x7b, 0xe7, 0x4a, + 0x49, 0x95, 0xdd, 0x1e, 0x01, 0xde, 0x5f, 0xd7, 0x8c, 0x7d, 0xa6, 0xea, 0xd6, 0x80, 0xb1, 0xc9, + 0x86, 0xfa, 0x6f, 0xca, 0xbf, 0x2a, 0x45, 0x64, 0x3c, 0xfe, 0xd3, 0x81, 0x03, 0x81, 0xf1, 0x38, + 0xba, 0xa7, 0xa1, 0x39, 0x4d, 0x94, 0x96, 0x6a, 0x13, 0x63, 0x8e, 0xa1, 0xf2, 0x19, 0x0d, 0xc9, + 0x6a, 0x76, 0xff, 0xa7, 0x3a, 0xeb, 0xf2, 0x74, 0x2e, 0xd1, 0x5c, 0xc7, 0xfd, 0x92, 0xb0, 0x6c, + 0x7b, 0x48, 0xa3, 0xa1, 0x41, 0x79, 0xf6, 0xd0, 0x20, 0x3b, 0xa4, 0xd1, 0x1c, 0xd5, 0xa1, 0x4a, + 0x49, 0x8e, 0x5e, 0x40, 0x95, 0x36, 0xec, 0xf0, 0xe4, 0x46, 0xce, 0x7d, 0xc9, 0x71, 0xcf, 0x85, + 0xb2, 0x8c, 0xf9, 0xd5, 0xda, 0xae, 0xec, 0x68, 0xcd, 0x5f, 0x18, 0xdb, 0x8f, 0xdb, 0x2f, 0xa5, + 0x6f, 0xcc, 0xe1, 0x13, 0x93, 0xfb, 0xa5, 0xd4, 0xe6, 0x9e, 0x07, 0xb5, 0xb9, 0x4f, 0x9f, 0x6a, + 0xf4, 0xb7, 0x71, 0xfc, 0x3b, 0x00, 0x00, 0xff, 0xff, 0x02, 0x35, 0xe7, 0x43, 0x43, 0x06, 0x00, + 0x00, } diff --git a/replication/pdu/pdu.proto b/replication/pdu/pdu.proto index 1238243..d07902c 100644 --- a/replication/pdu/pdu.proto +++ b/replication/pdu/pdu.proto @@ -98,21 +98,21 @@ message DestroySnapshotsRes { repeated DestroySnapshotRes Results = 1; } -message SnapshotReplicationStatusReq { +message ReplicationCursorReq { string Filesystem = 1; - string Snapshot = 2; - enum Op { - Get = 0; - SetReplicated = 1; + message GetOp {} + message SetOp { + string Snapshot = 2; + } + oneof op { + GetOp get = 2; + SetOp set = 3; } - Op op = 3; } -message SnapshotReplicationStatusRes { - enum Status { - Nonexistent = 0; - NotReplicated = 1; - Replicated = 2; +message ReplicationCursorRes { + oneof Result { + uint64 Guid = 1; + string Error = 2; } - Status status = 1; } diff --git a/zfs/replication_history.go b/zfs/replication_history.go index c650451..c5b46c3 100644 --- a/zfs/replication_history.go +++ b/zfs/replication_history.go @@ -1,25 +1,58 @@ package zfs -const ReplicatedProperty = "zrepl:replicated" +import ( + "fmt" + "github.com/pkg/errors" + "strconv" +) -// May return *DatasetDoesNotExist as an error -func ZFSGetReplicatedProperty(fs *DatasetPath, v *FilesystemVersion) (replicated bool, err error) { - props, err := zfsGet(v.ToAbsPath(fs), []string{ReplicatedProperty}) +const ReplicationCursorBookmarkName = "zrepl_replication_cursor" + +// may return nil for both values, indicating there is no cursor +func ZFSGetReplicationCursor(fs *DatasetPath) (*FilesystemVersion, error) { + versions, err := ZFSListFilesystemVersions(fs, nil) if err != nil { - return false, err + return nil, err } - if props.Get(ReplicatedProperty) == "yes" { - return true, nil + for _, v := range versions { + if v.Type == Bookmark && v.Name == ReplicationCursorBookmarkName { + return &v, nil + } } - return false, nil + return nil, nil } -func ZFSSetReplicatedProperty(fs *DatasetPath, v *FilesystemVersion, replicated bool) error { - val := "no" - if replicated { - val = "yes" +func ZFSSetReplicationCursor(fs *DatasetPath, snapname string) (guid uint64, err error) { + snapPath := fmt.Sprintf("%s@%s", fs.ToString(), snapname) + propsSnap, err := zfsGet(snapPath, []string{"createtxg", "guid"}) + if err != nil { + return 0, err } - props := NewZFSProperties() - props.Set(ReplicatedProperty, val) - return zfsSet(v.ToAbsPath(fs), props) + snapGuid, err := strconv.ParseUint(propsSnap.Get("guid"), 10, 64) + bookmarkPath := fmt.Sprintf("%s#%s", fs.ToString(), ReplicationCursorBookmarkName) + propsBookmark, err := zfsGet(bookmarkPath, []string{"createtxg"}) + _, bookmarkNotExistErr := err.(*DatasetDoesNotExist) + if err != nil && !bookmarkNotExistErr { + return 0, err + } + if err == nil { + bookmarkTxg, err := strconv.ParseUint(propsBookmark.Get("createtxg"), 10, 64) + if err != nil { + return 0, errors.Wrap(err, "cannot parse bookmark createtxg") + } + snapTxg, err := strconv.ParseUint(propsSnap.Get("createtxg"), 10, 64) + if err != nil { + return 0, errors.Wrap(err, "cannot parse snapshot createtxg") + } + if snapTxg < bookmarkTxg { + return 0, errors.New("replication cursor can only be advanced, not set back") + } + if err := ZFSDestroy(bookmarkPath); err != nil { // FIXME make safer by using new temporary bookmark, then rename, possible with channel programs + return 0, err + } + } + if err := ZFSBookmark(fs, snapname, ReplicationCursorBookmarkName); err != nil { + return 0, err + } + return snapGuid, nil } diff --git a/zfs/zfs.go b/zfs/zfs.go index 9c4d6b7..22a4d49 100644 --- a/zfs/zfs.go +++ b/zfs/zfs.go @@ -490,7 +490,7 @@ func ZFSGet(fs *DatasetPath, props []string) (*ZFSProperties, error) { return zfsGet(fs.ToString(), props) } -var zfsGetDatasetDoesNotExistRegexp = regexp.MustCompile(`^cannot open '(\S+)': dataset does not exist`) +var zfsGetDatasetDoesNotExistRegexp = regexp.MustCompile(`^cannot open '(\S+)': (dataset does not exist|no such pool or dataset)`) type DatasetDoesNotExist struct { Path string