package pruner

import (
	"context"
	"encoding/json"
	"fmt"
	"os"

	"github.com/pkg/errors"

	"github.com/zrepl/zrepl/daemon/logging"
	"github.com/zrepl/zrepl/daemon/logging/trace"
	"github.com/zrepl/zrepl/endpoint"
	"github.com/zrepl/zrepl/pruning"
	"github.com/zrepl/zrepl/zfs"
)

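// Pruner coordinates one pruning pass: it lists the filesystems matched by
// fsfilter and fans out one FSPruner per filesystem. The Run loop is the sole
// consumer of the channels below and owns all state transitions.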
type Pruner struct {
	fsfilter  endpoint.FSFilter
	jid       endpoint.JobID
	side      Side
	keepRules []pruning.KeepRule

	// all channels consumed by the run loop
	reportReqs chan reportRequest
	stopReqs   chan stopRequest
	done       chan struct{}
	fsListRes  chan fsListRes

	state State

	listFilesystemsError error       // only in state StateListFilesystemsError
	fsPruners            []*FSPruner // only in state StateFanOutFilesystems
}

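// State is the lifecycle state of a Pruner.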
//go:generate enumer -type=State -json
type State int

const (
	StateInitialized State = iota
	StateListFilesystems
	StateListFilesystemsError
	StateFanOutFilesystems
	StateDone
)

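// Report is a point-in-time view of a Pruner's progress, as returned by Report.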
type Report struct {
	State                State
	ListFilesystemsError error       // only valid in StateListFilesystemsError
	Filesystems          []*FSReport // valid from StateFanOutFilesystems
}

type reportRequest struct {
	ctx   context.Context
	reply chan *Report
}

type runRequest struct {
	complete chan struct{}
}

type stopRequest struct {
	complete chan struct{}
}

type fsListRes struct {
	filesystems []*zfs.DatasetPath
	err         error
}

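// Side abstracts the job side a pruner runs for; the replication position it
// reports determines which snapshots the pruner considers replicated.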
type Side interface {
	// may return nil, nil, indicating that there is no replication position
	GetReplicationPosition(ctx context.Context, fs string) (*zfs.FilesystemVersion, error)
	isSide() Side
}

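// NewPruner returns a Pruner in StateInitialized. Run must be called exactly
// once to perform the pruning pass.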
func NewPruner(fsfilter endpoint.FSFilter, jid endpoint.JobID, side Side, keepRules []pruning.KeepRule) *Pruner {
	return &Pruner{
		fsfilter,
		jid,
		side,
		keepRules,
		make(chan reportRequest),
		make(chan stopRequest),
		make(chan struct{}),
		make(chan fsListRes),
		StateInitialized,
		nil,
		nil,
	}
}

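// Run lists the filesystems matched by fsfilter, fans out one FSPruner per
// filesystem, and blocks until all of them have finished or a stop request
// arrives. Run must only be called once per Pruner.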
func (p *Pruner) Run(ctx context.Context) *Report {

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	if p.state != StateInitialized {
		panic("Run can only be called once")
	}

	go func() {
		fss, err := zfs.ZFSListMapping(ctx, p.fsfilter)
		p.fsListRes <- fsListRes{fss, err}
	}()

	for {
		select {
		case res := <-p.fsListRes:
			if res.err != nil {
				p.state = StateListFilesystemsError
				p.listFilesystemsError = res.err
				close(p.done)
				continue
			}

			p.state = StateFanOutFilesystems

			p.fsPruners = make([]*FSPruner, len(res.filesystems))
			_, add, end := trace.WithTaskGroup(ctx, "pruner-fan-out-fs")
			for i, fs := range res.filesystems {
				i := i // copy the loop variable so the closure below sees the current index even if add runs it asynchronously
				p.fsPruners[i] = NewFSPruner(p.jid, p.side, p.keepRules, fs)
				add(func(ctx context.Context) {
					p.fsPruners[i].Run(ctx)
				})
			}
			go func() {
				end()
				close(p.done)
			}()

		case req := <-p.stopReqs:
			cancel()
			go func() {
				<-p.done
				close(req.complete)
			}()
		case req := <-p.reportReqs:
			req.reply <- p.report(req.ctx)
		case <-p.done:
			p.state = StateDone
			return p.report(ctx)
		}
	}
}

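// Report asks the run loop for a progress report. It returns nil if ctx is
// canceled or if the run loop has already terminated.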
func (p *Pruner) Report(ctx context.Context) *Report {
	req := reportRequest{
		ctx:   ctx,
		reply: make(chan *Report, 1),
	}
	select {
	case p.reportReqs <- req:
		return <-req.reply
	case <-ctx.Done():
		return nil
	case <-p.done:
		return nil
	}
}

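// report renders a Report from the Pruner's current state.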
func (p *Pruner) report(ctx context.Context) *Report {
	fsreports := make([]*FSReport, len(p.fsPruners))
	for i := range fsreports {
		fsreports[i] = p.fsPruners[i].report()
	}
	return &Report{
		State:                p.state,
		ListFilesystemsError: p.listFilesystemsError,
		Filesystems:          fsreports,
	}
}

// implements pruning.Snapshot
type snapshot struct {
	replicated bool
	stepHolds  []pruning.StepHold
	zfs.FilesystemVersion

	state     SnapState
	destroyOp *zfs.DestroySnapOp
}

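// SnapState is the lifecycle state of a single snapshot during a pruning pass.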
//go:generate enumer -type=SnapState -json
type SnapState int

const (
	SnapStateInitialized SnapState = iota
	SnapStateKeeping
	SnapStateDeletePending
	SnapStateDeleteAttempted
)

// implements pruning.StepHold
type stepHold struct {
	endpoint.Abstraction
}

func (s snapshot) Replicated() bool { return s.replicated }
func (s snapshot) StepHolds() []pruning.StepHold { return s.stepHolds }

func (s stepHold) GetJobID() endpoint.JobID { return *s.Abstraction.GetJobID() }

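// FSPruner prunes the snapshots of a single filesystem: plan partitions the
// filesystem's snapshots into keep and destroy lists according to the keep
// rules, and execute then destroys the snapshots on the destroy list.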
type FSPruner struct {
	jid       endpoint.JobID
	side      Side
	keepRules []pruning.KeepRule
	fsp       *zfs.DatasetPath

	state FSState

	// all channels consumed by the run loop
	planned    chan fsPlanRes
	executed   chan fsExecuteRes
	done       chan struct{}
	reportReqs chan fsReportReq

	keepList    []*snapshot // valid in FSStateExecuting and forward
	destroyList []*snapshot // valid in FSStateExecuting and forward, field .destroyOp is invalid until FSStateExecuting is left
}

type fsPlanRes struct {
	keepList    []*snapshot
	destroyList []*snapshot
	err         error
}

type fsExecuteRes struct {
	completedDestroyOps []*zfs.DestroySnapOp // same len() as FSPruner.destroyList
}

type fsReportReq struct {
	res chan *FSReport
}

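// FSReport describes the state of one FSPruner and the per-snapshot decisions it made.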
type FSReport struct {
	State    FSState
	KeepList []*SnapReport
	Destroy  []*SnapReport
}

type SnapReport struct {
	State         SnapState
	Name          string
	Replicated    bool
	StepHoldCount int
	DestroyError  error
}

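// FSState is the lifecycle state of an FSPruner.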
//go:generate enumer -type=FSState -json
type FSState int

const (
	FSStateInitialized FSState = iota
	FSStatePlanning
	FSStatePlanErr
	FSStateExecuting
	FSStateExecuteErr
	FSStateExecuteSuccess
)

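// IsTerminal reports whether no further state transitions will occur.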
func (s FSState) IsTerminal() bool {
	return s == FSStatePlanErr || s == FSStateExecuteErr || s == FSStateExecuteSuccess
}

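// NewFSPruner returns an FSPruner in FSStateInitialized for the given filesystem.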
func NewFSPruner(jid endpoint.JobID, side Side, keepRules []pruning.KeepRule, fsp *zfs.DatasetPath) *FSPruner {
	return &FSPruner{
		jid, side, keepRules, fsp,
		FSStateInitialized,
		make(chan fsPlanRes),
		make(chan fsExecuteRes),
		make(chan struct{}),
		make(chan fsReportReq),
		nil, nil,
	}
}

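// Run plans and executes pruning for the filesystem and returns once a
// terminal state has been reached. Rendering the final FSReport is still a
// TODO; Run currently returns nil.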
func (p *FSPruner) Run(ctx context.Context) *FSReport {

	p.state = FSStatePlanning

	go func() { p.planned <- p.plan(ctx) }()

out:
	for !p.state.IsTerminal() {
		select {
		case res := <-p.planned:

			if res.err != nil {
				p.state = FSStatePlanErr
				continue
			}
			p.state = FSStateExecuting
			p.keepList = res.keepList
			p.destroyList = res.destroyList

			go func() { p.executed <- p.execute(ctx, p.destroyList) }()

		case res := <-p.executed:

			if len(res.completedDestroyOps) != len(p.destroyList) {
				panic("impl error: completedDestroyOps is a vector corresponding to entries in p.destroyList")
			}

			var erroneous []*zfs.DestroySnapOp
			for i, op := range res.completedDestroyOps {
				if *op.ErrOut != nil {
					erroneous = append(erroneous, op)
				}
				p.destroyList[i].destroyOp = op
				p.destroyList[i].state = SnapStateDeleteAttempted
			}
			if len(erroneous) > 0 {
				p.state = FSStateExecuteErr
			} else {
				p.state = FSStateExecuteSuccess
			}

			close(p.done)

		case <-p.reportReqs:
			panic("unimp")
		case <-p.done:
			break out
		}
	}

	// TODO render last FS report
	return nil
}

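// plan lists the filesystem's versions, the step holds held by any job on
// those versions, and the side's replication position, then applies the keep
// rules to partition the snapshots into keep and destroy lists.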
func (p *FSPruner) plan(ctx context.Context) fsPlanRes {
	fs := p.fsp.ToString()
	vs, err := zfs.ZFSListFilesystemVersions(ctx, p.fsp, zfs.ListFilesystemVersionsOptions{})
	if err != nil {
		return fsPlanRes{err: errors.Wrap(err, "list filesystem versions")}
	}

	allJobsStepHolds, absErrs, err := endpoint.ListAbstractions(ctx, endpoint.ListZFSHoldsAndBookmarksQuery{
		FS: endpoint.ListZFSHoldsAndBookmarksQueryFilesystemFilter{
			FS: &fs,
		},
		What: endpoint.AbstractionTypeSet{
			endpoint.AbstractionStepHold: true,
		},
		Concurrency: 1,
	})
	if err != nil {
		return fsPlanRes{err: errors.Wrap(err, "list abstractions")}
	}
	if len(absErrs) > 0 {
		logging.GetLogger(ctx, logging.SubsysPruning).WithError(endpoint.ListAbstractionsErrors(absErrs)).
			Error("error listing some step holds, prune attempt might fail with 'dataset is busy' errors")
	}

	repPos, err := p.side.GetReplicationPosition(ctx, p.fsp.ToString())
	if err != nil {
		return fsPlanRes{err: errors.Wrap(err, "get replication position")}
	}

	vsAsSnaps := make([]pruning.Snapshot, len(vs))
	for i := range vs {
		var repPosCreateTxgOrZero uint64
		if repPos != nil {
			repPosCreateTxgOrZero = repPos.GetCreateTXG()
		}
		s := &snapshot{
			state:             SnapStateInitialized,
			FilesystemVersion: vs[i],
			replicated:        vs[i].GetCreateTXG() <= repPosCreateTxgOrZero,
		}
		for _, h := range allJobsStepHolds {
			if zfs.FilesystemVersionEqualIdentity(vs[i], h.GetFilesystemVersion()) {
				s.stepHolds = append(s.stepHolds, stepHold{h})
			}
		}
		vsAsSnaps[i] = s
	}

	downcastToSnapshots := func(l []pruning.Snapshot) (r []*snapshot) {
		r = make([]*snapshot, len(l))
		for i, e := range l {
			r[i] = e.(*snapshot)
		}
		return r
	}
	pruningResult := pruning.PruneSnapshots(vsAsSnaps, p.keepRules)
	remove, keep := downcastToSnapshots(pruningResult.Remove), downcastToSnapshots(pruningResult.Keep)
	if len(remove)+len(keep) != len(vsAsSnaps) {
		for _, s := range vsAsSnaps {
			r, _ := json.MarshalIndent(s.(*snapshot).report(), "", " ")
			fmt.Fprintf(os.Stderr, "%s\n", string(r))
		}
		panic("indecisive")
	}

	for _, s := range remove {
		s.state = SnapStateDeletePending
	}
	for _, s := range keep {
		s.state = SnapStateKeeping
	}

	return fsPlanRes{keepList: keep, destroyList: remove, err: nil}
}

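// execute issues a batched destroy for the snapshots in destroyList; per-snapshot
// errors are reported through each DestroySnapOp's ErrOut.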
func (p *FSPruner) execute(ctx context.Context, destroyList []*snapshot) fsExecuteRes {
	ops := make([]*zfs.DestroySnapOp, len(destroyList))
	for i, fsv := range destroyList {
		ops[i] = &zfs.DestroySnapOp{
			Filesystem: p.fsp.ToString(),
			Name:       fsv.GetName(),
			ErrOut:     new(error),
		}
	}
	zfs.ZFSDestroyFilesystemVersions(ctx, ops)

	return fsExecuteRes{completedDestroyOps: ops}
}

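// report renders an FSReport from the FSPruner's current state.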
func (p *FSPruner) report() *FSReport {
	return &FSReport{
		State:    p.state,
		KeepList: p.reportRenderSnapReports(p.keepList),
		Destroy:  p.reportRenderSnapReports(p.destroyList),
	}
}

func (p *FSPruner) reportRenderSnapReports(l []*snapshot) (r []*SnapReport) {
	r = make([]*SnapReport, len(l))
	for i := range l {
		r[i] = l[i].report()
	}
	return r
}

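// report renders a SnapReport for this snapshot; DestroyError is only
// populated after a delete has been attempted.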
func (s *snapshot) report() *SnapReport {
	var snapErr error
	if s.state == SnapStateDeleteAttempted {
		if *s.destroyOp.ErrOut != nil {
			snapErr = (*s.destroyOp.ErrOut)
		}
	}
	return &SnapReport{
		State:         s.state,
		Name:          s.Name,
		Replicated:    s.Replicated(),
		StepHoldCount: len(s.stepHolds),
		DestroyError:  snapErr,
	}
}