remove JobStatus, Task abstraction and 'control status' subcommand

Control status will be replaced by job-specific output at some point.

Task was not useful anymore with state machine, may reintroduce
something similar at a later point, but consider alternatives:

- opentracing.io
- embedding everything in ctx
	- activity stack would work easily
	- log entries via proxy logger.Logger object
- progress reporting should be in status reports of individial jobs
This commit is contained in:
Christian Schwarz 2018-08-26 16:49:40 +02:00
parent 7ff72fb6d9
commit a0f72b585b
11 changed files with 100 additions and 672 deletions

View File

@ -9,29 +9,24 @@ import (
)
type IntervalAutosnap struct {
task *Task
DatasetFilter zfs.DatasetFilter
Prefix string
SnapshotInterval time.Duration
}
func (a *IntervalAutosnap) filterFilesystems() (fss []*zfs.DatasetPath, stop bool) {
a.task.Enter("filter_filesystems")
defer a.task.Finish()
func (a *IntervalAutosnap) filterFilesystems(ctx context.Context) (fss []*zfs.DatasetPath, stop bool) {
fss, err := zfs.ZFSListMapping(a.DatasetFilter)
stop = err != nil
if err != nil {
a.task.Log().WithError(err).Error("cannot list datasets")
getLogger(ctx).WithError(err).Error("cannot list datasets")
}
if len(fss) == 0 {
a.task.Log().Warn("no filesystem matching filesystem filter")
getLogger(ctx).Warn("no filesystem matching filesystem filter")
}
return fss, stop
}
func (a *IntervalAutosnap) findSyncPoint(fss []*zfs.DatasetPath) (syncPoint time.Time, err error) {
a.task.Enter("find_sync_point")
defer a.task.Finish()
func (a *IntervalAutosnap) findSyncPoint(log Logger, fss []*zfs.DatasetPath) (syncPoint time.Time, err error) {
type snapTime struct {
ds *zfs.DatasetPath
time time.Time
@ -45,10 +40,10 @@ func (a *IntervalAutosnap) findSyncPoint(fss []*zfs.DatasetPath) (syncPoint time
now := time.Now()
a.task.Log().Debug("examine filesystem state")
log.Debug("examine filesystem state")
for _, d := range fss {
l := a.task.Log().WithField("fs", d.ToString())
l := log.WithField("fs", d.ToString())
fsvs, err := zfs.ZFSListFilesystemVersions(d, NewPrefixFilter(a.Prefix))
if err != nil {
@ -96,76 +91,71 @@ func (a *IntervalAutosnap) findSyncPoint(fss []*zfs.DatasetPath) (syncPoint time
}
func (a *IntervalAutosnap) waitForSyncPoint(ctx context.Context, syncPoint time.Time) {
a.task.Enter("wait_sync_point")
defer a.task.Finish()
const LOG_TIME_FMT string = time.ANSIC
a.task.Log().WithField("sync_point", syncPoint.Format(LOG_TIME_FMT)).
getLogger(ctx).
WithField("sync_point", syncPoint.Format(LOG_TIME_FMT)).
Info("wait for sync point")
select {
case <-ctx.Done():
a.task.Log().WithError(ctx.Err()).Info("context done")
getLogger(ctx).WithError(ctx.Err()).Info("context done")
return
case <-time.After(syncPoint.Sub(time.Now())):
}
}
func (a *IntervalAutosnap) syncUpRun(ctx context.Context, didSnaps chan struct{}) (stop bool) {
a.task.Enter("sync_up")
defer a.task.Finish()
fss, stop := a.filterFilesystems()
fss, stop := a.filterFilesystems(ctx)
if stop {
return true
}
syncPoint, err := a.findSyncPoint(fss)
syncPoint, err := a.findSyncPoint(getLogger(ctx), fss)
if err != nil {
return true
}
a.waitForSyncPoint(ctx, syncPoint)
a.task.Log().Debug("snapshot all filesystems to enable further snaps in lockstep")
a.doSnapshots(didSnaps)
getLogger(ctx).Debug("snapshot all filesystems to enable further snaps in lockstep")
a.doSnapshots(ctx, didSnaps)
return false
}
func (a *IntervalAutosnap) Run(ctx context.Context, didSnaps chan struct{}) {
log := getLogger(ctx)
if a.syncUpRun(ctx, didSnaps) {
a.task.Log().Error("stoppping autosnap after error in sync up")
log.Error("stoppping autosnap after error in sync up")
return
}
// task drops back to idle here
a.task.Log().Debug("setting up ticker in SnapshotInterval")
log.Debug("setting up ticker in SnapshotInterval")
ticker := time.NewTicker(a.SnapshotInterval)
for {
select {
case <-ctx.Done():
ticker.Stop()
a.task.Log().WithError(ctx.Err()).Info("context done")
log.WithError(ctx.Err()).Info("context done")
return
case <-ticker.C:
a.doSnapshots(didSnaps)
a.doSnapshots(ctx, didSnaps)
}
}
}
func (a *IntervalAutosnap) doSnapshots(didSnaps chan struct{}) {
a.task.Enter("do_snapshots")
defer a.task.Finish()
func (a *IntervalAutosnap) doSnapshots(ctx context.Context, didSnaps chan struct{}) {
log := getLogger(ctx)
// don't cache the result from previous run in case the user added
// a new dataset in the meantime
ds, stop := a.filterFilesystems()
ds, stop := a.filterFilesystems(ctx)
if stop {
return
}
@ -175,20 +165,20 @@ func (a *IntervalAutosnap) doSnapshots(didSnaps chan struct{}) {
suffix := time.Now().In(time.UTC).Format("20060102_150405_000")
snapname := fmt.Sprintf("%s%s", a.Prefix, suffix)
l := a.task.Log().
l := log.
WithField("fs", d.ToString()).
WithField("snapname", snapname)
l.Info("create snapshot")
err := zfs.ZFSSnapshot(d, snapname, false)
if err != nil {
a.task.Log().WithError(err).Error("cannot create snapshot")
l.WithError(err).Error("cannot create snapshot")
}
l.Info("create corresponding bookmark")
err = zfs.ZFSBookmark(d, snapname, snapname)
if err != nil {
a.task.Log().WithError(err).Error("cannot create bookmark")
l.WithError(err).Error("cannot create bookmark")
}
}
@ -196,7 +186,7 @@ func (a *IntervalAutosnap) doSnapshots(didSnaps chan struct{}) {
select {
case didSnaps <- struct{}{}:
default:
a.task.Log().Error("warning: callback channel is full, discarding")
log.Error("warning: callback channel is full, discarding")
}
}

View File

@ -57,7 +57,7 @@ type PrunePolicy interface {
}
type PruningJob interface {
Pruner(task *Task, side PrunePolicySide, dryRun bool) (Pruner, error)
Pruner(side PrunePolicySide, dryRun bool) (Pruner, error)
}
// A type for constants describing different prune policies of a PruningJob

View File

@ -34,10 +34,6 @@ func (j *ControlJob) JobName() string {
func (j *ControlJob) JobType() JobType { return JobTypeControl }
func (j *ControlJob) JobStatus(ctx context.Context) (*JobStatus, error) {
return &JobStatus{Tasks: nil}, nil
}
const (
ControlJobEndpointPProf string = "/debug/pprof"
ControlJobEndpointVersion string = "/version"
@ -76,7 +72,7 @@ func (j *ControlJob) JobStart(ctx context.Context) {
}}})
mux.Handle(ControlJobEndpointStatus,
requestLogger{log: log, handler: jsonResponder{func() (interface{}, error) {
return daemon.Status(), nil
panic("FIXME") // FIXME
}}})
mux.Handle("/pulljobreport",
requestLogger{log: log, handler: jsonResponder{func() (interface{}, error) {

View File

@ -20,11 +20,6 @@ type LocalJob struct {
PruneLHS PrunePolicy
PruneRHS PrunePolicy
Debug JobDebugSettings
snapperTask *Task
mainTask *Task
handlerTask *Task
pruneRHSTask *Task
pruneLHSTask *Task
}
func parseLocalJob(c JobParsingContext, name string, i map[string]interface{}) (j *LocalJob, err error) {
@ -84,13 +79,7 @@ func (j *LocalJob) JobType() JobType { return JobTypeLocal }
func (j *LocalJob) JobStart(ctx context.Context) {
rootLog := getLogger(ctx)
j.snapperTask = NewTask("snapshot", j, rootLog)
j.mainTask = NewTask("main", j, rootLog)
j.handlerTask = NewTask("handler", j, rootLog)
j.pruneRHSTask = NewTask("prune_rhs", j, rootLog)
j.pruneLHSTask = NewTask("prune_lhs", j, rootLog)
log := getLogger(ctx)
// Allow access to any dataset since we control what mapping
// is passed to the pull routine.
@ -103,89 +92,67 @@ func (j *LocalJob) JobStart(ctx context.Context) {
receiver, err := endpoint.NewReceiver(j.Mapping, NewPrefixFilter(j.SnapshotPrefix))
if err != nil {
rootLog.WithError(err).Error("unexpected error setting up local handler")
log.WithError(err).Error("unexpected error setting up local handler")
}
snapper := IntervalAutosnap{
task: j.snapperTask,
DatasetFilter: j.Mapping.AsFilter(),
Prefix: j.SnapshotPrefix,
SnapshotInterval: j.Interval,
}
plhs, err := j.Pruner(j.pruneLHSTask, PrunePolicySideLeft, false)
plhs, err := j.Pruner(PrunePolicySideLeft, false)
if err != nil {
rootLog.WithError(err).Error("error creating lhs pruner")
log.WithError(err).Error("error creating lhs pruner")
return
}
prhs, err := j.Pruner(j.pruneRHSTask, PrunePolicySideRight, false)
prhs, err := j.Pruner(PrunePolicySideRight, false)
if err != nil {
rootLog.WithError(err).Error("error creating rhs pruner")
log.WithError(err).Error("error creating rhs pruner")
return
}
didSnaps := make(chan struct{})
go snapper.Run(ctx, didSnaps)
go snapper.Run(WithLogger(ctx, log.WithField(logSubsysField, "snap")), didSnaps)
outer:
for {
select {
case <-ctx.Done():
j.mainTask.Log().WithError(ctx.Err()).Info("context")
log.WithError(ctx.Err()).Info("context")
break outer
case <-didSnaps:
j.mainTask.Log().Debug("finished taking snapshots")
j.mainTask.Log().Info("starting replication procedure")
log.Debug("finished taking snapshots")
log.Info("starting replication procedure")
}
j.mainTask.Log().Debug("replicating from lhs to rhs")
j.mainTask.Enter("replicate")
rep := replication.NewReplication()
rep.Drive(ctx, sender, receiver)
j.mainTask.Finish()
// use a ctx as soon as Pull gains ctx support
select {
case <-ctx.Done():
break outer
default:
{
ctx := WithLogger(ctx, log.WithField(logSubsysField, "replication"))
rep := replication.NewReplication()
rep.Drive(ctx, sender, receiver)
}
var wg sync.WaitGroup
j.mainTask.Log().Info("pruning lhs")
wg.Add(1)
go func() {
plhs.Run(ctx)
plhs.Run(WithLogger(ctx, log.WithField(logSubsysField, "prune_lhs")))
wg.Done()
}()
j.mainTask.Log().Info("pruning rhs")
wg.Add(1)
go func() {
prhs.Run(ctx)
prhs.Run(WithLogger(ctx, log.WithField(logSubsysField, "prune_rhs")))
wg.Done()
}()
wg.Wait()
}
}
func (j *LocalJob) JobStatus(ctxt context.Context) (*JobStatus, error) {
return &JobStatus{Tasks: []*TaskStatus{
j.snapperTask.Status(),
j.pruneLHSTask.Status(),
j.pruneRHSTask.Status(),
j.mainTask.Status(),
}}, nil
}
func (j *LocalJob) Pruner(task *Task, side PrunePolicySide, dryRun bool) (p Pruner, err error) {
func (j *LocalJob) Pruner(side PrunePolicySide, dryRun bool) (p Pruner, err error) {
var dsfilter zfs.DatasetFilter
var pp PrunePolicy
@ -206,7 +173,6 @@ func (j *LocalJob) Pruner(task *Task, side PrunePolicySide, dryRun bool) (p Prun
}
p = Pruner{
task,
time.Now(),
dryRun,
dsfilter,

View File

@ -70,8 +70,6 @@ func (j *PrometheusJob) JobStart(ctx context.Context) {
}
log := getLogger(ctx)
task := NewTask("main", j, log)
log = task.Log()
l, err := net.Listen("tcp", j.Listen)
if err != nil {
@ -94,6 +92,3 @@ func (j *PrometheusJob) JobStart(ctx context.Context) {
}
func (*PrometheusJob) JobStatus(ctxt context.Context) (*JobStatus, error) {
return &JobStatus{}, nil
}

View File

@ -26,7 +26,6 @@ type PullJob struct {
Prune PrunePolicy
Debug JobDebugSettings
task *Task
rep *replication.Replication
}
@ -107,7 +106,6 @@ func (j *PullJob) JobStart(ctx context.Context) {
log := getLogger(ctx)
defer log.Info("exiting")
j.task = NewTask("main", j, log)
// j.task is idle here idle here
usr1 := make(chan os.Signal)
@ -120,14 +118,14 @@ func (j *PullJob) JobStart(ctx context.Context) {
j.doRun(ctx)
duration := time.Now().Sub(begin)
if duration > j.Interval {
j.task.Log().
log.
WithField("actual_duration", duration).
WithField("configured_interval", j.Interval).
Warn("pull run took longer than configured interval")
}
select {
case <-ctx.Done():
j.task.Log().WithError(ctx.Err()).Info("context")
log.WithError(ctx.Err()).Info("context")
return
case <-ticker.C:
case <-usr1:
@ -150,9 +148,7 @@ var STREAMRPC_CONFIG = &streamrpc.ConnConfig{ // FIXME oversight and configurabi
func (j *PullJob) doRun(ctx context.Context) {
j.task.Enter("run")
defer j.task.Finish()
log := getLogger(ctx)
// FIXME
clientConf := &streamrpc.ClientConfig{
ConnConfig: STREAMRPC_CONFIG,
@ -161,8 +157,6 @@ func (j *PullJob) doRun(ctx context.Context) {
client, err := streamrpc.NewClient(j.Connect, clientConf)
defer client.Close()
j.task.Enter("pull")
sender := endpoint.NewRemote(client)
puller, err := endpoint.NewReceiver(
@ -170,43 +164,37 @@ func (j *PullJob) doRun(ctx context.Context) {
NewPrefixFilter(j.SnapshotPrefix),
)
if err != nil {
j.task.Log().WithError(err).Error("error creating receiver endpoint")
j.task.Finish()
log.WithError(err).Error("error creating receiver endpoint")
return
}
ctx = replication.WithLogger(ctx, replicationLogAdaptor{j.task.Log().WithField(logSubsysField, "replication")})
ctx = streamrpc.ContextWithLogger(ctx, streamrpcLogAdaptor{j.task.Log().WithField(logSubsysField, "rpc.protocol")})
ctx = endpoint.WithLogger(ctx, j.task.Log().WithField(logSubsysField, "rpc.endpoint"))
j.rep = replication.NewReplication()
j.rep.Drive(ctx, sender, puller)
{
ctx := replication.WithLogger(ctx, replicationLogAdaptor{log.WithField(logSubsysField, "replication")})
ctx = streamrpc.ContextWithLogger(ctx, streamrpcLogAdaptor{log.WithField(logSubsysField, "rpc")})
ctx = endpoint.WithLogger(ctx, log.WithField(logSubsysField, "endpoint"))
j.rep = replication.NewReplication()
j.rep.Drive(ctx, sender, puller)
}
client.Close()
j.task.Finish()
j.task.Enter("prune")
pruner, err := j.Pruner(j.task, PrunePolicySideDefault, false)
if err != nil {
j.task.Log().WithError(err).Error("error creating pruner")
} else {
pruner.Run(ctx)
{
ctx := WithLogger(ctx, log.WithField(logSubsysField, "prune"))
pruner, err := j.Pruner(PrunePolicySideDefault, false)
if err != nil {
log.WithError(err).Error("error creating pruner")
} else {
pruner.Run(ctx)
}
}
j.task.Finish()
}
func (j *PullJob) Report() *replication.Report {
return j.rep.Report()
}
func (j *PullJob) JobStatus(ctxt context.Context) (*JobStatus, error) {
return &JobStatus{Tasks: []*TaskStatus{j.task.Status()}}, nil
}
func (j *PullJob) Pruner(task *Task, side PrunePolicySide, dryRun bool) (p Pruner, err error) {
func (j *PullJob) Pruner(side PrunePolicySide, dryRun bool) (p Pruner, err error) {
p = Pruner{
task,
time.Now(),
dryRun,
j.pruneFilter,

View File

@ -19,9 +19,6 @@ type SourceJob struct {
Interval time.Duration
Prune PrunePolicy
Debug JobDebugSettings
serveTask *Task
autosnapTask *Task
pruneTask *Task
}
func parseSourceJob(c JobParsingContext, name string, i map[string]interface{}) (j *SourceJob, err error) {
@ -92,12 +89,8 @@ func (j *SourceJob) JobStart(ctx context.Context) {
log := getLogger(ctx)
defer log.Info("exiting")
j.autosnapTask = NewTask("autosnap", j, log)
j.pruneTask = NewTask("prune", j, log)
j.serveTask = NewTask("serve", j, log)
a := IntervalAutosnap{j.autosnapTask, j.Filesystems, j.SnapshotPrefix, j.Interval}
p, err := j.Pruner(j.pruneTask, PrunePolicySideDefault, false)
a := IntervalAutosnap{j.Filesystems, j.SnapshotPrefix, j.Interval}
p, err := j.Pruner(PrunePolicySideDefault, false)
if err != nil {
log.WithError(err).Error("error creating pruner")
@ -106,8 +99,8 @@ func (j *SourceJob) JobStart(ctx context.Context) {
didSnaps := make(chan struct{})
go j.serve(ctx, j.serveTask)
go a.Run(ctx, didSnaps)
go j.serve(ctx) // logSubsysField set by handleConnection
go a.Run(WithLogger(ctx, log.WithField(logSubsysField, "snap")), didSnaps)
outer:
for {
@ -115,27 +108,15 @@ outer:
case <-ctx.Done():
break outer
case <-didSnaps:
log.Info("starting pruner")
p.Run(ctx)
log.Info("pruner done")
p.Run(WithLogger(ctx, log.WithField(logSubsysField, "prune")))
}
}
log.WithError(ctx.Err()).Info("context")
}
func (j *SourceJob) JobStatus(ctxt context.Context) (*JobStatus, error) {
return &JobStatus{
Tasks: []*TaskStatus{
j.autosnapTask.Status(),
j.pruneTask.Status(),
j.serveTask.Status(),
}}, nil
}
func (j *SourceJob) Pruner(task *Task, side PrunePolicySide, dryRun bool) (p Pruner, err error) {
func (j *SourceJob) Pruner(side PrunePolicySide, dryRun bool) (p Pruner, err error) {
p = Pruner{
task,
time.Now(),
dryRun,
j.Filesystems,
@ -145,11 +126,13 @@ func (j *SourceJob) Pruner(task *Task, side PrunePolicySide, dryRun bool) (p Pru
return
}
func (j *SourceJob) serve(ctx context.Context, task *Task) {
func (j *SourceJob) serve(ctx context.Context) {
log := getLogger(ctx)
listener, err := j.Serve.Listen()
if err != nil {
task.Log().WithError(err).Error("error listening")
getLogger(ctx).WithError(err).Error("error listening")
return
}
@ -173,48 +156,40 @@ outer:
case rwcMsg := <-connChan:
if rwcMsg.err != nil {
task.Log().WithError(rwcMsg.err).Error("error accepting connection")
log.WithError(rwcMsg.err).Error("error accepting connection")
continue
}
j.handleConnection(rwcMsg.conn, task)
j.handleConnection(ctx, rwcMsg.conn)
case <-ctx.Done():
task.Log().WithError(ctx.Err()).Info("context")
log.WithError(ctx.Err()).Info("context")
break outer
}
}
task.Log().Info("closing listener")
task.Enter("close_listener")
defer task.Finish()
log.Info("closing listener")
err = listener.Close()
if err != nil {
task.Log().WithError(err).Error("error closing listener")
log.WithError(err).Error("error closing listener")
}
return
}
func (j *SourceJob) handleConnection(conn net.Conn, task *Task) {
task.Enter("handle_connection")
defer task.Finish()
task.Log().Info("handling client connection")
func (j *SourceJob) handleConnection(ctx context.Context, conn net.Conn) {
log := getLogger(ctx)
log.Info("handling client connection")
senderEP := endpoint.NewSender(j.Filesystems, NewPrefixFilter(j.SnapshotPrefix))
ctx := context.Background()
ctx = endpoint.WithLogger(ctx, task.Log().WithField(logSubsysField, "rpc.endpoint"))
ctx = streamrpc.ContextWithLogger(ctx, streamrpcLogAdaptor{task.Log().WithField(logSubsysField, "rpc.protocol")})
ctx = endpoint.WithLogger(ctx, log.WithField(logSubsysField, "serve"))
ctx = streamrpc.ContextWithLogger(ctx, streamrpcLogAdaptor{log.WithField(logSubsysField, "rpc")})
handler := endpoint.NewHandler(senderEP)
if err := streamrpc.ServeConn(ctx, conn, STREAMRPC_CONFIG, handler.Handle); err != nil {
task.Log().WithError(err).Error("error serving connection")
log.WithError(err).Error("error serving connection")
} else {
task.Log().Info("client closed connection")
log.Info("client closed connection")
}
}

View File

@ -5,7 +5,6 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/dustin/go-humanize"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"github.com/zrepl/zrepl/logger"
@ -14,9 +13,6 @@ import (
"net"
"net/http"
"os"
"sort"
"strings"
"time"
)
var controlCmd = &cobra.Command{
@ -67,20 +63,11 @@ var controlStatusCmdArgs struct {
onlyShowJob string
}
var controlStatusCmd = &cobra.Command{
Use: "status [JOB_NAME]",
Short: "get current status",
Run: doControlStatusCmd,
}
func init() {
RootCmd.AddCommand(controlCmd)
controlCmd.AddCommand(pprofCmd)
controlCmd.AddCommand(controlVersionCmd)
controlCmd.AddCommand(controlStatusCmd)
controlStatusCmd.Flags().StringVar(&controlStatusCmdArgs.format, "format", "human", "output format (human|raw)")
controlStatusCmdArgs.level = logger.Warn
controlStatusCmd.Flags().Var(&controlStatusCmdArgs.level, "level", "minimum log level to show")
}
func controlHttpClient() (client http.Client, err error) {
@ -165,141 +152,3 @@ func doControLVersionCmd(cmd *cobra.Command, args []string) {
fmt.Println(info.String())
}
func doControlStatusCmd(cmd *cobra.Command, args []string) {
log := golog.New(os.Stderr, "", 0)
die := func() {
log.Print("exiting after error")
os.Exit(1)
}
if len(args) == 1 {
controlStatusCmdArgs.onlyShowJob = args[0]
} else if len(args) > 1 {
log.Print("can only specify one job as positional argument")
cmd.Usage()
die()
}
httpc, err := controlHttpClient()
if err != nil {
log.Printf("could not connect to daemon: %s", err)
die()
}
resp, err := httpc.Get("http://unix" + ControlJobEndpointStatus)
if err != nil {
log.Printf("error: %s", err)
die()
} else if resp.StatusCode != http.StatusOK {
var msg bytes.Buffer
io.CopyN(&msg, resp.Body, 4096)
log.Printf("error: %s", msg.String())
die()
}
var status DaemonStatus
err = json.NewDecoder(resp.Body).Decode(&status)
if err != nil {
log.Printf("error unmarshaling response: %s", err)
die()
}
switch controlStatusCmdArgs.format {
case "raw":
enc := json.NewEncoder(os.Stdout)
enc.SetIndent("", " ")
if err := enc.Encode(status); err != nil {
log.Panic(err)
}
case "human":
formatter := HumanFormatter{}
formatter.SetMetadataFlags(MetadataAll)
formatter.SetIgnoreFields([]string{
logJobField,
})
jobNames := make([]string, 0, len(status.Jobs))
for name, _ := range status.Jobs {
jobNames = append(jobNames, name)
}
sort.Slice(jobNames, func(i, j int) bool {
return strings.Compare(jobNames[i], jobNames[j]) == -1
})
now := time.Now()
for _, name := range jobNames {
if controlStatusCmdArgs.onlyShowJob != "" && name != controlStatusCmdArgs.onlyShowJob {
continue
}
job := status.Jobs[name]
jobLogEntries := make([]logger.Entry, 0)
informAboutError := false
fmt.Printf("Job '%s':\n", name)
for _, task := range job.Tasks {
var header bytes.Buffer
fmt.Fprintf(&header, " Task '%s': ", task.Name)
if !task.Idle {
fmt.Fprint(&header, strings.Join(task.ActivityStack, "."))
} else {
fmt.Fprint(&header, "<idle>")
}
fmt.Fprint(&header, " ")
const TASK_STALLED_HOLDOFF_DURATION = 10 * time.Second
sinceLastUpdate := now.Sub(task.LastUpdate)
if !task.Idle || task.ProgressRx != 0 || task.ProgressTx != 0 {
fmt.Fprintf(&header, "(%s / %s , Rx/Tx",
humanize.Bytes(uint64(task.ProgressRx)),
humanize.Bytes(uint64(task.ProgressTx)))
if task.Idle {
fmt.Fprint(&header, ", values from last run")
}
fmt.Fprint(&header, ")")
}
fmt.Fprint(&header, "\n")
if !task.Idle && !task.LastUpdate.IsZero() && sinceLastUpdate >= TASK_STALLED_HOLDOFF_DURATION {
informAboutError = true
fmt.Fprintf(&header, " WARNING: last update %s ago at %s)",
sinceLastUpdate.String(),
task.LastUpdate.Format(HumanFormatterDateFormat))
fmt.Fprint(&header, "\n")
}
io.Copy(os.Stdout, &header)
jobLogEntries = append(jobLogEntries, task.LogEntries...)
informAboutError = informAboutError || task.MaxLogLevel >= logger.Warn
}
sort.Slice(jobLogEntries, func(i, j int) bool {
return jobLogEntries[i].Time.Before(jobLogEntries[j].Time)
})
if informAboutError {
fmt.Println(" WARNING: Some tasks encountered problems since the last time they left idle state:")
fmt.Println(" check the logs below or your log file for more information.")
fmt.Println(" Use the --level flag if you need debug information.")
fmt.Println()
}
for _, e := range jobLogEntries {
if e.Level < controlStatusCmdArgs.level {
continue
}
formatted, err := formatter.Format(&e)
if err != nil {
panic(err)
}
fmt.Printf(" %s\n", string(formatted))
}
fmt.Println()
}
default:
log.Printf("invalid output format '%s'", controlStatusCmdArgs.format)
die()
}
}

View File

@ -1,16 +1,13 @@
package cmd
import (
"container/list"
"context"
"fmt"
"github.com/spf13/cobra"
"github.com/zrepl/zrepl/logger"
"io"
"os"
"os/signal"
//"strings"
"sync"
"syscall"
"time"
)
@ -30,7 +27,6 @@ type Job interface {
JobName() string
JobType() JobType
JobStart(ctxt context.Context)
JobStatus(ctxt context.Context) (*JobStatus, error)
}
type JobType string
@ -155,315 +151,3 @@ outer:
log.Info("exiting")
}
// Representation of a Job's status that is composed of Tasks
type JobStatus struct {
// Statuses of all tasks of this job
Tasks []*TaskStatus
// Error != "" if JobStatus() returned an error
JobStatusError string
}
// Representation of a Daemon's status that is composed of Jobs
type DaemonStatus struct {
StartedAt time.Time
Jobs map[string]*JobStatus
}
func (d *Daemon) Status() (s *DaemonStatus) {
s = &DaemonStatus{}
s.StartedAt = d.startedAt
s.Jobs = make(map[string]*JobStatus, len(d.conf.Jobs))
for name, j := range d.conf.Jobs {
status, err := j.JobStatus(context.TODO())
if err != nil {
s.Jobs[name] = &JobStatus{nil, err.Error()}
continue
}
s.Jobs[name] = status
}
return
}
// Representation of a Task's status
type TaskStatus struct {
Name string
// Whether the task is idle.
Idle bool
// The stack of activities the task is currently executing.
// The first element is the root activity and equal to Name.
ActivityStack []string
// Number of bytes received by the task since it last left idle state.
ProgressRx int64
// Number of bytes sent by the task since it last left idle state.
ProgressTx int64
// Log entries emitted by the task since it last left idle state.
// Only contains the log entries emitted through the task's logger
// (provided by Task.Log()).
LogEntries []logger.Entry
// The maximum log level of LogEntries.
// Only valid if len(LogEntries) > 0.
MaxLogLevel logger.Level
// Last time something about the Task changed
LastUpdate time.Time
}
// An instance of Task tracks a single thread of activity that is part of a Job.
type Task struct {
name string // immutable
parent Job // immutable
// Stack of activities the task is currently in
// Members are instances of taskActivity
activities *list.List
// Last time activities was changed (not the activities inside, the list)
activitiesLastUpdate time.Time
// Protects Task members from modification
rwl sync.RWMutex
}
// Structure that describes the progress a Task has made
type taskProgress struct {
rx int64
tx int64
creation time.Time
lastUpdate time.Time
logEntries []logger.Entry
mtx sync.RWMutex
}
func newTaskProgress() (p *taskProgress) {
return &taskProgress{
creation: time.Now(),
logEntries: make([]logger.Entry, 0),
}
}
func (p *taskProgress) UpdateIO(drx, dtx int64) {
p.mtx.Lock()
defer p.mtx.Unlock()
p.rx += drx
p.tx += dtx
p.lastUpdate = time.Now()
}
func (p *taskProgress) UpdateLogEntry(entry logger.Entry) {
p.mtx.Lock()
defer p.mtx.Unlock()
// FIXME: ensure maximum size (issue #48)
p.logEntries = append(p.logEntries, entry)
p.lastUpdate = time.Now()
}
func (p *taskProgress) DeepCopy() (out taskProgress) {
p.mtx.RLock()
defer p.mtx.RUnlock()
out.rx, out.tx = p.rx, p.tx
out.creation = p.creation
out.lastUpdate = p.lastUpdate
out.logEntries = make([]logger.Entry, len(p.logEntries))
for i := range p.logEntries {
out.logEntries[i] = p.logEntries[i]
}
return
}
// returns a copy of this taskProgress, the mutex carries no semantic value
func (p *taskProgress) Read() (out taskProgress) {
p.mtx.RLock()
defer p.mtx.RUnlock()
return p.DeepCopy()
}
// Element of a Task's activity stack
type taskActivity struct {
name string
idle bool
logger logger.Logger
// The progress of the task that is updated by UpdateIO() and UpdateLogEntry()
//
// Progress happens on a task-level and is thus global to the task.
// That's why progress is just a pointer to the current taskProgress:
// we reset progress when leaving the idle root activity
progress *taskProgress
}
func NewTask(name string, parent Job, lg logger.Logger) *Task {
t := &Task{
name: name,
parent: parent,
activities: list.New(),
}
rootLogger := lg.ReplaceField(logTaskField, name).
WithOutlet(t, logger.Debug)
rootAct := &taskActivity{name, true, rootLogger, newTaskProgress()}
t.activities.PushFront(rootAct)
return t
}
// callers must hold t.rwl
func (t *Task) cur() *taskActivity {
return t.activities.Front().Value.(*taskActivity)
}
// buildActivityStack returns the stack of activity names
// t.rwl must be held, but the slice can be returned since strings are immutable
func (t *Task) buildActivityStack() []string {
comps := make([]string, 0, t.activities.Len())
for e := t.activities.Back(); e != nil; e = e.Prev() {
act := e.Value.(*taskActivity)
comps = append(comps, act.name)
}
return comps
}
// Start a sub-activity.
// Must always be matched with a call to t.Finish()
// --- consider using defer for this purpose.
func (t *Task) Enter(activity string) {
t.rwl.Lock()
defer t.rwl.Unlock()
prev := t.cur()
if prev.idle {
// reset progress when leaving idle task
// we leave the old progress dangling to have the user not worry about
prev.progress = newTaskProgress()
prom.taskLastActiveStart.WithLabelValues(
t.parent.JobName(),
t.parent.JobType().String(),
t.name).
Set(float64(prev.progress.creation.UnixNano()) / 1e9)
}
act := &taskActivity{activity, false, nil, prev.progress}
t.activities.PushFront(act)
//stack := t.buildActivityStack()
//activityField := strings.Join(stack, ".")
act.logger = prev.logger
// act.logger = prev.logger.ReplaceField(logTaskField, activityField)
t.activitiesLastUpdate = time.Now()
}
func (t *Task) UpdateProgress(dtx, drx int64) {
t.rwl.RLock()
p := t.cur().progress // protected by own rwlock
t.rwl.RUnlock()
p.UpdateIO(dtx, drx)
}
// Returns a wrapper io.Reader that updates this task's _current_ progress value.
// Progress updates after this task resets its progress value are discarded.
func (t *Task) ProgressUpdater(r io.Reader) *IOProgressUpdater {
t.rwl.RLock()
defer t.rwl.RUnlock()
return &IOProgressUpdater{r, t.cur().progress}
}
func (t *Task) Status() *TaskStatus {
t.rwl.RLock()
defer t.rwl.RUnlock()
// NOTE
// do not return any state in TaskStatus that is protected by t.rwl
cur := t.cur()
stack := t.buildActivityStack()
prog := cur.progress.Read()
var maxLevel logger.Level
for _, entry := range prog.logEntries {
if maxLevel < entry.Level {
maxLevel = entry.Level
}
}
lastUpdate := prog.lastUpdate
if lastUpdate.Before(t.activitiesLastUpdate) {
lastUpdate = t.activitiesLastUpdate
}
s := &TaskStatus{
Name: stack[0],
ActivityStack: stack,
Idle: cur.idle,
ProgressRx: prog.rx,
ProgressTx: prog.tx,
LogEntries: prog.logEntries,
MaxLogLevel: maxLevel,
LastUpdate: lastUpdate,
}
return s
}
// Finish a sub-activity.
// Corresponds to a preceding call to t.Enter()
func (t *Task) Finish() {
t.rwl.Lock()
defer t.rwl.Unlock()
top := t.activities.Front()
if top.Next() == nil {
return // cannot remove root activity
}
t.activities.Remove(top)
t.activitiesLastUpdate = time.Now()
// prometheus
front := t.activities.Front()
if front != nil && front == t.activities.Back() {
idleAct := front.Value.(*taskActivity)
if !idleAct.idle {
panic("inconsistent implementation")
}
progress := idleAct.progress.Read()
non_idle_time := t.activitiesLastUpdate.Sub(progress.creation) // use same time
prom.taskLastActiveDuration.WithLabelValues(
t.parent.JobName(),
t.parent.JobType().String(),
t.name).Set(non_idle_time.Seconds())
}
}
// Returns a logger derived from the logger passed to the constructor function.
// The logger's task field contains the current activity stack joined by '.'.
func (t *Task) Log() logger.Logger {
t.rwl.RLock()
defer t.rwl.RUnlock()
// FIXME should influence TaskStatus's LastUpdate field
return t.cur().logger
}
// implement logger.Outlet interface
func (t *Task) WriteEntry(entry logger.Entry) error {
t.rwl.RLock()
defer t.rwl.RUnlock()
t.cur().progress.UpdateLogEntry(entry)
prom.taskLogEntries.WithLabelValues(
t.parent.JobName(),
t.parent.JobType().String(),
t.name,
entry.Level.String()).
Inc()
return nil
}
type IOProgressUpdater struct {
r io.Reader
p *taskProgress
}
func (u *IOProgressUpdater) Read(p []byte) (n int, err error) {
n, err = u.r.Read(p)
u.p.UpdateIO(int64(n), 0)
return
}

View File

@ -8,7 +8,6 @@ import (
)
type Pruner struct {
task *Task
Now time.Time
DryRun bool
DatasetFilter zfs.DatasetFilter
@ -23,25 +22,21 @@ type PruneResult struct {
Remove []zfs.FilesystemVersion
}
func (p *Pruner) filterFilesystems() (filesystems []*zfs.DatasetPath, stop bool) {
p.task.Enter("filter_fs")
defer p.task.Finish()
func (p *Pruner) filterFilesystems(ctx context.Context) (filesystems []*zfs.DatasetPath, stop bool) {
filesystems, err := zfs.ZFSListMapping(p.DatasetFilter)
if err != nil {
p.task.Log().WithError(err).Error("error applying filesystem filter")
getLogger(ctx).WithError(err).Error("error applying filesystem filter")
return nil, true
}
if len(filesystems) <= 0 {
p.task.Log().Info("no filesystems matching filter")
getLogger(ctx).Info("no filesystems matching filter")
return nil, true
}
return filesystems, false
}
func (p *Pruner) filterVersions(fs *zfs.DatasetPath) (fsversions []zfs.FilesystemVersion, stop bool) {
p.task.Enter("filter_versions")
defer p.task.Finish()
log := p.task.Log().WithField("fs", fs.ToString())
func (p *Pruner) filterVersions(ctx context.Context, fs *zfs.DatasetPath) (fsversions []zfs.FilesystemVersion, stop bool) {
log := getLogger(ctx).WithField("fs", fs.ToString())
filter := NewPrefixFilter(p.SnapshotPrefix)
fsversions, err := zfs.ZFSListFilesystemVersions(fs, filter)
@ -56,19 +51,15 @@ func (p *Pruner) filterVersions(fs *zfs.DatasetPath) (fsversions []zfs.Filesyste
return fsversions, false
}
func (p *Pruner) pruneFilesystem(fs *zfs.DatasetPath) (r PruneResult, valid bool) {
p.task.Enter("prune_fs")
defer p.task.Finish()
log := p.task.Log().WithField("fs", fs.ToString())
func (p *Pruner) pruneFilesystem(ctx context.Context, fs *zfs.DatasetPath) (r PruneResult, valid bool) {
log := getLogger(ctx).WithField("fs", fs.ToString())
fsversions, stop := p.filterVersions(fs)
fsversions, stop := p.filterVersions(ctx, fs)
if stop {
return
}
p.task.Enter("prune_policy")
keep, remove, err := p.PrunePolicy.Prune(fs, fsversions)
p.task.Finish()
if err != nil {
log.WithError(err).Error("error evaluating prune policy")
return
@ -100,9 +91,7 @@ func (p *Pruner) pruneFilesystem(fs *zfs.DatasetPath) (r PruneResult, valid bool
// TODO special handling for EBUSY (zfs hold)
// TODO error handling for clones? just echo to cli, skip over, and exit with non-zero status code (we're idempotent)
if !p.DryRun {
p.task.Enter("destroy")
err := zfs.ZFSDestroyFilesystemVersion(fs, v)
p.task.Finish()
if err != nil {
log.WithFields(fields).WithError(err).Error("error destroying version")
}
@ -112,14 +101,11 @@ func (p *Pruner) pruneFilesystem(fs *zfs.DatasetPath) (r PruneResult, valid bool
}
func (p *Pruner) Run(ctx context.Context) (r []PruneResult, err error) {
p.task.Enter("run")
defer p.task.Finish()
if p.DryRun {
p.task.Log().Info("doing dry run")
getLogger(ctx).Info("doing dry run")
}
filesystems, stop := p.filterFilesystems()
filesystems, stop := p.filterFilesystems(ctx)
if stop {
return
}
@ -127,7 +113,7 @@ func (p *Pruner) Run(ctx context.Context) (r []PruneResult, err error) {
r = make([]PruneResult, 0, len(filesystems))
for _, fs := range filesystems {
res, ok := p.pruneFilesystem(fs)
res, ok := p.pruneFilesystem(ctx, fs)
if ok {
r = append(r, res)
}

View File

@ -170,8 +170,7 @@ func doTestPrunePolicy(cmd *cobra.Command, args []string) {
log.Printf("job dump:\n%s", pretty.Sprint(jobp))
task := NewTask("", jobi, log)
pruner, err := jobp.Pruner(task, testPrunePolicyArgs.side, true)
pruner, err := jobp.Pruner(testPrunePolicyArgs.side, true)
if err != nil {
log.Printf("cannot create test pruner: %s", err)
os.Exit(1)