diff --git a/client/status.go b/client/status.go index 2ac80ca..df1b793 100644 --- a/client/status.go +++ b/client/status.go @@ -8,8 +8,12 @@ import ( "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/daemon" "github.com/zrepl/zrepl/daemon/job" + "github.com/zrepl/zrepl/daemon/pruner" + "github.com/zrepl/zrepl/replication" "github.com/zrepl/zrepl/replication/fsrep" + "math" "sort" + "strings" "sync" "time" "io" @@ -208,66 +212,182 @@ func (t *tui) draw() { t.newline() continue } - rep := pushStatus.Replication - if rep == nil { - t.newline() - continue - } - all := make([]*fsrep.Report, 0, len(rep.Completed)+len(rep.Pending) + 1) - all = append(all, rep.Completed...) - all = append(all, rep.Pending...) - if rep.Active != nil { - all = append(all, rep.Active) - } - sort.Slice(all, func(i, j int) bool { - return all[i].Filesystem < all[j].Filesystem - }) - - t.printf("Status: %s", rep.Status) + t.printf("Replication:") t.newline() - if rep.Problem != "" { - t.printf("Problem: %s", rep.Problem) - t.newline() - } - { // Progress: [---------------] - sumUpFSRep := func(rep *fsrep.Report) (transferred, total int64) { - for _, s := range rep.Pending { - transferred += s.Bytes - total += s.ExpectedBytes - } - for _, s := range rep.Completed { - transferred += s.Bytes - total += s.ExpectedBytes - } - return - } - var transferred, total int64 - for _, fs := range all { - fstx, fstotal := sumUpFSRep(fs) - transferred += fstx - total += fstotal - } - t.write("Progress: ") - t.drawBar(80, transferred, total) - t.write(fmt.Sprintf(" %s / %s", ByteCountBinary(transferred), ByteCountBinary(total))) - t.newline() - } + t.addIndent(1) + t.renderReplicationReport(pushStatus.Replication) + t.addIndent(-1) + + t.printf("Pruning Sender:") + t.newline() + t.addIndent(1) + t.renderPrunerReport(pushStatus.PruningSender) + t.addIndent(-1) + + t.printf("Pruning Receiver:") + t.newline() + t.addIndent(1) + t.renderPrunerReport(pushStatus.PruningReceiver) + t.addIndent(-1) - var maxFSLen int - for _, fs := range all { - if len(fs.Filesystem) > maxFSLen { - maxFSLen = len(fs.Filesystem) - } - } - for _, fs := range all { - printFilesystemStatus(fs, t, fs == rep.Active, maxFSLen) - } } } termbox.Flush() } +func (t *tui) renderReplicationReport(rep *replication.Report) { + if rep == nil { + t.printf("...\n") + return + } + + all := make([]*fsrep.Report, 0, len(rep.Completed)+len(rep.Pending) + 1) + all = append(all, rep.Completed...) + all = append(all, rep.Pending...) + if rep.Active != nil { + all = append(all, rep.Active) + } + sort.Slice(all, func(i, j int) bool { + return all[i].Filesystem < all[j].Filesystem + }) + + t.printf("Status: %s", rep.Status) + t.newline() + if rep.Problem != "" { + t.printf("Problem: %s", rep.Problem) + t.newline() + } + if rep.SleepUntil.After(time.Now()) { + t.printf("Sleeping until %s (%s left)\n", rep.SleepUntil, rep.SleepUntil.Sub(time.Now())) + } + { // Progress: [---------------] + sumUpFSRep := func(rep *fsrep.Report) (transferred, total int64) { + for _, s := range rep.Pending { + transferred += s.Bytes + total += s.ExpectedBytes + } + for _, s := range rep.Completed { + transferred += s.Bytes + total += s.ExpectedBytes + } + return + } + var transferred, total int64 + for _, fs := range all { + fstx, fstotal := sumUpFSRep(fs) + transferred += fstx + total += fstotal + } + t.write("Progress: ") + t.drawBar(80, transferred, total) + t.write(fmt.Sprintf(" %s / %s", ByteCountBinary(transferred), ByteCountBinary(total))) + t.newline() + } + + var maxFSLen int + for _, fs := range all { + if len(fs.Filesystem) > maxFSLen { + maxFSLen = len(fs.Filesystem) + } + } + for _, fs := range all { + printFilesystemStatus(fs, t, fs == rep.Active, maxFSLen) + } +} + +func (t *tui) renderPrunerReport(r *pruner.Report) { + if r == nil { + t.printf("...\n") + return + } + + t.printf("Status: %s", r.State) + t.newline() + + if r.Error != "" { + t.printf("Error: %s\n", r.Error) + } + if r.SleepUntil.After(time.Now()) { + t.printf("Sleeping until %s (%s left)\n", r.SleepUntil, r.SleepUntil.Sub(time.Now())) + } + + type commonFS struct { + *pruner.FSReport + completed bool + } + all := make([]commonFS, 0, len(r.Pending) + len(r.Completed)) + for i := range r.Pending { + all = append(all, commonFS{&r.Pending[i], false}) + } + for i := range r.Completed { + all = append(all, commonFS{&r.Completed[i], true}) + } + + if r.State == pruner.Plan.String() { + return + } + + if len(all) == 0 { + t.printf("nothing to do\n") + return + } + + var totalDestroyCount, completedDestroyCount int + var maxFSname int + for _, fs := range all { + totalDestroyCount += len(fs.DestroyList) + if fs.completed { + completedDestroyCount += len(fs.DestroyList) + } + if maxFSname < len(fs.Filesystem) { + maxFSname = len(fs.Filesystem) + } + } + + // global progress bar + progress := int(math.Round(80 * float64(completedDestroyCount) / float64(totalDestroyCount))) + t.write("Progress: ") + t.write("[") + t.write(times("=", progress)) + t.write(">") + t.write(times("-", 80 - progress)) + t.write("]") + t.printf(" %d/%d snapshots", completedDestroyCount, totalDestroyCount) + t.newline() + + sort.SliceStable(all, func(i, j int) bool { + return strings.Compare(all[i].Filesystem, all[j].Filesystem) == -1 + }) + + // Draw a table-like representation of 'all' + for _, fs := range all { + t.write(rightPad(fs.Filesystem, maxFSname, " ")) + t.write(" ") + if fs.Error != "" { + t.printf("ERROR: %s\n", fs.Error) // whitespace is padding + continue + } + + pruneRuleActionStr := fmt.Sprintf("(destroy %d of %d snapshots)", + len(fs.DestroyList), len(fs.SnapshotList)) + + if fs.completed { + t.printf( "Completed %s\n", pruneRuleActionStr) + continue + } + + t.write("Pending ") // whitespace is padding 10 + if len(fs.DestroyList) == 1 { + t.write(fs.DestroyList[0].Name) + } else { + t.write(pruneRuleActionStr) + } + t.newline() + } + +} + const snapshotIndent = 1 func calculateMaxFSLength(all []*fsrep.Report) (maxFS, maxStatus int) { for _, e := range all { @@ -357,9 +477,9 @@ func filesystemStatusString(rep *fsrep.Report, active bool, fsWidth int) (line s next = " problem: " + rep.Problem } else if len(rep.Pending) > 0 { if rep.Pending[0].From != "" { - next = fmt.Sprintf("next: %s => %s", rep.Pending[0].From, rep.Pending[0].To) + next = fmt.Sprintf(" next: %s => %s", rep.Pending[0].From, rep.Pending[0].To) } else { - next = fmt.Sprintf("next: %s (full)", rep.Pending[0].To) + next = fmt.Sprintf(" next: %s (full)", rep.Pending[0].To) } } status := fmt.Sprintf("%s (step %d/%d, %s/%s)%s", diff --git a/daemon/job/active.go b/daemon/job/active.go index 3595bd0..69cd7e8 100644 --- a/daemon/job/active.go +++ b/daemon/job/active.go @@ -193,6 +193,7 @@ func (j *ActiveSide) Name() string { return j.name } type ActiveSideStatus struct { Replication *replication.Report + PruningSender, PruningReceiver *pruner.Report } func (j *ActiveSide) Status() *Status { @@ -200,10 +201,15 @@ func (j *ActiveSide) Status() *Status { s := &ActiveSideStatus{} t := j.mode.Type() - if tasks.replication == nil { - return &Status{Type: t, JobSpecific: s} + if tasks.replication != nil { + s.Replication = tasks.replication.Report() + } + if tasks.prunerSender != nil { + s.PruningSender = tasks.prunerSender.Report() + } + if tasks.prunerReceiver != nil { + s.PruningReceiver = tasks.prunerReceiver.Report() } - s.Replication = tasks.replication.Report() return &Status{Type: t, JobSpecific: s} } diff --git a/daemon/pruner/pruner.go b/daemon/pruner/pruner.go index 83dd1bb..ebdfe1c 100644 --- a/daemon/pruner/pruner.go +++ b/daemon/pruner/pruner.go @@ -199,13 +199,65 @@ func (p *Pruner) prune(args args) { } } -func (p *Pruner) Report() interface{} { - return nil // FIXME TODO +type Report struct { + State string + SleepUntil time.Time + Error string + Pending, Completed []FSReport +} + +type FSReport struct { + Filesystem string + SnapshotList, DestroyList []SnapshotReport + Error string +} + +type SnapshotReport struct { + Name string + Replicated bool + Date time.Time +} + +func (p *Pruner) Report() *Report { + p.mtx.Lock() + defer p.mtx.Unlock() + + r := Report{State: p.state.String()} + + if p.state & PlanWait|ExecWait != 0 { + r.SleepUntil = p.sleepUntil + } + if p.state & PlanWait|ExecWait|ErrPerm != 0 { + if p.err != nil { + r.Error = p.err.Error() + } + } + + if p.state & Plan|PlanWait == 0 { + return &r + } + + r.Pending = make([]FSReport, len(p.prunePending)) + for i, fs := range p.prunePending{ + r.Pending[i] = fs.Report() + } + r.Completed = make([]FSReport, len(p.pruneCompleted)) + for i, fs := range p.pruneCompleted{ + r.Completed[i] = fs.Report() + } + + return &r } type fs struct { path string + + // snapshots presented by target + // (type snapshot) snaps []pruning.Snapshot + // destroy list returned by pruning.PruneSnapshots(snaps) + // (type snapshot) + destroyList []pruning.Snapshot mtx sync.RWMutex // for Plan @@ -218,12 +270,43 @@ func (f *fs) Update(err error) { f.err = err } +func (f *fs) Report() FSReport { + f.mtx.Lock() + defer f.mtx.Unlock() + + r := FSReport{} + r.Filesystem = f.path + if f.err != nil { + r.Error = f.err.Error() + } + + r.SnapshotList = make([]SnapshotReport, len(f.snaps)) + for i, snap := range f.snaps { + r.SnapshotList[i] = snap.(snapshot).Report() + } + + r.DestroyList = make([]SnapshotReport, len(f.destroyList)) + for i, snap := range f.destroyList{ + r.DestroyList[i] = snap.(snapshot).Report() + } + + return r +} + type snapshot struct { replicated bool date time.Time fsv *pdu.FilesystemVersion } +func (s snapshot) Report() SnapshotReport { + return SnapshotReport{ + Name: s.Name(), + Replicated: s.Replicated(), + Date: s.Date(), + } +} + var _ pruning.Snapshot = snapshot{} func (s snapshot) Name() string { return s.fsv.Name } @@ -344,6 +427,9 @@ fsloop: continue fsloop } + // Apply prune rules + pfs.destroyList = pruning.PruneSnapshots(pfs.snaps, a.rules) + } return u(func(pruner *Pruner) { @@ -378,11 +464,9 @@ func stateExec(a *args, u updater) state { return state.statefunc() } - GetLogger(a.ctx).Debug(fmt.Sprintf("%#v", a.rules)) - destroyListI := pruning.PruneSnapshots(pfs.snaps, a.rules) - destroyList := make([]*pdu.FilesystemVersion, len(destroyListI)) + destroyList := make([]*pdu.FilesystemVersion, len(pfs.destroyList)) for i := range destroyList { - destroyList[i] = destroyListI[i].(snapshot).fsv + destroyList[i] = pfs.destroyList[i].(snapshot).fsv GetLogger(a.ctx). WithField("fs", pfs.path). WithField("destroy_snap", destroyList[i].Name).