// Path: zrepl/daemon/snapper/snapper.go

package snapper
import (
"context"
"fmt"
"sort"
"sync"
2019-03-22 19:41:12 +01:00
"time"
2018-09-04 23:46:02 +02:00
2019-03-22 19:41:12 +01:00
"github.com/pkg/errors"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/filters"
"github.com/zrepl/zrepl/daemon/hooks"
2019-03-22 19:41:12 +01:00
"github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/util/envconst"
2019-03-22 19:41:12 +01:00
"github.com/zrepl/zrepl/zfs"
)
// SnapState describes how far snapshotting of a single filesystem has
// progressed. Every value occupies its own bit (1 << iota).
//
//go:generate stringer -type=SnapState
type SnapState uint

const (
	// SnapPending is the state assigned to every filesystem when the plan is built.
	SnapPending SnapState = 1 << iota
	// SnapStarted is set right before the hook plan of the filesystem is run.
	SnapStarted
	// SnapDone is set after the hook plan finished without reporting an error.
	SnapDone
	// SnapError is set if preparing or running the hook plan reported an error.
	SnapError
)
// All fields protected by Snapper.mtx
2018-09-04 23:46:02 +02:00
type snapProgress struct {
state SnapState
// SnapStarted, SnapDone, SnapError
name string
startAt time.Time
hookPlan *hooks.Plan
2018-09-04 23:46:02 +02:00
// SnapDone
doneAt time.Time
// SnapErr TODO disambiguate state
runResults hooks.PlanReport
2018-09-04 23:46:02 +02:00
}
type args struct {
ctx context.Context
log Logger
prefix string
interval time.Duration
fsf *filters.DatasetMapFilter
2019-03-22 19:41:12 +01:00
snapshotsTaken chan<- struct{}
hooks *hooks.List
dryRun bool
2018-09-04 23:46:02 +02:00
}
type Snapper struct {
args args
2019-03-22 19:41:12 +01:00
mtx sync.Mutex
2018-09-04 23:46:02 +02:00
state State
// set in state Plan, used in Waiting
lastInvocation time.Time
// valid for state Snapshotting
plan map[*zfs.DatasetPath]*snapProgress
2018-09-04 23:46:02 +02:00
// valid for state SyncUp and Waiting
sleepUntil time.Time
// valid for state Err
err error
}
// State identifies the state of the Snapper state machine.
// Every value occupies its own bit (1 << iota).
//
//go:generate stringer -type=State
type State uint

const (
	// SyncUp is the initial state: determine the sync point and sleep until then.
	SyncUp State = 1 << iota
	// SyncUpErrWait is entered when an error occurs in state SyncUp.
	SyncUpErrWait
	// Planning lists the filesystems and builds the snapshotting plan.
	Planning
	// Snapshotting takes the snapshots described by the plan.
	Snapshotting
	// Waiting sleeps until the next interval after a round without errors.
	Waiting
	// ErrorWait is entered when an error occurs in Planning or Snapshotting.
	ErrorWait
	// Stopped is entered when the context passed to Run is done.
	Stopped
)
func (s State) sf() state {
m := map[State]state{
2019-03-22 19:41:12 +01:00
SyncUp: syncUp,
SyncUpErrWait: wait,
2019-03-22 19:41:12 +01:00
Planning: plan,
Snapshotting: snapshot,
Waiting: wait,
ErrorWait: wait,
Stopped: nil,
2018-09-04 23:46:02 +02:00
}
return m[s]
}
type (
	// updater runs the given callback (if non-nil) on the Snapper and
	// returns the Snapper's state afterwards.
	updater func(u func(*Snapper)) State

	// state is one step of the state machine; it returns the next step,
	// or nil to stop.
	state func(a args, u updater) state

	// contextKey is the private key type for values this package stores
	// in a context.Context.
	contextKey int

	// Logger is the logger type used throughout this package.
	Logger = logger.Logger
)

