mirror of
https://github.com/zrepl/zrepl.git
synced 2025-02-16 18:30:54 +01:00
parent
58ee796394
commit
13562b48ed
128
cmd/autosnap.go
128
cmd/autosnap.go
@ -13,40 +13,38 @@ type IntervalAutosnap struct {
|
||||
DatasetFilter zfs.DatasetFilter
|
||||
Prefix string
|
||||
SnapshotInterval time.Duration
|
||||
|
||||
log Logger
|
||||
snaptimes []snapTime
|
||||
}
|
||||
|
||||
type snapTime struct {
|
||||
ds *zfs.DatasetPath
|
||||
time time.Time
|
||||
}
|
||||
|
||||
func (a *IntervalAutosnap) Run(ctx context.Context, didSnaps chan struct{}) {
|
||||
|
||||
a.log = ctx.Value(contextKeyLog).(Logger)
|
||||
|
||||
const LOG_TIME_FMT string = time.ANSIC
|
||||
|
||||
ds, err := zfs.ZFSListMapping(a.DatasetFilter)
|
||||
func (a *IntervalAutosnap) filterFilesystems() (fss []*zfs.DatasetPath, stop bool) {
|
||||
a.task.Enter("filter_filesystems")
|
||||
defer a.task.Finish()
|
||||
fss, err := zfs.ZFSListMapping(a.DatasetFilter)
|
||||
stop = err != nil || len(fss) == 0
|
||||
if err != nil {
|
||||
a.log.WithError(err).Error("cannot list datasets")
|
||||
return
|
||||
a.task.Log().WithError(err).Error("cannot list datasets")
|
||||
}
|
||||
if len(ds) == 0 {
|
||||
a.log.Warn("no filesystem matching filesystem filter")
|
||||
return
|
||||
if len(fss) == 0 {
|
||||
a.task.Log().Warn("no filesystem matching filesystem filter")
|
||||
}
|
||||
return fss, stop
|
||||
}
|
||||
|
||||
func (a *IntervalAutosnap) findSyncPoint(fss []*zfs.DatasetPath) (syncPoint time.Time, err error) {
|
||||
a.task.Enter("find_sync_point")
|
||||
defer a.task.Finish()
|
||||
type snapTime struct {
|
||||
ds *zfs.DatasetPath
|
||||
time time.Time
|
||||
}
|
||||
|
||||
a.snaptimes = make([]snapTime, len(ds))
|
||||
snaptimes := make([]snapTime, 0, len(fss))
|
||||
|
||||
now := time.Now()
|
||||
|
||||
a.log.Debug("examine filesystem state")
|
||||
for i, d := range ds {
|
||||
a.task.Log().Debug("examine filesystem state")
|
||||
for _, d := range fss {
|
||||
|
||||
l := a.log.WithField(logFSField, d.ToString())
|
||||
l := a.task.Log().WithField(logFSField, d.ToString())
|
||||
|
||||
fsvs, err := zfs.ZFSListFilesystemVersions(d, NewTypedPrefixFilter(a.Prefix, zfs.Snapshot))
|
||||
if err != nil {
|
||||
@ -55,7 +53,6 @@ func (a *IntervalAutosnap) Run(ctx context.Context, didSnaps chan struct{}) {
|
||||
}
|
||||
if len(fsvs) <= 0 {
|
||||
l.WithField("prefix", a.Prefix).Info("no filesystem versions with prefix")
|
||||
a.snaptimes[i] = snapTime{d, now}
|
||||
continue
|
||||
}
|
||||
|
||||
@ -79,34 +76,74 @@ func (a *IntervalAutosnap) Run(ctx context.Context, didSnaps chan struct{}) {
|
||||
if since < a.SnapshotInterval {
|
||||
next = latest.Creation.Add(a.SnapshotInterval)
|
||||
}
|
||||
a.snaptimes[i] = snapTime{d, next}
|
||||
snaptimes = append(snaptimes, snapTime{d, next})
|
||||
}
|
||||
|
||||
sort.Slice(a.snaptimes, func(i, j int) bool {
|
||||
return a.snaptimes[i].time.Before(a.snaptimes[j].time)
|
||||
if len(snaptimes) == 0 {
|
||||
snaptimes = append(snaptimes, snapTime{nil, now})
|
||||
}
|
||||
|
||||
sort.Slice(snaptimes, func(i, j int) bool {
|
||||
return snaptimes[i].time.Before(snaptimes[j].time)
|
||||
})
|
||||
|
||||
syncPoint := a.snaptimes[0]
|
||||
a.log.WithField("sync_point", syncPoint.time.Format(LOG_TIME_FMT)).
|
||||
return snaptimes[0].time, nil
|
||||
|
||||
}
|
||||
|
||||
func (a *IntervalAutosnap) waitForSyncPoint(ctx context.Context, syncPoint time.Time) {
|
||||
a.task.Enter("wait_sync_point")
|
||||
defer a.task.Finish()
|
||||
|
||||
const LOG_TIME_FMT string = time.ANSIC
|
||||
|
||||
a.task.Log().WithField("sync_point", syncPoint.Format(LOG_TIME_FMT)).
|
||||
Info("wait for sync point")
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
a.log.WithError(ctx.Err()).Info("context done")
|
||||
a.task.Log().WithError(ctx.Err()).Info("context done")
|
||||
return
|
||||
case <-time.After(syncPoint.Sub(time.Now())):
|
||||
}
|
||||
}
|
||||
|
||||
case <-time.After(syncPoint.time.Sub(now)):
|
||||
a.log.Debug("snapshot all filesystems to enable further snaps in lockstep")
|
||||
a.doSnapshots(didSnaps)
|
||||
func (a *IntervalAutosnap) syncUpRun(ctx context.Context, didSnaps chan struct{}) (stop bool) {
|
||||
a.task.Enter("sync_up")
|
||||
defer a.task.Finish()
|
||||
|
||||
fss, stop := a.filterFilesystems()
|
||||
if stop {
|
||||
return true
|
||||
}
|
||||
|
||||
ticker := time.NewTicker(a.SnapshotInterval)
|
||||
syncPoint, err := a.findSyncPoint(fss)
|
||||
if err != nil {
|
||||
return true
|
||||
}
|
||||
|
||||
a.waitForSyncPoint(ctx, syncPoint)
|
||||
|
||||
a.task.Log().Debug("snapshot all filesystems to enable further snaps in lockstep")
|
||||
a.doSnapshots(didSnaps)
|
||||
return false
|
||||
}
|
||||
|
||||
func (a *IntervalAutosnap) Run(ctx context.Context, didSnaps chan struct{}) {
|
||||
|
||||
if a.syncUpRun(ctx, didSnaps) {
|
||||
return
|
||||
}
|
||||
|
||||
// task drops back to idle here
|
||||
|
||||
a.task.Log().Debug("setting up ticker in SnapshotInterval")
|
||||
ticker := time.NewTicker(a.SnapshotInterval)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
ticker.Stop()
|
||||
a.log.WithError(ctx.Err()).Info("context done")
|
||||
a.task.Log().WithError(ctx.Err()).Info("context done")
|
||||
return
|
||||
|
||||
case <-ticker.C:
|
||||
@ -118,10 +155,13 @@ func (a *IntervalAutosnap) Run(ctx context.Context, didSnaps chan struct{}) {
|
||||
|
||||
func (a *IntervalAutosnap) doSnapshots(didSnaps chan struct{}) {
|
||||
|
||||
// fetch new dataset list in case user added new dataset
|
||||
ds, err := zfs.ZFSListMapping(a.DatasetFilter)
|
||||
if err != nil {
|
||||
a.log.WithError(err).Error("cannot list datasets")
|
||||
a.task.Enter("do_snapshots")
|
||||
defer a.task.Finish()
|
||||
|
||||
// don't cache the result from previous run in case the user added
|
||||
// a new dataset in the meantime
|
||||
ds, stop := a.filterFilesystems()
|
||||
if stop {
|
||||
return
|
||||
}
|
||||
|
||||
@ -130,19 +170,19 @@ func (a *IntervalAutosnap) doSnapshots(didSnaps chan struct{}) {
|
||||
suffix := time.Now().In(time.UTC).Format("20060102_150405_000")
|
||||
snapname := fmt.Sprintf("%s%s", a.Prefix, suffix)
|
||||
|
||||
l := a.log.WithField(logFSField, d.ToString()).
|
||||
l := a.task.Log().WithField(logFSField, d.ToString()).
|
||||
WithField("snapname", snapname)
|
||||
|
||||
l.Info("create snapshot")
|
||||
err := zfs.ZFSSnapshot(d, snapname, false)
|
||||
if err != nil {
|
||||
a.log.WithError(err).Error("cannot create snapshot")
|
||||
a.task.Log().WithError(err).Error("cannot create snapshot")
|
||||
}
|
||||
|
||||
l.Info("create corresponding bookmark")
|
||||
err = zfs.ZFSBookmark(d, snapname, snapname)
|
||||
if err != nil {
|
||||
a.log.WithError(err).Error("cannot create bookmark")
|
||||
a.task.Log().WithError(err).Error("cannot create bookmark")
|
||||
}
|
||||
|
||||
}
|
||||
@ -150,7 +190,7 @@ func (a *IntervalAutosnap) doSnapshots(didSnaps chan struct{}) {
|
||||
select {
|
||||
case didSnaps <- struct{}{}:
|
||||
default:
|
||||
a.log.Error("warning: callback channel is full, discarding")
|
||||
a.task.Log().Error("warning: callback channel is full, discarding")
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user