Integrate NCW changes, and add async average calculation.

This commit is contained in:
klauspost 2015-09-16 11:45:33 +02:00
parent 7667d728ab
commit acf939f569

View File

@ -7,10 +7,10 @@ import (
"fmt" "fmt"
"io" "io"
"log" "log"
"sort"
"strings" "strings"
"sync" "sync"
"time" "time"
"sort"
"github.com/VividCortex/ewma" "github.com/VividCortex/ewma"
"github.com/tsenart/tb" "github.com/tsenart/tb"
@ -30,22 +30,62 @@ func startTokenBucket() {
} }
} }
// Stringset holds some strings // stringSet holds a set of strings
type StringSet map[string]Tracker type stringSet map[string]struct{}
// Strings returns all the strings in the StringSet // inProgress holds a synchronizes map of in progress transfers
func (ss StringSet) Strings() []string { type inProgress struct {
mu sync.Mutex
m map[string]*Account
}
// newInProgress makes a new inProgress object
func newInProgress() *inProgress {
return &inProgress{
m: make(map[string]*Account, Config.Transfers),
}
}
// set marks the name as in progress
func (ip *inProgress) set(name string, acc *Account) {
ip.mu.Lock()
defer ip.mu.Unlock()
ip.m[name] = acc
}
// clear marks the name as no longer in progress
func (ip *inProgress) clear(name string) {
ip.mu.Lock()
defer ip.mu.Unlock()
delete(ip.m, name)
}
// get gets the account for name, of nil if not found
func (ip *inProgress) get(name string) *Account {
ip.mu.Lock()
defer ip.mu.Unlock()
return ip.m[name]
}
// Strings returns all the strings in the stringSet
func (ss stringSet) Strings() []string {
strings := make([]string, 0, len(ss)) strings := make([]string, 0, len(ss))
for _, v := range ss { for name, _ := range ss {
strings = append(strings, " * "+v.String()) var out string
if acc := Stats.inProgress.get(name); acc != nil {
out = acc.String()
} else {
out = name
}
strings = append(strings, " * "+out)
} }
sorted := sort.StringSlice(strings) sorted := sort.StringSlice(strings)
sorted.Sort() sorted.Sort()
return sorted return sorted
} }
// String returns all the strings in the StringSet joined by comma // String returns all the file names in the stringSet joined by newline
func (ss StringSet) String() string { func (ss stringSet) String() string {
return strings.Join(ss.Strings(), "\n") return strings.Join(ss.Strings(), "\n")
} }
@ -55,18 +95,20 @@ type StatsInfo struct {
bytes int64 bytes int64
errors int64 errors int64
checks int64 checks int64
checking StringSet checking stringSet
transfers int64 transfers int64
transferring StringSet transferring stringSet
start time.Time start time.Time
inProgress *inProgress
} }
// NewStats cretates an initialised StatsInfo // NewStats cretates an initialised StatsInfo
func NewStats() *StatsInfo { func NewStats() *StatsInfo {
return &StatsInfo{ return &StatsInfo{
checking: make(StringSet, Config.Checkers), checking: make(stringSet, Config.Checkers),
transferring: make(StringSet, Config.Transfers), transferring: make(stringSet, Config.Transfers),
start: time.Now(), start: time.Now(),
inProgress: newInProgress(),
} }
} }
@ -161,17 +203,17 @@ func (s *StatsInfo) Error() {
} }
// Checking adds a check into the stats // Checking adds a check into the stats
func (s *StatsInfo) Checking(p Tracker) { func (s *StatsInfo) Checking(o Object) {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
s.checking[p.Remote()] = p s.checking[o.Remote()] = struct{}{}
} }
// DoneChecking removes a check from the stats // DoneChecking removes a check from the stats
func (s *StatsInfo) DoneChecking(p Tracker) { func (s *StatsInfo) DoneChecking(o Object) {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
delete(s.checking, p.Remote()) delete(s.checking, o.Remote())
s.checks += 1 s.checks += 1
} }
@ -183,17 +225,17 @@ func (s *StatsInfo) GetTransfers() int64 {
} }
// Transferring adds a transfer into the stats // Transferring adds a transfer into the stats
func (s *StatsInfo) Transferring(p Tracker) { func (s *StatsInfo) Transferring(o Object) {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
s.transferring[p.Remote()] = p s.transferring[o.Remote()] = struct{}{}
} }
// DoneTransferring removes a transfer from the stats // DoneTransferring removes a transfer from the stats
func (s *StatsInfo) DoneTransferring(p Tracker) { func (s *StatsInfo) DoneTransferring(o Object) {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
delete(s.transferring, p.Remote()) delete(s.transferring, o.Remote())
s.transfers += 1 s.transfers += 1
} }
@ -204,29 +246,52 @@ type Account struct {
// in http transport calls Read() after Do() returns on // in http transport calls Read() after Do() returns on
// CancelRequest so this race can happen when it apparently // CancelRequest so this race can happen when it apparently
// shouldn't. // shouldn't.
mu sync.Mutex mu sync.Mutex
in io.ReadCloser in io.ReadCloser
size int64 size int64
statmu sync.Mutex // Separate mutex for stat values. name string
bytes int64 statmu sync.Mutex // Separate mutex for stat values.
start time.Time bytes int64 // Total number of bytes read
lpTime time.Time start time.Time // Start time of first read
lpBytes int lpTime time.Time // Time of last average measurement
avg ewma.MovingAverage lpBytes int // Number of bytes read since last measurement
avg ewma.MovingAverage // Moving average of last few measurements
exit chan struct{} // channel that will be closed when transfer is finished
} }
// NewAccount makes a Account reader // NewAccount makes a Account reader for an object
func NewAccount(in io.ReadCloser) *Account { func NewAccount(in io.ReadCloser, obj Object) *Account {
return &Account{ acc := &Account{
in: in, in: in,
size: obj.Size(),
name: obj.Remote(),
exit: make(chan struct{}),
avg: ewma.NewMovingAverage(),
lpTime: time.Now(),
} }
go acc.averageLoop()
Stats.inProgress.set(acc.name, acc)
return acc
} }
// NewAccount makes a Account reader with a specified size func (file *Account) averageLoop() {
func NewAccountSize(in io.ReadCloser, size int64) *Account { tick := time.NewTicker(time.Second)
return &Account{ defer tick.Stop()
in: in, for {
size: size, select {
case now := <-tick.C:
file.statmu.Lock()
// Add average of last second.
elapsed := now.Sub(file.lpTime).Seconds()
avg := float64(file.lpBytes) / elapsed
file.avg.Add(avg)
file.lpBytes = 0
file.lpTime = now
// Unlock stats
file.statmu.Unlock()
case <-file.exit:
return
}
} }
} }
@ -239,8 +304,6 @@ func (file *Account) Read(p []byte) (n int, err error) {
file.statmu.Lock() file.statmu.Lock()
if file.start.IsZero() { if file.start.IsZero() {
file.start = time.Now() file.start = time.Now()
file.lpTime = time.Now()
file.avg = ewma.NewMovingAverage()
} }
file.statmu.Unlock() file.statmu.Unlock()
@ -249,17 +312,6 @@ func (file *Account) Read(p []byte) (n int, err error) {
// Update Stats // Update Stats
file.statmu.Lock() file.statmu.Lock()
file.lpBytes += n file.lpBytes += n
elapsed := time.Now().Sub(file.lpTime)
// We only update the moving average every second, otherwise
// the variance is too big.
if elapsed > time.Second {
avg := float64(file.lpBytes) / (float64(elapsed) / float64(time.Second))
file.avg.Add(avg)
file.lpBytes = 0
file.lpTime = time.Now()
}
file.bytes += int64(n) file.bytes += int64(n)
file.statmu.Unlock() file.statmu.Unlock()
@ -325,49 +377,16 @@ func (file *Account) ETA() (eta time.Duration, ok bool) {
if avg <= 0 { if avg <= 0 {
return 0, false return 0, false
} }
seconds := float64(left)/file.avg.Value() seconds := float64(left) / file.avg.Value()
return time.Duration(time.Second * time.Duration(int(seconds))), true return time.Duration(time.Second * time.Duration(int(seconds))), true
} }
// Close the object // String produces stats for this file
func (file *Account) Close() error { func (file *Account) String() string {
file.mu.Lock() a, b := file.Progress()
defer file.mu.Unlock() avg, cur := file.Speed()
// FIXME do something? eta, etaok := file.ETA()
return file.in.Close()
}
type tracker struct {
Object
mu sync.Mutex
acc *Account
}
func NewTracker(o Object) Tracker {
return &tracker{Object: o}
}
func (t *tracker) SetAccount(a *Account) {
t.mu.Lock()
t.acc = a
t.mu.Unlock()
}
func (t *tracker) GetAccount() *Account {
t.mu.Lock()
defer t.mu.Unlock()
return t.acc
}
func (t *tracker) String() string {
acc := t.GetAccount()
if acc == nil {
return t.Remote()
}
a, b := acc.Progress()
avg, cur := acc.Speed()
eta, etaok := acc.ETA()
etas := "-" etas := "-"
if etaok { if etaok {
if eta > 0 { if eta > 0 {
@ -376,22 +395,23 @@ func (t *tracker) String() string {
etas = "0s" etas = "0s"
} }
} }
name := []rune(t.Remote()) name := []rune(file.name)
if len(name) > 25 { if len(name) > 25 {
name = name[:25] name = name[:25]
} }
if b <= 0 { if b <= 0 {
return fmt.Sprintf("%s: avg:%7.1f, cur: %6.1f kByte/s. ETA: %s", string(name) , avg/1024, cur/1024, etas) return fmt.Sprintf("%s: avg:%7.1f, cur: %6.1f kByte/s. ETA: %s", string(name), avg/1024, cur/1024, etas)
} }
return fmt.Sprintf("%s: %2d%% done. avg: %6.1f, cur: %6.1f kByte/s. ETA: %s", string(name), int(100*float64(a)/float64(b)), avg/1024, cur/1024, etas) return fmt.Sprintf("%s: %2d%% done. avg: %6.1f, cur: %6.1f kByte/s. ETA: %s", string(name), int(100*float64(a)/float64(b)), avg/1024, cur/1024, etas)
} }
// A Tracker interface includes an // Close the object
// Object with an optional Account object func (file *Account) Close() error {
// attached for tracking stats. close(file.exit)
type Tracker interface { file.mu.Lock()
Object defer file.mu.Unlock()
SetAccount(*Account) Stats.inProgress.clear(file.name)
return file.in.Close()
} }
// Check it satisfies the interface // Check it satisfies the interface