Display individual transfer progress

Improve progress printing by displaying individual file progress, as well as a moving average speed and an ETA per file. Example output:

2015/09/15 16:38:21
Transferred:  183599104 Bytes (4646.49 kByte/s)
Errors:               0
Checks:               1
Transferred:          0
Elapsed time:     38.5s
Transferring:
 * 01_06_14.mp3: 33% done. avg: 1280.5, cur: 1288.8 kByte/s. ETA: 1m12s
 * 01_12_15.mp3: 33% done. avg: 1002.2, cur:  943.4 kByte/s. ETA: 1m17s
 * 01_13_14.mp3: 48% done. avg: 1456.8, cur: 1425.2 kByte/s. ETA: 39s
 * 01_19_15.mp3: 28% done. avg: 1226.9, cur: 1114.4 kByte/s. ETA: 1m37s
This commit is contained in:
parent 34193fd8d9
commit 7667d728ab
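The change below boils down to two per-file calculations: an exponentially weighted moving average of the transfer rate, fed a new sample at most once per second, and an ETA computed as the remaining bytes divided by that average. The standalone sketch below is illustrative only: the progressMeter type and its methods are invented for this example, and a hand-rolled fixed decay factor stands in for the github.com/VividCortex/ewma average the commit actually uses.

package main

import (
    "fmt"
    "time"
)

// progressMeter is a hypothetical, stripped-down version of the per-file
// accounting this commit adds: it counts bytes against a known size and
// keeps a moving average of the transfer rate.
type progressMeter struct {
    size, bytes int64     // total size and bytes transferred so far
    lastBytes   int64     // bytes seen since the average was last updated
    lastTime    time.Time // when the average was last updated
    avg         float64   // moving average of the speed in bytes/s
}

const decay = 0.25 // weight given to each new speed sample

func newProgressMeter(size int64) *progressMeter {
    return &progressMeter{size: size, lastTime: time.Now()}
}

// account records n newly transferred bytes and, at most once per second,
// folds the observed rate into the moving average.
func (p *progressMeter) account(n int) {
    p.bytes += int64(n)
    p.lastBytes += int64(n)
    if elapsed := time.Since(p.lastTime); elapsed > time.Second {
        sample := float64(p.lastBytes) / elapsed.Seconds()
        if p.avg == 0 {
            p.avg = sample
        } else {
            p.avg = decay*sample + (1-decay)*p.avg
        }
        p.lastBytes = 0
        p.lastTime = time.Now()
    }
}

// status renders a line in the same spirit as the commit's example output.
func (p *progressMeter) status(name string) string {
    eta := "-"
    if p.avg > 0 && p.size > p.bytes {
        eta = (time.Duration(float64(p.size-p.bytes)/p.avg) * time.Second).String()
    }
    return fmt.Sprintf("%s: %2d%% done. avg: %6.1f kByte/s. ETA: %s",
        name, int(100*float64(p.bytes)/float64(p.size)), p.avg/1024, eta)
}

func main() {
    m := newProgressMeter(10 << 20) // pretend we are copying a 10 MiB file
    for i := 0; i < 3; i++ {
        m.account(512 << 10) // pretend 512 KiB arrived
        time.Sleep(1100 * time.Millisecond)
    }
    fmt.Println(m.status("01_06_14.mp3"))
}

Updating the average no more than once per second is the same trick the diff uses in Account.Read() to keep the variance of the reported speed down.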
fs/accounting.go (208 lines changed)
@@ -10,7 +10,9 @@ import (
     "strings"
     "sync"
     "time"
+    "sort"

+    "github.com/VividCortex/ewma"
     "github.com/tsenart/tb"
 )

@@ -29,20 +31,22 @@ func startTokenBucket() {
 }

 // Stringset holds some strings
-type StringSet map[string]bool
+type StringSet map[string]Tracker

 // Strings returns all the strings in the StringSet
 func (ss StringSet) Strings() []string {
     strings := make([]string, 0, len(ss))
-    for k := range ss {
-        strings = append(strings, k)
+    for _, v := range ss {
+        strings = append(strings, " * "+v.String())
     }
-    return strings
+    sorted := sort.StringSlice(strings)
+    sorted.Sort()
+    return sorted
 }

 // String returns all the strings in the StringSet joined by comma
 func (ss StringSet) String() string {
-    return strings.Join(ss.Strings(), ", ")
+    return strings.Join(ss.Strings(), "\n")
 }

 // Stats limits and accounts all transfers
@@ -76,24 +80,25 @@ func (s *StatsInfo) String() string {
     if dt > 0 {
         speed = float64(s.bytes) / 1024 / dt_seconds
     }
+    dt_rounded := dt - (dt % (time.Second / 10))
     buf := &bytes.Buffer{}
     fmt.Fprintf(buf, `
 Transferred: %10d Bytes (%7.2f kByte/s)
 Errors: %10d
 Checks: %10d
 Transferred: %10d
-Elapsed time: %v
+Elapsed time: %10v
 `,
         s.bytes, speed,
         s.errors,
         s.checks,
         s.transfers,
-        dt)
+        dt_rounded)
     if len(s.checking) > 0 {
-        fmt.Fprintf(buf, "Checking: %s\n", s.checking)
+        fmt.Fprintf(buf, "Checking:\n%s\n", s.checking)
     }
     if len(s.transferring) > 0 {
-        fmt.Fprintf(buf, "Transferring: %s\n", s.transferring)
+        fmt.Fprintf(buf, "Transferring:\n%s\n", s.transferring)
     }
     return buf.String()
 }
@@ -156,17 +161,17 @@ func (s *StatsInfo) Error() {
 }

 // Checking adds a check into the stats
-func (s *StatsInfo) Checking(o Object) {
+func (s *StatsInfo) Checking(p Tracker) {
     s.lock.Lock()
     defer s.lock.Unlock()
-    s.checking[o.Remote()] = true
+    s.checking[p.Remote()] = p
 }

 // DoneChecking removes a check from the stats
-func (s *StatsInfo) DoneChecking(o Object) {
+func (s *StatsInfo) DoneChecking(p Tracker) {
     s.lock.Lock()
     defer s.lock.Unlock()
-    delete(s.checking, o.Remote())
+    delete(s.checking, p.Remote())
     s.checks += 1
 }

@@ -178,17 +183,17 @@ func (s *StatsInfo) GetTransfers() int64 {
 }

 // Transferring adds a transfer into the stats
-func (s *StatsInfo) Transferring(o Object) {
+func (s *StatsInfo) Transferring(p Tracker) {
     s.lock.Lock()
     defer s.lock.Unlock()
-    s.transferring[o.Remote()] = true
+    s.transferring[p.Remote()] = p
 }

 // DoneTransferring removes a transfer from the stats
-func (s *StatsInfo) DoneTransferring(o Object) {
+func (s *StatsInfo) DoneTransferring(p Tracker) {
     s.lock.Lock()
     defer s.lock.Unlock()
-    delete(s.transferring, o.Remote())
+    delete(s.transferring, p.Remote())
     s.transfers += 1
 }

@@ -199,9 +204,15 @@ type Account struct {
     // in http transport calls Read() after Do() returns on
     // CancelRequest so this race can happen when it apparently
     // shouldn't.
-    mu    sync.Mutex
-    in    io.ReadCloser
-    bytes int64
+    mu      sync.Mutex
+    in      io.ReadCloser
+    size    int64
+    statmu  sync.Mutex // Separate mutex for stat values.
+    bytes   int64
+    start   time.Time
+    lpTime  time.Time
+    lpBytes int
+    avg     ewma.MovingAverage
 }

 // NewAccount makes a Account reader
@@ -211,16 +222,49 @@ func NewAccount(in io.ReadCloser) *Account {
     }
 }

+// NewAccount makes a Account reader with a specified size
+func NewAccountSize(in io.ReadCloser, size int64) *Account {
+    return &Account{
+        in:   in,
+        size: size,
+    }
+}
+
 // Read bytes from the object - see io.Reader
 func (file *Account) Read(p []byte) (n int, err error) {
     file.mu.Lock()
     defer file.mu.Unlock()
-    n, err = file.in.Read(p)
-    file.bytes += int64(n)
-    Stats.Bytes(int64(n))
-    if err == io.EOF {
-        // FIXME Do something?
+
+    // Set start time.
+    file.statmu.Lock()
+    if file.start.IsZero() {
+        file.start = time.Now()
+        file.lpTime = time.Now()
+        file.avg = ewma.NewMovingAverage()
     }
+    file.statmu.Unlock()
+
+    n, err = file.in.Read(p)
+
+    // Update Stats
+    file.statmu.Lock()
+    file.lpBytes += n
+    elapsed := time.Now().Sub(file.lpTime)
+
+    // We only update the moving average every second, otherwise
+    // the variance is too big.
+    if elapsed > time.Second {
+        avg := float64(file.lpBytes) / (float64(elapsed) / float64(time.Second))
+        file.avg.Add(avg)
+        file.lpBytes = 0
+        file.lpTime = time.Now()
+    }
+
+    file.bytes += int64(n)
+    file.statmu.Unlock()
+
+    Stats.Bytes(int64(n))
+
     // Limit the transfer speed if required
     if tokenBucket != nil {
         tokenBucket.Wait(int64(n))
@@ -228,6 +272,64 @@ func (file *Account) Read(p []byte) (n int, err error) {
     return
 }

+// Returns bytes read as well as the size.
+// Size can be <= 0 if the size is unknown.
+func (file *Account) Progress() (bytes, size int64) {
+    if file == nil {
+        return 0, 0
+    }
+    file.statmu.Lock()
+    if bytes > size {
+        size = 0
+    }
+    defer file.statmu.Unlock()
+    return file.bytes, file.size
+}
+
+// Speed returns the speed of the current file transfer
+// in bytes per second, as well a an exponentially weighted moving average
+// If no read has completed yet, 0 is returned for both values.
+func (file *Account) Speed() (bps, current float64) {
+    if file == nil {
+        return 0, 0
+    }
+    file.statmu.Lock()
+    defer file.statmu.Unlock()
+    if file.bytes == 0 {
+        return 0, 0
+    }
+    // Calculate speed from first read.
+    total := float64(time.Now().Sub(file.start)) / float64(time.Second)
+    bps = float64(file.bytes) / total
+    current = file.avg.Value()
+    return
+}
+
+// ETA returns the ETA of the current operation,
+// rounded to full seconds.
+// If the ETA cannot be determined 'ok' returns false.
+func (file *Account) ETA() (eta time.Duration, ok bool) {
+    if file == nil || file.size <= 0 {
+        return 0, false
+    }
+    file.statmu.Lock()
+    defer file.statmu.Unlock()
+    if file.bytes == 0 {
+        return 0, false
+    }
+    left := file.size - file.bytes
+    if left <= 0 {
+        return 0, true
+    }
+    avg := file.avg.Value()
+    if avg <= 0 {
+        return 0, false
+    }
+    seconds := float64(left) / file.avg.Value()
+
+    return time.Duration(time.Second * time.Duration(int(seconds))), true
+}
+
 // Close the object
 func (file *Account) Close() error {
     file.mu.Lock()
@@ -236,5 +338,61 @@ func (file *Account) Close() error {
     return file.in.Close()
 }

+type tracker struct {
+    Object
+    mu  sync.Mutex
+    acc *Account
+}
+
+func NewTracker(o Object) Tracker {
+    return &tracker{Object: o}
+}
+
+func (t *tracker) SetAccount(a *Account) {
+    t.mu.Lock()
+    t.acc = a
+    t.mu.Unlock()
+}
+
+func (t *tracker) GetAccount() *Account {
+    t.mu.Lock()
+    defer t.mu.Unlock()
+    return t.acc
+}
+
+func (t *tracker) String() string {
+    acc := t.GetAccount()
+    if acc == nil {
+        return t.Remote()
+    }
+    a, b := acc.Progress()
+    avg, cur := acc.Speed()
+    eta, etaok := acc.ETA()
+    etas := "-"
+    if etaok {
+        if eta > 0 {
+            etas = fmt.Sprintf("%v", eta)
+        } else {
+            etas = "0s"
+        }
+    }
+    name := []rune(t.Remote())
+    if len(name) > 25 {
+        name = name[:25]
+    }
+    if b <= 0 {
+        return fmt.Sprintf("%s: avg:%7.1f, cur: %6.1f kByte/s. ETA: %s", string(name), avg/1024, cur/1024, etas)
+    }
+    return fmt.Sprintf("%s: %2d%% done. avg: %6.1f, cur: %6.1f kByte/s. ETA: %s", string(name), int(100*float64(a)/float64(b)), avg/1024, cur/1024, etas)
+}
+
+// A Tracker interface includes an
+// Object with an optional Account object
+// attached for tracking stats.
+type Tracker interface {
+    Object
+    SetAccount(*Account)
+}
+
 // Check it satisfies the interface
 var _ io.ReadCloser = &Account{}
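The hunks that follow show where the new Tracker is created and handed to Stats. The miniature, self-contained sketch below (all names are local to the sketch, not the rclone fs package) illustrates the design choice of attaching the Account lazily: a tracker can describe a file by name alone while it is only being checked, and switches to percentage output once a transfer attaches accounting data.

package main

import (
    "fmt"
    "sync"
)

// account and tracker are simplified stand-ins for the types added in
// fs/accounting.go above; they exist only to illustrate the pattern.
type account struct {
    bytes, size int64
}

type tracker struct {
    remote string
    mu     sync.Mutex
    acc    *account // attached once a transfer is actually opened
}

func (t *tracker) SetAccount(a *account) {
    t.mu.Lock()
    t.acc = a
    t.mu.Unlock()
}

func (t *tracker) String() string {
    t.mu.Lock()
    defer t.mu.Unlock()
    if t.acc == nil {
        // Nothing attached yet: fall back to the bare name, as
        // tracker.String() in the diff does.
        return t.remote
    }
    return fmt.Sprintf("%s: %2d%% done", t.remote,
        int(100*float64(t.acc.bytes)/float64(t.acc.size)))
}

func main() {
    t := &tracker{remote: "01_06_14.mp3"}
    fmt.Println(t) // "01_06_14.mp3" while only checking
    t.SetAccount(&account{bytes: 3 << 20, size: 9 << 20})
    fmt.Println(t) // "01_06_14.mp3: 33% done" once transferring
}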
fs/operations.go

@@ -169,7 +169,7 @@ func removeFailedCopy(dst Object) bool {
 // If dst is nil then the object must not exist already. If you do
 // call Copy() with dst nil on a pre-existing file then some filing
 // systems (eg Drive) may duplicate the file.
-func Copy(f Fs, dst, src Object) {
+func Copy(f Fs, dst Object, src Tracker) {
     const maxTries = 10
     tries := 0
     doUpdate := dst != nil
@@ -196,7 +196,8 @@ tryAgain:
         ErrorLog(src, "Failed to open: %s", err)
         return
     }
-    in := NewAccount(in0) // account the transfer
+    in := NewAccountSize(in0, src.Size()) // account the transfer
+    src.SetAccount(in)

     if doUpdate {
         actionTaken = "Copied (updated existing)"
@@ -287,10 +288,10 @@ func checkOne(pair ObjectPair, out ObjectPairChan) {
 func PairChecker(in ObjectPairChan, out ObjectPairChan, wg *sync.WaitGroup) {
     defer wg.Done()
     for pair := range in {
-        src := pair.src
-        Stats.Checking(src)
+        p := NewTracker(pair.src)
+        Stats.Checking(p)
         checkOne(pair, out)
-        Stats.DoneChecking(src)
+        Stats.DoneChecking(p)
     }
 }

@@ -299,13 +300,14 @@ func PairCopier(in ObjectPairChan, fdst Fs, wg *sync.WaitGroup) {
     defer wg.Done()
     for pair := range in {
         src := pair.src
-        Stats.Transferring(src)
+        p := NewTracker(src)
+        Stats.Transferring(p)
         if Config.DryRun {
             Debug(src, "Not copying as --dry-run")
         } else {
-            Copy(fdst, pair.dst, src)
+            Copy(fdst, pair.dst, p)
         }
-        Stats.DoneTransferring(src)
+        Stats.DoneTransferring(p)
     }
 }

@@ -317,7 +319,8 @@ func PairMover(in ObjectPairChan, fdst Fs, wg *sync.WaitGroup) {
     for pair := range in {
         src := pair.src
         dst := pair.dst
-        Stats.Transferring(src)
+        p := NewTracker(src)
+        Stats.Transferring(p)
         if Config.DryRun {
             Debug(src, "Not moving as --dry-run")
         } else if haveMover {
@@ -332,9 +335,9 @@ func PairMover(in ObjectPairChan, fdst Fs, wg *sync.WaitGroup) {
             fdstMover.Move(src, src.Remote())
             Debug(src, "Moved")
         } else {
-            Copy(fdst, pair.dst, src)
+            Copy(fdst, pair.dst, p)
         }
-        Stats.DoneTransferring(src)
+        Stats.DoneTransferring(p)
     }
 }

@@ -349,9 +352,10 @@ func DeleteFiles(to_be_deleted ObjectsChan) {
         if Config.DryRun {
             Debug(dst, "Not deleting as --dry-run")
         } else {
-            Stats.Checking(dst)
+            p := NewTracker(dst)
+            Stats.Checking(p)
             err := dst.Remove()
-            Stats.DoneChecking(dst)
+            Stats.DoneChecking(p)
             if err != nil {
                 Stats.Error()
                 ErrorLog(dst, "Couldn't delete: %s", err)
@@ -562,15 +566,16 @@ func Check(fdst, fsrc Fs) error {
         defer checkerWg.Done()
         for check := range checks {
             dst, src := check[0], check[1]
-            Stats.Checking(src)
+            p := NewTracker(src)
+            Stats.Checking(p)
             if src.Size() != dst.Size() {
-                Stats.DoneChecking(src)
+                Stats.DoneChecking(p)
                 Stats.Error()
                 ErrorLog(src, "Sizes differ")
                 continue
             }
             same, _, err := CheckMd5sums(src, dst)
-            Stats.DoneChecking(src)
+            Stats.DoneChecking(p)
             if err != nil {
                 continue
             }
@@ -632,30 +637,32 @@ func List(f Fs, w io.Writer) error {
     })
 }

-// List the Fs to stdout
+// List the Fs to the supplied writer
 //
 // Shows size, mod time and path
 //
 // Lists in parallel which may get them out of order
 func ListLong(f Fs, w io.Writer) error {
     return ListFn(f, func(o Object) {
-        Stats.Checking(o)
+        p := NewTracker(o)
+        Stats.Checking(p)
         modTime := o.ModTime()
-        Stats.DoneChecking(o)
+        Stats.DoneChecking(p)
         syncFprintf(w, "%9d %s %s\n", o.Size(), modTime.Format("2006-01-02 15:04:05.000000000"), o.Remote())
     })
 }

-// List the Fs to stdout
+// List the Fs to the supplied writer
 //
 // Produces the same output as the md5sum command
 //
 // Lists in parallel which may get them out of order
 func Md5sum(f Fs, w io.Writer) error {
     return ListFn(f, func(o Object) {
-        Stats.Checking(o)
+        p := NewTracker(o)
+        Stats.Checking(p)
         md5sum, err := o.Md5sum()
-        Stats.DoneChecking(o)
+        Stats.DoneChecking(p)
         if err != nil {
             Debug(o, "Failed to read MD5: %v", err)
             md5sum = "ERROR"