From 06923dcef8dc3dde906586bfe7e91276848a1190 Mon Sep 17 00:00:00 2001 From: Michael Quigley Date: Fri, 25 Apr 2025 15:02:55 -0400 Subject: [PATCH] snapshot instrumentation for public-proxy (#953) --- canary/looper.go | 1 + canary/publicHttpLooper.go | 16 ++++++++++++++++ cmd/zrok/testCanaryPublicProxy.go | 27 +++++++++++++++++++++++++++ 3 files changed, 44 insertions(+) diff --git a/canary/looper.go b/canary/looper.go index d382eb7b..b23cdff9 100644 --- a/canary/looper.go +++ b/canary/looper.go @@ -18,6 +18,7 @@ type LooperOptions struct { MaxPacing time.Duration TargetName string BindAddress string + SnapshotQueue chan *Snapshot } type LooperResults struct { diff --git a/canary/publicHttpLooper.go b/canary/publicHttpLooper.go index 2a212341..ef5bf454 100644 --- a/canary/publicHttpLooper.go +++ b/canary/publicHttpLooper.go @@ -141,6 +141,8 @@ func (l *PublicHttpLooper) iterate() { defer func() { l.results.StopTime = time.Now() }() for i := uint(0); i < l.opt.Iterations && !l.abort; i++ { + snapshot := NewSnapshot("public-proxy", l.id, uint64(i)) + if i > 0 && i%l.opt.StatusInterval == 0 { logrus.Infof("#%d: iteration %d", l.id, i) } @@ -153,6 +155,7 @@ func (l *PublicHttpLooper) iterate() { outPayload := make([]byte, payloadSize) cryptorand.Read(outPayload) outBase64 := base64.StdEncoding.EncodeToString(outPayload) + snapshot.Size = uint64(len(outBase64)) if req, err := http.NewRequest("POST", l.shr.FrontendEndpoints[0], bytes.NewBufferString(outBase64)); err == nil { client := &http.Client{Timeout: l.opt.Timeout} @@ -167,9 +170,16 @@ func (l *PublicHttpLooper) iterate() { if inBase64 != outBase64 { logrus.Errorf("#%d: payload mismatch", l.id) l.results.Mismatches++ + + snapshot.Completed = time.Now() + snapshot.Ok = false + snapshot.Error = errors.New("payload mismatch") } else { l.results.Bytes += uint64(len(outBase64)) logrus.Debugf("#%d: payload match", l.id) + + snapshot.Completed = time.Now() + snapshot.Ok = true } } else { logrus.Errorf("#%d: error: %v", l.id, err) @@ -180,6 +190,12 @@ func (l *PublicHttpLooper) iterate() { l.results.Errors++ } + if l.opt.SnapshotQueue != nil { + l.opt.SnapshotQueue <- snapshot + } else { + logrus.Info(snapshot) + } + pacingMs := l.opt.MaxPacing.Milliseconds() pacingDelta := l.opt.MaxPacing.Milliseconds() - l.opt.MinPacing.Milliseconds() if pacingDelta > 0 { diff --git a/cmd/zrok/testCanaryPublicProxy.go b/cmd/zrok/testCanaryPublicProxy.go index ae7616dc..1bf57b69 100644 --- a/cmd/zrok/testCanaryPublicProxy.go +++ b/cmd/zrok/testCanaryPublicProxy.go @@ -1,6 +1,7 @@ package main import ( + "context" "github.com/openziti/zrok/canary" "github.com/openziti/zrok/environment" "github.com/sirupsen/logrus" @@ -31,6 +32,7 @@ type testCanaryPublicProxy struct { minPacing time.Duration maxPacing time.Duration frontendSelection string + canaryConfig string } func newTestCanaryPublicProxy() *testCanaryPublicProxy { @@ -54,6 +56,7 @@ func newTestCanaryPublicProxy() *testCanaryPublicProxy { cmd.Flags().DurationVar(&command.minPacing, "min-pacing", 0, "Minimum pacing time") cmd.Flags().DurationVar(&command.maxPacing, "max-pacing", 0, "Maximum pacing time") cmd.Flags().StringVar(&command.frontendSelection, "frontend-selection", "public", "Select frontend selection") + cmd.Flags().StringVar(&command.canaryConfig, "canary-config", "", "Path to canary configuration file") return command } @@ -67,6 +70,19 @@ func (cmd *testCanaryPublicProxy) run(_ *cobra.Command, _ []string) { logrus.Fatal("unable to load environment; did you 'zrok enable'?") } + var sc *canary.SnapshotCollector + var scCtx context.Context + var scCancel context.CancelFunc + if cmd.canaryConfig != "" { + cfg, err := canary.LoadConfig(cmd.canaryConfig) + if err != nil { + panic(err) + } + scCtx, scCancel = context.WithCancel(context.Background()) + sc = canary.NewSnapshotCollector(scCtx, cfg) + go sc.Run() + } + var loopers []*canary.PublicHttpLooper for i := uint(0); i < cmd.loopers; i++ { preDelay := cmd.maxPreDelay.Milliseconds() @@ -87,6 +103,9 @@ func (cmd *testCanaryPublicProxy) run(_ *cobra.Command, _ []string) { MinPacing: cmd.minPacing, MaxPacing: cmd.maxPacing, } + if sc != nil { + looperOpts.SnapshotQueue = sc.InputQueue + } looper := canary.NewPublicHttpLooper(i, cmd.frontendSelection, looperOpts, root) loopers = append(loopers, looper) go looper.Run() @@ -105,6 +124,14 @@ func (cmd *testCanaryPublicProxy) run(_ *cobra.Command, _ []string) { <-l.Done() } + if sc != nil { + scCancel() + <-sc.Closed + if err := sc.Store(); err != nil { + panic(err) + } + } + results := make([]*canary.LooperResults, 0) for i := uint(0); i < cmd.loopers; i++ { results = append(results, loopers[i].Results())