Rudimentary progress reporting on send / recv side.

This commit is contained in:
Christian Schwarz 2017-07-30 09:14:37 +02:00
parent d1999fc17c
commit 8eb4a2ba44
4 changed files with 89 additions and 14 deletions

View File

@ -66,8 +66,6 @@ func (h Handler) HandleInitialTransferRequest(r rpc.InitialTransferRequest) (str
h.Logger.Printf("error sending filesystem: %#v", err) h.Logger.Printf("error sending filesystem: %#v", err)
} }
h.Logger.Printf("finished zfs send")
return return
} }
@ -87,8 +85,6 @@ func (h Handler) HandleIncrementalTransferRequest(r rpc.IncrementalTransferReque
h.Logger.Printf("error sending filesystem: %#v", err) h.Logger.Printf("error sending filesystem: %#v", err)
} }
h.Logger.Printf("finished zfs send")
return return
} }

View File

@ -2,14 +2,16 @@ package cmd
import ( import (
"fmt" "fmt"
"github.com/spf13/cobra"
"github.com/zrepl/zrepl/jobrun"
"github.com/zrepl/zrepl/rpc"
"github.com/zrepl/zrepl/zfs"
"io" "io"
"os" "os"
"sync" "sync"
"time" "time"
"github.com/spf13/cobra"
"github.com/zrepl/zrepl/jobrun"
"github.com/zrepl/zrepl/rpc"
"github.com/zrepl/zrepl/util"
"github.com/zrepl/zrepl/zfs"
) )
var runArgs struct { var runArgs struct {
@ -357,10 +359,16 @@ func doPull(pull PullContext) (err error) {
log("received initial transfer request response. zfs recv...") log("received initial transfer request response. zfs recv...")
if err = zfs.ZFSRecv(m.Local, stream, "-u"); err != nil { watcher := util.IOProgressWatcher{Reader: stream}
watcher.KickOff(1*time.Second, func(p util.IOProgress) {
log("progress on receive operation: %v bytes received", p.TotalRX)
})
if err = zfs.ZFSRecv(m.Local, &watcher, "-u"); err != nil {
log("error receiving stream, stopping...: %s", err) log("error receiving stream, stopping...: %s", err)
return false return false
} }
log("received stream, %v bytes total", watcher.Progress().TotalRX)
log("configuring properties of received filesystem") log("configuring properties of received filesystem")
@ -379,6 +387,7 @@ func doPull(pull PullContext) (err error) {
} }
log("incremental transfers using path: %#v", diff.IncrementalPath) log("incremental transfers using path: %#v", diff.IncrementalPath)
var pathRx uint64
for i := 0; i < len(diff.IncrementalPath)-1; i++ { for i := 0; i < len(diff.IncrementalPath)-1; i++ {
@ -403,16 +412,23 @@ func doPull(pull PullContext) (err error) {
log("receving incremental transfer") log("receving incremental transfer")
if err = zfs.ZFSRecv(m.Local, stream); err != nil { watcher := util.IOProgressWatcher{Reader: stream}
watcher.KickOff(1*time.Second, func(p util.IOProgress) {
log("progress on receive operation: %v bytes received", p.TotalRX)
})
if err = zfs.ZFSRecv(m.Local, &watcher); err != nil {
log("error receiving stream, stopping...: %s", err) log("error receiving stream, stopping...: %s", err)
return false return false
} }
log("finished incremental transfer") totalRx := watcher.Progress().TotalRX
pathRx += totalRx
log("finished incremental transfer, %v bytes total", totalRx)
} }
log("finished incremental transfer path") log("finished incremental transfer path, %v bytes total", pathRx)
return true return true
case zfs.ConflictNoCommonAncestor: case zfs.ConflictNoCommonAncestor:

View File

@ -10,6 +10,7 @@ import (
"io" "io"
"os" "os"
"reflect" "reflect"
"time"
) )
type RPCRequester interface { type RPCRequester interface {
@ -242,10 +243,16 @@ func (c ByteStreamRPC) serverLoop(handler RPCHandler) error {
send(&r) send(&r)
chunker := NewChunker(snapReader) chunker := NewChunker(snapReader)
_, err := io.Copy(conn, &chunker) watcher := IOProgressWatcher{Reader: &chunker}
watcher.KickOff(1*time.Second, func(p IOProgress) {
log.Printf("progress sending initial snapshot stream: %v bytes sent", p.TotalRX)
})
_, err := io.Copy(conn, &watcher)
if err != nil { if err != nil {
log.Printf("error sending initial snapshot stream: %s", err)
panic(err) panic(err)
} }
log.Printf("finished sending initial snapshot stream: total %v bytes sent", watcher.Progress().TotalRX)
} }
case RTIncrementalTransferRequest: case RTIncrementalTransferRequest:
@ -268,10 +275,16 @@ func (c ByteStreamRPC) serverLoop(handler RPCHandler) error {
send(&r) send(&r)
chunker := NewChunker(snapReader) chunker := NewChunker(snapReader)
_, err := io.Copy(conn, &chunker)
watcher := IOProgressWatcher{Reader: &chunker}
watcher.KickOff(1*time.Second, func(p IOProgress) {
log.Printf("progress sending incremental snapshot stream: %v bytes sent", p.TotalRX)
})
_, err := io.Copy(conn, &watcher)
if err != nil { if err != nil {
panic(err) panic(err)
} }
log.Printf("finished sending incremental snapshot stream: total %v bytes sent", watcher.Progress().TotalRX)
} }
case RTPullMeRequest: case RTPullMeRequest:

View File

@ -3,6 +3,7 @@ package util
import ( import (
"io" "io"
"os" "os"
"time"
) )
type ReadWriteCloserLogger struct { type ReadWriteCloserLogger struct {
@ -96,3 +97,52 @@ func (c *ChainedReader) Read(buf []byte) (n int, err error) {
return return
} }
type IOProgress struct {
TotalRX uint64
}
type IOProgressCallback func(progress IOProgress)
type IOProgressWatcher struct {
Reader io.Reader
callback IOProgressCallback
callbackTicker *time.Ticker
progress IOProgress
updateChannel chan int
}
func (w *IOProgressWatcher) KickOff(callbackInterval time.Duration, callback IOProgressCallback) {
w.callback = callback
w.callbackTicker = time.NewTicker(callbackInterval)
w.updateChannel = make(chan int)
go func() {
outer:
for {
select {
case newBytes, more := <-w.updateChannel:
w.progress.TotalRX += uint64(newBytes)
if !more {
w.callbackTicker.Stop()
break outer
}
case <-w.callbackTicker.C:
w.callback(w.progress)
}
}
w.callback(w.progress)
}()
}
func (w *IOProgressWatcher) Progress() IOProgress {
return w.progress
}
func (w *IOProgressWatcher) Read(p []byte) (n int, err error) {
n, err = w.Reader.Read(p)
w.updateChannel <- n
if err != nil {
close(w.updateChannel)
}
return
}