diff --git a/CHANGELOG.md b/CHANGELOG.md index 34cca76b..e6cf1400 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,10 @@ FEATURE: New InfluxDB metrics capture infrastructure for `zrok test canary` fram FEATURE: New `zrok test canary enabler` to validate `enable`/`disable` operations and gather performance metrics around how those paths are operating (https://github.com/openziti/zrok/issues/771) +FEATURE: New `zrok test canary` infrastructure capable of supporting more complex testing scenarios; now capable of streaming canary metrics into an InfluxDB repository; new programming framework for developing additional types of streaming canary metrics (https://github.com/openziti/zrok/issues/948 https://github.com/openziti/zrok/issues/954) + +FEATURE: All `zrok test canary` commands that have "min" and "max" values (`--min-pacing` and `--max-pacing` for example) now include a singular version of that flag for setting both "min" and "max" to the same value (`--pacing` for example). The singular version of the flag always overrides any `--min-*` or `--max-*` values that might be set + CHANGE: New _guard_ to prevent users from running potentially dangerous `zrok test canary` commands inadvertently without understanding what they do (https://github.com/openziti/zrok/issues/947) ## v1.0.2 diff --git a/canary/enabler.go b/canary/enabler.go index 1841570c..58d72f60 100644 --- a/canary/enabler.go +++ b/canary/enabler.go @@ -63,31 +63,23 @@ func (e *Enabler) iterate() { Description: "canary.Enabler", }) if err == nil { - snapshot.Completed = time.Now() - snapshot.Ok = true - + snapshot.Complete().Success() e.Environments <- env logrus.Infof("#%d enabled environment '%v'", e.Id, env.ZitiIdentity) } else { - snapshot.Completed = time.Now() - snapshot.Ok = false - snapshot.Error = err + snapshot.Complete().Failure(err) logrus.Errorf("error creating canary (#%d) environment: %v", e.Id, err) } - if e.opt.SnapshotQueue != nil { - e.opt.SnapshotQueue <- snapshot - } else { - logrus.Info(snapshot) - } + snapshot.Send(e.opt.SnapshotQueue) pacingMs := e.opt.MaxPacing.Milliseconds() pacingDelta := e.opt.MaxPacing.Milliseconds() - e.opt.MinPacing.Milliseconds() if pacingDelta > 0 { pacingMs = (rand.Int63() % pacingDelta) + e.opt.MinPacing.Milliseconds() - time.Sleep(time.Duration(pacingMs) * time.Millisecond) } + time.Sleep(time.Duration(pacingMs) * time.Millisecond) } } diff --git a/canary/looper.go b/canary/looper.go index 81d9ad63..013dbf20 100644 --- a/canary/looper.go +++ b/canary/looper.go @@ -16,6 +16,12 @@ type LooperOptions struct { MaxDwell time.Duration MinPacing time.Duration MaxPacing time.Duration + BatchSize uint + MinBatchPacing time.Duration + MaxBatchPacing time.Duration + TargetName string + BindAddress string + SnapshotQueue chan *Snapshot } type LooperResults struct { diff --git a/canary/metrics.go b/canary/metrics.go deleted file mode 100644 index b63fa197..00000000 --- a/canary/metrics.go +++ /dev/null @@ -1,102 +0,0 @@ -package canary - -import ( - "context" - "fmt" - influxdb2 "github.com/influxdata/influxdb-client-go/v2" - "github.com/openziti/zrok/util" - "github.com/sirupsen/logrus" - "slices" - "sort" - "time" -) - -type Snapshot struct { - Operation string - Instance uint - Iteration uint64 - Started time.Time - Completed time.Time - Ok bool - Error error - Size uint64 -} - -func NewSnapshot(operation string, instance uint, iteration uint64) *Snapshot { - return &Snapshot{Operation: operation, Instance: instance, Iteration: iteration, Started: time.Now()} -} - -func (s *Snapshot) String() string { - if s.Ok { - return fmt.Sprintf("[%v, %d, %d] (ok) %v, %v", s.Operation, s.Instance, s.Iteration, s.Completed.Sub(s.Started), util.BytesToSize(int64(s.Size))) - } else { - return fmt.Sprintf("[%v, %d, %d] (err) %v, %v, (%v)", s.Operation, s.Instance, s.Iteration, s.Completed.Sub(s.Started), util.BytesToSize(int64(s.Size)), s.Error) - } -} - -type SnapshotCollector struct { - InputQueue chan *Snapshot - Closed chan struct{} - ctx context.Context - cfg *Config - snapshots map[string][]*Snapshot -} - -func NewSnapshotCollector(ctx context.Context, cfg *Config) *SnapshotCollector { - return &SnapshotCollector{ - InputQueue: make(chan *Snapshot), - Closed: make(chan struct{}), - ctx: ctx, - cfg: cfg, - snapshots: make(map[string][]*Snapshot), - } -} - -func (sc *SnapshotCollector) Run() { - defer close(sc.Closed) - defer logrus.Info("stopping") - logrus.Info("starting") - for { - select { - case <-sc.ctx.Done(): - return - - case snapshot := <-sc.InputQueue: - var snapshots []*Snapshot - if v, ok := sc.snapshots[snapshot.Operation]; ok { - snapshots = v - } - i := sort.Search(len(snapshots), func(i int) bool { return snapshots[i].Completed.After(snapshot.Started) }) - snapshots = slices.Insert(snapshots, i, snapshot) - sc.snapshots[snapshot.Operation] = snapshots - } - } -} - -func (sc *SnapshotCollector) Store() error { - idb := influxdb2.NewClient(sc.cfg.Influx.Url, sc.cfg.Influx.Token) - writeApi := idb.WriteAPIBlocking(sc.cfg.Influx.Org, sc.cfg.Influx.Bucket) - for key, arr := range sc.snapshots { - for _, snapshot := range arr { - tags := map[string]string{ - "instance": fmt.Sprintf("%d", snapshot.Instance), - "iteration": fmt.Sprintf("%d", snapshot.Iteration), - "ok": fmt.Sprintf("%t", snapshot.Ok), - } - if snapshot.Error != nil { - tags["error"] = snapshot.Error.Error() - } - pt := influxdb2.NewPoint(snapshot.Operation, tags, map[string]interface{}{ - "duration": snapshot.Completed.Sub(snapshot.Started).Milliseconds(), - "size": snapshot.Size, - }, snapshot.Started) - if err := writeApi.WritePoint(context.Background(), pt); err != nil { - return err - } - } - logrus.Infof("wrote '%v' points for '%v'", len(arr), key) - } - idb.Close() - logrus.Infof("complete") - return nil -} diff --git a/canary/privateHttpLooper.go b/canary/privateHttpLooper.go index b3f5d715..5df63962 100644 --- a/canary/privateHttpLooper.go +++ b/canary/privateHttpLooper.go @@ -19,15 +19,17 @@ import ( ) type PrivateHttpLooper struct { - id uint - acc *sdk.Access - opt *LooperOptions - root env_core.Root - shr *sdk.Share - listener edge.Listener - abort bool - done chan struct{} - results *LooperResults + id uint + target string + bindAddress string + acc *sdk.Access + opt *LooperOptions + root env_core.Root + shr *sdk.Share + listener edge.Listener + abort bool + done chan struct{} + results *LooperResults } func NewPrivateHttpLooper(id uint, opt *LooperOptions, root env_core.Root) *PrivateHttpLooper { @@ -74,23 +76,41 @@ func (l *PrivateHttpLooper) Results() *LooperResults { } func (l *PrivateHttpLooper) startup() error { + target := "canary.PrivateHttpLooper" + if l.opt.TargetName != "" { + target = l.opt.TargetName + } + + snapshotCreateShare := NewSnapshot("create-share", l.id, 0) shr, err := sdk.CreateShare(l.root, &sdk.ShareRequest{ ShareMode: sdk.PrivateShareMode, BackendMode: sdk.ProxyBackendMode, - Target: "canary.PrivateHttpLooper", + Target: target, PermissionMode: sdk.ClosedPermissionMode, }) + snapshotCreateShare.Complete() if err != nil { + snapshotCreateShare.Failure(err).Send(l.opt.SnapshotQueue) return err } + snapshotCreateShare.Success().Send(l.opt.SnapshotQueue) l.shr = shr + bindAddress := "" + if l.opt.BindAddress != "" { + bindAddress = l.opt.BindAddress + } + snapshotCreateAccess := NewSnapshot("create-access", l.id, 0) acc, err := sdk.CreateAccess(l.root, &sdk.AccessRequest{ - ShareToken: shr.Token, + ShareToken: shr.Token, + BindAddress: bindAddress, }) + snapshotCreateAccess.Complete() if err != nil { + snapshotCreateAccess.Failure(err).Send(l.opt.SnapshotQueue) return err } + snapshotCreateAccess.Success().Send(l.opt.SnapshotQueue) l.acc = acc logrus.Infof("#%d allocated share '%v', allocated frontend '%v'", l.id, shr.Token, acc.Token) @@ -116,9 +136,12 @@ func (l *PrivateHttpLooper) bind() error { return errors.Wrapf(err, "#%d error creating ziti context", l.id) } + snapshotListen := NewSnapshot("listen", l.id, 0) if l.listener, err = zctx.ListenWithOptions(l.shr.Token, &options); err != nil { + snapshotListen.Complete().Failure(err).Send(l.opt.SnapshotQueue) return errors.Wrapf(err, "#%d error binding listener", l.id) } + snapshotListen.Complete().Success().Send(l.opt.SnapshotQueue) go func() { if err := http.Serve(l.listener, l); err != nil { @@ -156,7 +179,19 @@ func (l *PrivateHttpLooper) iterate() { l.results.StartTime = time.Now() defer func() { l.results.StopTime = time.Now() }() - for i := uint(0); i < l.opt.Iterations; i++ { + for i := uint(0); i < l.opt.Iterations && !l.abort; i++ { + if i > 0 && l.opt.BatchSize > 0 && i%l.opt.BatchSize == 0 { + batchPacingMs := l.opt.MaxBatchPacing.Milliseconds() + batchPacingDelta := l.opt.MaxBatchPacing.Milliseconds() - l.opt.MinBatchPacing.Milliseconds() + if batchPacingDelta > 0 { + batchPacingMs = (rand.Int63() % batchPacingDelta) + l.opt.MinBatchPacing.Milliseconds() + } + logrus.Debugf("sleeping %d ms for batch pacing", batchPacingMs) + time.Sleep(time.Duration(batchPacingMs) * time.Millisecond) + } + + snapshot := NewSnapshot("private-proxy", l.id, uint64(i)) + if i > 0 && i%l.opt.StatusInterval == 0 { logrus.Infof("#%d: iteration %d", l.id, i) } @@ -177,6 +212,7 @@ func (l *PrivateHttpLooper) iterate() { outPayload := make([]byte, payloadSize) cryptorand.Read(outPayload) outBase64 := base64.StdEncoding.EncodeToString(outPayload) + snapshot.Size = uint64(len(outBase64)) if req, err := http.NewRequest("POST", "http://"+l.shr.Token, bytes.NewBufferString(outBase64)); err == nil { client := &http.Client{Timeout: l.opt.Timeout, Transport: &http.Transport{DialContext: connDialer{conn}.Dial}} @@ -191,9 +227,13 @@ func (l *PrivateHttpLooper) iterate() { if inBase64 != outBase64 { logrus.Errorf("#%d: payload mismatch", l.id) l.results.Mismatches++ + + snapshot.Complete().Failure(err) } else { l.results.Bytes += uint64(len(outBase64)) logrus.Debugf("#%d: payload match", l.id) + + snapshot.Complete().Success() } } else { logrus.Errorf("#%d: error: %v", l.id, err) @@ -204,6 +244,8 @@ func (l *PrivateHttpLooper) iterate() { l.results.Errors++ } + snapshot.Send(l.opt.SnapshotQueue) + if err := conn.Close(); err != nil { logrus.Errorf("#%d: error closing connection: %v", l.id, err) } @@ -212,8 +254,8 @@ func (l *PrivateHttpLooper) iterate() { pacingDelta := l.opt.MaxPacing.Milliseconds() - l.opt.MinPacing.Milliseconds() if pacingDelta > 0 { pacingMs = (rand.Int63() % pacingDelta) + l.opt.MinPacing.Milliseconds() - time.Sleep(time.Duration(pacingMs) * time.Millisecond) } + time.Sleep(time.Duration(pacingMs) * time.Millisecond) l.results.Loops++ } diff --git a/canary/publicHttpLooper.go b/canary/publicHttpLooper.go index 2a212341..b2756a68 100644 --- a/canary/publicHttpLooper.go +++ b/canary/publicHttpLooper.go @@ -73,16 +73,25 @@ func (l *PublicHttpLooper) Results() *LooperResults { } func (l *PublicHttpLooper) startup() error { + target := "canary.PublicHttpLooper" + if l.opt.TargetName != "" { + target = l.opt.TargetName + } + + snapshotCreateShare := NewSnapshot("create-share", l.id, 0) shr, err := sdk.CreateShare(l.root, &sdk.ShareRequest{ ShareMode: sdk.PublicShareMode, BackendMode: sdk.ProxyBackendMode, - Target: "canary.PublicHttpLooper", + Target: target, Frontends: []string{l.frontend}, PermissionMode: sdk.ClosedPermissionMode, }) + snapshotCreateShare.Complete() if err != nil { + snapshotCreateShare.Failure(err).Send(l.opt.SnapshotQueue) return err } + snapshotCreateShare.Success().Send(l.opt.SnapshotQueue) l.shr = shr logrus.Infof("#%d allocated share '%v'", l.id, l.shr.Token) @@ -108,9 +117,12 @@ func (l *PublicHttpLooper) bind() error { return errors.Wrapf(err, "#%d error creating ziti context", l.id) } + snapshotListen := NewSnapshot("listen", l.id, 0) if l.listener, err = zctx.ListenWithOptions(l.shr.Token, &options); err != nil { + snapshotListen.Complete().Failure(err).Send(l.opt.SnapshotQueue) return errors.Wrapf(err, "#%d error binding listener", l.id) } + snapshotListen.Complete().Success().Send(l.opt.SnapshotQueue) go func() { if err := http.Serve(l.listener, l); err != nil { @@ -141,6 +153,18 @@ func (l *PublicHttpLooper) iterate() { defer func() { l.results.StopTime = time.Now() }() for i := uint(0); i < l.opt.Iterations && !l.abort; i++ { + if i > 0 && l.opt.BatchSize > 0 && i%l.opt.BatchSize == 0 { + batchPacingMs := l.opt.MaxBatchPacing.Milliseconds() + batchPacingDelta := l.opt.MaxBatchPacing.Milliseconds() - l.opt.MinBatchPacing.Milliseconds() + if batchPacingDelta > 0 { + batchPacingMs = (rand.Int63() % batchPacingDelta) + l.opt.MinBatchPacing.Milliseconds() + } + logrus.Debugf("sleeping %d ms for batch pacing", batchPacingMs) + time.Sleep(time.Duration(batchPacingMs) * time.Millisecond) + } + + snapshot := NewSnapshot("public-proxy", l.id, uint64(i)) + if i > 0 && i%l.opt.StatusInterval == 0 { logrus.Infof("#%d: iteration %d", l.id, i) } @@ -153,6 +177,7 @@ func (l *PublicHttpLooper) iterate() { outPayload := make([]byte, payloadSize) cryptorand.Read(outPayload) outBase64 := base64.StdEncoding.EncodeToString(outPayload) + snapshot.Size = uint64(len(outBase64)) if req, err := http.NewRequest("POST", l.shr.FrontendEndpoints[0], bytes.NewBufferString(outBase64)); err == nil { client := &http.Client{Timeout: l.opt.Timeout} @@ -167,9 +192,13 @@ func (l *PublicHttpLooper) iterate() { if inBase64 != outBase64 { logrus.Errorf("#%d: payload mismatch", l.id) l.results.Mismatches++ + + snapshot.Complete().Failure(err) } else { l.results.Bytes += uint64(len(outBase64)) logrus.Debugf("#%d: payload match", l.id) + + snapshot.Complete().Success() } } else { logrus.Errorf("#%d: error: %v", l.id, err) @@ -180,12 +209,14 @@ func (l *PublicHttpLooper) iterate() { l.results.Errors++ } + snapshot.Send(l.opt.SnapshotQueue) + pacingMs := l.opt.MaxPacing.Milliseconds() pacingDelta := l.opt.MaxPacing.Milliseconds() - l.opt.MinPacing.Milliseconds() if pacingDelta > 0 { pacingMs = (rand.Int63() % pacingDelta) + l.opt.MinPacing.Milliseconds() - time.Sleep(time.Duration(pacingMs) * time.Millisecond) } + time.Sleep(time.Duration(pacingMs) * time.Millisecond) l.results.Loops++ } diff --git a/canary/snaphotStreamer.go b/canary/snaphotStreamer.go new file mode 100644 index 00000000..ff5c8074 --- /dev/null +++ b/canary/snaphotStreamer.go @@ -0,0 +1,73 @@ +package canary + +import ( + "context" + "errors" + "fmt" + influxdb2 "github.com/influxdata/influxdb-client-go/v2" + "github.com/influxdata/influxdb-client-go/v2/api" + "github.com/sirupsen/logrus" +) + +type SnapshotStreamer struct { + InputQueue chan *Snapshot + Closed chan struct{} + ctx context.Context + cfg *Config + ifxClient influxdb2.Client + ifxWriteApi api.WriteAPIBlocking +} + +func NewSnapshotStreamer(ctx context.Context, cfg *Config) (*SnapshotStreamer, error) { + out := &SnapshotStreamer{ + InputQueue: make(chan *Snapshot), + Closed: make(chan struct{}), + ctx: ctx, + cfg: cfg, + } + if cfg.Influx != nil { + out.ifxClient = influxdb2.NewClient(cfg.Influx.Url, cfg.Influx.Token) + out.ifxWriteApi = out.ifxClient.WriteAPIBlocking(cfg.Influx.Org, cfg.Influx.Bucket) + } else { + return nil, errors.New("missing influx configuration") + } + return out, nil +} + +func (ss *SnapshotStreamer) Run() { + defer close(ss.Closed) + defer ss.ifxClient.Close() + defer logrus.Info("stoping") + logrus.Info("starting") + + for { + select { + case <-ss.ctx.Done(): + return + + case snapshot := <-ss.InputQueue: + if err := ss.store(snapshot); err != nil { + logrus.Errorf("error storing snapshot: %v", err) + } + } + } +} + +func (ss *SnapshotStreamer) store(snapshot *Snapshot) error { + tags := map[string]string{ + "instance": fmt.Sprintf("%d", snapshot.Instance), + "iteration": fmt.Sprintf("%d", snapshot.Iteration), + "ok": fmt.Sprintf("%t", snapshot.Ok), + } + if snapshot.Error != nil { + tags["error"] = snapshot.Error.Error() + } + pt := influxdb2.NewPoint(snapshot.Operation, tags, map[string]interface{}{ + "duration": snapshot.Completed.Sub(snapshot.Started).Milliseconds(), + "size": snapshot.Size, + }, snapshot.Started) + if err := ss.ifxWriteApi.WritePoint(context.Background(), pt); err != nil { + return err + } + return nil +} diff --git a/canary/snapshot.go b/canary/snapshot.go new file mode 100644 index 00000000..7daf0099 --- /dev/null +++ b/canary/snapshot.go @@ -0,0 +1,52 @@ +package canary + +import ( + "fmt" + "github.com/openziti/zrok/util" + "time" +) + +type Snapshot struct { + Operation string + Instance uint + Iteration uint64 + Started time.Time + Completed time.Time + Ok bool + Error error + Size uint64 +} + +func NewSnapshot(operation string, instance uint, iteration uint64) *Snapshot { + return &Snapshot{Operation: operation, Instance: instance, Iteration: iteration, Started: time.Now()} +} + +func (s *Snapshot) Complete() *Snapshot { + s.Completed = time.Now() + return s +} + +func (s *Snapshot) Success() *Snapshot { + s.Ok = true + return s +} + +func (s *Snapshot) Failure(err error) *Snapshot { + s.Ok = false + s.Error = err + return s +} + +func (s *Snapshot) Send(queue chan *Snapshot) { + if queue != nil { + queue <- s + } +} + +func (s *Snapshot) String() string { + if s.Ok { + return fmt.Sprintf("[%v, %d, %d] (ok) %v, %v", s.Operation, s.Instance, s.Iteration, s.Completed.Sub(s.Started), util.BytesToSize(int64(s.Size))) + } else { + return fmt.Sprintf("[%v, %d, %d] (err) %v, %v, (%v)", s.Operation, s.Instance, s.Iteration, s.Completed.Sub(s.Started), util.BytesToSize(int64(s.Size)), s.Error) + } +} diff --git a/cmd/zrok/testCanaryEnabler.go b/cmd/zrok/testCanaryEnabler.go index 0a8111cf..063f8e87 100644 --- a/cmd/zrok/testCanaryEnabler.go +++ b/cmd/zrok/testCanaryEnabler.go @@ -18,10 +18,13 @@ type testCanaryEnabler struct { cmd *cobra.Command enablers uint iterations uint + preDelay time.Duration minPreDelay time.Duration maxPreDelay time.Duration + dwell time.Duration minDwell time.Duration maxDwell time.Duration + pacing time.Duration minPacing time.Duration maxPacing time.Duration skipDisable bool @@ -38,8 +41,10 @@ func newTestCanaryEnabler() *testCanaryEnabler { cmd.Run = command.run cmd.Flags().UintVarP(&command.enablers, "enablers", "e", 1, "Number of concurrent enablers to start") cmd.Flags().UintVarP(&command.iterations, "iterations", "i", 1, "Number of iterations") + cmd.Flags().DurationVar(&command.dwell, "dwell", 0, "Fixed dwell time") cmd.Flags().DurationVar(&command.minDwell, "min-dwell", 0, "Minimum dwell time") cmd.Flags().DurationVar(&command.maxDwell, "max-dwell", 0, "Maximum dwell time") + cmd.Flags().DurationVar(&command.pacing, "pacing", 0, "Fixed pacing time") cmd.Flags().DurationVar(&command.minPacing, "min-pacing", 0, "Minimum pacing time") cmd.Flags().DurationVar(&command.maxPacing, "max-pacing", 0, "Maximum pacing time") cmd.Flags().BoolVar(&command.skipDisable, "skip-disable", false, "Disable (clean up) enabled environments") @@ -57,17 +62,20 @@ func (cmd *testCanaryEnabler) run(_ *cobra.Command, _ []string) { panic(err) } - var sc *canary.SnapshotCollector - var scCtx context.Context - var scCancel context.CancelFunc + var sns *canary.SnapshotStreamer + var snsCtx context.Context + var snsCancel context.CancelFunc if cmd.canaryConfig != "" { cfg, err := canary.LoadConfig(cmd.canaryConfig) if err != nil { panic(err) } - scCtx, scCancel = context.WithCancel(context.Background()) - sc = canary.NewSnapshotCollector(scCtx, cfg) - go sc.Run() + snsCtx, snsCancel = context.WithCancel(context.Background()) + sns, err = canary.NewSnapshotStreamer(snsCtx, cfg) + if err != nil { + panic(err) + } + go sns.Run() } var enablers []*canary.Enabler @@ -76,8 +84,8 @@ func (cmd *testCanaryEnabler) run(_ *cobra.Command, _ []string) { preDelayDelta := cmd.maxPreDelay.Milliseconds() - cmd.minPreDelay.Milliseconds() if preDelayDelta > 0 { preDelay = int64(rand.Intn(int(preDelayDelta))) + cmd.minPreDelay.Milliseconds() - time.Sleep(time.Duration(preDelay) * time.Millisecond) } + time.Sleep(time.Duration(preDelay) * time.Millisecond) enablerOpts := &canary.EnablerOptions{ Iterations: cmd.iterations, @@ -86,8 +94,16 @@ func (cmd *testCanaryEnabler) run(_ *cobra.Command, _ []string) { MinPacing: cmd.minPacing, MaxPacing: cmd.maxPacing, } - if sc != nil { - enablerOpts.SnapshotQueue = sc.InputQueue + if cmd.pacing > 0 { + enablerOpts.MinDwell = cmd.dwell + enablerOpts.MaxDwell = cmd.dwell + } + if cmd.pacing > 0 { + enablerOpts.MinPacing = cmd.pacing + enablerOpts.MaxPacing = cmd.pacing + } + if sns != nil { + enablerOpts.SnapshotQueue = sns.InputQueue } enabler := canary.NewEnabler(i, enablerOpts, root) enablers = append(enablers, enabler) @@ -100,8 +116,8 @@ func (cmd *testCanaryEnabler) run(_ *cobra.Command, _ []string) { disablerOpts := &canary.DisablerOptions{ Environments: enablers[i].Environments, } - if sc != nil { - disablerOpts.SnapshotQueue = sc.InputQueue + if sns != nil { + disablerOpts.SnapshotQueue = sns.InputQueue } disabler := canary.NewDisabler(i, disablerOpts, root) disablers = append(disablers, disabler) @@ -131,13 +147,10 @@ func (cmd *testCanaryEnabler) run(_ *cobra.Command, _ []string) { <-enabler.Done } - if sc != nil { - scCancel() - <-sc.Closed - if err := sc.Store(); err != nil { - panic(err) - } + if sns != nil { + snsCancel() + <-sns.Closed } - logrus.Infof("complete") + logrus.Info("complete") } diff --git a/cmd/zrok/testCanaryPrivateProxy.go b/cmd/zrok/testCanaryPrivateProxy.go index cb3db67d..3313903b 100644 --- a/cmd/zrok/testCanaryPrivateProxy.go +++ b/cmd/zrok/testCanaryPrivateProxy.go @@ -1,6 +1,7 @@ package main import ( + "context" "github.com/openziti/zrok/canary" "github.com/openziti/zrok/environment" "github.com/sirupsen/logrus" @@ -22,14 +23,25 @@ type testCanaryPrivateProxy struct { iterations uint statusInterval uint timeout time.Duration + payload uint64 minPayload uint64 maxPayload uint64 + preDelay time.Duration minPreDelay time.Duration maxPreDelay time.Duration + dwell time.Duration minDwell time.Duration maxDwell time.Duration + pacing time.Duration minPacing time.Duration maxPacing time.Duration + batchSize uint + batchPacing time.Duration + minBatchPacing time.Duration + maxBatchPacing time.Duration + targetName string + bindAddress string + canaryConfig string } func newTestCanaryPrivateProxy() *testCanaryPrivateProxy { @@ -44,18 +56,33 @@ func newTestCanaryPrivateProxy() *testCanaryPrivateProxy { cmd.Flags().UintVarP(&command.iterations, "iterations", "i", 1, "Number of iterations") cmd.Flags().UintVarP(&command.statusInterval, "status-interval", "S", 100, "Show status every # iterations") cmd.Flags().DurationVarP(&command.timeout, "timeout", "T", 30*time.Second, "Timeout when sending HTTP requests") + cmd.Flags().Uint64Var(&command.payload, "payload", 0, "Fixed payload size in bytes") cmd.Flags().Uint64Var(&command.minPayload, "min-payload", 64, "Minimum payload size in bytes") cmd.Flags().Uint64Var(&command.maxPayload, "max-payload", 10240, "Maximum payload size in bytes") + cmd.Flags().DurationVar(&command.preDelay, "pre-delay", 0, "Fixed pre-delay before creating the next looper") cmd.Flags().DurationVar(&command.minPreDelay, "min-pre-delay", 0, "Minimum pre-delay before creating the next looper") cmd.Flags().DurationVar(&command.maxPreDelay, "max-pre-delay", 0, "Maximum pre-delay before creating the next looper") + cmd.Flags().DurationVar(&command.dwell, "dwell", 0, "Fixed dwell time") cmd.Flags().DurationVar(&command.minDwell, "min-dwell", 1*time.Second, "Minimum dwell time") cmd.Flags().DurationVar(&command.maxDwell, "max-dwell", 1*time.Second, "Maximum dwell time") + cmd.Flags().DurationVar(&command.pacing, "pacing", 0, "Fixed pacing time") cmd.Flags().DurationVar(&command.minPacing, "min-pacing", 0, "Minimum pacing time") cmd.Flags().DurationVar(&command.maxPacing, "max-pacing", 0, "Maximum pacing time") + cmd.Flags().UintVar(&command.batchSize, "batch-size", 0, "Iterate in batches of this size") + cmd.Flags().DurationVar(&command.batchPacing, "batch-pacing", 0, "Fixed batch pacing time") + cmd.Flags().DurationVar(&command.minBatchPacing, "min-batch-pacing", 0, "Minimum batch pacing time") + cmd.Flags().DurationVar(&command.maxBatchPacing, "max-batch-pacing", 0, "Maximum batch pacing time") + cmd.Flags().StringVar(&command.targetName, "target-name", "", "Metadata describing the virtual target") + cmd.Flags().StringVar(&command.bindAddress, "bind-address", "", "Metadata describing the virtual bind address") + cmd.Flags().StringVar(&command.canaryConfig, "canary-config", "", "Path to canary configuration file") return command } func (cmd *testCanaryPrivateProxy) run(_ *cobra.Command, _ []string) { + if err := canary.AcknowledgeDangerousCanary(); err != nil { + logrus.Fatal(err) + } + root, err := environment.LoadRoot() if err != nil { panic(err) @@ -65,14 +92,35 @@ func (cmd *testCanaryPrivateProxy) run(_ *cobra.Command, _ []string) { logrus.Fatal("unable to load environment; did you 'zrok enable'?") } + var sns *canary.SnapshotStreamer + var snsCtx context.Context + var snsCancel context.CancelFunc + if cmd.canaryConfig != "" { + cfg, err := canary.LoadConfig(cmd.canaryConfig) + if err != nil { + panic(err) + } + snsCtx, snsCancel = context.WithCancel(context.Background()) + sns, err = canary.NewSnapshotStreamer(snsCtx, cfg) + if err != nil { + panic(err) + } + go sns.Run() + } + var loopers []*canary.PrivateHttpLooper for i := uint(0); i < cmd.loopers; i++ { - preDelay := cmd.maxPreDelay.Milliseconds() - preDelayDelta := cmd.maxPreDelay.Milliseconds() - cmd.minPreDelay.Milliseconds() - if preDelayDelta > 0 { - preDelay = int64(rand.Intn(int(preDelayDelta))) + cmd.minPreDelay.Milliseconds() - time.Sleep(time.Duration(preDelay) * time.Millisecond) + var preDelay int64 + if cmd.preDelay > 0 { + preDelay = cmd.preDelay.Milliseconds() + } else { + preDelay = cmd.maxPreDelay.Milliseconds() + preDelayDelta := cmd.maxPreDelay.Milliseconds() - cmd.minPreDelay.Milliseconds() + if preDelayDelta > 0 { + preDelay = int64(rand.Intn(int(preDelayDelta))) + cmd.minPreDelay.Milliseconds() + } } + time.Sleep(time.Duration(preDelay) * time.Millisecond) looperOpts := &canary.LooperOptions{ Iterations: cmd.iterations, @@ -84,6 +132,30 @@ func (cmd *testCanaryPrivateProxy) run(_ *cobra.Command, _ []string) { MaxDwell: cmd.maxDwell, MinPacing: cmd.minPacing, MaxPacing: cmd.maxPacing, + BatchSize: cmd.batchSize, + MinBatchPacing: cmd.minBatchPacing, + MaxBatchPacing: cmd.maxBatchPacing, + TargetName: cmd.targetName, + BindAddress: cmd.bindAddress, + } + if cmd.payload > 0 { + looperOpts.MinPayload = cmd.payload + looperOpts.MaxPayload = cmd.payload + } + if cmd.dwell > 0 { + looperOpts.MinDwell = cmd.dwell + looperOpts.MaxDwell = cmd.dwell + } + if cmd.pacing > 0 { + looperOpts.MinPacing = cmd.pacing + looperOpts.MaxPacing = cmd.pacing + } + if cmd.batchPacing > 0 { + looperOpts.MinBatchPacing = cmd.batchPacing + looperOpts.MaxBatchPacing = cmd.batchPacing + } + if sns != nil { + looperOpts.SnapshotQueue = sns.InputQueue } looper := canary.NewPrivateHttpLooper(i, looperOpts, root) loopers = append(loopers, looper) @@ -103,6 +175,11 @@ func (cmd *testCanaryPrivateProxy) run(_ *cobra.Command, _ []string) { <-l.Done() } + if sns != nil { + snsCancel() + <-sns.Closed + } + results := make([]*canary.LooperResults, 0) for i := uint(0); i < cmd.loopers; i++ { results = append(results, loopers[i].Results()) diff --git a/cmd/zrok/testCanaryPublicProxy.go b/cmd/zrok/testCanaryPublicProxy.go index ae7616dc..81e44a5b 100644 --- a/cmd/zrok/testCanaryPublicProxy.go +++ b/cmd/zrok/testCanaryPublicProxy.go @@ -1,6 +1,7 @@ package main import ( + "context" "github.com/openziti/zrok/canary" "github.com/openziti/zrok/environment" "github.com/sirupsen/logrus" @@ -22,15 +23,25 @@ type testCanaryPublicProxy struct { iterations uint statusInterval uint timeout time.Duration + payload uint64 minPayload uint64 maxPayload uint64 + preDelay time.Duration minPreDelay time.Duration maxPreDelay time.Duration + dwell time.Duration minDwell time.Duration maxDwell time.Duration + pacing time.Duration minPacing time.Duration maxPacing time.Duration + batchSize uint + batchPacing time.Duration + minBatchPacing time.Duration + maxBatchPacing time.Duration + targetName string frontendSelection string + canaryConfig string } func newTestCanaryPublicProxy() *testCanaryPublicProxy { @@ -45,19 +56,33 @@ func newTestCanaryPublicProxy() *testCanaryPublicProxy { cmd.Flags().UintVarP(&command.iterations, "iterations", "i", 1, "Number of iterations") cmd.Flags().UintVarP(&command.statusInterval, "status-interval", "S", 100, "Show status every # iterations") cmd.Flags().DurationVarP(&command.timeout, "timeout", "T", 30*time.Second, "Timeout when sending HTTP requests") + cmd.Flags().Uint64Var(&command.payload, "payload", 0, "Fixed payload size") cmd.Flags().Uint64Var(&command.minPayload, "min-payload", 64, "Minimum payload size in bytes") cmd.Flags().Uint64Var(&command.maxPayload, "max-payload", 10240, "Maximum payload size in bytes") + cmd.Flags().DurationVar(&command.preDelay, "pre-delay", 0, "Fixed pre-delay before creating the next looper") cmd.Flags().DurationVar(&command.minPreDelay, "min-pre-delay", 0, "Minimum pre-delay before creating the next looper") cmd.Flags().DurationVar(&command.maxPreDelay, "max-pre-delay", 0, "Maximum pre-delay before creating the next looper") + cmd.Flags().DurationVar(&command.dwell, "dwell", 1*time.Second, "Fixed dwell time") cmd.Flags().DurationVar(&command.minDwell, "min-dwell", 1*time.Second, "Minimum dwell time") cmd.Flags().DurationVar(&command.maxDwell, "max-dwell", 1*time.Second, "Maximum dwell time") + cmd.Flags().DurationVar(&command.pacing, "pacing", 0, "Fixed pacing time") cmd.Flags().DurationVar(&command.minPacing, "min-pacing", 0, "Minimum pacing time") cmd.Flags().DurationVar(&command.maxPacing, "max-pacing", 0, "Maximum pacing time") + cmd.Flags().UintVar(&command.batchSize, "batch-size", 0, "Iterate in batches of this size") + cmd.Flags().DurationVar(&command.batchPacing, "batch-pacing", 0, "Fixed batch pacing time") + cmd.Flags().DurationVar(&command.minBatchPacing, "min-batch-pacing", 0, "Minimum batch pacing time") + cmd.Flags().DurationVar(&command.maxBatchPacing, "max-batch-pacing", 0, "Maximum batch pacing time") + cmd.Flags().StringVar(&command.targetName, "target-name", "", "Metadata describing the virtual target") cmd.Flags().StringVar(&command.frontendSelection, "frontend-selection", "public", "Select frontend selection") + cmd.Flags().StringVar(&command.canaryConfig, "canary-config", "", "Path to canary configuration file") return command } func (cmd *testCanaryPublicProxy) run(_ *cobra.Command, _ []string) { + if err := canary.AcknowledgeDangerousCanary(); err != nil { + logrus.Fatal(err) + } + root, err := environment.LoadRoot() if err != nil { panic(err) @@ -67,14 +92,35 @@ func (cmd *testCanaryPublicProxy) run(_ *cobra.Command, _ []string) { logrus.Fatal("unable to load environment; did you 'zrok enable'?") } + var sns *canary.SnapshotStreamer + var snsCtx context.Context + var snsCancel context.CancelFunc + if cmd.canaryConfig != "" { + cfg, err := canary.LoadConfig(cmd.canaryConfig) + if err != nil { + panic(err) + } + snsCtx, snsCancel = context.WithCancel(context.Background()) + sns, err = canary.NewSnapshotStreamer(snsCtx, cfg) + if err != nil { + panic(err) + } + go sns.Run() + } + var loopers []*canary.PublicHttpLooper for i := uint(0); i < cmd.loopers; i++ { - preDelay := cmd.maxPreDelay.Milliseconds() - preDelayDelta := cmd.maxPreDelay.Milliseconds() - cmd.minPreDelay.Milliseconds() - if preDelayDelta > 0 { - preDelay = int64(rand.Intn(int(preDelayDelta))) + cmd.minPreDelay.Milliseconds() - time.Sleep(time.Duration(preDelay) * time.Millisecond) + var preDelay int64 + if cmd.preDelay > 0 { + preDelay = cmd.preDelay.Milliseconds() + } else { + preDelay = cmd.maxPreDelay.Milliseconds() + preDelayDelta := cmd.maxPreDelay.Milliseconds() - cmd.minPreDelay.Milliseconds() + if preDelayDelta > 0 { + preDelay = int64(rand.Intn(int(preDelayDelta))) + cmd.minPreDelay.Milliseconds() + } } + time.Sleep(time.Duration(preDelay) * time.Millisecond) looperOpts := &canary.LooperOptions{ Iterations: cmd.iterations, @@ -86,6 +132,29 @@ func (cmd *testCanaryPublicProxy) run(_ *cobra.Command, _ []string) { MaxDwell: cmd.maxDwell, MinPacing: cmd.minPacing, MaxPacing: cmd.maxPacing, + BatchSize: cmd.batchSize, + MinBatchPacing: cmd.minBatchPacing, + MaxBatchPacing: cmd.maxBatchPacing, + TargetName: cmd.targetName, + } + if cmd.payload > 0 { + looperOpts.MinPayload = cmd.payload + looperOpts.MaxPayload = cmd.payload + } + if cmd.dwell > 0 { + looperOpts.MinDwell = cmd.dwell + looperOpts.MaxDwell = cmd.dwell + } + if cmd.pacing > 0 { + looperOpts.MinPacing = cmd.pacing + looperOpts.MaxPacing = cmd.pacing + } + if cmd.batchPacing > 0 { + looperOpts.MinBatchPacing = cmd.batchPacing + looperOpts.MaxBatchPacing = cmd.batchPacing + } + if sns != nil { + looperOpts.SnapshotQueue = sns.InputQueue } looper := canary.NewPublicHttpLooper(i, cmd.frontendSelection, looperOpts, root) loopers = append(loopers, looper) @@ -105,6 +174,11 @@ func (cmd *testCanaryPublicProxy) run(_ *cobra.Command, _ []string) { <-l.Done() } + if sns != nil { + snsCancel() + <-sns.Closed + } + results := make([]*canary.LooperResults, 0) for i := uint(0); i < cmd.loopers; i++ { results = append(results, loopers[i].Results()) diff --git a/controller/access.go b/controller/access.go index 15ad155c..3f7811d1 100644 --- a/controller/access.go +++ b/controller/access.go @@ -81,7 +81,11 @@ func (h *accessHandler) Handle(params share.AccessParams, principal *rest_model_ return share.NewAccessInternalServerError() } - if _, err := str.CreateFrontend(envId, &store.Frontend{PrivateShareId: &shr.Id, Token: feToken, ZId: envZId, PermissionMode: store.ClosedPermissionMode}, trx); err != nil { + fe := &store.Frontend{PrivateShareId: &shr.Id, Token: feToken, ZId: envZId, PermissionMode: store.ClosedPermissionMode} + if params.Body.BindAddress != "" { + fe.BindAddress = ¶ms.Body.BindAddress + } + if _, err := str.CreateFrontend(envId, fe, trx); err != nil { logrus.Errorf("error creating frontend record for user '%v': %v", principal.Email, err) return share.NewAccessInternalServerError() } diff --git a/sdk/golang/sdk/access.go b/sdk/golang/sdk/access.go index fd7068c9..3f8aeb5b 100644 --- a/sdk/golang/sdk/access.go +++ b/sdk/golang/sdk/access.go @@ -5,6 +5,7 @@ import ( "github.com/openziti/zrok/environment/env_core" "github.com/openziti/zrok/rest_client_zrok/share" "github.com/pkg/errors" + "github.com/sirupsen/logrus" ) func CreateAccess(root env_core.Root, request *AccessRequest) (*Access, error) { @@ -15,6 +16,10 @@ func CreateAccess(root env_core.Root, request *AccessRequest) (*Access, error) { out := share.NewAccessParams() out.Body.ShareToken = request.ShareToken out.Body.EnvZID = root.Environment().ZitiIdentity + if request.BindAddress != "" { + out.Body.BindAddress = request.BindAddress + logrus.Infof("requesting bind address '%v'", out.Body.BindAddress) + } zrok, err := root.Client() if err != nil { diff --git a/sdk/golang/sdk/model.go b/sdk/golang/sdk/model.go index 74752538..d9dc9a3a 100644 --- a/sdk/golang/sdk/model.go +++ b/sdk/golang/sdk/model.go @@ -62,7 +62,8 @@ type Share struct { } type AccessRequest struct { - ShareToken string + ShareToken string + BindAddress string } type Access struct {