move wakeup subcommand into signal subcommand and add reset subcommand

This commit is contained in:
Christian Schwarz 2018-10-12 20:50:30 +02:00
parent 025fbda984
commit 89e0103abd
6 changed files with 92 additions and 13 deletions

View File

@ -6,9 +6,9 @@ import (
"github.com/zrepl/zrepl/daemon"
)
func RunWakeup(config *config.Config, args []string) error {
if len(args) != 1 {
return errors.Errorf("Expected 1 argument: job")
func RunSignal(config *config.Config, args []string) error {
if len(args) != 2 {
return errors.Errorf("Expected 2 arguments: [wakeup|reset] JOB")
}
httpc, err := controlHttpClient(config.Global.Control.SockPath)
@ -16,11 +16,13 @@ func RunWakeup(config *config.Config, args []string) error {
return err
}
err = jsonRequestResponse(httpc, daemon.ControlJobEndpointWakeup,
err = jsonRequestResponse(httpc, daemon.ControlJobEndpointSignal,
struct {
Name string
Op string
}{
Name: args[0],
Name: args[1],
Op: args[0],
},
struct{}{},
)

View File

@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/zrepl/zrepl/daemon/job"
@ -65,7 +66,7 @@ const (
ControlJobEndpointPProf string = "/debug/pprof"
ControlJobEndpointVersion string = "/version"
ControlJobEndpointStatus string = "/status"
ControlJobEndpointWakeup string = "/wakeup"
ControlJobEndpointSignal string = "/signal"
)
func (j *controlJob) Run(ctx context.Context) {
@ -104,17 +105,26 @@ func (j *controlJob) Run(ctx context.Context) {
return s, nil
}}})
mux.Handle(ControlJobEndpointWakeup,
mux.Handle(ControlJobEndpointSignal,
requestLogger{log: log, handler: jsonRequestResponder{func(decoder jsonDecoder) (interface{}, error) {
type reqT struct {
Name string
Op string
}
var req reqT
if decoder(&req) != nil {
return nil, errors.Errorf("decode failed")
}
err := j.jobs.wakeup(req.Name)
var err error
switch req.Op {
case "wakeup":
err = j.jobs.wakeup(req.Name)
case "reset":
err = j.jobs.reset(req.Name)
default:
err = fmt.Errorf("operation %q is invalid", req.Op)
}
return struct{}{}, err
}}})

View File

@ -7,6 +7,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/job"
"github.com/zrepl/zrepl/daemon/job/reset"
"github.com/zrepl/zrepl/daemon/job/wakeup"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/logger"
@ -102,12 +103,14 @@ type jobs struct {
// m protects all fields below it
m sync.RWMutex
wakeups map[string]wakeup.Func // by Job.Name
resets map[string]reset.Func // by Job.Name
jobs map[string]job.Job
}
func newJobs() *jobs {
return &jobs{
wakeups: make(map[string]wakeup.Func),
resets: make(map[string]reset.Func),
jobs: make(map[string]job.Job),
}
}
@ -163,6 +166,17 @@ func (s *jobs) wakeup(job string) error {
return wu()
}
func (s *jobs) reset(job string) error {
s.m.RLock()
defer s.m.RUnlock()
wu, ok := s.resets[job]
if !ok {
return errors.Errorf("Job %s does not exist", job)
}
return wu()
}
const (
jobNamePrometheus = "_prometheus"
jobNameControl = "_control"
@ -195,7 +209,9 @@ func (s *jobs) start(ctx context.Context, j job.Job, internal bool) {
s.jobs[jobName] = j
ctx = job.WithLogger(ctx, jobLog)
ctx, wakeup := wakeup.Context(ctx)
ctx, resetFunc := reset.Context(ctx)
s.wakeups[jobName] = wakeup
s.resets[jobName] = resetFunc
s.wg.Add(1)
go func() {

View File

@ -6,6 +6,7 @@ import (
"github.com/problame/go-streamrpc"
"github.com/prometheus/client_golang/prometheus"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/job/reset"
"github.com/zrepl/zrepl/daemon/job/wakeup"
"github.com/zrepl/zrepl/daemon/transport/connecter"
"github.com/zrepl/zrepl/daemon/filters"
@ -248,6 +249,21 @@ func (j *ActiveSide) do(ctx context.Context) {
log := GetLogger(ctx)
ctx = logging.WithSubsystemLoggers(ctx, log)
// allow cancellation of an invocation (this function)
ctx, cancelThisRun := context.WithCancel(ctx)
defer cancelThisRun()
runDone := make(chan struct{})
defer close(runDone)
go func() {
select {
case <-runDone:
case <-reset.Wait(ctx):
log.Info("reset received, cancelling current invocation")
cancelThisRun()
case <-ctx.Done():
}
}()
client, err := j.clientFactory.NewClient()
if err != nil {
log.WithError(err).Error("factory cannot instantiate streamrpc client")

35
daemon/job/reset/reset.go Normal file
View File

@ -0,0 +1,35 @@
package reset
import (
"context"
"errors"
)
type contextKey int
const contextKeyReset contextKey = iota
func Wait(ctx context.Context) <-chan struct{} {
wc, ok := ctx.Value(contextKeyReset).(chan struct{})
if !ok {
wc = make(chan struct{})
}
return wc
}
type Func func() error
var AlreadyReset = errors.New("already reset")
func Context(ctx context.Context) (context.Context, Func) {
wc := make(chan struct{})
wuf := func() error {
select {
case wc <- struct{}{}:
return nil
default:
return AlreadyReset
}
}
return context.WithValue(ctx, contextKeyReset, wc), wuf
}

10
main.go
View File

@ -29,15 +29,15 @@ var daemonCmd = &cobra.Command{
},
}
var wakeupCmd = &cobra.Command{
Use: "wakeup JOB",
Short: "trigger replication and subsequent pruning for a job",
var signalCmd = &cobra.Command{
Use: "signal [wakeup|reset] JOB",
Short: "wake up a job from wait state or abort its current invocation",
RunE: func(cmd *cobra.Command, args []string) error {
conf, err := config.ParseConfig(rootArgs.configFile)
if err != nil {
return err
}
return client.RunWakeup(conf, args)
return client.RunSignal(conf, args)
},
}
@ -153,7 +153,7 @@ func init() {
//cobra.OnInitialize(initConfig)
rootCmd.PersistentFlags().StringVar(&rootArgs.configFile, "config", "", "config file path")
rootCmd.AddCommand(daemonCmd)
rootCmd.AddCommand(wakeupCmd)
rootCmd.AddCommand(signalCmd)
statusCmd.Flags().BoolVar(&statusCmdFlags.Raw, "raw", false, "dump raw status description from zrepl daemon")
rootCmd.AddCommand(statusCmd)
rootCmd.AddCommand(stdinserverCmd)