Reporting

Christian Schwarz 2018-08-15 20:29:34 +02:00
parent 7303d91abf
commit 991f13a3da
6 changed files with 387 additions and 154 deletions

View File

@@ -6,7 +6,7 @@ import "strconv"
 const (
     _FSReplicationState_name_0 = "FSQueuedFSActive"
-    _FSReplicationState_name_1 = "FSRetry"
+    _FSReplicationState_name_1 = "FSRetryWait"
     _FSReplicationState_name_2 = "FSPermanentError"
     _FSReplicationState_name_3 = "FSCompleted"
 )

View File

@@ -4,9 +4,9 @@ package replication
 import "strconv"
-const _FSReplicationStepState_name = "StepPendingStepActiveStepRetryStepPermanentErrorStepCompleted"
+const _FSReplicationStepState_name = "StepPendingStepRetryStepPermanentErrorStepCompleted"
-var _FSReplicationStepState_index = [...]uint8{0, 11, 21, 30, 48, 61}
+var _FSReplicationStepState_index = [...]uint8{0, 11, 20, 38, 51}
 func (i FSReplicationStepState) String() string {
     if i < 0 || i >= FSReplicationStepState(len(_FSReplicationStepState_index)-1) {
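The two regenerated stringer files above follow the enum edits in this commit: FSRetry is renamed to FSRetryWait and StepActive is dropped from the step states, which is why every offset in the index table shifts. A standalone sketch of the name/index lookup scheme these generated files use (simplified stand-in identifiers, not the generated code itself):

package main

import "fmt"

// Standalone sketch (not the generated code) of a stringer-style name/index
// table: removing "StepActive" shortens the name string, so every offset in
// the index array changes, as seen in the diff above.
const _name = "StepPendingStepRetryStepPermanentErrorStepCompleted"

var _index = [...]uint8{0, 11, 20, 38, 51}

func name(i int) string {
	if i < 0 || i >= len(_index)-1 {
		return fmt.Sprintf("FSReplicationStepState(%d)", i)
	}
	return _name[_index[i]:_index[i+1]]
}

func main() {
	for i := 0; i < 4; i++ {
		fmt.Println(name(i))
	}
	// Output:
	// StepPending
	// StepRetry
	// StepPermanentError
	// StepCompleted
}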

View File

@@ -7,6 +7,7 @@ import (
     "io"
     "net"
     "sort"
+    "sync"
     "time"
 )
@@ -14,7 +15,7 @@ import (
 type ReplicationState int
 const (
-    Planning ReplicationState = iota
+    Planning ReplicationState = 1 << iota
     PlanningError
     Working
     WorkingWait
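Switching the constants to 1 << iota makes every ReplicationState a single bit, so a set of states can be ORed together and tested with &; the rewritten loop condition in Drive and the checks in doDrive further down rely on this. A minimal self-contained illustration (throwaway package, constants mirror the diff):

package main

import "fmt"

// Illustration of the 1 << iota change: each state is one bit, so membership
// in a set of states is a single bitwise AND.
type ReplicationState int

const (
	Planning ReplicationState = 1 << iota
	PlanningError
	Working
	WorkingWait
	Completed
	ContextDone
)

func main() {
	state := Working
	fmt.Println(state&(Completed|ContextDone) == 0) // true: keep driving
	state = ContextDone
	fmt.Println(state&(Completed|ContextDone) == 0) // false: Drive's loop exits
}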
@@ -22,54 +23,67 @@ const (
     ContextDone
 )
 
+type replicationQueueItem struct {
+    retriesSinceLastError int
+    fsr                   *FSReplication
+}
+
 type Replication struct {
+    // lock protects all fields of this struct (but not the fields behind pointers!)
+    lock sync.Mutex
     state ReplicationState
     // Working / WorkingWait
-    pending, completed []*FSReplication
+    pending, completed []*replicationQueueItem
+    active             *replicationQueueItem
     // PlanningError
     planningError error
     // ContextDone
     contextError error
+    sleepUntil time.Time
 }
 
+//go:generate stringer -type=FSReplicationState
 type FSReplicationState int
-//go:generate stringer -type=FSReplicationState
 
 const (
     FSQueued FSReplicationState = 1 << iota
     FSActive
-    FSRetry
+    FSRetryWait
     FSPermanentError
     FSCompleted
 )
 
 type FSReplication struct {
+    lock           sync.Mutex
     state          FSReplicationState
     fs             *Filesystem
     permanentError error
-    retryAt        time.Time
     completed, pending []*FSReplicationStep
+    active             *FSReplicationStep
 }
 
-func newFSReplicationPermanentError(fs *Filesystem, err error) *FSReplication {
-    return &FSReplication{
+func newReplicationQueueItemPermanentError(fs *Filesystem, err error) *replicationQueueItem {
+    return &replicationQueueItem{0, &FSReplication{
         state:          FSPermanentError,
         fs:             fs,
         permanentError: err,
-    }
+    }}
 }
 
-type FSReplicationBuilder struct {
+type replicationQueueItemBuilder struct {
     r     *FSReplication
     steps []*FSReplicationStep
 }
 
-func buildNewFSReplication(fs *Filesystem) *FSReplicationBuilder {
-    return &FSReplicationBuilder{
+func buildNewFSReplication(fs *Filesystem) *replicationQueueItemBuilder {
+    return &replicationQueueItemBuilder{
         r: &FSReplication{
             fs:      fs,
             pending: make([]*FSReplicationStep, 0),
@@ -77,7 +91,7 @@ func buildNewFSReplication(fs *Filesystem) *FSReplicationBuilder {
     }
 }
 
-func (b *FSReplicationBuilder) AddStep(from, to *FilesystemVersion) *FSReplication {
+func (b *replicationQueueItemBuilder) AddStep(from, to *FilesystemVersion) *replicationQueueItemBuilder {
     step := &FSReplicationStep{
         state: StepPending,
         fsrep: b.r,
@@ -85,17 +99,17 @@ func (b *FSReplicationBuilder) AddStep(from, to *FilesystemVersion) *FSReplicati
         to:    to,
     }
     b.r.pending = append(b.r.pending, step)
-    return b.r
+    return b
 }
 
-func (b *FSReplicationBuilder) Complete() *FSReplication {
+func (b *replicationQueueItemBuilder) Complete() *replicationQueueItem {
     if len(b.r.pending) > 0 {
         b.r.state = FSQueued
     } else {
         b.r.state = FSCompleted
     }
     r := b.r
-    return r
+    return &replicationQueueItem{0, r}
 }
 
 //go:generate stringer -type=FSReplicationStepState
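AddStep now returns the builder instead of the FSReplication, and Complete wraps the result in a replicationQueueItem, so plan construction can chain steps and hand the finished item to the queue. A self-contained sketch of that shape, using simplified stand-in types in place of the package's FSReplication and FilesystemVersion:

package main

import "fmt"

// Stand-ins for the package types; only the builder shape is of interest here.
type step struct{ from, to string }

type fsReplication struct {
	pending []*step
	state   string
}

type queueItem struct {
	retriesSinceLastError int
	fsr                   *fsReplication
}

type builder struct{ r *fsReplication }

func newBuilder() *builder { return &builder{r: &fsReplication{}} }

// AddStep returns the builder, which is what the changed signature enables:
// chained calls ending in Complete().
func (b *builder) AddStep(from, to string) *builder {
	b.r.pending = append(b.r.pending, &step{from, to})
	return b
}

func (b *builder) Complete() *queueItem {
	if len(b.r.pending) > 0 {
		b.r.state = "FSQueued"
	} else {
		b.r.state = "FSCompleted"
	}
	return &queueItem{0, b.r}
}

func main() {
	q := newBuilder().AddStep("", "@snap1").AddStep("@snap1", "@snap2").Complete()
	fmt.Println(q.fsr.state, len(q.fsr.pending)) // FSQueued 2
}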
@@ -103,13 +117,16 @@ type FSReplicationStepState int
 const (
     StepPending FSReplicationStepState = iota
-    StepActive
     StepRetry
     StepPermanentError
     StepCompleted
 )
 
 type FSReplicationStep struct {
+    // only protects state, err
+    // from, to and fsrep are assumed to be immutable
+    lock sync.Mutex
     state    FSReplicationStepState
     from, to *FilesystemVersion
     fsrep    *FSReplication
@@ -119,7 +136,7 @@ type FSReplicationStep struct {
 }
 
 func (r *Replication) Drive(ctx context.Context, ep EndpointPair, retryNow chan struct{}) {
-    for !(r.state == Completed || r.state == ContextDone) {
+    for r.state&(Completed|ContextDone) == 0 {
         pre := r.state
         preTime := time.Now()
         r.doDrive(ctx, ep, retryNow)
@@ -128,7 +145,22 @@ func (r *Replication) Drive(ctx context.Context, ep EndpointPair, retryNow chan
         getLogger(ctx).
             WithField("transition", fmt.Sprintf("%s => %s", pre, post)).
             WithField("duration", delta).
-            Debug("state transition")
+            Debug("main state transition")
+
+        now := time.Now()
+        sleepDuration := r.sleepUntil.Sub(now)
+        if sleepDuration > 100*time.Millisecond {
+            getLogger(ctx).
+                WithField("duration", sleepDuration).
+                WithField("wakeup_at", r.sleepUntil).
+                Error("sleeping until next attempt")
+            timer := time.NewTimer(sleepDuration)
+            select {
+            case <-timer.C:
+            case <-ctx.Done():
+            case <-retryNow:
+            }
+            timer.Stop()
+        }
     }
 }
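Instead of each state blocking on its own timer, Drive now sleeps centrally until r.sleepUntil and can be woken early by context cancellation or the retryNow channel. A standalone sketch of that wait/wake pattern (the helper name sleepUntil is made up for the example):

package main

import (
	"context"
	"fmt"
	"time"
)

// Sketch of the sleep/wake pattern in Drive: sleep until the wake-up time, but
// return early if the context is cancelled or a retry is requested.
func sleepUntil(ctx context.Context, wakeAt time.Time, retryNow chan struct{}) {
	d := wakeAt.Sub(time.Now())
	if d <= 100*time.Millisecond {
		return
	}
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-timer.C:
	case <-ctx.Done():
	case <-retryNow:
	}
}

func main() {
	retryNow := make(chan struct{})
	go func() {
		time.Sleep(200 * time.Millisecond)
		retryNow <- struct{}{} // e.g. an operator-triggered retry
	}()
	start := time.Now()
	sleepUntil(context.Background(), start.Add(10*time.Second), retryNow)
	fmt.Println("woke early after", time.Since(start).Round(100*time.Millisecond))
}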
@@ -140,21 +172,16 @@ func (r *Replication) doDrive(ctx context.Context, ep EndpointPair, retryNow cha
         r.tryBuildPlan(ctx, ep)
 
     case PlanningError:
-        w := time.NewTimer(10 * time.Second) // FIXME constant make configurable
-        defer w.Stop()
-        select {
-        case <-ctx.Done():
-            r.state = ContextDone
-            r.contextError = ctx.Err()
-        case <-retryNow:
-            r.state = Planning
-            r.planningError = nil
-        case <-w.C:
-            r.state = Planning
-            r.planningError = nil
-        }
+        r.sleepUntil = time.Now().Add(10 * time.Second) // FIXME constant make configurable
 
     case Working:
+        withLocks := func(f func()) {
+            r.lock.Lock()
+            defer r.lock.Unlock()
+            f()
+        }
+        withLocks(func() {
+            if r.active == nil {
                 if len(r.pending) == 0 {
                     r.state = Completed
@@ -163,11 +190,11 @@ func (r *Replication) doDrive(ctx context.Context, ep EndpointPair, retryNow cha
                 sort.Slice(r.pending, func(i, j int) bool {
                     a, b := r.pending[i], r.pending[j]
-                    statePrio := func(x *FSReplication) int {
-                        if !(x.state == FSQueued || x.state == FSRetry) {
+                    statePrio := func(x *replicationQueueItem) int {
+                        if x.fsr.state&(FSQueued|FSRetryWait) == 0 {
                             panic(x)
                         }
-                        if x.state == FSQueued {
+                        if x.fsr.state == FSQueued {
                             return 0
                         } else {
                             return 1
@@ -178,48 +205,53 @@ func (r *Replication) doDrive(ctx context.Context, ep EndpointPair, retryNow cha
                         return aprio < bprio
                     }
                     // now we know they are the same state
-                    if a.state == FSQueued {
-                        return a.nextStepDate().Before(b.nextStepDate())
+                    if a.fsr.state == FSQueued {
+                        return a.fsr.nextStepDate().Before(b.fsr.nextStepDate())
                     }
-                    if a.state == FSRetry {
-                        return a.retryAt.Before(b.retryAt)
+                    if a.fsr.state == FSRetryWait {
+                        return a.retriesSinceLastError < b.retriesSinceLastError
                     }
                     panic("should not be reached")
                 })
-            fsrep := r.pending[0]
+                r.active = r.pending[0]
+                r.pending = r.pending[1:]
+            }
 
-            if fsrep.state == FSRetry {
+            if r.active.fsr.state == FSRetryWait {
                 r.state = WorkingWait
                 return
             }
-            if fsrep.state != FSQueued {
-                panic(fsrep)
+            if r.active.fsr.state != FSQueued {
+                panic(r.active)
             }
+        })
+
+        if r.active == nil {
+            return
+        }
 
-        fsState := fsrep.takeStep(ctx, ep)
-        if fsState&(FSPermanentError|FSCompleted) != 0 {
-            r.pending = r.pending[1:]
-            r.completed = append(r.completed, fsrep)
-        }
+        fsState := r.active.fsr.drive(ctx, ep)
+
+        withLocks(func() {
+            if fsState&FSQueued != 0 {
+                r.active.retriesSinceLastError = 0
+            } else if fsState&FSRetryWait != 0 {
+                r.active.retriesSinceLastError++
+            } else if fsState&(FSPermanentError|FSCompleted) != 0 {
+                r.completed = append(r.completed, r.active)
+                r.active = nil
+            } else {
+                panic(r.active)
+            }
+        })
 
     case WorkingWait:
-        fsrep := r.pending[0]
-        w := time.NewTimer(fsrep.retryAt.Sub(time.Now()))
-        defer w.Stop()
-        select {
-        case <-ctx.Done():
-            r.state = ContextDone
-            r.contextError = ctx.Err()
-        case <-retryNow:
-            for _, fsr := range r.pending {
-                fsr.retryNow()
-            }
-            r.state = Working
-        case <-w.C:
-            fsrep.retryNow() // avoid timer jitter
-            r.state = Working
-        }
+        r.sleepUntil = time.Now().Add(10 * time.Second) // FIXME make configurable
+
+    default:
+        panic(r.state)
     }
 }
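The Working case now keeps a single active queue item and reorders r.pending so that FSQueued items come before FSRetryWait items, queued items by the date of their next step and waiting items by retry count. A self-contained sketch of that two-level ordering with stand-in types:

package main

import (
	"fmt"
	"sort"
	"time"
)

// Stand-in for a replicationQueueItem: queued == true plays the role of
// FSQueued, queued == false the role of FSRetryWait.
type item struct {
	queued       bool
	nextStepDate time.Time
	retries      int
	name         string
}

func main() {
	now := time.Now()
	pending := []*item{
		{queued: false, retries: 3, name: "retrywait-3"},
		{queued: true, nextStepDate: now.Add(2 * time.Hour), name: "queued-late"},
		{queued: false, retries: 1, name: "retrywait-1"},
		{queued: true, nextStepDate: now, name: "queued-early"},
	}
	sort.Slice(pending, func(i, j int) bool {
		a, b := pending[i], pending[j]
		prio := func(x *item) int {
			if x.queued {
				return 0 // FSQueued sorts before FSRetryWait
			}
			return 1
		}
		if prio(a) != prio(b) {
			return prio(a) < prio(b)
		}
		if a.queued {
			return a.nextStepDate.Before(b.nextStepDate) // oldest next step first
		}
		return a.retries < b.retries // fewest retries first
	})
	for _, it := range pending {
		fmt.Println(it.name)
	}
	// Output order: queued-early, queued-late, retrywait-1, retrywait-3
}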
@@ -227,16 +259,19 @@ func (r *Replication) tryBuildPlan(ctx context.Context, ep EndpointPair) Replica
     log := getLogger(ctx)
 
+    updateLock := func() func() {
+        r.lock.Lock()
+        return func() {
+            r.lock.Unlock()
+        }
+    }
+
     planningError := func(err error) ReplicationState {
+        defer updateLock()()
         r.state = PlanningError
         r.planningError = err
         return r.state
     }
-    done := func() ReplicationState {
-        r.state = Working
-        r.planningError = nil
-        return r.state
-    }
 
     sfss, err := ep.Sender().ListFilesystems(ctx)
     if err != nil {
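The defer updateLock()() idiom above calls updateLock immediately, which takes r.lock and returns the unlock function; deferring the returned function releases the lock when the surrounding function returns. A minimal standalone illustration:

package main

import (
	"fmt"
	"sync"
)

// Illustration of defer updateLock()(): the outer call runs now and takes the
// lock, the returned closure runs at function exit and releases it.
type guarded struct {
	lock  sync.Mutex
	state string
}

func (g *guarded) set(s string) {
	updateLock := func() func() {
		g.lock.Lock()
		return func() { g.lock.Unlock() }
	}
	defer updateLock()() // lock immediately, unlock when set returns
	g.state = s
}

func main() {
	g := &guarded{}
	g.set("Working")
	fmt.Println(g.state)
}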
@@ -250,8 +285,8 @@ func (r *Replication) tryBuildPlan(ctx context.Context, ep EndpointPair) Replica
         return planningError(err)
     }
 
-    r.pending = make([]*FSReplication, 0, len(sfss))
-    r.completed = make([]*FSReplication, 0, len(sfss))
+    pending := make([]*replicationQueueItem, 0, len(sfss))
+    completed := make([]*replicationQueueItem, 0, len(sfss))
     mainlog := log
     for _, fs := range sfss {
@@ -268,7 +303,7 @@ func (r *Replication) tryBuildPlan(ctx context.Context, ep EndpointPair) Replica
         if len(sfsvs) <= 1 {
             err := errors.New("sender does not have any versions")
             log.Error(err.Error())
-            r.completed = append(r.completed, newFSReplicationPermanentError(fs, err))
+            completed = append(completed, newReplicationQueueItemPermanentError(fs, err))
             continue
         }
@@ -307,33 +342,40 @@ func (r *Replication) tryBuildPlan(ctx context.Context, ep EndpointPair) Replica
             }
         }
         if path == nil {
-            r.completed = append(r.completed, newFSReplicationPermanentError(fs, conflict))
+            completed = append(completed, newReplicationQueueItemPermanentError(fs, conflict))
             continue
         }
 
-        fsreplbuilder := buildNewFSReplication(fs)
+        builder := buildNewFSReplication(fs)
         if len(path) == 1 {
-            fsreplbuilder.AddStep(nil, path[0])
+            builder.AddStep(nil, path[0])
         } else {
             for i := 0; i < len(path)-1; i++ {
-                fsreplbuilder.AddStep(path[i], path[i+1])
+                builder.AddStep(path[i], path[i+1])
             }
         }
-        fsrepl := fsreplbuilder.Complete()
-        switch fsrepl.state {
+        qitem := builder.Complete()
+        switch qitem.fsr.state {
         case FSCompleted:
-            r.completed = append(r.completed, fsreplbuilder.Complete())
+            completed = append(completed, qitem)
         case FSQueued:
-            r.pending = append(r.pending, fsreplbuilder.Complete())
+            pending = append(pending, qitem)
         default:
-            panic(fsrepl)
+            panic(qitem)
         }
     }
 
-    return done()
+    defer updateLock()()
+    r.completed = completed
+    r.pending = pending
+    r.state = Working
+    r.planningError = nil
+    return r.state
 }
 
+// caller must have exclusive access to f
 func (f *FSReplication) nextStepDate() time.Time {
     if f.state != FSQueued {
         panic(f)
@@ -345,43 +387,71 @@ func (f *FSReplication) nextStepDate() time.Time {
     return ct
 }
 
-func (f *FSReplication) takeStep(ctx context.Context, ep EndpointPair) FSReplicationState {
-    if f.state != FSQueued {
-        panic(f)
+func (f *FSReplication) drive(ctx context.Context, ep EndpointPair) FSReplicationState {
+    f.lock.Lock()
+    defer f.lock.Unlock()
+    for f.state&(FSRetryWait|FSPermanentError|FSCompleted) == 0 {
+        pre := f.state
+        preTime := time.Now()
+        f.doDrive(ctx, ep)
+        delta := time.Now().Sub(preTime)
+        post := f.state
+        getLogger(ctx).
+            WithField("transition", fmt.Sprintf("%s => %s", pre, post)).
+            WithField("duration", delta).
+            Debug("fsr state transition")
+    }
+    return f.state
 }
 
+// caller must hold f.lock
+func (f *FSReplication) doDrive(ctx context.Context, ep EndpointPair) FSReplicationState {
+    switch f.state {
+    case FSPermanentError:
+        fallthrough
+    case FSCompleted:
+        return f.state
+    case FSRetryWait:
+        f.state = FSQueued
+        return f.state
+    case FSQueued:
+        if f.active == nil {
+            if len(f.pending) == 0 {
+                f.state = FSCompleted
+                return f.state
+            }
+            f.active = f.pending[0]
+            f.pending = f.pending[1:]
+        }
         f.state = FSActive
-    step := f.pending[0]
-    stepState := step.do(ctx, ep)
+        return f.state
+
+    case FSActive:
+        var stepState FSReplicationStepState
+        func() { // drop lock during long call
+            f.lock.Unlock()
+            defer f.lock.Lock()
+            stepState = f.active.do(ctx, ep)
+        }()
         switch stepState {
         case StepCompleted:
-            f.pending = f.pending[1:]
-            f.completed = append(f.completed, step)
+            f.completed = append(f.completed, f.active)
+            f.active = nil
             if len(f.pending) > 0 {
                 f.state = FSQueued
             } else {
                 f.state = FSCompleted
             }
         case StepRetry:
-            f.state = FSRetry
-            f.retryAt = time.Now().Add(10 * time.Second) // FIXME hardcoded constant
+            f.state = FSRetryWait
         case StepPermanentError:
            f.state = FSPermanentError
         }
         return f.state
     }
 
-func (f *FSReplication) retryNow() {
-    if f.state != FSRetry {
-        panic(f)
-    }
-    f.retryAt = time.Time{}
-    f.state = FSQueued
+    panic(f)
 }
 
 func (s *FSReplicationStep) do(ctx context.Context, ep EndpointPair) FSReplicationStepState {
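In the FSActive case the state lock is released for the duration of step.do, which may block on network I/O, and re-acquired before the state is updated; this is what lets Report() read a consistent snapshot while a step is running. A self-contained sketch of that drop-the-lock-during-a-long-call pattern:

package main

import (
	"fmt"
	"sync"
	"time"
)

// Sketch of the pattern used in the FSActive case: hold the lock around state
// changes, but drop it while the long-running call executes so that readers
// are not blocked for the duration of a transfer.
type machine struct {
	lock  sync.Mutex
	state string
}

func (m *machine) drive(longCall func() string) {
	m.lock.Lock()
	defer m.lock.Unlock()

	var result string
	func() {
		m.lock.Unlock()     // allow concurrent readers while the step runs
		defer m.lock.Lock() // re-acquire before touching m.state again
		result = longCall()
	}()
	m.state = result
}

func main() {
	m := &machine{state: "FSActive"}
	m.drive(func() string {
		time.Sleep(10 * time.Millisecond) // stands in for a step's send/recv
		return "FSCompleted"
	})
	fmt.Println(m.state)
}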
@@ -392,20 +462,30 @@ func (s *FSReplicationStep) do(ctx context.Context, ep EndpointPair) FSReplicati
         WithField("step", s.String())
 
     updateStateError := func(err error) FSReplicationStepState {
+        s.lock.Lock()
+        defer s.lock.Unlock()
         s.err = err
         switch err {
-        case io.EOF: fallthrough
-        case io.ErrUnexpectedEOF: fallthrough
+        case io.EOF:
+            fallthrough
+        case io.ErrUnexpectedEOF:
+            fallthrough
         case io.ErrClosedPipe:
-            return StepRetry
+            s.state = StepRetry
+            return s.state
         }
         if _, ok := err.(net.Error); ok {
-            return StepRetry
+            s.state = StepRetry
+            return s.state
         }
-        return StepPermanentError
+        s.state = StepPermanentError
+        return s.state
     }
 
     updateStateCompleted := func() FSReplicationStepState {
+        s.lock.Lock()
+        defer s.lock.Unlock()
         s.err = nil
         s.state = StepCompleted
         return s.state
@@ -471,4 +551,3 @@ func (s *FSReplicationStep) String() string {
     return fmt.Sprintf("%s(%s => %s)", s.fsrep.fs.Path, s.from.RelName(), s.to.RelName())
     }
 }

View File

@@ -1,10 +1,14 @@
 package replication
 
 import (
+    "os"
+    "syscall"
+    "encoding/json"
     "context"
     "fmt"
     "github.com/zrepl/zrepl/logger"
     "io"
+    "os/signal"
 )
 
 type ReplicationEndpoint interface {
@@ -131,7 +135,41 @@ func resolveConflict(conflict error) (path []*FilesystemVersion, msg string) {
 // Replicate continues with the replication of the remaining file systems.
 // Depending on the type of error, failed replications are retried in an unspecified order (currently FIFO).
 func Replicate(ctx context.Context, ep EndpointPair, retryNow chan struct{}) {
-    r := Replication{}
+    r := Replication{
+        state: Planning,
+    }
+
+    c := make(chan os.Signal)
+    defer close(c)
+    signal.Notify(c, syscall.SIGHUP)
+    go func() {
+        f, err := os.OpenFile("/tmp/report", os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600)
+        if err != nil {
+            getLogger(ctx).WithError(err).Error("cannot open report file")
+            panic(err)
+        }
+        defer f.Close()
+        for {
+            select {
+            case <-ctx.Done():
+                return
+            case sig := <-c:
+                if sig == nil {
+                    return
+                }
+                report := r.Report()
+                enc := json.NewEncoder(f)
+                enc.SetIndent("  ", "  ")
+                if err := enc.Encode(report); err != nil {
+                    getLogger(ctx).WithError(err).Error("cannot encode report")
+                    panic(err)
+                }
+                f.Write([]byte("\n"))
+                f.Sync()
+            }
+        }
+    }()
+
     r.Drive(ctx, ep, retryNow)
 }
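The goroutine added above appends an indented JSON report to /tmp/report every time the process receives SIGHUP, so an operator can request a snapshot with e.g. kill -HUP <zrepl pid>. A standalone sketch of just the trigger mechanism (the process signals itself here, standing in for the shell command):

package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// Sketch of the trigger mechanism: signal.Notify delivers SIGHUP on a channel,
// and each delivery corresponds to one report dump in the goroutine above.
func main() {
	c := make(chan os.Signal, 1)
	signal.Notify(c, syscall.SIGHUP)

	// Stand-in for `kill -HUP <pid>` run from a shell (Unix only).
	if err := syscall.Kill(os.Getpid(), syscall.SIGHUP); err != nil {
		panic(err)
	}

	sig := <-c
	fmt.Println("received", sig, "- this is where a report would be written")
}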

View File

@@ -4,13 +4,32 @@ package replication
 import "strconv"
 
-const _ReplicationState_name = "PlanningPlanningErrorWorkingWorkingWaitCompletedContextDone"
+const (
+    _ReplicationState_name_0 = "PlanningPlanningError"
+    _ReplicationState_name_1 = "Working"
+    _ReplicationState_name_2 = "WorkingWait"
+    _ReplicationState_name_3 = "Completed"
+    _ReplicationState_name_4 = "ContextDone"
+)
 
-var _ReplicationState_index = [...]uint8{0, 8, 21, 28, 39, 48, 59}
+var (
+    _ReplicationState_index_0 = [...]uint8{0, 8, 21}
+)
 
 func (i ReplicationState) String() string {
-    if i < 0 || i >= ReplicationState(len(_ReplicationState_index)-1) {
+    switch {
+    case 1 <= i && i <= 2:
+        i -= 1
+        return _ReplicationState_name_0[_ReplicationState_index_0[i]:_ReplicationState_index_0[i+1]]
+    case i == 4:
+        return _ReplicationState_name_1
+    case i == 8:
+        return _ReplicationState_name_2
+    case i == 16:
+        return _ReplicationState_name_3
+    case i == 32:
+        return _ReplicationState_name_4
+    default:
         return "ReplicationState(" + strconv.FormatInt(int64(i), 10) + ")"
     }
-    return _ReplicationState_name[_ReplicationState_index[i]:_ReplicationState_index[i+1]]
 }
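With the 1 << iota constants the ReplicationState values are 1, 2, 4, 8, 16, 32 rather than 0..5, so the regenerated String() has to match exact values and a combined bitmask falls through to the default branch. An equivalent hand-written illustration:

package main

import (
	"fmt"
	"strconv"
)

// Hand-written equivalent of the generated String() above: only exact
// single-bit values have names; an ORed combination does not.
type ReplicationState int

const (
	Planning ReplicationState = 1 << iota
	PlanningError
	Working
	WorkingWait
	Completed
	ContextDone
)

func (i ReplicationState) String() string {
	switch i {
	case Planning:
		return "Planning"
	case PlanningError:
		return "PlanningError"
	case Working:
		return "Working"
	case WorkingWait:
		return "WorkingWait"
	case Completed:
		return "Completed"
	case ContextDone:
		return "ContextDone"
	default:
		return "ReplicationState(" + strconv.FormatInt(int64(i), 10) + ")"
	}
}

func main() {
	fmt.Println(Working)               // Working
	fmt.Println(Working | WorkingWait) // ReplicationState(12)
}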

View File

@@ -0,0 +1,97 @@
+package replication
+
+type Report struct {
+    Status    string
+    Problem   string
+    Completed []*FilesystemReplicationReport
+    Pending   []*FilesystemReplicationReport
+    Active    *FilesystemReplicationReport
+}
+
+type StepReport struct {
+    From, To string
+    Status   string
+    Problem  string
+}
+
+type FilesystemReplicationReport struct {
+    Filesystem string
+    Status     string
+    Problem    string
+    Steps      []*StepReport
+}
+
+func stepReportFromStep(step *FSReplicationStep) *StepReport {
+    var from string // FIXME follow same convention as ZFS: to should be nil on full send
+    if step.from != nil {
+        from = step.from.RelName()
+    }
+    rep := StepReport{
+        From:   from,
+        To:     step.to.RelName(),
+        Status: step.state.String(),
+    }
+    return &rep
+}
+
+// access to fsr's members must be exclusive
+func filesystemReplicationReportFromQueueItem(qitem *replicationQueueItem) *FilesystemReplicationReport {
+    fsr := qitem.fsr
+    fsr.lock.Lock()
+    defer fsr.lock.Unlock()
+
+    rep := FilesystemReplicationReport{
+        Filesystem: fsr.fs.Path,
+        Status:     fsr.state.String(),
+    }
+
+    if fsr.state&FSPermanentError != 0 {
+        rep.Problem = fsr.permanentError.Error()
+        return &rep
+    }
+
+    rep.Steps = make([]*StepReport, 0, len(fsr.completed)+len(fsr.pending)+1)
+    for _, step := range fsr.completed {
+        rep.Steps = append(rep.Steps, stepReportFromStep(step))
+    }
+    if fsr.active != nil {
+        rep.Steps = append(rep.Steps, stepReportFromStep(fsr.active))
+    }
+    for _, step := range fsr.pending {
+        rep.Steps = append(rep.Steps, stepReportFromStep(step))
+    }
+    return &rep
+}
+
+func (r *Replication) Report() *Report {
+    r.lock.Lock()
+    defer r.lock.Unlock()
+
+    rep := Report{
+        Status: r.state.String(),
+    }
+
+    if r.state&(Planning|PlanningError|ContextDone) != 0 {
+        switch r.state {
+        case PlanningError:
+            rep.Problem = r.planningError.Error()
+        case ContextDone:
+            rep.Problem = r.contextError.Error()
+        }
+        return &rep
+    }
+
+    rep.Pending = make([]*FilesystemReplicationReport, 0, len(r.pending))
+    rep.Completed = make([]*FilesystemReplicationReport, 0, len(r.completed)) // room for active (potentially)
+    for _, qitem := range r.pending {
+        rep.Pending = append(rep.Pending, filesystemReplicationReportFromQueueItem(qitem))
+    }
+    for _, qitem := range r.completed {
+        rep.Completed = append(rep.Completed, filesystemReplicationReportFromQueueItem(qitem))
+    }
+
+    rep.Active = filesystemReplicationReportFromQueueItem(r.active)
+    return &rep
+}
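A sketch of what a consumer of Report() might do with the resulting tree, i.e. the status reporting this commit is building towards; the struct definitions are copied from the new file above, while the sample filesystem and snapshot names are made up for illustration:

package main

import (
	"encoding/json"
	"fmt"
)

// Consumer-side sketch: the report tree marshals cleanly to JSON. Struct
// definitions mirror the ones above; the values below are invented.
type StepReport struct {
	From, To string
	Status   string
	Problem  string
}

type FilesystemReplicationReport struct {
	Filesystem string
	Status     string
	Problem    string
	Steps      []*StepReport
}

type Report struct {
	Status    string
	Problem   string
	Completed []*FilesystemReplicationReport
	Pending   []*FilesystemReplicationReport
	Active    *FilesystemReplicationReport
}

func main() {
	r := Report{
		Status: "Working",
		Active: &FilesystemReplicationReport{
			Filesystem: "pool/data",
			Status:     "FSActive",
			Steps: []*StepReport{
				{From: "@snap1", To: "@snap2", Status: "StepPending"},
			},
		},
	}
	out, err := json.MarshalIndent(&r, "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}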