stats: fix the speed not getting updated after a pause in the processing

This shifts the behavior of the average loop to be a persistent loop
that gets resumed/paused when transfers & checks are started/completed.

Previously, the averageLoop was stopped on completion of
transfers & checks but failed to start again due to the protection of
the sync.Once

Signed-off-by: Anagh Kumar Baranwal <6824881+darthShadow@users.noreply.github.com>
This commit is contained in:
Anagh Kumar Baranwal 2024-09-17 18:09:54 +05:30
parent f639cd9c78
commit 40bba359e2
No known key found for this signature in database

View File

@ -65,28 +65,29 @@ type StatsInfo struct {
}
type averageValues struct {
mu sync.Mutex
lpBytes int64
lpTime time.Time
speed float64
stop chan bool
stopped sync.WaitGroup
startOnce sync.Once
stopOnce sync.Once
mu sync.Mutex
lpBytes int64
lpTime time.Time
speed float64
stop chan bool
stopped sync.WaitGroup
started bool
}
// NewStats creates an initialised StatsInfo
func NewStats(ctx context.Context) *StatsInfo {
ci := fs.GetConfig(ctx)
return &StatsInfo{
s := &StatsInfo{
ctx: ctx,
ci: ci,
checking: newTransferMap(ci.Checkers, "checking"),
transferring: newTransferMap(ci.Transfers, "transferring"),
inProgress: newInProgress(ctx),
startTime: time.Now(),
average: averageValues{stop: make(chan bool)},
average: averageValues{},
}
s.startAverageLoop()
return s
}
// RemoteStats returns stats for rc
@ -328,61 +329,97 @@ func (s *StatsInfo) averageLoop() {
ticker := time.NewTicker(averagePeriodLength)
defer ticker.Stop()
startTime := time.Now()
a := &s.average
defer a.stopped.Done()
shouldRun := false
for {
select {
case now := <-ticker.C:
a.mu.Lock()
var elapsed float64
if a.lpTime.IsZero() {
elapsed = now.Sub(startTime).Seconds()
} else {
elapsed = now.Sub(a.lpTime).Seconds()
if !shouldRun {
a.mu.Unlock()
continue
}
avg := 0.0
elapsed := now.Sub(a.lpTime).Seconds()
if elapsed > 0 {
avg = float64(a.lpBytes) / elapsed
}
if period < averagePeriod {
period++
}
a.speed = (avg + a.speed*(period-1)) / period
a.lpBytes = 0
a.lpTime = now
a.mu.Unlock()
case stop, ok := <-a.stop:
if !ok {
return // Channel closed, exit the loop
}
a.mu.Lock()
// If we are resuming, store the current time
if !shouldRun && !stop {
a.lpTime = time.Now()
}
shouldRun = !stop
a.mu.Unlock()
case <-a.stop:
return
}
}
}
// Resume the average loop
func (s *StatsInfo) resumeAverageLoop() {
s.mu.Lock()
defer s.mu.Unlock()
s.average.stop <- false
}
// Pause the average loop
func (s *StatsInfo) pauseAverageLoop() {
s.mu.Lock()
defer s.mu.Unlock()
s.average.stop <- true
}
// Start the average loop
//
// Call with the mutex held
func (s *StatsInfo) _startAverageLoop() {
if !s.average.started {
s.average.stop = make(chan bool)
s.average.started = true
s.average.stopped.Add(1)
go s.averageLoop()
}
}
// Start the average loop
func (s *StatsInfo) startAverageLoop() {
s.mu.RLock()
defer s.mu.RUnlock()
s.average.startOnce.Do(func() {
s.average.stopped.Add(1)
go s.averageLoop()
})
s.mu.Lock()
defer s.mu.Unlock()
s._startAverageLoop()
}
// Stop the average loop
//
// Call with the mutex held
func (s *StatsInfo) _stopAverageLoop() {
s.average.stopOnce.Do(func() {
if s.average.started {
close(s.average.stop)
s.average.stopped.Wait()
})
}
// Stop the average loop
func (s *StatsInfo) stopAverageLoop() {
s.mu.RLock()
defer s.mu.RUnlock()
s._stopAverageLoop()
s.average.started = false
}
}
// String convert the StatsInfo to a string for printing
@ -564,9 +601,9 @@ func (s *StatsInfo) GetBytesWithPending() int64 {
pending := int64(0)
for _, tr := range s.startedTransfers {
if tr.acc != nil {
bytes, size := tr.acc.progress()
if bytes < size {
pending += size - bytes
bytesRead, size := tr.acc.progress()
if bytesRead < size {
pending += size - bytesRead
}
}
}
@ -699,7 +736,8 @@ func (s *StatsInfo) ResetCounters() {
s.oldDuration = 0
s._stopAverageLoop()
s.average = averageValues{stop: make(chan bool)}
s.average = averageValues{}
s._startAverageLoop()
}
// ResetErrors sets the errors count to 0 and resets lastError, fatalError and retryError
@ -788,7 +826,7 @@ func (s *StatsInfo) NewTransfer(obj fs.DirEntry, dstFs fs.Fs) *Transfer {
}
tr := newTransfer(s, obj, srcFs, dstFs)
s.transferring.add(tr)
s.startAverageLoop()
s.resumeAverageLoop()
return tr
}
@ -796,7 +834,7 @@ func (s *StatsInfo) NewTransfer(obj fs.DirEntry, dstFs fs.Fs) *Transfer {
func (s *StatsInfo) NewTransferRemoteSize(remote string, size int64, srcFs, dstFs fs.Fs) *Transfer {
tr := newTransferRemoteSize(s, remote, size, false, "", srcFs, dstFs)
s.transferring.add(tr)
s.startAverageLoop()
s.resumeAverageLoop()
return tr
}
@ -811,7 +849,7 @@ func (s *StatsInfo) DoneTransferring(remote string, ok bool) {
s.mu.Unlock()
}
if s.transferring.empty() && s.checking.empty() {
time.AfterFunc(averageStopAfter, s.stopAverageLoop)
s.pauseAverageLoop()
}
}