diff --git a/cmd/handler.go b/cmd/handler.go index da2f098..850e0e2 100644 --- a/cmd/handler.go +++ b/cmd/handler.go @@ -66,8 +66,6 @@ func (h Handler) HandleInitialTransferRequest(r rpc.InitialTransferRequest) (str h.Logger.Printf("error sending filesystem: %#v", err) } - h.Logger.Printf("finished zfs send") - return } @@ -87,8 +85,6 @@ func (h Handler) HandleIncrementalTransferRequest(r rpc.IncrementalTransferReque h.Logger.Printf("error sending filesystem: %#v", err) } - h.Logger.Printf("finished zfs send") - return } diff --git a/cmd/replication.go b/cmd/replication.go index 39e2d72..01d2355 100644 --- a/cmd/replication.go +++ b/cmd/replication.go @@ -2,14 +2,16 @@ package cmd import ( "fmt" - "github.com/spf13/cobra" - "github.com/zrepl/zrepl/jobrun" - "github.com/zrepl/zrepl/rpc" - "github.com/zrepl/zrepl/zfs" "io" "os" "sync" "time" + + "github.com/spf13/cobra" + "github.com/zrepl/zrepl/jobrun" + "github.com/zrepl/zrepl/rpc" + "github.com/zrepl/zrepl/util" + "github.com/zrepl/zrepl/zfs" ) var runArgs struct { @@ -357,10 +359,16 @@ func doPull(pull PullContext) (err error) { log("received initial transfer request response. zfs recv...") - if err = zfs.ZFSRecv(m.Local, stream, "-u"); err != nil { + watcher := util.IOProgressWatcher{Reader: stream} + watcher.KickOff(1*time.Second, func(p util.IOProgress) { + log("progress on receive operation: %v bytes received", p.TotalRX) + }) + + if err = zfs.ZFSRecv(m.Local, &watcher, "-u"); err != nil { log("error receiving stream, stopping...: %s", err) return false } + log("received stream, %v bytes total", watcher.Progress().TotalRX) log("configuring properties of received filesystem") @@ -379,6 +387,7 @@ func doPull(pull PullContext) (err error) { } log("incremental transfers using path: %#v", diff.IncrementalPath) + var pathRx uint64 for i := 0; i < len(diff.IncrementalPath)-1; i++ { @@ -403,16 +412,23 @@ func doPull(pull PullContext) (err error) { log("receving incremental transfer") - if err = zfs.ZFSRecv(m.Local, stream); err != nil { + watcher := util.IOProgressWatcher{Reader: stream} + watcher.KickOff(1*time.Second, func(p util.IOProgress) { + log("progress on receive operation: %v bytes received", p.TotalRX) + }) + + if err = zfs.ZFSRecv(m.Local, &watcher); err != nil { log("error receiving stream, stopping...: %s", err) return false } - log("finished incremental transfer") + totalRx := watcher.Progress().TotalRX + pathRx += totalRx + log("finished incremental transfer, %v bytes total", totalRx) } - log("finished incremental transfer path") + log("finished incremental transfer path, %v bytes total", pathRx) return true case zfs.ConflictNoCommonAncestor: diff --git a/rpc/rpc.go b/rpc/rpc.go index 6d2a17b..667f04c 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -10,6 +10,7 @@ import ( "io" "os" "reflect" + "time" ) type RPCRequester interface { @@ -242,10 +243,16 @@ func (c ByteStreamRPC) serverLoop(handler RPCHandler) error { send(&r) chunker := NewChunker(snapReader) - _, err := io.Copy(conn, &chunker) + watcher := IOProgressWatcher{Reader: &chunker} + watcher.KickOff(1*time.Second, func(p IOProgress) { + log.Printf("progress sending initial snapshot stream: %v bytes sent", p.TotalRX) + }) + _, err := io.Copy(conn, &watcher) if err != nil { + log.Printf("error sending initial snapshot stream: %s", err) panic(err) } + log.Printf("finished sending initial snapshot stream: total %v bytes sent", watcher.Progress().TotalRX) } case RTIncrementalTransferRequest: @@ -268,10 +275,16 @@ func (c ByteStreamRPC) serverLoop(handler RPCHandler) error { send(&r) chunker := NewChunker(snapReader) - _, err := io.Copy(conn, &chunker) + + watcher := IOProgressWatcher{Reader: &chunker} + watcher.KickOff(1*time.Second, func(p IOProgress) { + log.Printf("progress sending incremental snapshot stream: %v bytes sent", p.TotalRX) + }) + _, err := io.Copy(conn, &watcher) if err != nil { panic(err) } + log.Printf("finished sending incremental snapshot stream: total %v bytes sent", watcher.Progress().TotalRX) } case RTPullMeRequest: diff --git a/util/io.go b/util/io.go index 68ae286..446b323 100644 --- a/util/io.go +++ b/util/io.go @@ -3,6 +3,7 @@ package util import ( "io" "os" + "time" ) type ReadWriteCloserLogger struct { @@ -96,3 +97,52 @@ func (c *ChainedReader) Read(buf []byte) (n int, err error) { return } + +type IOProgress struct { + TotalRX uint64 +} + +type IOProgressCallback func(progress IOProgress) + +type IOProgressWatcher struct { + Reader io.Reader + callback IOProgressCallback + callbackTicker *time.Ticker + progress IOProgress + updateChannel chan int +} + +func (w *IOProgressWatcher) KickOff(callbackInterval time.Duration, callback IOProgressCallback) { + w.callback = callback + w.callbackTicker = time.NewTicker(callbackInterval) + w.updateChannel = make(chan int) + go func() { + outer: + for { + select { + case newBytes, more := <-w.updateChannel: + w.progress.TotalRX += uint64(newBytes) + if !more { + w.callbackTicker.Stop() + break outer + } + case <-w.callbackTicker.C: + w.callback(w.progress) + } + } + w.callback(w.progress) + }() +} + +func (w *IOProgressWatcher) Progress() IOProgress { + return w.progress +} + +func (w *IOProgressWatcher) Read(p []byte) (n int, err error) { + n, err = w.Reader.Read(p) + w.updateChannel <- n + if err != nil { + close(w.updateChannel) + } + return +}