1
0
mirror of https://github.com/zrepl/zrepl.git synced 2024-12-20 14:10:47 +01:00
zrepl/client/status.go

817 lines
19 KiB
Go

package client
import (
"context"
"fmt"
"io"
"math"
"net/http"
"os"
"sort"
"strings"
"sync"
"time"
// tcell is the termbox-compatible library for abstracting away escape sequences, etc.
// as of tcell#252, the number of default distributed terminals is relatively limited
// additional terminal definitions can be included via side-effect import
// See https://github.com/gdamore/tcell/blob/master/terminfo/base/base.go
// See https://github.com/gdamore/tcell/issues/252#issuecomment-533836078
"github.com/gdamore/tcell/termbox"
_ "github.com/gdamore/tcell/terminfo/s/screen" // tmux on FreeBSD 11 & 12 without ncurses
"github.com/pkg/errors"
"github.com/spf13/pflag"
"github.com/zrepl/yaml-config"
"github.com/zrepl/zrepl/cli"
"github.com/zrepl/zrepl/daemon"
"github.com/zrepl/zrepl/daemon/job"
"github.com/zrepl/zrepl/daemon/pruner"
"github.com/zrepl/zrepl/daemon/snapper"
"github.com/zrepl/zrepl/replication/report"
)
type byteProgressMeasurement struct {
time time.Time
val int64
}
type bytesProgressHistory struct {
last *byteProgressMeasurement // pointer as poor man's optional
changeCount int
lastChange time.Time
bpsAvg float64
}
func (p *bytesProgressHistory) Update(currentVal int64) (bytesPerSecondAvg int64, changeCount int) {
if p.last == nil {
p.last = &byteProgressMeasurement{
time: time.Now(),
val: currentVal,
}
return 0, 0
}
if p.last.val != currentVal {
p.changeCount++
p.lastChange = time.Now()
}
if time.Since(p.lastChange) > 3*time.Second {
p.last = nil
return 0, 0
}
deltaV := currentVal - p.last.val
deltaT := time.Since(p.last.time)
rate := float64(deltaV) / deltaT.Seconds()
factor := 0.3
p.bpsAvg = (1-factor)*p.bpsAvg + factor*rate
p.last.time = time.Now()
p.last.val = currentVal
return int64(p.bpsAvg), p.changeCount
}
type tui struct {
x, y int
indent int
lock sync.Mutex //For report and error
report map[string]*job.Status
err error
jobFilter string
replicationProgress map[string]*bytesProgressHistory // by job name
}
func newTui() tui {
return tui{
replicationProgress: make(map[string]*bytesProgressHistory),
}
}
const INDENT_MULTIPLIER = 4
func (t *tui) moveLine(dl int, col int) {
t.y += dl
t.x = t.indent*INDENT_MULTIPLIER + col
}
func (t *tui) write(text string) {
for _, c := range text {
if c == '\n' {
t.newline()
continue
}
termbox.SetCell(t.x, t.y, c, termbox.ColorDefault, termbox.ColorDefault)
t.x += 1
}
}
func (t *tui) printf(text string, a ...interface{}) {
t.write(fmt.Sprintf(text, a...))
}
func wrap(s string, width int) string {
var b strings.Builder
for len(s) > 0 {
rem := width
if rem > len(s) {
rem = len(s)
}
if idx := strings.IndexAny(s, "\n\r"); idx != -1 && idx < rem {
rem = idx + 1
}
untilNewline := strings.TrimRight(s[:rem], "\n\r")
s = s[rem:]
if len(untilNewline) == 0 {
continue
}
b.WriteString(untilNewline)
b.WriteString("\n")
}
return strings.TrimRight(b.String(), "\n\r")
}
func (t *tui) printfDrawIndentedAndWrappedIfMultiline(format string, a ...interface{}) {
whole := fmt.Sprintf(format, a...)
width, _ := termbox.Size()
if !strings.ContainsAny(whole, "\n\r") && t.x+len(whole) <= width {
t.printf(format, a...)
} else {
t.addIndent(1)
t.newline()
t.write(wrap(whole, width-INDENT_MULTIPLIER*t.indent))
t.addIndent(-1)
}
}
func (t *tui) newline() {
t.moveLine(1, 0)
}
func (t *tui) setIndent(indent int) {
t.indent = indent
t.moveLine(0, 0)
}
func (t *tui) addIndent(indent int) {
t.indent += indent
t.moveLine(0, 0)
}
var statusFlags struct {
Raw bool
Job string
}
var StatusCmd = &cli.Subcommand{
Use: "status",
Short: "show job activity or dump as JSON for monitoring",
SetupFlags: func(f *pflag.FlagSet) {
f.BoolVar(&statusFlags.Raw, "raw", false, "dump raw status description from zrepl daemon")
f.StringVar(&statusFlags.Job, "job", "", "only dump specified job")
},
Run: runStatus,
}
func runStatus(ctx context.Context, s *cli.Subcommand, args []string) error {
httpc, err := controlHttpClient(s.Config().Global.Control.SockPath)
if err != nil {
return err
}
if statusFlags.Raw {
resp, err := httpc.Get("http://unix" + daemon.ControlJobEndpointStatus)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
fmt.Fprintf(os.Stderr, "Received error response:\n")
_, err := io.CopyN(os.Stderr, resp.Body, 4096)
if err != nil {
return err
}
return errors.Errorf("exit")
}
if _, err := io.Copy(os.Stdout, resp.Body); err != nil {
return err
}
return nil
}
t := newTui()
t.lock.Lock()
t.err = errors.New("Got no report yet")
t.lock.Unlock()
t.jobFilter = statusFlags.Job
err = termbox.Init()
if err != nil {
return err
}
defer termbox.Close()
update := func() {
var m daemon.Status
err2 := jsonRequestResponse(httpc, daemon.ControlJobEndpointStatus,
struct{}{},
&m,
)
t.lock.Lock()
t.err = err2
t.report = m.Jobs
t.lock.Unlock()
t.draw()
}
update()
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
go func() {
for range ticker.C {
update()
}
}()
termbox.HideCursor()
termbox.Clear(termbox.ColorDefault, termbox.ColorDefault)
loop:
for {
switch ev := termbox.PollEvent(); ev.Type {
case termbox.EventKey:
switch ev.Key {
case termbox.KeyEsc:
break loop
case termbox.KeyCtrlC:
break loop
}
case termbox.EventResize:
t.draw()
}
}
return nil
}
func (t *tui) getReplicationProgressHistory(jobName string) *bytesProgressHistory {
p, ok := t.replicationProgress[jobName]
if !ok {
p = &bytesProgressHistory{}
t.replicationProgress[jobName] = p
}
return p
}
func (t *tui) draw() {
t.lock.Lock()
defer t.lock.Unlock()
termbox.Clear(termbox.ColorDefault, termbox.ColorDefault)
t.x = 0
t.y = 0
t.indent = 0
if t.err != nil {
t.write(t.err.Error())
} else {
//Iterate over map in alphabetical order
keys := make([]string, 0, len(t.report))
for k := range t.report {
if len(k) == 0 || daemon.IsInternalJobName(k) { //Internal job
continue
}
if t.jobFilter != "" && k != t.jobFilter {
continue
}
keys = append(keys, k)
}
sort.Strings(keys)
if len(keys) == 0 {
t.setIndent(0)
t.printf("no jobs to display")
t.newline()
termbox.Flush()
return
}
for _, k := range keys {
v := t.report[k]
t.setIndent(0)
t.printf("Job: %s", k)
t.setIndent(1)
t.newline()
t.printf("Type: %s", v.Type)
t.setIndent(1)
t.newline()
if v.Type == job.TypePush || v.Type == job.TypePull {
activeStatus, ok := v.JobSpecific.(*job.ActiveSideStatus)
if !ok || activeStatus == nil {
t.printf("ActiveSideStatus is null")
t.newline()
continue
}
t.printf("Replication:")
t.newline()
t.addIndent(1)
t.renderReplicationReport(activeStatus.Replication, t.getReplicationProgressHistory(k))
t.addIndent(-1)
t.printf("Pruning Sender:")
t.newline()
t.addIndent(1)
t.renderPrunerReport(activeStatus.PruningSender)
t.addIndent(-1)
t.printf("Pruning Receiver:")
t.newline()
t.addIndent(1)
t.renderPrunerReport(activeStatus.PruningReceiver)
t.addIndent(-1)
if v.Type == job.TypePush {
t.printf("Snapshotting:")
t.newline()
t.addIndent(1)
t.renderSnapperReport(activeStatus.Snapshotting)
t.addIndent(-1)
}
} else if v.Type == job.TypeSnap {
snapStatus, ok := v.JobSpecific.(*job.SnapJobStatus)
if !ok || snapStatus == nil {
t.printf("SnapJobStatus is null")
t.newline()
continue
}
t.printf("Pruning snapshots:")
t.newline()
t.addIndent(1)
t.renderPrunerReport(snapStatus.Pruning)
t.addIndent(-1)
t.printf("Snapshotting:")
t.newline()
t.addIndent(1)
t.renderSnapperReport(snapStatus.Snapshotting)
t.addIndent(-1)
} else if v.Type == job.TypeSource {
st := v.JobSpecific.(*job.PassiveStatus)
t.printf("Snapshotting:\n")
t.addIndent(1)
t.renderSnapperReport(st.Snapper)
t.addIndent(-1)
} else {
t.printf("No status representation for job type '%s', dumping as YAML", v.Type)
t.newline()
asYaml, err := yaml.Marshal(v.JobSpecific)
if err != nil {
t.printf("Error marshaling status to YAML: %s", err)
t.newline()
continue
}
t.write(string(asYaml))
t.newline()
continue
}
}
}
termbox.Flush()
}
func (t *tui) renderReplicationReport(rep *report.Report, history *bytesProgressHistory) {
if rep == nil {
t.printf("...\n")
return
}
if rep.WaitReconnectError != nil {
t.printfDrawIndentedAndWrappedIfMultiline("Connectivity: %s", rep.WaitReconnectError)
t.newline()
}
if !rep.WaitReconnectSince.IsZero() {
delta := time.Until(rep.WaitReconnectUntil).Round(time.Second)
if rep.WaitReconnectUntil.IsZero() || delta > 0 {
var until string
if rep.WaitReconnectUntil.IsZero() {
until = "waiting indefinitely"
} else {
until = fmt.Sprintf("hard fail in %s @ %s", delta, rep.WaitReconnectUntil)
}
t.printfDrawIndentedAndWrappedIfMultiline("Connectivity: reconnecting with exponential backoff (since %s) (%s)",
rep.WaitReconnectSince, until)
} else {
t.printfDrawIndentedAndWrappedIfMultiline("Connectivity: reconnects reached hard-fail timeout @ %s", rep.WaitReconnectUntil)
}
t.newline()
}
// TODO visualize more than the latest attempt by folding all attempts into one
if len(rep.Attempts) == 0 {
t.printf("no attempts made yet")
return
} else {
t.printf("Attempt #%d", len(rep.Attempts))
if len(rep.Attempts) > 1 {
t.printf(". Previous attempts failed with the following statuses:")
t.newline()
t.addIndent(1)
for i, a := range rep.Attempts[:len(rep.Attempts)-1] {
t.printfDrawIndentedAndWrappedIfMultiline("#%d: %s (failed at %s) (ran %s)", i+1, a.State, a.FinishAt, a.FinishAt.Sub(a.StartAt))
t.newline()
}
t.addIndent(-1)
} else {
t.newline()
}
}
latest := rep.Attempts[len(rep.Attempts)-1]
sort.Slice(latest.Filesystems, func(i, j int) bool {
return latest.Filesystems[i].Info.Name < latest.Filesystems[j].Info.Name
})
t.printf("Status: %s", latest.State)
t.newline()
if latest.State == report.AttemptPlanningError {
t.printf("Problem: ")
t.printfDrawIndentedAndWrappedIfMultiline("%s", latest.PlanError)
t.newline()
} else if latest.State == report.AttemptFanOutError {
t.printf("Problem: one or more of the filesystems encountered errors")
t.newline()
}
if latest.State != report.AttemptPlanning && latest.State != report.AttemptPlanningError {
// Draw global progress bar
// Progress: [---------------]
expected, replicated, containsInvalidSizeEstimates := latest.BytesSum()
rate, changeCount := history.Update(replicated)
eta := time.Duration(0)
if rate > 0 {
eta = time.Duration((expected-replicated)/rate) * time.Second
}
t.write("Progress: ")
t.drawBar(50, replicated, expected, changeCount)
t.write(fmt.Sprintf(" %s / %s @ %s/s", ByteCountBinary(replicated), ByteCountBinary(expected), ByteCountBinary(rate)))
if eta != 0 {
t.write(fmt.Sprintf(" (%s remaining)", humanizeDuration(eta)))
}
t.newline()
if containsInvalidSizeEstimates {
t.write("NOTE: not all steps could be size-estimated, total estimate is likely imprecise!")
t.newline()
}
if len(latest.Filesystems) == 0 {
t.write("NOTE: no filesystems were considered for replication!")
t.newline()
}
var maxFSLen int
for _, fs := range latest.Filesystems {
if len(fs.Info.Name) > maxFSLen {
maxFSLen = len(fs.Info.Name)
}
}
for _, fs := range latest.Filesystems {
t.printFilesystemStatus(fs, false, maxFSLen) // FIXME bring 'active' flag back
}
}
}
func (t *tui) renderPrunerReport(r *pruner.Report) {
if r == nil {
t.printf("...\n")
return
}
state, err := pruner.StateString(r.State)
if err != nil {
t.printf("Status: %q (parse error: %q)\n", r.State, err)
return
}
t.printf("Status: %s", state)
t.newline()
if r.Error != "" {
t.printf("Error: %s\n", r.Error)
}
type commonFS struct {
*pruner.FSReport
completed bool
}
all := make([]commonFS, 0, len(r.Pending)+len(r.Completed))
for i := range r.Pending {
all = append(all, commonFS{&r.Pending[i], false})
}
for i := range r.Completed {
all = append(all, commonFS{&r.Completed[i], true})
}
switch state {
case pruner.Plan:
fallthrough
case pruner.PlanErr:
return
}
if len(all) == 0 {
t.printf("nothing to do\n")
return
}
var totalDestroyCount, completedDestroyCount int
var maxFSname int
for _, fs := range all {
totalDestroyCount += len(fs.DestroyList)
if fs.completed {
completedDestroyCount += len(fs.DestroyList)
}
if maxFSname < len(fs.Filesystem) {
maxFSname = len(fs.Filesystem)
}
}
// global progress bar
progress := int(math.Round(80 * float64(completedDestroyCount) / float64(totalDestroyCount)))
t.write("Progress: ")
t.write("[")
t.write(times("=", progress))
t.write(">")
t.write(times("-", 80-progress))
t.write("]")
t.printf(" %d/%d snapshots", completedDestroyCount, totalDestroyCount)
t.newline()
sort.SliceStable(all, func(i, j int) bool {
return strings.Compare(all[i].Filesystem, all[j].Filesystem) == -1
})
// Draw a table-like representation of 'all'
for _, fs := range all {
t.write(rightPad(fs.Filesystem, maxFSname, " "))
t.write(" ")
if !fs.SkipReason.NotSkipped() {
t.printf("skipped: %s\n", fs.SkipReason)
continue
}
if fs.LastError != "" {
if strings.ContainsAny(fs.LastError, "\r\n") {
t.printf("ERROR:")
t.printfDrawIndentedAndWrappedIfMultiline("%s\n", fs.LastError)
} else {
t.printfDrawIndentedAndWrappedIfMultiline("ERROR: %s\n", fs.LastError)
}
t.newline()
continue
}
pruneRuleActionStr := fmt.Sprintf("(destroy %d of %d snapshots)",
len(fs.DestroyList), len(fs.SnapshotList))
if fs.completed {
t.printf("Completed %s\n", pruneRuleActionStr)
continue
}
t.write("Pending ") // whitespace is padding 10
if len(fs.DestroyList) == 1 {
t.write(fs.DestroyList[0].Name)
} else {
t.write(pruneRuleActionStr)
}
t.newline()
}
}
func (t *tui) renderSnapperReport(r *snapper.Report) {
if r == nil {
t.printf("<snapshot type does not have a report>\n")
return
}
t.printf("Status: %s", r.State)
t.newline()
if r.Error != "" {
t.printf("Error: %s\n", r.Error)
}
if !r.SleepUntil.IsZero() {
t.printf("Sleep until: %s\n", r.SleepUntil)
}
sort.Slice(r.Progress, func(i, j int) bool {
return strings.Compare(r.Progress[i].Path, r.Progress[j].Path) == -1
})
t.addIndent(1)
defer t.addIndent(-1)
dur := func(d time.Duration) string {
return d.Round(100 * time.Millisecond).String()
}
type row struct {
path, state, duration, remainder, hookReport string
}
var widths struct {
path, state, duration int
}
rows := make([]*row, len(r.Progress))
for i, fs := range r.Progress {
r := &row{
path: fs.Path,
state: fs.State.String(),
}
if fs.HooksHadError {
r.hookReport = fs.Hooks // FIXME render here, not in daemon
}
switch fs.State {
case snapper.SnapPending:
r.duration = "..."
r.remainder = ""
case snapper.SnapStarted:
r.duration = dur(time.Since(fs.StartAt))
r.remainder = fmt.Sprintf("snap name: %q", fs.SnapName)
case snapper.SnapDone:
fallthrough
case snapper.SnapError:
r.duration = dur(fs.DoneAt.Sub(fs.StartAt))
r.remainder = fmt.Sprintf("snap name: %q", fs.SnapName)
}
rows[i] = r
if len(r.path) > widths.path {
widths.path = len(r.path)
}
if len(r.state) > widths.state {
widths.state = len(r.state)
}
if len(r.duration) > widths.duration {
widths.duration = len(r.duration)
}
}
for _, r := range rows {
path := rightPad(r.path, widths.path, " ")
state := rightPad(r.state, widths.state, " ")
duration := rightPad(r.duration, widths.duration, " ")
t.printf("%s %s %s", path, state, duration)
t.printfDrawIndentedAndWrappedIfMultiline(" %s", r.remainder)
if r.hookReport != "" {
t.printfDrawIndentedAndWrappedIfMultiline("%s", r.hookReport)
}
t.newline()
}
}
func times(str string, n int) (out string) {
for i := 0; i < n; i++ {
out += str
}
return
}
func rightPad(str string, length int, pad string) string {
if len(str) > length {
return str[:length]
}
return str + strings.Repeat(pad, length-len(str))
}
var arrowPositions = `>\|/`
// changeCount = 0 indicates stall / no progress
func (t *tui) drawBar(length int, bytes, totalBytes int64, changeCount int) {
var completedLength int
if totalBytes > 0 {
completedLength = int(int64(length) * bytes / totalBytes)
if completedLength > length {
completedLength = length
}
} else if totalBytes == bytes {
completedLength = length
}
t.write("[")
t.write(times("=", completedLength))
t.write(string(arrowPositions[changeCount%len(arrowPositions)]))
t.write(times("-", length-completedLength))
t.write("]")
}
func (t *tui) printFilesystemStatus(rep *report.FilesystemReport, active bool, maxFS int) {
expected, replicated, containsInvalidSizeEstimates := rep.BytesSum()
sizeEstimationImpreciseNotice := ""
if containsInvalidSizeEstimates {
sizeEstimationImpreciseNotice = " (some steps lack size estimation)"
}
if rep.CurrentStep < len(rep.Steps) && rep.Steps[rep.CurrentStep].Info.BytesExpected == 0 {
sizeEstimationImpreciseNotice = " (step lacks size estimation)"
}
status := fmt.Sprintf("%s (step %d/%d, %s/%s)%s",
strings.ToUpper(string(rep.State)),
rep.CurrentStep, len(rep.Steps),
ByteCountBinary(replicated), ByteCountBinary(expected),
sizeEstimationImpreciseNotice,
)
activeIndicator := " "
if active {
activeIndicator = "*"
}
t.printf("%s %s %s ",
activeIndicator,
rightPad(rep.Info.Name, maxFS, " "),
status)
next := ""
if err := rep.Error(); err != nil {
next = err.Err
} else if rep.State != report.FilesystemDone {
if nextStep := rep.NextStep(); nextStep != nil {
if nextStep.IsIncremental() {
next = fmt.Sprintf("next: %s => %s", nextStep.Info.From, nextStep.Info.To)
} else {
next = fmt.Sprintf("next: full send %s", nextStep.Info.To)
}
attribs := []string{}
if nextStep.Info.Resumed {
attribs = append(attribs, "resumed")
}
attribs = append(attribs, fmt.Sprintf("encrypted=%s", nextStep.Info.Encrypted))
next += fmt.Sprintf(" (%s)", strings.Join(attribs, ", "))
} else {
next = "" // individual FSes may still be in planning state
}
}
t.printfDrawIndentedAndWrappedIfMultiline("%s", next)
t.newline()
}
func ByteCountBinary(b int64) string {
const unit = 1024
if b < unit {
return fmt.Sprintf("%d B", b)
}
div, exp := int64(unit), 0
for n := b / unit; n >= unit; n /= unit {
div *= unit
exp++
}
return fmt.Sprintf("%.1f %ciB", float64(b)/float64(div), "KMGTPE"[exp])
}
func humanizeDuration(duration time.Duration) string {
days := int64(duration.Hours() / 24)
hours := int64(math.Mod(duration.Hours(), 24))
minutes := int64(math.Mod(duration.Minutes(), 60))
seconds := int64(math.Mod(duration.Seconds(), 60))
var parts []string
force := false
chunks := []int64{days, hours, minutes, seconds}
for i, chunk := range chunks {
if force || chunk > 0 {
padding := 0
if force {
padding = 2
}
parts = append(parts, fmt.Sprintf("%*d%c", padding, chunk, "dhms"[i]))
force = true
}
}
return strings.Join(parts, " ")
}