zrepl/daemon/pruner/pruner.go

580 lines
15 KiB
Go
Raw Permalink Normal View History

2018-08-29 19:00:45 +02:00
package pruner
import (
"context"
2018-08-30 11:49:06 +02:00
"fmt"
2019-03-22 19:41:12 +01:00
"sort"
"strings"
"sync"
"time"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
2019-03-22 19:41:12 +01:00
"github.com/zrepl/zrepl/config"
2018-08-30 11:49:06 +02:00
"github.com/zrepl/zrepl/logger"
2018-08-29 19:00:45 +02:00
"github.com/zrepl/zrepl/pruning"
"github.com/zrepl/zrepl/replication/logic/pdu"
"github.com/zrepl/zrepl/util/envconst"
2018-08-29 19:00:45 +02:00
)
// Try to keep it compatible with gitub.com/zrepl/zrepl/endpoint.Endpoint
2018-08-30 11:49:06 +02:00
type History interface {
ReplicationCursor(ctx context.Context, req *pdu.ReplicationCursorReq) (*pdu.ReplicationCursorRes, error)
ListFilesystems(ctx context.Context, req *pdu.ListFilesystemReq) (*pdu.ListFilesystemRes, error)
2018-08-29 19:00:45 +02:00
}
// Try to keep it compatible with gitub.com/zrepl/zrepl/endpoint.Endpoint
2018-08-29 19:00:45 +02:00
type Target interface {
ListFilesystems(ctx context.Context, req *pdu.ListFilesystemReq) (*pdu.ListFilesystemRes, error)
ListFilesystemVersions(ctx context.Context, req *pdu.ListFilesystemVersionsReq) (*pdu.ListFilesystemVersionsRes, error)
2018-08-30 11:49:06 +02:00
DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshotsReq) (*pdu.DestroySnapshotsRes, error)
2018-08-29 19:00:45 +02:00
}
// Logger is an alias (not a distinct type) of logger.Logger, so values of
// either name are interchangeable.
type Logger = logger.Logger
// contextKey is an unexported key type for values this package stores in a
// context.Context. Because the type is distinct, its keys cannot collide with
// keys defined by other packages.
type contextKey int

