diff --git a/daemon/filters/fsvfilter.go b/daemon/filters/fsvfilter.go
index 0ec6225..3abcca8 100644
--- a/daemon/filters/fsvfilter.go
+++ b/daemon/filters/fsvfilter.go
@@ -1,6 +1,9 @@
 package filters
 
-import "github.com/zrepl/zrepl/zfs"
+import (
+	"github.com/zrepl/zrepl/zfs"
+	"strings"
+)
 
 type AnyFSVFilter struct{}
 
@@ -13,3 +16,26 @@ var _ zfs.FilesystemVersionFilter = AnyFSVFilter{}
 func (AnyFSVFilter) Filter(t zfs.VersionType, name string) (accept bool, err error) {
 	return true, nil
 }
+
+
+type PrefixFilter struct {
+	prefix    string
+	fstype    zfs.VersionType
+	fstypeSet bool // true iff the fstype field is to be checked in Filter
+}
+
+var _ zfs.FilesystemVersionFilter = &PrefixFilter{}
+
+func NewPrefixFilter(prefix string) *PrefixFilter {
+	return &PrefixFilter{prefix: prefix}
+}
+
+func NewTypedPrefixFilter(prefix string, versionType zfs.VersionType) *PrefixFilter {
+	return &PrefixFilter{prefix, versionType, true}
+}
+
+func (f *PrefixFilter) Filter(t zfs.VersionType, name string) (accept bool, err error) {
+	fstypeMatches := (!f.fstypeSet || t == f.fstype)
+	prefixMatches := strings.HasPrefix(name, f.prefix)
+	return fstypeMatches && prefixMatches, nil
+}
diff --git a/daemon/job/push.go b/daemon/job/push.go
index fa7fd45..ee9d00d 100644
--- a/daemon/job/push.go
+++ b/daemon/job/push.go
@@ -11,6 +11,7 @@ import (
 	"github.com/zrepl/zrepl/replication"
 	"sync"
 	"github.com/zrepl/zrepl/daemon/logging"
+	"github.com/zrepl/zrepl/daemon/snapper"
 )
 
 type Push struct {
@@ -20,6 +21,8 @@ type Push struct {
 
 	prunerFactory *pruner.PrunerFactory
 
+	snapper *snapper.Snapper
+
 	mtx         sync.Mutex
 	replication *replication.Replication
 }
@@ -34,15 +37,21 @@ func PushFromConfig(g *config.Global, in *config.PushJob) (j *Push, err error) {
 		return nil, errors.Wrap(err, "cannot build client")
 	}
 
-	if j.fsfilter, err = filters.DatasetMapFilterFromConfig(in.Filesystems); err != nil {
+	fsf, err := filters.DatasetMapFilterFromConfig(in.Filesystems)
+	if err != nil {
 		return nil, errors.Wrap(err, "cannot build filesystem filter")
 	}
+	j.fsfilter = fsf
 
 	j.prunerFactory, err = pruner.NewPrunerFactory(in.Pruning)
 	if err != nil {
 		return nil, err
 	}
 
+	if j.snapper, err = snapper.FromConfig(g, fsf, &in.Snapshotting); err != nil {
+		return nil, errors.Wrap(err, "cannot build snapper")
+	}
+
 	return j, nil
 }
@@ -68,6 +77,14 @@ func (j *Push) Run(ctx context.Context) {
 
 	defer log.Info("job exiting")
 
+	snapshotsTaken := make(chan struct{})
+	{
+		ctx, cancel := context.WithCancel(ctx)
+		defer cancel()
+		ctx = logging.WithSubsystemLoggers(ctx, log)
+		go j.snapper.Run(ctx, snapshotsTaken)
+	}
+
 	invocationCount := 0
 outer:
 	for {
@@ -76,11 +93,13 @@
 		case <-ctx.Done():
 			log.WithError(ctx.Err()).Info("context")
 			break outer
+
 		case <-WaitWakeup(ctx):
-			invocationCount++
-			invLog := log.WithField("invocation", invocationCount)
-			j.do(WithLogger(ctx, invLog))
+		case <-snapshotsTaken:
 		}
+		invocationCount++
+		invLog := log.WithField("invocation", invocationCount)
+		j.do(WithLogger(ctx, invLog))
 	}
 }
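
The push.go hunks above couple the snapper to the job loop through the snapshotsTaken channel: the snapper signals each completed snapshot run, and the job treats that signal exactly like a manual wakeup before invoking replication. Below is a minimal, self-contained sketch of that pattern; producer and the timings are illustrative stand-ins, not zrepl API.

    package main

    import (
    	"context"
    	"fmt"
    	"time"
    )

    // producer stands in for snapper.Run: it periodically "takes snapshots"
    // and signals the consumer loop without ever blocking on a busy consumer.
    func producer(ctx context.Context, taken chan<- struct{}) {
    	ticker := time.NewTicker(100 * time.Millisecond)
    	defer ticker.Stop()
    	for {
    		select {
    		case <-ctx.Done():
    			return
    		case <-ticker.C:
    			select {
    			case taken <- struct{}{}: // notify the consumer
    			default: // consumer busy: drop the event, as snapshot() does
    			}
    		}
    	}
    }

    func main() {
    	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
    	defer cancel()

    	taken := make(chan struct{})
    	go producer(ctx, taken)

    	invocationCount := 0
    	for {
    		select {
    		case <-ctx.Done():
    			fmt.Println("context:", ctx.Err())
    			return
    		case <-taken:
    		}
    		invocationCount++
    		fmt.Println("invocation", invocationCount) // j.do() would run here
    	}
    }
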
diff --git a/daemon/logging/build_logging.go b/daemon/logging/build_logging.go
index fbaf9c2..56099bb 100644
--- a/daemon/logging/build_logging.go
+++ b/daemon/logging/build_logging.go
@@ -14,6 +14,7 @@ import (
 	"github.com/zrepl/zrepl/replication"
 	"github.com/zrepl/zrepl/tlsconf"
 	"os"
+	"github.com/zrepl/zrepl/daemon/snapper"
 )
 
 func OutletsFromConfig(in config.LoggingOutletEnumList) (*logger.Outlets, error) {
@@ -69,6 +70,7 @@ func WithSubsystemLoggers(ctx context.Context, log logger.Logger) context.Context {
 	ctx = streamrpc.ContextWithLogger(ctx, streamrpcLogAdaptor{log.WithField(SubsysField, "rpc")})
 	ctx = endpoint.WithLogger(ctx, log.WithField(SubsysField, "endpoint"))
 	ctx = pruner.WithLogger(ctx, log.WithField(SubsysField, "pruning"))
+	ctx = snapper.WithLogger(ctx, log.WithField(SubsysField, "snapshot"))
 	return ctx
 }
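
WithSubsystemLoggers works because each subsystem follows the same convention: a package-private context key, a WithLogger setter, and a getLogger that falls back to a no-op logger when nothing was stored. A sketch of that convention under assumed names follows; the Logger interface here is deliberately simplified, zrepl's logger.Logger is richer.

    package subsys // illustrative package name

    import "context"

    // Logger is a simplified stand-in for zrepl's logger.Logger.
    type Logger interface {
    	Debug(msg string)
    }

    type contextKey int

    const contextKeyLog contextKey = 0

    // WithLogger stores the subsystem's logger in the context.
    func WithLogger(ctx context.Context, log Logger) context.Context {
    	return context.WithValue(ctx, contextKeyLog, log)
    }

    // getLogger retrieves it, defaulting to a logger that discards
    // everything, so subsystem code never has to nil-check.
    func getLogger(ctx context.Context) Logger {
    	if log, ok := ctx.Value(contextKeyLog).(Logger); ok {
    		return log
    	}
    	return nullLogger{}
    }

    type nullLogger struct{}

    func (nullLogger) Debug(string) {}
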
diff --git a/daemon/snapper/snapper.go b/daemon/snapper/snapper.go
new file mode 100644
index 0000000..2071389
--- /dev/null
+++ b/daemon/snapper/snapper.go
@@ -0,0 +1,358 @@
+package snapper
+
+import (
+	"context"
+	"fmt"
+	"sort"
+	"sync"
+	"time"
+
+	"github.com/pkg/errors"
+	"github.com/zrepl/zrepl/config"
+	"github.com/zrepl/zrepl/daemon/filters"
+	"github.com/zrepl/zrepl/logger"
+	"github.com/zrepl/zrepl/zfs"
+)
+
+//go:generate stringer -type=SnapState
+type SnapState uint
+
+const (
+	SnapPending SnapState = 1 << iota
+	SnapStarted
+	SnapDone
+	SnapError
+)
+
+type snapProgress struct {
+	state SnapState
+
+	// SnapStarted, SnapDone, SnapError
+	name    string
+	startAt time.Time
+
+	// SnapDone
+	doneAt time.Time
+
+	// SnapError
+	err error
+}
+
+type args struct {
+	ctx            context.Context
+	log            Logger
+	prefix         string
+	interval       time.Duration
+	fsf            *filters.DatasetMapFilter
+	snapshotsTaken chan<- struct{}
+}
+
+type Snapper struct {
+	args args
+
+	mtx   sync.Mutex
+	state State
+
+	// set in state Planning, used in Waiting
+	lastInvocation time.Time
+
+	// valid for state Snapshotting
+	plan map[*zfs.DatasetPath]*snapProgress
+
+	// valid for state SyncUp and Waiting
+	sleepUntil time.Time
+
+	// valid for state Error
+	err error
+}
+
+//go:generate stringer -type=State
+type State uint
+
+const (
+	SyncUp State = 1 << iota
+	Planning
+	Snapshotting
+	Waiting
+	Error
+)
+
+func (s State) sf() state {
+	m := map[State]state{
+		SyncUp:       syncUp,
+		Planning:     plan,
+		Snapshotting: snapshot,
+		Waiting:      wait,
+		Error:        nil,
+	}
+	return m[s]
+}
+
+type updater func(u func(*Snapper)) State
+type state func(a args, u updater) state
+
+type contextKey int
+
+const (
+	contextKeyLog contextKey = 0
+)
+
+type Logger = logger.Logger
+
+func WithLogger(ctx context.Context, log Logger) context.Context {
+	return context.WithValue(ctx, contextKeyLog, log)
+}
+
+func getLogger(ctx context.Context) Logger {
+	if log, ok := ctx.Value(contextKeyLog).(Logger); ok {
+		return log
+	}
+	return logger.NewNullLogger()
+}
+
+func FromConfig(g *config.Global, fsf *filters.DatasetMapFilter, in *config.Snapshotting) (*Snapper, error) {
+	if in.SnapshotPrefix == "" {
+		return nil, errors.New("prefix must not be empty")
+	}
+	if in.Interval <= 0 {
+		return nil, errors.New("interval must be positive")
+	}
+
+	args := args{
+		prefix:   in.SnapshotPrefix,
+		interval: in.Interval,
+		fsf:      fsf,
+		// ctx and log are set in Run()
+	}
+
+	return &Snapper{state: SyncUp, args: args}, nil
+}
+
+func (s *Snapper) Run(ctx context.Context, snapshotsTaken chan<- struct{}) {
+
+	getLogger(ctx).Debug("start")
+	defer getLogger(ctx).Debug("stop")
+
+	s.args.snapshotsTaken = snapshotsTaken
+	s.args.ctx = ctx
+	s.args.log = getLogger(ctx)
+
+	u := func(u func(*Snapper)) State {
+		s.mtx.Lock()
+		defer s.mtx.Unlock()
+		if u != nil {
+			u(s)
+		}
+		return s.state
+	}
+
+	var st state = syncUp
+
+	for st != nil {
+		pre := u(nil)
+		st = st(s.args, u)
+		post := u(nil)
+		getLogger(ctx).
+			WithField("transition", fmt.Sprintf("%s=>%s", pre, post)).
+			Debug("state transition")
+
+	}
+
+}
+
+func onErr(err error, u updater) state {
+	return u(func(s *Snapper) {
+		s.err = err
+		s.state = Error
+	}).sf()
+}
+
+func syncUp(a args, u updater) state {
+	fss, err := listFSes(a.fsf)
+	if err != nil {
+		return onErr(err, u)
+	}
+	syncPoint, err := findSyncPoint(a.log, fss, a.prefix, a.interval)
+	if err != nil {
+		return onErr(err, u)
+	}
+	u(func(s *Snapper) {
+		s.sleepUntil = syncPoint
+	})
+	t := time.NewTimer(syncPoint.Sub(time.Now()))
+	defer t.Stop()
+	select {
+	case <-t.C:
+		return u(func(s *Snapper) {
+			s.state = Planning
+		}).sf()
+	case <-a.ctx.Done():
+		return onErr(a.ctx.Err(), u)
+	}
+}
+
+func plan(a args, u updater) state {
+	u(func(snapper *Snapper) {
+		snapper.lastInvocation = time.Now()
+	})
+	fss, err := listFSes(a.fsf)
+	if err != nil {
+		return onErr(err, u)
+	}
+
+	plan := make(map[*zfs.DatasetPath]*snapProgress, len(fss))
+	for _, fs := range fss {
+		plan[fs] = &snapProgress{state: SnapPending}
+	}
+	return u(func(s *Snapper) {
+		s.state = Snapshotting
+		s.plan = plan
+	}).sf()
+}
+
+func snapshot(a args, u updater) state {
+
+	var plan map[*zfs.DatasetPath]*snapProgress
+	u(func(snapper *Snapper) {
+		plan = snapper.plan
+	})
+
+	hadErr := false
+	// TODO channel programs -> allow a little jitter?
+	for fs, progress := range plan {
+		suffix := time.Now().In(time.UTC).Format("20060102_150405_000")
+		snapname := fmt.Sprintf("%s%s", a.prefix, suffix)
+
+		l := a.log.
+			WithField("fs", fs.ToString()).
+			WithField("snap", snapname)
+
+		u(func(snapper *Snapper) {
+			progress.name = snapname
+			progress.startAt = time.Now()
+			progress.state = SnapStarted
+		})
+
+		l.Debug("create snapshot")
+		err := zfs.ZFSSnapshot(fs, snapname, false)
+		if err != nil {
+			hadErr = true
+			l.WithError(err).Error("cannot create snapshot")
+		}
+		doneAt := time.Now()
+
+		u(func(snapper *Snapper) {
+			progress.doneAt = doneAt
+			progress.state = SnapDone
+			if err != nil {
+				progress.state = SnapError
+				progress.err = err
+			}
+		})
+	}
+
+	select {
+	case a.snapshotsTaken <- struct{}{}:
+	default:
+		a.log.Warn("callback channel is full, discarding snapshot update event")
+	}
+
+	return u(func(snapper *Snapper) {
+		if hadErr {
+			snapper.state = Error
+			snapper.err = errors.New("one or more snapshots could not be created")
+		} else {
+			snapper.state = Waiting
+		}
+	}).sf()
+}
+
+func wait(a args, u updater) state {
+	var sleepUntil time.Time
+	u(func(snapper *Snapper) {
+		lastTick := snapper.lastInvocation
+		snapper.sleepUntil = lastTick.Add(a.interval)
+		sleepUntil = snapper.sleepUntil
+	})
+
+	t := time.NewTimer(sleepUntil.Sub(time.Now()))
+	defer t.Stop()
+
+	select {
+	case <-t.C:
+		return u(func(snapper *Snapper) {
+			snapper.state = Planning
+		}).sf()
+	case <-a.ctx.Done():
+		return onErr(a.ctx.Err(), u)
+	}
+}
+
+func listFSes(mf *filters.DatasetMapFilter) (fss []*zfs.DatasetPath, err error) {
+	return zfs.ZFSListMapping(mf)
+}
+
+func findSyncPoint(log Logger, fss []*zfs.DatasetPath, prefix string, interval time.Duration) (syncPoint time.Time, err error) {
+	type snapTime struct {
+		ds   *zfs.DatasetPath
+		time time.Time
+	}
+
+	if len(fss) == 0 {
+		return time.Now(), nil
+	}
+
+	snaptimes := make([]snapTime, 0, len(fss))
+
+	now := time.Now()
+
+	log.Debug("examine filesystem state")
+	for _, d := range fss {
+
+		l := log.WithField("fs", d.ToString())
+
+		fsvs, err := zfs.ZFSListFilesystemVersions(d, filters.NewTypedPrefixFilter(prefix, zfs.Snapshot))
+		if err != nil {
+			l.WithError(err).Error("cannot list filesystem versions")
+			continue
+		}
+		if len(fsvs) <= 0 {
+			l.WithField("prefix", prefix).Debug("no filesystem versions with prefix")
+			continue
+		}
+
+		// Sort versions by creation
+		sort.SliceStable(fsvs, func(i, j int) bool {
+			return fsvs[i].CreateTXG < fsvs[j].CreateTXG
+		})
+
+		latest := fsvs[len(fsvs)-1]
+		l.WithField("creation", latest.Creation).
+			Debug("found latest snapshot")
+
+		since := now.Sub(latest.Creation)
+		if since < 0 {
+			l.WithField("snapshot", latest.Name).
+				WithField("creation", latest.Creation).
+				Error("snapshot is from the future")
+			continue
+		}
+		next := now
+		if since < interval {
+			next = latest.Creation.Add(interval)
+		}
+		snaptimes = append(snaptimes, snapTime{d, next})
+	}
+
+	if len(snaptimes) == 0 {
+		snaptimes = append(snaptimes, snapTime{nil, now})
+	}
+
+	sort.Slice(snaptimes, func(i, j int) bool {
+		return snaptimes[i].time.Before(snaptimes[j].time)
+	})
+
+	return snaptimes[0].time, nil
+
+}
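
The scheduling rule in findSyncPoint is worth restating: a filesystem whose newest prefixed snapshot is younger than the interval is next due at creation+interval; one that is overdue (or has no matching snapshots) is due immediately; the snapper then sleeps until the earliest due time across all filesystems. The following standalone sketch shows just that arithmetic; nextDue is a hypothetical helper, not part of the patch.

    package main

    import (
    	"fmt"
    	"time"
    )

    // nextDue mirrors findSyncPoint's per-filesystem rule: younger than
    // interval => due at creation+interval, otherwise due now. (The real
    // code additionally skips snapshots dated in the future; omitted here.)
    func nextDue(now, latestCreation time.Time, interval time.Duration) time.Time {
    	if since := now.Sub(latestCreation); since >= 0 && since < interval {
    		return latestCreation.Add(interval)
    	}
    	return now
    }

    func main() {
    	now := time.Now()
    	interval := 10 * time.Minute

    	young := nextDue(now, now.Add(-3*time.Minute), interval)    // due in ~7m
    	overdue := nextDue(now, now.Add(-15*time.Minute), interval) // due now

    	// The snapper wakes at the earliest due time, so the overdue
    	// filesystem pins the sync point to "now".
    	syncPoint := young
    	if overdue.Before(syncPoint) {
    		syncPoint = overdue
    	}
    	fmt.Println("sync point in:", syncPoint.Sub(now)) // ~0s
    }
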
diff --git a/daemon/snapper/snapstate_string.go b/daemon/snapper/snapstate_string.go
new file mode 100644
index 0000000..faa7347
--- /dev/null
+++ b/daemon/snapper/snapstate_string.go
@@ -0,0 +1,29 @@
+// Code generated by "stringer -type=SnapState"; DO NOT EDIT.
+
+package snapper
+
+import "strconv"
+
+const (
+	_SnapState_name_0 = "SnapPendingSnapStarted"
+	_SnapState_name_1 = "SnapDone"
+	_SnapState_name_2 = "SnapError"
+)
+
+var (
+	_SnapState_index_0 = [...]uint8{0, 11, 22}
+)
+
+func (i SnapState) String() string {
+	switch {
+	case 1 <= i && i <= 2:
+		i -= 1
+		return _SnapState_name_0[_SnapState_index_0[i]:_SnapState_index_0[i+1]]
+	case i == 4:
+		return _SnapState_name_1
+	case i == 8:
+		return _SnapState_name_2
+	default:
+		return "SnapState(" + strconv.FormatInt(int64(i), 10) + ")"
+	}
+}
diff --git a/daemon/snapper/state_string.go b/daemon/snapper/state_string.go
new file mode 100644
index 0000000..485e7d9
--- /dev/null
+++ b/daemon/snapper/state_string.go
@@ -0,0 +1,32 @@
+// Code generated by "stringer -type=State"; DO NOT EDIT.
+
+package snapper
+
+import "strconv"
+
+const (
+	_State_name_0 = "SyncUpPlanning"
+	_State_name_1 = "Snapshotting"
+	_State_name_2 = "Waiting"
+	_State_name_3 = "Error"
+)
+
+var (
+	_State_index_0 = [...]uint8{0, 6, 14}
+)
+
+func (i State) String() string {
+	switch {
+	case 1 <= i && i <= 2:
+		i -= 1
+		return _State_name_0[_State_index_0[i]:_State_index_0[i+1]]
+	case i == 4:
+		return _State_name_1
+	case i == 8:
+		return _State_name_2
+	case i == 16:
+		return _State_name_3
+	default:
+		return "State(" + strconv.FormatInt(int64(i), 10) + ")"
+	}
+}
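
Both generated String() methods special-case powers of two because the enums are declared as 1 << iota bit flags rather than consecutive integers; only the first two values (1 and 2) form a dense run that stringer can serve from its usual concatenated-name-plus-index-slice trick. A quick standalone check of the State values:

    package main

    import "fmt"

    type State uint

    const (
    	SyncUp State = 1 << iota // 1
    	Planning                 // 2
    	Snapshotting             // 4
    	Waiting                  // 8
    	Error                    // 16
    )

    func main() {
    	// Prints 1 2 4 8 16: the values the generated switches match on.
    	for _, s := range []State{SyncUp, Planning, Snapshotting, Waiting, Error} {
    		fmt.Printf("%d ", uint(s))
    	}
    	fmt.Println()
    }
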