From 3483e18d4bdfb4207dd30ddf0a6b0beac570428f Mon Sep 17 00:00:00 2001 From: Michael Quigley Date: Wed, 30 Apr 2025 14:47:13 -0400 Subject: [PATCH] batch size in loopers (#953) --- canary/looper.go | 3 +++ canary/publicHttpLooper.go | 12 +++++++++++- cmd/zrok/testCanaryPublicProxy.go | 16 ++++++++++++++++ 3 files changed, 30 insertions(+), 1 deletion(-) diff --git a/canary/looper.go b/canary/looper.go index b23cdff9..013dbf20 100644 --- a/canary/looper.go +++ b/canary/looper.go @@ -16,6 +16,9 @@ type LooperOptions struct { MaxDwell time.Duration MinPacing time.Duration MaxPacing time.Duration + BatchSize uint + MinBatchPacing time.Duration + MaxBatchPacing time.Duration TargetName string BindAddress string SnapshotQueue chan *Snapshot diff --git a/canary/publicHttpLooper.go b/canary/publicHttpLooper.go index 0e0f6603..0200569c 100644 --- a/canary/publicHttpLooper.go +++ b/canary/publicHttpLooper.go @@ -168,6 +168,16 @@ func (l *PublicHttpLooper) iterate() { defer func() { l.results.StopTime = time.Now() }() for i := uint(0); i < l.opt.Iterations && !l.abort; i++ { + if i > 0 && l.opt.BatchSize > 0 && i%l.opt.BatchSize == 0 { + batchPacingMs := l.opt.MaxBatchPacing.Milliseconds() + batchPacingDelta := l.opt.MaxBatchPacing.Milliseconds() - l.opt.MinBatchPacing.Milliseconds() + if batchPacingDelta > 0 { + batchPacingMs = (rand.Int63() % batchPacingDelta) + l.opt.MinBatchPacing.Milliseconds() + } + logrus.Debug("sleeping %d ms for batch pacing", batchPacingMs) + time.Sleep(time.Duration(batchPacingMs) * time.Millisecond) + } + snapshot := NewSnapshot("public-proxy", l.id, uint64(i)) if i > 0 && i%l.opt.StatusInterval == 0 { @@ -227,8 +237,8 @@ func (l *PublicHttpLooper) iterate() { pacingDelta := l.opt.MaxPacing.Milliseconds() - l.opt.MinPacing.Milliseconds() if pacingDelta > 0 { pacingMs = (rand.Int63() % pacingDelta) + l.opt.MinPacing.Milliseconds() - time.Sleep(time.Duration(pacingMs) * time.Millisecond) } + time.Sleep(time.Duration(pacingMs) * time.Millisecond) l.results.Loops++ } diff --git a/cmd/zrok/testCanaryPublicProxy.go b/cmd/zrok/testCanaryPublicProxy.go index 190b5222..d8e60c1c 100644 --- a/cmd/zrok/testCanaryPublicProxy.go +++ b/cmd/zrok/testCanaryPublicProxy.go @@ -31,6 +31,10 @@ type testCanaryPublicProxy struct { maxDwell time.Duration minPacing time.Duration maxPacing time.Duration + batchSize uint + batchPacing time.Duration + minBatchPacing time.Duration + maxBatchPacing time.Duration frontendSelection string canaryConfig string } @@ -55,6 +59,10 @@ func newTestCanaryPublicProxy() *testCanaryPublicProxy { cmd.Flags().DurationVar(&command.maxDwell, "max-dwell", 1*time.Second, "Maximum dwell time") cmd.Flags().DurationVar(&command.minPacing, "min-pacing", 0, "Minimum pacing time") cmd.Flags().DurationVar(&command.maxPacing, "max-pacing", 0, "Maximum pacing time") + cmd.Flags().UintVar(&command.batchSize, "batch-size", 0, "Iterate in batches of this size") + cmd.Flags().DurationVar(&command.batchPacing, "batch-pacing", 0, "Fixed batch pacing time") + cmd.Flags().DurationVar(&command.minBatchPacing, "min-batch-pacing", 0, "Minimum batch pacing time") + cmd.Flags().DurationVar(&command.maxBatchPacing, "max-batch-pacing", 0, "Maximum batch pacing time") cmd.Flags().StringVar(&command.frontendSelection, "frontend-selection", "public", "Select frontend selection") cmd.Flags().StringVar(&command.canaryConfig, "canary-config", "", "Path to canary configuration file") return command @@ -105,6 +113,14 @@ func (cmd *testCanaryPublicProxy) run(_ *cobra.Command, _ []string) { MaxDwell: cmd.maxDwell, MinPacing: cmd.minPacing, MaxPacing: cmd.maxPacing, + BatchSize: cmd.batchSize, + } + if cmd.batchPacing > 0 { + looperOpts.MinBatchPacing = cmd.batchPacing + looperOpts.MaxBatchPacing = cmd.batchPacing + } else { + looperOpts.MinBatchPacing = cmd.minBatchPacing + looperOpts.MaxBatchPacing = cmd.maxBatchPacing } if sc != nil { looperOpts.SnapshotQueue = sc.InputQueue