Implement periodic snapshotting.

This commit is contained in:
Christian Schwarz 2018-09-04 14:46:02 -07:00
parent 754b253043
commit e161347e47
6 changed files with 471 additions and 5 deletions

View File

@ -1,6 +1,9 @@
package filters package filters
import "github.com/zrepl/zrepl/zfs" import (
"github.com/zrepl/zrepl/zfs"
"strings"
)
type AnyFSVFilter struct{} type AnyFSVFilter struct{}
@ -13,3 +16,26 @@ var _ zfs.FilesystemVersionFilter = AnyFSVFilter{}
func (AnyFSVFilter) Filter(t zfs.VersionType, name string) (accept bool, err error) { func (AnyFSVFilter) Filter(t zfs.VersionType, name string) (accept bool, err error) {
return true, nil return true, nil
} }
type PrefixFilter struct {
prefix string
fstype zfs.VersionType
fstypeSet bool // optionals anyone?
}
var _ zfs.FilesystemVersionFilter = &PrefixFilter{}
func NewPrefixFilter(prefix string) *PrefixFilter {
return &PrefixFilter{prefix: prefix}
}
func NewTypedPrefixFilter(prefix string, versionType zfs.VersionType) *PrefixFilter {
return &PrefixFilter{prefix, versionType, true}
}
func (f *PrefixFilter) Filter(t zfs.VersionType, name string) (accept bool, err error) {
fstypeMatches := (!f.fstypeSet || t == f.fstype)
prefixMatches := strings.HasPrefix(name, f.prefix)
return fstypeMatches && prefixMatches, nil
}

View File

@ -11,6 +11,7 @@ import (
"github.com/zrepl/zrepl/replication" "github.com/zrepl/zrepl/replication"
"sync" "sync"
"github.com/zrepl/zrepl/daemon/logging" "github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/daemon/snapper"
) )
type Push struct { type Push struct {
@ -20,6 +21,8 @@ type Push struct {
prunerFactory *pruner.PrunerFactory prunerFactory *pruner.PrunerFactory
snapper *snapper.Snapper
mtx sync.Mutex mtx sync.Mutex
replication *replication.Replication replication *replication.Replication
} }
@ -34,15 +37,21 @@ func PushFromConfig(g *config.Global, in *config.PushJob) (j *Push, err error) {
return nil, errors.Wrap(err, "cannot build client") return nil, errors.Wrap(err, "cannot build client")
} }
if j.fsfilter, err = filters.DatasetMapFilterFromConfig(in.Filesystems); err != nil { fsf, err := filters.DatasetMapFilterFromConfig(in.Filesystems)
if err != nil {
return nil, errors.Wrap(err, "cannnot build filesystem filter") return nil, errors.Wrap(err, "cannnot build filesystem filter")
} }
j.fsfilter = fsf
j.prunerFactory, err = pruner.NewPrunerFactory(in.Pruning) j.prunerFactory, err = pruner.NewPrunerFactory(in.Pruning)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if j.snapper, err = snapper.FromConfig(g, fsf, &in.Snapshotting); err != nil {
return nil, errors.Wrap(err, "cannot build snapper")
}
return j, nil return j, nil
} }
@ -68,6 +77,14 @@ func (j *Push) Run(ctx context.Context) {
defer log.Info("job exiting") defer log.Info("job exiting")
snapshotsTaken := make(chan struct{})
{
ctx, cancel := context.WithCancel(ctx)
defer cancel()
ctx = logging.WithSubsystemLoggers(ctx, log)
go j.snapper.Run(ctx, snapshotsTaken)
}
invocationCount := 0 invocationCount := 0
outer: outer:
for { for {
@ -76,11 +93,13 @@ outer:
case <-ctx.Done(): case <-ctx.Done():
log.WithError(ctx.Err()).Info("context") log.WithError(ctx.Err()).Info("context")
break outer break outer
case <-WaitWakeup(ctx): case <-WaitWakeup(ctx):
invocationCount++ case <-snapshotsTaken:
invLog := log.WithField("invocation", invocationCount)
j.do(WithLogger(ctx, invLog))
} }
invocationCount++
invLog := log.WithField("invocation", invocationCount)
j.do(WithLogger(ctx, invLog))
} }
} }

