zrepl/daemon/pruner/pruner_queue.go
Christian Schwarz 438f950be3 pruner: improve cancellation + error handling strategy
Pruner now backs off as soon as there is an error, making that error the
Error field in the pruner report.
The error is also stored in the specific *fs that failed, and we
maintain an error counter per *fs to de-prioritize those fs that failed.
Like with replication, the de-prioritization on errors is to avoid '
getting stuck' with an individual filesystem until the watchdog hits.
2018-10-20 12:46:43 +02:00

89 lines
1.7 KiB
Go

package pruner
import (
"sort"
"strings"
"sync"
)
type execQueue struct {
mtx sync.Mutex
pending, completed []*fs
}
func newExecQueue(cap int) *execQueue {
q := execQueue{
pending: make([]*fs, 0, cap),
completed: make([]*fs, 0, cap),
}
return &q
}
func (q *execQueue) Report() (pending, completed []FSReport) {
q.mtx.Lock()
defer q.mtx.Unlock()
pending = make([]FSReport, len(q.pending))
for i, fs := range q.pending {
pending[i] = fs.Report()
}
completed = make([]FSReport, len(q.completed))
for i, fs := range q.completed {
completed[i] = fs.Report()
}
return pending, completed
}
func (q *execQueue) HasCompletedFSWithErrors() bool {
q.mtx.Lock()
defer q.mtx.Unlock()
for _, fs := range q.completed {
if fs.execErrLast != nil {
return true
}
}
return false
}
func (q *execQueue) Pop() *fs {
if len(q.pending) == 0 {
return nil
}
fs := q.pending[0]
q.pending = q.pending[1:]
return fs
}
func(q *execQueue) Put(fs *fs, err error, done bool) {
fs.mtx.Lock()
fs.execErrLast = err
if err != nil {
fs.execErrCount++
}
if done || (err != nil && !shouldRetry(fs.execErrLast)) {
fs.mtx.Unlock()
q.mtx.Lock()
q.completed = append(q.completed, fs)
q.mtx.Unlock()
return
}
fs.mtx.Unlock()
q.mtx.Lock()
// inefficient priority q
q.pending = append(q.pending, fs)
sort.SliceStable(q.pending, func(i, j int) bool {
q.pending[i].mtx.Lock()
defer q.pending[i].mtx.Unlock()
q.pending[j].mtx.Lock()
defer q.pending[j].mtx.Unlock()
if q.pending[i].execErrCount != q.pending[j].execErrCount {
return q.pending[i].execErrCount < q.pending[j].execErrCount
}
return strings.Compare(q.pending[i].path, q.pending[j].path) == -1
})
q.mtx.Unlock()
}