WIP runtime-controllable concurrency for replication

Changes done so far:
- signal operation to route the concurrency request up to the stepQueue
    - pretty hacky, no status reporting yet
- stepQueue upsizing (confirmed that it works, no intermediate commit)

Stuck at: stepQueue downsizing
- the idea was to have the stepQueue signal to the activated step that it
  should suspend
- ideally, we'd just kill everything associated with the step and track
  it as restartable
    - the driver model doesn't allow for that within an attempt, though
    - we would need to start a separate run for the step
- less perfect: tell the downsized steps to stop copying, but leave
  all zfs sends + rpc conns open
    - - doesn't give back the resources that a step acquired before
      being selected as downsize victim (open zfs process + TCP conn,
      some memory in the pipe)
    - - looks weird to the user in ps aux output (idle zfs send processes)
    - + re-waking a step is easy: just tell it to proceed with copying
    - (the impl would likely pass a check function down into Step.do
      and have the step call that function periodically; the suspend should
      be acknowledged, and the stepQueue should only remove the step from
      the active queue _after_ that step has acknowledged that it is
      suspended; a sketch of this idea follows below)
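
For reference, a rough sketch of that acknowledge-based suspend protocol as a standalone Go program. All identifiers here are made up for illustration and nothing below is part of this commit: the copy loop polls a check function between chunks, receiving the suspend request is the acknowledgement, and only after that may the queue treat the slot as free; re-waking just lets the loop proceed while the zfs send and the RPC connection stay open.

package main

import (
	"fmt"
	"time"
)

// suspendHook is a made-up stand-in for whatever the stepQueue would pass
// down into Step.do; it is not part of zrepl.
type suspendHook struct {
	requests chan chan struct{} // the queue sends a resume channel to request suspension
}

// check is called by the step between copied chunks. Receiving from
// requests is the acknowledgement: after that, the step copies nothing
// further until it is resumed.
func (h *suspendHook) check() {
	select {
	case resume := <-h.requests:
		<-resume // zfs send + RPC conn stay open while suspended
	default:
	}
}

// suspend returns only after the step acknowledged; only then may the
// queue hand the freed concurrency slot to another step. The returned
// function re-wakes the step.
func (h *suspendHook) suspend() (resume func()) {
	r := make(chan struct{})
	h.requests <- r // blocks until the step's check() accepts the request
	return func() { close(r) }
}

func main() {
	h := &suspendHook{requests: make(chan chan struct{})}

	go func() { // stands in for the copy loop inside Step.do
		for i := 0; ; i++ {
			h.check()
			fmt.Println("copied chunk", i)
			time.Sleep(50 * time.Millisecond)
		}
	}()

	time.Sleep(200 * time.Millisecond)
	resume := h.suspend() // downsize: blocks until acknowledged
	fmt.Println("suspend acknowledged, slot is free")
	time.Sleep(300 * time.Millisecond)
	resume() // re-wake: the step just proceeds with copying
	time.Sleep(200 * time.Millisecond)
}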
Christian Schwarz 2020-02-01 15:28:52 +01:00
parent b8d9f4ba92
commit 17add553d3
14 changed files with 244 additions and 75 deletions

View File

@@ -9,7 +9,7 @@ import (
 )
 
 var SignalCmd = &cli.Subcommand{
-	Use:   "signal [wakeup|reset] JOB",
+	Use:   "signal [wakeup|reset] JOB [DATA]",
 	Short: "wake up a job from wait state or abort its current invocation",
 	Run: func(subcommand *cli.Subcommand, args []string) error {
 		return runSignalCmd(subcommand.Config(), args)
@@ -17,8 +17,13 @@ var SignalCmd = &cli.Subcommand{
 }
 
 func runSignalCmd(config *config.Config, args []string) error {
-	if len(args) != 2 {
-		return errors.Errorf("Expected 2 arguments: [wakeup|reset] JOB")
+	if len(args) < 2 || len(args) > 3 {
+		return errors.Errorf("Expected 2 arguments: [wakeup|reset|set-concurrency] JOB [DATA]")
+	}
+	var data string
+	if len(args) == 3 {
+		data = args[2]
 	}
 
 	httpc, err := controlHttpClient(config.Global.Control.SockPath)
@@ -30,9 +35,11 @@ func runSignalCmd(config *config.Config, args []string) error {
 		struct {
 			Name string
 			Op   string
+			Data string
 		}{
 			Name: args[1],
 			Op:   args[0],
+			Data: data,
 		},
 		struct{}{},
 	)
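
Given the daemon-side handler further below, the new operation would presumably be invoked from the CLI like this (the job name is made up; DATA is the string the daemon parses with strconv.Atoi):

    zrepl signal set-concurrency prod_push 4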

View File

@@ -8,6 +8,7 @@ import (
 	"io"
 	"net"
 	"net/http"
+	"strconv"
 	"time"
 
 	"github.com/pkg/errors"
@@ -47,6 +48,8 @@ func (j *controlJob) OwnedDatasetSubtreeRoot() (p *zfs.DatasetPath, ok bool) { r
 
 func (j *controlJob) SenderConfig() *endpoint.SenderConfig { return nil }
 
+func (j *controlJob) SetConcurrency(concurrency int) error { return errors.Errorf("not supported") }
+
 var promControl struct {
 	requestBegin    *prometheus.CounterVec
 	requestFinished *prometheus.HistogramVec
@@ -126,6 +129,7 @@ func (j *controlJob) Run(ctx context.Context) {
 			type reqT struct {
 				Name string
 				Op   string
+				Data string
 			}
 			var req reqT
 			if decoder(&req) != nil {
@@ -138,6 +142,14 @@ func (j *controlJob) Run(ctx context.Context) {
 				err = j.jobs.wakeup(req.Name)
 			case "reset":
 				err = j.jobs.reset(req.Name)
+			case "set-concurrency":
+				var concurrency int
+				concurrency, err = strconv.Atoi(req.Data) // shadow
+				if err != nil {
+					// fallthrough outer
+				} else {
+					err = j.jobs.setConcurrency(req.Name, concurrency)
+				}
 			default:
 				err = fmt.Errorf("operation %q is invalid", req.Op)
 			}

View File

@@ -176,6 +176,18 @@ func (s *jobs) reset(job string) error {
 	return wu()
 }
 
+func (s *jobs) setConcurrency(jobName string, concurrency int) error {
+	s.m.RLock()
+	defer s.m.RUnlock()
+	job, ok := s.jobs[jobName]
+	if !ok {
+		return errors.Errorf("Job %q does not exist", jobName)
+	}
+	return job.SetConcurrency(concurrency)
+}
+
 const (
 	jobNamePrometheus = "_prometheus"
 	jobNameControl    = "_control"

View File

@@ -55,9 +55,12 @@ const (
 type activeSideTasks struct {
 	state ActiveSideState
 
+	concurrency int
+
 	// valid for state ActiveSideReplicating, ActiveSidePruneSender, ActiveSidePruneReceiver, ActiveSideDone
 	replicationReport driver.ReportFunc
 	replicationCancel context.CancelFunc
+	replicationSetConcurrency driver.SetConcurrencyFunc
 
 	// valid for state ActiveSidePruneSender, ActiveSidePruneReceiver, ActiveSideDone
 	prunerSender, prunerReceiver *pruner.Pruner
@@ -278,6 +281,8 @@ func activeSide(g *config.Global, in *config.ActiveJob, configJob interface{}) (
 		return nil, errors.Wrap(err, "invalid job name")
 	}
 
+	j.tasks.concurrency = 1 // FIXME
+
 	switch v := configJob.(type) {
 	case *config.PushJob:
 		j.mode, err = modePushFromConfig(g, v, j.name) // shadow
@@ -375,6 +380,22 @@ func (j *ActiveSide) SenderConfig() *endpoint.SenderConfig {
 	return push.senderConfig
 }
 
+func (j *ActiveSide) SetConcurrency(concurrency int) (err error) {
+	j.updateTasks(func(tasks *activeSideTasks) {
+		if tasks.replicationSetConcurrency != nil {
+			err = tasks.replicationSetConcurrency(concurrency) // no shadow
+			if err == nil {
+				tasks.concurrency = concurrency
+			}
+		} else {
+			// FIXME this is not great, should always be able to set it
+			err = errors.Errorf("cannot set while not replicating")
+		}
+	})
+	return err
+}
+
 func (j *ActiveSide) Run(ctx context.Context) {
 	log := GetLogger(ctx)
 	ctx = logging.WithSubsystemLoggers(ctx, log)
@@ -436,11 +457,14 @@ func (j *ActiveSide) do(ctx context.Context) {
 	ctx, repCancel := context.WithCancel(ctx)
 	var repWait driver.WaitFunc
 	j.updateTasks(func(tasks *activeSideTasks) {
-		// reset it
-		*tasks = activeSideTasks{}
+		// reset it (almost)
+		old := *tasks
+		*tasks = activeSideTasks{
+			concurrency: old.concurrency,
+		}
 		tasks.replicationCancel = repCancel
-		tasks.replicationReport, repWait = replication.Do(
-			ctx, logic.NewPlanner(j.promRepStateSecs, j.promBytesReplicated, sender, receiver, j.mode.PlannerPolicy()),
+		tasks.replicationReport, repWait, tasks.replicationSetConcurrency = replication.Do(
+			ctx, tasks.concurrency, logic.NewPlanner(j.promRepStateSecs, j.promBytesReplicated, sender, receiver, j.mode.PlannerPolicy()),
 		)
 		tasks.state = ActiveSideReplicating
 	})

View File

@@ -40,6 +40,7 @@ type Job interface {
 	// must return the root of that subtree as rfs and ok = true
 	OwnedDatasetSubtreeRoot() (rfs *zfs.DatasetPath, ok bool)
 	SenderConfig() *endpoint.SenderConfig
+	SetConcurrency(concurrency int) error
 }
 
 type Type string

View File

@@ -163,6 +163,8 @@ func (j *PassiveSide) SenderConfig() *endpoint.SenderConfig {
 
 func (*PassiveSide) RegisterMetrics(registerer prometheus.Registerer) {}
 
+func (*PassiveSide) SetConcurrency(concurrency int) error { return errors.Errorf("not supported") }
+
 func (j *PassiveSide) Run(ctx context.Context) {
 	log := GetLogger(ctx)

View File

@@ -117,6 +117,8 @@ outer:
 	}
 }
 
+func (*SnapJob) SetConcurrency(concurrency int) error { return errors.Errorf("not supported") }
+
 // Adaptor that implements pruner.History around a pruner.Target.
 // The ReplicationCursor method is Get-op only and always returns
 // the filesystem's most recent version's GUID.

View File

@@ -5,6 +5,7 @@ import (
 	"net"
 	"net/http"
 
+	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -51,6 +52,8 @@ func (j *prometheusJob) OwnedDatasetSubtreeRoot() (p *zfs.DatasetPath, ok bool)
 
 func (j *prometheusJob) SenderConfig() *endpoint.SenderConfig { return nil }
 
+func (j *prometheusJob) SetConcurrency(concurrency int) error { return errors.Errorf("not supported") }
+
 func (j *prometheusJob) RegisterMetrics(registerer prometheus.Registerer) {}
 
 func (j *prometheusJob) Run(ctx context.Context) {

View File

@@ -89,6 +89,8 @@ type attempt struct {
 	// if both are nil, it must be assumed that Planner.Plan is active
 	planErr *timedError
 	fss     []*fs
+
+	concurrency int
 }
 
 type timedError struct {
@@ -170,11 +172,12 @@ type step struct {
 type ReportFunc func() *report.Report
 type WaitFunc func(block bool) (done bool)
+type SetConcurrencyFunc func(concurrency int) error
 
 var maxAttempts = envconst.Int64("ZREPL_REPLICATION_MAX_ATTEMPTS", 3)
 var reconnectHardFailTimeout = envconst.Duration("ZREPL_REPLICATION_RECONNECT_HARD_FAIL_TIMEOUT", 10*time.Minute)
 
-func Do(ctx context.Context, planner Planner) (ReportFunc, WaitFunc) {
+func Do(ctx context.Context, initialConcurrency int, planner Planner) (ReportFunc, WaitFunc, SetConcurrencyFunc) {
 	log := getLog(ctx)
 	l := chainlock.New()
 	run := &run{
@@ -182,6 +185,8 @@ func Do(ctx context.Context, planner Planner) (ReportFunc, WaitFunc) {
 		startedAt: time.Now(),
 	}
 
+	concurrencyChanges := make(chan concurrencyChange)
+
 	done := make(chan struct{})
 	go func() {
 		defer close(done)
@@ -198,15 +203,21 @@ func Do(ctx context.Context, planner Planner) (ReportFunc, WaitFunc) {
 			run.waitReconnect.SetZero()
 			run.waitReconnectError = nil
 
+			prevConcurrency := initialConcurrency // FIXME default concurrency
+			if prev != nil {
+				prevConcurrency = prev.concurrency
+			}
+
 			// do current attempt
 			cur := &attempt{
 				l:         l,
 				startedAt: time.Now(),
 				planner:   planner,
+				concurrency: prevConcurrency,
 			}
 			run.attempts = append(run.attempts, cur)
 			run.l.DropWhile(func() {
-				cur.do(ctx, prev)
+				cur.do(ctx, prev, concurrencyChanges)
 			})
 			prev = cur
 			if ctx.Err() != nil {
@@ -277,10 +288,25 @@ func Do(ctx context.Context, planner Planner) (ReportFunc, WaitFunc) {
 		defer run.l.Lock().Unlock()
 		return run.report()
 	}
-	return report, wait
+	setConcurrency := func(concurrency int) (reterr error) {
+		var wg sync.WaitGroup
+		wg.Add(1)
+		concurrencyChanges <- concurrencyChange{concurrency, func(err error) {
+			defer wg.Done()
+			reterr = err // shadow
+		}}
+		wg.Wait()
+		return reterr
+	}
+	return report, wait, setConcurrency
 }
 
-func (a *attempt) do(ctx context.Context, prev *attempt) {
+type concurrencyChange struct {
+	value          int
+	resultCallback func(error)
+}
+
+func (a *attempt) do(ctx context.Context, prev *attempt, setConcurrency <-chan concurrencyChange) {
 	pfss, err := a.planner.Plan(ctx)
 	errTime := time.Now()
 	defer a.l.Lock().Unlock()
@@ -354,8 +380,8 @@ func (a *attempt) do(ctx context.Context, prev *attempt) {
 	}
 	// invariant: prevs contains an entry for each unambigious correspondence
 
-	stepQueue := newStepQueue()
-	defer stepQueue.Start(1)() // TODO parallel replication
+	stepQueue := newStepQueue(a.concurrency)
+	defer stepQueue.Start()()
 	var fssesDone sync.WaitGroup
 	for _, f := range a.fss {
 		fssesDone.Add(1)
@@ -364,8 +390,27 @@ func (a *attempt) do(ctx context.Context, prev *attempt) {
 			f.do(ctx, stepQueue, prevs[f])
 		}(f)
 	}
+	changeConcurrencyDone := make(chan struct{})
+	go func() {
+		for {
+			select {
+			case change := <-setConcurrency:
+				err := stepQueue.SetConcurrency(change.value)
+				go change.resultCallback(err)
+				if err == nil {
+					a.l.Lock()
+					a.concurrency = change.value
+					a.l.Unlock()
+				}
+			case <-changeConcurrencyDone:
+				return
+				// not waiting for ctx.Done here, the main job are the fsses
+			}
+		}
+	}()
 	a.l.DropWhile(func() {
 		fssesDone.Wait()
+		close(changeConcurrencyDone)
 	})
 	a.finishedAt = time.Now()
 }
@@ -461,7 +506,8 @@ func (fs *fs) do(ctx context.Context, pq *stepQueue, prev *fs) {
 		// lock must not be held while executing step in order for reporting to work
 		fs.l.DropWhile(func() {
 			targetDate := s.step.TargetDate()
-			defer pq.WaitReady(fs, targetDate)()
+			ctx, done := pq.WaitReady(ctx, fs, targetDate)()
+			defer done()
 			err = s.step.Step(ctx) // no shadow
 			errTime = time.Now() // no shadow
 		})
@@ -482,6 +528,7 @@ func (r *run) report() *report.Report {
 		WaitReconnectSince: r.waitReconnect.begin,
 		WaitReconnectUntil: r.waitReconnect.end,
 		WaitReconnectError: r.waitReconnectError.IntoReportError(),
+		// Concurrency: r.concurrency,
 	}
 	for i := range report.Attempts {
 		report.Attempts[i] = r.attempts[i].report()

View File

@@ -151,7 +151,7 @@ func TestReplication(t *testing.T) {
 	ctx := context.Background()
 	mp := &mockPlanner{}
-	getReport, wait := Do(ctx, mp)
+	getReport, wait, _ := Do(ctx, 1, mp)
 	begin := time.Now()
 	fireAt := []time.Duration{
 		// the following values are relative to the start

View File

@@ -2,6 +2,8 @@ package driver
 
 import (
 	"container/heap"
+	"fmt"
+	"sync"
 	"time"
 
 	"github.com/zrepl/zrepl/util/chainlock"
@@ -11,51 +13,88 @@ type stepQueueRec struct {
 	ident      interface{}
 	targetDate time.Time
 	wakeup     chan StepCompletedFunc
+	cancelDueToConcurrencyDownsize func()
 }
 
 type stepQueue struct {
 	stop chan struct{}
 	reqs chan stepQueueRec
+
+	// l protects all members except the channels above
+	l           *chainlock.L
+	pendingCond *sync.Cond
+
+	// ident => queueItem
+	pending    *stepQueueHeap
+	active     *stepQueueHeap
+	queueItems map[interface{}]*stepQueueHeapItem // for tracking used idents in both pending and active
+
+	// stopped is used for cancellation of "wake" goroutine
+	stopped bool
+
+	concurrency int
 }
 
 type stepQueueHeapItem struct {
 	idx int
-	req stepQueueRec
+	req *stepQueueRec
 }
-type stepQueueHeap []*stepQueueHeapItem
+
+type stepQueueHeap struct {
+	items   []*stepQueueHeapItem
+	reverse bool // never change after pushing first element
+}
 
 func (h stepQueueHeap) Less(i, j int) bool {
-	return h[i].req.targetDate.Before(h[j].req.targetDate)
+	res := h.items[i].req.targetDate.Before(h.items[j].req.targetDate)
+	if h.reverse {
+		return !res
+	}
+	return res
 }
 
 func (h stepQueueHeap) Swap(i, j int) {
-	h[i], h[j] = h[j], h[i]
-	h[i].idx = i
-	h[j].idx = j
+	h.items[i], h.items[j] = h.items[j], h.items[i]
+	h.items[i].idx = i
+	h.items[j].idx = j
}
 
 func (h stepQueueHeap) Len() int {
-	return len(h)
+	return len(h.items)
 }
 
 func (h *stepQueueHeap) Push(elem interface{}) {
 	hitem := elem.(*stepQueueHeapItem)
 	hitem.idx = h.Len()
-	*h = append(*h, hitem)
+	h.items = append(h.items, hitem)
 }
 
 func (h *stepQueueHeap) Pop() interface{} {
-	elem := (*h)[h.Len()-1]
+	elem := h.items[h.Len()-1]
 	elem.idx = -1
-	*h = (*h)[:h.Len()-1]
+	h.items = h.items[:h.Len()-1]
 	return elem
 }
 
 // returned stepQueue must be closed with method Close
-func newStepQueue() *stepQueue {
+func newStepQueue(concurrency int) *stepQueue {
+	l := chainlock.New()
 	q := &stepQueue{
 		stop: make(chan struct{}),
 		reqs: make(chan stepQueueRec),
+		l:    l,
+		pendingCond: l.NewCond(),
+		// priority queue
+		pending: &stepQueueHeap{reverse: false},
+		active:  &stepQueueHeap{reverse: true},
+		// ident => queueItem
+		queueItems: make(map[interface{}]*stepQueueHeapItem),
+		// stopped is used for cancellation of "wake" goroutine
+		stopped: false,
+	}
+	err := q.setConcurrencyLocked(concurrency)
+	if err != nil {
+		panic(err)
 	}
 	return q
 }
@@ -65,25 +104,12 @@ func newStepQueue() *stepQueue {
 //
 // No WaitReady calls must be active at the time done is called
 // The behavior of calling WaitReady after done was called is undefined
-func (q *stepQueue) Start(concurrency int) (done func()) {
-	if concurrency < 1 {
-		panic("concurrency must be >= 1")
-	}
-	// l protects pending and queueItems
-	l := chainlock.New()
-	pendingCond := l.NewCond()
-	// priority queue
-	pending := &stepQueueHeap{}
-	// ident => queueItem
-	queueItems := make(map[interface{}]*stepQueueHeapItem)
-	// stopped is used for cancellation of "wake" goroutine
-	stopped := false
-	active := 0
+func (q *stepQueue) Start() (done func()) {
 	go func() { // "stopper" goroutine
 		<-q.stop
-		defer l.Lock().Unlock()
-		stopped = true
-		pendingCond.Broadcast()
+		defer q.l.Lock().Unlock()
+		q.stopped = true
+		q.pendingCond.Broadcast()
 	}()
 	go func() { // "reqs" goroutine
 		for {
@@ -97,41 +123,52 @@ func (q *stepQueue) Start(concurrency int) (done func()) {
 			}
 			case req := <-q.reqs:
 				func() {
-					defer l.Lock().Unlock()
-					if _, ok := queueItems[req.ident]; ok {
+					defer q.l.Lock().Unlock()
+					if _, ok := q.queueItems[req.ident]; ok {
 						panic("WaitReady must not be called twice for the same ident")
 					}
 					qitem := &stepQueueHeapItem{
 						req: req,
 					}
-					queueItems[req.ident] = qitem
-					heap.Push(pending, qitem)
-					pendingCond.Broadcast()
+					q.queueItems[req.ident] = qitem
+					heap.Push(q.pending, qitem)
+					q.pendingCond.Broadcast()
 				}()
 			}
 		}
 	}()
 	go func() { // "wake" goroutine
-		defer l.Lock().Unlock()
+		defer q.l.Lock().Unlock()
 		for {
-			for !stopped && (active >= concurrency || pending.Len() == 0) {
-				pendingCond.Wait()
+			for !q.stopped && (q.active.Len() >= q.concurrency || q.pending.Len() == 0) {
+				q.pendingCond.Wait()
 			}
-			if stopped {
+			if q.stopped {
 				return
 			}
-			if pending.Len() <= 0 {
+			if q.pending.Len() <= 0 {
 				return
 			}
-			active++
-			next := heap.Pop(pending).(*stepQueueHeapItem).req
-			delete(queueItems, next.ident)
-			next.wakeup <- func() {
-				defer l.Lock().Unlock()
-				active--
-				pendingCond.Broadcast()
+			// pop from tracked items
+			next := heap.Pop(q.pending).(*stepQueueHeapItem)
+			next.req.cancelDueToConcurrencyDownsize =
+			heap.Push(q.active, next)
+			next.req.wakeup <- func() {
+				defer q.l.Lock().Unlock()
+				//
+				qitem := &stepQueueHeapItem{
+					req: req,
+				}
+				// delete(q.queueItems, next.req.ident) // def
+				q.pendingCond.Broadcast()
 			}
 		}
 	}()
@@ -161,3 +198,24 @@ func (q *stepQueue) WaitReady(ident interface{}, targetDate time.Time) StepCompl
 	}
 	return q.sendAndWaitForWakeup(ident, targetDate)
 }
+
+// caller must hold lock
+func (q *stepQueue) setConcurrencyLocked(newConcurrency int) error {
+	if !(newConcurrency >= 1) {
+		return fmt.Errorf("concurrency must be >= 1 but requested %v", newConcurrency)
+	}
+	q.concurrency = newConcurrency
+	q.pendingCond.Broadcast() // wake up waiters who could make progress
+	for q.active.Len() > q.concurrency {
+		item := heap.Pop(q.active).(*stepQueueHeapItem)
+		item.req.cancelDueToConcurrencyDownsize()
+		heap.Push(q.pending, item)
+	}
+	return nil
+}
+
+func (q *stepQueue) SetConcurrency(new int) error {
+	defer q.l.Lock().Unlock()
+	return q.setConcurrencyLocked(new)
+}

View File

@@ -17,7 +17,7 @@ import (
 // (relies on scheduler responsivity of < 500ms)
 func TestPqNotconcurrent(t *testing.T) {
 	var ctr uint32
-	q := newStepQueue()
+	q := newStepQueue(1)
 	var wg sync.WaitGroup
 	wg.Add(4)
 	go func() {
@@ -29,7 +29,7 @@ func TestPqNotconcurrent(t *testing.T) {
 	}()
 
 	// give goroutine "1" 500ms to enter queue, get the active slot and enter time.Sleep
-	defer q.Start(1)()
+	defer q.Start()()
 	time.Sleep(500 * time.Millisecond)
 
 	// while "1" is still running, queue in "2", "3" and "4"
@@ -78,7 +78,8 @@ func (r record) String() string {
 // times for each step are close together.
 func TestPqConcurrent(t *testing.T) {
-	q := newStepQueue()
+	concurrency := 5
+	q := newStepQueue(concurrency)
 	var wg sync.WaitGroup
 	filesystems := 100
 	stepsPerFS := 20
@@ -104,8 +105,7 @@ func TestPqConcurrent(t *testing.T) {
 			records <- recs
 		}(fs)
 	}
-	concurrency := 5
-	defer q.Start(concurrency)()
+	defer q.Start()()
 	wg.Wait()
 	close(records)
 	t.Logf("loop done")

View File

@@ -8,6 +8,6 @@ import (
 	"github.com/zrepl/zrepl/replication/driver"
 )
 
-func Do(ctx context.Context, planner driver.Planner) (driver.ReportFunc, driver.WaitFunc) {
-	return driver.Do(ctx, planner)
+func Do(ctx context.Context, initialConcurrency int, planner driver.Planner) (driver.ReportFunc, driver.WaitFunc, driver.SetConcurrencyFunc) {
+	return driver.Do(ctx, initialConcurrency, planner)
 }

View File

@@ -10,6 +10,7 @@ type Report struct {
 	WaitReconnectSince, WaitReconnectUntil time.Time
 	WaitReconnectError                     *TimedError
 	Attempts                               []*AttemptReport
+	Concurrency                            int
 }
 
 var _, _ = json.Marshal(&Report{})