View File

@ -14,6 +14,7 @@ import (
"github.com/zrepl/zrepl/replication" "github.com/zrepl/zrepl/replication"
"github.com/zrepl/zrepl/tlsconf" "github.com/zrepl/zrepl/tlsconf"
"os" "os"
"github.com/zrepl/zrepl/daemon/snapper"
) )
func OutletsFromConfig(in config.LoggingOutletEnumList) (*logger.Outlets, error) { func OutletsFromConfig(in config.LoggingOutletEnumList) (*logger.Outlets, error) {
@ -69,6 +70,7 @@ func WithSubsystemLoggers(ctx context.Context, log logger.Logger) context.Contex
ctx = streamrpc.ContextWithLogger(ctx, streamrpcLogAdaptor{log.WithField(SubsysField, "rpc")}) ctx = streamrpc.ContextWithLogger(ctx, streamrpcLogAdaptor{log.WithField(SubsysField, "rpc")})
ctx = endpoint.WithLogger(ctx, log.WithField(SubsysField, "endpoint")) ctx = endpoint.WithLogger(ctx, log.WithField(SubsysField, "endpoint"))
ctx = pruner.WithLogger(ctx, log.WithField(SubsysField, "pruning")) ctx = pruner.WithLogger(ctx, log.WithField(SubsysField, "pruning"))
ctx = snapper.WithLogger(ctx, log.WithField(SubsysField, "snapshot"))
return ctx return ctx
} }

358
daemon/snapper/snapper.go Normal file
View File

