zrok office hours miscellany (#771)

This commit is contained in:
Michael Quigley 2024-11-01 15:07:26 -04:00
parent a7e1c4524b
commit a0e7b5a3aa
No known key found for this signature in database
GPG Key ID: 9B60314A9DD20A62
6 changed files with 9 additions and 434 deletions

View File

@ -28,20 +28,20 @@ type LooperResults struct {
}
func ReportLooperResults(results []*LooperResults) {
totalXfer := uint64(0)
totalBytes := uint64(0)
totalXferRate := uint64(0)
totalErrors := uint(0)
totalMismatches := uint(0)
totalLoops := uint(0)
for i, result := range results {
totalBytes += result.Bytes
deltaSeconds := result.StopTime.Sub(result.StartTime).Seconds()
xfer := uint64(float64(result.Bytes) / deltaSeconds)
totalXfer += xfer
xferRate := uint64(float64(result.Bytes) / deltaSeconds)
totalXferRate += xferRate
totalErrors += result.Errors
totalMismatches += result.Mismatches
xferSec := util.BytesToSize(int64(xfer))
totalLoops += result.Loops
logrus.Infof("looper #%d: %d loops, %d errors, %d mismatches, %s/sec", i, result.Loops, result.Errors, result.Mismatches, xferSec)
logrus.Infof("looper #%d: %d loops, %v, %d errors, %d mismatches, %s/sec", i, result.Loops, util.BytesToSize(int64(result.Bytes)), result.Errors, result.Mismatches, util.BytesToSize(int64(xferRate)))
}
totalXferSec := util.BytesToSize(int64(totalXfer))
logrus.Infof("total: %d loops, %d errors, %d mismatches, %s/sec", totalLoops, totalErrors, totalMismatches, totalXferSec)
logrus.Infof("total: %d loops, %v, %d errors, %d mismatches, %s/sec", totalLoops, util.BytesToSize(int64(totalBytes)), totalErrors, totalMismatches, util.BytesToSize(int64(totalXferRate)))
}

View File

@ -42,12 +42,6 @@ func NewPublicHttpLooper(id uint, frontend string, opt *LooperOptions, root env_
func (l *PublicHttpLooper) Run() {
defer close(l.done)
defer logrus.Infof("#%d stopping", l.id)
defer func() {
if r := recover(); r != nil {
logrus.Errorf("#%d: %v", l.id, r)
panic(r)
}
}()
logrus.Infof("#%d starting", l.id)
if err := l.startup(); err != nil {
@ -122,7 +116,7 @@ func (l *PublicHttpLooper) bindListener() error {
go func() {
if err := http.Serve(l.listener, l); err != nil {
logrus.Errorf("#%d error starting http listener: %v", l.id, err)
logrus.Errorf("#%d error in http listener: %v", l.id, err)
}
}()

View File

@ -25,7 +25,6 @@ func init() {
adminCmd.AddCommand(adminListCmd)
adminCmd.AddCommand(adminUpdateCmd)
testCmd.AddCommand(testCanaryCmd)
testCmd.AddCommand(testLoopCmd)
rootCmd.AddCommand(adminCmd)
rootCmd.AddCommand(configCmd)
rootCmd.AddCommand(modifyCmd)
@ -104,12 +103,6 @@ var testCanaryCmd = &cobra.Command{
Short: "Utilities for performance management",
}
var testLoopCmd = &cobra.Command{
Use: "loopback",
Aliases: []string{"loop"},
Short: "Loopback testing utilities",
}
func main() {
if err := rootCmd.Execute(); err != nil {
if panicInstead {

View File

@ -88,6 +88,7 @@ func (cmd *testCanaryPeriodicCommand) run(_ *cobra.Command, _ []string) {
looper.Abort()
}
}()
for _, l := range loopers {
<-l.Done()
}

View File

@ -1,292 +0,0 @@
package main
import (
"bytes"
"encoding/base64"
"fmt"
"github.com/go-openapi/runtime"
httptransport "github.com/go-openapi/runtime/client"
"github.com/openziti/sdk-golang/ziti"
"github.com/openziti/sdk-golang/ziti/edge"
"github.com/openziti/zrok/environment"
"github.com/openziti/zrok/environment/env_core"
"github.com/openziti/zrok/rest_client_zrok"
"github.com/openziti/zrok/rest_client_zrok/share"
"github.com/openziti/zrok/rest_model_zrok"
"github.com/openziti/zrok/sdk/golang/sdk"
"github.com/openziti/zrok/tui"
"github.com/openziti/zrok/util"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"io"
"math/rand"
"net/http"
"os"
"os/signal"
"syscall"
"time"
)
func init() {
testLoopCmd.AddCommand(newTestLoopPublicCommand().cmd)
}
type testLoopPublicCommand struct {
cmd *cobra.Command
loopers int
iterations int
statusEvery int
timeoutSeconds int
minPayload int
maxPayload int
minDwellMs int
maxDwellMs int
minPacingMs int
maxPacingMs int
frontendSelection []string
}
func newTestLoopPublicCommand() *testLoopPublicCommand {
cmd := &cobra.Command{
Use: "public",
Short: "Start a loop agent testing public proxy shares",
Args: cobra.NoArgs,
}
command := &testLoopPublicCommand{cmd: cmd}
cmd.Run = command.run
cmd.Flags().IntVarP(&command.loopers, "loopers", "l", 1, "Number of current loopers to start")
cmd.Flags().IntVarP(&command.iterations, "iterations", "i", 1, "Number of iterations per looper")
cmd.Flags().IntVarP(&command.statusEvery, "status-every", "E", 100, "Show status every # iterations")
cmd.Flags().IntVarP(&command.timeoutSeconds, "timeout-seconds", "T", 30, "Time out after # seconds when sending http requests")
cmd.Flags().IntVar(&command.minPayload, "min-payload", 64, "Minimum payload size in bytes")
cmd.Flags().IntVar(&command.maxPayload, "max-payload", 10240, "Maximum payload size in bytes")
cmd.Flags().IntVar(&command.minDwellMs, "min-dwell-ms", 1000, "Minimum dwell time in milliseconds")
cmd.Flags().IntVar(&command.maxDwellMs, "max-dwell-ms", 1000, "Maximum dwell time in milliseconds")
cmd.Flags().IntVar(&command.minPacingMs, "min-pacing-ms", 0, "Minimum pacing in milliseconds")
cmd.Flags().IntVar(&command.maxPacingMs, "max-pacing-ms", 0, "Maximum pacing in milliseconds")
cmd.Flags().StringArrayVar(&command.frontendSelection, "frontends", []string{"public"}, "Selected frontends to use for the share")
return command
}
func (cmd *testLoopPublicCommand) run(_ *cobra.Command, _ []string) {
var loopers []*looper
for i := 0; i < cmd.loopers; i++ {
l := newLooper(i, cmd)
loopers = append(loopers, l)
go l.run()
}
c := make(chan os.Signal)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c
for _, looper := range loopers {
looper.stop = true
}
}()
for _, l := range loopers {
<-l.done
}
totalMismatches := 0
totalXfer := int64(0)
totalLoops := int64(0)
for _, l := range loopers {
deltaSeconds := l.stopTime.Sub(l.startTime).Seconds()
xfer := int64(float64(l.bytes) / deltaSeconds)
totalXfer += xfer
totalMismatches += l.mismatches
xferSec := util.BytesToSize(xfer)
totalLoops += l.loops
logrus.Infof("looper #%d: %d loops, %d mismatches, %s/sec", l.id, l.loops, l.mismatches, xferSec)
}
totalXferSec := util.BytesToSize(totalXfer)
logrus.Infof("total: %d loops, %d mismatches, %s/sec", totalLoops, totalMismatches, totalXferSec)
os.Exit(0)
}
type looper struct {
id int
cmd *testLoopPublicCommand
env *env_core.Environment
done chan struct{}
listener edge.Listener
zif string
zrok *rest_client_zrok.Zrok
shrToken string
proxyEndpoint string
auth runtime.ClientAuthInfoWriter
mismatches int
bytes int64
loops int64
startTime time.Time
stopTime time.Time
stop bool
}
func newLooper(id int, cmd *testLoopPublicCommand) *looper {
return &looper{
id: id,
cmd: cmd,
done: make(chan struct{}),
}
}
func (l *looper) run() {
defer close(l.done)
defer logrus.Infof("stopping #%d", l.id)
l.startup()
logrus.Infof("looper #%d, shrToken: %v, frontend: %v", l.id, l.shrToken, l.proxyEndpoint)
if l.serviceListener() {
l.dwell()
l.iterate()
}
logrus.Infof("looper #%d: complete", l.id)
l.shutdown()
}
func (l *looper) serviceListener() bool {
zcfg, err := ziti.NewConfigFromFile(l.zif)
if err != nil {
logrus.Errorf("error opening ziti config '%v': %v", l.zif, err)
return false
}
options := ziti.ListenOptions{
ConnectTimeout: 5 * time.Minute,
WaitForNEstablishedListeners: 1,
}
zctx, err := ziti.NewContext(zcfg)
if err != nil {
logrus.Errorf("error loading ziti context: %v", err)
return false
}
if l.listener, err = zctx.ListenWithOptions(l.shrToken, &options); err != nil {
logrus.Errorf("looper #%d, error listening: %v", l.id, err)
return false
}
go func() {
if err := http.Serve(l.listener, l); err != nil {
logrus.Errorf("looper #%d, error serving: %v", l.id, err)
}
}()
return true
}
func (l *looper) ServeHTTP(w http.ResponseWriter, r *http.Request) {
buf := new(bytes.Buffer)
io.Copy(buf, r.Body)
w.Write(buf.Bytes())
}
func (l *looper) startup() {
logrus.Infof("starting #%d", l.id)
env, err := environment.LoadRoot()
if err != nil {
panic(err)
}
if !env.IsEnabled() {
tui.Error("unable to load environment; did you 'zrok enable'?", nil)
}
l.env = env.Environment()
l.zif, err = env.ZitiIdentityNamed(env.EnvironmentIdentityName())
if err != nil {
panic(err)
}
l.zrok, err = env.Client()
if err != nil {
panic(err)
}
l.auth = httptransport.APIKeyAuth("x-token", "header", l.env.Token)
tunnelReq := share.NewShareParams()
tunnelReq.Body = &rest_model_zrok.ShareRequest{
EnvZID: l.env.ZitiIdentity,
ShareMode: string(sdk.PublicShareMode),
FrontendSelection: l.cmd.frontendSelection,
BackendMode: string(sdk.ProxyBackendMode),
BackendProxyEndpoint: fmt.Sprintf("looper#%d", l.id),
AuthScheme: string(sdk.None),
}
tunnelReq.SetTimeout(60 * time.Second)
tunnelResp, err := l.zrok.Share.Share(tunnelReq, l.auth)
if err != nil {
panic(err)
}
l.shrToken = tunnelResp.Payload.ShrToken
l.proxyEndpoint = tunnelResp.Payload.FrontendProxyEndpoints[0]
}
func (l *looper) dwell() {
dwell := l.cmd.minDwellMs
if l.cmd.maxDwellMs-l.cmd.minDwellMs > 0 {
dwell = rand.Intn(l.cmd.maxDwellMs-l.cmd.minDwellMs) + l.cmd.minDwellMs
}
time.Sleep(time.Duration(dwell) * time.Millisecond)
}
func (l *looper) iterate() {
l.startTime = time.Now()
defer func() { l.stopTime = time.Now() }()
for i := 0; i < l.cmd.iterations && !l.stop; i++ {
if i > 0 && i%l.cmd.statusEvery == 0 {
logrus.Infof("looper #%d: iteration #%d", l.id, i)
}
sz := l.cmd.maxPayload
if l.cmd.maxPayload-l.cmd.minPayload > 0 {
sz = rand.Intn(l.cmd.maxPayload-l.cmd.minPayload) + l.cmd.minPayload
}
outpayload := make([]byte, sz)
outbase64 := base64.StdEncoding.EncodeToString(outpayload)
rand.Read(outpayload)
if req, err := http.NewRequest("POST", l.proxyEndpoint, bytes.NewBufferString(outbase64)); err == nil {
client := &http.Client{Timeout: time.Second * time.Duration(l.cmd.timeoutSeconds)}
if resp, err := client.Do(req); err == nil {
if resp.StatusCode != 200 {
logrus.Errorf("looper #%d unexpected response status code %v!", l.id, resp.StatusCode)
}
inpayload := new(bytes.Buffer)
io.Copy(inpayload, resp.Body)
inbase64 := inpayload.String()
if inbase64 != outbase64 {
logrus.Errorf("looper #%d payload mismatch!", l.id)
l.mismatches++
} else {
l.bytes += int64(len(outbase64))
logrus.Debugf("looper #%d payload match", l.id)
}
} else {
logrus.Errorf("looper #%d error: %v", l.id, err)
}
} else {
logrus.Errorf("looper #%d error creating request: %v", l.id, err)
}
pacingMs := l.cmd.maxPayload
if l.cmd.maxPacingMs-l.cmd.minPacingMs > 0 {
pacingMs = rand.Intn(l.cmd.maxPacingMs-l.cmd.minPacingMs) + l.cmd.minPacingMs
time.Sleep(time.Duration(pacingMs) * time.Millisecond)
}
l.loops++
}
}
func (l *looper) shutdown() {
if l.listener != nil {
if err := l.listener.Close(); err != nil {
logrus.Errorf("looper #%d error closing listener: %v", l.id, err)
}
}
untunnelReq := share.NewUnshareParams()
untunnelReq.Body = &rest_model_zrok.UnshareRequest{
EnvZID: l.env.ZitiIdentity,
ShrToken: l.shrToken,
}
if _, err := l.zrok.Share.Unshare(untunnelReq, l.auth); err != nil {
logrus.Errorf("error shutting down looper #%d: %v", l.id, err)
}
}

View File

@ -1,121 +0,0 @@
package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"net"
"net/http"
"os"
"strings"
"time"
"github.com/openziti/sdk-golang/ziti"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"nhooyr.io/websocket"
"nhooyr.io/websocket/wsjson"
)
func init() {
testCmd.AddCommand(newTestWebsocketCommand().cmd)
}
type testWebsocketCommand struct {
cmd *cobra.Command
identityJsonFile string
serviceName string
enableZiti bool
}
func newTestWebsocketCommand() *testWebsocketCommand {
cmd := &cobra.Command{
Use: "websocket",
Args: cobra.RangeArgs(0, 1),
}
command := &testWebsocketCommand{cmd: cmd}
cmd.Flags().BoolVar(&command.enableZiti, "ziti", false, "Enable the usage of a ziti network")
cmd.Flags().StringVar(&command.identityJsonFile, "ziti-identity", "", "Path to Ziti Identity json file")
cmd.Flags().StringVar(&command.serviceName, "ziti-name", "", "Name of the Ziti Service")
cmd.Run = command.run
return command
}
func (cmd *testWebsocketCommand) run(_ *cobra.Command, args []string) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*6)
defer cancel()
opts := &websocket.DialOptions{}
var addr string
if cmd.enableZiti {
identityJsonBytes, err := os.ReadFile(cmd.identityJsonFile)
if err != nil {
fmt.Fprintf(os.Stderr, "Error: failed to read identity config JSON from file %s: %s\n", cmd.identityJsonFile, err)
os.Exit(1)
}
if len(identityJsonBytes) == 0 {
fmt.Fprintf(os.Stderr, "Error: When running a ziti enabled service must have ziti identity provided\n\n")
flag.Usage()
os.Exit(1)
}
cfg := &ziti.Config{}
err = json.Unmarshal(identityJsonBytes, cfg)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to load ziti configuration JSON: %v", err)
os.Exit(1)
}
zitiContext, err := ziti.NewContext(cfg)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to load ziti context: %v", err)
os.Exit(1)
}
dial := func(_ context.Context, _, addr string) (net.Conn, error) {
service := strings.Split(addr, ":")[0]
return zitiContext.DialWithOptions(service, &ziti.DialOptions{ConnectTimeout: 30 * time.Second})
}
zitiTransport := http.DefaultTransport.(*http.Transport).Clone()
zitiTransport.DialContext = dial
opts.HTTPClient = &http.Client{Transport: zitiTransport}
addr = cmd.serviceName
} else {
if len(args) == 0 {
logrus.Error("address required if not using ziti")
flag.Usage()
os.Exit(1)
}
addr = args[0]
}
logrus.Info(fmt.Sprintf("http://%s/echo", addr))
c, _, err := websocket.Dial(ctx, fmt.Sprintf("http://%s/echo", addr), opts)
if err != nil {
logrus.Error(err)
return
}
defer c.Close(websocket.StatusInternalError, "the sky is falling")
logrus.Info("writing to server...")
err = wsjson.Write(ctx, c, "hi")
if err != nil {
logrus.Error(err)
return
}
logrus.Info("reading response...")
typ, dat, err := c.Read(ctx)
if err != nil {
logrus.Error(err)
return
}
logrus.Info(typ)
logrus.Info(string(dat))
c.Close(websocket.StatusNormalClosure, "")
}