From 094eced2c7a17759167701517577ecc45a0e5fc9 Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Thu, 16 Aug 2018 01:26:09 +0200
Subject: [PATCH] WIP: states with updater func instead of direct locking

---
 cmd/config_job_control.go    |   5 +
 cmd/config_job_pull.go       |  25 +--
 cmd/replication.v2/plan.go   | 307 +++++++++++++++++++----------------
 cmd/replication.v2/report.go |   5 +-
 4 files changed, 187 insertions(+), 155 deletions(-)

diff --git a/cmd/config_job_control.go b/cmd/config_job_control.go
index 1e7baae..300e4b0 100644
--- a/cmd/config_job_control.go
+++ b/cmd/config_job_control.go
@@ -78,6 +78,11 @@ func (j *ControlJob) JobStart(ctx context.Context) {
         requestLogger{log: log, handler: jsonResponder{func() (interface{}, error) {
             return daemon.Status(), nil
         }}})
+    mux.Handle("/pulljobreport",
+        requestLogger{log: log, handler: jsonResponder{func() (interface{}, error) {
+            job := daemon.conf.Jobs["debian"] // FIXME: hardcoded job name, WIP only; also avoids shadowing the receiver j
+            return job.(*PullJob).Report(), nil
+        }}})
     server := http.Server{Handler: mux}
 
 outer:
