From 69bfcb7bed71c2a26ac342bef5a94b746f61ac3f Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Fri, 19 Oct 2018 16:27:05 +0200
Subject: [PATCH] daemon/active: implement watchdog to handle stuck replication / pruners

ActiveSide.do() can only run its steps sequentially, i.e. we cannot run
replication and pruning in parallel. Why?

* go-streamrpc only allows one active request at a time
  (this is bad design and should be fixed at some point)
* replication and pruning are implemented independently, but work on the
  same resources (snapshots)

  A: pruning might destroy a snapshot that is planned to be replicated
  B: replication might replicate snapshots that should be pruned

We do not have any resource management / locking for A and B, but we have
a use case where users don't want their machine to fill up with snapshots
if replication does not work. That means we _have_ to run the pruners.

A further complication is that we cannot just cancel the replication
context after a timeout and move on to the pruner: it could be an initial
replication, and we don't know how long it will take. (And we don't have
resumable send & recv yet.)

With the previous commits, we can implement the watchdog using context
cancellation.

Note that the 'MadeProgress()' calls can only be placed right before a
non-error state transition. Otherwise, we could end up in a live-lock.
---
 client/status.go           |   3 +-
 daemon/job/active.go       | 118 ++++++++++++++++++++++++++++++++-----
 daemon/pruner/pruner.go    |  13 ++++
 replication/fsrep/fsfsm.go |  24 +++++---
 replication/mainfsm.go     |  30 +++++++---
 util/envconst/envconst.go  |  25 ++++++++
 util/io.go                 |  21 ++++++-
 util/watchdog/watchdog.go  |  41 +++++++++++++
 8 files changed, 240 insertions(+), 35 deletions(-)
 create mode 100644 util/envconst/envconst.go
 create mode 100644 util/watchdog/watchdog.go

diff --git a/client/status.go b/client/status.go
index 75ced10..ab5b800 100644
--- a/client/status.go
+++ b/client/status.go
@@ -274,8 +274,7 @@ func (t *tui) renderReplicationReport(rep *replication.Report) {
 		t.printf("Problem: %s", rep.Problem)
 		t.newline()
 	}
