byte counter for status

This commit is contained in:
Anton Schirg 2018-08-29 22:06:24 +02:00
parent 42056f7a32
commit 6ca11a7391
5 changed files with 104 additions and 42 deletions

View File

@ -1,13 +1,13 @@
package client package client
import ( import (
"net/http"
"net"
"context"
"bytes" "bytes"
"context"
"encoding/json" "encoding/json"
"io"
"github.com/pkg/errors" "github.com/pkg/errors"
"io"
"net"
"net/http"
) )
func controlHttpClient(sockpath string) (client http.Client, err error) { func controlHttpClient(sockpath string) (client http.Client, err error) {
@ -43,4 +43,3 @@ func jsonRequestResponse(c http.Client, endpoint string, req interface{}, res in
return nil return nil
} }

View File

@ -1,26 +1,26 @@
package client package client
import ( import (
"fmt"
"github.com/mitchellh/mapstructure"
"github.com/nsf/termbox-go"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon" "github.com/zrepl/zrepl/daemon"
"fmt"
"github.com/zrepl/zrepl/replication" "github.com/zrepl/zrepl/replication"
"github.com/mitchellh/mapstructure"
"github.com/zrepl/zrepl/replication/fsrep" "github.com/zrepl/zrepl/replication/fsrep"
"github.com/nsf/termbox-go"
"time"
"github.com/pkg/errors"
"sort" "sort"
"sync" "sync"
"time"
) )
type tui struct { type tui struct {
x, y int x, y int
indent int indent int
lock sync.Mutex //For report and error lock sync.Mutex //For report and error
report map[string]interface{} report map[string]interface{}
err error err error
} }
func newTui() tui { func newTui() tui {
@ -34,7 +34,7 @@ func (t *tui) moveCursor(x, y int) {
func (t *tui) moveLine(dl int, col int) { func (t *tui) moveLine(dl int, col int) {
t.y += dl t.y += dl
t.x = t.indent * 4 + col t.x = t.indent*4 + col
} }
func (t *tui) write(text string) { func (t *tui) write(text string) {
@ -62,7 +62,6 @@ func (t *tui) addIndent(indent int) {
t.moveLine(0, 0) t.moveLine(0, 0)
} }
func RunStatus(config config.Config, args []string) error { func RunStatus(config config.Config, args []string) error {
httpc, err := controlHttpClient(config.Global.Control.SockPath) httpc, err := controlHttpClient(config.Global.Control.SockPath)
if err != nil { if err != nil {
@ -80,29 +79,34 @@ func RunStatus(config config.Config, args []string) error {
} }
defer termbox.Close() defer termbox.Close()
update := func() {
m := make(map[string]interface{})
err2 := jsonRequestResponse(httpc, daemon.ControlJobEndpointStatus,
struct{}{},
&m,
)
t.lock.Lock()
t.err = err2
t.report = m
t.lock.Unlock()
t.draw()
}
update()
ticker := time.NewTicker(500 * time.Millisecond) ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop() defer ticker.Stop()
go func() { go func() {
for _ = range ticker.C { for _ = range ticker.C {
m := make(map[string]interface{}) update()
err2 := jsonRequestResponse(httpc, daemon.ControlJobEndpointStatus,
struct {}{},
&m,
)
t.lock.Lock()
t.err = err2
t.report = m
t.lock.Unlock()
t.draw()
} }
}() }()
termbox.HideCursor() termbox.HideCursor()
termbox.Clear(termbox.ColorDefault, termbox.ColorDefault) termbox.Clear(termbox.ColorDefault, termbox.ColorDefault)
loop: loop:
for { for {
switch ev := termbox.PollEvent(); ev.Type { switch ev := termbox.PollEvent(); ev.Type {
case termbox.EventKey: case termbox.EventKey:
@ -167,7 +171,7 @@ func (t *tui) draw() {
} }
t.printf("Status: %s", rep.Status) t.printf("Status: %s", rep.Status)
t.newline() t.newline()
if (rep.Problem != "") { if rep.Problem != "" {
t.printf("Problem: %s", rep.Problem) t.printf("Problem: %s", rep.Problem)
t.newline() t.newline()
} }
@ -198,7 +202,7 @@ func rightPad(str string, length int, pad string) string {
return str + times(pad, length-len(str)) return str + times(pad, length-len(str))
} }
func (t *tui) drawBar(name string, status string, total int, done int) { func (t *tui) drawBar(name string, status string, total int, done int, bytes int64) {
t.write(rightPad(name, 20, " ")) t.write(rightPad(name, 20, " "))
t.write(" ") t.write(" ")
t.write(rightPad(status, 20, " ")) t.write(rightPad(status, 20, " "))
@ -207,11 +211,12 @@ func (t *tui) drawBar(name string, status string, total int, done int) {
length := 50 length := 50
completedLength := length * done / total completedLength := length * done / total
//FIXME finished bar has 1 off size compared to not finished bar t.write(times("=", completedLength))
t.write(times("=", completedLength-1))
t.write(">") t.write(">")
t.write(times("-", length-completedLength)) t.write(times("-", length-completedLength))
t.write(" ")
t.write(rightPad(ByteCountBinary(bytes), 8, " "))
t.printf(" %d/%d", done, total) t.printf(" %d/%d", done, total)
} }
@ -219,11 +224,32 @@ func (t *tui) drawBar(name string, status string, total int, done int) {
} }
func printFilesystem(rep *fsrep.Report, t *tui) { func printFilesystem(rep *fsrep.Report, t *tui) {
t.drawBar(rep.Filesystem, rep.Status, len(rep.Completed) + len(rep.Pending), len(rep.Completed)) bytes := int64(0)
if (rep.Problem != "") { for _, s := range rep.Pending {
bytes += s.Bytes
}
for _, s := range rep.Completed {
bytes += s.Bytes
}
t.drawBar(rep.Filesystem, rep.Status, len(rep.Completed)+len(rep.Pending), len(rep.Completed), bytes)
if rep.Problem != "" {
t.addIndent(1) t.addIndent(1)
t.printf("Problem: %s", rep.Problem) t.printf("Problem: %s", rep.Problem)
t.newline() t.newline()
t.addIndent(-1) t.addIndent(-1)
} }
} }
func ByteCountBinary(b int64) string {
const unit = 1024
if b < unit {
return fmt.Sprintf("%d B", b)
}
div, exp := int64(unit), 0
for n := b / unit; n >= unit; n /= unit {
div *= unit
exp++
}
return fmt.Sprintf("%.1f %ciB", float64(b)/float64(div), "KMGTPE"[exp])
}

View File

@ -3,9 +3,9 @@ package main
import ( import (
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/zrepl/zrepl/client"
"github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon" "github.com/zrepl/zrepl/daemon"
"github.com/zrepl/zrepl/client"
"log" "log"
"os" "os"
) )

View File

@ -13,6 +13,7 @@ import (
"github.com/zrepl/zrepl/logger" "github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/replication/pdu" "github.com/zrepl/zrepl/replication/pdu"
"github.com/zrepl/zrepl/util"
) )
type contextKey int type contextKey int
@ -56,6 +57,7 @@ type StepReport struct {
From, To string From, To string
Status string Status string
Problem string Problem string
Bytes int64
} }
type Report struct { type Report struct {
@ -167,7 +169,8 @@ type ReplicationStep struct {
parent *Replication parent *Replication
// both retry and permanent error // both retry and permanent error
err error err error
byteCounter *util.ByteCounterReader
} }
func (f *Replication) TakeStep(ctx context.Context, sender Sender, receiver Receiver) (post State, nextStepDate time.Time) { func (f *Replication) TakeStep(ctx context.Context, sender Sender, receiver Receiver) (post State, nextStepDate time.Time) {
@ -362,6 +365,9 @@ func (s *ReplicationStep) doReplication(ctx context.Context, sender Sender, rece
return updateStateError(err) return updateStateError(err)
} }
s.byteCounter = util.NewByteCounterReader(sstream)
sstream = s.byteCounter
rr := &pdu.ReceiveReq{ rr := &pdu.ReceiveReq{
Filesystem: fs, Filesystem: fs,
ClearResumeToken: !sres.UsedResumeToken, ClearResumeToken: !sres.UsedResumeToken,
@ -439,15 +445,20 @@ func (s *ReplicationStep) String() string {
} }
} }
func (step *ReplicationStep) Report() *StepReport { func (s *ReplicationStep) Report() *StepReport {
var from string // FIXME follow same convention as ZFS: to should be nil on full send var from string // FIXME follow same convention as ZFS: to should be nil on full send
if step.from != nil { if s.from != nil {
from = step.from.RelName() from = s.from.RelName()
}
bytes := int64(0)
if s.byteCounter != nil {
bytes = s.byteCounter.Bytes()
} }
rep := StepReport{ rep := StepReport{
From: from, From: from,
To: step.to.RelName(), To: s.to.RelName(),
Status: step.state.String(), Status: s.state.String(),
Bytes: bytes,
} }
return &rep return &rep
} }

View File

@ -4,6 +4,7 @@ import (
"io" "io"
"net" "net"
"os" "os"
"sync/atomic"
) )
type NetConnLogger struct { type NetConnLogger struct {
@ -97,3 +98,28 @@ func (c *ChainedReader) Read(buf []byte) (n int, err error) {
return return
} }
type ByteCounterReader struct {
reader io.ReadCloser
bytes int64
}
func NewByteCounterReader(reader io.ReadCloser) *ByteCounterReader {
return &ByteCounterReader{
reader: reader,
}
}
func (b *ByteCounterReader) Close() error {
return b.reader.Close()
}
func (b *ByteCounterReader) Read(p []byte) (n int, err error) {
n, err = b.reader.Read(p)
atomic.AddInt64(&b.bytes, int64(n))
return n, err
}
func (b *ByteCounterReader) Bytes() int64 {
return atomic.LoadInt64(&b.bytes)
}