diff --git a/client/signal.go b/client/signal.go index 27eea32..11bf4e4 100644 --- a/client/signal.go +++ b/client/signal.go @@ -11,8 +11,8 @@ import ( ) var SignalCmd = &cli.Subcommand{ - Use: "signal [wakeup|reset] JOB", - Short: "wake up a job from wait state or abort its current invocation", + Use: "signal [replication|reset|snapshot] JOB", + Short: "run a job replication, abort its current invocation, run a snapshot job", Run: func(ctx context.Context, subcommand *cli.Subcommand, args []string) error { return runSignalCmd(subcommand.Config(), args) }, @@ -20,7 +20,7 @@ var SignalCmd = &cli.Subcommand{ func runSignalCmd(config *config.Config, args []string) error { if len(args) != 2 { - return errors.Errorf("Expected 2 arguments: [wakeup|reset] JOB") + return errors.Errorf("Expected 2 arguments: [replication|reset|snapshot] JOB") } httpc, err := controlHttpClient(config.Global.Control.SockPath) diff --git a/config/samples/quickstart_backup_to_external_disk.yml b/config/samples/quickstart_backup_to_external_disk.yml index 9d7c151..4eae374 100644 --- a/config/samples/quickstart_backup_to_external_disk.yml +++ b/config/samples/quickstart_backup_to_external_disk.yml @@ -40,7 +40,7 @@ jobs: # This job pushes to the local sink defined in job `backuppool_sink`. 
# We trigger replication manually from the command line / udev rules using -# `zrepl signal wakeup push_to_drive` +# `zrepl signal replication push_to_drive` - type: push name: push_to_drive connect: diff --git a/daemon/control.go b/daemon/control.go index edd1624..5efcf5e 100644 --- a/daemon/control.go +++ b/daemon/control.go @@ -143,10 +143,12 @@ func (j *controlJob) Run(ctx context.Context) { var err error switch req.Op { - case "wakeup": - err = j.jobs.wakeup(req.Name) + case "replication": + err = j.jobs.doreplication(req.Name) case "reset": err = j.jobs.reset(req.Name) + case "snapshot": + err = j.jobs.dosnapshot(req.Name) default: err = fmt.Errorf("operation %q is invalid", req.Op) } diff --git a/daemon/daemon.go b/daemon/daemon.go index 144d0c4..2bde353 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -20,8 +20,9 @@ import ( "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/daemon/job" + "github.com/zrepl/zrepl/daemon/job/doreplication" + "github.com/zrepl/zrepl/daemon/job/dosnapshot" "github.com/zrepl/zrepl/daemon/job/reset" - "github.com/zrepl/zrepl/daemon/job/wakeup" "github.com/zrepl/zrepl/daemon/logging" "github.com/zrepl/zrepl/logger" "github.com/zrepl/zrepl/version" @@ -131,17 +132,19 @@ type jobs struct { wg sync.WaitGroup // m protects all fields below it - m sync.RWMutex - wakeups map[string]wakeup.Func // by Job.Name - resets map[string]reset.Func // by Job.Name - jobs map[string]job.Job + m sync.RWMutex + doreplications map[string]doreplication.Func // by Job.Name + resets map[string]reset.Func // by Job.Name + dosnapshots map[string]dosnapshot.Func // by Job.Name + jobs map[string]job.Job } func newJobs() *jobs { return &jobs{ - wakeups: make(map[string]wakeup.Func), - resets: make(map[string]reset.Func), - jobs: make(map[string]job.Job), + doreplications: make(map[string]doreplication.Func), + resets: make(map[string]reset.Func), + dosnapshots: make(map[string]dosnapshot.Func), + jobs: make(map[string]job.Job), } } @@ -190,11 
+193,11 @@ func (s *jobs) status() map[string]*job.Status { return ret } -func (s *jobs) wakeup(job string) error { +func (s *jobs) doreplication(job string) error { s.m.RLock() defer s.m.RUnlock() - wu, ok := s.wakeups[job] + wu, ok := s.doreplications[job] if !ok { return errors.Errorf("Job %s does not exist", job) } @@ -212,6 +215,17 @@ func (s *jobs) reset(job string) error { return wu() } +func (s *jobs) dosnapshot(job string) error { + s.m.RLock() + defer s.m.RUnlock() + + wu, ok := s.dosnapshots[job] + if !ok { + return errors.Errorf("Job %s does not exist", job) + } + return wu() +} + const ( jobNamePrometheus = "_prometheus" jobNameControl = "_control" @@ -242,10 +256,12 @@ func (s *jobs) start(ctx context.Context, j job.Job, internal bool) { s.jobs[jobName] = j ctx = zfscmd.WithJobID(ctx, j.Name()) - ctx, wakeup := wakeup.Context(ctx) + ctx, doreplication := doreplication.Context(ctx) ctx, resetFunc := reset.Context(ctx) - s.wakeups[jobName] = wakeup + ctx, dosnapshotFunc := dosnapshot.Context(ctx) + s.doreplications[jobName] = doreplication s.resets[jobName] = resetFunc + s.dosnapshots[jobName] = dosnapshotFunc s.wg.Add(1) go func() { diff --git a/daemon/job/active.go b/daemon/job/active.go index cc6cbc1..ec3a8af 100644 --- a/daemon/job/active.go +++ b/daemon/job/active.go @@ -14,8 +14,8 @@ import ( "github.com/zrepl/zrepl/util/envconst" "github.com/zrepl/zrepl/config" + "github.com/zrepl/zrepl/daemon/job/doreplication" "github.com/zrepl/zrepl/daemon/job/reset" - "github.com/zrepl/zrepl/daemon/job/wakeup" "github.com/zrepl/zrepl/daemon/pruner" "github.com/zrepl/zrepl/daemon/snapper" "github.com/zrepl/zrepl/endpoint" @@ -89,7 +89,7 @@ type activeMode interface { SenderReceiver() (logic.Sender, logic.Receiver) Type() Type PlannerPolicy() logic.PlannerPolicy - RunPeriodic(ctx context.Context, wakeUpCommon chan<- struct{}) + RunPeriodic(ctx context.Context, replicationCommon chan<- struct{}) SnapperReport() *snapper.Report ResetConnectBackoff() } @@ -131,8 
+131,8 @@ func (m *modePush) Type() Type { return TypePush } func (m *modePush) PlannerPolicy() logic.PlannerPolicy { return *m.plannerPolicy } -func (m *modePush) RunPeriodic(ctx context.Context, wakeUpCommon chan<- struct{}) { - m.snapper.Run(ctx, wakeUpCommon) +func (m *modePush) RunPeriodic(ctx context.Context, replicationCommon chan<- struct{}) { + m.snapper.Run(ctx, replicationCommon) } func (m *modePush) SnapperReport() *snapper.Report { @@ -214,10 +214,10 @@ func (*modePull) Type() Type { return TypePull } func (m *modePull) PlannerPolicy() logic.PlannerPolicy { return *m.plannerPolicy } -func (m *modePull) RunPeriodic(ctx context.Context, wakeUpCommon chan<- struct{}) { +func (m *modePull) RunPeriodic(ctx context.Context, replicationCommon chan<- struct{}) { if m.interval.Manual { GetLogger(ctx).Info("manual pull configured, periodic pull disabled") - // "waiting for wakeups" is printed in common ActiveSide.do + // "waiting for wakeup replications" is printed in common ActiveSide.do return } t := time.NewTicker(m.interval.Interval) @@ -226,12 +226,12 @@ func (m *modePull) RunPeriodic(ctx context.Context, wakeUpCommon chan<- struct{} select { case <-t.C: select { - case wakeUpCommon <- struct{}{}: + case replicationCommon <- struct{}{}: default: GetLogger(ctx). WithField("pull_interval", m.interval). 
Warn("pull job took longer than pull interval") - wakeUpCommon <- struct{}{} // block anyways, to queue up the wakeup + replicationCommon <- struct{}{} // block anyways, to queue up the wakeup replication } case <-ctx.Done(): return @@ -435,13 +435,13 @@ func (j *ActiveSide) Run(ctx context.Context) { invocationCount := 0 outer: for { - log.Info("wait for wakeups") + log.Info("wait for replications") select { case <-ctx.Done(): log.WithError(ctx.Err()).Info("context") break outer - case <-wakeup.Wait(ctx): + case <-doreplication.Wait(ctx): j.mode.ResetConnectBackoff() case <-periodicDone: } diff --git a/daemon/job/doreplication/doreplication.go b/daemon/job/doreplication/doreplication.go new file mode 100644 index 0000000..b08d834 --- /dev/null +++ b/daemon/job/doreplication/doreplication.go @@ -0,0 +1,35 @@ +package doreplication + +import ( + "context" + "errors" +) + +type contextKey int + +const contextKeyReplication contextKey = iota + +func Wait(ctx context.Context) <-chan struct{} { + wc, ok := ctx.Value(contextKeyReplication).(chan struct{}) + if !ok { + wc = make(chan struct{}) + } + return wc +} + +type Func func() error + +var AlreadyReplicating = errors.New("already replicating") + +func Context(ctx context.Context) (context.Context, Func) { + wc := make(chan struct{}) + wuf := func() error { + select { + case wc <- struct{}{}: + return nil + default: + return AlreadyReplicating + } + } + return context.WithValue(ctx, contextKeyReplication, wc), wuf +} diff --git a/daemon/job/wakeup/wakeup.go b/daemon/job/dosnapshot/dosnapshot.go similarity index 57% rename from daemon/job/wakeup/wakeup.go rename to daemon/job/dosnapshot/dosnapshot.go index a099b53..aa9bfef 100644 --- a/daemon/job/wakeup/wakeup.go +++ b/daemon/job/dosnapshot/dosnapshot.go @@ -1,4 +1,4 @@ -package wakeup +package dosnapshot import ( "context" @@ -7,10 +7,10 @@ import ( type contextKey int -const contextKeyWakeup contextKey = iota +const contextKeyDosnapshot contextKey = iota func 
Wait(ctx context.Context) <-chan struct{} { - wc, ok := ctx.Value(contextKeyWakeup).(chan struct{}) + wc, ok := ctx.Value(contextKeyDosnapshot).(chan struct{}) if !ok { wc = make(chan struct{}) } @@ -19,7 +19,7 @@ func Wait(ctx context.Context) <-chan struct{} { type Func func() error -var AlreadyWokenUp = errors.New("already woken up") +var AlreadyDosnapshot = errors.New("already snapshotting") func Context(ctx context.Context) (context.Context, Func) { wc := make(chan struct{}) @@ -28,8 +28,8 @@ func Context(ctx context.Context) (context.Context, Func) { case wc <- struct{}{}: return nil default: - return AlreadyWokenUp + return AlreadyDosnapshot } } - return context.WithValue(ctx, contextKeyWakeup, wc), wuf + return context.WithValue(ctx, contextKeyDosnapshot, wc), wuf } diff --git a/daemon/job/snapjob.go b/daemon/job/snapjob.go index ea446ba..be1d021 100644 --- a/daemon/job/snapjob.go +++ b/daemon/job/snapjob.go @@ -9,12 +9,12 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/zrepl/zrepl/daemon/job/doreplication" "github.com/zrepl/zrepl/daemon/logging/trace" "github.com/zrepl/zrepl/util/nodefault" "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/daemon/filters" - "github.com/zrepl/zrepl/daemon/job/wakeup" "github.com/zrepl/zrepl/daemon/pruner" "github.com/zrepl/zrepl/daemon/snapper" "github.com/zrepl/zrepl/endpoint" @@ -112,13 +112,13 @@ func (j *SnapJob) Run(ctx context.Context) { invocationCount := 0 outer: for { - log.Info("wait for wakeups") + log.Info("wait for replications") select { case <-ctx.Done(): log.WithError(ctx.Err()).Info("context") break outer - case <-wakeup.Wait(ctx): + case <-doreplication.Wait(ctx): case <-periodicDone: } invocationCount++ diff --git a/daemon/snapper/snapper.go b/daemon/snapper/snapper.go index cd4335b..7ebb8d8 100644 --- a/daemon/snapper/snapper.go +++ b/daemon/snapper/snapper.go @@ -9,6 +9,7 @@ import ( "github.com/pkg/errors" + 
"github.com/zrepl/zrepl/daemon/job/dosnapshot" "github.com/zrepl/zrepl/daemon/logging/trace" "github.com/zrepl/zrepl/config" @@ -210,6 +211,10 @@ func syncUp(a args, u updater) state { return u(func(s *Snapper) { s.state = Planning }).sf() + case <-dosnapshot.Wait(a.ctx): + return u(func(s *Snapper) { + s.state = Planning + }).sf() case <-a.ctx.Done(): return onMainCtxDone(a.ctx, u) } @@ -378,6 +383,10 @@ func wait(a args, u updater) state { return u(func(snapper *Snapper) { snapper.state = Planning }).sf() + case <-dosnapshot.Wait(a.ctx): + return u(func(snapper *Snapper) { + snapper.state = Planning + }).sf() case <-a.ctx.Done(): return onMainCtxDone(a.ctx, u) } diff --git a/daemon/snapper/snapper_all.go b/daemon/snapper/snapper_all.go index 3141069..1eee319 100644 --- a/daemon/snapper/snapper_all.go +++ b/daemon/snapper/snapper_all.go @@ -18,9 +18,9 @@ type PeriodicOrManual struct { s *Snapper } -func (s *PeriodicOrManual) Run(ctx context.Context, wakeUpCommon chan<- struct{}) { +func (s *PeriodicOrManual) Run(ctx context.Context, replicationCommon chan<- struct{}) { if s.s != nil { - s.s.Run(ctx, wakeUpCommon) + s.s.Run(ctx, replicationCommon) } } diff --git a/docs/changelog.rst b/docs/changelog.rst index fedc4c3..44f6ee2 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -27,6 +27,11 @@ We use the following annotations for classifying changes: * |bugfix| Change that fixes a bug, no regressions or incompatibilities expected. * |docs| Change to the documentation. +0.4.0 +----- + +* |break| Change syntax to trigger a job replication, rename ``zrepl signal wakeup JOB`` to ``zrepl signal replication JOB`` + 0.3.1 ----- diff --git a/docs/configuration/jobs.rst b/docs/configuration/jobs.rst index bab5a13..b54a487 100644 --- a/docs/configuration/jobs.rst +++ b/docs/configuration/jobs.rst @@ -78,7 +78,7 @@ Job Type ``pull`` ``$root_fs/$source_path`` * - ``interval`` - | Interval at which to pull from the source job (e.g. ``10m``). 
- | ``manual`` disables periodic pulling, replication then only happens on :ref:`wakeup `. + | ``manual`` disables periodic pulling, replication then only happens on :ref:`replication `. * - ``pruning`` - |pruning-spec| diff --git a/docs/configuration/snapshotting.rst b/docs/configuration/snapshotting.rst index 3fd4572..486f1ff 100644 --- a/docs/configuration/snapshotting.rst +++ b/docs/configuration/snapshotting.rst @@ -15,7 +15,7 @@ A filesystem that does not have snapshots by the snapshotter has lower priority For ``push`` jobs, replication is automatically triggered after all filesystems have been snapshotted. -Note that the ``zrepl signal wakeup JOB`` subcommand does not trigger snapshotting. +Note that the ``zrepl signal replication JOB`` subcommand does not trigger snapshotting. :: @@ -38,7 +38,7 @@ There is also a ``manual`` snapshotting type, which covers the following use cas * Existing infrastructure for automatic snapshots: you only want to use this zrepl job for replication. * Handling snapshotting through a separate ``snap`` job. -Note that you will have to trigger replication manually using the ``zrepl signal wakeup JOB`` subcommand in that case. +Note that you will have to trigger replication manually using the ``zrepl signal replication JOB`` subcommand in that case. :: diff --git a/docs/quickstart.rst b/docs/quickstart.rst index 557e847..206104a 100644 --- a/docs/quickstart.rst +++ b/docs/quickstart.rst @@ -55,8 +55,8 @@ Watch it Work ============= Run ``zrepl status`` on the active side of the replication setup to monitor snaphotting, replication and pruning activity. -To re-trigger replication (snapshots are separate!), use ``zrepl signal wakeup JOBNAME``. -(refer to the example use case document if you are uncertain which job you want to wake up). +To re-trigger replication (snapshots are separate!), use ``zrepl signal replication JOBNAME``. +(refer to the example use case document if you are uncertain for which job you want to start replication). 
You can also use basic UNIX tools to inspect see what's going on. If you like tmux, here is a handy script that works on FreeBSD: :: diff --git a/docs/usage.rst b/docs/usage.rst index 9ff7cdc..4b1a2eb 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -13,7 +13,7 @@ CLI Overview The zrepl binary is self-documenting: run ``zrepl help`` for an overview of the available subcommands or ``zrepl SUBCOMMAND --help`` for information on available flags, etc. -.. _cli-signal-wakeup: +.. _cli-signal-replication: .. list-table:: :widths: 30 70 @@ -29,7 +29,7 @@ CLI Overview - show job activity, or with ``--raw`` for JSON output * - ``zrepl stdinserver`` - see :ref:`transport-ssh+stdinserver` - * - ``zrepl signal wakeup JOB`` + * - ``zrepl signal replication JOB`` - manually trigger replication + pruning of JOB * - ``zrepl signal reset JOB`` - manually abort current replication + pruning of JOB