diff --git a/fs/accounting/stats.go b/fs/accounting/stats.go index 18249e300..f8a9980d8 100644 --- a/fs/accounting/stats.go +++ b/fs/accounting/stats.go @@ -68,10 +68,11 @@ type StatsInfo struct { type averageValues struct { mu sync.Mutex + period float64 lpBytes int64 lpTime time.Time speed float64 - stop chan bool + cancel context.CancelFunc stopped sync.WaitGroup started bool } @@ -88,7 +89,6 @@ func NewStats(ctx context.Context) *StatsInfo { startTime: time.Now(), average: averageValues{}, } - s.startAverageLoop() return s } @@ -328,84 +328,52 @@ func (s *StatsInfo) calculateTransferStats() (ts transferStats) { return ts } -func (s *StatsInfo) averageLoop() { - var period float64 - +func (s *StatsInfo) averageLoop(ctx context.Context) { ticker := time.NewTicker(averagePeriodLength) defer ticker.Stop() a := &s.average defer a.stopped.Done() - shouldRun := false - for { select { case now := <-ticker.C: a.mu.Lock() - if !shouldRun { - a.mu.Unlock() - continue - } - avg := 0.0 elapsed := now.Sub(a.lpTime).Seconds() if elapsed > 0 { avg = float64(a.lpBytes) / elapsed } - if period < averagePeriod { - period++ + if a.period < averagePeriod { + a.period++ } - a.speed = (avg + a.speed*(period-1)) / period + a.speed = (avg + a.speed*(a.period-1)) / a.period a.lpBytes = 0 a.lpTime = now a.mu.Unlock() - case stop, ok := <-a.stop: - if !ok { - return // Channel closed, exit the loop - } - - a.mu.Lock() - - // If we are resuming, store the current time - if !shouldRun && !stop { - a.lpTime = time.Now() - } - shouldRun = !stop - - a.mu.Unlock() + case <-ctx.Done(): + // Stop the loop + return } } } -// Resume the average loop -func (s *StatsInfo) resumeAverageLoop() { - s.mu.Lock() - defer s.mu.Unlock() - s.average.stop <- false -} - -// Pause the average loop -func (s *StatsInfo) pauseAverageLoop() { - s.mu.Lock() - defer s.mu.Unlock() - s.average.stop <- true -} - // Start the average loop // // Call with the mutex held func (s *StatsInfo) _startAverageLoop() { if !s.average.started { - s.average.stop = make(chan bool) + ctx, cancel := context.WithCancel(s.ctx) + s.average.cancel = cancel s.average.started = true s.average.stopped.Add(1) - go s.averageLoop() + s.average.lpTime = time.Now() + go s.averageLoop(ctx) } } @@ -421,7 +389,7 @@ func (s *StatsInfo) startAverageLoop() { // Call with the mutex held func (s *StatsInfo) _stopAverageLoop() { if s.average.started { - close(s.average.stop) + s.average.cancel() s.average.stopped.Wait() } } @@ -839,7 +807,7 @@ func (s *StatsInfo) NewTransfer(obj fs.DirEntry, dstFs fs.Fs) *Transfer { } tr := newTransfer(s, obj, srcFs, dstFs) s.transferring.add(tr) - s.resumeAverageLoop() + s.startAverageLoop() return tr } @@ -847,7 +815,7 @@ func (s *StatsInfo) NewTransfer(obj fs.DirEntry, dstFs fs.Fs) *Transfer { func (s *StatsInfo) NewTransferRemoteSize(remote string, size int64, srcFs, dstFs fs.Fs) *Transfer { tr := newTransferRemoteSize(s, remote, size, false, "", srcFs, dstFs) s.transferring.add(tr) - s.resumeAverageLoop() + s.startAverageLoop() return tr } @@ -862,7 +830,9 @@ func (s *StatsInfo) DoneTransferring(remote string, ok bool) { s.mu.Unlock() } if s.transferring.empty() && s.checking.empty() { - s.pauseAverageLoop() + s.mu.Lock() + s._stopAverageLoop() + s.mu.Unlock() } } diff --git a/fs/accounting/stats_groups.go b/fs/accounting/stats_groups.go index 3c971d831..f209d4e64 100644 --- a/fs/accounting/stats_groups.go +++ b/fs/accounting/stats_groups.go @@ -298,6 +298,7 @@ func GlobalStats() *StatsInfo { // NewStatsGroup creates new stats under named group. func NewStatsGroup(ctx context.Context, group string) *StatsInfo { stats := NewStats(ctx) + stats.startAverageLoop() stats.group = group groups.set(ctx, group, stats) return stats