port the share infrastructure over to the subordinate framework (#789)

This commit is contained in:
Michael Quigley 2024-11-15 12:09:20 -05:00
parent 5c4cf9b26a
commit 3205b92d06
No known key found for this signature in database
GPG Key ID: 9B60314A9DD20A62
7 changed files with 126 additions and 113 deletions

View File

@ -1,14 +1,11 @@
package agent
import (
"bytes"
"encoding/json"
"errors"
"github.com/michaelquigley/pfxlog"
"github.com/openziti/zrok/agent/proctree"
"github.com/openziti/zrok/cmd/zrok/subordinate"
"github.com/openziti/zrok/sdk/golang/sdk"
"github.com/sirupsen/logrus"
"strings"
"time"
)
@ -28,11 +25,8 @@ type share struct {
closed bool
accessGrants []string
process *proctree.Child
readBuffer bytes.Buffer
booted bool
bootComplete chan struct{}
bootErr error
process *proctree.Child
sub *subordinate.MessageHandler
agent *Agent
}
@ -44,79 +38,46 @@ func (s *share) monitor() {
s.agent.rmShare <- s
}
func (s *share) tail(data []byte) {
defer func() {
if r := recover(); r != nil {
logrus.Errorf("recovering: %v", r)
func (s *share) bootHandler(msgType string, msg subordinate.Message) error {
switch msgType {
case subordinate.BootMessage:
if v, found := msg["token"]; found {
if str, ok := v.(string); ok {
s.token = str
}
}
}()
s.readBuffer.Write(data)
if line, err := s.readBuffer.ReadString('\n'); err == nil {
line = strings.Trim(line, "\n")
if !s.booted {
if strings.HasPrefix(line, "{") {
in := make(map[string]interface{})
if err := json.Unmarshal([]byte(line), &in); err == nil {
if v, found := in["message"]; found {
if str, ok := v.(string); ok {
if str == "boot" {
if v, found := in["token"]; found {
if str, ok := v.(string); ok {
s.token = str
}
}
if v, found := in["backend_mode"]; found {
if str, ok := v.(string); ok {
s.backendMode = sdk.BackendMode(str)
}
}
if v, found := in["share_mode"]; found {
if str, ok := v.(string); ok {
s.shareMode = sdk.ShareMode(str)
}
}
if v, found := in["frontend_endpoints"]; found {
if vArr, ok := v.([]interface{}); ok {
for _, v := range vArr {
if str, ok := v.(string); ok {
s.frontendEndpoints = append(s.frontendEndpoints, str)
}
}
}
}
if v, found := in["target"]; found {
if str, ok := v.(string); ok {
s.target = str
}
}
s.booted = true
} else {
s.bootErr = errors.New(line)
}
} else {
s.bootErr = errors.New(line)
}
} else {
s.bootErr = errors.New(line)
if v, found := msg["backend_mode"]; found {
if str, ok := v.(string); ok {
s.backendMode = sdk.BackendMode(str)
}
}
if v, found := msg["share_mode"]; found {
if str, ok := v.(string); ok {
s.shareMode = sdk.ShareMode(str)
}
}
if v, found := msg["frontend_endpoints"]; found {
if vArr, ok := v.([]interface{}); ok {
for _, v := range vArr {
if str, ok := v.(string); ok {
s.frontendEndpoints = append(s.frontendEndpoints, str)
}
} else {
s.bootErr = errors.New(line)
}
close(s.bootComplete)
} else {
logrus.Warn(line)
}
} else {
if strings.HasPrefix(line, "{") {
in := make(map[string]interface{})
if err := json.Unmarshal([]byte(line), &in); err == nil {
pfxlog.ChannelLogger(s.token).Info(in)
}
} else {
pfxlog.ChannelLogger(s.token).Info(strings.Trim(line, "\n"))
}
}
} else {
s.readBuffer.WriteString(line)
if v, found := msg["target"]; found {
if str, ok := v.(string); ok {
s.target = str
}
}
case subordinate.ErrorMessage:
if v, found := msg[subordinate.ErrorMessage]; found {
if str, ok := v.(string); ok {
return errors.New(str)
}
}
}
return nil
}

View File

@ -3,8 +3,10 @@ package agent
import (
"context"
"errors"
"fmt"
"github.com/openziti/zrok/agent/agentGrpc"
"github.com/openziti/zrok/agent/proctree"
"github.com/openziti/zrok/cmd/zrok/subordinate"
"github.com/openziti/zrok/environment"
"github.com/openziti/zrok/sdk/golang/sdk"
"github.com/sirupsen/logrus"
@ -23,10 +25,20 @@ func (i *agentGrpcImpl) SharePrivate(_ context.Context, req *agentGrpc.SharePriv
shrCmd := []string{os.Args[0], "share", "private", "--subordinate", "-b", req.BackendMode}
shr := &share{
shareMode: sdk.PrivateShareMode,
backendMode: sdk.BackendMode(req.BackendMode),
bootComplete: make(chan struct{}),
agent: i.agent,
shareMode: sdk.PrivateShareMode,
backendMode: sdk.BackendMode(req.BackendMode),
sub: subordinate.NewMessageHandler(),
agent: i.agent,
}
shr.sub.MessageHandler = func(msg subordinate.Message) {
logrus.Info(msg)
}
var bootErr error
shr.sub.BootHandler = func(msgType string, msg subordinate.Message) {
bootErr = shr.bootHandler(msgType, msg)
}
shr.sub.MalformedHandler = func(msg subordinate.Message) {
logrus.Error(msg)
}
if req.Insecure {
@ -49,18 +61,22 @@ func (i *agentGrpcImpl) SharePrivate(_ context.Context, req *agentGrpc.SharePriv
logrus.Infof("executing '%v'", shrCmd)
shr.process, err = proctree.StartChild(shr.tail, shrCmd...)
shr.process, err = proctree.StartChild(shr.sub.Tail, shrCmd...)
if err != nil {
return nil, err
}
go shr.monitor()
<-shr.bootComplete
<-shr.sub.BootComplete
if shr.bootErr == nil {
if bootErr == nil {
go shr.monitor()
i.agent.addShare <- shr
return &agentGrpc.SharePrivateResponse{Token: shr.token}, nil
} else {
if err := proctree.WaitChild(shr.process); err != nil {
logrus.Errorf("error joining: %v", err)
}
return nil, fmt.Errorf("unable to start share: %v", bootErr)
}
return nil, shr.bootErr
}

View File

@ -3,8 +3,10 @@ package agent
import (
"context"
"errors"
"fmt"
"github.com/openziti/zrok/agent/agentGrpc"
"github.com/openziti/zrok/agent/proctree"
"github.com/openziti/zrok/cmd/zrok/subordinate"
"github.com/openziti/zrok/environment"
"github.com/openziti/zrok/sdk/golang/sdk"
"github.com/sirupsen/logrus"
@ -23,10 +25,20 @@ func (i *agentGrpcImpl) SharePublic(_ context.Context, req *agentGrpc.SharePubli
shrCmd := []string{os.Args[0], "share", "public", "--subordinate", "-b", req.BackendMode}
shr := &share{
shareMode: sdk.PublicShareMode,
backendMode: sdk.BackendMode(req.BackendMode),
bootComplete: make(chan struct{}),
agent: i.agent,
shareMode: sdk.PublicShareMode,
backendMode: sdk.BackendMode(req.BackendMode),
sub: subordinate.NewMessageHandler(),
agent: i.agent,
}
shr.sub.MessageHandler = func(msg subordinate.Message) {
logrus.Info(msg)
}
var bootErr error
shr.sub.BootHandler = func(msgType string, msg subordinate.Message) {
bootErr = shr.bootHandler(msgType, msg)
}
shr.sub.MalformedHandler = func(msg subordinate.Message) {
logrus.Error(msg)
}
for _, basicAuth := range req.BasicAuth {
@ -73,21 +85,25 @@ func (i *agentGrpcImpl) SharePublic(_ context.Context, req *agentGrpc.SharePubli
logrus.Infof("executing '%v'", shrCmd)
shr.process, err = proctree.StartChild(shr.tail, shrCmd...)
shr.process, err = proctree.StartChild(shr.sub.Tail, shrCmd...)
if err != nil {
return nil, err
}
go shr.monitor()
<-shr.bootComplete
<-shr.sub.BootComplete
if shr.bootErr == nil {
if bootErr == nil {
go shr.monitor()
i.agent.addShare <- shr
return &agentGrpc.SharePublicResponse{
Token: shr.token,
FrontendEndpoints: shr.frontendEndpoints,
}, nil
}
return nil, shr.bootErr
} else {
if err := proctree.WaitChild(shr.process); err != nil {
logrus.Errorf("error joining: %v", err)
}
return nil, fmt.Errorf("unable to start share: %v", bootErr)
}
}

View File

@ -3,9 +3,12 @@ package agent
import (
"context"
"errors"
"fmt"
"github.com/openziti/zrok/agent/agentGrpc"
"github.com/openziti/zrok/agent/proctree"
"github.com/openziti/zrok/cmd/zrok/subordinate"
"github.com/openziti/zrok/environment"
"github.com/sirupsen/logrus"
"os"
)
@ -21,9 +24,19 @@ func (i *agentGrpcImpl) ShareReserved(_ context.Context, req *agentGrpc.ShareRes
shrCmd := []string{os.Args[0], "share", "reserved", "--subordinate"}
shr := &share{
reserved: true,
bootComplete: make(chan struct{}),
agent: i.agent,
reserved: true,
sub: subordinate.NewMessageHandler(),
agent: i.agent,
}
shr.sub.MessageHandler = func(msg subordinate.Message) {
logrus.Info(msg)
}
var bootErr error
shr.sub.BootHandler = func(msgType string, msg subordinate.Message) {
bootErr = shr.bootHandler(msgType, msg)
}
shr.sub.MalformedHandler = func(msg subordinate.Message) {
logrus.Error(msg)
}
if req.OverrideEndpoint != "" {
@ -38,15 +51,15 @@ func (i *agentGrpcImpl) ShareReserved(_ context.Context, req *agentGrpc.ShareRes
shrCmd = append(shrCmd, req.Token)
shr.token = req.Token
shr.process, err = proctree.StartChild(shr.tail, shrCmd...)
shr.process, err = proctree.StartChild(shr.sub.Tail, shrCmd...)
if err != nil {
return nil, err
}
go shr.monitor()
<-shr.bootComplete
<-shr.sub.BootComplete
if shr.bootErr == nil {
if bootErr == nil {
go shr.monitor()
i.agent.addShare <- shr
return &agentGrpc.ShareReservedResponse{
Token: shr.token,
@ -55,7 +68,11 @@ func (i *agentGrpcImpl) ShareReserved(_ context.Context, req *agentGrpc.ShareRes
FrontendEndpoints: shr.frontendEndpoints,
Target: shr.target,
}, nil
}
return nil, shr.bootErr
} else {
if err := proctree.WaitChild(shr.process); err != nil {
logrus.Errorf("error joining: %v", err)
}
return nil, fmt.Errorf("unable to start share: %v", bootErr)
}
}

View File

@ -7,6 +7,7 @@ import (
tea "github.com/charmbracelet/bubbletea"
"github.com/openziti/zrok/agent/agentClient"
"github.com/openziti/zrok/agent/agentGrpc"
"github.com/openziti/zrok/cmd/zrok/subordinate"
"github.com/openziti/zrok/endpoints"
"github.com/openziti/zrok/endpoints/drive"
"github.com/openziti/zrok/endpoints/proxy"
@ -371,7 +372,7 @@ func (cmd *sharePrivateCommand) shareLocal(args []string, root env_core.Root) {
if cmd.subordinate {
data := make(map[string]interface{})
data["message"] = "boot"
data[subordinate.MessageKey] = subordinate.BootMessage
data["token"] = shr.Token
data["frontend_endpoints"] = shr.FrontendEndpoints
jsonData, err := json.Marshal(data)
@ -395,7 +396,7 @@ func (cmd *sharePrivateCommand) shareLocal(args []string, root env_core.Root) {
select {
case req := <-requests:
data := make(map[string]interface{})
data["message"] = "access"
data[subordinate.MessageKey] = "access"
data["remote_address"] = req.RemoteAddr
data["method"] = req.Method
data["path"] = req.Path

View File

@ -8,6 +8,7 @@ import (
"github.com/gobwas/glob"
"github.com/openziti/zrok/agent/agentClient"
"github.com/openziti/zrok/agent/agentGrpc"
"github.com/openziti/zrok/cmd/zrok/subordinate"
"github.com/openziti/zrok/endpoints"
"github.com/openziti/zrok/endpoints/drive"
"github.com/openziti/zrok/endpoints/proxy"
@ -273,7 +274,7 @@ func (cmd *sharePublicCommand) shareLocal(args []string, root env_core.Root) {
if cmd.subordinate {
data := make(map[string]interface{})
data["message"] = "boot"
data[subordinate.MessageKey] = subordinate.BootMessage
data["token"] = shr.Token
data["frontend_endpoints"] = shr.FrontendEndpoints
jsonData, err := json.Marshal(data)
@ -297,7 +298,7 @@ func (cmd *sharePublicCommand) shareLocal(args []string, root env_core.Root) {
select {
case req := <-requests:
data := make(map[string]interface{})
data["message"] = "access"
data[subordinate.MessageKey] = "access"
data["remote_address"] = req.RemoteAddr
data["method"] = req.Method
data["path"] = req.Path

View File

@ -8,6 +8,7 @@ import (
httptransport "github.com/go-openapi/runtime/client"
"github.com/openziti/zrok/agent/agentClient"
"github.com/openziti/zrok/agent/agentGrpc"
"github.com/openziti/zrok/cmd/zrok/subordinate"
"github.com/openziti/zrok/endpoints"
"github.com/openziti/zrok/endpoints/drive"
"github.com/openziti/zrok/endpoints/proxy"
@ -321,7 +322,7 @@ func (cmd *shareReservedCommand) shareLocal(args []string, root env_core.Root) {
if cmd.subordinate {
data := make(map[string]interface{})
data["message"] = "boot"
data[subordinate.MessageKey] = subordinate.BootMessage
data["token"] = resp.Payload.Token
data["backend_mode"] = resp.Payload.BackendMode
data["share_mode"] = resp.Payload.ShareMode
@ -358,7 +359,7 @@ func (cmd *shareReservedCommand) shareLocal(args []string, root env_core.Root) {
select {
case req := <-requests:
data := make(map[string]interface{})
data["message"] = "access"
data[subordinate.MessageKey] = "access"
data["remote-address"] = req.RemoteAddr
data["method"] = req.Method
data["path"] = req.Path