cmd/jobrun: repeat strategies as part of jobrun

This commit is contained in:
Christian Schwarz 2017-06-09 21:00:28 +02:00
parent 93d098162e
commit af2aa9dfe1
5 changed files with 96 additions and 23 deletions

View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"github.com/jinzhu/copier" "github.com/jinzhu/copier"
"github.com/mitchellh/mapstructure" "github.com/mitchellh/mapstructure"
"github.com/zrepl/zrepl/jobrun"
"github.com/zrepl/zrepl/rpc" "github.com/zrepl/zrepl/rpc"
"github.com/zrepl/zrepl/sshbytestream" "github.com/zrepl/zrepl/sshbytestream"
. "github.com/zrepl/zrepl/util" . "github.com/zrepl/zrepl/util"
@ -13,6 +14,7 @@ import (
"io" "io"
"io/ioutil" "io/ioutil"
"strings" "strings"
"time"
) )
type Pool struct { type Pool struct {
@ -42,12 +44,15 @@ type Push struct {
To *Pool To *Pool
Filter zfs.DatasetMapping Filter zfs.DatasetMapping
InitialReplPolicy rpc.InitialReplPolicy InitialReplPolicy rpc.InitialReplPolicy
RepeatStrategy jobrun.RepeatStrategy
} }
type Pull struct { type Pull struct {
From *Pool From *Pool
Mapping zfs.DatasetMapping Mapping zfs.DatasetMapping
InitialReplPolicy rpc.InitialReplPolicy InitialReplPolicy rpc.InitialReplPolicy
RepeatStrategy jobrun.RepeatStrategy
} }
type ClientMapping struct { type ClientMapping struct {
From string From string
Mapping zfs.DatasetMapping Mapping zfs.DatasetMapping
@ -171,6 +176,7 @@ func parsePushs(v interface{}, pl poolLookup) (p []Push, err error) {
To string To string
Filter map[string]string Filter map[string]string
InitialReplPolicy string InitialReplPolicy string
Repeat map[string]string
}, 0) }, 0)
if err = mapstructure.Decode(v, &asList); err != nil { if err = mapstructure.Decode(v, &asList); err != nil {
@ -197,6 +203,10 @@ func parsePushs(v interface{}, pl poolLookup) (p []Push, err error) {
return return
} }
if push.RepeatStrategy, err = parseRepeatStrategy(e.Repeat); err != nil {
return
}
p[i] = push p[i] = push
} }
@ -209,6 +219,7 @@ func parsePulls(v interface{}, pl poolLookup) (p []Pull, err error) {
From string From string
Mapping map[string]string Mapping map[string]string
InitialReplPolicy string InitialReplPolicy string
Repeat map[string]string
}, 0) }, 0)
if err = mapstructure.Decode(v, &asList); err != nil { if err = mapstructure.Decode(v, &asList); err != nil {
@ -241,6 +252,9 @@ func parsePulls(v interface{}, pl poolLookup) (p []Pull, err error) {
if pull.InitialReplPolicy, err = parseInitialReplPolicy(e.InitialReplPolicy, rpc.DEFAULT_INITIAL_REPL_POLICY); err != nil { if pull.InitialReplPolicy, err = parseInitialReplPolicy(e.InitialReplPolicy, rpc.DEFAULT_INITIAL_REPL_POLICY); err != nil {
return return
} }
if pull.RepeatStrategy, err = parseRepeatStrategy(e.Repeat); err != nil {
return
}
p[i] = pull p[i] = pull
} }
@ -272,6 +286,25 @@ err:
return return
} }
func parseRepeatStrategy(r map[string]string) (s jobrun.RepeatStrategy, err error) {
if r == nil {
return jobrun.NoRepeatStrategy{}, nil
}
if repeatStr, ok := r["interval"]; ok {
d, err := time.ParseDuration(repeatStr)
if err != nil {
return nil, err
}
s = &jobrun.PeriodicRepeatStrategy{d}
return s, err
} else {
return nil, fmt.Errorf("attribute 'interval' not found but required in repeat specification")
}
}
func expectList(v interface{}) (asList []interface{}, err error) { func expectList(v interface{}) (asList []interface{}, err error) {
var ok bool var ok bool
if asList, ok = v.([]interface{}); !ok { if asList, ok = v.([]interface{}); !ok {

View File

@ -158,9 +158,8 @@ func cmdRun(c *cli.Context) error {
i := 0 i := 0
for _, pull := range conf.Pulls { for _, pull := range conf.Pulls {
jobs[i] = jobrun.Job{ jobs[i] = jobrun.Job{
Name: fmt.Sprintf("pull%d", i), Name: fmt.Sprintf("pull.%d", i),
Interval: time.Duration(5 * time.Second), RepeatStrategy: pull.RepeatStrategy,
Repeats: true,
RunFunc: func(log jobrun.Logger) error { RunFunc: func(log jobrun.Logger) error {
log.Printf("doing pull: %v", pull) log.Printf("doing pull: %v", pull)
return jobPull(pull, c, log) return jobPull(pull, c, log)
@ -170,9 +169,8 @@ func cmdRun(c *cli.Context) error {
} }
for _, push := range conf.Pushs { for _, push := range conf.Pushs {
jobs[i] = jobrun.Job{ jobs[i] = jobrun.Job{
Name: fmt.Sprintf("push%d", i), Name: fmt.Sprintf("push.%d", i),
Interval: time.Duration(5 * time.Second), RepeatStrategy: push.RepeatStrategy,
Repeats: true,
RunFunc: func(log jobrun.Logger) error { RunFunc: func(log jobrun.Logger) error {
log.Printf("doing push: %v", push) log.Printf("doing push: %v", push)
return jobPush(push, c, log) return jobPush(push, c, log)

View File

@ -23,6 +23,8 @@ pulls:
# local replication, only allowed in pull mode # local replication, only allowed in pull mode
# the from name 'local' is reserved for this purpose # the from name 'local' is reserved for this purpose
- from: local - from: local
repeat:
interval: 15m
mapping: { mapping: {
"tank/usr/home":"mirrorpool/foo/bar" "tank/usr/home":"mirrorpool/foo/bar"
} }

View File

@ -19,12 +19,24 @@ func (l jobLogger) Printf(format string, v ...interface{}) {
} }
type Job struct { type Job struct {
Name string Name string
RunFunc func(log Logger) (err error) RunFunc func(log Logger) (err error)
LastStart time.Time LastStart time.Time
LastError error LastError error
Interval time.Duration DueAt time.Time
Repeats bool RepeatStrategy RepeatStrategy
}
type JobRunResult struct {
Start time.Time
Finish time.Time
Error error
}
func (r JobRunResult) RunTime() time.Duration { return r.Finish.Sub(r.Start) }
type RepeatStrategy interface {
ShouldReschedule(lastResult JobRunResult) (nextDue time.Time, reschedule bool)
} }
type JobRunner struct { type JobRunner struct {
@ -78,15 +90,20 @@ loop:
case finishedJob := <-r.finishedJobChan: case finishedJob := <-r.finishedJobChan:
runTime := time.Since(finishedJob.LastStart) delete(r.running, finishedJob.Name)
r.logger.Printf("[%s] finished after %v\n", finishedJob.Name, runTime) res := JobRunResult{
if runTime > finishedJob.Interval { Start: finishedJob.LastStart,
r.logger.Printf("[%s] job exceeded interval of %v\n", finishedJob.Name, finishedJob.Interval) Finish: time.Now(),
Error: finishedJob.LastError,
} }
delete(r.running, finishedJob.Name) r.logger.Printf("[%s] finished after %s\n", finishedJob.Name, res.RunTime())
if finishedJob.Repeats {
dueTime, resched := finishedJob.RepeatStrategy.ShouldReschedule(res)
if resched {
r.logger.Printf("[%s] rescheduling to %s", dueTime)
finishedJob.DueAt = dueTime
r.pending[finishedJob.Name] = finishedJob r.pending[finishedJob.Name] = finishedJob
} }
@ -108,11 +125,9 @@ loop:
for jobName, job := range r.pending { for jobName, job := range r.pending {
jobDueTime := job.LastStart.Add(job.Interval) if job.DueAt.After(time.Now()) {
if job.DueAt.Before(nextJobDue) {
if jobDueTime.After(time.Now()) { nextJobDue = job.DueAt
if jobDueTime.Before(nextJobDue) {
nextJobDue = jobDueTime
} }
jobPending = true jobPending = true
continue continue

View File

@ -0,0 +1,25 @@
package jobrun
import (
"time"
)
type NoRepeatStrategy struct{}
func (s NoRepeatStrategy) ShouldReschedule(lastResult JobRunResult) (time.Time, bool) {
return time.Time{}, false
}
type PeriodicRepeatStrategy struct {
Interval time.Duration
}
func (s *PeriodicRepeatStrategy) ShouldReschedule(lastResult JobRunResult) (next time.Time, shouldRun bool) {
// Don't care about the result
shouldRun = true
next = lastResult.Start.Add(s.Interval)
if next.Before(time.Now()) {
next = time.Now()
}
return
}