-	if rep.SleepUntil.After(time.Now()) &&
-		state & ^(replication.PermanentError|replication.Completed) != 0 {
+	if rep.SleepUntil.After(time.Now()) && !state.IsTerminal() {
 		t.printf("Sleeping until %s (%s left)\n", rep.SleepUntil, rep.SleepUntil.Sub(time.Now()))
 	}

diff --git a/daemon/job/active.go b/daemon/job/active.go
index 482b368..f6e2ecf 100644
--- a/daemon/job/active.go
+++ b/daemon/job/active.go
@@ -13,6 +13,8 @@ import (
 	"github.com/zrepl/zrepl/daemon/pruner"
 	"github.com/zrepl/zrepl/endpoint"
 	"github.com/zrepl/zrepl/replication"
+	"github.com/zrepl/zrepl/util/envconst"
+	"github.com/zrepl/zrepl/util/watchdog"
 	"github.com/zrepl/zrepl/zfs"
 	"sync"
 	"github.com/zrepl/zrepl/daemon/logging"
@@ -38,7 +40,9 @@ type ActiveSide struct {

 type activeSideTasks struct {
 	replication *replication.Replication
+	replicationCancel context.CancelFunc
 	prunerSender, prunerReceiver *pruner.Pruner
+	prunerSenderCancel, prunerReceiverCancel context.CancelFunc
 }

 func (a *ActiveSide) updateTasks(u func(*activeSideTasks)) activeSideTasks {
@@ -262,6 +266,57 @@ func (j *ActiveSide) do(ctx context.Context) {
 		}
 	}()

+	// watchdog
+	go func() {
+		// if no progress after 1 minute, kill the task
+		wdto := envconst.Duration("ZREPL_JOB_WATCHDOG_TIMEOUT", 1*time.Minute)
+		log.WithField("watchdog_timeout", wdto.String()).Debug("starting watchdog")
+
+		t := time.NewTicker(wdto)
+		defer t.Stop()
+
+		var (
+			rep, prunerSender, prunerReceiver watchdog.Progress
+		)
+		for {
+			select {
+			case
<-runDone: + return + case <-ctx.Done(): + return + case <-t.C: // fall + } + + log := log.WithField("watchdog_timeout", wdto.String()) // shadowing! + + j.updateTasks(func(tasks *activeSideTasks) { + if tasks.replication != nil && + !tasks.replication.Progress.ExpectProgress(&rep) && + !tasks.replication.State().IsTerminal() { + log.Error("replication did not make progress, cancelling") + tasks.replicationCancel() + } + if tasks.prunerSender != nil && + !tasks.prunerSender.Progress.ExpectProgress(&prunerSender) && + !tasks.prunerSender.State().IsTerminal() { + log.Error("pruner:sender did not make progress, cancelling") + tasks.prunerSenderCancel() + } + if tasks.prunerReceiver != nil && + !tasks.prunerReceiver.Progress.ExpectProgress(&prunerReceiver) && + !tasks.prunerReceiver.State().IsTerminal() { + log.Error("pruner:receiver did not make progress, cancelling") + tasks.prunerReceiverCancel() + } + }) + log.WithField("replication_progress", rep.String()). + WithField("pruner_sender_progress", prunerSender.String()). + WithField("pruner_receiver_progress", prunerReceiver.String()). + Debug("watchdog did run") + + } + }() + client, err := j.clientFactory.NewClient() if err != nil { log.WithError(err).Error("factory cannot instantiate streamrpc client") @@ -270,21 +325,52 @@ func (j *ActiveSide) do(ctx context.Context) { sender, receiver, err := j.mode.SenderReceiver(client) - tasks := j.updateTasks(func(tasks *activeSideTasks) { - // reset it - *tasks = activeSideTasks{} - tasks.replication = replication.NewReplication(j.promRepStateSecs, j.promBytesReplicated) - }) + { + select { + case <-ctx.Done(): + return + default: + } + ctx, repCancel := context.WithCancel(ctx) + tasks := j.updateTasks(func(tasks *activeSideTasks) { + // reset it + *tasks = activeSideTasks{} + tasks.replicationCancel = repCancel + tasks.replication = replication.NewReplication(j.promRepStateSecs, j.promBytesReplicated) + }) + log.Info("start replication") + tasks.replication.Drive(ctx, sender, receiver) + repCancel() // always cancel to free up context resources + } - log.Info("start replication") - tasks.replication.Drive(ctx, sender, receiver) - - tasks = j.updateTasks(func(tasks *activeSideTasks) { - tasks.prunerSender = j.prunerFactory.BuildSenderPruner(ctx, sender, sender) - tasks.prunerReceiver = j.prunerFactory.BuildReceiverPruner(ctx, receiver, sender) - }) - log.Info("start pruning sender") - tasks.prunerSender.Prune() - log.Info("start pruning receiver") - tasks.prunerReceiver.Prune() + { + select { + case <-ctx.Done(): + return + default: + } + ctx, senderCancel := context.WithCancel(ctx) + tasks := j.updateTasks(func(tasks *activeSideTasks) { + tasks.prunerSender = j.prunerFactory.BuildSenderPruner(ctx, sender, sender) + tasks.prunerSenderCancel = senderCancel + }) + log.Info("start pruning sender") + tasks.prunerSender.Prune() + senderCancel() + } + { + select { + case <-ctx.Done(): + return + default: + } + ctx, receiverCancel := context.WithCancel(ctx) + tasks := j.updateTasks(func(tasks *activeSideTasks) { + tasks.prunerReceiver = j.prunerFactory.BuildReceiverPruner(ctx, receiver, sender) + tasks.prunerReceiverCancel = receiverCancel + }) + log.Info("start pruning receiver") + tasks.prunerReceiver.Prune() + receiverCancel() + } } diff --git a/daemon/pruner/pruner.go b/daemon/pruner/pruner.go index cddd687..5838951 100644 --- a/daemon/pruner/pruner.go +++ b/daemon/pruner/pruner.go @@ -9,6 +9,7 @@ import ( "github.com/zrepl/zrepl/logger" "github.com/zrepl/zrepl/pruning" 
"github.com/zrepl/zrepl/replication/pdu" + "github.com/zrepl/zrepl/util/watchdog" "net" "sort" "sync" @@ -56,6 +57,8 @@ type args struct { type Pruner struct { args args + Progress watchdog.KeepAlive + mtx sync.RWMutex state State @@ -175,6 +178,10 @@ func (s State) statefunc() state { return statemap[s] } +func (s State) IsTerminal() bool { + return s.statefunc() == nil +} + type updater func(func(*Pruner)) State type state func(args *args, u updater) state @@ -249,6 +256,12 @@ func (p *Pruner) Report() *Report { return &r } +func (p *Pruner) State() State { + p.mtx.Lock() + defer p.mtx.Unlock() + return p.state +} + type fs struct { path string diff --git a/replication/fsrep/fsfsm.go b/replication/fsrep/fsfsm.go index 02626eb..8aae8f1 100644 --- a/replication/fsrep/fsfsm.go +++ b/replication/fsrep/fsfsm.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "github.com/prometheus/client_golang/prometheus" + "github.com/zrepl/zrepl/util/watchdog" "io" "net" "sync" @@ -191,7 +192,7 @@ type ReplicationStep struct { expectedSize int64 // 0 means no size estimate present / possible } -func (f *Replication) TakeStep(ctx context.Context, sender Sender, receiver Receiver) (post State, nextStepDate, retryWaitUntil time.Time) { +func (f *Replication) TakeStep(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver) (post State, nextStepDate, retryWaitUntil time.Time) { var u updater = func(fu func(*Replication)) State { f.lock.Lock() @@ -205,7 +206,7 @@ func (f *Replication) TakeStep(ctx context.Context, sender Sender, receiver Rece pre := u(nil) preTime := time.Now() - s = s(ctx, sender, receiver, u) + s = s(ctx, ka, sender, receiver, u) delta := time.Now().Sub(preTime) post = u(func(f *Replication) { @@ -233,11 +234,11 @@ func (f *Replication) RetryWaitUntil() time.Time { type updater func(func(fsr *Replication)) State -type state func(ctx context.Context, sender Sender, receiver Receiver, u updater) state +type state func(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver, u updater) state var RetrySleepDuration = 10 * time.Second // FIXME make configurable -func stateReady(ctx context.Context, sender Sender, receiver Receiver, u updater) state { +func stateReady(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver, u updater) state { var current *ReplicationStep s := u(func(f *Replication) { @@ -251,7 +252,7 @@ func stateReady(ctx context.Context, sender Sender, receiver Receiver, u updater return s.fsrsf() } - stepState := current.doReplication(ctx, sender, receiver) + stepState := current.doReplication(ctx, ka, sender, receiver) return u(func(f *Replication) { switch stepState { @@ -277,7 +278,7 @@ func stateReady(ctx context.Context, sender Sender, receiver Receiver, u updater }).fsrsf() } -func stateRetryWait(ctx context.Context, sender Sender, receiver Receiver, u updater) state { +func stateRetryWait(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver, u updater) state { var sleepUntil time.Time u(func(f *Replication) { sleepUntil = f.retryWaitUntil @@ -337,7 +338,7 @@ func shouldRetry(err error) bool { return false } -func (s *ReplicationStep) doReplication(ctx context.Context, sender Sender, receiver Receiver) StepState { +func (s *ReplicationStep) doReplication(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver) StepState { fs := s.parent.fs @@ -380,6 +381,9 @@ func (s *ReplicationStep) doReplication(ctx context.Context, sender Sender, rece } s.byteCounter = 
util.NewByteCounterReader(sstream) + s.byteCounter.SetCallback(1*time.Second, func(i int64) { + ka.MadeProgress() + }) defer func() { s.parent.promBytesReplicated.Add(float64(s.byteCounter.Bytes())) }() @@ -404,14 +408,15 @@ func (s *ReplicationStep) doReplication(ctx context.Context, sender Sender, rece return updateStateError(err) } log.Debug("receive finished") + ka.MadeProgress() updateStateCompleted() - return s.doMarkReplicated(ctx, sender) + return s.doMarkReplicated(ctx, ka, sender) } -func (s *ReplicationStep) doMarkReplicated(ctx context.Context, sender Sender) StepState { +func (s *ReplicationStep) doMarkReplicated(ctx context.Context, ka *watchdog.KeepAlive, sender Sender) StepState { log := getLogger(ctx). WithField("filesystem", s.parent.fs). @@ -456,6 +461,7 @@ func (s *ReplicationStep) doMarkReplicated(ctx context.Context, sender Sender) S log.Error(err.Error()) return updateStateError(err) } + ka.MadeProgress() return updateStateCompleted() } diff --git a/replication/mainfsm.go b/replication/mainfsm.go index f79ebc4..ee9a338 100644 --- a/replication/mainfsm.go +++ b/replication/mainfsm.go @@ -8,6 +8,7 @@ import ( "fmt" "github.com/prometheus/client_golang/prometheus" "github.com/zrepl/zrepl/daemon/job/wakeup" + "github.com/zrepl/zrepl/util/watchdog" "math/bits" "net" "sync" @@ -47,6 +48,10 @@ func (s State) rsf() state { return m[idx] } +func (s State) IsTerminal() bool { + return s.rsf() == nil +} + // Replication implements the replication of multiple file systems from a Sender to a Receiver. // // It is a state machine that is driven by the Drive method @@ -56,6 +61,8 @@ type Replication struct { promSecsPerState *prometheus.HistogramVec // labels: state promBytesReplicated *prometheus.CounterVec // labels: filesystem + Progress watchdog.KeepAlive + // lock protects all fields of this struct (but not the fields behind pointers!) lock sync.Mutex @@ -122,7 +129,7 @@ func NewFilteredError(fs string) *FilteredError { func (f FilteredError) Error() string { return "endpoint does not allow access to filesystem " + f.fs } type updater func(func(*Replication)) (newState State) -type state func(ctx context.Context, sender Sender, receiver Receiver, u updater) state +type state func(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver, u updater) state // Drive starts the state machine and returns only after replication has finished (with or without errors). 
// The Logger in ctx is used for both debug and error logging, but is not guaranteed to be stable @@ -147,7 +154,7 @@ func (r *Replication) Drive(ctx context.Context, sender Sender, receiver Receive for s != nil { preTime := time.Now() pre = u(nil) - s = s(ctx, sender, receiver, u) + s = s(ctx, &r.Progress, sender, receiver, u) delta := time.Now().Sub(preTime) r.promSecsPerState.WithLabelValues(pre.String()).Observe(delta.Seconds()) post = u(nil) @@ -198,7 +205,7 @@ func isPermanent(err error) bool { return false } -func statePlanning(ctx context.Context, sender Sender, receiver Receiver, u updater) state { +func statePlanning(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver, u updater) state { log := getLogger(ctx) @@ -307,9 +314,12 @@ func statePlanning(ctx context.Context, sender Sender, receiver Receiver, u upda log.WithError(err).Error("error computing size estimate") return handlePlanningError(err) } + q.Add(qitem) } + ka.MadeProgress() + return u(func(r *Replication) { r.completed = nil r.queue = q @@ -318,7 +328,7 @@ func statePlanning(ctx context.Context, sender Sender, receiver Receiver, u upda }).rsf() } -func statePlanningError(ctx context.Context, sender Sender, receiver Receiver, u updater) state { +func statePlanningError(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver, u updater) state { var sleepUntil time.Time u(func(r *Replication) { sleepUntil = r.sleepUntil @@ -340,7 +350,7 @@ func statePlanningError(ctx context.Context, sender Sender, receiver Receiver, u }).rsf() } -func stateWorking(ctx context.Context, sender Sender, receiver Receiver, u updater) state { +func stateWorking(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver, u updater) state { var active *ReplicationQueueItemHandle rsfNext := u(func(r *Replication) { @@ -365,7 +375,7 @@ func stateWorking(ctx context.Context, sender Sender, receiver Receiver, u updat }).rsf() } - state, nextStepDate, retryWaitUntil := active.GetFSReplication().TakeStep(ctx, sender, receiver) + state, nextStepDate, retryWaitUntil := active.GetFSReplication().TakeStep(ctx, ka, sender, receiver) return u(func(r *Replication) { active.Update(state, nextStepDate, retryWaitUntil) r.active = nil @@ -383,7 +393,7 @@ func stateWorking(ctx context.Context, sender Sender, receiver Receiver, u updat return u(nil).rsf() } -func stateWorkingWait(ctx context.Context, sender Sender, receiver Receiver, u updater) state { +func stateWorkingWait(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver, u updater) state { var sleepUntil time.Time u(func(r *Replication) { sleepUntil = r.sleepUntil @@ -445,3 +455,9 @@ func (r *Replication) Report() *Report { return &rep } + +func (r *Replication) State() State { + r.lock.Lock() + defer r.lock.Unlock() + return r.state +} diff --git a/util/envconst/envconst.go b/util/envconst/envconst.go new file mode 100644 index 0000000..8159aae --- /dev/null +++ b/util/envconst/envconst.go @@ -0,0 +1,25 @@ +package envconst + +import ( + "os" + "sync" + "time" +) + +var cache sync.Map + +func Duration(varname string, def time.Duration) time.Duration { + if v, ok := cache.Load(varname); ok { + return v.(time.Duration) + } + e := os.Getenv(varname) + if e == "" { + return def + } + d, err := time.ParseDuration(e) + if err != nil { + panic(err) + } + cache.Store(varname, d) + return d +} diff --git a/util/io.go b/util/io.go index f9210b3..8448e9d 100644 --- a/util/io.go +++ b/util/io.go @@ -5,6 +5,7 @@ import ( "net" 
"os" "sync/atomic" + "time" ) type NetConnLogger struct { @@ -101,6 +102,14 @@ func (c *ChainedReader) Read(buf []byte) (n int, err error) { type ByteCounterReader struct { reader io.ReadCloser + + // called & accessed synchronously during Read, no external access + cb func(full int64) + cbEvery time.Duration + lastCbAt time.Time + bytesSinceLastCb int64 + + // set atomically because it may be read by multiple threads bytes int64 } @@ -110,13 +119,23 @@ func NewByteCounterReader(reader io.ReadCloser) *ByteCounterReader { } } +func (b *ByteCounterReader) SetCallback(every time.Duration, cb func(full int64)) { + b.cbEvery = every + b.cb = cb +} + func (b *ByteCounterReader) Close() error { return b.reader.Close() } func (b *ByteCounterReader) Read(p []byte) (n int, err error) { n, err = b.reader.Read(p) - atomic.AddInt64(&b.bytes, int64(n)) + full := atomic.AddInt64(&b.bytes, int64(n)) + now := time.Now() + if b.cb != nil && now.Sub(b.lastCbAt) > b.cbEvery { + b.cb(full) + b.lastCbAt = now + } return n, err } diff --git a/util/watchdog/watchdog.go b/util/watchdog/watchdog.go new file mode 100644 index 0000000..96cc4e3 --- /dev/null +++ b/util/watchdog/watchdog.go @@ -0,0 +1,41 @@ +package watchdog + +import ( + "fmt" + "sync" + "time" +) + +type Progress struct { + lastUpd time.Time +} + +func (p *Progress) String() string { + return fmt.Sprintf("last update at %s", p.lastUpd) +} + +func (p *Progress) madeProgressSince(p2 *Progress) bool { + if p.lastUpd.IsZero() && p2.lastUpd.IsZero() { + return false + } + return p.lastUpd.After(p2.lastUpd) +} + +type KeepAlive struct { + mtx sync.Mutex + p Progress +} + +func (k *KeepAlive) MadeProgress() { + k.mtx.Lock() + defer k.mtx.Unlock() + k.p.lastUpd = time.Now() +} + +func (k *KeepAlive) ExpectProgress(last *Progress) (madeProgress bool) { + k.mtx.Lock() + defer k.mtx.Unlock() + madeProgress = k.p.madeProgressSince(last) + *last = k.p + return madeProgress +}