From 40bba359e2b48d47883b77e28b16780d35951f84 Mon Sep 17 00:00:00 2001 From: Anagh Kumar Baranwal <6824881+darthShadow@users.noreply.github.com> Date: Tue, 17 Sep 2024 18:09:54 +0530 Subject: [PATCH] stats: fix the speed not getting updated after a pause in the processing This shifts the behavior of the average loop to be a persistent loop that gets resumed/paused when transfers & checks are started/completed. Previously, the averageLoop was stopped on completion of transfers & checks but failed to start again due to the protection of the sync.Once Signed-off-by: Anagh Kumar Baranwal <6824881+darthShadow@users.noreply.github.com> --- fs/accounting/stats.go | 118 +++++++++++++++++++++++++++-------------- 1 file changed, 78 insertions(+), 40 deletions(-) diff --git a/fs/accounting/stats.go b/fs/accounting/stats.go index e46f95e5c..5e86d27ef 100644 --- a/fs/accounting/stats.go +++ b/fs/accounting/stats.go @@ -65,28 +65,29 @@ type StatsInfo struct { } type averageValues struct { - mu sync.Mutex - lpBytes int64 - lpTime time.Time - speed float64 - stop chan bool - stopped sync.WaitGroup - startOnce sync.Once - stopOnce sync.Once + mu sync.Mutex + lpBytes int64 + lpTime time.Time + speed float64 + stop chan bool + stopped sync.WaitGroup + started bool } // NewStats creates an initialised StatsInfo func NewStats(ctx context.Context) *StatsInfo { ci := fs.GetConfig(ctx) - return &StatsInfo{ + s := &StatsInfo{ ctx: ctx, ci: ci, checking: newTransferMap(ci.Checkers, "checking"), transferring: newTransferMap(ci.Transfers, "transferring"), inProgress: newInProgress(ctx), startTime: time.Now(), - average: averageValues{stop: make(chan bool)}, + average: averageValues{}, } + s.startAverageLoop() + return s } // RemoteStats returns stats for rc @@ -328,61 +329,97 @@ func (s *StatsInfo) averageLoop() { ticker := time.NewTicker(averagePeriodLength) defer ticker.Stop() - startTime := time.Now() a := &s.average defer a.stopped.Done() + + shouldRun := false + for { select { case now := <-ticker.C: a.mu.Lock() - var elapsed float64 - if a.lpTime.IsZero() { - elapsed = now.Sub(startTime).Seconds() - } else { - elapsed = now.Sub(a.lpTime).Seconds() + + if !shouldRun { + a.mu.Unlock() + continue } + avg := 0.0 + elapsed := now.Sub(a.lpTime).Seconds() if elapsed > 0 { avg = float64(a.lpBytes) / elapsed } + if period < averagePeriod { period++ } + a.speed = (avg + a.speed*(period-1)) / period a.lpBytes = 0 a.lpTime = now + + a.mu.Unlock() + + case stop, ok := <-a.stop: + if !ok { + return // Channel closed, exit the loop + } + + a.mu.Lock() + + // If we are resuming, store the current time + if !shouldRun && !stop { + a.lpTime = time.Now() + } + shouldRun = !stop + a.mu.Unlock() - case <-a.stop: - return } } } +// Resume the average loop +func (s *StatsInfo) resumeAverageLoop() { + s.mu.Lock() + defer s.mu.Unlock() + s.average.stop <- false +} + +// Pause the average loop +func (s *StatsInfo) pauseAverageLoop() { + s.mu.Lock() + defer s.mu.Unlock() + s.average.stop <- true +} + +// Start the average loop +// +// Call with the mutex held +func (s *StatsInfo) _startAverageLoop() { + if !s.average.started { + s.average.stop = make(chan bool) + s.average.started = true + s.average.stopped.Add(1) + go s.averageLoop() + } +} + // Start the average loop func (s *StatsInfo) startAverageLoop() { - s.mu.RLock() - defer s.mu.RUnlock() - s.average.startOnce.Do(func() { - s.average.stopped.Add(1) - go s.averageLoop() - }) + s.mu.Lock() + defer s.mu.Unlock() + s._startAverageLoop() } // Stop the average loop // // Call with the mutex held func (s *StatsInfo) _stopAverageLoop() { - s.average.stopOnce.Do(func() { + if s.average.started { close(s.average.stop) s.average.stopped.Wait() - }) -} - -// Stop the average loop -func (s *StatsInfo) stopAverageLoop() { - s.mu.RLock() - defer s.mu.RUnlock() - s._stopAverageLoop() + s.average.started = false + } } // String convert the StatsInfo to a string for printing @@ -564,9 +601,9 @@ func (s *StatsInfo) GetBytesWithPending() int64 { pending := int64(0) for _, tr := range s.startedTransfers { if tr.acc != nil { - bytes, size := tr.acc.progress() - if bytes < size { - pending += size - bytes + bytesRead, size := tr.acc.progress() + if bytesRead < size { + pending += size - bytesRead } } } @@ -699,7 +736,8 @@ func (s *StatsInfo) ResetCounters() { s.oldDuration = 0 s._stopAverageLoop() - s.average = averageValues{stop: make(chan bool)} + s.average = averageValues{} + s._startAverageLoop() } // ResetErrors sets the errors count to 0 and resets lastError, fatalError and retryError @@ -788,7 +826,7 @@ func (s *StatsInfo) NewTransfer(obj fs.DirEntry, dstFs fs.Fs) *Transfer { } tr := newTransfer(s, obj, srcFs, dstFs) s.transferring.add(tr) - s.startAverageLoop() + s.resumeAverageLoop() return tr } @@ -796,7 +834,7 @@ func (s *StatsInfo) NewTransfer(obj fs.DirEntry, dstFs fs.Fs) *Transfer { func (s *StatsInfo) NewTransferRemoteSize(remote string, size int64, srcFs, dstFs fs.Fs) *Transfer { tr := newTransferRemoteSize(s, remote, size, false, "", srcFs, dstFs) s.transferring.add(tr) - s.startAverageLoop() + s.resumeAverageLoop() return tr } @@ -811,7 +849,7 @@ func (s *StatsInfo) DoneTransferring(remote string, ok bool) { s.mu.Unlock() } if s.transferring.empty() && s.checking.empty() { - time.AfterFunc(averageStopAfter, s.stopAverageLoop) + s.pauseAverageLoop() } }