diff --git a/Gopkg.lock b/Gopkg.lock
index bb81345..e3785d6 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -174,15 +174,15 @@
   revision = "391d2c78c8404a9683d79f75dd24ab53040f89f7"
 
 [[projects]]
-  digest = "1:c2ba1c9dc003c15856e4529dac028cacba08ee8924300f058b3467cde9acf7a9"
+  digest = "1:1bcbb0a7ad8d3392d446eb583ae5415ff987838a8f7331a36877789be20667e6"
   name = "github.com/problame/go-streamrpc"
   packages = [
     ".",
     "internal/pdu",
   ]
   pruneopts = ""
-  revision = "de6f6a4041c77f700f02d8fe749e54efa50811f7"
-  version = "v0.4"
+  revision = "d5d111e014342fe1c37f0b71cc37ec5f2afdfd13"
+  version = "v0.5"
 
 [[projects]]
   branch = "master"
diff --git a/Gopkg.toml b/Gopkg.toml
index a478c08..6c8a9b7 100644
--- a/Gopkg.toml
+++ b/Gopkg.toml
@@ -66,7 +66,7 @@ ignored = [ "github.com/inconshreveable/mousetrap" ]
 
 [[constraint]]
   name = "github.com/problame/go-streamrpc"
-  version = "0.4.0"
+  version = "0.5.0"
diff --git a/Makefile b/Makefile
index 6d5f508..c6fc773 100644
--- a/Makefile
+++ b/Makefile
@@ -22,7 +22,6 @@ SUBPKGS += pruning/retentiongrid
 SUBPKGS += replication
 SUBPKGS += replication/fsrep
 SUBPKGS += replication/pdu
-SUBPKGS += replication/internal/queue
 SUBPKGS += replication/internal/diff
 SUBPKGS += tlsconf
 SUBPKGS += util
diff --git a/client/status.go b/client/status.go
index d00556e..a34ab49 100644
--- a/client/status.go
+++ b/client/status.go
@@ -110,6 +110,7 @@ func (t *tui) addIndent(indent int) {
     t.moveLine(0, 0)
 }
 
+
 var statusFlags struct {
     Raw bool
 }
