mirror of
https://github.com/zrepl/zrepl.git
synced 2025-02-02 03:29:13 +01:00
daemon: Task abstraction + TaskStatus
An instance of Task tracks a single thread of activity that is part of a Job. While the docs already use this terminology of tasks being composed of jobs, the code did not have an object to represent these semantics. Now it does: * A task t is initialized with a root activity, which is its name * t can t.Enter() and t.Finish() an activity, building a stack of activities * t's code can get a logger t.Log() whose logTaskField is set to the concatenated stack of activities * t's code can update IO progress it made since leaving idle state * t's code's log output vie t.Log() is captured since leaving idle state * FIXME: find a way to bound that buffer refs #10 refs #48
This commit is contained in:
parent
d7f3fb93ae
commit
2c87b15e83
229
cmd/daemon.go
229
cmd/daemon.go
@ -1,12 +1,16 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/zrepl/zrepl/logger"
|
||||
"io"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
)
|
||||
@ -109,3 +113,228 @@ outer:
|
||||
log.Info("exiting")
|
||||
|
||||
}
|
||||
|
||||
// Representation of a Task's status
|
||||
type TaskStatus struct {
|
||||
Name string
|
||||
// Whether the task is idle.
|
||||
Idle bool
|
||||
// The stack of activities the task is currently executing.
|
||||
// The first element is the root activity and equal to Name.
|
||||
ActivityStack []string
|
||||
// Number of bytes received by the task since it last left idle state.
|
||||
ProgressRx int64
|
||||
// Number of bytes sent by the task since it last left idle state.
|
||||
ProgressTx int64
|
||||
// Log entries emitted by the task since it last left idle state.
|
||||
// Only contains the log entries emitted through the task's logger
|
||||
// (provided by Task.Log()).
|
||||
LogEntries []logger.Entry
|
||||
// The maximum log level of LogEntries.
|
||||
// Only valid if len(LogEntries) > 0.
|
||||
MaxLogLevel logger.Level
|
||||
}
|
||||
|
||||
// An instance of Task tracks a single thread of activity that is part of a Job.
|
||||
type Task struct {
|
||||
// Stack of activities the task is currently in
|
||||
// Members are instances of taskActivity
|
||||
activities *list.List
|
||||
// Protects Task members from modification
|
||||
rwl sync.RWMutex
|
||||
}
|
||||
|
||||
// Structure that describes the progress a Task has made
|
||||
type taskProgress struct {
|
||||
rx int64
|
||||
tx int64
|
||||
lastUpdate time.Time
|
||||
logEntries []logger.Entry
|
||||
mtx sync.RWMutex
|
||||
}
|
||||
|
||||
func newTaskProgress() (p *taskProgress) {
|
||||
return &taskProgress{
|
||||
logEntries: make([]logger.Entry, 0),
|
||||
}
|
||||
}
|
||||
|
||||
func (p *taskProgress) UpdateIO(drx, dtx int64) {
|
||||
p.mtx.Lock()
|
||||
defer p.mtx.Unlock()
|
||||
p.rx += drx
|
||||
p.tx += dtx
|
||||
p.lastUpdate = time.Now()
|
||||
}
|
||||
|
||||
func (p *taskProgress) UpdateLogEntry(entry logger.Entry) {
|
||||
p.mtx.Lock()
|
||||
defer p.mtx.Unlock()
|
||||
// FIXME: ensure maximum size (issue #48)
|
||||
p.logEntries = append(p.logEntries, entry)
|
||||
p.lastUpdate = time.Now()
|
||||
}
|
||||
|
||||
func (p *taskProgress) DeepCopy() (out taskProgress) {
|
||||
p.mtx.RLock()
|
||||
defer p.mtx.RUnlock()
|
||||
out.rx, out.tx = p.rx, p.tx
|
||||
out.logEntries = make([]logger.Entry, len(p.logEntries))
|
||||
for i := range p.logEntries {
|
||||
out.logEntries[i] = p.logEntries[i]
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// returns a copy of this taskProgress, the mutex carries no semantic value
|
||||
func (p *taskProgress) Read() (out taskProgress) {
|
||||
p.mtx.RLock()
|
||||
defer p.mtx.RUnlock()
|
||||
return p.DeepCopy()
|
||||
}
|
||||
|
||||
// Element of a Task's activity stack
|
||||
type taskActivity struct {
|
||||
name string
|
||||
idle bool
|
||||
logger *logger.Logger
|
||||
// The progress of the task that is updated by UpdateIO() and UpdateLogEntry()
|
||||
//
|
||||
// Progress happens on a task-level and is thus global to the task.
|
||||
// That's why progress is just a pointer to the current taskProgress:
|
||||
// we reset progress when leaving the idle root activity
|
||||
progress *taskProgress
|
||||
}
|
||||
|
||||
func NewTask(name string, lg *logger.Logger) *Task {
|
||||
t := &Task{
|
||||
activities: list.New(),
|
||||
}
|
||||
rootLogger := lg.ReplaceField(logTaskField, name).
|
||||
WithOutlet(t, logger.Debug)
|
||||
rootAct := &taskActivity{name, true, rootLogger, newTaskProgress()}
|
||||
t.activities.PushFront(rootAct)
|
||||
return t
|
||||
}
|
||||
|
||||
// callers must hold t.rwl
|
||||
func (t *Task) cur() *taskActivity {
|
||||
return t.activities.Front().Value.(*taskActivity)
|
||||
}
|
||||
|
||||
// buildActivityStack returns the stack of activity names
|
||||
// t.rwl must be held, but the slice can be returned since strings are immutable
|
||||
func (t *Task) buildActivityStack() []string {
|
||||
comps := make([]string, 0, t.activities.Len())
|
||||
for e := t.activities.Back(); e != nil; e = e.Prev() {
|
||||
act := e.Value.(*taskActivity)
|
||||
comps = append(comps, act.name)
|
||||
}
|
||||
return comps
|
||||
}
|
||||
|
||||
// Start a sub-activity.
|
||||
// Must always be matched with a call to t.Finish()
|
||||
// --- consider using defer for this purpose.
|
||||
func (t *Task) Enter(activity string) {
|
||||
t.rwl.Lock()
|
||||
defer t.rwl.Unlock()
|
||||
|
||||
prev := t.cur()
|
||||
if prev.idle {
|
||||
// reset progress when leaving idle task
|
||||
// we leave the old progress dangling to have the user not worry about
|
||||
prev.progress = newTaskProgress()
|
||||
}
|
||||
act := &taskActivity{activity, false, nil, prev.progress}
|
||||
t.activities.PushFront(act)
|
||||
stack := t.buildActivityStack()
|
||||
activityField := strings.Join(stack, ".")
|
||||
act.logger = prev.logger.ReplaceField(logTaskField, activityField)
|
||||
|
||||
}
|
||||
|
||||
func (t *Task) UpdateProgress(dtx, drx int64) {
|
||||
t.rwl.RLock()
|
||||
p := t.cur().progress // protected by own rwlock
|
||||
t.rwl.RUnlock()
|
||||
p.UpdateIO(dtx, drx)
|
||||
}
|
||||
|
||||
// Returns a wrapper io.Reader that updates this task's _current_ progress value.
|
||||
// Progress updates after this task resets its progress value are discarded.
|
||||
func (t *Task) ProgressUpdater(r io.Reader) *IOProgressUpdater {
|
||||
t.rwl.RLock()
|
||||
defer t.rwl.RUnlock()
|
||||
return &IOProgressUpdater{r, t.cur().progress}
|
||||
}
|
||||
|
||||
func (t *Task) Status() *TaskStatus {
|
||||
t.rwl.RLock()
|
||||
defer t.rwl.RUnlock()
|
||||
// NOTE
|
||||
// do not return any state in TaskStatus that is protected by t.rwl
|
||||
|
||||
cur := t.cur()
|
||||
stack := t.buildActivityStack()
|
||||
prog := cur.progress.Read()
|
||||
|
||||
var maxLevel logger.Level
|
||||
for _, entry := range prog.logEntries {
|
||||
if maxLevel < entry.Level {
|
||||
maxLevel = entry.Level
|
||||
}
|
||||
}
|
||||
|
||||
s := &TaskStatus{
|
||||
Name: stack[0],
|
||||
ActivityStack: stack,
|
||||
Idle: cur.idle,
|
||||
ProgressRx: prog.rx,
|
||||
ProgressTx: prog.tx,
|
||||
LogEntries: prog.logEntries,
|
||||
MaxLogLevel: maxLevel,
|
||||
}
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
// Finish a sub-activity.
|
||||
// Corresponds to a preceding call to t.Enter()
|
||||
func (t *Task) Finish() {
|
||||
t.rwl.Lock()
|
||||
defer t.rwl.Unlock()
|
||||
top := t.activities.Front()
|
||||
if top.Next() == nil {
|
||||
return // cannot remove root activity
|
||||
}
|
||||
t.activities.Remove(top)
|
||||
|
||||
}
|
||||
|
||||
// Returns a logger derived from the logger passed to the constructor function.
|
||||
// The logger's task field contains the current activity stack joined by '.'.
|
||||
func (t *Task) Log() *logger.Logger {
|
||||
t.rwl.RLock()
|
||||
defer t.rwl.RUnlock()
|
||||
return t.cur().logger
|
||||
}
|
||||
|
||||
// implement logger.Outlet interface
|
||||
func (t *Task) WriteEntry(ctx context.Context, entry logger.Entry) error {
|
||||
t.rwl.RLock()
|
||||
defer t.rwl.RUnlock()
|
||||
t.cur().progress.UpdateLogEntry(entry)
|
||||
return nil
|
||||
}
|
||||
|
||||
type IOProgressUpdater struct {
|
||||
r io.Reader
|
||||
p *taskProgress
|
||||
}
|
||||
|
||||
func (u *IOProgressUpdater) Read(p []byte) (n int, err error) {
|
||||
n, err = u.r.Read(p)
|
||||
u.p.UpdateIO(int64(n), 0)
|
||||
return
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user