diff --git a/agent/agent.go b/agent/agent.go index adea2da9..7eb30359 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -14,7 +14,11 @@ type Agent struct { root env_core.Root agentSocket string shares map[string]*share + inShares chan *share + outShares chan *share accesses map[string]*access + inAccesses chan *access + outAccesses chan *access } func NewAgent(root env_core.Root) (*Agent, error) { @@ -22,15 +26,21 @@ func NewAgent(root env_core.Root) (*Agent, error) { return nil, errors.Errorf("unable to load environment; did you 'zrok enable'?") } return &Agent{ - root: root, - shares: make(map[string]*share), - accesses: make(map[string]*access), + root: root, + shares: make(map[string]*share), + inShares: make(chan *share), + outShares: make(chan *share), + accesses: make(map[string]*access), + inAccesses: make(chan *access), + outAccesses: make(chan *access), }, nil } func (a *Agent) Run() error { logrus.Infof("started") + go a.manager() + agentSocket, err := a.root.AgentSocket() if err != nil { return err @@ -55,3 +65,20 @@ func (a *Agent) Shutdown() { logrus.Warnf("unable to remove agent socket: %v", err) } } + +func (a *Agent) manager() { + logrus.Info("started") + defer logrus.Warn("exited") + + for { + select { + case inShare := <-a.inShares: + logrus.Infof("adding new share '%v'", inShare.token) + a.shares[inShare.token] = inShare + + case outShare := <-a.outShares: + logrus.Infof("removing share '%v'", outShare.token) + delete(a.shares, outShare.token) + } + } +} diff --git a/agent/model.go b/agent/model.go index bf63520f..42371ea3 100644 --- a/agent/model.go +++ b/agent/model.go @@ -7,6 +7,7 @@ import ( "github.com/openziti/zrok/agent/agentGrpc" "github.com/openziti/zrok/agent/proctree" "github.com/openziti/zrok/sdk/golang/sdk" + "github.com/pkg/errors" "strings" "time" ) @@ -27,15 +28,27 @@ type share struct { closed bool accessGrants []string - process *proctree.Child - readBuffer bytes.Buffer - ready chan struct{} + process *proctree.Child + readBuffer bytes.Buffer + booted bool + bootComplete chan struct{} + bootErr error + + a *Agent +} + +func (s *share) monitor() { + if err := proctree.WaitChild(s.process); err != nil { + pfxlog.ChannelLogger(s.token).Error(err) + } + s.a.outShares <- s } func (s *share) tail(data []byte) { s.readBuffer.Write(data) if line, err := s.readBuffer.ReadString('\n'); err == nil { - if s.token == "" { + line = strings.Trim(line, "\n") + if !s.booted { in := make(map[string]interface{}) if err := json.Unmarshal([]byte(line), &in); err == nil { if v, found := in["token"]; found { @@ -52,8 +65,12 @@ func (s *share) tail(data []byte) { } } } + s.booted = true + } else { + s.bootErr = errors.New(line) } - close(s.ready) + close(s.bootComplete) + } else { if strings.HasPrefix(line, "{") { in := make(map[string]interface{}) diff --git a/agent/publicShare.go b/agent/publicShare.go index e35c6ec9..5253b85b 100644 --- a/agent/publicShare.go +++ b/agent/publicShare.go @@ -23,9 +23,10 @@ func (i *agentGrpcImpl) PublicShare(_ context.Context, req *agentGrpc.PublicShar shrCmd := []string{os.Args[0], "share", "public", "--agent", "-b", req.BackendMode} shr := &share{ - shareMode: sdk.PublicShareMode, - backendMode: sdk.BackendMode(req.BackendMode), - ready: make(chan struct{}), + shareMode: sdk.PublicShareMode, + backendMode: sdk.BackendMode(req.BackendMode), + bootComplete: make(chan struct{}), + a: i.a, } for _, basicAuth := range req.BasicAuth { @@ -74,14 +75,18 @@ func (i *agentGrpcImpl) PublicShare(_ context.Context, req *agentGrpc.PublicShar shr.process, err = proctree.StartChild(shr.tail, shrCmd...) if err != nil { - return nil, err + return &agentGrpc.PublicShareReply{}, err } - <-shr.ready - i.a.shares[shr.token] = shr + go shr.monitor() + <-shr.bootComplete - return &agentGrpc.PublicShareReply{ - Token: shr.token, - FrontendEndpoints: shr.frontendEndpoints, - }, nil + if shr.bootErr == nil { + i.a.inShares <- shr + return &agentGrpc.PublicShareReply{ + Token: shr.token, + FrontendEndpoints: shr.frontendEndpoints, + }, nil + } + return &agentGrpc.PublicShareReply{}, shr.bootErr }