Implement jobrun package, abstraction for cron-like goroutines.

Unlike cron, there is no overtaking though.
This commit is contained in:
Christian Schwarz 2017-04-29 18:26:43 +02:00
parent d9ecfc8eb4
commit 526255a9ef
3 changed files with 178 additions and 10 deletions

View File

@ -85,7 +85,7 @@ func parsePushs(v interface{}) (p []Push, err error) {
p = make([]Push, len(asList))
for _, e := range asList {
for i, e := range asList {
push := Push{
To: e.To,
Datasets: make([]zfs.DatasetPath, len(e.Datasets)),
@ -97,7 +97,7 @@ func parsePushs(v interface{}) (p []Push, err error) {
}
}
p = append(p, push)
p[i] = push
}
return
@ -116,20 +116,20 @@ func parsePulls(v interface{}) (p []Pull, err error) {
p = make([]Pull, len(asList))
for _, e := range asList {
for i, e := range asList {
pull := Pull{
From: e.From,
}
if pull.Mapping, err = parseComboMapping(e.Mapping); err != nil {
return
}
p = append(p, pull)
p[i] = pull
}
return
}
func parseSinks(v interface{}) (s []Sink, err error) {
func parseSinks(v interface{}) (sinks []Sink, err error) {
var asList []interface{}
var ok bool
@ -137,14 +137,14 @@ func parseSinks(v interface{}) (s []Sink, err error) {
return nil, errors.New("expected list")
}
s = make([]Sink, len(asList))
sinks = make([]Sink, len(asList))
for _, i := range asList {
for i, s := range asList {
var sink Sink
if sink, err = parseSink(i); err != nil {
if sink, err = parseSink(s); err != nil {
return
}
s = append(s, sink)
sinks[i] = sink
}
return

View File

@ -4,9 +4,12 @@ import (
"errors"
"fmt"
"github.com/urfave/cli"
"github.com/zrepl/zrepl/jobrun"
"github.com/zrepl/zrepl/rpc"
"github.com/zrepl/zrepl/sshbytestream"
"io"
"sync"
"time"
)
type Role uint
@ -18,6 +21,7 @@ const (
var conf Config
var handler Handler
var runner *jobrun.JobRunner
func main() {
@ -37,6 +41,8 @@ func main() {
return
}
handler = Handler{}
runner = jobrun.NewJobRunner()
return
}
app.Commands = []cli.Command{
@ -73,7 +79,51 @@ func doSink(c *cli.Context) (err error) {
func doRun(c *cli.Context) error {
fmt.Printf("%#v", conf)
// Do every pull, do every push
// Scheduling
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
runner.Start()
}()
for i := range conf.Pulls {
pull := conf.Pulls[i]
j := jobrun.Job{
Name: fmt.Sprintf("pull%d", i),
Interval: time.Duration(5 * time.Second),
Repeats: true,
RunFunc: func() error {
fmt.Printf("%v: %#v\n", time.Now(), pull)
time.Sleep(10 * time.Second)
fmt.Printf("%v: %#v\n", time.Now(), pull)
return nil
},
}
runner.AddJob(j)
}
for i := range conf.Pushs {
push := conf.Pushs[i]
j := jobrun.Job{
Name: fmt.Sprintf("push%d", i),
Interval: time.Duration(5 * time.Second),
Repeats: true,
RunFunc: func() error {
fmt.Printf("%v: %#v\n", time.Now(), push)
return nil
},
}
runner.AddJob(j)
}
wg.Wait()
return nil
}

118
jobrun/jobrun.go Normal file
View File

@ -0,0 +1,118 @@
package jobrun
import (
"fmt"
"time"
)
type Job struct {
Name string
RunFunc func() (err error)
LastStart time.Time
Interval time.Duration
Repeats bool
}
type JobRunner struct {
newJobChan chan Job
finishedJobChan chan Job
scheduleTimer <-chan time.Time
pending map[string]Job
running map[string]Job
}
func NewJobRunner() *JobRunner {
return &JobRunner{
newJobChan: make(chan Job),
finishedJobChan: make(chan Job),
pending: make(map[string]Job),
running: make(map[string]Job),
}
}
func (r *JobRunner) AddJobChan() chan<- Job {
return r.newJobChan
}
func (r *JobRunner) AddJob(j Job) {
r.newJobChan <- j
}
func (r *JobRunner) Start() {
loop:
select {
case newJob := <-r.newJobChan:
_, jobPending := r.pending[newJob.Name]
_, jobRunning := r.running[newJob.Name]
if jobPending || jobRunning {
panic("job already in runner")
}
r.pending[newJob.Name] = newJob
case finishedJob := <-r.finishedJobChan:
runTime := time.Since(finishedJob.LastStart)
fmt.Printf("[%s] finished after %v\n", finishedJob.Name, runTime)
if runTime > finishedJob.Interval {
fmt.Printf("[%s] WARN: job exceeded interval of %v\n", finishedJob.Name, finishedJob.Interval)
}
delete(r.running, finishedJob.Name)
if finishedJob.Repeats {
r.pending[finishedJob.Name] = finishedJob
}
case <-r.scheduleTimer:
}
if len(r.pending) == 0 && len(r.running) == 0 {
return
}
// Find jobs to run
var now time.Time
var jobPending bool
now = time.Now()
jobPending = false
nextJobDue := now.Add(time.Minute) // max(pending.Interval)
for jobName, job := range r.pending {
jobDueTime := job.LastStart.Add(job.Interval)
if jobDueTime.After(time.Now()) {
if jobDueTime.Before(nextJobDue) {
nextJobDue = jobDueTime
}
jobPending = true
continue
}
// This job is due, run it
delete(r.pending, jobName)
r.running[jobName] = job
job.LastStart = now
go func(job Job) {
job.RunFunc()
r.finishedJobChan <- job
}(job)
}
if jobPending || len(r.running) > 0 {
nextJobDue = nextJobDue.Add(time.Second).Round(time.Second)
fmt.Printf("jobrun: waiting until %v\n", nextJobDue)
r.scheduleTimer = time.After(nextJobDue.Sub(now))
goto loop
}
}