pruner: remove retry handling + fix early give-up

Retry handling is broken since the gRPC changes (wrong error classification).
Will come back at some point, hopefully by merging the replication
driver retry infrastructure.

However, the simpler architecture allows an easy fix for the problem
that the pruner practically gave up on the first error it encountered.

fixes #123
This commit is contained in:
Christian Schwarz 2019-03-13 20:50:03 +01:00
parent d78d20e2d0
commit 7584c66bdb
5 changed files with 108 additions and 394 deletions

View File

@ -446,9 +446,6 @@ func (t *tui) renderPrunerReport(r *pruner.Report) {
if r.Error != "" {
t.printf("Error: %s\n", r.Error)
}
if r.SleepUntil.After(time.Now()) {
t.printf("Sleeping until %s (%s left)\n", r.SleepUntil, r.SleepUntil.Sub(time.Now()))
}
type commonFS struct {
*pruner.FSReport
@ -464,8 +461,7 @@ func (t *tui) renderPrunerReport(r *pruner.Report) {
switch state {
case pruner.Plan: fallthrough
case pruner.PlanWait: fallthrough
case pruner.ErrPerm:
case pruner.PlanErr:
return
}
@ -510,7 +506,13 @@ func (t *tui) renderPrunerReport(r *pruner.Report) {
continue
}
if fs.LastError != "" {
t.printf("ERROR (%d): %s\n", fs.ErrorCount, fs.LastError) // whitespace is padding
if strings.ContainsAny(fs.LastError, "\r\n") {
t.printf("ERROR:")
t.printfDrawIndentedAndWrappedIfMultiline("%s\n", fs.LastError)
} else {
t.printfDrawIndentedAndWrappedIfMultiline("ERROR: %s\n", fs.LastError)
}
t.newline()
continue
}

View File

@ -11,7 +11,6 @@ import (
"github.com/zrepl/zrepl/replication/logic/pdu"
"github.com/zrepl/zrepl/util/envconst"
"github.com/zrepl/zrepl/util/watchdog"
"net"
"sort"
"strings"
"sync"
@ -67,8 +66,7 @@ type Pruner struct {
state State
// State ErrWait|ErrPerm
sleepUntil time.Time
// State PlanErr
err error
// State Exec
@ -162,62 +160,34 @@ type State int
const (
Plan State = 1 << iota
PlanWait
PlanErr
Exec
ExecWait
ErrPerm
ExecErr
Done
)
func (s State) statefunc() state {
var statemap = map[State]state{
Plan: statePlan,
PlanWait: statePlanWait,
Exec: stateExec,
ExecWait: stateExecWait,
ErrPerm: nil,
Done: nil,
}
return statemap[s]
}
func (s State) IsTerminal() bool {
return s.statefunc() == nil
}
type updater func(func(*Pruner)) State
type state func(args *args, u updater) state
type updater func(func(*Pruner))
func (p *Pruner) Prune() {
p.prune(p.args)
}
func (p *Pruner) prune(args args) {
s := p.state.statefunc()
for s != nil {
pre := p.state
s = s(&args, func(f func(*Pruner)) State {
u := func(f func(*Pruner)) {
p.mtx.Lock()
defer p.mtx.Unlock()
f(p)
return p.state
})
post := p.state
GetLogger(args.ctx).
WithField("transition", fmt.Sprintf("%s=>%s", pre, post)).
Debug("state transition")
if err := p.Error(); err != nil {
GetLogger(args.ctx).
WithError(p.err).
WithField("state", post.String()).
Error("entering error state after error")
}
// TODO support automatic retries
// It is advisable to merge this code with package replication/driver before
// That will likely require re-modelling struct fs like replication/driver.attempt,
// including figuring out how to resume a plan after being interrupted by network errors
// The non-retrying code in this package should move straight to replication/logic.
doOneAttempt(&args, u)
}
}
type Report struct {
State string
SleepUntil time.Time
Error string
Pending, Completed []FSReport
}
@ -225,7 +195,6 @@ type Report struct {
type FSReport struct {
Filesystem string
SnapshotList, DestroyList []SnapshotReport
ErrorCount int
SkipReason FSSkipReason
LastError string
}
@ -242,14 +211,9 @@ func (p *Pruner) Report() *Report {
r := Report{State: p.state.String()}
if p.state & (PlanWait|ExecWait) != 0 {
r.SleepUntil = p.sleepUntil
}
if p.state & (PlanWait|ExecWait|ErrPerm) != 0 {
if p.err != nil {
r.Error = p.err.Error()
}
}
if p.execQueue != nil {
r.Pending, r.Completed = p.execQueue.Report()
@ -264,20 +228,12 @@ func (p *Pruner) State() State {
return p.state
}
func (p *Pruner) Error() error {
p.mtx.Lock()
defer p.mtx.Unlock()
if p.state & (PlanWait|ExecWait|ErrPerm) != 0 {
return p.err
}
return nil
}
type fs struct {
path string
// permanent error during planning
planErr error
planErrContext string
// if != "", the fs was skipped for planning and the field
// contains the reason
@ -294,7 +250,6 @@ type fs struct {
// only during Exec state, also used by execQueue
execErrLast error
execErrCount int
}
type FSSkipReason string
@ -315,7 +270,6 @@ func (f *fs) Report() FSReport {
r := FSReport{}
r.Filesystem = f.path
r.ErrorCount = f.execErrCount
r.SkipReason = f.skipReason
if !r.SkipReason.NotSkipped() {
return r
@ -362,39 +316,7 @@ func (s snapshot) Replicated() bool { return s.replicated }
func (s snapshot) Date() time.Time { return s.date }
type Error interface {
error
Temporary() bool
}
var _ Error = net.Error(nil)
func shouldRetry(e error) bool {
if neterr, ok := e.(net.Error); ok {
return neterr.Temporary()
}
return false
}
func onErr(u updater, e error) state {
return u(func(p *Pruner) {
p.err = e
if !shouldRetry(e) {
p.state = ErrPerm
return
}
switch p.state {
case Plan:
p.state = PlanWait
case Exec:
p.state = ExecWait
default:
panic(p.state)
}
}).statefunc()
}
func statePlan(a *args, u updater) state {
func doOneAttempt(a *args, u updater) {
ctx, target, receiver := a.ctx, a.target, a.receiver
var ka *watchdog.KeepAlive
@ -404,7 +326,11 @@ func statePlan(a *args, u updater) state {
sfssres, err := receiver.ListFilesystems(ctx, &pdu.ListFilesystemReq{})
if err != nil {
return onErr(u, err)
u(func(p *Pruner) {
p.state = PlanErr
p.err = err
})
return
}
sfss := make(map[string]*pdu.Filesystem)
for _, sfs := range sfssres.GetFilesystems() {
@ -413,17 +339,21 @@ func statePlan(a *args, u updater) state {
tfssres, err := target.ListFilesystems(ctx, &pdu.ListFilesystemReq{})
if err != nil {
return onErr(u, err)
u(func(p *Pruner) {
p.state = PlanErr
p.err = err
})
return
}
tfss := tfssres.GetFilesystems()
pfss := make([]*fs, len(tfss))
tfss_loop:
for i, tfs := range tfss {
l := GetLogger(ctx).WithField("fs", tfs.Path)
l.Debug("plan filesystem")
pfs := &fs{
path: tfs.Path,
}
@ -439,10 +369,17 @@ func statePlan(a *args, u updater) state {
continue
}
pfsPlanErrAndLog := func(err error, message string) {
t := fmt.Sprintf("%T", err)
pfs.planErr = err
pfs.planErrContext = message
l.WithField("orig_err_type", t).WithError(err).Error(fmt.Sprintf("%s: plan error, skipping filesystem", message))
}
tfsvsres, err := target.ListFilesystemVersions(ctx, &pdu.ListFilesystemVersionsReq{Filesystem: tfs.Path})
if err != nil {
l.WithError(err).Error("cannot list filesystem versions")
return onErr(u, err)
pfsPlanErrAndLog(err, "cannot list filesystem versions")
continue tfss_loop
}
tfsvs := tfsvsres.GetVersions()
// no progress here since we could run in a live-lock (must have used target AND receiver before progress)
@ -457,18 +394,16 @@ func statePlan(a *args, u updater) state {
}
rc, err := receiver.ReplicationCursor(ctx, rcReq)
if err != nil {
l.WithError(err).Error("cannot get replication cursor")
return onErr(u, err)
pfsPlanErrAndLog(err, "cannot get replication cursor bookmark")
continue tfss_loop
}
ka.MadeProgress()
if rc.GetNotexist() {
l.Error("replication cursor does not exist, skipping")
pfs.destroyList = []pruning.Snapshot{}
pfs.planErr = fmt.Errorf("replication cursor bookmark does not exist (one successful replication is required before pruning works)")
continue
err := errors.New("replication cursor bookmark does not exist (one successful replication is required before pruning works)")
pfsPlanErrAndLog(err, "")
continue tfss_loop
}
// scan from older to newer, all snapshots older than cursor are interpreted as replicated
sort.Slice(tfsvs, func(i, j int) bool {
return tfsvs[i].CreateTXG < tfsvs[j].CreateTXG
@ -490,11 +425,9 @@ func statePlan(a *args, u updater) state {
}
creation, err := tfsv.CreationAsTime()
if err != nil {
err := fmt.Errorf("%s%s has invalid creation date: %s", tfs, tfsv.RelName(), err)
l.WithError(err).
WithField("tfsv", tfsv.RelName()).
Error("error with fileesystem version")
return onErr(u, err)
err := fmt.Errorf("%s: %s", tfsv.RelName(), err)
pfsPlanErrAndLog(err, "fs version with invalid creation date")
continue tfss_loop
}
// note that we cannot use CreateTXG because target and receiver could be on different pools
atCursor := tfsv.Guid == rc.GetGuid()
@ -506,9 +439,8 @@ func statePlan(a *args, u updater) state {
})
}
if preCursor {
err := fmt.Errorf("replication cursor not found in prune target filesystem versions")
l.Error(err.Error())
return onErr(u, err)
pfsPlanErrAndLog(fmt.Errorf("replication cursor not found in prune target filesystem versions"), "")
continue tfss_loop
}
// Apply prune rules
@ -516,34 +448,56 @@ func statePlan(a *args, u updater) state {
ka.MadeProgress()
}
return u(func(pruner *Pruner) {
u(func(pruner *Pruner) {
pruner.Progress.MadeProgress()
pruner.execQueue = newExecQueue(len(pfss))
for _, pfs := range pfss {
pruner.execQueue.Put(pfs, nil, false)
}
pruner.state = Exec
}).statefunc()
}
func stateExec(a *args, u updater) state {
})
for {
var pfs *fs
state := u(func(pruner *Pruner) {
u(func(pruner *Pruner) {
pfs = pruner.execQueue.Pop()
})
if pfs == nil {
nextState := Done
if pruner.execQueue.HasCompletedFSWithErrors() {
nextState = ErrPerm
break
}
pruner.state = nextState
return
doOneAttemptExec(a, u, pfs)
}
var rep *Report
{
// must not hold lock for report
var pruner *Pruner
u(func(p *Pruner) {
pruner = p
})
rep = pruner.Report()
}
u(func(p *Pruner) {
if len(rep.Pending) > 0 {
panic("queue should not have pending items at this point")
}
hadErr := false
for _, fsr := range rep.Completed {
hadErr = hadErr || fsr.SkipReason.NotSkipped() && fsr.LastError != ""
}
if hadErr {
p.state = ExecErr
} else {
p.state = Done
}
})
if state != Exec {
return state.statefunc()
}
// attempts to exec pfs, puts it back into the queue with the result
func doOneAttemptExec(a *args, u updater, pfs *fs) {
destroyList := make([]*pdu.FilesystemVersion, len(pfs.destroyList))
for i := range destroyList {
destroyList[i] = pfs.destroyList[i].(snapshot).fsv
@ -562,7 +516,7 @@ func stateExec(a *args, u updater) state {
u(func(pruner *Pruner) {
pruner.execQueue.Put(pfs, err, false)
})
return onErr(u, err)
return
}
// check if all snapshots were destroyed
destroyResults := make(map[string]*pdu.DestroySnapshotRes)
@ -603,31 +557,6 @@ func stateExec(a *args, u updater) state {
})
if err != nil {
GetLogger(a.ctx).WithError(err).Error("target could not destroy snapshots")
return onErr(u, err)
}
return u(func(pruner *Pruner) {
pruner.Progress.MadeProgress()
}).statefunc()
}
func stateExecWait(a *args, u updater) state {
return doWait(Exec, a, u)
}
func statePlanWait(a *args, u updater) state {
return doWait(Plan, a, u)
}
func doWait(goback State, a *args, u updater) state {
timer := time.NewTimer(a.retryWait)
defer timer.Stop()
select {
case <-timer.C:
return u(func(pruner *Pruner) {
pruner.state = goback
}).statefunc()
case <-a.ctx.Done():
return onErr(u, a.ctx.Err())
return
}
}

View File

@ -58,10 +58,7 @@ func (q *execQueue) Pop() *fs {
func(q *execQueue) Put(fs *fs, err error, done bool) {
fs.mtx.Lock()
fs.execErrLast = err
if err != nil {
fs.execErrCount++
}
if done || (err != nil && !shouldRetry(fs.execErrLast)) {
if done || err != nil {
fs.mtx.Unlock()
q.mtx.Lock()
q.completed = append(q.completed, fs)
@ -78,9 +75,6 @@ func(q *execQueue) Put(fs *fs, err error, done bool) {
defer q.pending[i].mtx.Unlock()
q.pending[j].mtx.Lock()
defer q.pending[j].mtx.Unlock()
if q.pending[i].execErrCount != q.pending[j].execErrCount {
return q.pending[i].execErrCount < q.pending[j].execErrCount
}
return strings.Compare(q.pending[i].path, q.pending[j].path) == -1
})
q.mtx.Unlock()

View File

@ -1,206 +0,0 @@
package pruner
import (
"context"
"fmt"
"github.com/stretchr/testify/assert"
"github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/pruning"
"github.com/zrepl/zrepl/replication/logic/pdu"
"net"
"testing"
"time"
)
type mockFS struct {
path string
snaps []string
}
func (m *mockFS) Filesystem() *pdu.Filesystem {
return &pdu.Filesystem{
Path: m.path,
}
}
func (m *mockFS) FilesystemVersions() []*pdu.FilesystemVersion {
versions := make([]*pdu.FilesystemVersion, len(m.snaps))
for i, v := range m.snaps {
versions[i] = &pdu.FilesystemVersion{
Type: pdu.FilesystemVersion_Snapshot,
Name: v,
Creation: pdu.FilesystemVersionCreation(time.Unix(0, 0)),
Guid: uint64(i),
}
}
return versions
}
type mockTarget struct {
fss []mockFS
destroyed map[string][]string
listVersionsErrs map[string][]error
listFilesystemsErr []error
destroyErrs map[string][]error
}
func (t *mockTarget) ListFilesystems(ctx context.Context, req *pdu.ListFilesystemReq) (*pdu.ListFilesystemRes, error) {
if len(t.listFilesystemsErr) > 0 {
e := t.listFilesystemsErr[0]
t.listFilesystemsErr = t.listFilesystemsErr[1:]
return nil, e
}
fss := make([]*pdu.Filesystem, len(t.fss))
for i := range fss {
fss[i] = t.fss[i].Filesystem()
}
return &pdu.ListFilesystemRes{Filesystems: fss}, nil
}
func (t *mockTarget) ListFilesystemVersions(ctx context.Context, req *pdu.ListFilesystemVersionsReq) (*pdu.ListFilesystemVersionsRes, error) {
fs := req.Filesystem
if len(t.listVersionsErrs[fs]) != 0 {
e := t.listVersionsErrs[fs][0]
t.listVersionsErrs[fs] = t.listVersionsErrs[fs][1:]
return nil, e
}
for _, mfs := range t.fss {
if mfs.path != fs {
continue
}
return &pdu.ListFilesystemVersionsRes{Versions: mfs.FilesystemVersions()}, nil
}
return nil, fmt.Errorf("filesystem %s does not exist", fs)
}
func (t *mockTarget) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshotsReq) (*pdu.DestroySnapshotsRes, error) {
fs, snaps := req.Filesystem, req.Snapshots
if len(t.destroyErrs[fs]) != 0 {
e := t.destroyErrs[fs][0]
t.destroyErrs[fs] = t.destroyErrs[fs][1:]
return nil, e
}
destroyed := t.destroyed[fs]
res := make([]*pdu.DestroySnapshotRes, len(snaps))
for i, s := range snaps {
destroyed = append(destroyed, s.Name)
res[i] = &pdu.DestroySnapshotRes{Error: "", Snapshot: s}
}
t.destroyed[fs] = destroyed
return &pdu.DestroySnapshotsRes{Results: res}, nil
}
type mockCursor struct {
snapname string
guid uint64
}
type mockHistory struct {
errs map[string][]error
cursors map[string]*mockCursor
}
func (r *mockHistory) ReplicationCursor(ctx context.Context, req *pdu.ReplicationCursorReq) (*pdu.ReplicationCursorRes, error) {
fs := req.Filesystem
if len(r.errs[fs]) > 0 {
e := r.errs[fs][0]
r.errs[fs] = r.errs[fs][1:]
return nil, e
}
return &pdu.ReplicationCursorRes{Result: &pdu.ReplicationCursorRes_Guid{Guid: 0}}, nil
}
type stubNetErr struct {
msg string
temporary, timeout bool
}
var _ net.Error = stubNetErr{}
func (e stubNetErr) Error() string {
return e.msg
}
func (e stubNetErr) Temporary() bool { return e.temporary }
func (e stubNetErr) Timeout() bool { return e.timeout }
func TestPruner_Prune(t *testing.T) {
var _ net.Error = &net.OpError{} // we use it below
target := &mockTarget{
listFilesystemsErr: []error{
stubNetErr{msg: "fakerror0", temporary: true},
},
listVersionsErrs: map[string][]error{
"zroot/foo": {
stubNetErr{msg: "fakeerror1", temporary: true},
stubNetErr{msg: "fakeerror2", temporary: true,},
},
},
destroyErrs: map[string][]error{
"zroot/baz": {
stubNetErr{msg: "fakeerror3", temporary: true}, // first error puts it back in the queue
stubNetErr{msg:"permanent error"}, // so it will be last when pruner gives up due to permanent err
},
},
destroyed: make(map[string][]string),
fss: []mockFS{
{
path: "zroot/foo",
snaps: []string{
"keep_a",
"keep_b",
"drop_c",
"keep_d",
},
},
{
path: "zroot/bar",
snaps: []string{
"keep_e",
"keep_f",
"drop_g",
},
},
{
path: "zroot/baz",
snaps: []string{
"keep_h",
"drop_i",
},
},
},
}
history := &mockHistory{
errs: map[string][]error{
"zroot/foo": {
stubNetErr{msg: "fakeerror4", temporary: true},
},
},
}
keepRules := []pruning.KeepRule{pruning.MustKeepRegex("^keep", false)}
p := Pruner{
args: args{
ctx: WithLogger(context.Background(), logger.NewTestLogger(t)),
target: target,
receiver: history,
rules: keepRules,
retryWait: 10*time.Millisecond,
},
state: Plan,
}
p.Prune()
exp := map[string][]string{
"zroot/foo": {"drop_c"},
"zroot/bar": {"drop_g"},
}
assert.Equal(t, exp, target.destroyed)
//assert.Equal(t, map[string][]error{}, target.listVersionsErrs, "retried")
}

View File

@ -7,19 +7,17 @@ import (
)
const (
_StateName_0 = "PlanPlanWait"
_StateName_0 = "PlanPlanErr"
_StateName_1 = "Exec"
_StateName_2 = "ExecWait"
_StateName_3 = "ErrPerm"
_StateName_4 = "Done"
_StateName_2 = "ExecErr"
_StateName_3 = "Done"
)
var (
_StateIndex_0 = [...]uint8{0, 4, 12}
_StateIndex_0 = [...]uint8{0, 4, 11}
_StateIndex_1 = [...]uint8{0, 4}
_StateIndex_2 = [...]uint8{0, 8}
_StateIndex_3 = [...]uint8{0, 7}
_StateIndex_4 = [...]uint8{0, 4}
_StateIndex_2 = [...]uint8{0, 7}
_StateIndex_3 = [...]uint8{0, 4}
)
func (i State) String() string {
@ -33,22 +31,19 @@ func (i State) String() string {
return _StateName_2
case i == 16:
return _StateName_3
case i == 32:
return _StateName_4
default:
return fmt.Sprintf("State(%d)", i)
}
}
var _StateValues = []State{1, 2, 4, 8, 16, 32}
var _StateValues = []State{1, 2, 4, 8, 16}
var _StateNameToValueMap = map[string]State{
_StateName_0[0:4]: 1,
_StateName_0[4:12]: 2,
_StateName_0[4:11]: 2,
_StateName_1[0:4]: 4,
_StateName_2[0:8]: 8,
_StateName_3[0:7]: 16,
_StateName_4[0:4]: 32,
_StateName_2[0:7]: 8,
_StateName_3[0:4]: 16,
}
// StateString retrieves an enum value from the enum constants string name.