From 4a0104a44f6a689311c3b367ef9301a4ae69b693 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 29 May 2020 00:09:43 +0200 Subject: [PATCH] WIP endpoint abstractions + pruning integration / pruner rewrite --- config/config.go | 6 + daemon/pruner.v2/fsstate_enumer.go | 72 +++ daemon/pruner.v2/pruner.go | 454 ++++++++++++++++++ daemon/pruner.v2/pruner_side.go | 27 ++ daemon/pruner.v2/snapstate_enumer.go | 70 +++ daemon/pruner.v2/state_enumer.go | 71 +++ daemon/pruner/pruner.go | 80 +-- endpoint/endpoint.go | 7 + endpoint/endpoint_send_abstractions_cache.go | 18 + endpoint/endpoint_zfs_abstraction_pdu.go | 28 ++ platformtest/tests/batchDestroy.go | 37 +- platformtest/tests/generated_cases.go | 1 + platformtest/tests/pruner.go.deact | 331 +++++++++++++ platformtest/tests/pruner.v2.go | 137 ++++++ .../tests/undestroyableSnapshotParsing.go | 9 +- pruning/keep_grid.go | 25 +- pruning/keep_helpers.go | 11 +- pruning/keep_last_n.go | 8 +- pruning/keep_not_replicated.go | 4 +- pruning/keep_regex.go | 8 +- pruning/keep_step_holds.go | 44 ++ pruning/pruning.go | 82 +++- replication/logic/pdu/pdu.pb.go | 255 +++++++--- replication/logic/pdu/pdu.proto | 23 +- zfs/versions.go | 2 + zfs/zfs.go | 26 + 26 files changed, 1672 insertions(+), 164 deletions(-) create mode 100644 daemon/pruner.v2/fsstate_enumer.go create mode 100644 daemon/pruner.v2/pruner.go create mode 100644 daemon/pruner.v2/pruner_side.go create mode 100644 daemon/pruner.v2/snapstate_enumer.go create mode 100644 daemon/pruner.v2/state_enumer.go create mode 100644 endpoint/endpoint_zfs_abstraction_pdu.go create mode 100644 platformtest/tests/pruner.go.deact create mode 100644 platformtest/tests/pruner.v2.go create mode 100644 pruning/keep_step_holds.go diff --git a/config/config.go b/config/config.go index 81c3104..ea91a42 100644 --- a/config/config.go +++ b/config/config.go @@ -309,6 +309,11 @@ type PruneKeepNotReplicated struct { KeepSnapshotAtCursor bool `yaml:"keep_snapshot_at_cursor,optional,default=true"` } +type PruneKeepStepHolds struct { + Type string `yaml:"type"` + AdditionalJobIds []string `yaml:"additional_job_ids,optional"` +} + type PruneKeepLastN struct { Type string `yaml:"type"` Count int `yaml:"count"` @@ -480,6 +485,7 @@ func (t *ServeEnum) UnmarshalYAML(u func(interface{}, bool) error) (err error) { func (t *PruningEnum) UnmarshalYAML(u func(interface{}, bool) error) (err error) { t.Ret, err = enumUnmarshal(u, map[string]interface{}{ + "step_holds": &PruneKeepStepHolds{}, "not_replicated": &PruneKeepNotReplicated{}, "last_n": &PruneKeepLastN{}, "grid": &PruneGrid{}, diff --git a/daemon/pruner.v2/fsstate_enumer.go b/daemon/pruner.v2/fsstate_enumer.go new file mode 100644 index 0000000..5933d72 --- /dev/null +++ b/daemon/pruner.v2/fsstate_enumer.go @@ -0,0 +1,72 @@ +// Code generated by "enumer -type=FSState -json"; DO NOT EDIT. + +// +package pruner + +import ( + "encoding/json" + "fmt" +) + +const _FSStateName = "FSStateInitializedFSStatePlanningFSStatePlanErrFSStateExecutingFSStateExecuteErrFSStateExecuteSuccess" + +var _FSStateIndex = [...]uint8{0, 18, 33, 47, 63, 80, 101} + +func (i FSState) String() string { + if i < 0 || i >= FSState(len(_FSStateIndex)-1) { + return fmt.Sprintf("FSState(%d)", i) + } + return _FSStateName[_FSStateIndex[i]:_FSStateIndex[i+1]] +} + +var _FSStateValues = []FSState{0, 1, 2, 3, 4, 5} + +var _FSStateNameToValueMap = map[string]FSState{ + _FSStateName[0:18]: 0, + _FSStateName[18:33]: 1, + _FSStateName[33:47]: 2, + _FSStateName[47:63]: 3, + _FSStateName[63:80]: 4, + _FSStateName[80:101]: 5, +} + +// FSStateString retrieves an enum value from the enum constants string name. +// Throws an error if the param is not part of the enum. +func FSStateString(s string) (FSState, error) { + if val, ok := _FSStateNameToValueMap[s]; ok { + return val, nil + } + return 0, fmt.Errorf("%s does not belong to FSState values", s) +} + +// FSStateValues returns all values of the enum +func FSStateValues() []FSState { + return _FSStateValues +} + +// IsAFSState returns "true" if the value is listed in the enum definition. "false" otherwise +func (i FSState) IsAFSState() bool { + for _, v := range _FSStateValues { + if i == v { + return true + } + } + return false +} + +// MarshalJSON implements the json.Marshaler interface for FSState +func (i FSState) MarshalJSON() ([]byte, error) { + return json.Marshal(i.String()) +} + +// UnmarshalJSON implements the json.Unmarshaler interface for FSState +func (i *FSState) UnmarshalJSON(data []byte) error { + var s string + if err := json.Unmarshal(data, &s); err != nil { + return fmt.Errorf("FSState should be a string, got %s", data) + } + + var err error + *i, err = FSStateString(s) + return err +} diff --git a/daemon/pruner.v2/pruner.go b/daemon/pruner.v2/pruner.go new file mode 100644 index 0000000..00f6aa1 --- /dev/null +++ b/daemon/pruner.v2/pruner.go @@ -0,0 +1,454 @@ +package pruner + +import ( + "context" + "encoding/json" + "fmt" + "os" + + "github.com/pkg/errors" + "github.com/zrepl/zrepl/daemon/logging" + "github.com/zrepl/zrepl/daemon/logging/trace" + "github.com/zrepl/zrepl/endpoint" + "github.com/zrepl/zrepl/pruning" + "github.com/zrepl/zrepl/zfs" +) + +type Pruner struct { + fsfilter endpoint.FSFilter + jid endpoint.JobID + side Side + keepRules []pruning.KeepRule + + // all channels consumed by the run loop + reportReqs chan reportRequest + stopReqs chan stopRequest + done chan struct{} + fsListRes chan fsListRes + + state State + + listFilesystemsError error // only in state StateListFilesystemsError + fsPruners []*FSPruner // only in state StateFanOutFilesystems +} + +//go:generate enumer -type=State -json +type State int + +const ( + StateInitialized State = iota + StateListFilesystems + StateListFilesystemsError + StateFanOutFilesystems + StateDone +) + +type Report struct { + State State + ListFilesystemsError error // only valid in StateListFilesystemsError + Filesystems []*FSReport // valid from StateFanOutFilesystems +} + +type reportRequest struct { + ctx context.Context + reply chan *Report +} + +type runRequest struct { + complete chan struct{} +} + +type stopRequest struct { + complete chan struct{} +} + +type fsListRes struct { + filesystems []*zfs.DatasetPath + err error +} + +type Side interface { + // may return both nil, indicating there is no replication position + GetReplicationPosition(ctx context.Context, fs string) (*zfs.FilesystemVersion, error) + isSide() Side +} + +func NewPruner(fsfilter endpoint.FSFilter, jid endpoint.JobID, side Side, keepRules []pruning.KeepRule) *Pruner { + return &Pruner{ + fsfilter, + jid, + side, + keepRules, + make(chan reportRequest), + make(chan stopRequest), + make(chan struct{}), + make(chan fsListRes), + StateInitialized, + nil, + nil, + } +} + +func (p *Pruner) Run(ctx context.Context) *Report { + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + if p.state != StateInitialized { + panic("Run can onl[y be called once") + } + + go func() { + fss, err := zfs.ZFSListMapping(ctx, p.fsfilter) + p.fsListRes <- fsListRes{fss, err} + }() + + for { + select { + case res := <-p.fsListRes: + if res.err != nil { + p.state = StateListFilesystemsError + p.listFilesystemsError = res.err + close(p.done) + continue + } + + p.state = StateFanOutFilesystems + + p.fsPruners = make([]*FSPruner, len(res.filesystems)) + _, add, end := trace.WithTaskGroup(ctx, "pruner-fan-out-fs") + for i, fs := range res.filesystems { + p.fsPruners[i] = NewFSPruner(p.jid, p.side, p.keepRules, fs) + add(func(ctx context.Context) { + p.fsPruners[i].Run(ctx) + }) + } + go func() { + end() + close(p.done) + }() + + case req := <-p.stopReqs: + cancel() + go func() { + <-p.done + close(req.complete) + }() + case req := <-p.reportReqs: + req.reply <- p.report(req.ctx) + case <-p.done: + p.state = StateDone + return p.report(ctx) + } + } +} + +func (p *Pruner) Report(ctx context.Context) *Report { + req := reportRequest{ + ctx: ctx, + reply: make(chan *Report, 1), + } + select { + case p.reportReqs <- req: + return <-req.reply + case <-ctx.Done(): + return nil + case <-p.done: + return nil + } +} + +func (p *Pruner) report(ctx context.Context) *Report { + fsreports := make([]*FSReport, len(p.fsPruners)) + for i := range fsreports { + fsreports[i] = p.fsPruners[i].report() + } + return &Report{ + State: p.state, + ListFilesystemsError: p.listFilesystemsError, + Filesystems: fsreports, + } +} + +// implements pruning.Snapshot +type snapshot struct { + replicated bool + stepHolds []pruning.StepHold + zfs.FilesystemVersion + + state SnapState + destroyOp *zfs.DestroySnapOp +} + +//go:generate enumer -type=SnapState -json +type SnapState int + +const ( + SnapStateInitialized SnapState = iota + SnapStateKeeping + SnapStateDeletePending + SnapStateDeleteAttempted +) + +// implements pruning.StepHold +type stepHold struct { + endpoint.Abstraction +} + +func (s snapshot) Replicated() bool { return s.replicated } +func (s snapshot) StepHolds() []pruning.StepHold { return s.stepHolds } + +func (s stepHold) GetJobID() endpoint.JobID { return *s.Abstraction.GetJobID() } + +type FSPruner struct { + jid endpoint.JobID + side Side + keepRules []pruning.KeepRule + fsp *zfs.DatasetPath + + state FSState + + // all channels consumed by the run loop + planned chan fsPlanRes + executed chan fsExecuteRes + done chan struct{} + reportReqs chan fsReportReq + + keepList []*snapshot // valid in FSStateExecuting and forward + destroyList []*snapshot // valid in FSStateExecuting and forward, field .destroyOp is invalid until FSStateExecuting is left + +} + +type fsPlanRes struct { + keepList []*snapshot + destroyList []*snapshot + err error +} + +type fsExecuteRes struct { + completedDestroyOps []*zfs.DestroySnapOp // same len() as FSPruner.destroyList +} + +type fsReportReq struct { + res chan *FSReport +} + +type FSReport struct { + State FSState + KeepList []*SnapReport + Destroy []*SnapReport +} + +type SnapReport struct { + State SnapState + Name string + Replicated bool + StepHoldCount int + DestroyError error +} + +//go:generate enumer -type=FSState -json +type FSState int + +const ( + FSStateInitialized FSState = iota + FSStatePlanning + FSStatePlanErr + FSStateExecuting + FSStateExecuteErr + FSStateExecuteSuccess +) + +func (s FSState) IsTerminal() bool { + return s == FSStatePlanErr || s == FSStateExecuteErr || s == FSStateExecuteSuccess +} + +func NewFSPruner(jid endpoint.JobID, side Side, keepRules []pruning.KeepRule, fsp *zfs.DatasetPath) *FSPruner { + return &FSPruner{ + jid, side, keepRules, fsp, + FSStateInitialized, + make(chan fsPlanRes), + make(chan fsExecuteRes), + make(chan struct{}), + make(chan fsReportReq), + nil, nil, + } +} + +func (p *FSPruner) Run(ctx context.Context) *FSReport { + + defer func() { + }() + + p.state = FSStatePlanning + + go func() { p.planned <- p.plan(ctx) }() + +out: + for !p.state.IsTerminal() { + select { + case res := <-p.planned: + + if res.err != nil { + p.state = FSStatePlanErr + continue + } + p.state = FSStateExecuting + p.keepList = res.keepList + p.destroyList = res.destroyList + + go func() { p.executed <- p.execute(ctx, p.destroyList) }() + + case res := <-p.executed: + + if len(res.completedDestroyOps) != len(p.destroyList) { + panic("impl error: completedDestroyOps is a vector corresponding to entries in p.destroyList") + } + + var erronous []*zfs.DestroySnapOp + for i, op := range res.completedDestroyOps { + if *op.ErrOut != nil { + erronous = append(erronous, op) + } + p.destroyList[i].destroyOp = op + p.destroyList[i].state = SnapStateDeleteAttempted + } + if len(erronous) > 0 { + p.state = FSStateExecuteErr + } else { + p.state = FSStateExecuteSuccess + } + + close(p.done) + + case <-p.reportReqs: + panic("unimp") + case <-p.done: + break out + } + } + + // TODO render last FS report + return nil +} + +func (p *FSPruner) plan(ctx context.Context) fsPlanRes { + fs := p.fsp.ToString() + vs, err := zfs.ZFSListFilesystemVersions(ctx, p.fsp, zfs.ListFilesystemVersionsOptions{}) + if err != nil { + return fsPlanRes{err: errors.Wrap(err, "list filesystem versions")} + } + + allJobsStepHolds, absErrs, err := endpoint.ListAbstractions(ctx, endpoint.ListZFSHoldsAndBookmarksQuery{ + FS: endpoint.ListZFSHoldsAndBookmarksQueryFilesystemFilter{ + FS: &fs, + }, + What: endpoint.AbstractionTypeSet{ + endpoint.AbstractionStepHold: true, + }, + Concurrency: 1, + }) + if err != nil { + return fsPlanRes{err: errors.Wrap(err, "list abstractions")} + } + if len(absErrs) > 0 { + logging.GetLogger(ctx, logging.SubsysPruning).WithError(endpoint.ListAbstractionsErrors(absErrs)). + Error("error listing some step holds, prune attempt might fail with 'dataset is busy' errors") + } + + repPos, err := p.side.GetReplicationPosition(ctx, p.fsp.ToString()) + if err != nil { + return fsPlanRes{err: errors.Wrap(err, "get replication position")} + } + + vsAsSnaps := make([]pruning.Snapshot, len(vs)) + for i := range vs { + var repPosCreateTxgOrZero uint64 + if repPos != nil { + repPosCreateTxgOrZero = repPos.GetCreateTXG() + } + s := &snapshot{ + state: SnapStateInitialized, + FilesystemVersion: vs[i], + replicated: vs[i].GetCreateTXG() <= repPosCreateTxgOrZero, + } + for _, h := range allJobsStepHolds { + if zfs.FilesystemVersionEqualIdentity(vs[i], h.GetFilesystemVersion()) { + s.stepHolds = append(s.stepHolds, stepHold{h}) + } + } + vsAsSnaps[i] = s + } + + downcastToSnapshots := func(l []pruning.Snapshot) (r []*snapshot) { + r = make([]*snapshot, len(l)) + for i, e := range l { + r[i] = e.(*snapshot) + } + return r + } + pruningResult := pruning.PruneSnapshots(vsAsSnaps, p.keepRules) + remove, keep := downcastToSnapshots(pruningResult.Remove), downcastToSnapshots(pruningResult.Keep) + if len(remove)+len(keep) != len(vsAsSnaps) { + for _, s := range vsAsSnaps { + r, _ := json.MarshalIndent(s.(*snapshot).report(), "", " ") + fmt.Fprintf(os.Stderr, "%s\n", string(r)) + } + panic("indecisive") + } + + for _, s := range remove { + s.state = SnapStateDeletePending + } + for _, s := range keep { + s.state = SnapStateKeeping + } + + return fsPlanRes{keepList: keep, destroyList: remove, err: nil} +} + +func (p *FSPruner) execute(ctx context.Context, destroyList []*snapshot) fsExecuteRes { + ops := make([]*zfs.DestroySnapOp, len(destroyList)) + for i, fsv := range p.destroyList { + ops[i] = &zfs.DestroySnapOp{ + Filesystem: p.fsp.ToString(), + Name: fsv.GetName(), + ErrOut: new(error), + } + } + zfs.ZFSDestroyFilesystemVersions(ctx, ops) + + return fsExecuteRes{completedDestroyOps: ops} +} + +func (p *FSPruner) report() *FSReport { + return &FSReport{ + State: p.state, + KeepList: p.reportRenderSnapReports(p.keepList), + Destroy: p.reportRenderSnapReports(p.destroyList), + } +} + +func (p *FSPruner) reportRenderSnapReports(l []*snapshot) (r []*SnapReport) { + r = make([]*SnapReport, len(l)) + for i := range l { + r[i] = l[i].report() + } + return r +} + +func (s *snapshot) report() *SnapReport { + var snapErr error + if s.state == SnapStateDeleteAttempted { + if *s.destroyOp.ErrOut != nil { + snapErr = (*s.destroyOp.ErrOut) + } + } + return &SnapReport{ + State: s.state, + Name: s.Name, + Replicated: s.Replicated(), + StepHoldCount: len(s.stepHolds), + DestroyError: snapErr, + } +} diff --git a/daemon/pruner.v2/pruner_side.go b/daemon/pruner.v2/pruner_side.go new file mode 100644 index 0000000..a653da9 --- /dev/null +++ b/daemon/pruner.v2/pruner_side.go @@ -0,0 +1,27 @@ +package pruner + +import ( + "context" + + "github.com/zrepl/zrepl/endpoint" + "github.com/zrepl/zrepl/zfs" +) + +type SideSender struct { + jobID endpoint.JobID +} + +func NewSideSender(jid endpoint.JobID) *SideSender { + return &SideSender{jid} +} + +func (s *SideSender) isSide() Side { return nil } + +var _ Side = (*SideSender)(nil) + +func (s *SideSender) GetReplicationPosition(ctx context.Context, fs string) (*zfs.FilesystemVersion, error) { + if fs == "" { + panic("must not pass zero value for fs") + } + return endpoint.GetMostRecentReplicationCursorOfJob(ctx, fs, s.jobID) +} diff --git a/daemon/pruner.v2/snapstate_enumer.go b/daemon/pruner.v2/snapstate_enumer.go new file mode 100644 index 0000000..1818a31 --- /dev/null +++ b/daemon/pruner.v2/snapstate_enumer.go @@ -0,0 +1,70 @@ +// Code generated by "enumer -type=SnapState -json"; DO NOT EDIT. + +// +package pruner + +import ( + "encoding/json" + "fmt" +) + +const _SnapStateName = "SnapStateInitializedSnapStateKeepingSnapStateDeletePendingSnapStateDeleteAttempted" + +var _SnapStateIndex = [...]uint8{0, 20, 36, 58, 82} + +func (i SnapState) String() string { + if i < 0 || i >= SnapState(len(_SnapStateIndex)-1) { + return fmt.Sprintf("SnapState(%d)", i) + } + return _SnapStateName[_SnapStateIndex[i]:_SnapStateIndex[i+1]] +} + +var _SnapStateValues = []SnapState{0, 1, 2, 3} + +var _SnapStateNameToValueMap = map[string]SnapState{ + _SnapStateName[0:20]: 0, + _SnapStateName[20:36]: 1, + _SnapStateName[36:58]: 2, + _SnapStateName[58:82]: 3, +} + +// SnapStateString retrieves an enum value from the enum constants string name. +// Throws an error if the param is not part of the enum. +func SnapStateString(s string) (SnapState, error) { + if val, ok := _SnapStateNameToValueMap[s]; ok { + return val, nil + } + return 0, fmt.Errorf("%s does not belong to SnapState values", s) +} + +// SnapStateValues returns all values of the enum +func SnapStateValues() []SnapState { + return _SnapStateValues +} + +// IsASnapState returns "true" if the value is listed in the enum definition. "false" otherwise +func (i SnapState) IsASnapState() bool { + for _, v := range _SnapStateValues { + if i == v { + return true + } + } + return false +} + +// MarshalJSON implements the json.Marshaler interface for SnapState +func (i SnapState) MarshalJSON() ([]byte, error) { + return json.Marshal(i.String()) +} + +// UnmarshalJSON implements the json.Unmarshaler interface for SnapState +func (i *SnapState) UnmarshalJSON(data []byte) error { + var s string + if err := json.Unmarshal(data, &s); err != nil { + return fmt.Errorf("SnapState should be a string, got %s", data) + } + + var err error + *i, err = SnapStateString(s) + return err +} diff --git a/daemon/pruner.v2/state_enumer.go b/daemon/pruner.v2/state_enumer.go new file mode 100644 index 0000000..04ef777 --- /dev/null +++ b/daemon/pruner.v2/state_enumer.go @@ -0,0 +1,71 @@ +// Code generated by "enumer -type=State -json"; DO NOT EDIT. + +// +package pruner + +import ( + "encoding/json" + "fmt" +) + +const _StateName = "StateInitializedStateListFilesystemsStateListFilesystemsErrorStateFanOutFilesystemsStateDone" + +var _StateIndex = [...]uint8{0, 16, 36, 61, 83, 92} + +func (i State) String() string { + if i < 0 || i >= State(len(_StateIndex)-1) { + return fmt.Sprintf("State(%d)", i) + } + return _StateName[_StateIndex[i]:_StateIndex[i+1]] +} + +var _StateValues = []State{0, 1, 2, 3, 4} + +var _StateNameToValueMap = map[string]State{ + _StateName[0:16]: 0, + _StateName[16:36]: 1, + _StateName[36:61]: 2, + _StateName[61:83]: 3, + _StateName[83:92]: 4, +} + +// StateString retrieves an enum value from the enum constants string name. +// Throws an error if the param is not part of the enum. +func StateString(s string) (State, error) { + if val, ok := _StateNameToValueMap[s]; ok { + return val, nil + } + return 0, fmt.Errorf("%s does not belong to State values", s) +} + +// StateValues returns all values of the enum +func StateValues() []State { + return _StateValues +} + +// IsAState returns "true" if the value is listed in the enum definition. "false" otherwise +func (i State) IsAState() bool { + for _, v := range _StateValues { + if i == v { + return true + } + } + return false +} + +// MarshalJSON implements the json.Marshaler interface for State +func (i State) MarshalJSON() ([]byte, error) { + return json.Marshal(i.String()) +} + +// UnmarshalJSON implements the json.Unmarshaler interface for State +func (i *State) UnmarshalJSON(data []byte) error { + var s string + if err := json.Unmarshal(data, &s); err != nil { + return fmt.Errorf("State should be a string, got %s", data) + } + + var err error + *i, err = StateString(s) + return err +} diff --git a/daemon/pruner/pruner.go b/daemon/pruner/pruner.go index 3816483..1084187 100644 --- a/daemon/pruner/pruner.go +++ b/daemon/pruner/pruner.go @@ -20,15 +20,14 @@ import ( ) // Try to keep it compatible with github.com/zrepl/zrepl/endpoint.Endpoint -type History interface { - ReplicationCursor(ctx context.Context, req *pdu.ReplicationCursorReq) (*pdu.ReplicationCursorRes, error) +type Endpoint interface { ListFilesystems(ctx context.Context, req *pdu.ListFilesystemReq) (*pdu.ListFilesystemRes, error) + ListFilesystemVersions(ctx context.Context, req *pdu.ListFilesystemVersionsReq) (*pdu.ListFilesystemVersionsRes, error) } // Try to keep it compatible with github.com/zrepl/zrepl/endpoint.Endpoint type Target interface { - ListFilesystems(ctx context.Context, req *pdu.ListFilesystemReq) (*pdu.ListFilesystemRes, error) - ListFilesystemVersions(ctx context.Context, req *pdu.ListFilesystemVersionsReq) (*pdu.ListFilesystemVersionsRes, error) + Endpoint DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshotsReq) (*pdu.DestroySnapshotsRes, error) } @@ -46,13 +45,14 @@ func GetLogger(ctx context.Context) Logger { } type args struct { - ctx context.Context - target Target - receiver History - rules []pruning.KeepRule - retryWait time.Duration - considerSnapAtCursorReplicated bool - promPruneSecs prometheus.Observer + ctx context.Context + target Target + sender, receiver Endpoint + rules []pruning.KeepRule + retryWait time.Duration + considerSnapAtCursorReplicated bool + convertAnyStepHoldToStepBookmark bool + promPruneSecs prometheus.Observer } type Pruner struct { @@ -70,11 +70,12 @@ type Pruner struct { } type PrunerFactory struct { - senderRules []pruning.KeepRule - receiverRules []pruning.KeepRule - retryWait time.Duration - considerSnapAtCursorReplicated bool - promPruneSecs *prometheus.HistogramVec + senderRules []pruning.KeepRule + receiverRules []pruning.KeepRule + retryWait time.Duration + considerSnapAtCursorReplicated bool + convertAnyStepHoldToStepBookmark bool + promPruneSecs *prometheus.HistogramVec } type LocalPrunerFactory struct { @@ -122,25 +123,35 @@ func NewPrunerFactory(in config.PruningSenderReceiver, promPruneSecs *prometheus } considerSnapAtCursorReplicated = considerSnapAtCursorReplicated || !knr.KeepSnapshotAtCursor } + + convertAnyStepHoldToStepBookmark := false + for _, r := range in.KeepSender { + _, ok := r.Ret.(*config.PruneKeepStepHolds) + convertAnyStepHoldToStepBookmark = convertAnyStepHoldToStepBookmark || ok + } + f := &PrunerFactory{ - senderRules: keepRulesSender, - receiverRules: keepRulesReceiver, - retryWait: envconst.Duration("ZREPL_PRUNER_RETRY_INTERVAL", 10*time.Second), - considerSnapAtCursorReplicated: considerSnapAtCursorReplicated, - promPruneSecs: promPruneSecs, + senderRules: keepRulesSender, + receiverRules: keepRulesReceiver, + retryWait: envconst.Duration("ZREPL_PRUNER_RETRY_INTERVAL", 10*time.Second), + considerSnapAtCursorReplicated: considerSnapAtCursorReplicated, + convertAnyStepHoldToStepBookmark: convertAnyStepHoldToStepBookmark, + promPruneSecs: promPruneSecs, } return f, nil } -func (f *PrunerFactory) BuildSenderPruner(ctx context.Context, target Target, receiver History) *Pruner { +func (f *PrunerFactory) BuildSenderPruner(ctx context.Context, sender Target, receiver Endpoint) *Pruner { p := &Pruner{ args: args{ context.WithValue(ctx, contextKeyPruneSide, "sender"), - target, + sender, + sender, receiver, f.senderRules, f.retryWait, f.considerSnapAtCursorReplicated, + f.convertAnyStepHoldToStepBookmark, f.promPruneSecs.WithLabelValues("sender"), }, state: Plan, @@ -148,15 +159,17 @@ func (f *PrunerFactory) BuildSenderPruner(ctx context.Context, target Target, re return p } -func (f *PrunerFactory) BuildReceiverPruner(ctx context.Context, target Target, receiver History) *Pruner { +func (f *PrunerFactory) BuildReceiverPruner(ctx context.Context, receiver Target, sender Endpoint) *Pruner { p := &Pruner{ args: args{ context.WithValue(ctx, contextKeyPruneSide, "receiver"), - target, + receiver, + sender, receiver, f.receiverRules, f.retryWait, false, // senseless here anyways + false, // senseless here anyways f.promPruneSecs.WithLabelValues("receiver"), }, state: Plan, @@ -164,15 +177,17 @@ func (f *PrunerFactory) BuildReceiverPruner(ctx context.Context, target Target, return p } -func (f *LocalPrunerFactory) BuildLocalPruner(ctx context.Context, target Target, receiver History) *Pruner { +func (f *LocalPrunerFactory) BuildLocalPruner(ctx context.Context, target Target) *Pruner { p := &Pruner{ args: args{ context.WithValue(ctx, contextKeyPruneSide, "local"), target, - receiver, + target, + target, f.keepRules, f.retryWait, false, // considerSnapAtCursorReplicated is not relevant for local pruning + false, // convertAnyStepHoldToStepBookmark is not relevant for local pruning f.promPruneSecs.WithLabelValues("local"), }, state: Plan, @@ -341,11 +356,13 @@ func (s snapshot) Replicated() bool { return s.replicated } func (s snapshot) Date() time.Time { return s.date } +func (s snapshot) CreateTXG() uint64 { return s.fsv.GetCreateTXG() } + func doOneAttempt(a *args, u updater) { - ctx, target, receiver := a.ctx, a.target, a.receiver + ctx, sender, receiver, target := a.ctx, a.sender, a.receiver, a.target - sfssres, err := receiver.ListFilesystems(ctx, &pdu.ListFilesystemReq{}) + sfssres, err := sender.ListFilesystems(ctx, &pdu.ListFilesystemReq{}) if err != nil { u(func(p *Pruner) { p.state = PlanErr @@ -407,6 +424,10 @@ tfss_loop: pfs.snaps = make([]pruning.Snapshot, 0, len(tfsvs)) + receiver.ListFilesystemVersions(ctx, &pdu.ListFilesystemVersionsReq{ + Filesystem: tfs.Path, + }) + rcReq := &pdu.ReplicationCursorReq{ Filesystem: tfs.Path, } @@ -415,6 +436,7 @@ tfss_loop: pfsPlanErrAndLog(err, "cannot get replication cursor bookmark") continue tfss_loop } + if rc.GetNotexist() { err := errors.New("replication cursor bookmark does not exist (one successful replication is required before pruning works)") pfsPlanErrAndLog(err, "") diff --git a/endpoint/endpoint.go b/endpoint/endpoint.go index fa64621..6d1c988 100644 --- a/endpoint/endpoint.go +++ b/endpoint/endpoint.go @@ -112,6 +112,13 @@ func (s *Sender) ListFilesystemVersions(ctx context.Context, r *pdu.ListFilesyst for i := range fsvs { rfsvs[i] = pdu.FilesystemVersionFromZFS(&fsvs[i]) } + + sendAbstractions := sendAbstractionsCacheSingleton.GetByFS(ctx, lp.ToString()) + rSabsInfo := make([]*pdu.SendAbstraction, len(sendAbstractions)) + for i := range rSabsInfo { + rSabsInfo[i] = SendAbstractionToPDU(sendAbstractions[i]) + } + res := &pdu.ListFilesystemVersionsRes{Versions: rfsvs} return res, nil diff --git a/endpoint/endpoint_send_abstractions_cache.go b/endpoint/endpoint_send_abstractions_cache.go index 9bfe653..2ccbab5 100644 --- a/endpoint/endpoint_send_abstractions_cache.go +++ b/endpoint/endpoint_send_abstractions_cache.go @@ -82,6 +82,24 @@ func (s *sendAbstractionsCache) InvalidateFSCache(fs string) { } +func (s *sendAbstractionsCache) GetByFS(ctx context.Context, fs string) (ret []Abstraction) { + defer s.mtx.Lock().Unlock() + defer trace.WithSpanFromStackUpdateCtx(&ctx)() + if fs == "" { + panic("must not pass zero-value fs") + } + + s.tryLoadOnDiskSendAbstractions(ctx, fs) + + for _, a := range s.abstractions { + if a.GetFS() == fs { + ret = append(ret, a) + } + } + + return ret +} + // - logs errors in getting on-disk abstractions // - only fetches on-disk abstractions once, but every time from the in-memory store // diff --git a/endpoint/endpoint_zfs_abstraction_pdu.go b/endpoint/endpoint_zfs_abstraction_pdu.go new file mode 100644 index 0000000..f4280d2 --- /dev/null +++ b/endpoint/endpoint_zfs_abstraction_pdu.go @@ -0,0 +1,28 @@ +package endpoint + +import "github.com/zrepl/zrepl/replication/logic/pdu" + +func SendAbstractionToPDU(a Abstraction) *pdu.SendAbstraction { + var ty pdu.SendAbstraction_SendAbstractionType + switch a.GetType() { + case AbstractionLastReceivedHold: + panic(a) + case AbstractionReplicationCursorBookmarkV1: + panic(a) + case AbstractionReplicationCursorBookmarkV2: + ty = pdu.SendAbstraction_ReplicationCursorV2 + case AbstractionStepHold: + ty = pdu.SendAbstraction_StepHold + case AbstractionStepBookmark: + ty = pdu.SendAbstraction_StepBookmark + default: + panic(a) + } + + version := a.GetFilesystemVersion() + return &pdu.SendAbstraction{ + Type: ty, + JobID: (*a.GetJobID()).String(), + Version: pdu.FilesystemVersionFromZFS(&version), + } +} diff --git a/platformtest/tests/batchDestroy.go b/platformtest/tests/batchDestroy.go index d65ef7d..7eeb80a 100644 --- a/platformtest/tests/batchDestroy.go +++ b/platformtest/tests/batchDestroy.go @@ -2,8 +2,9 @@ package tests import ( "fmt" - "strings" + "github.com/kr/pretty" + "github.com/stretchr/testify/require" "github.com/zrepl/zrepl/platformtest" "github.com/zrepl/zrepl/zfs" ) @@ -17,7 +18,9 @@ func BatchDestroy(ctx *platformtest.Context) { + "foo bar@1" + "foo bar@2" + "foo bar@3" + + "foo bar@4" R zfs hold zrepl_platformtest "${ROOTDS}/foo bar@2" + R zfs hold zrepl_platformtest "${ROOTDS}/foo bar@4" `) reqs := []*zfs.DestroySnapOp{ @@ -31,24 +34,40 @@ func BatchDestroy(ctx *platformtest.Context) { Filesystem: fmt.Sprintf("%s/foo bar", ctx.RootDataset), Name: "2", }, + &zfs.DestroySnapOp{ + ErrOut: new(error), + Filesystem: fmt.Sprintf("%s/foo bar", ctx.RootDataset), + Name: "non existent", + }, + &zfs.DestroySnapOp{ + ErrOut: new(error), + Filesystem: fmt.Sprintf("%s/foo bar", ctx.RootDataset), + Name: "4", + }, } zfs.ZFSDestroyFilesystemVersions(ctx, reqs) + + pretty.Println(reqs) + if *reqs[0].ErrOut != nil { panic("expecting no error") } - err := (*reqs[1].ErrOut).Error() - if !strings.Contains(err, fmt.Sprintf("%s/foo bar@2", ctx.RootDataset)) { - panic(fmt.Sprintf("expecting error about being unable to destroy @2: %T\n%s", err, err)) - } + + eBusy, ok := (*reqs[1].ErrOut).(*zfs.ErrDestroySnapshotDatasetIsBusy) + require.True(ctx, ok) + require.Equal(ctx, reqs[1].Name, eBusy.Name) + + require.Nil(ctx, *reqs[2].ErrOut, "destroying non-existent snap is not an error (idempotence)") + + eBusy, ok = (*reqs[3].ErrOut).(*zfs.ErrDestroySnapshotDatasetIsBusy) + require.True(ctx, ok) + require.Equal(ctx, reqs[3].Name, eBusy.Name) platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, ` !N "foo bar@3" !E "foo bar@1" !E "foo bar@2" - R zfs release zrepl_platformtest "${ROOTDS}/foo bar@2" - - "foo bar@2" - - "foo bar@1" - - "foo bar" + !E "foo bar@4" `) } diff --git a/platformtest/tests/generated_cases.go b/platformtest/tests/generated_cases.go index 311944d..86b366e 100644 --- a/platformtest/tests/generated_cases.go +++ b/platformtest/tests/generated_cases.go @@ -14,6 +14,7 @@ var Cases = []Case{BatchDestroy, ListFilesystemVersionsUserrefs, ListFilesystemVersionsZeroExistIsNotAnError, ListFilesystemsNoFilter, + Pruner2NotReplicated, ReceiveForceIntoEncryptedErr, ReceiveForceRollbackWorksUnencrypted, ReplicationIncrementalCleansUpStaleAbstractionsWithCacheOnSecondReplication, diff --git a/platformtest/tests/pruner.go.deact b/platformtest/tests/pruner.go.deact new file mode 100644 index 0000000..3e23403 --- /dev/null +++ b/platformtest/tests/pruner.go.deact @@ -0,0 +1,331 @@ +package tests + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + "github.com/zrepl/zrepl/config" + "github.com/zrepl/zrepl/daemon/filters" + "github.com/zrepl/zrepl/daemon/pruner" + "github.com/zrepl/zrepl/endpoint" + "github.com/zrepl/zrepl/platformtest" + "github.com/zrepl/zrepl/zfs" +) + +func PrunerNotReplicated(ctx *platformtest.Context) { + + platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, ` + DESTROYROOT + CREATEROOT + + "foo bar" + + "foo bar@1" + + "foo bar@2" + + "foo bar@3" + + "foo bar@4" + + "foo bar@5" + `) + + c, err := config.ParseConfigBytes([]byte(fmt.Sprintf(` +jobs: +- name: prunetest + type: push + filesystems: { + "%s/foo bar<": true + } + connect: + type: tcp + address: 255.255.255.255:255 + snapshotting: + type: manual + pruning: + keep_sender: + - type: not_replicated + - type: last_n + count: 1 + keep_receiver: + - type: last_n + count: 2 + `, ctx.RootDataset))) + require.NoError(ctx, err) + + pushJob := c.Jobs[0].Ret.(*config.PushJob) + + dummyHistVec := prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "foo", + Subsystem: "foo", + Name: "foo", + Help: "foo", + }, []string{"foo"}) + + prunerFactory, err := pruner.NewPrunerFactory(pushJob.Pruning, dummyHistVec) + require.NoError(ctx, err) + + senderJid := endpoint.MustMakeJobID("sender-job") + + fsfilter, err := filters.DatasetMapFilterFromConfig(pushJob.Filesystems) + require.NoError(ctx, err) + + sender := endpoint.NewSender(endpoint.SenderConfig{ + FSF: fsfilter, + Encrypt: &zfs.NilBool{ + B: false, + }, + JobID: senderJid, + }) + + fs := ctx.RootDataset + "/foo bar" + + // create a replication cursor to make pruning work at all + _, err = endpoint.CreateReplicationCursor(ctx, fs, fsversion(ctx, fs, "@2"), senderJid) + require.NoError(ctx, err) + + p := prunerFactory.BuildSenderPruner(ctx, sender, sender) + + p.Prune() + + report := p.Report() + + reportJSON, err := json.MarshalIndent(report, "", " ") + require.NoError(ctx, err) + ctx.Logf("%s\n", string(reportJSON)) + + require.Equal(ctx, pruner.Done.String(), report.State) + require.Len(ctx, report.Completed, 1) + fsReport := report.Completed[0] + require.Equal(ctx, fs, fsReport.Filesystem) + require.Empty(ctx, fsReport.SkipReason) + require.Empty(ctx, fsReport.LastError) + require.Len(ctx, fsReport.DestroyList, 1) + require.Equal(ctx, fsReport.DestroyList[0], pruner.SnapshotReport{ + Name: "1", + Replicated: true, + Date: fsReport.DestroyList[0].Date, + }) + +} + +func PrunerNoKeepNotReplicatedNoKeepStepHoldConvertsAnyStepHoldToBookmark(ctx *platformtest.Context) { + + platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, ` + DESTROYROOT + CREATEROOT + + "foo bar" + + "foo bar@1" + + "foo bar@2" + + "foo bar@3" + + "foo bar@4" + + "foo bar@5" + `) + + c, err := config.ParseConfigBytes([]byte(fmt.Sprintf(` +jobs: +- name: prunetest + type: push + filesystems: { + "%s/foo bar<": true + } + connect: + type: tcp + address: 255.255.255.255:255 + snapshotting: + type: manual + pruning: + keep_sender: + - type: last_n + count: 1 + keep_receiver: + - type: last_n + count: 2 + `, ctx.RootDataset))) + require.NoError(ctx, err) + + pushJob := c.Jobs[0].Ret.(*config.PushJob) + + dummyHistVec := prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "foo", + Subsystem: "foo", + Name: "foo", + Help: "foo", + }, []string{"foo"}) + + prunerFactory, err := pruner.NewPrunerFactory(pushJob.Pruning, dummyHistVec) + require.NoError(ctx, err) + + senderJid := endpoint.MustMakeJobID("sender-job") + + fsfilter, err := filters.DatasetMapFilterFromConfig(pushJob.Filesystems) + require.NoError(ctx, err) + + sender := endpoint.NewSender(endpoint.SenderConfig{ + FSF: fsfilter, + Encrypt: &zfs.NilBool{ + B: false, + }, + JobID: senderJid, + }) + + fs := ctx.RootDataset + "/foo bar" + + // create a replication cursor to make pruning work at all + _, err = endpoint.CreateReplicationCursor(ctx, fs, fsversion(ctx, fs, "@2"), senderJid) + require.NoError(ctx, err) + + // create step holds for the incremental @2->@3 + endpoint.HoldStep(ctx, fs, fsversion(ctx, fs, "@2"), senderJid) + endpoint.HoldStep(ctx, fs, fsversion(ctx, fs, "@3"), senderJid) + // create step holds for another job + otherJid := endpoint.MustMakeJobID("other-job") + endpoint.HoldStep(ctx, fs, fsversion(ctx, fs, "@2"), otherJid) + endpoint.HoldStep(ctx, fs, fsversion(ctx, fs, "@3"), otherJid) + + p := prunerFactory.BuildSenderPruner(ctx, sender, sender) + + p.Prune() + + report := p.Report() + + reportJSON, err := json.MarshalIndent(report, "", " ") + require.NoError(ctx, err) + ctx.Logf("%s\n", string(reportJSON)) + + require.Equal(ctx, pruner.Done.String(), report.State) + require.Len(ctx, report.Completed, 1) + fsReport := report.Completed[0] + require.Equal(ctx, fs, fsReport.Filesystem) + require.Empty(ctx, fsReport.SkipReason) + require.Empty(ctx, fsReport.LastError) + expectDestroyList := []pruner.SnapshotReport{ + { + Name: "1", + Replicated: true, + }, + { + Name: "2", + Replicated: true, + }, + { + Name: "3", + Replicated: true, + }, + { + Name: "4", + Replicated: true, + }, + } + for _, d := range fsReport.DestroyList { + d.Date = time.Time{} + } + require.Subset(ctx, fsReport.DestroyList, expectDestroyList) +} + + + +func PrunerNoKeepNotReplicatedButKeepStepHold(ctx *platformtest.Context) { + + platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, ` + DESTROYROOT + CREATEROOT + + "foo bar" + + "foo bar@1" + + "foo bar@2" + + "foo bar@3" + + "foo bar@4" + + "foo bar@5" + `) + + c, err := config.ParseConfigBytes([]byte(fmt.Sprintf(` +jobs: +- name: prunetest + type: push + filesystems: { + "%s/foo bar<": true + } + connect: + type: tcp + address: 255.255.255.255:255 + snapshotting: + type: manual + pruning: + keep_sender: + - type: step_holds + - type: last_n + count: 1 + keep_receiver: + - type: last_n + count: 2 + `, ctx.RootDataset))) + require.NoError(ctx, err) + + pushJob := c.Jobs[0].Ret.(*config.PushJob) + + dummyHistVec := prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "foo", + Subsystem: "foo", + Name: "foo", + Help: "foo", + }, []string{"foo"}) + + prunerFactory, err := pruner.NewPrunerFactory(pushJob.Pruning, dummyHistVec) + require.NoError(ctx, err) + + senderJid := endpoint.MustMakeJobID("sender-job") + + fsfilter, err := filters.DatasetMapFilterFromConfig(pushJob.Filesystems) + require.NoError(ctx, err) + + sender := endpoint.NewSender(endpoint.SenderConfig{ + FSF: fsfilter, + Encrypt: &zfs.NilBool{ + B: false, + }, + JobID: senderJid, + }) + + fs := ctx.RootDataset + "/foo bar" + + // create a replication cursor to make pruning work at all + _, err = endpoint.CreateReplicationCursor(ctx, fs, fsversion(ctx, fs, "@2"), senderJid) + require.NoError(ctx, err) + + // create step holds for the incremental @2->@3 + endpoint.HoldStep(ctx, fs, fsversion(ctx, fs, "@2"), senderJid) + endpoint.HoldStep(ctx, fs, fsversion(ctx, fs, "@3"), senderJid) + // create step holds for another job + otherJid := endpoint.MustMakeJobID("other-job") + endpoint.HoldStep(ctx, fs, fsversion(ctx, fs, "@2"), otherJid) + endpoint.HoldStep(ctx, fs, fsversion(ctx, fs, "@3"), otherJid) + + p := prunerFactory.BuildSenderPruner(ctx, sender, sender) + + p.Prune() + + report := p.Report() + + reportJSON, err := json.MarshalIndent(report, "", " ") + require.NoError(ctx, err) + ctx.Logf("%s\n", string(reportJSON)) + + require.Equal(ctx, pruner.Done.String(), report.State) + require.Len(ctx, report.Completed, 1) + fsReport := report.Completed[0] + require.Equal(ctx, fs, fsReport.Filesystem) + require.Empty(ctx, fsReport.SkipReason) + require.Empty(ctx, fsReport.LastError) + expectDestroyList := []pruner.SnapshotReport{ + { + Name: "1", + Replicated: true, + }, + { + Name: "4", + Replicated: true, + }, + } + for _, d := range fsReport.DestroyList { + d.Date = time.Time{} + } + require.Subset(ctx, fsReport.DestroyList, expectDestroyList) +} diff --git a/platformtest/tests/pruner.v2.go b/platformtest/tests/pruner.v2.go new file mode 100644 index 0000000..d3c7a04 --- /dev/null +++ b/platformtest/tests/pruner.v2.go @@ -0,0 +1,137 @@ +package tests + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/kr/pretty" + "github.com/stretchr/testify/require" + "github.com/zrepl/zrepl/config" + "github.com/zrepl/zrepl/daemon/filters" + "github.com/zrepl/zrepl/daemon/pruner.v2" + "github.com/zrepl/zrepl/endpoint" + "github.com/zrepl/zrepl/platformtest" + "github.com/zrepl/zrepl/pruning" + "github.com/zrepl/zrepl/zfs" +) + +func Pruner2NotReplicated(ctx *platformtest.Context) { + + platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, ` + DESTROYROOT + CREATEROOT + + "foo bar" + + "foo bar@1" + + "foo bar@2" + + "foo bar@3" + + "foo bar@4" + + "foo bar@5" + `) + + fs := ctx.RootDataset + "/foo bar" + senderJid := endpoint.MustMakeJobID("sender-job") + otherJid1 := endpoint.MustMakeJobID("other-job-1") + otherJid2 := endpoint.MustMakeJobID("other-job-2") + + + // create step holds for the incremental @2->@3 + endpoint.HoldStep(ctx, fs, fsversion(ctx, fs, "@2"), senderJid) + endpoint.HoldStep(ctx, fs, fsversion(ctx, fs, "@3"), senderJid) + // create step holds for other-job-1 @2 -> @3 + endpoint.HoldStep(ctx, fs, fsversion(ctx, fs, "@2"), otherJid1) + endpoint.HoldStep(ctx, fs, fsversion(ctx, fs, "@3"), otherJid1) + // create step hold for other-job-2 @1 (will be pruned) + endpoint.HoldStep(ctx, fs, fsversion(ctx, fs, "@1"), otherJid2) + + c, err := config.ParseConfigBytes([]byte(fmt.Sprintf(` +jobs: +- name: prunetest + type: push + filesystems: { + "%s/foo bar": true + } + connect: + type: tcp + address: 255.255.255.255:255 + snapshotting: + type: manual + pruning: + keep_sender: + #- type: not_replicated + - type: step_holds + - type: last_n + count: 1 + keep_receiver: + - type: last_n + count: 2 + `, ctx.RootDataset))) + require.NoError(ctx, err) + + pushJob := c.Jobs[0].Ret.(*config.PushJob) + + require.NoError(ctx, err) + + fsfilter, err := filters.DatasetMapFilterFromConfig(pushJob.Filesystems) + require.NoError(ctx, err) + + matchedFilesystems, err := zfs.ZFSListMapping(ctx, fsfilter) + ctx.Logf("%s", pretty.Sprint(matchedFilesystems)) + require.NoError(ctx, err) + require.Len(ctx, matchedFilesystems, 1) + + sideSender := pruner.NewSideSender(senderJid) + + keepRules, err := pruning.RulesFromConfig(senderJid, pushJob.Pruning.KeepSender) + require.NoError(ctx, err) + p := pruner.NewPruner(fsfilter, senderJid, sideSender, keepRules) + + runDone := make(chan *pruner.Report) + go func() { + runDone <- p.Run(ctx) + }() + + var report *pruner.Report + // concurrency stress +out: + for { + select { + case <-time.After(10 * time.Millisecond): + p.Report(ctx) + case report = <-runDone: + break out + } + } + ctx.Logf("%s\n", pretty.Sprint(report)) + + reportJSON, err := json.MarshalIndent(report, "", " ") + require.NoError(ctx, err) + ctx.Logf("%s\n", string(reportJSON)) + + ctx.FailNow() + // fs := ctx.RootDataset + "/foo bar" + + // // create a replication cursor to make pruning work at all + // _, err = endpoint.CreateReplicationCursor(ctx, fs, fsversion(ctx, fs, "@2"), senderJid) + // require.NoError(ctx, err) + + // p := prunerFactory.BuildSenderPruner(ctx, sender, sender) + + // p.Prune() + + // report := p.Report() + + // require.Equal(ctx, pruner.Done.String(), report.State) + // require.Len(ctx, report.Completed, 1) + // fsReport := report.Completed[0] + // require.Equal(ctx, fs, fsReport.Filesystem) + // require.Empty(ctx, fsReport.SkipReason) + // require.Empty(ctx, fsReport.LastError) + // require.Len(ctx, fsReport.DestroyList, 1) + // require.Equal(ctx, fsReport.DestroyList[0], pruner.SnapshotReport{ + // Name: "1", + // Replicated: true, + // Date: fsReport.DestroyList[0].Date, + // }) + +} diff --git a/platformtest/tests/undestroyableSnapshotParsing.go b/platformtest/tests/undestroyableSnapshotParsing.go index 8b6089e..7c31289 100644 --- a/platformtest/tests/undestroyableSnapshotParsing.go +++ b/platformtest/tests/undestroyableSnapshotParsing.go @@ -17,10 +17,12 @@ func UndestroyableSnapshotParsing(t *platformtest.Context) { + "foo bar@1 2 3" + "foo bar@4 5 6" + "foo bar@7 8 9" + + "foo bar@10 11 12" R zfs hold zrepl_platformtest "${ROOTDS}/foo bar@4 5 6" + R zfs hold zrepl_platformtest "${ROOTDS}/foo bar@7 8 9" `) - err := zfs.ZFSDestroy(t, fmt.Sprintf("%s/foo bar@1 2 3,4 5 6,7 8 9", t.RootDataset)) + err := zfs.ZFSDestroy(t, fmt.Sprintf("%s/foo bar@1 2 3,4 5 6,7 8 9,10 11 12", t.RootDataset)) if err == nil { panic("expecting destroy error due to hold") } @@ -30,8 +32,9 @@ func UndestroyableSnapshotParsing(t *platformtest.Context) { if dse.Filesystem != fmt.Sprintf("%s/foo bar", t.RootDataset) { panic(dse.Filesystem) } - require.Equal(t, []string{"4 5 6"}, dse.Undestroyable) - require.Equal(t, []string{"dataset is busy"}, dse.Reason) + expectUndestroyable := []string{"4 5 6", "7 8 9"} + require.Len(t, dse.Undestroyable, len(expectUndestroyable)) + require.Subset(t, dse.Undestroyable, expectUndestroyable) } } diff --git a/pruning/keep_grid.go b/pruning/keep_grid.go index 05d61d7..8ba4904 100644 --- a/pruning/keep_grid.go +++ b/pruning/keep_grid.go @@ -66,24 +66,26 @@ type retentionGridAdaptor struct { Snapshot } +func (a retentionGridAdaptor) Date() time.Time { return a.Snapshot.GetCreation() } + func (a retentionGridAdaptor) LessThan(b retentiongrid.Entry) bool { return a.Date().Before(b.Date()) } // Prune filters snapshots with the retention grid. -func (p *KeepGrid) KeepRule(snaps []Snapshot) (destroyList []Snapshot) { +func (p *KeepGrid) KeepRule(snaps []Snapshot) PruneSnapshotsResult { - snaps = filterSnapList(snaps, func(snapshot Snapshot) bool { - return p.re.MatchString(snapshot.Name()) + reCandidates := partitionSnapList(snaps, func(snapshot Snapshot) bool { + return p.re.MatchString(snapshot.GetName()) }) - if len(snaps) == 0 { - return nil + if len(reCandidates.Remove) == 0 { + return reCandidates } // Build adaptors for retention grid adaptors := make([]retentiongrid.Entry, 0) for i := range snaps { - adaptors = append(adaptors, retentionGridAdaptor{snaps[i]}) + adaptors = append(adaptors, retentionGridAdaptor{reCandidates.Remove[i]}) } // determine 'now' edge @@ -93,12 +95,17 @@ func (p *KeepGrid) KeepRule(snaps []Snapshot) (destroyList []Snapshot) { now := adaptors[len(adaptors)-1].Date() // Evaluate retention grid - _, removea := p.retentionGrid.FitEntries(now, adaptors) + keepa, removea := p.retentionGrid.FitEntries(now, adaptors) // Revert adaptors - destroyList = make([]Snapshot, len(removea)) + destroyList := make([]Snapshot, len(removea)) for i := range removea { destroyList[i] = removea[i].(retentionGridAdaptor).Snapshot } - return destroyList + for _, a := range keepa { + reCandidates.Keep = append(reCandidates.Keep, a.(retentionGridAdaptor)) + } + reCandidates.Remove = destroyList + + return reCandidates } diff --git a/pruning/keep_helpers.go b/pruning/keep_helpers.go index a756ce7..4f82bab 100644 --- a/pruning/keep_helpers.go +++ b/pruning/keep_helpers.go @@ -1,10 +1,13 @@ package pruning -func filterSnapList(snaps []Snapshot, predicate func(Snapshot) bool) []Snapshot { - r := make([]Snapshot, 0, len(snaps)) +func partitionSnapList(snaps []Snapshot, remove func(Snapshot) bool) (r PruneSnapshotsResult) { + r.Keep = make([]Snapshot, 0, len(snaps)) + r.Remove = make([]Snapshot, 0, len(snaps)) for i := range snaps { - if predicate(snaps[i]) { - r = append(r, snaps[i]) + if remove(snaps[i]) { + r.Remove = append(r.Remove, snaps[i]) + } else { + r.Keep = append(r.Keep, snaps[i]) } } return r diff --git a/pruning/keep_last_n.go b/pruning/keep_last_n.go index 487eaff..e5368d3 100644 --- a/pruning/keep_last_n.go +++ b/pruning/keep_last_n.go @@ -17,17 +17,17 @@ func NewKeepLastN(n int) (*KeepLastN, error) { return &KeepLastN{n}, nil } -func (k KeepLastN) KeepRule(snaps []Snapshot) (destroyList []Snapshot) { +func (k KeepLastN) KeepRule(snaps []Snapshot) PruneSnapshotsResult { if k.n > len(snaps) { - return []Snapshot{} + return PruneSnapshotsResult{Keep: snaps} } res := shallowCopySnapList(snaps) sort.Slice(res, func(i, j int) bool { - return res[i].Date().After(res[j].Date()) + return res[i].GetCreateTXG() > res[j].GetCreateTXG() }) - return res[k.n:] + return PruneSnapshotsResult{Remove: res[k.n:], Keep: res[:k.n]} } diff --git a/pruning/keep_not_replicated.go b/pruning/keep_not_replicated.go index e84a5c2..041e78c 100644 --- a/pruning/keep_not_replicated.go +++ b/pruning/keep_not_replicated.go @@ -2,8 +2,8 @@ package pruning type KeepNotReplicated struct{} -func (*KeepNotReplicated) KeepRule(snaps []Snapshot) (destroyList []Snapshot) { - return filterSnapList(snaps, func(snapshot Snapshot) bool { +func (*KeepNotReplicated) KeepRule(snaps []Snapshot) PruneSnapshotsResult { + return partitionSnapList(snaps, func(snapshot Snapshot) bool { return snapshot.Replicated() }) } diff --git a/pruning/keep_regex.go b/pruning/keep_regex.go index 7618075..34bb929 100644 --- a/pruning/keep_regex.go +++ b/pruning/keep_regex.go @@ -27,12 +27,12 @@ func MustKeepRegex(expr string, negate bool) *KeepRegex { return k } -func (k *KeepRegex) KeepRule(snaps []Snapshot) []Snapshot { - return filterSnapList(snaps, func(s Snapshot) bool { +func (k *KeepRegex) KeepRule(snaps []Snapshot) PruneSnapshotsResult { + return partitionSnapList(snaps, func(s Snapshot) bool { if k.negate { - return k.expr.FindStringIndex(s.Name()) != nil + return k.expr.FindStringIndex(s.GetName()) != nil } else { - return k.expr.FindStringIndex(s.Name()) == nil + return k.expr.FindStringIndex(s.GetName()) == nil } }) } diff --git a/pruning/keep_step_holds.go b/pruning/keep_step_holds.go new file mode 100644 index 0000000..1dda5f8 --- /dev/null +++ b/pruning/keep_step_holds.go @@ -0,0 +1,44 @@ +package pruning + +import ( + "github.com/pkg/errors" + "github.com/zrepl/zrepl/endpoint" +) + +type KeepStepHolds struct { + keepJobIDs map[endpoint.JobID]bool +} + +var _ KeepRule = (*KeepStepHolds)(nil) + +func NewKeepStepHolds(mainJobId endpoint.JobID, additionalJobIdsStrings []string) (_ *KeepStepHolds, err error) { + additionalJobIds := make(map[endpoint.JobID]bool, len(additionalJobIdsStrings)) + + mainJobId.MustValidate() + additionalJobIds[mainJobId] = true + + for i := range additionalJobIdsStrings { + ajid, err := endpoint.MakeJobID(additionalJobIdsStrings[i]) + if err != nil { + return nil, errors.WithMessagef(err, "cannot parse job id %q: %s", additionalJobIdsStrings[i]) + } + if additionalJobIds[ajid] == true { + return nil, errors.Errorf("duplicate job id %q", ajid) + } + } + return &KeepStepHolds{additionalJobIds}, nil +} + +func (h *KeepStepHolds) KeepRule(snaps []Snapshot) PruneSnapshotsResult { + return partitionSnapList(snaps, func(s Snapshot) bool { + holdingJobIDs := make(map[endpoint.JobID]bool) + for _, h := range s.StepHolds() { + holdingJobIDs[h.GetJobID()] = true + } + oneOrMoreOfOurJobIDsHoldsSnap := false + for kjid := range h.keepJobIDs { + oneOrMoreOfOurJobIDsHoldsSnap = oneOrMoreOfOurJobIDsHoldsSnap || holdingJobIDs[kjid] + } + return !oneOrMoreOfOurJobIDsHoldsSnap + }) +} diff --git a/pruning/pruning.go b/pruning/pruning.go index 9f36581..c50058f 100644 --- a/pruning/pruning.go +++ b/pruning/pruning.go @@ -7,47 +7,93 @@ import ( "github.com/pkg/errors" "github.com/zrepl/zrepl/config" + "github.com/zrepl/zrepl/endpoint" ) type KeepRule interface { - KeepRule(snaps []Snapshot) (destroyList []Snapshot) + KeepRule(snaps []Snapshot) PruneSnapshotsResult } type Snapshot interface { - Name() string + GetName() string Replicated() bool - Date() time.Time + GetCreation() time.Time + GetCreateTXG() uint64 + StepHolds() []StepHold } -// The returned snapshot list is guaranteed to only contains elements of input parameter snaps -func PruneSnapshots(snaps []Snapshot, keepRules []KeepRule) []Snapshot { +type StepHold interface { + GetJobID() endpoint.JobID +} + +type PruneSnapshotsResult struct { + Remove, Keep []Snapshot +} + +// The returned snapshot results are a partition of the snaps argument. +// That means than len(Remove) + len(Keep) == len(snaps) +func PruneSnapshots(snapsI []Snapshot, keepRules []KeepRule) PruneSnapshotsResult { if len(keepRules) == 0 { - return []Snapshot{} + return PruneSnapshotsResult{Remove: nil, Keep: snapsI} + } + + type snapshot struct { + Snapshot + keepCount, removeCount int + } + + // project down to snapshot + snaps := make([]Snapshot, len(snapsI)) + for i := range snaps { + snaps[i] = &snapshot{snapsI[i], 0, 0} } - remCount := make(map[Snapshot]int, len(snaps)) for _, r := range keepRules { - ruleRems := r.KeepRule(snaps) - for _, ruleRem := range ruleRems { - remCount[ruleRem]++ + + ruleImplCheckSet := make(map[Snapshot]int, len(snaps)) + for _, s := range snaps { + ruleImplCheckSet[s] = ruleImplCheckSet[s] + 1 + } + + ruleResults := r.KeepRule(snaps) + + for _, s := range snaps { + ruleImplCheckSet[s] = ruleImplCheckSet[s] - 1 + } + for _, n := range ruleImplCheckSet { + if n != 0 { + panic(fmt.Sprintf("incorrect rule implementation: %T", r)) + } + } + + for _, s := range ruleResults.Remove { + s.(*snapshot).removeCount++ + } + for _, s := range ruleResults.Keep { + s.(*snapshot).keepCount++ } } remove := make([]Snapshot, 0, len(snaps)) - for snap, rc := range remCount { - if rc == len(keepRules) { - remove = append(remove, snap) + keep := make([]Snapshot, 0, len(snaps)) + for _, sI := range snaps { + s := sI.(*snapshot) + if s.removeCount == len(keepRules) { + // all keep rules agree to remove the snap + remove = append(remove, s.Snapshot) + } else { + keep = append(keep, s.Snapshot) } } - return remove + return PruneSnapshotsResult{Remove: remove, Keep: keep} } -func RulesFromConfig(in []config.PruningEnum) (rules []KeepRule, err error) { +func RulesFromConfig(mainJobId endpoint.JobID, in []config.PruningEnum) (rules []KeepRule, err error) { rules = make([]KeepRule, len(in)) for i := range in { - rules[i], err = RuleFromConfig(in[i]) + rules[i], err = RuleFromConfig(mainJobId, in[i]) if err != nil { return nil, errors.Wrapf(err, "cannot build rule #%d", i) } @@ -55,7 +101,7 @@ func RulesFromConfig(in []config.PruningEnum) (rules []KeepRule, err error) { return rules, nil } -func RuleFromConfig(in config.PruningEnum) (KeepRule, error) { +func RuleFromConfig(mainJobId endpoint.JobID, in config.PruningEnum) (KeepRule, error) { switch v := in.Ret.(type) { case *config.PruneKeepNotReplicated: return NewKeepNotReplicated(), nil @@ -65,6 +111,8 @@ func RuleFromConfig(in config.PruningEnum) (KeepRule, error) { return NewKeepRegex(v.Regex, v.Negate) case *config.PruneGrid: return NewKeepGrid(v) + case *config.PruneKeepStepHolds: + return NewKeepStepHolds(mainJobId, v.AdditionalJobIds) default: return nil, fmt.Errorf("unknown keep rule type %T", v) } diff --git a/replication/logic/pdu/pdu.pb.go b/replication/logic/pdu/pdu.pb.go index cdb85bc..3cfd468 100644 --- a/replication/logic/pdu/pdu.pb.go +++ b/replication/logic/pdu/pdu.pb.go @@ -46,7 +46,36 @@ func (x Tri) String() string { return proto.EnumName(Tri_name, int32(x)) } func (Tri) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_pdu_483c6918b7b3d747, []int{0} + return fileDescriptor_pdu_2d84e8d7d278a80d, []int{0} +} + +type SendAbstraction_SendAbstractionType int32 + +const ( + SendAbstraction_Undefined SendAbstraction_SendAbstractionType = 0 + SendAbstraction_ReplicationCursorV2 SendAbstraction_SendAbstractionType = 1 + SendAbstraction_StepHold SendAbstraction_SendAbstractionType = 2 + SendAbstraction_StepBookmark SendAbstraction_SendAbstractionType = 3 +) + +var SendAbstraction_SendAbstractionType_name = map[int32]string{ + 0: "Undefined", + 1: "ReplicationCursorV2", + 2: "StepHold", + 3: "StepBookmark", +} +var SendAbstraction_SendAbstractionType_value = map[string]int32{ + "Undefined": 0, + "ReplicationCursorV2": 1, + "StepHold": 2, + "StepBookmark": 3, +} + +func (x SendAbstraction_SendAbstractionType) String() string { + return proto.EnumName(SendAbstraction_SendAbstractionType_name, int32(x)) +} +func (SendAbstraction_SendAbstractionType) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_pdu_2d84e8d7d278a80d, []int{5, 0} } type FilesystemVersion_VersionType int32 @@ -69,7 +98,7 @@ func (x FilesystemVersion_VersionType) String() string { return proto.EnumName(FilesystemVersion_VersionType_name, int32(x)) } func (FilesystemVersion_VersionType) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_pdu_483c6918b7b3d747, []int{5, 0} + return fileDescriptor_pdu_2d84e8d7d278a80d, []int{6, 0} } type ListFilesystemReq struct { @@ -82,7 +111,7 @@ func (m *ListFilesystemReq) Reset() { *m = ListFilesystemReq{} } func (m *ListFilesystemReq) String() string { return proto.CompactTextString(m) } func (*ListFilesystemReq) ProtoMessage() {} func (*ListFilesystemReq) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_483c6918b7b3d747, []int{0} + return fileDescriptor_pdu_2d84e8d7d278a80d, []int{0} } func (m *ListFilesystemReq) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ListFilesystemReq.Unmarshal(m, b) @@ -113,7 +142,7 @@ func (m *ListFilesystemRes) Reset() { *m = ListFilesystemRes{} } func (m *ListFilesystemRes) String() string { return proto.CompactTextString(m) } func (*ListFilesystemRes) ProtoMessage() {} func (*ListFilesystemRes) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_483c6918b7b3d747, []int{1} + return fileDescriptor_pdu_2d84e8d7d278a80d, []int{1} } func (m *ListFilesystemRes) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ListFilesystemRes.Unmarshal(m, b) @@ -154,7 +183,7 @@ func (m *Filesystem) Reset() { *m = Filesystem{} } func (m *Filesystem) String() string { return proto.CompactTextString(m) } func (*Filesystem) ProtoMessage() {} func (*Filesystem) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_483c6918b7b3d747, []int{2} + return fileDescriptor_pdu_2d84e8d7d278a80d, []int{2} } func (m *Filesystem) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_Filesystem.Unmarshal(m, b) @@ -213,7 +242,7 @@ func (m *ListFilesystemVersionsReq) Reset() { *m = ListFilesystemVersion func (m *ListFilesystemVersionsReq) String() string { return proto.CompactTextString(m) } func (*ListFilesystemVersionsReq) ProtoMessage() {} func (*ListFilesystemVersionsReq) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_483c6918b7b3d747, []int{3} + return fileDescriptor_pdu_2d84e8d7d278a80d, []int{3} } func (m *ListFilesystemVersionsReq) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ListFilesystemVersionsReq.Unmarshal(m, b) @@ -242,6 +271,7 @@ func (m *ListFilesystemVersionsReq) GetFilesystem() string { type ListFilesystemVersionsRes struct { Versions []*FilesystemVersion `protobuf:"bytes,1,rep,name=Versions,proto3" json:"Versions,omitempty"` + SendAbstractions []*SendAbstraction `protobuf:"bytes,2,rep,name=SendAbstractions,proto3" json:"SendAbstractions,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -251,7 +281,7 @@ func (m *ListFilesystemVersionsRes) Reset() { *m = ListFilesystemVersion func (m *ListFilesystemVersionsRes) String() string { return proto.CompactTextString(m) } func (*ListFilesystemVersionsRes) ProtoMessage() {} func (*ListFilesystemVersionsRes) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_483c6918b7b3d747, []int{4} + return fileDescriptor_pdu_2d84e8d7d278a80d, []int{4} } func (m *ListFilesystemVersionsRes) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ListFilesystemVersionsRes.Unmarshal(m, b) @@ -278,6 +308,67 @@ func (m *ListFilesystemVersionsRes) GetVersions() []*FilesystemVersion { return nil } +func (m *ListFilesystemVersionsRes) GetSendAbstractions() []*SendAbstraction { + if m != nil { + return m.SendAbstractions + } + return nil +} + +type SendAbstraction struct { + Type SendAbstraction_SendAbstractionType `protobuf:"varint,1,opt,name=Type,proto3,enum=SendAbstraction_SendAbstractionType" json:"Type,omitempty"` + JobID string `protobuf:"bytes,2,opt,name=JobID,proto3" json:"JobID,omitempty"` + Version *FilesystemVersion `protobuf:"bytes,3,opt,name=Version,proto3" json:"Version,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *SendAbstraction) Reset() { *m = SendAbstraction{} } +func (m *SendAbstraction) String() string { return proto.CompactTextString(m) } +func (*SendAbstraction) ProtoMessage() {} +func (*SendAbstraction) Descriptor() ([]byte, []int) { + return fileDescriptor_pdu_2d84e8d7d278a80d, []int{5} +} +func (m *SendAbstraction) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_SendAbstraction.Unmarshal(m, b) +} +func (m *SendAbstraction) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_SendAbstraction.Marshal(b, m, deterministic) +} +func (dst *SendAbstraction) XXX_Merge(src proto.Message) { + xxx_messageInfo_SendAbstraction.Merge(dst, src) +} +func (m *SendAbstraction) XXX_Size() int { + return xxx_messageInfo_SendAbstraction.Size(m) +} +func (m *SendAbstraction) XXX_DiscardUnknown() { + xxx_messageInfo_SendAbstraction.DiscardUnknown(m) +} + +var xxx_messageInfo_SendAbstraction proto.InternalMessageInfo + +func (m *SendAbstraction) GetType() SendAbstraction_SendAbstractionType { + if m != nil { + return m.Type + } + return SendAbstraction_Undefined +} + +func (m *SendAbstraction) GetJobID() string { + if m != nil { + return m.JobID + } + return "" +} + +func (m *SendAbstraction) GetVersion() *FilesystemVersion { + if m != nil { + return m.Version + } + return nil +} + type FilesystemVersion struct { Type FilesystemVersion_VersionType `protobuf:"varint,1,opt,name=Type,proto3,enum=FilesystemVersion_VersionType" json:"Type,omitempty"` Name string `protobuf:"bytes,2,opt,name=Name,proto3" json:"Name,omitempty"` @@ -293,7 +384,7 @@ func (m *FilesystemVersion) Reset() { *m = FilesystemVersion{} } func (m *FilesystemVersion) String() string { return proto.CompactTextString(m) } func (*FilesystemVersion) ProtoMessage() {} func (*FilesystemVersion) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_483c6918b7b3d747, []int{5} + return fileDescriptor_pdu_2d84e8d7d278a80d, []int{6} } func (m *FilesystemVersion) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_FilesystemVersion.Unmarshal(m, b) @@ -371,7 +462,7 @@ func (m *SendReq) Reset() { *m = SendReq{} } func (m *SendReq) String() string { return proto.CompactTextString(m) } func (*SendReq) ProtoMessage() {} func (*SendReq) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_483c6918b7b3d747, []int{6} + return fileDescriptor_pdu_2d84e8d7d278a80d, []int{7} } func (m *SendReq) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_SendReq.Unmarshal(m, b) @@ -445,7 +536,7 @@ func (m *Property) Reset() { *m = Property{} } func (m *Property) String() string { return proto.CompactTextString(m) } func (*Property) ProtoMessage() {} func (*Property) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_483c6918b7b3d747, []int{7} + return fileDescriptor_pdu_2d84e8d7d278a80d, []int{8} } func (m *Property) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_Property.Unmarshal(m, b) @@ -496,7 +587,7 @@ func (m *SendRes) Reset() { *m = SendRes{} } func (m *SendRes) String() string { return proto.CompactTextString(m) } func (*SendRes) ProtoMessage() {} func (*SendRes) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_483c6918b7b3d747, []int{8} + return fileDescriptor_pdu_2d84e8d7d278a80d, []int{9} } func (m *SendRes) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_SendRes.Unmarshal(m, b) @@ -548,7 +639,7 @@ func (m *SendCompletedReq) Reset() { *m = SendCompletedReq{} } func (m *SendCompletedReq) String() string { return proto.CompactTextString(m) } func (*SendCompletedReq) ProtoMessage() {} func (*SendCompletedReq) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_483c6918b7b3d747, []int{9} + return fileDescriptor_pdu_2d84e8d7d278a80d, []int{10} } func (m *SendCompletedReq) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_SendCompletedReq.Unmarshal(m, b) @@ -585,7 +676,7 @@ func (m *SendCompletedRes) Reset() { *m = SendCompletedRes{} } func (m *SendCompletedRes) String() string { return proto.CompactTextString(m) } func (*SendCompletedRes) ProtoMessage() {} func (*SendCompletedRes) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_483c6918b7b3d747, []int{10} + return fileDescriptor_pdu_2d84e8d7d278a80d, []int{11} } func (m *SendCompletedRes) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_SendCompletedRes.Unmarshal(m, b) @@ -620,7 +711,7 @@ func (m *ReceiveReq) Reset() { *m = ReceiveReq{} } func (m *ReceiveReq) String() string { return proto.CompactTextString(m) } func (*ReceiveReq) ProtoMessage() {} func (*ReceiveReq) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_483c6918b7b3d747, []int{11} + return fileDescriptor_pdu_2d84e8d7d278a80d, []int{12} } func (m *ReceiveReq) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ReceiveReq.Unmarshal(m, b) @@ -671,7 +762,7 @@ func (m *ReceiveRes) Reset() { *m = ReceiveRes{} } func (m *ReceiveRes) String() string { return proto.CompactTextString(m) } func (*ReceiveRes) ProtoMessage() {} func (*ReceiveRes) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_483c6918b7b3d747, []int{12} + return fileDescriptor_pdu_2d84e8d7d278a80d, []int{13} } func (m *ReceiveRes) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ReceiveRes.Unmarshal(m, b) @@ -704,7 +795,7 @@ func (m *DestroySnapshotsReq) Reset() { *m = DestroySnapshotsReq{} } func (m *DestroySnapshotsReq) String() string { return proto.CompactTextString(m) } func (*DestroySnapshotsReq) ProtoMessage() {} func (*DestroySnapshotsReq) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_483c6918b7b3d747, []int{13} + return fileDescriptor_pdu_2d84e8d7d278a80d, []int{14} } func (m *DestroySnapshotsReq) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_DestroySnapshotsReq.Unmarshal(m, b) @@ -750,7 +841,7 @@ func (m *DestroySnapshotRes) Reset() { *m = DestroySnapshotRes{} } func (m *DestroySnapshotRes) String() string { return proto.CompactTextString(m) } func (*DestroySnapshotRes) ProtoMessage() {} func (*DestroySnapshotRes) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_483c6918b7b3d747, []int{14} + return fileDescriptor_pdu_2d84e8d7d278a80d, []int{15} } func (m *DestroySnapshotRes) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_DestroySnapshotRes.Unmarshal(m, b) @@ -795,7 +886,7 @@ func (m *DestroySnapshotsRes) Reset() { *m = DestroySnapshotsRes{} } func (m *DestroySnapshotsRes) String() string { return proto.CompactTextString(m) } func (*DestroySnapshotsRes) ProtoMessage() {} func (*DestroySnapshotsRes) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_483c6918b7b3d747, []int{15} + return fileDescriptor_pdu_2d84e8d7d278a80d, []int{16} } func (m *DestroySnapshotsRes) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_DestroySnapshotsRes.Unmarshal(m, b) @@ -833,7 +924,7 @@ func (m *ReplicationCursorReq) Reset() { *m = ReplicationCursorReq{} } func (m *ReplicationCursorReq) String() string { return proto.CompactTextString(m) } func (*ReplicationCursorReq) ProtoMessage() {} func (*ReplicationCursorReq) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_483c6918b7b3d747, []int{16} + return fileDescriptor_pdu_2d84e8d7d278a80d, []int{17} } func (m *ReplicationCursorReq) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ReplicationCursorReq.Unmarshal(m, b) @@ -874,7 +965,7 @@ func (m *ReplicationCursorRes) Reset() { *m = ReplicationCursorRes{} } func (m *ReplicationCursorRes) String() string { return proto.CompactTextString(m) } func (*ReplicationCursorRes) ProtoMessage() {} func (*ReplicationCursorRes) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_483c6918b7b3d747, []int{17} + return fileDescriptor_pdu_2d84e8d7d278a80d, []int{18} } func (m *ReplicationCursorRes) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ReplicationCursorRes.Unmarshal(m, b) @@ -1010,7 +1101,7 @@ func (m *PingReq) Reset() { *m = PingReq{} } func (m *PingReq) String() string { return proto.CompactTextString(m) } func (*PingReq) ProtoMessage() {} func (*PingReq) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_483c6918b7b3d747, []int{18} + return fileDescriptor_pdu_2d84e8d7d278a80d, []int{19} } func (m *PingReq) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_PingReq.Unmarshal(m, b) @@ -1049,7 +1140,7 @@ func (m *PingRes) Reset() { *m = PingRes{} } func (m *PingRes) String() string { return proto.CompactTextString(m) } func (*PingRes) ProtoMessage() {} func (*PingRes) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_483c6918b7b3d747, []int{19} + return fileDescriptor_pdu_2d84e8d7d278a80d, []int{20} } func (m *PingRes) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_PingRes.Unmarshal(m, b) @@ -1082,6 +1173,7 @@ func init() { proto.RegisterType((*Filesystem)(nil), "Filesystem") proto.RegisterType((*ListFilesystemVersionsReq)(nil), "ListFilesystemVersionsReq") proto.RegisterType((*ListFilesystemVersionsRes)(nil), "ListFilesystemVersionsRes") + proto.RegisterType((*SendAbstraction)(nil), "SendAbstraction") proto.RegisterType((*FilesystemVersion)(nil), "FilesystemVersion") proto.RegisterType((*SendReq)(nil), "SendReq") proto.RegisterType((*Property)(nil), "Property") @@ -1098,6 +1190,7 @@ func init() { proto.RegisterType((*PingReq)(nil), "PingReq") proto.RegisterType((*PingRes)(nil), "PingRes") proto.RegisterEnum("Tri", Tri_name, Tri_value) + proto.RegisterEnum("SendAbstraction_SendAbstractionType", SendAbstraction_SendAbstractionType_name, SendAbstraction_SendAbstractionType_value) proto.RegisterEnum("FilesystemVersion_VersionType", FilesystemVersion_VersionType_name, FilesystemVersion_VersionType_value) } @@ -1338,61 +1431,67 @@ var _Replication_serviceDesc = grpc.ServiceDesc{ Metadata: "pdu.proto", } -func init() { proto.RegisterFile("pdu.proto", fileDescriptor_pdu_483c6918b7b3d747) } +func init() { proto.RegisterFile("pdu.proto", fileDescriptor_pdu_2d84e8d7d278a80d) } -var fileDescriptor_pdu_483c6918b7b3d747 = []byte{ - // 833 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x56, 0x5f, 0x6f, 0xe3, 0x44, - 0x10, 0xaf, 0x13, 0xa7, 0x75, 0x26, 0x3d, 0x2e, 0x9d, 0x96, 0x93, 0xb1, 0xe0, 0x54, 0x2d, 0x08, - 0xe5, 0x2a, 0x61, 0xa1, 0xf2, 0x47, 0x42, 0x48, 0x27, 0xd1, 0xb4, 0xbd, 0x3b, 0x01, 0x47, 0xb4, - 0x35, 0x27, 0x74, 0x6f, 0x26, 0x19, 0xb5, 0x56, 0x1d, 0xaf, 0xbb, 0xe3, 0xa0, 0x0b, 0xe2, 0x89, - 0x47, 0xbe, 0x1e, 0x7c, 0x10, 0x3e, 0x02, 0xf2, 0xc6, 0x4e, 0x9c, 0xd8, 0x41, 0x79, 0xca, 0xce, - 0x6f, 0x66, 0x77, 0x67, 0x7f, 0xf3, 0x9b, 0x71, 0xa0, 0x9b, 0x4e, 0x66, 0x7e, 0xaa, 0x55, 0xa6, - 0xc4, 0x31, 0x1c, 0xfd, 0x10, 0x71, 0x76, 0x1d, 0xc5, 0xc4, 0x73, 0xce, 0x68, 0x2a, 0xe9, 0x41, - 0x5c, 0xd4, 0x41, 0xc6, 0xcf, 0xa0, 0xb7, 0x02, 0xd8, 0xb5, 0x4e, 0xdb, 0x83, 0xde, 0x79, 0xcf, - 0xaf, 0x04, 0x55, 0xfd, 0xe2, 0x2f, 0x0b, 0x60, 0x65, 0x23, 0x82, 0x3d, 0x0a, 0xb3, 0x3b, 0xd7, - 0x3a, 0xb5, 0x06, 0x5d, 0x69, 0xd6, 0x78, 0x0a, 0x3d, 0x49, 0x3c, 0x9b, 0x52, 0xa0, 0xee, 0x29, - 0x71, 0x5b, 0xc6, 0x55, 0x85, 0xf0, 0x13, 0x78, 0xf4, 0x8a, 0x47, 0x71, 0x38, 0xa6, 0x3b, 0x15, - 0x4f, 0x48, 0xbb, 0xed, 0x53, 0x6b, 0xe0, 0xc8, 0x75, 0x30, 0x3f, 0xe7, 0x15, 0x5f, 0x25, 0x63, - 0x3d, 0x4f, 0x33, 0x9a, 0xb8, 0xb6, 0x89, 0xa9, 0x42, 0xe2, 0x5b, 0xf8, 0x60, 0xfd, 0x41, 0x6f, - 0x48, 0x73, 0xa4, 0x12, 0x96, 0xf4, 0x80, 0x4f, 0xab, 0x89, 0x16, 0x09, 0x56, 0x10, 0xf1, 0xfd, - 0xf6, 0xcd, 0x8c, 0x3e, 0x38, 0xa5, 0x59, 0x50, 0x82, 0x7e, 0x2d, 0x52, 0x2e, 0x63, 0xc4, 0x3f, - 0x16, 0x1c, 0xd5, 0xfc, 0x78, 0x0e, 0x76, 0x30, 0x4f, 0xc9, 0x5c, 0xfe, 0xde, 0xf9, 0xd3, 0xfa, - 0x09, 0x7e, 0xf1, 0x9b, 0x47, 0x49, 0x13, 0x9b, 0x33, 0xfa, 0x3a, 0x9c, 0x52, 0x41, 0x9b, 0x59, - 0xe7, 0xd8, 0x8b, 0x59, 0x34, 0x31, 0x34, 0xd9, 0xd2, 0xac, 0xf1, 0x43, 0xe8, 0x0e, 0x35, 0x85, - 0x19, 0x05, 0xbf, 0xbc, 0x30, 0xdc, 0xd8, 0x72, 0x05, 0xa0, 0x07, 0x8e, 0x31, 0x22, 0x95, 0xb8, - 0x1d, 0x73, 0xd2, 0xd2, 0x16, 0xcf, 0xa0, 0x57, 0xb9, 0x16, 0x0f, 0xc1, 0xb9, 0x49, 0xc2, 0x94, - 0xef, 0x54, 0xd6, 0xdf, 0xcb, 0xad, 0x0b, 0xa5, 0xee, 0xa7, 0xa1, 0xbe, 0xef, 0x5b, 0xe2, 0x6f, - 0x0b, 0x0e, 0x6e, 0x28, 0x99, 0xec, 0xc0, 0x27, 0x7e, 0x0a, 0xf6, 0xb5, 0x56, 0x53, 0x93, 0x78, - 0x33, 0x5d, 0xc6, 0x8f, 0x02, 0x5a, 0x81, 0x32, 0x4f, 0x69, 0x8e, 0x6a, 0x05, 0x6a, 0x53, 0x42, - 0x76, 0x5d, 0x42, 0x02, 0xba, 0x2b, 0x69, 0x74, 0x0c, 0xbf, 0xb6, 0x1f, 0xe8, 0x48, 0xae, 0x60, - 0x7c, 0x02, 0xfb, 0x97, 0x7a, 0x2e, 0x67, 0x89, 0xbb, 0x6f, 0xb4, 0x53, 0x58, 0xe2, 0x4b, 0x70, - 0x46, 0x5a, 0xa5, 0xa4, 0xb3, 0xf9, 0x92, 0x6e, 0xab, 0x42, 0xf7, 0x09, 0x74, 0xde, 0x84, 0xf1, - 0xac, 0xac, 0xc1, 0xc2, 0x10, 0x7f, 0x2e, 0xb9, 0x60, 0x1c, 0xc0, 0xe3, 0x9f, 0x99, 0x26, 0x9b, - 0x32, 0x77, 0xe4, 0x26, 0x8c, 0x02, 0x0e, 0xaf, 0xde, 0xa5, 0x34, 0xce, 0x68, 0x72, 0x13, 0xfd, - 0x4e, 0xe6, 0xdd, 0x6d, 0xb9, 0x86, 0xe1, 0x33, 0x80, 0x22, 0x9f, 0x88, 0xd8, 0xb5, 0x8d, 0xdc, - 0xba, 0x7e, 0x99, 0xa2, 0xac, 0x38, 0xc5, 0x73, 0xe8, 0xe7, 0x39, 0x0c, 0xd5, 0x34, 0x8d, 0x29, - 0x23, 0x53, 0x98, 0x33, 0xe8, 0xfd, 0xa4, 0xa3, 0xdb, 0x28, 0x09, 0x63, 0x49, 0x0f, 0x05, 0xff, - 0x8e, 0x5f, 0xd4, 0x4d, 0x56, 0x9d, 0x02, 0x6b, 0xfb, 0x59, 0xfc, 0x01, 0x20, 0x69, 0x4c, 0xd1, - 0x6f, 0xb4, 0x4b, 0x99, 0x17, 0xe5, 0x6b, 0xfd, 0x6f, 0xf9, 0xce, 0xa0, 0x3f, 0x8c, 0x29, 0xd4, - 0x55, 0x7e, 0x16, 0x2d, 0x5e, 0xc3, 0xc5, 0x61, 0xe5, 0x76, 0x16, 0xb7, 0x70, 0x7c, 0x49, 0x9c, - 0x69, 0x35, 0x2f, 0x35, 0xb9, 0x4b, 0x2f, 0xe3, 0xe7, 0xd0, 0x5d, 0xc6, 0xbb, 0xad, 0xad, 0xfd, - 0xba, 0x0a, 0x12, 0x6f, 0x01, 0x37, 0x2e, 0x2a, 0xda, 0xbe, 0x34, 0xcd, 0x2d, 0x5b, 0xda, 0xbe, - 0x8c, 0xc9, 0x95, 0x72, 0xa5, 0xb5, 0xd2, 0xa5, 0x52, 0x8c, 0x21, 0x2e, 0x9b, 0x1e, 0x91, 0x4f, - 0xda, 0x83, 0xfc, 0xe1, 0x71, 0x56, 0x8e, 0x94, 0x63, 0xbf, 0x9e, 0x82, 0x2c, 0x63, 0xc4, 0xd7, - 0x70, 0x22, 0x29, 0x8d, 0xa3, 0xb1, 0xe9, 0xda, 0xe1, 0x4c, 0xb3, 0xd2, 0xbb, 0xcc, 0xb5, 0xa0, - 0x71, 0x1f, 0xe3, 0x49, 0x31, 0x44, 0xf2, 0x1d, 0xf6, 0xcb, 0xbd, 0xe5, 0x18, 0x71, 0x5e, 0xab, - 0x8c, 0xde, 0x45, 0x9c, 0x2d, 0x24, 0xfc, 0x72, 0x4f, 0x2e, 0x91, 0x0b, 0x07, 0xf6, 0x17, 0xe9, - 0x88, 0x8f, 0xe1, 0x60, 0x14, 0x25, 0xb7, 0x79, 0x02, 0x2e, 0x1c, 0xfc, 0x48, 0xcc, 0xe1, 0x6d, - 0xd9, 0x35, 0xa5, 0x29, 0x3e, 0x2a, 0x83, 0x38, 0xef, 0xab, 0xab, 0xf1, 0x9d, 0x2a, 0xfb, 0x2a, - 0x5f, 0x9f, 0x0d, 0xa0, 0x1d, 0xe8, 0x28, 0x1f, 0x31, 0x97, 0x2a, 0xc9, 0x86, 0xa1, 0xa6, 0xfe, - 0x1e, 0x76, 0xa1, 0x73, 0x1d, 0xc6, 0x4c, 0x7d, 0x0b, 0x1d, 0xb0, 0x03, 0x3d, 0xa3, 0x7e, 0xeb, - 0xfc, 0xdf, 0x56, 0x3e, 0x00, 0x96, 0x8f, 0x40, 0x0f, 0xec, 0xfc, 0x60, 0x74, 0xfc, 0x22, 0x09, - 0xaf, 0x5c, 0x31, 0x7e, 0x03, 0x8f, 0xd7, 0xe7, 0x38, 0x23, 0xfa, 0xb5, 0x8f, 0x9f, 0x57, 0xc7, - 0x18, 0x47, 0xf0, 0xa4, 0xf9, 0x13, 0x80, 0x9e, 0xbf, 0xf5, 0xc3, 0xe2, 0x6d, 0xf7, 0x31, 0x3e, - 0x87, 0xfe, 0x66, 0xe9, 0xf1, 0xc4, 0x6f, 0x90, 0xb4, 0xd7, 0x84, 0x32, 0x7e, 0x07, 0x47, 0xb5, - 0xe2, 0xe1, 0xfb, 0x7e, 0x93, 0x10, 0xbc, 0x46, 0x98, 0xf1, 0x2b, 0x78, 0xb4, 0xd6, 0xe2, 0x78, - 0xe4, 0x6f, 0x8e, 0x0c, 0xaf, 0x06, 0xf1, 0x45, 0xe7, 0x6d, 0x3b, 0x9d, 0xcc, 0x7e, 0xdd, 0x37, - 0xff, 0x1f, 0xbe, 0xf8, 0x2f, 0x00, 0x00, 0xff, 0xff, 0x27, 0x95, 0xc1, 0x78, 0x4c, 0x08, 0x00, - 0x00, +var fileDescriptor_pdu_2d84e8d7d278a80d = []byte{ + // 940 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x56, 0x6d, 0x6f, 0xe3, 0xc4, + 0x13, 0xaf, 0x13, 0xa7, 0x75, 0x26, 0xed, 0xbf, 0xee, 0x24, 0xff, 0x23, 0x44, 0x70, 0xaa, 0x96, + 0x13, 0xca, 0x55, 0x60, 0xa1, 0xf0, 0x20, 0x10, 0xe8, 0xa4, 0x6b, 0xd2, 0x5e, 0x8b, 0xe0, 0x88, + 0xb6, 0xb9, 0x0a, 0x9d, 0xc4, 0x0b, 0x37, 0x1e, 0x5a, 0xab, 0x8e, 0xd7, 0xdd, 0x75, 0xd0, 0x05, + 0xf1, 0x8a, 0x77, 0xf0, 0xf5, 0xe0, 0x73, 0x20, 0x3e, 0x02, 0xf2, 0xc6, 0x4e, 0x1c, 0xdb, 0x3d, + 0xf5, 0x55, 0x76, 0x7e, 0x33, 0xe3, 0x9d, 0xc7, 0xdf, 0x06, 0x9a, 0x91, 0x37, 0x77, 0x22, 0x29, + 0x62, 0xc1, 0xda, 0x70, 0xf0, 0x9d, 0xaf, 0xe2, 0x53, 0x3f, 0x20, 0xb5, 0x50, 0x31, 0xcd, 0x38, + 0xdd, 0xb1, 0xe3, 0x32, 0xa8, 0xf0, 0x63, 0x68, 0xad, 0x01, 0xd5, 0x35, 0x0e, 0xeb, 0xfd, 0xd6, + 0xa0, 0xe5, 0xe4, 0x8c, 0xf2, 0x7a, 0xf6, 0xa7, 0x01, 0xb0, 0x96, 0x11, 0xc1, 0x1c, 0xbb, 0xf1, + 0x4d, 0xd7, 0x38, 0x34, 0xfa, 0x4d, 0xae, 0xcf, 0x78, 0x08, 0x2d, 0x4e, 0x6a, 0x3e, 0xa3, 0x89, + 0xb8, 0xa5, 0xb0, 0x5b, 0xd3, 0xaa, 0x3c, 0x84, 0x4f, 0x60, 0xef, 0x5c, 0x8d, 0x03, 0x77, 0x4a, + 0x37, 0x22, 0xf0, 0x48, 0x76, 0xeb, 0x87, 0x46, 0xdf, 0xe2, 0x9b, 0x60, 0xf2, 0x9d, 0x73, 0x75, + 0x12, 0x4e, 0xe5, 0x22, 0x8a, 0xc9, 0xeb, 0x9a, 0xda, 0x26, 0x0f, 0xb1, 0xaf, 0xe1, 0xdd, 0xcd, + 0x84, 0x2e, 0x49, 0x2a, 0x5f, 0x84, 0x8a, 0xd3, 0x1d, 0x3e, 0xce, 0x07, 0x9a, 0x06, 0x98, 0x43, + 0xd8, 0x1f, 0xc6, 0xfd, 0xde, 0x0a, 0x1d, 0xb0, 0x32, 0x31, 0xad, 0x09, 0x3a, 0x25, 0x4b, 0xbe, + 0xb2, 0xc1, 0x6f, 0xc0, 0xbe, 0xa0, 0xd0, 0x7b, 0x7e, 0xa5, 0x62, 0xe9, 0x4e, 0x63, 0xed, 0x57, + 0xd3, 0x7e, 0xb6, 0x53, 0x50, 0xf0, 0x92, 0x25, 0xfb, 0xc7, 0x80, 0xfd, 0x02, 0x88, 0x5f, 0x82, + 0x39, 0x59, 0x44, 0xa4, 0x23, 0xff, 0xdf, 0xe0, 0x49, 0xf1, 0x2b, 0x45, 0x39, 0xb1, 0xe5, 0xda, + 0x03, 0x3b, 0xd0, 0xf8, 0x56, 0x5c, 0x9d, 0x8f, 0xd2, 0xd2, 0x2f, 0x05, 0xfc, 0x08, 0x76, 0xd2, + 0x68, 0x75, 0xb9, 0xab, 0x13, 0xca, 0x4c, 0xd8, 0x4f, 0xd0, 0xae, 0xb8, 0x00, 0xf7, 0xa0, 0xf9, + 0x2a, 0xf4, 0xe8, 0x67, 0x3f, 0x24, 0xcf, 0xde, 0xc2, 0x77, 0xa0, 0xcd, 0x29, 0x0a, 0xfc, 0xa9, + 0x9b, 0x58, 0x0c, 0xe7, 0x52, 0x09, 0x79, 0x39, 0xb0, 0x0d, 0xdc, 0x05, 0xeb, 0x22, 0xa6, 0xe8, + 0x4c, 0x04, 0x9e, 0x5d, 0x43, 0x1b, 0x76, 0x13, 0xe9, 0x58, 0x88, 0xdb, 0x99, 0x2b, 0x6f, 0xed, + 0x3a, 0xfb, 0xdb, 0x80, 0x83, 0xd2, 0xed, 0x38, 0xd8, 0x48, 0xf9, 0x71, 0x39, 0x3e, 0x27, 0xfd, + 0xcd, 0x25, 0x8b, 0x60, 0xbe, 0x74, 0x67, 0x94, 0xe6, 0xaa, 0xcf, 0x09, 0xf6, 0x62, 0xee, 0x7b, + 0x3a, 0x4f, 0x93, 0xeb, 0x33, 0xbe, 0x07, 0xcd, 0xa1, 0x24, 0x37, 0xa6, 0xc9, 0x8f, 0x2f, 0xf4, + 0x2c, 0x99, 0x7c, 0x0d, 0x60, 0x0f, 0x2c, 0x2d, 0x24, 0xd5, 0x69, 0xe8, 0x2f, 0xad, 0x64, 0xf6, + 0x14, 0x5a, 0xb9, 0x6b, 0x75, 0x6a, 0xa1, 0x1b, 0xa9, 0x1b, 0x11, 0xdb, 0x5b, 0x89, 0xb4, 0x4a, + 0xcb, 0x60, 0x7f, 0x19, 0xb0, 0x93, 0x94, 0xed, 0x01, 0xf3, 0x87, 0x1f, 0x82, 0x79, 0x2a, 0xc5, + 0x4c, 0x07, 0x5e, 0xdd, 0x0c, 0xad, 0x47, 0x06, 0xb5, 0x89, 0x78, 0x4b, 0xcb, 0x6a, 0x13, 0x51, + 0x5c, 0x39, 0xb3, 0xbc, 0x72, 0x0c, 0x9a, 0xeb, 0x55, 0x6a, 0xe8, 0xfa, 0x9a, 0xce, 0x44, 0xfa, + 0x7c, 0x0d, 0xe3, 0x23, 0xd8, 0x1e, 0xc9, 0x05, 0x9f, 0x87, 0xdd, 0x6d, 0xbd, 0x6b, 0xa9, 0xc4, + 0x3e, 0x03, 0x6b, 0x2c, 0x45, 0x44, 0x32, 0x5e, 0xac, 0xca, 0x6d, 0xe4, 0xca, 0xdd, 0x81, 0xc6, + 0xa5, 0x1b, 0xcc, 0xb3, 0x1e, 0x2c, 0x05, 0xf6, 0xfb, 0xaa, 0x16, 0x0a, 0xfb, 0xb0, 0xff, 0x4a, + 0x91, 0x57, 0xa4, 0x05, 0x8b, 0x17, 0x61, 0x64, 0xb0, 0x7b, 0xf2, 0x26, 0xa2, 0x69, 0x4c, 0xde, + 0x85, 0xff, 0x2b, 0xe9, 0xbc, 0xeb, 0x7c, 0x03, 0xc3, 0xa7, 0x00, 0x69, 0x3c, 0x3e, 0xa9, 0xae, + 0xa9, 0xb7, 0xac, 0xe9, 0x64, 0x21, 0xf2, 0x9c, 0x92, 0x3d, 0x5b, 0xae, 0xe5, 0x50, 0xcc, 0xa2, + 0x80, 0x62, 0xd2, 0x8d, 0x39, 0x82, 0xd6, 0x0f, 0xd2, 0xbf, 0xf6, 0x43, 0x37, 0xe0, 0x74, 0x97, + 0xd6, 0xdf, 0x72, 0xd2, 0xbe, 0xf1, 0xbc, 0x92, 0x61, 0xc9, 0x5f, 0xb1, 0xdf, 0x00, 0x38, 0x4d, + 0xc9, 0xff, 0x85, 0x1e, 0xd2, 0xe6, 0x65, 0xfb, 0x6a, 0x6f, 0x6d, 0xdf, 0x11, 0xd8, 0xc3, 0x80, + 0x5c, 0x99, 0xaf, 0xcf, 0x92, 0x12, 0x4b, 0x38, 0xdb, 0xcd, 0xdd, 0xae, 0xd8, 0x35, 0xb4, 0x47, + 0xa4, 0x62, 0x29, 0x16, 0xd9, 0x4c, 0x3e, 0x84, 0xfb, 0xf0, 0x13, 0x68, 0xae, 0xec, 0x53, 0x9a, + 0xaa, 0x8a, 0x6d, 0x6d, 0xc4, 0x5e, 0x03, 0x16, 0x2e, 0x4a, 0x59, 0x32, 0x13, 0xf5, 0x2d, 0xf7, + 0xb0, 0x64, 0x66, 0x93, 0x4c, 0xca, 0x89, 0x94, 0x42, 0x66, 0x93, 0xa2, 0x05, 0x36, 0xaa, 0x4a, + 0x22, 0x79, 0x99, 0x76, 0x92, 0xc4, 0x83, 0x38, 0x63, 0xe0, 0xb6, 0x53, 0x0e, 0x81, 0x67, 0x36, + 0xec, 0x0b, 0xe8, 0x94, 0xb8, 0xe8, 0x21, 0xef, 0xc0, 0xa4, 0xd2, 0x4f, 0x61, 0x27, 0x25, 0x91, + 0xc4, 0xc3, 0x3c, 0xdb, 0x5a, 0xd1, 0x88, 0xf5, 0x52, 0xc4, 0xf4, 0xc6, 0x57, 0xf1, 0x72, 0x84, + 0xcf, 0xb6, 0xf8, 0x0a, 0x39, 0xb6, 0x60, 0x7b, 0x19, 0x0e, 0xfb, 0x00, 0x76, 0xc6, 0x7e, 0x78, + 0x9d, 0x04, 0xd0, 0x85, 0x9d, 0xef, 0x49, 0x29, 0xf7, 0x3a, 0xdb, 0x9a, 0x4c, 0x64, 0xef, 0x67, + 0x46, 0x2a, 0xd9, 0xab, 0x93, 0xe9, 0x8d, 0xc8, 0xf6, 0x2a, 0x39, 0x1f, 0xf5, 0xa1, 0x3e, 0x91, + 0x7e, 0x42, 0x31, 0x23, 0x11, 0xc6, 0x43, 0x57, 0x92, 0xbd, 0x85, 0x4d, 0x68, 0x9c, 0xba, 0x81, + 0x22, 0xdb, 0x40, 0x0b, 0xcc, 0x89, 0x9c, 0x93, 0x5d, 0x1b, 0xfc, 0x5b, 0x4b, 0x08, 0x60, 0x95, + 0x04, 0xf6, 0xc0, 0x4c, 0x3e, 0x8c, 0x96, 0x93, 0x06, 0xd1, 0xcb, 0x4e, 0x0a, 0xbf, 0x82, 0xfd, + 0xcd, 0x67, 0x4f, 0x21, 0x3a, 0xa5, 0x3f, 0x0b, 0xbd, 0x32, 0xa6, 0x70, 0x0c, 0x8f, 0xaa, 0x5f, + 0x4c, 0xec, 0x39, 0xf7, 0x3e, 0xc4, 0xbd, 0xfb, 0x75, 0x0a, 0x9f, 0x81, 0x5d, 0x6c, 0x3d, 0x76, + 0x9c, 0x8a, 0x91, 0xee, 0x55, 0xa1, 0x0a, 0x9f, 0xc3, 0x41, 0xa9, 0x79, 0xf8, 0x7f, 0xa7, 0x6a, + 0x10, 0x7a, 0x95, 0xb0, 0xc2, 0xcf, 0x61, 0x6f, 0x63, 0xc5, 0xf1, 0xc0, 0x29, 0x52, 0x46, 0xaf, + 0x04, 0xa9, 0xe3, 0xc6, 0xeb, 0x7a, 0xe4, 0xcd, 0xaf, 0xb6, 0xf5, 0xff, 0xad, 0x4f, 0xff, 0x0b, + 0x00, 0x00, 0xff, 0xff, 0x0a, 0xa0, 0x3f, 0xc2, 0x7c, 0x09, 0x00, 0x00, } diff --git a/replication/logic/pdu/pdu.proto b/replication/logic/pdu/pdu.proto index 983504e..6221577 100644 --- a/replication/logic/pdu/pdu.proto +++ b/replication/logic/pdu/pdu.proto @@ -25,7 +25,22 @@ message Filesystem { message ListFilesystemVersionsReq { string Filesystem = 1; } -message ListFilesystemVersionsRes { repeated FilesystemVersion Versions = 1; } +message ListFilesystemVersionsRes { + repeated FilesystemVersion Versions = 1; + repeated SendAbstraction SendAbstractions = 2; +} + +message SendAbstraction { + enum SendAbstractionType { + Undefined = 0; + ReplicationCursorV2 = 1; + StepHold = 2; + StepBookmark = 3; + }; + SendAbstractionType Type = 1; + string JobID = 2; + FilesystemVersion Version = 3; +} message FilesystemVersion { enum VersionType { @@ -80,9 +95,7 @@ message SendRes { repeated Property Properties = 4; } -message SendCompletedReq { - SendReq OriginalReq = 2; -} +message SendCompletedReq { SendReq OriginalReq = 2; } message SendCompletedRes {} @@ -98,7 +111,7 @@ message ReceiveReq { message ReceiveRes {} message DestroySnapshotsReq { - string Filesystem = 1; +string Filesystem = 1; // Path to filesystem, snapshot or bookmark to be destroyed repeated FilesystemVersion Snapshots = 2; } diff --git a/zfs/versions.go b/zfs/versions.go index 604f5ec..e78fb11 100644 --- a/zfs/versions.go +++ b/zfs/versions.go @@ -115,6 +115,8 @@ func (v FilesystemVersion) RelName() string { } func (v FilesystemVersion) String() string { return v.RelName() } +func (v FilesystemVersion) GetCreation() time.Time { return v.Creation } + // Only takes into account those attributes of FilesystemVersion that // are immutable over time in ZFS. func FilesystemVersionEqualIdentity(a, b FilesystemVersion) bool { diff --git a/zfs/zfs.go b/zfs/zfs.go index 1b99a0f..9f572cd 100644 --- a/zfs/zfs.go +++ b/zfs/zfs.go @@ -1500,6 +1500,30 @@ func tryParseDestroySnapshotsError(arg string, stderr []byte) *DestroySnapshotsE } } +type ErrDestroySnapshotDatasetIsBusy struct { + *DestroySnapshotsError + Name string +} + +var _ error = (*ErrDestroySnapshotDatasetIsBusy)(nil) + +func tryErrDestroySnapshotDatasetIsBusy(arg string, stderr []byte) *ErrDestroySnapshotDatasetIsBusy { + dsne := tryParseDestroySnapshotsError(arg, stderr) + if dsne == nil { + return nil + } + if len(dsne.Reason) != 1 { + return nil + } + if dsne.Reason[0] == "dataset is busy" { + return &ErrDestroySnapshotDatasetIsBusy{ + DestroySnapshotsError: dsne, + Name: dsne.Undestroyable[0], + } + } + return nil +} + func ZFSDestroy(ctx context.Context, arg string) (err error) { var dstype, filesystem string @@ -1533,6 +1557,8 @@ func ZFSDestroy(ctx context.Context, arg string) (err error) { err = &DatasetDoesNotExist{arg} } else if dsNotExistErr := tryDatasetDoesNotExist(filesystem, stdio); dsNotExistErr != nil { err = dsNotExistErr + } else if dsBusy := tryErrDestroySnapshotDatasetIsBusy(arg, stdio); dsBusy != nil { + err = dsBusy } else if dserr := tryParseDestroySnapshotsError(arg, stdio); dserr != nil { err = dserr }