// contextKeyLog is the context key under which WithLogger stores the Logger.
const contextKeyLog contextKey = 0
// WithLogger returns a context derived from ctx that carries log under
// this package's private logger key.
func WithLogger(ctx context.Context, log Logger) context.Context {
	derived := context.WithValue(ctx, contextKeyLog, log)
	return derived
}
// getLogger extracts the Logger stored in ctx by WithLogger.
// If ctx carries no Logger, a null logger is returned instead.
func getLogger(ctx context.Context) Logger {
	log, ok := ctx.Value(contextKeyLog).(Logger)
	if !ok {
		return logger.NewNullLogger()
	}
	return log
}
func PeriodicFromConfig(g *config.Global, fsf *filters.DatasetMapFilter, in *config.SnapshottingPeriodic) (*Snapper, error) {
if in.Prefix == "" {
2018-09-04 23:46:02 +02:00
return nil, errors.New("prefix must not be empty")
}
if in.Interval <= 0 {
return nil, errors.New("interval must be positive")
}
hookList, err := hooks.ListFromConfig(&in.Hooks)
if err != nil {
return nil, errors.Wrap(err, "hook config error")
}
2018-09-04 23:46:02 +02:00
args := args{
2019-03-22 19:41:12 +01:00
prefix: in.Prefix,
2018-09-04 23:46:02 +02:00
interval: in.Interval,
2019-03-22 19:41:12 +01:00
fsf: fsf,
hooks: hookList,
2018-09-04 23:46:02 +02:00
// ctx and log is set in Run()
}
return &Snapper{state: SyncUp, args: args}, nil
}
func (s *Snapper) Run(ctx context.Context, snapshotsTaken chan<- struct{}) {
getLogger(ctx).Debug("start")
defer getLogger(ctx).Debug("stop")
s.args.snapshotsTaken = snapshotsTaken
s.args.ctx = ctx
s.args.log = getLogger(ctx)
s.args.dryRun = false // for future expansion
2018-09-04 23:46:02 +02:00
u := func(u func(*Snapper)) State {
s.mtx.Lock()
defer s.mtx.Unlock()
if u != nil {
u(s)
}
return s.state
}
var st state = syncUp
for st != nil {
pre := u(nil)
st = st(s.args, u)
post := u(nil)
getLogger(ctx).
WithField("transition", fmt.Sprintf("%s=>%s", pre, post)).
Debug("state transition")
}
}
func onErr(err error, u updater) state {
return u(func(s *Snapper) {
s.err = err
preState := s.state
switch s.state {
case SyncUp:
s.state = SyncUpErrWait
case Planning:
fallthrough
case Snapshotting:
s.state = ErrorWait
}
s.args.log.WithError(err).WithField("pre_state", preState).WithField("post_state", s.state).Error("snapshotting error")
2018-09-04 23:46:02 +02:00
}).sf()
}
// onMainCtxDone is called by state functions when the context passed to Run
// is done. It stores ctx.Err() on the Snapper, switches it to Stopped and
// returns the state function of that state.
func onMainCtxDone(ctx context.Context, u updater) state {
	stop := func(snapper *Snapper) {
		snapper.err = ctx.Err()
		snapper.state = Stopped
	}
	next := u(stop)
	return next.sf()
}
2018-09-04 23:46:02 +02:00
func syncUp(a args, u updater) state {
u(func(snapper *Snapper) {
snapper.lastInvocation = time.Now()
})
fss, err := listFSes(a.ctx, a.fsf)
2018-09-04 23:46:02 +02:00
if err != nil {
return onErr(err, u)
}
syncPoint, err := findSyncPoint(a.log, fss, a.prefix, a.interval)
if err != nil {
return onErr(err, u)
}
2019-03-22 19:41:12 +01:00
u(func(s *Snapper) {
2018-09-04 23:46:02 +02:00
s.sleepUntil = syncPoint
})
t := time.NewTimer(time.Until(syncPoint))
2018-09-04 23:46:02 +02:00
defer t.Stop()
select {
case <-t.C:
return u(func(s *Snapper) {
s.state = Planning
}).sf()
case <-a.ctx.Done():
return onMainCtxDone(a.ctx, u)
2018-09-04 23:46:02 +02:00
}
}
func plan(a args, u updater) state {
u(func(snapper *Snapper) {
snapper.lastInvocation = time.Now()
})
fss, err := listFSes(a.ctx, a.fsf)
2018-09-04 23:46:02 +02:00
if err != nil {
return onErr(err, u)
}
plan := make(map[*zfs.DatasetPath]*snapProgress, len(fss))
2018-09-04 23:46:02 +02:00
for _, fs := range fss {
plan[fs] = &snapProgress{state: SnapPending}
2018-09-04 23:46:02 +02:00
}
return u(func(s *Snapper) {
s.state = Snapshotting
s.plan = plan
s.err = nil
2018-09-04 23:46:02 +02:00
}).sf()
}
func snapshot(a args, u updater) state {
var plan map[*zfs.DatasetPath]*snapProgress
2018-09-04 23:46:02 +02:00
u(func(snapper *Snapper) {
plan = snapper.plan
})
hookMatchCount := make(map[hooks.Hook]int, len(*a.hooks))
for _, h := range *a.hooks {
hookMatchCount[h] = 0
}
anyFsHadErr := false
2018-09-04 23:46:02 +02:00
// TODO channel programs -> allow a little jitter?
for fs, progress := range plan {
suffix := time.Now().In(time.UTC).Format("20060102_150405_000")
snapname := fmt.Sprintf("%s%s", a.prefix, suffix)
l := a.log.
WithField("fs", fs.ToString()).
WithField("snap", snapname)
hookEnvExtra := hooks.Env{
hooks.EnvFS: fs.ToString(),
hooks.EnvSnapshot: snapname,
2018-09-04 23:46:02 +02:00
}
jobCallback := hooks.NewCallbackHookForFilesystem("snapshot", fs, func(_ context.Context) (err error) {
l.Debug("create snapshot")
err = zfs.ZFSSnapshot(fs, snapname, false) // TODO propagate context to ZFSSnapshot
if err != nil {
l.WithError(err).Error("cannot create snapshot")
}
return
})
fsHadErr := false
var planReport hooks.PlanReport
var plan *hooks.Plan
{
filteredHooks, err := a.hooks.CopyFilteredForFilesystem(fs)
if err != nil {
l.WithError(err).Error("unexpected filter error")
fsHadErr = true
goto updateFSState
}
// account for running hooks
for _, h := range filteredHooks {
hookMatchCount[h] = hookMatchCount[h] + 1
}
var planErr error
plan, planErr = hooks.NewPlan(&filteredHooks, hooks.PhaseSnapshot, jobCallback, hookEnvExtra)
if planErr != nil {
fsHadErr = true
l.WithError(planErr).Error("cannot create job hook plan")
goto updateFSState
}
}
u(func(snapper *Snapper) {
progress.name = snapname
progress.startAt = time.Now()
progress.hookPlan = plan
progress.state = SnapStarted
})
{
l := hooks.GetLogger(a.ctx).WithField("fs", fs.ToString()).WithField("snap", snapname)
l.WithField("report", plan.Report().String()).Debug("begin run job plan")
plan.Run(hooks.WithLogger(a.ctx, l), a.dryRun)
planReport = plan.Report()
fsHadErr = planReport.HadError() // not just fatal errors
if fsHadErr {
l.WithField("report", planReport.String()).Error("end run job plan with error")
} else {
l.WithField("report", planReport.String()).Info("end run job plan successful")
}
}
updateFSState:
anyFsHadErr = anyFsHadErr || fsHadErr
2018-09-04 23:46:02 +02:00
u(func(snapper *Snapper) {
progress.doneAt = time.Now()
2018-09-04 23:46:02 +02:00
progress.state = SnapDone
if fsHadErr {
2018-09-04 23:46:02 +02:00
progress.state = SnapError
}
progress.runResults = planReport
2018-09-04 23:46:02 +02:00
})
}
select {
case a.snapshotsTaken <- struct{}{}:
default:
if a.snapshotsTaken != nil {
a.log.Warn("callback channel is full, discarding snapshot update event")
}
2018-09-04 23:46:02 +02:00
}
for h, mc := range hookMatchCount {
if mc == 0 {
hookIdx := -1
for idx, ah := range *a.hooks {
if ah == h {
hookIdx = idx
break
}
}
a.log.WithField("hook", h.String()).WithField("hook_number", hookIdx+1).Warn("hook did not match any snapshotted filesystems")
}
}
2018-09-04 23:46:02 +02:00
return u(func(snapper *Snapper) {
if anyFsHadErr {
snapper.state = ErrorWait
snapper.err = errors.New("one or more snapshots could not be created, check logs for details")
2018-09-04 23:46:02 +02:00
} else {
snapper.state = Waiting
snapper.err = nil
2018-09-04 23:46:02 +02:00
}
}).sf()
}
func wait(a args, u updater) state {
var sleepUntil time.Time
u(func(snapper *Snapper) {
lastTick := snapper.lastInvocation
snapper.sleepUntil = lastTick.Add(a.interval)
sleepUntil = snapper.sleepUntil
log := a.log.WithField("sleep_until", sleepUntil).WithField("duration", a.interval)
logFunc := log.Debug
if snapper.state == ErrorWait || snapper.state == SyncUpErrWait {
logFunc = log.Error
}
logFunc("enter wait-state after error")
2018-09-04 23:46:02 +02:00
})
t := time.NewTimer(time.Until(sleepUntil))
2018-09-04 23:46:02 +02:00
defer t.Stop()
select {
case <-t.C:
return u(func(snapper *Snapper) {
snapper.state = Planning
}).sf()
case <-a.ctx.Done():
return onMainCtxDone(a.ctx, u)
2018-09-04 23:46:02 +02:00
}
}
func listFSes(ctx context.Context, mf *filters.DatasetMapFilter) (fss []*zfs.DatasetPath, err error) {
return zfs.ZFSListMapping(ctx, mf)
2018-09-04 23:46:02 +02:00
}
// syncUpWarnNoSnapshotUntilSyncupMinDuration is the threshold used by
// findSyncPoint: if the chosen sync point is further in the future than this
// duration, a warning is logged for every filesystem without matching
// snapshots. The value is obtained through envconst.Duration with the key
// ZREPL_SNAPPER_SYNCUP_WARN_MIN_DURATION and 1*time.Second as second argument
// (presumably the default if the variable is unset -- confirm in envconst).
var syncUpWarnNoSnapshotUntilSyncupMinDuration = envconst.Duration("ZREPL_SNAPPER_SYNCUP_WARN_MIN_DURATION", 1*time.Second)
// see docs/snapshotting.rst
2018-09-04 23:46:02 +02:00
func findSyncPoint(log Logger, fss []*zfs.DatasetPath, prefix string, interval time.Duration) (syncPoint time.Time, err error) {
const (
prioHasVersions int = iota
prioNoVersions
)
2018-09-04 23:46:02 +02:00
type snapTime struct {
ds *zfs.DatasetPath
prio int // lower is higher
2018-09-04 23:46:02 +02:00
time time.Time
}
if len(fss) == 0 {
return time.Now(), nil
}
snaptimes := make([]snapTime, 0, len(fss))
hardErrs := 0
2018-09-04 23:46:02 +02:00
now := time.Now()
log.Debug("examine filesystem state to find sync point")
2018-09-04 23:46:02 +02:00
for _, d := range fss {
l := log.WithField("fs", d.ToString())
syncPoint, err := findSyncPointFSNextOptimalSnapshotTime(l, now, interval, prefix, d)
if err == findSyncPointFSNoFilesystemVersionsErr {
snaptimes = append(snaptimes, snapTime{
ds: d,
prio: prioNoVersions,
time: now,
})
} else if err != nil {
hardErrs++
l.WithError(err).Error("cannot determine optimal sync point for this filesystem")
} else {
l.WithField("syncPoint", syncPoint).Debug("found optimal sync point for this filesystem")
snaptimes = append(snaptimes, snapTime{
ds: d,
prio: prioHasVersions,
time: syncPoint,
})
2018-09-04 23:46:02 +02:00
}
}
2018-09-04 23:46:02 +02:00
if hardErrs == len(fss) {
return time.Time{}, fmt.Errorf("hard errors in determining sync point for every matching filesystem")
}
2018-09-04 23:46:02 +02:00
if len(snaptimes) == 0 {
panic("implementation error: loop must either inc hardErrs or add result to snaptimes")
}
2018-09-04 23:46:02 +02:00
// sort ascending by (prio,time)
// => those filesystems with versions win over those without any
sort.Slice(snaptimes, func(i, j int) bool {
if snaptimes[i].prio == snaptimes[j].prio {
return snaptimes[i].time.Before(snaptimes[j].time)
2018-09-04 23:46:02 +02:00
}
return snaptimes[i].prio < snaptimes[j].prio
})
winnerSyncPoint := snaptimes[0].time
l := log.WithField("syncPoint", winnerSyncPoint.String())
l.Info("determined sync point")
if winnerSyncPoint.Sub(now) > syncUpWarnNoSnapshotUntilSyncupMinDuration {
for _, st := range snaptimes {
if st.prio == prioNoVersions {
l.WithField("fs", st.ds.ToString()).Warn("filesystem will not be snapshotted until sync point")
}
2018-09-04 23:46:02 +02:00
}
}
return snaptimes[0].time, nil
}
// findSyncPointFSNoFilesystemVersionsErr is the sentinel error returned by
// findSyncPointFSNextOptimalSnapshotTime when listing the filesystem versions
// with the prefix filter yields no entries. findSyncPoint compares against it
// with == and treats such filesystems as "no versions" instead of as failed.
var findSyncPointFSNoFilesystemVersionsErr = fmt.Errorf("no filesystem versions")
func findSyncPointFSNextOptimalSnapshotTime(l Logger, now time.Time, interval time.Duration, prefix string, d *zfs.DatasetPath) (time.Time, error) {
fsvs, err := zfs.ZFSListFilesystemVersions(d, filters.NewTypedPrefixFilter(prefix, zfs.Snapshot))
if err != nil {
return time.Time{}, errors.Wrap(err, "list filesystem versions")
}
if len(fsvs) <= 0 {
return time.Time{}, findSyncPointFSNoFilesystemVersionsErr
2018-09-04 23:46:02 +02:00
}
// Sort versions by creation
sort.SliceStable(fsvs, func(i, j int) bool {
return fsvs[i].CreateTXG < fsvs[j].CreateTXG
2018-09-04 23:46:02 +02:00
})
latest := fsvs[len(fsvs)-1]
l.WithField("creation", latest.Creation).Debug("found latest snapshot")
since := now.Sub(latest.Creation)
if since < 0 {
return time.Time{}, fmt.Errorf("snapshot %q is from the future: creation=%q now=%q", latest.ToAbsPath(d), latest.Creation, now)
}
2018-09-04 23:46:02 +02:00
return latest.Creation.Add(interval), nil
2018-09-04 23:46:02 +02:00
}