package client import ( "fmt" "github.com/gdamore/tcell/termbox" "github.com/pkg/errors" "github.com/spf13/pflag" "github.com/zrepl/yaml-config" "github.com/zrepl/zrepl/cli" "github.com/zrepl/zrepl/daemon" "github.com/zrepl/zrepl/daemon/job" "github.com/zrepl/zrepl/daemon/pruner" "github.com/zrepl/zrepl/replication" "github.com/zrepl/zrepl/replication/fsrep" "io" "math" "net/http" "os" "sort" "strings" "sync" "time" ) type bytesProgressHistory struct { changeCounter int lastChangeAt time.Time last int64 bpsIncreaseExpAvg float64 } func (p *bytesProgressHistory) Update(currentVal int64) (bytesPerSecondAvg int64, changeCount int) { if currentVal < p.last { *p = bytesProgressHistory{ last: currentVal, lastChangeAt: time.Now(), } } defer func() { p.last = currentVal }() if time.Now().Sub(p.lastChangeAt) > 3 *time.Second { // FIXME depends on refresh frequency p.changeCounter = 0 p.bpsIncreaseExpAvg = 0 } if currentVal != p.last { p.changeCounter++ p.lastChangeAt = time.Now() } byteIncrease := float64(currentVal - p.last) if byteIncrease < 0 { byteIncrease = 0 } const factor = 0.1 p.bpsIncreaseExpAvg = (1-factor) * p.bpsIncreaseExpAvg + factor *byteIncrease return int64(p.bpsIncreaseExpAvg), p.changeCounter } type tui struct { x, y int indent int lock sync.Mutex //For report and error report map[string]job.Status err error replicationProgress map[string]*bytesProgressHistory // by job name } func newTui() tui { return tui{ replicationProgress: make(map[string]*bytesProgressHistory, 0), } } func (t *tui) moveCursor(x, y int) { t.x += x t.y += y } const INDENT_MULTIPLIER = 4 func (t *tui) moveLine(dl int, col int) { t.y += dl t.x = t.indent*INDENT_MULTIPLIER + col } func (t *tui) write(text string) { for _, c := range text { if c == '\n' { t.newline() continue } termbox.SetCell(t.x, t.y, c, termbox.ColorDefault, termbox.ColorDefault) t.x += 1 } } func (t *tui) printf(text string, a ...interface{}) { t.write(fmt.Sprintf(text, a...)) } func wrap(s string, width int) string { var b strings.Builder for len(s) > 0 { rem := width if rem > len(s) { rem = len(s) } if idx := strings.IndexAny(s, "\n\r"); idx != -1 && idx < rem { rem = idx+1 } untilNewline := strings.TrimSpace(s[:rem]) s = s[rem:] if len(untilNewline) == 0 { continue } b.WriteString(untilNewline) b.WriteString("\n") } return strings.TrimSpace(b.String()) } func (t *tui) printfDrawIndentedAndWrappedIfMultiline(format string, a ...interface{}) { whole := fmt.Sprintf(format, a...) width, _ := termbox.Size() if !strings.ContainsAny(whole, "\n\r") && t.x + len(whole) <= width { t.printf(format, a...) } else { t.addIndent(1) t.newline() t.write(wrap(whole, width - INDENT_MULTIPLIER*t.indent)) t.addIndent(-1) } } func (t *tui) newline() { t.moveLine(1, 0) } func (t *tui) setIndent(indent int) { t.indent = indent t.moveLine(0, 0) } func (t *tui) addIndent(indent int) { t.indent += indent t.moveLine(0, 0) } var statusFlags struct { Raw bool } var StatusCmd = &cli.Subcommand{ Use: "status", Short: "show job activity or dump as JSON for monitoring", SetupFlags: func(f *pflag.FlagSet) { f.BoolVar(&statusFlags.Raw, "raw", false, "dump raw status description from zrepl daemon") }, Run: runStatus, } func runStatus(s *cli.Subcommand, args []string) error { httpc, err := controlHttpClient(s.Config().Global.Control.SockPath) if err != nil { return err } if statusFlags.Raw { resp, err := httpc.Get("http://unix"+daemon.ControlJobEndpointStatus) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { fmt.Fprintf(os.Stderr, "Received error response:\n") io.CopyN(os.Stderr, resp.Body, 4096) return errors.Errorf("exit") } if _, err := io.Copy(os.Stdout, resp.Body); err != nil { return err } return nil } t := newTui() t.lock.Lock() t.err = errors.New("Got no report yet") t.lock.Unlock() err = termbox.Init() if err != nil { return err } defer termbox.Close() update := func() { m := make(map[string]job.Status) err2 := jsonRequestResponse(httpc, daemon.ControlJobEndpointStatus, struct{}{}, &m, ) t.lock.Lock() t.err = err2 t.report = m t.lock.Unlock() t.draw() } update() ticker := time.NewTicker(500 * time.Millisecond) defer ticker.Stop() go func() { for _ = range ticker.C { update() } }() termbox.HideCursor() termbox.Clear(termbox.ColorDefault, termbox.ColorDefault) loop: for { switch ev := termbox.PollEvent(); ev.Type { case termbox.EventKey: switch ev.Key { case termbox.KeyEsc: break loop case termbox.KeyCtrlC: break loop } case termbox.EventResize: t.draw() } } return nil } func (t *tui) getReplicationProgresHistory(jobName string) *bytesProgressHistory { p, ok := t.replicationProgress[jobName] if !ok { p = &bytesProgressHistory{} t.replicationProgress[jobName] = p } return p } func (t *tui) draw() { t.lock.Lock() defer t.lock.Unlock() termbox.Clear(termbox.ColorDefault, termbox.ColorDefault) t.x = 0 t.y = 0 t.indent = 0 if t.err != nil { t.write(t.err.Error()) } else { //Iterate over map in alphabetical order keys := make([]string, len(t.report)) i := 0 for k, _ := range t.report { keys[i] = k i++ } sort.Strings(keys) for _, k := range keys { v := t.report[k] if len(k) == 0 || daemon.IsInternalJobName(k) { //Internal job continue } t.setIndent(0) t.printf("Job: %s", k) t.setIndent(1) t.newline() t.printf("Type: %s", v.Type) t.setIndent(1) t.newline() if v.Type != job.TypePush && v.Type != job.TypePull { t.printf("No status representation for job type '%s', dumping as YAML", v.Type) t.newline() asYaml, err := yaml.Marshal(v.JobSpecific) if err != nil { t.printf("Error marshaling status to YAML: %s", err) t.newline() continue } t.write(string(asYaml)) t.newline() continue } pushStatus, ok := v.JobSpecific.(*job.ActiveSideStatus) if !ok || pushStatus == nil { t.printf("ActiveSideStatus is null") t.newline() continue } t.printf("Replication:") t.newline() t.addIndent(1) t.renderReplicationReport(pushStatus.Replication, t.getReplicationProgresHistory(k)) t.addIndent(-1) t.printf("Pruning Sender:") t.newline() t.addIndent(1) t.renderPrunerReport(pushStatus.PruningSender) t.addIndent(-1) t.printf("Pruning Receiver:") t.newline() t.addIndent(1) t.renderPrunerReport(pushStatus.PruningReceiver) t.addIndent(-1) } } termbox.Flush() } func (t *tui) renderReplicationReport(rep *replication.Report, history *bytesProgressHistory) { if rep == nil { t.printf("...\n") return } all := make([]*fsrep.Report, 0, len(rep.Completed)+len(rep.Pending) + 1) all = append(all, rep.Completed...) all = append(all, rep.Pending...) if rep.Active != nil { all = append(all, rep.Active) } sort.Slice(all, func(i, j int) bool { return all[i].Filesystem < all[j].Filesystem }) state, err := replication.StateString(rep.Status) if err != nil { t.printf("Status: %q (parse error: %q)\n", rep.Status, err) return } t.printf("Status: %s", state) t.newline() if rep.Problem != "" { t.printf("Problem: ") t.printfDrawIndentedAndWrappedIfMultiline("%s", rep.Problem) t.newline() } if rep.SleepUntil.After(time.Now()) && !state.IsTerminal() { t.printf("Sleeping until %s (%s left)\n", rep.SleepUntil, rep.SleepUntil.Sub(time.Now())) } if state != replication.Planning && state != replication.PlanningError { // Progress: [---------------] sumUpFSRep := func(rep *fsrep.Report) (transferred, total int64) { for _, s := range rep.Pending { transferred += s.Bytes total += s.ExpectedBytes } for _, s := range rep.Completed { transferred += s.Bytes total += s.ExpectedBytes } return } var transferred, total int64 for _, fs := range all { fstx, fstotal := sumUpFSRep(fs) transferred += fstx total += fstotal } rate, changeCount := history.Update(transferred) t.write("Progress: ") t.drawBar(50, transferred, total, changeCount) t.write(fmt.Sprintf(" %s / %s @ %s/s", ByteCountBinary(transferred), ByteCountBinary(total), ByteCountBinary(rate))) t.newline() } var maxFSLen int for _, fs := range all { if len(fs.Filesystem) > maxFSLen { maxFSLen = len(fs.Filesystem) } } for _, fs := range all { t.printFilesystemStatus(fs, fs == rep.Active, maxFSLen) } } func (t *tui) renderPrunerReport(r *pruner.Report) { if r == nil { t.printf("...\n") return } state, err := pruner.StateString(r.State) if err != nil { t.printf("Status: %q (parse error: %q)\n", r.State, err) return } t.printf("Status: %s", state) t.newline() if r.Error != "" { t.printf("Error: %s\n", r.Error) } if r.SleepUntil.After(time.Now()) { t.printf("Sleeping until %s (%s left)\n", r.SleepUntil, r.SleepUntil.Sub(time.Now())) } type commonFS struct { *pruner.FSReport completed bool } all := make([]commonFS, 0, len(r.Pending) + len(r.Completed)) for i := range r.Pending { all = append(all, commonFS{&r.Pending[i], false}) } for i := range r.Completed { all = append(all, commonFS{&r.Completed[i], true}) } switch state { case pruner.Plan: fallthrough case pruner.PlanWait: fallthrough case pruner.ErrPerm: return } if len(all) == 0 { t.printf("nothing to do\n") return } var totalDestroyCount, completedDestroyCount int var maxFSname int for _, fs := range all { totalDestroyCount += len(fs.DestroyList) if fs.completed { completedDestroyCount += len(fs.DestroyList) } if maxFSname < len(fs.Filesystem) { maxFSname = len(fs.Filesystem) } } // global progress bar progress := int(math.Round(80 * float64(completedDestroyCount) / float64(totalDestroyCount))) t.write("Progress: ") t.write("[") t.write(times("=", progress)) t.write(">") t.write(times("-", 80 - progress)) t.write("]") t.printf(" %d/%d snapshots", completedDestroyCount, totalDestroyCount) t.newline() sort.SliceStable(all, func(i, j int) bool { return strings.Compare(all[i].Filesystem, all[j].Filesystem) == -1 }) // Draw a table-like representation of 'all' for _, fs := range all { t.write(rightPad(fs.Filesystem, maxFSname, " ")) t.write(" ") if fs.LastError != "" { t.printf("ERROR (%d): %s\n", fs.ErrorCount, fs.LastError) // whitespace is padding continue } pruneRuleActionStr := fmt.Sprintf("(destroy %d of %d snapshots)", len(fs.DestroyList), len(fs.SnapshotList)) if fs.completed { t.printf( "Completed %s\n", pruneRuleActionStr) continue } t.write("Pending ") // whitespace is padding 10 if len(fs.DestroyList) == 1 { t.write(fs.DestroyList[0].Name) } else { t.write(pruneRuleActionStr) } t.newline() } } const snapshotIndent = 1 func calculateMaxFSLength(all []*fsrep.Report) (maxFS, maxStatus int) { for _, e := range all { if len(e.Filesystem) > maxFS { maxFS = len(e.Filesystem) } all2 := make([]*fsrep.StepReport, 0, len(e.Pending) + len(e.Completed)) all2 = append(all2, e.Pending...) all2 = append(all2, e.Completed...) for _, e2 := range all2 { elen := len(e2.Problem) + len(e2.From) + len(e2.To) + 60 // random spacing, units, labels, etc if elen > maxStatus { maxStatus = elen } } } return } func times(str string, n int) (out string) { for i := 0; i < n; i++ { out += str } return } func rightPad(str string, length int, pad string) string { if len(str) > length { return str[:length] } return str + times(pad, length-len(str)) } func leftPad(str string, length int, pad string) string { if len(str) > length { return str[len(str)-length:] } return times(pad, length-len(str)) + str } var arrowPositions = `>\|/` // changeCount = 0 indicates stall / no progresss func (t *tui) drawBar(length int, bytes, totalBytes int64, changeCount int) { var completedLength int if totalBytes > 0 { completedLength = int(int64(length) * bytes / totalBytes) if completedLength > length { completedLength = length } } else if totalBytes == bytes { completedLength = length } t.write("[") t.write(times("=", completedLength)) t.write( string(arrowPositions[changeCount%len(arrowPositions)])) t.write(times("-", length-completedLength)) t.write("]") } func StringStepState(s fsrep.StepState) string { switch s { case fsrep.StepReplicationReady: return "Ready" case fsrep.StepMarkReplicatedReady: return "MarkReady" case fsrep.StepCompleted: return "Completed" default: return fmt.Sprintf("UNKNOWN %d", s) } } func (t *tui) printFilesystemStatus(rep *fsrep.Report, active bool, maxFS int) { bytes := int64(0) totalBytes := int64(0) for _, s := range rep.Pending { bytes += s.Bytes totalBytes += s.ExpectedBytes } for _, s := range rep.Completed { bytes += s.Bytes totalBytes += s.ExpectedBytes } status := fmt.Sprintf("%s (step %d/%d, %s/%s)", rep.Status, len(rep.Completed), len(rep.Pending) + len(rep.Completed), ByteCountBinary(bytes), ByteCountBinary(totalBytes), ) activeIndicator := " " if active { activeIndicator = "*" } t.printf("%s %s %s ", activeIndicator, rightPad(rep.Filesystem, maxFS, " "), status) next := "" if rep.Problem != "" { next = rep.Problem } else if len(rep.Pending) > 0 { if rep.Pending[0].From != "" { next = fmt.Sprintf("next: %s => %s", rep.Pending[0].From, rep.Pending[0].To) } else { next = fmt.Sprintf("next: %s (full)", rep.Pending[0].To) } } t.printfDrawIndentedAndWrappedIfMultiline("%s", next) t.newline() } func ByteCountBinary(b int64) string { const unit = 1024 if b < unit { return fmt.Sprintf("%d B", b) } div, exp := int64(unit), 0 for n := b / unit; n >= unit; n /= unit { div *= unit exp++ } return fmt.Sprintf("%.1f %ciB", float64(b)/float64(div), "KMGTPE"[exp]) }