diff --git a/canary/enabler.go b/canary/enabler.go index 1841570c..58d72f60 100644 --- a/canary/enabler.go +++ b/canary/enabler.go @@ -63,31 +63,23 @@ func (e *Enabler) iterate() { Description: "canary.Enabler", }) if err == nil { - snapshot.Completed = time.Now() - snapshot.Ok = true - + snapshot.Complete().Success() e.Environments <- env logrus.Infof("#%d enabled environment '%v'", e.Id, env.ZitiIdentity) } else { - snapshot.Completed = time.Now() - snapshot.Ok = false - snapshot.Error = err + snapshot.Complete().Failure(err) logrus.Errorf("error creating canary (#%d) environment: %v", e.Id, err) } - if e.opt.SnapshotQueue != nil { - e.opt.SnapshotQueue <- snapshot - } else { - logrus.Info(snapshot) - } + snapshot.Send(e.opt.SnapshotQueue) pacingMs := e.opt.MaxPacing.Milliseconds() pacingDelta := e.opt.MaxPacing.Milliseconds() - e.opt.MinPacing.Milliseconds() if pacingDelta > 0 { pacingMs = (rand.Int63() % pacingDelta) + e.opt.MinPacing.Milliseconds() - time.Sleep(time.Duration(pacingMs) * time.Millisecond) } + time.Sleep(time.Duration(pacingMs) * time.Millisecond) } } diff --git a/canary/privateHttpLooper.go b/canary/privateHttpLooper.go index 052e510d..5df63962 100644 --- a/canary/privateHttpLooper.go +++ b/canary/privateHttpLooper.go @@ -90,7 +90,7 @@ func (l *PrivateHttpLooper) startup() error { }) snapshotCreateShare.Complete() if err != nil { - snapshotCreateShare.Failed(err).Send(l.opt.SnapshotQueue) + snapshotCreateShare.Failure(err).Send(l.opt.SnapshotQueue) return err } snapshotCreateShare.Success().Send(l.opt.SnapshotQueue) @@ -107,7 +107,7 @@ func (l *PrivateHttpLooper) startup() error { }) snapshotCreateAccess.Complete() if err != nil { - snapshotCreateAccess.Failed(err).Send(l.opt.SnapshotQueue) + snapshotCreateAccess.Failure(err).Send(l.opt.SnapshotQueue) return err } snapshotCreateAccess.Success().Send(l.opt.SnapshotQueue) @@ -138,7 +138,7 @@ func (l *PrivateHttpLooper) bind() error { snapshotListen := NewSnapshot("listen", l.id, 0) if l.listener, err = zctx.ListenWithOptions(l.shr.Token, &options); err != nil { - snapshotListen.Complete().Failed(err).Send(l.opt.SnapshotQueue) + snapshotListen.Complete().Failure(err).Send(l.opt.SnapshotQueue) return errors.Wrapf(err, "#%d error binding listener", l.id) } snapshotListen.Complete().Success().Send(l.opt.SnapshotQueue) @@ -228,7 +228,7 @@ func (l *PrivateHttpLooper) iterate() { logrus.Errorf("#%d: payload mismatch", l.id) l.results.Mismatches++ - snapshot.Complete().Failed(err) + snapshot.Complete().Failure(err) } else { l.results.Bytes += uint64(len(outBase64)) logrus.Debugf("#%d: payload match", l.id) diff --git a/canary/publicHttpLooper.go b/canary/publicHttpLooper.go index b47bc390..b2756a68 100644 --- a/canary/publicHttpLooper.go +++ b/canary/publicHttpLooper.go @@ -88,7 +88,7 @@ func (l *PublicHttpLooper) startup() error { }) snapshotCreateShare.Complete() if err != nil { - snapshotCreateShare.Failed(err).Send(l.opt.SnapshotQueue) + snapshotCreateShare.Failure(err).Send(l.opt.SnapshotQueue) return err } snapshotCreateShare.Success().Send(l.opt.SnapshotQueue) @@ -119,7 +119,7 @@ func (l *PublicHttpLooper) bind() error { snapshotListen := NewSnapshot("listen", l.id, 0) if l.listener, err = zctx.ListenWithOptions(l.shr.Token, &options); err != nil { - snapshotListen.Complete().Failed(err).Send(l.opt.SnapshotQueue) + snapshotListen.Complete().Failure(err).Send(l.opt.SnapshotQueue) return errors.Wrapf(err, "#%d error binding listener", l.id) } snapshotListen.Complete().Success().Send(l.opt.SnapshotQueue) @@ -193,7 +193,7 @@ func (l *PublicHttpLooper) iterate() { logrus.Errorf("#%d: payload mismatch", l.id) l.results.Mismatches++ - snapshot.Complete().Failed(err) + snapshot.Complete().Failure(err) } else { l.results.Bytes += uint64(len(outBase64)) logrus.Debugf("#%d: payload match", l.id) diff --git a/canary/snapshot.go b/canary/snapshot.go index 62a89682..7daf0099 100644 --- a/canary/snapshot.go +++ b/canary/snapshot.go @@ -31,7 +31,7 @@ func (s *Snapshot) Success() *Snapshot { return s } -func (s *Snapshot) Failed(err error) *Snapshot { +func (s *Snapshot) Failure(err error) *Snapshot { s.Ok = false s.Error = err return s diff --git a/cmd/zrok/testCanaryEnabler.go b/cmd/zrok/testCanaryEnabler.go index 0a8111cf..063f8e87 100644 --- a/cmd/zrok/testCanaryEnabler.go +++ b/cmd/zrok/testCanaryEnabler.go @@ -18,10 +18,13 @@ type testCanaryEnabler struct { cmd *cobra.Command enablers uint iterations uint + preDelay time.Duration minPreDelay time.Duration maxPreDelay time.Duration + dwell time.Duration minDwell time.Duration maxDwell time.Duration + pacing time.Duration minPacing time.Duration maxPacing time.Duration skipDisable bool @@ -38,8 +41,10 @@ func newTestCanaryEnabler() *testCanaryEnabler { cmd.Run = command.run cmd.Flags().UintVarP(&command.enablers, "enablers", "e", 1, "Number of concurrent enablers to start") cmd.Flags().UintVarP(&command.iterations, "iterations", "i", 1, "Number of iterations") + cmd.Flags().DurationVar(&command.dwell, "dwell", 0, "Fixed dwell time") cmd.Flags().DurationVar(&command.minDwell, "min-dwell", 0, "Minimum dwell time") cmd.Flags().DurationVar(&command.maxDwell, "max-dwell", 0, "Maximum dwell time") + cmd.Flags().DurationVar(&command.pacing, "pacing", 0, "Fixed pacing time") cmd.Flags().DurationVar(&command.minPacing, "min-pacing", 0, "Minimum pacing time") cmd.Flags().DurationVar(&command.maxPacing, "max-pacing", 0, "Maximum pacing time") cmd.Flags().BoolVar(&command.skipDisable, "skip-disable", false, "Disable (clean up) enabled environments") @@ -57,17 +62,20 @@ func (cmd *testCanaryEnabler) run(_ *cobra.Command, _ []string) { panic(err) } - var sc *canary.SnapshotCollector - var scCtx context.Context - var scCancel context.CancelFunc + var sns *canary.SnapshotStreamer + var snsCtx context.Context + var snsCancel context.CancelFunc if cmd.canaryConfig != "" { cfg, err := canary.LoadConfig(cmd.canaryConfig) if err != nil { panic(err) } - scCtx, scCancel = context.WithCancel(context.Background()) - sc = canary.NewSnapshotCollector(scCtx, cfg) - go sc.Run() + snsCtx, snsCancel = context.WithCancel(context.Background()) + sns, err = canary.NewSnapshotStreamer(snsCtx, cfg) + if err != nil { + panic(err) + } + go sns.Run() } var enablers []*canary.Enabler @@ -76,8 +84,8 @@ func (cmd *testCanaryEnabler) run(_ *cobra.Command, _ []string) { preDelayDelta := cmd.maxPreDelay.Milliseconds() - cmd.minPreDelay.Milliseconds() if preDelayDelta > 0 { preDelay = int64(rand.Intn(int(preDelayDelta))) + cmd.minPreDelay.Milliseconds() - time.Sleep(time.Duration(preDelay) * time.Millisecond) } + time.Sleep(time.Duration(preDelay) * time.Millisecond) enablerOpts := &canary.EnablerOptions{ Iterations: cmd.iterations, @@ -86,8 +94,16 @@ func (cmd *testCanaryEnabler) run(_ *cobra.Command, _ []string) { MinPacing: cmd.minPacing, MaxPacing: cmd.maxPacing, } - if sc != nil { - enablerOpts.SnapshotQueue = sc.InputQueue + if cmd.pacing > 0 { + enablerOpts.MinDwell = cmd.dwell + enablerOpts.MaxDwell = cmd.dwell + } + if cmd.pacing > 0 { + enablerOpts.MinPacing = cmd.pacing + enablerOpts.MaxPacing = cmd.pacing + } + if sns != nil { + enablerOpts.SnapshotQueue = sns.InputQueue } enabler := canary.NewEnabler(i, enablerOpts, root) enablers = append(enablers, enabler) @@ -100,8 +116,8 @@ func (cmd *testCanaryEnabler) run(_ *cobra.Command, _ []string) { disablerOpts := &canary.DisablerOptions{ Environments: enablers[i].Environments, } - if sc != nil { - disablerOpts.SnapshotQueue = sc.InputQueue + if sns != nil { + disablerOpts.SnapshotQueue = sns.InputQueue } disabler := canary.NewDisabler(i, disablerOpts, root) disablers = append(disablers, disabler) @@ -131,13 +147,10 @@ func (cmd *testCanaryEnabler) run(_ *cobra.Command, _ []string) { <-enabler.Done } - if sc != nil { - scCancel() - <-sc.Closed - if err := sc.Store(); err != nil { - panic(err) - } + if sns != nil { + snsCancel() + <-sns.Closed } - logrus.Infof("complete") + logrus.Info("complete") } diff --git a/cmd/zrok/testCanaryPrivateProxy.go b/cmd/zrok/testCanaryPrivateProxy.go index 5bf77c71..3313903b 100644 --- a/cmd/zrok/testCanaryPrivateProxy.go +++ b/cmd/zrok/testCanaryPrivateProxy.go @@ -79,6 +79,10 @@ func newTestCanaryPrivateProxy() *testCanaryPrivateProxy { } func (cmd *testCanaryPrivateProxy) run(_ *cobra.Command, _ []string) { + if err := canary.AcknowledgeDangerousCanary(); err != nil { + logrus.Fatal(err) + } + root, err := environment.LoadRoot() if err != nil { panic(err) diff --git a/cmd/zrok/testCanaryPublicProxy.go b/cmd/zrok/testCanaryPublicProxy.go index 21299140..81e44a5b 100644 --- a/cmd/zrok/testCanaryPublicProxy.go +++ b/cmd/zrok/testCanaryPublicProxy.go @@ -79,6 +79,10 @@ func newTestCanaryPublicProxy() *testCanaryPublicProxy { } func (cmd *testCanaryPublicProxy) run(_ *cobra.Command, _ []string) { + if err := canary.AcknowledgeDangerousCanary(); err != nil { + logrus.Fatal(err) + } + root, err := environment.LoadRoot() if err != nil { panic(err)