const (
	// contextKeyLogger is the key under which WithLogger stores the logger.
	contextKeyLogger contextKey = iota
)
// WithLogger returns a child of ctx that carries log; retrieve it with
// GetLogger.
func WithLogger(ctx context.Context, log Logger) context.Context {
	derived := context.WithValue(ctx, contextKeyLogger, log)
	return derived
}
2018-08-30 11:49:06 +02:00
func GetLogger(ctx context.Context) Logger {
2018-08-29 19:00:45 +02:00
if l, ok := ctx.Value(contextKeyLogger).(Logger); ok {
return l
}
return logger.NewNullLogger()
}
type args struct {
ctx context.Context
target Target
receiver History
rules []pruning.KeepRule
retryWait time.Duration
considerSnapAtCursorReplicated bool
2019-03-22 19:41:12 +01:00
promPruneSecs prometheus.Observer
2018-08-29 19:00:45 +02:00
}
type Pruner struct {
args args
mtx sync.RWMutex
state State
// State PlanErr
2019-03-22 19:41:12 +01:00
err error
2018-08-29 19:00:45 +02:00
// State Exec
execQueue *execQueue
2018-08-29 19:00:45 +02:00
}
type PrunerFactory struct {
senderRules []pruning.KeepRule
receiverRules []pruning.KeepRule
retryWait time.Duration
considerSnapAtCursorReplicated bool
2019-03-22 19:41:12 +01:00
promPruneSecs *prometheus.HistogramVec
}
2019-03-17 21:18:25 +01:00
type LocalPrunerFactory struct {
keepRules []pruning.KeepRule
retryWait time.Duration
promPruneSecs *prometheus.HistogramVec
}
2019-03-17 21:18:25 +01:00
func NewLocalPrunerFactory(in config.PruningLocal, promPruneSecs *prometheus.HistogramVec) (*LocalPrunerFactory, error) {
rules, err := pruning.RulesFromConfig(in.Keep)
if err != nil {
return nil, errors.Wrap(err, "cannot build pruning rules")
}
for _, r := range in.Keep {
if _, ok := r.Ret.(*config.PruneKeepNotReplicated); ok {
// rule NotReplicated for a local pruner doesn't make sense
// because no replication happens with that job type
return nil, fmt.Errorf("single-site pruner cannot support `not_replicated` keep rule")
}
}
2019-03-17 21:18:25 +01:00
f := &LocalPrunerFactory{
keepRules: rules,
retryWait: envconst.Duration("ZREPL_PRUNER_RETRY_INTERVAL", 10*time.Second),
promPruneSecs: promPruneSecs,
}
return f, nil
}
func NewPrunerFactory(in config.PruningSenderReceiver, promPruneSecs *prometheus.HistogramVec) (*PrunerFactory, error) {
keepRulesReceiver, err := pruning.RulesFromConfig(in.KeepReceiver)
if err != nil {
return nil, errors.Wrap(err, "cannot build receiver pruning rules")
}
keepRulesSender, err := pruning.RulesFromConfig(in.KeepSender)
if err != nil {
return nil, errors.Wrap(err, "cannot build sender pruning rules")
}
considerSnapAtCursorReplicated := false
for _, r := range in.KeepSender {
knr, ok := r.Ret.(*config.PruneKeepNotReplicated)
if !ok {
continue
}
considerSnapAtCursorReplicated = considerSnapAtCursorReplicated || !knr.KeepSnapshotAtCursor
}
f := &PrunerFactory{
2019-03-22 19:41:12 +01:00
senderRules: keepRulesSender,
receiverRules: keepRulesReceiver,
retryWait: envconst.Duration("ZREPL_PRUNER_RETRY_INTERVAL", 10*time.Second),
considerSnapAtCursorReplicated: considerSnapAtCursorReplicated,
2019-03-22 19:41:12 +01:00
promPruneSecs: promPruneSecs,
}
return f, nil
}
// BuildSenderPruner returns a Pruner in state Plan that applies the sender
// keep rules to target, consulting receiver for replication progress. Its
// logger is tagged prune_side=sender.
func (f *PrunerFactory) BuildSenderPruner(ctx context.Context, target Target, receiver History) *Pruner {
	senderCtx := WithLogger(ctx, GetLogger(ctx).WithField("prune_side", "sender"))
	return &Pruner{
		args: args{
			ctx:                            senderCtx,
			target:                         target,
			receiver:                       receiver,
			rules:                          f.senderRules,
			retryWait:                      f.retryWait,
			considerSnapAtCursorReplicated: f.considerSnapAtCursorReplicated,
			promPruneSecs:                  f.promPruneSecs.WithLabelValues("sender"),
		},
		state: Plan,
	}
}
func (f *PrunerFactory) BuildReceiverPruner(ctx context.Context, target Target, receiver History) *Pruner {
2018-08-29 19:00:45 +02:00
p := &Pruner{
args: args{
WithLogger(ctx, GetLogger(ctx).WithField("prune_side", "receiver")),
target,
receiver,
f.receiverRules,
f.retryWait,
false, // senseless here anyways
f.promPruneSecs.WithLabelValues("receiver"),
},
2018-08-29 19:00:45 +02:00
state: Plan,
}
return p
}
2019-03-17 21:18:25 +01:00
func (f *LocalPrunerFactory) BuildLocalPruner(ctx context.Context, target Target, receiver History) *Pruner {
p := &Pruner{
args: args{
ctx,
target,
receiver,
f.keepRules,
f.retryWait,
false, // considerSnapAtCursorReplicated is not relevant for local pruning
2018-11-21 04:26:03 +01:00
f.promPruneSecs.WithLabelValues("local"),
},
state: Plan,
}
return p
}
//go:generate enumer -type=State

// State is the lifecycle phase of a Pruner. Each value is a distinct bit.
type State int

