From 3225501d00bc4272f13ec0e28691029077706c0b Mon Sep 17 00:00:00 2001 From: Michael Quigley Date: Wed, 30 Apr 2025 16:01:51 -0400 Subject: [PATCH] better canary.Snapshot API improving code readability; public-proxy and private-proxy code paths converged (#953) --- canary/privateHttpLooper.go | 31 ++++++++++++ canary/publicHttpLooper.go | 54 +++++++-------------- canary/snapshot.go | 22 +++++++++ cmd/zrok/testCanaryPrivateProxy.go | 77 ++++++++++++++++++++++++++++-- cmd/zrok/testCanaryPublicProxy.go | 63 +++++++++++++----------- 5 files changed, 176 insertions(+), 71 deletions(-) diff --git a/canary/privateHttpLooper.go b/canary/privateHttpLooper.go index 775eed53..2bdc3662 100644 --- a/canary/privateHttpLooper.go +++ b/canary/privateHttpLooper.go @@ -80,28 +80,37 @@ func (l *PrivateHttpLooper) startup() error { if l.opt.TargetName != "" { target = l.opt.TargetName } + + snapshotCreateShare := NewSnapshot("create-share", l.id, 0) shr, err := sdk.CreateShare(l.root, &sdk.ShareRequest{ ShareMode: sdk.PrivateShareMode, BackendMode: sdk.ProxyBackendMode, Target: target, PermissionMode: sdk.ClosedPermissionMode, }) + snapshotCreateShare.Complete() if err != nil { + snapshotCreateShare.Failed(err).Send(l.opt.SnapshotQueue) return err } + snapshotCreateShare.Success().Send(l.opt.SnapshotQueue) l.shr = shr bindAddress := "" if l.opt.BindAddress != "" { bindAddress = l.opt.BindAddress } + snapshotCreateAccess := NewSnapshot("create-access", l.id, 0) acc, err := sdk.CreateAccess(l.root, &sdk.AccessRequest{ ShareToken: shr.Token, BindAddress: bindAddress, }) + snapshotCreateAccess.Complete() if err != nil { + snapshotCreateAccess.Failed(err).Send(l.opt.SnapshotQueue) return err } + snapshotCreateAccess.Success().Send(l.opt.SnapshotQueue) l.acc = acc logrus.Infof("#%d allocated share '%v', allocated frontend '%v'", l.id, shr.Token, acc.Token) @@ -127,9 +136,12 @@ func (l *PrivateHttpLooper) bind() error { return errors.Wrapf(err, "#%d error creating ziti context", l.id) } + snapshotListen := NewSnapshot("listen", l.id, 0) if l.listener, err = zctx.ListenWithOptions(l.shr.Token, &options); err != nil { + snapshotListen.Complete().Failed(err).Send(l.opt.SnapshotQueue) return errors.Wrapf(err, "#%d error binding listener", l.id) } + snapshotListen.Complete().Success().Send(l.opt.SnapshotQueue) go func() { if err := http.Serve(l.listener, l); err != nil { @@ -168,6 +180,18 @@ func (l *PrivateHttpLooper) iterate() { defer func() { l.results.StopTime = time.Now() }() for i := uint(0); i < l.opt.Iterations && !l.abort; i++ { + if i > 0 && l.opt.BatchSize > 0 && i%l.opt.BatchSize == 0 { + batchPacingMs := l.opt.MaxBatchPacing.Milliseconds() + batchPacingDelta := l.opt.MaxBatchPacing.Milliseconds() - l.opt.MinBatchPacing.Milliseconds() + if batchPacingDelta > 0 { + batchPacingMs = (rand.Int63() % batchPacingDelta) + l.opt.MinBatchPacing.Milliseconds() + } + logrus.Debugf("sleeping %d ms for batch pacing", batchPacingMs) + time.Sleep(time.Duration(batchPacingMs) * time.Millisecond) + } + + snapshot := NewSnapshot("private-proxy", l.id, uint64(i)) + if i > 0 && i%l.opt.StatusInterval == 0 { logrus.Infof("#%d: iteration %d", l.id, i) } @@ -188,6 +212,7 @@ func (l *PrivateHttpLooper) iterate() { outPayload := make([]byte, payloadSize) cryptorand.Read(outPayload) outBase64 := base64.StdEncoding.EncodeToString(outPayload) + snapshot.Size = uint64(len(outBase64)) if req, err := http.NewRequest("POST", "http://"+l.shr.Token, bytes.NewBufferString(outBase64)); err == nil { client := &http.Client{Timeout: l.opt.Timeout, Transport: &http.Transport{DialContext: connDialer{conn}.Dial}} @@ -202,9 +227,13 @@ func (l *PrivateHttpLooper) iterate() { if inBase64 != outBase64 { logrus.Errorf("#%d: payload mismatch", l.id) l.results.Mismatches++ + + snapshot.Complete().Failed(err) } else { l.results.Bytes += uint64(len(outBase64)) logrus.Debugf("#%d: payload match", l.id) + + snapshot.Complete().Success() } } else { logrus.Errorf("#%d: error: %v", l.id, err) @@ -215,6 +244,8 @@ func (l *PrivateHttpLooper) iterate() { l.results.Errors++ } + snapshot.Send(l.opt.SnapshotQueue) + if err := conn.Close(); err != nil { logrus.Errorf("#%d: error closing connection: %v", l.id, err) } diff --git a/canary/publicHttpLooper.go b/canary/publicHttpLooper.go index 0200569c..b47bc390 100644 --- a/canary/publicHttpLooper.go +++ b/canary/publicHttpLooper.go @@ -73,28 +73,25 @@ func (l *PublicHttpLooper) Results() *LooperResults { } func (l *PublicHttpLooper) startup() error { - snapshot := NewSnapshot("create-share", l.id, 0) + target := "canary.PublicHttpLooper" + if l.opt.TargetName != "" { + target = l.opt.TargetName + } + snapshotCreateShare := NewSnapshot("create-share", l.id, 0) shr, err := sdk.CreateShare(l.root, &sdk.ShareRequest{ ShareMode: sdk.PublicShareMode, BackendMode: sdk.ProxyBackendMode, - Target: "canary.PublicHttpLooper", + Target: target, Frontends: []string{l.frontend}, PermissionMode: sdk.ClosedPermissionMode, }) - snapshot.Completed = time.Now() + snapshotCreateShare.Complete() if err != nil { - snapshot.Ok = false - snapshot.Error = err - if l.opt.SnapshotQueue != nil { - l.opt.SnapshotQueue <- snapshot - } + snapshotCreateShare.Failed(err).Send(l.opt.SnapshotQueue) return err } - snapshot.Ok = true - if l.opt.SnapshotQueue != nil { - l.opt.SnapshotQueue <- snapshot - } + snapshotCreateShare.Success().Send(l.opt.SnapshotQueue) l.shr = shr logrus.Infof("#%d allocated share '%v'", l.id, l.shr.Token) @@ -120,24 +117,12 @@ func (l *PublicHttpLooper) bind() error { return errors.Wrapf(err, "#%d error creating ziti context", l.id) } - snapshot := NewSnapshot("listen", l.id, 0) - + snapshotListen := NewSnapshot("listen", l.id, 0) if l.listener, err = zctx.ListenWithOptions(l.shr.Token, &options); err != nil { - snapshot.Completed = time.Now() - snapshot.Ok = false - snapshot.Error = err - if l.opt.SnapshotQueue != nil { - l.opt.SnapshotQueue <- snapshot - } - + snapshotListen.Complete().Failed(err).Send(l.opt.SnapshotQueue) return errors.Wrapf(err, "#%d error binding listener", l.id) } - - snapshot.Completed = time.Now() - snapshot.Ok = true - if l.opt.SnapshotQueue != nil { - l.opt.SnapshotQueue <- snapshot - } + snapshotListen.Complete().Success().Send(l.opt.SnapshotQueue) go func() { if err := http.Serve(l.listener, l); err != nil { @@ -174,7 +159,7 @@ func (l *PublicHttpLooper) iterate() { if batchPacingDelta > 0 { batchPacingMs = (rand.Int63() % batchPacingDelta) + l.opt.MinBatchPacing.Milliseconds() } - logrus.Debug("sleeping %d ms for batch pacing", batchPacingMs) + logrus.Debugf("sleeping %d ms for batch pacing", batchPacingMs) time.Sleep(time.Duration(batchPacingMs) * time.Millisecond) } @@ -208,15 +193,12 @@ func (l *PublicHttpLooper) iterate() { logrus.Errorf("#%d: payload mismatch", l.id) l.results.Mismatches++ - snapshot.Completed = time.Now() - snapshot.Ok = false - snapshot.Error = errors.New("payload mismatch") + snapshot.Complete().Failed(err) } else { l.results.Bytes += uint64(len(outBase64)) logrus.Debugf("#%d: payload match", l.id) - snapshot.Completed = time.Now() - snapshot.Ok = true + snapshot.Complete().Success() } } else { logrus.Errorf("#%d: error: %v", l.id, err) @@ -227,11 +209,7 @@ func (l *PublicHttpLooper) iterate() { l.results.Errors++ } - if l.opt.SnapshotQueue != nil { - l.opt.SnapshotQueue <- snapshot - } else { - logrus.Info(snapshot) - } + snapshot.Send(l.opt.SnapshotQueue) pacingMs := l.opt.MaxPacing.Milliseconds() pacingDelta := l.opt.MaxPacing.Milliseconds() - l.opt.MinPacing.Milliseconds() diff --git a/canary/snapshot.go b/canary/snapshot.go index 3f6486d1..62a89682 100644 --- a/canary/snapshot.go +++ b/canary/snapshot.go @@ -21,6 +21,28 @@ func NewSnapshot(operation string, instance uint, iteration uint64) *Snapshot { return &Snapshot{Operation: operation, Instance: instance, Iteration: iteration, Started: time.Now()} } +func (s *Snapshot) Complete() *Snapshot { + s.Completed = time.Now() + return s +} + +func (s *Snapshot) Success() *Snapshot { + s.Ok = true + return s +} + +func (s *Snapshot) Failed(err error) *Snapshot { + s.Ok = false + s.Error = err + return s +} + +func (s *Snapshot) Send(queue chan *Snapshot) { + if queue != nil { + queue <- s + } +} + func (s *Snapshot) String() string { if s.Ok { return fmt.Sprintf("[%v, %d, %d] (ok) %v, %v", s.Operation, s.Instance, s.Iteration, s.Completed.Sub(s.Started), util.BytesToSize(int64(s.Size))) diff --git a/cmd/zrok/testCanaryPrivateProxy.go b/cmd/zrok/testCanaryPrivateProxy.go index 6c3402a8..5bf77c71 100644 --- a/cmd/zrok/testCanaryPrivateProxy.go +++ b/cmd/zrok/testCanaryPrivateProxy.go @@ -1,6 +1,7 @@ package main import ( + "context" "github.com/openziti/zrok/canary" "github.com/openziti/zrok/environment" "github.com/sirupsen/logrus" @@ -22,16 +23,25 @@ type testCanaryPrivateProxy struct { iterations uint statusInterval uint timeout time.Duration + payload uint64 minPayload uint64 maxPayload uint64 + preDelay time.Duration minPreDelay time.Duration maxPreDelay time.Duration + dwell time.Duration minDwell time.Duration maxDwell time.Duration + pacing time.Duration minPacing time.Duration maxPacing time.Duration + batchSize uint + batchPacing time.Duration + minBatchPacing time.Duration + maxBatchPacing time.Duration targetName string bindAddress string + canaryConfig string } func newTestCanaryPrivateProxy() *testCanaryPrivateProxy { @@ -46,16 +56,25 @@ func newTestCanaryPrivateProxy() *testCanaryPrivateProxy { cmd.Flags().UintVarP(&command.iterations, "iterations", "i", 1, "Number of iterations") cmd.Flags().UintVarP(&command.statusInterval, "status-interval", "S", 100, "Show status every # iterations") cmd.Flags().DurationVarP(&command.timeout, "timeout", "T", 30*time.Second, "Timeout when sending HTTP requests") + cmd.Flags().Uint64Var(&command.payload, "payload", 0, "Fixed payload size in bytes") cmd.Flags().Uint64Var(&command.minPayload, "min-payload", 64, "Minimum payload size in bytes") cmd.Flags().Uint64Var(&command.maxPayload, "max-payload", 10240, "Maximum payload size in bytes") + cmd.Flags().DurationVar(&command.preDelay, "pre-delay", 0, "Fixed pre-delay before creating the next looper") cmd.Flags().DurationVar(&command.minPreDelay, "min-pre-delay", 0, "Minimum pre-delay before creating the next looper") cmd.Flags().DurationVar(&command.maxPreDelay, "max-pre-delay", 0, "Maximum pre-delay before creating the next looper") + cmd.Flags().DurationVar(&command.dwell, "dwell", 0, "Fixed dwell time") cmd.Flags().DurationVar(&command.minDwell, "min-dwell", 1*time.Second, "Minimum dwell time") cmd.Flags().DurationVar(&command.maxDwell, "max-dwell", 1*time.Second, "Maximum dwell time") + cmd.Flags().DurationVar(&command.pacing, "pacing", 0, "Fixed pacing time") cmd.Flags().DurationVar(&command.minPacing, "min-pacing", 0, "Minimum pacing time") cmd.Flags().DurationVar(&command.maxPacing, "max-pacing", 0, "Maximum pacing time") + cmd.Flags().UintVar(&command.batchSize, "batch-size", 0, "Iterate in batches of this size") + cmd.Flags().DurationVar(&command.batchPacing, "batch-pacing", 0, "Fixed batch pacing time") + cmd.Flags().DurationVar(&command.minBatchPacing, "min-batch-pacing", 0, "Minimum batch pacing time") + cmd.Flags().DurationVar(&command.maxBatchPacing, "max-batch-pacing", 0, "Maximum batch pacing time") cmd.Flags().StringVar(&command.targetName, "target-name", "", "Metadata describing the virtual target") cmd.Flags().StringVar(&command.bindAddress, "bind-address", "", "Metadata describing the virtual bind address") + cmd.Flags().StringVar(&command.canaryConfig, "canary-config", "", "Path to canary configuration file") return command } @@ -69,14 +88,35 @@ func (cmd *testCanaryPrivateProxy) run(_ *cobra.Command, _ []string) { logrus.Fatal("unable to load environment; did you 'zrok enable'?") } + var sns *canary.SnapshotStreamer + var snsCtx context.Context + var snsCancel context.CancelFunc + if cmd.canaryConfig != "" { + cfg, err := canary.LoadConfig(cmd.canaryConfig) + if err != nil { + panic(err) + } + snsCtx, snsCancel = context.WithCancel(context.Background()) + sns, err = canary.NewSnapshotStreamer(snsCtx, cfg) + if err != nil { + panic(err) + } + go sns.Run() + } + var loopers []*canary.PrivateHttpLooper for i := uint(0); i < cmd.loopers; i++ { - preDelay := cmd.maxPreDelay.Milliseconds() - preDelayDelta := cmd.maxPreDelay.Milliseconds() - cmd.minPreDelay.Milliseconds() - if preDelayDelta > 0 { - preDelay = int64(rand.Intn(int(preDelayDelta))) + cmd.minPreDelay.Milliseconds() - time.Sleep(time.Duration(preDelay) * time.Millisecond) + var preDelay int64 + if cmd.preDelay > 0 { + preDelay = cmd.preDelay.Milliseconds() + } else { + preDelay = cmd.maxPreDelay.Milliseconds() + preDelayDelta := cmd.maxPreDelay.Milliseconds() - cmd.minPreDelay.Milliseconds() + if preDelayDelta > 0 { + preDelay = int64(rand.Intn(int(preDelayDelta))) + cmd.minPreDelay.Milliseconds() + } } + time.Sleep(time.Duration(preDelay) * time.Millisecond) looperOpts := &canary.LooperOptions{ Iterations: cmd.iterations, @@ -88,9 +128,31 @@ func (cmd *testCanaryPrivateProxy) run(_ *cobra.Command, _ []string) { MaxDwell: cmd.maxDwell, MinPacing: cmd.minPacing, MaxPacing: cmd.maxPacing, + BatchSize: cmd.batchSize, + MinBatchPacing: cmd.minBatchPacing, + MaxBatchPacing: cmd.maxBatchPacing, TargetName: cmd.targetName, BindAddress: cmd.bindAddress, } + if cmd.payload > 0 { + looperOpts.MinPayload = cmd.payload + looperOpts.MaxPayload = cmd.payload + } + if cmd.dwell > 0 { + looperOpts.MinDwell = cmd.dwell + looperOpts.MaxDwell = cmd.dwell + } + if cmd.pacing > 0 { + looperOpts.MinPacing = cmd.pacing + looperOpts.MaxPacing = cmd.pacing + } + if cmd.batchPacing > 0 { + looperOpts.MinBatchPacing = cmd.batchPacing + looperOpts.MaxBatchPacing = cmd.batchPacing + } + if sns != nil { + looperOpts.SnapshotQueue = sns.InputQueue + } looper := canary.NewPrivateHttpLooper(i, looperOpts, root) loopers = append(loopers, looper) go looper.Run() @@ -109,6 +171,11 @@ func (cmd *testCanaryPrivateProxy) run(_ *cobra.Command, _ []string) { <-l.Done() } + if sns != nil { + snsCancel() + <-sns.Closed + } + results := make([]*canary.LooperResults, 0) for i := uint(0); i < cmd.loopers; i++ { results = append(results, loopers[i].Results()) diff --git a/cmd/zrok/testCanaryPublicProxy.go b/cmd/zrok/testCanaryPublicProxy.go index 1346a0aa..21299140 100644 --- a/cmd/zrok/testCanaryPublicProxy.go +++ b/cmd/zrok/testCanaryPublicProxy.go @@ -39,6 +39,7 @@ type testCanaryPublicProxy struct { batchPacing time.Duration minBatchPacing time.Duration maxBatchPacing time.Duration + targetName string frontendSelection string canaryConfig string } @@ -58,6 +59,9 @@ func newTestCanaryPublicProxy() *testCanaryPublicProxy { cmd.Flags().Uint64Var(&command.payload, "payload", 0, "Fixed payload size") cmd.Flags().Uint64Var(&command.minPayload, "min-payload", 64, "Minimum payload size in bytes") cmd.Flags().Uint64Var(&command.maxPayload, "max-payload", 10240, "Maximum payload size in bytes") + cmd.Flags().DurationVar(&command.preDelay, "pre-delay", 0, "Fixed pre-delay before creating the next looper") + cmd.Flags().DurationVar(&command.minPreDelay, "min-pre-delay", 0, "Minimum pre-delay before creating the next looper") + cmd.Flags().DurationVar(&command.maxPreDelay, "max-pre-delay", 0, "Maximum pre-delay before creating the next looper") cmd.Flags().DurationVar(&command.dwell, "dwell", 1*time.Second, "Fixed dwell time") cmd.Flags().DurationVar(&command.minDwell, "min-dwell", 1*time.Second, "Minimum dwell time") cmd.Flags().DurationVar(&command.maxDwell, "max-dwell", 1*time.Second, "Maximum dwell time") @@ -68,6 +72,7 @@ func newTestCanaryPublicProxy() *testCanaryPublicProxy { cmd.Flags().DurationVar(&command.batchPacing, "batch-pacing", 0, "Fixed batch pacing time") cmd.Flags().DurationVar(&command.minBatchPacing, "min-batch-pacing", 0, "Minimum batch pacing time") cmd.Flags().DurationVar(&command.maxBatchPacing, "max-batch-pacing", 0, "Maximum batch pacing time") + cmd.Flags().StringVar(&command.targetName, "target-name", "", "Metadata describing the virtual target") cmd.Flags().StringVar(&command.frontendSelection, "frontend-selection", "public", "Select frontend selection") cmd.Flags().StringVar(&command.canaryConfig, "canary-config", "", "Path to canary configuration file") return command @@ -83,67 +88,69 @@ func (cmd *testCanaryPublicProxy) run(_ *cobra.Command, _ []string) { logrus.Fatal("unable to load environment; did you 'zrok enable'?") } - var sc *canary.SnapshotStreamer - var scCtx context.Context - var scCancel context.CancelFunc + var sns *canary.SnapshotStreamer + var snsCtx context.Context + var snsCancel context.CancelFunc if cmd.canaryConfig != "" { cfg, err := canary.LoadConfig(cmd.canaryConfig) if err != nil { panic(err) } - scCtx, scCancel = context.WithCancel(context.Background()) - sc, err = canary.NewSnapshotStreamer(scCtx, cfg) + snsCtx, snsCancel = context.WithCancel(context.Background()) + sns, err = canary.NewSnapshotStreamer(snsCtx, cfg) if err != nil { panic(err) } - go sc.Run() + go sns.Run() } var loopers []*canary.PublicHttpLooper for i := uint(0); i < cmd.loopers; i++ { - preDelay := cmd.maxPreDelay.Milliseconds() - preDelayDelta := cmd.maxPreDelay.Milliseconds() - cmd.minPreDelay.Milliseconds() - if preDelayDelta > 0 { - preDelay = int64(rand.Intn(int(preDelayDelta))) + cmd.minPreDelay.Milliseconds() - time.Sleep(time.Duration(preDelay) * time.Millisecond) + var preDelay int64 + if cmd.preDelay > 0 { + preDelay = cmd.preDelay.Milliseconds() + } else { + preDelay = cmd.maxPreDelay.Milliseconds() + preDelayDelta := cmd.maxPreDelay.Milliseconds() - cmd.minPreDelay.Milliseconds() + if preDelayDelta > 0 { + preDelay = int64(rand.Intn(int(preDelayDelta))) + cmd.minPreDelay.Milliseconds() + } } + time.Sleep(time.Duration(preDelay) * time.Millisecond) looperOpts := &canary.LooperOptions{ Iterations: cmd.iterations, StatusInterval: cmd.statusInterval, Timeout: cmd.timeout, + MinPayload: cmd.minPayload, + MaxPayload: cmd.maxPayload, + MinDwell: cmd.minDwell, + MaxDwell: cmd.maxDwell, + MinPacing: cmd.minPacing, + MaxPacing: cmd.maxPacing, BatchSize: cmd.batchSize, + MinBatchPacing: cmd.minBatchPacing, + MaxBatchPacing: cmd.maxBatchPacing, + TargetName: cmd.targetName, } if cmd.payload > 0 { looperOpts.MinPayload = cmd.payload looperOpts.MaxPayload = cmd.payload - } else { - looperOpts.MinPayload = cmd.minPayload - looperOpts.MaxPayload = cmd.maxPayload } if cmd.dwell > 0 { looperOpts.MinDwell = cmd.dwell looperOpts.MaxDwell = cmd.dwell - } else { - looperOpts.MinDwell = cmd.minDwell - looperOpts.MaxDwell = cmd.maxDwell } if cmd.pacing > 0 { looperOpts.MinPacing = cmd.pacing looperOpts.MaxPacing = cmd.pacing - } else { - looperOpts.MinPacing = cmd.minPacing - looperOpts.MaxPacing = cmd.maxPacing } if cmd.batchPacing > 0 { looperOpts.MinBatchPacing = cmd.batchPacing looperOpts.MaxBatchPacing = cmd.batchPacing - } else { - looperOpts.MinBatchPacing = cmd.minBatchPacing - looperOpts.MaxBatchPacing = cmd.maxBatchPacing } - if sc != nil { - looperOpts.SnapshotQueue = sc.InputQueue + if sns != nil { + looperOpts.SnapshotQueue = sns.InputQueue } looper := canary.NewPublicHttpLooper(i, cmd.frontendSelection, looperOpts, root) loopers = append(loopers, looper) @@ -163,9 +170,9 @@ func (cmd *testCanaryPublicProxy) run(_ *cobra.Command, _ []string) { <-l.Done() } - if sc != nil { - scCancel() - <-sc.Closed + if sns != nil { + snsCancel() + <-sns.Closed } results := make([]*canary.LooperResults, 0)