proper queue abstraction

This commit is contained in:
Christian Schwarz 2018-08-16 14:02:16 +02:00
parent 93929b61e4
commit bf1e626b9a
5 changed files with 199 additions and 140 deletions

View File

@ -192,7 +192,7 @@ func (j *PullJob) doRun(ctx context.Context) {
ctx = streamrpc.ContextWithLogger(ctx, streamrpcLogAdaptor{j.task.Log().WithField("subsystem", "rpc.protocol")})
ctx = context.WithValue(ctx, contextKeyLog, j.task.Log().WithField("subsystem", "rpc.endpoint"))
j.rep = &replication.Replication{}
j.rep = replication.NewReplication()
retryNow := make(chan struct{})
j.rep.Drive(ctx, replication.NewEndpointPairPull(sender, puller), retryNow)

View File

@ -4,13 +4,26 @@ package replication
import "strconv"
const _FSReplicationStepState_name = "StepPendingStepRetryStepPermanentErrorStepCompleted"
const (
_FSReplicationStepState_name_0 = "StepReadyStepRetry"
_FSReplicationStepState_name_1 = "StepPermanentError"
_FSReplicationStepState_name_2 = "StepCompleted"
)
var _FSReplicationStepState_index = [...]uint8{0, 11, 20, 38, 51}
var (
_FSReplicationStepState_index_0 = [...]uint8{0, 9, 18}
)
func (i FSReplicationStepState) String() string {
if i < 0 || i >= FSReplicationStepState(len(_FSReplicationStepState_index)-1) {
switch {
case 1 <= i && i <= 2:
i -= 1
return _FSReplicationStepState_name_0[_FSReplicationStepState_index_0[i]:_FSReplicationStepState_index_0[i+1]]
case i == 4:
return _FSReplicationStepState_name_1
case i == 8:
return _FSReplicationStepState_name_2
default:
return "FSReplicationStepState(" + strconv.FormatInt(int64(i), 10) + ")"
}
return _FSReplicationStepState_name[_FSReplicationStepState_index[i]:_FSReplicationStepState_index[i+1]]
}

View File

@ -42,6 +42,11 @@ func (s ReplicationState) rsf() replicationStateFunc {
type replicationQueueItem struct {
retriesSinceLastError int
// duplicates fsr.state to avoid accessing and locking fsr
state FSReplicationState
// duplicates fsr.current.nextStepDate to avoid accessing & locking fsr
nextStepDate time.Time
fsr *FSReplication
}
@ -52,8 +57,9 @@ type Replication struct {
state ReplicationState
// Working, WorkingWait, Completed, ContextDone
pending, completed []*replicationQueueItem
active *replicationQueueItem
queue *replicationQueue
completed []*FSReplication
active *FSReplication
// PlanningError
planningError error
@ -104,38 +110,17 @@ type FSReplication struct {
}
func newReplicationQueueItemPermanentError(fs *Filesystem, err error) *replicationQueueItem {
return &replicationQueueItem{0, &FSReplication{
return &replicationQueueItem{
retriesSinceLastError: 0,
state: FSPermanentError,
fs: fs,
err: err,
}}
}
type replicationQueueItemBuilder struct {
r *FSReplication
steps []*FSReplicationStep
}
func buildNewFSReplication(fs *Filesystem) *replicationQueueItemBuilder {
return &replicationQueueItemBuilder{
r: &FSReplication{
fs: fs,
pending: make([]*FSReplicationStep, 0),
fsr: &FSReplication{
state: FSPermanentError,
fs: fs,
err: err,
},
}
}
func (b *replicationQueueItemBuilder) AddStep(from, to *FilesystemVersion) *replicationQueueItemBuilder {
step := &FSReplicationStep{
state: StepPending,
fsrep: b.r,
from: from,
to: to,
}
b.r.pending = append(b.r.pending, step)
return b
}
func (b *replicationQueueItemBuilder) Complete() *replicationQueueItem {
if len(b.r.pending) > 0 {
b.r.state = FSReady
@ -143,14 +128,14 @@ func (b *replicationQueueItemBuilder) Complete() *replicationQueueItem {
b.r.state = FSCompleted
}
r := b.r
return &replicationQueueItem{0, r}
return &replicationQueueItem{0, b.r.state, time.Time{}, r}
}
//go:generate stringer -type=FSReplicationStepState
type FSReplicationStepState int
type FSReplicationStepState uint
const (
StepPending FSReplicationStepState = iota
StepReady FSReplicationStepState = 1 << iota
StepRetry
StepPermanentError
StepCompleted
@ -222,8 +207,7 @@ func rsfPlanning(ctx context.Context, ep EndpointPair, u replicationUpdater) rep
return handlePlanningError(err)
}
pending := make([]*replicationQueueItem, 0, len(sfss))
completed := make([]*replicationQueueItem, 0, len(sfss))
q := newReplicationQueue()
mainlog := log
for _, fs := range sfss {
@ -240,7 +224,7 @@ func rsfPlanning(ctx context.Context, ep EndpointPair, u replicationUpdater) rep
if len(sfsvs) <= 1 {
err := errors.New("sender does not have any versions")
log.Error(err.Error())
completed = append(completed, newReplicationQueueItemPermanentError(fs, err))
q.Add(newReplicationQueueItemPermanentError(fs, err))
continue
}
@ -279,11 +263,11 @@ func rsfPlanning(ctx context.Context, ep EndpointPair, u replicationUpdater) rep
}
}
if path == nil {
completed = append(completed, newReplicationQueueItemPermanentError(fs, conflict))
q.Add(newReplicationQueueItemPermanentError(fs, conflict))
continue
}
builder := buildNewFSReplication(fs)
builder := buildReplicationQueueItem(fs)
if len(path) == 1 {
builder.AddStep(nil, path[0])
} else {
@ -292,20 +276,12 @@ func rsfPlanning(ctx context.Context, ep EndpointPair, u replicationUpdater) rep
}
}
qitem := builder.Complete()
switch qitem.fsr.state {
case FSCompleted:
completed = append(completed, qitem)
case FSReady:
pending = append(pending, qitem)
default:
panic(qitem)
}
q.Add(qitem)
}
return u(func(r *Replication) {
r.completed = completed
r.pending = pending
r.completed = nil
r.queue = q
r.planningError = nil
r.state = Working
}).rsf()
@ -333,70 +309,25 @@ func rsfPlanningError(ctx context.Context, ep EndpointPair, u replicationUpdater
func rsfWorking(ctx context.Context, ep EndpointPair, u replicationUpdater) replicationStateFunc {
var active *replicationQueueItem
var active *replicationQueueItemHandle
rsfNext := u(func(r *Replication) {
if r.active != nil {
active = r.active
return
}
if len(r.pending) == 0 {
done, next := r.queue.GetNext()
r.completed = append(r.completed, done...)
if next == nil {
r.state = Completed
return
}
sort.Slice(r.pending, func(i, j int) bool {
a, b := r.pending[i], r.pending[j]
statePrio := func(x *replicationQueueItem) int {
if x.fsr.state&(FSReady|FSRetryWait) == 0 {
panic(x)
}
if x.fsr.state == FSReady {
return 0
} else {
return 1
}
}
aprio, bprio := statePrio(a), statePrio(b)
if aprio != bprio {
return aprio < bprio
}
// now we know they are the same state
if a.fsr.state == FSReady {
return a.fsr.nextStepDate().Before(b.fsr.nextStepDate())
}
if a.fsr.state == FSRetryWait {
return a.retriesSinceLastError < b.retriesSinceLastError
}
panic("should not be reached")
})
r.active = r.pending[0]
active = r.active
r.pending = r.pending[1:]
active = next
}).rsf()
if active == nil {
return rsfNext
}
fsState := active.fsr.takeStep(ctx, ep)
state, nextStepDate := active.GetFSReplication().takeStep(ctx, ep)
return u(func(r *Replication) {
if fsState&FSReady != 0 {
r.active.retriesSinceLastError = 0
} else if fsState&FSRetryWait != 0 {
r.active.retriesSinceLastError++
} else if fsState&(FSPermanentError|FSCompleted) != 0 {
r.completed = append(r.completed, r.active)
r.active = nil
} else {
panic(r.active)
}
active.Update(state, nextStepDate)
}).rsf()
}
@ -420,19 +351,121 @@ func rsfWorkingWait(ctx context.Context, ep EndpointPair, u replicationUpdater)
}
}
// caller must have exclusive access to f
func (f *FSReplication) nextStepDate() time.Time {
if f.state != FSReady {
panic(f)
}
ct, err := f.pending[0].to.CreationAsTime()
if err != nil {
panic(err) // FIXME
}
return ct
type replicationQueueItemBuilder struct {
r *FSReplication
steps []*FSReplicationStep
}
func (f *FSReplication) takeStep(ctx context.Context, ep EndpointPair) FSReplicationState {
func buildReplicationQueueItem(fs *Filesystem) *replicationQueueItemBuilder {
return &replicationQueueItemBuilder{
r: &FSReplication{
fs: fs,
pending: make([]*FSReplicationStep, 0),
},
}
}
func (b *replicationQueueItemBuilder) AddStep(from, to *FilesystemVersion) *replicationQueueItemBuilder {
step := &FSReplicationStep{
state: StepReady,
fsrep: b.r,
from: from,
to: to,
}
b.r.pending = append(b.r.pending, step)
return b
}
type replicationQueue []*replicationQueueItem
func newReplicationQueue() *replicationQueue {
q := make(replicationQueue, 0)
return &q
}
func (q replicationQueue) Len() int { return len(q) }
func (q replicationQueue) Swap(i,j int) { q[i], q[j] = q[j], q[i]}
func (q replicationQueue) Less(i,j int) bool {
a, b := q[i], q[j]
statePrio := func(x *replicationQueueItem) int {
if x.state&(FSReady|FSRetryWait) == 0 {
panic(x)
}
if x.state== FSReady {
return 0
}
return 1
}
aprio, bprio := statePrio(a), statePrio(b)
if aprio != bprio {
return aprio < bprio
}
// now we know they are the same state
if a.state == FSReady {
return a.nextStepDate.Before(b.nextStepDate)
}
if a.state == FSRetryWait {
return a.retriesSinceLastError < b.retriesSinceLastError
}
panic("should not be reached")
}
func (q *replicationQueue) sort() (done []*FSReplication) {
// pre-scan for everything that is not ready
newq := make(replicationQueue, 0, len(*q))
done = make([]*FSReplication, 0, len(*q))
for _, qitem := range *q {
if qitem.state&(FSReady|FSRetryWait) == 0 {
done = append(done, qitem.fsr)
} else {
newq = append(newq, qitem)
}
}
sort.SortStable(newq) // stable to avoid flickering in reports
*q = newq
return done
}
// next remains valid until the next call to GetNext()
func (q *replicationQueue) GetNext() (done []*FSReplication, next *replicationQueueItemHandle) {
done = q.sort()
if len(*q) == 0 {
return done, nil
}
next = &replicationQueueItemHandle{(*q)[0]}
return done, next
}
func (q *replicationQueue) Add(qitem *replicationQueueItem) {
*q = append(*q, qitem)
}
func (q *replicationQueue) Foreach(fu func(*replicationQueueItemHandle)) {
for _, qitem := range *q {
fu(&replicationQueueItemHandle{qitem})
}
}
type replicationQueueItemHandle struct {
i *replicationQueueItem
}
func (h replicationQueueItemHandle) GetFSReplication() *FSReplication {
return h.i.fsr
}
func (h replicationQueueItemHandle) Update(newState FSReplicationState, nextStepDate time.Time) {
h.i.state = newState
h.i.nextStepDate = nextStepDate
if h.i.state&FSReady != 0 {
h.i.retriesSinceLastError = 0
} else if h.i.state&FSRetryWait != 0 {
h.i.retriesSinceLastError++
}
}
func (f *FSReplication) takeStep(ctx context.Context, ep EndpointPair) (post FSReplicationState, nextStepDate time.Time) {
var u fsrUpdater = func(fu func(*FSReplication)) FSReplicationState {
f.lock.Lock()
@ -443,19 +476,29 @@ func (f *FSReplication) takeStep(ctx context.Context, ep EndpointPair) FSReplica
return f.state
}
var s fsrsf = u(nil).fsrsf()
for s != nil {
pre := u(nil)
preTime := time.Now()
s = s(ctx, ep, u)
delta := time.Now().Sub(preTime)
post := u(nil)
getLogger(ctx).
WithField("fs", f.fs.Path).
WithField("transition", fmt.Sprintf("%s => %s", pre, post)).
WithField("duration", delta).
Debug("fsr state transition")
}
return u(nil)
pre := u(nil)
preTime := time.Now()
s = s(ctx, ep, u)
delta := time.Now().Sub(preTime)
post = u(func(f *FSReplication) {
if f.state != FSReady {
return
}
ct, err := f.current.to.CreationAsTime()
if err != nil {
panic(err) // FIXME
}
nextStepDate = ct
})
getLogger(ctx).
WithField("fs", f.fs.Path).
WithField("transition", fmt.Sprintf("%s => %s", pre, post)).
WithField("duration", delta).
Debug("fsr step taken")
return post, nextStepDate
}
type fsrUpdater func(func(fsr *FSReplication)) FSReplicationState
@ -496,6 +539,7 @@ func fsrsfReady(ctx context.Context, ep EndpointPair, u fsrUpdater) fsrsf {
f.state = FSRetryWait
case StepPermanentError:
f.state = FSPermanentError
f.err = errors.New("a replication step failed with a permanent error")
default:
panic(f)
}

View File

@ -126,6 +126,12 @@ func resolveConflict(conflict error) (path []*FilesystemVersion, msg string) {
return nil, "no automated way to handle conflict type"
}
func NewReplication() *Replication {
r := Replication{
state: Planning,
}
return &r
}
// Replicate replicates filesystems from ep.Sender() to ep.Receiver().
//
// All filesystems presented by the sending side are replicated,

View File

@ -35,8 +35,7 @@ func stepReportFromStep(step *FSReplicationStep) *StepReport {
}
// access to fsr's members must be exclusive
func filesystemReplicationReportFromQueueItem(qitem *replicationQueueItem) *FilesystemReplicationReport {
fsr := qitem.fsr
func filesystemReplicationReport(fsr *FSReplication) *FilesystemReplicationReport {
fsr.lock.Lock()
defer fsr.lock.Unlock()
@ -81,18 +80,15 @@ func (r *Replication) Report() *Report {
return &rep
}
rep.Pending = make([]*FilesystemReplicationReport, 0, len(r.pending))
rep.Pending = make([]*FilesystemReplicationReport, 0, r.queue.Len())
rep.Completed = make([]*FilesystemReplicationReport, 0, len(r.completed)) // room for active (potentially)
for _, qitem := range r.pending {
rep.Pending = append(rep.Pending, filesystemReplicationReportFromQueueItem(qitem))
}
for _, qitem := range r.completed {
rep.Completed = append(rep.Completed, filesystemReplicationReportFromQueueItem(qitem))
r.queue.Foreach(func (h *replicationQueueItemHandle){
rep.Pending = append(rep.Pending, filesystemReplicationReport(h.GetFSReplication()))
})
for _, fsr := range r.completed {
rep.Completed = append(rep.Completed, filesystemReplicationReport(fsr))
}
if r.active != nil {
rep.Active = filesystemReplicationReportFromQueueItem(r.active)
}
return &rep
}