daemon/snapper: refactor sync-up algorithm + warn about FSes awaiting first sync point

refs https://github.com/zrepl/zrepl/issues/256
This commit is contained in:
Christian Schwarz 2020-01-15 19:12:31 +01:00
parent dd508280f0
commit 5b50a66c6c
3 changed files with 88 additions and 34 deletions

View File

@ -13,6 +13,7 @@ import (
"github.com/zrepl/zrepl/daemon/filters"
"github.com/zrepl/zrepl/daemon/hooks"
"github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/util/envconst"
"github.com/zrepl/zrepl/zfs"
)
@ -400,9 +401,19 @@ func listFSes(ctx context.Context, mf *filters.DatasetMapFilter) (fss []*zfs.Dat
return zfs.ZFSListMapping(ctx, mf)
}
var syncUpWarnNoSnapshotUntilSyncupMinDuration = envconst.Duration("ZREPL_SNAPPER_SYNCUP_WARN_MIN_DURATION", 1*time.Second)
// see docs/snapshotting.rst
func findSyncPoint(log Logger, fss []*zfs.DatasetPath, prefix string, interval time.Duration) (syncPoint time.Time, err error) {
const (
prioHasVersions int = iota
prioNoVersions
)
type snapTime struct {
ds *zfs.DatasetPath
prio int // lower is higher
time time.Time
}
@ -411,55 +422,89 @@ func findSyncPoint(log Logger, fss []*zfs.DatasetPath, prefix string, interval t
}
snaptimes := make([]snapTime, 0, len(fss))
hardErrs := 0
now := time.Now()
log.Debug("examine filesystem state")
log.Debug("examine filesystem state to find sync point")
for _, d := range fss {
l := log.WithField("fs", d.ToString())
fsvs, err := zfs.ZFSListFilesystemVersions(d, filters.NewTypedPrefixFilter(prefix, zfs.Snapshot))
if err != nil {
l.WithError(err).Error("cannot list filesystem versions")
continue
}
if len(fsvs) <= 0 {
l.WithField("prefix", prefix).Debug("no filesystem versions with prefix")
continue
syncPoint, err := findSyncPointFSNextOptimalSnapshotTime(l, now, interval, prefix, d)
if err == findSyncPointFSNoFilesystemVersionsErr {
snaptimes = append(snaptimes, snapTime{
ds: d,
prio: prioNoVersions,
time: now,
})
} else if err != nil {
hardErrs++
l.WithError(err).Error("cannot determine optimal sync point for this filesystem")
} else {
l.WithField("syncPoint", syncPoint).Debug("found optimal sync point for this filesystem")
snaptimes = append(snaptimes, snapTime{
ds: d,
prio: prioHasVersions,
time: syncPoint,
})
}
}
// Sort versions by creation
sort.SliceStable(fsvs, func(i, j int) bool {
return fsvs[i].CreateTXG < fsvs[j].CreateTXG
})
latest := fsvs[len(fsvs)-1]
l.WithField("creation", latest.Creation).
Debug("found latest snapshot")
since := now.Sub(latest.Creation)
if since < 0 {
l.WithField("snapshot", latest.Name).
WithField("creation", latest.Creation).
Error("snapshot is from the future")
continue
}
next := now
if since < interval {
next = latest.Creation.Add(interval)
}
snaptimes = append(snaptimes, snapTime{d, next})
if hardErrs == len(fss) {
return time.Time{}, fmt.Errorf("hard errors in determining sync point for every matching filesystem")
}
if len(snaptimes) == 0 {
snaptimes = append(snaptimes, snapTime{nil, now})
panic("implementation error: loop must either inc hardErrs or add result to snaptimes")
}
// sort ascending by (prio,time)
// => those filesystems with versions win over those without any
sort.Slice(snaptimes, func(i, j int) bool {
return snaptimes[i].time.Before(snaptimes[j].time)
if snaptimes[i].prio == snaptimes[j].prio {
return snaptimes[i].time.Before(snaptimes[j].time)
}
return snaptimes[i].prio < snaptimes[j].prio
})
winnerSyncPoint := snaptimes[0].time
l := log.WithField("syncPoint", winnerSyncPoint.String())
l.Info("determined sync point")
if winnerSyncPoint.Sub(now) > syncUpWarnNoSnapshotUntilSyncupMinDuration {
for _, st := range snaptimes {
if st.prio == prioNoVersions {
l.WithField("fs", st.ds.ToString()).Warn("filesystem will not be snapshotted until sync point")
}
}
}
return snaptimes[0].time, nil
}
var findSyncPointFSNoFilesystemVersionsErr = fmt.Errorf("no filesystem versions")
func findSyncPointFSNextOptimalSnapshotTime(l Logger, now time.Time, interval time.Duration, prefix string, d *zfs.DatasetPath) (time.Time, error) {
fsvs, err := zfs.ZFSListFilesystemVersions(d, filters.NewTypedPrefixFilter(prefix, zfs.Snapshot))
if err != nil {
return time.Time{}, errors.Wrap(err, "list filesystem versions")
}
if len(fsvs) <= 0 {
return time.Time{}, findSyncPointFSNoFilesystemVersionsErr
}
// Sort versions by creation
sort.SliceStable(fsvs, func(i, j int) bool {
return fsvs[i].CreateTXG < fsvs[j].CreateTXG
})
latest := fsvs[len(fsvs)-1]
l.WithField("creation", latest.Creation).Debug("found latest snapshot")
since := now.Sub(latest.Creation)
if since < 0 {
return time.Time{}, fmt.Errorf("snapshot %q is from the future: creation=%q now=%q", latest.ToAbsPath(d), latest.Creation, now)
}
return latest.Creation.Add(interval), nil
}

View File

@ -31,6 +31,8 @@ We use the following annotations for classifying changes:
------------------
* |feature| New option ``listen_freebind`` (tcp, tls, prometheus listener)
* |bugfix| |docs| snapshotting: clarify sync-up behavior and warn about filesystems
that will not be snapshotted until the sync-up phase is over
0.2.1
-----

View File

@ -9,8 +9,15 @@ The ``push``, ``source`` and ``snap`` jobs can automatically take periodic snaps
The snapshot names are composed of a user-defined prefix followed by a UTC date formatted like ``20060102_150405_000``.
We use UTC because it will avoid name conflicts when switching time zones or between summer and winter time.
When a job is started, the snapshotter attempts to get the snapshotting rhythms of the matched ``filesystems`` in sync because snapshotting all filesystems at the same time results in a more consistent backup.
To find that sync point, the most recent snapshot, made by the snapshotter, in any of the matched ``filesystems`` is used.
A filesystem that does not have snapshots by the snapshotter has lower priority than filesystem that do, and thus might not be snapshotted (and replicated) until it is snapshotted at the next sync point.
For ``push`` jobs, replication is automatically triggered after all filesystems have been snapshotted.
Note that the ``zrepl signal wakeup JOB`` subcommand does not trigger snapshotting.
::
jobs: