fs: fix goroutine leak and improve stats accounting process

This fixes the go routine leak in the stats accounting

- don't start stats average loop when initializing `StatsInfo`
- stop the loop instead of pausing
- use a context instead of a channel
- move `period` variable in `averageValues` struct

Fixes #8570
This commit is contained in:
Nathanael Demacon 2025-06-04 14:43:19 +01:00 committed by GitHub
parent 5173ca0454
commit ddebca8d42
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 20 additions and 49 deletions

View File

@ -68,10 +68,11 @@ type StatsInfo struct {
type averageValues struct { type averageValues struct {
mu sync.Mutex mu sync.Mutex
period float64
lpBytes int64 lpBytes int64
lpTime time.Time lpTime time.Time
speed float64 speed float64
stop chan bool cancel context.CancelFunc
stopped sync.WaitGroup stopped sync.WaitGroup
started bool started bool
} }
@ -88,7 +89,6 @@ func NewStats(ctx context.Context) *StatsInfo {
startTime: time.Now(), startTime: time.Now(),
average: averageValues{}, average: averageValues{},
} }
s.startAverageLoop()
return s return s
} }
@ -328,84 +328,52 @@ func (s *StatsInfo) calculateTransferStats() (ts transferStats) {
return ts return ts
} }
func (s *StatsInfo) averageLoop() { func (s *StatsInfo) averageLoop(ctx context.Context) {
var period float64
ticker := time.NewTicker(averagePeriodLength) ticker := time.NewTicker(averagePeriodLength)
defer ticker.Stop() defer ticker.Stop()
a := &s.average a := &s.average
defer a.stopped.Done() defer a.stopped.Done()
shouldRun := false
for { for {
select { select {
case now := <-ticker.C: case now := <-ticker.C:
a.mu.Lock() a.mu.Lock()
if !shouldRun {
a.mu.Unlock()
continue
}
avg := 0.0 avg := 0.0
elapsed := now.Sub(a.lpTime).Seconds() elapsed := now.Sub(a.lpTime).Seconds()
if elapsed > 0 { if elapsed > 0 {
avg = float64(a.lpBytes) / elapsed avg = float64(a.lpBytes) / elapsed
} }
if period < averagePeriod { if a.period < averagePeriod {
period++ a.period++
} }
a.speed = (avg + a.speed*(period-1)) / period a.speed = (avg + a.speed*(a.period-1)) / a.period
a.lpBytes = 0 a.lpBytes = 0
a.lpTime = now a.lpTime = now
a.mu.Unlock() a.mu.Unlock()
case stop, ok := <-a.stop: case <-ctx.Done():
if !ok { // Stop the loop
return // Channel closed, exit the loop return
}
a.mu.Lock()
// If we are resuming, store the current time
if !shouldRun && !stop {
a.lpTime = time.Now()
}
shouldRun = !stop
a.mu.Unlock()
} }
} }
} }
// Resume the average loop
func (s *StatsInfo) resumeAverageLoop() {
s.mu.Lock()
defer s.mu.Unlock()
s.average.stop <- false
}
// Pause the average loop
func (s *StatsInfo) pauseAverageLoop() {
s.mu.Lock()
defer s.mu.Unlock()
s.average.stop <- true
}
// Start the average loop // Start the average loop
// //
// Call with the mutex held // Call with the mutex held
func (s *StatsInfo) _startAverageLoop() { func (s *StatsInfo) _startAverageLoop() {
if !s.average.started { if !s.average.started {
s.average.stop = make(chan bool) ctx, cancel := context.WithCancel(s.ctx)
s.average.cancel = cancel
s.average.started = true s.average.started = true
s.average.stopped.Add(1) s.average.stopped.Add(1)
go s.averageLoop() s.average.lpTime = time.Now()
go s.averageLoop(ctx)
} }
} }
@ -421,7 +389,7 @@ func (s *StatsInfo) startAverageLoop() {
// Call with the mutex held // Call with the mutex held
func (s *StatsInfo) _stopAverageLoop() { func (s *StatsInfo) _stopAverageLoop() {
if s.average.started { if s.average.started {
close(s.average.stop) s.average.cancel()
s.average.stopped.Wait() s.average.stopped.Wait()
} }
} }
@ -839,7 +807,7 @@ func (s *StatsInfo) NewTransfer(obj fs.DirEntry, dstFs fs.Fs) *Transfer {
} }
tr := newTransfer(s, obj, srcFs, dstFs) tr := newTransfer(s, obj, srcFs, dstFs)
s.transferring.add(tr) s.transferring.add(tr)
s.resumeAverageLoop() s.startAverageLoop()
return tr return tr
} }
@ -847,7 +815,7 @@ func (s *StatsInfo) NewTransfer(obj fs.DirEntry, dstFs fs.Fs) *Transfer {
func (s *StatsInfo) NewTransferRemoteSize(remote string, size int64, srcFs, dstFs fs.Fs) *Transfer { func (s *StatsInfo) NewTransferRemoteSize(remote string, size int64, srcFs, dstFs fs.Fs) *Transfer {
tr := newTransferRemoteSize(s, remote, size, false, "", srcFs, dstFs) tr := newTransferRemoteSize(s, remote, size, false, "", srcFs, dstFs)
s.transferring.add(tr) s.transferring.add(tr)
s.resumeAverageLoop() s.startAverageLoop()
return tr return tr
} }
@ -862,7 +830,9 @@ func (s *StatsInfo) DoneTransferring(remote string, ok bool) {
s.mu.Unlock() s.mu.Unlock()
} }
if s.transferring.empty() && s.checking.empty() { if s.transferring.empty() && s.checking.empty() {
s.pauseAverageLoop() s.mu.Lock()
s._stopAverageLoop()
s.mu.Unlock()
} }
} }

View File

@ -298,6 +298,7 @@ func GlobalStats() *StatsInfo {
// NewStatsGroup creates new stats under named group. // NewStatsGroup creates new stats under named group.
func NewStatsGroup(ctx context.Context, group string) *StatsInfo { func NewStatsGroup(ctx context.Context, group string) *StatsInfo {
stats := NewStats(ctx) stats := NewStats(ctx)
stats.startAverageLoop()
stats.group = group stats.group = group
groups.set(ctx, group, stats) groups.set(ctx, group, stats)
return stats return stats