From 58ee7963948e68dbbdcf7beb4bcd9ee9a14e99bd Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 26 Dec 2017 19:36:27 +0100 Subject: [PATCH] adopt Task API: infect datastructures refs #10 --- cmd/autosnap.go | 1 + cmd/config.go | 2 +- cmd/config_job_local.go | 26 +++++++++++++++++++++----- cmd/config_job_pull.go | 13 +++++++++---- cmd/config_job_source.go | 22 ++++++++++++++++++---- cmd/prune.go | 1 + cmd/replication.go | 1 + cmd/test.go | 3 ++- 8 files changed, 54 insertions(+), 15 deletions(-) diff --git a/cmd/autosnap.go b/cmd/autosnap.go index 8fce1dd..83368f3 100644 --- a/cmd/autosnap.go +++ b/cmd/autosnap.go @@ -9,6 +9,7 @@ import ( ) type IntervalAutosnap struct { + task *Task DatasetFilter zfs.DatasetFilter Prefix string SnapshotInterval time.Duration diff --git a/cmd/config.go b/cmd/config.go index 5ccc6e1..efa0571 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -63,7 +63,7 @@ type PrunePolicy interface { } type PruningJob interface { - Pruner(side PrunePolicySide, dryRun bool) (Pruner, error) + Pruner(task *Task, side PrunePolicySide, dryRun bool) (Pruner, error) } // A type for constants describing different prune policies of a PruningJob diff --git a/cmd/config_job_local.go b/cmd/config_job_local.go index 75ae3d5..afefdec 100644 --- a/cmd/config_job_local.go +++ b/cmd/config_job_local.go @@ -20,6 +20,10 @@ type LocalJob struct { PruneLHS PrunePolicy PruneRHS PrunePolicy Debug JobDebugSettings + snapperTask *Task + replTask *Task + pruneRHSTask *Task + pruneLHSTask *Task } func parseLocalJob(c JobParsingContext, name string, i map[string]interface{}) (j *LocalJob, err error) { @@ -84,6 +88,11 @@ func (j *LocalJob) JobStart(ctx context.Context) { log := ctx.Value(contextKeyLog).(Logger) defer log.Info("exiting") + j.snapperTask = NewTask("snapshot", log) + j.replTask = NewTask("repl", log) + j.pruneRHSTask = NewTask("prune_rhs", log) + j.pruneLHSTask = NewTask("prune_lhs", log) + local := rpc.NewLocalRPC() // Allow access to any dataset since we control what mapping // is passed to the pull routine. @@ -95,17 +104,18 @@ func (j *LocalJob) JobStart(ctx context.Context) { registerEndpoints(local, handler) snapper := IntervalAutosnap{ + task: j.snapperTask, DatasetFilter: j.Mapping.AsFilter(), Prefix: j.SnapshotPrefix, SnapshotInterval: j.Interval, } - plhs, err := j.Pruner(PrunePolicySideLeft, false) + plhs, err := j.Pruner(j.pruneLHSTask, PrunePolicySideLeft, false) if err != nil { log.WithError(err).Error("error creating lhs pruner") return } - prhs, err := j.Pruner(PrunePolicySideRight, false) + prhs, err := j.Pruner(j.pruneRHSTask, PrunePolicySideRight, false) if err != nil { log.WithError(err).Error("error creating rhs pruner") return @@ -137,7 +147,7 @@ outer: { log := pullCtx.Value(contextKeyLog).(Logger) log.Debug("replicating from lhs to rhs") - puller := Puller{local, log, j.Mapping, j.InitialReplPolicy} + puller := Puller{j.replTask, local, log, j.Mapping, j.InitialReplPolicy} if err := puller.doPull(); err != nil { log.WithError(err).Error("error replicating lhs to rhs") } @@ -174,10 +184,15 @@ outer: } func (j *LocalJob) JobStatus(ctxt context.Context) (*JobStatus, error) { - return &JobStatus{}, nil + return &JobStatus{Tasks: []*TaskStatus{ + j.snapperTask.Status(), + j.pruneLHSTask.Status(), + j.pruneRHSTask.Status(), + j.replTask.Status(), + }}, nil } -func (j *LocalJob) Pruner(side PrunePolicySide, dryRun bool) (p Pruner, err error) { +func (j *LocalJob) Pruner(task *Task, side PrunePolicySide, dryRun bool) (p Pruner, err error) { var dsfilter zfs.DatasetFilter var pp PrunePolicy @@ -198,6 +213,7 @@ func (j *LocalJob) Pruner(side PrunePolicySide, dryRun bool) (p Pruner, err erro } p = Pruner{ + task, time.Now(), dryRun, dsfilter, diff --git a/cmd/config_job_pull.go b/cmd/config_job_pull.go index 4795628..a7abd95 100644 --- a/cmd/config_job_pull.go +++ b/cmd/config_job_pull.go @@ -21,6 +21,8 @@ type PullJob struct { InitialReplPolicy InitialReplPolicy Prune PrunePolicy Debug JobDebugSettings + + task *Task } func parsePullJob(c JobParsingContext, name string, i map[string]interface{}) (j *PullJob, err error) { @@ -95,6 +97,8 @@ func (j *PullJob) JobStart(ctx context.Context) { log := ctx.Value(contextKeyLog).(Logger) defer log.Info("exiting") + j.task = NewTask("main", log) + log = j.task.Log() ticker := time.NewTicker(j.Interval) @@ -120,7 +124,7 @@ start: log.Info("starting pull") pullLog := log.WithField(logTaskField, "pull") - puller := Puller{client, pullLog, j.Mapping, j.InitialReplPolicy} + puller := Puller{j.task, client, pullLog, j.Mapping, j.InitialReplPolicy} if err = puller.doPull(); err != nil { log.WithError(err).Error("error doing pull") } @@ -129,7 +133,7 @@ start: log.Info("starting prune") prunectx := context.WithValue(ctx, contextKeyLog, log.WithField(logTaskField, "prune")) - pruner, err := j.Pruner(PrunePolicySideDefault, false) + pruner, err := j.Pruner(j.task, PrunePolicySideDefault, false) if err != nil { log.WithError(err).Error("error creating pruner") return @@ -150,11 +154,12 @@ start: } func (j *PullJob) JobStatus(ctxt context.Context) (*JobStatus, error) { - return &JobStatus{}, nil + return &JobStatus{Tasks: []*TaskStatus{j.task.Status()}}, nil } -func (j *PullJob) Pruner(side PrunePolicySide, dryRun bool) (p Pruner, err error) { +func (j *PullJob) Pruner(task *Task, side PrunePolicySide, dryRun bool) (p Pruner, err error) { p = Pruner{ + task, time.Now(), dryRun, j.pruneFilter, diff --git a/cmd/config_job_source.go b/cmd/config_job_source.go index 0ed2d55..4717155 100644 --- a/cmd/config_job_source.go +++ b/cmd/config_job_source.go @@ -19,6 +19,9 @@ type SourceJob struct { Interval time.Duration Prune PrunePolicy Debug JobDebugSettings + serveTask *Task + autosnapTask *Task + pruneTask *Task } func parseSourceJob(c JobParsingContext, name string, i map[string]interface{}) (j *SourceJob, err error) { @@ -78,8 +81,13 @@ func (j *SourceJob) JobStart(ctx context.Context) { log := ctx.Value(contextKeyLog).(Logger) defer log.Info("exiting") - a := IntervalAutosnap{DatasetFilter: j.Filesystems, Prefix: j.SnapshotPrefix, SnapshotInterval: j.Interval} - p, err := j.Pruner(PrunePolicySideDefault, false) + j.autosnapTask = NewTask("autosnap", log) + j.pruneTask = NewTask("prune", log) + j.serveTask = NewTask("serve", log) + + a := IntervalAutosnap{j.autosnapTask, j.Filesystems, j.SnapshotPrefix, j.Interval} + p, err := j.Pruner(j.pruneTask, PrunePolicySideDefault, false) + if err != nil { log.WithError(err).Error("error creating pruner") return @@ -109,11 +117,17 @@ outer: } func (j *SourceJob) JobStatus(ctxt context.Context) (*JobStatus, error) { - return &JobStatus{}, nil + return &JobStatus{ + Tasks: []*TaskStatus{ + j.autosnapTask.Status(), + j.pruneTask.Status(), + j.serveTask.Status(), + }}, nil } -func (j *SourceJob) Pruner(side PrunePolicySide, dryRun bool) (p Pruner, err error) { +func (j *SourceJob) Pruner(task *Task, side PrunePolicySide, dryRun bool) (p Pruner, err error) { p = Pruner{ + task, time.Now(), dryRun, j.Filesystems, diff --git a/cmd/prune.go b/cmd/prune.go index a43aac5..9a191aa 100644 --- a/cmd/prune.go +++ b/cmd/prune.go @@ -8,6 +8,7 @@ import ( ) type Pruner struct { + task *Task Now time.Time DryRun bool DatasetFilter zfs.DatasetFilter diff --git a/cmd/replication.go b/cmd/replication.go index 407415f..16fffe8 100644 --- a/cmd/replication.go +++ b/cmd/replication.go @@ -53,6 +53,7 @@ func closeRPCWithTimeout(log Logger, remote rpc.RPCClient, timeout time.Duration } type Puller struct { + task *Task Remote rpc.RPCClient Log Logger Mapping DatasetMapping diff --git a/cmd/test.go b/cmd/test.go index 148719b..7bb4b8d 100644 --- a/cmd/test.go +++ b/cmd/test.go @@ -170,7 +170,8 @@ func doTestPrunePolicy(cmd *cobra.Command, args []string) { log.Printf("job dump:\n%s", pretty.Sprint(jobp)) - pruner, err := jobp.Pruner(testPrunePolicyArgs.side, true) + task := NewTask("", log) + pruner, err := jobp.Pruner(task, testPrunePolicyArgs.side, true) if err != nil { log.Printf("cannot create test pruner: %s", err) os.Exit(1)