From 2c87b15e83c995052edc806a44ac86f6c4d03614 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Mon, 25 Dec 2017 00:20:31 +0100 Subject: [PATCH] daemon: Task abstraction + TaskStatus An instance of Task tracks a single thread of activity that is part of a Job. While the docs already use this terminology of tasks being composed of jobs, the code did not have an object to represent these semantics. Now it does: * A task t is initialized with a root activity, which is its name * t can t.Enter() and t.Finish() an activity, building a stack of activities * t's code can get a logger t.Log() whose logTaskField is set to the concatenated stack of activities * t's code can update IO progress it made since leaving idle state * t's code's log output vie t.Log() is captured since leaving idle state * FIXME: find a way to bound that buffer refs #10 refs #48 --- cmd/daemon.go | 229 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 229 insertions(+) diff --git a/cmd/daemon.go b/cmd/daemon.go index c91e689..cd5390e 100644 --- a/cmd/daemon.go +++ b/cmd/daemon.go @@ -1,12 +1,16 @@ package cmd import ( + "container/list" "context" "fmt" "github.com/spf13/cobra" "github.com/zrepl/zrepl/logger" + "io" "os" "os/signal" + "strings" + "sync" "syscall" "time" ) @@ -109,3 +113,228 @@ outer: log.Info("exiting") } + +// Representation of a Task's status +type TaskStatus struct { + Name string + // Whether the task is idle. + Idle bool + // The stack of activities the task is currently executing. + // The first element is the root activity and equal to Name. + ActivityStack []string + // Number of bytes received by the task since it last left idle state. + ProgressRx int64 + // Number of bytes sent by the task since it last left idle state. + ProgressTx int64 + // Log entries emitted by the task since it last left idle state. + // Only contains the log entries emitted through the task's logger + // (provided by Task.Log()). + LogEntries []logger.Entry + // The maximum log level of LogEntries. + // Only valid if len(LogEntries) > 0. + MaxLogLevel logger.Level +} + +// An instance of Task tracks a single thread of activity that is part of a Job. +type Task struct { + // Stack of activities the task is currently in + // Members are instances of taskActivity + activities *list.List + // Protects Task members from modification + rwl sync.RWMutex +} + +// Structure that describes the progress a Task has made +type taskProgress struct { + rx int64 + tx int64 + lastUpdate time.Time + logEntries []logger.Entry + mtx sync.RWMutex +} + +func newTaskProgress() (p *taskProgress) { + return &taskProgress{ + logEntries: make([]logger.Entry, 0), + } +} + +func (p *taskProgress) UpdateIO(drx, dtx int64) { + p.mtx.Lock() + defer p.mtx.Unlock() + p.rx += drx + p.tx += dtx + p.lastUpdate = time.Now() +} + +func (p *taskProgress) UpdateLogEntry(entry logger.Entry) { + p.mtx.Lock() + defer p.mtx.Unlock() + // FIXME: ensure maximum size (issue #48) + p.logEntries = append(p.logEntries, entry) + p.lastUpdate = time.Now() +} + +func (p *taskProgress) DeepCopy() (out taskProgress) { + p.mtx.RLock() + defer p.mtx.RUnlock() + out.rx, out.tx = p.rx, p.tx + out.logEntries = make([]logger.Entry, len(p.logEntries)) + for i := range p.logEntries { + out.logEntries[i] = p.logEntries[i] + } + return +} + +// returns a copy of this taskProgress, the mutex carries no semantic value +func (p *taskProgress) Read() (out taskProgress) { + p.mtx.RLock() + defer p.mtx.RUnlock() + return p.DeepCopy() +} + +// Element of a Task's activity stack +type taskActivity struct { + name string + idle bool + logger *logger.Logger + // The progress of the task that is updated by UpdateIO() and UpdateLogEntry() + // + // Progress happens on a task-level and is thus global to the task. + // That's why progress is just a pointer to the current taskProgress: + // we reset progress when leaving the idle root activity + progress *taskProgress +} + +func NewTask(name string, lg *logger.Logger) *Task { + t := &Task{ + activities: list.New(), + } + rootLogger := lg.ReplaceField(logTaskField, name). + WithOutlet(t, logger.Debug) + rootAct := &taskActivity{name, true, rootLogger, newTaskProgress()} + t.activities.PushFront(rootAct) + return t +} + +// callers must hold t.rwl +func (t *Task) cur() *taskActivity { + return t.activities.Front().Value.(*taskActivity) +} + +// buildActivityStack returns the stack of activity names +// t.rwl must be held, but the slice can be returned since strings are immutable +func (t *Task) buildActivityStack() []string { + comps := make([]string, 0, t.activities.Len()) + for e := t.activities.Back(); e != nil; e = e.Prev() { + act := e.Value.(*taskActivity) + comps = append(comps, act.name) + } + return comps +} + +// Start a sub-activity. +// Must always be matched with a call to t.Finish() +// --- consider using defer for this purpose. +func (t *Task) Enter(activity string) { + t.rwl.Lock() + defer t.rwl.Unlock() + + prev := t.cur() + if prev.idle { + // reset progress when leaving idle task + // we leave the old progress dangling to have the user not worry about + prev.progress = newTaskProgress() + } + act := &taskActivity{activity, false, nil, prev.progress} + t.activities.PushFront(act) + stack := t.buildActivityStack() + activityField := strings.Join(stack, ".") + act.logger = prev.logger.ReplaceField(logTaskField, activityField) + +} + +func (t *Task) UpdateProgress(dtx, drx int64) { + t.rwl.RLock() + p := t.cur().progress // protected by own rwlock + t.rwl.RUnlock() + p.UpdateIO(dtx, drx) +} + +// Returns a wrapper io.Reader that updates this task's _current_ progress value. +// Progress updates after this task resets its progress value are discarded. +func (t *Task) ProgressUpdater(r io.Reader) *IOProgressUpdater { + t.rwl.RLock() + defer t.rwl.RUnlock() + return &IOProgressUpdater{r, t.cur().progress} +} + +func (t *Task) Status() *TaskStatus { + t.rwl.RLock() + defer t.rwl.RUnlock() + // NOTE + // do not return any state in TaskStatus that is protected by t.rwl + + cur := t.cur() + stack := t.buildActivityStack() + prog := cur.progress.Read() + + var maxLevel logger.Level + for _, entry := range prog.logEntries { + if maxLevel < entry.Level { + maxLevel = entry.Level + } + } + + s := &TaskStatus{ + Name: stack[0], + ActivityStack: stack, + Idle: cur.idle, + ProgressRx: prog.rx, + ProgressTx: prog.tx, + LogEntries: prog.logEntries, + MaxLogLevel: maxLevel, + } + + return s +} + +// Finish a sub-activity. +// Corresponds to a preceding call to t.Enter() +func (t *Task) Finish() { + t.rwl.Lock() + defer t.rwl.Unlock() + top := t.activities.Front() + if top.Next() == nil { + return // cannot remove root activity + } + t.activities.Remove(top) + +} + +// Returns a logger derived from the logger passed to the constructor function. +// The logger's task field contains the current activity stack joined by '.'. +func (t *Task) Log() *logger.Logger { + t.rwl.RLock() + defer t.rwl.RUnlock() + return t.cur().logger +} + +// implement logger.Outlet interface +func (t *Task) WriteEntry(ctx context.Context, entry logger.Entry) error { + t.rwl.RLock() + defer t.rwl.RUnlock() + t.cur().progress.UpdateLogEntry(entry) + return nil +} + +type IOProgressUpdater struct { + r io.Reader + p *taskProgress +} + +func (u *IOProgressUpdater) Read(p []byte) (n int, err error) { + n, err = u.r.Read(p) + u.p.UpdateIO(int64(n), 0) + return +}