zrok/cmd/zrokloop/loop.go

180 lines
4.9 KiB
Go
Raw Normal View History

2022-09-30 21:23:53 +02:00
package main
2022-10-03 18:00:26 +02:00
import (
2022-10-03 22:34:24 +02:00
"bytes"
"crypto/rand"
"encoding/base64"
"fmt"
httptransport "github.com/go-openapi/runtime/client"
"github.com/openziti-test-kitchen/zrok/model"
"github.com/openziti-test-kitchen/zrok/rest_client_zrok/tunnel"
"github.com/openziti-test-kitchen/zrok/rest_model_zrok"
"github.com/openziti-test-kitchen/zrok/zrokdir"
2022-10-03 22:15:57 +02:00
"github.com/openziti/sdk-golang/ziti"
"github.com/openziti/sdk-golang/ziti/config"
"github.com/openziti/sdk-golang/ziti/edge"
2022-10-03 18:00:26 +02:00
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
2022-10-03 22:34:24 +02:00
"io"
2022-10-03 22:15:57 +02:00
"net/http"
"time"
2022-10-03 18:00:26 +02:00
)
2022-09-30 21:23:53 +02:00
func init() {
rootCmd.AddCommand(newRun().cmd)
}
type run struct {
2022-10-04 19:28:16 +02:00
cmd *cobra.Command
loopers int
iterations int
statusEvery int
dwellSeconds int
timeoutSeconds int
2022-09-30 21:23:53 +02:00
}
func newRun() *run {
cmd := &cobra.Command{
Use: "run",
Short: "Start a loop agent",
Args: cobra.ExactArgs(0),
}
r := &run{cmd: cmd}
cmd.Run = r.run
2022-10-03 18:00:26 +02:00
cmd.Flags().IntVarP(&r.loopers, "loopers", "l", 1, "Number of current loopers to start")
2022-10-03 22:44:15 +02:00
cmd.Flags().IntVarP(&r.iterations, "iterations", "i", 1, "Number of iterations per looper")
2022-10-04 19:16:58 +02:00
cmd.Flags().IntVarP(&r.statusEvery, "status-every", "E", 100, "Show status every # iterations")
2022-10-04 19:28:16 +02:00
cmd.Flags().IntVarP(&r.dwellSeconds, "dwell-seconds", "D", 1, "Dwell # seconds before starting iterations")
cmd.Flags().IntVarP(&r.timeoutSeconds, "timeout-seconds", "T", 30, "Time out after # seconds when sending http requests")
2022-09-30 21:23:53 +02:00
return r
}
func (r *run) run(_ *cobra.Command, _ []string) {
2022-10-03 18:00:26 +02:00
var loopers []*looper
for i := 0; i < r.loopers; i++ {
2022-10-04 19:16:30 +02:00
l := newLooper(i, r)
2022-10-03 18:00:26 +02:00
loopers = append(loopers, l)
go l.run()
}
for _, l := range loopers {
<-l.done
}
}
type looper struct {
2022-10-04 19:16:30 +02:00
id int
2022-10-04 19:28:16 +02:00
r *run
2022-10-04 19:16:30 +02:00
done chan struct{}
listener edge.Listener
2022-10-03 18:00:26 +02:00
}
2022-10-04 19:28:16 +02:00
func newLooper(id int, r *run) *looper {
2022-10-03 18:00:26 +02:00
return &looper{
2022-10-04 19:16:30 +02:00
id: id,
2022-10-04 19:28:16 +02:00
r: r,
2022-10-04 19:16:30 +02:00
done: make(chan struct{}),
2022-10-03 18:00:26 +02:00
}
}
func (l *looper) run() {
logrus.Infof("starting #%d", l.id)
defer close(l.done)
defer logrus.Infof("stopping #%d", l.id)
env, err := zrokdir.LoadEnvironment()
if err != nil {
panic(err)
}
2022-10-03 22:15:57 +02:00
zif, err := zrokdir.ZitiIdentityFile("environment")
if err != nil {
panic(err)
}
zrok, err := zrokdir.ZrokClient(env.ApiEndpoint)
if err != nil {
panic(err)
}
auth := httptransport.APIKeyAuth("x-token", "header", env.ZrokToken)
tunnelReq := tunnel.NewTunnelParams()
tunnelReq.Body = &rest_model_zrok.TunnelRequest{
ZitiIdentityID: env.ZitiIdentityId,
Endpoint: fmt.Sprintf("looper#%d", l.id),
AuthScheme: string(model.None),
}
tunnelResp, err := zrok.Tunnel.Tunnel(tunnelReq, auth)
if err != nil {
panic(err)
}
2022-10-04 19:16:30 +02:00
logrus.Infof("looper #%d, service: %v, frontend: %v", l.id, tunnelResp.Payload.Service, tunnelResp.Payload.ProxyEndpoint)
2022-10-03 22:15:57 +02:00
go l.serviceListener(zif, tunnelResp.Payload.Service)
2022-10-03 22:34:24 +02:00
2022-10-04 19:28:16 +02:00
time.Sleep(time.Duration(l.r.dwellSeconds) * time.Second)
2022-10-03 22:34:24 +02:00
2022-10-04 19:28:16 +02:00
for i := 0; i < l.r.iterations; i++ {
if i > 0 && i%l.r.statusEvery == 0 {
2022-10-03 22:44:15 +02:00
logrus.Infof("looper #%d: iteration #%d", l.id, i)
}
outpayload := make([]byte, 10240)
2022-10-03 22:34:24 +02:00
outbase64 := base64.StdEncoding.EncodeToString(outpayload)
rand.Read(outpayload)
if req, err := http.NewRequest("POST", tunnelResp.Payload.ProxyEndpoint, bytes.NewBufferString(outbase64)); err == nil {
2022-10-04 19:28:16 +02:00
client := &http.Client{Timeout: time.Second * time.Duration(l.r.timeoutSeconds)}
2022-10-03 22:34:24 +02:00
if resp, err := client.Do(req); err == nil {
inpayload := new(bytes.Buffer)
io.Copy(inpayload, resp.Body)
inbase64 := inpayload.String()
if inbase64 != outbase64 {
2022-10-04 19:16:30 +02:00
logrus.Errorf("looper #%d payload mismatch!", l.id)
2022-10-03 22:34:24 +02:00
} else {
2022-10-04 19:16:30 +02:00
logrus.Debugf("looper #%d payload match", l.id)
2022-10-03 22:34:24 +02:00
}
} else {
2022-10-04 19:16:30 +02:00
logrus.Errorf("looper #%d error: %v", l.id, err)
2022-10-03 22:34:24 +02:00
}
} else {
2022-10-04 19:16:30 +02:00
logrus.Errorf("looper #%d error creating request: %v", l.id, err)
2022-10-03 22:34:24 +02:00
}
}
2022-10-03 22:44:15 +02:00
logrus.Infof("looper #%d: complete", l.id)
2022-10-03 22:34:24 +02:00
2022-10-03 22:15:57 +02:00
if l.listener != nil {
if err := l.listener.Close(); err != nil {
2022-10-04 19:16:30 +02:00
logrus.Errorf("looper #%d error closing listener: %v", l.id, err)
2022-10-03 22:15:57 +02:00
}
}
untunnelReq := tunnel.NewUntunnelParams()
untunnelReq.Body = &rest_model_zrok.UntunnelRequest{
ZitiIdentityID: env.ZitiIdentityId,
Service: tunnelResp.Payload.Service,
}
if _, err := zrok.Tunnel.Untunnel(untunnelReq, auth); err != nil {
logrus.Errorf("error shutting down looper #%d: %v", l.id, err)
}
2022-09-30 21:23:53 +02:00
}
2022-10-03 22:15:57 +02:00
func (l *looper) serviceListener(zitiIdPath string, svcId string) {
zcfg, err := config.NewFromFile(zitiIdPath)
if err != nil {
logrus.Errorf("error opening ziti config '%v': %v", zitiIdPath, err)
return
}
opts := ziti.ListenOptions{
ConnectTimeout: 5 * time.Minute,
MaxConnections: 10,
}
if l.listener, err = ziti.NewContextWithConfig(zcfg).ListenWithOptions(svcId, &opts); err == nil {
if err := http.Serve(l.listener, l); err != nil {
2022-10-04 19:16:30 +02:00
logrus.Errorf("looper #%d, error serving: %v", l.id, err)
2022-10-03 22:15:57 +02:00
}
} else {
2022-10-04 19:16:30 +02:00
logrus.Errorf("looper #%d, error listening: %v", l.id, err)
2022-10-03 22:15:57 +02:00
}
}
func (l *looper) ServeHTTP(w http.ResponseWriter, r *http.Request) {
2022-10-03 22:34:24 +02:00
buf := new(bytes.Buffer)
io.Copy(buf, r.Body)
w.Write(buf.Bytes())
2022-10-03 22:15:57 +02:00
}