diff --git a/client/signal.go b/client/signal.go index 472c7ed..f51d793 100644 --- a/client/signal.go +++ b/client/signal.go @@ -9,7 +9,7 @@ import ( ) var SignalCmd = &cli.Subcommand{ - Use: "signal [wakeup|reset] JOB", + Use: "signal [wakeup|reset] JOB [DATA]", Short: "wake up a job from wait state or abort its current invocation", Run: func(subcommand *cli.Subcommand, args []string) error { return runSignalCmd(subcommand.Config(), args) @@ -17,8 +17,13 @@ var SignalCmd = &cli.Subcommand{ } func runSignalCmd(config *config.Config, args []string) error { - if len(args) != 2 { - return errors.Errorf("Expected 2 arguments: [wakeup|reset] JOB") + if len(args) < 2 || len(args) > 3 { + return errors.Errorf("Expected 2 arguments: [wakeup|reset|set-concurrency] JOB [DATA]") + } + + var data string + if len(args) == 3 { + data = args[2] } httpc, err := controlHttpClient(config.Global.Control.SockPath) @@ -30,9 +35,11 @@ func runSignalCmd(config *config.Config, args []string) error { struct { Name string Op string + Data string }{ Name: args[1], Op: args[0], + Data: data, }, struct{}{}, ) diff --git a/daemon/control.go b/daemon/control.go index 5cb498d..04ce7fd 100644 --- a/daemon/control.go +++ b/daemon/control.go @@ -8,6 +8,7 @@ import ( "io" "net" "net/http" + "strconv" "time" "github.com/pkg/errors" @@ -47,6 +48,8 @@ func (j *controlJob) OwnedDatasetSubtreeRoot() (p *zfs.DatasetPath, ok bool) { r func (j *controlJob) SenderConfig() *endpoint.SenderConfig { return nil } +func (j *controlJob) SetConcurrency(concurrency int) error { return errors.Errorf("not supported") } + var promControl struct { requestBegin *prometheus.CounterVec requestFinished *prometheus.HistogramVec @@ -126,6 +129,7 @@ func (j *controlJob) Run(ctx context.Context) { type reqT struct { Name string Op string + Data string } var req reqT if decoder(&req) != nil { @@ -138,6 +142,14 @@ func (j *controlJob) Run(ctx context.Context) { err = j.jobs.wakeup(req.Name) case "reset": err = j.jobs.reset(req.Name) + case "set-concurrency": + var concurrency int + concurrency, err = strconv.Atoi(req.Data) // shadow + if err != nil { + // fallthrough outer + } else { + err = j.jobs.setConcurrency(req.Name, concurrency) + } default: err = fmt.Errorf("operation %q is invalid", req.Op) } diff --git a/daemon/daemon.go b/daemon/daemon.go index 265ddc4..246eb99 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -176,6 +176,18 @@ func (s *jobs) reset(job string) error { return wu() } +func (s *jobs) setConcurrency(jobName string, concurrency int) error { + s.m.RLock() + defer s.m.RUnlock() + + job, ok := s.jobs[jobName] + if !ok { + return errors.Errorf("Job %q does not exist", job) + } + + return job.SetConcurrency(concurrency) +} + const ( jobNamePrometheus = "_prometheus" jobNameControl = "_control" diff --git a/daemon/job/active.go b/daemon/job/active.go index 4a784df..9994a0a 100644 --- a/daemon/job/active.go +++ b/daemon/job/active.go @@ -55,9 +55,12 @@ const ( type activeSideTasks struct { state ActiveSideState + concurrency int + // valid for state ActiveSideReplicating, ActiveSidePruneSender, ActiveSidePruneReceiver, ActiveSideDone - replicationReport driver.ReportFunc - replicationCancel context.CancelFunc + replicationReport driver.ReportFunc + replicationCancel context.CancelFunc + replicationSetConcurrency driver.SetConcurrencyFunc // valid for state ActiveSidePruneSender, ActiveSidePruneReceiver, ActiveSideDone prunerSender, prunerReceiver *pruner.Pruner @@ -278,6 +281,8 @@ func activeSide(g *config.Global, in *config.ActiveJob, configJob interface{}) ( return nil, errors.Wrap(err, "invalid job name") } + j.tasks.concurrency = 1 // FIXME + switch v := configJob.(type) { case *config.PushJob: j.mode, err = modePushFromConfig(g, v, j.name) // shadow @@ -375,6 +380,22 @@ func (j *ActiveSide) SenderConfig() *endpoint.SenderConfig { return push.senderConfig } +func (j *ActiveSide) SetConcurrency(concurrency int) (err error) { + j.updateTasks(func(tasks *activeSideTasks) { + if tasks.replicationSetConcurrency != nil { + err = tasks.replicationSetConcurrency(concurrency) // no shadow + if err == nil { + tasks.concurrency = concurrency + } + } else { + // FIXME this is not great, should always be able to set it + err = errors.Errorf("cannot set while not replicating") + } + + }) + return err +} + func (j *ActiveSide) Run(ctx context.Context) { log := GetLogger(ctx) ctx = logging.WithSubsystemLoggers(ctx, log) @@ -436,11 +457,14 @@ func (j *ActiveSide) do(ctx context.Context) { ctx, repCancel := context.WithCancel(ctx) var repWait driver.WaitFunc j.updateTasks(func(tasks *activeSideTasks) { - // reset it - *tasks = activeSideTasks{} + // reset it (almost) + old := *tasks + *tasks = activeSideTasks{ + concurrency: old.concurrency, + } tasks.replicationCancel = repCancel - tasks.replicationReport, repWait = replication.Do( - ctx, logic.NewPlanner(j.promRepStateSecs, j.promBytesReplicated, sender, receiver, j.mode.PlannerPolicy()), + tasks.replicationReport, repWait, tasks.replicationSetConcurrency = replication.Do( + ctx, tasks.concurrency, logic.NewPlanner(j.promRepStateSecs, j.promBytesReplicated, sender, receiver, j.mode.PlannerPolicy()), ) tasks.state = ActiveSideReplicating }) diff --git a/daemon/job/job.go b/daemon/job/job.go index cba89e1..c133090 100644 --- a/daemon/job/job.go +++ b/daemon/job/job.go @@ -40,6 +40,7 @@ type Job interface { // must return the root of that subtree as rfs and ok = true OwnedDatasetSubtreeRoot() (rfs *zfs.DatasetPath, ok bool) SenderConfig() *endpoint.SenderConfig + SetConcurrency(concurrency int) error } type Type string diff --git a/daemon/job/passive.go b/daemon/job/passive.go index ce3bafe..5189e0e 100644 --- a/daemon/job/passive.go +++ b/daemon/job/passive.go @@ -163,6 +163,8 @@ func (j *PassiveSide) SenderConfig() *endpoint.SenderConfig { func (*PassiveSide) RegisterMetrics(registerer prometheus.Registerer) {} +func (*PassiveSide) SetConcurrency(concurrency int) error { return errors.Errorf("not supported") } + func (j *PassiveSide) Run(ctx context.Context) { log := GetLogger(ctx) diff --git a/daemon/job/snapjob.go b/daemon/job/snapjob.go index ef554a4..222a5e3 100644 --- a/daemon/job/snapjob.go +++ b/daemon/job/snapjob.go @@ -117,6 +117,8 @@ outer: } } +func (*SnapJob) SetConcurrency(concurrency int) error { return errors.Errorf("not supported") } + // Adaptor that implements pruner.History around a pruner.Target. // The ReplicationCursor method is Get-op only and always returns // the filesystem's most recent version's GUID. diff --git a/daemon/prometheus.go b/daemon/prometheus.go index 7e43e8a..c159067 100644 --- a/daemon/prometheus.go +++ b/daemon/prometheus.go @@ -5,6 +5,7 @@ import ( "net" "net/http" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -51,6 +52,8 @@ func (j *prometheusJob) OwnedDatasetSubtreeRoot() (p *zfs.DatasetPath, ok bool) func (j *prometheusJob) SenderConfig() *endpoint.SenderConfig { return nil } +func (j *prometheusJob) SetConcurrency(concurrency int) error { return errors.Errorf("not supported") } + func (j *prometheusJob) RegisterMetrics(registerer prometheus.Registerer) {} func (j *prometheusJob) Run(ctx context.Context) { diff --git a/replication/driver/replication_driver.go b/replication/driver/replication_driver.go index a9c2ce7..8d2b314 100644 --- a/replication/driver/replication_driver.go +++ b/replication/driver/replication_driver.go @@ -89,6 +89,8 @@ type attempt struct { // if both are nil, it must be assumed that Planner.Plan is active planErr *timedError fss []*fs + + concurrency int } type timedError struct { @@ -170,11 +172,12 @@ type step struct { type ReportFunc func() *report.Report type WaitFunc func(block bool) (done bool) +type SetConcurrencyFunc func(concurrency int) error var maxAttempts = envconst.Int64("ZREPL_REPLICATION_MAX_ATTEMPTS", 3) var reconnectHardFailTimeout = envconst.Duration("ZREPL_REPLICATION_RECONNECT_HARD_FAIL_TIMEOUT", 10*time.Minute) -func Do(ctx context.Context, planner Planner) (ReportFunc, WaitFunc) { +func Do(ctx context.Context, initialConcurrency int, planner Planner) (ReportFunc, WaitFunc, SetConcurrencyFunc) { log := getLog(ctx) l := chainlock.New() run := &run{ @@ -182,6 +185,8 @@ func Do(ctx context.Context, planner Planner) (ReportFunc, WaitFunc) { startedAt: time.Now(), } + concurrencyChanges := make(chan concurrencyChange) + done := make(chan struct{}) go func() { defer close(done) @@ -198,15 +203,21 @@ func Do(ctx context.Context, planner Planner) (ReportFunc, WaitFunc) { run.waitReconnect.SetZero() run.waitReconnectError = nil + prevConcurrency := initialConcurrency // FIXME default concurrency + if prev != nil { + prevConcurrency = prev.concurrency + } + // do current attempt cur := &attempt{ - l: l, - startedAt: time.Now(), - planner: planner, + l: l, + startedAt: time.Now(), + planner: planner, + concurrency: prevConcurrency, } run.attempts = append(run.attempts, cur) run.l.DropWhile(func() { - cur.do(ctx, prev) + cur.do(ctx, prev, concurrencyChanges) }) prev = cur if ctx.Err() != nil { @@ -277,10 +288,25 @@ func Do(ctx context.Context, planner Planner) (ReportFunc, WaitFunc) { defer run.l.Lock().Unlock() return run.report() } - return report, wait + setConcurrency := func(concurrency int) (reterr error) { + var wg sync.WaitGroup + wg.Add(1) + concurrencyChanges <- concurrencyChange{ concurrency, func(err error) { + defer wg.Done() + reterr = err // shadow + }} + wg.Wait() + return reterr + } + return report, wait, setConcurrency } -func (a *attempt) do(ctx context.Context, prev *attempt) { +type concurrencyChange struct { + value int + resultCallback func(error) +} + +func (a *attempt) do(ctx context.Context, prev *attempt, setConcurrency <-chan concurrencyChange) { pfss, err := a.planner.Plan(ctx) errTime := time.Now() defer a.l.Lock().Unlock() @@ -354,8 +380,8 @@ func (a *attempt) do(ctx context.Context, prev *attempt) { } // invariant: prevs contains an entry for each unambigious correspondence - stepQueue := newStepQueue() - defer stepQueue.Start(1)() // TODO parallel replication + stepQueue := newStepQueue(a.concurrency) + defer stepQueue.Start()() var fssesDone sync.WaitGroup for _, f := range a.fss { fssesDone.Add(1) @@ -364,8 +390,27 @@ func (a *attempt) do(ctx context.Context, prev *attempt) { f.do(ctx, stepQueue, prevs[f]) }(f) } + changeConcurrencyDone := make(chan struct{}) + go func() { + for { + select { + case change := <-setConcurrency: + err := stepQueue.SetConcurrency(change.value) + go change.resultCallback(err) + if err == nil { + a.l.Lock() + a.concurrency = change.value + a.l.Unlock() + } + case <-changeConcurrencyDone: + return + // not waiting for ctx.Done here, the main job are the fsses + } + } + }() a.l.DropWhile(func() { fssesDone.Wait() + close(changeConcurrencyDone) }) a.finishedAt = time.Now() } @@ -461,7 +506,8 @@ func (fs *fs) do(ctx context.Context, pq *stepQueue, prev *fs) { // lock must not be held while executing step in order for reporting to work fs.l.DropWhile(func() { targetDate := s.step.TargetDate() - defer pq.WaitReady(fs, targetDate)() + ctx, done := pq.WaitReady(ctx, fs, targetDate)() + defer done() err = s.step.Step(ctx) // no shadow errTime = time.Now() // no shadow }) @@ -482,6 +528,7 @@ func (r *run) report() *report.Report { WaitReconnectSince: r.waitReconnect.begin, WaitReconnectUntil: r.waitReconnect.end, WaitReconnectError: r.waitReconnectError.IntoReportError(), + // Concurrency: r.concurrency, } for i := range report.Attempts { report.Attempts[i] = r.attempts[i].report() diff --git a/replication/driver/replication_driver_test.go b/replication/driver/replication_driver_test.go index abf39f1..3250a5c 100644 --- a/replication/driver/replication_driver_test.go +++ b/replication/driver/replication_driver_test.go @@ -151,7 +151,7 @@ func TestReplication(t *testing.T) { ctx := context.Background() mp := &mockPlanner{} - getReport, wait := Do(ctx, mp) + getReport, wait, _ := Do(ctx, 1, mp) begin := time.Now() fireAt := []time.Duration{ // the following values are relative to the start diff --git a/replication/driver/replication_stepqueue.go b/replication/driver/replication_stepqueue.go index a6486c0..31ab068 100644 --- a/replication/driver/replication_stepqueue.go +++ b/replication/driver/replication_stepqueue.go @@ -2,6 +2,8 @@ package driver import ( "container/heap" + "fmt" + "sync" "time" "github.com/zrepl/zrepl/util/chainlock" @@ -11,51 +13,88 @@ type stepQueueRec struct { ident interface{} targetDate time.Time wakeup chan StepCompletedFunc + cancelDueToConcurrencyDownsize interace{} } type stepQueue struct { stop chan struct{} reqs chan stepQueueRec + + // l protects all members except the channels above + + l *chainlock.L + pendingCond *sync.Cond + + // ident => queueItem + pending *stepQueueHeap + active *stepQueueHeap + queueItems map[interface{}]*stepQueueHeapItem // for tracking used idents in both pending and active + + // stopped is used for cancellation of "wake" goroutine + stopped bool + + concurrency int } type stepQueueHeapItem struct { idx int - req stepQueueRec + req *stepQueueRec +} +type stepQueueHeap struct { + items []*stepQueueHeapItem + reverse bool // never change after pushing first element } -type stepQueueHeap []*stepQueueHeapItem func (h stepQueueHeap) Less(i, j int) bool { - return h[i].req.targetDate.Before(h[j].req.targetDate) + res := h.items[i].req.targetDate.Before(h.items[j].req.targetDate) + if h.reverse { + return !res + } + return res } func (h stepQueueHeap) Swap(i, j int) { - h[i], h[j] = h[j], h[i] - h[i].idx = i - h[j].idx = j + h.items[i], h.items[j] = h.items[j], h.items[i] + h.items[i].idx = i + h.items[j].idx = j } func (h stepQueueHeap) Len() int { - return len(h) + return len(h.items) } func (h *stepQueueHeap) Push(elem interface{}) { hitem := elem.(*stepQueueHeapItem) hitem.idx = h.Len() - *h = append(*h, hitem) + h.items = append(h.items, hitem) } func (h *stepQueueHeap) Pop() interface{} { - elem := (*h)[h.Len()-1] + elem := h.items[h.Len()-1] elem.idx = -1 - *h = (*h)[:h.Len()-1] + h.items = h.items[:h.Len()-1] return elem } // returned stepQueue must be closed with method Close -func newStepQueue() *stepQueue { +func newStepQueue(concurrency int) *stepQueue { + l := chainlock.New() q := &stepQueue{ - stop: make(chan struct{}), - reqs: make(chan stepQueueRec), + stop: make(chan struct{}), + reqs: make(chan stepQueueRec), + l: l, + pendingCond: l.NewCond(), + // priority queue + pending: &stepQueueHeap{reverse: false}, + active: &stepQueueHeap{reverse: true}, + // ident => queueItem + queueItems: make(map[interface{}]*stepQueueHeapItem), + // stopped is used for cancellation of "wake" goroutine + stopped: false, + } + err := q.setConcurrencyLocked(concurrency) + if err != nil { + panic(err) } return q } @@ -65,25 +104,12 @@ func newStepQueue() *stepQueue { // // No WaitReady calls must be active at the time done is called // The behavior of calling WaitReady after done was called is undefined -func (q *stepQueue) Start(concurrency int) (done func()) { - if concurrency < 1 { - panic("concurrency must be >= 1") - } - // l protects pending and queueItems - l := chainlock.New() - pendingCond := l.NewCond() - // priority queue - pending := &stepQueueHeap{} - // ident => queueItem - queueItems := make(map[interface{}]*stepQueueHeapItem) - // stopped is used for cancellation of "wake" goroutine - stopped := false - active := 0 +func (q *stepQueue) Start() (done func()) { go func() { // "stopper" goroutine <-q.stop - defer l.Lock().Unlock() - stopped = true - pendingCond.Broadcast() + defer q.l.Lock().Unlock() + q.stopped = true + q.pendingCond.Broadcast() }() go func() { // "reqs" goroutine for { @@ -97,41 +123,52 @@ func (q *stepQueue) Start(concurrency int) (done func()) { } case req := <-q.reqs: func() { - defer l.Lock().Unlock() - if _, ok := queueItems[req.ident]; ok { + defer q.l.Lock().Unlock() + if _, ok := q.queueItems[req.ident]; ok { panic("WaitReady must not be called twice for the same ident") } qitem := &stepQueueHeapItem{ req: req, } - queueItems[req.ident] = qitem - heap.Push(pending, qitem) - pendingCond.Broadcast() + q.queueItems[req.ident] = qitem + heap.Push(q.pending, qitem) + q.pendingCond.Broadcast() }() } } }() go func() { // "wake" goroutine - defer l.Lock().Unlock() + defer q.l.Lock().Unlock() for { - for !stopped && (active >= concurrency || pending.Len() == 0) { - pendingCond.Wait() + for !q.stopped && (q.active.Len() >= q.concurrency || q.pending.Len() == 0) { + q.pendingCond.Wait() } - if stopped { + if q.stopped { return } - if pending.Len() <= 0 { + if q.pending.Len() <= 0 { return } - active++ - next := heap.Pop(pending).(*stepQueueHeapItem).req - delete(queueItems, next.ident) - next.wakeup <- func() { - defer l.Lock().Unlock() - active-- - pendingCond.Broadcast() + // pop from tracked items + next := heap.Pop(q.pending).(*stepQueueHeapItem) + + next.req.cancelDueToConcurrencyDownsize = + + heap.Push(q.active, next) + + next.req.wakeup <- func() { + defer q.l.Lock().Unlock() + + // + qitem := &stepQueueHeapItem{ + req: req, + } + + // delete(q.queueItems, next.req.ident) // def + + q.pendingCond.Broadcast() } } }() @@ -161,3 +198,24 @@ func (q *stepQueue) WaitReady(ident interface{}, targetDate time.Time) StepCompl } return q.sendAndWaitForWakeup(ident, targetDate) } + +// caller must hold lock +func (q *stepQueue) setConcurrencyLocked(newConcurrency int) error { + if !(newConcurrency >= 1) { + return fmt.Errorf("concurrency must be >= 1 but requested %v", newConcurrency) + } + q.concurrency = newConcurrency + q.pendingCond.Broadcast() // wake up waiters who could make progress + + for q.active.Len() > q.concurrency { + item := heap.Pop(q.active).(*stepQueueHeapItem) + item.req.cancelDueToConcurrencyDownsize() + heap.Push(q.pending, item) + } + return nil +} + +func (q *stepQueue) SetConcurrency(new int) error { + defer q.l.Lock().Unlock() + return q.setConcurrencyLocked(new) +} diff --git a/replication/driver/replication_stepqueue_test.go b/replication/driver/replication_stepqueue_test.go index 73f877a..c00bc59 100644 --- a/replication/driver/replication_stepqueue_test.go +++ b/replication/driver/replication_stepqueue_test.go @@ -17,7 +17,7 @@ import ( // (relies on scheduler responsivity of < 500ms) func TestPqNotconcurrent(t *testing.T) { var ctr uint32 - q := newStepQueue() + q := newStepQueue(1) var wg sync.WaitGroup wg.Add(4) go func() { @@ -29,7 +29,7 @@ func TestPqNotconcurrent(t *testing.T) { }() // give goroutine "1" 500ms to enter queue, get the active slot and enter time.Sleep - defer q.Start(1)() + defer q.Start()() time.Sleep(500 * time.Millisecond) // while "1" is still running, queue in "2", "3" and "4" @@ -77,8 +77,9 @@ func (r record) String() string { // Hence, perform some statistics on the wakeup times and assert that the mean wakeup // times for each step are close together. func TestPqConcurrent(t *testing.T) { - - q := newStepQueue() + + concurrency := 5 + q := newStepQueue(concurrency) var wg sync.WaitGroup filesystems := 100 stepsPerFS := 20 @@ -104,8 +105,7 @@ func TestPqConcurrent(t *testing.T) { records <- recs }(fs) } - concurrency := 5 - defer q.Start(concurrency)() + defer q.Start()() wg.Wait() close(records) t.Logf("loop done") diff --git a/replication/replication.go b/replication/replication.go index 7f9e35b..bd9a7f2 100644 --- a/replication/replication.go +++ b/replication/replication.go @@ -8,6 +8,6 @@ import ( "github.com/zrepl/zrepl/replication/driver" ) -func Do(ctx context.Context, planner driver.Planner) (driver.ReportFunc, driver.WaitFunc) { - return driver.Do(ctx, planner) +func Do(ctx context.Context, initialConcurrency int, planner driver.Planner) (driver.ReportFunc, driver.WaitFunc, driver.SetConcurrencyFunc) { + return driver.Do(ctx, initialConcurrency, planner) } diff --git a/replication/report/replication_report.go b/replication/report/replication_report.go index 5a64a3a..3643bc2 100644 --- a/replication/report/replication_report.go +++ b/replication/report/replication_report.go @@ -10,6 +10,7 @@ type Report struct { WaitReconnectSince, WaitReconnectUntil time.Time WaitReconnectError *TimedError Attempts []*AttemptReport + Concurrency int } var _, _ = json.Marshal(&Report{})