mirror of
https://github.com/zrepl/zrepl.git
synced 2024-11-21 16:03:32 +01:00
snapper: fix delayed snapshots caused by system suspend/resume
See explainer comment in periodic.go for details. fixes https://github.com/zrepl/zrepl/issues/611
This commit is contained in:
parent
3ffb69bfb0
commit
6260b75031
@ -10,6 +10,7 @@ import (
|
||||
|
||||
"github.com/zrepl/zrepl/config"
|
||||
"github.com/zrepl/zrepl/daemon/hooks"
|
||||
"github.com/zrepl/zrepl/util/suspendresumesafetimer"
|
||||
"github.com/zrepl/zrepl/zfs"
|
||||
)
|
||||
|
||||
@ -43,67 +44,45 @@ type Cron struct {
|
||||
|
||||
func (s *Cron) Run(ctx context.Context, snapshotsTaken chan<- struct{}) {
|
||||
|
||||
t := time.NewTimer(0)
|
||||
defer func() {
|
||||
if !t.Stop() {
|
||||
select {
|
||||
case <-t.C:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}()
|
||||
for {
|
||||
now := time.Now()
|
||||
s.mtx.Lock()
|
||||
s.wakeupTime = s.config.Cron.Schedule.Next(now)
|
||||
s.mtx.Unlock()
|
||||
|
||||
// Re-arm the timer.
|
||||
// Need to Stop before Reset, see docs.
|
||||
if !t.Stop() {
|
||||
// Use non-blocking read from timer channel
|
||||
// because, except for the first loop iteration,
|
||||
// the channel is already drained
|
||||
select {
|
||||
case <-t.C:
|
||||
default:
|
||||
}
|
||||
}
|
||||
t.Reset(s.wakeupTime.Sub(now))
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
ctxDone := suspendresumesafetimer.SleepUntil(ctx, s.wakeupTime)
|
||||
if ctxDone != nil {
|
||||
return
|
||||
case <-t.C:
|
||||
getLogger(ctx).Debug("cron timer fired")
|
||||
s.mtx.Lock()
|
||||
if s.running {
|
||||
getLogger(ctx).Warn("snapshotting triggered according to cron rules but previous snapshotting is not done; not taking a snapshot this time")
|
||||
s.wakeupWhileRunningCount++
|
||||
s.mtx.Unlock()
|
||||
continue
|
||||
}
|
||||
s.lastError = nil
|
||||
s.lastPlan = nil
|
||||
s.wakeupWhileRunningCount = 0
|
||||
s.running = true
|
||||
s.mtx.Unlock()
|
||||
go func() {
|
||||
err := s.do(ctx)
|
||||
s.mtx.Lock()
|
||||
s.lastError = err
|
||||
s.running = false
|
||||
s.mtx.Unlock()
|
||||
|
||||
select {
|
||||
case snapshotsTaken <- struct{}{}:
|
||||
default:
|
||||
if snapshotsTaken != nil {
|
||||
getLogger(ctx).Warn("callback channel is full, discarding snapshot update event")
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
getLogger(ctx).Debug("cron timer fired")
|
||||
s.mtx.Lock()
|
||||
if s.running {
|
||||
getLogger(ctx).Warn("snapshotting triggered according to cron rules but previous snapshotting is not done; not taking a snapshot this time")
|
||||
s.wakeupWhileRunningCount++
|
||||
s.mtx.Unlock()
|
||||
continue
|
||||
}
|
||||
s.lastError = nil
|
||||
s.lastPlan = nil
|
||||
s.wakeupWhileRunningCount = 0
|
||||
s.running = true
|
||||
s.mtx.Unlock()
|
||||
go func() {
|
||||
err := s.do(ctx)
|
||||
s.mtx.Lock()
|
||||
s.lastError = err
|
||||
s.running = false
|
||||
s.mtx.Unlock()
|
||||
|
||||
select {
|
||||
case snapshotsTaken <- struct{}{}:
|
||||
default:
|
||||
if snapshotsTaken != nil {
|
||||
getLogger(ctx).Warn("callback channel is full, discarding snapshot update event")
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -15,6 +15,7 @@ import (
|
||||
"github.com/zrepl/zrepl/daemon/hooks"
|
||||
"github.com/zrepl/zrepl/daemon/logging"
|
||||
"github.com/zrepl/zrepl/util/envconst"
|
||||
"github.com/zrepl/zrepl/util/suspendresumesafetimer"
|
||||
"github.com/zrepl/zrepl/zfs"
|
||||
)
|
||||
|
||||
@ -172,16 +173,13 @@ func periodicStateSyncUp(a periodicArgs, u updater) state {
|
||||
u(func(s *Periodic) {
|
||||
s.sleepUntil = syncPoint
|
||||
})
|
||||
t := time.NewTimer(time.Until(syncPoint))
|
||||
defer t.Stop()
|
||||
select {
|
||||
case <-t.C:
|
||||
return u(func(s *Periodic) {
|
||||
s.state = Planning
|
||||
}).sf()
|
||||
case <-a.ctx.Done():
|
||||
ctxDone := suspendresumesafetimer.SleepUntil(a.ctx, syncPoint)
|
||||
if ctxDone != nil {
|
||||
return onMainCtxDone(a.ctx, u)
|
||||
}
|
||||
return u(func(s *Periodic) {
|
||||
s.state = Planning
|
||||
}).sf()
|
||||
}
|
||||
|
||||
func periodicStatePlan(a periodicArgs, u updater) state {
|
||||
@ -242,17 +240,13 @@ func periodicStateWait(a periodicArgs, u updater) state {
|
||||
logFunc("enter wait-state after error")
|
||||
})
|
||||
|
||||
t := time.NewTimer(time.Until(sleepUntil))
|
||||
defer t.Stop()
|
||||
|
||||
select {
|
||||
case <-t.C:
|
||||
return u(func(snapper *Periodic) {
|
||||
snapper.state = Planning
|
||||
}).sf()
|
||||
case <-a.ctx.Done():
|
||||
ctxDone := suspendresumesafetimer.SleepUntil(a.ctx, sleepUntil)
|
||||
if ctxDone != nil {
|
||||
return onMainCtxDone(a.ctx, u)
|
||||
}
|
||||
return u(func(snapper *Periodic) {
|
||||
snapper.state = Planning
|
||||
}).sf()
|
||||
}
|
||||
|
||||
func listFSes(ctx context.Context, mf zfs.DatasetFilter) (fss []*zfs.DatasetPath, err error) {
|
||||
|
96
util/suspendresumesafetimer/suspendresumesafetimer.go
Normal file
96
util/suspendresumesafetimer/suspendresumesafetimer.go
Normal file
@ -0,0 +1,96 @@
|
||||
package suspendresumesafetimer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/zrepl/zrepl/util/envconst"
|
||||
)
|
||||
|
||||
// The returned error is guaranteed to be the ctx.Err()
|
||||
func SleepUntil(ctx context.Context, sleepUntil time.Time) error {
|
||||
|
||||
// We use .Round(0) to strip the monotonic clock reading from the time.Time
|
||||
// returned by time.Now(). That will make the before/after check in the ticker
|
||||
// for-loop compare wall-clock times instead of monotonic time.
|
||||
// Comparing wall clock time is necessary because monotonic time does not progress
|
||||
// while the system is suspended.
|
||||
//
|
||||
// Background
|
||||
//
|
||||
// A time.Time carries a wallclock timestamp and optionally a monotonic clock timestamp.
|
||||
// time.Now() returns a time.Time that carries both.
|
||||
// time.Time.Add() applies the same delta to both timestamps in the time.Time.
|
||||
// x.Sub(y) will return the *monotonic* delta if both x and y carry a monotonic timestamp.
|
||||
// time.Until(x) == x.Sub(now) where `now` will have a monotonic timestamp.
|
||||
// So, time.Until(x) with an `x` that has monotonic timestamp will return monotonic delta.
|
||||
//
|
||||
// Why Do We Care?
|
||||
//
|
||||
// On systems that suspend/resume, wall clock time progresses during suspend but
|
||||
// monotonic time does not.
|
||||
//
|
||||
// So, suppose the following sequence of events:
|
||||
// x <== time.Now()
|
||||
// System suspends for 1 hour
|
||||
// delta <== time.Now().Sub(x)
|
||||
// `delta` will be near 0 because time.Until() subtracts the monotonic
|
||||
// timestamps, and monotonic time didn't progress during suspend.
|
||||
//
|
||||
// Now strip the timestamp using .Round(0)
|
||||
// x <== time.Now().Round(0)
|
||||
// System suspends for 1 hour
|
||||
// delta <== time.Now().Sub(x)
|
||||
// `delta` will be 1 hour because time.Sub() subtracted wallclock timestamps
|
||||
// because x didn't have a monotonic timestamp because we stripped it using .Round(0).
|
||||
//
|
||||
//
|
||||
sleepUntil = sleepUntil.Round(0)
|
||||
|
||||
// Set up a timer so that, if the system doesn't suspend/resume,
|
||||
// we get a precise wake-up time from the native Go timer.
|
||||
monotonicClockTimer := time.NewTimer(time.Until(sleepUntil))
|
||||
defer func() {
|
||||
if !monotonicClockTimer.Stop() {
|
||||
// non-blocking read since we can come here when
|
||||
// we've already drained the channel through
|
||||
// case <-monotonicClockTimer.C
|
||||
// in the `for` loop below.
|
||||
select {
|
||||
case <-monotonicClockTimer.C:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Set up a ticker so that we're guaranteed to wake up periodically.
|
||||
// We'll then get the current wall-clock time and check ourselves
|
||||
// whether we're past the requested expiration time.
|
||||
// Pick a 10 second check interval by default since it's rare that
|
||||
// suspend/resume is done more frequently.
|
||||
ticker := time.NewTicker(envconst.Duration("ZREPL_WALLCLOCKTIMER_MAX_DELAY", 10*time.Second))
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-monotonicClockTimer.C:
|
||||
return nil
|
||||
case <-ticker.C:
|
||||
now := time.Now()
|
||||
if now.Before(sleepUntil) {
|
||||
// Continue waiting.
|
||||
|
||||
// Reset the monotonic timer to reset drift.
|
||||
if !monotonicClockTimer.Stop() {
|
||||
<-monotonicClockTimer.C
|
||||
}
|
||||
monotonicClockTimer.Reset(time.Until(sleepUntil))
|
||||
|
||||
continue
|
||||
}
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user