From 68b895d0bc47d1c8621acfe585651644dc69417f Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Sun, 21 Mar 2021 19:00:59 +0100
Subject: [PATCH] WIP: zrepl wait active JOB INVOCATION_ID WHAT
 (callback-based implementation)

---
 daemon/control.go    | 56 ++++++++++++++++++++++++++++++++++++++++----
 daemon/job/active.go | 55 ++++++++++++++++++++++++++++++++++++++++---
 daemon/job/job.go    |  1 +
 main.go              |  1 +
 4 files changed, 106 insertions(+), 7 deletions(-)

diff --git a/daemon/control.go b/daemon/control.go
index 5efcf5e..40c60f2 100644
--- a/daemon/control.go
+++ b/daemon/control.go
@@ -73,10 +73,11 @@ func (j *controlJob) RegisterMetrics(registerer prometheus.Registerer) {
 }
 
 const (
-	ControlJobEndpointPProf   string = "/debug/pprof"
-	ControlJobEndpointVersion string = "/version"
-	ControlJobEndpointStatus  string = "/status"
-	ControlJobEndpointSignal  string = "/signal"
+	ControlJobEndpointPProf      string = "/debug/pprof"
+	ControlJobEndpointVersion    string = "/version"
+	ControlJobEndpointStatus     string = "/status"
+	ControlJobEndpointSignal     string = "/signal"
+	ControlJobEndpointWaitActive string = "/wait/active"
 )
 
 func (j *controlJob) Run(ctx context.Context) {
@@ -130,6 +131,52 @@ func (j *controlJob) Run(ctx context.Context) {
 			return s, nil
 		}}})
 
+	mux.Handle(ControlJobEndpointWaitActive, requestLogger{log: log, handler: jsonRequestResponder{log, func(decoder jsonDecoder) (v interface{}, err error) {
+		type reqT struct {
+			Job          string
+			InvocationId uint64
+			What         string
+		}
+		var req reqT
+		if decoder(&req) != nil {
+			return nil, errors.Errorf("decode failed")
+		}
+
+		j.jobs.m.RLock()
+
+		jo, ok := j.jobs.jobs[req.Job]
+		if !ok {
+			j.jobs.m.RUnlock()
+			return struct{}{}, fmt.Errorf("unknown job name %q", req.Job)
+		}
+
+		ajo, ok := jo.(*job.ActiveSide)
+		if !ok {
+			v, err = struct{}{}, fmt.Errorf("job %q is not an active side (it's a %T)", jo.Name(), jo)
+			j.jobs.m.RUnlock()
+			return v, err
+		}
+
+		cbCalled := make(chan struct{})
+		err = ajo.AddActiveSideWaiter(req.InvocationId, req.What, func() {
+			log.WithField("request", req).Debug("active side waiter done")
+			close(cbCalled)
+		})
+
+		j.jobs.m.RUnlock() // unlock before waiting!
+
+		if err != nil {
+			return struct{}{}, err
+		}
+
+		select {
+		// TODO ctx with timeout!
+		case <-cbCalled:
+			return struct{}{}, nil
+		}
+
+	}}})
+
 	mux.Handle(ControlJobEndpointSignal,
 		requestLogger{log: log, handler: jsonRequestResponder{log, func(decoder jsonDecoder) (interface{}, error) {
 			type reqT struct {
@@ -155,6 +202,7 @@ func (j *controlJob) Run(ctx context.Context) {
 			return struct{}{}, err
 		}}})
 
+
 	server := http.Server{
 		Handler: mux,
 		// control socket is local, 1s timeout should be more than sufficient, even on a loaded system
diff --git a/daemon/job/active.go b/daemon/job/active.go
index ec3a8af..5f23845 100644
--- a/daemon/job/active.go
+++ b/daemon/job/active.go
@@ -43,8 +43,10 @@ type ActiveSide struct {
 	promBytesReplicated   *prometheus.CounterVec // labels: filesystem
 	promReplicationErrors prometheus.Gauge
 
-	tasksMtx sync.Mutex
-	tasks    activeSideTasks
+	tasksMtx                 sync.Mutex
+	tasks                    activeSideTasks
+	activeInvocationId       uint64 // 0 <=> inactive
+	doneWaiters, nextWaiters []func()
 }
 
 //go:generate enumer -type=ActiveSideState
@@ -433,6 +435,7 @@ func (j *ActiveSide) Run(ctx context.Context) {
 	go j.mode.RunPeriodic(periodicCtx, periodicDone)
 
 	invocationCount := 0
+
 outer:
 	for {
 		log.Info("wait for replications")
@@ -446,12 +449,59 @@ outer:
 		case <-periodicDone:
 		}
 		invocationCount++
+
+		j.tasksMtx.Lock()
+		j.activeInvocationId = uint64(invocationCount)
+		j.doneWaiters = j.nextWaiters
+		j.nextWaiters = nil
+		j.tasksMtx.Unlock()
+
 		invocationCtx, endSpan := trace.WithSpan(ctx, fmt.Sprintf("invocation-%d", invocationCount))
 		j.do(invocationCtx)
+
+		j.tasksMtx.Lock()
+		j.activeInvocationId = 0
+		for _, f := range j.doneWaiters {
+			go f()
+		}
+		j.doneWaiters = nil
+		j.tasksMtx.Unlock()
+
 		endSpan()
 	}
 }
 
+type AddActiveSideWaiterRequest struct {
+	InvocationId uint64
+	What         string
+}
+
+func (j *ActiveSide) AddActiveSideWaiter(invocationId uint64, what string, cb func()) error {
+	j.tasksMtx.Lock()
+	defer j.tasksMtx.Unlock()
+
+	var targetQueue *[]func()
+	if invocationId == 0 {
+		if j.activeInvocationId != 0 {
+			targetQueue = &j.doneWaiters
+		} else {
+			targetQueue = &j.nextWaiters
+		}
+	} else if j.activeInvocationId == invocationId {
+		targetQueue = &j.nextWaiters
+	} else {
+		return fmt.Errorf("invocation %d is not the current invocation, current invocation is %d (0 means no active invocation); pass id '0' to wait for the next invocation", invocationId, j.activeInvocationId)
+	}
+
+	switch what {
+	case "invocation-done":
+		*targetQueue = append(*targetQueue, cb)
+	default:
+		return fmt.Errorf("unknown wait target %q", what)
+	}
+	return nil
+}
+
 func (j *ActiveSide) do(ctx context.Context) {
 
 	j.mode.ConnectEndpoints(ctx, j.connecter)
diff --git a/daemon/job/job.go b/daemon/job/job.go
index 57aeecd..ac07cab 100644
--- a/daemon/job/job.go
+++ b/daemon/job/job.go
@@ -19,6 +19,7 @@ func GetLogger(ctx context.Context) Logger {
 	return logging.GetLogger(ctx, logging.SubsysJob)
 }
 
+
 type Job interface {
 	Name() string
 	Run(ctx context.Context)
diff --git a/main.go b/main.go
index 7e239ab..3f7625f 100644
--- a/main.go
+++ b/main.go
@@ -12,6 +12,7 @@ func init() {
 	cli.AddSubcommand(daemon.DaemonCmd)
 	cli.AddSubcommand(status.Subcommand)
 	cli.AddSubcommand(client.SignalCmd)
+	cli.AddSubcommand(client.WaitCmd)
 	cli.AddSubcommand(client.StdinserverCmd)
 	cli.AddSubcommand(client.ConfigcheckCmd)
 	cli.AddSubcommand(client.VersionCmd)
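
Notes on the WIP state (not part of the patch):

The subtle part of the Run loop change is the hand-off between the two waiter
queues: nextWaiters is promoted to doneWaiters when an invocation starts, and
doneWaiters is fired (each callback in its own goroutine) when the invocation
ends. The following stand-alone toy model, not zrepl code and with all names
hypothetical, exercises just that hand-off to make the ordering visible:

package main

import (
	"fmt"
	"sync"
)

// invocationLoop models the two waiter queues in ActiveSide.Run:
// nextWaiters are promoted to doneWaiters when an invocation starts,
// doneWaiters fire when it ends.
type invocationLoop struct {
	mtx                      sync.Mutex
	activeInvocationId       uint64 // 0 <=> inactive, as in the patch
	doneWaiters, nextWaiters []func()
}

func (l *invocationLoop) runInvocation(id uint64, body func()) {
	l.mtx.Lock()
	l.activeInvocationId = id
	l.doneWaiters = l.nextWaiters // promote
	l.nextWaiters = nil
	l.mtx.Unlock()

	body() // corresponds to j.do(invocationCtx)

	l.mtx.Lock()
	l.activeInvocationId = 0
	for _, f := range l.doneWaiters {
		f() // the patch runs these as `go f()`
	}
	l.doneWaiters = nil
	l.mtx.Unlock()
}

func main() {
	var l invocationLoop

	// Registered while idle (the invocationId == 0, job-inactive case):
	// lands in nextWaiters and fires at the end of the next invocation.
	l.nextWaiters = append(l.nextWaiters, func() { fmt.Println("A: next invocation done") })

	l.runInvocation(1, func() {
		// Registered while invocation 1 is active (invocationId == 0 case):
		// lands in doneWaiters and fires when invocation 1 ends.
		l.mtx.Lock()
		l.doneWaiters = append(l.doneWaiters, func() { fmt.Println("B: current invocation done") })
		l.mtx.Unlock()
	})
	// Prints A, then B: both fire at the end of invocation 1.
}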
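main.go registers client.WaitCmd, but client/wait.go is not included in this
patch yet. A minimal sketch of what it could look like; it assumes the Run
signature and the controlHttpClient / jsonRequestResponse helpers as used by
client.SignalCmd, so treat names and signatures as assumptions rather than the
final implementation. The request struct must mirror reqT in the /wait/active
handler:

package client

import (
	"context"
	"strconv"

	"github.com/pkg/errors"

	"github.com/zrepl/zrepl/cli"
	"github.com/zrepl/zrepl/config"
	"github.com/zrepl/zrepl/daemon"
)

var WaitCmd = &cli.Subcommand{
	Use:   "wait active JOB INVOCATION_ID WHAT",
	Short: "block until the given invocation of an active-side job is done",
	Run: func(ctx context.Context, subcommand *cli.Subcommand, args []string) error {
		return runWaitCmd(subcommand.Config(), args)
	},
}

func runWaitCmd(config *config.Config, args []string) error {
	if len(args) != 4 || args[0] != "active" {
		return errors.Errorf("expected 4 arguments: active JOB INVOCATION_ID WHAT")
	}
	invocationId, err := strconv.ParseUint(args[2], 10, 64)
	if err != nil {
		return errors.Wrap(err, "parse INVOCATION_ID")
	}
	httpc, err := controlHttpClient(config.Global.Control.SockPath)
	if err != nil {
		return err
	}
	// field names must match reqT in the /wait/active handler
	return jsonRequestResponse(httpc, daemon.ControlJobEndpointWaitActive,
		struct {
			Job          string
			InvocationId uint64
			What         string
		}{
			Job:          args[1],
			InvocationId: invocationId,
			What:         args[3],
		},
		struct{}{},
	)
}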
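As the handler and AddActiveSideWaiter currently behave, passing id 0 waits
for the running invocation to finish, or, if the job is idle, for the next
invocation to finish. The intended interactive flow is therefore something
like (job name hypothetical):

    zrepl signal wakeup prod_to_backups
    zrepl wait active prod_to_backups 0 invocation-done

The select in the /wait/active handler still blocks without a way to cancel;
the TODO about a context with timeout needs to be resolved before this can be
merged, otherwise a waiting request blocks until the invocation actually
completes, which for a wakeup-driven job may be arbitrarily far away.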