Merge pull request #951 from openziti/v1.next_canary

Canary Metrics; Enhancements
This commit is contained in:
Michael Quigley 2025-04-30 20:56:39 -04:00 committed by GitHub
commit 19c6edf6bc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 431 additions and 159 deletions

View File

@ -10,6 +10,10 @@ FEATURE: New InfluxDB metrics capture infrastructure for `zrok test canary` fram
FEATURE: New `zrok test canary enabler` to validate `enable`/`disable` operations and gather performance metrics around how those paths are operating (https://github.com/openziti/zrok/issues/771)
FEATURE: New `zrok test canary` infrastructure capable of supporting more complex testing scenarios; now capable of streaming canary metrics into an InfluxDB repository; new programming framework for developing additional types of streaming canary metrics (https://github.com/openziti/zrok/issues/948 https://github.com/openziti/zrok/issues/954)
FEATURE: All `zrok test canary` commands that have "min" and "max" values (`--min-pacing` and `--max-pacing` for example) now include a singular version of that flag for setting both "min" and "max" to the same value (`--pacing` for example). The singular version of the flag always overrides any `--min-*` or `--max-*` values that might be set
CHANGE: New _guard_ to prevent users from running potentially dangerous `zrok test canary` commands inadvertently without understanding what they do (https://github.com/openziti/zrok/issues/947)
## v1.0.2

View File

@ -63,31 +63,23 @@ func (e *Enabler) iterate() {
Description: "canary.Enabler",
})
if err == nil {
snapshot.Completed = time.Now()
snapshot.Ok = true
snapshot.Complete().Success()
e.Environments <- env
logrus.Infof("#%d enabled environment '%v'", e.Id, env.ZitiIdentity)
} else {
snapshot.Completed = time.Now()
snapshot.Ok = false
snapshot.Error = err
snapshot.Complete().Failure(err)
logrus.Errorf("error creating canary (#%d) environment: %v", e.Id, err)
}
if e.opt.SnapshotQueue != nil {
e.opt.SnapshotQueue <- snapshot
} else {
logrus.Info(snapshot)
}
snapshot.Send(e.opt.SnapshotQueue)
pacingMs := e.opt.MaxPacing.Milliseconds()
pacingDelta := e.opt.MaxPacing.Milliseconds() - e.opt.MinPacing.Milliseconds()
if pacingDelta > 0 {
pacingMs = (rand.Int63() % pacingDelta) + e.opt.MinPacing.Milliseconds()
time.Sleep(time.Duration(pacingMs) * time.Millisecond)
}
time.Sleep(time.Duration(pacingMs) * time.Millisecond)
}
}

View File

