replication: rewrite error handling + simplify state machines

* Remove explicit state machine code for all but replication.Replication
* Introduce explicit error types that satisfy interfaces which provide
  sufficient information for replication.Replication to make intelligent
  retry + queuing decisions (sketched below)

  * Temporary()
  * LocalToFS()

* Remove the queue and replace it with a simple array that we sort each
  time (yay no generics :( ); see the second sketch below
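
For reference, the interface contract introduced in package fsrep looks like
this; the classify function is only a hypothetical illustration of the
decision logic the interface enables, not code from this commit:

type Error interface {
	error
	Temporary() bool // transient, worth retrying after the retry interval?
	LocalToFS() bool // confined to this filesystem, or fatal to the whole run?
}

// classify is a hypothetical sketch of how the driver consumes the
// interface; the real decision happens in stateWorking (see diff below).
func classify(err Error) {
	switch {
	case err.LocalToFS():
		// filesystem-specific: keep the filesystem queued,
		// the sort-based scheduling de-prioritizes it
	case err.Temporary():
		// global but transient: sleep RetryInterval, then retry the run
	default:
		// global and permanent: fail the whole replication run
	}
}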
Christian Schwarz 2018-10-21 13:42:27 +02:00
parent ae5e60b1ae
commit b2844569c8
12 changed files with 426 additions and 417 deletions
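
The queue replacement boils down to re-sorting a plain slice on every
scheduling pass. A condensed sketch, assuming the fields and imports of the
Replication struct in the diff (sortQueuePass is a hypothetical name for
logic that lives inline in stateWorking):

// sortQueuePass drops filesystems that can no longer be retried, then
// picks the filesystem whose next pending step has the oldest snapshot.
func (r *Replication) sortQueuePass() (active *fsrep.Replication) {
	// move filesystems that can no longer be retried to completed
	newq := make([]*fsrep.Replication, 0, len(r.queue))
	for i := range r.queue {
		if r.queue[i].CanRetry() {
			newq = append(newq, r.queue[i])
		} else {
			r.completed = append(r.completed, r.queue[i])
		}
	}
	// stable sort by the next pending step's snapshot date
	// (stable to avoid flickering in status reports)
	sort.SliceStable(newq, func(i, j int) bool {
		return newq[i].NextStepDate().Before(newq[j].NextStepDate())
	})
	r.queue = newq
	if len(r.queue) == 0 {
		return nil
	}
	return r.queue[0] // do not dequeue; a finished FS sorts out on the next pass
}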

Gopkg.lock (generated)

@@ -174,15 +174,15 @@
 revision = "391d2c78c8404a9683d79f75dd24ab53040f89f7"

 [[projects]]
-digest = "1:c2ba1c9dc003c15856e4529dac028cacba08ee8924300f058b3467cde9acf7a9"
+digest = "1:1bcbb0a7ad8d3392d446eb583ae5415ff987838a8f7331a36877789be20667e6"
 name = "github.com/problame/go-streamrpc"
 packages = [
 ".",
 "internal/pdu",
 ]
 pruneopts = ""
-revision = "de6f6a4041c77f700f02d8fe749e54efa50811f7"
-version = "v0.4"
+revision = "d5d111e014342fe1c37f0b71cc37ec5f2afdfd13"
+version = "v0.5"

 [[projects]]
 branch = "master"

@@ -66,7 +66,7 @@ ignored = [ "github.com/inconshreveable/mousetrap" ]

 [[constraint]]
 name = "github.com/problame/go-streamrpc"
-version = "0.4.0"
+version = "0.5.0"

@@ -22,7 +22,6 @@ SUBPKGS += pruning/retentiongrid
 SUBPKGS += replication
 SUBPKGS += replication/fsrep
 SUBPKGS += replication/pdu
-SUBPKGS += replication/internal/queue
 SUBPKGS += replication/internal/diff
 SUBPKGS += tlsconf
 SUBPKGS += util

@@ -110,6 +110,7 @@ func (t *tui) addIndent(indent int) {
 t.moveLine(0, 0)
 }

 var statusFlags struct {
 Raw bool
 }

@@ -514,10 +515,7 @@ func (t *tui) drawBar(length int, bytes, totalBytes int64) {
 func StringStepState(s fsrep.StepState) string {
 switch s {
 case fsrep.StepReplicationReady: return "Ready"
-case fsrep.StepReplicationRetry: return "Retry"
 case fsrep.StepMarkReplicatedReady: return "MarkReady"
-case fsrep.StepMarkReplicatedRetry: return "MarkRetry"
-case fsrep.StepPermanentError: return "PermanentError"
 case fsrep.StepCompleted: return "Completed"
 default:
 return fmt.Sprintf("UNKNOWN %d", s)

@@ -11,6 +11,7 @@ import (
 "github.com/zrepl/zrepl/replication/pdu"
 "github.com/zrepl/zrepl/util/envconst"
 "github.com/zrepl/zrepl/util/watchdog"
+"github.com/problame/go-streamrpc"
 "net"
 "sort"
 "strings"

@@ -334,6 +335,14 @@ func (s snapshot) Replicated() bool { return s.replicated }
 func (s snapshot) Date() time.Time { return s.date }

+type Error interface {
+error
+Temporary() bool
+}
+
+var _ Error = net.Error(nil)
+var _ Error = streamrpc.Error(nil)
+
 func shouldRetry(e error) bool {
 if neterr, ok := e.(net.Error); ok {
 return neterr.Temporary()

@@ -71,38 +71,29 @@ type Report struct {
 Completed, Pending []*StepReport
 }

-//go:generate stringer -type=State
+//go:generate enumer -type=State
 type State uint

 const (
 Ready State = 1 << iota
-Retry
-PermanentError
 Completed
 )

-func (s State) fsrsf() state {
-m := map[State]state{
-Ready: stateReady,
-Retry: stateRetry,
-PermanentError: nil,
-Completed: nil,
-}
-return m[s]
-}
-
-func (s State) IsErrorState() bool {
-return s & (Retry|PermanentError) != 0
-}
+type Error interface {
+error
+Temporary() bool
+LocalToFS() bool
+}

 type Replication struct {
 promBytesReplicated prometheus.Counter
+fs string

 // lock protects all fields below it in this struct, but not the data behind pointers
 lock sync.Mutex
 state State
-fs string
-err error
+err Error
 completed, pending []*ReplicationStep
 }
@@ -112,13 +103,35 @@ func (f *Replication) State() State {
 return f.state
 }

-func (f *Replication) Err() error {
-f.lock.Lock()
-defer f.lock.Unlock()
-if f.state & (Retry|PermanentError) != 0 {
-return f.err
-}
-return nil
-}
+func (f *Replication) FS() string { return f.fs }
+
+// returns zero value time.Time{} if no more pending steps
+func (f *Replication) NextStepDate() time.Time {
+if len(f.pending) == 0 {
+return time.Time{}
+}
+return f.pending[0].to.SnapshotTime()
+}
+
+func (f *Replication) Err() Error {
+f.lock.Lock()
+defer f.lock.Unlock()
+return f.err
+}
+
+func (f *Replication) CanRetry() bool {
+f.lock.Lock()
+defer f.lock.Unlock()
+if f.state == Completed {
+return false
+}
+if f.state != Ready {
+panic(fmt.Sprintf("implementation error: %v", f.state))
+}
+if f.err == nil {
+return true
+}
+return f.err.Temporary()
+}

 func (f *Replication) UpdateSizeEsitmate(ctx context.Context, sender Sender) error {
@@ -162,26 +175,37 @@ func (b *ReplicationBuilder) Done() (r *Replication) {
 return r
 }

-func NewReplicationWithPermanentError(fs string, err error) *Replication {
+type ReplicationConflictError struct {
+Err error
+}
+
+func (e *ReplicationConflictError) Timeout() bool { return false }
+func (e *ReplicationConflictError) Temporary() bool { return false }
+func (e *ReplicationConflictError) Error() string { return fmt.Sprintf("permanent error: %s", e.Err.Error()) }
+func (e *ReplicationConflictError) LocalToFS() bool { return true }
+
+func NewReplicationConflictError(fs string, err error) *Replication {
 return &Replication{
-state: PermanentError,
+state: Completed,
 fs: fs,
-err: err,
+err: &ReplicationConflictError{Err: err},
 }
 }

-//go:generate stringer -type=StepState
+//go:generate enumer -type=StepState
 type StepState uint

 const (
 StepReplicationReady StepState = 1 << iota
-StepReplicationRetry
 StepMarkReplicatedReady
-StepMarkReplicatedRetry
-StepPermanentError
 StepCompleted
 )

+func (s StepState) IsTerminal() bool { return s == StepCompleted }
+
 type FilesystemVersion interface {
 SnapshotTime() time.Time
 GetName() string // name without @ or #
@@ -204,7 +228,7 @@ type ReplicationStep struct {
 expectedSize int64 // 0 means no size estimate present / possible
 }

-func (f *Replication) TakeStep(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver) (post State, nextStepDate time.Time) {
+func (f *Replication) Retry(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver) Error {

 var u updater = func(fu func(*Replication)) State {
 f.lock.Lock()
@@ -214,52 +238,46 @@ func (f *Replication) TakeStep(ctx context.Context, ka *watchdog.KeepAlive, send
 }
 return f.state
 }
-var s state = u(nil).fsrsf()
-pre := u(nil)
-preTime := time.Now()
-s = s(ctx, ka, sender, receiver, u)
-delta := time.Now().Sub(preTime)
-post = u(func(f *Replication) {
-if len(f.pending) == 0 {
-return
-}
-nextStepDate = f.pending[0].to.SnapshotTime()
-})
-getLogger(ctx).
-WithField("fs", f.fs).
-WithField("transition", fmt.Sprintf("%s => %s", pre, post)).
-WithField("duration", delta).
-Debug("fsr step taken")
-return post, nextStepDate
-}
-
-type updater func(func(fsr *Replication)) State
-type state func(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver, u updater) state
-
-func stateReady(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver, u updater) state {

 var current *ReplicationStep
-s := u(func(f *Replication) {
+pre := u(nil)
+getLogger(ctx).WithField("fsrep_state", pre).Debug("begin fsrep.Retry")
+defer func() {
+post := u(nil)
+getLogger(ctx).WithField("fsrep_transition", post).Debug("end fsrep.Retry")
+}()
+
+st := u(func(f *Replication) {
 if len(f.pending) == 0 {
 f.state = Completed
 return
 }
 current = f.pending[0]
 })
-if s != Ready {
-return s.fsrsf()
+if st == Completed {
+return nil
+}
+if st != Ready {
+panic(fmt.Sprintf("implementation error: %v", st))
 }

-stepState := current.doReplication(ctx, ka, sender, receiver)
+stepCtx := WithLogger(ctx, getLogger(ctx).WithField("step", current))
+getLogger(stepCtx).Debug("take step")
+err := current.Retry(stepCtx, ka, sender, receiver)
+if err != nil {
+getLogger(stepCtx).WithError(err).Error("step could not be completed")
+}

-return u(func(f *Replication) {
-switch stepState {
-case StepCompleted:
+u(func(fsr *Replication) {
+if err != nil {
+f.err = &StepError{stepStr: current.String(), err: err}
+return
+}
+if err == nil && current.state != StepCompleted {
+panic(fmt.Sprintf("implementation error: %v", current.state))
+}
+f.err = nil
 f.completed = append(f.completed, current)
 f.pending = f.pending[1:]
 if len(f.pending) > 0 {
@@ -267,23 +285,51 @@
 } else {
 f.state = Completed
 }
-case StepReplicationRetry:
-fallthrough
-case StepMarkReplicatedRetry:
-f.state = Retry
-case StepPermanentError:
-f.state = PermanentError
-f.err = errors.New("a replication step failed with a permanent error")
-default:
-panic(f)
-}
-}).fsrsf()
+})
+
+var retErr Error = nil
+u(func(fsr *Replication) {
+retErr = fsr.err
+})
+return retErr
 }

-func stateRetry(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver, u updater) state {
-return u(func(fsr *Replication) {
-fsr.state = Ready
-}).fsrsf()
-}
+type updater func(func(fsr *Replication)) State
+
+type state func(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver, u updater) state
+
+type StepError struct {
+stepStr string
+err error
+}
+
+var _ Error = &StepError{}
+
+func (e StepError) Error() string {
+if e.LocalToFS() {
+return fmt.Sprintf("step %s failed: %s", e.stepStr, e.err)
+}
+return e.err.Error()
+}
+
+func (e StepError) Timeout() bool {
+if neterr, ok := e.err.(net.Error); ok {
+return neterr.Timeout()
+}
+return false
+}
+
+func (e StepError) Temporary() bool {
+if neterr, ok := e.err.(net.Error); ok {
+return neterr.Temporary()
+}
+return false
+}
+
+func (e StepError) LocalToFS() bool {
+if _, ok := e.err.(net.Error); ok {
+return false
+}
+return true // conservative approximation: we'd like to check for specific errors returned over RPC here...
+}

 func (fsr *Replication) Report() *Report {
@@ -295,9 +341,8 @@ func (fsr *Replication) Report() *Report {
 Status: fsr.state.String(),
 }

-if fsr.state&PermanentError != 0 {
+if fsr.err != nil && fsr.err.LocalToFS() {
 rep.Problem = fsr.err.Error()
-return &rep
 }

 rep.Completed = make([]*StepReport, len(fsr.completed))
@@ -309,70 +354,48 @@
 rep.Pending[i] = fsr.pending[i].Report()
 }

-if fsr.state&Retry != 0 {
-if len(rep.Pending) != 0 { // should always be true for Retry == true?
-rep.Problem = rep.Pending[0].Problem
-}
-}
 return &rep
 }

-func shouldRetry(err error) bool {
-switch err {
-case io.EOF:
-fallthrough
-case io.ErrUnexpectedEOF:
-fallthrough
-case io.ErrClosedPipe:
-return true
-}
-if _, ok := err.(net.Error); ok {
-return true
-}
-return false
-}
+func (s *ReplicationStep) Retry(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver) error {
+switch s.state {
+case StepReplicationReady:
+return s.doReplication(ctx, ka, sender, receiver)
+case StepMarkReplicatedReady:
+return s.doMarkReplicated(ctx, ka, sender)
+case StepCompleted:
+return nil
+}
+panic(fmt.Sprintf("implementation error: %v", s.state))
+}

-func (s *ReplicationStep) doReplication(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver) StepState {
+func (s *ReplicationStep) Error() error {
+if s.state & (StepReplicationReady|StepMarkReplicatedReady) != 0 {
+return s.err
+}
+return nil
+}
+
+func (s *ReplicationStep) doReplication(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver) error {
+if s.state != StepReplicationReady {
+panic(fmt.Sprintf("implementation error: %v", s.state))
+}
 fs := s.parent.fs
-log := getLogger(ctx).
-WithField("filesystem", fs).
-WithField("step", s.String())
-
-updateStateError := func(err error) StepState {
-s.lock.Lock()
-defer s.lock.Unlock()
-s.err = err
-if shouldRetry(s.err) {
-s.state = StepReplicationRetry
-return s.state
-}
-s.state = StepPermanentError
-return s.state
-}
-
-updateStateCompleted := func() StepState {
-s.lock.Lock()
-defer s.lock.Unlock()
-s.err = nil
-s.state = StepMarkReplicatedReady
-return s.state
-}
+log := getLogger(ctx)

 sr := s.buildSendRequest(false)

 log.Debug("initiate send request")
 sres, sstream, err := sender.Send(ctx, sr)
 if err != nil {
 log.WithError(err).Error("send request failed")
-return updateStateError(err)
+return err
 }
 if sstream == nil {
 err := errors.New("send request did not return a stream, broken endpoint implementation")
-return updateStateError(err)
+return err
 }

 s.byteCounter = util.NewByteCounterReader(sstream)
@@ -400,42 +423,23 @@ func (s *ReplicationStep) doReplication(ctx context.Context, ka *watchdog.KeepAl
 // - an unexpected exit of ZFS on the sending side
 // - an unexpected exit of ZFS on the receiving side
 // - a connectivity issue
-return updateStateError(err)
+return err
 }
 log.Debug("receive finished")
 ka.MadeProgress()

-updateStateCompleted()
+s.state = StepMarkReplicatedReady
 return s.doMarkReplicated(ctx, ka, sender)
 }

-func (s *ReplicationStep) doMarkReplicated(ctx context.Context, ka *watchdog.KeepAlive, sender Sender) StepState {
-log := getLogger(ctx).
-WithField("filesystem", s.parent.fs).
-WithField("step", s.String())
-
-updateStateError := func(err error) StepState {
-s.lock.Lock()
-defer s.lock.Unlock()
-s.err = err
-if shouldRetry(s.err) {
-s.state = StepMarkReplicatedRetry
-return s.state
-}
-s.state = StepPermanentError
-return s.state
-}
-
-updateStateCompleted := func() StepState {
-s.lock.Lock()
-defer s.lock.Unlock()
-s.state = StepCompleted
-return s.state
-}
+func (s *ReplicationStep) doMarkReplicated(ctx context.Context, ka *watchdog.KeepAlive, sender Sender) error {
+if s.state != StepMarkReplicatedReady {
+panic(fmt.Sprintf("implementation error: %v", s.state))
+}
+log := getLogger(ctx)

 log.Debug("advance replication cursor")
 req := &pdu.ReplicationCursorReq{
@@ -449,25 +453,22 @@
 res, err := sender.ReplicationCursor(ctx, req)
 if err != nil {
 log.WithError(err).Error("error advancing replication cursor")
-return updateStateError(err)
+return err
 }
 if res.GetError() != "" {
 err := fmt.Errorf("cannot advance replication cursor: %s", res.GetError())
 log.Error(err.Error())
-return updateStateError(err)
+return err
 }
 ka.MadeProgress()

-return updateStateCompleted()
+s.state = StepCompleted
+return err
 }

 func (s *ReplicationStep) updateSizeEstimate(ctx context.Context, sender Sender) error {
-fs := s.parent.fs
-log := getLogger(ctx).
-WithField("filesystem", fs).
-WithField("step", s.String())
+log := getLogger(ctx)

 sr := s.buildSendRequest(true)

@@ -0,0 +1,50 @@
// Code generated by "enumer -type=State"; DO NOT EDIT.
package fsrep
import (
"fmt"
)
const _StateName = "ReadyCompleted"
var _StateIndex = [...]uint8{0, 5, 14}
func (i State) String() string {
i -= 1
if i >= State(len(_StateIndex)-1) {
return fmt.Sprintf("State(%d)", i+1)
}
return _StateName[_StateIndex[i]:_StateIndex[i+1]]
}
var _StateValues = []State{1, 2}
var _StateNameToValueMap = map[string]State{
_StateName[0:5]: 1,
_StateName[5:14]: 2,
}
// StateString retrieves an enum value from the enum constants string name.
// Throws an error if the param is not part of the enum.
func StateString(s string) (State, error) {
if val, ok := _StateNameToValueMap[s]; ok {
return val, nil
}
return 0, fmt.Errorf("%s does not belong to State values", s)
}
// StateValues returns all values of the enum
func StateValues() []State {
return _StateValues
}
// IsAState returns "true" if the value is listed in the enum definition. "false" otherwise
func (i State) IsAState() bool {
for _, v := range _StateValues {
if i == v {
return true
}
}
return false
}

@@ -1,29 +0,0 @@
// Code generated by "stringer -type=State"; DO NOT EDIT.
package fsrep
import "strconv"
const (
_State_name_0 = "ReadyRetry"
_State_name_1 = "PermanentError"
_State_name_2 = "Completed"
)
var (
_State_index_0 = [...]uint8{0, 5, 10}
)
func (i State) String() string {
switch {
case 1 <= i && i <= 2:
i -= 1
return _State_name_0[_State_index_0[i]:_State_index_0[i+1]]
case i == 4:
return _State_name_1
case i == 8:
return _State_name_2
default:
return "State(" + strconv.FormatInt(int64(i), 10) + ")"
}
}

@@ -0,0 +1,61 @@
// Code generated by "enumer -type=StepState"; DO NOT EDIT.
package fsrep
import (
"fmt"
)
const (
_StepStateName_0 = "StepReplicationReadyStepMarkReplicatedReady"
_StepStateName_1 = "StepCompleted"
)
var (
_StepStateIndex_0 = [...]uint8{0, 20, 43}
_StepStateIndex_1 = [...]uint8{0, 13}
)
func (i StepState) String() string {
switch {
case 1 <= i && i <= 2:
i -= 1
return _StepStateName_0[_StepStateIndex_0[i]:_StepStateIndex_0[i+1]]
case i == 4:
return _StepStateName_1
default:
return fmt.Sprintf("StepState(%d)", i)
}
}
var _StepStateValues = []StepState{1, 2, 4}
var _StepStateNameToValueMap = map[string]StepState{
_StepStateName_0[0:20]: 1,
_StepStateName_0[20:43]: 2,
_StepStateName_1[0:13]: 4,
}
// StepStateString retrieves an enum value from the enum constants string name.
// Throws an error if the param is not part of the enum.
func StepStateString(s string) (StepState, error) {
if val, ok := _StepStateNameToValueMap[s]; ok {
return val, nil
}
return 0, fmt.Errorf("%s does not belong to StepState values", s)
}
// StepStateValues returns all values of the enum
func StepStateValues() []StepState {
return _StepStateValues
}
// IsAStepState returns "true" if the value is listed in the enum definition. "false" otherwise
func (i StepState) IsAStepState() bool {
for _, v := range _StepStateValues {
if i == v {
return true
}
}
return false
}

@@ -1,35 +0,0 @@
// Code generated by "stringer -type=StepState"; DO NOT EDIT.
package fsrep
import "strconv"
const (
_StepState_name_0 = "StepReplicationReadyStepReplicationRetry"
_StepState_name_1 = "StepMarkReplicatedReady"
_StepState_name_2 = "StepMarkReplicatedRetry"
_StepState_name_3 = "StepPermanentError"
_StepState_name_4 = "StepCompleted"
)
var (
_StepState_index_0 = [...]uint8{0, 20, 40}
)
func (i StepState) String() string {
switch {
case 1 <= i && i <= 2:
i -= 1
return _StepState_name_0[_StepState_index_0[i]:_StepState_index_0[i+1]]
case i == 4:
return _StepState_name_1
case i == 8:
return _StepState_name_2
case i == 16:
return _StepState_name_3
case i == 32:
return _StepState_name_4
default:
return "StepState(" + strconv.FormatInt(int64(i), 10) + ")"
}
}

@@ -1,122 +0,0 @@
package queue
import (
"sort"
"time"
. "github.com/zrepl/zrepl/replication/fsrep"
)
type replicationQueueItem struct {
// duplicates fsr.state to avoid accessing and locking fsr
state State
// duplicates fsr.current.nextStepDate to avoid accessing & locking fsr
nextStepDate time.Time
errorStateEnterCount int
fsr *Replication
}
type ReplicationQueue []*replicationQueueItem
func NewReplicationQueue() *ReplicationQueue {
q := make(ReplicationQueue, 0)
return &q
}
func (q ReplicationQueue) Len() int { return len(q) }
func (q ReplicationQueue) Swap(i, j int) { q[i], q[j] = q[j], q[i] }
type lessmapEntry struct {
prio int
less func(a, b *replicationQueueItem) bool
}
var lessmap = map[State]lessmapEntry{
Ready: {
prio: 0,
less: func(a, b *replicationQueueItem) bool {
return a.nextStepDate.Before(b.nextStepDate)
},
},
Retry: {
prio: 1,
less: func(a, b *replicationQueueItem) bool {
return a.errorStateEnterCount < b.errorStateEnterCount
},
},
}
func (q ReplicationQueue) Less(i, j int) bool {
a, b := q[i], q[j]
al, aok := lessmap[a.state]
if !aok {
panic(a)
}
bl, bok := lessmap[b.state]
if !bok {
panic(b)
}
if al.prio != bl.prio {
return al.prio < bl.prio
}
return al.less(a, b)
}
func (q *ReplicationQueue) sort() (done []*Replication) {
// pre-scan for everything that is not ready
newq := make(ReplicationQueue, 0, len(*q))
done = make([]*Replication, 0, len(*q))
for _, qitem := range *q {
if _, ok := lessmap[qitem.state]; !ok {
done = append(done, qitem.fsr)
} else {
newq = append(newq, qitem)
}
}
sort.Stable(newq) // stable to avoid flickering in reports
*q = newq
return done
}
// next remains valid until the next call to GetNext()
func (q *ReplicationQueue) GetNext() (done []*Replication, next *ReplicationQueueItemHandle) {
done = q.sort()
if len(*q) == 0 {
return done, nil
}
next = &ReplicationQueueItemHandle{(*q)[0]}
return done, next
}
func (q *ReplicationQueue) Add(fsr *Replication) {
*q = append(*q, &replicationQueueItem{
fsr: fsr,
state: fsr.State(),
})
}
func (q *ReplicationQueue) Foreach(fu func(*ReplicationQueueItemHandle)) {
for _, qitem := range *q {
fu(&ReplicationQueueItemHandle{qitem})
}
}
type ReplicationQueueItemHandle struct {
i *replicationQueueItem
}
func (h ReplicationQueueItemHandle) GetFSReplication() *Replication {
return h.i.fsr
}
func (h ReplicationQueueItemHandle) Update(newState State, nextStepDate time.Time) {
h.i.state = newState
h.i.nextStepDate = nextStepDate
if h.i.state.IsErrorState() {
h.i.errorStateEnterCount++
}
}

@@ -10,14 +10,15 @@ import (
 "github.com/zrepl/zrepl/daemon/job/wakeup"
 "github.com/zrepl/zrepl/util/envconst"
 "github.com/zrepl/zrepl/util/watchdog"
+"github.com/problame/go-streamrpc"
 "math/bits"
 "net"
+"sort"
 "sync"
 "time"

 "github.com/zrepl/zrepl/replication/fsrep"
 . "github.com/zrepl/zrepl/replication/internal/diff"
-. "github.com/zrepl/zrepl/replication/internal/queue"
 "github.com/zrepl/zrepl/replication/pdu"
 )
@@ -70,9 +71,9 @@ type Replication struct {
 state State
 // Working, WorkingWait, Completed, ContextDone
-queue *ReplicationQueue
+queue []*fsrep.Replication
 completed []*fsrep.Replication
-active *ReplicationQueueItemHandle
+active *fsrep.Replication // == queue[0] or nil, unlike in Report

 // for PlanningError, WorkingWait and ContextError and Completed
 err error
@@ -87,7 +88,7 @@ type Report struct {
 SleepUntil time.Time
 Completed []*fsrep.Report
 Pending []*fsrep.Report
-Active *fsrep.Report
+Active *fsrep.Report // not contained in Pending, unlike in struct Replication
 }

 func NewReplication(secsPerState *prometheus.HistogramVec, bytesReplicated *prometheus.CounterVec) *Replication {
@@ -195,15 +196,20 @@ func resolveConflict(conflict error) (path []*pdu.FilesystemVersion, msg string)
 var RetryInterval = envconst.Duration("ZREPL_REPLICATION_RETRY_INTERVAL", 4 * time.Second)

+type Error interface {
+error
+Temporary() bool
+}
+
+var _ Error = fsrep.Error(nil)
+var _ Error = net.Error(nil)
+var _ Error = streamrpc.Error(nil)
+
 func isPermanent(err error) bool {
-switch err {
-case context.Canceled: return true
-case context.DeadlineExceeded: return true
-}
-if operr, ok := err.(net.Error); ok {
-return !operr.Temporary()
-}
-return false
+if e, ok := err.(Error); ok {
+return !e.Temporary()
+}
+return true
 }

 func statePlanning(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver, u updater) state {
@@ -214,8 +220,10 @@ func statePlanning(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, r
 handlePlanningError := func(err error) state {
 return u(func(r *Replication) {
-r.err = err
-if isPermanent(err) {
+ge := GlobalError{Err: err, Temporary: !isPermanent(err)}
+log.WithError(ge).Error("encountered global error while planning replication")
+r.err = ge
+if !ge.Temporary {
 r.state = PermanentError
 } else {
 r.sleepUntil = time.Now().Add(RetryInterval)
@@ -239,7 +247,7 @@
 ka.MadeProgress() // for both sender and receiver

-q := NewReplicationQueue()
+q := make([]*fsrep.Replication, 0, len(sfss))
 mainlog := log
 for _, fs := range sfss {
@@ -257,7 +265,7 @@
 if len(sfsvs) < 1 {
 err := errors.New("sender does not have any versions")
 log.Error(err.Error())
-q.Add(fsrep.NewReplicationWithPermanentError(fs.Path, err))
+q = append(q, fsrep.NewReplicationConflictError(fs.Path, err))
 continue
 }
@@ -298,7 +306,7 @@
 }
 ka.MadeProgress()
 if path == nil {
-q.Add(fsrep.NewReplicationWithPermanentError(fs.Path, conflict))
+q = append(q, fsrep.NewReplicationConflictError(fs.Path, conflict))
 continue
 }
@@ -324,7 +332,7 @@
 }
 ka.MadeProgress()

-q.Add(qitem)
+q = append(q, qitem)
 }
 ka.MadeProgress()
@@ -359,48 +367,118 @@ func statePlanningError(ctx context.Context, ka *watchdog.KeepAlive, sender Send
 }).rsf()
 }

+type GlobalError struct {
+Err error
+Temporary bool
+}
+
+func (e GlobalError) Error() string {
+errClass := "temporary"
+if !e.Temporary {
+errClass = "permanent"
+}
+return fmt.Sprintf("%s global error: %s", errClass, e.Err)
+}
+
+type FilesystemsReplicationFailedError struct {
+FilesystemsWithError []*fsrep.Replication
+}
+
+func (e FilesystemsReplicationFailedError) Error() string {
+allSame := true
+lastErr := e.FilesystemsWithError[0].Err().Error()
+for _, fs := range e.FilesystemsWithError {
+fsErr := fs.Err().Error()
+allSame = allSame && lastErr == fsErr
+}
+fsstr := "multiple filesystems"
+if len(e.FilesystemsWithError) == 1 {
+fsstr = fmt.Sprintf("filesystem %s", e.FilesystemsWithError[0].FS())
+}
+errorStr := lastErr
+if !allSame {
+errorStr = "multiple different errors"
+}
+return fmt.Sprintf("%s could not be replicated: %s", fsstr, errorStr)
+}
+
 func stateWorking(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver, u updater) state {

-var active *ReplicationQueueItemHandle
+var active *fsrep.Replication
 rsfNext := u(func(r *Replication) {
-done, next := r.queue.GetNext()
-r.completed = append(r.completed, done...)
-if next == nil {
-r.state = Completed
-}
-r.active = next
-active = next
+r.err = nil
+
+newq := make([]*fsrep.Replication, 0, len(r.queue))
+for i := range r.queue {
+if r.queue[i].CanRetry() {
+newq = append(newq, r.queue[i])
+} else {
+r.completed = append(r.completed, r.queue[i])
+}
+}
+sort.SliceStable(newq, func(i, j int) bool {
+return newq[i].NextStepDate().Before(newq[j].NextStepDate())
+})
+r.queue = newq
+
+if len(r.queue) == 0 {
+r.state = Completed
+fsWithErr := FilesystemsReplicationFailedError{ // prepare it
+FilesystemsWithError: make([]*fsrep.Replication, 0, len(r.completed)),
+}
+for _, fs := range r.completed {
+if fs.CanRetry() {
+panic(fmt.Sprintf("implementation error: completed contains retryable FS %s %#v",
+fs.FS(), fs.Err()))
+}
+if fs.Err() != nil {
+fsWithErr.FilesystemsWithError = append(fsWithErr.FilesystemsWithError, fs)
+}
+}
+if len(fsWithErr.FilesystemsWithError) > 0 {
+r.err = fsWithErr
+r.state = PermanentError
+}
+return
+}
+
+active = r.queue[0] // do not dequeue: if it's done, it will be sorted the next time we check for more work
+r.active = active
 }).rsf()

 if active == nil {
 return rsfNext
 }

-state, nextStepDate := active.GetFSReplication().TakeStep(ctx, ka, sender, receiver)
+activeCtx := fsrep.WithLogger(ctx, getLogger(ctx).WithField("fs", active.FS()))
+err := active.Retry(activeCtx, ka, sender, receiver)
 u(func(r *Replication) {
-active.Update(state, nextStepDate)
 r.active = nil
 }).rsf()

-select {
-case <-ctx.Done():
-return u(func(r *Replication) {
-r.err = ctx.Err()
-r.state = PermanentError
-}).rsf()
-default:
-}
-
-if err := active.GetFSReplication().Err(); err != nil {
-return u(func(r *Replication) {
-r.err = err
-if isPermanent(err) {
-r.state = PermanentError
-} else {
+if err != nil {
+if err.LocalToFS() {
+getLogger(ctx).WithError(err).
+Error("filesystem replication encountered a filesystem-specific error")
+// we stay in this state and let the queuing logic above de-prioritize this failing FS
+} else if err.Temporary() {
+getLogger(ctx).WithError(err).
+Error("filesystem encountered a non-filesystem-specific temporary error, enter retry-wait")
+u(func(r *Replication) {
+r.err = GlobalError{Err: err, Temporary: true}
 r.sleepUntil = time.Now().Add(RetryInterval)
 r.state = WorkingWait
-}
 }).rsf()
+} else {
+getLogger(ctx).WithError(err).
+Error("encountered a permanent non-filesystem-specific error")
+u(func(r *Replication) {
+r.err = GlobalError{Err: err, Temporary: false}
+r.state = PermanentError
+}).rsf()
+}
 }

 return u(nil).rsf()
@@ -412,7 +490,7 @@ func stateWorkingWait(ctx context.Context, ka *watchdog.KeepAlive, sender Sender
 sleepUntil = r.sleepUntil
 })
 t := time.NewTimer(RetryInterval)
-getLogger(ctx).WithField("until", sleepUntil).Info("retry wait after replication step error")
+getLogger(ctx).WithField("until", sleepUntil).Info("retry wait after error")
 defer t.Stop()
 select {
 case <-ctx.Done():
@@ -441,27 +519,26 @@ func (r *Replication) Report() *Report {
 SleepUntil: r.sleepUntil,
 }

-if r.state&(Planning|PlanningError|PermanentError) != 0 {
 if r.err != nil {
 rep.Problem = r.err.Error()
 }
+
+if r.state&(Planning|PlanningError) != 0 {
 return &rep
 }

-rep.Pending = make([]*fsrep.Report, 0, r.queue.Len())
+rep.Pending = make([]*fsrep.Report, 0, len(r.queue))
 rep.Completed = make([]*fsrep.Report, 0, len(r.completed)) // room for active (potentially)

-var active *fsrep.Replication
+// since r.active == r.queue[0], do not contain it in pending output
+pending := r.queue
 if r.active != nil {
-active = r.active.GetFSReplication()
-rep.Active = active.Report()
+rep.Active = r.active.Report()
+pending = r.queue[1:]
 }
-r.queue.Foreach(func(h *ReplicationQueueItemHandle) {
-fsr := h.GetFSReplication()
-if active != fsr {
-rep.Pending = append(rep.Pending, fsr.Report())
-}
-})
+for _, fsr := range pending {
+rep.Pending = append(rep.Pending, fsr.Report())
+}
+
 for _, fsr := range r.completed {
 rep.Completed = append(rep.Completed, fsr.Report())
 }