zrepl/client/status.go

811 lines
19 KiB
Go
Raw Normal View History

2018-08-29 19:18:54 +02:00
package client
import (
"fmt"
2019-03-22 19:41:12 +01:00
"io"
"math"
"net/http"
"os"
"sort"
"strings"
"sync"
"time"
// tcell is the termbox-compatible library for abstracting away escape sequences, etc.
// as of tcell#252, the number of default distributed terminals is relatively limited
// additional terminal definitions can be included via side-effect import
// See https://github.com/gdamore/tcell/blob/master/terminfo/base/base.go
// See https://github.com/gdamore/tcell/issues/252#issuecomment-533836078
"github.com/gdamore/tcell/termbox"
_ "github.com/gdamore/tcell/terminfo/s/screen" // tmux on FreeBSD 11 & 12 without ncurses
2018-08-29 19:18:54 +02:00
"github.com/pkg/errors"
"github.com/spf13/pflag"
"github.com/zrepl/yaml-config"
2019-03-22 19:41:12 +01:00
"github.com/zrepl/zrepl/cli"
2018-08-29 22:06:24 +02:00
"github.com/zrepl/zrepl/daemon"
"github.com/zrepl/zrepl/daemon/job"
"github.com/zrepl/zrepl/daemon/pruner"
"github.com/zrepl/zrepl/daemon/snapper"
"github.com/zrepl/zrepl/replication/report"
2018-08-29 19:18:54 +02:00
)
type byteProgressMeasurement struct {
time time.Time
2019-03-22 19:41:12 +01:00
val int64
}
type bytesProgressHistory struct {
2019-03-22 19:41:12 +01:00
last *byteProgressMeasurement // pointer as poor man's optional
changeCount int
2019-03-22 19:41:12 +01:00
lastChange time.Time
bpsAvg float64
}
func (p *bytesProgressHistory) Update(currentVal int64) (bytesPerSecondAvg int64, changeCount int) {
if p.last == nil {
p.last = &byteProgressMeasurement{
time: time.Now(),
2019-03-22 19:41:12 +01:00
val: currentVal,
}
return 0, 0
}
if p.last.val != currentVal {
p.changeCount++
p.lastChange = time.Now()
}
if time.Since(p.lastChange) > 3*time.Second {
p.last = nil
return 0, 0
}
2019-03-22 19:41:12 +01:00
deltaV := currentVal - p.last.val
deltaT := time.Since(p.last.time)
rate := float64(deltaV) / deltaT.Seconds()
factor := 0.3
2019-03-22 19:41:12 +01:00
p.bpsAvg = (1-factor)*p.bpsAvg + factor*rate
p.last.time = time.Now()
p.last.val = currentVal
return int64(p.bpsAvg), p.changeCount
}
2018-08-29 19:18:54 +02:00
type tui struct {
2018-08-29 22:06:24 +02:00
x, y int
2018-08-29 19:18:54 +02:00
indent int
2018-08-29 22:06:24 +02:00
lock sync.Mutex //For report and error
report map[string]job.Status
2018-08-29 22:06:24 +02:00
err error
2019-09-09 22:17:18 +02:00
jobFilter string
replicationProgress map[string]*bytesProgressHistory // by job name
2018-08-29 19:18:54 +02:00
}
func newTui() tui {
return tui{
replicationProgress: make(map[string]*bytesProgressHistory),
}
2018-08-29 19:18:54 +02:00
}
const INDENT_MULTIPLIER = 4
2018-08-29 19:18:54 +02:00
func (t *tui) moveLine(dl int, col int) {
t.y += dl
t.x = t.indent*INDENT_MULTIPLIER + col
2018-08-29 19:18:54 +02:00
}
func (t *tui) write(text string) {
for _, c := range text {
if c == '\n' {
t.newline()
continue
}
2018-08-29 19:18:54 +02:00
termbox.SetCell(t.x, t.y, c, termbox.ColorDefault, termbox.ColorDefault)
t.x += 1
}
}
func (t *tui) printf(text string, a ...interface{}) {
t.write(fmt.Sprintf(text, a...))
}
func wrap(s string, width int) string {
var b strings.Builder
for len(s) > 0 {
rem := width
if rem > len(s) {
rem = len(s)
}
if idx := strings.IndexAny(s, "\n\r"); idx != -1 && idx < rem {
2019-03-22 19:41:12 +01:00
rem = idx + 1
}
untilNewline := strings.TrimRight(s[:rem], "\n\r")
s = s[rem:]
if len(untilNewline) == 0 {
continue
}
b.WriteString(untilNewline)
b.WriteString("\n")
}
return strings.TrimRight(b.String(), "\n\r")
}
func (t *tui) printfDrawIndentedAndWrappedIfMultiline(format string, a ...interface{}) {
whole := fmt.Sprintf(format, a...)
width, _ := termbox.Size()
2019-03-22 19:41:12 +01:00
if !strings.ContainsAny(whole, "\n\r") && t.x+len(whole) <= width {
t.printf(format, a...)
} else {
t.addIndent(1)
t.newline()
2019-03-22 19:41:12 +01:00
t.write(wrap(whole, width-INDENT_MULTIPLIER*t.indent))
t.addIndent(-1)
}
}
2018-08-29 19:18:54 +02:00
func (t *tui) newline() {
t.moveLine(1, 0)
}
func (t *tui) setIndent(indent int) {
t.indent = indent
t.moveLine(0, 0)
}
func (t *tui) addIndent(indent int) {
t.indent += indent
t.moveLine(0, 0)
}
var statusFlags struct {
2018-09-23 14:44:53 +02:00
Raw bool
2019-09-09 22:17:18 +02:00
Job string
2018-09-23 14:44:53 +02:00
}
var StatusCmd = &cli.Subcommand{
Use: "status",
Short: "show job activity or dump as JSON for monitoring",
SetupFlags: func(f *pflag.FlagSet) {
f.BoolVar(&statusFlags.Raw, "raw", false, "dump raw status description from zrepl daemon")
2019-09-09 22:17:18 +02:00
f.StringVar(&statusFlags.Job, "job", "", "only dump specified job")
},
Run: runStatus,
}
func runStatus(s *cli.Subcommand, args []string) error {
httpc, err := controlHttpClient(s.Config().Global.Control.SockPath)
2018-08-29 19:18:54 +02:00
if err != nil {
return err
}
if statusFlags.Raw {
2019-03-22 19:41:12 +01:00
resp, err := httpc.Get("http://unix" + daemon.ControlJobEndpointStatus)
2018-09-23 14:44:53 +02:00
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
fmt.Fprintf(os.Stderr, "Received error response:\n")
_, err := io.CopyN(os.Stderr, resp.Body, 4096)
if err != nil {
return err
}
2018-09-23 14:44:53 +02:00
return errors.Errorf("exit")
}
if _, err := io.Copy(os.Stdout, resp.Body); err != nil {
return err
}
return nil
}
2018-08-29 19:18:54 +02:00
t := newTui()
t.lock.Lock()
t.err = errors.New("Got no report yet")
t.lock.Unlock()
2019-09-09 22:17:18 +02:00
t.jobFilter = statusFlags.Job
2018-08-29 19:18:54 +02:00
err = termbox.Init()
if err != nil {
return err
}
defer termbox.Close()
2018-08-29 22:06:24 +02:00
update := func() {
m := make(map[string]job.Status)
2018-08-29 22:06:24 +02:00
err2 := jsonRequestResponse(httpc, daemon.ControlJobEndpointStatus,
struct{}{},
&m,
)
t.lock.Lock()
t.err = err2
t.report = m
t.lock.Unlock()
t.draw()
}
update()
2018-08-29 19:18:54 +02:00
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
go func() {
for range ticker.C {
2018-08-29 22:06:24 +02:00
update()
2018-08-29 19:18:54 +02:00
}
}()
termbox.HideCursor()
termbox.Clear(termbox.ColorDefault, termbox.ColorDefault)
2018-08-29 22:06:24 +02:00
loop:
2018-08-29 19:18:54 +02:00
for {
switch ev := termbox.PollEvent(); ev.Type {
case termbox.EventKey:
switch ev.Key {
case termbox.KeyEsc:
break loop
case termbox.KeyCtrlC:
break loop
}
case termbox.EventResize:
t.draw()
}
}
return nil
}
func (t *tui) getReplicationProgressHistory(jobName string) *bytesProgressHistory {
p, ok := t.replicationProgress[jobName]
if !ok {
p = &bytesProgressHistory{}
t.replicationProgress[jobName] = p
}
return p
}
2018-08-29 19:18:54 +02:00
func (t *tui) draw() {
t.lock.Lock()
defer t.lock.Unlock()
termbox.Clear(termbox.ColorDefault, termbox.ColorDefault)
t.x = 0
t.y = 0
t.indent = 0
if t.err != nil {
t.write(t.err.Error())
} else {
//Iterate over map in alphabetical order
2019-09-09 22:17:18 +02:00
keys := make([]string, 0, len(t.report))
for k := range t.report {
2019-09-09 22:17:18 +02:00
if len(k) == 0 || daemon.IsInternalJobName(k) { //Internal job
continue
}
if t.jobFilter != "" && k != t.jobFilter {
continue
}
keys = append(keys, k)
2018-08-29 19:18:54 +02:00
}
sort.Strings(keys)
2019-09-09 22:17:18 +02:00
if len(keys) == 0 {
t.setIndent(0)
t.printf("no jobs to display")
t.newline()
termbox.Flush()
return
}
2018-08-29 19:18:54 +02:00
for _, k := range keys {
v := t.report[k]
2019-09-09 22:17:18 +02:00
2018-08-29 19:18:54 +02:00
t.setIndent(0)
t.printf("Job: %s", k)
t.setIndent(1)
t.newline()
t.printf("Type: %s", v.Type)
t.setIndent(1)
t.newline()
2018-11-21 04:06:13 +01:00
if v.Type == job.TypePush || v.Type == job.TypePull {
activeStatus, ok := v.JobSpecific.(*job.ActiveSideStatus)
if !ok || activeStatus == nil {
t.printf("ActiveSideStatus is null")
t.newline()
continue
}
t.printf("Replication:")
t.newline()
t.addIndent(1)
t.renderReplicationReport(activeStatus.Replication, t.getReplicationProgressHistory(k))
2018-11-21 04:06:13 +01:00
t.addIndent(-1)
t.printf("Pruning Sender:")
t.newline()
t.addIndent(1)
t.renderPrunerReport(activeStatus.PruningSender)
t.addIndent(-1)
t.printf("Pruning Receiver:")
t.newline()
t.addIndent(1)
t.renderPrunerReport(activeStatus.PruningReceiver)
t.addIndent(-1)
if v.Type == job.TypePush {
t.printf("Snapshotting:")
t.newline()
t.addIndent(1)
t.renderSnapperReport(activeStatus.Snapshotting)
t.addIndent(-1)
}
2018-11-21 04:06:13 +01:00
} else if v.Type == job.TypeSnap {
snapStatus, ok := v.JobSpecific.(*job.SnapJobStatus)
if !ok || snapStatus == nil {
t.printf("SnapJobStatus is null")
t.newline()
continue
}
t.printf("Pruning snapshots:")
t.newline()
t.addIndent(1)
t.renderPrunerReport(snapStatus.Pruning)
t.addIndent(-1)
t.printf("Snapshotting:")
t.newline()
t.addIndent(1)
t.renderSnapperReport(snapStatus.Snapshotting)
t.addIndent(-1)
} else if v.Type == job.TypeSource {
st := v.JobSpecific.(*job.PassiveStatus)
t.printf("Snapshotting:\n")
t.addIndent(1)
t.renderSnapperReport(st.Snapper)
t.addIndent(-1)
2018-11-21 04:06:13 +01:00
} else {
t.printf("No status representation for job type '%s', dumping as YAML", v.Type)
t.newline()
asYaml, err := yaml.Marshal(v.JobSpecific)
if err != nil {
t.printf("Error marshaling status to YAML: %s", err)
t.newline()
continue
}
t.write(string(asYaml))
t.newline()
continue
}
}
}
termbox.Flush()
}
func (t *tui) renderReplicationReport(rep *report.Report, history *bytesProgressHistory) {
if rep == nil {
t.printf("...\n")
return
}
if rep.WaitReconnectError != nil {
t.printfDrawIndentedAndWrappedIfMultiline("Connectivity: %s", rep.WaitReconnectError)
t.newline()
}
if !rep.WaitReconnectSince.IsZero() {
delta := time.Until(rep.WaitReconnectUntil).Round(time.Second)
if rep.WaitReconnectUntil.IsZero() || delta > 0 {
var until string
if rep.WaitReconnectUntil.IsZero() {
until = "waiting indefinitely"
} else {
until = fmt.Sprintf("hard fail in %s @ %s", delta, rep.WaitReconnectUntil)
}
t.printfDrawIndentedAndWrappedIfMultiline("Connectivity: reconnecting with exponential backoff (since %s) (%s)",
rep.WaitReconnectSince, until)
} else {
t.printfDrawIndentedAndWrappedIfMultiline("Connectivity: reconnects reached hard-fail timeout @ %s", rep.WaitReconnectUntil)
}
t.newline()
}
// TODO visualize more than the latest attempt by folding all attempts into one
if len(rep.Attempts) == 0 {
t.printf("no attempts made yet")
return
} else {
t.printf("Attempt #%d", len(rep.Attempts))
if len(rep.Attempts) > 1 {
2019-06-07 07:04:57 +02:00
t.printf(". Previous attempts failed with the following statuses:")
t.newline()
t.addIndent(1)
for i, a := range rep.Attempts[:len(rep.Attempts)-1] {
2019-03-22 19:41:12 +01:00
t.printfDrawIndentedAndWrappedIfMultiline("#%d: %s (failed at %s) (ran %s)", i+1, a.State, a.FinishAt, a.FinishAt.Sub(a.StartAt))
t.newline()
}
t.addIndent(-1)
} else {
t.newline()
}
}
latest := rep.Attempts[len(rep.Attempts)-1]
sort.Slice(latest.Filesystems, func(i, j int) bool {
return latest.Filesystems[i].Info.Name < latest.Filesystems[j].Info.Name
})
t.printf("Status: %s", latest.State)
t.newline()
if latest.State == report.AttemptPlanningError {
t.printf("Problem: ")
t.printfDrawIndentedAndWrappedIfMultiline("%s", latest.PlanError)
t.newline()
} else if latest.State == report.AttemptFanOutError {
t.printf("Problem: one or more of the filesystems encountered errors")
t.newline()
}
if latest.State != report.AttemptPlanning && latest.State != report.AttemptPlanningError {
// Draw global progress bar
// Progress: [---------------]
expected, replicated, containsInvalidSizeEstimates := latest.BytesSum()
rate, changeCount := history.Update(replicated)
2020-01-20 15:26:07 +01:00
eta := time.Duration(0)
if rate > 0 {
eta = time.Duration((expected-replicated)/rate) * time.Second
}
t.write("Progress: ")
t.drawBar(50, replicated, expected, changeCount)
t.write(fmt.Sprintf(" %s / %s @ %s/s", ByteCountBinary(replicated), ByteCountBinary(expected), ByteCountBinary(rate)))
2020-01-20 15:26:07 +01:00
if eta != 0 {
t.write(fmt.Sprintf(" (%s remaining)", humanizeDuration(eta)))
}
t.newline()
if containsInvalidSizeEstimates {
t.write("NOTE: not all steps could be size-estimated, total estimate is likely imprecise!")
t.newline()
}
var maxFSLen int
for _, fs := range latest.Filesystems {
if len(fs.Info.Name) > maxFSLen {
maxFSLen = len(fs.Info.Name)
}
}
for _, fs := range latest.Filesystems {
t.printFilesystemStatus(fs, false, maxFSLen) // FIXME bring 'active' flag back
}
}
}
func (t *tui) renderPrunerReport(r *pruner.Report) {
if r == nil {
t.printf("...\n")
return
}
state, err := pruner.StateString(r.State)
if err != nil {
t.printf("Status: %q (parse error: %q)\n", r.State, err)
return
}
t.printf("Status: %s", state)
t.newline()
if r.Error != "" {
t.printf("Error: %s\n", r.Error)
}
type commonFS struct {
*pruner.FSReport
completed bool
}
2019-03-22 19:41:12 +01:00
all := make([]commonFS, 0, len(r.Pending)+len(r.Completed))
for i := range r.Pending {
all = append(all, commonFS{&r.Pending[i], false})
}
for i := range r.Completed {
all = append(all, commonFS{&r.Completed[i], true})
}
switch state {
2019-03-22 19:41:12 +01:00
case pruner.Plan:
fallthrough
case pruner.PlanErr:
return
}
if len(all) == 0 {
t.printf("nothing to do\n")
return
}
var totalDestroyCount, completedDestroyCount int
var maxFSname int
for _, fs := range all {
totalDestroyCount += len(fs.DestroyList)
if fs.completed {
completedDestroyCount += len(fs.DestroyList)
}
if maxFSname < len(fs.Filesystem) {
maxFSname = len(fs.Filesystem)
}
}
// global progress bar
progress := int(math.Round(80 * float64(completedDestroyCount) / float64(totalDestroyCount)))
t.write("Progress: ")
t.write("[")
t.write(times("=", progress))
t.write(">")
2019-03-22 19:41:12 +01:00
t.write(times("-", 80-progress))
t.write("]")
t.printf(" %d/%d snapshots", completedDestroyCount, totalDestroyCount)
t.newline()
sort.SliceStable(all, func(i, j int) bool {
return strings.Compare(all[i].Filesystem, all[j].Filesystem) == -1
})
// Draw a table-like representation of 'all'
for _, fs := range all {
t.write(rightPad(fs.Filesystem, maxFSname, " "))
t.write(" ")
if !fs.SkipReason.NotSkipped() {
t.printf("skipped: %s\n", fs.SkipReason)
continue
}
if fs.LastError != "" {
if strings.ContainsAny(fs.LastError, "\r\n") {
t.printf("ERROR:")
2019-03-22 19:41:12 +01:00
t.printfDrawIndentedAndWrappedIfMultiline("%s\n", fs.LastError)
} else {
2019-03-22 19:41:12 +01:00
t.printfDrawIndentedAndWrappedIfMultiline("ERROR: %s\n", fs.LastError)
}
t.newline()
continue
}
pruneRuleActionStr := fmt.Sprintf("(destroy %d of %d snapshots)",
len(fs.DestroyList), len(fs.SnapshotList))
if fs.completed {
2019-03-22 19:41:12 +01:00
t.printf("Completed %s\n", pruneRuleActionStr)
continue
}
t.write("Pending ") // whitespace is padding 10
if len(fs.DestroyList) == 1 {
t.write(fs.DestroyList[0].Name)
} else {
t.write(pruneRuleActionStr)
}
t.newline()
}
2018-08-29 19:18:54 +02:00
}
func (t *tui) renderSnapperReport(r *snapper.Report) {
if r == nil {
t.printf("<snapshot type does not have a report>\n")
return
}
t.printf("Status: %s", r.State)
t.newline()
if r.Error != "" {
t.printf("Error: %s\n", r.Error)
}
if !r.SleepUntil.IsZero() {
t.printf("Sleep until: %s\n", r.SleepUntil)
}
sort.Slice(r.Progress, func(i, j int) bool {
return strings.Compare(r.Progress[i].Path, r.Progress[j].Path) == -1
})
t.addIndent(1)
defer t.addIndent(-1)
dur := func(d time.Duration) string {
return d.Round(100 * time.Millisecond).String()
}
type row struct {
path, state, duration, remainder, hookReport string
}
var widths struct {
path, state, duration int
}
rows := make([]*row, len(r.Progress))
for i, fs := range r.Progress {
r := &row{
path: fs.Path,
state: fs.State.String(),
}
if fs.HooksHadError {
r.hookReport = fs.Hooks // FIXME render here, not in daemon
}
switch fs.State {
case snapper.SnapPending:
r.duration = "..."
r.remainder = ""
case snapper.SnapStarted:
r.duration = dur(time.Since(fs.StartAt))
r.remainder = fmt.Sprintf("snap name: %q", fs.SnapName)
case snapper.SnapDone:
fallthrough
case snapper.SnapError:
r.duration = dur(fs.DoneAt.Sub(fs.StartAt))
r.remainder = fmt.Sprintf("snap name: %q", fs.SnapName)
}
rows[i] = r
if len(r.path) > widths.path {
widths.path = len(r.path)
}
if len(r.state) > widths.state {
widths.state = len(r.state)
}
if len(r.duration) > widths.duration {
widths.duration = len(r.duration)
}
}
for _, r := range rows {
path := rightPad(r.path, widths.path, " ")
state := rightPad(r.state, widths.state, " ")
duration := rightPad(r.duration, widths.duration, " ")
t.printf("%s %s %s", path, state, duration)
t.printfDrawIndentedAndWrappedIfMultiline(" %s", r.remainder)
if r.hookReport != "" {
t.printfDrawIndentedAndWrappedIfMultiline("%s", r.hookReport)
}
t.newline()
}
}
2018-08-29 19:18:54 +02:00
func times(str string, n int) (out string) {
for i := 0; i < n; i++ {
out += str
}
return
}
func rightPad(str string, length int, pad string) string {
2018-08-30 14:07:28 +02:00
if len(str) > length {
return str[:length]
}
return str + strings.Repeat(pad, length-len(str))
2018-08-29 19:18:54 +02:00
}
var arrowPositions = `>\|/`
// changeCount = 0 indicates stall / no progress
func (t *tui) drawBar(length int, bytes, totalBytes int64, changeCount int) {
var completedLength int
if totalBytes > 0 {
completedLength = int(int64(length) * bytes / totalBytes)
2018-08-30 14:07:28 +02:00
if completedLength > length {
completedLength = length
}
} else if totalBytes == bytes {
completedLength = length
}
2018-08-29 19:18:54 +02:00
t.write("[")
t.write(times("=", completedLength))
2019-03-22 19:41:12 +01:00
t.write(string(arrowPositions[changeCount%len(arrowPositions)]))
t.write(times("-", length-completedLength))
t.write("]")
2018-08-29 19:18:54 +02:00
}
func (t *tui) printFilesystemStatus(rep *report.FilesystemReport, active bool, maxFS int) {
expected, replicated, containsInvalidSizeEstimates := rep.BytesSum()
sizeEstimationImpreciseNotice := ""
if containsInvalidSizeEstimates {
sizeEstimationImpreciseNotice = " (some steps lack size estimation)"
}
if rep.CurrentStep < len(rep.Steps) && rep.Steps[rep.CurrentStep].Info.BytesExpected == 0 {
sizeEstimationImpreciseNotice = " (step lacks size estimation)"
}
status := fmt.Sprintf("%s (step %d/%d, %s/%s)%s",
strings.ToUpper(string(rep.State)),
rep.CurrentStep, len(rep.Steps),
ByteCountBinary(replicated), ByteCountBinary(expected),
sizeEstimationImpreciseNotice,
)
activeIndicator := " "
if active {
activeIndicator = "*"
}
t.printf("%s %s %s ",
activeIndicator,
rightPad(rep.Info.Name, maxFS, " "),
status)
next := ""
if err := rep.Error(); err != nil {
next = err.Err
} else if rep.State != report.FilesystemDone {
if nextStep := rep.NextStep(); nextStep != nil {
if nextStep.IsIncremental() {
next = fmt.Sprintf("next: %s => %s", nextStep.Info.From, nextStep.Info.To)
} else {
new features: {resumable,encrypted,hold-protected} send-recv, last-received-hold - **Resumable Send & Recv Support** No knobs required, automatically used where supported. - **Hold-Protected Send & Recv** Automatic ZFS holds to ensure that we can always resume a replication step. - **Encrypted Send & Recv Support** for OpenZFS native encryption. Configurable at the job level, i.e., for all filesystems a job is responsible for. - **Receive-side hold on last received dataset** The counterpart to the replication cursor bookmark on the send-side. Ensures that incremental replication will always be possible between a sender and receiver. Design Doc ---------- `replication/design.md` doc describes how we use ZFS holds and bookmarks to ensure that a single replication step is always resumable. The replication algorithm described in the design doc introduces the notion of job IDs (please read the details on this design doc). We reuse the job names for job IDs and use `JobID` type to ensure that a job name can be embedded into hold tags, bookmark names, etc. This might BREAK CONFIG on upgrade. Protocol Version Bump --------------------- This commit makes backwards-incompatible changes to the replication/pdu protobufs. Thus, bump the version number used in the protocol handshake. Replication Cursor Format Change -------------------------------- The new replication cursor bookmark format is: `#zrepl_CURSOR_G_${this.GUID}_J_${jobid}` Including the GUID enables transaction-safe moving-forward of the cursor. Including the job id enables that multiple sending jobs can send the same filesystem without interfering. The `zrepl migrate replication-cursor:v1-v2` subcommand can be used to safely destroy old-format cursors once zrepl has created new-format cursors. Changes in This Commit ---------------------- - package zfs - infrastructure for holds - infrastructure for resume token decoding - implement a variant of OpenZFS's `entity_namecheck` and use it for validation in new code - ZFSSendArgs to specify a ZFS send operation - validation code protects against malicious resume tokens by checking that the token encodes the same send parameters that the send-side would use if no resume token were available (i.e. same filesystem, `fromguid`, `toguid`) - RecvOptions support for `recv -s` flag - convert a bunch of ZFS operations to be idempotent - achieved through more differentiated error message scraping / additional pre-/post-checks - package replication/pdu - add field for encryption to send request messages - add fields for resume handling to send & recv request messages - receive requests now contain `FilesystemVersion To` in addition to the filesystem into which the stream should be `recv`d into - can use `zfs recv $root_fs/$client_id/path/to/dataset@${To.Name}`, which enables additional validation after recv (i.e. whether `To.Guid` matched what we received in the stream) - used to set `last-received-hold` - package replication/logic - introduce `PlannerPolicy` struct, currently only used to configure whether encrypted sends should be requested from the sender - integrate encryption and resume token support into `Step` struct - package endpoint - move the concepts that endpoint builds on top of ZFS to a single file `endpoint/endpoint_zfs.go` - step-holds + step-bookmarks - last-received-hold - new replication cursor + old replication cursor compat code - adjust `endpoint/endpoint.go` handlers for - encryption - resumability - new replication cursor - last-received-hold - client subcommand `zrepl holds list`: list all holds and hold-like bookmarks that zrepl thinks belong to it - client subcommand `zrepl migrate replication-cursor:v1-v2`
2019-09-11 17:19:17 +02:00
next = fmt.Sprintf("next: full send %s", nextStep.Info.To)
}
new features: {resumable,encrypted,hold-protected} send-recv, last-received-hold - **Resumable Send & Recv Support** No knobs required, automatically used where supported. - **Hold-Protected Send & Recv** Automatic ZFS holds to ensure that we can always resume a replication step. - **Encrypted Send & Recv Support** for OpenZFS native encryption. Configurable at the job level, i.e., for all filesystems a job is responsible for. - **Receive-side hold on last received dataset** The counterpart to the replication cursor bookmark on the send-side. Ensures that incremental replication will always be possible between a sender and receiver. Design Doc ---------- `replication/design.md` doc describes how we use ZFS holds and bookmarks to ensure that a single replication step is always resumable. The replication algorithm described in the design doc introduces the notion of job IDs (please read the details on this design doc). We reuse the job names for job IDs and use `JobID` type to ensure that a job name can be embedded into hold tags, bookmark names, etc. This might BREAK CONFIG on upgrade. Protocol Version Bump --------------------- This commit makes backwards-incompatible changes to the replication/pdu protobufs. Thus, bump the version number used in the protocol handshake. Replication Cursor Format Change -------------------------------- The new replication cursor bookmark format is: `#zrepl_CURSOR_G_${this.GUID}_J_${jobid}` Including the GUID enables transaction-safe moving-forward of the cursor. Including the job id enables that multiple sending jobs can send the same filesystem without interfering. The `zrepl migrate replication-cursor:v1-v2` subcommand can be used to safely destroy old-format cursors once zrepl has created new-format cursors. Changes in This Commit ---------------------- - package zfs - infrastructure for holds - infrastructure for resume token decoding - implement a variant of OpenZFS's `entity_namecheck` and use it for validation in new code - ZFSSendArgs to specify a ZFS send operation - validation code protects against malicious resume tokens by checking that the token encodes the same send parameters that the send-side would use if no resume token were available (i.e. same filesystem, `fromguid`, `toguid`) - RecvOptions support for `recv -s` flag - convert a bunch of ZFS operations to be idempotent - achieved through more differentiated error message scraping / additional pre-/post-checks - package replication/pdu - add field for encryption to send request messages - add fields for resume handling to send & recv request messages - receive requests now contain `FilesystemVersion To` in addition to the filesystem into which the stream should be `recv`d into - can use `zfs recv $root_fs/$client_id/path/to/dataset@${To.Name}`, which enables additional validation after recv (i.e. whether `To.Guid` matched what we received in the stream) - used to set `last-received-hold` - package replication/logic - introduce `PlannerPolicy` struct, currently only used to configure whether encrypted sends should be requested from the sender - integrate encryption and resume token support into `Step` struct - package endpoint - move the concepts that endpoint builds on top of ZFS to a single file `endpoint/endpoint_zfs.go` - step-holds + step-bookmarks - last-received-hold - new replication cursor + old replication cursor compat code - adjust `endpoint/endpoint.go` handlers for - encryption - resumability - new replication cursor - last-received-hold - client subcommand `zrepl holds list`: list all holds and hold-like bookmarks that zrepl thinks belong to it - client subcommand `zrepl migrate replication-cursor:v1-v2`
2019-09-11 17:19:17 +02:00
attribs := []string{}
if nextStep.Info.Resumed {
attribs = append(attribs, "resumed")
}
attribs = append(attribs, fmt.Sprintf("encrypted=%s", nextStep.Info.Encrypted))
next += fmt.Sprintf(" (%s)", strings.Join(attribs, ", "))
} else {
next = "" // individual FSes may still be in planning state
}
}
t.printfDrawIndentedAndWrappedIfMultiline("%s", next)
t.newline()
2018-08-29 22:06:24 +02:00
}
func ByteCountBinary(b int64) string {
const unit = 1024
if b < unit {
return fmt.Sprintf("%d B", b)
}
div, exp := int64(unit), 0
for n := b / unit; n >= unit; n /= unit {
div *= unit
exp++
}
return fmt.Sprintf("%.1f %ciB", float64(b)/float64(div), "KMGTPE"[exp])
}
2020-01-20 15:26:07 +01:00
func humanizeDuration(duration time.Duration) string {
days := int64(duration.Hours() / 24)
hours := int64(math.Mod(duration.Hours(), 24))
minutes := int64(math.Mod(duration.Minutes(), 60))
seconds := int64(math.Mod(duration.Seconds(), 60))
var parts []string
force := false
chunks := []int64{days, hours, minutes, seconds}
for i, chunk := range chunks {
if force || chunk > 0 {
padding := 0
if force {
padding = 2
}
parts = append(parts, fmt.Sprintf("%*d%c", padding, chunk, "dhms"[i]))
force = true
}
}
return strings.Join(parts, " ")
}