diff --git a/client/signal.go b/client/signal.go index 11bf4e4..27eea32 100644 --- a/client/signal.go +++ b/client/signal.go @@ -11,8 +11,8 @@ import ( ) var SignalCmd = &cli.Subcommand{ - Use: "signal [replication|reset|snapshot] JOB", - Short: "run a job replication, abort its current invocation, run a snapshot job", + Use: "signal [wakeup|reset] JOB", + Short: "wake up a job from wait state or abort its current invocation", Run: func(ctx context.Context, subcommand *cli.Subcommand, args []string) error { return runSignalCmd(subcommand.Config(), args) }, @@ -20,7 +20,7 @@ var SignalCmd = &cli.Subcommand{ func runSignalCmd(config *config.Config, args []string) error { if len(args) != 2 { - return errors.Errorf("Expected 2 arguments: [replication|reset|snapshot] JOB") + return errors.Errorf("Expected 2 arguments: [wakeup|reset] JOB") } httpc, err := controlHttpClient(config.Global.Control.SockPath) diff --git a/config/samples/quickstart_backup_to_external_disk.yml b/config/samples/quickstart_backup_to_external_disk.yml index 4eae374..9d7c151 100644 --- a/config/samples/quickstart_backup_to_external_disk.yml +++ b/config/samples/quickstart_backup_to_external_disk.yml @@ -40,7 +40,7 @@ jobs: # This job pushes to the local sink defined in job `backuppool_sink`. # We trigger replication manually from the command line / udev rules using -# `zrepl signal replication push_to_drive` +# `zrepl signal wakeup push_to_drive` - type: push name: push_to_drive connect: diff --git a/daemon/control.go b/daemon/control.go index 5efcf5e..edd1624 100644 --- a/daemon/control.go +++ b/daemon/control.go @@ -143,12 +143,10 @@ func (j *controlJob) Run(ctx context.Context) { var err error switch req.Op { - case "replication": - err = j.jobs.doreplication(req.Name) + case "wakeup": + err = j.jobs.wakeup(req.Name) case "reset": err = j.jobs.reset(req.Name) - case "snapshot": - err = j.jobs.dosnapshot(req.Name) default: err = fmt.Errorf("operation %q is invalid", req.Op) } diff --git a/daemon/daemon.go b/daemon/daemon.go index 2bde353..144d0c4 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -20,9 +20,8 @@ import ( "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/daemon/job" - "github.com/zrepl/zrepl/daemon/job/doreplication" - "github.com/zrepl/zrepl/daemon/job/dosnapshot" "github.com/zrepl/zrepl/daemon/job/reset" + "github.com/zrepl/zrepl/daemon/job/wakeup" "github.com/zrepl/zrepl/daemon/logging" "github.com/zrepl/zrepl/logger" "github.com/zrepl/zrepl/version" @@ -132,19 +131,17 @@ type jobs struct { wg sync.WaitGroup // m protects all fields below it - m sync.RWMutex - doreplications map[string]doreplication.Func // by Job.Name - resets map[string]reset.Func // by Job.Name - dosnapshots map[string]dosnapshot.Func // by Job.Name - jobs map[string]job.Job + m sync.RWMutex + wakeups map[string]wakeup.Func // by Job.Name + resets map[string]reset.Func // by Job.Name + jobs map[string]job.Job } func newJobs() *jobs { return &jobs{ - doreplications: make(map[string]doreplication.Func), - resets: make(map[string]reset.Func), - dosnapshots: make(map[string]dosnapshot.Func), - jobs: make(map[string]job.Job), + wakeups: make(map[string]wakeup.Func), + resets: make(map[string]reset.Func), + jobs: make(map[string]job.Job), } } @@ -193,11 +190,11 @@ func (s *jobs) status() map[string]*job.Status { return ret } -func (s *jobs) doreplication(job string) error { +func (s *jobs) wakeup(job string) error { s.m.RLock() defer s.m.RUnlock() - wu, ok := s.doreplications[job] + wu, ok := s.wakeups[job] if !ok { return errors.Errorf("Job %s does not exist", job) } @@ -215,17 +212,6 @@ func (s *jobs) reset(job string) error { return wu() } -func (s *jobs) dosnapshot(job string) error { - s.m.RLock() - defer s.m.RUnlock() - - wu, ok := s.dosnapshots[job] - if !ok { - return errors.Errorf("Job %s does not exist", job) - } - return wu() -} - const ( jobNamePrometheus = "_prometheus" jobNameControl = "_control" @@ -256,12 +242,10 @@ func (s *jobs) start(ctx context.Context, j job.Job, internal bool) { s.jobs[jobName] = j ctx = zfscmd.WithJobID(ctx, j.Name()) - ctx, doreplication := doreplication.Context(ctx) + ctx, wakeup := wakeup.Context(ctx) ctx, resetFunc := reset.Context(ctx) - ctx, dosnapshotFunc := dosnapshot.Context(ctx) - s.doreplications[jobName] = doreplication + s.wakeups[jobName] = wakeup s.resets[jobName] = resetFunc - s.dosnapshots[jobName] = dosnapshotFunc s.wg.Add(1) go func() { diff --git a/daemon/job/active.go b/daemon/job/active.go index ec3a8af..cc6cbc1 100644 --- a/daemon/job/active.go +++ b/daemon/job/active.go @@ -14,8 +14,8 @@ import ( "github.com/zrepl/zrepl/util/envconst" "github.com/zrepl/zrepl/config" - "github.com/zrepl/zrepl/daemon/job/doreplication" "github.com/zrepl/zrepl/daemon/job/reset" + "github.com/zrepl/zrepl/daemon/job/wakeup" "github.com/zrepl/zrepl/daemon/pruner" "github.com/zrepl/zrepl/daemon/snapper" "github.com/zrepl/zrepl/endpoint" @@ -89,7 +89,7 @@ type activeMode interface { SenderReceiver() (logic.Sender, logic.Receiver) Type() Type PlannerPolicy() logic.PlannerPolicy - RunPeriodic(ctx context.Context, replicationCommon chan<- struct{}) + RunPeriodic(ctx context.Context, wakeUpCommon chan<- struct{}) SnapperReport() *snapper.Report ResetConnectBackoff() } @@ -131,8 +131,8 @@ func (m *modePush) Type() Type { return TypePush } func (m *modePush) PlannerPolicy() logic.PlannerPolicy { return *m.plannerPolicy } -func (m *modePush) RunPeriodic(ctx context.Context, replicationCommon chan<- struct{}) { - m.snapper.Run(ctx, replicationCommon) +func (m *modePush) RunPeriodic(ctx context.Context, wakeUpCommon chan<- struct{}) { + m.snapper.Run(ctx, wakeUpCommon) } func (m *modePush) SnapperReport() *snapper.Report { @@ -214,10 +214,10 @@ func (*modePull) Type() Type { return TypePull } func (m *modePull) PlannerPolicy() logic.PlannerPolicy { return *m.plannerPolicy } -func (m *modePull) RunPeriodic(ctx context.Context, replicationCommon chan<- struct{}) { +func (m *modePull) RunPeriodic(ctx context.Context, wakeUpCommon chan<- struct{}) { if m.interval.Manual { GetLogger(ctx).Info("manual pull configured, periodic pull disabled") - // "waiting for wakeup replications" is printed in common ActiveSide.do + // "waiting for wakeups" is printed in common ActiveSide.do return } t := time.NewTicker(m.interval.Interval) @@ -226,12 +226,12 @@ func (m *modePull) RunPeriodic(ctx context.Context, replicationCommon chan<- str select { case <-t.C: select { - case replicationCommon <- struct{}{}: + case wakeUpCommon <- struct{}{}: default: GetLogger(ctx). WithField("pull_interval", m.interval). Warn("pull job took longer than pull interval") - replicationCommon <- struct{}{} // block anyways, to queue up the wakeup replication + wakeUpCommon <- struct{}{} // block anyways, to queue up the wakeup } case <-ctx.Done(): return @@ -435,13 +435,13 @@ func (j *ActiveSide) Run(ctx context.Context) { invocationCount := 0 outer: for { - log.Info("wait for replications") + log.Info("wait for wakeups") select { case <-ctx.Done(): log.WithError(ctx.Err()).Info("context") break outer - case <-doreplication.Wait(ctx): + case <-wakeup.Wait(ctx): j.mode.ResetConnectBackoff() case <-periodicDone: } diff --git a/daemon/job/doreplication/doreplication.go b/daemon/job/doreplication/doreplication.go deleted file mode 100644 index b08d834..0000000 --- a/daemon/job/doreplication/doreplication.go +++ /dev/null @@ -1,35 +0,0 @@ -package doreplication - -import ( - "context" - "errors" -) - -type contextKey int - -const contextKeyReplication contextKey = iota - -func Wait(ctx context.Context) <-chan struct{} { - wc, ok := ctx.Value(contextKeyReplication).(chan struct{}) - if !ok { - wc = make(chan struct{}) - } - return wc -} - -type Func func() error - -var AlreadyReplicating = errors.New("already replicating") - -func Context(ctx context.Context) (context.Context, Func) { - wc := make(chan struct{}) - wuf := func() error { - select { - case wc <- struct{}{}: - return nil - default: - return AlreadyReplicating - } - } - return context.WithValue(ctx, contextKeyReplication, wc), wuf -} diff --git a/daemon/job/snapjob.go b/daemon/job/snapjob.go index be1d021..ea446ba 100644 --- a/daemon/job/snapjob.go +++ b/daemon/job/snapjob.go @@ -9,12 +9,12 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/zrepl/zrepl/daemon/job/doreplication" "github.com/zrepl/zrepl/daemon/logging/trace" "github.com/zrepl/zrepl/util/nodefault" "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/daemon/filters" + "github.com/zrepl/zrepl/daemon/job/wakeup" "github.com/zrepl/zrepl/daemon/pruner" "github.com/zrepl/zrepl/daemon/snapper" "github.com/zrepl/zrepl/endpoint" @@ -112,13 +112,13 @@ func (j *SnapJob) Run(ctx context.Context) { invocationCount := 0 outer: for { - log.Info("wait for replications") + log.Info("wait for wakeups") select { case <-ctx.Done(): log.WithError(ctx.Err()).Info("context") break outer - case <-doreplication.Wait(ctx): + case <-wakeup.Wait(ctx): case <-periodicDone: } invocationCount++ diff --git a/daemon/job/dosnapshot/dosnapshot.go b/daemon/job/wakeup/wakeup.go similarity index 57% rename from daemon/job/dosnapshot/dosnapshot.go rename to daemon/job/wakeup/wakeup.go index aa9bfef..a099b53 100644 --- a/daemon/job/dosnapshot/dosnapshot.go +++ b/daemon/job/wakeup/wakeup.go @@ -1,4 +1,4 @@ -package dosnapshot +package wakeup import ( "context" @@ -7,10 +7,10 @@ import ( type contextKey int -const contextKeyDosnapshot contextKey = iota +const contextKeyWakeup contextKey = iota func Wait(ctx context.Context) <-chan struct{} { - wc, ok := ctx.Value(contextKeyDosnapshot).(chan struct{}) + wc, ok := ctx.Value(contextKeyWakeup).(chan struct{}) if !ok { wc = make(chan struct{}) } @@ -19,7 +19,7 @@ func Wait(ctx context.Context) <-chan struct{} { type Func func() error -var AlreadyDosnapshot = errors.New("already snapshotting") +var AlreadyWokenUp = errors.New("already woken up") func Context(ctx context.Context) (context.Context, Func) { wc := make(chan struct{}) @@ -28,8 +28,8 @@ func Context(ctx context.Context) (context.Context, Func) { case wc <- struct{}{}: return nil default: - return AlreadyDosnapshot + return AlreadyWokenUp } } - return context.WithValue(ctx, contextKeyDosnapshot, wc), wuf + return context.WithValue(ctx, contextKeyWakeup, wc), wuf } diff --git a/daemon/snapper/snapper.go b/daemon/snapper/snapper.go index 7ebb8d8..cd4335b 100644 --- a/daemon/snapper/snapper.go +++ b/daemon/snapper/snapper.go @@ -9,7 +9,6 @@ import ( "github.com/pkg/errors" - "github.com/zrepl/zrepl/daemon/job/dosnapshot" "github.com/zrepl/zrepl/daemon/logging/trace" "github.com/zrepl/zrepl/config" @@ -211,10 +210,6 @@ func syncUp(a args, u updater) state { return u(func(s *Snapper) { s.state = Planning }).sf() - case <-dosnapshot.Wait(a.ctx): - return u(func(s *Snapper) { - s.state = Planning - }).sf() case <-a.ctx.Done(): return onMainCtxDone(a.ctx, u) } @@ -383,10 +378,6 @@ func wait(a args, u updater) state { return u(func(snapper *Snapper) { snapper.state = Planning }).sf() - case <-dosnapshot.Wait(a.ctx): - return u(func(snapper *Snapper) { - snapper.state = Planning - }).sf() case <-a.ctx.Done(): return onMainCtxDone(a.ctx, u) } diff --git a/daemon/snapper/snapper_all.go b/daemon/snapper/snapper_all.go index 1eee319..3141069 100644 --- a/daemon/snapper/snapper_all.go +++ b/daemon/snapper/snapper_all.go @@ -18,9 +18,9 @@ type PeriodicOrManual struct { s *Snapper } -func (s *PeriodicOrManual) Run(ctx context.Context, replicationCommon chan<- struct{}) { +func (s *PeriodicOrManual) Run(ctx context.Context, wakeUpCommon chan<- struct{}) { if s.s != nil { - s.s.Run(ctx, replicationCommon) + s.s.Run(ctx, wakeUpCommon) } } diff --git a/docs/changelog.rst b/docs/changelog.rst index cf27085..c551e6d 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -32,7 +32,6 @@ We use the following annotations for classifying changes: 0.4.0 ----- -* |break| Change syntax to trigger a job replication, rename ``zrepl signal wakeup JOB`` to ``zrepl signal replication JOB`` * |feature| support setting zfs send / recv flags in the config (send: ``-wLcepbS`` , recv: ``-ox`` ). Config docs :ref:`here ` and :ref:`here ` . * |feature| parallel replication is now configurable (disabled by default, :ref:`config docs here ` ). diff --git a/docs/configuration/jobs.rst b/docs/configuration/jobs.rst index b54a487..bab5a13 100644 --- a/docs/configuration/jobs.rst +++ b/docs/configuration/jobs.rst @@ -78,7 +78,7 @@ Job Type ``pull`` ``$root_fs/$source_path`` * - ``interval`` - | Interval at which to pull from the source job (e.g. ``10m``). - | ``manual`` disables periodic pulling, replication then only happens on :ref:`replication `. + | ``manual`` disables periodic pulling, replication then only happens on :ref:`wakeup `. * - ``pruning`` - |pruning-spec| diff --git a/docs/configuration/snapshotting.rst b/docs/configuration/snapshotting.rst index 486f1ff..3fd4572 100644 --- a/docs/configuration/snapshotting.rst +++ b/docs/configuration/snapshotting.rst @@ -15,7 +15,7 @@ A filesystem that does not have snapshots by the snapshotter has lower priority For ``push`` jobs, replication is automatically triggered after all filesystems have been snapshotted. -Note that the ``zrepl signal replication JOB`` subcommand does not trigger snapshotting. +Note that the ``zrepl signal wakeup JOB`` subcommand does not trigger snapshotting. :: @@ -38,7 +38,7 @@ There is also a ``manual`` snapshotting type, which covers the following use cas * Existing infrastructure for automatic snapshots: you only want to use this zrepl job for replication. * Handling snapshotting through a separate ``snap`` job. -Note that you will have to trigger replication manually using the ``zrepl signal replication JOB`` subcommand in that case. +Note that you will have to trigger replication manually using the ``zrepl signal wakeup JOB`` subcommand in that case. :: diff --git a/docs/quickstart.rst b/docs/quickstart.rst index 206104a..557e847 100644 --- a/docs/quickstart.rst +++ b/docs/quickstart.rst @@ -55,8 +55,8 @@ Watch it Work ============= Run ``zrepl status`` on the active side of the replication setup to monitor snaphotting, replication and pruning activity. -To re-trigger replication (snapshots are separate!), use ``zrepl signal replication JOBNAME``. -(refer to the example use case document if you are uncertain which job you want to start replication). +To re-trigger replication (snapshots are separate!), use ``zrepl signal wakeup JOBNAME``. +(refer to the example use case document if you are uncertain which job you want to wake up). You can also use basic UNIX tools to inspect see what's going on. If you like tmux, here is a handy script that works on FreeBSD: :: diff --git a/docs/usage.rst b/docs/usage.rst index 4b1a2eb..9ff7cdc 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -13,7 +13,7 @@ CLI Overview The zrepl binary is self-documenting: run ``zrepl help`` for an overview of the available subcommands or ``zrepl SUBCOMMAND --help`` for information on available flags, etc. -.. _cli-signal-replication: +.. _cli-signal-wakeup: .. list-table:: :widths: 30 70 @@ -29,7 +29,7 @@ CLI Overview - show job activity, or with ``--raw`` for JSON output * - ``zrepl stdinserver`` - see :ref:`transport-ssh+stdinserver` - * - ``zrepl signal replication JOB`` + * - ``zrepl signal wakeup JOB`` - manually trigger replication + pruning of JOB * - ``zrepl signal reset JOB`` - manually abort current replication + pruning of JOB