zrepl/replication/driver/replication_stepqueue.go

164 lines
3.4 KiB
Go
Raw Normal View History

package driver
import (
"container/heap"
"time"
"github.com/zrepl/zrepl/util/chainlock"
)
type stepQueueRec struct {
ident interface{}
targetDate time.Time
wakeup chan StepCompletedFunc
}
type stepQueue struct {
stop chan struct{}
reqs chan stepQueueRec
}
type stepQueueHeapItem struct {
idx int
req stepQueueRec
}
type stepQueueHeap []*stepQueueHeapItem
func (h stepQueueHeap) Less(i, j int) bool {
return h[i].req.targetDate.Before(h[j].req.targetDate)
}
func (h stepQueueHeap) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
h[i].idx = i
h[j].idx = j
}
func (h stepQueueHeap) Len() int {
return len(h)
}
func (h *stepQueueHeap) Push(elem interface{}) {
hitem := elem.(*stepQueueHeapItem)
hitem.idx = h.Len()
*h = append(*h, hitem)
}
func (h *stepQueueHeap) Pop() interface{} {
elem := (*h)[h.Len()-1]
elem.idx = -1
*h = (*h)[:h.Len()-1]
return elem
}
// returned stepQueue must be closed with method Close
func newStepQueue() *stepQueue {
q := &stepQueue{
stop: make(chan struct{}),
reqs: make(chan stepQueueRec),
}
return q
}
// the returned done function must be called to free resources
// allocated by the call to Start
//
// No WaitReady calls must be active at the time done is called
// The behavior of calling WaitReady after done was called is undefined
func (q *stepQueue) Start(concurrency int) (done func()) {
if concurrency < 1 {
panic("concurrency must be >= 1")
}
// l protects pending and queueItems
l := chainlock.New()
pendingCond := l.NewCond()
// priority queue
pending := &stepQueueHeap{}
// ident => queueItem
queueItems := make(map[interface{}]*stepQueueHeapItem)
// stopped is used for cancellation of "wake" goroutine
stopped := false
active := 0
go func() { // "stopper" goroutine
<-q.stop
defer l.Lock().Unlock()
stopped = true
pendingCond.Broadcast()
}()
go func() { // "reqs" goroutine
for {
select {
case <-q.stop:
select {
case <-q.reqs:
panic("WaitReady call active while calling Close")
default:
return
}
case req := <-q.reqs:
func() {
defer l.Lock().Unlock()
if _, ok := queueItems[req.ident]; ok {
panic("WaitReady must not be called twice for the same ident")
}
qitem := &stepQueueHeapItem{
req: req,
}
queueItems[req.ident] = qitem
heap.Push(pending, qitem)
pendingCond.Broadcast()
}()
}
}
}()
go func() { // "wake" goroutine
defer l.Lock().Unlock()
for {
for !stopped && (active >= concurrency || pending.Len() == 0) {
pendingCond.Wait()
}
if stopped {
return
}
if pending.Len() <= 0 {
return
}
active++
next := heap.Pop(pending).(*stepQueueHeapItem).req
delete(queueItems, next.ident)
next.wakeup <- func() {
defer l.Lock().Unlock()
active--
pendingCond.Broadcast()
}
}
}()
done = func() {
close(q.stop)
}
return done
}
type StepCompletedFunc func()
func (q *stepQueue) sendAndWaitForWakeup(ident interface{}, targetDate time.Time) StepCompletedFunc {
req := stepQueueRec{
ident,
targetDate,
make(chan StepCompletedFunc),
}
q.reqs <- req
return <-req.wakeup
}
// Wait for the ident with targetDate to be selected to run.
func (q *stepQueue) WaitReady(ident interface{}, targetDate time.Time) StepCompletedFunc {
if targetDate.IsZero() {
panic("targetDate of zero is reserved for marking Done")
}
return q.sendAndWaitForWakeup(ident, targetDate)
}