cli improvements to enabler; snapshot improvements in enabler (#954)

This commit is contained in:
Michael Quigley 2025-04-30 17:18:05 -04:00
parent 62dbb2fc9c
commit df6a5877ae
No known key found for this signature in database
GPG Key ID: 9B60314A9DD20A62
7 changed files with 51 additions and 38 deletions

View File

@ -63,31 +63,23 @@ func (e *Enabler) iterate() {
Description: "canary.Enabler", Description: "canary.Enabler",
}) })
if err == nil { if err == nil {
snapshot.Completed = time.Now() snapshot.Complete().Success()
snapshot.Ok = true
e.Environments <- env e.Environments <- env
logrus.Infof("#%d enabled environment '%v'", e.Id, env.ZitiIdentity) logrus.Infof("#%d enabled environment '%v'", e.Id, env.ZitiIdentity)
} else { } else {
snapshot.Completed = time.Now() snapshot.Complete().Failure(err)
snapshot.Ok = false
snapshot.Error = err
logrus.Errorf("error creating canary (#%d) environment: %v", e.Id, err) logrus.Errorf("error creating canary (#%d) environment: %v", e.Id, err)
} }
if e.opt.SnapshotQueue != nil { snapshot.Send(e.opt.SnapshotQueue)
e.opt.SnapshotQueue <- snapshot
} else {
logrus.Info(snapshot)
}
pacingMs := e.opt.MaxPacing.Milliseconds() pacingMs := e.opt.MaxPacing.Milliseconds()
pacingDelta := e.opt.MaxPacing.Milliseconds() - e.opt.MinPacing.Milliseconds() pacingDelta := e.opt.MaxPacing.Milliseconds() - e.opt.MinPacing.Milliseconds()
if pacingDelta > 0 { if pacingDelta > 0 {
pacingMs = (rand.Int63() % pacingDelta) + e.opt.MinPacing.Milliseconds() pacingMs = (rand.Int63() % pacingDelta) + e.opt.MinPacing.Milliseconds()
}
time.Sleep(time.Duration(pacingMs) * time.Millisecond) time.Sleep(time.Duration(pacingMs) * time.Millisecond)
} }
} }
}

View File

