better share lifecycle monitoring and error handling (#463)

This commit is contained in:
Michael Quigley 2024-09-13 14:39:30 -04:00
parent 8525348d21
commit 2cf484e1c5
No known key found for this signature in database
GPG Key ID: 9B60314A9DD20A62
3 changed files with 67 additions and 18 deletions

View File

@ -14,7 +14,11 @@ type Agent struct {
root env_core.Root root env_core.Root
agentSocket string agentSocket string
shares map[string]*share shares map[string]*share
inShares chan *share
outShares chan *share
accesses map[string]*access accesses map[string]*access
inAccesses chan *access
outAccesses chan *access
} }
func NewAgent(root env_core.Root) (*Agent, error) { func NewAgent(root env_core.Root) (*Agent, error) {
@ -24,13 +28,19 @@ func NewAgent(root env_core.Root) (*Agent, error) {
return &Agent{ return &Agent{
root: root, root: root,
shares: make(map[string]*share), shares: make(map[string]*share),
inShares: make(chan *share),
outShares: make(chan *share),
accesses: make(map[string]*access), accesses: make(map[string]*access),
inAccesses: make(chan *access),
outAccesses: make(chan *access),
}, nil }, nil
} }
func (a *Agent) Run() error { func (a *Agent) Run() error {
logrus.Infof("started") logrus.Infof("started")
go a.manager()
agentSocket, err := a.root.AgentSocket() agentSocket, err := a.root.AgentSocket()
if err != nil { if err != nil {
return err return err
@ -55,3 +65,20 @@ func (a *Agent) Shutdown() {
logrus.Warnf("unable to remove agent socket: %v", err) logrus.Warnf("unable to remove agent socket: %v", err)
} }
} }
func (a *Agent) manager() {
logrus.Info("started")
defer logrus.Warn("exited")
for {
select {
case inShare := <-a.inShares:
logrus.Infof("adding new share '%v'", inShare.token)
a.shares[inShare.token] = inShare
case outShare := <-a.outShares:
logrus.Infof("removing share '%v'", outShare.token)
delete(a.shares, outShare.token)
}
}
}

View File

@ -7,6 +7,7 @@ import (
"github.com/openziti/zrok/agent/agentGrpc" "github.com/openziti/zrok/agent/agentGrpc"
"github.com/openziti/zrok/agent/proctree" "github.com/openziti/zrok/agent/proctree"
"github.com/openziti/zrok/sdk/golang/sdk" "github.com/openziti/zrok/sdk/golang/sdk"
"github.com/pkg/errors"
"strings" "strings"
"time" "time"
) )
@ -29,13 +30,25 @@ type share struct {
process *proctree.Child process *proctree.Child
readBuffer bytes.Buffer readBuffer bytes.Buffer
ready chan struct{} booted bool
bootComplete chan struct{}
bootErr error
a *Agent
}
func (s *share) monitor() {
if err := proctree.WaitChild(s.process); err != nil {
pfxlog.ChannelLogger(s.token).Error(err)
}
s.a.outShares <- s
} }
func (s *share) tail(data []byte) { func (s *share) tail(data []byte) {
s.readBuffer.Write(data) s.readBuffer.Write(data)
if line, err := s.readBuffer.ReadString('\n'); err == nil { if line, err := s.readBuffer.ReadString('\n'); err == nil {
if s.token == "" { line = strings.Trim(line, "\n")
if !s.booted {
in := make(map[string]interface{}) in := make(map[string]interface{})
if err := json.Unmarshal([]byte(line), &in); err == nil { if err := json.Unmarshal([]byte(line), &in); err == nil {
if v, found := in["token"]; found { if v, found := in["token"]; found {
@ -52,8 +65,12 @@ func (s *share) tail(data []byte) {
} }
} }
} }
s.booted = true
} else {
s.bootErr = errors.New(line)
} }
close(s.ready) close(s.bootComplete)
} else { } else {
if strings.HasPrefix(line, "{") { if strings.HasPrefix(line, "{") {
in := make(map[string]interface{}) in := make(map[string]interface{})

View File

@ -25,7 +25,8 @@ func (i *agentGrpcImpl) PublicShare(_ context.Context, req *agentGrpc.PublicShar
shr := &share{ shr := &share{
shareMode: sdk.PublicShareMode, shareMode: sdk.PublicShareMode,
backendMode: sdk.BackendMode(req.BackendMode), backendMode: sdk.BackendMode(req.BackendMode),
ready: make(chan struct{}), bootComplete: make(chan struct{}),
a: i.a,
} }
for _, basicAuth := range req.BasicAuth { for _, basicAuth := range req.BasicAuth {
@ -74,14 +75,18 @@ func (i *agentGrpcImpl) PublicShare(_ context.Context, req *agentGrpc.PublicShar
shr.process, err = proctree.StartChild(shr.tail, shrCmd...) shr.process, err = proctree.StartChild(shr.tail, shrCmd...)
if err != nil { if err != nil {
return nil, err return &agentGrpc.PublicShareReply{}, err
} }
<-shr.ready go shr.monitor()
i.a.shares[shr.token] = shr <-shr.bootComplete
if shr.bootErr == nil {
i.a.inShares <- shr
return &agentGrpc.PublicShareReply{ return &agentGrpc.PublicShareReply{
Token: shr.token, Token: shr.token,
FrontendEndpoints: shr.frontendEndpoints, FrontendEndpoints: shr.frontendEndpoints,
}, nil }, nil
}
return &agentGrpc.PublicShareReply{}, shr.bootErr
} }