refactor snapper & support cron-based snapshotting

fixes https://github.com/zrepl/zrepl/issues/554
refs https://github.com/zrepl/zrepl/discussions/547#discussioncomment-1936126
Christian Schwarz 2022-04-12 01:26:41 +02:00
parent a9c61b4b0b
commit c743c7b03f
21 changed files with 1195 additions and 686 deletions


@@ -547,10 +547,20 @@ func renderPrunerReport(t *stringbuilder.B, r *pruner.Report, fsfilter FilterFun
func renderSnapperReport(t *stringbuilder.B, r *snapper.Report, fsfilter FilterFunc) {
if r == nil {
t.Printf("<snapshot type does not have a report>\n")
t.Printf("<no snapshotting report available>\n")
return
}
t.Printf("Type: %s\n", r.Type)
if r.Periodic != nil {
renderSnapperReportPeriodic(t, r.Periodic, fsfilter)
} else if r.Cron != nil {
renderSnapperReportCron(t, r.Cron, fsfilter)
} else {
t.Printf("<no details available>")
}
}
func renderSnapperReportPeriodic(t *stringbuilder.B, r *snapper.PeriodicReport, fsfilter FilterFunc) {
t.Printf("Status: %s", r.State)
t.Newline()
@@ -561,8 +571,25 @@ func renderSnapperReport(t *stringbuilder.B, r *snapper.Report, fsfilter FilterF
t.Printf("Sleep until: %s\n", r.SleepUntil)
}
sort.Slice(r.Progress, func(i, j int) bool {
return strings.Compare(r.Progress[i].Path, r.Progress[j].Path) == -1
renderSnapperPlanReportFilesystem(t, r.Progress, fsfilter)
}
func renderSnapperReportCron(t *stringbuilder.B, r *snapper.CronReport, fsfilter FilterFunc) {
t.Printf("State: %s\n", r.State)
now := time.Now()
if r.WakeupTime.After(now) {
t.Printf("Sleep until: %s (%s remaining)\n", r.WakeupTime, r.WakeupTime.Sub(now).Round(time.Second))
} else {
t.Printf("Started: %s (lasting %s)\n", r.WakeupTime, now.Sub(r.WakeupTime).Round(time.Second))
}
renderSnapperPlanReportFilesystem(t, r.Progress, fsfilter)
}
func renderSnapperPlanReportFilesystem(t *stringbuilder.B, fss []*snapper.ReportFilesystem, fsfilter FilterFunc) {
sort.Slice(fss, func(i, j int) bool {
return strings.Compare(fss[i].Path, fss[j].Path) == -1
})
dur := func(d time.Duration) string {
@@ -575,8 +602,8 @@ func renderSnapperReport(t *stringbuilder.B, r *snapper.Report, fsfilter FilterF
var widths struct {
path, state, duration int
}
rows := make([]*row, 0, len(r.Progress))
for _, fs := range r.Progress {
rows := make([]*row, 0, len(fss))
for _, fs := range fss {
if !fsfilter(fs.Path) {
continue
}
@@ -619,9 +646,11 @@ func renderSnapperReport(t *stringbuilder.B, r *snapper.Report, fsfilter FilterF
t.Printf("%s %s %s", path, state, duration)
t.PrintfDrawIndentedAndWrappedIfMultiline(" %s", r.remainder)
if r.hookReport != "" {
t.PrintfDrawIndentedAndWrappedIfMultiline("%s", r.hookReport)
t.AddIndent(1)
t.Newline()
t.Printf("%s", r.hookReport)
t.AddIndent(-1)
}
t.Newline()
}
}


@@ -11,6 +11,7 @@ import (
"time"
"github.com/pkg/errors"
"github.com/robfig/cron/v3"
"github.com/zrepl/yaml-config"
"github.com/zrepl/zrepl/util/datasizeunit"
@@ -233,6 +234,38 @@ type SnapshottingPeriodic struct {
Hooks HookList `yaml:"hooks,optional"`
}
type CronSpec struct {
Schedule cron.Schedule
}
var _ yaml.Unmarshaler = &CronSpec{}
func (s *CronSpec) UnmarshalYAML(unmarshal func(v interface{}, not_strict bool) error) error {
var specString string
if err := unmarshal(&specString, false); err != nil {
return err
}
// Use standard cron format.
// Disable the various "descriptors" (@daily, etc)
// They are just aliases to "top of hour", "midnight", etc.
parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.SecondOptional)
sched, err := parser.Parse(specString)
if err != nil {
return errors.Wrap(err, "cron syntax invalid")
}
s.Schedule = sched
return nil
}
type SnapshottingCron struct {
Type string `yaml:"type"`
Prefix string `yaml:"prefix"`
Cron CronSpec `yaml:"cron"`
Hooks HookList `yaml:"hooks,optional"`
}
type SnapshottingManual struct {
Type string `yaml:"type"`
}
@@ -556,6 +589,7 @@ func (t *SnapshottingEnum) UnmarshalYAML(u func(interface{}, bool) error) (err e
t.Ret, err = enumUnmarshal(u, map[string]interface{}{
"periodic": &SnapshottingPeriodic{},
"manual": &SnapshottingManual{},
"cron": &SnapshottingCron{},
})
return
}
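The parser above accepts the standard five-field cron syntax, allows an optional leading seconds field, and rejects the @-descriptors because cron.Descriptor is not part of the flag set. A minimal standalone sketch of that behavior, using only the github.com/robfig/cron/v3 API this diff already imports (the spec and timestamp are illustrative):

package main

import (
    "fmt"
    "time"

    "github.com/robfig/cron/v3"
)

func main() {
    // Same flag set as CronSpec.UnmarshalYAML above.
    parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.SecondOptional)

    // Five-field spec (seconds default to 0), as in the sample config further below.
    sched, err := parser.Parse("*/5 * * * *")
    if err != nil {
        panic(err)
    }
    now := time.Date(2022, 4, 12, 1, 26, 41, 0, time.UTC)
    fmt.Println(sched.Next(now)) // 2022-04-12 01:30:00 +0000 UTC

    // Descriptors such as @daily are rejected, matching TestCronSpec's expectFail cases.
    if _, err := parser.Parse("@daily"); err != nil {
        fmt.Println("rejected:", err)
    }
}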


@@ -13,6 +13,7 @@ import (
"github.com/kr/pretty"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/zrepl/yaml-config"
)
func TestSampleConfigsAreParsedWithoutErrors(t *testing.T) {
@@ -86,3 +87,53 @@ func TestTrimSpaceEachLineAndPad(t *testing.T) {
`
assert.Equal(t, " \n foo\n bar baz\n \n", trimSpaceEachLineAndPad(foo, " "))
}
func TestCronSpec(t *testing.T) {
expectAccept := []string{
`"* * * * *"`,
`"0-10 * * * *"`,
`"* 0-5,8,12 * * *"`,
}
expectFail := []string{
`* * * *`,
``,
`23`,
`"@reboot"`,
`"@every 1h30m"`,
`"@daily"`,
`* * * * * *`,
}
for _, input := range expectAccept {
t.Run(input, func(t *testing.T) {
s := fmt.Sprintf("spec: %s\n", input)
var v struct {
Spec CronSpec
}
v.Spec.Schedule = nil
t.Logf("input:\n%s", s)
err := yaml.UnmarshalStrict([]byte(s), &v)
t.Logf("error: %T %s", err, err)
require.NoError(t, err)
require.NotNil(t, v.Spec.Schedule)
})
}
for _, input := range expectFail {
t.Run(input, func(t *testing.T) {
s := fmt.Sprintf("spec: %s\n", input)
var v struct {
Spec CronSpec
}
v.Spec.Schedule = nil
t.Logf("input: %q", s)
err := yaml.UnmarshalStrict([]byte(s), &v)
t.Logf("error: %T %s", err, err)
require.Error(t, err)
require.Nil(t, v.Spec.Schedule)
})
}
}


@@ -0,0 +1,14 @@
jobs:
- name: snapjob
type: snap
filesystems: {
"tank<": true,
}
snapshotting:
type: cron
prefix: zrepl_snapjob_
cron: "*/5 * * * *"
pruning:
keep:
- type: last_n
count: 60


@@ -93,7 +93,14 @@ func (r *CommandHookReport) String() string {
cmdLine.WriteString(fmt.Sprintf("%s'%s'", sep, a))
}
return fmt.Sprintf("command hook invocation: \"%s\"", cmdLine.String()) // no %q to make copy-pastable
var msg string
if r.Err == nil {
msg = "command hook"
} else {
msg = fmt.Sprintf("command hook failed with %q", r.Err)
}
return fmt.Sprintf("%s: \"%s\"", msg, cmdLine.String()) // no %q to make copy-pastable
}
func (r *CommandHookReport) Error() string {
if r.Err == nil {


@@ -101,7 +101,7 @@ type modePush struct {
receiver *rpc.Client
senderConfig *endpoint.SenderConfig
plannerPolicy *logic.PlannerPolicy
snapper *snapper.PeriodicOrManual
snapper snapper.Snapper
}
func (m *modePush) ConnectEndpoints(ctx context.Context, connecter transport.Connecter) {
@@ -137,7 +137,8 @@ func (m *modePush) RunPeriodic(ctx context.Context, wakeUpCommon chan<- struct{}
}
func (m *modePush) SnapperReport() *snapper.Report {
return m.snapper.Report()
r := m.snapper.Report()
return &r
}
func (m *modePush) ResetConnectBackoff() {


@@ -58,7 +58,7 @@ func modeSinkFromConfig(g *config.Global, in *config.SinkJob, jobID endpoint.Job
type modeSource struct {
senderConfig *endpoint.SenderConfig
snapper *snapper.PeriodicOrManual
snapper snapper.Snapper
}
func modeSourceFromConfig(g *config.Global, in *config.SourceJob, jobID endpoint.JobID) (m *modeSource, err error) {
@@ -88,7 +88,8 @@ func (m *modeSource) RunPeriodic(ctx context.Context) {
}
func (m *modeSource) SnapperReport() *snapper.Report {
return m.snapper.Report()
r := m.snapper.Report()
return &r
}
func passiveSideFromConfig(g *config.Global, in *config.PassiveJob, configJob interface{}, parseFlags config.ParseFlags) (s *PassiveSide, err error) {


@@ -26,7 +26,7 @@ import (
type SnapJob struct {
name endpoint.JobID
fsfilter zfs.DatasetFilter
snapper *snapper.PeriodicOrManual
snapper snapper.Snapper
prunerFactory *pruner.LocalPrunerFactory
@@ -86,7 +86,8 @@ func (j *SnapJob) Status() *Status {
s.Pruning = j.pruner.Report()
}
j.prunerMtx.Unlock()
s.Snapshotting = j.snapper.Report()
r := j.snapper.Report()
s.Snapshotting = &r
return &Status{Type: t, JobSpecific: s}
}

daemon/snapper/cron.go (new file)

@@ -0,0 +1,172 @@
package snapper
import (
"context"
"fmt"
"sync"
"time"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/hooks"
"github.com/zrepl/zrepl/zfs"
)
func cronFromConfig(fsf zfs.DatasetFilter, in config.SnapshottingCron) (*Cron, error) {
hooksList, err := hooks.ListFromConfig(&in.Hooks)
if err != nil {
return nil, errors.Wrap(err, "hook config error")
}
planArgs := planArgs{
prefix: in.Prefix,
hooks: hooksList,
}
return &Cron{config: in, fsf: fsf, planArgs: planArgs}, nil
}
type Cron struct {
config config.SnapshottingCron
fsf zfs.DatasetFilter
planArgs planArgs
mtx sync.RWMutex
running bool
wakeupTime time.Time // zero value means uninit
lastError error
lastPlan *plan
wakeupWhileRunningCount int
}
func (s *Cron) Run(ctx context.Context, snapshotsTaken chan<- struct{}) {
t := time.NewTimer(0)
defer func() {
if !t.Stop() {
select {
case <-t.C:
default:
}
}
}()
for {
now := time.Now()
s.mtx.Lock()
s.wakeupTime = s.config.Cron.Schedule.Next(now)
s.mtx.Unlock()
// Re-arm the timer.
// Need to Stop before Reset, see docs.
if !t.Stop() {
// Use non-blocking read from timer channel
// because, except for the first loop iteration,
// the channel is already drained
select {
case <-t.C:
default:
}
}
t.Reset(s.wakeupTime.Sub(now))
select {
case <-ctx.Done():
return
case <-t.C:
getLogger(ctx).Debug("cron timer fired")
s.mtx.Lock()
if s.running {
getLogger(ctx).Warn("snapshotting triggered according to cron rules but previous snapshotting is not done; not taking a snapshot this time")
s.wakeupWhileRunningCount++
s.mtx.Unlock()
continue
}
s.lastError = nil
s.lastPlan = nil
s.wakeupWhileRunningCount = 0
s.running = true
s.mtx.Unlock()
go func() {
err := s.do(ctx)
s.mtx.Lock()
s.lastError = err
s.running = false
s.mtx.Unlock()
select {
case snapshotsTaken <- struct{}{}:
default:
if snapshotsTaken != nil {
getLogger(ctx).Warn("callback channel is full, discarding snapshot update event")
}
}
}()
}
}
}
func (s *Cron) do(ctx context.Context) error {
fss, err := zfs.ZFSListMapping(ctx, s.fsf)
if err != nil {
return errors.Wrap(err, "cannot list filesystems")
}
p := makePlan(s.planArgs, fss)
s.mtx.Lock()
s.lastPlan = p
s.lastError = nil
s.mtx.Unlock()
ok := p.execute(ctx, false)
if !ok {
return errors.New("one or more snapshots could not be created, check logs for details")
} else {
return nil
}
}
type CronState string
const (
CronStateRunning CronState = "running"
CronStateWaiting CronState = "waiting"
)
type CronReport struct {
State CronState
WakeupTime time.Time
Errors []string
Progress []*ReportFilesystem
}
func (s *Cron) Report() Report {
s.mtx.Lock()
defer s.mtx.Unlock()
r := CronReport{}
r.WakeupTime = s.wakeupTime
if s.running {
r.State = CronStateRunning
} else {
r.State = CronStateWaiting
}
if s.lastError != nil {
r.Errors = append(r.Errors, s.lastError.Error())
}
if s.wakeupWhileRunningCount > 0 {
r.Errors = append(r.Errors, fmt.Sprintf("cron frequency is too high; snapshots were not taken %d times", s.wakeupWhileRunningCount))
}
r.Progress = nil
if s.lastPlan != nil {
r.Progress = s.lastPlan.report()
}
return Report{Type: TypeCron, Cron: &r}
}
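Cron.Run reuses a single time.Timer across loop iterations; the Stop-then-drain-then-Reset sequence above is the pattern the time.Timer documentation requires so that a value left over from an earlier firing cannot satisfy the next wait. An isolated sketch of just that pattern (the 100ms period is illustrative):

package main

import (
    "fmt"
    "time"
)

// reschedule re-arms t to fire at next. It mirrors the Stop/drain/Reset
// sequence in Cron.Run: if the timer already fired, the stale value must
// be drained before Reset, otherwise the next <-t.C returns immediately.
func reschedule(t *time.Timer, next time.Time) {
    if !t.Stop() {
        select {
        case <-t.C: // drain the value from an earlier firing
        default: // already drained (e.g. the caller received it)
        }
    }
    t.Reset(time.Until(next))
}

func main() {
    t := time.NewTimer(0)
    defer t.Stop()
    for i := 0; i < 3; i++ {
        reschedule(t, time.Now().Add(100*time.Millisecond))
        <-t.C
        fmt.Println("tick", i)
    }
}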


@@ -0,0 +1,68 @@
package snapper
import (
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/zrepl/yaml-config"
"github.com/zrepl/zrepl/config"
)
func TestCronLibraryWorks(t *testing.T) {
type testCase struct {
spec string
in time.Time
expect time.Time
}
dhm := func(day, hour, minutes int) time.Time {
return time.Date(2022, 7, day, hour, minutes, 0, 0, time.UTC)
}
hm := func(hour, minutes int) time.Time {
return dhm(23, hour, minutes)
}
tcs := []testCase{
{"0-10 * * * *", dhm(17, 1, 10), dhm(17, 2, 0)},
{"0-10 * * * *", dhm(17, 23, 10), dhm(18, 0, 0)},
{"0-10 * * * *", hm(1, 9), hm(1, 10)},
{"0-10 * * * *", hm(1, 9), hm(1, 10)},
{"1,3,5 * * * *", hm(1, 1), hm(1, 3)},
{"1,3,5 * * * *", hm(1, 2), hm(1, 3)},
{"1,3,5 * * * *", hm(1, 3), hm(1, 5)},
{"1,3,5 * * * *", hm(1, 5), hm(2, 1)},
{"* 0-5,8,12 * * *", hm(0, 0), hm(0, 1)},
{"* 0-5,8,12 * * *", hm(4, 59), hm(5, 0)},
{"* 0-5,8,12 * * *", hm(5, 0), hm(5, 1)},
{"* 0-5,8,12 * * *", hm(5, 59), hm(8, 0)},
{"* 0-5,8,12 * * *", hm(8, 59), hm(12, 0)},
// https://github.com/zrepl/zrepl/pull/614#issuecomment-1188358989
{"53 17,18,19 * * *", dhm(23, 17, 52), dhm(23, 17, 53)},
{"53 17,18,19 * * *", dhm(23, 17, 53), dhm(23, 18, 53)},
{"53 17,18,19 * * *", dhm(23, 18, 53), dhm(23, 19, 53)},
{"53 17,18,19 * * *", dhm(23, 19, 53), dhm(24 /* ! */, 17, 53)},
}
for i, tc := range tcs {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
var s struct {
Cron config.CronSpec `yaml:"cron"`
}
inp := fmt.Sprintf("cron: %q", tc.spec)
fmt.Println("spec is ", inp)
err := yaml.UnmarshalStrict([]byte(inp), &s)
require.NoError(t, err)
actual := s.Cron.Schedule.Next(tc.in)
assert.Equal(t, tc.expect, actual)
})
}
}

daemon/snapper/impl.go (new file)

@@ -0,0 +1,251 @@
package snapper
import (
"context"
"fmt"
"sort"
"strings"
"time"
"github.com/zrepl/zrepl/daemon/hooks"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/util/chainlock"
"github.com/zrepl/zrepl/zfs"
)
type planArgs struct {
prefix string
hooks *hooks.List
}
type plan struct {
mtx chainlock.L
args planArgs
snaps map[*zfs.DatasetPath]*snapProgress
}
func makePlan(args planArgs, fss []*zfs.DatasetPath) *plan {
snaps := make(map[*zfs.DatasetPath]*snapProgress, len(fss))
for _, fs := range fss {
snaps[fs] = &snapProgress{state: SnapPending}
}
return &plan{snaps: snaps, args: args}
}
//go:generate stringer -type=SnapState
type SnapState uint
const (
SnapPending SnapState = 1 << iota
SnapStarted
SnapDone
SnapError
)
// All fields protected by Snapper.mtx
type snapProgress struct {
state SnapState
// SnapStarted, SnapDone, SnapError
name string
startAt time.Time
hookPlan *hooks.Plan
// SnapDone
doneAt time.Time
// SnapErr TODO disambiguate state
runResults hooks.PlanReport
}
func (plan *plan) execute(ctx context.Context, dryRun bool) (ok bool) {
hookMatchCount := make(map[hooks.Hook]int, len(*plan.args.hooks))
for _, h := range *plan.args.hooks {
hookMatchCount[h] = 0
}
anyFsHadErr := false
// TODO channel programs -> allow a little jitter?
for fs, progress := range plan.snaps {
suffix := time.Now().In(time.UTC).Format("20060102_150405_000")
snapname := fmt.Sprintf("%s%s", plan.args.prefix, suffix)
ctx := logging.WithInjectedField(ctx, "fs", fs.ToString())
ctx = logging.WithInjectedField(ctx, "snap", snapname)
hookEnvExtra := hooks.Env{
hooks.EnvFS: fs.ToString(),
hooks.EnvSnapshot: snapname,
}
jobCallback := hooks.NewCallbackHookForFilesystem("snapshot", fs, func(ctx context.Context) (err error) {
l := getLogger(ctx)
l.Debug("create snapshot")
err = zfs.ZFSSnapshot(ctx, fs, snapname, false) // TODO propagate context to ZFSSnapshot
if err != nil {
l.WithError(err).Error("cannot create snapshot")
}
return
})
fsHadErr := false
var hookPlanReport hooks.PlanReport
var hookPlan *hooks.Plan
{
filteredHooks, err := plan.args.hooks.CopyFilteredForFilesystem(fs)
if err != nil {
getLogger(ctx).WithError(err).Error("unexpected filter error")
fsHadErr = true
goto updateFSState
}
// account for running hooks
for _, h := range filteredHooks {
hookMatchCount[h] = hookMatchCount[h] + 1
}
var planErr error
hookPlan, planErr = hooks.NewPlan(&filteredHooks, hooks.PhaseSnapshot, jobCallback, hookEnvExtra)
if planErr != nil {
fsHadErr = true
getLogger(ctx).WithError(planErr).Error("cannot create job hook plan")
goto updateFSState
}
}
plan.mtx.HoldWhile(func() {
progress.name = snapname
progress.startAt = time.Now()
progress.hookPlan = hookPlan
progress.state = SnapStarted
})
{
getLogger(ctx).WithField("report", hookPlan.Report().String()).Debug("begin run job plan")
hookPlan.Run(ctx, dryRun)
hookPlanReport = hookPlan.Report()
fsHadErr = hookPlanReport.HadError() // not just fatal errors
if fsHadErr {
getLogger(ctx).WithField("report", hookPlanReport.String()).Error("end run job plan with error")
} else {
getLogger(ctx).WithField("report", hookPlanReport.String()).Info("end run job plan successful")
}
}
updateFSState:
anyFsHadErr = anyFsHadErr || fsHadErr
plan.mtx.HoldWhile(func() {
progress.doneAt = time.Now()
progress.state = SnapDone
if fsHadErr {
progress.state = SnapError
}
progress.runResults = hookPlanReport
})
}
for h, mc := range hookMatchCount {
if mc == 0 {
hookIdx := -1
for idx, ah := range *plan.args.hooks {
if ah == h {
hookIdx = idx
break
}
}
getLogger(ctx).WithField("hook", h.String()).WithField("hook_number", hookIdx+1).Warn("hook did not match any snapshotted filesystems")
}
}
return !anyFsHadErr
}
type ReportFilesystem struct {
Path string
State SnapState
// Valid in SnapStarted and later
SnapName string
StartAt time.Time
Hooks string
HooksHadError bool
// Valid in SnapDone | SnapError
DoneAt time.Time
}
func (plan *plan) report() []*ReportFilesystem {
plan.mtx.Lock()
defer plan.mtx.Unlock()
pReps := make([]*ReportFilesystem, 0, len(plan.snaps))
for fs, p := range plan.snaps {
var hooksStr string
var hooksHadError bool
if p.hookPlan != nil {
hooksStr, hooksHadError = p.report()
}
pReps = append(pReps, &ReportFilesystem{
Path: fs.ToString(),
State: p.state,
SnapName: p.name,
StartAt: p.startAt,
DoneAt: p.doneAt,
Hooks: hooksStr,
HooksHadError: hooksHadError,
})
}
sort.Slice(pReps, func(i, j int) bool {
return strings.Compare(pReps[i].Path, pReps[j].Path) == -1
})
return pReps
}
func (p *snapProgress) report() (hooksStr string, hooksHadError bool) {
hr := p.hookPlan.Report()
// FIXME: technically this belongs into client
// but we can't serialize hooks.Step ATM
rightPad := func(str string, length int, pad string) string {
if len(str) > length {
return str[:length]
}
return str + strings.Repeat(pad, length-len(str))
}
hooksHadError = hr.HadError()
rows := make([][]string, len(hr))
const numCols = 4
lens := make([]int, numCols)
for i, e := range hr {
rows[i] = make([]string, numCols)
rows[i][0] = fmt.Sprintf("%d", i+1)
rows[i][1] = e.Status.String()
runTime := "..."
if e.Status != hooks.StepPending {
runTime = e.End.Sub(e.Begin).Round(time.Millisecond).String()
}
rows[i][2] = runTime
rows[i][3] = ""
if e.Report != nil {
rows[i][3] = e.Report.String()
}
for j, col := range lens {
if len(rows[i][j]) > col {
lens[j] = len(rows[i][j])
}
}
}
rowsFlat := make([]string, len(hr))
for i, r := range rows {
colsPadded := make([]string, len(r))
for j, c := range r[:len(r)-1] {
colsPadded[j] = rightPad(c, lens[j], " ")
}
colsPadded[len(r)-1] = r[len(r)-1]
rowsFlat[i] = strings.Join(colsPadded, " ")
}
hooksStr = strings.Join(rowsFlat, "\n")
return hooksStr, hooksHadError
}
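Inside the package, a plan goes through three steps: build it from the matched filesystems, execute it (running hooks and taking the snapshots), and read back the per-filesystem report. A hedged in-package sketch of that lifecycle, mirroring what Cron.do does above; snapshotOnce is an illustrative helper name, not part of the commit:

package snapper

import (
    "context"

    "github.com/pkg/errors"

    "github.com/zrepl/zrepl/zfs"
)

// snapshotOnce lists the filesystems matched by fsf, snapshots them
// according to args, and returns the per-filesystem report.
func snapshotOnce(ctx context.Context, fsf zfs.DatasetFilter, args planArgs) ([]*ReportFilesystem, error) {
    fss, err := zfs.ZFSListMapping(ctx, fsf)
    if err != nil {
        return nil, errors.Wrap(err, "cannot list filesystems")
    }
    p := makePlan(args, fss)
    ok := p.execute(ctx, false) // dryRun=false, as in Cron.do
    if !ok {
        // the report still carries per-filesystem state and hook output
        return p.report(), errors.New("one or more snapshots could not be created, check logs for details")
    }
    return p.report(), nil
}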

daemon/snapper/manual.go (new file)

@@ -0,0 +1,15 @@
package snapper
import (
"context"
)
type manual struct{}
func (s *manual) Run(ctx context.Context, wakeUpCommon chan<- struct{}) {
// nothing to do
}
func (s *manual) Report() Report {
return Report{Type: TypeManual, Manual: &struct{}{}}
}

daemon/snapper/periodic.go (new file)

@@ -0,0 +1,399 @@
package snapper
import (
"context"
"fmt"
"sort"
"sync"
"time"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/daemon/logging/trace"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/hooks"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/util/envconst"
"github.com/zrepl/zrepl/zfs"
)
func periodicFromConfig(g *config.Global, fsf zfs.DatasetFilter, in *config.SnapshottingPeriodic) (*Periodic, error) {
if in.Prefix == "" {
return nil, errors.New("prefix must not be empty")
}
if in.Interval <= 0 {
return nil, errors.New("interval must be positive")
}
hookList, err := hooks.ListFromConfig(&in.Hooks)
if err != nil {
return nil, errors.Wrap(err, "hook config error")
}
args := periodicArgs{
interval: in.Interval,
fsf: fsf,
planArgs: planArgs{
prefix: in.Prefix,
hooks: hookList,
},
// ctx and log is set in Run()
}
return &Periodic{state: SyncUp, args: args}, nil
}
type periodicArgs struct {
ctx context.Context
interval time.Duration
fsf zfs.DatasetFilter
planArgs planArgs
snapshotsTaken chan<- struct{}
dryRun bool
}
type Periodic struct {
args periodicArgs
mtx sync.Mutex
state State
// set in state Plan, used in Waiting
lastInvocation time.Time
// valid for state Snapshotting
plan *plan
// valid for state SyncUp and Waiting
sleepUntil time.Time
// valid for state Err
err error
}
//go:generate stringer -type=State
type State uint
const (
SyncUp State = 1 << iota
SyncUpErrWait
Planning
Snapshotting
Waiting
ErrorWait
Stopped
)
func (s State) sf() state {
m := map[State]state{
SyncUp: periodicStateSyncUp,
SyncUpErrWait: periodicStateWait,
Planning: periodicStatePlan,
Snapshotting: periodicStateSnapshot,
Waiting: periodicStateWait,
ErrorWait: periodicStateWait,
Stopped: nil,
}
return m[s]
}
type updater func(u func(*Periodic)) State
type state func(a periodicArgs, u updater) state
func (s *Periodic) Run(ctx context.Context, snapshotsTaken chan<- struct{}) {
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
getLogger(ctx).Debug("start")
defer getLogger(ctx).Debug("stop")
s.args.snapshotsTaken = snapshotsTaken
s.args.ctx = ctx
s.args.dryRun = false // for future expansion
u := func(u func(*Periodic)) State {
s.mtx.Lock()
defer s.mtx.Unlock()
if u != nil {
u(s)
}
return s.state
}
var st state = periodicStateSyncUp
for st != nil {
pre := u(nil)
st = st(s.args, u)
post := u(nil)
getLogger(ctx).
WithField("transition", fmt.Sprintf("%s=>%s", pre, post)).
Debug("state transition")
}
}
func onErr(err error, u updater) state {
return u(func(s *Periodic) {
s.err = err
preState := s.state
switch s.state {
case SyncUp:
s.state = SyncUpErrWait
case Planning:
fallthrough
case Snapshotting:
s.state = ErrorWait
}
getLogger(s.args.ctx).WithError(err).WithField("pre_state", preState).WithField("post_state", s.state).Error("snapshotting error")
}).sf()
}
func onMainCtxDone(ctx context.Context, u updater) state {
return u(func(s *Periodic) {
s.err = ctx.Err()
s.state = Stopped
}).sf()
}
func periodicStateSyncUp(a periodicArgs, u updater) state {
u(func(snapper *Periodic) {
snapper.lastInvocation = time.Now()
})
fss, err := listFSes(a.ctx, a.fsf)
if err != nil {
return onErr(err, u)
}
syncPoint, err := findSyncPoint(a.ctx, fss, a.planArgs.prefix, a.interval)
if err != nil {
return onErr(err, u)
}
u(func(s *Periodic) {
s.sleepUntil = syncPoint
})
t := time.NewTimer(time.Until(syncPoint))
defer t.Stop()
select {
case <-t.C:
return u(func(s *Periodic) {
s.state = Planning
}).sf()
case <-a.ctx.Done():
return onMainCtxDone(a.ctx, u)
}
}
func periodicStatePlan(a periodicArgs, u updater) state {
u(func(snapper *Periodic) {
snapper.lastInvocation = time.Now()
})
fss, err := listFSes(a.ctx, a.fsf)
if err != nil {
return onErr(err, u)
}
p := makePlan(a.planArgs, fss)
return u(func(s *Periodic) {
s.state = Snapshotting
s.plan = p
s.err = nil
}).sf()
}
func periodicStateSnapshot(a periodicArgs, u updater) state {
var plan *plan
u(func(snapper *Periodic) {
plan = snapper.plan
})
ok := plan.execute(a.ctx, false)
select {
case a.snapshotsTaken <- struct{}{}:
default:
if a.snapshotsTaken != nil {
getLogger(a.ctx).Warn("callback channel is full, discarding snapshot update event")
}
}
return u(func(snapper *Periodic) {
if !ok {
snapper.state = ErrorWait
snapper.err = errors.New("one or more snapshots could not be created, check logs for details")
} else {
snapper.state = Waiting
snapper.err = nil
}
}).sf()
}
func periodicStateWait(a periodicArgs, u updater) state {
var sleepUntil time.Time
u(func(snapper *Periodic) {
lastTick := snapper.lastInvocation
snapper.sleepUntil = lastTick.Add(a.interval)
sleepUntil = snapper.sleepUntil
log := getLogger(a.ctx).WithField("sleep_until", sleepUntil).WithField("duration", a.interval)
logFunc := log.Debug
if snapper.state == ErrorWait || snapper.state == SyncUpErrWait {
logFunc = log.Error
}
logFunc("enter wait-state after error")
})
t := time.NewTimer(time.Until(sleepUntil))
defer t.Stop()
select {
case <-t.C:
return u(func(snapper *Periodic) {
snapper.state = Planning
}).sf()
case <-a.ctx.Done():
return onMainCtxDone(a.ctx, u)
}
}
func listFSes(ctx context.Context, mf zfs.DatasetFilter) (fss []*zfs.DatasetPath, err error) {
return zfs.ZFSListMapping(ctx, mf)
}
var syncUpWarnNoSnapshotUntilSyncupMinDuration = envconst.Duration("ZREPL_SNAPPER_SYNCUP_WARN_MIN_DURATION", 1*time.Second)
// see docs/snapshotting.rst
func findSyncPoint(ctx context.Context, fss []*zfs.DatasetPath, prefix string, interval time.Duration) (syncPoint time.Time, err error) {
const (
prioHasVersions int = iota
prioNoVersions
)
type snapTime struct {
ds *zfs.DatasetPath
prio int // lower is higher
time time.Time
}
if len(fss) == 0 {
return time.Now(), nil
}
snaptimes := make([]snapTime, 0, len(fss))
hardErrs := 0
now := time.Now()
getLogger(ctx).Debug("examine filesystem state to find sync point")
for _, d := range fss {
ctx := logging.WithInjectedField(ctx, "fs", d.ToString())
syncPoint, err := findSyncPointFSNextOptimalSnapshotTime(ctx, now, interval, prefix, d)
if err == findSyncPointFSNoFilesystemVersionsErr {
snaptimes = append(snaptimes, snapTime{
ds: d,
prio: prioNoVersions,
time: now,
})
} else if err != nil {
hardErrs++
getLogger(ctx).WithError(err).Error("cannot determine optimal sync point for this filesystem")
} else {
getLogger(ctx).WithField("syncPoint", syncPoint).Debug("found optimal sync point for this filesystem")
snaptimes = append(snaptimes, snapTime{
ds: d,
prio: prioHasVersions,
time: syncPoint,
})
}
}
if hardErrs == len(fss) {
return time.Time{}, fmt.Errorf("hard errors in determining sync point for every matching filesystem")
}
if len(snaptimes) == 0 {
panic("implementation error: loop must either inc hardErrs or add result to snaptimes")
}
// sort ascending by (prio,time)
// => those filesystems with versions win over those without any
sort.Slice(snaptimes, func(i, j int) bool {
if snaptimes[i].prio == snaptimes[j].prio {
return snaptimes[i].time.Before(snaptimes[j].time)
}
return snaptimes[i].prio < snaptimes[j].prio
})
winnerSyncPoint := snaptimes[0].time
l := getLogger(ctx).WithField("syncPoint", winnerSyncPoint.String())
l.Info("determined sync point")
if winnerSyncPoint.Sub(now) > syncUpWarnNoSnapshotUntilSyncupMinDuration {
for _, st := range snaptimes {
if st.prio == prioNoVersions {
l.WithField("fs", st.ds.ToString()).Warn("filesystem will not be snapshotted until sync point")
}
}
}
return snaptimes[0].time, nil
}
var findSyncPointFSNoFilesystemVersionsErr = fmt.Errorf("no filesystem versions")
func findSyncPointFSNextOptimalSnapshotTime(ctx context.Context, now time.Time, interval time.Duration, prefix string, d *zfs.DatasetPath) (time.Time, error) {
fsvs, err := zfs.ZFSListFilesystemVersions(ctx, d, zfs.ListFilesystemVersionsOptions{
Types: zfs.Snapshots,
ShortnamePrefix: prefix,
})
if err != nil {
return time.Time{}, errors.Wrap(err, "list filesystem versions")
}
if len(fsvs) <= 0 {
return time.Time{}, findSyncPointFSNoFilesystemVersionsErr
}
// Sort versions by creation
sort.SliceStable(fsvs, func(i, j int) bool {
return fsvs[i].CreateTXG < fsvs[j].CreateTXG
})
latest := fsvs[len(fsvs)-1]
getLogger(ctx).WithField("creation", latest.Creation).Debug("found latest snapshot")
since := now.Sub(latest.Creation)
if since < 0 {
return time.Time{}, fmt.Errorf("snapshot %q is from the future: creation=%q now=%q", latest.ToAbsPath(d), latest.Creation, now)
}
return latest.Creation.Add(interval), nil
}
type PeriodicReport struct {
State State
// valid in state SyncUp and Waiting
SleepUntil time.Time
// valid in state Err
Error string
// valid in state Snapshotting
Progress []*ReportFilesystem
}
func (s *Periodic) Report() Report {
s.mtx.Lock()
defer s.mtx.Unlock()
var progress []*ReportFilesystem = nil
if s.plan != nil {
progress = s.plan.report()
}
r := &PeriodicReport{
State: s.state,
SleepUntil: s.sleepUntil,
Error: errOrEmptyString(s.err),
Progress: progress,
}
return Report{Type: TypePeriodic, Periodic: r}
}
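The sync-point rule in findSyncPoint boils down to simple time arithmetic: each filesystem's next optimal snapshot time is the creation time of its most recent prefix-matching snapshot plus the interval, and the earliest of those wins (filesystems without matching snapshots are deprioritized and only picked up once that point is reached). A small illustrative calculation with made-up timestamps and a 10-minute interval:

package main

import (
    "fmt"
    "time"
)

func main() {
    interval := 10 * time.Minute
    // Creation time of the latest prefix-matching snapshot per filesystem
    // (illustrative values, not from the commit).
    latest := map[string]time.Time{
        "tank/www":  time.Date(2022, 4, 12, 1, 7, 0, 0, time.UTC),
        "tank/mail": time.Date(2022, 4, 12, 1, 3, 0, 0, time.UTC),
    }
    var syncPoint time.Time
    for fs, created := range latest {
        next := created.Add(interval) // per-filesystem optimal snapshot time
        fmt.Printf("%-9s next optimal snapshot at %s\n", fs, next)
        if syncPoint.IsZero() || next.Before(syncPoint) {
            syncPoint = next
        }
    }
    fmt.Println("sync point:", syncPoint) // 2022-04-12 01:13:00 +0000 UTC
}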


@@ -3,497 +3,40 @@ package snapper
import (
"context"
"fmt"
"sort"
"sync"
"time"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/daemon/logging/trace"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/hooks"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/util/envconst"
"github.com/zrepl/zrepl/zfs"
)
//go:generate stringer -type=SnapState
type SnapState uint
type Type string
const (
SnapPending SnapState = 1 << iota
SnapStarted
SnapDone
SnapError
TypePeriodic Type = "periodic"
TypeCron Type = "cron"
TypeManual Type = "manual"
)
// All fields protected by Snapper.mtx
type snapProgress struct {
state SnapState
// SnapStarted, SnapDone, SnapError
name string
startAt time.Time
hookPlan *hooks.Plan
// SnapDone
doneAt time.Time
// SnapErr TODO disambiguate state
runResults hooks.PlanReport
type Snapper interface {
Run(ctx context.Context, snapshotsTaken chan<- struct{})
Report() Report
}
type args struct {
ctx context.Context
prefix string
interval time.Duration
fsf zfs.DatasetFilter
snapshotsTaken chan<- struct{}
hooks *hooks.List
dryRun bool
type Report struct {
Type Type
Periodic *PeriodicReport
Cron *CronReport
Manual *struct{}
}
type Snapper struct {
args args
mtx sync.Mutex
state State
// set in state Plan, used in Waiting
lastInvocation time.Time
// valid for state Snapshotting
plan map[*zfs.DatasetPath]*snapProgress
// valid for state SyncUp and Waiting
sleepUntil time.Time
// valid for state Err
err error
}
//go:generate stringer -type=State
type State uint
const (
SyncUp State = 1 << iota
SyncUpErrWait
Planning
Snapshotting
Waiting
ErrorWait
Stopped
)
func (s State) sf() state {
m := map[State]state{
SyncUp: syncUp,
SyncUpErrWait: wait,
Planning: plan,
Snapshotting: snapshot,
Waiting: wait,
ErrorWait: wait,
Stopped: nil,
}
return m[s]
}
type updater func(u func(*Snapper)) State
type state func(a args, u updater) state
type Logger = logger.Logger
func getLogger(ctx context.Context) Logger {
return logging.GetLogger(ctx, logging.SubsysSnapshot)
}
func PeriodicFromConfig(g *config.Global, fsf zfs.DatasetFilter, in *config.SnapshottingPeriodic) (*Snapper, error) {
if in.Prefix == "" {
return nil, errors.New("prefix must not be empty")
}
if in.Interval <= 0 {
return nil, errors.New("interval must be positive")
}
hookList, err := hooks.ListFromConfig(&in.Hooks)
if err != nil {
return nil, errors.Wrap(err, "hook config error")
}
args := args{
prefix: in.Prefix,
interval: in.Interval,
fsf: fsf,
hooks: hookList,
// ctx and log is set in Run()
}
return &Snapper{state: SyncUp, args: args}, nil
}
func (s *Snapper) Run(ctx context.Context, snapshotsTaken chan<- struct{}) {
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
getLogger(ctx).Debug("start")
defer getLogger(ctx).Debug("stop")
s.args.snapshotsTaken = snapshotsTaken
s.args.ctx = ctx
s.args.dryRun = false // for future expansion
u := func(u func(*Snapper)) State {
s.mtx.Lock()
defer s.mtx.Unlock()
if u != nil {
u(s)
}
return s.state
}
var st state = syncUp
for st != nil {
pre := u(nil)
st = st(s.args, u)
post := u(nil)
getLogger(ctx).
WithField("transition", fmt.Sprintf("%s=>%s", pre, post)).
Debug("state transition")
}
}
func onErr(err error, u updater) state {
return u(func(s *Snapper) {
s.err = err
preState := s.state
switch s.state {
case SyncUp:
s.state = SyncUpErrWait
case Planning:
fallthrough
case Snapshotting:
s.state = ErrorWait
}
getLogger(s.args.ctx).WithError(err).WithField("pre_state", preState).WithField("post_state", s.state).Error("snapshotting error")
}).sf()
}
func onMainCtxDone(ctx context.Context, u updater) state {
return u(func(s *Snapper) {
s.err = ctx.Err()
s.state = Stopped
}).sf()
}
func syncUp(a args, u updater) state {
u(func(snapper *Snapper) {
snapper.lastInvocation = time.Now()
})
fss, err := listFSes(a.ctx, a.fsf)
if err != nil {
return onErr(err, u)
}
syncPoint, err := findSyncPoint(a.ctx, fss, a.prefix, a.interval)
if err != nil {
return onErr(err, u)
}
u(func(s *Snapper) {
s.sleepUntil = syncPoint
})
t := time.NewTimer(time.Until(syncPoint))
defer t.Stop()
select {
case <-t.C:
return u(func(s *Snapper) {
s.state = Planning
}).sf()
case <-a.ctx.Done():
return onMainCtxDone(a.ctx, u)
}
}
func plan(a args, u updater) state {
u(func(snapper *Snapper) {
snapper.lastInvocation = time.Now()
})
fss, err := listFSes(a.ctx, a.fsf)
if err != nil {
return onErr(err, u)
}
plan := make(map[*zfs.DatasetPath]*snapProgress, len(fss))
for _, fs := range fss {
plan[fs] = &snapProgress{state: SnapPending}
}
return u(func(s *Snapper) {
s.state = Snapshotting
s.plan = plan
s.err = nil
}).sf()
}
func snapshot(a args, u updater) state {
var plan map[*zfs.DatasetPath]*snapProgress
u(func(snapper *Snapper) {
plan = snapper.plan
})
hookMatchCount := make(map[hooks.Hook]int, len(*a.hooks))
for _, h := range *a.hooks {
hookMatchCount[h] = 0
}
anyFsHadErr := false
// TODO channel programs -> allow a little jitter?
for fs, progress := range plan {
suffix := time.Now().In(time.UTC).Format("20060102_150405_000")
snapname := fmt.Sprintf("%s%s", a.prefix, suffix)
ctx := logging.WithInjectedField(a.ctx, "fs", fs.ToString())
ctx = logging.WithInjectedField(ctx, "snap", snapname)
hookEnvExtra := hooks.Env{
hooks.EnvFS: fs.ToString(),
hooks.EnvSnapshot: snapname,
}
jobCallback := hooks.NewCallbackHookForFilesystem("snapshot", fs, func(ctx context.Context) (err error) {
l := getLogger(ctx)
l.Debug("create snapshot")
err = zfs.ZFSSnapshot(ctx, fs, snapname, false) // TODO propagate context to ZFSSnapshot
if err != nil {
l.WithError(err).Error("cannot create snapshot")
}
return
})
fsHadErr := false
var planReport hooks.PlanReport
var plan *hooks.Plan
{
filteredHooks, err := a.hooks.CopyFilteredForFilesystem(fs)
if err != nil {
getLogger(ctx).WithError(err).Error("unexpected filter error")
fsHadErr = true
goto updateFSState
}
// account for running hooks
for _, h := range filteredHooks {
hookMatchCount[h] = hookMatchCount[h] + 1
}
var planErr error
plan, planErr = hooks.NewPlan(&filteredHooks, hooks.PhaseSnapshot, jobCallback, hookEnvExtra)
if planErr != nil {
fsHadErr = true
getLogger(ctx).WithError(planErr).Error("cannot create job hook plan")
goto updateFSState
}
}
u(func(snapper *Snapper) {
progress.name = snapname
progress.startAt = time.Now()
progress.hookPlan = plan
progress.state = SnapStarted
})
{
getLogger(ctx).WithField("report", plan.Report().String()).Debug("begin run job plan")
plan.Run(ctx, a.dryRun)
planReport = plan.Report()
fsHadErr = planReport.HadError() // not just fatal errors
if fsHadErr {
getLogger(ctx).WithField("report", planReport.String()).Error("end run job plan with error")
} else {
getLogger(ctx).WithField("report", planReport.String()).Info("end run job plan successful")
}
}
updateFSState:
anyFsHadErr = anyFsHadErr || fsHadErr
u(func(snapper *Snapper) {
progress.doneAt = time.Now()
progress.state = SnapDone
if fsHadErr {
progress.state = SnapError
}
progress.runResults = planReport
})
}
select {
case a.snapshotsTaken <- struct{}{}:
func FromConfig(g *config.Global, fsf zfs.DatasetFilter, in config.SnapshottingEnum) (Snapper, error) {
switch v := in.Ret.(type) {
case *config.SnapshottingPeriodic:
return periodicFromConfig(g, fsf, v)
case *config.SnapshottingCron:
return cronFromConfig(fsf, *v)
case *config.SnapshottingManual:
return &manual{}, nil
default:
if a.snapshotsTaken != nil {
getLogger(a.ctx).Warn("callback channel is full, discarding snapshot update event")
}
}
for h, mc := range hookMatchCount {
if mc == 0 {
hookIdx := -1
for idx, ah := range *a.hooks {
if ah == h {
hookIdx = idx
break
}
}
getLogger(a.ctx).WithField("hook", h.String()).WithField("hook_number", hookIdx+1).Warn("hook did not match any snapshotted filesystems")
}
}
return u(func(snapper *Snapper) {
if anyFsHadErr {
snapper.state = ErrorWait
snapper.err = errors.New("one or more snapshots could not be created, check logs for details")
} else {
snapper.state = Waiting
snapper.err = nil
}
}).sf()
}
func wait(a args, u updater) state {
var sleepUntil time.Time
u(func(snapper *Snapper) {
lastTick := snapper.lastInvocation
snapper.sleepUntil = lastTick.Add(a.interval)
sleepUntil = snapper.sleepUntil
log := getLogger(a.ctx).WithField("sleep_until", sleepUntil).WithField("duration", a.interval)
logFunc := log.Debug
if snapper.state == ErrorWait || snapper.state == SyncUpErrWait {
logFunc = log.Error
}
logFunc("enter wait-state after error")
})
t := time.NewTimer(time.Until(sleepUntil))
defer t.Stop()
select {
case <-t.C:
return u(func(snapper *Snapper) {
snapper.state = Planning
}).sf()
case <-a.ctx.Done():
return onMainCtxDone(a.ctx, u)
return nil, fmt.Errorf("unknown snapshotting type %T", v)
}
}
func listFSes(ctx context.Context, mf zfs.DatasetFilter) (fss []*zfs.DatasetPath, err error) {
return zfs.ZFSListMapping(ctx, mf)
}
var syncUpWarnNoSnapshotUntilSyncupMinDuration = envconst.Duration("ZREPL_SNAPPER_SYNCUP_WARN_MIN_DURATION", 1*time.Second)
// see docs/snapshotting.rst
func findSyncPoint(ctx context.Context, fss []*zfs.DatasetPath, prefix string, interval time.Duration) (syncPoint time.Time, err error) {
const (
prioHasVersions int = iota
prioNoVersions
)
type snapTime struct {
ds *zfs.DatasetPath
prio int // lower is higher
time time.Time
}
if len(fss) == 0 {
return time.Now(), nil
}
snaptimes := make([]snapTime, 0, len(fss))
hardErrs := 0
now := time.Now()
getLogger(ctx).Debug("examine filesystem state to find sync point")
for _, d := range fss {
ctx := logging.WithInjectedField(ctx, "fs", d.ToString())
syncPoint, err := findSyncPointFSNextOptimalSnapshotTime(ctx, now, interval, prefix, d)
if err == findSyncPointFSNoFilesystemVersionsErr {
snaptimes = append(snaptimes, snapTime{
ds: d,
prio: prioNoVersions,
time: now,
})
} else if err != nil {
hardErrs++
getLogger(ctx).WithError(err).Error("cannot determine optimal sync point for this filesystem")
} else {
getLogger(ctx).WithField("syncPoint", syncPoint).Debug("found optimal sync point for this filesystem")
snaptimes = append(snaptimes, snapTime{
ds: d,
prio: prioHasVersions,
time: syncPoint,
})
}
}
if hardErrs == len(fss) {
return time.Time{}, fmt.Errorf("hard errors in determining sync point for every matching filesystem")
}
if len(snaptimes) == 0 {
panic("implementation error: loop must either inc hardErrs or add result to snaptimes")
}
// sort ascending by (prio,time)
// => those filesystems with versions win over those without any
sort.Slice(snaptimes, func(i, j int) bool {
if snaptimes[i].prio == snaptimes[j].prio {
return snaptimes[i].time.Before(snaptimes[j].time)
}
return snaptimes[i].prio < snaptimes[j].prio
})
winnerSyncPoint := snaptimes[0].time
l := getLogger(ctx).WithField("syncPoint", winnerSyncPoint.String())
l.Info("determined sync point")
if winnerSyncPoint.Sub(now) > syncUpWarnNoSnapshotUntilSyncupMinDuration {
for _, st := range snaptimes {
if st.prio == prioNoVersions {
l.WithField("fs", st.ds.ToString()).Warn("filesystem will not be snapshotted until sync point")
}
}
}
return snaptimes[0].time, nil
}
var findSyncPointFSNoFilesystemVersionsErr = fmt.Errorf("no filesystem versions")
func findSyncPointFSNextOptimalSnapshotTime(ctx context.Context, now time.Time, interval time.Duration, prefix string, d *zfs.DatasetPath) (time.Time, error) {
fsvs, err := zfs.ZFSListFilesystemVersions(ctx, d, zfs.ListFilesystemVersionsOptions{
Types: zfs.Snapshots,
ShortnamePrefix: prefix,
})
if err != nil {
return time.Time{}, errors.Wrap(err, "list filesystem versions")
}
if len(fsvs) <= 0 {
return time.Time{}, findSyncPointFSNoFilesystemVersionsErr
}
// Sort versions by creation
sort.SliceStable(fsvs, func(i, j int) bool {
return fsvs[i].CreateTXG < fsvs[j].CreateTXG
})
latest := fsvs[len(fsvs)-1]
getLogger(ctx).WithField("creation", latest.Creation).Debug("found latest snapshot")
since := now.Sub(latest.Creation)
if since < 0 {
return time.Time{}, fmt.Errorf("snapshot %q is from the future: creation=%q now=%q", latest.ToAbsPath(d), latest.Creation, now)
}
return latest.Creation.Add(interval), nil
}
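With this refactor, jobs no longer hold a *PeriodicOrManual but the new Snapper interface, and Report is returned by value (the push/source/snap diffs above take its address for their status structs). A minimal sketch of how a caller might wire a snapper up, assuming the job already has its dataset filter and snapshotting config; runSnapper and the wakeup handling are illustrative, not part of the commit:

package example

import (
    "context"

    "github.com/zrepl/zrepl/config"
    "github.com/zrepl/zrepl/daemon/snapper"
    "github.com/zrepl/zrepl/zfs"
)

// runSnapper builds a Snapper from the job's snapshotting section and
// drives it until ctx is done. periodic, cron and manual are handled
// uniformly through the interface.
func runSnapper(ctx context.Context, g *config.Global, fsf zfs.DatasetFilter, in config.SnapshottingEnum) error {
    s, err := snapper.FromConfig(g, fsf, in)
    if err != nil {
        return err
    }
    // The snapper sends non-blockingly and warns if the channel is full,
    // so a small buffer is enough for this sketch.
    snapshotsTaken := make(chan struct{}, 1)
    go func() {
        for range snapshotsTaken {
            // e.g. wake up replication (push job) or pruning (snap job)
        }
    }()
    s.Run(ctx, snapshotsTaken) // the manual snapper returns immediately
    r := s.Report()
    _ = &r // jobs store the address of the by-value Report in their status
    return nil
}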


@@ -1,48 +0,0 @@
package snapper
import (
"context"
"fmt"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/zfs"
)
// FIXME: properly abstract snapshotting:
// - split up things that trigger snapshotting from the mechanism
// - timer-based trigger (periodic)
// - call from control socket (manual)
// - mixed modes?
// - support a `zrepl snapshot JOBNAME` subcommand for config.SnapshottingManual
type PeriodicOrManual struct {
s *Snapper
}
func (s *PeriodicOrManual) Run(ctx context.Context, wakeUpCommon chan<- struct{}) {
if s.s != nil {
s.s.Run(ctx, wakeUpCommon)
}
}
// Returns nil if manual
func (s *PeriodicOrManual) Report() *Report {
if s.s != nil {
return s.s.Report()
}
return nil
}
func FromConfig(g *config.Global, fsf zfs.DatasetFilter, in config.SnapshottingEnum) (*PeriodicOrManual, error) {
switch v := in.Ret.(type) {
case *config.SnapshottingPeriodic:
snapper, err := PeriodicFromConfig(g, fsf, v)
if err != nil {
return nil, err
}
return &PeriodicOrManual{snapper}, nil
case *config.SnapshottingManual:
return &PeriodicOrManual{}, nil
default:
return nil, fmt.Errorf("unknown snapshotting type %T", v)
}
}


@@ -1,118 +0,0 @@
package snapper
import (
"fmt"
"sort"
"strings"
"time"
"github.com/zrepl/zrepl/daemon/hooks"
)
type Report struct {
State State
// valid in state SyncUp and Waiting
SleepUntil time.Time
// valid in state Err
Error string
// valid in state Snapshotting
Progress []*ReportFilesystem
}
type ReportFilesystem struct {
Path string
State SnapState
// Valid in SnapStarted and later
SnapName string
StartAt time.Time
Hooks string
HooksHadError bool
// Valid in SnapDone | SnapError
DoneAt time.Time
}
func errOrEmptyString(e error) string {
if e != nil {
return e.Error()
}
return ""
}
func (s *Snapper) Report() *Report {
s.mtx.Lock()
defer s.mtx.Unlock()
pReps := make([]*ReportFilesystem, 0, len(s.plan))
for fs, p := range s.plan {
var hooksStr string
var hooksHadError bool
if p.hookPlan != nil {
hr := p.hookPlan.Report()
// FIXME: technically this belongs into client
// but we can't serialize hooks.Step ATM
rightPad := func(str string, length int, pad string) string {
if len(str) > length {
return str[:length]
}
return str + strings.Repeat(pad, length-len(str))
}
hooksHadError = hr.HadError()
rows := make([][]string, len(hr))
const numCols = 4
lens := make([]int, numCols)
for i, e := range hr {
rows[i] = make([]string, numCols)
rows[i][0] = fmt.Sprintf("%d", i+1)
rows[i][1] = e.Status.String()
runTime := "..."
if e.Status != hooks.StepPending {
runTime = e.End.Sub(e.Begin).Round(time.Millisecond).String()
}
rows[i][2] = runTime
rows[i][3] = ""
if e.Report != nil {
rows[i][3] = e.Report.String()
}
for j, col := range lens {
if len(rows[i][j]) > col {
lens[j] = len(rows[i][j])
}
}
}
rowsFlat := make([]string, len(hr))
for i, r := range rows {
colsPadded := make([]string, len(r))
for j, c := range r[:len(r)-1] {
colsPadded[j] = rightPad(c, lens[j], " ")
}
colsPadded[len(r)-1] = r[len(r)-1]
rowsFlat[i] = strings.Join(colsPadded, " ")
}
hooksStr = strings.Join(rowsFlat, "\n")
}
pReps = append(pReps, &ReportFilesystem{
Path: fs.ToString(),
State: p.state,
SnapName: p.name,
StartAt: p.startAt,
DoneAt: p.doneAt,
Hooks: hooksStr,
HooksHadError: hooksHadError,
})
}
sort.Slice(pReps, func(i, j int) bool {
return strings.Compare(pReps[i].Path, pReps[j].Path) == -1
})
r := &Report{
State: s.state,
SleepUntil: s.sleepUntil,
Error: errOrEmptyString(s.err),
Progress: pReps,
}
return r
}

daemon/snapper/util.go (new file)

@@ -0,0 +1,21 @@
package snapper
import (
"context"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/logger"
)
type Logger = logger.Logger
func getLogger(ctx context.Context) Logger {
return logging.GetLogger(ctx, logging.SubsysSnapshot)
}
func errOrEmptyString(e error) string {
if e != nil {
return e.Error()
}
return ""
}


@@ -21,6 +21,7 @@ Developers should consult the git commit log or GitHub issue tracker.
* `Feature Wishlist on GitHub <https://github.com/zrepl/zrepl/discussions/547>`_
* |feature| :ref:`Schedule-based snapshotting<job-snapshotting--cron>` using ``cron`` syntax instead of an interval.
* |feature| Add ``ZREPL_DESTROY_MAX_BATCH_SIZE`` env var (default 0=unlimited).
* |bugfix| Fix resuming from interrupted replications that use ``send.raw`` on unencrypted datasets.


@@ -5,53 +5,117 @@
Taking Snapshots
================
The ``push``, ``source`` and ``snap`` jobs can automatically take periodic snapshots of the filesystems matched by the ``filesystems`` filter field.
The snapshot names are composed of a user-defined prefix followed by a UTC date formatted like ``20060102_150405_000``.
We use UTC because it will avoid name conflicts when switching time zones or between summer and winter time.
You can configure zrepl to take snapshots of the filesystems in the ``filesystems`` field specified in ``push``, ``source`` and ``snap`` jobs.
When a job is started, the snapshotter attempts to get the snapshotting rhythms of the matched ``filesystems`` in sync because snapshotting all filesystems at the same time results in a more consistent backup.
To find that sync point, the most recent snapshot, made by the snapshotter, in any of the matched ``filesystems`` is used.
A filesystem that does not have snapshots by the snapshotter has lower priority than filesystems that do, and thus might not be snapshotted (and replicated) until the next sync point.
The following snapshotting types are supported:
For ``push`` jobs, replication is automatically triggered after all filesystems have been snapshotted.
Note that the ``zrepl signal wakeup JOB`` subcommand does not trigger snapshotting.
.. list-table::
:widths: 20 70
:header-rows: 1
* - ``snapshotting.type``
- Comment
* - ``periodic``
- Ensure that snapshots are taken at a particular interval.
* - ``cron``
- Use cron spec to take snapshots at particular points in time.
* - ``manual``
- zrepl does not take any snapshots by itself.
The ``periodic`` and ``cron`` snapshotting types share some common options and behavior:
* **Naming:** The snapshot names are composed of a user-defined ``prefix`` followed by a UTC date formatted like ``20060102_150405_000``.
We use UTC because it will avoid name conflicts when switching time zones or between summer and winter time.
* **Hooks:** You can configure hooks to run before or after zrepl takes the snapshots. See :ref:`below <job-snapshotting-hooks>` for details.
* **Push replication:** After creating all snapshots, the snapshotter will wake up the replication part of the job, if it's a ``push`` job.
Note that snapshotting is decoupled from replication, i.e., snapshots will still be taken even if replication is down or takes too long.
Note further that other jobs are not woken up by snapshotting.
.. NOTE::
There is **no concept of ownership** of the snapshots that are created by ``periodic`` or ``cron``.
Thus, there is no distinction between zrepl-created snapshots and user-created snapshots during replication or pruning.
In particular, pruning will take all snapshots into consideration by default.
To constrain pruning to just zrepl-created snapshots:
1. Assign a unique ``prefix`` to the snapshotter and
2. Use the ``regex`` functionality of the various pruning ``keep`` rules to just consider snapshots with that prefix.
There is currently no way to constrain replication to just zrepl-created snapshots.
Follow and comment at :issue:`403` if you need this functionality.
.. NOTE::
The ``zrepl signal wakeup JOB`` subcommand does not trigger snapshotting.
``periodic`` Snapshotting
-------------------------
::
jobs:
- type: push
filesystems: {
"<": true,
"tmp": false
}
snapshotting:
type: periodic
prefix: zrepl_
interval: 10m
hooks: ...
...
jobs:
- ...
filesystems: { ... }
snapshotting:
type: periodic
prefix: zrepl_
interval: 10m
hooks: ...
pruning: ...
There is also a ``manual`` snapshotting type, which covers the following use cases:
The ``periodic`` snapshotter ensures that snapshots are taken in the specified ``interval``.
If you use zrepl for backup, this translates into your recovery point objective (RPO).
To meet your RPO, you still need to monitor that replication, which happens asynchronously to snapshotting, actually works.
* Existing infrastructure for automatic snapshots: you only want to use this zrepl job for replication.
* Handling snapshotting through a separate ``snap`` job.
It is desirable to get all ``filesystems`` snapshotted simultaneously because it results in a more consistent backup.
To accomplish this while still maintaining the ``interval``, the ``periodic`` snapshotter attempts to get the snapshotting rhythms in sync.
To find that sync point, the most recent snapshot, created by the snapshotter, in any of the matched ``filesystems`` is used.
A filesystem that does not have snapshots by the snapshotter has lower priority than filesystems that do, and thus might not be snapshotted (and replicated) until the next sync point.
The snapshotter uses the ``prefix`` to identify which snapshots it created.
Note that you will have to trigger replication manually using the ``zrepl signal wakeup JOB`` subcommand in that case.
.. _job-snapshotting--cron:
``cron`` Snapshotting
---------------------
::
jobs:
- type: snap
filesystems: { ... }
snapshotting:
type: cron
prefix: zrepl_
# (second, optional) minute hour day-of-month month day-of-week
# This example takes snapshots daily at 3:00.
cron: "0 3 * * *"
pruning: ...
In ``cron`` mode, the snapshotter takes snapshots at fixed points in time.
See https://en.wikipedia.org/wiki/Cron for details on the syntax.
zrepl uses the ``github.com/robfig/cron/v3`` Go package for parsing.
An optional field for "seconds" is supported to take snapshots at sub-minute frequencies.
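For illustration, a short Go sketch (not part of the zrepl codebase; it only assumes the parser options zrepl configures internally) that parses a six-field spec firing every 30 seconds:

::

    package main

    import (
        "fmt"
        "time"

        "github.com/robfig/cron/v3"
    )

    func main() {
        // optional seconds field first, then minute hour day-of-month month day-of-week
        parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.SecondOptional)
        sched, err := parser.Parse("*/30 * * * * *") // every 30 seconds
        if err != nil {
            panic(err)
        }
        now := time.Date(2022, 4, 12, 3, 0, 10, 0, time.UTC)
        fmt.Println(sched.Next(now)) // 2022-04-12 03:00:30 +0000 UTC
    }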
``manual`` Snapshotting
-----------------------
::
jobs:
- type: push
filesystems: {
"<": true,
"tmp": false
}
snapshotting:
type: manual
...
In ``manual`` mode, zrepl does not take snapshots by itself.
Manual snapshotting is most useful if you have existing infrastructure for snapshot management, or if you want to decouple snapshot management from replication using a zrepl ``snap`` job.
See :ref:`this quickstart guide <quickstart-backup-to-external-disk>` for an example.
To trigger replication after taking snapshots, use the ``zrepl signal wakeup JOB`` command.
.. _job-snapshotting-hooks:
Pre- and Post-Snapshot Hooks

go.mod

@@ -29,6 +29,7 @@ require (
github.com/problame/go-netssh v0.0.0-20200601114649-26439f9f0dc5
github.com/prometheus/client_golang v1.2.1
github.com/prometheus/common v0.7.0
github.com/robfig/cron/v3 v3.0.1
github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44 // indirect; go1.12 thinks it needs this
github.com/spf13/cobra v0.0.2
github.com/spf13/pflag v1.0.5

go.sum

@@ -161,6 +161,8 @@ github.com/prometheus/procfs v0.0.5/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDa
github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44 h1:tB9NOR21++IjLyVx3/PCPhWMwqGNCMQEH96A6dMZ/gc=
github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=