From 4efbdd82511cce5492565a0fd9e4860eabfbf7d0 Mon Sep 17 00:00:00 2001 From: Michael Quigley Date: Mon, 4 Nov 2024 13:34:44 -0500 Subject: [PATCH] canary iteration; public-proxy; private-proxy (#771) --- canary/privateHttpLooper.go | 236 ++++++++++++++++++ canary/publicHttpLooper.go | 26 +- cmd/zrok/testCanaryPrivateProxy.go | 113 +++++++++ ...ryPeriodic.go => testCanaryPublicProxy.go} | 14 +- 4 files changed, 368 insertions(+), 21 deletions(-) create mode 100644 canary/privateHttpLooper.go create mode 100644 cmd/zrok/testCanaryPrivateProxy.go rename cmd/zrok/{testCanaryPeriodic.go => testCanaryPublicProxy.go} (90%) diff --git a/canary/privateHttpLooper.go b/canary/privateHttpLooper.go new file mode 100644 index 00000000..b3f5d715 --- /dev/null +++ b/canary/privateHttpLooper.go @@ -0,0 +1,236 @@ +package canary + +import ( + "bytes" + "context" + cryptorand "crypto/rand" + "encoding/base64" + "github.com/openziti/sdk-golang/ziti" + "github.com/openziti/sdk-golang/ziti/edge" + "github.com/openziti/zrok/environment/env_core" + "github.com/openziti/zrok/sdk/golang/sdk" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "io" + "math/rand" + "net" + "net/http" + "time" +) + +type PrivateHttpLooper struct { + id uint + acc *sdk.Access + opt *LooperOptions + root env_core.Root + shr *sdk.Share + listener edge.Listener + abort bool + done chan struct{} + results *LooperResults +} + +func NewPrivateHttpLooper(id uint, opt *LooperOptions, root env_core.Root) *PrivateHttpLooper { + return &PrivateHttpLooper{ + id: id, + opt: opt, + root: root, + done: make(chan struct{}), + results: &LooperResults{}, + } +} + +func (l *PrivateHttpLooper) Run() { + defer close(l.done) + defer logrus.Infof("#%d stopping", l.id) + defer l.shutdown() + logrus.Infof("#%d starting", l.id) + + if err := l.startup(); err != nil { + logrus.Fatalf("#%d error starting: %v", l.id, err) + } + + if err := l.bind(); err != nil { + logrus.Fatalf("#%d error binding: %v", l.id, err) + } + + l.dwell() + + l.iterate() + + logrus.Infof("#%d completed", l.id) +} + +func (l *PrivateHttpLooper) Abort() { + l.abort = true +} + +func (l *PrivateHttpLooper) Done() <-chan struct{} { + return l.done +} + +func (l *PrivateHttpLooper) Results() *LooperResults { + return l.results +} + +func (l *PrivateHttpLooper) startup() error { + shr, err := sdk.CreateShare(l.root, &sdk.ShareRequest{ + ShareMode: sdk.PrivateShareMode, + BackendMode: sdk.ProxyBackendMode, + Target: "canary.PrivateHttpLooper", + PermissionMode: sdk.ClosedPermissionMode, + }) + if err != nil { + return err + } + l.shr = shr + + acc, err := sdk.CreateAccess(l.root, &sdk.AccessRequest{ + ShareToken: shr.Token, + }) + if err != nil { + return err + } + l.acc = acc + + logrus.Infof("#%d allocated share '%v', allocated frontend '%v'", l.id, shr.Token, acc.Token) + + return nil +} + +func (l *PrivateHttpLooper) bind() error { + zif, err := l.root.ZitiIdentityNamed(l.root.EnvironmentIdentityName()) + if err != nil { + return errors.Wrapf(err, "#%d error getting identity", l.id) + } + zcfg, err := ziti.NewConfigFromFile(zif) + if err != nil { + return errors.Wrapf(err, "#%d error loading ziti config", l.id) + } + options := ziti.ListenOptions{ + ConnectTimeout: 5 * time.Minute, + WaitForNEstablishedListeners: 1, + } + zctx, err := ziti.NewContext(zcfg) + if err != nil { + return errors.Wrapf(err, "#%d error creating ziti context", l.id) + } + + if l.listener, err = zctx.ListenWithOptions(l.shr.Token, &options); err != nil { + return errors.Wrapf(err, "#%d error binding listener", l.id) + } + + go func() { + if err := http.Serve(l.listener, l); err != nil { + logrus.Errorf("#%d error in http listener: %v", l.id, err) + } + }() + + return nil +} + +func (l *PrivateHttpLooper) ServeHTTP(w http.ResponseWriter, r *http.Request) { + buf := new(bytes.Buffer) + io.Copy(buf, r.Body) + w.Write(buf.Bytes()) +} + +func (l *PrivateHttpLooper) dwell() { + dwell := l.opt.MinDwell.Milliseconds() + dwelta := l.opt.MaxDwell.Milliseconds() - l.opt.MinDwell.Milliseconds() + if dwelta > 0 { + dwell = int64(rand.Intn(int(dwelta)) + int(l.opt.MinDwell.Milliseconds())) + } + time.Sleep(time.Duration(dwell) * time.Millisecond) +} + +type connDialer struct { + c net.Conn +} + +func (cd connDialer) Dial(_ context.Context, network, addr string) (net.Conn, error) { + return cd.c, nil +} + +func (l *PrivateHttpLooper) iterate() { + l.results.StartTime = time.Now() + defer func() { l.results.StopTime = time.Now() }() + + for i := uint(0); i < l.opt.Iterations; i++ { + if i > 0 && i%l.opt.StatusInterval == 0 { + logrus.Infof("#%d: iteration %d", l.id, i) + } + + conn, err := sdk.NewDialer(l.shr.Token, l.root) + if err != nil { + logrus.Errorf("#%d: error dialing: %v", l.id, err) + l.results.Errors++ + time.Sleep(1 * time.Second) + continue + } + + payloadSize := l.opt.MaxPayload + payloadRange := l.opt.MaxPayload - l.opt.MinPayload + if payloadRange > 0 { + payloadSize = (rand.Uint64() % payloadRange) + l.opt.MinPayload + } + outPayload := make([]byte, payloadSize) + cryptorand.Read(outPayload) + outBase64 := base64.StdEncoding.EncodeToString(outPayload) + + if req, err := http.NewRequest("POST", "http://"+l.shr.Token, bytes.NewBufferString(outBase64)); err == nil { + client := &http.Client{Timeout: l.opt.Timeout, Transport: &http.Transport{DialContext: connDialer{conn}.Dial}} + if resp, err := client.Do(req); err == nil { + if resp.StatusCode != 200 { + logrus.Errorf("#%d: unexpected status code: %v", l.id, resp.StatusCode) + l.results.Errors++ + } + inPayload := new(bytes.Buffer) + io.Copy(inPayload, resp.Body) + inBase64 := inPayload.String() + if inBase64 != outBase64 { + logrus.Errorf("#%d: payload mismatch", l.id) + l.results.Mismatches++ + } else { + l.results.Bytes += uint64(len(outBase64)) + logrus.Debugf("#%d: payload match", l.id) + } + } else { + logrus.Errorf("#%d: error: %v", l.id, err) + l.results.Errors++ + } + } else { + logrus.Errorf("#%d: error creating request: %v", l.id, err) + l.results.Errors++ + } + + if err := conn.Close(); err != nil { + logrus.Errorf("#%d: error closing connection: %v", l.id, err) + } + + pacingMs := l.opt.MaxPacing.Milliseconds() + pacingDelta := l.opt.MaxPacing.Milliseconds() - l.opt.MinPacing.Milliseconds() + if pacingDelta > 0 { + pacingMs = (rand.Int63() % pacingDelta) + l.opt.MinPacing.Milliseconds() + time.Sleep(time.Duration(pacingMs) * time.Millisecond) + } + + l.results.Loops++ + } +} + +func (l *PrivateHttpLooper) shutdown() { + if l.listener != nil { + if err := l.listener.Close(); err != nil { + logrus.Errorf("#%d error closing listener: %v", l.id, err) + } + } + + if err := sdk.DeleteAccess(l.root, l.acc); err != nil { + logrus.Errorf("#%d error deleting access '%v': %v", l.id, l.acc.Token, err) + } + + if err := sdk.DeleteShare(l.root, l.shr); err != nil { + logrus.Errorf("#%d error deleting share '%v': %v", l.id, l.shr.Token, err) + } +} diff --git a/canary/publicHttpLooper.go b/canary/publicHttpLooper.go index 1fa520e5..2a212341 100644 --- a/canary/publicHttpLooper.go +++ b/canary/publicHttpLooper.go @@ -42,14 +42,15 @@ func NewPublicHttpLooper(id uint, frontend string, opt *LooperOptions, root env_ func (l *PublicHttpLooper) Run() { defer close(l.done) defer logrus.Infof("#%d stopping", l.id) + defer l.shutdown() logrus.Infof("#%d starting", l.id) if err := l.startup(); err != nil { logrus.Fatalf("#%d error starting: %v", l.id, err) } - if err := l.bindListener(); err != nil { - logrus.Fatalf("#%d error binding listener: %v", l.id, err) + if err := l.bind(); err != nil { + logrus.Fatalf("#%d error binding: %v", l.id, err) } l.dwell() @@ -57,10 +58,6 @@ func (l *PublicHttpLooper) Run() { l.iterate() logrus.Infof("#%d completed", l.id) - - if err := l.shutdown(); err != nil { - logrus.Fatalf("#%d: error shutting down: %v", l.id, err) - } } func (l *PublicHttpLooper) Abort() { @@ -87,12 +84,13 @@ func (l *PublicHttpLooper) startup() error { return err } l.shr = shr + logrus.Infof("#%d allocated share '%v'", l.id, l.shr.Token) return nil } -func (l *PublicHttpLooper) bindListener() error { +func (l *PublicHttpLooper) bind() error { zif, err := l.root.ZitiIdentityNamed(l.root.EnvironmentIdentityName()) if err != nil { return errors.Wrapf(err, "#%d error getting identity", l.id) @@ -148,8 +146,9 @@ func (l *PublicHttpLooper) iterate() { } payloadSize := l.opt.MaxPayload - if l.opt.MaxPayload-l.opt.MinPayload > 0 { - payloadSize = rand.Uint64() % (l.opt.MaxPayload - l.opt.MinPayload) + payloadRange := l.opt.MaxPayload - l.opt.MinPayload + if payloadRange > 0 { + payloadSize = (rand.Uint64() % payloadRange) + l.opt.MinPayload } outPayload := make([]byte, payloadSize) cryptorand.Read(outPayload) @@ -184,14 +183,15 @@ func (l *PublicHttpLooper) iterate() { pacingMs := l.opt.MaxPacing.Milliseconds() pacingDelta := l.opt.MaxPacing.Milliseconds() - l.opt.MinPacing.Milliseconds() if pacingDelta > 0 { - pacingMs = rand.Int63() % pacingDelta + pacingMs = (rand.Int63() % pacingDelta) + l.opt.MinPacing.Milliseconds() time.Sleep(time.Duration(pacingMs) * time.Millisecond) } + l.results.Loops++ } } -func (l *PublicHttpLooper) shutdown() error { +func (l *PublicHttpLooper) shutdown() { if l.listener != nil { if err := l.listener.Close(); err != nil { logrus.Errorf("#%d error closing listener: %v", l.id, err) @@ -199,8 +199,6 @@ func (l *PublicHttpLooper) shutdown() error { } if err := sdk.DeleteShare(l.root, l.shr); err != nil { - return errors.Wrapf(err, "#%d error deleting share", l.id) + logrus.Errorf("#%d error deleting share '%v': %v", l.id, l.shr.Token, err) } - - return nil } diff --git a/cmd/zrok/testCanaryPrivateProxy.go b/cmd/zrok/testCanaryPrivateProxy.go new file mode 100644 index 00000000..cb3db67d --- /dev/null +++ b/cmd/zrok/testCanaryPrivateProxy.go @@ -0,0 +1,113 @@ +package main + +import ( + "github.com/openziti/zrok/canary" + "github.com/openziti/zrok/environment" + "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "math/rand" + "os" + "os/signal" + "syscall" + "time" +) + +func init() { + testCanaryCmd.AddCommand(newTestCanaryPrivateProxy().cmd) +} + +type testCanaryPrivateProxy struct { + cmd *cobra.Command + loopers uint + iterations uint + statusInterval uint + timeout time.Duration + minPayload uint64 + maxPayload uint64 + minPreDelay time.Duration + maxPreDelay time.Duration + minDwell time.Duration + maxDwell time.Duration + minPacing time.Duration + maxPacing time.Duration +} + +func newTestCanaryPrivateProxy() *testCanaryPrivateProxy { + cmd := &cobra.Command{ + Use: "private-proxy", + Short: "Run a private `proxy` looper canary", + Args: cobra.NoArgs, + } + command := &testCanaryPrivateProxy{cmd: cmd} + cmd.Run = command.run + cmd.Flags().UintVarP(&command.loopers, "loopers", "l", 1, "Number of concurrent loopers to start") + cmd.Flags().UintVarP(&command.iterations, "iterations", "i", 1, "Number of iterations") + cmd.Flags().UintVarP(&command.statusInterval, "status-interval", "S", 100, "Show status every # iterations") + cmd.Flags().DurationVarP(&command.timeout, "timeout", "T", 30*time.Second, "Timeout when sending HTTP requests") + cmd.Flags().Uint64Var(&command.minPayload, "min-payload", 64, "Minimum payload size in bytes") + cmd.Flags().Uint64Var(&command.maxPayload, "max-payload", 10240, "Maximum payload size in bytes") + cmd.Flags().DurationVar(&command.minPreDelay, "min-pre-delay", 0, "Minimum pre-delay before creating the next looper") + cmd.Flags().DurationVar(&command.maxPreDelay, "max-pre-delay", 0, "Maximum pre-delay before creating the next looper") + cmd.Flags().DurationVar(&command.minDwell, "min-dwell", 1*time.Second, "Minimum dwell time") + cmd.Flags().DurationVar(&command.maxDwell, "max-dwell", 1*time.Second, "Maximum dwell time") + cmd.Flags().DurationVar(&command.minPacing, "min-pacing", 0, "Minimum pacing time") + cmd.Flags().DurationVar(&command.maxPacing, "max-pacing", 0, "Maximum pacing time") + return command +} + +func (cmd *testCanaryPrivateProxy) run(_ *cobra.Command, _ []string) { + root, err := environment.LoadRoot() + if err != nil { + panic(err) + } + + if !root.IsEnabled() { + logrus.Fatal("unable to load environment; did you 'zrok enable'?") + } + + var loopers []*canary.PrivateHttpLooper + for i := uint(0); i < cmd.loopers; i++ { + preDelay := cmd.maxPreDelay.Milliseconds() + preDelayDelta := cmd.maxPreDelay.Milliseconds() - cmd.minPreDelay.Milliseconds() + if preDelayDelta > 0 { + preDelay = int64(rand.Intn(int(preDelayDelta))) + cmd.minPreDelay.Milliseconds() + time.Sleep(time.Duration(preDelay) * time.Millisecond) + } + + looperOpts := &canary.LooperOptions{ + Iterations: cmd.iterations, + StatusInterval: cmd.statusInterval, + Timeout: cmd.timeout, + MinPayload: cmd.minPayload, + MaxPayload: cmd.maxPayload, + MinDwell: cmd.minDwell, + MaxDwell: cmd.maxDwell, + MinPacing: cmd.minPacing, + MaxPacing: cmd.maxPacing, + } + looper := canary.NewPrivateHttpLooper(i, looperOpts, root) + loopers = append(loopers, looper) + go looper.Run() + } + + c := make(chan os.Signal) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + go func() { + <-c + for _, looper := range loopers { + looper.Abort() + } + }() + + for _, l := range loopers { + <-l.Done() + } + + results := make([]*canary.LooperResults, 0) + for i := uint(0); i < cmd.loopers; i++ { + results = append(results, loopers[i].Results()) + } + canary.ReportLooperResults(results) + + os.Exit(0) +} diff --git a/cmd/zrok/testCanaryPeriodic.go b/cmd/zrok/testCanaryPublicProxy.go similarity index 90% rename from cmd/zrok/testCanaryPeriodic.go rename to cmd/zrok/testCanaryPublicProxy.go index c5d3043e..ae7616dc 100644 --- a/cmd/zrok/testCanaryPeriodic.go +++ b/cmd/zrok/testCanaryPublicProxy.go @@ -13,10 +13,10 @@ import ( ) func init() { - testCanaryCmd.AddCommand(newTestCanaryPeriodicCommand().cmd) + testCanaryCmd.AddCommand(newTestCanaryPublicProxy().cmd) } -type testCanaryPeriodicCommand struct { +type testCanaryPublicProxy struct { cmd *cobra.Command loopers uint iterations uint @@ -33,13 +33,13 @@ type testCanaryPeriodicCommand struct { frontendSelection string } -func newTestCanaryPeriodicCommand() *testCanaryPeriodicCommand { +func newTestCanaryPublicProxy() *testCanaryPublicProxy { cmd := &cobra.Command{ - Use: "periodic", - Short: "Run a periodic canary inspection", + Use: "public-proxy", + Short: "Run a public `proxy` looper canary", Args: cobra.NoArgs, } - command := &testCanaryPeriodicCommand{cmd: cmd} + command := &testCanaryPublicProxy{cmd: cmd} cmd.Run = command.run cmd.Flags().UintVarP(&command.loopers, "loopers", "l", 1, "Number of concurrent loopers to start") cmd.Flags().UintVarP(&command.iterations, "iterations", "i", 1, "Number of iterations") @@ -57,7 +57,7 @@ func newTestCanaryPeriodicCommand() *testCanaryPeriodicCommand { return command } -func (cmd *testCanaryPeriodicCommand) run(_ *cobra.Command, _ []string) { +func (cmd *testCanaryPublicProxy) run(_ *cobra.Command, _ []string) { root, err := environment.LoadRoot() if err != nil { panic(err)