snapshot instrumentation for public-proxy (#953)

This commit is contained in:
Michael Quigley 2025-04-25 15:02:55 -04:00
parent abd22ef906
commit 06923dcef8
No known key found for this signature in database
GPG Key ID: 9B60314A9DD20A62
3 changed files with 44 additions and 0 deletions

View File

@ -18,6 +18,7 @@ type LooperOptions struct {
MaxPacing time.Duration MaxPacing time.Duration
TargetName string TargetName string
BindAddress string BindAddress string
SnapshotQueue chan *Snapshot
} }
type LooperResults struct { type LooperResults struct {

View File

@ -141,6 +141,8 @@ func (l *PublicHttpLooper) iterate() {
defer func() { l.results.StopTime = time.Now() }() defer func() { l.results.StopTime = time.Now() }()
for i := uint(0); i < l.opt.Iterations && !l.abort; i++ { for i := uint(0); i < l.opt.Iterations && !l.abort; i++ {
snapshot := NewSnapshot("public-proxy", l.id, uint64(i))
if i > 0 && i%l.opt.StatusInterval == 0 { if i > 0 && i%l.opt.StatusInterval == 0 {
logrus.Infof("#%d: iteration %d", l.id, i) logrus.Infof("#%d: iteration %d", l.id, i)
} }
@ -153,6 +155,7 @@ func (l *PublicHttpLooper) iterate() {
outPayload := make([]byte, payloadSize) outPayload := make([]byte, payloadSize)
cryptorand.Read(outPayload) cryptorand.Read(outPayload)
outBase64 := base64.StdEncoding.EncodeToString(outPayload) outBase64 := base64.StdEncoding.EncodeToString(outPayload)
snapshot.Size = uint64(len(outBase64))
if req, err := http.NewRequest("POST", l.shr.FrontendEndpoints[0], bytes.NewBufferString(outBase64)); err == nil { if req, err := http.NewRequest("POST", l.shr.FrontendEndpoints[0], bytes.NewBufferString(outBase64)); err == nil {
client := &http.Client{Timeout: l.opt.Timeout} client := &http.Client{Timeout: l.opt.Timeout}
@ -167,9 +170,16 @@ func (l *PublicHttpLooper) iterate() {
if inBase64 != outBase64 { if inBase64 != outBase64 {
logrus.Errorf("#%d: payload mismatch", l.id) logrus.Errorf("#%d: payload mismatch", l.id)
l.results.Mismatches++ l.results.Mismatches++
snapshot.Completed = time.Now()
snapshot.Ok = false
snapshot.Error = errors.New("payload mismatch")
} else { } else {
l.results.Bytes += uint64(len(outBase64)) l.results.Bytes += uint64(len(outBase64))
logrus.Debugf("#%d: payload match", l.id) logrus.Debugf("#%d: payload match", l.id)
snapshot.Completed = time.Now()
snapshot.Ok = true
} }
} else { } else {
logrus.Errorf("#%d: error: %v", l.id, err) logrus.Errorf("#%d: error: %v", l.id, err)
@ -180,6 +190,12 @@ func (l *PublicHttpLooper) iterate() {
l.results.Errors++ l.results.Errors++
} }
if l.opt.SnapshotQueue != nil {
l.opt.SnapshotQueue <- snapshot
} else {
logrus.Info(snapshot)
}
pacingMs := l.opt.MaxPacing.Milliseconds() pacingMs := l.opt.MaxPacing.Milliseconds()
pacingDelta := l.opt.MaxPacing.Milliseconds() - l.opt.MinPacing.Milliseconds() pacingDelta := l.opt.MaxPacing.Milliseconds() - l.opt.MinPacing.Milliseconds()
if pacingDelta > 0 { if pacingDelta > 0 {

View File

@ -1,6 +1,7 @@
package main package main
import ( import (
"context"
"github.com/openziti/zrok/canary" "github.com/openziti/zrok/canary"
"github.com/openziti/zrok/environment" "github.com/openziti/zrok/environment"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -31,6 +32,7 @@ type testCanaryPublicProxy struct {
minPacing time.Duration minPacing time.Duration
maxPacing time.Duration maxPacing time.Duration
frontendSelection string frontendSelection string
canaryConfig string
} }
func newTestCanaryPublicProxy() *testCanaryPublicProxy { func newTestCanaryPublicProxy() *testCanaryPublicProxy {
@ -54,6 +56,7 @@ func newTestCanaryPublicProxy() *testCanaryPublicProxy {
cmd.Flags().DurationVar(&command.minPacing, "min-pacing", 0, "Minimum pacing time") cmd.Flags().DurationVar(&command.minPacing, "min-pacing", 0, "Minimum pacing time")
cmd.Flags().DurationVar(&command.maxPacing, "max-pacing", 0, "Maximum pacing time") cmd.Flags().DurationVar(&command.maxPacing, "max-pacing", 0, "Maximum pacing time")
cmd.Flags().StringVar(&command.frontendSelection, "frontend-selection", "public", "Select frontend selection") cmd.Flags().StringVar(&command.frontendSelection, "frontend-selection", "public", "Select frontend selection")
cmd.Flags().StringVar(&command.canaryConfig, "canary-config", "", "Path to canary configuration file")
return command return command
} }
@ -67,6 +70,19 @@ func (cmd *testCanaryPublicProxy) run(_ *cobra.Command, _ []string) {
logrus.Fatal("unable to load environment; did you 'zrok enable'?") logrus.Fatal("unable to load environment; did you 'zrok enable'?")
} }
var sc *canary.SnapshotCollector
var scCtx context.Context
var scCancel context.CancelFunc
if cmd.canaryConfig != "" {
cfg, err := canary.LoadConfig(cmd.canaryConfig)
if err != nil {
panic(err)
}
scCtx, scCancel = context.WithCancel(context.Background())
sc = canary.NewSnapshotCollector(scCtx, cfg)
go sc.Run()
}
var loopers []*canary.PublicHttpLooper var loopers []*canary.PublicHttpLooper
for i := uint(0); i < cmd.loopers; i++ { for i := uint(0); i < cmd.loopers; i++ {
preDelay := cmd.maxPreDelay.Milliseconds() preDelay := cmd.maxPreDelay.Milliseconds()
@ -87,6 +103,9 @@ func (cmd *testCanaryPublicProxy) run(_ *cobra.Command, _ []string) {
MinPacing: cmd.minPacing, MinPacing: cmd.minPacing,
MaxPacing: cmd.maxPacing, MaxPacing: cmd.maxPacing,
} }
if sc != nil {
looperOpts.SnapshotQueue = sc.InputQueue
}
looper := canary.NewPublicHttpLooper(i, cmd.frontendSelection, looperOpts, root) looper := canary.NewPublicHttpLooper(i, cmd.frontendSelection, looperOpts, root)
loopers = append(loopers, looper) loopers = append(loopers, looper)
go looper.Run() go looper.Run()
@ -105,6 +124,14 @@ func (cmd *testCanaryPublicProxy) run(_ *cobra.Command, _ []string) {
<-l.Done() <-l.Done()
} }
if sc != nil {
scCancel()
<-sc.Closed
if err := sc.Store(); err != nil {
panic(err)
}
}
results := make([]*canary.LooperResults, 0) results := make([]*canary.LooperResults, 0)
for i := uint(0); i < cmd.loopers; i++ { for i := uint(0); i < cmd.loopers; i++ {
results = append(results, loopers[i].Results()) results = append(results, loopers[i].Results())