@@ -514,10 +515,7 @@ func (t *tui) drawBar(length int, bytes, totalBytes int64) {
 func StringStepState(s fsrep.StepState) string {
     switch s {
     case fsrep.StepReplicationReady: return "Ready"
-    case fsrep.StepReplicationRetry: return "Retry"
     case fsrep.StepMarkReplicatedReady: return "MarkReady"
-    case fsrep.StepMarkReplicatedRetry: return "MarkRetry"
-    case fsrep.StepPermanentError: return "PermanentError"
     case fsrep.StepCompleted: return "Completed"
     default: return fmt.Sprintf("UNKNOWN %d", s)
diff --git a/daemon/pruner/pruner.go b/daemon/pruner/pruner.go
index 7c0572c..56cdba2 100644
--- a/daemon/pruner/pruner.go
+++ b/daemon/pruner/pruner.go
@@ -11,6 +11,7 @@ import (
     "github.com/zrepl/zrepl/replication/pdu"
     "github.com/zrepl/zrepl/util/envconst"
     "github.com/zrepl/zrepl/util/watchdog"
+    "github.com/problame/go-streamrpc"
     "net"
     "sort"
     "strings"
@@ -334,6 +335,14 @@ func (s snapshot) Replicated() bool { return s.replicated }
 
 func (s snapshot) Date() time.Time { return s.date }
 
+type Error interface {
+    error
+    Temporary() bool
+}
+
+var _ Error = net.Error(nil)
+var _ Error = streamrpc.Error(nil)
+
 func shouldRetry(e error) bool {
     if neterr, ok := e.(net.Error); ok {
         return neterr.Temporary()
diff --git a/replication/fsrep/fsfsm.go b/replication/fsrep/fsfsm.go
index 1eaa14c..facb889 100644
--- a/replication/fsrep/fsfsm.go
+++ b/replication/fsrep/fsfsm.go
@@ -71,38 +71,29 @@ type Report struct {
     Completed, Pending []*StepReport
 }
 
-//go:generate stringer -type=State
+//go:generate enumer -type=State
 type State uint
 
 const (
     Ready State = 1 << iota
-    Retry
-    PermanentError
     Completed
 )
 
-func (s State) fsrsf() state {
-    m := map[State]state{
-        Ready:          stateReady,
-        Retry:          stateRetry,
-        PermanentError: nil,
-        Completed:      nil,
-    }
-    return m[s]
-}
-
-func (s State) IsErrorState() bool {
-    return s & (Retry|PermanentError) != 0
+type Error interface {
+    error
+    Temporary() bool
+    LocalToFS() bool
 }
 
 type Replication struct {
     promBytesReplicated prometheus.Counter
+    fs                  string
+    // lock protects all fields below it in this struct, but not the data behind pointers
     lock               sync.Mutex
     state              State
-    fs                 string
-    err                error
+    err                Error
     completed, pending []*ReplicationStep
 }
 
@@ -112,13 +103,35 @@ func (f *Replication) State() State {
     return f.state
 }
 
-func (f *Replication) Err() error {
+func (f *Replication) FS() string { return f.fs }
+
+// returns zero value time.Time{} if no more pending steps
+func (f *Replication) NextStepDate() time.Time {
+    if len(f.pending) == 0 {
+        return time.Time{}
+    }
+    return f.pending[0].to.SnapshotTime()
+}
+
+func (f *Replication) Err() Error {
     f.lock.Lock()
     defer f.lock.Unlock()
-    if f.state & (Retry|PermanentError) != 0 {
-        return f.err
+    return f.err
+}
+
+func (f *Replication) CanRetry() bool {
+    f.lock.Lock()
+    defer f.lock.Unlock()
+    if f.state == Completed {
+        return false
     }
-    return nil
+    if f.state != Ready {
+        panic(fmt.Sprintf("implementation error: %v", f.state))
+    }
+    if f.err == nil {
+        return true
+    }
+    return f.err.Temporary()
 }
 
 func (f *Replication) UpdateSizeEsitmate(ctx context.Context, sender Sender) error {
@@ -162,26 +175,37 @@ func (b *ReplicationBuilder) Done() (r *Replication) {
     return r
 }
 
-func NewReplicationWithPermanentError(fs string, err error) *Replication {
+type ReplicationConflictError struct {
+    Err error
+}
+
+func (e *ReplicationConflictError) Timeout() bool { return false }
+
+func (e *ReplicationConflictError) Temporary() bool { return false }
+
+func (e *ReplicationConflictError) Error() string { return fmt.Sprintf("permanent error: %s", e.Err.Error()) }
+
+func (e *ReplicationConflictError) LocalToFS() bool { return true }
+
+func NewReplicationConflictError(fs string, err error) *Replication {
     return &Replication{
-        state: PermanentError,
+        state: Completed,
         fs:    fs,
-        err:   err,
+        err:   &ReplicationConflictError{Err: err},
     }
 }
 
-//go:generate stringer -type=StepState
+//go:generate enumer -type=StepState
 type StepState uint
 
 const (
     StepReplicationReady StepState = 1 << iota
-    StepReplicationRetry
     StepMarkReplicatedReady
-    StepMarkReplicatedRetry
-    StepPermanentError
     StepCompleted
 )
 
+func (s StepState) IsTerminal() bool { return s == StepCompleted }
+
 type FilesystemVersion interface {
     SnapshotTime() time.Time
     GetName() string // name without @ or #
@@ -204,7 +228,7 @@ type ReplicationStep struct {
     expectedSize int64 // 0 means no size estimate present / possible
 }
 
-func (f *Replication) TakeStep(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver) (post State, nextStepDate time.Time) {
+func (f *Replication) Retry(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver) Error {
     var u updater = func(fu func(*Replication)) State {
         f.lock.Lock()
@@ -214,76 +238,98 @@ func (f *Replication) TakeStep(ctx context.Context, ka *watchdog.KeepAlive, send
         }
         return f.state
     }
-    var s state = u(nil).fsrsf()
-    pre := u(nil)
-    preTime := time.Now()
-    s = s(ctx, ka, sender, receiver, u)
-    delta := time.Now().Sub(preTime)
-
-    post = u(func(f *Replication) {
-        if len(f.pending) == 0 {
-            return
-        }
-        nextStepDate = f.pending[0].to.SnapshotTime()
-    })
-
-    getLogger(ctx).
-        WithField("fs", f.fs).
-        WithField("transition", fmt.Sprintf("%s => %s", pre, post)).
-        WithField("duration", delta).
-        Debug("fsr step taken")
-
-    return post, nextStepDate
-}
-
-type updater func(func(fsr *Replication)) State
-
-type state func(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver, u updater) state
-
-func stateReady(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver, u updater) state {
     var current *ReplicationStep
-    s := u(func(f *Replication) {
+    pre := u(nil)
+    getLogger(ctx).WithField("fsrep_state", pre).Debug("begin fsrep.Retry")
+    defer func() {
+        post := u(nil)
+        getLogger(ctx).WithField("fsrep_transition", post).Debug("end fsrep.Retry")
+    }()
+
+    st := u(func(f *Replication) {
         if len(f.pending) == 0 {
             f.state = Completed
             return
         }
         current = f.pending[0]
     })
-    if s != Ready {
-        return s.fsrsf()
+    if st == Completed {
+        return nil
+    }
+    if st != Ready {
+        panic(fmt.Sprintf("implementation error: %v", st))
     }
 
-    stepState := current.doReplication(ctx, ka, sender, receiver)
+    stepCtx := WithLogger(ctx, getLogger(ctx).WithField("step", current))
+    getLogger(stepCtx).Debug("take step")
+    err := current.Retry(stepCtx, ka, sender, receiver)
+    if err != nil {
+        getLogger(stepCtx).WithError(err).Error("step could not be completed")
+    }
 
-    return u(func(f *Replication) {
-        switch stepState {
-        case StepCompleted:
-            f.completed = append(f.completed, current)
-            f.pending = f.pending[1:]
-            if len(f.pending) > 0 {
-                f.state = Ready
-            } else {
-                f.state = Completed
-            }
-        case StepReplicationRetry:
-            fallthrough
-        case StepMarkReplicatedRetry:
-            f.state = Retry
-        case StepPermanentError:
-            f.state = PermanentError
-            f.err = errors.New("a replication step failed with a permanent error")
-        default:
-            panic(f)
+    u(func(fsr *Replication) {
+        if err != nil {
+            f.err = &StepError{stepStr: current.String(), err: err}
+            return
         }
-    }).fsrsf()
+        if err == nil && current.state != StepCompleted {
+            panic(fmt.Sprintf("implementation error: %v", current.state))
+        }
+        f.err = nil
+        f.completed = append(f.completed, current)
+        f.pending = f.pending[1:]
+        if len(f.pending) > 0 {
+            f.state = Ready
+        } else {
+            f.state = Completed
+        }
+    })
+    var retErr Error = nil
+    u(func(fsr *Replication) {
+        retErr = fsr.err
+    })
+    return retErr
 }
 
-func stateRetry(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver, u updater) state {
-    return u(func(fsr *Replication) {
-        fsr.state = Ready
-    }).fsrsf()
+type updater func(func(fsr *Replication)) State
+
+type state func(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver, u updater) state
+
+type StepError struct {
+    stepStr string
+    err     error
+}
+
+var _ Error = &StepError{}
+
+func (e StepError) Error() string {
+    if e.LocalToFS() {
+        return fmt.Sprintf("step %s failed: %s", e.stepStr, e.err)
+    }
+    return e.err.Error()
+}
+
+func (e StepError) Timeout() bool {
+    if neterr, ok := e.err.(net.Error); ok {
+        return neterr.Timeout()
+    }
+    return false
+}
+
+func (e StepError) Temporary() bool {
+    if neterr, ok := e.err.(net.Error); ok {
+        return neterr.Temporary()
+    }
+    return false
+}
+
+func (e StepError) LocalToFS() bool {
+    if _, ok := e.err.(net.Error); ok {
+        return false
+    }
+    return true // conservative approximation: we'd like to check for specific errors returned over RPC here...
 }
 
 func (fsr *Replication) Report() *Report {
@@ -295,9 +341,8 @@ func (fsr *Replication) Report() *Report {
         Status:   fsr.state.String(),
     }
 
-    if fsr.state&PermanentError != 0 {
+    if fsr.err != nil && fsr.err.LocalToFS() {
         rep.Problem = fsr.err.Error()
-        return &rep
     }
 
     rep.Completed = make([]*StepReport, len(fsr.completed))
@@ -309,70 +354,48 @@
         rep.Pending[i] = fsr.pending[i].Report()
     }
 
-    if fsr.state&Retry != 0 {
-        if len(rep.Pending) != 0 { // should always be true for Retry == true?
-            rep.Problem = rep.Pending[0].Problem
-        }
-    }
-
     return &rep
 }
 
-func shouldRetry(err error) bool {
-    switch err {
-    case io.EOF:
-        fallthrough
-    case io.ErrUnexpectedEOF:
-        fallthrough
-    case io.ErrClosedPipe:
-        return true
+func (s *ReplicationStep) Retry(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver) error {
+    switch s.state {
+    case StepReplicationReady:
+        return s.doReplication(ctx, ka, sender, receiver)
+    case StepMarkReplicatedReady:
+        return s.doMarkReplicated(ctx, ka, sender)
+    case StepCompleted:
+        return nil
     }
-    if _, ok := err.(net.Error); ok {
-        return true
-    }
-    return false
+    panic(fmt.Sprintf("implementation error: %v", s.state))
 }
 
-func (s *ReplicationStep) doReplication(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver) StepState {
+func (s *ReplicationStep) Error() error {
+    if s.state & (StepReplicationReady|StepMarkReplicatedReady) != 0 {
+        return s.err
+    }
+    return nil
+}
+
+func (s *ReplicationStep) doReplication(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver) error {
+
+    if s.state != StepReplicationReady {
+        panic(fmt.Sprintf("implementation error: %v", s.state))
+    }
     fs := s.parent.fs
-    log := getLogger(ctx).
-        WithField("filesystem", fs).
-        WithField("step", s.String())
-
-    updateStateError := func(err error) StepState {
-        s.lock.Lock()
-        defer s.lock.Unlock()
-
-        s.err = err
-        if shouldRetry(s.err) {
-            s.state = StepReplicationRetry
-            return s.state
-        }
-        s.state = StepPermanentError
-        return s.state
-    }
-
-    updateStateCompleted := func() StepState {
-        s.lock.Lock()
-        defer s.lock.Unlock()
-        s.err = nil
-        s.state = StepMarkReplicatedReady
-        return s.state
-    }
-
+    log := getLogger(ctx)
     sr := s.buildSendRequest(false)
 
     log.Debug("initiate send request")
     sres, sstream, err := sender.Send(ctx, sr)
     if err != nil {
         log.WithError(err).Error("send request failed")
-        return updateStateError(err)
+        return err
     }
     if sstream == nil {
         err := errors.New("send request did not return a stream, broken endpoint implementation")
-        return updateStateError(err)
+        return err
     }
 
     s.byteCounter = util.NewByteCounterReader(sstream)
@@ -400,42 +423,23 @@
         // - an unexpected exit of ZFS on the sending side
         // - an unexpected exit of ZFS on the receiving side
         // - a connectivity issue
-        return updateStateError(err)
+        return err
     }
     log.Debug("receive finished")
 
     ka.MadeProgress()
-    updateStateCompleted()
-
+    s.state = StepMarkReplicatedReady
     return s.doMarkReplicated(ctx, ka, sender)
 }
 
-func (s *ReplicationStep) doMarkReplicated(ctx context.Context, ka *watchdog.KeepAlive, sender Sender) StepState {
+func (s *ReplicationStep) doMarkReplicated(ctx context.Context, ka *watchdog.KeepAlive, sender Sender) error {
 
-    log := getLogger(ctx).
-        WithField("filesystem", s.parent.fs).
-        WithField("step", s.String())
-
-    updateStateError := func(err error) StepState {
-        s.lock.Lock()
-        defer s.lock.Unlock()
-
-        s.err = err
-        if shouldRetry(s.err) {
-            s.state = StepMarkReplicatedRetry
-            return s.state
-        }
-        s.state = StepPermanentError
-        return s.state
+    if s.state != StepMarkReplicatedReady {
+        panic(fmt.Sprintf("implementation error: %v", s.state))
     }
 
-    updateStateCompleted := func() StepState {
-        s.lock.Lock()
-        defer s.lock.Unlock()
-        s.state = StepCompleted
-        return s.state
-    }
+    log := getLogger(ctx)
 
     log.Debug("advance replication cursor")
     req := &pdu.ReplicationCursorReq{
@@ -449,25 +453,22 @@ func (s *ReplicationStep) doMarkReplicated(ctx context.Context, ka *watchdog.Kee
     res, err := sender.ReplicationCursor(ctx, req)
     if err != nil {
         log.WithError(err).Error("error advancing replication cursor")
-        return updateStateError(err)
+        return err
     }
     if res.GetError() != "" {
         err := fmt.Errorf("cannot advance replication cursor: %s", res.GetError())
         log.Error(err.Error())
-        return updateStateError(err)
+        return err
     }
     ka.MadeProgress()
 
-    return updateStateCompleted()
+    s.state = StepCompleted
+    return err
 }
 
 func (s *ReplicationStep) updateSizeEstimate(ctx context.Context, sender Sender) error {
 
-    fs := s.parent.fs
-
-    log := getLogger(ctx).
-        WithField("filesystem", fs).
-        WithField("step", s.String())
+    log := getLogger(ctx)
 
     sr := s.buildSendRequest(true)
 
diff --git a/replication/fsrep/state_enumer.go b/replication/fsrep/state_enumer.go
new file mode 100644
index 0000000..6e38ece
--- /dev/null
+++ b/replication/fsrep/state_enumer.go
@@ -0,0 +1,50 @@
+// Code generated by "enumer -type=State"; DO NOT EDIT.
+
+package fsrep
+
+import (
+    "fmt"
+)
+
+const _StateName = "ReadyCompleted"
+
+var _StateIndex = [...]uint8{0, 5, 14}
+
+func (i State) String() string {
+    i -= 1
+    if i >= State(len(_StateIndex)-1) {
+        return fmt.Sprintf("State(%d)", i+1)
+    }
+    return _StateName[_StateIndex[i]:_StateIndex[i+1]]
+}
+
+var _StateValues = []State{1, 2}
+
+var _StateNameToValueMap = map[string]State{
+    _StateName[0:5]:  1,
+    _StateName[5:14]: 2,
+}
+
+// StateString retrieves an enum value from the enum constants string name.
+// Throws an error if the param is not part of the enum.
+func StateString(s string) (State, error) {
+    if val, ok := _StateNameToValueMap[s]; ok {
+        return val, nil
+    }
+    return 0, fmt.Errorf("%s does not belong to State values", s)
+}
+
+// StateValues returns all values of the enum
+func StateValues() []State {
+    return _StateValues
+}
+
+// IsAState returns "true" if the value is listed in the enum definition. "false" otherwise
+func (i State) IsAState() bool {
+    for _, v := range _StateValues {
+        if i == v {
+            return true
+        }
+    }
+    return false
+}
diff --git a/replication/fsrep/state_string.go b/replication/fsrep/state_string.go
deleted file mode 100644
index 0772f28..0000000
--- a/replication/fsrep/state_string.go
+++ /dev/null
@@ -1,29 +0,0 @@
-// Code generated by "stringer -type=State"; DO NOT EDIT.
-
-package fsrep
-
-import "strconv"
-
-const (
-    _State_name_0 = "ReadyRetry"
-    _State_name_1 = "PermanentError"
-    _State_name_2 = "Completed"
-)
-
-var (
-    _State_index_0 = [...]uint8{0, 5, 10}
-)
-
-func (i State) String() string {
-    switch {
-    case 1 <= i && i <= 2:
-        i -= 1
-        return _State_name_0[_State_index_0[i]:_State_index_0[i+1]]
-    case i == 4:
-        return _State_name_1
-    case i == 8:
-        return _State_name_2
-    default:
-        return "State(" + strconv.FormatInt(int64(i), 10) + ")"
-    }
-}
diff --git a/replication/fsrep/stepstate_enumer.go b/replication/fsrep/stepstate_enumer.go
new file mode 100644
index 0000000..287515c
--- /dev/null
+++ b/replication/fsrep/stepstate_enumer.go
@@ -0,0 +1,61 @@
+// Code generated by "enumer -type=StepState"; DO NOT EDIT.
+
+package fsrep
+
+import (
+    "fmt"
+)
+
+const (
+    _StepStateName_0 = "StepReplicationReadyStepMarkReplicatedReady"
+    _StepStateName_1 = "StepCompleted"
+)
+
+var (
+    _StepStateIndex_0 = [...]uint8{0, 20, 43}
+    _StepStateIndex_1 = [...]uint8{0, 13}
+)
+
+func (i StepState) String() string {
+    switch {
+    case 1 <= i && i <= 2:
+        i -= 1
+        return _StepStateName_0[_StepStateIndex_0[i]:_StepStateIndex_0[i+1]]
+    case i == 4:
+        return _StepStateName_1
+    default:
+        return fmt.Sprintf("StepState(%d)", i)
+    }
+}
+
+var _StepStateValues = []StepState{1, 2, 4}
+
+var _StepStateNameToValueMap = map[string]StepState{
+    _StepStateName_0[0:20]:  1,
+    _StepStateName_0[20:43]: 2,
+    _StepStateName_1[0:13]:  4,
+}
+
+// StepStateString retrieves an enum value from the enum constants string name.
+// Throws an error if the param is not part of the enum.
+func StepStateString(s string) (StepState, error) {
+    if val, ok := _StepStateNameToValueMap[s]; ok {
+        return val, nil
+    }
+    return 0, fmt.Errorf("%s does not belong to StepState values", s)
+}
+
+// StepStateValues returns all values of the enum
+func StepStateValues() []StepState {
+    return _StepStateValues
+}
+
+// IsAStepState returns "true" if the value is listed in the enum definition. "false" otherwise
+func (i StepState) IsAStepState() bool {
+    for _, v := range _StepStateValues {
+        if i == v {
+            return true
+        }
+    }
+    return false
+}
diff --git a/replication/fsrep/stepstate_string.go b/replication/fsrep/stepstate_string.go
deleted file mode 100644
index aeac7e1..0000000
--- a/replication/fsrep/stepstate_string.go
+++ /dev/null
@@ -1,35 +0,0 @@
-// Code generated by "stringer -type=StepState"; DO NOT EDIT.
-
-package fsrep
-
-import "strconv"
-
-const (
-    _StepState_name_0 = "StepReplicationReadyStepReplicationRetry"
-    _StepState_name_1 = "StepMarkReplicatedReady"
-    _StepState_name_2 = "StepMarkReplicatedRetry"
-    _StepState_name_3 = "StepPermanentError"
-    _StepState_name_4 = "StepCompleted"
-)
-
-var (
-    _StepState_index_0 = [...]uint8{0, 20, 40}
-)
-
-func (i StepState) String() string {
-    switch {
-    case 1 <= i && i <= 2:
-        i -= 1
-        return _StepState_name_0[_StepState_index_0[i]:_StepState_index_0[i+1]]
-    case i == 4:
-        return _StepState_name_1
-    case i == 8:
-        return _StepState_name_2
-    case i == 16:
-        return _StepState_name_3
-    case i == 32:
-        return _StepState_name_4
-    default:
-        return "StepState(" + strconv.FormatInt(int64(i), 10) + ")"
-    }
-}
diff --git a/replication/internal/queue/queue.go b/replication/internal/queue/queue.go
deleted file mode 100644
index c585612..0000000
--- a/replication/internal/queue/queue.go
+++ /dev/null
@@ -1,122 +0,0 @@
-package queue
-
-import (
-    "sort"
-    "time"
-
-    . "github.com/zrepl/zrepl/replication/fsrep"
-)
-
-type replicationQueueItem struct {
-    // duplicates fsr.state to avoid accessing and locking fsr
-    state State
-    // duplicates fsr.current.nextStepDate to avoid accessing & locking fsr
-    nextStepDate time.Time
-    errorStateEnterCount int
-
-    fsr *Replication
-}
-
-type ReplicationQueue []*replicationQueueItem
-
-func NewReplicationQueue() *ReplicationQueue {
-    q := make(ReplicationQueue, 0)
-    return &q
-}
-
-func (q ReplicationQueue) Len() int      { return len(q) }
-func (q ReplicationQueue) Swap(i, j int) { q[i], q[j] = q[j], q[i] }
-
-type lessmapEntry struct {
-    prio int
-    less func(a, b *replicationQueueItem) bool
-}
-
-var lessmap = map[State]lessmapEntry{
-    Ready: {
-        prio: 0,
-        less: func(a, b *replicationQueueItem) bool {
-            return a.nextStepDate.Before(b.nextStepDate)
-        },
-    },
-    Retry: {
-        prio: 1,
-        less: func(a, b *replicationQueueItem) bool {
-            return a.errorStateEnterCount < b.errorStateEnterCount
-        },
-    },
-}
-
-func (q ReplicationQueue) Less(i, j int) bool {
-
-    a, b := q[i], q[j]
-    al, aok := lessmap[a.state]
-    if !aok {
-        panic(a)
-    }
-    bl, bok := lessmap[b.state]
-    if !bok {
-        panic(b)
-    }
-
-    if al.prio != bl.prio {
-        return al.prio < bl.prio
-    }
-
-    return al.less(a, b)
-}
-
-func (q *ReplicationQueue) sort() (done []*Replication) {
-    // pre-scan for everything that is not ready
-    newq := make(ReplicationQueue, 0, len(*q))
-    done = make([]*Replication, 0, len(*q))
-    for _, qitem := range *q {
-        if _, ok := lessmap[qitem.state]; !ok {
-            done = append(done, qitem.fsr)
-        } else {
-            newq = append(newq, qitem)
-        }
-    }
-    sort.Stable(newq) // stable to avoid flickering in reports
-    *q = newq
-    return done
-}
-
-// next remains valid until the next call to GetNext()
-func (q *ReplicationQueue) GetNext() (done []*Replication, next *ReplicationQueueItemHandle) {
-    done = q.sort()
-    if len(*q) == 0 {
-        return done, nil
-    }
-    next = &ReplicationQueueItemHandle{(*q)[0]}
-    return done, next
-}
-
-func (q *ReplicationQueue) Add(fsr *Replication) {
-    *q = append(*q, &replicationQueueItem{
-        fsr:   fsr,
-        state: fsr.State(),
-    })
-}
-
-func (q *ReplicationQueue) Foreach(fu func(*ReplicationQueueItemHandle)) {
-    for _, qitem := range *q {
-        fu(&ReplicationQueueItemHandle{qitem})
-    }
-}
-
-type ReplicationQueueItemHandle struct {
-    i *replicationQueueItem
-}
-
-func (h ReplicationQueueItemHandle) GetFSReplication() *Replication {
-    return h.i.fsr
-}
-
-func (h ReplicationQueueItemHandle) Update(newState State, nextStepDate time.Time) {
-    h.i.state = newState
-    h.i.nextStepDate = nextStepDate
-    if h.i.state.IsErrorState() {
-        h.i.errorStateEnterCount++
-    }
-}
diff --git a/replication/mainfsm.go b/replication/mainfsm.go
index a2a8948..0372226 100644
--- a/replication/mainfsm.go
+++ b/replication/mainfsm.go
@@ -10,14 +10,15 @@ import (
     "github.com/zrepl/zrepl/daemon/job/wakeup"
     "github.com/zrepl/zrepl/util/envconst"
     "github.com/zrepl/zrepl/util/watchdog"
+    "github.com/problame/go-streamrpc"
     "math/bits"
     "net"
+    "sort"
     "sync"
     "time"
 
     "github.com/zrepl/zrepl/replication/fsrep"
     . "github.com/zrepl/zrepl/replication/internal/diff"
-    . "github.com/zrepl/zrepl/replication/internal/queue"
     "github.com/zrepl/zrepl/replication/pdu"
 )
 
@@ -70,9 +71,9 @@ type Replication struct {
     state State
 
     // Working, WorkingWait, Completed, ContextDone
-    queue     *ReplicationQueue
+    queue     []*fsrep.Replication
     completed []*fsrep.Replication
-    active    *ReplicationQueueItemHandle
+    active    *fsrep.Replication // == queue[0] or nil, unlike in Report
 
     // for PlanningError, WorkingWait and ContextError and Completed
     err error
@@ -87,7 +88,7 @@ type Report struct {
     SleepUntil time.Time
     Completed  []*fsrep.Report
     Pending    []*fsrep.Report
-    Active     *fsrep.Report
+    Active     *fsrep.Report // not contained in Pending, unlike in struct Replication
 }
 
 func NewReplication(secsPerState *prometheus.HistogramVec, bytesReplicated *prometheus.CounterVec) *Replication {
@@ -195,15 +196,20 @@ func resolveConflict(conflict error) (path []*pdu.FilesystemVersion, msg string)
 
 var RetryInterval = envconst.Duration("ZREPL_REPLICATION_RETRY_INTERVAL", 4 * time.Second)
 
+type Error interface {
+    error
+    Temporary() bool
+}
+
+var _ Error = fsrep.Error(nil)
+var _ Error = net.Error(nil)
+var _ Error = streamrpc.Error(nil)
+
 func isPermanent(err error) bool {
-    switch err {
-    case context.Canceled: return true
-    case context.DeadlineExceeded: return true
+    if e, ok := err.(Error); ok {
+        return !e.Temporary()
     }
-    if operr, ok := err.(net.Error); ok {
-        return !operr.Temporary()
-    }
-    return false
+    return true
 }
 
 func statePlanning(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver, u updater) state {
@@ -214,8 +220,10 @@
     handlePlanningError := func(err error) state {
         return u(func(r *Replication) {
-            r.err = err
-            if isPermanent(err) {
+            ge := GlobalError{Err: err, Temporary: !isPermanent(err)}
+            log.WithError(ge).Error("encountered global error while planning replication")
+            r.err = ge
+            if !ge.Temporary {
                 r.state = PermanentError
             } else {
                 r.sleepUntil = time.Now().Add(RetryInterval)
@@ -239,7 +247,7 @@
     ka.MadeProgress() // for both sender and receiver
 
-    q := NewReplicationQueue()
+    q := make([]*fsrep.Replication, 0, len(sfss))
     mainlog := log
     for _, fs := range sfss {
@@ -257,7 +265,7 @@
         if len(sfsvs) < 1 {
             err := errors.New("sender does not have any versions")
             log.Error(err.Error())
-            q.Add(fsrep.NewReplicationWithPermanentError(fs.Path, err))
+            q = append(q, fsrep.NewReplicationConflictError(fs.Path, err))
             continue
         }
@@ -298,7 +306,7 @@
         }
         ka.MadeProgress()
         if path == nil {
-            q.Add(fsrep.NewReplicationWithPermanentError(fs.Path, conflict))
+            q = append(q, fsrep.NewReplicationConflictError(fs.Path, conflict))
             continue
         }
@@ -324,7 +332,7 @@
         }
         ka.MadeProgress()
 
-        q.Add(qitem)
+        q = append(q, qitem)
     }
 
     ka.MadeProgress()
@@ -359,48 +367,118 @@ func statePlanningError(ctx context.Context, ka *watchdog.KeepAlive, sender Send
     }).rsf()
 }
 
+type GlobalError struct {
+    Err       error
+    Temporary bool
+}
+
+func (e GlobalError) Error() string {
+    errClass := "temporary"
+    if !e.Temporary {
+        errClass = "permanent"
+    }
+    return fmt.Sprintf("%s global error: %s", errClass, e.Err)
+}
+
+type FilesystemsReplicationFailedError struct {
+    FilesystemsWithError []*fsrep.Replication
+}
+
+func (e FilesystemsReplicationFailedError) Error() string {
+    allSame := true
+    lastErr := e.FilesystemsWithError[0].Err().Error()
+    for _, fs := range e.FilesystemsWithError {
+        fsErr := fs.Err().Error()
+        allSame = allSame && lastErr == fsErr
+    }
+
+    fsstr := "multiple filesystems"
+    if len(e.FilesystemsWithError) == 1 {
+        fsstr = fmt.Sprintf("filesystem %s", e.FilesystemsWithError[0].FS())
+    }
+    errorStr := lastErr
+    if !allSame {
+        errorStr = "multiple different errors"
+    }
+    return fmt.Sprintf("%s could not be replicated: %s", fsstr, errorStr)
+}
+
 func stateWorking(ctx context.Context, ka *watchdog.KeepAlive, sender Sender, receiver Receiver, u updater) state {
 
-    var active *ReplicationQueueItemHandle
+    var active *fsrep.Replication
     rsfNext := u(func(r *Replication) {
-        done, next := r.queue.GetNext()
-        r.completed = append(r.completed, done...)
-        if next == nil {
-            r.state = Completed
+
+        r.err = nil
+
+        newq := make([]*fsrep.Replication, 0, len(r.queue))
+        for i := range r.queue {
+            if r.queue[i].CanRetry() {
+                newq = append(newq, r.queue[i])
+            } else {
+                r.completed = append(r.completed, r.queue[i])
+            }
         }
-        r.active = next
-        active = next
+        sort.SliceStable(newq, func(i, j int) bool {
+            return newq[i].NextStepDate().Before(newq[j].NextStepDate())
+        })
+        r.queue = newq
+
+        if len(r.queue) == 0 {
+            r.state = Completed
+            fsWithErr := FilesystemsReplicationFailedError{ // prepare it
+                FilesystemsWithError: make([]*fsrep.Replication, 0, len(r.completed)),
+            }
+            for _, fs := range r.completed {
+                if fs.CanRetry() {
+                    panic(fmt.Sprintf("implementation error: completed contains retryable FS %s %#v",
+                        fs.FS(), fs.Err()))
+                }
+                if fs.Err() != nil {
+                    fsWithErr.FilesystemsWithError = append(fsWithErr.FilesystemsWithError, fs)
+                }
+            }
+            if len(fsWithErr.FilesystemsWithError) > 0 {
+                r.err = fsWithErr
+                r.state = PermanentError
+            }
+            return
+        }
+
+        active = r.queue[0] // do not dequeue: if it's done, it will be sorted the next time we check for more work
+        r.active = active
     }).rsf()
     if active == nil {
         return rsfNext
     }
 
-    state, nextStepDate := active.GetFSReplication().TakeStep(ctx, ka, sender, receiver)
+    activeCtx := fsrep.WithLogger(ctx, getLogger(ctx).WithField("fs", active.FS()))
+    err := active.Retry(activeCtx, ka, sender, receiver)
     u(func(r *Replication) {
-        active.Update(state, nextStepDate)
         r.active = nil
     }).rsf()
 
-    select {
-    case <-ctx.Done():
-        return u(func(r *Replication) {
-            r.err = ctx.Err()
-            r.state = PermanentError
-        }).rsf()
-    default:
-    }
-
-    if err := active.GetFSReplication().Err(); err != nil {
-        return u(func(r *Replication) {
-            r.err = err
-            if isPermanent(err) {
-                r.state = PermanentError
-            } else {
+    if err != nil {
+        if err.LocalToFS() {
+            getLogger(ctx).WithError(err).
+                Error("filesystem replication encountered a filesystem-specific error")
+            // we stay in this state and let the queuing logic above de-prioritize this failing FS
+        } else if err.Temporary() {
+            getLogger(ctx).WithError(err).
+                Error("filesystem encountered a non-filesystem-specific temporary error, enter retry-wait")
+            u(func(r *Replication) {
+                r.err = GlobalError{Err: err, Temporary: true}
                 r.sleepUntil = time.Now().Add(RetryInterval)
                 r.state = WorkingWait
-            }
-        }).rsf()
+            }).rsf()
+        } else {
+            getLogger(ctx).WithError(err).
+                Error("encountered a permanent non-filesystem-specific error")
+            u(func(r *Replication) {
+                r.err = GlobalError{Err: err, Temporary: false}
+                r.state = PermanentError
+            }).rsf()
+        }
     }
 
     return u(nil).rsf()
@@ -412,7 +490,7 @@ func stateWorkingWait(ctx context.Context, ka *watchdog.KeepAlive, sender Sender
         sleepUntil = r.sleepUntil
     })
     t := time.NewTimer(RetryInterval)
-    getLogger(ctx).WithField("until", sleepUntil).Info("retry wait after replication step error")
+    getLogger(ctx).WithField("until", sleepUntil).Info("retry wait after error")
     defer t.Stop()
     select {
     case <-ctx.Done():
@@ -441,27 +519,26 @@ func (r *Replication) Report() *Report {
         SleepUntil: r.sleepUntil,
     }
 
-    if r.state&(Planning|PlanningError|PermanentError) != 0 {
-        if r.err != nil {
-            rep.Problem = r.err.Error()
-        }
+    if r.err != nil {
+        rep.Problem = r.err.Error()
+    }
+
+    if r.state&(Planning|PlanningError) != 0 {
         return &rep
     }
 
-    rep.Pending = make([]*fsrep.Report, 0, r.queue.Len())
+    rep.Pending = make([]*fsrep.Report, 0, len(r.queue))
     rep.Completed = make([]*fsrep.Report, 0, len(r.completed)) // room for active (potentially)
 
-    var active *fsrep.Replication
+    // since r.active == r.queue[0], do not contain it in pending output
+    pending := r.queue
     if r.active != nil {
-        active = r.active.GetFSReplication()
-        rep.Active = active.Report()
+        rep.Active = r.active.Report()
+        pending = r.queue[1:]
+    }
+    for _, fsr := range pending {
+        rep.Pending= append(rep.Pending, fsr.Report())
     }
-    r.queue.Foreach(func(h *ReplicationQueueItemHandle) {
-        fsr := h.GetFSReplication()
-        if active != fsr {
-            rep.Pending = append(rep.Pending, fsr.Report())
-        }
-    })
     for _, fsr := range r.completed {
         rep.Completed = append(rep.Completed, fsr.Report())
     }