diff --git a/client/signal.go b/client/signal.go index a288d4d..025af15 100644 --- a/client/signal.go +++ b/client/signal.go @@ -11,8 +11,8 @@ import ( ) var SignalCmd = &cli.Subcommand{ - Use: "signal [wakeup|reset|snapshot] JOB", - Short: "wake up a job from wait state, abort its current invocation, run a snapshot job", + Use: "signal [wakeup|snapshot|prune|reset] JOB", + Short: "wake up a job from wait state, run a snapshot job, run a prune job, abort its current invocation", Run: func(ctx context.Context, subcommand *cli.Subcommand, args []string) error { return runSignalCmd(subcommand.Config(), args) }, @@ -20,7 +20,7 @@ var SignalCmd = &cli.Subcommand{ func runSignalCmd(config *config.Config, args []string) error { if len(args) != 2 { - return errors.Errorf("Expected 2 arguments: [wakeup|reset|snapshot] JOB") + return errors.Errorf("Expected 2 arguments: [wakeup|snapshot|prune|reset] JOB") } httpc, err := controlHttpClient(config.Global.Control.SockPath) diff --git a/client/status/client/client.go b/client/status/client/client.go index 12f58b6..739163d 100644 --- a/client/status/client/client.go +++ b/client/status/client/client.go @@ -63,6 +63,10 @@ func (c *Client) SignalSnapshot(job string) error { return c.signal(job, "snapshot") } +func (c *Client) SignalPrune(job string) error { + return c.signal(job, "prune") +} + func (c *Client) SignalReset(job string) error { return c.signal(job, "reset") } diff --git a/client/status/status.go b/client/status/status.go index 9173a43..004d371 100644 --- a/client/status/status.go +++ b/client/status/status.go @@ -21,6 +21,7 @@ type Client interface { StatusRaw() ([]byte, error) SignalWakeup(job string) error SignalSnapshot(job string) error + SignalPrune(job string) error SignalReset(job string) error } diff --git a/client/status/status_interactive.go b/client/status/status_interactive.go index 01389b1..8d006a5 100644 --- a/client/status/status_interactive.go +++ b/client/status/status_interactive.go @@ -281,8 +281,8 @@ func interactive(c Client, flag statusFlags) error { if !ok { return nil } - signals := []string{"wakeup", "snapshot", "reset"} - clientFuncs := []func(job string) error{c.SignalWakeup, c.SignalSnapshot, c.SignalReset} + signals := []string{"wakeup", "snapshot", "prune", "reset"} + clientFuncs := []func(job string) error{c.SignalWakeup, c.SignalSnapshot, c.SignalPrune, c.SignalReset} sigMod := tview.NewModal() sigMod.SetBackgroundColor(tcell.ColorDefault) sigMod.SetBorder(true) diff --git a/daemon/control.go b/daemon/control.go index a3c1b8a..3fd71d2 100644 --- a/daemon/control.go +++ b/daemon/control.go @@ -149,6 +149,8 @@ func (j *controlJob) Run(ctx context.Context) { err = j.jobs.reset(req.Name) case "snapshot": err = j.jobs.dosnapshot(req.Name) + case "prune": + err = j.jobs.doprune(req.Name) default: err = fmt.Errorf("operation %q is invalid", req.Op) } diff --git a/daemon/daemon.go b/daemon/daemon.go index 785a9f6..ad83464 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -20,6 +20,7 @@ import ( "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/daemon/job" + "github.com/zrepl/zrepl/daemon/job/doprune" "github.com/zrepl/zrepl/daemon/job/dosnapshot" "github.com/zrepl/zrepl/daemon/job/reset" "github.com/zrepl/zrepl/daemon/job/wakeup" @@ -136,6 +137,7 @@ type jobs struct { wakeups map[string]wakeup.Func // by Job.Name resets map[string]reset.Func // by Job.Name dosnapshots map[string]dosnapshot.Func // by Job.Name + doprunes map[string]doprune.Func // by Job.Name jobs map[string]job.Job } @@ -144,6 +146,7 @@ func newJobs() *jobs { wakeups: make(map[string]wakeup.Func), resets: make(map[string]reset.Func), dosnapshots: make(map[string]dosnapshot.Func), + doprunes: make(map[string]doprune.Func), jobs: make(map[string]job.Job), } } @@ -226,6 +229,17 @@ func (s *jobs) dosnapshot(job string) error { return wu() } +func (s *jobs) doprune(job string) error { + s.m.RLock() + defer s.m.RUnlock() + + wu, ok := s.doprunes[job] + if !ok { + return errors.Errorf("Job %s does not exist", job) + } + return wu() +} + const ( jobNamePrometheus = "_prometheus" jobNameControl = "_control" @@ -259,9 +273,11 @@ func (s *jobs) start(ctx context.Context, j job.Job, internal bool) { ctx, wakeup := wakeup.Context(ctx) ctx, resetFunc := reset.Context(ctx) ctx, dosnapshotFunc := dosnapshot.Context(ctx) + ctx, dopruneFunc := doprune.Context(ctx) s.wakeups[jobName] = wakeup s.resets[jobName] = resetFunc s.dosnapshots[jobName] = dosnapshotFunc + s.doprunes[jobName] = dopruneFunc s.wg.Add(1) go func() { diff --git a/daemon/job/active.go b/daemon/job/active.go index cc6cbc1..692246f 100644 --- a/daemon/job/active.go +++ b/daemon/job/active.go @@ -11,15 +11,16 @@ import ( "github.com/prometheus/common/log" "github.com/zrepl/zrepl/daemon/logging/trace" + "github.com/zrepl/zrepl/replication" "github.com/zrepl/zrepl/util/envconst" "github.com/zrepl/zrepl/config" + "github.com/zrepl/zrepl/daemon/job/doprune" "github.com/zrepl/zrepl/daemon/job/reset" "github.com/zrepl/zrepl/daemon/job/wakeup" "github.com/zrepl/zrepl/daemon/pruner" "github.com/zrepl/zrepl/daemon/snapper" "github.com/zrepl/zrepl/endpoint" - "github.com/zrepl/zrepl/replication" "github.com/zrepl/zrepl/replication/driver" "github.com/zrepl/zrepl/replication/logic" "github.com/zrepl/zrepl/replication/report" @@ -441,14 +442,28 @@ outer: log.WithError(ctx.Err()).Info("context") break outer + case <-doprune.Wait(ctx): + invocationCount++ + invocationCtx, endSpan := trace.WithSpan(ctx, fmt.Sprintf("invocation-%d", invocationCount)) + j.doPrune(invocationCtx) + endSpan() + case <-wakeup.Wait(ctx): j.mode.ResetConnectBackoff() + + invocationCount++ + invocationCtx, endSpan := trace.WithSpan(ctx, fmt.Sprintf("invocation-%d", invocationCount)) + j.do(invocationCtx) + endSpan() + case <-periodicDone: + invocationCount++ + invocationCtx, endSpan := trace.WithSpan(ctx, fmt.Sprintf("invocation-%d", invocationCount)) + j.do(invocationCtx) + j.doPrune(invocationCtx) + endSpan() } - invocationCount++ - invocationCtx, endSpan := trace.WithSpan(ctx, fmt.Sprintf("invocation-%d", invocationCount)) - j.do(invocationCtx) - endSpan() + } } @@ -482,7 +497,9 @@ func (j *ActiveSide) do(ctx context.Context) { var repWait driver.WaitFunc j.updateTasks(func(tasks *activeSideTasks) { // reset it - *tasks = activeSideTasks{} + //*tasks = activeSideTasks{} // reset Pruning Sender/Receiver: in zrepl status + tasks.replicationReport = nil // just reset the Replication: + tasks.replicationCancel = func() { repCancel(); endSpan() } tasks.replicationReport, repWait = replication.Do( ctx, j.replicationDriverConfig, logic.NewPlanner(j.promRepStateSecs, j.promBytesReplicated, sender, receiver, j.mode.PlannerPolicy()), @@ -498,6 +515,29 @@ func (j *ActiveSide) do(ctx context.Context) { endSpan() } + j.updateTasks(func(tasks *activeSideTasks) { + tasks.state = ActiveSideDone + }) +} + +func (j *ActiveSide) doPrune(ctx context.Context) { + + j.mode.ConnectEndpoints(ctx, j.connecter) + defer j.mode.DisconnectEndpoints() + + // allow cancellation of an invocation (this function) + ctx, cancelThisRun := context.WithCancel(ctx) + defer cancelThisRun() + go func() { + select { + case <-reset.Wait(ctx): + log.Info("reset received, cancelling current invocation") + cancelThisRun() + case <-ctx.Done(): + } + }() + + sender, receiver := j.mode.SenderReceiver() { select { diff --git a/daemon/job/doprune/doprune.go b/daemon/job/doprune/doprune.go new file mode 100644 index 0000000..266a99a --- /dev/null +++ b/daemon/job/doprune/doprune.go @@ -0,0 +1,35 @@ +package doprune + +import ( + "context" + "errors" +) + +type contextKey int + +const contextKeyDoprune contextKey = iota + +func Wait(ctx context.Context) <-chan struct{} { + wc, ok := ctx.Value(contextKeyDoprune).(chan struct{}) + if !ok { + wc = make(chan struct{}) + } + return wc +} + +type Func func() error + +var AlreadyDoprune = errors.New("Cannot start pruning") + +func Context(ctx context.Context) (context.Context, Func) { + wc := make(chan struct{}) + wuf := func() error { + select { + case wc <- struct{}{}: + return nil + default: + return AlreadyDoprune + } + } + return context.WithValue(ctx, contextKeyDoprune, wc), wuf +} diff --git a/daemon/job/dosnapshot/dosnapshot.go b/daemon/job/dosnapshot/dosnapshot.go index aa9bfef..b90003b 100644 --- a/daemon/job/dosnapshot/dosnapshot.go +++ b/daemon/job/dosnapshot/dosnapshot.go @@ -19,7 +19,7 @@ func Wait(ctx context.Context) <-chan struct{} { type Func func() error -var AlreadyDosnapshot = errors.New("already snapshotting") +var AlreadyDosnapshot = errors.New("Cannot start snapshotting") func Context(ctx context.Context) (context.Context, Func) { wc := make(chan struct{}) diff --git a/daemon/job/reset/reset.go b/daemon/job/reset/reset.go index b7322ed..c8d4fba 100644 --- a/daemon/job/reset/reset.go +++ b/daemon/job/reset/reset.go @@ -19,7 +19,7 @@ func Wait(ctx context.Context) <-chan struct{} { type Func func() error -var AlreadyReset = errors.New("already reset") +var AlreadyReset = errors.New("Cannot reset") func Context(ctx context.Context) (context.Context, Func) { wc := make(chan struct{}) diff --git a/daemon/job/snapjob.go b/daemon/job/snapjob.go index ea446ba..ba73ecf 100644 --- a/daemon/job/snapjob.go +++ b/daemon/job/snapjob.go @@ -14,7 +14,7 @@ import ( "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/daemon/filters" - "github.com/zrepl/zrepl/daemon/job/wakeup" + "github.com/zrepl/zrepl/daemon/job/doprune" "github.com/zrepl/zrepl/daemon/pruner" "github.com/zrepl/zrepl/daemon/snapper" "github.com/zrepl/zrepl/endpoint" @@ -118,7 +118,7 @@ outer: log.WithError(ctx.Err()).Info("context") break outer - case <-wakeup.Wait(ctx): + case <-doprune.Wait(ctx): case <-periodicDone: } invocationCount++ diff --git a/daemon/job/wakeup/wakeup.go b/daemon/job/wakeup/wakeup.go index a099b53..0b4cf72 100644 --- a/daemon/job/wakeup/wakeup.go +++ b/daemon/job/wakeup/wakeup.go @@ -19,7 +19,7 @@ func Wait(ctx context.Context) <-chan struct{} { type Func func() error -var AlreadyWokenUp = errors.New("already woken up") +var AlreadyWokenUp = errors.New("Cannot wakeup") func Context(ctx context.Context) (context.Context, Func) { wc := make(chan struct{}) diff --git a/daemon/snapper/snapper.go b/daemon/snapper/snapper.go index 7ebb8d8..d7f7348 100644 --- a/daemon/snapper/snapper.go +++ b/daemon/snapper/snapper.go @@ -62,6 +62,8 @@ type Snapper struct { mtx sync.Mutex state State + signal_dosnapshot bool + // set in state Plan, used in Waiting lastInvocation time.Time @@ -210,10 +212,12 @@ func syncUp(a args, u updater) state { case <-t.C: return u(func(s *Snapper) { s.state = Planning + s.signal_dosnapshot = false }).sf() case <-dosnapshot.Wait(a.ctx): return u(func(s *Snapper) { s.state = Planning + s.signal_dosnapshot = true }).sf() case <-a.ctx.Done(): return onMainCtxDone(a.ctx, u) @@ -329,11 +333,19 @@ func snapshot(a args, u updater) state { }) } - select { - case a.snapshotsTaken <- struct{}{}: - default: - if a.snapshotsTaken != nil { - getLogger(a.ctx).Warn("callback channel is full, discarding snapshot update event") + var signal_dosnapshot bool + u(func(snapper *Snapper) { + signal_dosnapshot = snapper.signal_dosnapshot + }) + + if !signal_dosnapshot { + select { + // this will start Replication & Pruning + case a.snapshotsTaken <- struct{}{}: + default: + if a.snapshotsTaken != nil { + getLogger(a.ctx).Warn("callback channel is full, discarding snapshot update event") + } } } @@ -382,10 +394,12 @@ func wait(a args, u updater) state { case <-t.C: return u(func(snapper *Snapper) { snapper.state = Planning + snapper.signal_dosnapshot = false }).sf() case <-dosnapshot.Wait(a.ctx): - return u(func(snapper *Snapper) { - snapper.state = Planning + return u(func(s *Snapper) { + s.state = Planning + s.signal_dosnapshot = true }).sf() case <-a.ctx.Done(): return onMainCtxDone(a.ctx, u)