Puller: refactor + use Task API

* drop rx byte count functionality
* will be re-added to Task as necessary

refs #10
This commit is contained in:
Christian Schwarz
2017-12-26 21:37:48 +01:00
parent 59e34942d1
commit b69089a527
4 changed files with 247 additions and 274 deletions

View File

@ -3,7 +3,6 @@ package util
import (
"io"
"os"
"time"
)
type ReadWriteCloserLogger struct {
@ -97,52 +96,3 @@ func (c *ChainedReader) Read(buf []byte) (n int, err error) {
return
}
type IOProgress struct {
TotalRX uint64
}
type IOProgressCallback func(progress IOProgress)
type IOProgressWatcher struct {
Reader io.Reader
callback IOProgressCallback
callbackTicker *time.Ticker
progress IOProgress
updateChannel chan int
}
func (w *IOProgressWatcher) KickOff(callbackInterval time.Duration, callback IOProgressCallback) {
w.callback = callback
w.callbackTicker = time.NewTicker(callbackInterval)
w.updateChannel = make(chan int)
go func() {
outer:
for {
select {
case newBytes, more := <-w.updateChannel:
w.progress.TotalRX += uint64(newBytes)
if !more {
w.callbackTicker.Stop()
break outer
}
case <-w.callbackTicker.C:
w.callback(w.progress)
}
}
w.callback(w.progress)
}()
}
func (w *IOProgressWatcher) Progress() IOProgress {
return w.progress
}
func (w *IOProgressWatcher) Read(p []byte) (n int, err error) {
n, err = w.Reader.Read(p)
w.updateChannel <- n
if err != nil {
close(w.updateChannel)
}
return
}