@ -90,7 +90,7 @@ func (l *PrivateHttpLooper) startup() error {
}) })
snapshotCreateShare.Complete() snapshotCreateShare.Complete()
if err != nil { if err != nil {
snapshotCreateShare.Failed(err).Send(l.opt.SnapshotQueue) snapshotCreateShare.Failure(err).Send(l.opt.SnapshotQueue)
return err return err
} }
snapshotCreateShare.Success().Send(l.opt.SnapshotQueue) snapshotCreateShare.Success().Send(l.opt.SnapshotQueue)
@ -107,7 +107,7 @@ func (l *PrivateHttpLooper) startup() error {
}) })
snapshotCreateAccess.Complete() snapshotCreateAccess.Complete()
if err != nil { if err != nil {
snapshotCreateAccess.Failed(err).Send(l.opt.SnapshotQueue) snapshotCreateAccess.Failure(err).Send(l.opt.SnapshotQueue)
return err return err
} }
snapshotCreateAccess.Success().Send(l.opt.SnapshotQueue) snapshotCreateAccess.Success().Send(l.opt.SnapshotQueue)
@ -138,7 +138,7 @@ func (l *PrivateHttpLooper) bind() error {
snapshotListen := NewSnapshot("listen", l.id, 0) snapshotListen := NewSnapshot("listen", l.id, 0)
if l.listener, err = zctx.ListenWithOptions(l.shr.Token, &options); err != nil { if l.listener, err = zctx.ListenWithOptions(l.shr.Token, &options); err != nil {
snapshotListen.Complete().Failed(err).Send(l.opt.SnapshotQueue) snapshotListen.Complete().Failure(err).Send(l.opt.SnapshotQueue)
return errors.Wrapf(err, "#%d error binding listener", l.id) return errors.Wrapf(err, "#%d error binding listener", l.id)
} }
snapshotListen.Complete().Success().Send(l.opt.SnapshotQueue) snapshotListen.Complete().Success().Send(l.opt.SnapshotQueue)
@ -228,7 +228,7 @@ func (l *PrivateHttpLooper) iterate() {
logrus.Errorf("#%d: payload mismatch", l.id) logrus.Errorf("#%d: payload mismatch", l.id)
l.results.Mismatches++ l.results.Mismatches++
snapshot.Complete().Failed(err) snapshot.Complete().Failure(err)
} else { } else {
l.results.Bytes += uint64(len(outBase64)) l.results.Bytes += uint64(len(outBase64))
logrus.Debugf("#%d: payload match", l.id) logrus.Debugf("#%d: payload match", l.id)

View File

@ -88,7 +88,7 @@ func (l *PublicHttpLooper) startup() error {
}) })
snapshotCreateShare.Complete() snapshotCreateShare.Complete()
if err != nil { if err != nil {
snapshotCreateShare.Failed(err).Send(l.opt.SnapshotQueue) snapshotCreateShare.Failure(err).Send(l.opt.SnapshotQueue)
return err return err
} }
snapshotCreateShare.Success().Send(l.opt.SnapshotQueue) snapshotCreateShare.Success().Send(l.opt.SnapshotQueue)
@ -119,7 +119,7 @@ func (l *PublicHttpLooper) bind() error {
snapshotListen := NewSnapshot("listen", l.id, 0) snapshotListen := NewSnapshot("listen", l.id, 0)
if l.listener, err = zctx.ListenWithOptions(l.shr.Token, &options); err != nil { if l.listener, err = zctx.ListenWithOptions(l.shr.Token, &options); err != nil {
snapshotListen.Complete().Failed(err).Send(l.opt.SnapshotQueue) snapshotListen.Complete().Failure(err).Send(l.opt.SnapshotQueue)
return errors.Wrapf(err, "#%d error binding listener", l.id) return errors.Wrapf(err, "#%d error binding listener", l.id)
} }
snapshotListen.Complete().Success().Send(l.opt.SnapshotQueue) snapshotListen.Complete().Success().Send(l.opt.SnapshotQueue)
@ -193,7 +193,7 @@ func (l *PublicHttpLooper) iterate() {
logrus.Errorf("#%d: payload mismatch", l.id) logrus.Errorf("#%d: payload mismatch", l.id)
l.results.Mismatches++ l.results.Mismatches++
snapshot.Complete().Failed(err) snapshot.Complete().Failure(err)
} else { } else {
l.results.Bytes += uint64(len(outBase64)) l.results.Bytes += uint64(len(outBase64))
logrus.Debugf("#%d: payload match", l.id) logrus.Debugf("#%d: payload match", l.id)

View File

@ -31,7 +31,7 @@ func (s *Snapshot) Success() *Snapshot {
return s return s
} }
func (s *Snapshot) Failed(err error) *Snapshot { func (s *Snapshot) Failure(err error) *Snapshot {
s.Ok = false s.Ok = false
s.Error = err s.Error = err
return s return s

View File

@ -18,10 +18,13 @@ type testCanaryEnabler struct {
cmd *cobra.Command cmd *cobra.Command
enablers uint enablers uint
iterations uint iterations uint
preDelay time.Duration
minPreDelay time.Duration minPreDelay time.Duration
maxPreDelay time.Duration maxPreDelay time.Duration
dwell time.Duration
minDwell time.Duration minDwell time.Duration
maxDwell time.Duration maxDwell time.Duration
pacing time.Duration
minPacing time.Duration minPacing time.Duration
maxPacing time.Duration maxPacing time.Duration
skipDisable bool skipDisable bool
@ -38,8 +41,10 @@ func newTestCanaryEnabler() *testCanaryEnabler {
cmd.Run = command.run cmd.Run = command.run
cmd.Flags().UintVarP(&command.enablers, "enablers", "e", 1, "Number of concurrent enablers to start") cmd.Flags().UintVarP(&command.enablers, "enablers", "e", 1, "Number of concurrent enablers to start")
cmd.Flags().UintVarP(&command.iterations, "iterations", "i", 1, "Number of iterations") cmd.Flags().UintVarP(&command.iterations, "iterations", "i", 1, "Number of iterations")
cmd.Flags().DurationVar(&command.dwell, "dwell", 0, "Fixed dwell time")
cmd.Flags().DurationVar(&command.minDwell, "min-dwell", 0, "Minimum dwell time") cmd.Flags().DurationVar(&command.minDwell, "min-dwell", 0, "Minimum dwell time")
cmd.Flags().DurationVar(&command.maxDwell, "max-dwell", 0, "Maximum dwell time") cmd.Flags().DurationVar(&command.maxDwell, "max-dwell", 0, "Maximum dwell time")
cmd.Flags().DurationVar(&command.pacing, "pacing", 0, "Fixed pacing time")
cmd.Flags().DurationVar(&command.minPacing, "min-pacing", 0, "Minimum pacing time") cmd.Flags().DurationVar(&command.minPacing, "min-pacing", 0, "Minimum pacing time")
cmd.Flags().DurationVar(&command.maxPacing, "max-pacing", 0, "Maximum pacing time") cmd.Flags().DurationVar(&command.maxPacing, "max-pacing", 0, "Maximum pacing time")
cmd.Flags().BoolVar(&command.skipDisable, "skip-disable", false, "Disable (clean up) enabled environments") cmd.Flags().BoolVar(&command.skipDisable, "skip-disable", false, "Disable (clean up) enabled environments")
@ -57,17 +62,20 @@ func (cmd *testCanaryEnabler) run(_ *cobra.Command, _ []string) {
panic(err) panic(err)
} }
var sc *canary.SnapshotCollector var sns *canary.SnapshotStreamer
var scCtx context.Context var snsCtx context.Context
var scCancel context.CancelFunc var snsCancel context.CancelFunc
if cmd.canaryConfig != "" { if cmd.canaryConfig != "" {
cfg, err := canary.LoadConfig(cmd.canaryConfig) cfg, err := canary.LoadConfig(cmd.canaryConfig)
if err != nil { if err != nil {
panic(err) panic(err)
} }
scCtx, scCancel = context.WithCancel(context.Background()) snsCtx, snsCancel = context.WithCancel(context.Background())
sc = canary.NewSnapshotCollector(scCtx, cfg) sns, err = canary.NewSnapshotStreamer(snsCtx, cfg)
go sc.Run() if err != nil {
panic(err)
}
go sns.Run()
} }
var enablers []*canary.Enabler var enablers []*canary.Enabler
@ -76,8 +84,8 @@ func (cmd *testCanaryEnabler) run(_ *cobra.Command, _ []string) {
preDelayDelta := cmd.maxPreDelay.Milliseconds() - cmd.minPreDelay.Milliseconds() preDelayDelta := cmd.maxPreDelay.Milliseconds() - cmd.minPreDelay.Milliseconds()
if preDelayDelta > 0 { if preDelayDelta > 0 {
preDelay = int64(rand.Intn(int(preDelayDelta))) + cmd.minPreDelay.Milliseconds() preDelay = int64(rand.Intn(int(preDelayDelta))) + cmd.minPreDelay.Milliseconds()
time.Sleep(time.Duration(preDelay) * time.Millisecond)
} }
time.Sleep(time.Duration(preDelay) * time.Millisecond)
enablerOpts := &canary.EnablerOptions{ enablerOpts := &canary.EnablerOptions{
Iterations: cmd.iterations, Iterations: cmd.iterations,
@ -86,8 +94,16 @@ func (cmd *testCanaryEnabler) run(_ *cobra.Command, _ []string) {
MinPacing: cmd.minPacing, MinPacing: cmd.minPacing,
MaxPacing: cmd.maxPacing, MaxPacing: cmd.maxPacing,
} }
if sc != nil { if cmd.pacing > 0 {
enablerOpts.SnapshotQueue = sc.InputQueue enablerOpts.MinDwell = cmd.dwell
enablerOpts.MaxDwell = cmd.dwell
}
if cmd.pacing > 0 {
enablerOpts.MinPacing = cmd.pacing
enablerOpts.MaxPacing = cmd.pacing
}
if sns != nil {
enablerOpts.SnapshotQueue = sns.InputQueue
} }
enabler := canary.NewEnabler(i, enablerOpts, root) enabler := canary.NewEnabler(i, enablerOpts, root)
enablers = append(enablers, enabler) enablers = append(enablers, enabler)
@ -100,8 +116,8 @@ func (cmd *testCanaryEnabler) run(_ *cobra.Command, _ []string) {
disablerOpts := &canary.DisablerOptions{ disablerOpts := &canary.DisablerOptions{
Environments: enablers[i].Environments, Environments: enablers[i].Environments,
} }
if sc != nil { if sns != nil {
disablerOpts.SnapshotQueue = sc.InputQueue disablerOpts.SnapshotQueue = sns.InputQueue
} }
disabler := canary.NewDisabler(i, disablerOpts, root) disabler := canary.NewDisabler(i, disablerOpts, root)
disablers = append(disablers, disabler) disablers = append(disablers, disabler)
@ -131,13 +147,10 @@ func (cmd *testCanaryEnabler) run(_ *cobra.Command, _ []string) {
<-enabler.Done <-enabler.Done
} }
if sc != nil { if sns != nil {
scCancel() snsCancel()
<-sc.Closed <-sns.Closed
if err := sc.Store(); err != nil {
panic(err)
}
} }
logrus.Infof("complete") logrus.Info("complete")
} }

View File

@ -79,6 +79,10 @@ func newTestCanaryPrivateProxy() *testCanaryPrivateProxy {
} }
func (cmd *testCanaryPrivateProxy) run(_ *cobra.Command, _ []string) { func (cmd *testCanaryPrivateProxy) run(_ *cobra.Command, _ []string) {
if err := canary.AcknowledgeDangerousCanary(); err != nil {
logrus.Fatal(err)
}
root, err := environment.LoadRoot() root, err := environment.LoadRoot()
if err != nil { if err != nil {
panic(err) panic(err)

View File

@ -79,6 +79,10 @@ func newTestCanaryPublicProxy() *testCanaryPublicProxy {
} }
func (cmd *testCanaryPublicProxy) run(_ *cobra.Command, _ []string) { func (cmd *testCanaryPublicProxy) run(_ *cobra.Command, _ []string) {
if err := canary.AcknowledgeDangerousCanary(); err != nil {
logrus.Fatal(err)
}
root, err := environment.LoadRoot() root, err := environment.LoadRoot()
if err != nil { if err != nil {
panic(err) panic(err)