snapper + client/status: snapshotting reports

This commit is contained in:
Christian Schwarz 2019-09-19 00:13:55 +02:00
parent 729c83ee72
commit b5ff1a9926
8 changed files with 277 additions and 23 deletions

View File

@ -20,6 +20,7 @@ import (
"github.com/zrepl/zrepl/daemon"
"github.com/zrepl/zrepl/daemon/job"
"github.com/zrepl/zrepl/daemon/pruner"
"github.com/zrepl/zrepl/daemon/snapper"
"github.com/zrepl/zrepl/replication/report"
)
@ -335,6 +336,15 @@ func (t *tui) draw() {
t.addIndent(1)
t.renderPrunerReport(activeStatus.PruningReceiver)
t.addIndent(-1)
if v.Type == job.TypePush {
t.printf("Snapshotting:")
t.newline()
t.addIndent(1)
t.renderSnapperReport(activeStatus.Snapshotting)
t.addIndent(-1)
}
} else if v.Type == job.TypeSnap {
snapStatus, ok := v.JobSpecific.(*job.SnapJobStatus)
if !ok || snapStatus == nil {
@ -347,6 +357,19 @@ func (t *tui) draw() {
t.addIndent(1)
t.renderPrunerReport(snapStatus.Pruning)
t.addIndent(-1)
t.printf("Snapshotting:")
t.newline()
t.addIndent(1)
t.renderSnapperReport(snapStatus.Snapshotting)
t.addIndent(-1)
} else if v.Type == job.TypeSource {
st := v.JobSpecific.(*job.PassiveStatus)
t.printf("Snapshotting:\n")
t.addIndent(1)
t.renderSnapperReport(st.Snapper)
t.addIndent(-1)
} else {
t.printf("No status representation for job type '%s', dumping as YAML", v.Type)
t.newline()
@ -560,6 +583,86 @@ func (t *tui) renderPrunerReport(r *pruner.Report) {
}
func (t *tui) renderSnapperReport(r *snapper.Report) {
if r == nil {
t.printf("<snapshot type does not have a report>\n")
return
}
t.printf("Status: %s", r.State)
t.newline()
if r.Error != "" {
t.printf("Error: %s\n", r.Error)
}
if !r.SleepUntil.IsZero() {
t.printf("Sleep until: %s\n", r.SleepUntil)
}
sort.Slice(r.Progress, func(i, j int) bool {
return strings.Compare(r.Progress[i].Path, r.Progress[j].Path) == -1
})
t.addIndent(1)
defer t.addIndent(-1)
dur := func(d time.Duration) string {
return d.Round(100 * time.Millisecond).String()
}
type row struct {
path, state, duration, remainder, hookReport string
}
var widths struct {
path, state, duration int
}
rows := make([]*row, len(r.Progress))
for i, fs := range r.Progress {
r := &row{
path: fs.Path,
state: fs.State.String(),
}
if fs.HooksHadError {
r.hookReport = fs.Hooks // FIXME render here, not in daemon
}
switch fs.State {
case snapper.SnapPending:
r.duration = "..."
r.remainder = ""
case snapper.SnapStarted:
r.duration = dur(time.Since(fs.StartAt))
r.remainder = fmt.Sprintf("snap name: %q", fs.SnapName)
case snapper.SnapDone:
fallthrough
case snapper.SnapError:
r.duration = dur(fs.DoneAt.Sub(fs.StartAt))
r.remainder = fmt.Sprintf("snap name: %q", fs.SnapName)
}
rows[i] = r
if len(r.path) > widths.path {
widths.path = len(r.path)
}
if len(r.state) > widths.state {
widths.state = len(r.state)
}
if len(r.duration) > widths.duration {
widths.duration = len(r.duration)
}
}
for _, r := range rows {
path := rightPad(r.path, widths.path, " ")
state := rightPad(r.state, widths.state, " ")
duration := rightPad(r.duration, widths.duration, " ")
t.printf("%s %s %s", path, state, duration)
t.printfDrawIndentedAndWrappedIfMultiline(" %s", r.remainder)
if r.hookReport != "" {
t.printfDrawIndentedAndWrappedIfMultiline("%s", r.hookReport)
}
t.newline()
}
}
func times(str string, n int) (out string) {
for i := 0; i < n; i++ {
out += str
@ -571,7 +674,7 @@ func rightPad(str string, length int, pad string) string {
if len(str) > length {
return str[:length]
}
return str + times(pad, length-len(str))
return str + strings.Repeat(pad, length-len(str))
}
var arrowPositions = `>\|/`

View File

@ -77,8 +77,8 @@ type Step struct {
Begin, End time.Time
// Report may be nil
// FIXME cannot serialize this for client status, but contains interesting info (like what error happened)
Report HookReport
state map[interface{}]interface{}
Report HookReport
state map[interface{}]interface{}
}
func (s Step) String() (out string) {

View File

@ -83,6 +83,7 @@ type activeMode interface {
SenderReceiver() (logic.Sender, logic.Receiver)
Type() Type
RunPeriodic(ctx context.Context, wakeUpCommon chan<- struct{})
SnapperReport() *snapper.Report
ResetConnectBackoff()
}
@ -124,6 +125,10 @@ func (m *modePush) RunPeriodic(ctx context.Context, wakeUpCommon chan<- struct{}
m.snapper.Run(ctx, wakeUpCommon)
}
func (m *modePush) SnapperReport() *snapper.Report {
return m.snapper.Report()
}
func (m *modePush) ResetConnectBackoff() {
m.setupMtx.Lock()
defer m.setupMtx.Unlock()
@ -206,6 +211,10 @@ func (m *modePull) RunPeriodic(ctx context.Context, wakeUpCommon chan<- struct{}
}
}
func (m *modePull) SnapperReport() *snapper.Report {
return nil
}
func (m *modePull) ResetConnectBackoff() {
m.setupMtx.Lock()
defer m.setupMtx.Unlock()
@ -279,6 +288,7 @@ func (j *ActiveSide) Name() string { return j.name }
type ActiveSideStatus struct {
Replication *report.Report
PruningSender, PruningReceiver *pruner.Report
Snapshotting *snapper.Report
}
func (j *ActiveSide) Status() *Status {
@ -295,6 +305,7 @@ func (j *ActiveSide) Status() *Status {
if tasks.prunerReceiver != nil {
s.PruningReceiver = tasks.prunerReceiver.Report()
}
s.Snapshotting = j.mode.SnapperReport()
return &Status{Type: t, JobSpecific: s}
}

View File

@ -27,6 +27,7 @@ type PassiveSide struct {
type passiveMode interface {
Handler() rpc.Handler
RunPeriodic(ctx context.Context)
SnapperReport() *snapper.Report // may be nil
Type() Type
}
@ -40,7 +41,8 @@ func (m *modeSink) Handler() rpc.Handler {
return endpoint.NewReceiver(m.rootDataset, true)
}
func (m *modeSink) RunPeriodic(_ context.Context) {}
func (m *modeSink) RunPeriodic(_ context.Context) {}
func (m *modeSink) SnapperReport() *snapper.Report { return nil }
func modeSinkFromConfig(g *config.Global, in *config.SinkJob) (m *modeSink, err error) {
m = &modeSink{}
@ -85,6 +87,10 @@ func (m *modeSource) RunPeriodic(ctx context.Context) {
m.snapper.Run(ctx, nil)
}
func (m *modeSource) SnapperReport() *snapper.Report {
return m.snapper.Report()
}
func passiveSideFromConfig(g *config.Global, in *config.PassiveJob, mode passiveMode) (s *PassiveSide, err error) {
s = &PassiveSide{mode: mode, name: in.Name}
@ -97,10 +103,15 @@ func passiveSideFromConfig(g *config.Global, in *config.PassiveJob, mode passive
func (j *PassiveSide) Name() string { return j.name }
type PassiveStatus struct{}
type PassiveStatus struct {
Snapper *snapper.Report
}
func (s *PassiveSide) Status() *Status {
return &Status{Type: s.mode.Type()} // FIXME PassiveStatus
st := &PassiveStatus{
Snapper: s.mode.SnapperReport(),
}
return &Status{Type: s.mode.Type(), JobSpecific: st}
}
func (j *PassiveSide) OwnedDatasetSubtreeRoot() (rfs *zfs.DatasetPath, ok bool) {

View File

@ -66,7 +66,8 @@ func (j *SnapJob) RegisterMetrics(registerer prometheus.Registerer) {
}
type SnapJobStatus struct {
Pruning *pruner.Report
Pruning *pruner.Report
Snapshotting *snapper.Report // may be nil
}
func (j *SnapJob) Status() *Status {
@ -75,6 +76,7 @@ func (j *SnapJob) Status() *Status {
if j.pruner != nil {
s.Pruning = j.pruner.Report()
}
s.Snapshotting = j.snapper.Report()
return &Status{Type: t, JobSpecific: s}
}

View File

@ -26,12 +26,14 @@ const (
SnapError
)
// All fields protected by Snapper.mtx
type snapProgress struct {
state SnapState
// SnapStarted, SnapDone, SnapError
name string
startAt time.Time
name string
startAt time.Time
hookPlan *hooks.Plan
// SnapDone
doneAt time.Time
@ -61,7 +63,7 @@ type Snapper struct {
lastInvocation time.Time
// valid for state Snapshotting
plan map[*zfs.DatasetPath]snapProgress
plan map[*zfs.DatasetPath]*snapProgress
// valid for state SyncUp and Waiting
sleepUntil time.Time
@ -234,19 +236,20 @@ func plan(a args, u updater) state {
return onErr(err, u)
}
plan := make(map[*zfs.DatasetPath]snapProgress, len(fss))
plan := make(map[*zfs.DatasetPath]*snapProgress, len(fss))
for _, fs := range fss {
plan[fs] = snapProgress{state: SnapPending}
plan[fs] = &snapProgress{state: SnapPending}
}
return u(func(s *Snapper) {
s.state = Snapshotting
s.plan = plan
s.err = nil
}).sf()
}
func snapshot(a args, u updater) state {
var plan map[*zfs.DatasetPath]snapProgress
var plan map[*zfs.DatasetPath]*snapProgress
u(func(snapper *Snapper) {
plan = snapper.plan
})
@ -266,14 +269,6 @@ func snapshot(a args, u updater) state {
WithField("fs", fs.ToString()).
WithField("snap", snapname)
u(func(snapper *Snapper) {
progress.name = snapname
progress.startAt = time.Now()
progress.state = SnapStarted
})
var doneAt time.Time
hookEnvExtra := hooks.Env{
hooks.EnvFS: fs.ToString(),
hooks.EnvSnapshot: snapname,
@ -285,7 +280,6 @@ func snapshot(a args, u updater) state {
if err != nil {
l.WithError(err).Error("cannot create snapshot")
}
doneAt = time.Now()
return
})
@ -312,6 +306,12 @@ func snapshot(a args, u updater) state {
goto updateFSState
}
}
u(func(snapper *Snapper) {
progress.name = snapname
progress.startAt = time.Now()
progress.hookPlan = plan
progress.state = SnapStarted
})
{
l := hooks.GetLogger(a.ctx).WithField("fs", fs.ToString()).WithField("snap", snapname)
l.WithField("report", plan.Report().String()).Debug("begin run job plan")
@ -328,7 +328,7 @@ func snapshot(a args, u updater) state {
updateFSState:
anyFsHadErr = anyFsHadErr || fsHadErr
u(func(snapper *Snapper) {
progress.doneAt = doneAt
progress.doneAt = time.Now()
progress.state = SnapDone
if fsHadErr {
progress.state = SnapError
@ -364,6 +364,7 @@ func snapshot(a args, u updater) state {
snapper.err = errors.New("one or more snapshots could not be created, check logs for details")
} else {
snapper.state = Waiting
snapper.err = nil
}
}).sf()
}

View File

@ -24,6 +24,14 @@ func (s *PeriodicOrManual) Run(ctx context.Context, wakeUpCommon chan<- struct{}
}
}
// Returns nil if manual
func (s *PeriodicOrManual) Report() *Report {
if s.s != nil {
return s.s.Report()
}
return nil
}
func FromConfig(g *config.Global, fsf *filters.DatasetMapFilter, in config.SnapshottingEnum) (*PeriodicOrManual, error) {
switch v := in.Ret.(type) {
case *config.SnapshottingPeriodic:

View File

@ -0,0 +1,118 @@
package snapper
import (
"fmt"
"sort"
"strings"
"time"
"github.com/zrepl/zrepl/daemon/hooks"
)
type Report struct {
State State
// valid in state SyncUp and Waiting
SleepUntil time.Time
// valid in state Err
Error string
// valid in state Snapshotting
Progress []*ReportFilesystem
}
type ReportFilesystem struct {
Path string
State SnapState
// Valid in SnapStarted and later
SnapName string
StartAt time.Time
Hooks string
HooksHadError bool
// Valid in SnapDone | SnapError
DoneAt time.Time
}
func errOrEmptyString(e error) string {
if e != nil {
return e.Error()
}
return ""
}
func (s *Snapper) Report() *Report {
s.mtx.Lock()
defer s.mtx.Unlock()
pReps := make([]*ReportFilesystem, 0, len(s.plan))
for fs, p := range s.plan {
var hooksStr string
var hooksHadError bool
if p.hookPlan != nil {
hr := p.hookPlan.Report()
// FIXME: technically this belongs into client
// but we can't serialize hooks.Step ATM
rightPad := func(str string, length int, pad string) string {
if len(str) > length {
return str[:length]
}
return str + strings.Repeat(pad, length-len(str))
}
hooksHadError = hr.HadError()
rows := make([][]string, len(hr))
const numCols = 4
lens := make([]int, numCols)
for i, e := range hr {
rows[i] = make([]string, numCols)
rows[i][0] = fmt.Sprintf("%d", i+1)
rows[i][1] = e.Status.String()
runTime := "..."
if e.Status != hooks.StepPending {
runTime = e.End.Sub(e.Begin).Round(time.Millisecond).String()
}
rows[i][2] = runTime
rows[i][3] = ""
if e.Report != nil {
rows[i][3] = e.Report.String()
}
for j, col := range lens {
if len(rows[i][j]) > col {
lens[j] = len(rows[i][j])
}
}
}
rowsFlat := make([]string, len(hr))
for i, r := range rows {
colsPadded := make([]string, len(r))
for j, c := range r[:len(r)-1] {
colsPadded[j] = rightPad(c, lens[j], " ")
}
colsPadded[len(r)-1] = r[len(r)-1]
rowsFlat[i] = strings.Join(colsPadded, " ")
}
hooksStr = strings.Join(rowsFlat, "\n")
}
pReps = append(pReps, &ReportFilesystem{
Path: fs.ToString(),
State: p.state,
SnapName: p.name,
StartAt: p.startAt,
DoneAt: p.doneAt,
Hooks: hooksStr,
HooksHadError: hooksHadError,
})
}
sort.Slice(pReps, func(i, j int) bool {
return strings.Compare(pReps[i].Path, pReps[j].Path) == -1
})
r := &Report{
State: s.state,
SleepUntil: s.sleepUntil,
Error: errOrEmptyString(s.err),
Progress: pReps,
}
return r
}