diff --git a/cmd/config.go b/cmd/config.go index 3e07343..71cf873 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -85,7 +85,7 @@ func parsePushs(v interface{}) (p []Push, err error) { p = make([]Push, len(asList)) - for _, e := range asList { + for i, e := range asList { push := Push{ To: e.To, Datasets: make([]zfs.DatasetPath, len(e.Datasets)), @@ -97,7 +97,7 @@ func parsePushs(v interface{}) (p []Push, err error) { } } - p = append(p, push) + p[i] = push } return @@ -116,20 +116,20 @@ func parsePulls(v interface{}) (p []Pull, err error) { p = make([]Pull, len(asList)) - for _, e := range asList { + for i, e := range asList { pull := Pull{ From: e.From, } if pull.Mapping, err = parseComboMapping(e.Mapping); err != nil { return } - p = append(p, pull) + p[i] = pull } return } -func parseSinks(v interface{}) (s []Sink, err error) { +func parseSinks(v interface{}) (sinks []Sink, err error) { var asList []interface{} var ok bool @@ -137,14 +137,14 @@ func parseSinks(v interface{}) (s []Sink, err error) { return nil, errors.New("expected list") } - s = make([]Sink, len(asList)) + sinks = make([]Sink, len(asList)) - for _, i := range asList { + for i, s := range asList { var sink Sink - if sink, err = parseSink(i); err != nil { + if sink, err = parseSink(s); err != nil { return } - s = append(s, sink) + sinks[i] = sink } return diff --git a/cmd/main.go b/cmd/main.go index 4e4da4d..f50cad4 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -4,9 +4,12 @@ import ( "errors" "fmt" "github.com/urfave/cli" + "github.com/zrepl/zrepl/jobrun" "github.com/zrepl/zrepl/rpc" "github.com/zrepl/zrepl/sshbytestream" "io" + "sync" + "time" ) type Role uint @@ -18,6 +21,7 @@ const ( var conf Config var handler Handler +var runner *jobrun.JobRunner func main() { @@ -37,6 +41,8 @@ func main() { return } handler = Handler{} + + runner = jobrun.NewJobRunner() return } app.Commands = []cli.Command{ @@ -73,7 +79,51 @@ func doSink(c *cli.Context) (err error) { func doRun(c *cli.Context) error { - fmt.Printf("%#v", conf) + // Do every pull, do every push + // Scheduling + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + runner.Start() + }() + + for i := range conf.Pulls { + pull := conf.Pulls[i] + + j := jobrun.Job{ + Name: fmt.Sprintf("pull%d", i), + Interval: time.Duration(5 * time.Second), + Repeats: true, + RunFunc: func() error { + fmt.Printf("%v: %#v\n", time.Now(), pull) + time.Sleep(10 * time.Second) + fmt.Printf("%v: %#v\n", time.Now(), pull) + return nil + }, + } + + runner.AddJob(j) + } + + for i := range conf.Pushs { + push := conf.Pushs[i] + + j := jobrun.Job{ + Name: fmt.Sprintf("push%d", i), + Interval: time.Duration(5 * time.Second), + Repeats: true, + RunFunc: func() error { + fmt.Printf("%v: %#v\n", time.Now(), push) + return nil + }, + } + + runner.AddJob(j) + } + + wg.Wait() return nil } diff --git a/jobrun/jobrun.go b/jobrun/jobrun.go new file mode 100644 index 0000000..15841bd --- /dev/null +++ b/jobrun/jobrun.go @@ -0,0 +1,118 @@ +package jobrun + +import ( + "fmt" + "time" +) + +type Job struct { + Name string + RunFunc func() (err error) + LastStart time.Time + Interval time.Duration + Repeats bool +} + +type JobRunner struct { + newJobChan chan Job + finishedJobChan chan Job + scheduleTimer <-chan time.Time + pending map[string]Job + running map[string]Job +} + +func NewJobRunner() *JobRunner { + return &JobRunner{ + newJobChan: make(chan Job), + finishedJobChan: make(chan Job), + pending: make(map[string]Job), + running: make(map[string]Job), + } +} + +func (r *JobRunner) AddJobChan() chan<- Job { + return r.newJobChan +} + +func (r *JobRunner) AddJob(j Job) { + r.newJobChan <- j +} + +func (r *JobRunner) Start() { + +loop: + select { + + case newJob := <-r.newJobChan: + + _, jobPending := r.pending[newJob.Name] + _, jobRunning := r.running[newJob.Name] + + if jobPending || jobRunning { + panic("job already in runner") + } + + r.pending[newJob.Name] = newJob + + case finishedJob := <-r.finishedJobChan: + + runTime := time.Since(finishedJob.LastStart) + + fmt.Printf("[%s] finished after %v\n", finishedJob.Name, runTime) + if runTime > finishedJob.Interval { + fmt.Printf("[%s] WARN: job exceeded interval of %v\n", finishedJob.Name, finishedJob.Interval) + } + + delete(r.running, finishedJob.Name) + if finishedJob.Repeats { + r.pending[finishedJob.Name] = finishedJob + } + + case <-r.scheduleTimer: + } + + if len(r.pending) == 0 && len(r.running) == 0 { + return + } + + // Find jobs to run + var now time.Time + var jobPending bool + + now = time.Now() + jobPending = false + + nextJobDue := now.Add(time.Minute) // max(pending.Interval) + + for jobName, job := range r.pending { + + jobDueTime := job.LastStart.Add(job.Interval) + + if jobDueTime.After(time.Now()) { + if jobDueTime.Before(nextJobDue) { + nextJobDue = jobDueTime + } + jobPending = true + continue + } + // This job is due, run it + + delete(r.pending, jobName) + r.running[jobName] = job + job.LastStart = now + + go func(job Job) { + job.RunFunc() + r.finishedJobChan <- job + }(job) + + } + + if jobPending || len(r.running) > 0 { + nextJobDue = nextJobDue.Add(time.Second).Round(time.Second) + fmt.Printf("jobrun: waiting until %v\n", nextJobDue) + r.scheduleTimer = time.After(nextJobDue.Sub(now)) + goto loop + } + +}