signal decoupling wakeup snapshot prune reset

This commit is contained in:
Calistoc 2021-04-07 11:59:18 +02:00
parent 42a8484980
commit 6b87589aa1
13 changed files with 135 additions and 23 deletions

View File

@ -11,8 +11,8 @@ import (
)
var SignalCmd = &cli.Subcommand{
Use: "signal [wakeup|reset|snapshot] JOB",
Short: "wake up a job from wait state, abort its current invocation, run a snapshot job",
Use: "signal [wakeup|snapshot|prune|reset] JOB",
Short: "wake up a job from wait state, run a snapshot job, run a prune job, abort its current invocation",
Run: func(ctx context.Context, subcommand *cli.Subcommand, args []string) error {
return runSignalCmd(subcommand.Config(), args)
},
@ -20,7 +20,7 @@ var SignalCmd = &cli.Subcommand{
func runSignalCmd(config *config.Config, args []string) error {
if len(args) != 2 {
return errors.Errorf("Expected 2 arguments: [wakeup|reset|snapshot] JOB")
return errors.Errorf("Expected 2 arguments: [wakeup|snapshot|prune|reset] JOB")
}
httpc, err := controlHttpClient(config.Global.Control.SockPath)

View File

@ -63,6 +63,10 @@ func (c *Client) SignalSnapshot(job string) error {
return c.signal(job, "snapshot")
}
func (c *Client) SignalPrune(job string) error {
return c.signal(job, "prune")
}
func (c *Client) SignalReset(job string) error {
return c.signal(job, "reset")
}

View File

@ -21,6 +21,7 @@ type Client interface {
StatusRaw() ([]byte, error)
SignalWakeup(job string) error
SignalSnapshot(job string) error
SignalPrune(job string) error
SignalReset(job string) error
}

View File

@ -281,8 +281,8 @@ func interactive(c Client, flag statusFlags) error {
if !ok {
return nil
}
signals := []string{"wakeup", "snapshot", "reset"}
clientFuncs := []func(job string) error{c.SignalWakeup, c.SignalSnapshot, c.SignalReset}
signals := []string{"wakeup", "snapshot", "prune", "reset"}
clientFuncs := []func(job string) error{c.SignalWakeup, c.SignalSnapshot, c.SignalPrune, c.SignalReset}
sigMod := tview.NewModal()
sigMod.SetBackgroundColor(tcell.ColorDefault)
sigMod.SetBorder(true)

View File

@ -149,6 +149,8 @@ func (j *controlJob) Run(ctx context.Context) {
err = j.jobs.reset(req.Name)
case "snapshot":
err = j.jobs.dosnapshot(req.Name)
case "prune":
err = j.jobs.doprune(req.Name)
default:
err = fmt.Errorf("operation %q is invalid", req.Op)
}

View File

@ -20,6 +20,7 @@ import (
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/job"
"github.com/zrepl/zrepl/daemon/job/doprune"
"github.com/zrepl/zrepl/daemon/job/dosnapshot"
"github.com/zrepl/zrepl/daemon/job/reset"
"github.com/zrepl/zrepl/daemon/job/wakeup"
@ -136,6 +137,7 @@ type jobs struct {
wakeups map[string]wakeup.Func // by Job.Name
resets map[string]reset.Func // by Job.Name
dosnapshots map[string]dosnapshot.Func // by Job.Name
doprunes map[string]doprune.Func // by Job.Name
jobs map[string]job.Job
}
@ -144,6 +146,7 @@ func newJobs() *jobs {
wakeups: make(map[string]wakeup.Func),
resets: make(map[string]reset.Func),
dosnapshots: make(map[string]dosnapshot.Func),
doprunes: make(map[string]doprune.Func),
jobs: make(map[string]job.Job),
}
}
@ -226,6 +229,17 @@ func (s *jobs) dosnapshot(job string) error {
return wu()
}
func (s *jobs) doprune(job string) error {
s.m.RLock()
defer s.m.RUnlock()
wu, ok := s.doprunes[job]
if !ok {
return errors.Errorf("Job %s does not exist", job)
}
return wu()
}
const (
jobNamePrometheus = "_prometheus"
jobNameControl = "_control"
@ -259,9 +273,11 @@ func (s *jobs) start(ctx context.Context, j job.Job, internal bool) {
ctx, wakeup := wakeup.Context(ctx)
ctx, resetFunc := reset.Context(ctx)
ctx, dosnapshotFunc := dosnapshot.Context(ctx)
ctx, dopruneFunc := doprune.Context(ctx)
s.wakeups[jobName] = wakeup
s.resets[jobName] = resetFunc
s.dosnapshots[jobName] = dosnapshotFunc
s.doprunes[jobName] = dopruneFunc
s.wg.Add(1)
go func() {

View File

@ -11,15 +11,16 @@ import (
"github.com/prometheus/common/log"
"github.com/zrepl/zrepl/daemon/logging/trace"
"github.com/zrepl/zrepl/replication"
"github.com/zrepl/zrepl/util/envconst"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/job/doprune"
"github.com/zrepl/zrepl/daemon/job/reset"
"github.com/zrepl/zrepl/daemon/job/wakeup"
"github.com/zrepl/zrepl/daemon/pruner"
"github.com/zrepl/zrepl/daemon/snapper"
"github.com/zrepl/zrepl/endpoint"
"github.com/zrepl/zrepl/replication"
"github.com/zrepl/zrepl/replication/driver"
"github.com/zrepl/zrepl/replication/logic"
"github.com/zrepl/zrepl/replication/report"
@ -441,14 +442,28 @@ outer:
log.WithError(ctx.Err()).Info("context")
break outer
case <-doprune.Wait(ctx):
invocationCount++
invocationCtx, endSpan := trace.WithSpan(ctx, fmt.Sprintf("invocation-%d", invocationCount))
j.doPrune(invocationCtx)
endSpan()
case <-wakeup.Wait(ctx):
j.mode.ResetConnectBackoff()
invocationCount++
invocationCtx, endSpan := trace.WithSpan(ctx, fmt.Sprintf("invocation-%d", invocationCount))
j.do(invocationCtx)
endSpan()
case <-periodicDone:
invocationCount++
invocationCtx, endSpan := trace.WithSpan(ctx, fmt.Sprintf("invocation-%d", invocationCount))
j.do(invocationCtx)
j.doPrune(invocationCtx)
endSpan()
}
invocationCount++
invocationCtx, endSpan := trace.WithSpan(ctx, fmt.Sprintf("invocation-%d", invocationCount))
j.do(invocationCtx)
endSpan()
}
}
@ -482,7 +497,9 @@ func (j *ActiveSide) do(ctx context.Context) {
var repWait driver.WaitFunc
j.updateTasks(func(tasks *activeSideTasks) {
// reset it
*tasks = activeSideTasks{}
//*tasks = activeSideTasks{} // reset Pruning Sender/Receiver: in zrepl status
tasks.replicationReport = nil // just reset the Replication:
tasks.replicationCancel = func() { repCancel(); endSpan() }
tasks.replicationReport, repWait = replication.Do(
ctx, j.replicationDriverConfig, logic.NewPlanner(j.promRepStateSecs, j.promBytesReplicated, sender, receiver, j.mode.PlannerPolicy()),
@ -498,6 +515,29 @@ func (j *ActiveSide) do(ctx context.Context) {
endSpan()
}
j.updateTasks(func(tasks *activeSideTasks) {
tasks.state = ActiveSideDone
})
}
func (j *ActiveSide) doPrune(ctx context.Context) {
j.mode.ConnectEndpoints(ctx, j.connecter)
defer j.mode.DisconnectEndpoints()
// allow cancellation of an invocation (this function)
ctx, cancelThisRun := context.WithCancel(ctx)
defer cancelThisRun()
go func() {
select {
case <-reset.Wait(ctx):
log.Info("reset received, cancelling current invocation")
cancelThisRun()
case <-ctx.Done():
}
}()
sender, receiver := j.mode.SenderReceiver()
{
select {

View File

@ -0,0 +1,35 @@
package doprune
import (
"context"
"errors"
)
type contextKey int
const contextKeyDoprune contextKey = iota
func Wait(ctx context.Context) <-chan struct{} {
wc, ok := ctx.Value(contextKeyDoprune).(chan struct{})
if !ok {
wc = make(chan struct{})
}
return wc
}
type Func func() error
var AlreadyDoprune = errors.New("Cannot start pruning")
func Context(ctx context.Context) (context.Context, Func) {
wc := make(chan struct{})
wuf := func() error {
select {
case wc <- struct{}{}:
return nil
default:
return AlreadyDoprune
}
}
return context.WithValue(ctx, contextKeyDoprune, wc), wuf
}

View File

@ -19,7 +19,7 @@ func Wait(ctx context.Context) <-chan struct{} {
type Func func() error
var AlreadyDosnapshot = errors.New("already snapshotting")
var AlreadyDosnapshot = errors.New("Cannot start snapshotting")
func Context(ctx context.Context) (context.Context, Func) {
wc := make(chan struct{})

View File

@ -19,7 +19,7 @@ func Wait(ctx context.Context) <-chan struct{} {
type Func func() error
var AlreadyReset = errors.New("already reset")
var AlreadyReset = errors.New("Cannot reset")
func Context(ctx context.Context) (context.Context, Func) {
wc := make(chan struct{})

View File

@ -14,7 +14,7 @@ import (
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/filters"
"github.com/zrepl/zrepl/daemon/job/wakeup"
"github.com/zrepl/zrepl/daemon/job/doprune"
"github.com/zrepl/zrepl/daemon/pruner"
"github.com/zrepl/zrepl/daemon/snapper"
"github.com/zrepl/zrepl/endpoint"
@ -118,7 +118,7 @@ outer:
log.WithError(ctx.Err()).Info("context")
break outer
case <-wakeup.Wait(ctx):
case <-doprune.Wait(ctx):
case <-periodicDone:
}
invocationCount++

View File

@ -19,7 +19,7 @@ func Wait(ctx context.Context) <-chan struct{} {
type Func func() error
var AlreadyWokenUp = errors.New("already woken up")
var AlreadyWokenUp = errors.New("Cannot wakeup")
func Context(ctx context.Context) (context.Context, Func) {
wc := make(chan struct{})

View File

@ -62,6 +62,8 @@ type Snapper struct {
mtx sync.Mutex
state State
signal_dosnapshot bool
// set in state Plan, used in Waiting
lastInvocation time.Time
@ -210,10 +212,12 @@ func syncUp(a args, u updater) state {
case <-t.C:
return u(func(s *Snapper) {
s.state = Planning
s.signal_dosnapshot = false
}).sf()
case <-dosnapshot.Wait(a.ctx):
return u(func(s *Snapper) {
s.state = Planning
s.signal_dosnapshot = true
}).sf()
case <-a.ctx.Done():
return onMainCtxDone(a.ctx, u)
@ -329,11 +333,19 @@ func snapshot(a args, u updater) state {
})
}
select {
case a.snapshotsTaken <- struct{}{}:
default:
if a.snapshotsTaken != nil {
getLogger(a.ctx).Warn("callback channel is full, discarding snapshot update event")
var signal_dosnapshot bool
u(func(snapper *Snapper) {
signal_dosnapshot = snapper.signal_dosnapshot
})
if !signal_dosnapshot {
select {
// this will start Replication & Pruning
case a.snapshotsTaken <- struct{}{}:
default:
if a.snapshotsTaken != nil {
getLogger(a.ctx).Warn("callback channel is full, discarding snapshot update event")
}
}
}
@ -382,10 +394,12 @@ func wait(a args, u updater) state {
case <-t.C:
return u(func(snapper *Snapper) {
snapper.state = Planning
snapper.signal_dosnapshot = false
}).sf()
case <-dosnapshot.Wait(a.ctx):
return u(func(snapper *Snapper) {
snapper.state = Planning
return u(func(s *Snapper) {
s.state = Planning
s.signal_dosnapshot = true
}).sf()
case <-a.ctx.Done():
return onMainCtxDone(a.ctx, u)