diff --git a/client/jsonclient.go b/client/jsonclient.go index 144f9b6..43bc903 100644 --- a/client/jsonclient.go +++ b/client/jsonclient.go @@ -1,13 +1,13 @@ package client import ( - "net/http" - "net" - "context" "bytes" + "context" "encoding/json" - "io" "github.com/pkg/errors" + "io" + "net" + "net/http" ) func controlHttpClient(sockpath string) (client http.Client, err error) { @@ -43,4 +43,3 @@ func jsonRequestResponse(c http.Client, endpoint string, req interface{}, res in return nil } - diff --git a/client/status.go b/client/status.go index 62cd7bc..4824f2d 100644 --- a/client/status.go +++ b/client/status.go @@ -1,26 +1,26 @@ package client import ( + "fmt" + "github.com/mitchellh/mapstructure" + "github.com/nsf/termbox-go" + "github.com/pkg/errors" "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/daemon" - "fmt" "github.com/zrepl/zrepl/replication" - "github.com/mitchellh/mapstructure" "github.com/zrepl/zrepl/replication/fsrep" - "github.com/nsf/termbox-go" - "time" - "github.com/pkg/errors" "sort" "sync" + "time" ) type tui struct { - x, y int + x, y int indent int - lock sync.Mutex //For report and error + lock sync.Mutex //For report and error report map[string]interface{} - err error + err error } func newTui() tui { @@ -34,7 +34,7 @@ func (t *tui) moveCursor(x, y int) { func (t *tui) moveLine(dl int, col int) { t.y += dl - t.x = t.indent * 4 + col + t.x = t.indent*4 + col } func (t *tui) write(text string) { @@ -62,7 +62,6 @@ func (t *tui) addIndent(indent int) { t.moveLine(0, 0) } - func RunStatus(config config.Config, args []string) error { httpc, err := controlHttpClient(config.Global.Control.SockPath) if err != nil { @@ -80,29 +79,34 @@ func RunStatus(config config.Config, args []string) error { } defer termbox.Close() + update := func() { + m := make(map[string]interface{}) + + err2 := jsonRequestResponse(httpc, daemon.ControlJobEndpointStatus, + struct{}{}, + &m, + ) + + t.lock.Lock() + t.err = err2 + t.report = m + t.lock.Unlock() + t.draw() + } + update() + ticker := time.NewTicker(500 * time.Millisecond) defer ticker.Stop() go func() { for _ = range ticker.C { - m := make(map[string]interface{}) - - err2 := jsonRequestResponse(httpc, daemon.ControlJobEndpointStatus, - struct {}{}, - &m, - ) - - t.lock.Lock() - t.err = err2 - t.report = m - t.lock.Unlock() - t.draw() + update() } }() termbox.HideCursor() termbox.Clear(termbox.ColorDefault, termbox.ColorDefault) - loop: +loop: for { switch ev := termbox.PollEvent(); ev.Type { case termbox.EventKey: @@ -167,7 +171,7 @@ func (t *tui) draw() { } t.printf("Status: %s", rep.Status) t.newline() - if (rep.Problem != "") { + if rep.Problem != "" { t.printf("Problem: %s", rep.Problem) t.newline() } @@ -198,7 +202,7 @@ func rightPad(str string, length int, pad string) string { return str + times(pad, length-len(str)) } -func (t *tui) drawBar(name string, status string, total int, done int) { +func (t *tui) drawBar(name string, status string, total int, done int, bytes int64) { t.write(rightPad(name, 20, " ")) t.write(" ") t.write(rightPad(status, 20, " ")) @@ -207,11 +211,12 @@ func (t *tui) drawBar(name string, status string, total int, done int) { length := 50 completedLength := length * done / total - //FIXME finished bar has 1 off size compared to not finished bar - t.write(times("=", completedLength-1)) + t.write(times("=", completedLength)) t.write(">") t.write(times("-", length-completedLength)) + t.write(" ") + t.write(rightPad(ByteCountBinary(bytes), 8, " ")) t.printf(" %d/%d", done, total) } @@ -219,11 +224,32 @@ func (t *tui) drawBar(name string, status string, total int, done int) { } func printFilesystem(rep *fsrep.Report, t *tui) { - t.drawBar(rep.Filesystem, rep.Status, len(rep.Completed) + len(rep.Pending), len(rep.Completed)) - if (rep.Problem != "") { + bytes := int64(0) + for _, s := range rep.Pending { + bytes += s.Bytes + } + for _, s := range rep.Completed { + bytes += s.Bytes + } + + t.drawBar(rep.Filesystem, rep.Status, len(rep.Completed)+len(rep.Pending), len(rep.Completed), bytes) + if rep.Problem != "" { t.addIndent(1) t.printf("Problem: %s", rep.Problem) t.newline() t.addIndent(-1) } -} \ No newline at end of file +} + +func ByteCountBinary(b int64) string { + const unit = 1024 + if b < unit { + return fmt.Sprintf("%d B", b) + } + div, exp := int64(unit), 0 + for n := b / unit; n >= unit; n /= unit { + div *= unit + exp++ + } + return fmt.Sprintf("%.1f %ciB", float64(b)/float64(div), "KMGTPE"[exp]) +} diff --git a/main.go b/main.go index 38f7138..40f1f33 100644 --- a/main.go +++ b/main.go @@ -3,9 +3,9 @@ package main import ( "github.com/spf13/cobra" + "github.com/zrepl/zrepl/client" "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/daemon" - "github.com/zrepl/zrepl/client" "log" "os" ) diff --git a/replication/fsrep/fsfsm.go b/replication/fsrep/fsfsm.go index 5957dda..0bf7fb6 100644 --- a/replication/fsrep/fsfsm.go +++ b/replication/fsrep/fsfsm.go @@ -13,6 +13,7 @@ import ( "github.com/zrepl/zrepl/logger" "github.com/zrepl/zrepl/replication/pdu" + "github.com/zrepl/zrepl/util" ) type contextKey int @@ -56,6 +57,7 @@ type StepReport struct { From, To string Status string Problem string + Bytes int64 } type Report struct { @@ -167,7 +169,8 @@ type ReplicationStep struct { parent *Replication // both retry and permanent error - err error + err error + byteCounter *util.ByteCounterReader } func (f *Replication) TakeStep(ctx context.Context, sender Sender, receiver Receiver) (post State, nextStepDate time.Time) { @@ -362,6 +365,9 @@ func (s *ReplicationStep) doReplication(ctx context.Context, sender Sender, rece return updateStateError(err) } + s.byteCounter = util.NewByteCounterReader(sstream) + sstream = s.byteCounter + rr := &pdu.ReceiveReq{ Filesystem: fs, ClearResumeToken: !sres.UsedResumeToken, @@ -439,15 +445,20 @@ func (s *ReplicationStep) String() string { } } -func (step *ReplicationStep) Report() *StepReport { +func (s *ReplicationStep) Report() *StepReport { var from string // FIXME follow same convention as ZFS: to should be nil on full send - if step.from != nil { - from = step.from.RelName() + if s.from != nil { + from = s.from.RelName() + } + bytes := int64(0) + if s.byteCounter != nil { + bytes = s.byteCounter.Bytes() } rep := StepReport{ From: from, - To: step.to.RelName(), - Status: step.state.String(), + To: s.to.RelName(), + Status: s.state.String(), + Bytes: bytes, } return &rep } diff --git a/util/io.go b/util/io.go index 969e9c7..f9210b3 100644 --- a/util/io.go +++ b/util/io.go @@ -4,6 +4,7 @@ import ( "io" "net" "os" + "sync/atomic" ) type NetConnLogger struct { @@ -97,3 +98,28 @@ func (c *ChainedReader) Read(buf []byte) (n int, err error) { return } + +type ByteCounterReader struct { + reader io.ReadCloser + bytes int64 +} + +func NewByteCounterReader(reader io.ReadCloser) *ByteCounterReader { + return &ByteCounterReader{ + reader: reader, + } +} + +func (b *ByteCounterReader) Close() error { + return b.reader.Close() +} + +func (b *ByteCounterReader) Read(p []byte) (n int, err error) { + n, err = b.reader.Read(p) + atomic.AddInt64(&b.bytes, int64(n)) + return n, err +} + +func (b *ByteCounterReader) Bytes() int64 { + return atomic.LoadInt64(&b.bytes) +}