add signal 'snapshot', rename existing signal 'wakeup' to 'replication'

Calistoc 2021-01-01 23:32:35 +01:00 committed by Christian Schwarz
parent 0ceea1b792
commit 2c8c2cfa14
15 changed files with 113 additions and 46 deletions

View File

@@ -11,8 +11,8 @@ import (
 )

 var SignalCmd = &cli.Subcommand{
-    Use:   "signal [wakeup|reset] JOB",
-    Short: "wake up a job from wait state or abort its current invocation",
+    Use:   "signal [replication|reset|snapshot] JOB",
+    Short: "run a job replication, abort its current invocation, run a snapshot job",
     Run: func(ctx context.Context, subcommand *cli.Subcommand, args []string) error {
         return runSignalCmd(subcommand.Config(), args)
     },
@@ -20,7 +20,7 @@ var SignalCmd = &cli.Subcommand{
 func runSignalCmd(config *config.Config, args []string) error {
     if len(args) != 2 {
-        return errors.Errorf("Expected 2 arguments: [wakeup|reset] JOB")
+        return errors.Errorf("Expected 2 arguments: [replication|reset|snapshot] JOB")
     }
     httpc, err := controlHttpClient(config.Global.Control.SockPath)
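
For quick reference, the resulting signal surface (semantics taken from the control-job dispatch and the docs hunks below; `JOB` is a placeholder for a job name):

    zrepl signal replication JOB   # trigger replication + pruning (formerly: wakeup)
    zrepl signal reset JOB         # abort the job's current invocation
    zrepl signal snapshot JOB      # new: make the job's snapshotter run immediately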

View File

@@ -40,7 +40,7 @@ jobs:
   # This job pushes to the local sink defined in job `backuppool_sink`.
   # We trigger replication manually from the command line / udev rules using
-  # `zrepl signal wakeup push_to_drive`
+  # `zrepl signal replication push_to_drive`
   - type: push
     name: push_to_drive
     connect:

View File

@@ -143,10 +143,12 @@ func (j *controlJob) Run(ctx context.Context) {
         var err error
         switch req.Op {
-        case "wakeup":
-            err = j.jobs.wakeup(req.Name)
+        case "replication":
+            err = j.jobs.doreplication(req.Name)
         case "reset":
             err = j.jobs.reset(req.Name)
+        case "snapshot":
+            err = j.jobs.dosnapshot(req.Name)
         default:
             err = fmt.Errorf("operation %q is invalid", req.Op)
         }

View File

@@ -20,8 +20,9 @@ import (
     "github.com/zrepl/zrepl/config"
     "github.com/zrepl/zrepl/daemon/job"
+    "github.com/zrepl/zrepl/daemon/job/doreplication"
+    "github.com/zrepl/zrepl/daemon/job/dosnapshot"
     "github.com/zrepl/zrepl/daemon/job/reset"
-    "github.com/zrepl/zrepl/daemon/job/wakeup"
     "github.com/zrepl/zrepl/daemon/logging"
     "github.com/zrepl/zrepl/logger"
     "github.com/zrepl/zrepl/version"
@@ -131,17 +132,19 @@ type jobs struct {
     wg sync.WaitGroup

     // m protects all fields below it
     m sync.RWMutex
-    wakeups map[string]wakeup.Func // by Job.Name
-    resets  map[string]reset.Func  // by Job.Name
-    jobs    map[string]job.Job
+    doreplications map[string]doreplication.Func // by Job.Name
+    resets         map[string]reset.Func         // by Job.Name
+    dosnapshots    map[string]dosnapshot.Func    // by Job.Name
+    jobs           map[string]job.Job
 }

 func newJobs() *jobs {
     return &jobs{
-        wakeups: make(map[string]wakeup.Func),
-        resets:  make(map[string]reset.Func),
-        jobs:    make(map[string]job.Job),
+        doreplications: make(map[string]doreplication.Func),
+        resets:         make(map[string]reset.Func),
+        dosnapshots:    make(map[string]dosnapshot.Func),
+        jobs:           make(map[string]job.Job),
     }
 }
@@ -190,11 +193,11 @@ func (s *jobs) status() map[string]*job.Status {
     return ret
 }

-func (s *jobs) wakeup(job string) error {
+func (s *jobs) doreplication(job string) error {
     s.m.RLock()
     defer s.m.RUnlock()
-    wu, ok := s.wakeups[job]
+    wu, ok := s.doreplications[job]
     if !ok {
         return errors.Errorf("Job %s does not exist", job)
     }
@@ -212,6 +215,17 @@ func (s *jobs) reset(job string) error {
     return wu()
 }

+func (s *jobs) dosnapshot(job string) error {
+    s.m.RLock()
+    defer s.m.RUnlock()
+    wu, ok := s.dosnapshots[job]
+    if !ok {
+        return errors.Errorf("Job %s does not exist", job)
+    }
+    return wu()
+}
+
 const (
     jobNamePrometheus = "_prometheus"
     jobNameControl    = "_control"
@@ -242,10 +256,12 @@ func (s *jobs) start(ctx context.Context, j job.Job, internal bool) {
     s.jobs[jobName] = j
     ctx = zfscmd.WithJobID(ctx, j.Name())
-    ctx, wakeup := wakeup.Context(ctx)
-    ctx, resetFunc := reset.Context(ctx)
-    s.wakeups[jobName] = wakeup
+    ctx, doreplication := doreplication.Context(ctx)
+    ctx, resetFunc := reset.Context(ctx)
+    ctx, dosnapshotFunc := dosnapshot.Context(ctx)
+    s.doreplications[jobName] = doreplication
     s.resets[jobName] = resetFunc
+    s.dosnapshots[jobName] = dosnapshotFunc
     s.wg.Add(1)
     go func() {

View File

@@ -14,8 +14,8 @@ import (
     "github.com/zrepl/zrepl/util/envconst"

     "github.com/zrepl/zrepl/config"
+    "github.com/zrepl/zrepl/daemon/job/doreplication"
     "github.com/zrepl/zrepl/daemon/job/reset"
-    "github.com/zrepl/zrepl/daemon/job/wakeup"
     "github.com/zrepl/zrepl/daemon/pruner"
     "github.com/zrepl/zrepl/daemon/snapper"
     "github.com/zrepl/zrepl/endpoint"
@@ -89,7 +89,7 @@ type activeMode interface {
     SenderReceiver() (logic.Sender, logic.Receiver)
     Type() Type
     PlannerPolicy() logic.PlannerPolicy
-    RunPeriodic(ctx context.Context, wakeUpCommon chan<- struct{})
+    RunPeriodic(ctx context.Context, replicationCommon chan<- struct{})
     SnapperReport() *snapper.Report
     ResetConnectBackoff()
 }
@@ -131,8 +131,8 @@ func (m *modePush) Type() Type { return TypePush }
 func (m *modePush) PlannerPolicy() logic.PlannerPolicy { return *m.plannerPolicy }

-func (m *modePush) RunPeriodic(ctx context.Context, wakeUpCommon chan<- struct{}) {
-    m.snapper.Run(ctx, wakeUpCommon)
+func (m *modePush) RunPeriodic(ctx context.Context, replicationCommon chan<- struct{}) {
+    m.snapper.Run(ctx, replicationCommon)
 }

 func (m *modePush) SnapperReport() *snapper.Report {
@@ -214,10 +214,10 @@ func (*modePull) Type() Type { return TypePull }
 func (m *modePull) PlannerPolicy() logic.PlannerPolicy { return *m.plannerPolicy }

-func (m *modePull) RunPeriodic(ctx context.Context, wakeUpCommon chan<- struct{}) {
+func (m *modePull) RunPeriodic(ctx context.Context, replicationCommon chan<- struct{}) {
     if m.interval.Manual {
         GetLogger(ctx).Info("manual pull configured, periodic pull disabled")
-        // "wait for wakeups" is printed in common ActiveSide.do
+        // "wait for replications" is printed in common ActiveSide.do
         return
     }
     t := time.NewTicker(m.interval.Interval)
@@ -226,12 +226,12 @@ func (m *modePull) RunPeriodic(ctx context.Context, wakeUpCommon chan<- struct{}
     select {
     case <-t.C:
         select {
-        case wakeUpCommon <- struct{}{}:
+        case replicationCommon <- struct{}{}:
         default:
             GetLogger(ctx).
                 WithField("pull_interval", m.interval).
                 Warn("pull job took longer than pull interval")
-            wakeUpCommon <- struct{}{} // block anyway, to queue up the wakeup
+            replicationCommon <- struct{}{} // block anyway, to queue up the replication
         }
     case <-ctx.Done():
         return
@@ -435,13 +435,13 @@ func (j *ActiveSide) Run(ctx context.Context) {
     invocationCount := 0
 outer:
     for {
-        log.Info("wait for wakeups")
+        log.Info("wait for replications")
         select {
         case <-ctx.Done():
             log.WithError(ctx.Err()).Info("context")
             break outer
-        case <-wakeup.Wait(ctx):
+        case <-doreplication.Wait(ctx):
             j.mode.ResetConnectBackoff()
         case <-periodicDone:
         }

View File

@@ -0,0 +1,35 @@
+package doreplication
+
+import (
+    "context"
+    "errors"
+)
+
+type contextKey int
+
+const contextKeyReplication contextKey = iota
+
+func Wait(ctx context.Context) <-chan struct{} {
+    wc, ok := ctx.Value(contextKeyReplication).(chan struct{})
+    if !ok {
+        wc = make(chan struct{})
+    }
+    return wc
+}
+
+type Func func() error
+
+var AlreadyReplicating = errors.New("already replicating")
+
+func Context(ctx context.Context) (context.Context, Func) {
+    wc := make(chan struct{})
+    wuf := func() error {
+        select {
+        case wc <- struct{}{}:
+            return nil
+        default:
+            return AlreadyReplicating
+        }
+    }
+    return context.WithValue(ctx, contextKeyReplication, wc), wuf
+}
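
This new package keeps the context-and-channel pattern of the old wakeup package: the daemon derives the job's context via Context and registers the returned trigger function, while the job's run loop selects on Wait. Below is a minimal, self-contained usage sketch of that interplay, assuming the package exactly as added above; the retry loop and the printed message are illustrative, not part of this commit.

    package main

    import (
        "context"
        "fmt"
        "time"

        "github.com/zrepl/zrepl/daemon/job/doreplication"
    )

    func main() {
        // Daemon side: derive the job context. The returned trigger func is
        // what `zrepl signal replication JOB` ultimately invokes.
        ctx, trigger := doreplication.Context(context.Background())

        go func() {
            // The non-blocking send inside the trigger func fails with
            // AlreadyReplicating whenever the job is not parked in its wait
            // state, so this illustrative caller simply retries.
            for trigger() == doreplication.AlreadyReplicating {
                time.Sleep(10 * time.Millisecond)
            }
        }()

        // Job side: park until a replication is signaled, like the
        // `case <-doreplication.Wait(ctx):` arms elsewhere in this commit.
        <-doreplication.Wait(ctx)
        fmt.Println("replication triggered")
    }

Note that Wait returns a fresh, never-signaled channel if the context does not carry one, so selecting on it is safe even for a context that was not set up through Context.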

View File

@@ -1,4 +1,4 @@
-package wakeup
+package dosnapshot

 import (
     "context"
@@ -7,10 +7,10 @@ import (

 type contextKey int

-const contextKeyWakeup contextKey = iota
+const contextKeyDosnapshot contextKey = iota

 func Wait(ctx context.Context) <-chan struct{} {
-    wc, ok := ctx.Value(contextKeyWakeup).(chan struct{})
+    wc, ok := ctx.Value(contextKeyDosnapshot).(chan struct{})
     if !ok {
         wc = make(chan struct{})
     }
@@ -19,7 +19,7 @@ func Wait(ctx context.Context) <-chan struct{} {
 type Func func() error

-var AlreadyWokenUp = errors.New("already woken up")
+var AlreadyDosnapshot = errors.New("already snapshotting")

 func Context(ctx context.Context) (context.Context, Func) {
     wc := make(chan struct{})
@@ -28,8 +28,8 @@ func Context(ctx context.Context) (context.Context, Func) {
         case wc <- struct{}{}:
             return nil
         default:
-            return AlreadyWokenUp
+            return AlreadyDosnapshot
         }
     }
-    return context.WithValue(ctx, contextKeyWakeup, wc), wuf
+    return context.WithValue(ctx, contextKeyDosnapshot, wc), wuf
 }

View File

@@ -9,12 +9,12 @@ import (
     "github.com/pkg/errors"
     "github.com/prometheus/client_golang/prometheus"

+    "github.com/zrepl/zrepl/daemon/job/doreplication"
     "github.com/zrepl/zrepl/daemon/logging/trace"
     "github.com/zrepl/zrepl/util/nodefault"

     "github.com/zrepl/zrepl/config"
     "github.com/zrepl/zrepl/daemon/filters"
-    "github.com/zrepl/zrepl/daemon/job/wakeup"
     "github.com/zrepl/zrepl/daemon/pruner"
     "github.com/zrepl/zrepl/daemon/snapper"
     "github.com/zrepl/zrepl/endpoint"
@@ -112,13 +112,13 @@ func (j *SnapJob) Run(ctx context.Context) {
     invocationCount := 0
 outer:
     for {
-        log.Info("wait for wakeups")
+        log.Info("wait for replications")
         select {
         case <-ctx.Done():
             log.WithError(ctx.Err()).Info("context")
             break outer
-        case <-wakeup.Wait(ctx):
+        case <-doreplication.Wait(ctx):
         case <-periodicDone:
         }
         invocationCount++

View File

@@ -9,6 +9,7 @@ import (
     "github.com/pkg/errors"

+    "github.com/zrepl/zrepl/daemon/job/dosnapshot"
     "github.com/zrepl/zrepl/daemon/logging/trace"

     "github.com/zrepl/zrepl/config"
@@ -210,6 +211,10 @@ func syncUp(a args, u updater) state {
         return u(func(s *Snapper) {
             s.state = Planning
         }).sf()
+    case <-dosnapshot.Wait(a.ctx):
+        return u(func(s *Snapper) {
+            s.state = Planning
+        }).sf()
     case <-a.ctx.Done():
         return onMainCtxDone(a.ctx, u)
     }
@@ -378,6 +383,10 @@ func wait(a args, u updater) state {
         return u(func(snapper *Snapper) {
             snapper.state = Planning
         }).sf()
+    case <-dosnapshot.Wait(a.ctx):
+        return u(func(snapper *Snapper) {
+            snapper.state = Planning
+        }).sf()
     case <-a.ctx.Done():
         return onMainCtxDone(a.ctx, u)
     }
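
Together with the new `case "snapshot"` dispatch in the control job above, these two select arms are what make `zrepl signal snapshot JOB` effective: the signal moves the snapper out of either the sync-up or the wait state directly into Planning, forcing an immediate snapshot run instead of waiting for the next interval tick.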

View File

@@ -18,9 +18,9 @@ type PeriodicOrManual struct {
     s *Snapper
 }

-func (s *PeriodicOrManual) Run(ctx context.Context, wakeUpCommon chan<- struct{}) {
+func (s *PeriodicOrManual) Run(ctx context.Context, replicationCommon chan<- struct{}) {
     if s.s != nil {
-        s.s.Run(ctx, wakeUpCommon)
+        s.s.Run(ctx, replicationCommon)
     }
 }

View File

@@ -27,6 +27,11 @@ We use the following annotations for classifying changes:
 * |bugfix| Change that fixes a bug, no regressions or incompatibilities expected.
 * |docs| Change to the documentation.

+0.4.0
+-----
+
+* |break| Change syntax to trigger a job replication: rename ``zrepl signal wakeup JOB`` to ``zrepl signal replication JOB``
+
 0.3.1
 -----

View File

@@ -78,7 +78,7 @@ Job Type ``pull``
       ``$root_fs/$source_path``
   * - ``interval``
     - | Interval at which to pull from the source job (e.g. ``10m``).
-      | ``manual`` disables periodic pulling, replication then only happens on :ref:`wakeup <cli-signal-wakeup>`.
+      | ``manual`` disables periodic pulling, replication then only happens on :ref:`replication <cli-signal-replication>`.
   * - ``pruning``
     - |pruning-spec|

View File

@@ -15,7 +15,7 @@ A filesystem that does not have snapshots by the snapshotter has lower priority
 For ``push`` jobs, replication is automatically triggered after all filesystems have been snapshotted.

-Note that the ``zrepl signal wakeup JOB`` subcommand does not trigger snapshotting.
+Note that the ``zrepl signal replication JOB`` subcommand does not trigger snapshotting.

 ::

@@ -38,7 +38,7 @@ There is also a ``manual`` snapshotting type, which covers the following use cas
 * Existing infrastructure for automatic snapshots: you only want to use this zrepl job for replication.
 * Handling snapshotting through a separate ``snap`` job.

-Note that you will have to trigger replication manually using the ``zrepl signal wakeup JOB`` subcommand in that case.
+Note that you will have to trigger replication manually using the ``zrepl signal replication JOB`` subcommand in that case.

 ::

View File

@@ -55,8 +55,8 @@ Watch it Work
 =============

 Run ``zrepl status`` on the active side of the replication setup to monitor snapshotting, replication and pruning activity.
-To re-trigger replication (snapshots are separate!), use ``zrepl signal wakeup JOBNAME``.
-(refer to the example use case document if you are uncertain which job you want to wake up).
+To re-trigger replication (snapshots are separate!), use ``zrepl signal replication JOBNAME``.
+(refer to the example use case document if you are uncertain for which job you want to trigger replication).

 You can also use basic UNIX tools to see what's going on.
 If you like tmux, here is a handy script that works on FreeBSD: ::

View File

@@ -13,7 +13,7 @@ CLI Overview
 The zrepl binary is self-documenting:
 run ``zrepl help`` for an overview of the available subcommands or ``zrepl SUBCOMMAND --help`` for information on available flags, etc.

-.. _cli-signal-wakeup:
+.. _cli-signal-replication:

 .. list-table::
    :widths: 30 70
@@ -29,7 +29,7 @@ CLI Overview
      - show job activity, or with ``--raw`` for JSON output
    * - ``zrepl stdinserver``
      - see :ref:`transport-ssh+stdinserver`
-   * - ``zrepl signal wakeup JOB``
+   * - ``zrepl signal replication JOB``
      - manually trigger replication + pruning of JOB
    * - ``zrepl signal reset JOB``
      - manually abort current replication + pruning of JOB