mirror of
https://github.com/zrepl/zrepl.git
synced 2024-11-22 00:13:52 +01:00
remove unused package jobrun
This commit is contained in:
parent
7c86628f3b
commit
4b23648e6e
189
jobrun/jobrun.go
189
jobrun/jobrun.go
@ -1,189 +0,0 @@
|
||||
package jobrun
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Logger interface {
|
||||
Printf(format string, v ...interface{})
|
||||
}
|
||||
|
||||
type jobLogger struct {
|
||||
MainLog Logger
|
||||
JobName string
|
||||
}
|
||||
|
||||
func (l jobLogger) Printf(format string, v ...interface{}) {
|
||||
l.MainLog.Printf(fmt.Sprintf("[%s]: %s", l.JobName, format), v...)
|
||||
}
|
||||
|
||||
type Job interface {
|
||||
JobName() string
|
||||
JobDo(log Logger) (err error)
|
||||
JobRepeatStrategy() RepeatStrategy
|
||||
}
|
||||
|
||||
type JobEvent interface{}
|
||||
|
||||
type JobFinishedEvent struct {
|
||||
Job Job
|
||||
Result JobRunResult
|
||||
}
|
||||
|
||||
type JobScheduledEvent struct {
|
||||
Job Job
|
||||
DueAt time.Time
|
||||
}
|
||||
|
||||
type JobrunIdleEvent struct {
|
||||
SleepUntil time.Time
|
||||
}
|
||||
|
||||
type JobrunFinishedEvent struct{}
|
||||
|
||||
type JobMetadata struct {
|
||||
Job Job
|
||||
name string
|
||||
LastStart time.Time
|
||||
LastError error
|
||||
DueAt time.Time
|
||||
}
|
||||
|
||||
type JobRunResult struct {
|
||||
Start time.Time
|
||||
Finish time.Time
|
||||
Error error
|
||||
}
|
||||
|
||||
func (r JobRunResult) RunTime() time.Duration { return r.Finish.Sub(r.Start) }
|
||||
|
||||
type RepeatStrategy interface {
|
||||
ShouldReschedule(lastResult JobRunResult) (nextDue time.Time, reschedule bool)
|
||||
}
|
||||
|
||||
type JobRunner struct {
|
||||
logger Logger
|
||||
notificationChan chan<- JobEvent
|
||||
newJobChan chan Job
|
||||
finishedJobChan chan JobMetadata
|
||||
scheduleTimer <-chan time.Time
|
||||
pending map[string]JobMetadata
|
||||
running map[string]JobMetadata
|
||||
wait sync.WaitGroup
|
||||
}
|
||||
|
||||
func NewJobRunner(logger Logger) *JobRunner {
|
||||
return &JobRunner{
|
||||
logger: logger,
|
||||
newJobChan: make(chan Job),
|
||||
finishedJobChan: make(chan JobMetadata),
|
||||
pending: make(map[string]JobMetadata),
|
||||
running: make(map[string]JobMetadata),
|
||||
}
|
||||
}
|
||||
|
||||
func (r *JobRunner) AddJob(j Job) {
|
||||
go func(j Job) {
|
||||
r.newJobChan <- j
|
||||
}(j)
|
||||
}
|
||||
|
||||
func (r *JobRunner) SetNotificationChannel(c chan<- JobEvent) {
|
||||
r.notificationChan = c
|
||||
}
|
||||
|
||||
func (r *JobRunner) postEvent(n JobEvent) {
|
||||
if r.notificationChan != nil {
|
||||
r.notificationChan <- n
|
||||
}
|
||||
}
|
||||
|
||||
func (r *JobRunner) Run() {
|
||||
|
||||
loop:
|
||||
select {
|
||||
|
||||
case j := <-r.newJobChan:
|
||||
|
||||
jn := j.JobName()
|
||||
|
||||
_, jobPending := r.pending[jn]
|
||||
_, jobRunning := r.running[jn]
|
||||
|
||||
if jobPending || jobRunning {
|
||||
panic("job already in runner")
|
||||
}
|
||||
|
||||
jm := JobMetadata{name: jn, Job: j}
|
||||
|
||||
r.pending[jn] = jm
|
||||
|
||||
case finishedJob := <-r.finishedJobChan:
|
||||
|
||||
delete(r.running, finishedJob.name)
|
||||
|
||||
res := JobRunResult{
|
||||
Start: finishedJob.LastStart,
|
||||
Finish: time.Now(),
|
||||
Error: finishedJob.LastError,
|
||||
}
|
||||
|
||||
r.postEvent(JobFinishedEvent{finishedJob.Job, res})
|
||||
|
||||
dueTime, resched := finishedJob.Job.JobRepeatStrategy().ShouldReschedule(res)
|
||||
if resched {
|
||||
r.postEvent(JobScheduledEvent{finishedJob.Job, dueTime})
|
||||
finishedJob.DueAt = dueTime
|
||||
r.pending[finishedJob.name] = finishedJob
|
||||
}
|
||||
|
||||
case <-r.scheduleTimer:
|
||||
}
|
||||
|
||||
if len(r.pending) == 0 && len(r.running) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// Find jobs to run
|
||||
var now time.Time
|
||||
var jobPending bool
|
||||
|
||||
now = time.Now()
|
||||
jobPending = false
|
||||
|
||||
nextJobDue := now.Add(time.Minute) // max(pending.Interval)
|
||||
|
||||
for jobName, job := range r.pending {
|
||||
|
||||
if job.DueAt.After(now) {
|
||||
if job.DueAt.Before(nextJobDue) {
|
||||
nextJobDue = job.DueAt
|
||||
}
|
||||
jobPending = true
|
||||
continue
|
||||
}
|
||||
// This job is due, run it
|
||||
|
||||
delete(r.pending, jobName)
|
||||
r.running[jobName] = job
|
||||
job.LastStart = now
|
||||
|
||||
go func(job JobMetadata) {
|
||||
jobLog := jobLogger{r.logger, job.name}
|
||||
job.LastError = job.Job.JobDo(jobLog)
|
||||
r.finishedJobChan <- job
|
||||
}(job)
|
||||
|
||||
}
|
||||
|
||||
if jobPending || len(r.running) > 0 {
|
||||
r.postEvent(JobrunIdleEvent{nextJobDue})
|
||||
r.scheduleTimer = time.After(nextJobDue.Sub(now))
|
||||
goto loop
|
||||
}
|
||||
|
||||
r.postEvent(JobrunFinishedEvent{})
|
||||
|
||||
}
|
@ -1,25 +0,0 @@
|
||||
package jobrun
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
type NoRepeatStrategy struct{}
|
||||
|
||||
func (s NoRepeatStrategy) ShouldReschedule(lastResult JobRunResult) (time.Time, bool) {
|
||||
return time.Time{}, false
|
||||
}
|
||||
|
||||
type PeriodicRepeatStrategy struct {
|
||||
Interval time.Duration
|
||||
}
|
||||
|
||||
func (s *PeriodicRepeatStrategy) ShouldReschedule(lastResult JobRunResult) (next time.Time, shouldRun bool) {
|
||||
// Don't care about the result
|
||||
shouldRun = true
|
||||
next = lastResult.Start.Add(s.Interval)
|
||||
if next.Before(time.Now()) {
|
||||
next = time.Now()
|
||||
}
|
||||
return
|
||||
}
|
Loading…
Reference in New Issue
Block a user