jobrun/cmd: implement jobrun.Job for config objects

This commit is contained in:
Christian Schwarz 2017-09-01 14:42:12 +02:00
parent 582ae83da3
commit f8979d6e83
5 changed files with 100 additions and 56 deletions

View File

@ -2,12 +2,12 @@ package cmd
import (
"fmt"
"os"
"time"
"github.com/spf13/cobra"
"github.com/zrepl/zrepl/jobrun"
"github.com/zrepl/zrepl/zfs"
"os"
"sync"
"time"
)
var AutosnapCmd = &cobra.Command{
@ -22,12 +22,7 @@ func init() {
func cmdAutosnap(cmd *cobra.Command, args []string) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
runner.Start()
}()
r := jobrun.NewJobRunner(log)
if len(args) < 1 {
log.Printf("must specify exactly one job as positional argument")
@ -40,18 +35,9 @@ func cmdAutosnap(cmd *cobra.Command, args []string) {
os.Exit(1)
}
job := jobrun.JobMetadata{
Name: snap.JobName,
RepeatStrategy: snap.Interval,
RunFunc: func(log jobrun.Logger) error {
log.Printf("doing autosnap: %v", snap)
ctx := AutosnapContext{snap}
return doAutosnap(ctx, log)
},
}
runner.AddJob(job)
r.AddJob(snap)
wg.Wait()
r.Wait()
}

View File

@ -49,14 +49,14 @@ type SSHTransport struct {
}
type Push struct {
JobName string // for use with jobrun package
jobName string // for use with jobrun package
To *Remote
Filter zfs.DatasetFilter
InitialReplPolicy InitialReplPolicy
RepeatStrategy jobrun.RepeatStrategy
}
type Pull struct {
JobName string // for use with jobrun package
jobName string // for use with jobrun package
From *Remote
Mapping DatasetMapFilter
InitialReplPolicy InitialReplPolicy
@ -64,14 +64,14 @@ type Pull struct {
}
type Prune struct {
JobName string // for use with jobrun package
jobName string // for use with jobrun package
DatasetFilter zfs.DatasetFilter
SnapshotFilter zfs.FilesystemVersionFilter
RetentionPolicy *RetentionGrid // TODO abstract interface to support future policies?
}
type Autosnap struct {
JobName string // for use with jobrun package
jobName string // for use with jobrun package
Prefix string
Interval jobrun.RepeatStrategy
DatasetFilter zfs.DatasetFilter
@ -230,7 +230,7 @@ func parsePushs(v interface{}, rl remoteLookup) (p map[string]*Push, err error)
To: toRemote,
}
if push.JobName, err = fullJobName(JobSectionPush, name); err != nil {
if push.jobName, err = fullJobName(JobSectionPush, name); err != nil {
return
}
if push.Filter, err = parseDatasetMapFilter(e.Filter, true); err != nil {
@ -289,7 +289,7 @@ func parsePulls(v interface{}, rl remoteLookup) (p map[string]*Pull, err error)
pull := &Pull{
From: fromRemote,
}
if pull.JobName, err = fullJobName(JobSectionPull, name); err != nil {
if pull.jobName, err = fullJobName(JobSectionPull, name); err != nil {
return
}
if pull.Mapping, err = parseDatasetMapFilter(e.Mapping, false); err != nil {
@ -507,7 +507,7 @@ func parsePrune(e map[string]interface{}, name string) (prune *Prune, err error)
prune = &Prune{}
if prune.JobName, err = fullJobName(JobSectionPrune, name); err != nil {
if prune.jobName, err = fullJobName(JobSectionPrune, name); err != nil {
return
}
@ -705,7 +705,7 @@ func parseAutosnap(m interface{}, name string) (a *Autosnap, err error) {
a = &Autosnap{}
if a.JobName, err = fullJobName(JobSectionAutosnap, name); err != nil {
if a.jobName, err = fullJobName(JobSectionAutosnap, name); err != nil {
return
}

43
cmd/jobs.go Normal file
View File

@ -0,0 +1,43 @@
package cmd
import (
"time"
"github.com/zrepl/zrepl/jobrun"
)
func (p *Pull) JobName() string {
return p.jobName
}
func (p *Pull) JobDo(log jobrun.Logger) (err error) {
return jobPull(p, log)
}
func (p *Pull) JobRepeatStrategy() jobrun.RepeatStrategy {
return p.RepeatStrategy
}
func (a *Autosnap) JobName() string {
return a.jobName
}
func (a *Autosnap) JobDo(log jobrun.Logger) (err error) {
return doAutosnap(AutosnapContext{a}, log)
}
func (a *Autosnap) JobRepeatStrategy() jobrun.RepeatStrategy {
return a.Interval
}
func (p *Prune) JobName() string {
return p.jobName
}
func (p *Prune) JobDo(log jobrun.Logger) (err error) {
return doPrune(PruneContext{p, time.Now(), false}, log)
}
func (p *Prune) JobRepeatStrategy() jobrun.RepeatStrategy {
return p.Repeat
}

View File

@ -13,7 +13,6 @@ package cmd
import (
"fmt"
"github.com/spf13/cobra"
"github.com/zrepl/zrepl/jobrun"
"golang.org/x/sys/unix"
"io"
golog "log"
@ -29,7 +28,6 @@ type Logger interface {
// global state / facilities
var (
conf Config
runner *jobrun.JobRunner
logFlags int = golog.LUTC | golog.Ldate | golog.Ltime
logOut io.Writer
log Logger
@ -97,8 +95,6 @@ func initConfig() {
os.Exit(1)
}
jobrunLogger := golog.New(os.Stderr, "jobrun ", logFlags)
runner = jobrun.NewJobRunner(jobrunLogger)
return
}

View File

@ -2,6 +2,7 @@ package jobrun
import (
"fmt"
"sync"
"time"
)
@ -18,13 +19,18 @@ func (l jobLogger) Printf(format string, v ...interface{}) {
l.MainLog.Printf(fmt.Sprintf("job[%s]: %s", l.JobName, format), v...)
}
type Job interface {
JobName() string
JobDo(log Logger) (err error)
JobRepeatStrategy() RepeatStrategy
}
type JobMetadata struct {
Name string
RunFunc func(log Logger) (err error)
LastStart time.Time
LastError error
DueAt time.Time
RepeatStrategy RepeatStrategy
Job Job
name string
LastStart time.Time
LastError error
DueAt time.Time
}
type JobRunResult struct {
@ -42,29 +48,26 @@ type RepeatStrategy interface {
type JobRunner struct {
logger Logger
notificationChan chan JobMetadata
newJobChan chan JobMetadata
newJobChan chan Job
finishedJobChan chan JobMetadata
scheduleTimer <-chan time.Time
pending map[string]JobMetadata
running map[string]JobMetadata
wait sync.WaitGroup
}
func NewJobRunner(logger Logger) *JobRunner {
return &JobRunner{
logger: logger,
notificationChan: make(chan JobMetadata),
newJobChan: make(chan JobMetadata),
newJobChan: make(chan Job),
finishedJobChan: make(chan JobMetadata),
pending: make(map[string]JobMetadata),
running: make(map[string]JobMetadata),
}
}
func (r *JobRunner) AddJobChan() chan<- JobMetadata {
return r.newJobChan
}
func (r *JobRunner) AddJob(j JobMetadata) {
func (r *JobRunner) AddJob(j Job) {
r.newJobChan <- j
}
@ -73,24 +76,40 @@ func (r *JobRunner) NotificationChan() <-chan JobMetadata {
}
func (r *JobRunner) Start() {
r.wait.Add(1)
go func() {
r.loop()
r.wait.Done()
}()
}
func (r *JobRunner) Wait() {
r.wait.Wait()
}
func (r *JobRunner) loop() {
loop:
select {
case newJob := <-r.newJobChan:
case j := <-r.newJobChan:
_, jobPending := r.pending[newJob.Name]
_, jobRunning := r.running[newJob.Name]
jn := j.JobName()
_, jobPending := r.pending[jn]
_, jobRunning := r.running[jn]
if jobPending || jobRunning {
panic("job already in runner")
}
r.pending[newJob.Name] = newJob
jm := JobMetadata{name: jn, Job: j}
r.pending[jn] = jm
case finishedJob := <-r.finishedJobChan:
delete(r.running, finishedJob.Name)
delete(r.running, finishedJob.name)
res := JobRunResult{
Start: finishedJob.LastStart,
@ -98,13 +117,13 @@ loop:
Error: finishedJob.LastError,
}
r.logger.Printf("[%s] finished after %s\n", finishedJob.Name, res.RunTime())
r.logger.Printf("[%s] finished after %s\n", finishedJob.name, res.RunTime())
dueTime, resched := finishedJob.RepeatStrategy.ShouldReschedule(res)
dueTime, resched := finishedJob.Job.JobRepeatStrategy().ShouldReschedule(res)
if resched {
r.logger.Printf("[%s] rescheduling to %s", finishedJob.Name, dueTime)
r.logger.Printf("[%s] rescheduling to %s", finishedJob.name, dueTime)
finishedJob.DueAt = dueTime
r.pending[finishedJob.Name] = finishedJob
r.pending[finishedJob.name] = finishedJob
}
case <-r.scheduleTimer:
@ -139,8 +158,8 @@ loop:
job.LastStart = now
go func(job JobMetadata) {
jobLog := jobLogger{r.logger, job.Name}
if err := job.RunFunc(jobLog); err != nil {
jobLog := jobLogger{r.logger, job.name}
if err := job.Job.JobDo(jobLog); err != nil {
job.LastError = err
r.notificationChan <- job
}