@ -0,0 +1,358 @@
package snapper
import (
"github.com/zrepl/zrepl/config"
"github.com/pkg/errors"
"time"
"context"
"github.com/zrepl/zrepl/daemon/filters"
"fmt"
"github.com/zrepl/zrepl/zfs"
"sort"
"github.com/zrepl/zrepl/logger"
"sync"
)
//go:generate stringer -type=SnapState
type SnapState uint
const (
SnapPending SnapState = 1 << iota
SnapStarted
SnapDone
SnapError
)
type snapProgress struct {
state SnapState
// SnapStarted, SnapDone, SnapError
name string
startAt time.Time
// SnapDone
doneAt time.Time
// SnapErr
err error
}
type args struct {
ctx context.Context
log Logger
prefix string
interval time.Duration
fsf *filters.DatasetMapFilter
snapshotsTaken chan<-struct{}
}
type Snapper struct {
args args
mtx sync.Mutex
state State
// set in state Plan, used in Waiting
lastInvocation time.Time
// valid for state Snapshotting
plan map[*zfs.DatasetPath]snapProgress
// valid for state SyncUp and Waiting
sleepUntil time.Time
// valid for state Err
err error
}
//go:generate stringer -type=State
type State uint
const (
SyncUp State = 1<<iota
Planning
Snapshotting
Waiting
Error
)
func (s State) sf() state {
m := map[State]state{
SyncUp: syncUp,
Planning: plan,
Snapshotting: snapshot,
Waiting: wait,
Error: nil,
}
return m[s]
}
type updater func(u func(*Snapper)) State
type state func(a args, u updater) state
type contextKey int
const (
contextKeyLog contextKey = 0
)
type Logger = logger.Logger
func WithLogger(ctx context.Context, log Logger) context.Context {
return context.WithValue(ctx, contextKeyLog, log)
}
func getLogger(ctx context.Context) Logger {
if log, ok := ctx.Value(contextKeyLog).(Logger); ok {
return log
}
return logger.NewNullLogger()
}
func FromConfig(g *config.Global, fsf *filters.DatasetMapFilter, in *config.Snapshotting) (*Snapper, error) {
if in.SnapshotPrefix == "" {
return nil, errors.New("prefix must not be empty")
}
if in.Interval <= 0 {
return nil, errors.New("interval must be positive")
}
args := args{
prefix: in.SnapshotPrefix,
interval: in.Interval,
fsf: fsf,
// ctx and log is set in Run()
}
return &Snapper{state: SyncUp, args: args}, nil
}
func (s *Snapper) Run(ctx context.Context, snapshotsTaken chan<- struct{}) {
getLogger(ctx).Debug("start")
defer getLogger(ctx).Debug("stop")
s.args.snapshotsTaken = snapshotsTaken
s.args.ctx = ctx
s.args.log = getLogger(ctx)
u := func(u func(*Snapper)) State {
s.mtx.Lock()
defer s.mtx.Unlock()
if u != nil {
u(s)
}
return s.state
}
var st state = syncUp
for st != nil {
pre := u(nil)
st = st(s.args, u)
post := u(nil)
getLogger(ctx).
WithField("transition", fmt.Sprintf("%s=>%s", pre, post)).
Debug("state transition")
}
}
func onErr(err error, u updater) state {
return u(func(s *Snapper) {
s.err = err
s.state = Error
}).sf()
}
func syncUp(a args, u updater) state {
fss, err := listFSes(a.fsf)
if err != nil {
return onErr(err, u)
}
syncPoint, err := findSyncPoint(a.log, fss, a.prefix, a.interval)
if err != nil {
return onErr(err, u)
}
u(func(s *Snapper){
s.sleepUntil = syncPoint
})
t := time.NewTimer(syncPoint.Sub(time.Now()))
defer t.Stop()
select {
case <-t.C:
return u(func(s *Snapper) {
s.state = Planning
}).sf()
case <-a.ctx.Done():
return onErr(err, u)
}
}
func plan(a args, u updater) state {
u(func(snapper *Snapper) {
snapper.lastInvocation = time.Now()
})
fss, err := listFSes(a.fsf)
if err != nil {
return onErr(err, u)
}
plan := make(map[*zfs.DatasetPath]snapProgress, len(fss))
for _, fs := range fss {
plan[fs] = snapProgress{state: SnapPending}
}
return u(func(s *Snapper) {
s.state = Snapshotting
s.plan = plan
}).sf()
}
func snapshot(a args, u updater) state {
var plan map[*zfs.DatasetPath]snapProgress
u(func(snapper *Snapper) {
plan = snapper.plan
})
hadErr := false
// TODO channel programs -> allow a little jitter?
for fs, progress := range plan {
suffix := time.Now().In(time.UTC).Format("20060102_150405_000")
snapname := fmt.Sprintf("%s%s", a.prefix, suffix)
l := a.log.
WithField("fs", fs.ToString()).
WithField("snap", snapname)
u(func(snapper *Snapper) {
progress.name = snapname
progress.startAt = time.Now()
progress.state = SnapStarted
})
l.Debug("create snapshot")
err := zfs.ZFSSnapshot(fs, snapname, false)
if err != nil {
hadErr = true
l.WithError(err).Error("cannot create snapshot")
}
doneAt := time.Now()
u(func(snapper *Snapper) {
progress.doneAt = doneAt
progress.state = SnapDone
if err != nil {
progress.state = SnapError
progress.err = err
}
})
}
select {
case a.snapshotsTaken <- struct{}{}:
default:
a.log.Warn("callback channel is full, discarding snapshot update event")
}
return u(func(snapper *Snapper) {
if hadErr {
snapper.state = Error
snapper.err = errors.New("one or more snapshots could not be created")
} else {
snapper.state = Waiting
}
}).sf()
}
func wait(a args, u updater) state {
var sleepUntil time.Time
u(func(snapper *Snapper) {
lastTick := snapper.lastInvocation
snapper.sleepUntil = lastTick.Add(a.interval)
sleepUntil = snapper.sleepUntil
})
t := time.NewTimer(sleepUntil.Sub(time.Now()))
defer t.Stop()
select {
case <-t.C:
return u(func(snapper *Snapper) {
snapper.state = Planning
}).sf()
case <-a.ctx.Done():
return onErr(a.ctx.Err(), u)
}
}
func listFSes(mf *filters.DatasetMapFilter) (fss []*zfs.DatasetPath, err error) {
return zfs.ZFSListMapping(mf)
}
func findSyncPoint(log Logger, fss []*zfs.DatasetPath, prefix string, interval time.Duration) (syncPoint time.Time, err error) {
type snapTime struct {
ds *zfs.DatasetPath
time time.Time
}
if len(fss) == 0 {
return time.Now(), nil
}
snaptimes := make([]snapTime, 0, len(fss))
now := time.Now()
log.Debug("examine filesystem state")
for _, d := range fss {
l := log.WithField("fs", d.ToString())
fsvs, err := zfs.ZFSListFilesystemVersions(d, filters.NewTypedPrefixFilter(prefix, zfs.Snapshot))
if err != nil {
l.WithError(err).Error("cannot list filesystem versions")
continue
}
if len(fsvs) <= 0 {
l.WithField("prefix", prefix).Debug("no filesystem versions with prefix")
continue
}
// Sort versions by creation
sort.SliceStable(fsvs, func(i, j int) bool {
return fsvs[i].CreateTXG < fsvs[j].CreateTXG
})
latest := fsvs[len(fsvs)-1]
l.WithField("creation", latest.Creation).
Debug("found latest snapshot")
since := now.Sub(latest.Creation)
if since < 0 {
l.WithField("snapshot", latest.Name).
WithField("creation", latest.Creation).
Error("snapshot is from the future")
continue
}
next := now
if since < interval {
next = latest.Creation.Add(interval)
}
snaptimes = append(snaptimes, snapTime{d, next})
}
if len(snaptimes) == 0 {
snaptimes = append(snaptimes, snapTime{nil, now})
}
sort.Slice(snaptimes, func(i, j int) bool {
return snaptimes[i].time.Before(snaptimes[j].time)
})
return snaptimes[0].time, nil
}

