diff --git a/client/status.go b/client/status.go index 73386e4..75934ff 100644 --- a/client/status.go +++ b/client/status.go @@ -20,6 +20,7 @@ import ( "github.com/zrepl/zrepl/daemon" "github.com/zrepl/zrepl/daemon/job" "github.com/zrepl/zrepl/daemon/pruner" + "github.com/zrepl/zrepl/daemon/snapper" "github.com/zrepl/zrepl/replication/report" ) @@ -335,6 +336,15 @@ func (t *tui) draw() { t.addIndent(1) t.renderPrunerReport(activeStatus.PruningReceiver) t.addIndent(-1) + + if v.Type == job.TypePush { + t.printf("Snapshotting:") + t.newline() + t.addIndent(1) + t.renderSnapperReport(activeStatus.Snapshotting) + t.addIndent(-1) + } + } else if v.Type == job.TypeSnap { snapStatus, ok := v.JobSpecific.(*job.SnapJobStatus) if !ok || snapStatus == nil { @@ -347,6 +357,19 @@ func (t *tui) draw() { t.addIndent(1) t.renderPrunerReport(snapStatus.Pruning) t.addIndent(-1) + t.printf("Snapshotting:") + t.newline() + t.addIndent(1) + t.renderSnapperReport(snapStatus.Snapshotting) + t.addIndent(-1) + } else if v.Type == job.TypeSource { + + st := v.JobSpecific.(*job.PassiveStatus) + t.printf("Snapshotting:\n") + t.addIndent(1) + t.renderSnapperReport(st.Snapper) + t.addIndent(-1) + } else { t.printf("No status representation for job type '%s', dumping as YAML", v.Type) t.newline() @@ -560,6 +583,86 @@ func (t *tui) renderPrunerReport(r *pruner.Report) { } +func (t *tui) renderSnapperReport(r *snapper.Report) { + if r == nil { + t.printf("\n") + return + } + + t.printf("Status: %s", r.State) + t.newline() + + if r.Error != "" { + t.printf("Error: %s\n", r.Error) + } + if !r.SleepUntil.IsZero() { + t.printf("Sleep until: %s\n", r.SleepUntil) + } + + sort.Slice(r.Progress, func(i, j int) bool { + return strings.Compare(r.Progress[i].Path, r.Progress[j].Path) == -1 + }) + + t.addIndent(1) + defer t.addIndent(-1) + dur := func(d time.Duration) string { + return d.Round(100 * time.Millisecond).String() + } + + type row struct { + path, state, duration, remainder, hookReport string + } + var widths struct { + path, state, duration int + } + rows := make([]*row, len(r.Progress)) + for i, fs := range r.Progress { + r := &row{ + path: fs.Path, + state: fs.State.String(), + } + if fs.HooksHadError { + r.hookReport = fs.Hooks // FIXME render here, not in daemon + } + switch fs.State { + case snapper.SnapPending: + r.duration = "..." + r.remainder = "" + case snapper.SnapStarted: + r.duration = dur(time.Since(fs.StartAt)) + r.remainder = fmt.Sprintf("snap name: %q", fs.SnapName) + case snapper.SnapDone: + fallthrough + case snapper.SnapError: + r.duration = dur(fs.DoneAt.Sub(fs.StartAt)) + r.remainder = fmt.Sprintf("snap name: %q", fs.SnapName) + } + rows[i] = r + if len(r.path) > widths.path { + widths.path = len(r.path) + } + if len(r.state) > widths.state { + widths.state = len(r.state) + } + if len(r.duration) > widths.duration { + widths.duration = len(r.duration) + } + } + + for _, r := range rows { + path := rightPad(r.path, widths.path, " ") + state := rightPad(r.state, widths.state, " ") + duration := rightPad(r.duration, widths.duration, " ") + t.printf("%s %s %s", path, state, duration) + t.printfDrawIndentedAndWrappedIfMultiline(" %s", r.remainder) + if r.hookReport != "" { + t.printfDrawIndentedAndWrappedIfMultiline("%s", r.hookReport) + } + t.newline() + } + +} + func times(str string, n int) (out string) { for i := 0; i < n; i++ { out += str @@ -571,7 +674,7 @@ func rightPad(str string, length int, pad string) string { if len(str) > length { return str[:length] } - return str + times(pad, length-len(str)) + return str + strings.Repeat(pad, length-len(str)) } var arrowPositions = `>\|/` diff --git a/daemon/hooks/hook_exec.go b/daemon/hooks/hook_exec.go index b579f24..299bd59 100644 --- a/daemon/hooks/hook_exec.go +++ b/daemon/hooks/hook_exec.go @@ -77,8 +77,8 @@ type Step struct { Begin, End time.Time // Report may be nil // FIXME cannot serialize this for client status, but contains interesting info (like what error happened) - Report HookReport - state map[interface{}]interface{} + Report HookReport + state map[interface{}]interface{} } func (s Step) String() (out string) { diff --git a/daemon/job/active.go b/daemon/job/active.go index a878f19..f91ed63 100644 --- a/daemon/job/active.go +++ b/daemon/job/active.go @@ -83,6 +83,7 @@ type activeMode interface { SenderReceiver() (logic.Sender, logic.Receiver) Type() Type RunPeriodic(ctx context.Context, wakeUpCommon chan<- struct{}) + SnapperReport() *snapper.Report ResetConnectBackoff() } @@ -124,6 +125,10 @@ func (m *modePush) RunPeriodic(ctx context.Context, wakeUpCommon chan<- struct{} m.snapper.Run(ctx, wakeUpCommon) } +func (m *modePush) SnapperReport() *snapper.Report { + return m.snapper.Report() +} + func (m *modePush) ResetConnectBackoff() { m.setupMtx.Lock() defer m.setupMtx.Unlock() @@ -206,6 +211,10 @@ func (m *modePull) RunPeriodic(ctx context.Context, wakeUpCommon chan<- struct{} } } +func (m *modePull) SnapperReport() *snapper.Report { + return nil +} + func (m *modePull) ResetConnectBackoff() { m.setupMtx.Lock() defer m.setupMtx.Unlock() @@ -279,6 +288,7 @@ func (j *ActiveSide) Name() string { return j.name } type ActiveSideStatus struct { Replication *report.Report PruningSender, PruningReceiver *pruner.Report + Snapshotting *snapper.Report } func (j *ActiveSide) Status() *Status { @@ -295,6 +305,7 @@ func (j *ActiveSide) Status() *Status { if tasks.prunerReceiver != nil { s.PruningReceiver = tasks.prunerReceiver.Report() } + s.Snapshotting = j.mode.SnapperReport() return &Status{Type: t, JobSpecific: s} } diff --git a/daemon/job/passive.go b/daemon/job/passive.go index 5c7b48b..6057b53 100644 --- a/daemon/job/passive.go +++ b/daemon/job/passive.go @@ -27,6 +27,7 @@ type PassiveSide struct { type passiveMode interface { Handler() rpc.Handler RunPeriodic(ctx context.Context) + SnapperReport() *snapper.Report // may be nil Type() Type } @@ -40,7 +41,8 @@ func (m *modeSink) Handler() rpc.Handler { return endpoint.NewReceiver(m.rootDataset, true) } -func (m *modeSink) RunPeriodic(_ context.Context) {} +func (m *modeSink) RunPeriodic(_ context.Context) {} +func (m *modeSink) SnapperReport() *snapper.Report { return nil } func modeSinkFromConfig(g *config.Global, in *config.SinkJob) (m *modeSink, err error) { m = &modeSink{} @@ -85,6 +87,10 @@ func (m *modeSource) RunPeriodic(ctx context.Context) { m.snapper.Run(ctx, nil) } +func (m *modeSource) SnapperReport() *snapper.Report { + return m.snapper.Report() +} + func passiveSideFromConfig(g *config.Global, in *config.PassiveJob, mode passiveMode) (s *PassiveSide, err error) { s = &PassiveSide{mode: mode, name: in.Name} @@ -97,10 +103,15 @@ func passiveSideFromConfig(g *config.Global, in *config.PassiveJob, mode passive func (j *PassiveSide) Name() string { return j.name } -type PassiveStatus struct{} +type PassiveStatus struct { + Snapper *snapper.Report +} func (s *PassiveSide) Status() *Status { - return &Status{Type: s.mode.Type()} // FIXME PassiveStatus + st := &PassiveStatus{ + Snapper: s.mode.SnapperReport(), + } + return &Status{Type: s.mode.Type(), JobSpecific: st} } func (j *PassiveSide) OwnedDatasetSubtreeRoot() (rfs *zfs.DatasetPath, ok bool) { diff --git a/daemon/job/snapjob.go b/daemon/job/snapjob.go index 7614336..bc9291d 100644 --- a/daemon/job/snapjob.go +++ b/daemon/job/snapjob.go @@ -66,7 +66,8 @@ func (j *SnapJob) RegisterMetrics(registerer prometheus.Registerer) { } type SnapJobStatus struct { - Pruning *pruner.Report + Pruning *pruner.Report + Snapshotting *snapper.Report // may be nil } func (j *SnapJob) Status() *Status { @@ -75,6 +76,7 @@ func (j *SnapJob) Status() *Status { if j.pruner != nil { s.Pruning = j.pruner.Report() } + s.Snapshotting = j.snapper.Report() return &Status{Type: t, JobSpecific: s} } diff --git a/daemon/snapper/snapper.go b/daemon/snapper/snapper.go index b7fffa4..9750d99 100644 --- a/daemon/snapper/snapper.go +++ b/daemon/snapper/snapper.go @@ -26,12 +26,14 @@ const ( SnapError ) +// All fields protected by Snapper.mtx type snapProgress struct { state SnapState // SnapStarted, SnapDone, SnapError - name string - startAt time.Time + name string + startAt time.Time + hookPlan *hooks.Plan // SnapDone doneAt time.Time @@ -61,7 +63,7 @@ type Snapper struct { lastInvocation time.Time // valid for state Snapshotting - plan map[*zfs.DatasetPath]snapProgress + plan map[*zfs.DatasetPath]*snapProgress // valid for state SyncUp and Waiting sleepUntil time.Time @@ -234,19 +236,20 @@ func plan(a args, u updater) state { return onErr(err, u) } - plan := make(map[*zfs.DatasetPath]snapProgress, len(fss)) + plan := make(map[*zfs.DatasetPath]*snapProgress, len(fss)) for _, fs := range fss { - plan[fs] = snapProgress{state: SnapPending} + plan[fs] = &snapProgress{state: SnapPending} } return u(func(s *Snapper) { s.state = Snapshotting s.plan = plan + s.err = nil }).sf() } func snapshot(a args, u updater) state { - var plan map[*zfs.DatasetPath]snapProgress + var plan map[*zfs.DatasetPath]*snapProgress u(func(snapper *Snapper) { plan = snapper.plan }) @@ -266,14 +269,6 @@ func snapshot(a args, u updater) state { WithField("fs", fs.ToString()). WithField("snap", snapname) - u(func(snapper *Snapper) { - progress.name = snapname - progress.startAt = time.Now() - progress.state = SnapStarted - }) - - var doneAt time.Time - hookEnvExtra := hooks.Env{ hooks.EnvFS: fs.ToString(), hooks.EnvSnapshot: snapname, @@ -285,7 +280,6 @@ func snapshot(a args, u updater) state { if err != nil { l.WithError(err).Error("cannot create snapshot") } - doneAt = time.Now() return }) @@ -312,6 +306,12 @@ func snapshot(a args, u updater) state { goto updateFSState } } + u(func(snapper *Snapper) { + progress.name = snapname + progress.startAt = time.Now() + progress.hookPlan = plan + progress.state = SnapStarted + }) { l := hooks.GetLogger(a.ctx).WithField("fs", fs.ToString()).WithField("snap", snapname) l.WithField("report", plan.Report().String()).Debug("begin run job plan") @@ -328,7 +328,7 @@ func snapshot(a args, u updater) state { updateFSState: anyFsHadErr = anyFsHadErr || fsHadErr u(func(snapper *Snapper) { - progress.doneAt = doneAt + progress.doneAt = time.Now() progress.state = SnapDone if fsHadErr { progress.state = SnapError @@ -364,6 +364,7 @@ func snapshot(a args, u updater) state { snapper.err = errors.New("one or more snapshots could not be created, check logs for details") } else { snapper.state = Waiting + snapper.err = nil } }).sf() } diff --git a/daemon/snapper/snapper_all.go b/daemon/snapper/snapper_all.go index 8a87211..51eda9e 100644 --- a/daemon/snapper/snapper_all.go +++ b/daemon/snapper/snapper_all.go @@ -24,6 +24,14 @@ func (s *PeriodicOrManual) Run(ctx context.Context, wakeUpCommon chan<- struct{} } } +// Returns nil if manual +func (s *PeriodicOrManual) Report() *Report { + if s.s != nil { + return s.s.Report() + } + return nil +} + func FromConfig(g *config.Global, fsf *filters.DatasetMapFilter, in config.SnapshottingEnum) (*PeriodicOrManual, error) { switch v := in.Ret.(type) { case *config.SnapshottingPeriodic: diff --git a/daemon/snapper/snapper_report.go b/daemon/snapper/snapper_report.go new file mode 100644 index 0000000..f0324df --- /dev/null +++ b/daemon/snapper/snapper_report.go @@ -0,0 +1,118 @@ +package snapper + +import ( + "fmt" + "sort" + "strings" + "time" + + "github.com/zrepl/zrepl/daemon/hooks" +) + +type Report struct { + State State + // valid in state SyncUp and Waiting + SleepUntil time.Time + // valid in state Err + Error string + // valid in state Snapshotting + Progress []*ReportFilesystem +} + +type ReportFilesystem struct { + Path string + State SnapState + + // Valid in SnapStarted and later + SnapName string + StartAt time.Time + Hooks string + HooksHadError bool + + // Valid in SnapDone | SnapError + DoneAt time.Time +} + +func errOrEmptyString(e error) string { + if e != nil { + return e.Error() + } + return "" +} + +func (s *Snapper) Report() *Report { + s.mtx.Lock() + defer s.mtx.Unlock() + + pReps := make([]*ReportFilesystem, 0, len(s.plan)) + for fs, p := range s.plan { + var hooksStr string + var hooksHadError bool + if p.hookPlan != nil { + hr := p.hookPlan.Report() + // FIXME: technically this belongs into client + // but we can't serialize hooks.Step ATM + rightPad := func(str string, length int, pad string) string { + if len(str) > length { + return str[:length] + } + return str + strings.Repeat(pad, length-len(str)) + } + hooksHadError = hr.HadError() + rows := make([][]string, len(hr)) + const numCols = 4 + lens := make([]int, numCols) + for i, e := range hr { + rows[i] = make([]string, numCols) + rows[i][0] = fmt.Sprintf("%d", i+1) + rows[i][1] = e.Status.String() + runTime := "..." + if e.Status != hooks.StepPending { + runTime = e.End.Sub(e.Begin).Round(time.Millisecond).String() + } + rows[i][2] = runTime + rows[i][3] = "" + if e.Report != nil { + rows[i][3] = e.Report.String() + } + for j, col := range lens { + if len(rows[i][j]) > col { + lens[j] = len(rows[i][j]) + } + } + } + rowsFlat := make([]string, len(hr)) + for i, r := range rows { + colsPadded := make([]string, len(r)) + for j, c := range r[:len(r)-1] { + colsPadded[j] = rightPad(c, lens[j], " ") + } + colsPadded[len(r)-1] = r[len(r)-1] + rowsFlat[i] = strings.Join(colsPadded, " ") + } + hooksStr = strings.Join(rowsFlat, "\n") + } + pReps = append(pReps, &ReportFilesystem{ + Path: fs.ToString(), + State: p.state, + SnapName: p.name, + StartAt: p.startAt, + DoneAt: p.doneAt, + Hooks: hooksStr, + HooksHadError: hooksHadError, + }) + } + + sort.Slice(pReps, func(i, j int) bool { + return strings.Compare(pReps[i].Path, pReps[j].Path) == -1 + }) + + r := &Report{ + State: s.state, + SleepUntil: s.sleepUntil, + Error: errOrEmptyString(s.err), + Progress: pReps, + } + + return r +}