fixup pruner

This commit is contained in:
Christian Schwarz 2018-08-30 11:49:06 +02:00
parent d684302864
commit 12dd240b5f
2 changed files with 70 additions and 52 deletions

View File

@ -2,24 +2,24 @@ package pruner
import ( import (
"context" "context"
"fmt"
"github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/pruning" "github.com/zrepl/zrepl/pruning"
"github.com/zrepl/zrepl/replication/pdu" "github.com/zrepl/zrepl/replication/pdu"
"net"
"sync" "sync"
"time" "time"
"fmt"
"net"
"github.com/zrepl/zrepl/logger"
) )
// Try to keep it compatible with gitub.com/zrepl/zrepl/replication.Endpoint // Try to keep it compatible with gitub.com/zrepl/zrepl/replication.Endpoint
type Receiver interface { type History interface {
HasFilesystemVersion(ctx context.Context, fs string, version *pdu.FilesystemVersion) (bool, error) SnapshotReplicationStatus(ctx context.Context, req *pdu.SnapshotReplicationStatusReq) (*pdu.SnapshotReplicationStatusRes, error)
} }
type Target interface { type Target interface {
ListFilesystems(ctx context.Context) ([]*pdu.Filesystem, error) ListFilesystems(ctx context.Context) ([]*pdu.Filesystem, error)
ListFilesystemVersions(ctx context.Context, fs string) ([]*pdu.FilesystemVersion, error) // fix depS ListFilesystemVersions(ctx context.Context, fs string) ([]*pdu.FilesystemVersion, error) // fix depS
DestroySnapshots(ctx context.Context, fs string, snaps []*pdu.FilesystemVersion) ([]*pdu.FilesystemVersion, error) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshotsReq) (*pdu.DestroySnapshotsRes, error)
} }
type Logger = logger.Logger type Logger = logger.Logger
@ -32,7 +32,7 @@ func WithLogger(ctx context.Context, log Logger) context.Context {
return context.WithValue(ctx, contextKeyLogger, log) return context.WithValue(ctx, contextKeyLogger, log)
} }
func getLogger(ctx context.Context) Logger { func GetLogger(ctx context.Context) Logger {
if l, ok := ctx.Value(contextKeyLogger).(Logger); ok { if l, ok := ctx.Value(contextKeyLogger).(Logger); ok {
return l return l
} }
@ -42,13 +42,12 @@ func getLogger(ctx context.Context) Logger {
type args struct { type args struct {
ctx context.Context ctx context.Context
target Target target Target
receiver Receiver receiver History
rules []pruning.KeepRule rules []pruning.KeepRule
retryWait time.Duration retryWait time.Duration
} }
type Pruner struct { type Pruner struct {
args args args args
mtx sync.RWMutex mtx sync.RWMutex
@ -62,10 +61,9 @@ type Pruner struct {
// State Exec // State Exec
prunePending []fs prunePending []fs
pruneCompleted []fs pruneCompleted []fs
} }
func NewPruner(retryWait time.Duration, target Target, receiver Receiver, rules []pruning.KeepRule) *Pruner { func NewPruner(retryWait time.Duration, target Target, receiver History, rules []pruning.KeepRule) *Pruner {
p := &Pruner{ p := &Pruner{
args: args{nil, target, receiver, rules, retryWait}, // ctx is filled in Prune() args: args{nil, target, receiver, rules, retryWait}, // ctx is filled in Prune()
state: Plan, state: Plan,
@ -85,7 +83,6 @@ const (
Done Done
) )
func (s State) statefunc() state { func (s State) statefunc() state {
var statemap = map[State]state{ var statemap = map[State]state{
Plan: statePlan, Plan: statePlan,
@ -117,7 +114,7 @@ func (p *Pruner) prune(args args) {
return p.state return p.state
}) })
post := p.state post := p.state
getLogger(args.ctx). GetLogger(args.ctx).
WithField("transition", fmt.Sprintf("%s=>%s", pre, post)). WithField("transition", fmt.Sprintf("%s=>%s", pre, post)).
Debug("state transition") Debug("state transition")
} }
@ -136,7 +133,7 @@ type fs struct {
err error err error
} }
func (f* fs) Update(err error) { func (f *fs) Update(err error) {
f.mtx.Lock() f.mtx.Lock()
defer f.mtx.Unlock() defer f.mtx.Unlock()
f.err = err f.err = err
@ -174,9 +171,12 @@ func onErr(u updater, e error) state {
return return
} }
switch p.state { switch p.state {
case Plan: p.state = PlanWait case Plan:
case Exec: p.state = ExecWait p.state = PlanWait
default: panic(p.state) case Exec:
p.state = ExecWait
default:
panic(p.state)
} }
}).statefunc() }).statefunc()
} }
@ -210,7 +210,12 @@ func statePlan(a *args, u updater) state {
if err != nil { if err != nil {
return onErr(u, fmt.Errorf("%s%s has invalid creation date: %s", tfs, tfsv.RelName(), err)) return onErr(u, fmt.Errorf("%s%s has invalid creation date: %s", tfs, tfsv.RelName(), err))
} }
replicated, err := receiver.HasFilesystemVersion(ctx, tfs.Path, tfsv) req := pdu.SnapshotReplicationStatusReq{
Filesystem: tfs.Path,
Snapshot: tfsv.Name,
Op: pdu.SnapshotReplicationStatusReq_Get,
}
res, err := receiver.SnapshotReplicationStatus(ctx, &req)
if err != nil && shouldRetry(err) { if err != nil && shouldRetry(err) {
return onErr(u, err) return onErr(u, err)
} else if err != nil { } else if err != nil {
@ -218,8 +223,9 @@ func statePlan(a *args, u updater) state {
pfs.snaps = nil pfs.snaps = nil
break break
} }
pfs.snaps = append(pfs.snaps, snapshot{ pfs.snaps = append(pfs.snaps, snapshot{
replicated: replicated, replicated: res.Replicated,
date: creation, date: creation,
fsv: tfsv, fsv: tfsv,
}) })
@ -256,13 +262,22 @@ func stateExec(a *args, u updater) state {
return state.statefunc() return state.statefunc()
} }
GetLogger(a.ctx).Debug(fmt.Sprintf("%#v", a.rules))
destroyListI := pruning.PruneSnapshots(pfs.snaps, a.rules) destroyListI := pruning.PruneSnapshots(pfs.snaps, a.rules)
destroyList := make([]*pdu.FilesystemVersion, len(destroyListI)) destroyList := make([]*pdu.FilesystemVersion, len(destroyListI))
for i := range destroyList { for i := range destroyList {
destroyList[i] = destroyListI[i].(snapshot).fsv destroyList[i] = destroyListI[i].(snapshot).fsv
GetLogger(a.ctx).
WithField("fs", pfs.path).
WithField("destroy_snap", destroyList[i].Name).
Debug("policy destroys snapshot")
} }
pfs.Update(nil) pfs.Update(nil)
_, err := a.target.DestroySnapshots(a.ctx, pfs.path, destroyList) req := pdu.DestroySnapshotsReq{
Filesystem: pfs.path,
Snapshots: destroyList,
}
_, err := a.target.DestroySnapshots(a.ctx, &req)
pfs.Update(err) pfs.Update(err)
if err != nil && shouldRetry(err) { if err != nil && shouldRetry(err) {
return onErr(u, err) return onErr(u, err)

View File

@ -1,15 +1,15 @@
package pruner package pruner
import ( import (
"testing"
"github.com/zrepl/zrepl/replication/pdu"
"context" "context"
"github.com/zrepl/zrepl/pruning"
"fmt" "fmt"
"time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"net"
"github.com/zrepl/zrepl/logger" "github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/pruning"
"github.com/zrepl/zrepl/replication/pdu"
"net"
"testing"
"time"
) )
type mockFS struct { type mockFS struct {
@ -72,26 +72,29 @@ func (t *mockTarget) ListFilesystemVersions(ctx context.Context, fs string) ([]*
return nil, fmt.Errorf("filesystem %s does not exist", fs) return nil, fmt.Errorf("filesystem %s does not exist", fs)
} }
func (t *mockTarget) DestroySnapshots(ctx context.Context, fs string, snaps []*pdu.FilesystemVersion) ([]*pdu.FilesystemVersion, error) { func (t *mockTarget) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshotsReq) (*pdu.DestroySnapshotsRes, error) {
fs, snaps := req.Filesystem, req.Snapshots
if len(t.destroyErrs[fs]) != 0 { if len(t.destroyErrs[fs]) != 0 {
e := t.destroyErrs[fs][0] e := t.destroyErrs[fs][0]
t.destroyErrs[fs] = t.destroyErrs[fs][1:] t.destroyErrs[fs] = t.destroyErrs[fs][1:]
return nil, e return nil, e
} }
destroyed := t.destroyed[fs] destroyed := t.destroyed[fs]
for _, s := range snaps { res := make([]*pdu.DestroySnapshotRes, len(snaps))
for i, s := range snaps {
destroyed = append(destroyed, s.Name) destroyed = append(destroyed, s.Name)
res[i] = &pdu.DestroySnapshotRes{Error: "", Snapshot: s}
} }
t.destroyed[fs] = destroyed t.destroyed[fs] = destroyed
return snaps, nil return &pdu.DestroySnapshotsRes{Results: res}, nil
} }
type mockReceiver struct { type mockHistory struct {
fss []mockFS fss []mockFS
errs map[string][]error errs map[string][]error
} }
func (r *mockReceiver) HasFilesystemVersion(ctx context.Context, fs string, version *pdu.FilesystemVersion) (bool, error) { func (r *mockHistory) WasSnapshotReplicated(ctx context.Context, fs string, version *pdu.FilesystemVersion) (bool, error) {
if len(r.errs[fs]) > 0 { if len(r.errs[fs]) > 0 {
e := r.errs[fs][0] e := r.errs[fs][0]
@ -161,7 +164,7 @@ func TestPruner_Prune(t *testing.T) {
}, },
}, },
} }
receiver := &mockReceiver{ history := &mockHistory{
errs: map[string][]error{ errs: map[string][]error{
"zroot/foo": { "zroot/foo": {
&net.OpError{Op: "fakeerror4"}, &net.OpError{Op: "fakeerror4"},
@ -174,15 +177,15 @@ func TestPruner_Prune(t *testing.T) {
keepRules := []pruning.KeepRule{pruning.MustKeepRegex("^keep")} keepRules := []pruning.KeepRule{pruning.MustKeepRegex("^keep")}
p := NewPruner(10*time.Millisecond, target, receiver, keepRules) p := NewPruner(10*time.Millisecond, target, history, keepRules)
ctx := context.Background() ctx := context.Background()
ctx = WithLogger(ctx, logger.NewTestLogger(t)) ctx = WithLogger(ctx, logger.NewTestLogger(t))
p.Prune(ctx) p.Prune(ctx)
exp := map[string][]string{ exp := map[string][]string{
"zroot/bar":{"drop_g"}, "zroot/bar": {"drop_g"},
// drop_c is prohibited by failing destroy // drop_c is prohibited by failing destroy
// drop_i is prohibiteed by failing HasFilesystemVersion call // drop_i is prohibiteed by failing WasSnapshotReplicated call
} }
assert.Equal(t, exp, target.destroyed) assert.Equal(t, exp, target.destroyed)