daemon/active: implement watchdog to handle stuck replication / pruners

ActiveSide.do() can only run sequentially, i.e. we cannot run
replication and pruning in parallel. Why?

* go-streamrpc only allows one active request at a time
(this is bad design and should be fixed at some point)
* replication and pruning are implemented independently, but work on the
same resources (snapshots)

A: pruning might destroy a snapshot that is planned to be replicated
B: replication might replicate snapshots that should be pruned

We do not have any resource management / locking for A and B, but we
have a use case where users don't want their machine to fill up with
snapshots if replication does not work.
That means we _have_ to run the pruners.

A further complication is that we cannot just cancel the replication
context after a timeout and move on to the pruner: it could be initial
replication and we don't know how long it will take.
(And we don't have resumable send & recv yet).

With the previous commits, we can implement the watchdog using context
cancellation.
Note that the 'MadeProgress()' calls can only be placed right before a
non-error state transition. Otherwise, we could end up in a live-lock.
This commit is contained in:
Christian Schwarz 2018-10-19 16:27:05 +02:00
parent 4ede99b08c
commit 69bfcb7bed
8 changed files with 240 additions and 35 deletions

View File

@ -274,8 +274,7 @@ func (t *tui) renderReplicationReport(rep *replication.Report) {
t.printf("Problem: %s", rep.Problem)
t.newline()
}
if rep.SleepUntil.After(time.Now()) &&
state & ^(replication.PermanentError|replication.Completed) != 0 {
if rep.SleepUntil.After(time.Now()) && !state.IsTerminal() {
t.printf("Sleeping until %s (%s left)\n", rep.SleepUntil, rep.SleepUntil.Sub(time.Now()))
}

View File

@ -13,6 +13,8 @@ import (
"github.com/zrepl/zrepl/daemon/pruner"
"github.com/zrepl/zrepl/endpoint"
"github.com/zrepl/zrepl/replication"
"github.com/zrepl/zrepl/util/envconst"
"github.com/zrepl/zrepl/util/watchdog"
"github.com/zrepl/zrepl/zfs"
"sync"
"github.com/zrepl/zrepl/daemon/logging"
@ -38,7 +40,9 @@ type ActiveSide struct {
// activeSideTasks bundles the tasks of one ActiveSide.do() run with the
// cancel functions of the contexts those tasks run under. The watchdog in
// do() inspects the tasks and calls the matching cancel function when a
// task stops making progress.
type activeSideTasks struct {
// replication is the replication state machine created and driven by do().
replication *replication.Replication
// replicationCancel cancels the context passed to replication.Drive.
replicationCancel context.CancelFunc
// prunerSender and prunerReceiver are the pruners built by the job's
// pruner factory for the sending and the receiving side.
prunerSender, prunerReceiver *pruner.Pruner
// prunerSenderCancel and prunerReceiverCancel cancel the contexts the
// respective pruner was built with.
prunerSenderCancel, prunerReceiverCancel context.CancelFunc
}
func (a *ActiveSide) updateTasks(u func(*activeSideTasks)) activeSideTasks {
@ -262,6 +266,57 @@ func (j *ActiveSide) do(ctx context.Context) {
}
}()
// watchdog
go func() {
// if no progress after 1 minute, kill the task
wdto := envconst.Duration("ZREPL_JOB_WATCHDOG_TIMEOUT", 1*time.Minute)
log.WithField("watchdog_timeout", wdto.String()).Debug("starting watchdog")
t := time.NewTicker(wdto)
defer t.Stop()
var (
rep, prunerSender, prunerReceiver watchdog.Progress
)
for {
select {
case <-runDone:
return
case <-ctx.Done():
return
case <-t.C: // fall
}
log := log.WithField("watchdog_timeout", wdto.String()) // shadowing!
j.updateTasks(func(tasks *activeSideTasks) {
if tasks.replication != nil &&
!tasks.replication.Progress.ExpectProgress(&rep) &&
!tasks.replication.State().IsTerminal() {
log.Error("replication did not make progress, cancelling")
tasks.replicationCancel()
}
if tasks.prunerSender != nil &&
!tasks.prunerSender.Progress.ExpectProgress(&prunerSender) &&
!tasks.prunerSender.State().IsTerminal() {
log.Error("pruner:sender did not make progress, cancelling")
tasks.prunerSenderCancel()
}
if tasks.prunerReceiver != nil &&
!tasks.prunerReceiver.Progress.ExpectProgress(&prunerReceiver) &&
!tasks.prunerReceiver.State().IsTerminal() {
log.Error("pruner:receiver did not make progress, cancelling")
tasks.prunerReceiverCancel()
}
})
log.WithField("replication_progress", rep.String()).
WithField("pruner_sender_progress", prunerSender.String()).
WithField("pruner_receiver_progress", prunerReceiver.String()).
Debug("watchdog did run")
}
}()
client, err := j.clientFactory.NewClient()
if err != nil {
log.WithError(err).Error("factory cannot instantiate streamrpc client")
@ -270,21 +325,52 @@ func (j *ActiveSide) do(ctx context.Context) {
sender, receiver, err := j.mode.SenderReceiver(client)
{
select {
case <-ctx.Done():
return
default:
}
ctx, repCancel := context.WithCancel(ctx)
tasks := j.updateTasks(func(tasks *activeSideTasks) {
// reset it
*tasks = activeSideTasks{}
tasks.replicationCancel = repCancel
tasks.replication = replication.NewReplication(j.promRepStateSecs, j.promBytesReplicated)
})
log.Info("start replication")
tasks.replication.Drive(ctx, sender, receiver)
repCancel() // always cancel to free up context resources
}
tasks = j.updateTasks(func(tasks *activeSideTasks) {
{
select {
case <-ctx.Done():
return
default:
}
ctx, senderCancel := context.WithCancel(ctx)
tasks := j.updateTasks(func(tasks *activeSideTasks) {
tasks.prunerSender = j.prunerFactory.BuildSenderPruner(ctx, sender, sender)
tasks.prunerReceiver = j.prunerFactory.BuildReceiverPruner(ctx, receiver, sender)
tasks.prunerSenderCancel = senderCancel
})
log.Info("start pruning sender")
tasks.prunerSender.Prune()
senderCancel()
}
{
select {
case <-ctx.Done():
return
default:
}
ctx, receiverCancel := context.WithCancel(ctx)
tasks := j.updateTasks(func(tasks *activeSideTasks) {
tasks.prunerReceiver = j.prunerFactory.BuildReceiverPruner(ctx, receiver, sender)
tasks.prunerReceiverCancel = receiverCancel
})
log.Info("start pruning receiver")
tasks.prunerReceiver.Prune()
receiverCancel()
}
}

View File

@ -9,6 +9,7 @@ import (
"github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/pruning"
"github.com/zrepl/zrepl/replication/pdu"
"github.com/zrepl/zrepl/util/watchdog"
"net"
"sort"
"sync"
@ -56,6 +57,8 @@ type args struct {
type Pruner struct {
args args
Progress watchdog.KeepAlive
mtx sync.RWMutex
state State
@ -175,6 +178,10 @@ func (s State) statefunc() state {
return statemap[s]
}
// IsTerminal reports whether s maps to a nil state function.
func (s State) IsTerminal() bool {
	next := s.statefunc()
	return next == nil
}
// updater is a callback that is handed a function operating on the Pruner
// and returns a State.
// NOTE(review): presumably it applies the function under the Pruner's lock
// and returns the resulting state — confirm against its implementation.
type updater func(func(*Pruner)) State
// state is a state function of the pruner state machine: it receives the
// pruner's args and an updater and returns the next state function.
// State.IsTerminal treats a State whose state function is nil as terminal.
type state func(args *args, u updater) state
@ -249,6 +256,12 @@ func (p *Pruner) Report() *Report {
return &r
}
// State returns the pruner's current state.
//
// The accessor only reads p.state, so it takes the read side of the
// RWMutex. Concurrent readers (e.g. the job watchdog polling the state)
// therefore neither block each other nor need the exclusive write lock.
func (p *Pruner) State() State {
	p.mtx.RLock()
	defer p.mtx.RUnlock()
	return p.state
}
type fs struct {
path string

View File

@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"github.com/zrepl/zrepl/util/watchdog"
"io"
"net"
"sync"
@ -191,7 +192,7 @@ type ReplicationStep struct {
expectedSize int64 // 0 means no size estimate present / possible
}
func (f *Replication) TakeStep(ctx context.Context, sender Sender, receiver Receiver) (post State, nextStepDate, retryWaitUntil time.Time) {
func (f *Replication) TakeStep(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver) (post State, nextStepDate, retryWaitUntil time.Time) {
var u updater = func(fu func(*Replication)) State {
f.lock.Lock()
@ -205,7 +206,7 @@ func (f *Replication) TakeStep(ctx context.Context, sender Sender, receiver Rece
pre := u(nil)
preTime := time.Now()
s = s(ctx, sender, receiver, u)
s = s(ctx, ka, sender, receiver, u)
delta := time.Now().Sub(preTime)
post = u(func(f *Replication) {
@ -233,11 +234,11 @@ func (f *Replication) RetryWaitUntil() time.Time {
type updater func(func(fsr *Replication)) State
type state func(ctx context.Context, sender Sender, receiver Receiver, u updater) state
type state func(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver, u updater) state
var RetrySleepDuration = 10 * time.Second // FIXME make configurable
func stateReady(ctx context.Context, sender Sender, receiver Receiver, u updater) state {
func stateReady(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver, u updater) state {
var current *ReplicationStep
s := u(func(f *Replication) {
@ -251,7 +252,7 @@ func stateReady(ctx context.Context, sender Sender, receiver Receiver, u updater
return s.fsrsf()
}
stepState := current.doReplication(ctx, sender, receiver)
stepState := current.doReplication(ctx, ka, sender, receiver)
return u(func(f *Replication) {
switch stepState {
@ -277,7 +278,7 @@ func stateReady(ctx context.Context, sender Sender, receiver Receiver, u updater
}).fsrsf()
}
func stateRetryWait(ctx context.Context, sender Sender, receiver Receiver, u updater) state {
func stateRetryWait(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver, u updater) state {
var sleepUntil time.Time
u(func(f *Replication) {
sleepUntil = f.retryWaitUntil
@ -337,7 +338,7 @@ func shouldRetry(err error) bool {
return false
}
func (s *ReplicationStep) doReplication(ctx context.Context, sender Sender, receiver Receiver) StepState {
func (s *ReplicationStep) doReplication(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver) StepState {
fs := s.parent.fs
@ -380,6 +381,9 @@ func (s *ReplicationStep) doReplication(ctx context.Context, sender Sender, rece
}
s.byteCounter = util.NewByteCounterReader(sstream)
s.byteCounter.SetCallback(1*time.Second, func(i int64) {
ka.MadeProgress()
})
defer func() {
s.parent.promBytesReplicated.Add(float64(s.byteCounter.Bytes()))
}()
@ -404,14 +408,15 @@ func (s *ReplicationStep) doReplication(ctx context.Context, sender Sender, rece
return updateStateError(err)
}
log.Debug("receive finished")
ka.MadeProgress()
updateStateCompleted()
return s.doMarkReplicated(ctx, sender)
return s.doMarkReplicated(ctx, ka, sender)
}
func (s *ReplicationStep) doMarkReplicated(ctx context.Context, sender Sender) StepState {
func (s *ReplicationStep) doMarkReplicated(ctx context.Context, ka *watchdog.KeepAlive, sender Sender) StepState {
log := getLogger(ctx).
WithField("filesystem", s.parent.fs).
@ -456,6 +461,7 @@ func (s *ReplicationStep) doMarkReplicated(ctx context.Context, sender Sender) S
log.Error(err.Error())
return updateStateError(err)
}
ka.MadeProgress()
return updateStateCompleted()
}

View File

@ -8,6 +8,7 @@ import (
"fmt"
"github.com/prometheus/client_golang/prometheus"
"github.com/zrepl/zrepl/daemon/job/wakeup"
"github.com/zrepl/zrepl/util/watchdog"
"math/bits"
"net"
"sync"
@ -47,6 +48,10 @@ func (s State) rsf() state {
return m[idx]
}
// IsTerminal reports whether s has no replication state function
// associated with it, i.e. s.rsf() yields nil.
func (s State) IsTerminal() bool {
	next := s.rsf()
	return next == nil
}
// Replication implements the replication of multiple file systems from a Sender to a Receiver.
//
// It is a state machine that is driven by the Drive method
@ -56,6 +61,8 @@ type Replication struct {
promSecsPerState *prometheus.HistogramVec // labels: state
promBytesReplicated *prometheus.CounterVec // labels: filesystem
Progress watchdog.KeepAlive
// lock protects all fields of this struct (but not the fields behind pointers!)
lock sync.Mutex
@ -122,7 +129,7 @@ func NewFilteredError(fs string) *FilteredError {
func (f FilteredError) Error() string { return "endpoint does not allow access to filesystem " + f.fs }
type updater func(func(*Replication)) (newState State)
type state func(ctx context.Context, sender Sender, receiver Receiver, u updater) state
type state func(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver, u updater) state
// Drive starts the state machine and returns only after replication has finished (with or without errors).
// The Logger in ctx is used for both debug and error logging, but is not guaranteed to be stable
@ -147,7 +154,7 @@ func (r *Replication) Drive(ctx context.Context, sender Sender, receiver Receive
for s != nil {
preTime := time.Now()
pre = u(nil)
s = s(ctx, sender, receiver, u)
s = s(ctx, &r.Progress, sender, receiver, u)
delta := time.Now().Sub(preTime)
r.promSecsPerState.WithLabelValues(pre.String()).Observe(delta.Seconds())
post = u(nil)
@ -198,7 +205,7 @@ func isPermanent(err error) bool {
return false
}
func statePlanning(ctx context.Context, sender Sender, receiver Receiver, u updater) state {
func statePlanning(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver, u updater) state {
log := getLogger(ctx)
@ -307,9 +314,12 @@ func statePlanning(ctx context.Context, sender Sender, receiver Receiver, u upda
log.WithError(err).Error("error computing size estimate")
return handlePlanningError(err)
}
q.Add(qitem)
}
ka.MadeProgress()
return u(func(r *Replication) {
r.completed = nil
r.queue = q
@ -318,7 +328,7 @@ func statePlanning(ctx context.Context, sender Sender, receiver Receiver, u upda
}).rsf()
}
func statePlanningError(ctx context.Context, sender Sender, receiver Receiver, u updater) state {
func statePlanningError(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver, u updater) state {
var sleepUntil time.Time
u(func(r *Replication) {
sleepUntil = r.sleepUntil
@ -340,7 +350,7 @@ func statePlanningError(ctx context.Context, sender Sender, receiver Receiver, u
}).rsf()
}
func stateWorking(ctx context.Context, sender Sender, receiver Receiver, u updater) state {
func stateWorking(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver, u updater) state {
var active *ReplicationQueueItemHandle
rsfNext := u(func(r *Replication) {
@ -365,7 +375,7 @@ func stateWorking(ctx context.Context, sender Sender, receiver Receiver, u updat
}).rsf()
}
state, nextStepDate, retryWaitUntil := active.GetFSReplication().TakeStep(ctx, sender, receiver)
state, nextStepDate, retryWaitUntil := active.GetFSReplication().TakeStep(ctx, ka, sender, receiver)
return u(func(r *Replication) {
active.Update(state, nextStepDate, retryWaitUntil)
r.active = nil
@ -383,7 +393,7 @@ func stateWorking(ctx context.Context, sender Sender, receiver Receiver, u updat
return u(nil).rsf()
}
func stateWorkingWait(ctx context.Context, sender Sender, receiver Receiver, u updater) state {
func stateWorkingWait(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver, u updater) state {
var sleepUntil time.Time
u(func(r *Replication) {
sleepUntil = r.sleepUntil
@ -445,3 +455,9 @@ func (r *Replication) Report() *Report {
return &rep
}
// State returns the replication's current state, read while holding
// r.lock.
func (r *Replication) State() State {
	r.lock.Lock()
	current := r.state
	r.lock.Unlock()
	return current
}

25
util/envconst/envconst.go Normal file
View File

@ -0,0 +1,25 @@
package envconst
import (
"os"
"sync"
"time"
)
// cache memoizes successfully parsed environment values, keyed by the
// variable name.
var cache sync.Map

// Duration returns the value of the environment variable varname parsed
// with time.ParseDuration.
//
// If the variable is unset or empty, def is returned and nothing is cached.
// A successfully parsed value is cached, so later calls for the same
// varname return that value even if the environment changes afterwards.
// Duration panics if the variable is set but cannot be parsed.
func Duration(varname string, def time.Duration) time.Duration {
	if cached, hit := cache.Load(varname); hit {
		return cached.(time.Duration)
	}

	raw := os.Getenv(varname)
	if raw == "" {
		return def
	}

	parsed, err := time.ParseDuration(raw)
	if err != nil {
		panic(err)
	}
	cache.Store(varname, parsed)
	return parsed
}

View File

@ -5,6 +5,7 @@ import (
"net"
"os"
"sync/atomic"
"time"
)
type NetConnLogger struct {
@ -101,6 +102,14 @@ func (c *ChainedReader) Read(buf []byte) (n int, err error) {
// ByteCounterReader wraps an io.ReadCloser and counts the bytes that pass
// through Read. Optionally, a callback registered via SetCallback is
// invoked from Read with the running total.
type ByteCounterReader struct {
// reader is the wrapped stream; Read and Close delegate to it.
reader io.ReadCloser
// called & accessed synchronously during Read, no external access
// cb is the progress callback (nil if none was registered), cbEvery the
// minimum interval between two invocations, and lastCbAt the time of the
// most recent invocation.
cb func(full int64)
cbEvery time.Duration
lastCbAt time.Time
// NOTE(review): bytesSinceLastCb is not referenced by the code visible
// here — confirm whether it is still needed.
bytesSinceLastCb int64
// set atomically because it may be read by multiple threads
// NOTE(review): sync/atomic requires 64-bit alignment for 64-bit atomic
// operations on 32-bit platforms; keep that in mind when adding or
// reordering fields in front of bytes.
bytes int64
}
@ -110,13 +119,23 @@ func NewByteCounterReader(reader io.ReadCloser) *ByteCounterReader {
}
}
// SetCallback registers cb as the progress callback and every as the
// minimum interval that must elapse between two invocations of cb.
// The fields are written without synchronization.
func (b *ByteCounterReader) SetCallback(every time.Duration, cb func(full int64)) {
	b.cb, b.cbEvery = cb, every
}
// Close closes the wrapped reader and returns its error, if any.
func (b *ByteCounterReader) Close() error {
	err := b.reader.Close()
	return err
}
// Read reads from the wrapped reader into p and adds the number of bytes
// read to the running total exactly once.
//
// If a callback is registered, it is invoked synchronously with the new
// total whenever more than cbEvery has elapsed since the previous
// invocation. While lastCbAt is still the zero time, the elapsed duration
// exceeds any sensible interval, so the first Read with a callback fires
// it right away.
//
// It returns n and err exactly as reported by the wrapped reader.
func (b *ByteCounterReader) Read(p []byte) (n int, err error) {
	n, err = b.reader.Read(p)
	// Count the bytes once; the returned total feeds the progress callback.
	full := atomic.AddInt64(&b.bytes, int64(n))
	now := time.Now()
	if b.cb != nil && now.Sub(b.lastCbAt) > b.cbEvery {
		b.cb(full)
		b.lastCbAt = now
	}
	return n, err
}

41
util/watchdog/watchdog.go Normal file
View File

@ -0,0 +1,41 @@
package watchdog
import (
"fmt"
"sync"
"time"
)
// Progress is a snapshot of the point in time at which progress was last
// recorded. The zero value means that no progress has been recorded.
type Progress struct {
	lastUpd time.Time
}

// String renders the time of the last recorded update.
func (p *Progress) String() string {
	return fmt.Sprintf("last update at %s", p.lastUpd)
}

// madeProgressSince reports whether p was updated strictly after p2.
// Two snapshots that both never recorded an update count as no progress.
func (p *Progress) madeProgressSince(p2 *Progress) bool {
	neverUpdated := p.lastUpd.IsZero() && p2.lastUpd.IsZero()
	return !neverUpdated && p.lastUpd.After(p2.lastUpd)
}
// KeepAlive records when a task last made progress. Its methods guard the
// recorded Progress with a mutex.
type KeepAlive struct {
	mtx sync.Mutex
	p   Progress
}

// MadeProgress records the current time as the moment of the latest
// progress. The timestamp is taken while the mutex is held.
func (k *KeepAlive) MadeProgress() {
	k.mtx.Lock()
	k.p.lastUpd = time.Now()
	k.mtx.Unlock()
}

// ExpectProgress reports whether progress was recorded since the snapshot
// in last, and then overwrites last with the current snapshot so that the
// next call compares against this one.
func (k *KeepAlive) ExpectProgress(last *Progress) (madeProgress bool) {
	k.mtx.Lock()
	defer k.mtx.Unlock()

	advanced := k.p.madeProgressSince(last)
	*last = k.p
	return advanced
}