jobrun: use notificationChannel instead of logger for communicating events

This commit is contained in:
Christian Schwarz 2017-09-01 16:44:58 +02:00
parent f8979d6e83
commit 8a96267ef4
2 changed files with 50 additions and 30 deletions

View File

@ -3,6 +3,7 @@ package cmd
import ( import (
"fmt" "fmt"
"os" "os"
"sync"
"time" "time"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@ -22,7 +23,14 @@ func init() {
func cmdAutosnap(cmd *cobra.Command, args []string) { func cmdAutosnap(cmd *cobra.Command, args []string) {
var wg sync.WaitGroup
r := jobrun.NewJobRunner(log) r := jobrun.NewJobRunner(log)
wg.Add(1)
go func() {
r.Run()
wg.Done()
}()
if len(args) < 1 { if len(args) < 1 {
log.Printf("must specify exactly one job as positional argument") log.Printf("must specify exactly one job as positional argument")
@ -37,7 +45,7 @@ func cmdAutosnap(cmd *cobra.Command, args []string) {
r.AddJob(snap) r.AddJob(snap)
r.Wait() wg.Wait()
} }

View File

@ -16,7 +16,7 @@ type jobLogger struct {
} }
func (l jobLogger) Printf(format string, v ...interface{}) { func (l jobLogger) Printf(format string, v ...interface{}) {
l.MainLog.Printf(fmt.Sprintf("job[%s]: %s", l.JobName, format), v...) l.MainLog.Printf(fmt.Sprintf("[%s]: %s", l.JobName, format), v...)
} }
type Job interface { type Job interface {
@ -25,6 +25,24 @@ type Job interface {
JobRepeatStrategy() RepeatStrategy JobRepeatStrategy() RepeatStrategy
} }
type JobEvent interface{}
type JobFinishedEvent struct {
Job Job
Result JobRunResult
}
type JobScheduledEvent struct {
Job Job
DueAt time.Time
}
type JobrunIdleEvent struct {
SleepUntil time.Time
}
type JobrunFinishedEvent struct{}
type JobMetadata struct { type JobMetadata struct {
Job Job Job Job
name string name string
@ -47,7 +65,7 @@ type RepeatStrategy interface {
type JobRunner struct { type JobRunner struct {
logger Logger logger Logger
notificationChan chan JobMetadata notificationChan chan<- JobEvent
newJobChan chan Job newJobChan chan Job
finishedJobChan chan JobMetadata finishedJobChan chan JobMetadata
scheduleTimer <-chan time.Time scheduleTimer <-chan time.Time
@ -59,7 +77,6 @@ type JobRunner struct {
func NewJobRunner(logger Logger) *JobRunner { func NewJobRunner(logger Logger) *JobRunner {
return &JobRunner{ return &JobRunner{
logger: logger, logger: logger,
notificationChan: make(chan JobMetadata),
newJobChan: make(chan Job), newJobChan: make(chan Job),
finishedJobChan: make(chan JobMetadata), finishedJobChan: make(chan JobMetadata),
pending: make(map[string]JobMetadata), pending: make(map[string]JobMetadata),
@ -68,26 +85,22 @@ func NewJobRunner(logger Logger) *JobRunner {
} }
func (r *JobRunner) AddJob(j Job) { func (r *JobRunner) AddJob(j Job) {
go func(j Job) {
r.newJobChan <- j r.newJobChan <- j
}(j)
} }
func (r *JobRunner) NotificationChan() <-chan JobMetadata { func (r *JobRunner) SetNotificationChannel(c chan<- JobEvent) {
return r.notificationChan r.notificationChan = c
} }
func (r *JobRunner) Start() { func (r *JobRunner) postEvent(n JobEvent) {
r.wait.Add(1) if r.notificationChan != nil {
go func() { r.notificationChan <- n
r.loop() }
r.wait.Done()
}()
} }
func (r *JobRunner) Wait() { func (r *JobRunner) Run() {
r.wait.Wait()
}
func (r *JobRunner) loop() {
loop: loop:
select { select {
@ -117,11 +130,11 @@ loop:
Error: finishedJob.LastError, Error: finishedJob.LastError,
} }
r.logger.Printf("[%s] finished after %s\n", finishedJob.name, res.RunTime()) r.postEvent(JobFinishedEvent{finishedJob.Job, res})
dueTime, resched := finishedJob.Job.JobRepeatStrategy().ShouldReschedule(res) dueTime, resched := finishedJob.Job.JobRepeatStrategy().ShouldReschedule(res)
if resched { if resched {
r.logger.Printf("[%s] rescheduling to %s", finishedJob.name, dueTime) r.postEvent(JobScheduledEvent{finishedJob.Job, dueTime})
finishedJob.DueAt = dueTime finishedJob.DueAt = dueTime
r.pending[finishedJob.name] = finishedJob r.pending[finishedJob.name] = finishedJob
} }
@ -159,19 +172,18 @@ loop:
go func(job JobMetadata) { go func(job JobMetadata) {
jobLog := jobLogger{r.logger, job.name} jobLog := jobLogger{r.logger, job.name}
if err := job.Job.JobDo(jobLog); err != nil { job.LastError = job.Job.JobDo(jobLog)
job.LastError = err
r.notificationChan <- job
}
r.finishedJobChan <- job r.finishedJobChan <- job
}(job) }(job)
} }
if jobPending || len(r.running) > 0 { if jobPending || len(r.running) > 0 {
r.logger.Printf("waiting until %v\n", nextJobDue) r.postEvent(JobrunIdleEvent{nextJobDue})
r.scheduleTimer = time.After(nextJobDue.Sub(now)) r.scheduleTimer = time.After(nextJobDue.Sub(now))
goto loop goto loop
} }
r.postEvent(JobrunFinishedEvent{})
} }