From 0a5870208eee85948367cc5810b18acc8a5a9cf5 Mon Sep 17 00:00:00 2001 From: klauspost Date: Tue, 15 Sep 2015 16:46:06 +0200 Subject: [PATCH] Display individual transfer progress Improve progress printing by displaying individual file progress, as well as a moving average speed with ETA. Example output: 2015/09/15 16:38:21 Transferred: 183599104 Bytes (4646.49 kByte/s) Errors: 0 Checks: 1 Transferred: 0 Elapsed time: 38.5s Transferring: * 01_06_14.mp3: 33% done. avg: 1280.5, cur: 1288.8 kByte/s. ETA: 1m12s * 01_12_15.mp3: 33% done. avg: 1002.2, cur: 943.4 kByte/s. ETA: 1m17s * 01_13_14.mp3: 48% done. avg: 1456.8, cur: 1425.2 kByte/s. ETA: 39s * 01_19_15.mp3: 28% done. avg: 1226.9, cur: 1114.4 kByte/s. ETA: 1m37s --- fs/accounting.go | 248 ++++++++++++++++++++++++++++++++++++++++------- fs/operations.go | 10 +- 2 files changed, 218 insertions(+), 40 deletions(-) diff --git a/fs/accounting.go b/fs/accounting.go index a3ce0c9b0..fb08f6d99 100644 --- a/fs/accounting.go +++ b/fs/accounting.go @@ -7,10 +7,12 @@ import ( "fmt" "io" "log" + "sort" "strings" "sync" "time" + "github.com/VividCortex/ewma" "github.com/tsenart/tb" ) @@ -28,21 +30,63 @@ func startTokenBucket() { } } -// Stringset holds some strings -type StringSet map[string]bool +// stringSet holds a set of strings +type stringSet map[string]struct{} -// Strings returns all the strings in the StringSet -func (ss StringSet) Strings() []string { - strings := make([]string, 0, len(ss)) - for k := range ss { - strings = append(strings, k) - } - return strings +// inProgress holds a synchronizes map of in progress transfers +type inProgress struct { + mu sync.Mutex + m map[string]*Account } -// String returns all the strings in the StringSet joined by comma -func (ss StringSet) String() string { - return strings.Join(ss.Strings(), ", ") +// newInProgress makes a new inProgress object +func newInProgress() *inProgress { + return &inProgress{ + m: make(map[string]*Account, Config.Transfers), + } +} + +// set marks the name as in progress +func (ip *inProgress) set(name string, acc *Account) { + ip.mu.Lock() + defer ip.mu.Unlock() + ip.m[name] = acc +} + +// clear marks the name as no longer in progress +func (ip *inProgress) clear(name string) { + ip.mu.Lock() + defer ip.mu.Unlock() + delete(ip.m, name) +} + +// get gets the account for name, of nil if not found +func (ip *inProgress) get(name string) *Account { + ip.mu.Lock() + defer ip.mu.Unlock() + return ip.m[name] +} + +// Strings returns all the strings in the stringSet +func (ss stringSet) Strings() []string { + strings := make([]string, 0, len(ss)) + for name, _ := range ss { + var out string + if acc := Stats.inProgress.get(name); acc != nil { + out = acc.String() + } else { + out = name + } + strings = append(strings, " * "+out) + } + sorted := sort.StringSlice(strings) + sorted.Sort() + return sorted +} + +// String returns all the file names in the stringSet joined by newline +func (ss stringSet) String() string { + return strings.Join(ss.Strings(), "\n") } // Stats limits and accounts all transfers @@ -51,18 +95,20 @@ type StatsInfo struct { bytes int64 errors int64 checks int64 - checking StringSet + checking stringSet transfers int64 - transferring StringSet + transferring stringSet start time.Time + inProgress *inProgress } // NewStats cretates an initialised StatsInfo func NewStats() *StatsInfo { return &StatsInfo{ - checking: make(StringSet, Config.Checkers), - transferring: make(StringSet, Config.Transfers), + checking: make(stringSet, Config.Checkers), + transferring: make(stringSet, Config.Transfers), start: time.Now(), + inProgress: newInProgress(), } } @@ -76,24 +122,25 @@ func (s *StatsInfo) String() string { if dt > 0 { speed = float64(s.bytes) / 1024 / dt_seconds } + dt_rounded := dt - (dt % (time.Second / 10)) buf := &bytes.Buffer{} fmt.Fprintf(buf, ` Transferred: %10d Bytes (%7.2f kByte/s) Errors: %10d Checks: %10d Transferred: %10d -Elapsed time: %v +Elapsed time: %10v `, s.bytes, speed, s.errors, s.checks, s.transfers, - dt) + dt_rounded) if len(s.checking) > 0 { - fmt.Fprintf(buf, "Checking: %s\n", s.checking) + fmt.Fprintf(buf, "Checking:\n%s\n", s.checking) } if len(s.transferring) > 0 { - fmt.Fprintf(buf, "Transferring: %s\n", s.transferring) + fmt.Fprintf(buf, "Transferring:\n%s\n", s.transferring) } return buf.String() } @@ -159,7 +206,7 @@ func (s *StatsInfo) Error() { func (s *StatsInfo) Checking(o Object) { s.lock.Lock() defer s.lock.Unlock() - s.checking[o.Remote()] = true + s.checking[o.Remote()] = struct{}{} } // DoneChecking removes a check from the stats @@ -181,7 +228,7 @@ func (s *StatsInfo) GetTransfers() int64 { func (s *StatsInfo) Transferring(o Object) { s.lock.Lock() defer s.lock.Unlock() - s.transferring[o.Remote()] = true + s.transferring[o.Remote()] = struct{}{} } // DoneTransferring removes a transfer from the stats @@ -199,15 +246,52 @@ type Account struct { // in http transport calls Read() after Do() returns on // CancelRequest so this race can happen when it apparently // shouldn't. - mu sync.Mutex - in io.ReadCloser - bytes int64 + mu sync.Mutex + in io.ReadCloser + size int64 + name string + statmu sync.Mutex // Separate mutex for stat values. + bytes int64 // Total number of bytes read + start time.Time // Start time of first read + lpTime time.Time // Time of last average measurement + lpBytes int // Number of bytes read since last measurement + avg ewma.MovingAverage // Moving average of last few measurements + exit chan struct{} // channel that will be closed when transfer is finished } -// NewAccount makes a Account reader -func NewAccount(in io.ReadCloser) *Account { - return &Account{ - in: in, +// NewAccount makes a Account reader for an object +func NewAccount(in io.ReadCloser, obj Object) *Account { + acc := &Account{ + in: in, + size: obj.Size(), + name: obj.Remote(), + exit: make(chan struct{}), + avg: ewma.NewMovingAverage(), + lpTime: time.Now(), + } + go acc.averageLoop() + Stats.inProgress.set(acc.name, acc) + return acc +} + +func (file *Account) averageLoop() { + tick := time.NewTicker(time.Second) + defer tick.Stop() + for { + select { + case now := <-tick.C: + file.statmu.Lock() + // Add average of last second. + elapsed := now.Sub(file.lpTime).Seconds() + avg := float64(file.lpBytes) / elapsed + file.avg.Add(avg) + file.lpBytes = 0 + file.lpTime = now + // Unlock stats + file.statmu.Unlock() + case <-file.exit: + return + } } } @@ -215,12 +299,24 @@ func NewAccount(in io.ReadCloser) *Account { func (file *Account) Read(p []byte) (n int, err error) { file.mu.Lock() defer file.mu.Unlock() - n, err = file.in.Read(p) - file.bytes += int64(n) - Stats.Bytes(int64(n)) - if err == io.EOF { - // FIXME Do something? + + // Set start time. + file.statmu.Lock() + if file.start.IsZero() { + file.start = time.Now() } + file.statmu.Unlock() + + n, err = file.in.Read(p) + + // Update Stats + file.statmu.Lock() + file.lpBytes += n + file.bytes += int64(n) + file.statmu.Unlock() + + Stats.Bytes(int64(n)) + // Limit the transfer speed if required if tokenBucket != nil { tokenBucket.Wait(int64(n)) @@ -228,11 +324,93 @@ func (file *Account) Read(p []byte) (n int, err error) { return } +// Returns bytes read as well as the size. +// Size can be <= 0 if the size is unknown. +func (file *Account) Progress() (bytes, size int64) { + if file == nil { + return 0, 0 + } + file.statmu.Lock() + if bytes > size { + size = 0 + } + defer file.statmu.Unlock() + return file.bytes, file.size +} + +// Speed returns the speed of the current file transfer +// in bytes per second, as well a an exponentially weighted moving average +// If no read has completed yet, 0 is returned for both values. +func (file *Account) Speed() (bps, current float64) { + if file == nil { + return 0, 0 + } + file.statmu.Lock() + defer file.statmu.Unlock() + if file.bytes == 0 { + return 0, 0 + } + // Calculate speed from first read. + total := float64(time.Now().Sub(file.start)) / float64(time.Second) + bps = float64(file.bytes) / total + current = file.avg.Value() + return +} + +// ETA returns the ETA of the current operation, +// rounded to full seconds. +// If the ETA cannot be determined 'ok' returns false. +func (file *Account) ETA() (eta time.Duration, ok bool) { + if file == nil || file.size <= 0 { + return 0, false + } + file.statmu.Lock() + defer file.statmu.Unlock() + if file.bytes == 0 { + return 0, false + } + left := file.size - file.bytes + if left <= 0 { + return 0, true + } + avg := file.avg.Value() + if avg <= 0 { + return 0, false + } + seconds := float64(left) / file.avg.Value() + + return time.Duration(time.Second * time.Duration(int(seconds))), true +} + +// String produces stats for this file +func (file *Account) String() string { + a, b := file.Progress() + avg, cur := file.Speed() + eta, etaok := file.ETA() + etas := "-" + if etaok { + if eta > 0 { + etas = fmt.Sprintf("%v", eta) + } else { + etas = "0s" + } + } + name := []rune(file.name) + if len(name) > 25 { + name = name[:25] + } + if b <= 0 { + return fmt.Sprintf("%s: avg:%7.1f, cur: %6.1f kByte/s. ETA: %s", string(name), avg/1024, cur/1024, etas) + } + return fmt.Sprintf("%s: %2d%% done. avg: %6.1f, cur: %6.1f kByte/s. ETA: %s", string(name), int(100*float64(a)/float64(b)), avg/1024, cur/1024, etas) +} + // Close the object func (file *Account) Close() error { + close(file.exit) file.mu.Lock() defer file.mu.Unlock() - // FIXME do something? + Stats.inProgress.clear(file.name) return file.in.Close() } diff --git a/fs/operations.go b/fs/operations.go index bc3c0e445..d8b5d7945 100644 --- a/fs/operations.go +++ b/fs/operations.go @@ -196,7 +196,7 @@ tryAgain: ErrorLog(src, "Failed to open: %s", err) return } - in := NewAccount(in0) // account the transfer + in := NewAccount(in0, src) // account the transfer if doUpdate { actionTaken = "Copied (updated existing)" @@ -621,7 +621,7 @@ func syncFprintf(w io.Writer, format string, a ...interface{}) (n int, err error return fmt.Fprintf(w, format, a...) } -// List the Fs to stdout +// List the Fs to the supplied writer // // Shows size and path // @@ -632,7 +632,7 @@ func List(f Fs, w io.Writer) error { }) } -// List the Fs to stdout +// List the Fs to the supplied writer // // Shows size, mod time and path // @@ -646,7 +646,7 @@ func ListLong(f Fs, w io.Writer) error { }) } -// List the Fs to stdout +// List the Fs to the supplied writer // // Produces the same output as the md5sum command // @@ -664,7 +664,7 @@ func Md5sum(f Fs, w io.Writer) error { }) } -// List the directories/buckets/containers in the Fs to stdout +// List the directories/buckets/containers in the Fs to the supplied writer func ListDir(f Fs, w io.Writer) error { for dir := range f.ListDir() { syncFprintf(w, "%12d %13s %9d %s\n", dir.Bytes, dir.When.Format("2006-01-02 15:04:05"), dir.Count, dir.Name)