better canary.Snapshot API improving code readability; public-proxy and private-proxy code paths converged (#953)

This commit is contained in:
Michael Quigley 2025-04-30 16:01:51 -04:00
parent 8278189edc
commit 3225501d00
No known key found for this signature in database
GPG Key ID: 9B60314A9DD20A62
5 changed files with 176 additions and 71 deletions

View File

@ -80,28 +80,37 @@ func (l *PrivateHttpLooper) startup() error {
if l.opt.TargetName != "" {
target = l.opt.TargetName
}
snapshotCreateShare := NewSnapshot("create-share", l.id, 0)
shr, err := sdk.CreateShare(l.root, &sdk.ShareRequest{
ShareMode: sdk.PrivateShareMode,
BackendMode: sdk.ProxyBackendMode,
Target: target,
PermissionMode: sdk.ClosedPermissionMode,
})
snapshotCreateShare.Complete()
if err != nil {
snapshotCreateShare.Failed(err).Send(l.opt.SnapshotQueue)
return err
}
snapshotCreateShare.Success().Send(l.opt.SnapshotQueue)
l.shr = shr
bindAddress := ""
if l.opt.BindAddress != "" {
bindAddress = l.opt.BindAddress
}
snapshotCreateAccess := NewSnapshot("create-access", l.id, 0)
acc, err := sdk.CreateAccess(l.root, &sdk.AccessRequest{
ShareToken: shr.Token,
BindAddress: bindAddress,
})
snapshotCreateAccess.Complete()
if err != nil {
snapshotCreateAccess.Failed(err).Send(l.opt.SnapshotQueue)
return err
}
snapshotCreateAccess.Success().Send(l.opt.SnapshotQueue)
l.acc = acc
logrus.Infof("#%d allocated share '%v', allocated frontend '%v'", l.id, shr.Token, acc.Token)
@ -127,9 +136,12 @@ func (l *PrivateHttpLooper) bind() error {
return errors.Wrapf(err, "#%d error creating ziti context", l.id)
}
snapshotListen := NewSnapshot("listen", l.id, 0)
if l.listener, err = zctx.ListenWithOptions(l.shr.Token, &options); err != nil {
snapshotListen.Complete().Failed(err).Send(l.opt.SnapshotQueue)
return errors.Wrapf(err, "#%d error binding listener", l.id)
}
snapshotListen.Complete().Success().Send(l.opt.SnapshotQueue)
go func() {
if err := http.Serve(l.listener, l); err != nil {
@ -168,6 +180,18 @@ func (l *PrivateHttpLooper) iterate() {
defer func() { l.results.StopTime = time.Now() }()
for i := uint(0); i < l.opt.Iterations && !l.abort; i++ {
if i > 0 && l.opt.BatchSize > 0 && i%l.opt.BatchSize == 0 {
batchPacingMs := l.opt.MaxBatchPacing.Milliseconds()
batchPacingDelta := l.opt.MaxBatchPacing.Milliseconds() - l.opt.MinBatchPacing.Milliseconds()
if batchPacingDelta > 0 {
batchPacingMs = (rand.Int63() % batchPacingDelta) + l.opt.MinBatchPacing.Milliseconds()
}
logrus.Debugf("sleeping %d ms for batch pacing", batchPacingMs)
time.Sleep(time.Duration(batchPacingMs) * time.Millisecond)
}
snapshot := NewSnapshot("private-proxy", l.id, uint64(i))
if i > 0 && i%l.opt.StatusInterval == 0 {
logrus.Infof("#%d: iteration %d", l.id, i)
}
@ -188,6 +212,7 @@ func (l *PrivateHttpLooper) iterate() {
outPayload := make([]byte, payloadSize)
cryptorand.Read(outPayload)
outBase64 := base64.StdEncoding.EncodeToString(outPayload)
snapshot.Size = uint64(len(outBase64))
if req, err := http.NewRequest("POST", "http://"+l.shr.Token, bytes.NewBufferString(outBase64)); err == nil {
client := &http.Client{Timeout: l.opt.Timeout, Transport: &http.Transport{DialContext: connDialer{conn}.Dial}}
@ -202,9 +227,13 @@ func (l *PrivateHttpLooper) iterate() {
if inBase64 != outBase64 {
logrus.Errorf("#%d: payload mismatch", l.id)
l.results.Mismatches++
snapshot.Complete().Failed(err)
} else {
l.results.Bytes += uint64(len(outBase64))
logrus.Debugf("#%d: payload match", l.id)
snapshot.Complete().Success()
}
} else {
logrus.Errorf("#%d: error: %v", l.id, err)
@ -215,6 +244,8 @@ func (l *PrivateHttpLooper) iterate() {
l.results.Errors++
}
snapshot.Send(l.opt.SnapshotQueue)
if err := conn.Close(); err != nil {
logrus.Errorf("#%d: error closing connection: %v", l.id, err)
}

View File

@ -73,28 +73,25 @@ func (l *PublicHttpLooper) Results() *LooperResults {
}
func (l *PublicHttpLooper) startup() error {
snapshot := NewSnapshot("create-share", l.id, 0)
target := "canary.PublicHttpLooper"
if l.opt.TargetName != "" {
target = l.opt.TargetName
}
snapshotCreateShare := NewSnapshot("create-share", l.id, 0)
shr, err := sdk.CreateShare(l.root, &sdk.ShareRequest{
ShareMode: sdk.PublicShareMode,
BackendMode: sdk.ProxyBackendMode,
Target: "canary.PublicHttpLooper",
Target: target,
Frontends: []string{l.frontend},
PermissionMode: sdk.ClosedPermissionMode,
})
snapshot.Completed = time.Now()
snapshotCreateShare.Complete()
if err != nil {
snapshot.Ok = false
snapshot.Error = err
if l.opt.SnapshotQueue != nil {
l.opt.SnapshotQueue <- snapshot
}
snapshotCreateShare.Failed(err).Send(l.opt.SnapshotQueue)
return err
}
snapshot.Ok = true
if l.opt.SnapshotQueue != nil {
l.opt.SnapshotQueue <- snapshot
}
snapshotCreateShare.Success().Send(l.opt.SnapshotQueue)
l.shr = shr
logrus.Infof("#%d allocated share '%v'", l.id, l.shr.Token)
@ -120,24 +117,12 @@ func (l *PublicHttpLooper) bind() error {
return errors.Wrapf(err, "#%d error creating ziti context", l.id)
}
snapshot := NewSnapshot("listen", l.id, 0)
snapshotListen := NewSnapshot("listen", l.id, 0)
if l.listener, err = zctx.ListenWithOptions(l.shr.Token, &options); err != nil {
snapshot.Completed = time.Now()
snapshot.Ok = false
snapshot.Error = err
if l.opt.SnapshotQueue != nil {
l.opt.SnapshotQueue <- snapshot
}
snapshotListen.Complete().Failed(err).Send(l.opt.SnapshotQueue)
return errors.Wrapf(err, "#%d error binding listener", l.id)
}
snapshot.Completed = time.Now()
snapshot.Ok = true
if l.opt.SnapshotQueue != nil {
l.opt.SnapshotQueue <- snapshot
}
snapshotListen.Complete().Success().Send(l.opt.SnapshotQueue)
go func() {
if err := http.Serve(l.listener, l); err != nil {
@ -174,7 +159,7 @@ func (l *PublicHttpLooper) iterate() {
if batchPacingDelta > 0 {
batchPacingMs = (rand.Int63() % batchPacingDelta) + l.opt.MinBatchPacing.Milliseconds()
}
logrus.Debug("sleeping %d ms for batch pacing", batchPacingMs)
logrus.Debugf("sleeping %d ms for batch pacing", batchPacingMs)
time.Sleep(time.Duration(batchPacingMs) * time.Millisecond)
}
@ -208,15 +193,12 @@ func (l *PublicHttpLooper) iterate() {
logrus.Errorf("#%d: payload mismatch", l.id)
l.results.Mismatches++
snapshot.Completed = time.Now()
snapshot.Ok = false
snapshot.Error = errors.New("payload mismatch")
snapshot.Complete().Failed(err)
} else {
l.results.Bytes += uint64(len(outBase64))
logrus.Debugf("#%d: payload match", l.id)
snapshot.Completed = time.Now()
snapshot.Ok = true
snapshot.Complete().Success()
}
} else {
logrus.Errorf("#%d: error: %v", l.id, err)
@ -227,11 +209,7 @@ func (l *PublicHttpLooper) iterate() {
l.results.Errors++
}
if l.opt.SnapshotQueue != nil {
l.opt.SnapshotQueue <- snapshot
} else {
logrus.Info(snapshot)
}
snapshot.Send(l.opt.SnapshotQueue)
pacingMs := l.opt.MaxPacing.Milliseconds()
pacingDelta := l.opt.MaxPacing.Milliseconds() - l.opt.MinPacing.Milliseconds()

View File

@ -21,6 +21,28 @@ func NewSnapshot(operation string, instance uint, iteration uint64) *Snapshot {
return &Snapshot{Operation: operation, Instance: instance, Iteration: iteration, Started: time.Now()}
}
func (s *Snapshot) Complete() *Snapshot {
s.Completed = time.Now()
return s
}
func (s *Snapshot) Success() *Snapshot {
s.Ok = true
return s
}
func (s *Snapshot) Failed(err error) *Snapshot {
s.Ok = false
s.Error = err
return s
}
func (s *Snapshot) Send(queue chan *Snapshot) {
if queue != nil {
queue <- s
}
}
func (s *Snapshot) String() string {
if s.Ok {
return fmt.Sprintf("[%v, %d, %d] (ok) %v, %v", s.Operation, s.Instance, s.Iteration, s.Completed.Sub(s.Started), util.BytesToSize(int64(s.Size)))

View File

@ -1,6 +1,7 @@
package main
import (
"context"
"github.com/openziti/zrok/canary"
"github.com/openziti/zrok/environment"
"github.com/sirupsen/logrus"
@ -22,16 +23,25 @@ type testCanaryPrivateProxy struct {
iterations uint
statusInterval uint
timeout time.Duration
payload uint64
minPayload uint64
maxPayload uint64
preDelay time.Duration
minPreDelay time.Duration
maxPreDelay time.Duration
dwell time.Duration
minDwell time.Duration
maxDwell time.Duration
pacing time.Duration
minPacing time.Duration
maxPacing time.Duration
batchSize uint
batchPacing time.Duration
minBatchPacing time.Duration
maxBatchPacing time.Duration
targetName string
bindAddress string
canaryConfig string
}
func newTestCanaryPrivateProxy() *testCanaryPrivateProxy {
@ -46,16 +56,25 @@ func newTestCanaryPrivateProxy() *testCanaryPrivateProxy {
cmd.Flags().UintVarP(&command.iterations, "iterations", "i", 1, "Number of iterations")
cmd.Flags().UintVarP(&command.statusInterval, "status-interval", "S", 100, "Show status every # iterations")
cmd.Flags().DurationVarP(&command.timeout, "timeout", "T", 30*time.Second, "Timeout when sending HTTP requests")
cmd.Flags().Uint64Var(&command.payload, "payload", 0, "Fixed payload size in bytes")
cmd.Flags().Uint64Var(&command.minPayload, "min-payload", 64, "Minimum payload size in bytes")
cmd.Flags().Uint64Var(&command.maxPayload, "max-payload", 10240, "Maximum payload size in bytes")
cmd.Flags().DurationVar(&command.preDelay, "pre-delay", 0, "Fixed pre-delay before creating the next looper")
cmd.Flags().DurationVar(&command.minPreDelay, "min-pre-delay", 0, "Minimum pre-delay before creating the next looper")
cmd.Flags().DurationVar(&command.maxPreDelay, "max-pre-delay", 0, "Maximum pre-delay before creating the next looper")
cmd.Flags().DurationVar(&command.dwell, "dwell", 0, "Fixed dwell time")
cmd.Flags().DurationVar(&command.minDwell, "min-dwell", 1*time.Second, "Minimum dwell time")
cmd.Flags().DurationVar(&command.maxDwell, "max-dwell", 1*time.Second, "Maximum dwell time")
cmd.Flags().DurationVar(&command.pacing, "pacing", 0, "Fixed pacing time")
cmd.Flags().DurationVar(&command.minPacing, "min-pacing", 0, "Minimum pacing time")
cmd.Flags().DurationVar(&command.maxPacing, "max-pacing", 0, "Maximum pacing time")
cmd.Flags().UintVar(&command.batchSize, "batch-size", 0, "Iterate in batches of this size")
cmd.Flags().DurationVar(&command.batchPacing, "batch-pacing", 0, "Fixed batch pacing time")
cmd.Flags().DurationVar(&command.minBatchPacing, "min-batch-pacing", 0, "Minimum batch pacing time")
cmd.Flags().DurationVar(&command.maxBatchPacing, "max-batch-pacing", 0, "Maximum batch pacing time")
cmd.Flags().StringVar(&command.targetName, "target-name", "", "Metadata describing the virtual target")
cmd.Flags().StringVar(&command.bindAddress, "bind-address", "", "Metadata describing the virtual bind address")
cmd.Flags().StringVar(&command.canaryConfig, "canary-config", "", "Path to canary configuration file")
return command
}
@ -69,14 +88,35 @@ func (cmd *testCanaryPrivateProxy) run(_ *cobra.Command, _ []string) {
logrus.Fatal("unable to load environment; did you 'zrok enable'?")
}
var sns *canary.SnapshotStreamer
var snsCtx context.Context
var snsCancel context.CancelFunc
if cmd.canaryConfig != "" {
cfg, err := canary.LoadConfig(cmd.canaryConfig)
if err != nil {
panic(err)
}
snsCtx, snsCancel = context.WithCancel(context.Background())
sns, err = canary.NewSnapshotStreamer(snsCtx, cfg)
if err != nil {
panic(err)
}
go sns.Run()
}
var loopers []*canary.PrivateHttpLooper
for i := uint(0); i < cmd.loopers; i++ {
preDelay := cmd.maxPreDelay.Milliseconds()
preDelayDelta := cmd.maxPreDelay.Milliseconds() - cmd.minPreDelay.Milliseconds()
if preDelayDelta > 0 {
preDelay = int64(rand.Intn(int(preDelayDelta))) + cmd.minPreDelay.Milliseconds()
time.Sleep(time.Duration(preDelay) * time.Millisecond)
var preDelay int64
if cmd.preDelay > 0 {
preDelay = cmd.preDelay.Milliseconds()
} else {
preDelay = cmd.maxPreDelay.Milliseconds()
preDelayDelta := cmd.maxPreDelay.Milliseconds() - cmd.minPreDelay.Milliseconds()
if preDelayDelta > 0 {
preDelay = int64(rand.Intn(int(preDelayDelta))) + cmd.minPreDelay.Milliseconds()
}
}
time.Sleep(time.Duration(preDelay) * time.Millisecond)
looperOpts := &canary.LooperOptions{
Iterations: cmd.iterations,
@ -88,9 +128,31 @@ func (cmd *testCanaryPrivateProxy) run(_ *cobra.Command, _ []string) {
MaxDwell: cmd.maxDwell,
MinPacing: cmd.minPacing,
MaxPacing: cmd.maxPacing,
BatchSize: cmd.batchSize,
MinBatchPacing: cmd.minBatchPacing,
MaxBatchPacing: cmd.maxBatchPacing,
TargetName: cmd.targetName,
BindAddress: cmd.bindAddress,
}
if cmd.payload > 0 {
looperOpts.MinPayload = cmd.payload
looperOpts.MaxPayload = cmd.payload
}
if cmd.dwell > 0 {
looperOpts.MinDwell = cmd.dwell
looperOpts.MaxDwell = cmd.dwell
}
if cmd.pacing > 0 {
looperOpts.MinPacing = cmd.pacing
looperOpts.MaxPacing = cmd.pacing
}
if cmd.batchPacing > 0 {
looperOpts.MinBatchPacing = cmd.batchPacing
looperOpts.MaxBatchPacing = cmd.batchPacing
}
if sns != nil {
looperOpts.SnapshotQueue = sns.InputQueue
}
looper := canary.NewPrivateHttpLooper(i, looperOpts, root)
loopers = append(loopers, looper)
go looper.Run()
@ -109,6 +171,11 @@ func (cmd *testCanaryPrivateProxy) run(_ *cobra.Command, _ []string) {
<-l.Done()
}
if sns != nil {
snsCancel()
<-sns.Closed
}
results := make([]*canary.LooperResults, 0)
for i := uint(0); i < cmd.loopers; i++ {
results = append(results, loopers[i].Results())

View File

@ -39,6 +39,7 @@ type testCanaryPublicProxy struct {
batchPacing time.Duration
minBatchPacing time.Duration
maxBatchPacing time.Duration
targetName string
frontendSelection string
canaryConfig string
}
@ -58,6 +59,9 @@ func newTestCanaryPublicProxy() *testCanaryPublicProxy {
cmd.Flags().Uint64Var(&command.payload, "payload", 0, "Fixed payload size")
cmd.Flags().Uint64Var(&command.minPayload, "min-payload", 64, "Minimum payload size in bytes")
cmd.Flags().Uint64Var(&command.maxPayload, "max-payload", 10240, "Maximum payload size in bytes")
cmd.Flags().DurationVar(&command.preDelay, "pre-delay", 0, "Fixed pre-delay before creating the next looper")
cmd.Flags().DurationVar(&command.minPreDelay, "min-pre-delay", 0, "Minimum pre-delay before creating the next looper")
cmd.Flags().DurationVar(&command.maxPreDelay, "max-pre-delay", 0, "Maximum pre-delay before creating the next looper")
cmd.Flags().DurationVar(&command.dwell, "dwell", 1*time.Second, "Fixed dwell time")
cmd.Flags().DurationVar(&command.minDwell, "min-dwell", 1*time.Second, "Minimum dwell time")
cmd.Flags().DurationVar(&command.maxDwell, "max-dwell", 1*time.Second, "Maximum dwell time")
@ -68,6 +72,7 @@ func newTestCanaryPublicProxy() *testCanaryPublicProxy {
cmd.Flags().DurationVar(&command.batchPacing, "batch-pacing", 0, "Fixed batch pacing time")
cmd.Flags().DurationVar(&command.minBatchPacing, "min-batch-pacing", 0, "Minimum batch pacing time")
cmd.Flags().DurationVar(&command.maxBatchPacing, "max-batch-pacing", 0, "Maximum batch pacing time")
cmd.Flags().StringVar(&command.targetName, "target-name", "", "Metadata describing the virtual target")
cmd.Flags().StringVar(&command.frontendSelection, "frontend-selection", "public", "Select frontend selection")
cmd.Flags().StringVar(&command.canaryConfig, "canary-config", "", "Path to canary configuration file")
return command
@ -83,67 +88,69 @@ func (cmd *testCanaryPublicProxy) run(_ *cobra.Command, _ []string) {
logrus.Fatal("unable to load environment; did you 'zrok enable'?")
}
var sc *canary.SnapshotStreamer
var scCtx context.Context
var scCancel context.CancelFunc
var sns *canary.SnapshotStreamer
var snsCtx context.Context
var snsCancel context.CancelFunc
if cmd.canaryConfig != "" {
cfg, err := canary.LoadConfig(cmd.canaryConfig)
if err != nil {
panic(err)
}
scCtx, scCancel = context.WithCancel(context.Background())
sc, err = canary.NewSnapshotStreamer(scCtx, cfg)
snsCtx, snsCancel = context.WithCancel(context.Background())
sns, err = canary.NewSnapshotStreamer(snsCtx, cfg)
if err != nil {
panic(err)
}
go sc.Run()
go sns.Run()
}
var loopers []*canary.PublicHttpLooper
for i := uint(0); i < cmd.loopers; i++ {
preDelay := cmd.maxPreDelay.Milliseconds()
preDelayDelta := cmd.maxPreDelay.Milliseconds() - cmd.minPreDelay.Milliseconds()
if preDelayDelta > 0 {
preDelay = int64(rand.Intn(int(preDelayDelta))) + cmd.minPreDelay.Milliseconds()
time.Sleep(time.Duration(preDelay) * time.Millisecond)
var preDelay int64
if cmd.preDelay > 0 {
preDelay = cmd.preDelay.Milliseconds()
} else {
preDelay = cmd.maxPreDelay.Milliseconds()
preDelayDelta := cmd.maxPreDelay.Milliseconds() - cmd.minPreDelay.Milliseconds()
if preDelayDelta > 0 {
preDelay = int64(rand.Intn(int(preDelayDelta))) + cmd.minPreDelay.Milliseconds()
}
}
time.Sleep(time.Duration(preDelay) * time.Millisecond)
looperOpts := &canary.LooperOptions{
Iterations: cmd.iterations,
StatusInterval: cmd.statusInterval,
Timeout: cmd.timeout,
MinPayload: cmd.minPayload,
MaxPayload: cmd.maxPayload,
MinDwell: cmd.minDwell,
MaxDwell: cmd.maxDwell,
MinPacing: cmd.minPacing,
MaxPacing: cmd.maxPacing,
BatchSize: cmd.batchSize,
MinBatchPacing: cmd.minBatchPacing,
MaxBatchPacing: cmd.maxBatchPacing,
TargetName: cmd.targetName,
}
if cmd.payload > 0 {
looperOpts.MinPayload = cmd.payload
looperOpts.MaxPayload = cmd.payload
} else {
looperOpts.MinPayload = cmd.minPayload
looperOpts.MaxPayload = cmd.maxPayload
}
if cmd.dwell > 0 {
looperOpts.MinDwell = cmd.dwell
looperOpts.MaxDwell = cmd.dwell
} else {
looperOpts.MinDwell = cmd.minDwell
looperOpts.MaxDwell = cmd.maxDwell
}
if cmd.pacing > 0 {
looperOpts.MinPacing = cmd.pacing
looperOpts.MaxPacing = cmd.pacing
} else {
looperOpts.MinPacing = cmd.minPacing
looperOpts.MaxPacing = cmd.maxPacing
}
if cmd.batchPacing > 0 {
looperOpts.MinBatchPacing = cmd.batchPacing
looperOpts.MaxBatchPacing = cmd.batchPacing
} else {
looperOpts.MinBatchPacing = cmd.minBatchPacing
looperOpts.MaxBatchPacing = cmd.maxBatchPacing
}
if sc != nil {
looperOpts.SnapshotQueue = sc.InputQueue
if sns != nil {
looperOpts.SnapshotQueue = sns.InputQueue
}
looper := canary.NewPublicHttpLooper(i, cmd.frontendSelection, looperOpts, root)
loopers = append(loopers, looper)
@ -163,9 +170,9 @@ func (cmd *testCanaryPublicProxy) run(_ *cobra.Command, _ []string) {
<-l.Done()
}
if sc != nil {
scCancel()
<-sc.Closed
if sns != nil {
snsCancel()
<-sns.Closed
}
results := make([]*canary.LooperResults, 0)