Christian Schwarz 438f950be3 pruner: improve cancellation + error handling strategy
Pruner now backs off as soon as there is an error, making that error the
Error field in the pruner report.
The error is also stored in the specific *fs that failed, and we
maintain an error counter per *fs to de-prioritize those fs that failed.
Like with replication, the de-prioritization on errors is to avoid '
getting stuck' with an individual filesystem until the watchdog hits.
2018-10-20 12:46:43 +02:00

569 lines
13 KiB

package pruner
import (
// Try to keep it compatible with gitub.com/zrepl/zrepl/replication.Endpoint
type History interface {
ReplicationCursor(ctx context.Context, req *pdu.ReplicationCursorReq) (*pdu.ReplicationCursorRes, error)
type Target interface {
ListFilesystems(ctx context.Context) ([]*pdu.Filesystem, error)
ListFilesystemVersions(ctx context.Context, fs string) ([]*pdu.FilesystemVersion, error) // fix depS
DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshotsReq) (*pdu.DestroySnapshotsRes, error)
type Logger = logger.Logger
type contextKey int
const contextKeyLogger contextKey = 0
func WithLogger(ctx context.Context, log Logger) context.Context {
return context.WithValue(ctx, contextKeyLogger, log)
func GetLogger(ctx context.Context) Logger {
if l, ok := ctx.Value(contextKeyLogger).(Logger); ok {
return l
return logger.NewNullLogger()
type args struct {
ctx context.Context
target Target
receiver History
rules []pruning.KeepRule
retryWait time.Duration
considerSnapAtCursorReplicated bool
promPruneSecs prometheus.Observer
type Pruner struct {
args args
Progress watchdog.KeepAlive
mtx sync.RWMutex
state State
// State ErrWait|ErrPerm
sleepUntil time.Time
err error
// State Exec
execQueue *execQueue
type PrunerFactory struct {
senderRules []pruning.KeepRule
receiverRules []pruning.KeepRule
retryWait time.Duration
considerSnapAtCursorReplicated bool
promPruneSecs *prometheus.HistogramVec
func checkContainsKeep1(rules []pruning.KeepRule) error {
if len(rules) == 0 {
return nil //No keep rules means keep all - ok
for _, e := range rules {
switch e.(type) {
case *pruning.KeepLastN:
return nil
return errors.New("sender keep rules must contain last_n or be empty so that the last snapshot is definitely kept")
func NewPrunerFactory(in config.PruningSenderReceiver, promPruneSecs *prometheus.HistogramVec) (*PrunerFactory, error) {
keepRulesReceiver, err := pruning.RulesFromConfig(in.KeepReceiver)
if err != nil {
return nil, errors.Wrap(err, "cannot build receiver pruning rules")
keepRulesSender, err := pruning.RulesFromConfig(in.KeepSender)
if err != nil {
return nil, errors.Wrap(err, "cannot build sender pruning rules")
considerSnapAtCursorReplicated := false
for _, r := range in.KeepSender {
knr, ok := r.Ret.(*config.PruneKeepNotReplicated)
if !ok {
considerSnapAtCursorReplicated = considerSnapAtCursorReplicated || !knr.KeepSnapshotAtCursor
f := &PrunerFactory{
senderRules: keepRulesSender,
receiverRules: keepRulesReceiver,
retryWait: envconst.Duration("ZREPL_PRUNER_RETRY_INTERVAL", 4 * time.Second),
considerSnapAtCursorReplicated: considerSnapAtCursorReplicated,
promPruneSecs: promPruneSecs,
return f, nil
func (f *PrunerFactory) BuildSenderPruner(ctx context.Context, target Target, receiver History) *Pruner {
p := &Pruner{
args: args{
WithLogger(ctx, GetLogger(ctx).WithField("prune_side", "sender")),
state: Plan,
return p
func (f *PrunerFactory) BuildReceiverPruner(ctx context.Context, target Target, receiver History) *Pruner {
p := &Pruner{
args: args{
WithLogger(ctx, GetLogger(ctx).WithField("prune_side", "receiver")),
false, // senseless here anyways
state: Plan,
return p
//go:generate enumer -type=State
type State int
const (
Plan State = 1 << iota
func (s State) statefunc() state {
var statemap = map[State]state{
Plan: statePlan,
PlanWait: statePlanWait,
Exec: stateExec,
ExecWait: stateExecWait,
ErrPerm: nil,
Done: nil,
return statemap[s]
func (s State) IsTerminal() bool {
return s.statefunc() == nil
type updater func(func(*Pruner)) State
type state func(args *args, u updater) state
func (p *Pruner) Prune() {
func (p *Pruner) prune(args args) {
s := p.state.statefunc()
for s != nil {
pre := p.state
s = s(&args, func(f func(*Pruner)) State {
defer p.mtx.Unlock()
return p.state
post := p.state
WithField("transition", fmt.Sprintf("%s=>%s", pre, post)).
Debug("state transition")
if err := p.Error(); err != nil {
WithField("state", post.String()).
Error("entering error state after error")
type Report struct {
State string
SleepUntil time.Time
Error string
Pending, Completed []FSReport
type FSReport struct {
Filesystem string
SnapshotList, DestroyList []SnapshotReport
ErrorCount int
LastError string
type SnapshotReport struct {
Name string
Replicated bool
Date time.Time
func (p *Pruner) Report() *Report {
defer p.mtx.Unlock()
r := Report{State: p.state.String()}
if p.state & PlanWait|ExecWait != 0 {
r.SleepUntil = p.sleepUntil
if p.state & PlanWait|ExecWait|ErrPerm != 0 {
if p.err != nil {
r.Error = p.err.Error()
if p.execQueue != nil {
r.Pending, r.Completed = p.execQueue.Report()
return &r
func (p *Pruner) State() State {
defer p.mtx.Unlock()
return p.state
func (p *Pruner) Error() error {
defer p.mtx.Unlock()
if p.state & (PlanWait|ExecWait|ErrPerm) != 0 {
return p.err
return nil
type fs struct {
path string
// snapshots presented by target
// (type snapshot)
snaps []pruning.Snapshot
// destroy list returned by pruning.PruneSnapshots(snaps)
// (type snapshot)
destroyList []pruning.Snapshot
mtx sync.RWMutex
// only during Exec state, also used by execQueue
execErrLast error
execErrCount int
func (f *fs) Report() FSReport {
defer f.mtx.Unlock()
r := FSReport{}
r.Filesystem = f.path
r.ErrorCount = f.execErrCount
if f.execErrLast != nil {
r.LastError = f.execErrLast.Error()
r.SnapshotList = make([]SnapshotReport, len(f.snaps))
for i, snap := range f.snaps {
r.SnapshotList[i] = snap.(snapshot).Report()
r.DestroyList = make([]SnapshotReport, len(f.destroyList))
for i, snap := range f.destroyList{
r.DestroyList[i] = snap.(snapshot).Report()
return r
type snapshot struct {
replicated bool
date time.Time
fsv *pdu.FilesystemVersion
func (s snapshot) Report() SnapshotReport {
return SnapshotReport{
Name: s.Name(),
Replicated: s.Replicated(),
Date: s.Date(),
var _ pruning.Snapshot = snapshot{}
func (s snapshot) Name() string { return s.fsv.Name }
func (s snapshot) Replicated() bool { return s.replicated }
func (s snapshot) Date() time.Time { return s.date }
func shouldRetry(e error) bool {
if neterr, ok := e.(net.Error); ok {
return neterr.Temporary()
return false
func onErr(u updater, e error) state {
return u(func(p *Pruner) {
p.err = e
if !shouldRetry(e) {
p.state = ErrPerm
switch p.state {
case Plan:
p.state = PlanWait
case Exec:
p.state = ExecWait
func statePlan(a *args, u updater) state {
ctx, target, receiver := a.ctx, a.target, a.receiver
tfss, err := target.ListFilesystems(ctx)
if err != nil {
return onErr(u, err)
pfss := make([]*fs, len(tfss))
for i, tfs := range tfss {
l := GetLogger(ctx).WithField("fs", tfs.Path)
l.Debug("plan filesystem")
pfs := &fs{
path: tfs.Path,
pfss[i] = pfs
tfsvs, err := target.ListFilesystemVersions(ctx, tfs.Path)
if err != nil {
l.WithError(err).Error("cannot list filesystem versions")
return onErr(u, err)
pfs.snaps = make([]pruning.Snapshot, 0, len(tfsvs))
rcReq := &pdu.ReplicationCursorReq{
Filesystem: tfs.Path,
Op: &pdu.ReplicationCursorReq_Get{
Get: &pdu.ReplicationCursorReq_GetOp{},
rc, err := receiver.ReplicationCursor(ctx, rcReq)
if err != nil {
l.WithError(err).Error("cannot get replication cursor")
return onErr(u, err)
if rc.GetError() != "" {
l.WithField("reqErr", rc.GetError()).Error("cannot get replication cursor")
return onErr(u, err)
// scan from older to newer, all snapshots older than cursor are interpreted as replicated
sort.Slice(tfsvs, func(i, j int) bool {
return tfsvs[i].CreateTXG < tfsvs[j].CreateTXG
haveCursorSnapshot := false
for _, tfsv := range tfsvs {
if tfsv.Type != pdu.FilesystemVersion_Snapshot {
if tfsv.Guid == rc.GetGuid() {
haveCursorSnapshot = true
preCursor := haveCursorSnapshot
for _, tfsv := range tfsvs {
if tfsv.Type != pdu.FilesystemVersion_Snapshot {
creation, err := tfsv.CreationAsTime()
if err != nil {
err := fmt.Errorf("%s%s has invalid creation date: %s", tfs, tfsv.RelName(), err)
WithField("tfsv", tfsv.RelName()).
Error("error with fileesystem version")
return onErr(u, err)
// note that we cannot use CreateTXG because target and receiver could be on different pools
atCursor := tfsv.Guid == rc.GetGuid()
preCursor = preCursor && !atCursor
pfs.snaps = append(pfs.snaps, snapshot{
replicated: preCursor || (a.considerSnapAtCursorReplicated && atCursor),
date: creation,
fsv: tfsv,
if preCursor {
err := fmt.Errorf("replication cursor not found in prune target filesystem versions")
return onErr(u, err)
// Apply prune rules
pfs.destroyList = pruning.PruneSnapshots(pfs.snaps, a.rules)
return u(func(pruner *Pruner) {
pruner.execQueue = newExecQueue(len(pfss))
for _, pfs := range pfss {
pruner.execQueue.Put(pfs, nil, false)
pruner.state = Exec
func stateExec(a *args, u updater) state {
var pfs *fs
state := u(func(pruner *Pruner) {
pfs = pruner.execQueue.Pop()
if pfs == nil {
nextState := Done
if pruner.execQueue.HasCompletedFSWithErrors() {
nextState = ErrPerm
pruner.state = nextState
if state != Exec {
return state.statefunc()
destroyList := make([]*pdu.FilesystemVersion, len(pfs.destroyList))
for i := range destroyList {
destroyList[i] = pfs.destroyList[i].(snapshot).fsv
WithField("fs", pfs.path).
WithField("destroy_snap", destroyList[i].Name).
Debug("policy destroys snapshot")
req := pdu.DestroySnapshotsReq{
Filesystem: pfs.path,
Snapshots: destroyList,
GetLogger(a.ctx).WithField("fs", pfs.path).Debug("destroying snapshots")
res, err := a.target.DestroySnapshots(a.ctx, &req)
if err != nil {
u(func(pruner *Pruner) {
pruner.execQueue.Put(pfs, err, false)
return onErr(u, err)
// check if all snapshots were destroyed
destroyResults := make(map[string]*pdu.DestroySnapshotRes)
for _, fsres := range res.Results {
destroyResults[fsres.Snapshot.Name] = fsres
err = nil
destroyFails := make([]*pdu.DestroySnapshotRes, 0)
for _, reqDestroy := range destroyList {
res, ok := destroyResults[reqDestroy.Name]
if !ok {
err = fmt.Errorf("missing destroy-result for %s", reqDestroy.RelName())
} else if res.Error != "" {
destroyFails = append(destroyFails, res)
if err == nil && len(destroyFails) > 0 {
names := make([]string, len(destroyFails))
pairs := make([]string, len(destroyFails))
allSame := true
lastMsg := destroyFails[0].Error
for i := 0; i < len(destroyFails); i++{
allSame = allSame && destroyFails[i].Error == lastMsg
relname := destroyFails[i].Snapshot.RelName()
names[i] = relname
pairs[i] = fmt.Sprintf("(%s: %s)", relname, destroyFails[i].Error)
if allSame {
err = fmt.Errorf("destroys failed %s: %s",
strings.Join(names, ", "), lastMsg)
} else {
err = fmt.Errorf("destroys failed: %s", strings.Join(pairs, ", "))
u(func(pruner *Pruner) {
pruner.execQueue.Put(pfs, err, err == nil)
if err != nil {
GetLogger(a.ctx).WithError(err).Error("target could not destroy snapshots")
return onErr(u, err)
return u(func(pruner *Pruner) {
func stateExecWait(a *args, u updater) state {
return doWait(Exec, a, u)
func statePlanWait(a *args, u updater) state {
return doWait(Plan, a, u)
func doWait(goback State, a *args, u updater) state {
timer := time.NewTimer(a.retryWait)
defer timer.Stop()
select {
case <-timer.C:
return u(func(pruner *Pruner) {
pruner.state = goback
case <-a.ctx.Done():
return onErr(u, a.ctx.Err())