From 97a14dba90d5df5f53fd3f28c13c05fe9c1f2571 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Sun, 21 Mar 2021 21:57:26 +0100 Subject: [PATCH] WIP PoC signalling --- client/signal.go | 37 +++++++++--- client/status/client/client.go | 15 +++-- client/wait.go | 68 +++++++++++++---------- daemon/control.go | 61 +++++++++++--------- daemon/daemon.go | 54 +----------------- daemon/job/active.go | 63 ++++++++++++++++++--- daemon/job/doreplication/doreplication.go | 35 ------------ daemon/job/dosnapshot/dosnapshot.go | 35 ------------ daemon/job/reset/reset.go | 35 ------------ 9 files changed, 172 insertions(+), 231 deletions(-) delete mode 100644 daemon/job/doreplication/doreplication.go delete mode 100644 daemon/job/dosnapshot/dosnapshot.go delete mode 100644 daemon/job/reset/reset.go diff --git a/client/signal.go b/client/signal.go index 11bf4e4..5a3ec79 100644 --- a/client/signal.go +++ b/client/signal.go @@ -2,16 +2,19 @@ package client import ( "context" + "encoding/json" + "fmt" "github.com/pkg/errors" "github.com/zrepl/zrepl/cli" "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/daemon" + "github.com/zrepl/zrepl/daemon/job" ) var SignalCmd = &cli.Subcommand{ - Use: "signal [replication|reset|snapshot] JOB", + Use: "signal JOB [replication|reset|snapshot]", Short: "run a job replication, abort its current invocation, run a snapshot job", Run: func(ctx context.Context, subcommand *cli.Subcommand, args []string) error { return runSignalCmd(subcommand.Config(), args) @@ -28,15 +31,35 @@ func runSignalCmd(config *config.Config, args []string) error { return err } - err = jsonRequestResponse(httpc, daemon.ControlJobEndpointSignal, + jobName := args[0] + what := args[1] + + var res job.ActiveSideSignalResponse + err = jsonRequestResponse(httpc, daemon.ControlJobEndpointSignalActive, struct { - Name string - Op string + Job string + job.ActiveSideSignalRequest }{ - Name: args[1], - Op: args[0], + Job: jobName, + ActiveSideSignalRequest: job.ActiveSideSignalRequest{ + What: what, + }, }, - struct{}{}, + &res, ) + + pollRequest := daemon.ControlJobEndpointSignalActiveRequest{ + Job: jobName, + ActiveSidePollRequest: job.ActiveSidePollRequest{ + InvocationId: res.InvocationId, + What: what, + }, + } + + j, err := json.Marshal(pollRequest) + if err != nil { + panic(err) + } + fmt.Println(string(j)) return err } diff --git a/client/status/client/client.go b/client/status/client/client.go index 476ed25..d495a18 100644 --- a/client/status/client/client.go +++ b/client/status/client/client.go @@ -11,6 +11,7 @@ import ( "github.com/pkg/errors" "github.com/zrepl/zrepl/daemon" + "github.com/zrepl/zrepl/daemon/job" ) type Client struct { @@ -42,14 +43,16 @@ func (c *Client) StatusRaw() ([]byte, error) { return r, nil } -func (c *Client) signal(job, sig string) error { - return jsonRequestResponse(c.h, daemon.ControlJobEndpointSignal, +func (c *Client) signal(jobName, sig string) error { + return jsonRequestResponse(c.h, daemon.ControlJobEndpointSignalActive, struct { - Name string - Op string + Job string + job.ActiveSideSignalRequest }{ - Name: job, - Op: sig, + Job: jobName, + ActiveSideSignalRequest: job.ActiveSideSignalRequest{ + What: sig, + }, }, struct{}{}, ) diff --git a/client/wait.go b/client/wait.go index f11331b..397df6c 100644 --- a/client/wait.go +++ b/client/wait.go @@ -2,6 +2,7 @@ package client import ( "context" + "encoding/json" "fmt" "strconv" "time" @@ -17,12 +18,13 @@ import ( ) var waitCmdArgs struct { - verbose bool + verbose bool interval time.Duration + token string } var WaitCmd = &cli.Subcommand{ - Use: "wait [active JOB INVOCATION_ID WHAT]", + Use: "wait [-t TOKEN | [replication|snapshotting|prune_sender|prune_receiver JOB]]", Short: "", Run: func(ctx context.Context, subcommand *cli.Subcommand, args []string) error { return runWaitCmd(subcommand.Config(), args) @@ -30,6 +32,7 @@ var WaitCmd = &cli.Subcommand{ SetupFlags: func(f *pflag.FlagSet) { f.BoolVarP(&waitCmdArgs.verbose, "verbose", "v", false, "verbose output") f.DurationVarP(&waitCmdArgs.interval, "poll-interval", "i", 100*time.Millisecond, "poll interval") + f.StringVarP(&waitCmdArgs.token, "token", "t", "", "token produced by 'signal' subcommand") }, } @@ -40,43 +43,50 @@ func runWaitCmd(config *config.Config, args []string) error { return err } - if args[0] != "active" { - panic(args) + var pollRequest daemon.ControlJobEndpointSignalActiveRequest + if waitCmdArgs.token != "" { + if len(args) != 0 { + return fmt.Errorf("-t and regular usage is mutually exclusive") + } + err := json.Unmarshal([]byte(waitCmdArgs.token), &pollRequest) + if err != nil { + return errors.Wrap(err, "cannot unmarshal token") + } + } else { + + if args[0] != "active" { + panic(args) + } + args = args[1:] + + jobName := args[0] + + invocationId, err := strconv.ParseUint(args[1], 10, 64) + if err != nil { + return errors.Wrap(err, "parse invocation id") + } + + waitWhat := args[2] + + // updated by subsequent requests + pollRequest = daemon.ControlJobEndpointSignalActiveRequest{ + Job: jobName, + ActiveSidePollRequest: job.ActiveSidePollRequest{ + InvocationId: invocationId, + What: waitWhat, + }, + } } - args = args[1:] - - jobName := args[0] - - invocationId, err := strconv.ParseUint(args[1], 10, 64) - if err != nil { - return errors.Wrap(err, "parse invocation id") - } - - waitWhat := args[2] doneErr := fmt.Errorf("done") - var pollRequest job.ActiveSidePollRequest - - // updated by subsequent requests - pollRequest = job.ActiveSidePollRequest{ - InvocationId: invocationId, - What: waitWhat, - } - pollOnce := func() error { var res job.ActiveSidePollResponse if waitCmdArgs.verbose { pretty.Println("making poll request", pollRequest) } err = jsonRequestResponse(httpc, daemon.ControlJobEndpointPollActive, - struct { - Job string - job.ActiveSidePollRequest - }{ - Job: jobName, - ActiveSidePollRequest: pollRequest, - }, + pollRequest, &res, ) if err != nil { diff --git a/daemon/control.go b/daemon/control.go index 9696302..38810e6 100644 --- a/daemon/control.go +++ b/daemon/control.go @@ -73,13 +73,18 @@ func (j *controlJob) RegisterMetrics(registerer prometheus.Registerer) { } const ( - ControlJobEndpointPProf string = "/debug/pprof" - ControlJobEndpointVersion string = "/version" - ControlJobEndpointStatus string = "/status" - ControlJobEndpointSignal string = "/signal" - ControlJobEndpointPollActive string = "/poll/active" + ControlJobEndpointPProf string = "/debug/pprof" + ControlJobEndpointVersion string = "/version" + ControlJobEndpointStatus string = "/status" + ControlJobEndpointSignalActive string = "/signal/active" + ControlJobEndpointPollActive string = "/poll/active" ) +type ControlJobEndpointSignalActiveRequest struct { + Job string + job.ActiveSidePollRequest +} + func (j *controlJob) Run(ctx context.Context) { log := job.GetLogger(ctx) @@ -132,11 +137,7 @@ func (j *controlJob) Run(ctx context.Context) { }}) mux.Handle(ControlJobEndpointPollActive, requestLogger{log: log, handler: jsonRequestResponder{log, func(decoder jsonDecoder) (v interface{}, err error) { - type reqT struct { - Job string - job.ActiveSidePollRequest - } - var req reqT + var req ControlJobEndpointSignalActiveRequest if decoder(&req) != nil { return nil, errors.Errorf("decode failed") } @@ -163,30 +164,40 @@ func (j *controlJob) Run(ctx context.Context) { return res, err }}}) - mux.Handle(ControlJobEndpointSignal, - requestLogger{log: log, handler: jsonRequestResponder{log, func(decoder jsonDecoder) (interface{}, error) { + mux.Handle(ControlJobEndpointSignalActive, + requestLogger{log: log, handler: jsonRequestResponder{log, func(decoder jsonDecoder) (v interface{}, err error) { type reqT struct { - Name string - Op string + Job string + job.ActiveSideSignalRequest } var req reqT if decoder(&req) != nil { return nil, errors.Errorf("decode failed") } - var err error - switch req.Op { - case "replication": - err = j.jobs.doreplication(req.Name) - case "reset": - err = j.jobs.reset(req.Name) - case "snapshot": - err = j.jobs.dosnapshot(req.Name) - default: - err = fmt.Errorf("operation %q is invalid", req.Op) + // FIXME dedup the following code with ControlJobEndpointPollActive + + j.jobs.m.RLock() + + jo, ok := j.jobs.jobs[req.Job] + if !ok { + j.jobs.m.RUnlock() + return struct{}{}, fmt.Errorf("unknown job name %q", req.Job) } - return struct{}{}, err + ajo, ok := jo.(*job.ActiveSide) + if !ok { + v, err = struct{}{}, fmt.Errorf("job %q is not an active side (it's a %T)", jo.Name(), jo) + j.jobs.m.RUnlock() + return v, err + } + + res, err := ajo.Signal(req.ActiveSideSignalRequest) + + j.jobs.m.RUnlock() + + return res, err + }}}) server := http.Server{ diff --git a/daemon/daemon.go b/daemon/daemon.go index 2bde353..f2f4097 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -20,9 +20,6 @@ import ( "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/daemon/job" - "github.com/zrepl/zrepl/daemon/job/doreplication" - "github.com/zrepl/zrepl/daemon/job/dosnapshot" - "github.com/zrepl/zrepl/daemon/job/reset" "github.com/zrepl/zrepl/daemon/logging" "github.com/zrepl/zrepl/logger" "github.com/zrepl/zrepl/version" @@ -132,19 +129,13 @@ type jobs struct { wg sync.WaitGroup // m protects all fields below it - m sync.RWMutex - doreplications map[string]doreplication.Func // by Job.Name - resets map[string]reset.Func // by Job.Name - dosnapshots map[string]dosnapshot.Func // by Job.Name - jobs map[string]job.Job + m sync.RWMutex + jobs map[string]job.Job } func newJobs() *jobs { return &jobs{ - doreplications: make(map[string]doreplication.Func), - resets: make(map[string]reset.Func), - dosnapshots: make(map[string]dosnapshot.Func), - jobs: make(map[string]job.Job), + jobs: make(map[string]job.Job), } } @@ -193,39 +184,6 @@ func (s *jobs) status() map[string]*job.Status { return ret } -func (s *jobs) doreplication(job string) error { - s.m.RLock() - defer s.m.RUnlock() - - wu, ok := s.doreplications[job] - if !ok { - return errors.Errorf("Job %s does not exist", job) - } - return wu() -} - -func (s *jobs) reset(job string) error { - s.m.RLock() - defer s.m.RUnlock() - - wu, ok := s.resets[job] - if !ok { - return errors.Errorf("Job %s does not exist", job) - } - return wu() -} - -func (s *jobs) dosnapshot(job string) error { - s.m.RLock() - defer s.m.RUnlock() - - wu, ok := s.dosnapshots[job] - if !ok { - return errors.Errorf("Job %s does not exist", job) - } - return wu() -} - const ( jobNamePrometheus = "_prometheus" jobNameControl = "_control" @@ -256,12 +214,6 @@ func (s *jobs) start(ctx context.Context, j job.Job, internal bool) { s.jobs[jobName] = j ctx = zfscmd.WithJobID(ctx, j.Name()) - ctx, doreplication := doreplication.Context(ctx) - ctx, resetFunc := reset.Context(ctx) - ctx, dosnapshotFunc := dosnapshot.Context(ctx) - s.doreplications[jobName] = doreplication - s.resets[jobName] = resetFunc - s.dosnapshots[jobName] = dosnapshotFunc s.wg.Add(1) go func() { diff --git a/daemon/job/active.go b/daemon/job/active.go index 2c2a183..8bae016 100644 --- a/daemon/job/active.go +++ b/daemon/job/active.go @@ -14,7 +14,6 @@ import ( "github.com/zrepl/zrepl/util/envconst" "github.com/zrepl/zrepl/config" - "github.com/zrepl/zrepl/daemon/job/doreplication" "github.com/zrepl/zrepl/daemon/job/reset" "github.com/zrepl/zrepl/daemon/pruner" "github.com/zrepl/zrepl/daemon/snapper" @@ -47,6 +46,7 @@ type ActiveSide struct { tasks activeSideTasks nextInvocationId uint64 activeInvocationId uint64 // 0 <=> inactive + signal chan struct{} } //go:generate enumer -type=ActiveSideState @@ -65,6 +65,7 @@ type activeSideTasks struct { // valid for state ActiveSideReplicating, ActiveSidePruneSender, ActiveSidePruneReceiver, ActiveSideDone replicationReport driver.ReportFunc replicationCancel context.CancelFunc + replicationDone *report.Report // valid for state ActiveSidePruneSender, ActiveSidePruneReceiver, ActiveSideDone prunerSender, prunerReceiver *pruner.Pruner @@ -91,7 +92,7 @@ type activeMode interface { SenderReceiver() (logic.Sender, logic.Receiver) Type() Type PlannerPolicy() logic.PlannerPolicy - RunPeriodic(ctx context.Context, replicationCommon chan<- struct{}) + RunPeriodic(ctx context.Context, wakePeriodic <-chan struct{}, replicationCommon chan<- struct{}) SnapperReport() *snapper.Report ResetConnectBackoff() } @@ -133,7 +134,7 @@ func (m *modePush) Type() Type { return TypePush } func (m *modePush) PlannerPolicy() logic.PlannerPolicy { return *m.plannerPolicy } -func (m *modePush) RunPeriodic(ctx context.Context, replicationCommon chan<- struct{}) { +func (m *modePush) RunPeriodic(ctx context.Context, wakePeriodic <-chan struct{}, replicationCommon chan<- struct{}) { m.snapper.Run(ctx, replicationCommon) } @@ -216,7 +217,7 @@ func (*modePull) Type() Type { return TypePull } func (m *modePull) PlannerPolicy() logic.PlannerPolicy { return *m.plannerPolicy } -func (m *modePull) RunPeriodic(ctx context.Context, replicationCommon chan<- struct{}) { +func (m *modePull) RunPeriodic(ctx context.Context, wakePeriodic <-chan struct{}, replicationCommon chan<- struct{}) { if m.interval.Manual { GetLogger(ctx).Info("manual pull configured, periodic pull disabled") // "waiting for wakeup replications" is printed in common ActiveSide.do @@ -432,8 +433,11 @@ func (j *ActiveSide) Run(ctx context.Context) { defer cancel() periodicCtx, endTask := trace.WithTask(ctx, "periodic") defer endTask() - go j.mode.RunPeriodic(periodicCtx, periodicDone) + wakePeriodic := make(chan struct{}) + go j.mode.RunPeriodic(periodicCtx, wakePeriodic, periodicDone) + + j.signal = make(chan struct{}) j.nextInvocationId = 1 outer: @@ -444,7 +448,7 @@ outer: log.WithError(ctx.Err()).Info("context") break outer - case <-doreplication.Wait(ctx): + case <-j.signal: j.mode.ResetConnectBackoff() case <-periodicDone: } @@ -490,7 +494,7 @@ func (j *ActiveSide) Poll(req ActiveSidePollRequest) (*ActiveSidePollResponse, e } switch req.What { - case "invocation-done": + case "invocation": var done bool if j.activeInvocationId == 0 { done = waitForId < j.nextInvocationId @@ -504,6 +508,47 @@ func (j *ActiveSide) Poll(req ActiveSidePollRequest) (*ActiveSidePollResponse, e } } +type ActiveSideSignalRequest struct { + What string +} + +type ActiveSideSignalResponse struct { + InvocationId uint64 +} + +func (j *ActiveSide) Signal(req ActiveSideSignalRequest) (*ActiveSideSignalResponse, error) { + // switch req.What { + // case "replication": + // invocationId, err = j.jobs.doreplication(req.Name) + // case "reset": + // err = j.jobs.reset(req.Name) + // case "snapshot": + // err = j.jobs.dosnapshot(req.Name) + // default: + // err = fmt.Errorf("operation %q is invalid", req.Op) + // } + + switch req.What { + case "invocation": + j.tasksMtx.Lock() + var invocationId uint64 + if j.activeInvocationId != 0 { + invocationId = j.activeInvocationId + } else { + invocationId = j.nextInvocationId + } + // non-blocking send (.Run() must not hold mutex while waiting for signals) + select { + case j.signal <- struct{}{}: + default: + } + j.tasksMtx.Unlock() + return &ActiveSideSignalResponse{InvocationId: invocationId}, nil + default: + return nil, fmt.Errorf("unknown signal %q", req.What) + } +} + func (j *ActiveSide) do(ctx context.Context) { j.mode.ConnectEndpoints(ctx, j.connecter) @@ -546,7 +591,9 @@ func (j *ActiveSide) do(ctx context.Context) { repCancel() // always cancel to free up context resources replicationReport := j.tasks.replicationReport() j.promReplicationErrors.Set(float64(replicationReport.GetFailedFilesystemsCountInLatestAttempt())) - + j.updateTasks(func(tasks *activeSideTasks) { + tasks.replicationDone = replicationReport + }) endSpan() } diff --git a/daemon/job/doreplication/doreplication.go b/daemon/job/doreplication/doreplication.go deleted file mode 100644 index b08d834..0000000 --- a/daemon/job/doreplication/doreplication.go +++ /dev/null @@ -1,35 +0,0 @@ -package doreplication - -import ( - "context" - "errors" -) - -type contextKey int - -const contextKeyReplication contextKey = iota - -func Wait(ctx context.Context) <-chan struct{} { - wc, ok := ctx.Value(contextKeyReplication).(chan struct{}) - if !ok { - wc = make(chan struct{}) - } - return wc -} - -type Func func() error - -var AlreadyReplicating = errors.New("already replicating") - -func Context(ctx context.Context) (context.Context, Func) { - wc := make(chan struct{}) - wuf := func() error { - select { - case wc <- struct{}{}: - return nil - default: - return AlreadyReplicating - } - } - return context.WithValue(ctx, contextKeyReplication, wc), wuf -} diff --git a/daemon/job/dosnapshot/dosnapshot.go b/daemon/job/dosnapshot/dosnapshot.go deleted file mode 100644 index aa9bfef..0000000 --- a/daemon/job/dosnapshot/dosnapshot.go +++ /dev/null @@ -1,35 +0,0 @@ -package dosnapshot - -import ( - "context" - "errors" -) - -type contextKey int - -const contextKeyDosnapshot contextKey = iota - -func Wait(ctx context.Context) <-chan struct{} { - wc, ok := ctx.Value(contextKeyDosnapshot).(chan struct{}) - if !ok { - wc = make(chan struct{}) - } - return wc -} - -type Func func() error - -var AlreadyDosnapshot = errors.New("already snapshotting") - -func Context(ctx context.Context) (context.Context, Func) { - wc := make(chan struct{}) - wuf := func() error { - select { - case wc <- struct{}{}: - return nil - default: - return AlreadyDosnapshot - } - } - return context.WithValue(ctx, contextKeyDosnapshot, wc), wuf -} diff --git a/daemon/job/reset/reset.go b/daemon/job/reset/reset.go deleted file mode 100644 index b7322ed..0000000 --- a/daemon/job/reset/reset.go +++ /dev/null @@ -1,35 +0,0 @@ -package reset - -import ( - "context" - "errors" -) - -type contextKey int - -const contextKeyReset contextKey = iota - -func Wait(ctx context.Context) <-chan struct{} { - wc, ok := ctx.Value(contextKeyReset).(chan struct{}) - if !ok { - wc = make(chan struct{}) - } - return wc -} - -type Func func() error - -var AlreadyReset = errors.New("already reset") - -func Context(ctx context.Context) (context.Context, Func) { - wc := make(chan struct{}) - wuf := func() error { - select { - case wc <- struct{}{}: - return nil - default: - return AlreadyReset - } - } - return context.WithValue(ctx, contextKeyReset, wc), wuf -}