View File

@ -0,0 +1,29 @@
// Code generated by "stringer -type=SnapState"; DO NOT EDIT.
package snapper
import "strconv"
const (
_SnapState_name_0 = "SnapPendingSnapStarted"
_SnapState_name_1 = "SnapDone"
_SnapState_name_2 = "SnapError"
)
var (
_SnapState_index_0 = [...]uint8{0, 11, 22}
)
func (i SnapState) String() string {
switch {
case 1 <= i && i <= 2:
i -= 1
return _SnapState_name_0[_SnapState_index_0[i]:_SnapState_index_0[i+1]]
case i == 4:
return _SnapState_name_1
case i == 8:
return _SnapState_name_2
default:
return "SnapState(" + strconv.FormatInt(int64(i), 10) + ")"
}
}

View File

@ -0,0 +1,32 @@
// Code generated by "stringer -type=State"; DO NOT EDIT.
package snapper
import "strconv"
const (
_State_name_0 = "SyncUpPlanning"
_State_name_1 = "Snapshotting"
_State_name_2 = "Waiting"
_State_name_3 = "Error"
)
var (
_State_index_0 = [...]uint8{0, 6, 14}
)
func (i State) String() string {
switch {
case 1 <= i && i <= 2:
i -= 1
return _State_name_0[_State_index_0[i]:_State_index_0[i+1]]
case i == 4:
return _State_name_1
case i == 8:
return _State_name_2
case i == 16:
return _State_name_3
default:
return "State(" + strconv.FormatInt(int64(i), 10) + ")"
}
}