diff --git a/cmd/config_job_pull.go b/cmd/config_job_pull.go
index 08d5f1e..a5766c8 100644
--- a/cmd/config_job_pull.go
+++ b/cmd/config_job_pull.go
@@ -29,6 +29,7 @@ type PullJob struct {
     Debug JobDebugSettings
 
     task *Task
+    rep  *replication.Replication
 }
 
 func parsePullJob(c JobParsingContext, name string, i map[string]interface{}) (j *PullJob, err error) {
@@ -186,26 +187,14 @@ func (j *PullJob) doRun(ctx context.Context) {
         return
     }
 
-    usr2 := make(chan os.Signal)
-    defer close(usr2)
-    signal.Notify(usr2, syscall.SIGUSR2)
-    defer signal.Stop(usr2)
-    retryNow := make(chan struct{}, 1) // buffered so we don't leak the goroutine
-    go func() {
-        for {
-            sig := <-usr2
-            if sig != nil {
-                retryNow <- struct{}{}
-            } else {
-                break
-            }
-        }
-    }()
 
     ctx = replication.ContextWithLogger(ctx, replicationLogAdaptor{j.task.Log().WithField("subsystem", "replication")})
     ctx = streamrpc.ContextWithLogger(ctx, streamrpcLogAdaptor{j.task.Log().WithField("subsystem", "rpc.protocol")})
     ctx = context.WithValue(ctx, contextKeyLog, j.task.Log().WithField("subsystem", "rpc.endpoint"))
-    replication.Replicate(ctx, replication.NewEndpointPairPull(sender, puller), retryNow)
+
+    j.rep = &replication.Replication{}
+    retryNow := make(chan struct{}) // TODO: nothing signals this channel yet; SIGUSR2-triggered retry was dropped in this WIP
+    j.rep.Drive(ctx, replication.NewEndpointPairPull(sender, puller), retryNow)
 
     client.Close()
     j.task.Finish()
@@ -221,6 +210,10 @@ func (j *PullJob) doRun(ctx context.Context) {
 
 }
 
+func (j *PullJob) Report() *replication.Report {
+    return j.rep.Report()
+}
+
 func (j *PullJob) JobStatus(ctxt context.Context) (*JobStatus, error) {
     return &JobStatus{Tasks: []*TaskStatus{j.task.Status()}}, nil
 }
diff --git a/cmd/replication.v2/plan.go b/cmd/replication.v2/plan.go
index 0e1f9b7..be275f0 100644
--- a/cmd/replication.v2/plan.go
+++ b/cmd/replication.v2/plan.go
@@ -29,14 +29,12 @@ type replicationQueueItem struct {
 }
 
 type Replication struct {
-    // lock protects all fields of this struct (but not the fields behind pointers!)
lock sync.Mutex state ReplicationState - // Working / WorkingWait - + // Working, WorkingWait, Completed, ContextDone pending, completed []*replicationQueueItem active *replicationQueueItem @@ -46,9 +44,13 @@ type Replication struct { // ContextDone contextError error + // PlanningError, WorkingWait sleepUntil time.Time } +type replicationUpdater func(func(*Replication)) +type replicationStateFunc func(context.Context, EndpointPair, replicationUpdater) replicationStateFunc + //go:generate stringer -type=FSReplicationState type FSReplicationState int @@ -136,153 +138,58 @@ type FSReplicationStep struct { } func (r *Replication) Drive(ctx context.Context, ep EndpointPair, retryNow chan struct{}) { - for r.state&(Completed|ContextDone) == 0 { - pre := r.state + + var u replicationUpdater = func(f func(*Replication)) { + r.lock.Lock() + defer r.lock.Unlock() + f(r) + } + + var s replicationStateFunc = rsfPlanning + var pre, post ReplicationState + for s != nil { preTime := time.Now() - r.doDrive(ctx, ep, retryNow) + u(func(r *Replication){ + pre = r.state + }) + s = s(ctx, ep, u) delta := time.Now().Sub(preTime) - post := r.state + u(func(r *Replication){ + post = r.state + }) getLogger(ctx). WithField("transition", fmt.Sprintf("%s => %s", pre, post)). WithField("duration", delta). Debug("main state transition") - now := time.Now() - sleepDuration := r.sleepUntil.Sub(now) - if sleepDuration > 100*time.Millisecond { - getLogger(ctx). - WithField("duration", sleepDuration). - WithField("wakeup_at", r.sleepUntil). - Error("sleeping until next attempt") - timer := time.NewTimer(sleepDuration) - select { - case <-timer.C: - case <-ctx.Done(): - case <-retryNow: - } - timer.Stop() - } } + + getLogger(ctx). + WithField("final_state", post). + Debug("main final state") } -func (r *Replication) doDrive(ctx context.Context, ep EndpointPair, retryNow chan struct{}) { - - switch r.state { - - case Planning: - r.tryBuildPlan(ctx, ep) - - case PlanningError: - r.sleepUntil = time.Now().Add(10 * time.Second) // FIXME constant make configurable - - case Working: - withLocks := func(f func()) { - r.lock.Lock() - defer r.lock.Unlock() - f() - } - withLocks(func() { - if r.active == nil { - - if len(r.pending) == 0 { - r.state = Completed - return - } - - sort.Slice(r.pending, func(i, j int) bool { - a, b := r.pending[i], r.pending[j] - statePrio := func(x *replicationQueueItem) int { - if x.fsr.state&(FSQueued|FSRetryWait) == 0 { - panic(x) - } - if x.fsr.state == FSQueued { - return 0 - } else { - return 1 - } - } - aprio, bprio := statePrio(a), statePrio(b) - if aprio != bprio { - return aprio < bprio - } - // now we know they are the same state - if a.fsr.state == FSQueued { - return a.fsr.nextStepDate().Before(b.fsr.nextStepDate()) - } - if a.fsr.state == FSRetryWait { - return a.retriesSinceLastError < b.retriesSinceLastError - } - panic("should not be reached") - }) - - r.active = r.pending[0] - r.pending = r.pending[1:] - } - - if r.active.fsr.state == FSRetryWait { - r.state = WorkingWait - return - } - if r.active.fsr.state != FSQueued { - panic(r.active) - } - }) - - if r.active == nil { - return - } - - fsState := r.active.fsr.drive(ctx, ep) - - withLocks(func() { - - if fsState&FSQueued != 0 { - r.active.retriesSinceLastError = 0 - } else if fsState&FSRetryWait != 0 { - r.active.retriesSinceLastError++ - } else if fsState&(FSPermanentError|FSCompleted) != 0 { - r.completed = append(r.completed, r.active) - r.active = nil - } else { - panic(r.active) - } - }) - - case WorkingWait: - r.sleepUntil = 
time.Now().Add(10 * time.Second) // FIXME make configurable
-
-    default:
-        panic(r.state)
-    }
-}
-
-func (r *Replication) tryBuildPlan(ctx context.Context, ep EndpointPair) ReplicationState {
-
+func rsfPlanning(ctx context.Context, ep EndpointPair, u replicationUpdater) replicationStateFunc {
+
     log := getLogger(ctx)
 
-    updateLock := func() func() {
-        r.lock.Lock()
-        return func() {
-            r.lock.Unlock()
-        }
-    }
-
-    planningError := func(err error) ReplicationState {
-        defer updateLock()()
-        r.state = PlanningError
-        r.planningError = err
-        return r.state
+    handlePlanningError := func(err error) replicationStateFunc {
+        u(func(r *Replication){
+            r.state = PlanningError
+            r.planningError = err
+        })
+        return rsfPlanningError
     }
 
     sfss, err := ep.Sender().ListFilesystems(ctx)
     if err != nil {
         log.WithError(err).Error("error listing sender filesystems")
-        return planningError(err)
+        return handlePlanningError(err)
     }
     rfss, err := ep.Receiver().ListFilesystems(ctx)
     if err != nil {
         log.WithError(err).Error("error listing receiver filesystems")
-        return planningError(err)
+        return handlePlanningError(err)
     }
 
     pending := make([]*replicationQueueItem, 0, len(sfss))
@@ -297,7 +204,7 @@ func (r *Replication) tryBuildPlan(ctx context.Context, ep EndpointPair) Replica
         sfsvs, err := ep.Sender().ListFilesystemVersions(ctx, fs.Path)
         if err != nil {
             log.WithError(err).Error("cannot get remote filesystem versions")
-            return planningError(err)
+            return handlePlanningError(err)
         }
 
         if len(sfsvs) <= 1 {
@@ -323,7 +230,7 @@ func (r *Replication) tryBuildPlan(ctx context.Context, ep EndpointPair) Replica
                 continue
             }
             log.WithError(err).Error("receiver error")
-            return planningError(err)
+            return handlePlanningError(err)
         }
     } else {
         rfsvs = []*FilesystemVersion{}
@@ -366,13 +273,139 @@ func (r *Replication) tryBuildPlan(ctx context.Context, ep EndpointPair) Replica
 
     }
 
+    u(func(r *Replication){
+        r.completed = completed
+        r.pending = pending
+        r.state = Working
+        r.planningError = nil
+    })
+    return rsfWorking
+}
 
