WIP: states with updater func instead of direct locking

Christian Schwarz 2018-08-16 01:26:09 +02:00
parent 991f13a3da
commit 094eced2c7
4 changed files with 187 additions and 155 deletions
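
The core idea of the refactor: each replication state becomes a function that does its work, mutates shared fields only through an updater closure (which owns the lock), and returns the next state function, with nil meaning done. The following is a minimal, self-contained Go sketch of that pattern; all names are illustrative, not from the zrepl codebase.

	package main

	import (
		"fmt"
		"sync"
	)

	type machine struct {
		mu    sync.Mutex
		state string
		count int
	}

	// updater runs f with the machine's lock held; state functions never lock directly.
	type updater func(func(*machine))

	// stateFunc does one step and returns the next state, or nil when the machine is done.
	type stateFunc func(updater) stateFunc

	func running(u updater) stateFunc {
		var done bool
		u(func(m *machine) {
			m.count++
			if m.count >= 3 {
				m.state = "completed"
				done = true
			}
		})
		if done {
			return nil // terminal: the driver loop exits
		}
		return running
	}

	func main() {
		m := &machine{state: "running"}
		var u updater = func(f func(*machine)) {
			m.mu.Lock()
			defer m.mu.Unlock()
			f(m)
		}
		// the driver loop mirrors Replication.Drive below: step until a state returns nil
		for s := stateFunc(running); s != nil; s = s(u) {
		}
		fmt.Println(m.state, m.count) // completed 3
	}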

View File

@@ -78,6 +78,11 @@ func (j *ControlJob) JobStart(ctx context.Context) {
		requestLogger{log: log, handler: jsonResponder{func() (interface{}, error) {
			return daemon.Status(), nil
		}}})
+	mux.Handle("/pulljobreport",
+		requestLogger{log: log, handler: jsonResponder{func() (interface{}, error) {
+			j := daemon.conf.Jobs["debian"]
+			return j.(*PullJob).Report(), nil
+		}}})
	server := http.Server{Handler: mux}
outer:
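
For illustration, the new endpoint could be queried like this (the handler currently hardcodes the job name "debian", in keeping with the WIP status). The sketch assumes the control server is reachable over plain TCP on localhost:8888; zrepl's actual control listener sits on the daemon's control socket, so treat the address as a placeholder.

	package main

	import (
		"fmt"
		"io"
		"net/http"
	)

	func main() {
		// placeholder address: the real control listener is the daemon's control socket
		resp, err := http.Get("http://localhost:8888/pulljobreport")
		if err != nil {
			panic(err)
		}
		defer resp.Body.Close()
		body, err := io.ReadAll(resp.Body) // JSON rendered by jsonResponder from *replication.Report
		if err != nil {
			panic(err)
		}
		fmt.Println(string(body))
	}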

View File

@@ -29,6 +29,7 @@ type PullJob struct {
	Debug JobDebugSettings
	task  *Task
+	rep   *replication.Replication
}

func parsePullJob(c JobParsingContext, name string, i map[string]interface{}) (j *PullJob, err error) {

@@ -186,26 +187,14 @@ func (j *PullJob) doRun(ctx context.Context) {
		return
	}

-	usr2 := make(chan os.Signal)
-	defer close(usr2)
-	signal.Notify(usr2, syscall.SIGUSR2)
-	defer signal.Stop(usr2)
-	retryNow := make(chan struct{}, 1) // buffered so we don't leak the goroutine
-	go func() {
-		for {
-			sig := <-usr2
-			if sig != nil {
-				retryNow <- struct{}{}
-			} else {
-				break
-			}
-		}
-	}()
	ctx = replication.ContextWithLogger(ctx, replicationLogAdaptor{j.task.Log().WithField("subsystem", "replication")})
	ctx = streamrpc.ContextWithLogger(ctx, streamrpcLogAdaptor{j.task.Log().WithField("subsystem", "rpc.protocol")})
	ctx = context.WithValue(ctx, contextKeyLog, j.task.Log().WithField("subsystem", "rpc.endpoint"))
-	replication.Replicate(ctx, replication.NewEndpointPairPull(sender, puller), retryNow)
+	j.rep = &replication.Replication{}
+	retryNow := make(chan struct{})
+	j.rep.Drive(ctx, replication.NewEndpointPairPull(sender, puller), retryNow)

	client.Close()
	j.task.Finish()

@@ -221,6 +210,10 @@ func (j *PullJob) doRun(ctx context.Context) {
	}
}

+func (j *PullJob) Report() *replication.Report {
+	return j.rep.Report()
+}
+
func (j *PullJob) JobStatus(ctxt context.Context) (*JobStatus, error) {
	return &JobStatus{Tasks: []*TaskStatus{j.task.Status()}}, nil
}
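
Note that the SIGUSR2 handling removed above has no replacement yet: the new retryNow channel is created unbuffered and nothing ever sends on it. A sketch of how the forwarding could be reattached around the Drive call; forwardUSR2 is a hypothetical helper, not part of this commit.

	package job // hypothetical placement, mirroring the removed code above

	import (
		"context"
		"os"
		"os/signal"
		"syscall"
	)

	// forwardUSR2 restores the SIGUSR2 -> retryNow forwarding: each signal
	// requests one retry via a non-blocking send, so the goroutine never
	// blocks when no state is currently waiting on retryNow.
	func forwardUSR2(ctx context.Context, retryNow chan<- struct{}) {
		usr2 := make(chan os.Signal, 1)
		signal.Notify(usr2, syscall.SIGUSR2)
		go func() {
			defer signal.Stop(usr2)
			for {
				select {
				case <-usr2:
					select {
					case retryNow <- struct{}{}:
					default: // nobody listening; a retry is already pending
					}
				case <-ctx.Done():
					return
				}
			}
		}()
	}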

View File

@@ -29,14 +29,12 @@ type replicationQueueItem struct {
}

type Replication struct {
	// lock protects all fields of this struct (but not the fields behind pointers!)
	lock sync.Mutex

	state ReplicationState

-	// Working / WorkingWait
+	// Working, WorkingWait, Completed, ContextDone
	pending, completed []*replicationQueueItem
	active             *replicationQueueItem

@@ -46,9 +44,13 @@ type Replication struct {
	// ContextDone
	contextError error

+	// PlanningError, WorkingWait
	sleepUntil time.Time
}

+type replicationUpdater func(func(*Replication))
+type replicationStateFunc func(context.Context, EndpointPair, replicationUpdater) replicationStateFunc
+
//go:generate stringer -type=FSReplicationState
type FSReplicationState int
@@ -136,153 +138,58 @@ type FSReplicationStep struct {
}

func (r *Replication) Drive(ctx context.Context, ep EndpointPair, retryNow chan struct{}) {
-	for r.state&(Completed|ContextDone) == 0 {
-		pre := r.state
+	var u replicationUpdater = func(f func(*Replication)) {
+		r.lock.Lock()
+		defer r.lock.Unlock()
+		f(r)
+	}
+
+	var s replicationStateFunc = rsfPlanning
+	var pre, post ReplicationState
+	for s != nil {
		preTime := time.Now()
-		r.doDrive(ctx, ep, retryNow)
+		u(func(r *Replication) {
+			pre = r.state
+		})
+		s = s(ctx, ep, u)
		delta := time.Now().Sub(preTime)
-		post := r.state
+		u(func(r *Replication) {
+			post = r.state
+		})
		getLogger(ctx).
			WithField("transition", fmt.Sprintf("%s => %s", pre, post)).
			WithField("duration", delta).
			Debug("main state transition")
-		now := time.Now()
-		sleepDuration := r.sleepUntil.Sub(now)
-		if sleepDuration > 100*time.Millisecond {
-			getLogger(ctx).
-				WithField("duration", sleepDuration).
-				WithField("wakeup_at", r.sleepUntil).
-				Error("sleeping until next attempt")
-			timer := time.NewTimer(sleepDuration)
-			select {
-			case <-timer.C:
-			case <-ctx.Done():
-			case <-retryNow:
-			}
-			timer.Stop()
-		}
	}
+
+	getLogger(ctx).
+		WithField("final_state", post).
+		Debug("main final state")
}
-func (r *Replication) doDrive(ctx context.Context, ep EndpointPair, retryNow chan struct{}) {
-	switch r.state {
-	case Planning:
-		r.tryBuildPlan(ctx, ep)
-	case PlanningError:
-		r.sleepUntil = time.Now().Add(10 * time.Second) // FIXME constant make configurable
-	case Working:
-		withLocks := func(f func()) {
-			r.lock.Lock()
-			defer r.lock.Unlock()
-			f()
-		}
-		withLocks(func() {
-			if r.active == nil {
-				if len(r.pending) == 0 {
-					r.state = Completed
-					return
-				}
-				sort.Slice(r.pending, func(i, j int) bool {
-					a, b := r.pending[i], r.pending[j]
-					statePrio := func(x *replicationQueueItem) int {
-						if x.fsr.state&(FSQueued|FSRetryWait) == 0 {
-							panic(x)
-						}
-						if x.fsr.state == FSQueued {
-							return 0
-						} else {
-							return 1
-						}
-					}
-					aprio, bprio := statePrio(a), statePrio(b)
-					if aprio != bprio {
-						return aprio < bprio
-					}
-					// now we know they are the same state
-					if a.fsr.state == FSQueued {
-						return a.fsr.nextStepDate().Before(b.fsr.nextStepDate())
-					}
-					if a.fsr.state == FSRetryWait {
-						return a.retriesSinceLastError < b.retriesSinceLastError
-					}
-					panic("should not be reached")
-				})
-				r.active = r.pending[0]
-				r.pending = r.pending[1:]
-			}
-			if r.active.fsr.state == FSRetryWait {
-				r.state = WorkingWait
-				return
-			}
-			if r.active.fsr.state != FSQueued {
-				panic(r.active)
-			}
-		})
-		if r.active == nil {
-			return
-		}
-		fsState := r.active.fsr.drive(ctx, ep)
-		withLocks(func() {
-			if fsState&FSQueued != 0 {
-				r.active.retriesSinceLastError = 0
-			} else if fsState&FSRetryWait != 0 {
-				r.active.retriesSinceLastError++
-			} else if fsState&(FSPermanentError|FSCompleted) != 0 {
-				r.completed = append(r.completed, r.active)
-				r.active = nil
-			} else {
-				panic(r.active)
-			}
-		})
-	case WorkingWait:
-		r.sleepUntil = time.Now().Add(10 * time.Second) // FIXME make configurable
-	default:
-		panic(r.state)
-	}
-}
-
-func (r *Replication) tryBuildPlan(ctx context.Context, ep EndpointPair) ReplicationState {
+func rsfPlanning(ctx context.Context, ep EndpointPair, u replicationUpdater) replicationStateFunc {
	log := getLogger(ctx)

-	updateLock := func() func() {
-		r.lock.Lock()
-		return func() {
-			r.lock.Unlock()
-		}
-	}
-
-	planningError := func(err error) ReplicationState {
-		defer updateLock()()
+	handlePlanningError := func(err error) replicationStateFunc {
+		u(func(r *Replication) {
			r.state = PlanningError
			r.planningError = err
-		return r.state
+		})
+		return rsfPlanningError
	}

	sfss, err := ep.Sender().ListFilesystems(ctx)
	if err != nil {
		log.WithError(err).Error("error listing sender filesystems")
-		return planningError(err)
+		return handlePlanningError(err)
	}

	rfss, err := ep.Receiver().ListFilesystems(ctx)
	if err != nil {
		log.WithError(err).Error("error listing receiver filesystems")
-		return planningError(err)
+		return handlePlanningError(err)
	}

	pending := make([]*replicationQueueItem, 0, len(sfss))

@@ -297,7 +204,7 @@ func (r *Replication) tryBuildPlan(ctx context.Context, ep EndpointPair) ReplicationState {
		sfsvs, err := ep.Sender().ListFilesystemVersions(ctx, fs.Path)
		if err != nil {
			log.WithError(err).Error("cannot get remote filesystem versions")
-			return planningError(err)
+			return handlePlanningError(err)
		}

		if len(sfsvs) <= 1 {

@@ -323,7 +230,7 @@ func (r *Replication) tryBuildPlan(ctx context.Context, ep EndpointPair) ReplicationState {
				continue
			}
			log.WithError(err).Error("receiver error")
-			return planningError(err)
+			return handlePlanningError(err)
		}
	} else {
		rfsvs = []*FilesystemVersion{}

@@ -366,13 +273,139 @@ func (r *Replication) tryBuildPlan(ctx context.Context, ep EndpointPair) ReplicationState {
	}

-	defer updateLock()()
+	u(func(r *Replication) {
		r.completed = completed
		r.pending = pending
		r.state = Working
		r.planningError = nil
-	return r.state
+	})
+	return rsfWorking
+}
+
+func rsfPlanningError(ctx context.Context, ep EndpointPair, u replicationUpdater) replicationStateFunc {
+	sleepTime := 10 * time.Second
+	u(func(r *Replication) {
+		r.sleepUntil = time.Now().Add(sleepTime)
+	})
+	t := time.NewTimer(sleepTime) // FIXME make constant configurable
+	defer t.Stop()
+	select {
+	case <-ctx.Done():
+		u(func(r *Replication) {
+			r.state = ContextDone
+			r.contextError = ctx.Err()
+		})
+		return nil
+	case <-t.C:
+		u(func(r *Replication) {
+			r.state = Planning
+		})
+		return rsfPlanning
+	}
+}
+
+func rsfWorking(ctx context.Context, ep EndpointPair, u replicationUpdater) (rsfNext replicationStateFunc) {
+	var active *replicationQueueItem
+
+	u(func(r *Replication) {
+		if r.active != nil {
+			active = r.active
+			return
+		}
+
+		if len(r.pending) == 0 {
+			r.state = Completed
+			return
+		}
+
+		sort.Slice(r.pending, func(i, j int) bool {
+			a, b := r.pending[i], r.pending[j]
+			statePrio := func(x *replicationQueueItem) int {
+				if x.fsr.state&(FSQueued|FSRetryWait) == 0 {
+					panic(x)
+				}
+				if x.fsr.state == FSQueued {
+					return 0
+				} else {
+					return 1
+				}
+			}
+			aprio, bprio := statePrio(a), statePrio(b)
+			if aprio != bprio {
+				return aprio < bprio
+			}
+			// now we know they are the same state
+			if a.fsr.state == FSQueued {
+				return a.fsr.nextStepDate().Before(b.fsr.nextStepDate())
+			}
+			if a.fsr.state == FSRetryWait {
+				return a.retriesSinceLastError < b.retriesSinceLastError
+			}
+			panic("should not be reached")
+		})
+
+		r.active = r.pending[0]
+		active = r.active
+		r.pending = r.pending[1:]
+	})
+
+	if active == nil {
+		return rsfNext
+	}
+
+	if active.fsr.state == FSRetryWait {
+		u(func(r *Replication) {
+			r.state = WorkingWait
+		})
+		return rsfWorkingWait
+	}
+	if active.fsr.state != FSQueued {
+		panic(active)
+	}
+
+	fsState := active.fsr.drive(ctx, ep)
+
+	u(func(r *Replication) {
+		if fsState&FSQueued != 0 {
+			r.active.retriesSinceLastError = 0
+		} else if fsState&FSRetryWait != 0 {
+			r.active.retriesSinceLastError++
+		} else if fsState&(FSPermanentError|FSCompleted) != 0 {
+			r.completed = append(r.completed, r.active)
+			r.active = nil
+		} else {
+			panic(r.active)
+		}
+	})
+
+	return rsfWorking
+}
+
+func rsfWorkingWait(ctx context.Context, ep EndpointPair, u replicationUpdater) replicationStateFunc {
+	sleepTime := 10 * time.Second
+	u(func(r *Replication) {
+		r.sleepUntil = time.Now().Add(sleepTime)
+	})
+	t := time.NewTimer(sleepTime)
+	defer t.Stop()
+	select {
+	case <-ctx.Done():
+		u(func(r *Replication) {
+			r.state = ContextDone
+			r.contextError = ctx.Err()
+		})
+		return nil
+	case <-t.C:
+		u(func(r *Replication) {
+			r.state = Working
+		})
+		return rsfWorking
+	}
}

// caller must have exclusive access to f
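
Note that both wait states select only on the timer and ctx.Done(): the retryNow channel that Drive still accepts never reaches the state functions, so manual retry is dead code at this WIP stage. A sketch of how a wait state could honor it, assuming the channel were threaded through to the state functions (it is not in this commit, so the signature below deliberately differs from replicationStateFunc):

	// hypothetical variant: a wait state that also wakes up on manual retry
	func rsfWaitWithRetry(ctx context.Context, u replicationUpdater, retryNow <-chan struct{}) replicationStateFunc {
		sleepTime := 10 * time.Second
		u(func(r *Replication) {
			r.sleepUntil = time.Now().Add(sleepTime)
		})
		t := time.NewTimer(sleepTime)
		defer t.Stop()
		select {
		case <-ctx.Done():
			u(func(r *Replication) {
				r.state = ContextDone
				r.contextError = ctx.Err()
			})
			return nil
		case <-retryNow: // manual wakeup, e.g. forwarded from SIGUSR2
		case <-t.C: // regular retry interval elapsed
		}
		u(func(r *Replication) {
			r.state = Working
		})
		return rsfWorking
	}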

View File

@@ -91,7 +91,8 @@ func (r *Replication) Report() *Report {
		rep.Completed = append(rep.Completed, filesystemReplicationReportFromQueueItem(qitem))
	}

+	if r.active != nil {
		rep.Active = filesystemReplicationReportFromQueueItem(r.active)
+	}
	return &rep
}