diff --git a/cmd/config.go b/cmd/config.go index 27f814f..f3b538b 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/jinzhu/copier" "github.com/mitchellh/mapstructure" + "github.com/zrepl/zrepl/jobrun" "github.com/zrepl/zrepl/rpc" "github.com/zrepl/zrepl/sshbytestream" . "github.com/zrepl/zrepl/util" @@ -13,6 +14,7 @@ import ( "io" "io/ioutil" "strings" + "time" ) type Pool struct { @@ -42,12 +44,15 @@ type Push struct { To *Pool Filter zfs.DatasetMapping InitialReplPolicy rpc.InitialReplPolicy + RepeatStrategy jobrun.RepeatStrategy } type Pull struct { From *Pool Mapping zfs.DatasetMapping InitialReplPolicy rpc.InitialReplPolicy + RepeatStrategy jobrun.RepeatStrategy } + type ClientMapping struct { From string Mapping zfs.DatasetMapping @@ -171,6 +176,7 @@ func parsePushs(v interface{}, pl poolLookup) (p []Push, err error) { To string Filter map[string]string InitialReplPolicy string + Repeat map[string]string }, 0) if err = mapstructure.Decode(v, &asList); err != nil { @@ -197,6 +203,10 @@ func parsePushs(v interface{}, pl poolLookup) (p []Push, err error) { return } + if push.RepeatStrategy, err = parseRepeatStrategy(e.Repeat); err != nil { + return + } + p[i] = push } @@ -209,6 +219,7 @@ func parsePulls(v interface{}, pl poolLookup) (p []Pull, err error) { From string Mapping map[string]string InitialReplPolicy string + Repeat map[string]string }, 0) if err = mapstructure.Decode(v, &asList); err != nil { @@ -241,6 +252,9 @@ func parsePulls(v interface{}, pl poolLookup) (p []Pull, err error) { if pull.InitialReplPolicy, err = parseInitialReplPolicy(e.InitialReplPolicy, rpc.DEFAULT_INITIAL_REPL_POLICY); err != nil { return } + if pull.RepeatStrategy, err = parseRepeatStrategy(e.Repeat); err != nil { + return + } p[i] = pull } @@ -272,6 +286,25 @@ err: return } +func parseRepeatStrategy(r map[string]string) (s jobrun.RepeatStrategy, err error) { + + if r == nil { + return jobrun.NoRepeatStrategy{}, nil + } + + if repeatStr, ok := r["interval"]; ok { + d, err := time.ParseDuration(repeatStr) + if err != nil { + return nil, err + } + s = &jobrun.PeriodicRepeatStrategy{d} + return s, err + } else { + return nil, fmt.Errorf("attribute 'interval' not found but required in repeat specification") + } + +} + func expectList(v interface{}) (asList []interface{}, err error) { var ok bool if asList, ok = v.([]interface{}); !ok { diff --git a/cmd/main.go b/cmd/main.go index 9f5c86b..bf91a56 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -158,9 +158,8 @@ func cmdRun(c *cli.Context) error { i := 0 for _, pull := range conf.Pulls { jobs[i] = jobrun.Job{ - Name: fmt.Sprintf("pull%d", i), - Interval: time.Duration(5 * time.Second), - Repeats: true, + Name: fmt.Sprintf("pull.%d", i), + RepeatStrategy: pull.RepeatStrategy, RunFunc: func(log jobrun.Logger) error { log.Printf("doing pull: %v", pull) return jobPull(pull, c, log) @@ -170,9 +169,8 @@ func cmdRun(c *cli.Context) error { } for _, push := range conf.Pushs { jobs[i] = jobrun.Job{ - Name: fmt.Sprintf("push%d", i), - Interval: time.Duration(5 * time.Second), - Repeats: true, + Name: fmt.Sprintf("push.%d", i), + RepeatStrategy: push.RepeatStrategy, RunFunc: func(log jobrun.Logger) error { log.Printf("doing push: %v", push) return jobPush(push, c, log) diff --git a/cmd/sampleconf/zrepl.yml b/cmd/sampleconf/zrepl.yml index b565373..b5a8980 100644 --- a/cmd/sampleconf/zrepl.yml +++ b/cmd/sampleconf/zrepl.yml @@ -23,6 +23,8 @@ pulls: # local replication, only allowed in pull mode # the from name 'local' is reserved for this purpose - from: local + repeat: + interval: 15m mapping: { "tank/usr/home":"mirrorpool/foo/bar" } diff --git a/jobrun/jobrun.go b/jobrun/jobrun.go index c34ea7f..0100e34 100644 --- a/jobrun/jobrun.go +++ b/jobrun/jobrun.go @@ -19,12 +19,24 @@ func (l jobLogger) Printf(format string, v ...interface{}) { } type Job struct { - Name string - RunFunc func(log Logger) (err error) - LastStart time.Time - LastError error - Interval time.Duration - Repeats bool + Name string + RunFunc func(log Logger) (err error) + LastStart time.Time + LastError error + DueAt time.Time + RepeatStrategy RepeatStrategy +} + +type JobRunResult struct { + Start time.Time + Finish time.Time + Error error +} + +func (r JobRunResult) RunTime() time.Duration { return r.Finish.Sub(r.Start) } + +type RepeatStrategy interface { + ShouldReschedule(lastResult JobRunResult) (nextDue time.Time, reschedule bool) } type JobRunner struct { @@ -78,15 +90,20 @@ loop: case finishedJob := <-r.finishedJobChan: - runTime := time.Since(finishedJob.LastStart) + delete(r.running, finishedJob.Name) - r.logger.Printf("[%s] finished after %v\n", finishedJob.Name, runTime) - if runTime > finishedJob.Interval { - r.logger.Printf("[%s] job exceeded interval of %v\n", finishedJob.Name, finishedJob.Interval) + res := JobRunResult{ + Start: finishedJob.LastStart, + Finish: time.Now(), + Error: finishedJob.LastError, } - delete(r.running, finishedJob.Name) - if finishedJob.Repeats { + r.logger.Printf("[%s] finished after %s\n", finishedJob.Name, res.RunTime()) + + dueTime, resched := finishedJob.RepeatStrategy.ShouldReschedule(res) + if resched { + r.logger.Printf("[%s] rescheduling to %s", dueTime) + finishedJob.DueAt = dueTime r.pending[finishedJob.Name] = finishedJob } @@ -108,11 +125,9 @@ loop: for jobName, job := range r.pending { - jobDueTime := job.LastStart.Add(job.Interval) - - if jobDueTime.After(time.Now()) { - if jobDueTime.Before(nextJobDue) { - nextJobDue = jobDueTime + if job.DueAt.After(time.Now()) { + if job.DueAt.Before(nextJobDue) { + nextJobDue = job.DueAt } jobPending = true continue diff --git a/jobrun/repeat_strategies.go b/jobrun/repeat_strategies.go new file mode 100644 index 0000000..cec32d4 --- /dev/null +++ b/jobrun/repeat_strategies.go @@ -0,0 +1,25 @@ +package jobrun + +import ( + "time" +) + +type NoRepeatStrategy struct{} + +func (s NoRepeatStrategy) ShouldReschedule(lastResult JobRunResult) (time.Time, bool) { + return time.Time{}, false +} + +type PeriodicRepeatStrategy struct { + Interval time.Duration +} + +func (s *PeriodicRepeatStrategy) ShouldReschedule(lastResult JobRunResult) (next time.Time, shouldRun bool) { + // Don't care about the result + shouldRun = true + next = lastResult.Start.Add(s.Interval) + if next.Before(time.Now()) { + next = time.Now() + } + return +}