-    defer updateLock()()
-    r.completed = completed
-    r.pending = pending
-    r.state = Working
-    r.planningError = nil
-    return r.state
+func rsfPlanningError(ctx context.Context, ep EndpointPair, u replicationUpdater) replicationStateFunc {
+    sleepTime := 10*time.Second
+    u(func(r *Replication){
+        r.sleepUntil = time.Now().Add(sleepTime)
+    })
+    t := time.NewTimer(sleepTime) // FIXME make this constant configurable
+    defer t.Stop()
+    select {
+    case <- ctx.Done():
+        u(func(r *Replication){
+            r.state = ContextDone
+            r.contextError = ctx.Err()
+        })
+        return nil
+    case <- t.C:
+        u(func(r *Replication){
+            r.state = Planning
+        })
+        return rsfPlanning
+    }
+}
+
+func rsfWorking(ctx context.Context, ep EndpointPair, u replicationUpdater) (rsfNext replicationStateFunc) {
+
+    var active *replicationQueueItem
+
+    u(func(r *Replication) {
+        if r.active != nil {
+            active = r.active
+            return
+        }
+
+        if len(r.pending) == 0 {
+            r.state = Completed
+            return
+        }
+
+        sort.Slice(r.pending, func(i, j int) bool {
+            a, b := r.pending[i], r.pending[j]
+            statePrio := func(x *replicationQueueItem) int {
+                if x.fsr.state&(FSQueued|FSRetryWait) == 0 {
+                    panic(x)
+                }
+                if x.fsr.state == FSQueued {
+                    return 0
+                } else {
+                    return 1
+                }
+            }
+            aprio, bprio := statePrio(a), statePrio(b)
+            if aprio != bprio {
+                return aprio < bprio
+            }
+            // now we know they are the same state
+            if a.fsr.state == FSQueued {
+                return a.fsr.nextStepDate().Before(b.fsr.nextStepDate())
+            }
+            if a.fsr.state == FSRetryWait {
+                return a.retriesSinceLastError < b.retriesSinceLastError
+            }
+            panic("should not be reached")
panic("should not be reached") + }) + + r.active = r.pending[0] + active = r.active + r.pending = r.pending[1:] + + }) + + if active == nil { + return rsfNext + } + + if active.fsr.state == FSRetryWait { + u(func(r *Replication) { + r.state = WorkingWait + }) + return rsfWorkingWait + } + if active.fsr.state != FSQueued { + panic(active) + } + + fsState := active.fsr.drive(ctx, ep) + + u(func(r *Replication) { + + if fsState&FSQueued != 0 { + r.active.retriesSinceLastError = 0 + } else if fsState&FSRetryWait != 0 { + r.active.retriesSinceLastError++ + } else if fsState&(FSPermanentError|FSCompleted) != 0 { + r.completed = append(r.completed, r.active) + r.active = nil + } else { + panic(r.active) + } + + }) + + return rsfWorking + +} + +func rsfWorkingWait(ctx context.Context, ep EndpointPair, u replicationUpdater) replicationStateFunc { + sleepTime := 10*time.Second + u(func(r* Replication){ + r.sleepUntil = time.Now().Add(sleepTime) + }) + t := time.NewTimer(sleepTime) + defer t.Stop() + select { + case <- ctx.Done(): + u(func(r *Replication){ + r.state = ContextDone + r.contextError = ctx.Err() + }) + return nil + case <- t.C: + u(func(r *Replication){ + r.state = Working + }) + return rsfWorking + } } // caller must have exclusive access to f diff --git a/cmd/replication.v2/report.go b/cmd/replication.v2/report.go index 7e4ccaa..7fe0ccb 100644 --- a/cmd/replication.v2/report.go +++ b/cmd/replication.v2/report.go @@ -91,7 +91,8 @@ func (r *Replication) Report() *Report { rep.Completed = append(rep.Completed, filesystemReplicationReportFromQueueItem(qitem)) } - rep.Active = filesystemReplicationReportFromQueueItem(r.active) - + if r.active != nil { + rep.Active = filesystemReplicationReportFromQueueItem(r.active) + } return &rep }