Mirror of https://github.com/zrepl/zrepl.git, synced 2025-04-20 09:29:34 +02:00
Changes done so far:
- signal to route the concurrency request up to the stepQueue
  - pretty hacky, no status reporting yet
- stepQueue upsizing (confirmed that it works, no intermediary commit)

Stuck at: stepQueue downsizing
- idea was to have the stepQueue signal to the activated step that it should suspend
- ideally, we'd just kill everything associated with the step and track it as restartable
  - the driver model doesn't allow for that within an attempt, though
  - we would need to start a separate run for the step
- less perfect: tell the downsized steps to stop copying, but leave all zfs sends + rpc conns open
  - doesn't give back the resources that a step acquired before being selected as downsize victim (open zfs processes + TCP conns, some memory in the pipe)
  - looks weird to the user in ps aux output
  + re-waking a step is easy: just tell it to proceed with copying
- the impl would likely pass a check function down into Step.do and have the step check that function periodically; the suspend should be acknowledged, and stepQueue should only remove the step from the active queue _after_ that step has acknowledged that it is suspended (see the sketch below)
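Below is a minimal, self-contained sketch of the check-function idea from the last bullet above: a hypothetical step polls a suspend flag between copy chunks, acknowledges the suspension, and then blocks until it is told to proceed with copying. All names here (step, do, ackSuspend, suspendRequested, resume) are invented for illustration and are not part of zrepl.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// hypothetical step: between copy chunks it consults a suspend flag that the
// stepQueue would set when the step is picked as a downsize victim
type step struct {
	suspendRequested atomic.Bool   // set by the queue on concurrency downsize
	resume           chan struct{} // signaled when the step may proceed again
}

// do copies data in small units; between units it polls the suspend flag,
// acknowledges the suspension, and blocks until told to proceed with copying
func (s *step) do(chunks int, ackSuspend func()) {
	for i := 0; i < chunks; i++ {
		if s.suspendRequested.Load() {
			ackSuspend() // queue removes the step from active only after this ack
			<-s.resume   // re-waking is cheap: just unblock the copy loop
			s.suspendRequested.Store(false)
		}
		time.Sleep(10 * time.Millisecond) // stand-in for copying one chunk
		fmt.Println("copied chunk", i)
	}
}

func main() {
	s := &step{resume: make(chan struct{})}
	go func() {
		time.Sleep(25 * time.Millisecond)
		s.suspendRequested.Store(true) // step selected as downsize victim
		time.Sleep(50 * time.Millisecond)
		s.resume <- struct{}{} // concurrency raised again: proceed with copying
	}()
	s.do(5, func() { fmt.Println("step acknowledged suspend") })
}

The ack-before-removal ordering mirrors the last bullet: the queue would only take the step off its active heap after ackSuspend has run.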
package driver

import (
	"container/heap"
	"fmt"
	"sync"
	"time"

	"github.com/zrepl/zrepl/util/chainlock"
)
type stepQueueRec struct {
	ident                          interface{}
	targetDate                     time.Time
	wakeup                         chan StepCompletedFunc
	cancelDueToConcurrencyDownsize func()
}
type stepQueue struct {
	stop chan struct{}
	reqs chan stepQueueRec

	// l protects all members except the channels above
	l           *chainlock.L
	pendingCond *sync.Cond

	// pending and active are priority queues of steps, ordered by targetDate
	pending *stepQueueHeap
	active  *stepQueueHeap
	// queueItems maps ident => queueItem, tracking used idents in both pending and active
	queueItems map[interface{}]*stepQueueHeapItem

	// stopped is used for cancellation of the "wake" goroutine
	stopped bool

	concurrency int
}
type stepQueueHeapItem struct {
	idx int
	req *stepQueueRec
}

type stepQueueHeap struct {
	items   []*stepQueueHeapItem
	reverse bool // never change after pushing first element
}

func (h stepQueueHeap) Less(i, j int) bool {
	res := h.items[i].req.targetDate.Before(h.items[j].req.targetDate)
	if h.reverse {
		return !res
	}
	return res
}

func (h stepQueueHeap) Swap(i, j int) {
	h.items[i], h.items[j] = h.items[j], h.items[i]
	h.items[i].idx = i
	h.items[j].idx = j
}

func (h stepQueueHeap) Len() int {
	return len(h.items)
}

func (h *stepQueueHeap) Push(elem interface{}) {
	hitem := elem.(*stepQueueHeapItem)
	hitem.idx = h.Len()
	h.items = append(h.items, hitem)
}

func (h *stepQueueHeap) Pop() interface{} {
	elem := h.items[h.Len()-1]
	elem.idx = -1
	h.items = h.items[:h.Len()-1]
	return elem
}
// the returned stepQueue must be stopped by calling the done function
// returned by its Start method
func newStepQueue(concurrency int) *stepQueue {
	l := chainlock.New()
	q := &stepQueue{
		stop:        make(chan struct{}),
		reqs:        make(chan stepQueueRec),
		l:           l,
		pendingCond: l.NewCond(),
		// priority queues: pending yields the earliest targetDate first,
		// active (reverse) yields the latest targetDate first as downsize victim
		pending: &stepQueueHeap{reverse: false},
		active:  &stepQueueHeap{reverse: true},
		// ident => queueItem
		queueItems: make(map[interface{}]*stepQueueHeapItem),
		// stopped is used for cancellation of the "wake" goroutine
		stopped: false,
	}
	err := q.setConcurrencyLocked(concurrency)
	if err != nil {
		panic(err)
	}
	return q
}
// the returned done function must be called to free resources
// allocated by the call to Start
//
// No WaitReady calls must be active at the time done is called
// The behavior of calling WaitReady after done was called is undefined
func (q *stepQueue) Start() (done func()) {
	go func() { // "stopper" goroutine
		<-q.stop
		defer q.l.Lock().Unlock()
		q.stopped = true
		q.pendingCond.Broadcast()
	}()
	go func() { // "reqs" goroutine
		for {
			select {
			case <-q.stop:
				select {
				case <-q.reqs:
					panic("WaitReady call active while calling Close")
				default:
					return
				}
			case req := <-q.reqs:
				func() {
					defer q.l.Lock().Unlock()
					if _, ok := q.queueItems[req.ident]; ok {
						panic("WaitReady must not be called twice for the same ident")
					}
					qitem := &stepQueueHeapItem{
						req: &req, // req is scoped to this case, taking its address is safe
					}
					q.queueItems[req.ident] = qitem
					heap.Push(q.pending, qitem)
					q.pendingCond.Broadcast()
				}()
			}
		}
	}()
go func() { // "wake" goroutine
|
|
defer q.l.Lock().Unlock()
|
|
for {
|
|
|
|
for !q.stopped && (q.active.Len() >= q.concurrency || q.pending.Len() == 0) {
|
|
q.pendingCond.Wait()
|
|
}
|
|
if q.stopped {
|
|
return
|
|
}
|
|
if q.pending.Len() <= 0 {
|
|
return
|
|
}
|
|
|
|
// pop from tracked items
|
|
next := heap.Pop(q.pending).(*stepQueueHeapItem)
|
|
|
|
next.req.cancelDueToConcurrencyDownsize =
|
|
|
|
heap.Push(q.active, next)
|
|
|
|
next.req.wakeup <- func() {
|
|
defer q.l.Lock().Unlock()
|
|
|
|
//
|
|
qitem := &stepQueueHeapItem{
|
|
req: req,
|
|
}
|
|
|
|
// delete(q.queueItems, next.req.ident) // def
|
|
|
|
q.pendingCond.Broadcast()
|
|
}
|
|
}
|
|
}()
|
|
|
|

	done = func() {
		close(q.stop)
	}
	return done
}

type StepCompletedFunc func()
func (q *stepQueue) sendAndWaitForWakeup(ident interface{}, targetDate time.Time) StepCompletedFunc {
	req := stepQueueRec{
		ident:      ident,
		targetDate: targetDate,
		wakeup:     make(chan StepCompletedFunc),
	}
	q.reqs <- req
	return <-req.wakeup
}

// Wait for the ident with targetDate to be selected to run.
func (q *stepQueue) WaitReady(ident interface{}, targetDate time.Time) StepCompletedFunc {
	if targetDate.IsZero() {
		panic("targetDate of zero is reserved for marking Done")
	}
	return q.sendAndWaitForWakeup(ident, targetDate)
}
// caller must hold lock
func (q *stepQueue) setConcurrencyLocked(newConcurrency int) error {
	if !(newConcurrency >= 1) {
		return fmt.Errorf("concurrency must be >= 1 but requested %v", newConcurrency)
	}
	q.concurrency = newConcurrency
	q.pendingCond.Broadcast() // wake up waiters who could make progress

	// downsizing: move the steps with the latest targetDates back to pending
	// and tell them to stop copying (WIP, see commit message)
	for q.active.Len() > q.concurrency {
		item := heap.Pop(q.active).(*stepQueueHeapItem)
		item.req.cancelDueToConcurrencyDownsize()
		heap.Push(q.pending, item)
	}
	return nil
}

func (q *stepQueue) SetConcurrency(new int) error {
	defer q.l.Lock().Unlock()
	return q.setConcurrencyLocked(new)
}
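For orientation, here is a hedged usage sketch of the API defined above (newStepQueue, Start, WaitReady, SetConcurrency). The function name exampleStepQueueUsage, the idents, and the target dates are invented; per the commit message, upsizing at runtime is the part confirmed to work, while downsizing remains WIP.

// usage_sketch.go (hypothetical separate file in package driver)
package driver

import (
	"fmt"
	"sync"
	"time"
)

func exampleStepQueueUsage() {
	q := newStepQueue(1) // start with one step running at a time
	done := q.Start()

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// block until this step is selected; idents must be unique and
			// targetDate must be non-zero
			stepCompleted := q.WaitReady(fmt.Sprintf("step-%d", i), time.Unix(int64(i+1), 0))
			defer stepCompleted() // frees the slot so a pending step can start
			// ... the step's actual work (e.g. a zfs send/recv) would go here ...
		}(i)
	}

	// raise the concurrency at runtime so more steps run in parallel
	if err := q.SetConcurrency(3); err != nil {
		panic(err)
	}

	wg.Wait()
	done() // no WaitReady calls may be active when done is called
}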