diff --git a/client/wakeup.go b/client/signal.go similarity index 52% rename from client/wakeup.go rename to client/signal.go index d6a224f..273ea18 100644 --- a/client/wakeup.go +++ b/client/signal.go @@ -6,9 +6,9 @@ import ( "github.com/zrepl/zrepl/daemon" ) -func RunWakeup(config *config.Config, args []string) error { - if len(args) != 1 { - return errors.Errorf("Expected 1 argument: job") +func RunSignal(config *config.Config, args []string) error { + if len(args) != 2 { + return errors.Errorf("Expected 2 arguments: [wakeup|reset] JOB") } httpc, err := controlHttpClient(config.Global.Control.SockPath) @@ -16,11 +16,13 @@ func RunWakeup(config *config.Config, args []string) error { return err } - err = jsonRequestResponse(httpc, daemon.ControlJobEndpointWakeup, + err = jsonRequestResponse(httpc, daemon.ControlJobEndpointSignal, struct { Name string + Op string }{ - Name: args[0], + Name: args[1], + Op: args[0], }, struct{}{}, ) diff --git a/daemon/control.go b/daemon/control.go index 0be8a0d..9163dbc 100644 --- a/daemon/control.go +++ b/daemon/control.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/json" + "fmt" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/zrepl/zrepl/daemon/job" @@ -65,7 +66,7 @@ const ( ControlJobEndpointPProf string = "/debug/pprof" ControlJobEndpointVersion string = "/version" ControlJobEndpointStatus string = "/status" - ControlJobEndpointWakeup string = "/wakeup" + ControlJobEndpointSignal string = "/signal" ) func (j *controlJob) Run(ctx context.Context) { @@ -104,17 +105,26 @@ func (j *controlJob) Run(ctx context.Context) { return s, nil }}}) - mux.Handle(ControlJobEndpointWakeup, + mux.Handle(ControlJobEndpointSignal, requestLogger{log: log, handler: jsonRequestResponder{func(decoder jsonDecoder) (interface{}, error) { type reqT struct { Name string + Op string } var req reqT if decoder(&req) != nil { return nil, errors.Errorf("decode failed") } - err := j.jobs.wakeup(req.Name) + var err error + switch req.Op { + case "wakeup": + err = j.jobs.wakeup(req.Name) + case "reset": + err = j.jobs.reset(req.Name) + default: + err = fmt.Errorf("operation %q is invalid", req.Op) + } return struct{}{}, err }}}) diff --git a/daemon/daemon.go b/daemon/daemon.go index 41cf667..9f0e185 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -7,6 +7,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/daemon/job" + "github.com/zrepl/zrepl/daemon/job/reset" "github.com/zrepl/zrepl/daemon/job/wakeup" "github.com/zrepl/zrepl/daemon/logging" "github.com/zrepl/zrepl/logger" @@ -102,12 +103,14 @@ type jobs struct { // m protects all fields below it m sync.RWMutex wakeups map[string]wakeup.Func // by Job.Name + resets map[string]reset.Func // by Job.Name jobs map[string]job.Job } func newJobs() *jobs { return &jobs{ wakeups: make(map[string]wakeup.Func), + resets: make(map[string]reset.Func), jobs: make(map[string]job.Job), } } @@ -163,6 +166,17 @@ func (s *jobs) wakeup(job string) error { return wu() } +func (s *jobs) reset(job string) error { + s.m.RLock() + defer s.m.RUnlock() + + wu, ok := s.resets[job] + if !ok { + return errors.Errorf("Job %s does not exist", job) + } + return wu() +} + const ( jobNamePrometheus = "_prometheus" jobNameControl = "_control" @@ -195,7 +209,9 @@ func (s *jobs) start(ctx context.Context, j job.Job, internal bool) { s.jobs[jobName] = j ctx = job.WithLogger(ctx, jobLog) ctx, wakeup := wakeup.Context(ctx) + ctx, resetFunc := reset.Context(ctx) s.wakeups[jobName] = wakeup + s.resets[jobName] = resetFunc s.wg.Add(1) go func() { diff --git a/daemon/job/active.go b/daemon/job/active.go index b50216d..0d78abb 100644 --- a/daemon/job/active.go +++ b/daemon/job/active.go @@ -6,6 +6,7 @@ import ( "github.com/problame/go-streamrpc" "github.com/prometheus/client_golang/prometheus" "github.com/zrepl/zrepl/config" + "github.com/zrepl/zrepl/daemon/job/reset" "github.com/zrepl/zrepl/daemon/job/wakeup" "github.com/zrepl/zrepl/daemon/transport/connecter" "github.com/zrepl/zrepl/daemon/filters" @@ -248,6 +249,21 @@ func (j *ActiveSide) do(ctx context.Context) { log := GetLogger(ctx) ctx = logging.WithSubsystemLoggers(ctx, log) + // allow cancellation of an invocation (this function) + ctx, cancelThisRun := context.WithCancel(ctx) + defer cancelThisRun() + runDone := make(chan struct{}) + defer close(runDone) + go func() { + select { + case <-runDone: + case <-reset.Wait(ctx): + log.Info("reset received, cancelling current invocation") + cancelThisRun() + case <-ctx.Done(): + } + }() + client, err := j.clientFactory.NewClient() if err != nil { log.WithError(err).Error("factory cannot instantiate streamrpc client") diff --git a/daemon/job/reset/reset.go b/daemon/job/reset/reset.go new file mode 100644 index 0000000..b7322ed --- /dev/null +++ b/daemon/job/reset/reset.go @@ -0,0 +1,35 @@ +package reset + +import ( + "context" + "errors" +) + +type contextKey int + +const contextKeyReset contextKey = iota + +func Wait(ctx context.Context) <-chan struct{} { + wc, ok := ctx.Value(contextKeyReset).(chan struct{}) + if !ok { + wc = make(chan struct{}) + } + return wc +} + +type Func func() error + +var AlreadyReset = errors.New("already reset") + +func Context(ctx context.Context) (context.Context, Func) { + wc := make(chan struct{}) + wuf := func() error { + select { + case wc <- struct{}{}: + return nil + default: + return AlreadyReset + } + } + return context.WithValue(ctx, contextKeyReset, wc), wuf +} diff --git a/main.go b/main.go index d23aae8..7c112a6 100644 --- a/main.go +++ b/main.go @@ -29,15 +29,15 @@ var daemonCmd = &cobra.Command{ }, } -var wakeupCmd = &cobra.Command{ - Use: "wakeup JOB", - Short: "trigger replication and subsequent pruning for a job", +var signalCmd = &cobra.Command{ + Use: "signal [wakeup|reset] JOB", + Short: "wake up a job from wait state or abort its current invocation", RunE: func(cmd *cobra.Command, args []string) error { conf, err := config.ParseConfig(rootArgs.configFile) if err != nil { return err } - return client.RunWakeup(conf, args) + return client.RunSignal(conf, args) }, } @@ -153,7 +153,7 @@ func init() { //cobra.OnInitialize(initConfig) rootCmd.PersistentFlags().StringVar(&rootArgs.configFile, "config", "", "config file path") rootCmd.AddCommand(daemonCmd) - rootCmd.AddCommand(wakeupCmd) + rootCmd.AddCommand(signalCmd) statusCmd.Flags().BoolVar(&statusCmdFlags.Raw, "raw", false, "dump raw status description from zrepl daemon") rootCmd.AddCommand(statusCmd) rootCmd.AddCommand(stdinserverCmd)