zrepl/client/signal.go
Christian Schwarz 17add553d3 WIP runtime-controllable concurrency for replication
Changes done so far:
- signal to route concurrency request up to the stepQueue
    - pretty hacky, no status reporting yet
- stepQueue upsizing (confirmed that it works, no intermediary commit)

Stuck at: stepQueue downsizing
- idea was to have the stepQueue signal to the activated step that it
  should suspend
- ideally, we'd just kill everything associated with the step and track
  it as restartable
    - the driver model doesn't allow for that though within an attempt
    - would need to start a separate run for the step
- less perfect: tell the downsized steps to stop copying, but leave
  all zfs sends + rpc conns open
    - - doesn't give back the resources that a step acquired before
      being selected as downsize victim (open zfs processes + TCP conn,
      some memory in the pipe)
    - - looks weird to the user in the `ps aux` output
    - + re-waking a step is easy: just tell it to proceed with copying
    - (the impl would likely pass a check function down into Step.do
      and have it check that function periodically. the suspend should
      be acknowledged, and stepQueue should only remove the step from
      the active queue _after_ that step has acknowledged that it is
      suspended)
2020-02-17 17:22:52 +01:00

48 lines
986 B
Go

package client
import (
"github.com/pkg/errors"
"github.com/zrepl/zrepl/cli"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon"
)
// SignalCmd is the "signal" subcommand. Its Run hook hands the subcommand's
// config and the positional arguments to runSignalCmd.
//
// The usage string lists the same operations that runSignalCmd names in its
// argument-count error, so the help output and the error message agree.
var SignalCmd = &cli.Subcommand{
	Use:   "signal [wakeup|reset|set-concurrency] JOB [DATA]",
	Short: "wake up a job from wait state or abort its current invocation",
	Run: func(subcommand *cli.Subcommand, args []string) error {
		return runSignalCmd(subcommand.Config(), args)
	},
}
// runSignalCmd posts a signal request for a job to the daemon's
// ControlJobEndpointSignal endpoint.
//
// args must be [OP, JOB] or [OP, JOB, DATA]: args[0] is sent as the operation,
// args[1] as the job name, and the optional args[2] as the Data field (sent as
// the empty string when omitted). The HTTP client is built from
// config.Global.Control.SockPath.
//
// It returns an error if the argument count is wrong, if the client cannot be
// created, or if the request fails; the latter two are returned unchanged.
func runSignalCmd(config *config.Config, args []string) error {
	if len(args) < 2 || len(args) > 3 {
		return errors.Errorf("Expected 2 or 3 arguments: [wakeup|reset|set-concurrency] JOB [DATA]")
	}

	// DATA is optional; it stays empty when only OP and JOB are given.
	var data string
	if len(args) == 3 {
		data = args[2]
	}

	httpc, err := controlHttpClient(config.Global.Control.SockPath)
	if err != nil {
		return err
	}

	return jsonRequestResponse(httpc, daemon.ControlJobEndpointSignal,
		struct {
			Name string
			Op   string
			Data string
		}{
			Name: args[1],
			Op:   args[0],
			Data: data,
		},
		struct{}{},
	)
}