diff --git a/daemon/job/push.go b/daemon/job/push.go index f234981..a292847 100644 --- a/daemon/job/push.go +++ b/daemon/job/push.go @@ -11,6 +11,8 @@ import ( "github.com/zrepl/zrepl/endpoint" "github.com/zrepl/zrepl/replication" "sync" + "github.com/zrepl/zrepl/daemon/pruner" + "github.com/zrepl/zrepl/pruning" ) type Push struct { @@ -18,6 +20,9 @@ type Push struct { connecter streamrpc.Connecter fsfilter endpoint.FSFilter + keepRulesSender []pruning.KeepRule + keepRulesReceiver []pruning.KeepRule + mtx sync.Mutex replication *replication.Replication } @@ -83,4 +88,14 @@ func (j *Push) do(ctx context.Context) { ctx = logging.WithSubsystemLoggers(ctx, log) rep.Drive(ctx, sender, receiver) + + // Prune sender + senderPruner := pruner.NewPruner(sender, receiver, j.keepRulesSender) + senderPruner.Prune(ctx) + + // Prune receiver + receiverPruner := pruner.NewPruner(receiver, receiver, j.keepRulesReceiver) + receiverPruner.Prune(ctx) + } + diff --git a/daemon/logging/build_logging.go b/daemon/logging/build_logging.go index f673033..c6bea78 100644 --- a/daemon/logging/build_logging.go +++ b/daemon/logging/build_logging.go @@ -13,6 +13,7 @@ import ( "github.com/zrepl/zrepl/replication" "github.com/zrepl/zrepl/tlsconf" "os" + "github.com/zrepl/zrepl/daemon/pruner" ) func OutletsFromConfig(in []config.LoggingOutletEnum) (*logger.Outlets, error) { @@ -67,6 +68,7 @@ func WithSubsystemLoggers(ctx context.Context, log logger.Logger) context.Contex ctx = replication.WithLogger(ctx, log.WithField(SubsysField, "repl")) ctx = streamrpc.ContextWithLogger(ctx, streamrpcLogAdaptor{log.WithField(SubsysField, "rpc")}) ctx = endpoint.WithLogger(ctx, log.WithField(SubsysField, "endpoint")) + ctx = pruner.WithLogger(ctx, log.WithField(SubsysField, "pruning")) return ctx } diff --git a/daemon/pruner/pruner.go b/daemon/pruner/pruner.go new file mode 100644 index 0000000..303ba14 --- /dev/null +++ b/daemon/pruner/pruner.go @@ -0,0 +1,297 @@ +package pruner + +import ( + "context" + "github.com/zrepl/zrepl/pruning" + "github.com/zrepl/zrepl/replication/pdu" + "sync" + "time" + "fmt" + "net" + "github.com/zrepl/zrepl/logger" +) + +// Try to keep it compatible with gitub.com/zrepl/zrepl/replication.Endpoint +type Receiver interface { + HasFilesystemVersion(ctx context.Context, fs string, version *pdu.FilesystemVersion) (bool, error) +} + +type Target interface { + ListFilesystems(ctx context.Context) ([]*pdu.Filesystem, error) + ListFilesystemVersions(ctx context.Context, fs string) ([]*pdu.FilesystemVersion, error) // fix depS + DestroySnapshots(ctx context.Context, fs string, snaps []*pdu.FilesystemVersion) ([]*pdu.FilesystemVersion, error) +} + +type Logger = logger.Logger + +type contextKey int + +const contextKeyLogger contextKey = 0 + +func WithLogger(ctx context.Context, log Logger) context.Context { + return context.WithValue(ctx, contextKeyLogger, log) +} + +func getLogger(ctx context.Context) Logger { + if l, ok := ctx.Value(contextKeyLogger).(Logger); ok { + return l + } + return logger.NewNullLogger() +} + +type args struct { + ctx context.Context + target Target + receiver Receiver + rules []pruning.KeepRule + retryWait time.Duration +} + +type Pruner struct { + + args args + + mtx sync.RWMutex + + state State + + // State ErrWait|ErrPerm + sleepUntil time.Time + err error + + // State Exec + prunePending []fs + pruneCompleted []fs + +} + +func NewPruner(retryWait time.Duration, target Target, receiver Receiver, rules []pruning.KeepRule) *Pruner { + p := &Pruner{ + args: args{nil, target, receiver, rules, retryWait}, // ctx is filled in Prune() + state: Plan, + } + return p +} + +//go:generate stringer -type=State +type State int + +const ( + Plan State = 1 << iota + PlanWait + Exec + ExecWait + ErrPerm + Done +) + + +func (s State) statefunc() state { + var statemap = map[State]state{ + Plan: statePlan, + PlanWait: statePlanWait, + Exec: stateExec, + ExecWait: stateExecWait, + ErrPerm: nil, + Done: nil, + } + return statemap[s] +} + +type updater func(func(*Pruner)) State +type state func(args *args, u updater) state + +func (p *Pruner) Prune(ctx context.Context) { + p.args.ctx = ctx + p.prune(p.args) +} + +func (p *Pruner) prune(args args) { + s := p.state.statefunc() + for s != nil { + pre := p.state + s = s(&args, func(f func(*Pruner)) State { + p.mtx.Lock() + defer p.mtx.Unlock() + f(p) + return p.state + }) + post := p.state + getLogger(args.ctx). + WithField("transition", fmt.Sprintf("%s=>%s", pre, post)). + Debug("state transition") + } +} + +func (p *Pruner) Report() interface{} { + return nil // FIXME TODO +} + +type fs struct { + path string + snaps []pruning.Snapshot + + mtx sync.RWMutex + // for Plan + err error +} + +func (f* fs) Update(err error) { + f.mtx.Lock() + defer f.mtx.Unlock() + f.err = err +} + +type snapshot struct { + replicated bool + date time.Time + fsv *pdu.FilesystemVersion +} + +var _ pruning.Snapshot = snapshot{} + +func (s snapshot) Name() string { return s.fsv.Name } + +func (s snapshot) Replicated() bool { return s.replicated } + +func (s snapshot) Date() time.Time { return s.date } + +func shouldRetry(e error) bool { + switch e.(type) { + case nil: + return true + case net.Error: + return true + } + return false +} + +func onErr(u updater, e error) state { + return u(func(p *Pruner) { + p.err = e + if !shouldRetry(e) { + p.state = ErrPerm + return + } + switch p.state { + case Plan: p.state = PlanWait + case Exec: p.state = ExecWait + default: panic(p.state) + } + }).statefunc() +} + +func statePlan(a *args, u updater) state { + + ctx, target, receiver := a.ctx, a.target, a.receiver + + tfss, err := target.ListFilesystems(ctx) + if err != nil { + return onErr(u, err) + } + + pfss := make([]fs, len(tfss)) + for i, tfs := range tfss { + tfsvs, err := target.ListFilesystemVersions(ctx, tfs.Path) + if err != nil { + return onErr(u, err) + } + + pfs := fs{ + path: tfs.Path, + snaps: make([]pruning.Snapshot, 0, len(tfsvs)), + } + + for _, tfsv := range tfsvs { + if tfsv.Type != pdu.FilesystemVersion_Snapshot { + continue + } + creation, err := tfsv.CreationAsTime() + if err != nil { + return onErr(u, fmt.Errorf("%s%s has invalid creation date: %s", tfs, tfsv.RelName(), err)) + } + replicated, err := receiver.HasFilesystemVersion(ctx, tfs.Path, tfsv) + if err != nil && shouldRetry(err) { + return onErr(u, err) + } else if err != nil { + pfs.err = err + pfs.snaps = nil + break + } + pfs.snaps = append(pfs.snaps, snapshot{ + replicated: replicated, + date: creation, + fsv: tfsv, + }) + + } + + pfss[i] = pfs + + } + + return u(func(pruner *Pruner) { + for _, pfs := range pfss { + if pfs.err != nil { + pruner.pruneCompleted = append(pruner.pruneCompleted, pfs) + } else { + pruner.prunePending = append(pruner.prunePending, pfs) + } + } + pruner.state = Exec + }).statefunc() +} + +func stateExec(a *args, u updater) state { + + var pfs fs + state := u(func(pruner *Pruner) { + if len(pruner.prunePending) == 0 { + pruner.state = Done + return + } + pfs = pruner.prunePending[0] + }) + if state != Exec { + return state.statefunc() + } + + destroyListI := pruning.PruneSnapshots(pfs.snaps, a.rules) + destroyList := make([]*pdu.FilesystemVersion, len(destroyListI)) + for i := range destroyList { + destroyList[i] = destroyListI[i].(snapshot).fsv + } + pfs.Update(nil) + _, err := a.target.DestroySnapshots(a.ctx, pfs.path, destroyList) + pfs.Update(err) + if err != nil && shouldRetry(err) { + return onErr(u, err) + } + // if it's not retryable, treat is like as being done + + return u(func(pruner *Pruner) { + pruner.pruneCompleted = append(pruner.pruneCompleted, pfs) + pruner.prunePending = pruner.prunePending[1:] + }).statefunc() +} + +func stateExecWait(a *args, u updater) state { + return doWait(Exec, a, u) +} + +func statePlanWait(a *args, u updater) state { + return doWait(Plan, a, u) +} + +func doWait(goback State, a *args, u updater) state { + timer := time.NewTimer(a.retryWait) + defer timer.Stop() + select { + case <-timer.C: + return u(func(pruner *Pruner) { + pruner.state = goback + }).statefunc() + case <-a.ctx.Done(): + return onErr(u, a.ctx.Err()) + } +} diff --git a/daemon/pruner/pruner_test.go b/daemon/pruner/pruner_test.go new file mode 100644 index 0000000..6b560ef --- /dev/null +++ b/daemon/pruner/pruner_test.go @@ -0,0 +1,192 @@ +package pruner + +import ( + "testing" + "github.com/zrepl/zrepl/replication/pdu" + "context" + "github.com/zrepl/zrepl/pruning" + "fmt" + "time" + "github.com/stretchr/testify/assert" + "net" + "github.com/zrepl/zrepl/logger" +) + +type mockFS struct { + path string + snaps []string +} + +func (m *mockFS) Filesystem() *pdu.Filesystem { + return &pdu.Filesystem{ + Path: m.path, + } +} + +func (m *mockFS) FilesystemVersions() []*pdu.FilesystemVersion { + versions := make([]*pdu.FilesystemVersion, len(m.snaps)) + for i, v := range m.snaps { + versions[i] = &pdu.FilesystemVersion{ + Type: pdu.FilesystemVersion_Snapshot, + Name: v, + Creation: pdu.FilesystemVersionCreation(time.Unix(0, 0)), + } + } + return versions +} + +type mockTarget struct { + fss []mockFS + destroyed map[string][]string + listVersionsErrs map[string][]error + listFilesystemsErr []error + destroyErrs map[string][]error +} + +func (t *mockTarget) ListFilesystems(ctx context.Context) ([]*pdu.Filesystem, error) { + if len(t.listFilesystemsErr) > 0 { + e := t.listFilesystemsErr[0] + t.listFilesystemsErr = t.listFilesystemsErr[1:] + return nil, e + } + fss := make([]*pdu.Filesystem, len(t.fss)) + for i := range fss { + fss[i] = t.fss[i].Filesystem() + } + return fss, nil +} + +func (t *mockTarget) ListFilesystemVersions(ctx context.Context, fs string) ([]*pdu.FilesystemVersion, error) { + if len(t.listVersionsErrs[fs]) != 0 { + e := t.listVersionsErrs[fs][0] + t.listVersionsErrs[fs] = t.listVersionsErrs[fs][1:] + return nil, e + } + + for _, mfs := range t.fss { + if mfs.path != fs { + continue + } + return mfs.FilesystemVersions(), nil + } + return nil, fmt.Errorf("filesystem %s does not exist", fs) +} + +func (t *mockTarget) DestroySnapshots(ctx context.Context, fs string, snaps []*pdu.FilesystemVersion) ([]*pdu.FilesystemVersion, error) { + if len(t.destroyErrs[fs]) != 0 { + e := t.destroyErrs[fs][0] + t.destroyErrs[fs] = t.destroyErrs[fs][1:] + return nil, e + } + destroyed := t.destroyed[fs] + for _, s := range snaps { + destroyed = append(destroyed, s.Name) + } + t.destroyed[fs] = destroyed + return snaps, nil +} + +type mockReceiver struct { + fss []mockFS + errs map[string][]error +} + +func (r *mockReceiver) HasFilesystemVersion(ctx context.Context, fs string, version *pdu.FilesystemVersion) (bool, error) { + + if len(r.errs[fs]) > 0 { + e := r.errs[fs][0] + r.errs[fs] = r.errs[fs][1:] + return false, e + } + + for _, mfs := range r.fss { + if mfs.path != fs { + continue + } + for _, v := range mfs.FilesystemVersions() { + if v.Type == version.Type && v.Name == v.Name && v.CreateTXG == version.CreateTXG { + return true, nil + } + } + } + return false, nil +} + +func TestPruner_Prune(t *testing.T) { + + var _ net.Error = &net.OpError{} // we use it below + target := &mockTarget{ + listFilesystemsErr: []error{ + &net.OpError{Op: "fakerror0"}, + }, + listVersionsErrs: map[string][]error{ + "zroot/foo": { + &net.OpError{Op: "fakeerror1"}, // should be classified as temporaty + &net.OpError{Op: "fakeerror2"}, + }, + }, + destroyErrs: map[string][]error{ + "zroot/foo": { + fmt.Errorf("permanent error"), + }, + "zroot/bar": { + &net.OpError{Op: "fakeerror3"}, + }, + }, + destroyed: make(map[string][]string), + fss: []mockFS{ + { + path: "zroot/foo", + snaps: []string{ + "keep_a", + "keep_b", + "drop_c", + "keep_d", + }, + }, + { + path: "zroot/bar", + snaps: []string{ + "keep_e", + "keep_f", + "drop_g", + }, + }, + { + path: "zroot/baz", + snaps: []string{ + "keep_h", + "drop_i", + }, + }, + }, + } + receiver := &mockReceiver{ + errs: map[string][]error{ + "zroot/foo": { + &net.OpError{Op: "fakeerror4"}, + }, + "zroot/baz": { + fmt.Errorf("permanent error2"), + }, + }, + } + + keepRules := []pruning.KeepRule{pruning.MustKeepRegex("^keep")} + + p := NewPruner(10*time.Millisecond, target, receiver, keepRules) + ctx := context.Background() + ctx = WithLogger(ctx, logger.NewTestLogger(t)) + p.Prune(ctx) + + exp := map[string][]string{ + "zroot/bar":{"drop_g"}, + // drop_c is prohibited by failing destroy + // drop_i is prohibiteed by failing HasFilesystemVersion call + } + + assert.Equal(t, exp, target.destroyed) + + //assert.Equal(t, map[string][]error{}, target.listVersionsErrs, "retried") + +} diff --git a/daemon/pruner/state_string.go b/daemon/pruner/state_string.go new file mode 100644 index 0000000..68e04e1 --- /dev/null +++ b/daemon/pruner/state_string.go @@ -0,0 +1,35 @@ +// Code generated by "stringer -type=State"; DO NOT EDIT. + +package pruner + +import "strconv" + +const ( + _State_name_0 = "PlanPlanWait" + _State_name_1 = "Exec" + _State_name_2 = "ExecWait" + _State_name_3 = "ErrPerm" + _State_name_4 = "Done" +) + +var ( + _State_index_0 = [...]uint8{0, 4, 12} +) + +func (i State) String() string { + switch { + case 1 <= i && i <= 2: + i -= 1 + return _State_name_0[_State_index_0[i]:_State_index_0[i+1]] + case i == 4: + return _State_name_1 + case i == 8: + return _State_name_2 + case i == 16: + return _State_name_3 + case i == 32: + return _State_name_4 + default: + return "State(" + strconv.FormatInt(int64(i), 10) + ")" + } +} diff --git a/logger/testlogger.go b/logger/testlogger.go new file mode 100644 index 0000000..c416109 --- /dev/null +++ b/logger/testlogger.go @@ -0,0 +1,28 @@ +package logger + +import ( + "testing" +) + +type testLogger struct { + Logger +} + +type testingLoggerOutlet struct { + t *testing.T +} + +func (o testingLoggerOutlet) WriteEntry(entry Entry) error { + o.t.Logf("%#v", entry) + return nil +} + +var _ Logger = testLogger{} + +func NewTestLogger(t *testing.T) Logger { + outlets := NewOutlets() + outlets.Add(&testingLoggerOutlet{t}, Debug) + return &testLogger{ + Logger: NewLogger(outlets, 0), + } +} diff --git a/pruning/pruning.go b/pruning/pruning.go index 97b866b..2a754ec 100644 --- a/pruning/pruning.go +++ b/pruning/pruning.go @@ -14,6 +14,7 @@ type Snapshot interface { Date() time.Time } +// The returned snapshot list is guaranteed to only contains elements of input parameter snaps func PruneSnapshots(snaps []Snapshot, keepRules []KeepRule) []Snapshot { if len(keepRules) == 0 { diff --git a/replication/pdu/pdu_extras.go b/replication/pdu/pdu_extras.go index b009d3b..4d718a8 100644 --- a/replication/pdu/pdu_extras.go +++ b/replication/pdu/pdu_extras.go @@ -41,6 +41,10 @@ func FilesystemVersionFromZFS(fsv zfs.FilesystemVersion) *FilesystemVersion { } } +func FilesystemVersionCreation(t time.Time) string { + return t.Format(time.RFC3339) +} + func (v *FilesystemVersion) CreationAsTime() (time.Time, error) { return time.Parse(time.RFC3339, v.Creation) }