batch size in loopers (#953)

This commit is contained in:
Michael Quigley 2025-04-30 14:47:13 -04:00
parent 921ded926b
commit 3483e18d4b
No known key found for this signature in database
GPG Key ID: 9B60314A9DD20A62
3 changed files with 30 additions and 1 deletions

View File

@ -16,6 +16,9 @@ type LooperOptions struct {
MaxDwell time.Duration
MinPacing time.Duration
MaxPacing time.Duration
BatchSize uint
MinBatchPacing time.Duration
MaxBatchPacing time.Duration
TargetName string
BindAddress string
SnapshotQueue chan *Snapshot

View File

@ -168,6 +168,16 @@ func (l *PublicHttpLooper) iterate() {
defer func() { l.results.StopTime = time.Now() }()
for i := uint(0); i < l.opt.Iterations && !l.abort; i++ {
if i > 0 && l.opt.BatchSize > 0 && i%l.opt.BatchSize == 0 {
batchPacingMs := l.opt.MaxBatchPacing.Milliseconds()
batchPacingDelta := l.opt.MaxBatchPacing.Milliseconds() - l.opt.MinBatchPacing.Milliseconds()
if batchPacingDelta > 0 {
batchPacingMs = (rand.Int63() % batchPacingDelta) + l.opt.MinBatchPacing.Milliseconds()
}
logrus.Debug("sleeping %d ms for batch pacing", batchPacingMs)
time.Sleep(time.Duration(batchPacingMs) * time.Millisecond)
}
snapshot := NewSnapshot("public-proxy", l.id, uint64(i))
if i > 0 && i%l.opt.StatusInterval == 0 {
@ -227,8 +237,8 @@ func (l *PublicHttpLooper) iterate() {
pacingDelta := l.opt.MaxPacing.Milliseconds() - l.opt.MinPacing.Milliseconds()
if pacingDelta > 0 {
pacingMs = (rand.Int63() % pacingDelta) + l.opt.MinPacing.Milliseconds()
time.Sleep(time.Duration(pacingMs) * time.Millisecond)
}
time.Sleep(time.Duration(pacingMs) * time.Millisecond)
l.results.Loops++
}

View File

@ -31,6 +31,10 @@ type testCanaryPublicProxy struct {
maxDwell time.Duration
minPacing time.Duration
maxPacing time.Duration
batchSize uint
batchPacing time.Duration
minBatchPacing time.Duration
maxBatchPacing time.Duration
frontendSelection string
canaryConfig string
}
@ -55,6 +59,10 @@ func newTestCanaryPublicProxy() *testCanaryPublicProxy {
cmd.Flags().DurationVar(&command.maxDwell, "max-dwell", 1*time.Second, "Maximum dwell time")
cmd.Flags().DurationVar(&command.minPacing, "min-pacing", 0, "Minimum pacing time")
cmd.Flags().DurationVar(&command.maxPacing, "max-pacing", 0, "Maximum pacing time")
cmd.Flags().UintVar(&command.batchSize, "batch-size", 0, "Iterate in batches of this size")
cmd.Flags().DurationVar(&command.batchPacing, "batch-pacing", 0, "Fixed batch pacing time")
cmd.Flags().DurationVar(&command.minBatchPacing, "min-batch-pacing", 0, "Minimum batch pacing time")
cmd.Flags().DurationVar(&command.maxBatchPacing, "max-batch-pacing", 0, "Maximum batch pacing time")
cmd.Flags().StringVar(&command.frontendSelection, "frontend-selection", "public", "Select frontend selection")
cmd.Flags().StringVar(&command.canaryConfig, "canary-config", "", "Path to canary configuration file")
return command
@ -105,6 +113,14 @@ func (cmd *testCanaryPublicProxy) run(_ *cobra.Command, _ []string) {
MaxDwell: cmd.maxDwell,
MinPacing: cmd.minPacing,
MaxPacing: cmd.maxPacing,
BatchSize: cmd.batchSize,
}
if cmd.batchPacing > 0 {
looperOpts.MinBatchPacing = cmd.batchPacing
looperOpts.MaxBatchPacing = cmd.batchPacing
} else {
looperOpts.MinBatchPacing = cmd.minBatchPacing
looperOpts.MaxBatchPacing = cmd.maxBatchPacing
}
if sc != nil {
looperOpts.SnapshotQueue = sc.InputQueue