WIP: zrepl wait active JOB INVOCATION_ID WHAT (callback-based implementation)

This commit is contained in:
Christian Schwarz 2021-03-21 19:00:59 +01:00
parent 5c6d69a69c
commit 68b895d0bc
4 changed files with 106 additions and 7 deletions

View File

@ -77,6 +77,7 @@ const (
ControlJobEndpointVersion string = "/version" ControlJobEndpointVersion string = "/version"
ControlJobEndpointStatus string = "/status" ControlJobEndpointStatus string = "/status"
ControlJobEndpointSignal string = "/signal" ControlJobEndpointSignal string = "/signal"
ControlJobEndpointWaitActive string = "/wait/active"
) )
func (j *controlJob) Run(ctx context.Context) { func (j *controlJob) Run(ctx context.Context) {
@ -130,6 +131,52 @@ func (j *controlJob) Run(ctx context.Context) {
return s, nil return s, nil
}}) }})
mux.Handle(ControlJobEndpointWaitActive, requestLogger{log: log, handler: jsonRequestResponder{log, func(decoder jsonDecoder) (v interface{}, err error) {
type reqT struct {
Job string
InvocationId uint64
What string
}
var req reqT
if decoder(&req) != nil {
return nil, errors.Errorf("decode failed")
}
j.jobs.m.RLock()
jo, ok := j.jobs.jobs[req.Job]
if !ok {
j.jobs.m.RUnlock()
return struct{}{}, fmt.Errorf("unknown job name %q", req.Job)
}
ajo, ok := jo.(*job.ActiveSide)
if !ok {
v, err = struct{}{}, fmt.Errorf("job %q is not an active side (it's a %T)", jo.Name(), jo)
j.jobs.m.RUnlock()
return v, err
}
cbCalled := make(chan struct{})
err = ajo.AddActiveSideWaiter(req.InvocationId, req.What, func() {
log.WithField("request", req).Debug("active side waiter done")
close(cbCalled)
})
j.jobs.m.RUnlock() // unlock before waiting!
if err != nil {
return struct{}{}, err
}
select {
// TODO ctx with timeout!
case <-cbCalled:
return struct{}{}, nil
}
}}})
mux.Handle(ControlJobEndpointSignal, mux.Handle(ControlJobEndpointSignal,
requestLogger{log: log, handler: jsonRequestResponder{log, func(decoder jsonDecoder) (interface{}, error) { requestLogger{log: log, handler: jsonRequestResponder{log, func(decoder jsonDecoder) (interface{}, error) {
type reqT struct { type reqT struct {
@ -155,6 +202,7 @@ func (j *controlJob) Run(ctx context.Context) {
return struct{}{}, err return struct{}{}, err
}}}) }}})
server := http.Server{ server := http.Server{
Handler: mux, Handler: mux,
// control socket is local, 1s timeout should be more than sufficient, even on a loaded system // control socket is local, 1s timeout should be more than sufficient, even on a loaded system

View File

@ -45,6 +45,8 @@ type ActiveSide struct {
tasksMtx sync.Mutex tasksMtx sync.Mutex
tasks activeSideTasks tasks activeSideTasks
activeInvocationId uint64 // 0 <=> inactive
doneWaiters, nextWaiters []func()
} }
//go:generate enumer -type=ActiveSideState //go:generate enumer -type=ActiveSideState
@ -433,6 +435,7 @@ func (j *ActiveSide) Run(ctx context.Context) {
go j.mode.RunPeriodic(periodicCtx, periodicDone) go j.mode.RunPeriodic(periodicCtx, periodicDone)
invocationCount := 0 invocationCount := 0
outer: outer:
for { for {
log.Info("wait for replications") log.Info("wait for replications")
@ -446,12 +449,59 @@ outer:
case <-periodicDone: case <-periodicDone:
} }
invocationCount++ invocationCount++
j.tasksMtx.Lock()
j.activeInvocationId = uint64(invocationCount)
j.doneWaiters = j.nextWaiters
j.nextWaiters = nil
j.tasksMtx.Unlock()
invocationCtx, endSpan := trace.WithSpan(ctx, fmt.Sprintf("invocation-%d", invocationCount)) invocationCtx, endSpan := trace.WithSpan(ctx, fmt.Sprintf("invocation-%d", invocationCount))
j.do(invocationCtx) j.do(invocationCtx)
j.tasksMtx.Lock()
j.activeInvocationId = 0
for _, f := range j.doneWaiters {
go f()
}
j.doneWaiters = nil
j.tasksMtx.Unlock()
endSpan() endSpan()
} }
} }
type AddActiveSideWaiterRequest struct {
InvocationId uint64
What string
}
func (j *ActiveSide) AddActiveSideWaiter(invocationId uint64, what string, cb func()) error {
j.tasksMtx.Lock()
defer j.tasksMtx.Unlock()
var targetQueue *[]func()
if invocationId == 0 {
if j.activeInvocationId != 0 {
targetQueue = &j.doneWaiters
} else {
targetQueue = &j.nextWaiters
}
} else if j.activeInvocationId == invocationId {
targetQueue = &j.nextWaiters
} else {
return fmt.Errorf("invocation %d is not the current invocation, current invocation is %d (0 means no active invocation); pass id '0' to wait for the next invocation", invocationId, j.activeInvocationId)
}
switch what {
case "invocation-done":
*targetQueue = append(*targetQueue, cb)
default:
return fmt.Errorf("unknown wait target %q", what)
}
return nil
}
func (j *ActiveSide) do(ctx context.Context) { func (j *ActiveSide) do(ctx context.Context) {
j.mode.ConnectEndpoints(ctx, j.connecter) j.mode.ConnectEndpoints(ctx, j.connecter)
@ -492,7 +542,6 @@ func (j *ActiveSide) do(ctx context.Context) {
GetLogger(ctx).Info("start replication") GetLogger(ctx).Info("start replication")
repWait(true) // wait blocking repWait(true) // wait blocking
repCancel() // always cancel to free up context resources repCancel() // always cancel to free up context resources
replicationReport := j.tasks.replicationReport() replicationReport := j.tasks.replicationReport()
j.promReplicationErrors.Set(float64(replicationReport.GetFailedFilesystemsCountInLatestAttempt())) j.promReplicationErrors.Set(float64(replicationReport.GetFailedFilesystemsCountInLatestAttempt()))

View File

@ -19,6 +19,7 @@ func GetLogger(ctx context.Context) Logger {
return logging.GetLogger(ctx, logging.SubsysJob) return logging.GetLogger(ctx, logging.SubsysJob)
} }
type Job interface { type Job interface {
Name() string Name() string
Run(ctx context.Context) Run(ctx context.Context)

View File

@ -12,6 +12,7 @@ func init() {
cli.AddSubcommand(daemon.DaemonCmd) cli.AddSubcommand(daemon.DaemonCmd)
cli.AddSubcommand(status.Subcommand) cli.AddSubcommand(status.Subcommand)
cli.AddSubcommand(client.SignalCmd) cli.AddSubcommand(client.SignalCmd)
cli.AddSubcommand(client.WaitCmd)
cli.AddSubcommand(client.StdinserverCmd) cli.AddSubcommand(client.StdinserverCmd)
cli.AddSubcommand(client.ConfigcheckCmd) cli.AddSubcommand(client.ConfigcheckCmd)
cli.AddSubcommand(client.VersionCmd) cli.AddSubcommand(client.VersionCmd)