const (
	// Plan is the initial state set by the factories.
	Plan State = 1 << 0
	// PlanErr means listing the filesystems of the target or the History
	// failed; the error is exposed via Report.
	PlanErr State = 1 << 1
	// Exec means planning finished and filesystems are being processed.
	Exec State = 1 << 2
	// ExecErr means execution finished and at least one non-skipped
	// filesystem reported an error.
	ExecErr State = 1 << 3
	// Done means execution finished without errors.
	Done State = 1 << 4
)
// updater applies a mutation to a Pruner. The updater constructed in
// (*Pruner).prune holds the pruner's mutex while the callback runs, so the
// callback must not call methods that take that mutex themselves.
type updater func(func(*Pruner))
2018-08-29 19:00:45 +02:00
func (p *Pruner) Prune() {
2018-08-29 19:00:45 +02:00
p.prune(p.args)
}
func (p *Pruner) prune(args args) {
u := func(f func(*Pruner)) {
2019-03-22 19:41:12 +01:00
p.mtx.Lock()
defer p.mtx.Unlock()
f(p)
}
// TODO support automatic retries
// It is advisable to merge this code with package replication/driver before
// That will likely require re-modelling struct fs like replication/driver.attempt,
// including figuring out how to resume a plan after being interrupted by network errors
// The non-retrying code in this package should move straight to replication/logic.
doOneAttempt(&args, u)
2019-03-22 19:41:12 +01:00
}
2018-08-29 19:00:45 +02:00
type Report struct {
State string
Error string
Pending, Completed []FSReport
}
// FSReport describes the pruning state of a single filesystem.
type FSReport struct {
	// Filesystem is the filesystem path.
	Filesystem string
	// SnapshotList holds the snapshots collected during planning.
	SnapshotList []SnapshotReport
	// DestroyList holds the snapshots selected for destruction.
	DestroyList []SnapshotReport
	// SkipReason is non-empty if the filesystem was skipped.
	SkipReason FSSkipReason
	// LastError is the planning error, or else the last execution error.
	LastError string
}
// SnapshotReport is the reported form of a single snapshot.
type SnapshotReport struct {
	// Name is the snapshot's name.
	Name string
	// Replicated tells whether the planner considered the snapshot replicated.
	Replicated bool
	// Date is the snapshot's creation time.
	Date time.Time
}
func (p *Pruner) Report() *Report {
p.mtx.Lock()
defer p.mtx.Unlock()
r := Report{State: p.state.String()}
2019-03-22 19:41:12 +01:00
if p.err != nil {
r.Error = p.err.Error()
}
if p.execQueue != nil {
r.Pending, r.Completed = p.execQueue.Report()
}
return &r
2018-08-29 19:00:45 +02:00
}
// State returns the pruner's current lifecycle phase.
func (p *Pruner) State() State {
	p.mtx.Lock()
	current := p.state
	p.mtx.Unlock()
	return current
}
2018-08-29 19:00:45 +02:00
type fs struct {
2019-03-22 19:41:12 +01:00
path string
// permanent error during planning
planErr error
planErrContext string
// if != "", the fs was skipped for planning and the field
// contains the reason
skipReason FSSkipReason
// snapshots presented by target
// (type snapshot)
2018-08-29 19:00:45 +02:00
snaps []pruning.Snapshot
// destroy list returned by pruning.PruneSnapshots(snaps)
// (type snapshot)
destroyList []pruning.Snapshot
2018-08-29 19:00:45 +02:00
mtx sync.RWMutex
// only during Exec state, also used by execQueue
execErrLast error
}
// FSSkipReason explains why a filesystem was excluded from planning. The empty
// value means the filesystem was not skipped.
type FSSkipReason string

// The constants below are untyped string constants, so they convert
// implicitly to FSSkipReason as well as to string.
const (
	// NotSkipped marks a filesystem that takes part in pruning.
	NotSkipped = ""
	// SkipPlaceholder marks a placeholder filesystem on the target.
	SkipPlaceholder = "filesystem is placeholder"
	// SkipNoCorrespondenceOnSender marks a target filesystem that is absent
	// from the History's filesystem list.
	SkipNoCorrespondenceOnSender = "filesystem has no correspondence on sender"
)

