mirror of
https://github.com/zrepl/zrepl.git
synced 2025-03-06 19:21:34 +01:00
rename signal wakeup to replicate
This commit is contained in:
parent
6b87589aa1
commit
9237a782f0
@ -11,8 +11,8 @@ import (
|
||||
)
|
||||
|
||||
var SignalCmd = &cli.Subcommand{
|
||||
Use: "signal [wakeup|snapshot|prune|reset] JOB",
|
||||
Short: "wake up a job from wait state, run a snapshot job, run a prune job, abort its current invocation",
|
||||
Use: "signal [replicate|snapshot|prune|reset] JOB",
|
||||
Short: "replicate a job, run a snapshot job, run a prune job, abort its current invocation",
|
||||
Run: func(ctx context.Context, subcommand *cli.Subcommand, args []string) error {
|
||||
return runSignalCmd(subcommand.Config(), args)
|
||||
},
|
||||
@ -20,7 +20,7 @@ var SignalCmd = &cli.Subcommand{
|
||||
|
||||
func runSignalCmd(config *config.Config, args []string) error {
|
||||
if len(args) != 2 {
|
||||
return errors.Errorf("Expected 2 arguments: [wakeup|snapshot|prune|reset] JOB")
|
||||
return errors.Errorf("Expected 2 arguments: [replicate|snapshot|prune|reset] JOB")
|
||||
}
|
||||
|
||||
httpc, err := controlHttpClient(config.Global.Control.SockPath)
|
||||
|
@ -55,8 +55,8 @@ func (c *Client) signal(job, sig string) error {
|
||||
)
|
||||
}
|
||||
|
||||
func (c *Client) SignalWakeup(job string) error {
|
||||
return c.signal(job, "wakeup")
|
||||
func (c *Client) SignalReplicate(job string) error {
|
||||
return c.signal(job, "replicate")
|
||||
}
|
||||
|
||||
func (c *Client) SignalSnapshot(job string) error {
|
||||
|
@ -19,7 +19,7 @@ import (
|
||||
type Client interface {
|
||||
Status() (daemon.Status, error)
|
||||
StatusRaw() ([]byte, error)
|
||||
SignalWakeup(job string) error
|
||||
SignalReplicate(job string) error
|
||||
SignalSnapshot(job string) error
|
||||
SignalPrune(job string) error
|
||||
SignalReset(job string) error
|
||||
|
@ -281,8 +281,8 @@ func interactive(c Client, flag statusFlags) error {
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
signals := []string{"wakeup", "snapshot", "prune", "reset"}
|
||||
clientFuncs := []func(job string) error{c.SignalWakeup, c.SignalSnapshot, c.SignalPrune, c.SignalReset}
|
||||
signals := []string{"replicate", "snapshot", "prune", "reset"}
|
||||
clientFuncs := []func(job string) error{c.SignalReplicate, c.SignalSnapshot, c.SignalPrune, c.SignalReset}
|
||||
sigMod := tview.NewModal()
|
||||
sigMod.SetBackgroundColor(tcell.ColorDefault)
|
||||
sigMod.SetBorder(true)
|
||||
|
@ -40,7 +40,7 @@ jobs:
|
||||
|
||||
# This job pushes to the local sink defined in job `backuppool_sink`.
|
||||
# We trigger replication manually from the command line / udev rules using
|
||||
# `zrepl signal wakeup push_to_drive`
|
||||
# `zrepl signal replicate push_to_drive`
|
||||
- type: push
|
||||
name: push_to_drive
|
||||
connect:
|
||||
|
@ -143,14 +143,14 @@ func (j *controlJob) Run(ctx context.Context) {
|
||||
|
||||
var err error
|
||||
switch req.Op {
|
||||
case "wakeup":
|
||||
err = j.jobs.wakeup(req.Name)
|
||||
case "reset":
|
||||
err = j.jobs.reset(req.Name)
|
||||
case "replicate":
|
||||
err = j.jobs.doreplicate(req.Name)
|
||||
case "snapshot":
|
||||
err = j.jobs.dosnapshot(req.Name)
|
||||
case "prune":
|
||||
err = j.jobs.doprune(req.Name)
|
||||
case "reset":
|
||||
err = j.jobs.reset(req.Name)
|
||||
default:
|
||||
err = fmt.Errorf("operation %q is invalid", req.Op)
|
||||
}
|
||||
|
@ -21,9 +21,9 @@ import (
|
||||
"github.com/zrepl/zrepl/config"
|
||||
"github.com/zrepl/zrepl/daemon/job"
|
||||
"github.com/zrepl/zrepl/daemon/job/doprune"
|
||||
"github.com/zrepl/zrepl/daemon/job/doreplicate"
|
||||
"github.com/zrepl/zrepl/daemon/job/dosnapshot"
|
||||
"github.com/zrepl/zrepl/daemon/job/reset"
|
||||
"github.com/zrepl/zrepl/daemon/job/wakeup"
|
||||
"github.com/zrepl/zrepl/daemon/logging"
|
||||
"github.com/zrepl/zrepl/logger"
|
||||
"github.com/zrepl/zrepl/version"
|
||||
@ -133,21 +133,21 @@ type jobs struct {
|
||||
wg sync.WaitGroup
|
||||
|
||||
// m protects all fields below it
|
||||
m sync.RWMutex
|
||||
wakeups map[string]wakeup.Func // by Job.Name
|
||||
resets map[string]reset.Func // by Job.Name
|
||||
dosnapshots map[string]dosnapshot.Func // by Job.Name
|
||||
doprunes map[string]doprune.Func // by Job.Name
|
||||
jobs map[string]job.Job
|
||||
m sync.RWMutex
|
||||
doreplicates map[string]doreplicate.Func // by Job.Name
|
||||
dosnapshots map[string]dosnapshot.Func // by Job.Name
|
||||
doprunes map[string]doprune.Func // by Job.Name
|
||||
resets map[string]reset.Func // by Job.Name
|
||||
jobs map[string]job.Job
|
||||
}
|
||||
|
||||
func newJobs() *jobs {
|
||||
return &jobs{
|
||||
wakeups: make(map[string]wakeup.Func),
|
||||
resets: make(map[string]reset.Func),
|
||||
dosnapshots: make(map[string]dosnapshot.Func),
|
||||
doprunes: make(map[string]doprune.Func),
|
||||
jobs: make(map[string]job.Job),
|
||||
doreplicates: make(map[string]doreplicate.Func),
|
||||
dosnapshots: make(map[string]dosnapshot.Func),
|
||||
doprunes: make(map[string]doprune.Func),
|
||||
resets: make(map[string]reset.Func),
|
||||
jobs: make(map[string]job.Job),
|
||||
}
|
||||
}
|
||||
|
||||
@ -196,22 +196,11 @@ func (s *jobs) status() map[string]*job.Status {
|
||||
return ret
|
||||
}
|
||||
|
||||
func (s *jobs) wakeup(job string) error {
|
||||
func (s *jobs) doreplicate(job string) error {
|
||||
s.m.RLock()
|
||||
defer s.m.RUnlock()
|
||||
|
||||
wu, ok := s.wakeups[job]
|
||||
if !ok {
|
||||
return errors.Errorf("Job %s does not exist", job)
|
||||
}
|
||||
return wu()
|
||||
}
|
||||
|
||||
func (s *jobs) reset(job string) error {
|
||||
s.m.RLock()
|
||||
defer s.m.RUnlock()
|
||||
|
||||
wu, ok := s.resets[job]
|
||||
wu, ok := s.doreplicates[job]
|
||||
if !ok {
|
||||
return errors.Errorf("Job %s does not exist", job)
|
||||
}
|
||||
@ -240,6 +229,17 @@ func (s *jobs) doprune(job string) error {
|
||||
return wu()
|
||||
}
|
||||
|
||||
func (s *jobs) reset(job string) error {
|
||||
s.m.RLock()
|
||||
defer s.m.RUnlock()
|
||||
|
||||
wu, ok := s.resets[job]
|
||||
if !ok {
|
||||
return errors.Errorf("Job %s does not exist", job)
|
||||
}
|
||||
return wu()
|
||||
}
|
||||
|
||||
const (
|
||||
jobNamePrometheus = "_prometheus"
|
||||
jobNameControl = "_control"
|
||||
@ -270,14 +270,14 @@ func (s *jobs) start(ctx context.Context, j job.Job, internal bool) {
|
||||
|
||||
s.jobs[jobName] = j
|
||||
ctx = zfscmd.WithJobID(ctx, j.Name())
|
||||
ctx, wakeup := wakeup.Context(ctx)
|
||||
ctx, resetFunc := reset.Context(ctx)
|
||||
ctx, doreplicateFunc := doreplicate.Context(ctx)
|
||||
ctx, dosnapshotFunc := dosnapshot.Context(ctx)
|
||||
ctx, dopruneFunc := doprune.Context(ctx)
|
||||
s.wakeups[jobName] = wakeup
|
||||
s.resets[jobName] = resetFunc
|
||||
ctx, resetFunc := reset.Context(ctx)
|
||||
s.doreplicates[jobName] = doreplicateFunc
|
||||
s.dosnapshots[jobName] = dosnapshotFunc
|
||||
s.doprunes[jobName] = dopruneFunc
|
||||
s.resets[jobName] = resetFunc
|
||||
|
||||
s.wg.Add(1)
|
||||
go func() {
|
||||
|
@ -16,8 +16,8 @@ import (
|
||||
|
||||
"github.com/zrepl/zrepl/config"
|
||||
"github.com/zrepl/zrepl/daemon/job/doprune"
|
||||
"github.com/zrepl/zrepl/daemon/job/doreplicate"
|
||||
"github.com/zrepl/zrepl/daemon/job/reset"
|
||||
"github.com/zrepl/zrepl/daemon/job/wakeup"
|
||||
"github.com/zrepl/zrepl/daemon/pruner"
|
||||
"github.com/zrepl/zrepl/daemon/snapper"
|
||||
"github.com/zrepl/zrepl/endpoint"
|
||||
@ -448,7 +448,7 @@ outer:
|
||||
j.doPrune(invocationCtx)
|
||||
endSpan()
|
||||
|
||||
case <-wakeup.Wait(ctx):
|
||||
case <-doreplicate.Wait(ctx):
|
||||
j.mode.ResetConnectBackoff()
|
||||
|
||||
invocationCount++
|
||||
|
@ -1,4 +1,4 @@
|
||||
package wakeup
|
||||
package doreplicate
|
||||
|
||||
import (
|
||||
"context"
|
||||
@ -7,10 +7,10 @@ import (
|
||||
|
||||
type contextKey int
|
||||
|
||||
const contextKeyWakeup contextKey = iota
|
||||
const contextKeyReplicate contextKey = iota
|
||||
|
||||
func Wait(ctx context.Context) <-chan struct{} {
|
||||
wc, ok := ctx.Value(contextKeyWakeup).(chan struct{})
|
||||
wc, ok := ctx.Value(contextKeyReplicate).(chan struct{})
|
||||
if !ok {
|
||||
wc = make(chan struct{})
|
||||
}
|
||||
@ -19,7 +19,7 @@ func Wait(ctx context.Context) <-chan struct{} {
|
||||
|
||||
type Func func() error
|
||||
|
||||
var AlreadyWokenUp = errors.New("Cannot wakeup")
|
||||
var AlreadyReplicate = errors.New("Cannot start replication")
|
||||
|
||||
func Context(ctx context.Context) (context.Context, Func) {
|
||||
wc := make(chan struct{})
|
||||
@ -28,8 +28,8 @@ func Context(ctx context.Context) (context.Context, Func) {
|
||||
case wc <- struct{}{}:
|
||||
return nil
|
||||
default:
|
||||
return AlreadyWokenUp
|
||||
return AlreadyReplicate
|
||||
}
|
||||
}
|
||||
return context.WithValue(ctx, contextKeyWakeup, wc), wuf
|
||||
return context.WithValue(ctx, contextKeyReplicate, wc), wuf
|
||||
}
|
Loading…
Reference in New Issue
Block a user