diff --git a/cmd/autosnap.go b/cmd/autosnap.go index 7996eaa..7c69820 100644 --- a/cmd/autosnap.go +++ b/cmd/autosnap.go @@ -3,6 +3,7 @@ package cmd import ( "fmt" "os" + "sync" "time" "github.com/spf13/cobra" @@ -22,7 +23,14 @@ func init() { func cmdAutosnap(cmd *cobra.Command, args []string) { + var wg sync.WaitGroup + r := jobrun.NewJobRunner(log) + wg.Add(1) + go func() { + r.Run() + wg.Done() + }() if len(args) < 1 { log.Printf("must specify exactly one job as positional argument") @@ -37,7 +45,7 @@ func cmdAutosnap(cmd *cobra.Command, args []string) { r.AddJob(snap) - r.Wait() + wg.Wait() } diff --git a/jobrun/jobrun.go b/jobrun/jobrun.go index 886ed92..482cc19 100644 --- a/jobrun/jobrun.go +++ b/jobrun/jobrun.go @@ -16,7 +16,7 @@ type jobLogger struct { } func (l jobLogger) Printf(format string, v ...interface{}) { - l.MainLog.Printf(fmt.Sprintf("job[%s]: %s", l.JobName, format), v...) + l.MainLog.Printf(fmt.Sprintf("[%s]: %s", l.JobName, format), v...) } type Job interface { @@ -25,6 +25,24 @@ type Job interface { JobRepeatStrategy() RepeatStrategy } +type JobEvent interface{} + +type JobFinishedEvent struct { + Job Job + Result JobRunResult +} + +type JobScheduledEvent struct { + Job Job + DueAt time.Time +} + +type JobrunIdleEvent struct { + SleepUntil time.Time +} + +type JobrunFinishedEvent struct{} + type JobMetadata struct { Job Job name string @@ -47,7 +65,7 @@ type RepeatStrategy interface { type JobRunner struct { logger Logger - notificationChan chan JobMetadata + notificationChan chan<- JobEvent newJobChan chan Job finishedJobChan chan JobMetadata scheduleTimer <-chan time.Time @@ -58,36 +76,31 @@ type JobRunner struct { func NewJobRunner(logger Logger) *JobRunner { return &JobRunner{ - logger: logger, - notificationChan: make(chan JobMetadata), - newJobChan: make(chan Job), - finishedJobChan: make(chan JobMetadata), - pending: make(map[string]JobMetadata), - running: make(map[string]JobMetadata), + logger: logger, + newJobChan: make(chan Job), + finishedJobChan: make(chan JobMetadata), + pending: make(map[string]JobMetadata), + running: make(map[string]JobMetadata), } } func (r *JobRunner) AddJob(j Job) { - r.newJobChan <- j + go func(j Job) { + r.newJobChan <- j + }(j) } -func (r *JobRunner) NotificationChan() <-chan JobMetadata { - return r.notificationChan +func (r *JobRunner) SetNotificationChannel(c chan<- JobEvent) { + r.notificationChan = c } -func (r *JobRunner) Start() { - r.wait.Add(1) - go func() { - r.loop() - r.wait.Done() - }() +func (r *JobRunner) postEvent(n JobEvent) { + if r.notificationChan != nil { + r.notificationChan <- n + } } -func (r *JobRunner) Wait() { - r.wait.Wait() -} - -func (r *JobRunner) loop() { +func (r *JobRunner) Run() { loop: select { @@ -117,11 +130,11 @@ loop: Error: finishedJob.LastError, } - r.logger.Printf("[%s] finished after %s\n", finishedJob.name, res.RunTime()) + r.postEvent(JobFinishedEvent{finishedJob.Job, res}) dueTime, resched := finishedJob.Job.JobRepeatStrategy().ShouldReschedule(res) if resched { - r.logger.Printf("[%s] rescheduling to %s", finishedJob.name, dueTime) + r.postEvent(JobScheduledEvent{finishedJob.Job, dueTime}) finishedJob.DueAt = dueTime r.pending[finishedJob.name] = finishedJob } @@ -159,19 +172,18 @@ loop: go func(job JobMetadata) { jobLog := jobLogger{r.logger, job.name} - if err := job.Job.JobDo(jobLog); err != nil { - job.LastError = err - r.notificationChan <- job - } + job.LastError = job.Job.JobDo(jobLog) r.finishedJobChan <- job }(job) } if jobPending || len(r.running) > 0 { - r.logger.Printf("waiting until %v\n", nextJobDue) + r.postEvent(JobrunIdleEvent{nextJobDue}) r.scheduleTimer = time.After(nextJobDue.Sub(now)) goto loop } + r.postEvent(JobrunFinishedEvent{}) + }