@ -16,6 +16,12 @@ type LooperOptions struct {
MaxDwell time.Duration
MinPacing time.Duration
MaxPacing time.Duration
BatchSize uint
MinBatchPacing time.Duration
MaxBatchPacing time.Duration
TargetName string
BindAddress string
SnapshotQueue chan *Snapshot
}
type LooperResults struct {

View File

@ -1,102 +0,0 @@
package canary
import (
"context"
"fmt"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/openziti/zrok/util"
"github.com/sirupsen/logrus"
"slices"
"sort"
"time"
)
type Snapshot struct {
Operation string
Instance uint
Iteration uint64
Started time.Time
Completed time.Time
Ok bool
Error error
Size uint64
}
func NewSnapshot(operation string, instance uint, iteration uint64) *Snapshot {
return &Snapshot{Operation: operation, Instance: instance, Iteration: iteration, Started: time.Now()}
}
func (s *Snapshot) String() string {
if s.Ok {
return fmt.Sprintf("[%v, %d, %d] (ok) %v, %v", s.Operation, s.Instance, s.Iteration, s.Completed.Sub(s.Started), util.BytesToSize(int64(s.Size)))
} else {
return fmt.Sprintf("[%v, %d, %d] (err) %v, %v, (%v)", s.Operation, s.Instance, s.Iteration, s.Completed.Sub(s.Started), util.BytesToSize(int64(s.Size)), s.Error)
}
}
type SnapshotCollector struct {
InputQueue chan *Snapshot
Closed chan struct{}
ctx context.Context
cfg *Config
snapshots map[string][]*Snapshot
}
func NewSnapshotCollector(ctx context.Context, cfg *Config) *SnapshotCollector {
return &SnapshotCollector{
InputQueue: make(chan *Snapshot),
Closed: make(chan struct{}),
ctx: ctx,
cfg: cfg,
snapshots: make(map[string][]*Snapshot),
}
}
func (sc *SnapshotCollector) Run() {
defer close(sc.Closed)
defer logrus.Info("stopping")
logrus.Info("starting")
for {
select {
case <-sc.ctx.Done():
return
case snapshot := <-sc.InputQueue:
var snapshots []*Snapshot
if v, ok := sc.snapshots[snapshot.Operation]; ok {
snapshots = v
}
i := sort.Search(len(snapshots), func(i int) bool { return snapshots[i].Completed.After(snapshot.Started) })
snapshots = slices.Insert(snapshots, i, snapshot)
sc.snapshots[snapshot.Operation] = snapshots
}
}
}
func (sc *SnapshotCollector) Store() error {
idb := influxdb2.NewClient(sc.cfg.Influx.Url, sc.cfg.Influx.Token)
writeApi := idb.WriteAPIBlocking(sc.cfg.Influx.Org, sc.cfg.Influx.Bucket)
for key, arr := range sc.snapshots {
for _, snapshot := range arr {
tags := map[string]string{
"instance": fmt.Sprintf("%d", snapshot.Instance),
"iteration": fmt.Sprintf("%d", snapshot.Iteration),
"ok": fmt.Sprintf("%t", snapshot.Ok),
}
if snapshot.Error != nil {
tags["error"] = snapshot.Error.Error()
}
pt := influxdb2.NewPoint(snapshot.Operation, tags, map[string]interface{}{
"duration": snapshot.Completed.Sub(snapshot.Started).Milliseconds(),
"size": snapshot.Size,
}, snapshot.Started)
if err := writeApi.WritePoint(context.Background(), pt); err != nil {
return err
}
}
logrus.Infof("wrote '%v' points for '%v'", len(arr), key)
}
idb.Close()
logrus.Infof("complete")
return nil
}

View File

@ -19,15 +19,17 @@ import (
)
type PrivateHttpLooper struct {
id uint
acc *sdk.Access
opt *LooperOptions
root env_core.Root
shr *sdk.Share
listener edge.Listener
abort bool
done chan struct{}
results *LooperResults
id uint
target string
bindAddress string
acc *sdk.Access
opt *LooperOptions
root env_core.Root
shr *sdk.Share
listener edge.Listener
abort bool
done chan struct{}
results *LooperResults
}
func NewPrivateHttpLooper(id uint, opt *LooperOptions, root env_core.Root) *PrivateHttpLooper {
@ -74,23 +76,41 @@ func (l *PrivateHttpLooper) Results() *LooperResults {
}
func (l *PrivateHttpLooper) startup() error {
target := "canary.PrivateHttpLooper"
if l.opt.TargetName != "" {
target = l.opt.TargetName
}
snapshotCreateShare := NewSnapshot("create-share", l.id, 0)
shr, err := sdk.CreateShare(l.root, &sdk.ShareRequest{
ShareMode: sdk.PrivateShareMode,
BackendMode: sdk.ProxyBackendMode,
Target: "canary.PrivateHttpLooper",
Target: target,
PermissionMode: sdk.ClosedPermissionMode,
})
snapshotCreateShare.Complete()
if err != nil {
snapshotCreateShare.Failure(err).Send(l.opt.SnapshotQueue)
return err
}
snapshotCreateShare.Success().Send(l.opt.SnapshotQueue)
l.shr = shr
bindAddress := ""
if l.opt.BindAddress != "" {
bindAddress = l.opt.BindAddress
}
snapshotCreateAccess := NewSnapshot("create-access", l.id, 0)
acc, err := sdk.CreateAccess(l.root, &sdk.AccessRequest{
ShareToken: shr.Token,
ShareToken: shr.Token,
BindAddress: bindAddress,
})
snapshotCreateAccess.Complete()
if err != nil {
snapshotCreateAccess.Failure(err).Send(l.opt.SnapshotQueue)
return err
}
snapshotCreateAccess.Success().Send(l.opt.SnapshotQueue)
l.acc = acc
logrus.Infof("#%d allocated share '%v', allocated frontend '%v'", l.id, shr.Token, acc.Token)
@ -116,9 +136,12 @@ func (l *PrivateHttpLooper) bind() error {
return errors.Wrapf(err, "#%d error creating ziti context", l.id)
}
snapshotListen := NewSnapshot("listen", l.id, 0)
if l.listener, err = zctx.ListenWithOptions(l.shr.Token, &options); err != nil {
snapshotListen.Complete().Failure(err).Send(l.opt.SnapshotQueue)
return errors.Wrapf(err, "#%d error binding listener", l.id)
}
snapshotListen.Complete().Success().Send(l.opt.SnapshotQueue)
go func() {
if err := http.Serve(l.listener, l); err != nil {
@ -156,7 +179,19 @@ func (l *PrivateHttpLooper) iterate() {
l.results.StartTime = time.Now()
defer func() { l.results.StopTime = time.Now() }()
for i := uint(0); i < l.opt.Iterations; i++ {
for i := uint(0); i < l.opt.Iterations && !l.abort; i++ {
if i > 0 && l.opt.BatchSize > 0 && i%l.opt.BatchSize == 0 {
batchPacingMs := l.opt.MaxBatchPacing.Milliseconds()
batchPacingDelta := l.opt.MaxBatchPacing.Milliseconds() - l.opt.MinBatchPacing.Milliseconds()
if batchPacingDelta > 0 {
batchPacingMs = (rand.Int63() % batchPacingDelta) + l.opt.MinBatchPacing.Milliseconds()
}
logrus.Debugf("sleeping %d ms for batch pacing", batchPacingMs)
time.Sleep(time.Duration(batchPacingMs) * time.Millisecond)
}
snapshot := NewSnapshot("private-proxy", l.id, uint64(i))
if i > 0 && i%l.opt.StatusInterval == 0 {
logrus.Infof("#%d: iteration %d", l.id, i)
}
@ -177,6 +212,7 @@ func (l *PrivateHttpLooper) iterate() {
outPayload := make([]byte, payloadSize)
cryptorand.Read(outPayload)
outBase64 := base64.StdEncoding.EncodeToString(outPayload)
snapshot.Size = uint64(len(outBase64))
if req, err := http.NewRequest("POST", "http://"+l.shr.Token, bytes.NewBufferString(outBase64)); err == nil {
client := &http.Client{Timeout: l.opt.Timeout, Transport: &http.Transport{DialContext: connDialer{conn}.Dial}}
@ -191,9 +227,13 @@ func (l *PrivateHttpLooper) iterate() {
if inBase64 != outBase64 {
logrus.Errorf("#%d: payload mismatch", l.id)
l.results.Mismatches++
snapshot.Complete().Failure(err)
} else {
l.results.Bytes += uint64(len(outBase64))
logrus.Debugf("#%d: payload match", l.id)
snapshot.Complete().Success()
}
} else {
logrus.Errorf("#%d: error: %v", l.id, err)
@ -204,6 +244,8 @@ func (l *PrivateHttpLooper) iterate() {
l.results.Errors++
}
snapshot.Send(l.opt.SnapshotQueue)
if err := conn.Close(); err != nil {
logrus.Errorf("#%d: error closing connection: %v", l.id, err)
}
@ -212,8 +254,8 @@ func (l *PrivateHttpLooper) iterate() {
pacingDelta := l.opt.MaxPacing.Milliseconds() - l.opt.MinPacing.Milliseconds()
if pacingDelta > 0 {
pacingMs = (rand.Int63() % pacingDelta) + l.opt.MinPacing.Milliseconds()
time.Sleep(time.Duration(pacingMs) * time.Millisecond)
}
time.Sleep(time.Duration(pacingMs) * time.Millisecond)
l.results.Loops++
}

View File

@ -73,16 +73,25 @@ func (l *PublicHttpLooper) Results() *LooperResults {
}
func (l *PublicHttpLooper) startup() error {
target := "canary.PublicHttpLooper"
if l.opt.TargetName != "" {
target = l.opt.TargetName
}
snapshotCreateShare := NewSnapshot("create-share", l.id, 0)
shr, err := sdk.CreateShare(l.root, &sdk.ShareRequest{
ShareMode: sdk.PublicShareMode,
BackendMode: sdk.ProxyBackendMode,
Target: "canary.PublicHttpLooper",
Target: target,
Frontends: []string{l.frontend},
PermissionMode: sdk.ClosedPermissionMode,
})
snapshotCreateShare.Complete()
if err != nil {
snapshotCreateShare.Failure(err).Send(l.opt.SnapshotQueue)
return err
}
snapshotCreateShare.Success().Send(l.opt.SnapshotQueue)
l.shr = shr
logrus.Infof("#%d allocated share '%v'", l.id, l.shr.Token)
@ -108,9 +117,12 @@ func (l *PublicHttpLooper) bind() error {
return errors.Wrapf(err, "#%d error creating ziti context", l.id)
}
snapshotListen := NewSnapshot("listen", l.id, 0)
if l.listener, err = zctx.ListenWithOptions(l.shr.Token, &options); err != nil {
snapshotListen.Complete().Failure(err).Send(l.opt.SnapshotQueue)
return errors.Wrapf(err, "#%d error binding listener", l.id)
}
snapshotListen.Complete().Success().Send(l.opt.SnapshotQueue)
go func() {
if err := http.Serve(l.listener, l); err != nil {
@ -141,6 +153,18 @@ func (l *PublicHttpLooper) iterate() {
defer func() { l.results.StopTime = time.Now() }()
for i := uint(0); i < l.opt.Iterations && !l.abort; i++ {
if i > 0 && l.opt.BatchSize > 0 && i%l.opt.BatchSize == 0 {
batchPacingMs := l.opt.MaxBatchPacing.Milliseconds()
batchPacingDelta := l.opt.MaxBatchPacing.Milliseconds() - l.opt.MinBatchPacing.Milliseconds()
if batchPacingDelta > 0 {
batchPacingMs = (rand.Int63() % batchPacingDelta) + l.opt.MinBatchPacing.Milliseconds()
}
logrus.Debugf("sleeping %d ms for batch pacing", batchPacingMs)
time.Sleep(time.Duration(batchPacingMs) * time.Millisecond)
}
snapshot := NewSnapshot("public-proxy", l.id, uint64(i))
if i > 0 && i%l.opt.StatusInterval == 0 {
logrus.Infof("#%d: iteration %d", l.id, i)
}
@ -153,6 +177,7 @@ func (l *PublicHttpLooper) iterate() {
outPayload := make([]byte, payloadSize)
cryptorand.Read(outPayload)
outBase64 := base64.StdEncoding.EncodeToString(outPayload)
snapshot.Size = uint64(len(outBase64))
if req, err := http.NewRequest("POST", l.shr.FrontendEndpoints[0], bytes.NewBufferString(outBase64)); err == nil {
client := &http.Client{Timeout: l.opt.Timeout}
@ -167,9 +192,13 @@ func (l *PublicHttpLooper) iterate() {
if inBase64 != outBase64 {
logrus.Errorf("#%d: payload mismatch", l.id)
l.results.Mismatches++
snapshot.Complete().Failure(err)
} else {
l.results.Bytes += uint64(len(outBase64))
logrus.Debugf("#%d: payload match", l.id)
snapshot.Complete().Success()
}
} else {
logrus.Errorf("#%d: error: %v", l.id, err)
@ -180,12 +209,14 @@ func (l *PublicHttpLooper) iterate() {
l.results.Errors++
}
snapshot.Send(l.opt.SnapshotQueue)
pacingMs := l.opt.MaxPacing.Milliseconds()
pacingDelta := l.opt.MaxPacing.Milliseconds() - l.opt.MinPacing.Milliseconds()
if pacingDelta > 0 {
pacingMs = (rand.Int63() % pacingDelta) + l.opt.MinPacing.Milliseconds()
time.Sleep(time.Duration(pacingMs) * time.Millisecond)
}
time.Sleep(time.Duration(pacingMs) * time.Millisecond)
l.results.Loops++
}

73
canary/snaphotStreamer.go Normal file
View File

@ -0,0 +1,73 @@
package canary
import (
"context"
"errors"
"fmt"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api"
"github.com/sirupsen/logrus"
)
type SnapshotStreamer struct {
InputQueue chan *Snapshot
Closed chan struct{}
ctx context.Context
cfg *Config
ifxClient influxdb2.Client
ifxWriteApi api.WriteAPIBlocking
}
func NewSnapshotStreamer(ctx context.Context, cfg *Config) (*SnapshotStreamer, error) {
out := &SnapshotStreamer{
InputQueue: make(chan *Snapshot),
Closed: make(chan struct{}),
ctx: ctx,
cfg: cfg,
}
if cfg.Influx != nil {
out.ifxClient = influxdb2.NewClient(cfg.Influx.Url, cfg.Influx.Token)
out.ifxWriteApi = out.ifxClient.WriteAPIBlocking(cfg.Influx.Org, cfg.Influx.Bucket)
} else {
return nil, errors.New("missing influx configuration")
}
return out, nil
}
func (ss *SnapshotStreamer) Run() {
defer close(ss.Closed)
defer ss.ifxClient.Close()
defer logrus.Info("stoping")
logrus.Info("starting")
for {
select {
case <-ss.ctx.Done():
return
case snapshot := <-ss.InputQueue:
if err := ss.store(snapshot); err != nil {
logrus.Errorf("error storing snapshot: %v", err)
}
}
}
}
func (ss *SnapshotStreamer) store(snapshot *Snapshot) error {
tags := map[string]string{
"instance": fmt.Sprintf("%d", snapshot.Instance),
"iteration": fmt.Sprintf("%d", snapshot.Iteration),
"ok": fmt.Sprintf("%t", snapshot.Ok),
}
if snapshot.Error != nil {
tags["error"] = snapshot.Error.Error()
}
pt := influxdb2.NewPoint(snapshot.Operation, tags, map[string]interface{}{
"duration": snapshot.Completed.Sub(snapshot.Started).Milliseconds(),
"size": snapshot.Size,
}, snapshot.Started)
if err := ss.ifxWriteApi.WritePoint(context.Background(), pt); err != nil {
return err
}
return nil
}

52
canary/snapshot.go Normal file
View File

@ -0,0 +1,52 @@
package canary
import (
"fmt"
"github.com/openziti/zrok/util"
"time"
)
type Snapshot struct {
Operation string
Instance uint
Iteration uint64
Started time.Time
Completed time.Time
Ok bool
Error error
Size uint64
}
func NewSnapshot(operation string, instance uint, iteration uint64) *Snapshot {
return &Snapshot{Operation: operation, Instance: instance, Iteration: iteration, Started: time.Now()}
}
func (s *Snapshot) Complete() *Snapshot {
s.Completed = time.Now()
return s
}
func (s *Snapshot) Success() *Snapshot {
s.Ok = true
return s
}
func (s *Snapshot) Failure(err error) *Snapshot {
s.Ok = false
s.Error = err
return s
}
func (s *Snapshot) Send(queue chan *Snapshot) {
if queue != nil {
queue <- s
}
}
func (s *Snapshot) String() string {
if s.Ok {
return fmt.Sprintf("[%v, %d, %d] (ok) %v, %v", s.Operation, s.Instance, s.Iteration, s.Completed.Sub(s.Started), util.BytesToSize(int64(s.Size)))
} else {
return fmt.Sprintf("[%v, %d, %d] (err) %v, %v, (%v)", s.Operation, s.Instance, s.Iteration, s.Completed.Sub(s.Started), util.BytesToSize(int64(s.Size)), s.Error)
}
}

View File

@ -18,10 +18,13 @@ type testCanaryEnabler struct {
cmd *cobra.Command
enablers uint
iterations uint
preDelay time.Duration
minPreDelay time.Duration
maxPreDelay time.Duration
dwell time.Duration
minDwell time.Duration
maxDwell time.Duration
pacing time.Duration
minPacing time.Duration
maxPacing time.Duration
skipDisable bool
@ -38,8 +41,10 @@ func newTestCanaryEnabler() *testCanaryEnabler {
cmd.Run = command.run
cmd.Flags().UintVarP(&command.enablers, "enablers", "e", 1, "Number of concurrent enablers to start")
cmd.Flags().UintVarP(&command.iterations, "iterations", "i", 1, "Number of iterations")
cmd.Flags().DurationVar(&command.dwell, "dwell", 0, "Fixed dwell time")
cmd.Flags().DurationVar(&command.minDwell, "min-dwell", 0, "Minimum dwell time")
cmd.Flags().DurationVar(&command.maxDwell, "max-dwell", 0, "Maximum dwell time")
cmd.Flags().DurationVar(&command.pacing, "pacing", 0, "Fixed pacing time")
cmd.Flags().DurationVar(&command.minPacing, "min-pacing", 0, "Minimum pacing time")
cmd.Flags().DurationVar(&command.maxPacing, "max-pacing", 0, "Maximum pacing time")
cmd.Flags().BoolVar(&command.skipDisable, "skip-disable", false, "Disable (clean up) enabled environments")
@ -57,17 +62,20 @@ func (cmd *testCanaryEnabler) run(_ *cobra.Command, _ []string) {
panic(err)
}
var sc *canary.SnapshotCollector
var scCtx context.Context
var scCancel context.CancelFunc
var sns *canary.SnapshotStreamer
var snsCtx context.Context
var snsCancel context.CancelFunc
if cmd.canaryConfig != "" {
cfg, err := canary.LoadConfig(cmd.canaryConfig)
if err != nil {
panic(err)
}
scCtx, scCancel = context.WithCancel(context.Background())
sc = canary.NewSnapshotCollector(scCtx, cfg)
go sc.Run()
snsCtx, snsCancel = context.WithCancel(context.Background())
sns, err = canary.NewSnapshotStreamer(snsCtx, cfg)
if err != nil {
panic(err)
}
go sns.Run()
}
var enablers []*canary.Enabler
@ -76,8 +84,8 @@ func (cmd *testCanaryEnabler) run(_ *cobra.Command, _ []string) {
preDelayDelta := cmd.maxPreDelay.Milliseconds() - cmd.minPreDelay.Milliseconds()
if preDelayDelta > 0 {
preDelay = int64(rand.Intn(int(preDelayDelta))) + cmd.minPreDelay.Milliseconds()
time.Sleep(time.Duration(preDelay) * time.Millisecond)
}
time.Sleep(time.Duration(preDelay) * time.Millisecond)
enablerOpts := &canary.EnablerOptions{
Iterations: cmd.iterations,
@ -86,8 +94,16 @@ func (cmd *testCanaryEnabler) run(_ *cobra.Command, _ []string) {
MinPacing: cmd.minPacing,
MaxPacing: cmd.maxPacing,
}
if sc != nil {
enablerOpts.SnapshotQueue = sc.InputQueue
if cmd.pacing > 0 {
enablerOpts.MinDwell = cmd.dwell
enablerOpts.MaxDwell = cmd.dwell
}
if cmd.pacing > 0 {
enablerOpts.MinPacing = cmd.pacing
enablerOpts.MaxPacing = cmd.pacing
}
if sns != nil {
enablerOpts.SnapshotQueue = sns.InputQueue
}
enabler := canary.NewEnabler(i, enablerOpts, root)
enablers = append(enablers, enabler)
@ -100,8 +116,8 @@ func (cmd *testCanaryEnabler) run(_ *cobra.Command, _ []string) {
disablerOpts := &canary.DisablerOptions{
Environments: enablers[i].Environments,
}
if sc != nil {
disablerOpts.SnapshotQueue = sc.InputQueue
if sns != nil {
disablerOpts.SnapshotQueue = sns.InputQueue
}
disabler := canary.NewDisabler(i, disablerOpts, root)
disablers = append(disablers, disabler)
@ -131,13 +147,10 @@ func (cmd *testCanaryEnabler) run(_ *cobra.Command, _ []string) {
<-enabler.Done
}
if sc != nil {
scCancel()
<-sc.Closed
if err := sc.Store(); err != nil {
panic(err)
}
if sns != nil {
snsCancel()
<-sns.Closed
}
logrus.Infof("complete")
logrus.Info("complete")
}

View File

@ -1,6 +1,7 @@
package main
import (
"context"
"github.com/openziti/zrok/canary"
"github.com/openziti/zrok/environment"
"github.com/sirupsen/logrus"
@ -22,14 +23,25 @@ type testCanaryPrivateProxy struct {
iterations uint
statusInterval uint
timeout time.Duration
payload uint64
minPayload uint64
maxPayload uint64
preDelay time.Duration
minPreDelay time.Duration
maxPreDelay time.Duration
dwell time.Duration
minDwell time.Duration
maxDwell time.Duration
pacing time.Duration
minPacing time.Duration
maxPacing time.Duration
batchSize uint
batchPacing time.Duration
minBatchPacing time.Duration
maxBatchPacing time.Duration
targetName string
bindAddress string
canaryConfig string
}
func newTestCanaryPrivateProxy() *testCanaryPrivateProxy {
@ -44,18 +56,33 @@ func newTestCanaryPrivateProxy() *testCanaryPrivateProxy {
cmd.Flags().UintVarP(&command.iterations, "iterations", "i", 1, "Number of iterations")
cmd.Flags().UintVarP(&command.statusInterval, "status-interval", "S", 100, "Show status every # iterations")
cmd.Flags().DurationVarP(&command.timeout, "timeout", "T", 30*time.Second, "Timeout when sending HTTP requests")
cmd.Flags().Uint64Var(&command.payload, "payload", 0, "Fixed payload size in bytes")
cmd.Flags().Uint64Var(&command.minPayload, "min-payload", 64, "Minimum payload size in bytes")
cmd.Flags().Uint64Var(&command.maxPayload, "max-payload", 10240, "Maximum payload size in bytes")
cmd.Flags().DurationVar(&command.preDelay, "pre-delay", 0, "Fixed pre-delay before creating the next looper")
cmd.Flags().DurationVar(&command.minPreDelay, "min-pre-delay", 0, "Minimum pre-delay before creating the next looper")
cmd.Flags().DurationVar(&command.maxPreDelay, "max-pre-delay", 0, "Maximum pre-delay before creating the next looper")
cmd.Flags().DurationVar(&command.dwell, "dwell", 0, "Fixed dwell time")
cmd.Flags().DurationVar(&command.minDwell, "min-dwell", 1*time.Second, "Minimum dwell time")
cmd.Flags().DurationVar(&command.maxDwell, "max-dwell", 1*time.Second, "Maximum dwell time")
cmd.Flags().DurationVar(&command.pacing, "pacing", 0, "Fixed pacing time")
cmd.Flags().DurationVar(&command.minPacing, "min-pacing", 0, "Minimum pacing time")
cmd.Flags().DurationVar(&command.maxPacing, "max-pacing", 0, "Maximum pacing time")
cmd.Flags().UintVar(&command.batchSize, "batch-size", 0, "Iterate in batches of this size")
cmd.Flags().DurationVar(&command.batchPacing, "batch-pacing", 0, "Fixed batch pacing time")
cmd.Flags().DurationVar(&command.minBatchPacing, "min-batch-pacing", 0, "Minimum batch pacing time")
cmd.Flags().DurationVar(&command.maxBatchPacing, "max-batch-pacing", 0, "Maximum batch pacing time")
cmd.Flags().StringVar(&command.targetName, "target-name", "", "Metadata describing the virtual target")
cmd.Flags().StringVar(&command.bindAddress, "bind-address", "", "Metadata describing the virtual bind address")
cmd.Flags().StringVar(&command.canaryConfig, "canary-config", "", "Path to canary configuration file")
return command
}
func (cmd *testCanaryPrivateProxy) run(_ *cobra.Command, _ []string) {
if err := canary.AcknowledgeDangerousCanary(); err != nil {
logrus.Fatal(err)
}
root, err := environment.LoadRoot()
if err != nil {
panic(err)
@ -65,14 +92,35 @@ func (cmd *testCanaryPrivateProxy) run(_ *cobra.Command, _ []string) {
logrus.Fatal("unable to load environment; did you 'zrok enable'?")
}
var sns *canary.SnapshotStreamer
var snsCtx context.Context
var snsCancel context.CancelFunc
if cmd.canaryConfig != "" {
cfg, err := canary.LoadConfig(cmd.canaryConfig)
if err != nil {
panic(err)
}
snsCtx, snsCancel = context.WithCancel(context.Background())
sns, err = canary.NewSnapshotStreamer(snsCtx, cfg)
if err != nil {
panic(err)
}
go sns.Run()
}
var loopers []*canary.PrivateHttpLooper
for i := uint(0); i < cmd.loopers; i++ {
preDelay := cmd.maxPreDelay.Milliseconds()
preDelayDelta := cmd.maxPreDelay.Milliseconds() - cmd.minPreDelay.Milliseconds()
if preDelayDelta > 0 {
preDelay = int64(rand.Intn(int(preDelayDelta))) + cmd.minPreDelay.Milliseconds()
time.Sleep(time.Duration(preDelay) * time.Millisecond)
var preDelay int64
if cmd.preDelay > 0 {
preDelay = cmd.preDelay.Milliseconds()
} else {
preDelay = cmd.maxPreDelay.Milliseconds()
preDelayDelta := cmd.maxPreDelay.Milliseconds() - cmd.minPreDelay.Milliseconds()
if preDelayDelta > 0 {
preDelay = int64(rand.Intn(int(preDelayDelta))) + cmd.minPreDelay.Milliseconds()
}
}
time.Sleep(time.Duration(preDelay) * time.Millisecond)
looperOpts := &canary.LooperOptions{
Iterations: cmd.iterations,
@ -84,6 +132,30 @@ func (cmd *testCanaryPrivateProxy) run(_ *cobra.Command, _ []string) {
MaxDwell: cmd.maxDwell,
MinPacing: cmd.minPacing,
MaxPacing: cmd.maxPacing,
BatchSize: cmd.batchSize,
MinBatchPacing: cmd.minBatchPacing,
MaxBatchPacing: cmd.maxBatchPacing,
TargetName: cmd.targetName,
BindAddress: cmd.bindAddress,
}
if cmd.payload > 0 {
looperOpts.MinPayload = cmd.payload
looperOpts.MaxPayload = cmd.payload
}
if cmd.dwell > 0 {
looperOpts.MinDwell = cmd.dwell
looperOpts.MaxDwell = cmd.dwell
}
if cmd.pacing > 0 {
looperOpts.MinPacing = cmd.pacing
looperOpts.MaxPacing = cmd.pacing
}
if cmd.batchPacing > 0 {
looperOpts.MinBatchPacing = cmd.batchPacing
looperOpts.MaxBatchPacing = cmd.batchPacing
}
if sns != nil {
looperOpts.SnapshotQueue = sns.InputQueue
}
looper := canary.NewPrivateHttpLooper(i, looperOpts, root)
loopers = append(loopers, looper)
@ -103,6 +175,11 @@ func (cmd *testCanaryPrivateProxy) run(_ *cobra.Command, _ []string) {
<-l.Done()
}
if sns != nil {
snsCancel()
<-sns.Closed
}
results := make([]*canary.LooperResults, 0)
for i := uint(0); i < cmd.loopers; i++ {
results = append(results, loopers[i].Results())

View File

@ -1,6 +1,7 @@
package main
import (
"context"
"github.com/openziti/zrok/canary"
"github.com/openziti/zrok/environment"
"github.com/sirupsen/logrus"
@ -22,15 +23,25 @@ type testCanaryPublicProxy struct {
iterations uint
statusInterval uint
timeout time.Duration
payload uint64
minPayload uint64
maxPayload uint64
preDelay time.Duration
minPreDelay time.Duration
maxPreDelay time.Duration
dwell time.Duration
minDwell time.Duration
maxDwell time.Duration
pacing time.Duration
minPacing time.Duration
maxPacing time.Duration
batchSize uint
batchPacing time.Duration
minBatchPacing time.Duration
maxBatchPacing time.Duration
targetName string
frontendSelection string
canaryConfig string
}
func newTestCanaryPublicProxy() *testCanaryPublicProxy {
@ -45,19 +56,33 @@ func newTestCanaryPublicProxy() *testCanaryPublicProxy {
cmd.Flags().UintVarP(&command.iterations, "iterations", "i", 1, "Number of iterations")
cmd.Flags().UintVarP(&command.statusInterval, "status-interval", "S", 100, "Show status every # iterations")
cmd.Flags().DurationVarP(&command.timeout, "timeout", "T", 30*time.Second, "Timeout when sending HTTP requests")
cmd.Flags().Uint64Var(&command.payload, "payload", 0, "Fixed payload size")
cmd.Flags().Uint64Var(&command.minPayload, "min-payload", 64, "Minimum payload size in bytes")
cmd.Flags().Uint64Var(&command.maxPayload, "max-payload", 10240, "Maximum payload size in bytes")
cmd.Flags().DurationVar(&command.preDelay, "pre-delay", 0, "Fixed pre-delay before creating the next looper")
cmd.Flags().DurationVar(&command.minPreDelay, "min-pre-delay", 0, "Minimum pre-delay before creating the next looper")
cmd.Flags().DurationVar(&command.maxPreDelay, "max-pre-delay", 0, "Maximum pre-delay before creating the next looper")
cmd.Flags().DurationVar(&command.dwell, "dwell", 1*time.Second, "Fixed dwell time")
cmd.Flags().DurationVar(&command.minDwell, "min-dwell", 1*time.Second, "Minimum dwell time")
cmd.Flags().DurationVar(&command.maxDwell, "max-dwell", 1*time.Second, "Maximum dwell time")
cmd.Flags().DurationVar(&command.pacing, "pacing", 0, "Fixed pacing time")
cmd.Flags().DurationVar(&command.minPacing, "min-pacing", 0, "Minimum pacing time")
cmd.Flags().DurationVar(&command.maxPacing, "max-pacing", 0, "Maximum pacing time")
cmd.Flags().UintVar(&command.batchSize, "batch-size", 0, "Iterate in batches of this size")
cmd.Flags().DurationVar(&command.batchPacing, "batch-pacing", 0, "Fixed batch pacing time")
cmd.Flags().DurationVar(&command.minBatchPacing, "min-batch-pacing", 0, "Minimum batch pacing time")
cmd.Flags().DurationVar(&command.maxBatchPacing, "max-batch-pacing", 0, "Maximum batch pacing time")
cmd.Flags().StringVar(&command.targetName, "target-name", "", "Metadata describing the virtual target")
cmd.Flags().StringVar(&command.frontendSelection, "frontend-selection", "public", "Select frontend selection")
cmd.Flags().StringVar(&command.canaryConfig, "canary-config", "", "Path to canary configuration file")
return command
}
func (cmd *testCanaryPublicProxy) run(_ *cobra.Command, _ []string) {
if err := canary.AcknowledgeDangerousCanary(); err != nil {
logrus.Fatal(err)
}
root, err := environment.LoadRoot()
if err != nil {
panic(err)
@ -67,14 +92,35 @@ func (cmd *testCanaryPublicProxy) run(_ *cobra.Command, _ []string) {
logrus.Fatal("unable to load environment; did you 'zrok enable'?")
}
var sns *canary.SnapshotStreamer
var snsCtx context.Context
var snsCancel context.CancelFunc
if cmd.canaryConfig != "" {
cfg, err := canary.LoadConfig(cmd.canaryConfig)
if err != nil {
panic(err)
}
snsCtx, snsCancel = context.WithCancel(context.Background())
sns, err = canary.NewSnapshotStreamer(snsCtx, cfg)
if err != nil {
panic(err)
}
go sns.Run()
}
var loopers []*canary.PublicHttpLooper
for i := uint(0); i < cmd.loopers; i++ {
preDelay := cmd.maxPreDelay.Milliseconds()
preDelayDelta := cmd.maxPreDelay.Milliseconds() - cmd.minPreDelay.Milliseconds()
if preDelayDelta > 0 {
preDelay = int64(rand.Intn(int(preDelayDelta))) + cmd.minPreDelay.Milliseconds()
time.Sleep(time.Duration(preDelay) * time.Millisecond)
var preDelay int64
if cmd.preDelay > 0 {
preDelay = cmd.preDelay.Milliseconds()
} else {
preDelay = cmd.maxPreDelay.Milliseconds()
preDelayDelta := cmd.maxPreDelay.Milliseconds() - cmd.minPreDelay.Milliseconds()
if preDelayDelta > 0 {
preDelay = int64(rand.Intn(int(preDelayDelta))) + cmd.minPreDelay.Milliseconds()
}
}
time.Sleep(time.Duration(preDelay) * time.Millisecond)
looperOpts := &canary.LooperOptions{
Iterations: cmd.iterations,
@ -86,6 +132,29 @@ func (cmd *testCanaryPublicProxy) run(_ *cobra.Command, _ []string) {
MaxDwell: cmd.maxDwell,
MinPacing: cmd.minPacing,
MaxPacing: cmd.maxPacing,
BatchSize: cmd.batchSize,
MinBatchPacing: cmd.minBatchPacing,
MaxBatchPacing: cmd.maxBatchPacing,
TargetName: cmd.targetName,
}
if cmd.payload > 0 {
looperOpts.MinPayload = cmd.payload
looperOpts.MaxPayload = cmd.payload
}
if cmd.dwell > 0 {
looperOpts.MinDwell = cmd.dwell
looperOpts.MaxDwell = cmd.dwell
}
if cmd.pacing > 0 {
looperOpts.MinPacing = cmd.pacing
looperOpts.MaxPacing = cmd.pacing
}
if cmd.batchPacing > 0 {
looperOpts.MinBatchPacing = cmd.batchPacing
looperOpts.MaxBatchPacing = cmd.batchPacing
}
if sns != nil {
looperOpts.SnapshotQueue = sns.InputQueue
}
looper := canary.NewPublicHttpLooper(i, cmd.frontendSelection, looperOpts, root)
loopers = append(loopers, looper)
@ -105,6 +174,11 @@ func (cmd *testCanaryPublicProxy) run(_ *cobra.Command, _ []string) {
<-l.Done()
}
if sns != nil {
snsCancel()
<-sns.Closed
}
results := make([]*canary.LooperResults, 0)
for i := uint(0); i < cmd.loopers; i++ {
results = append(results, loopers[i].Results())

View File

@ -81,7 +81,11 @@ func (h *accessHandler) Handle(params share.AccessParams, principal *rest_model_
return share.NewAccessInternalServerError()
}
if _, err := str.CreateFrontend(envId, &store.Frontend{PrivateShareId: &shr.Id, Token: feToken, ZId: envZId, PermissionMode: store.ClosedPermissionMode}, trx); err != nil {
fe := &store.Frontend{PrivateShareId: &shr.Id, Token: feToken, ZId: envZId, PermissionMode: store.ClosedPermissionMode}
if params.Body.BindAddress != "" {
fe.BindAddress = &params.Body.BindAddress
}
if _, err := str.CreateFrontend(envId, fe, trx); err != nil {
logrus.Errorf("error creating frontend record for user '%v': %v", principal.Email, err)
return share.NewAccessInternalServerError()
}

View File

@ -5,6 +5,7 @@ import (
"github.com/openziti/zrok/environment/env_core"
"github.com/openziti/zrok/rest_client_zrok/share"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
func CreateAccess(root env_core.Root, request *AccessRequest) (*Access, error) {
@ -15,6 +16,10 @@ func CreateAccess(root env_core.Root, request *AccessRequest) (*Access, error) {
out := share.NewAccessParams()
out.Body.ShareToken = request.ShareToken
out.Body.EnvZID = root.Environment().ZitiIdentity
if request.BindAddress != "" {
out.Body.BindAddress = request.BindAddress
logrus.Infof("requesting bind address '%v'", out.Body.BindAddress)
}
zrok, err := root.Client()
if err != nil {

View File

@ -62,7 +62,8 @@ type Share struct {
}
type AccessRequest struct {
ShareToken string
ShareToken string
BindAddress string
}
type Access struct {