adopt Task API: infect datastructures

refs #10
This commit is contained in:
Christian Schwarz 2017-12-26 19:36:27 +01:00
parent ce351146cf
commit 58ee796394
8 changed files with 54 additions and 15 deletions

View File

@ -9,6 +9,7 @@ import (
) )
type IntervalAutosnap struct { type IntervalAutosnap struct {
task *Task
DatasetFilter zfs.DatasetFilter DatasetFilter zfs.DatasetFilter
Prefix string Prefix string
SnapshotInterval time.Duration SnapshotInterval time.Duration

View File

@ -63,7 +63,7 @@ type PrunePolicy interface {
} }
type PruningJob interface { type PruningJob interface {
Pruner(side PrunePolicySide, dryRun bool) (Pruner, error) Pruner(task *Task, side PrunePolicySide, dryRun bool) (Pruner, error)
} }
// A type for constants describing different prune policies of a PruningJob // A type for constants describing different prune policies of a PruningJob

View File

@ -20,6 +20,10 @@ type LocalJob struct {
PruneLHS PrunePolicy PruneLHS PrunePolicy
PruneRHS PrunePolicy PruneRHS PrunePolicy
Debug JobDebugSettings Debug JobDebugSettings
snapperTask *Task
replTask *Task
pruneRHSTask *Task
pruneLHSTask *Task
} }
func parseLocalJob(c JobParsingContext, name string, i map[string]interface{}) (j *LocalJob, err error) { func parseLocalJob(c JobParsingContext, name string, i map[string]interface{}) (j *LocalJob, err error) {
@ -84,6 +88,11 @@ func (j *LocalJob) JobStart(ctx context.Context) {
log := ctx.Value(contextKeyLog).(Logger) log := ctx.Value(contextKeyLog).(Logger)
defer log.Info("exiting") defer log.Info("exiting")
j.snapperTask = NewTask("snapshot", log)
j.replTask = NewTask("repl", log)
j.pruneRHSTask = NewTask("prune_rhs", log)
j.pruneLHSTask = NewTask("prune_lhs", log)
local := rpc.NewLocalRPC() local := rpc.NewLocalRPC()
// Allow access to any dataset since we control what mapping // Allow access to any dataset since we control what mapping
// is passed to the pull routine. // is passed to the pull routine.
@ -95,17 +104,18 @@ func (j *LocalJob) JobStart(ctx context.Context) {
registerEndpoints(local, handler) registerEndpoints(local, handler)
snapper := IntervalAutosnap{ snapper := IntervalAutosnap{
task: j.snapperTask,
DatasetFilter: j.Mapping.AsFilter(), DatasetFilter: j.Mapping.AsFilter(),
Prefix: j.SnapshotPrefix, Prefix: j.SnapshotPrefix,
SnapshotInterval: j.Interval, SnapshotInterval: j.Interval,
} }
plhs, err := j.Pruner(PrunePolicySideLeft, false) plhs, err := j.Pruner(j.pruneLHSTask, PrunePolicySideLeft, false)
if err != nil { if err != nil {
log.WithError(err).Error("error creating lhs pruner") log.WithError(err).Error("error creating lhs pruner")
return return
} }
prhs, err := j.Pruner(PrunePolicySideRight, false) prhs, err := j.Pruner(j.pruneRHSTask, PrunePolicySideRight, false)
if err != nil { if err != nil {
log.WithError(err).Error("error creating rhs pruner") log.WithError(err).Error("error creating rhs pruner")
return return
@ -137,7 +147,7 @@ outer:
{ {
log := pullCtx.Value(contextKeyLog).(Logger) log := pullCtx.Value(contextKeyLog).(Logger)
log.Debug("replicating from lhs to rhs") log.Debug("replicating from lhs to rhs")
puller := Puller{local, log, j.Mapping, j.InitialReplPolicy} puller := Puller{j.replTask, local, log, j.Mapping, j.InitialReplPolicy}
if err := puller.doPull(); err != nil { if err := puller.doPull(); err != nil {
log.WithError(err).Error("error replicating lhs to rhs") log.WithError(err).Error("error replicating lhs to rhs")
} }
@ -174,10 +184,15 @@ outer:
} }
func (j *LocalJob) JobStatus(ctxt context.Context) (*JobStatus, error) { func (j *LocalJob) JobStatus(ctxt context.Context) (*JobStatus, error) {
return &JobStatus{}, nil return &JobStatus{Tasks: []*TaskStatus{
j.snapperTask.Status(),
j.pruneLHSTask.Status(),
j.pruneRHSTask.Status(),
j.replTask.Status(),
}}, nil
} }
func (j *LocalJob) Pruner(side PrunePolicySide, dryRun bool) (p Pruner, err error) { func (j *LocalJob) Pruner(task *Task, side PrunePolicySide, dryRun bool) (p Pruner, err error) {
var dsfilter zfs.DatasetFilter var dsfilter zfs.DatasetFilter
var pp PrunePolicy var pp PrunePolicy
@ -198,6 +213,7 @@ func (j *LocalJob) Pruner(side PrunePolicySide, dryRun bool) (p Pruner, err erro
} }
p = Pruner{ p = Pruner{
task,
time.Now(), time.Now(),
dryRun, dryRun,
dsfilter, dsfilter,

View File

@ -21,6 +21,8 @@ type PullJob struct {
InitialReplPolicy InitialReplPolicy InitialReplPolicy InitialReplPolicy
Prune PrunePolicy Prune PrunePolicy
Debug JobDebugSettings Debug JobDebugSettings
task *Task
} }
func parsePullJob(c JobParsingContext, name string, i map[string]interface{}) (j *PullJob, err error) { func parsePullJob(c JobParsingContext, name string, i map[string]interface{}) (j *PullJob, err error) {
@ -95,6 +97,8 @@ func (j *PullJob) JobStart(ctx context.Context) {
log := ctx.Value(contextKeyLog).(Logger) log := ctx.Value(contextKeyLog).(Logger)
defer log.Info("exiting") defer log.Info("exiting")
j.task = NewTask("main", log)
log = j.task.Log()
ticker := time.NewTicker(j.Interval) ticker := time.NewTicker(j.Interval)
@ -120,7 +124,7 @@ start:
log.Info("starting pull") log.Info("starting pull")
pullLog := log.WithField(logTaskField, "pull") pullLog := log.WithField(logTaskField, "pull")
puller := Puller{client, pullLog, j.Mapping, j.InitialReplPolicy} puller := Puller{j.task, client, pullLog, j.Mapping, j.InitialReplPolicy}
if err = puller.doPull(); err != nil { if err = puller.doPull(); err != nil {
log.WithError(err).Error("error doing pull") log.WithError(err).Error("error doing pull")
} }
@ -129,7 +133,7 @@ start:
log.Info("starting prune") log.Info("starting prune")
prunectx := context.WithValue(ctx, contextKeyLog, log.WithField(logTaskField, "prune")) prunectx := context.WithValue(ctx, contextKeyLog, log.WithField(logTaskField, "prune"))
pruner, err := j.Pruner(PrunePolicySideDefault, false) pruner, err := j.Pruner(j.task, PrunePolicySideDefault, false)
if err != nil { if err != nil {
log.WithError(err).Error("error creating pruner") log.WithError(err).Error("error creating pruner")
return return
@ -150,11 +154,12 @@ start:
} }
func (j *PullJob) JobStatus(ctxt context.Context) (*JobStatus, error) { func (j *PullJob) JobStatus(ctxt context.Context) (*JobStatus, error) {
return &JobStatus{}, nil return &JobStatus{Tasks: []*TaskStatus{j.task.Status()}}, nil
} }
func (j *PullJob) Pruner(side PrunePolicySide, dryRun bool) (p Pruner, err error) { func (j *PullJob) Pruner(task *Task, side PrunePolicySide, dryRun bool) (p Pruner, err error) {
p = Pruner{ p = Pruner{
task,
time.Now(), time.Now(),
dryRun, dryRun,
j.pruneFilter, j.pruneFilter,

View File

@ -19,6 +19,9 @@ type SourceJob struct {
Interval time.Duration Interval time.Duration
Prune PrunePolicy Prune PrunePolicy
Debug JobDebugSettings Debug JobDebugSettings
serveTask *Task
autosnapTask *Task
pruneTask *Task
} }
func parseSourceJob(c JobParsingContext, name string, i map[string]interface{}) (j *SourceJob, err error) { func parseSourceJob(c JobParsingContext, name string, i map[string]interface{}) (j *SourceJob, err error) {
@ -78,8 +81,13 @@ func (j *SourceJob) JobStart(ctx context.Context) {
log := ctx.Value(contextKeyLog).(Logger) log := ctx.Value(contextKeyLog).(Logger)
defer log.Info("exiting") defer log.Info("exiting")
a := IntervalAutosnap{DatasetFilter: j.Filesystems, Prefix: j.SnapshotPrefix, SnapshotInterval: j.Interval} j.autosnapTask = NewTask("autosnap", log)
p, err := j.Pruner(PrunePolicySideDefault, false) j.pruneTask = NewTask("prune", log)
j.serveTask = NewTask("serve", log)
a := IntervalAutosnap{j.autosnapTask, j.Filesystems, j.SnapshotPrefix, j.Interval}
p, err := j.Pruner(j.pruneTask, PrunePolicySideDefault, false)
if err != nil { if err != nil {
log.WithError(err).Error("error creating pruner") log.WithError(err).Error("error creating pruner")
return return
@ -109,11 +117,17 @@ outer:
} }
func (j *SourceJob) JobStatus(ctxt context.Context) (*JobStatus, error) { func (j *SourceJob) JobStatus(ctxt context.Context) (*JobStatus, error) {
return &JobStatus{}, nil return &JobStatus{
Tasks: []*TaskStatus{
j.autosnapTask.Status(),
j.pruneTask.Status(),
j.serveTask.Status(),
}}, nil
} }
func (j *SourceJob) Pruner(side PrunePolicySide, dryRun bool) (p Pruner, err error) { func (j *SourceJob) Pruner(task *Task, side PrunePolicySide, dryRun bool) (p Pruner, err error) {
p = Pruner{ p = Pruner{
task,
time.Now(), time.Now(),
dryRun, dryRun,
j.Filesystems, j.Filesystems,

View File

@ -8,6 +8,7 @@ import (
) )
type Pruner struct { type Pruner struct {
task *Task
Now time.Time Now time.Time
DryRun bool DryRun bool
DatasetFilter zfs.DatasetFilter DatasetFilter zfs.DatasetFilter

View File

@ -53,6 +53,7 @@ func closeRPCWithTimeout(log Logger, remote rpc.RPCClient, timeout time.Duration
} }
type Puller struct { type Puller struct {
task *Task
Remote rpc.RPCClient Remote rpc.RPCClient
Log Logger Log Logger
Mapping DatasetMapping Mapping DatasetMapping

View File

@ -170,7 +170,8 @@ func doTestPrunePolicy(cmd *cobra.Command, args []string) {
log.Printf("job dump:\n%s", pretty.Sprint(jobp)) log.Printf("job dump:\n%s", pretty.Sprint(jobp))
pruner, err := jobp.Pruner(testPrunePolicyArgs.side, true) task := NewTask("", log)
pruner, err := jobp.Pruner(task, testPrunePolicyArgs.side, true)
if err != nil { if err != nil {
log.Printf("cannot create test pruner: %s", err) log.Printf("cannot create test pruner: %s", err)
os.Exit(1) os.Exit(1)