diff --git a/daemon/snapper/cron.go b/daemon/snapper/cron.go index 8d50e41..0797988 100644 --- a/daemon/snapper/cron.go +++ b/daemon/snapper/cron.go @@ -10,6 +10,7 @@ import ( "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/daemon/hooks" + "github.com/zrepl/zrepl/util/suspendresumesafetimer" "github.com/zrepl/zrepl/zfs" ) @@ -43,67 +44,45 @@ type Cron struct { func (s *Cron) Run(ctx context.Context, snapshotsTaken chan<- struct{}) { - t := time.NewTimer(0) - defer func() { - if !t.Stop() { - select { - case <-t.C: - default: - } - } - }() for { now := time.Now() s.mtx.Lock() s.wakeupTime = s.config.Cron.Schedule.Next(now) s.mtx.Unlock() - // Re-arm the timer. - // Need to Stop before Reset, see docs. - if !t.Stop() { - // Use non-blocking read from timer channel - // because, except for the first loop iteration, - // the channel is already drained - select { - case <-t.C: - default: - } - } - t.Reset(s.wakeupTime.Sub(now)) - - select { - case <-ctx.Done(): + ctxDone := suspendresumesafetimer.SleepUntil(ctx, s.wakeupTime) + if ctxDone != nil { return - case <-t.C: - getLogger(ctx).Debug("cron timer fired") - s.mtx.Lock() - if s.running { - getLogger(ctx).Warn("snapshotting triggered according to cron rules but previous snapshotting is not done; not taking a snapshot this time") - s.wakeupWhileRunningCount++ - s.mtx.Unlock() - continue - } - s.lastError = nil - s.lastPlan = nil - s.wakeupWhileRunningCount = 0 - s.running = true - s.mtx.Unlock() - go func() { - err := s.do(ctx) - s.mtx.Lock() - s.lastError = err - s.running = false - s.mtx.Unlock() - - select { - case snapshotsTaken <- struct{}{}: - default: - if snapshotsTaken != nil { - getLogger(ctx).Warn("callback channel is full, discarding snapshot update event") - } - } - }() } + + getLogger(ctx).Debug("cron timer fired") + s.mtx.Lock() + if s.running { + getLogger(ctx).Warn("snapshotting triggered according to cron rules but previous snapshotting is not done; not taking a snapshot this time") + s.wakeupWhileRunningCount++ + s.mtx.Unlock() + continue + } + s.lastError = nil + s.lastPlan = nil + s.wakeupWhileRunningCount = 0 + s.running = true + s.mtx.Unlock() + go func() { + err := s.do(ctx) + s.mtx.Lock() + s.lastError = err + s.running = false + s.mtx.Unlock() + + select { + case snapshotsTaken <- struct{}{}: + default: + if snapshotsTaken != nil { + getLogger(ctx).Warn("callback channel is full, discarding snapshot update event") + } + } + }() } } diff --git a/daemon/snapper/periodic.go b/daemon/snapper/periodic.go index 46b6127..d6068f7 100644 --- a/daemon/snapper/periodic.go +++ b/daemon/snapper/periodic.go @@ -15,6 +15,7 @@ import ( "github.com/zrepl/zrepl/daemon/hooks" "github.com/zrepl/zrepl/daemon/logging" "github.com/zrepl/zrepl/util/envconst" + "github.com/zrepl/zrepl/util/suspendresumesafetimer" "github.com/zrepl/zrepl/zfs" ) @@ -172,16 +173,13 @@ func periodicStateSyncUp(a periodicArgs, u updater) state { u(func(s *Periodic) { s.sleepUntil = syncPoint }) - t := time.NewTimer(time.Until(syncPoint)) - defer t.Stop() - select { - case <-t.C: - return u(func(s *Periodic) { - s.state = Planning - }).sf() - case <-a.ctx.Done(): + ctxDone := suspendresumesafetimer.SleepUntil(a.ctx, syncPoint) + if ctxDone != nil { return onMainCtxDone(a.ctx, u) } + return u(func(s *Periodic) { + s.state = Planning + }).sf() } func periodicStatePlan(a periodicArgs, u updater) state { @@ -242,17 +240,13 @@ func periodicStateWait(a periodicArgs, u updater) state { logFunc("enter wait-state after error") }) - t := time.NewTimer(time.Until(sleepUntil)) - defer t.Stop() - - select { - case <-t.C: - return u(func(snapper *Periodic) { - snapper.state = Planning - }).sf() - case <-a.ctx.Done(): + ctxDone := suspendresumesafetimer.SleepUntil(a.ctx, sleepUntil) + if ctxDone != nil { return onMainCtxDone(a.ctx, u) } + return u(func(snapper *Periodic) { + snapper.state = Planning + }).sf() } func listFSes(ctx context.Context, mf zfs.DatasetFilter) (fss []*zfs.DatasetPath, err error) { diff --git a/util/suspendresumesafetimer/suspendresumesafetimer.go b/util/suspendresumesafetimer/suspendresumesafetimer.go new file mode 100644 index 0000000..ad26678 --- /dev/null +++ b/util/suspendresumesafetimer/suspendresumesafetimer.go @@ -0,0 +1,96 @@ +package suspendresumesafetimer + +import ( + "context" + "time" + + "github.com/zrepl/zrepl/util/envconst" +) + +// The returned error is guaranteed to be the ctx.Err() +func SleepUntil(ctx context.Context, sleepUntil time.Time) error { + + // We use .Round(0) to strip the monotonic clock reading from the time.Time + // returned by time.Now(). That will make the before/after check in the ticker + // for-loop compare wall-clock times instead of monotonic time. + // Comparing wall clock time is necessary because monotonic time does not progress + // while the system is suspended. + // + // Background + // + // A time.Time carries a wallclock timestamp and optionally a monotonic clock timestamp. + // time.Now() returns a time.Time that carries both. + // time.Time.Add() applies the same delta to both timestamps in the time.Time. + // x.Sub(y) will return the *monotonic* delta if both x and y carry a monotonic timestamp. + // time.Until(x) == x.Sub(now) where `now` will have a monotonic timestamp. + // So, time.Until(x) with an `x` that has monotonic timestamp will return monotonic delta. + // + // Why Do We Care? + // + // On systems that suspend/resume, wall clock time progresses during suspend but + // monotonic time does not. + // + // So, suppose the following sequence of events: + // x <== time.Now() + // System suspends for 1 hour + // delta <== time.Now().Sub(x) + // `delta` will be near 0 because time.Until() subtracts the monotonic + // timestamps, and monotonic time didn't progress during suspend. + // + // Now strip the timestamp using .Round(0) + // x <== time.Now().Round(0) + // System suspends for 1 hour + // delta <== time.Now().Sub(x) + // `delta` will be 1 hour because time.Sub() subtracted wallclock timestamps + // because x didn't have a monotonic timestamp because we stripped it using .Round(0). + // + // + sleepUntil = sleepUntil.Round(0) + + // Set up a timer so that, if the system doesn't suspend/resume, + // we get a precise wake-up time from the native Go timer. + monotonicClockTimer := time.NewTimer(time.Until(sleepUntil)) + defer func() { + if !monotonicClockTimer.Stop() { + // non-blocking read since we can come here when + // we've already drained the channel through + // case <-monotonicClockTimer.C + // in the `for` loop below. + select { + case <-monotonicClockTimer.C: + default: + } + } + }() + + // Set up a ticker so that we're guaranteed to wake up periodically. + // We'll then get the current wall-clock time and check ourselves + // whether we're past the requested expiration time. + // Pick a 10 second check interval by default since it's rare that + // suspend/resume is done more frequently. + ticker := time.NewTicker(envconst.Duration("ZREPL_WALLCLOCKTIMER_MAX_DELAY", 10*time.Second)) + defer ticker.Stop() + + for { + select { + case <-monotonicClockTimer.C: + return nil + case <-ticker.C: + now := time.Now() + if now.Before(sleepUntil) { + // Continue waiting. + + // Reset the monotonic timer to reset drift. + if !monotonicClockTimer.Stop() { + <-monotonicClockTimer.C + } + monotonicClockTimer.Reset(time.Until(sleepUntil)) + + continue + } + return nil + case <-ctx.Done(): + return ctx.Err() + } + } +}