diff --git a/daemon/pruner/pruner.go b/daemon/pruner/pruner.go index 303ba14..8041cc0 100644 --- a/daemon/pruner/pruner.go +++ b/daemon/pruner/pruner.go @@ -2,24 +2,24 @@ package pruner import ( "context" + "fmt" + "github.com/zrepl/zrepl/logger" "github.com/zrepl/zrepl/pruning" "github.com/zrepl/zrepl/replication/pdu" + "net" "sync" "time" - "fmt" - "net" - "github.com/zrepl/zrepl/logger" ) // Try to keep it compatible with gitub.com/zrepl/zrepl/replication.Endpoint -type Receiver interface { - HasFilesystemVersion(ctx context.Context, fs string, version *pdu.FilesystemVersion) (bool, error) +type History interface { + SnapshotReplicationStatus(ctx context.Context, req *pdu.SnapshotReplicationStatusReq) (*pdu.SnapshotReplicationStatusRes, error) } type Target interface { ListFilesystems(ctx context.Context) ([]*pdu.Filesystem, error) ListFilesystemVersions(ctx context.Context, fs string) ([]*pdu.FilesystemVersion, error) // fix depS - DestroySnapshots(ctx context.Context, fs string, snaps []*pdu.FilesystemVersion) ([]*pdu.FilesystemVersion, error) + DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshotsReq) (*pdu.DestroySnapshotsRes, error) } type Logger = logger.Logger @@ -32,7 +32,7 @@ func WithLogger(ctx context.Context, log Logger) context.Context { return context.WithValue(ctx, contextKeyLogger, log) } -func getLogger(ctx context.Context) Logger { +func GetLogger(ctx context.Context) Logger { if l, ok := ctx.Value(contextKeyLogger).(Logger); ok { return l } @@ -40,15 +40,14 @@ func getLogger(ctx context.Context) Logger { } type args struct { - ctx context.Context - target Target - receiver Receiver - rules []pruning.KeepRule - retryWait time.Duration + ctx context.Context + target Target + receiver History + rules []pruning.KeepRule + retryWait time.Duration } type Pruner struct { - args args mtx sync.RWMutex @@ -60,14 +59,13 @@ type Pruner struct { err error // State Exec - prunePending []fs + prunePending []fs pruneCompleted []fs - } -func NewPruner(retryWait time.Duration, target Target, receiver Receiver, rules []pruning.KeepRule) *Pruner { +func NewPruner(retryWait time.Duration, target Target, receiver History, rules []pruning.KeepRule) *Pruner { p := &Pruner{ - args: args{nil, target, receiver, rules, retryWait}, // ctx is filled in Prune() + args: args{nil, target, receiver, rules, retryWait}, // ctx is filled in Prune() state: Plan, } return p @@ -85,7 +83,6 @@ const ( Done ) - func (s State) statefunc() state { var statemap = map[State]state{ Plan: statePlan, @@ -117,7 +114,7 @@ func (p *Pruner) prune(args args) { return p.state }) post := p.state - getLogger(args.ctx). + GetLogger(args.ctx). WithField("transition", fmt.Sprintf("%s=>%s", pre, post)). Debug("state transition") } @@ -128,7 +125,7 @@ func (p *Pruner) Report() interface{} { } type fs struct { - path string + path string snaps []pruning.Snapshot mtx sync.RWMutex @@ -136,7 +133,7 @@ type fs struct { err error } -func (f* fs) Update(err error) { +func (f *fs) Update(err error) { f.mtx.Lock() defer f.mtx.Unlock() f.err = err @@ -144,8 +141,8 @@ func (f* fs) Update(err error) { type snapshot struct { replicated bool - date time.Time - fsv *pdu.FilesystemVersion + date time.Time + fsv *pdu.FilesystemVersion } var _ pruning.Snapshot = snapshot{} @@ -174,9 +171,12 @@ func onErr(u updater, e error) state { return } switch p.state { - case Plan: p.state = PlanWait - case Exec: p.state = ExecWait - default: panic(p.state) + case Plan: + p.state = PlanWait + case Exec: + p.state = ExecWait + default: + panic(p.state) } }).statefunc() } @@ -210,7 +210,12 @@ func statePlan(a *args, u updater) state { if err != nil { return onErr(u, fmt.Errorf("%s%s has invalid creation date: %s", tfs, tfsv.RelName(), err)) } - replicated, err := receiver.HasFilesystemVersion(ctx, tfs.Path, tfsv) + req := pdu.SnapshotReplicationStatusReq{ + Filesystem: tfs.Path, + Snapshot: tfsv.Name, + Op: pdu.SnapshotReplicationStatusReq_Get, + } + res, err := receiver.SnapshotReplicationStatus(ctx, &req) if err != nil && shouldRetry(err) { return onErr(u, err) } else if err != nil { @@ -218,10 +223,11 @@ func statePlan(a *args, u updater) state { pfs.snaps = nil break } + pfs.snaps = append(pfs.snaps, snapshot{ - replicated: replicated, + replicated: res.Replicated, date: creation, - fsv: tfsv, + fsv: tfsv, }) } @@ -256,15 +262,24 @@ func stateExec(a *args, u updater) state { return state.statefunc() } + GetLogger(a.ctx).Debug(fmt.Sprintf("%#v", a.rules)) destroyListI := pruning.PruneSnapshots(pfs.snaps, a.rules) destroyList := make([]*pdu.FilesystemVersion, len(destroyListI)) for i := range destroyList { destroyList[i] = destroyListI[i].(snapshot).fsv + GetLogger(a.ctx). + WithField("fs", pfs.path). + WithField("destroy_snap", destroyList[i].Name). + Debug("policy destroys snapshot") } pfs.Update(nil) - _, err := a.target.DestroySnapshots(a.ctx, pfs.path, destroyList) + req := pdu.DestroySnapshotsReq{ + Filesystem: pfs.path, + Snapshots: destroyList, + } + _, err := a.target.DestroySnapshots(a.ctx, &req) pfs.Update(err) - if err != nil && shouldRetry(err) { + if err != nil && shouldRetry(err) { return onErr(u, err) } // if it's not retryable, treat is like as being done diff --git a/daemon/pruner/pruner_test.go b/daemon/pruner/pruner_test.go index 6b560ef..ce8fb7c 100644 --- a/daemon/pruner/pruner_test.go +++ b/daemon/pruner/pruner_test.go @@ -1,15 +1,15 @@ package pruner import ( - "testing" - "github.com/zrepl/zrepl/replication/pdu" "context" - "github.com/zrepl/zrepl/pruning" "fmt" - "time" "github.com/stretchr/testify/assert" - "net" "github.com/zrepl/zrepl/logger" + "github.com/zrepl/zrepl/pruning" + "github.com/zrepl/zrepl/replication/pdu" + "net" + "testing" + "time" ) type mockFS struct { @@ -27,8 +27,8 @@ func (m *mockFS) FilesystemVersions() []*pdu.FilesystemVersion { versions := make([]*pdu.FilesystemVersion, len(m.snaps)) for i, v := range m.snaps { versions[i] = &pdu.FilesystemVersion{ - Type: pdu.FilesystemVersion_Snapshot, - Name: v, + Type: pdu.FilesystemVersion_Snapshot, + Name: v, Creation: pdu.FilesystemVersionCreation(time.Unix(0, 0)), } } @@ -36,11 +36,11 @@ func (m *mockFS) FilesystemVersions() []*pdu.FilesystemVersion { } type mockTarget struct { - fss []mockFS - destroyed map[string][]string - listVersionsErrs map[string][]error + fss []mockFS + destroyed map[string][]string + listVersionsErrs map[string][]error listFilesystemsErr []error - destroyErrs map[string][]error + destroyErrs map[string][]error } func (t *mockTarget) ListFilesystems(ctx context.Context) ([]*pdu.Filesystem, error) { @@ -72,26 +72,29 @@ func (t *mockTarget) ListFilesystemVersions(ctx context.Context, fs string) ([]* return nil, fmt.Errorf("filesystem %s does not exist", fs) } -func (t *mockTarget) DestroySnapshots(ctx context.Context, fs string, snaps []*pdu.FilesystemVersion) ([]*pdu.FilesystemVersion, error) { +func (t *mockTarget) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshotsReq) (*pdu.DestroySnapshotsRes, error) { + fs, snaps := req.Filesystem, req.Snapshots if len(t.destroyErrs[fs]) != 0 { e := t.destroyErrs[fs][0] t.destroyErrs[fs] = t.destroyErrs[fs][1:] return nil, e } destroyed := t.destroyed[fs] - for _, s := range snaps { + res := make([]*pdu.DestroySnapshotRes, len(snaps)) + for i, s := range snaps { destroyed = append(destroyed, s.Name) + res[i] = &pdu.DestroySnapshotRes{Error: "", Snapshot: s} } t.destroyed[fs] = destroyed - return snaps, nil + return &pdu.DestroySnapshotsRes{Results: res}, nil } -type mockReceiver struct { - fss []mockFS +type mockHistory struct { + fss []mockFS errs map[string][]error } -func (r *mockReceiver) HasFilesystemVersion(ctx context.Context, fs string, version *pdu.FilesystemVersion) (bool, error) { +func (r *mockHistory) WasSnapshotReplicated(ctx context.Context, fs string, version *pdu.FilesystemVersion) (bool, error) { if len(r.errs[fs]) > 0 { e := r.errs[fs][0] @@ -161,7 +164,7 @@ func TestPruner_Prune(t *testing.T) { }, }, } - receiver := &mockReceiver{ + history := &mockHistory{ errs: map[string][]error{ "zroot/foo": { &net.OpError{Op: "fakeerror4"}, @@ -174,15 +177,15 @@ func TestPruner_Prune(t *testing.T) { keepRules := []pruning.KeepRule{pruning.MustKeepRegex("^keep")} - p := NewPruner(10*time.Millisecond, target, receiver, keepRules) + p := NewPruner(10*time.Millisecond, target, history, keepRules) ctx := context.Background() ctx = WithLogger(ctx, logger.NewTestLogger(t)) p.Prune(ctx) exp := map[string][]string{ - "zroot/bar":{"drop_g"}, + "zroot/bar": {"drop_g"}, // drop_c is prohibited by failing destroy - // drop_i is prohibiteed by failing HasFilesystemVersion call + // drop_i is prohibiteed by failing WasSnapshotReplicated call } assert.Equal(t, exp, target.destroyed)