// NotSkipped reports whether r denotes a filesystem that was not skipped.
func (r FSSkipReason) NotSkipped() bool {
	return r == NotSkipped
}
func (f *fs) Report() FSReport {
f.mtx.Lock()
defer f.mtx.Unlock()
r := FSReport{}
r.Filesystem = f.path
r.SkipReason = f.skipReason
if !r.SkipReason.NotSkipped() {
return r
}
if f.planErr != nil {
r.LastError = f.planErr.Error()
2019-03-22 19:41:12 +01:00
} else if f.execErrLast != nil {
r.LastError = f.execErrLast.Error()
}
r.SnapshotList = make([]SnapshotReport, len(f.snaps))
for i, snap := range f.snaps {
r.SnapshotList[i] = snap.(snapshot).Report()
}
r.DestroyList = make([]SnapshotReport, len(f.destroyList))
2019-03-22 19:41:12 +01:00
for i, snap := range f.destroyList {
r.DestroyList[i] = snap.(snapshot).Report()
}
return r
}
2018-08-29 19:00:45 +02:00
type snapshot struct {
replicated bool
2018-08-30 11:49:06 +02:00
date time.Time
fsv *pdu.FilesystemVersion
2018-08-29 19:00:45 +02:00
}
// Report converts s into its reported form.
func (s snapshot) Report() SnapshotReport {
	var rep SnapshotReport
	rep.Name = s.Name()
	rep.Replicated = s.Replicated()
	rep.Date = s.Date()
	return rep
}
2018-08-29 19:00:45 +02:00
var _ pruning.Snapshot = snapshot{}
func (s snapshot) Name() string { return s.fsv.Name }
func (s snapshot) Replicated() bool { return s.replicated }
func (s snapshot) Date() time.Time { return s.date }
func doOneAttempt(a *args, u updater) {
2018-08-29 19:00:45 +02:00
ctx, target, receiver := a.ctx, a.target, a.receiver
sfssres, err := receiver.ListFilesystems(ctx, &pdu.ListFilesystemReq{})
if err != nil {
u(func(p *Pruner) {
p.state = PlanErr
p.err = err
})
return
}
sfss := make(map[string]*pdu.Filesystem)
for _, sfs := range sfssres.GetFilesystems() {
sfss[sfs.GetPath()] = sfs
}
tfssres, err := target.ListFilesystems(ctx, &pdu.ListFilesystemReq{})
2018-08-29 19:00:45 +02:00
if err != nil {
u(func(p *Pruner) {
p.state = PlanErr
p.err = err
})
return
2018-08-29 19:00:45 +02:00
}
tfss := tfssres.GetFilesystems()
2018-08-29 19:00:45 +02:00
2018-09-05 02:25:10 +02:00
pfss := make([]*fs, len(tfss))
tfss_loop:
2018-08-29 19:00:45 +02:00
for i, tfs := range tfss {
l := GetLogger(ctx).WithField("fs", tfs.Path)
l.Debug("plan filesystem")
pfs := &fs{
path: tfs.Path,
}
pfss[i] = pfs
if tfs.GetIsPlaceholder() {
pfs.skipReason = SkipPlaceholder
l.WithField("skip_reason", pfs.skipReason).Debug("skipping filesystem")
continue
} else if sfs := sfss[tfs.GetPath()]; sfs == nil {
pfs.skipReason = SkipNoCorrespondenceOnSender
l.WithField("skip_reason", pfs.skipReason).WithField("sfs", sfs.GetPath()).Debug("skipping filesystem")
continue
}
pfsPlanErrAndLog := func(err error, message string) {
t := fmt.Sprintf("%T", err)
pfs.planErr = err
pfs.planErrContext = message
l.WithField("orig_err_type", t).WithError(err).Error(fmt.Sprintf("%s: plan error, skipping filesystem", message))
}
tfsvsres, err := target.ListFilesystemVersions(ctx, &pdu.ListFilesystemVersionsReq{Filesystem: tfs.Path})
2018-08-29 19:00:45 +02:00
if err != nil {
pfsPlanErrAndLog(err, "cannot list filesystem versions")
continue tfss_loop
2018-08-29 19:00:45 +02:00
}
tfsvs := tfsvsres.GetVersions()
// no progress here since we could run in a live-lock (must have used target AND receiver before progress)
pfs.snaps = make([]pruning.Snapshot, 0, len(tfsvs))
2018-08-29 19:00:45 +02:00
rcReq := &pdu.ReplicationCursorReq{
Filesystem: tfs.Path,
}
rc, err := receiver.ReplicationCursor(ctx, rcReq)
if err != nil {
pfsPlanErrAndLog(err, "cannot get replication cursor bookmark")
continue tfss_loop
}
if rc.GetNotexist() {
err := errors.New("replication cursor bookmark does not exist (one successful replication is required before pruning works)")
pfsPlanErrAndLog(err, "")
continue tfss_loop
}
// scan from older to newer, all snapshots older than cursor are interpreted as replicated
sort.Slice(tfsvs, func(i, j int) bool {
return tfsvs[i].CreateTXG < tfsvs[j].CreateTXG
})
haveCursorSnapshot := false
for _, tfsv := range tfsvs {
if tfsv.Type != pdu.FilesystemVersion_Snapshot {
continue
}
if tfsv.Guid == rc.GetGuid() {
haveCursorSnapshot = true
}
}
preCursor := haveCursorSnapshot
2018-08-29 19:00:45 +02:00
for _, tfsv := range tfsvs {
if tfsv.Type != pdu.FilesystemVersion_Snapshot {
continue
}
creation, err := tfsv.CreationAsTime()
if err != nil {
err := fmt.Errorf("%s: %s", tfsv.RelName(), err)
pfsPlanErrAndLog(err, "fs version with invalid creation date")
continue tfss_loop
}
// note that we cannot use CreateTXG because target and receiver could be on different pools
atCursor := tfsv.Guid == rc.GetGuid()
preCursor = preCursor && !atCursor
2018-08-29 19:00:45 +02:00
pfs.snaps = append(pfs.snaps, snapshot{
replicated: preCursor || (a.considerSnapAtCursorReplicated && atCursor),
2018-08-29 19:00:45 +02:00
date: creation,
2018-08-30 11:49:06 +02:00
fsv: tfsv,
2018-08-29 19:00:45 +02:00
})
}
if preCursor {
pfsPlanErrAndLog(fmt.Errorf("replication cursor not found in prune target filesystem versions"), "")
continue tfss_loop
}
2018-08-29 19:00:45 +02:00
// Apply prune rules
pfs.destroyList = pruning.PruneSnapshots(pfs.snaps, a.rules)
2018-08-29 19:00:45 +02:00
}
u(func(pruner *Pruner) {
pruner.execQueue = newExecQueue(len(pfss))
2018-08-29 19:00:45 +02:00
for _, pfs := range pfss {
pruner.execQueue.Put(pfs, nil, false)
2018-08-29 19:00:45 +02:00
}
pruner.state = Exec
})
2018-08-29 19:00:45 +02:00
for {
2019-03-22 19:41:12 +01:00
var pfs *fs
u(func(pruner *Pruner) {
2019-03-22 19:41:12 +01:00
pfs = pruner.execQueue.Pop()
})
if pfs == nil {
break
}
doOneAttemptExec(a, u, pfs)
}
var rep *Report
{
// must not hold lock for report
var pruner *Pruner
u(func(p *Pruner) {
pruner = p
})
rep = pruner.Report()
}
u(func(p *Pruner) {
if len(rep.Pending) > 0 {
panic("queue should not have pending items at this point")
}
hadErr := false
for _, fsr := range rep.Completed {
hadErr = hadErr || fsr.SkipReason.NotSkipped() && fsr.LastError != ""
2019-03-22 19:41:12 +01:00
}
if hadErr {
p.state = ExecErr
} else {
p.state = Done
2018-08-29 19:00:45 +02:00
}
})
2019-03-22 19:41:12 +01:00
}
2018-08-29 19:00:45 +02:00
// attempts to exec pfs, puts it back into the queue with the result
func doOneAttemptExec(a *args, u updater, pfs *fs) {
destroyList := make([]*pdu.FilesystemVersion, len(pfs.destroyList))
2018-08-29 19:00:45 +02:00
for i := range destroyList {
destroyList[i] = pfs.destroyList[i].(snapshot).fsv
2018-08-30 11:49:06 +02:00
GetLogger(a.ctx).
WithField("fs", pfs.path).
WithField("destroy_snap", destroyList[i].Name).
Debug("policy destroys snapshot")
2018-08-29 19:00:45 +02:00
}
2018-08-30 11:49:06 +02:00
req := pdu.DestroySnapshotsReq{
Filesystem: pfs.path,
Snapshots: destroyList,
}
GetLogger(a.ctx).WithField("fs", pfs.path).Debug("destroying snapshots")
res, err := a.target.DestroySnapshots(a.ctx, &req)
if err != nil {
u(func(pruner *Pruner) {
pruner.execQueue.Put(pfs, err, false)
})
return
}
// check if all snapshots were destroyed
destroyResults := make(map[string]*pdu.DestroySnapshotRes)
for _, fsres := range res.Results {
destroyResults[fsres.Snapshot.Name] = fsres
}
err = nil
destroyFails := make([]*pdu.DestroySnapshotRes, 0)
for _, reqDestroy := range destroyList {
2019-03-22 19:41:12 +01:00
res, ok := destroyResults[reqDestroy.Name]
if !ok {
err = fmt.Errorf("missing destroy-result for %s", reqDestroy.RelName())
break
} else if res.Error != "" {
destroyFails = append(destroyFails, res)
}
}
if err == nil && len(destroyFails) > 0 {
names := make([]string, len(destroyFails))
pairs := make([]string, len(destroyFails))
allSame := true
lastMsg := destroyFails[0].Error
2019-03-22 19:41:12 +01:00
for i := 0; i < len(destroyFails); i++ {
allSame = allSame && destroyFails[i].Error == lastMsg
relname := destroyFails[i].Snapshot.RelName()
names[i] = relname
pairs[i] = fmt.Sprintf("(%s: %s)", relname, destroyFails[i].Error)
}
if allSame {
err = fmt.Errorf("destroys failed %s: %s",
strings.Join(names, ", "), lastMsg)
} else {
err = fmt.Errorf("destroys failed: %s", strings.Join(pairs, ", "))
}
}
u(func(pruner *Pruner) {
pruner.execQueue.Put(pfs, err, err == nil)
})
if err != nil {
GetLogger(a.ctx).WithError(err).Error("target could not destroy snapshots")
return
2018-08-29 19:00:45 +02:00
}
}