WIP endpoint abstractions + pruning integration / pruner rewrite

This commit is contained in:
Christian Schwarz 2020-05-29 00:09:43 +02:00
parent f0146d03d3
commit 4a0104a44f
26 changed files with 1672 additions and 164 deletions

View File

@ -309,6 +309,11 @@ type PruneKeepNotReplicated struct {
KeepSnapshotAtCursor bool `yaml:"keep_snapshot_at_cursor,optional,default=true"`
}
type PruneKeepStepHolds struct {
Type string `yaml:"type"`
AdditionalJobIds []string `yaml:"additional_job_ids,optional"`
}
type PruneKeepLastN struct {
Type string `yaml:"type"`
Count int `yaml:"count"`
@ -480,6 +485,7 @@ func (t *ServeEnum) UnmarshalYAML(u func(interface{}, bool) error) (err error) {
func (t *PruningEnum) UnmarshalYAML(u func(interface{}, bool) error) (err error) {
t.Ret, err = enumUnmarshal(u, map[string]interface{}{
"step_holds": &PruneKeepStepHolds{},
"not_replicated": &PruneKeepNotReplicated{},
"last_n": &PruneKeepLastN{},
"grid": &PruneGrid{},

View File

@ -0,0 +1,72 @@
// Code generated by "enumer -type=FSState -json"; DO NOT EDIT.
//
package pruner
import (
"encoding/json"
"fmt"
)
const _FSStateName = "FSStateInitializedFSStatePlanningFSStatePlanErrFSStateExecutingFSStateExecuteErrFSStateExecuteSuccess"
var _FSStateIndex = [...]uint8{0, 18, 33, 47, 63, 80, 101}
func (i FSState) String() string {
if i < 0 || i >= FSState(len(_FSStateIndex)-1) {
return fmt.Sprintf("FSState(%d)", i)
}
return _FSStateName[_FSStateIndex[i]:_FSStateIndex[i+1]]
}
var _FSStateValues = []FSState{0, 1, 2, 3, 4, 5}
var _FSStateNameToValueMap = map[string]FSState{
_FSStateName[0:18]: 0,
_FSStateName[18:33]: 1,
_FSStateName[33:47]: 2,
_FSStateName[47:63]: 3,
_FSStateName[63:80]: 4,
_FSStateName[80:101]: 5,
}
// FSStateString retrieves an enum value from the enum constants string name.
// Throws an error if the param is not part of the enum.
func FSStateString(s string) (FSState, error) {
if val, ok := _FSStateNameToValueMap[s]; ok {
return val, nil
}
return 0, fmt.Errorf("%s does not belong to FSState values", s)
}
// FSStateValues returns all values of the enum
func FSStateValues() []FSState {
return _FSStateValues
}
// IsAFSState returns "true" if the value is listed in the enum definition. "false" otherwise
func (i FSState) IsAFSState() bool {
for _, v := range _FSStateValues {
if i == v {
return true
}
}
return false
}
// MarshalJSON implements the json.Marshaler interface for FSState
func (i FSState) MarshalJSON() ([]byte, error) {
return json.Marshal(i.String())
}
// UnmarshalJSON implements the json.Unmarshaler interface for FSState
func (i *FSState) UnmarshalJSON(data []byte) error {
var s string
if err := json.Unmarshal(data, &s); err != nil {
return fmt.Errorf("FSState should be a string, got %s", data)
}
var err error
*i, err = FSStateString(s)
return err
}

454
daemon/pruner.v2/pruner.go Normal file
View File

@ -0,0 +1,454 @@
package pruner
import (
"context"
"encoding/json"
"fmt"
"os"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/daemon/logging/trace"
"github.com/zrepl/zrepl/endpoint"
"github.com/zrepl/zrepl/pruning"
"github.com/zrepl/zrepl/zfs"
)
type Pruner struct {
fsfilter endpoint.FSFilter
jid endpoint.JobID
side Side
keepRules []pruning.KeepRule
// all channels consumed by the run loop
reportReqs chan reportRequest
stopReqs chan stopRequest
done chan struct{}
fsListRes chan fsListRes
state State
listFilesystemsError error // only in state StateListFilesystemsError
fsPruners []*FSPruner // only in state StateFanOutFilesystems
}
//go:generate enumer -type=State -json
type State int
const (
StateInitialized State = iota
StateListFilesystems
StateListFilesystemsError
StateFanOutFilesystems
StateDone
)
type Report struct {
State State
ListFilesystemsError error // only valid in StateListFilesystemsError
Filesystems []*FSReport // valid from StateFanOutFilesystems
}
type reportRequest struct {
ctx context.Context
reply chan *Report
}
type runRequest struct {
complete chan struct{}
}
type stopRequest struct {
complete chan struct{}
}
type fsListRes struct {
filesystems []*zfs.DatasetPath
err error
}
type Side interface {
// may return both nil, indicating there is no replication position
GetReplicationPosition(ctx context.Context, fs string) (*zfs.FilesystemVersion, error)
isSide() Side
}
func NewPruner(fsfilter endpoint.FSFilter, jid endpoint.JobID, side Side, keepRules []pruning.KeepRule) *Pruner {
return &Pruner{
fsfilter,
jid,
side,
keepRules,
make(chan reportRequest),
make(chan stopRequest),
make(chan struct{}),
make(chan fsListRes),
StateInitialized,
nil,
nil,
}
}
func (p *Pruner) Run(ctx context.Context) *Report {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if p.state != StateInitialized {
panic("Run can onl[y be called once")
}
go func() {
fss, err := zfs.ZFSListMapping(ctx, p.fsfilter)
p.fsListRes <- fsListRes{fss, err}
}()
for {
select {
case res := <-p.fsListRes:
if res.err != nil {
p.state = StateListFilesystemsError
p.listFilesystemsError = res.err
close(p.done)
continue
}
p.state = StateFanOutFilesystems
p.fsPruners = make([]*FSPruner, len(res.filesystems))
_, add, end := trace.WithTaskGroup(ctx, "pruner-fan-out-fs")
for i, fs := range res.filesystems {
p.fsPruners[i] = NewFSPruner(p.jid, p.side, p.keepRules, fs)
add(func(ctx context.Context) {
p.fsPruners[i].Run(ctx)
})
}
go func() {
end()
close(p.done)
}()
case req := <-p.stopReqs:
cancel()
go func() {
<-p.done
close(req.complete)
}()
case req := <-p.reportReqs:
req.reply <- p.report(req.ctx)
case <-p.done:
p.state = StateDone
return p.report(ctx)
}
}
}
func (p *Pruner) Report(ctx context.Context) *Report {
req := reportRequest{
ctx: ctx,
reply: make(chan *Report, 1),
}
select {
case p.reportReqs <- req:
return <-req.reply
case <-ctx.Done():
return nil
case <-p.done:
return nil
}
}
func (p *Pruner) report(ctx context.Context) *Report {
fsreports := make([]*FSReport, len(p.fsPruners))
for i := range fsreports {
fsreports[i] = p.fsPruners[i].report()
}
return &Report{
State: p.state,
ListFilesystemsError: p.listFilesystemsError,
Filesystems: fsreports,
}
}
// implements pruning.Snapshot
type snapshot struct {
replicated bool
stepHolds []pruning.StepHold
zfs.FilesystemVersion
state SnapState
destroyOp *zfs.DestroySnapOp
}
//go:generate enumer -type=SnapState -json
type SnapState int
const (
SnapStateInitialized SnapState = iota
SnapStateKeeping
SnapStateDeletePending
SnapStateDeleteAttempted
)
// implements pruning.StepHold
type stepHold struct {
endpoint.Abstraction
}
func (s snapshot) Replicated() bool { return s.replicated }
func (s snapshot) StepHolds() []pruning.StepHold { return s.stepHolds }
func (s stepHold) GetJobID() endpoint.JobID { return *s.Abstraction.GetJobID() }
type FSPruner struct {
jid endpoint.JobID
side Side
keepRules []pruning.KeepRule
fsp *zfs.DatasetPath
state FSState
// all channels consumed by the run loop
planned chan fsPlanRes
executed chan fsExecuteRes
done chan struct{}
reportReqs chan fsReportReq
keepList []*snapshot // valid in FSStateExecuting and forward
destroyList []*snapshot // valid in FSStateExecuting and forward, field .destroyOp is invalid until FSStateExecuting is left
}
type fsPlanRes struct {
keepList []*snapshot
destroyList []*snapshot
err error
}
type fsExecuteRes struct {
completedDestroyOps []*zfs.DestroySnapOp // same len() as FSPruner.destroyList
}
type fsReportReq struct {
res chan *FSReport
}
type FSReport struct {
State FSState
KeepList []*SnapReport
Destroy []*SnapReport
}
type SnapReport struct {
State SnapState
Name string
Replicated bool
StepHoldCount int
DestroyError error
}
//go:generate enumer -type=FSState -json
type FSState int
const (
FSStateInitialized FSState = iota
FSStatePlanning
FSStatePlanErr
FSStateExecuting
FSStateExecuteErr
FSStateExecuteSuccess
)
func (s FSState) IsTerminal() bool {
return s == FSStatePlanErr || s == FSStateExecuteErr || s == FSStateExecuteSuccess
}
func NewFSPruner(jid endpoint.JobID, side Side, keepRules []pruning.KeepRule, fsp *zfs.DatasetPath) *FSPruner {
return &FSPruner{
jid, side, keepRules, fsp,
FSStateInitialized,
make(chan fsPlanRes),
make(chan fsExecuteRes),
make(chan struct{}),
make(chan fsReportReq),
nil, nil,
}
}
func (p *FSPruner) Run(ctx context.Context) *FSReport {
defer func() {
}()
p.state = FSStatePlanning
go func() { p.planned <- p.plan(ctx) }()
out:
for !p.state.IsTerminal() {
select {
case res := <-p.planned:
if res.err != nil {
p.state = FSStatePlanErr
continue
}
p.state = FSStateExecuting
p.keepList = res.keepList
p.destroyList = res.destroyList
go func() { p.executed <- p.execute(ctx, p.destroyList) }()
case res := <-p.executed:
if len(res.completedDestroyOps) != len(p.destroyList) {
panic("impl error: completedDestroyOps is a vector corresponding to entries in p.destroyList")
}
var erronous []*zfs.DestroySnapOp
for i, op := range res.completedDestroyOps {
if *op.ErrOut != nil {
erronous = append(erronous, op)
}
p.destroyList[i].destroyOp = op
p.destroyList[i].state = SnapStateDeleteAttempted
}
if len(erronous) > 0 {
p.state = FSStateExecuteErr
} else {
p.state = FSStateExecuteSuccess
}
close(p.done)
case <-p.reportReqs:
panic("unimp")
case <-p.done:
break out
}
}
// TODO render last FS report
return nil
}
func (p *FSPruner) plan(ctx context.Context) fsPlanRes {
fs := p.fsp.ToString()
vs, err := zfs.ZFSListFilesystemVersions(ctx, p.fsp, zfs.ListFilesystemVersionsOptions{})
if err != nil {
return fsPlanRes{err: errors.Wrap(err, "list filesystem versions")}
}
allJobsStepHolds, absErrs, err := endpoint.ListAbstractions(ctx, endpoint.ListZFSHoldsAndBookmarksQuery{
FS: endpoint.ListZFSHoldsAndBookmarksQueryFilesystemFilter{
FS: &fs,
},
What: endpoint.AbstractionTypeSet{
endpoint.AbstractionStepHold: true,
},
Concurrency: 1,
})
if err != nil {
return fsPlanRes{err: errors.Wrap(err, "list abstractions")}
}
if len(absErrs) > 0 {
logging.GetLogger(ctx, logging.SubsysPruning).WithError(endpoint.ListAbstractionsErrors(absErrs)).
Error("error listing some step holds, prune attempt might fail with 'dataset is busy' errors")
}
repPos, err := p.side.GetReplicationPosition(ctx, p.fsp.ToString())
if err != nil {
return fsPlanRes{err: errors.Wrap(err, "get replication position")}
}
vsAsSnaps := make([]pruning.Snapshot, len(vs))
for i := range vs {
var repPosCreateTxgOrZero uint64
if repPos != nil {
repPosCreateTxgOrZero = repPos.GetCreateTXG()
}
s := &snapshot{
state: SnapStateInitialized,
FilesystemVersion: vs[i],
replicated: vs[i].GetCreateTXG() <= repPosCreateTxgOrZero,
}
for _, h := range allJobsStepHolds {
if zfs.FilesystemVersionEqualIdentity(vs[i], h.GetFilesystemVersion()) {
s.stepHolds = append(s.stepHolds, stepHold{h})
}
}
vsAsSnaps[i] = s
}
downcastToSnapshots := func(l []pruning.Snapshot) (r []*snapshot) {
r = make([]*snapshot, len(l))
for i, e := range l {
r[i] = e.(*snapshot)
}
return r
}
pruningResult := pruning.PruneSnapshots(vsAsSnaps, p.keepRules)
remove, keep := downcastToSnapshots(pruningResult.Remove), downcastToSnapshots(pruningResult.Keep)
if len(remove)+len(keep) != len(vsAsSnaps) {
for _, s := range vsAsSnaps {
r, _ := json.MarshalIndent(s.(*snapshot).report(), "", " ")
fmt.Fprintf(os.Stderr, "%s\n", string(r))
}
panic("indecisive")
}
for _, s := range remove {
s.state = SnapStateDeletePending
}
for _, s := range keep {
s.state = SnapStateKeeping
}
return fsPlanRes{keepList: keep, destroyList: remove, err: nil}
}
func (p *FSPruner) execute(ctx context.Context, destroyList []*snapshot) fsExecuteRes {
ops := make([]*zfs.DestroySnapOp, len(destroyList))
for i, fsv := range p.destroyList {
ops[i] = &zfs.DestroySnapOp{
Filesystem: p.fsp.ToString(),
Name: fsv.GetName(),
ErrOut: new(error),
}
}
zfs.ZFSDestroyFilesystemVersions(ctx, ops)
return fsExecuteRes{completedDestroyOps: ops}
}
func (p *FSPruner) report() *FSReport {
return &FSReport{
State: p.state,
KeepList: p.reportRenderSnapReports(p.keepList),
Destroy: p.reportRenderSnapReports(p.destroyList),
}
}
func (p *FSPruner) reportRenderSnapReports(l []*snapshot) (r []*SnapReport) {
r = make([]*SnapReport, len(l))
for i := range l {
r[i] = l[i].report()
}
return r
}
func (s *snapshot) report() *SnapReport {
var snapErr error
if s.state == SnapStateDeleteAttempted {
if *s.destroyOp.ErrOut != nil {
snapErr = (*s.destroyOp.ErrOut)
}
}
return &SnapReport{
State: s.state,
Name: s.Name,
Replicated: s.Replicated(),
StepHoldCount: len(s.stepHolds),
DestroyError: snapErr,
}
}

View File

@ -0,0 +1,27 @@
package pruner
import (
"context"
"github.com/zrepl/zrepl/endpoint"
"github.com/zrepl/zrepl/zfs"
)
type SideSender struct {
jobID endpoint.JobID
}
func NewSideSender(jid endpoint.JobID) *SideSender {
return &SideSender{jid}
}
func (s *SideSender) isSide() Side { return nil }
var _ Side = (*SideSender)(nil)
func (s *SideSender) GetReplicationPosition(ctx context.Context, fs string) (*zfs.FilesystemVersion, error) {
if fs == "" {
panic("must not pass zero value for fs")
}
return endpoint.GetMostRecentReplicationCursorOfJob(ctx, fs, s.jobID)
}

View File

@ -0,0 +1,70 @@
// Code generated by "enumer -type=SnapState -json"; DO NOT EDIT.
//
package pruner
import (
"encoding/json"
"fmt"
)
const _SnapStateName = "SnapStateInitializedSnapStateKeepingSnapStateDeletePendingSnapStateDeleteAttempted"
var _SnapStateIndex = [...]uint8{0, 20, 36, 58, 82}
func (i SnapState) String() string {
if i < 0 || i >= SnapState(len(_SnapStateIndex)-1) {
return fmt.Sprintf("SnapState(%d)", i)
}
return _SnapStateName[_SnapStateIndex[i]:_SnapStateIndex[i+1]]
}
var _SnapStateValues = []SnapState{0, 1, 2, 3}
var _SnapStateNameToValueMap = map[string]SnapState{
_SnapStateName[0:20]: 0,
_SnapStateName[20:36]: 1,
_SnapStateName[36:58]: 2,
_SnapStateName[58:82]: 3,
}
// SnapStateString retrieves an enum value from the enum constants string name.
// Throws an error if the param is not part of the enum.
func SnapStateString(s string) (SnapState, error) {
if val, ok := _SnapStateNameToValueMap[s]; ok {
return val, nil
}
return 0, fmt.Errorf("%s does not belong to SnapState values", s)
}
// SnapStateValues returns all values of the enum
func SnapStateValues() []SnapState {
return _SnapStateValues
}
// IsASnapState returns "true" if the value is listed in the enum definition. "false" otherwise
func (i SnapState) IsASnapState() bool {
for _, v := range _SnapStateValues {
if i == v {
return true
}
}
return false
}
// MarshalJSON implements the json.Marshaler interface for SnapState
func (i SnapState) MarshalJSON() ([]byte, error) {
return json.Marshal(i.String())
}
// UnmarshalJSON implements the json.Unmarshaler interface for SnapState
func (i *SnapState) UnmarshalJSON(data []byte) error {
var s string
if err := json.Unmarshal(data, &s); err != nil {
return fmt.Errorf("SnapState should be a string, got %s", data)
}
var err error
*i, err = SnapStateString(s)
return err
}

View File

@ -0,0 +1,71 @@
// Code generated by "enumer -type=State -json"; DO NOT EDIT.
//
package pruner
import (
"encoding/json"
"fmt"
)
const _StateName = "StateInitializedStateListFilesystemsStateListFilesystemsErrorStateFanOutFilesystemsStateDone"
var _StateIndex = [...]uint8{0, 16, 36, 61, 83, 92}
func (i State) String() string {
if i < 0 || i >= State(len(_StateIndex)-1) {
return fmt.Sprintf("State(%d)", i)
}
return _StateName[_StateIndex[i]:_StateIndex[i+1]]
}
var _StateValues = []State{0, 1, 2, 3, 4}
var _StateNameToValueMap = map[string]State{
_StateName[0:16]: 0,
_StateName[16:36]: 1,
_StateName[36:61]: 2,
_StateName[61:83]: 3,
_StateName[83:92]: 4,
}
// StateString retrieves an enum value from the enum constants string name.
// Throws an error if the param is not part of the enum.
func StateString(s string) (State, error) {
if val, ok := _StateNameToValueMap[s]; ok {
return val, nil
}
return 0, fmt.Errorf("%s does not belong to State values", s)
}
// StateValues returns all values of the enum
func StateValues() []State {
return _StateValues
}
// IsAState returns "true" if the value is listed in the enum definition. "false" otherwise
func (i State) IsAState() bool {
for _, v := range _StateValues {
if i == v {
return true
}
}
return false
}
// MarshalJSON implements the json.Marshaler interface for State
func (i State) MarshalJSON() ([]byte, error) {
return json.Marshal(i.String())
}
// UnmarshalJSON implements the json.Unmarshaler interface for State
func (i *State) UnmarshalJSON(data []byte) error {
var s string
if err := json.Unmarshal(data, &s); err != nil {
return fmt.Errorf("State should be a string, got %s", data)
}
var err error
*i, err = StateString(s)
return err
}

View File

@ -20,15 +20,14 @@ import (
)
// Try to keep it compatible with github.com/zrepl/zrepl/endpoint.Endpoint
type History interface {
ReplicationCursor(ctx context.Context, req *pdu.ReplicationCursorReq) (*pdu.ReplicationCursorRes, error)
type Endpoint interface {
ListFilesystems(ctx context.Context, req *pdu.ListFilesystemReq) (*pdu.ListFilesystemRes, error)
ListFilesystemVersions(ctx context.Context, req *pdu.ListFilesystemVersionsReq) (*pdu.ListFilesystemVersionsRes, error)
}
// Try to keep it compatible with github.com/zrepl/zrepl/endpoint.Endpoint
type Target interface {
ListFilesystems(ctx context.Context, req *pdu.ListFilesystemReq) (*pdu.ListFilesystemRes, error)
ListFilesystemVersions(ctx context.Context, req *pdu.ListFilesystemVersionsReq) (*pdu.ListFilesystemVersionsRes, error)
Endpoint
DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshotsReq) (*pdu.DestroySnapshotsRes, error)
}
@ -46,13 +45,14 @@ func GetLogger(ctx context.Context) Logger {
}
type args struct {
ctx context.Context
target Target
receiver History
rules []pruning.KeepRule
retryWait time.Duration
considerSnapAtCursorReplicated bool
promPruneSecs prometheus.Observer
ctx context.Context
target Target
sender, receiver Endpoint
rules []pruning.KeepRule
retryWait time.Duration
considerSnapAtCursorReplicated bool
convertAnyStepHoldToStepBookmark bool
promPruneSecs prometheus.Observer
}
type Pruner struct {
@ -70,11 +70,12 @@ type Pruner struct {
}
type PrunerFactory struct {
senderRules []pruning.KeepRule
receiverRules []pruning.KeepRule
retryWait time.Duration
considerSnapAtCursorReplicated bool
promPruneSecs *prometheus.HistogramVec
senderRules []pruning.KeepRule
receiverRules []pruning.KeepRule
retryWait time.Duration
considerSnapAtCursorReplicated bool
convertAnyStepHoldToStepBookmark bool
promPruneSecs *prometheus.HistogramVec
}
type LocalPrunerFactory struct {
@ -122,25 +123,35 @@ func NewPrunerFactory(in config.PruningSenderReceiver, promPruneSecs *prometheus
}
considerSnapAtCursorReplicated = considerSnapAtCursorReplicated || !knr.KeepSnapshotAtCursor
}
convertAnyStepHoldToStepBookmark := false
for _, r := range in.KeepSender {
_, ok := r.Ret.(*config.PruneKeepStepHolds)
convertAnyStepHoldToStepBookmark = convertAnyStepHoldToStepBookmark || ok
}
f := &PrunerFactory{
senderRules: keepRulesSender,
receiverRules: keepRulesReceiver,
retryWait: envconst.Duration("ZREPL_PRUNER_RETRY_INTERVAL", 10*time.Second),
considerSnapAtCursorReplicated: considerSnapAtCursorReplicated,
promPruneSecs: promPruneSecs,
senderRules: keepRulesSender,
receiverRules: keepRulesReceiver,
retryWait: envconst.Duration("ZREPL_PRUNER_RETRY_INTERVAL", 10*time.Second),
considerSnapAtCursorReplicated: considerSnapAtCursorReplicated,
convertAnyStepHoldToStepBookmark: convertAnyStepHoldToStepBookmark,
promPruneSecs: promPruneSecs,
}
return f, nil
}
func (f *PrunerFactory) BuildSenderPruner(ctx context.Context, target Target, receiver History) *Pruner {
func (f *PrunerFactory) BuildSenderPruner(ctx context.Context, sender Target, receiver Endpoint) *Pruner {
p := &Pruner{
args: args{
context.WithValue(ctx, contextKeyPruneSide, "sender"),
target,
sender,
sender,
receiver,
f.senderRules,
f.retryWait,
f.considerSnapAtCursorReplicated,
f.convertAnyStepHoldToStepBookmark,
f.promPruneSecs.WithLabelValues("sender"),
},
state: Plan,
@ -148,15 +159,17 @@ func (f *PrunerFactory) BuildSenderPruner(ctx context.Context, target Target, re
return p
}
func (f *PrunerFactory) BuildReceiverPruner(ctx context.Context, target Target, receiver History) *Pruner {
func (f *PrunerFactory) BuildReceiverPruner(ctx context.Context, receiver Target, sender Endpoint) *Pruner {
p := &Pruner{
args: args{
context.WithValue(ctx, contextKeyPruneSide, "receiver"),
target,
receiver,
sender,
receiver,
f.receiverRules,
f.retryWait,
false, // senseless here anyways
false, // senseless here anyways
f.promPruneSecs.WithLabelValues("receiver"),
},
state: Plan,
@ -164,15 +177,17 @@ func (f *PrunerFactory) BuildReceiverPruner(ctx context.Context, target Target,
return p
}
func (f *LocalPrunerFactory) BuildLocalPruner(ctx context.Context, target Target, receiver History) *Pruner {
func (f *LocalPrunerFactory) BuildLocalPruner(ctx context.Context, target Target) *Pruner {
p := &Pruner{
args: args{
context.WithValue(ctx, contextKeyPruneSide, "local"),
target,
receiver,
target,
target,
f.keepRules,
f.retryWait,
false, // considerSnapAtCursorReplicated is not relevant for local pruning
false, // convertAnyStepHoldToStepBookmark is not relevant for local pruning
f.promPruneSecs.WithLabelValues("local"),
},
state: Plan,
@ -341,11 +356,13 @@ func (s snapshot) Replicated() bool { return s.replicated }
func (s snapshot) Date() time.Time { return s.date }
func (s snapshot) CreateTXG() uint64 { return s.fsv.GetCreateTXG() }
func doOneAttempt(a *args, u updater) {
ctx, target, receiver := a.ctx, a.target, a.receiver
ctx, sender, receiver, target := a.ctx, a.sender, a.receiver, a.target
sfssres, err := receiver.ListFilesystems(ctx, &pdu.ListFilesystemReq{})
sfssres, err := sender.ListFilesystems(ctx, &pdu.ListFilesystemReq{})
if err != nil {
u(func(p *Pruner) {
p.state = PlanErr
@ -407,6 +424,10 @@ tfss_loop:
pfs.snaps = make([]pruning.Snapshot, 0, len(tfsvs))
receiver.ListFilesystemVersions(ctx, &pdu.ListFilesystemVersionsReq{
Filesystem: tfs.Path,
})
rcReq := &pdu.ReplicationCursorReq{
Filesystem: tfs.Path,
}
@ -415,6 +436,7 @@ tfss_loop:
pfsPlanErrAndLog(err, "cannot get replication cursor bookmark")
continue tfss_loop
}
if rc.GetNotexist() {
err := errors.New("replication cursor bookmark does not exist (one successful replication is required before pruning works)")
pfsPlanErrAndLog(err, "")

View File

@ -112,6 +112,13 @@ func (s *Sender) ListFilesystemVersions(ctx context.Context, r *pdu.ListFilesyst
for i := range fsvs {
rfsvs[i] = pdu.FilesystemVersionFromZFS(&fsvs[i])
}
sendAbstractions := sendAbstractionsCacheSingleton.GetByFS(ctx, lp.ToString())
rSabsInfo := make([]*pdu.SendAbstraction, len(sendAbstractions))
for i := range rSabsInfo {
rSabsInfo[i] = SendAbstractionToPDU(sendAbstractions[i])
}
res := &pdu.ListFilesystemVersionsRes{Versions: rfsvs}
return res, nil

View File

@ -82,6 +82,24 @@ func (s *sendAbstractionsCache) InvalidateFSCache(fs string) {
}
func (s *sendAbstractionsCache) GetByFS(ctx context.Context, fs string) (ret []Abstraction) {
defer s.mtx.Lock().Unlock()
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
if fs == "" {
panic("must not pass zero-value fs")
}
s.tryLoadOnDiskSendAbstractions(ctx, fs)
for _, a := range s.abstractions {
if a.GetFS() == fs {
ret = append(ret, a)
}
}
return ret
}
// - logs errors in getting on-disk abstractions
// - only fetches on-disk abstractions once, but every time from the in-memory store
//

View File

@ -0,0 +1,28 @@
package endpoint
import "github.com/zrepl/zrepl/replication/logic/pdu"
func SendAbstractionToPDU(a Abstraction) *pdu.SendAbstraction {
var ty pdu.SendAbstraction_SendAbstractionType
switch a.GetType() {
case AbstractionLastReceivedHold:
panic(a)
case AbstractionReplicationCursorBookmarkV1:
panic(a)
case AbstractionReplicationCursorBookmarkV2:
ty = pdu.SendAbstraction_ReplicationCursorV2
case AbstractionStepHold:
ty = pdu.SendAbstraction_StepHold
case AbstractionStepBookmark:
ty = pdu.SendAbstraction_StepBookmark
default:
panic(a)
}
version := a.GetFilesystemVersion()
return &pdu.SendAbstraction{
Type: ty,
JobID: (*a.GetJobID()).String(),
Version: pdu.FilesystemVersionFromZFS(&version),
}
}

View File

@ -2,8 +2,9 @@ package tests
import (
"fmt"
"strings"
"github.com/kr/pretty"
"github.com/stretchr/testify/require"
"github.com/zrepl/zrepl/platformtest"
"github.com/zrepl/zrepl/zfs"
)
@ -17,7 +18,9 @@ func BatchDestroy(ctx *platformtest.Context) {
+ "foo bar@1"
+ "foo bar@2"
+ "foo bar@3"
+ "foo bar@4"
R zfs hold zrepl_platformtest "${ROOTDS}/foo bar@2"
R zfs hold zrepl_platformtest "${ROOTDS}/foo bar@4"
`)
reqs := []*zfs.DestroySnapOp{
@ -31,24 +34,40 @@ func BatchDestroy(ctx *platformtest.Context) {
Filesystem: fmt.Sprintf("%s/foo bar", ctx.RootDataset),
Name: "2",
},
&zfs.DestroySnapOp{
ErrOut: new(error),
Filesystem: fmt.Sprintf("%s/foo bar", ctx.RootDataset),
Name: "non existent",
},
&zfs.DestroySnapOp{
ErrOut: new(error),
Filesystem: fmt.Sprintf("%s/foo bar", ctx.RootDataset),
Name: "4",
},
}
zfs.ZFSDestroyFilesystemVersions(ctx, reqs)
pretty.Println(reqs)
if *reqs[0].ErrOut != nil {
panic("expecting no error")
}
err := (*reqs[1].ErrOut).Error()
if !strings.Contains(err, fmt.Sprintf("%s/foo bar@2", ctx.RootDataset)) {
panic(fmt.Sprintf("expecting error about being unable to destroy @2: %T\n%s", err, err))
}
eBusy, ok := (*reqs[1].ErrOut).(*zfs.ErrDestroySnapshotDatasetIsBusy)
require.True(ctx, ok)
require.Equal(ctx, reqs[1].Name, eBusy.Name)
require.Nil(ctx, *reqs[2].ErrOut, "destroying non-existent snap is not an error (idempotence)")
eBusy, ok = (*reqs[3].ErrOut).(*zfs.ErrDestroySnapshotDatasetIsBusy)
require.True(ctx, ok)
require.Equal(ctx, reqs[3].Name, eBusy.Name)
platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, `
!N "foo bar@3"
!E "foo bar@1"
!E "foo bar@2"
R zfs release zrepl_platformtest "${ROOTDS}/foo bar@2"
- "foo bar@2"
- "foo bar@1"
- "foo bar"
!E "foo bar@4"
`)
}

View File

@ -14,6 +14,7 @@ var Cases = []Case{BatchDestroy,
ListFilesystemVersionsUserrefs,
ListFilesystemVersionsZeroExistIsNotAnError,
ListFilesystemsNoFilter,
Pruner2NotReplicated,
ReceiveForceIntoEncryptedErr,
ReceiveForceRollbackWorksUnencrypted,
ReplicationIncrementalCleansUpStaleAbstractionsWithCacheOnSecondReplication,

View File

@ -0,0 +1,331 @@
package tests
import (
"encoding/json"
"fmt"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/filters"
"github.com/zrepl/zrepl/daemon/pruner"
"github.com/zrepl/zrepl/endpoint"
"github.com/zrepl/zrepl/platformtest"
"github.com/zrepl/zrepl/zfs"
)
func PrunerNotReplicated(ctx *platformtest.Context) {
platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, `
DESTROYROOT
CREATEROOT
+ "foo bar"
+ "foo bar@1"
+ "foo bar@2"
+ "foo bar@3"
+ "foo bar@4"
+ "foo bar@5"
`)
c, err := config.ParseConfigBytes([]byte(fmt.Sprintf(`
jobs:
- name: prunetest
type: push
filesystems: {
"%s/foo bar<": true
}
connect:
type: tcp
address: 255.255.255.255:255
snapshotting:
type: manual
pruning:
keep_sender:
- type: not_replicated
- type: last_n
count: 1
keep_receiver:
- type: last_n
count: 2
`, ctx.RootDataset)))
require.NoError(ctx, err)
pushJob := c.Jobs[0].Ret.(*config.PushJob)
dummyHistVec := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "foo",
Subsystem: "foo",
Name: "foo",
Help: "foo",
}, []string{"foo"})
prunerFactory, err := pruner.NewPrunerFactory(pushJob.Pruning, dummyHistVec)
require.NoError(ctx, err)
senderJid := endpoint.MustMakeJobID("sender-job")
fsfilter, err := filters.DatasetMapFilterFromConfig(pushJob.Filesystems)
require.NoError(ctx, err)
sender := endpoint.NewSender(endpoint.SenderConfig{
FSF: fsfilter,
Encrypt: &zfs.NilBool{
B: false,
},
JobID: senderJid,
})
fs := ctx.RootDataset + "/foo bar"
// create a replication cursor to make pruning work at all
_, err = endpoint.CreateReplicationCursor(ctx, fs, fsversion(ctx, fs, "@2"), senderJid)
require.NoError(ctx, err)
p := prunerFactory.BuildSenderPruner(ctx, sender, sender)
p.Prune()
report := p.Report()
reportJSON, err := json.MarshalIndent(report, "", " ")
require.NoError(ctx, err)
ctx.Logf("%s\n", string(reportJSON))
require.Equal(ctx, pruner.Done.String(), report.State)
require.Len(ctx, report.Completed, 1)
fsReport := report.Completed[0]
require.Equal(ctx, fs, fsReport.Filesystem)
require.Empty(ctx, fsReport.SkipReason)
require.Empty(ctx, fsReport.LastError)
require.Len(ctx, fsReport.DestroyList, 1)
require.Equal(ctx, fsReport.DestroyList[0], pruner.SnapshotReport{
Name: "1",
Replicated: true,
Date: fsReport.DestroyList[0].Date,
})
}
func PrunerNoKeepNotReplicatedNoKeepStepHoldConvertsAnyStepHoldToBookmark(ctx *platformtest.Context) {
platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, `
DESTROYROOT
CREATEROOT
+ "foo bar"
+ "foo bar@1"
+ "foo bar@2"
+ "foo bar@3"
+ "foo bar@4"
+ "foo bar@5"
`)
c, err := config.ParseConfigBytes([]byte(fmt.Sprintf(`
jobs:
- name: prunetest
type: push
filesystems: {
"%s/foo bar<": true
}
connect:
type: tcp
address: 255.255.255.255:255
snapshotting:
type: manual
pruning:
keep_sender:
- type: last_n
count: 1
keep_receiver:
- type: last_n
count: 2
`, ctx.RootDataset)))
require.NoError(ctx, err)
pushJob := c.Jobs[0].Ret.(*config.PushJob)
dummyHistVec := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "foo",
Subsystem: "foo",
Name: "foo",
Help: "foo",
}, []string{"foo"})
prunerFactory, err := pruner.NewPrunerFactory(pushJob.Pruning, dummyHistVec)
require.NoError(ctx, err)
senderJid := endpoint.MustMakeJobID("sender-job")
fsfilter, err := filters.DatasetMapFilterFromConfig(pushJob.Filesystems)
require.NoError(ctx, err)
sender := endpoint.NewSender(endpoint.SenderConfig{
FSF: fsfilter,
Encrypt: &zfs.NilBool{
B: false,
},
JobID: senderJid,
})
fs := ctx.RootDataset + "/foo bar"
// create a replication cursor to make pruning work at all
_, err = endpoint.CreateReplicationCursor(ctx, fs, fsversion(ctx, fs, "@2"), senderJid)
require.NoError(ctx, err)
// create step holds for the incremental @2->@3
endpoint.HoldStep(ctx, fs, fsversion(ctx, fs, "@2"), senderJid)
endpoint.HoldStep(ctx, fs, fsversion(ctx, fs, "@3"), senderJid)
// create step holds for another job
otherJid := endpoint.MustMakeJobID("other-job")
endpoint.HoldStep(ctx, fs, fsversion(ctx, fs, "@2"), otherJid)
endpoint.HoldStep(ctx, fs, fsversion(ctx, fs, "@3"), otherJid)
p := prunerFactory.BuildSenderPruner(ctx, sender, sender)
p.Prune()
report := p.Report()
reportJSON, err := json.MarshalIndent(report, "", " ")
require.NoError(ctx, err)
ctx.Logf("%s\n", string(reportJSON))
require.Equal(ctx, pruner.Done.String(), report.State)
require.Len(ctx, report.Completed, 1)
fsReport := report.Completed[0]
require.Equal(ctx, fs, fsReport.Filesystem)
require.Empty(ctx, fsReport.SkipReason)
require.Empty(ctx, fsReport.LastError)
expectDestroyList := []pruner.SnapshotReport{
{
Name: "1",
Replicated: true,
},
{
Name: "2",
Replicated: true,
},
{
Name: "3",
Replicated: true,
},
{
Name: "4",
Replicated: true,
},
}
for _, d := range fsReport.DestroyList {
d.Date = time.Time{}
}
require.Subset(ctx, fsReport.DestroyList, expectDestroyList)
}
func PrunerNoKeepNotReplicatedButKeepStepHold(ctx *platformtest.Context) {
platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, `
DESTROYROOT
CREATEROOT
+ "foo bar"
+ "foo bar@1"
+ "foo bar@2"
+ "foo bar@3"
+ "foo bar@4"
+ "foo bar@5"
`)
c, err := config.ParseConfigBytes([]byte(fmt.Sprintf(`
jobs:
- name: prunetest
type: push
filesystems: {
"%s/foo bar<": true
}
connect:
type: tcp
address: 255.255.255.255:255
snapshotting:
type: manual
pruning:
keep_sender:
- type: step_holds
- type: last_n
count: 1
keep_receiver:
- type: last_n
count: 2
`, ctx.RootDataset)))
require.NoError(ctx, err)
pushJob := c.Jobs[0].Ret.(*config.PushJob)
dummyHistVec := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "foo",
Subsystem: "foo",
Name: "foo",
Help: "foo",
}, []string{"foo"})
prunerFactory, err := pruner.NewPrunerFactory(pushJob.Pruning, dummyHistVec)
require.NoError(ctx, err)
senderJid := endpoint.MustMakeJobID("sender-job")
fsfilter, err := filters.DatasetMapFilterFromConfig(pushJob.Filesystems)
require.NoError(ctx, err)
sender := endpoint.NewSender(endpoint.SenderConfig{
FSF: fsfilter,
Encrypt: &zfs.NilBool{
B: false,
},
JobID: senderJid,
})
fs := ctx.RootDataset + "/foo bar"
// create a replication cursor to make pruning work at all
_, err = endpoint.CreateReplicationCursor(ctx, fs, fsversion(ctx, fs, "@2"), senderJid)
require.NoError(ctx, err)
// create step holds for the incremental @2->@3
endpoint.HoldStep(ctx, fs, fsversion(ctx, fs, "@2"), senderJid)
endpoint.HoldStep(ctx, fs, fsversion(ctx, fs, "@3"), senderJid)
// create step holds for another job
otherJid := endpoint.MustMakeJobID("other-job")
endpoint.HoldStep(ctx, fs, fsversion(ctx, fs, "@2"), otherJid)
endpoint.HoldStep(ctx, fs, fsversion(ctx, fs, "@3"), otherJid)
p := prunerFactory.BuildSenderPruner(ctx, sender, sender)
p.Prune()
report := p.Report()
reportJSON, err := json.MarshalIndent(report, "", " ")
require.NoError(ctx, err)
ctx.Logf("%s\n", string(reportJSON))
require.Equal(ctx, pruner.Done.String(), report.State)
require.Len(ctx, report.Completed, 1)
fsReport := report.Completed[0]
require.Equal(ctx, fs, fsReport.Filesystem)
require.Empty(ctx, fsReport.SkipReason)
require.Empty(ctx, fsReport.LastError)
expectDestroyList := []pruner.SnapshotReport{
{
Name: "1",
Replicated: true,
},
{
Name: "4",
Replicated: true,
},
}
for _, d := range fsReport.DestroyList {
d.Date = time.Time{}
}
require.Subset(ctx, fsReport.DestroyList, expectDestroyList)
}

View File

@ -0,0 +1,137 @@
package tests
import (
"encoding/json"
"fmt"
"time"
"github.com/kr/pretty"
"github.com/stretchr/testify/require"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/filters"
"github.com/zrepl/zrepl/daemon/pruner.v2"
"github.com/zrepl/zrepl/endpoint"
"github.com/zrepl/zrepl/platformtest"
"github.com/zrepl/zrepl/pruning"
"github.com/zrepl/zrepl/zfs"
)
func Pruner2NotReplicated(ctx *platformtest.Context) {
platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, `
DESTROYROOT
CREATEROOT
+ "foo bar"
+ "foo bar@1"
+ "foo bar@2"
+ "foo bar@3"
+ "foo bar@4"
+ "foo bar@5"
`)
fs := ctx.RootDataset + "/foo bar"
senderJid := endpoint.MustMakeJobID("sender-job")
otherJid1 := endpoint.MustMakeJobID("other-job-1")
otherJid2 := endpoint.MustMakeJobID("other-job-2")
// create step holds for the incremental @2->@3
endpoint.HoldStep(ctx, fs, fsversion(ctx, fs, "@2"), senderJid)
endpoint.HoldStep(ctx, fs, fsversion(ctx, fs, "@3"), senderJid)
// create step holds for other-job-1 @2 -> @3
endpoint.HoldStep(ctx, fs, fsversion(ctx, fs, "@2"), otherJid1)
endpoint.HoldStep(ctx, fs, fsversion(ctx, fs, "@3"), otherJid1)
// create step hold for other-job-2 @1 (will be pruned)
endpoint.HoldStep(ctx, fs, fsversion(ctx, fs, "@1"), otherJid2)
c, err := config.ParseConfigBytes([]byte(fmt.Sprintf(`
jobs:
- name: prunetest
type: push
filesystems: {
"%s/foo bar": true
}
connect:
type: tcp
address: 255.255.255.255:255
snapshotting:
type: manual
pruning:
keep_sender:
#- type: not_replicated
- type: step_holds
- type: last_n
count: 1
keep_receiver:
- type: last_n
count: 2
`, ctx.RootDataset)))
require.NoError(ctx, err)
pushJob := c.Jobs[0].Ret.(*config.PushJob)
require.NoError(ctx, err)
fsfilter, err := filters.DatasetMapFilterFromConfig(pushJob.Filesystems)
require.NoError(ctx, err)
matchedFilesystems, err := zfs.ZFSListMapping(ctx, fsfilter)
ctx.Logf("%s", pretty.Sprint(matchedFilesystems))
require.NoError(ctx, err)
require.Len(ctx, matchedFilesystems, 1)
sideSender := pruner.NewSideSender(senderJid)
keepRules, err := pruning.RulesFromConfig(senderJid, pushJob.Pruning.KeepSender)
require.NoError(ctx, err)
p := pruner.NewPruner(fsfilter, senderJid, sideSender, keepRules)
runDone := make(chan *pruner.Report)
go func() {
runDone <- p.Run(ctx)
}()
var report *pruner.Report
// concurrency stress
out:
for {
select {
case <-time.After(10 * time.Millisecond):
p.Report(ctx)
case report = <-runDone:
break out
}
}
ctx.Logf("%s\n", pretty.Sprint(report))
reportJSON, err := json.MarshalIndent(report, "", " ")
require.NoError(ctx, err)
ctx.Logf("%s\n", string(reportJSON))
ctx.FailNow()
// fs := ctx.RootDataset + "/foo bar"
// // create a replication cursor to make pruning work at all
// _, err = endpoint.CreateReplicationCursor(ctx, fs, fsversion(ctx, fs, "@2"), senderJid)
// require.NoError(ctx, err)
// p := prunerFactory.BuildSenderPruner(ctx, sender, sender)
// p.Prune()
// report := p.Report()
// require.Equal(ctx, pruner.Done.String(), report.State)
// require.Len(ctx, report.Completed, 1)
// fsReport := report.Completed[0]
// require.Equal(ctx, fs, fsReport.Filesystem)
// require.Empty(ctx, fsReport.SkipReason)
// require.Empty(ctx, fsReport.LastError)
// require.Len(ctx, fsReport.DestroyList, 1)
// require.Equal(ctx, fsReport.DestroyList[0], pruner.SnapshotReport{
// Name: "1",
// Replicated: true,
// Date: fsReport.DestroyList[0].Date,
// })
}

View File

@ -17,10 +17,12 @@ func UndestroyableSnapshotParsing(t *platformtest.Context) {
+ "foo bar@1 2 3"
+ "foo bar@4 5 6"
+ "foo bar@7 8 9"
+ "foo bar@10 11 12"
R zfs hold zrepl_platformtest "${ROOTDS}/foo bar@4 5 6"
R zfs hold zrepl_platformtest "${ROOTDS}/foo bar@7 8 9"
`)
err := zfs.ZFSDestroy(t, fmt.Sprintf("%s/foo bar@1 2 3,4 5 6,7 8 9", t.RootDataset))
err := zfs.ZFSDestroy(t, fmt.Sprintf("%s/foo bar@1 2 3,4 5 6,7 8 9,10 11 12", t.RootDataset))
if err == nil {
panic("expecting destroy error due to hold")
}
@ -30,8 +32,9 @@ func UndestroyableSnapshotParsing(t *platformtest.Context) {
if dse.Filesystem != fmt.Sprintf("%s/foo bar", t.RootDataset) {
panic(dse.Filesystem)
}
require.Equal(t, []string{"4 5 6"}, dse.Undestroyable)
require.Equal(t, []string{"dataset is busy"}, dse.Reason)
expectUndestroyable := []string{"4 5 6", "7 8 9"}
require.Len(t, dse.Undestroyable, len(expectUndestroyable))
require.Subset(t, dse.Undestroyable, expectUndestroyable)
}
}

View File

@ -66,24 +66,26 @@ type retentionGridAdaptor struct {
Snapshot
}
func (a retentionGridAdaptor) Date() time.Time { return a.Snapshot.GetCreation() }
func (a retentionGridAdaptor) LessThan(b retentiongrid.Entry) bool {
return a.Date().Before(b.Date())
}
// Prune filters snapshots with the retention grid.
func (p *KeepGrid) KeepRule(snaps []Snapshot) (destroyList []Snapshot) {
func (p *KeepGrid) KeepRule(snaps []Snapshot) PruneSnapshotsResult {
snaps = filterSnapList(snaps, func(snapshot Snapshot) bool {
return p.re.MatchString(snapshot.Name())
reCandidates := partitionSnapList(snaps, func(snapshot Snapshot) bool {
return p.re.MatchString(snapshot.GetName())
})
if len(snaps) == 0 {
return nil
if len(reCandidates.Remove) == 0 {
return reCandidates
}
// Build adaptors for retention grid
adaptors := make([]retentiongrid.Entry, 0)
for i := range snaps {
adaptors = append(adaptors, retentionGridAdaptor{snaps[i]})
adaptors = append(adaptors, retentionGridAdaptor{reCandidates.Remove[i]})
}
// determine 'now' edge
@ -93,12 +95,17 @@ func (p *KeepGrid) KeepRule(snaps []Snapshot) (destroyList []Snapshot) {
now := adaptors[len(adaptors)-1].Date()
// Evaluate retention grid
_, removea := p.retentionGrid.FitEntries(now, adaptors)
keepa, removea := p.retentionGrid.FitEntries(now, adaptors)
// Revert adaptors
destroyList = make([]Snapshot, len(removea))
destroyList := make([]Snapshot, len(removea))
for i := range removea {
destroyList[i] = removea[i].(retentionGridAdaptor).Snapshot
}
return destroyList
for _, a := range keepa {
reCandidates.Keep = append(reCandidates.Keep, a.(retentionGridAdaptor))
}
reCandidates.Remove = destroyList
return reCandidates
}

View File

@ -1,10 +1,13 @@
package pruning
func filterSnapList(snaps []Snapshot, predicate func(Snapshot) bool) []Snapshot {
r := make([]Snapshot, 0, len(snaps))
func partitionSnapList(snaps []Snapshot, remove func(Snapshot) bool) (r PruneSnapshotsResult) {
r.Keep = make([]Snapshot, 0, len(snaps))
r.Remove = make([]Snapshot, 0, len(snaps))
for i := range snaps {
if predicate(snaps[i]) {
r = append(r, snaps[i])
if remove(snaps[i]) {
r.Remove = append(r.Remove, snaps[i])
} else {
r.Keep = append(r.Keep, snaps[i])
}
}
return r

View File

@ -17,17 +17,17 @@ func NewKeepLastN(n int) (*KeepLastN, error) {
return &KeepLastN{n}, nil
}
func (k KeepLastN) KeepRule(snaps []Snapshot) (destroyList []Snapshot) {
func (k KeepLastN) KeepRule(snaps []Snapshot) PruneSnapshotsResult {
if k.n > len(snaps) {
return []Snapshot{}
return PruneSnapshotsResult{Keep: snaps}
}
res := shallowCopySnapList(snaps)
sort.Slice(res, func(i, j int) bool {
return res[i].Date().After(res[j].Date())
return res[i].GetCreateTXG() > res[j].GetCreateTXG()
})
return res[k.n:]
return PruneSnapshotsResult{Remove: res[k.n:], Keep: res[:k.n]}
}

View File

@ -2,8 +2,8 @@ package pruning
type KeepNotReplicated struct{}
func (*KeepNotReplicated) KeepRule(snaps []Snapshot) (destroyList []Snapshot) {
return filterSnapList(snaps, func(snapshot Snapshot) bool {
func (*KeepNotReplicated) KeepRule(snaps []Snapshot) PruneSnapshotsResult {
return partitionSnapList(snaps, func(snapshot Snapshot) bool {
return snapshot.Replicated()
})
}

View File

@ -27,12 +27,12 @@ func MustKeepRegex(expr string, negate bool) *KeepRegex {
return k
}
func (k *KeepRegex) KeepRule(snaps []Snapshot) []Snapshot {
return filterSnapList(snaps, func(s Snapshot) bool {
func (k *KeepRegex) KeepRule(snaps []Snapshot) PruneSnapshotsResult {
return partitionSnapList(snaps, func(s Snapshot) bool {
if k.negate {
return k.expr.FindStringIndex(s.Name()) != nil
return k.expr.FindStringIndex(s.GetName()) != nil
} else {
return k.expr.FindStringIndex(s.Name()) == nil
return k.expr.FindStringIndex(s.GetName()) == nil
}
})
}

View File

@ -0,0 +1,44 @@
package pruning
import (
"github.com/pkg/errors"
"github.com/zrepl/zrepl/endpoint"
)
type KeepStepHolds struct {
keepJobIDs map[endpoint.JobID]bool
}
var _ KeepRule = (*KeepStepHolds)(nil)
func NewKeepStepHolds(mainJobId endpoint.JobID, additionalJobIdsStrings []string) (_ *KeepStepHolds, err error) {
additionalJobIds := make(map[endpoint.JobID]bool, len(additionalJobIdsStrings))
mainJobId.MustValidate()
additionalJobIds[mainJobId] = true
for i := range additionalJobIdsStrings {
ajid, err := endpoint.MakeJobID(additionalJobIdsStrings[i])
if err != nil {
return nil, errors.WithMessagef(err, "cannot parse job id %q: %s", additionalJobIdsStrings[i])
}
if additionalJobIds[ajid] == true {
return nil, errors.Errorf("duplicate job id %q", ajid)
}
}
return &KeepStepHolds{additionalJobIds}, nil
}
func (h *KeepStepHolds) KeepRule(snaps []Snapshot) PruneSnapshotsResult {
return partitionSnapList(snaps, func(s Snapshot) bool {
holdingJobIDs := make(map[endpoint.JobID]bool)
for _, h := range s.StepHolds() {
holdingJobIDs[h.GetJobID()] = true
}
oneOrMoreOfOurJobIDsHoldsSnap := false
for kjid := range h.keepJobIDs {
oneOrMoreOfOurJobIDsHoldsSnap = oneOrMoreOfOurJobIDsHoldsSnap || holdingJobIDs[kjid]
}
return !oneOrMoreOfOurJobIDsHoldsSnap
})
}

View File

@ -7,47 +7,93 @@ import (
"github.com/pkg/errors"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/endpoint"
)
type KeepRule interface {
KeepRule(snaps []Snapshot) (destroyList []Snapshot)
KeepRule(snaps []Snapshot) PruneSnapshotsResult
}
type Snapshot interface {
Name() string
GetName() string
Replicated() bool
Date() time.Time
GetCreation() time.Time
GetCreateTXG() uint64
StepHolds() []StepHold
}
// The returned snapshot list is guaranteed to only contains elements of input parameter snaps
func PruneSnapshots(snaps []Snapshot, keepRules []KeepRule) []Snapshot {
type StepHold interface {
GetJobID() endpoint.JobID
}
type PruneSnapshotsResult struct {
Remove, Keep []Snapshot
}
// The returned snapshot results are a partition of the snaps argument.
// That means than len(Remove) + len(Keep) == len(snaps)
func PruneSnapshots(snapsI []Snapshot, keepRules []KeepRule) PruneSnapshotsResult {
if len(keepRules) == 0 {
return []Snapshot{}
return PruneSnapshotsResult{Remove: nil, Keep: snapsI}
}
type snapshot struct {
Snapshot
keepCount, removeCount int
}
// project down to snapshot
snaps := make([]Snapshot, len(snapsI))
for i := range snaps {
snaps[i] = &snapshot{snapsI[i], 0, 0}
}
remCount := make(map[Snapshot]int, len(snaps))
for _, r := range keepRules {
ruleRems := r.KeepRule(snaps)
for _, ruleRem := range ruleRems {
remCount[ruleRem]++
ruleImplCheckSet := make(map[Snapshot]int, len(snaps))
for _, s := range snaps {
ruleImplCheckSet[s] = ruleImplCheckSet[s] + 1
}
ruleResults := r.KeepRule(snaps)
for _, s := range snaps {
ruleImplCheckSet[s] = ruleImplCheckSet[s] - 1
}
for _, n := range ruleImplCheckSet {
if n != 0 {
panic(fmt.Sprintf("incorrect rule implementation: %T", r))
}
}
for _, s := range ruleResults.Remove {
s.(*snapshot).removeCount++
}
for _, s := range ruleResults.Keep {
s.(*snapshot).keepCount++
}
}
remove := make([]Snapshot, 0, len(snaps))
for snap, rc := range remCount {
if rc == len(keepRules) {
remove = append(remove, snap)
keep := make([]Snapshot, 0, len(snaps))
for _, sI := range snaps {
s := sI.(*snapshot)
if s.removeCount == len(keepRules) {
// all keep rules agree to remove the snap
remove = append(remove, s.Snapshot)
} else {
keep = append(keep, s.Snapshot)
}
}
return remove
return PruneSnapshotsResult{Remove: remove, Keep: keep}
}
func RulesFromConfig(in []config.PruningEnum) (rules []KeepRule, err error) {
func RulesFromConfig(mainJobId endpoint.JobID, in []config.PruningEnum) (rules []KeepRule, err error) {
rules = make([]KeepRule, len(in))
for i := range in {
rules[i], err = RuleFromConfig(in[i])
rules[i], err = RuleFromConfig(mainJobId, in[i])
if err != nil {
return nil, errors.Wrapf(err, "cannot build rule #%d", i)
}
@ -55,7 +101,7 @@ func RulesFromConfig(in []config.PruningEnum) (rules []KeepRule, err error) {
return rules, nil
}
func RuleFromConfig(in config.PruningEnum) (KeepRule, error) {
func RuleFromConfig(mainJobId endpoint.JobID, in config.PruningEnum) (KeepRule, error) {
switch v := in.Ret.(type) {
case *config.PruneKeepNotReplicated:
return NewKeepNotReplicated(), nil
@ -65,6 +111,8 @@ func RuleFromConfig(in config.PruningEnum) (KeepRule, error) {
return NewKeepRegex(v.Regex, v.Negate)
case *config.PruneGrid:
return NewKeepGrid(v)
case *config.PruneKeepStepHolds:
return NewKeepStepHolds(mainJobId, v.AdditionalJobIds)
default:
return nil, fmt.Errorf("unknown keep rule type %T", v)
}

View File

@ -46,7 +46,36 @@ func (x Tri) String() string {
return proto.EnumName(Tri_name, int32(x))
}
func (Tri) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_pdu_483c6918b7b3d747, []int{0}
return fileDescriptor_pdu_2d84e8d7d278a80d, []int{0}
}
type SendAbstraction_SendAbstractionType int32
const (
SendAbstraction_Undefined SendAbstraction_SendAbstractionType = 0
SendAbstraction_ReplicationCursorV2 SendAbstraction_SendAbstractionType = 1
SendAbstraction_StepHold SendAbstraction_SendAbstractionType = 2
SendAbstraction_StepBookmark SendAbstraction_SendAbstractionType = 3
)
var SendAbstraction_SendAbstractionType_name = map[int32]string{
0: "Undefined",
1: "ReplicationCursorV2",
2: "StepHold",
3: "StepBookmark",
}
var SendAbstraction_SendAbstractionType_value = map[string]int32{
"Undefined": 0,
"ReplicationCursorV2": 1,
"StepHold": 2,
"StepBookmark": 3,
}
func (x SendAbstraction_SendAbstractionType) String() string {
return proto.EnumName(SendAbstraction_SendAbstractionType_name, int32(x))
}
func (SendAbstraction_SendAbstractionType) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_pdu_2d84e8d7d278a80d, []int{5, 0}
}
type FilesystemVersion_VersionType int32
@ -69,7 +98,7 @@ func (x FilesystemVersion_VersionType) String() string {
return proto.EnumName(FilesystemVersion_VersionType_name, int32(x))
}
func (FilesystemVersion_VersionType) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_pdu_483c6918b7b3d747, []int{5, 0}
return fileDescriptor_pdu_2d84e8d7d278a80d, []int{6, 0}
}
type ListFilesystemReq struct {
@ -82,7 +111,7 @@ func (m *ListFilesystemReq) Reset() { *m = ListFilesystemReq{} }
func (m *ListFilesystemReq) String() string { return proto.CompactTextString(m) }
func (*ListFilesystemReq) ProtoMessage() {}
func (*ListFilesystemReq) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_483c6918b7b3d747, []int{0}
return fileDescriptor_pdu_2d84e8d7d278a80d, []int{0}
}
func (m *ListFilesystemReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ListFilesystemReq.Unmarshal(m, b)
@ -113,7 +142,7 @@ func (m *ListFilesystemRes) Reset() { *m = ListFilesystemRes{} }
func (m *ListFilesystemRes) String() string { return proto.CompactTextString(m) }
func (*ListFilesystemRes) ProtoMessage() {}
func (*ListFilesystemRes) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_483c6918b7b3d747, []int{1}
return fileDescriptor_pdu_2d84e8d7d278a80d, []int{1}
}
func (m *ListFilesystemRes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ListFilesystemRes.Unmarshal(m, b)
@ -154,7 +183,7 @@ func (m *Filesystem) Reset() { *m = Filesystem{} }
func (m *Filesystem) String() string { return proto.CompactTextString(m) }
func (*Filesystem) ProtoMessage() {}
func (*Filesystem) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_483c6918b7b3d747, []int{2}
return fileDescriptor_pdu_2d84e8d7d278a80d, []int{2}
}
func (m *Filesystem) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Filesystem.Unmarshal(m, b)
@ -213,7 +242,7 @@ func (m *ListFilesystemVersionsReq) Reset() { *m = ListFilesystemVersion
func (m *ListFilesystemVersionsReq) String() string { return proto.CompactTextString(m) }
func (*ListFilesystemVersionsReq) ProtoMessage() {}
func (*ListFilesystemVersionsReq) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_483c6918b7b3d747, []int{3}
return fileDescriptor_pdu_2d84e8d7d278a80d, []int{3}
}
func (m *ListFilesystemVersionsReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ListFilesystemVersionsReq.Unmarshal(m, b)
@ -242,6 +271,7 @@ func (m *ListFilesystemVersionsReq) GetFilesystem() string {
type ListFilesystemVersionsRes struct {
Versions []*FilesystemVersion `protobuf:"bytes,1,rep,name=Versions,proto3" json:"Versions,omitempty"`
SendAbstractions []*SendAbstraction `protobuf:"bytes,2,rep,name=SendAbstractions,proto3" json:"SendAbstractions,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@ -251,7 +281,7 @@ func (m *ListFilesystemVersionsRes) Reset() { *m = ListFilesystemVersion
func (m *ListFilesystemVersionsRes) String() string { return proto.CompactTextString(m) }
func (*ListFilesystemVersionsRes) ProtoMessage() {}
func (*ListFilesystemVersionsRes) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_483c6918b7b3d747, []int{4}
return fileDescriptor_pdu_2d84e8d7d278a80d, []int{4}
}
func (m *ListFilesystemVersionsRes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ListFilesystemVersionsRes.Unmarshal(m, b)
@ -278,6 +308,67 @@ func (m *ListFilesystemVersionsRes) GetVersions() []*FilesystemVersion {
return nil
}
func (m *ListFilesystemVersionsRes) GetSendAbstractions() []*SendAbstraction {
if m != nil {
return m.SendAbstractions
}
return nil
}
type SendAbstraction struct {
Type SendAbstraction_SendAbstractionType `protobuf:"varint,1,opt,name=Type,proto3,enum=SendAbstraction_SendAbstractionType" json:"Type,omitempty"`
JobID string `protobuf:"bytes,2,opt,name=JobID,proto3" json:"JobID,omitempty"`
Version *FilesystemVersion `protobuf:"bytes,3,opt,name=Version,proto3" json:"Version,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *SendAbstraction) Reset() { *m = SendAbstraction{} }
func (m *SendAbstraction) String() string { return proto.CompactTextString(m) }
func (*SendAbstraction) ProtoMessage() {}
func (*SendAbstraction) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_2d84e8d7d278a80d, []int{5}
}
func (m *SendAbstraction) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_SendAbstraction.Unmarshal(m, b)
}
func (m *SendAbstraction) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_SendAbstraction.Marshal(b, m, deterministic)
}
func (dst *SendAbstraction) XXX_Merge(src proto.Message) {
xxx_messageInfo_SendAbstraction.Merge(dst, src)
}
func (m *SendAbstraction) XXX_Size() int {
return xxx_messageInfo_SendAbstraction.Size(m)
}
func (m *SendAbstraction) XXX_DiscardUnknown() {
xxx_messageInfo_SendAbstraction.DiscardUnknown(m)
}
var xxx_messageInfo_SendAbstraction proto.InternalMessageInfo
func (m *SendAbstraction) GetType() SendAbstraction_SendAbstractionType {
if m != nil {
return m.Type
}
return SendAbstraction_Undefined
}
func (m *SendAbstraction) GetJobID() string {
if m != nil {
return m.JobID
}
return ""
}
func (m *SendAbstraction) GetVersion() *FilesystemVersion {
if m != nil {
return m.Version
}
return nil
}
type FilesystemVersion struct {
Type FilesystemVersion_VersionType `protobuf:"varint,1,opt,name=Type,proto3,enum=FilesystemVersion_VersionType" json:"Type,omitempty"`
Name string `protobuf:"bytes,2,opt,name=Name,proto3" json:"Name,omitempty"`
@ -293,7 +384,7 @@ func (m *FilesystemVersion) Reset() { *m = FilesystemVersion{} }
func (m *FilesystemVersion) String() string { return proto.CompactTextString(m) }
func (*FilesystemVersion) ProtoMessage() {}
func (*FilesystemVersion) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_483c6918b7b3d747, []int{5}
return fileDescriptor_pdu_2d84e8d7d278a80d, []int{6}
}
func (m *FilesystemVersion) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_FilesystemVersion.Unmarshal(m, b)
@ -371,7 +462,7 @@ func (m *SendReq) Reset() { *m = SendReq{} }
func (m *SendReq) String() string { return proto.CompactTextString(m) }
func (*SendReq) ProtoMessage() {}
func (*SendReq) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_483c6918b7b3d747, []int{6}
return fileDescriptor_pdu_2d84e8d7d278a80d, []int{7}
}
func (m *SendReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_SendReq.Unmarshal(m, b)
@ -445,7 +536,7 @@ func (m *Property) Reset() { *m = Property{} }
func (m *Property) String() string { return proto.CompactTextString(m) }
func (*Property) ProtoMessage() {}
func (*Property) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_483c6918b7b3d747, []int{7}
return fileDescriptor_pdu_2d84e8d7d278a80d, []int{8}
}
func (m *Property) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Property.Unmarshal(m, b)
@ -496,7 +587,7 @@ func (m *SendRes) Reset() { *m = SendRes{} }
func (m *SendRes) String() string { return proto.CompactTextString(m) }
func (*SendRes) ProtoMessage() {}
func (*SendRes) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_483c6918b7b3d747, []int{8}
return fileDescriptor_pdu_2d84e8d7d278a80d, []int{9}
}
func (m *SendRes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_SendRes.Unmarshal(m, b)
@ -548,7 +639,7 @@ func (m *SendCompletedReq) Reset() { *m = SendCompletedReq{} }
func (m *SendCompletedReq) String() string { return proto.CompactTextString(m) }
func (*SendCompletedReq) ProtoMessage() {}
func (*SendCompletedReq) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_483c6918b7b3d747, []int{9}
return fileDescriptor_pdu_2d84e8d7d278a80d, []int{10}
}
func (m *SendCompletedReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_SendCompletedReq.Unmarshal(m, b)
@ -585,7 +676,7 @@ func (m *SendCompletedRes) Reset() { *m = SendCompletedRes{} }
func (m *SendCompletedRes) String() string { return proto.CompactTextString(m) }
func (*SendCompletedRes) ProtoMessage() {}
func (*SendCompletedRes) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_483c6918b7b3d747, []int{10}
return fileDescriptor_pdu_2d84e8d7d278a80d, []int{11}
}
func (m *SendCompletedRes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_SendCompletedRes.Unmarshal(m, b)
@ -620,7 +711,7 @@ func (m *ReceiveReq) Reset() { *m = ReceiveReq{} }
func (m *ReceiveReq) String() string { return proto.CompactTextString(m) }
func (*ReceiveReq) ProtoMessage() {}
func (*ReceiveReq) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_483c6918b7b3d747, []int{11}
return fileDescriptor_pdu_2d84e8d7d278a80d, []int{12}
}
func (m *ReceiveReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ReceiveReq.Unmarshal(m, b)
@ -671,7 +762,7 @@ func (m *ReceiveRes) Reset() { *m = ReceiveRes{} }
func (m *ReceiveRes) String() string { return proto.CompactTextString(m) }
func (*ReceiveRes) ProtoMessage() {}
func (*ReceiveRes) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_483c6918b7b3d747, []int{12}
return fileDescriptor_pdu_2d84e8d7d278a80d, []int{13}
}
func (m *ReceiveRes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ReceiveRes.Unmarshal(m, b)
@ -704,7 +795,7 @@ func (m *DestroySnapshotsReq) Reset() { *m = DestroySnapshotsReq{} }
func (m *DestroySnapshotsReq) String() string { return proto.CompactTextString(m) }
func (*DestroySnapshotsReq) ProtoMessage() {}
func (*DestroySnapshotsReq) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_483c6918b7b3d747, []int{13}
return fileDescriptor_pdu_2d84e8d7d278a80d, []int{14}
}
func (m *DestroySnapshotsReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_DestroySnapshotsReq.Unmarshal(m, b)
@ -750,7 +841,7 @@ func (m *DestroySnapshotRes) Reset() { *m = DestroySnapshotRes{} }
func (m *DestroySnapshotRes) String() string { return proto.CompactTextString(m) }
func (*DestroySnapshotRes) ProtoMessage() {}
func (*DestroySnapshotRes) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_483c6918b7b3d747, []int{14}
return fileDescriptor_pdu_2d84e8d7d278a80d, []int{15}
}
func (m *DestroySnapshotRes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_DestroySnapshotRes.Unmarshal(m, b)
@ -795,7 +886,7 @@ func (m *DestroySnapshotsRes) Reset() { *m = DestroySnapshotsRes{} }
func (m *DestroySnapshotsRes) String() string { return proto.CompactTextString(m) }
func (*DestroySnapshotsRes) ProtoMessage() {}
func (*DestroySnapshotsRes) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_483c6918b7b3d747, []int{15}
return fileDescriptor_pdu_2d84e8d7d278a80d, []int{16}
}
func (m *DestroySnapshotsRes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_DestroySnapshotsRes.Unmarshal(m, b)
@ -833,7 +924,7 @@ func (m *ReplicationCursorReq) Reset() { *m = ReplicationCursorReq{} }
func (m *ReplicationCursorReq) String() string { return proto.CompactTextString(m) }
func (*ReplicationCursorReq) ProtoMessage() {}
func (*ReplicationCursorReq) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_483c6918b7b3d747, []int{16}
return fileDescriptor_pdu_2d84e8d7d278a80d, []int{17}
}
func (m *ReplicationCursorReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ReplicationCursorReq.Unmarshal(m, b)
@ -874,7 +965,7 @@ func (m *ReplicationCursorRes) Reset() { *m = ReplicationCursorRes{} }
func (m *ReplicationCursorRes) String() string { return proto.CompactTextString(m) }
func (*ReplicationCursorRes) ProtoMessage() {}
func (*ReplicationCursorRes) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_483c6918b7b3d747, []int{17}
return fileDescriptor_pdu_2d84e8d7d278a80d, []int{18}
}
func (m *ReplicationCursorRes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ReplicationCursorRes.Unmarshal(m, b)
@ -1010,7 +1101,7 @@ func (m *PingReq) Reset() { *m = PingReq{} }
func (m *PingReq) String() string { return proto.CompactTextString(m) }
func (*PingReq) ProtoMessage() {}
func (*PingReq) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_483c6918b7b3d747, []int{18}
return fileDescriptor_pdu_2d84e8d7d278a80d, []int{19}
}
func (m *PingReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_PingReq.Unmarshal(m, b)
@ -1049,7 +1140,7 @@ func (m *PingRes) Reset() { *m = PingRes{} }
func (m *PingRes) String() string { return proto.CompactTextString(m) }
func (*PingRes) ProtoMessage() {}
func (*PingRes) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_483c6918b7b3d747, []int{19}
return fileDescriptor_pdu_2d84e8d7d278a80d, []int{20}
}
func (m *PingRes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_PingRes.Unmarshal(m, b)
@ -1082,6 +1173,7 @@ func init() {
proto.RegisterType((*Filesystem)(nil), "Filesystem")
proto.RegisterType((*ListFilesystemVersionsReq)(nil), "ListFilesystemVersionsReq")
proto.RegisterType((*ListFilesystemVersionsRes)(nil), "ListFilesystemVersionsRes")
proto.RegisterType((*SendAbstraction)(nil), "SendAbstraction")
proto.RegisterType((*FilesystemVersion)(nil), "FilesystemVersion")
proto.RegisterType((*SendReq)(nil), "SendReq")
proto.RegisterType((*Property)(nil), "Property")
@ -1098,6 +1190,7 @@ func init() {
proto.RegisterType((*PingReq)(nil), "PingReq")
proto.RegisterType((*PingRes)(nil), "PingRes")
proto.RegisterEnum("Tri", Tri_name, Tri_value)
proto.RegisterEnum("SendAbstraction_SendAbstractionType", SendAbstraction_SendAbstractionType_name, SendAbstraction_SendAbstractionType_value)
proto.RegisterEnum("FilesystemVersion_VersionType", FilesystemVersion_VersionType_name, FilesystemVersion_VersionType_value)
}
@ -1338,61 +1431,67 @@ var _Replication_serviceDesc = grpc.ServiceDesc{
Metadata: "pdu.proto",
}
func init() { proto.RegisterFile("pdu.proto", fileDescriptor_pdu_483c6918b7b3d747) }
func init() { proto.RegisterFile("pdu.proto", fileDescriptor_pdu_2d84e8d7d278a80d) }
var fileDescriptor_pdu_483c6918b7b3d747 = []byte{
// 833 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x56, 0x5f, 0x6f, 0xe3, 0x44,
0x10, 0xaf, 0x13, 0xa7, 0x75, 0x26, 0x3d, 0x2e, 0x9d, 0x96, 0x93, 0xb1, 0xe0, 0x54, 0x2d, 0x08,
0xe5, 0x2a, 0x61, 0xa1, 0xf2, 0x47, 0x42, 0x48, 0x27, 0xd1, 0xb4, 0xbd, 0x3b, 0x01, 0x47, 0xb4,
0x35, 0x27, 0x74, 0x6f, 0x26, 0x19, 0xb5, 0x56, 0x1d, 0xaf, 0xbb, 0xe3, 0xa0, 0x0b, 0xe2, 0x89,
0x47, 0xbe, 0x1e, 0x7c, 0x10, 0x3e, 0x02, 0xf2, 0xc6, 0x4e, 0x9c, 0xd8, 0x41, 0x79, 0xca, 0xce,
0x6f, 0x66, 0x77, 0x67, 0x7f, 0xf3, 0x9b, 0x71, 0xa0, 0x9b, 0x4e, 0x66, 0x7e, 0xaa, 0x55, 0xa6,
0xc4, 0x31, 0x1c, 0xfd, 0x10, 0x71, 0x76, 0x1d, 0xc5, 0xc4, 0x73, 0xce, 0x68, 0x2a, 0xe9, 0x41,
0x5c, 0xd4, 0x41, 0xc6, 0xcf, 0xa0, 0xb7, 0x02, 0xd8, 0xb5, 0x4e, 0xdb, 0x83, 0xde, 0x79, 0xcf,
0xaf, 0x04, 0x55, 0xfd, 0xe2, 0x2f, 0x0b, 0x60, 0x65, 0x23, 0x82, 0x3d, 0x0a, 0xb3, 0x3b, 0xd7,
0x3a, 0xb5, 0x06, 0x5d, 0x69, 0xd6, 0x78, 0x0a, 0x3d, 0x49, 0x3c, 0x9b, 0x52, 0xa0, 0xee, 0x29,
0x71, 0x5b, 0xc6, 0x55, 0x85, 0xf0, 0x13, 0x78, 0xf4, 0x8a, 0x47, 0x71, 0x38, 0xa6, 0x3b, 0x15,
0x4f, 0x48, 0xbb, 0xed, 0x53, 0x6b, 0xe0, 0xc8, 0x75, 0x30, 0x3f, 0xe7, 0x15, 0x5f, 0x25, 0x63,
0x3d, 0x4f, 0x33, 0x9a, 0xb8, 0xb6, 0x89, 0xa9, 0x42, 0xe2, 0x5b, 0xf8, 0x60, 0xfd, 0x41, 0x6f,
0x48, 0x73, 0xa4, 0x12, 0x96, 0xf4, 0x80, 0x4f, 0xab, 0x89, 0x16, 0x09, 0x56, 0x10, 0xf1, 0xfd,
0xf6, 0xcd, 0x8c, 0x3e, 0x38, 0xa5, 0x59, 0x50, 0x82, 0x7e, 0x2d, 0x52, 0x2e, 0x63, 0xc4, 0x3f,
0x16, 0x1c, 0xd5, 0xfc, 0x78, 0x0e, 0x76, 0x30, 0x4f, 0xc9, 0x5c, 0xfe, 0xde, 0xf9, 0xd3, 0xfa,
0x09, 0x7e, 0xf1, 0x9b, 0x47, 0x49, 0x13, 0x9b, 0x33, 0xfa, 0x3a, 0x9c, 0x52, 0x41, 0x9b, 0x59,
0xe7, 0xd8, 0x8b, 0x59, 0x34, 0x31, 0x34, 0xd9, 0xd2, 0xac, 0xf1, 0x43, 0xe8, 0x0e, 0x35, 0x85,
0x19, 0x05, 0xbf, 0xbc, 0x30, 0xdc, 0xd8, 0x72, 0x05, 0xa0, 0x07, 0x8e, 0x31, 0x22, 0x95, 0xb8,
0x1d, 0x73, 0xd2, 0xd2, 0x16, 0xcf, 0xa0, 0x57, 0xb9, 0x16, 0x0f, 0xc1, 0xb9, 0x49, 0xc2, 0x94,
0xef, 0x54, 0xd6, 0xdf, 0xcb, 0xad, 0x0b, 0xa5, 0xee, 0xa7, 0xa1, 0xbe, 0xef, 0x5b, 0xe2, 0x6f,
0x0b, 0x0e, 0x6e, 0x28, 0x99, 0xec, 0xc0, 0x27, 0x7e, 0x0a, 0xf6, 0xb5, 0x56, 0x53, 0x93, 0x78,
0x33, 0x5d, 0xc6, 0x8f, 0x02, 0x5a, 0x81, 0x32, 0x4f, 0x69, 0x8e, 0x6a, 0x05, 0x6a, 0x53, 0x42,
0x76, 0x5d, 0x42, 0x02, 0xba, 0x2b, 0x69, 0x74, 0x0c, 0xbf, 0xb6, 0x1f, 0xe8, 0x48, 0xae, 0x60,
0x7c, 0x02, 0xfb, 0x97, 0x7a, 0x2e, 0x67, 0x89, 0xbb, 0x6f, 0xb4, 0x53, 0x58, 0xe2, 0x4b, 0x70,
0x46, 0x5a, 0xa5, 0xa4, 0xb3, 0xf9, 0x92, 0x6e, 0xab, 0x42, 0xf7, 0x09, 0x74, 0xde, 0x84, 0xf1,
0xac, 0xac, 0xc1, 0xc2, 0x10, 0x7f, 0x2e, 0xb9, 0x60, 0x1c, 0xc0, 0xe3, 0x9f, 0x99, 0x26, 0x9b,
0x32, 0x77, 0xe4, 0x26, 0x8c, 0x02, 0x0e, 0xaf, 0xde, 0xa5, 0x34, 0xce, 0x68, 0x72, 0x13, 0xfd,
0x4e, 0xe6, 0xdd, 0x6d, 0xb9, 0x86, 0xe1, 0x33, 0x80, 0x22, 0x9f, 0x88, 0xd8, 0xb5, 0x8d, 0xdc,
0xba, 0x7e, 0x99, 0xa2, 0xac, 0x38, 0xc5, 0x73, 0xe8, 0xe7, 0x39, 0x0c, 0xd5, 0x34, 0x8d, 0x29,
0x23, 0x53, 0x98, 0x33, 0xe8, 0xfd, 0xa4, 0xa3, 0xdb, 0x28, 0x09, 0x63, 0x49, 0x0f, 0x05, 0xff,
0x8e, 0x5f, 0xd4, 0x4d, 0x56, 0x9d, 0x02, 0x6b, 0xfb, 0x59, 0xfc, 0x01, 0x20, 0x69, 0x4c, 0xd1,
0x6f, 0xb4, 0x4b, 0x99, 0x17, 0xe5, 0x6b, 0xfd, 0x6f, 0xf9, 0xce, 0xa0, 0x3f, 0x8c, 0x29, 0xd4,
0x55, 0x7e, 0x16, 0x2d, 0x5e, 0xc3, 0xc5, 0x61, 0xe5, 0x76, 0x16, 0xb7, 0x70, 0x7c, 0x49, 0x9c,
0x69, 0x35, 0x2f, 0x35, 0xb9, 0x4b, 0x2f, 0xe3, 0xe7, 0xd0, 0x5d, 0xc6, 0xbb, 0xad, 0xad, 0xfd,
0xba, 0x0a, 0x12, 0x6f, 0x01, 0x37, 0x2e, 0x2a, 0xda, 0xbe, 0x34, 0xcd, 0x2d, 0x5b, 0xda, 0xbe,
0x8c, 0xc9, 0x95, 0x72, 0xa5, 0xb5, 0xd2, 0xa5, 0x52, 0x8c, 0x21, 0x2e, 0x9b, 0x1e, 0x91, 0x4f,
0xda, 0x83, 0xfc, 0xe1, 0x71, 0x56, 0x8e, 0x94, 0x63, 0xbf, 0x9e, 0x82, 0x2c, 0x63, 0xc4, 0xd7,
0x70, 0x22, 0x29, 0x8d, 0xa3, 0xb1, 0xe9, 0xda, 0xe1, 0x4c, 0xb3, 0xd2, 0xbb, 0xcc, 0xb5, 0xa0,
0x71, 0x1f, 0xe3, 0x49, 0x31, 0x44, 0xf2, 0x1d, 0xf6, 0xcb, 0xbd, 0xe5, 0x18, 0x71, 0x5e, 0xab,
0x8c, 0xde, 0x45, 0x9c, 0x2d, 0x24, 0xfc, 0x72, 0x4f, 0x2e, 0x91, 0x0b, 0x07, 0xf6, 0x17, 0xe9,
0x88, 0x8f, 0xe1, 0x60, 0x14, 0x25, 0xb7, 0x79, 0x02, 0x2e, 0x1c, 0xfc, 0x48, 0xcc, 0xe1, 0x6d,
0xd9, 0x35, 0xa5, 0x29, 0x3e, 0x2a, 0x83, 0x38, 0xef, 0xab, 0xab, 0xf1, 0x9d, 0x2a, 0xfb, 0x2a,
0x5f, 0x9f, 0x0d, 0xa0, 0x1d, 0xe8, 0x28, 0x1f, 0x31, 0x97, 0x2a, 0xc9, 0x86, 0xa1, 0xa6, 0xfe,
0x1e, 0x76, 0xa1, 0x73, 0x1d, 0xc6, 0x4c, 0x7d, 0x0b, 0x1d, 0xb0, 0x03, 0x3d, 0xa3, 0x7e, 0xeb,
0xfc, 0xdf, 0x56, 0x3e, 0x00, 0x96, 0x8f, 0x40, 0x0f, 0xec, 0xfc, 0x60, 0x74, 0xfc, 0x22, 0x09,
0xaf, 0x5c, 0x31, 0x7e, 0x03, 0x8f, 0xd7, 0xe7, 0x38, 0x23, 0xfa, 0xb5, 0x8f, 0x9f, 0x57, 0xc7,
0x18, 0x47, 0xf0, 0xa4, 0xf9, 0x13, 0x80, 0x9e, 0xbf, 0xf5, 0xc3, 0xe2, 0x6d, 0xf7, 0x31, 0x3e,
0x87, 0xfe, 0x66, 0xe9, 0xf1, 0xc4, 0x6f, 0x90, 0xb4, 0xd7, 0x84, 0x32, 0x7e, 0x07, 0x47, 0xb5,
0xe2, 0xe1, 0xfb, 0x7e, 0x93, 0x10, 0xbc, 0x46, 0x98, 0xf1, 0x2b, 0x78, 0xb4, 0xd6, 0xe2, 0x78,
0xe4, 0x6f, 0x8e, 0x0c, 0xaf, 0x06, 0xf1, 0x45, 0xe7, 0x6d, 0x3b, 0x9d, 0xcc, 0x7e, 0xdd, 0x37,
0xff, 0x1f, 0xbe, 0xf8, 0x2f, 0x00, 0x00, 0xff, 0xff, 0x27, 0x95, 0xc1, 0x78, 0x4c, 0x08, 0x00,
0x00,
var fileDescriptor_pdu_2d84e8d7d278a80d = []byte{
// 940 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x56, 0x6d, 0x6f, 0xe3, 0xc4,
0x13, 0xaf, 0x13, 0xa7, 0x75, 0x26, 0xed, 0xbf, 0xee, 0x24, 0xff, 0x23, 0x44, 0x70, 0xaa, 0x96,
0x13, 0xca, 0x55, 0x60, 0xa1, 0xf0, 0x20, 0x10, 0xe8, 0xa4, 0x6b, 0xd2, 0x5e, 0x8b, 0xe0, 0x88,
0xb6, 0xb9, 0x0a, 0x9d, 0xc4, 0x0b, 0x37, 0x1e, 0x5a, 0xab, 0x8e, 0xd7, 0xdd, 0x75, 0xd0, 0x05,
0xf1, 0x8a, 0x77, 0xf0, 0xf5, 0xe0, 0x73, 0x20, 0x3e, 0x02, 0xf2, 0xc6, 0x4e, 0x1c, 0xdb, 0x3d,
0xf5, 0x55, 0x76, 0x7e, 0x33, 0xe3, 0x9d, 0xc7, 0xdf, 0x06, 0x9a, 0x91, 0x37, 0x77, 0x22, 0x29,
0x62, 0xc1, 0xda, 0x70, 0xf0, 0x9d, 0xaf, 0xe2, 0x53, 0x3f, 0x20, 0xb5, 0x50, 0x31, 0xcd, 0x38,
0xdd, 0xb1, 0xe3, 0x32, 0xa8, 0xf0, 0x63, 0x68, 0xad, 0x01, 0xd5, 0x35, 0x0e, 0xeb, 0xfd, 0xd6,
0xa0, 0xe5, 0xe4, 0x8c, 0xf2, 0x7a, 0xf6, 0xa7, 0x01, 0xb0, 0x96, 0x11, 0xc1, 0x1c, 0xbb, 0xf1,
0x4d, 0xd7, 0x38, 0x34, 0xfa, 0x4d, 0xae, 0xcf, 0x78, 0x08, 0x2d, 0x4e, 0x6a, 0x3e, 0xa3, 0x89,
0xb8, 0xa5, 0xb0, 0x5b, 0xd3, 0xaa, 0x3c, 0x84, 0x4f, 0x60, 0xef, 0x5c, 0x8d, 0x03, 0x77, 0x4a,
0x37, 0x22, 0xf0, 0x48, 0x76, 0xeb, 0x87, 0x46, 0xdf, 0xe2, 0x9b, 0x60, 0xf2, 0x9d, 0x73, 0x75,
0x12, 0x4e, 0xe5, 0x22, 0x8a, 0xc9, 0xeb, 0x9a, 0xda, 0x26, 0x0f, 0xb1, 0xaf, 0xe1, 0xdd, 0xcd,
0x84, 0x2e, 0x49, 0x2a, 0x5f, 0x84, 0x8a, 0xd3, 0x1d, 0x3e, 0xce, 0x07, 0x9a, 0x06, 0x98, 0x43,
0xd8, 0x1f, 0xc6, 0xfd, 0xde, 0x0a, 0x1d, 0xb0, 0x32, 0x31, 0xad, 0x09, 0x3a, 0x25, 0x4b, 0xbe,
0xb2, 0xc1, 0x6f, 0xc0, 0xbe, 0xa0, 0xd0, 0x7b, 0x7e, 0xa5, 0x62, 0xe9, 0x4e, 0x63, 0xed, 0x57,
0xd3, 0x7e, 0xb6, 0x53, 0x50, 0xf0, 0x92, 0x25, 0xfb, 0xc7, 0x80, 0xfd, 0x02, 0x88, 0x5f, 0x82,
0x39, 0x59, 0x44, 0xa4, 0x23, 0xff, 0xdf, 0xe0, 0x49, 0xf1, 0x2b, 0x45, 0x39, 0xb1, 0xe5, 0xda,
0x03, 0x3b, 0xd0, 0xf8, 0x56, 0x5c, 0x9d, 0x8f, 0xd2, 0xd2, 0x2f, 0x05, 0xfc, 0x08, 0x76, 0xd2,
0x68, 0x75, 0xb9, 0xab, 0x13, 0xca, 0x4c, 0xd8, 0x4f, 0xd0, 0xae, 0xb8, 0x00, 0xf7, 0xa0, 0xf9,
0x2a, 0xf4, 0xe8, 0x67, 0x3f, 0x24, 0xcf, 0xde, 0xc2, 0x77, 0xa0, 0xcd, 0x29, 0x0a, 0xfc, 0xa9,
0x9b, 0x58, 0x0c, 0xe7, 0x52, 0x09, 0x79, 0x39, 0xb0, 0x0d, 0xdc, 0x05, 0xeb, 0x22, 0xa6, 0xe8,
0x4c, 0x04, 0x9e, 0x5d, 0x43, 0x1b, 0x76, 0x13, 0xe9, 0x58, 0x88, 0xdb, 0x99, 0x2b, 0x6f, 0xed,
0x3a, 0xfb, 0xdb, 0x80, 0x83, 0xd2, 0xed, 0x38, 0xd8, 0x48, 0xf9, 0x71, 0x39, 0x3e, 0x27, 0xfd,
0xcd, 0x25, 0x8b, 0x60, 0xbe, 0x74, 0x67, 0x94, 0xe6, 0xaa, 0xcf, 0x09, 0xf6, 0x62, 0xee, 0x7b,
0x3a, 0x4f, 0x93, 0xeb, 0x33, 0xbe, 0x07, 0xcd, 0xa1, 0x24, 0x37, 0xa6, 0xc9, 0x8f, 0x2f, 0xf4,
0x2c, 0x99, 0x7c, 0x0d, 0x60, 0x0f, 0x2c, 0x2d, 0x24, 0xd5, 0x69, 0xe8, 0x2f, 0xad, 0x64, 0xf6,
0x14, 0x5a, 0xb9, 0x6b, 0x75, 0x6a, 0xa1, 0x1b, 0xa9, 0x1b, 0x11, 0xdb, 0x5b, 0x89, 0xb4, 0x4a,
0xcb, 0x60, 0x7f, 0x19, 0xb0, 0x93, 0x94, 0xed, 0x01, 0xf3, 0x87, 0x1f, 0x82, 0x79, 0x2a, 0xc5,
0x4c, 0x07, 0x5e, 0xdd, 0x0c, 0xad, 0x47, 0x06, 0xb5, 0x89, 0x78, 0x4b, 0xcb, 0x6a, 0x13, 0x51,
0x5c, 0x39, 0xb3, 0xbc, 0x72, 0x0c, 0x9a, 0xeb, 0x55, 0x6a, 0xe8, 0xfa, 0x9a, 0xce, 0x44, 0xfa,
0x7c, 0x0d, 0xe3, 0x23, 0xd8, 0x1e, 0xc9, 0x05, 0x9f, 0x87, 0xdd, 0x6d, 0xbd, 0x6b, 0xa9, 0xc4,
0x3e, 0x03, 0x6b, 0x2c, 0x45, 0x44, 0x32, 0x5e, 0xac, 0xca, 0x6d, 0xe4, 0xca, 0xdd, 0x81, 0xc6,
0xa5, 0x1b, 0xcc, 0xb3, 0x1e, 0x2c, 0x05, 0xf6, 0xfb, 0xaa, 0x16, 0x0a, 0xfb, 0xb0, 0xff, 0x4a,
0x91, 0x57, 0xa4, 0x05, 0x8b, 0x17, 0x61, 0x64, 0xb0, 0x7b, 0xf2, 0x26, 0xa2, 0x69, 0x4c, 0xde,
0x85, 0xff, 0x2b, 0xe9, 0xbc, 0xeb, 0x7c, 0x03, 0xc3, 0xa7, 0x00, 0x69, 0x3c, 0x3e, 0xa9, 0xae,
0xa9, 0xb7, 0xac, 0xe9, 0x64, 0x21, 0xf2, 0x9c, 0x92, 0x3d, 0x5b, 0xae, 0xe5, 0x50, 0xcc, 0xa2,
0x80, 0x62, 0xd2, 0x8d, 0x39, 0x82, 0xd6, 0x0f, 0xd2, 0xbf, 0xf6, 0x43, 0x37, 0xe0, 0x74, 0x97,
0xd6, 0xdf, 0x72, 0xd2, 0xbe, 0xf1, 0xbc, 0x92, 0x61, 0xc9, 0x5f, 0xb1, 0xdf, 0x00, 0x38, 0x4d,
0xc9, 0xff, 0x85, 0x1e, 0xd2, 0xe6, 0x65, 0xfb, 0x6a, 0x6f, 0x6d, 0xdf, 0x11, 0xd8, 0xc3, 0x80,
0x5c, 0x99, 0xaf, 0xcf, 0x92, 0x12, 0x4b, 0x38, 0xdb, 0xcd, 0xdd, 0xae, 0xd8, 0x35, 0xb4, 0x47,
0xa4, 0x62, 0x29, 0x16, 0xd9, 0x4c, 0x3e, 0x84, 0xfb, 0xf0, 0x13, 0x68, 0xae, 0xec, 0x53, 0x9a,
0xaa, 0x8a, 0x6d, 0x6d, 0xc4, 0x5e, 0x03, 0x16, 0x2e, 0x4a, 0x59, 0x32, 0x13, 0xf5, 0x2d, 0xf7,
0xb0, 0x64, 0x66, 0x93, 0x4c, 0xca, 0x89, 0x94, 0x42, 0x66, 0x93, 0xa2, 0x05, 0x36, 0xaa, 0x4a,
0x22, 0x79, 0x99, 0x76, 0x92, 0xc4, 0x83, 0x38, 0x63, 0xe0, 0xb6, 0x53, 0x0e, 0x81, 0x67, 0x36,
0xec, 0x0b, 0xe8, 0x94, 0xb8, 0xe8, 0x21, 0xef, 0xc0, 0xa4, 0xd2, 0x4f, 0x61, 0x27, 0x25, 0x91,
0xc4, 0xc3, 0x3c, 0xdb, 0x5a, 0xd1, 0x88, 0xf5, 0x52, 0xc4, 0xf4, 0xc6, 0x57, 0xf1, 0x72, 0x84,
0xcf, 0xb6, 0xf8, 0x0a, 0x39, 0xb6, 0x60, 0x7b, 0x19, 0x0e, 0xfb, 0x00, 0x76, 0xc6, 0x7e, 0x78,
0x9d, 0x04, 0xd0, 0x85, 0x9d, 0xef, 0x49, 0x29, 0xf7, 0x3a, 0xdb, 0x9a, 0x4c, 0x64, 0xef, 0x67,
0x46, 0x2a, 0xd9, 0xab, 0x93, 0xe9, 0x8d, 0xc8, 0xf6, 0x2a, 0x39, 0x1f, 0xf5, 0xa1, 0x3e, 0x91,
0x7e, 0x42, 0x31, 0x23, 0x11, 0xc6, 0x43, 0x57, 0x92, 0xbd, 0x85, 0x4d, 0x68, 0x9c, 0xba, 0x81,
0x22, 0xdb, 0x40, 0x0b, 0xcc, 0x89, 0x9c, 0x93, 0x5d, 0x1b, 0xfc, 0x5b, 0x4b, 0x08, 0x60, 0x95,
0x04, 0xf6, 0xc0, 0x4c, 0x3e, 0x8c, 0x96, 0x93, 0x06, 0xd1, 0xcb, 0x4e, 0x0a, 0xbf, 0x82, 0xfd,
0xcd, 0x67, 0x4f, 0x21, 0x3a, 0xa5, 0x3f, 0x0b, 0xbd, 0x32, 0xa6, 0x70, 0x0c, 0x8f, 0xaa, 0x5f,
0x4c, 0xec, 0x39, 0xf7, 0x3e, 0xc4, 0xbd, 0xfb, 0x75, 0x0a, 0x9f, 0x81, 0x5d, 0x6c, 0x3d, 0x76,
0x9c, 0x8a, 0x91, 0xee, 0x55, 0xa1, 0x0a, 0x9f, 0xc3, 0x41, 0xa9, 0x79, 0xf8, 0x7f, 0xa7, 0x6a,
0x10, 0x7a, 0x95, 0xb0, 0xc2, 0xcf, 0x61, 0x6f, 0x63, 0xc5, 0xf1, 0xc0, 0x29, 0x52, 0x46, 0xaf,
0x04, 0xa9, 0xe3, 0xc6, 0xeb, 0x7a, 0xe4, 0xcd, 0xaf, 0xb6, 0xf5, 0xff, 0xad, 0x4f, 0xff, 0x0b,
0x00, 0x00, 0xff, 0xff, 0x0a, 0xa0, 0x3f, 0xc2, 0x7c, 0x09, 0x00, 0x00,
}

View File

@ -25,7 +25,22 @@ message Filesystem {
message ListFilesystemVersionsReq { string Filesystem = 1; }
message ListFilesystemVersionsRes { repeated FilesystemVersion Versions = 1; }
message ListFilesystemVersionsRes {
repeated FilesystemVersion Versions = 1;
repeated SendAbstraction SendAbstractions = 2;
}
message SendAbstraction {
enum SendAbstractionType {
Undefined = 0;
ReplicationCursorV2 = 1;
StepHold = 2;
StepBookmark = 3;
};
SendAbstractionType Type = 1;
string JobID = 2;
FilesystemVersion Version = 3;
}
message FilesystemVersion {
enum VersionType {
@ -80,9 +95,7 @@ message SendRes {
repeated Property Properties = 4;
}
message SendCompletedReq {
SendReq OriginalReq = 2;
}
message SendCompletedReq { SendReq OriginalReq = 2; }
message SendCompletedRes {}
@ -98,7 +111,7 @@ message ReceiveReq {
message ReceiveRes {}
message DestroySnapshotsReq {
string Filesystem = 1;
string Filesystem = 1;
// Path to filesystem, snapshot or bookmark to be destroyed
repeated FilesystemVersion Snapshots = 2;
}

View File

@ -115,6 +115,8 @@ func (v FilesystemVersion) RelName() string {
}
func (v FilesystemVersion) String() string { return v.RelName() }
func (v FilesystemVersion) GetCreation() time.Time { return v.Creation }
// Only takes into account those attributes of FilesystemVersion that
// are immutable over time in ZFS.
func FilesystemVersionEqualIdentity(a, b FilesystemVersion) bool {

View File

@ -1500,6 +1500,30 @@ func tryParseDestroySnapshotsError(arg string, stderr []byte) *DestroySnapshotsE
}
}
type ErrDestroySnapshotDatasetIsBusy struct {
*DestroySnapshotsError
Name string
}
var _ error = (*ErrDestroySnapshotDatasetIsBusy)(nil)
func tryErrDestroySnapshotDatasetIsBusy(arg string, stderr []byte) *ErrDestroySnapshotDatasetIsBusy {
dsne := tryParseDestroySnapshotsError(arg, stderr)
if dsne == nil {
return nil
}
if len(dsne.Reason) != 1 {
return nil
}
if dsne.Reason[0] == "dataset is busy" {
return &ErrDestroySnapshotDatasetIsBusy{
DestroySnapshotsError: dsne,
Name: dsne.Undestroyable[0],
}
}
return nil
}
func ZFSDestroy(ctx context.Context, arg string) (err error) {
var dstype, filesystem string
@ -1533,6 +1557,8 @@ func ZFSDestroy(ctx context.Context, arg string) (err error) {
err = &DatasetDoesNotExist{arg}
} else if dsNotExistErr := tryDatasetDoesNotExist(filesystem, stdio); dsNotExistErr != nil {
err = dsNotExistErr
} else if dsBusy := tryErrDestroySnapshotDatasetIsBusy(arg, stdio); dsBusy != nil {
err = dsBusy
} else if dserr := tryParseDestroySnapshotsError(arg, stdio); dserr != nil {
err = dserr
}