From d319f32ea2f703eca2ebc23a44937af8d4d02206 Mon Sep 17 00:00:00 2001 From: Michael Quigley Date: Thu, 27 Mar 2025 15:13:10 -0400 Subject: [PATCH] full persistence for private accesses and reserved shares (#922) --- agent/access.go | 2 + agent/accessPrivate.go | 5 +- agent/agent.go | 108 ++++++++++++++++++++++++++++++---- agent/registry.go | 31 ++++++++++ agent/share.go | 2 + agent/sharePrivate.go | 1 + agent/sharePublic.go | 1 + agent/shareReserved.go | 1 + environment/env_core/model.go | 1 + environment/env_v0_3/api.go | 6 +- environment/env_v0_4/api.go | 4 ++ environment/env_v0_4/dirs.go | 8 +++ 12 files changed, 157 insertions(+), 13 deletions(-) create mode 100644 agent/registry.go diff --git a/agent/access.go b/agent/access.go index 083381dd..2f4f66af 100644 --- a/agent/access.go +++ b/agent/access.go @@ -26,6 +26,8 @@ type access struct { autoEndPort uint16 responseHeaders []string + request *AccessPrivateRequest + process *proctree.Child sub *subordinate.MessageHandler diff --git a/agent/accessPrivate.go b/agent/accessPrivate.go index 75f4034b..bde95c49 100644 --- a/agent/accessPrivate.go +++ b/agent/accessPrivate.go @@ -34,9 +34,10 @@ func (a *Agent) AccessPrivate(req *AccessPrivateRequest) (frontendToken string, bindAddress: req.BindAddress, autoMode: req.AutoMode, autoAddress: req.AutoAddress, - autoStartPort: uint16(req.AutoStartPort), - autoEndPort: uint16(req.AutoEndPort), + autoStartPort: req.AutoStartPort, + autoEndPort: req.AutoEndPort, responseHeaders: req.ResponseHeaders, + request: req, sub: subordinate.NewMessageHandler(), agent: a, } diff --git a/agent/agent.go b/agent/agent.go index 7d9d616a..13001454 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -19,16 +19,17 @@ import ( ) type Agent struct { - cfg *AgentConfig - httpEndpoint string - root env_core.Root - agentSocket string - shares map[string]*share - addShare chan *share - rmShare chan *share - accesses map[string]*access - addAccess chan *access - rmAccess chan *access + cfg *AgentConfig + httpEndpoint string + root env_core.Root + agentSocket string + shares map[string]*share + addShare chan *share + rmShare chan *share + accesses map[string]*access + addAccess chan *access + rmAccess chan *access + persistRegistry bool } func NewAgent(cfg *AgentConfig, root env_core.Root) (*Agent, error) { @@ -67,6 +68,12 @@ func (a *Agent) Run() error { go a.manager() go a.gateway(a.cfg) + a.persistRegistry = false + if err := a.ReloadRegistry(); err != nil { + logrus.Errorf("error reloading registry '%v'", err) + } + a.persistRegistry = true + srv := grpc.NewServer() agentGrpc.RegisterAgentServer(srv, &agentGrpcImpl{agent: a}) if err := srv.Serve(l); err != nil { @@ -79,6 +86,7 @@ func (a *Agent) Run() error { func (a *Agent) Shutdown() { logrus.Infof("stopping") + a.persistRegistry = false if err := os.Remove(a.agentSocket); err != nil { logrus.Warnf("unable to remove agent socket: %v", err) } @@ -96,6 +104,60 @@ func (a *Agent) Config() *AgentConfig { return a.cfg } +func (a *Agent) ReloadRegistry() error { + registryPath, err := a.root.AgentRegistry() + if err != nil { + return err + } + registry, err := LoadRegistry(registryPath) + if err != nil { + return err + } + logrus.Infof("loaded %d reserved shares, %d accesses", len(registry.ReservedShares), len(registry.PrivateAccesses)) + for _, req := range registry.ReservedShares { + if resp, err := a.ShareReserved(req); err == nil { + logrus.Infof("restarted reserved share '%v' -> '%v'", req, resp) + } else { + logrus.Errorf("error restarting reserved share '%v': %v", req, err) + } + } + for _, req := range registry.PrivateAccesses { + if resp, err := a.AccessPrivate(req); err == nil { + logrus.Infof("restarted private access '%v' -> '%v'", req, resp) + } else { + logrus.Errorf("error restarting private access '%v': %v", req, err) + } + } + logrus.Infof("reload complete") + return nil +} + +func (a *Agent) SaveRegistry() error { + r := &Registry{} + for _, shr := range a.shares { + if shr.request != nil { + switch shr.request.(type) { + case *ShareReservedRequest: + logrus.Infof("persisting reserved share '%v'", shr.token) + r.ReservedShares = append(r.ReservedShares, shr.request.(*ShareReservedRequest)) + } + } + } + for _, acc := range a.accesses { + if acc.request != nil { + r.PrivateAccesses = append(r.PrivateAccesses, acc.request) + } + } + registryPath, err := a.root.AgentRegistry() + if err != nil { + return err + } + if err := r.Save(registryPath); err != nil { + return err + } + return nil +} + func (a *Agent) gateway(cfg *AgentConfig) { logrus.Info("started") defer logrus.Warn("exited") @@ -132,6 +194,12 @@ func (a *Agent) manager() { logrus.Infof("adding new share '%v'", inShare.token) a.shares[inShare.token] = inShare + if a.persistRegistry { + if err := a.SaveRegistry(); err != nil { + logrus.Errorf("unable to persist registry: %v", err) + } + } + case outShare := <-a.rmShare: if shr, found := a.shares[outShare.token]; found { logrus.Infof("removing share '%v'", shr.token) @@ -147,6 +215,13 @@ func (a *Agent) manager() { } } delete(a.shares, shr.token) + + if a.persistRegistry { + if err := a.SaveRegistry(); err != nil { + logrus.Errorf("unable to persist registry: %v", err) + } + } + } else { logrus.Debug("skipping unidentified (orphaned) share removal") } @@ -155,6 +230,12 @@ func (a *Agent) manager() { logrus.Infof("adding new access '%v'", inAccess.frontendToken) a.accesses[inAccess.frontendToken] = inAccess + if a.persistRegistry { + if err := a.SaveRegistry(); err != nil { + logrus.Errorf("unable to persist registry: %v", err) + } + } + case outAccess := <-a.rmAccess: if acc, found := a.accesses[outAccess.frontendToken]; found { logrus.Infof("removing access '%v'", acc.frontendToken) @@ -168,6 +249,13 @@ func (a *Agent) manager() { logrus.Errorf("error deleting access '%v': %v", acc.frontendToken, err) } delete(a.accesses, acc.frontendToken) + + if a.persistRegistry { + if err := a.SaveRegistry(); err != nil { + logrus.Errorf("unable to persist registry: %v", err) + } + } + } else { logrus.Debug("skipping unidentified (orphaned) access removal") } diff --git a/agent/registry.go b/agent/registry.go new file mode 100644 index 00000000..7cb2a5d0 --- /dev/null +++ b/agent/registry.go @@ -0,0 +1,31 @@ +package agent + +import ( + "encoding/json" + "os" +) + +type Registry struct { + ReservedShares []*ShareReservedRequest + PrivateAccesses []*AccessPrivateRequest +} + +func LoadRegistry(path string) (*Registry, error) { + data, err := os.ReadFile(path) + if err != nil { + return nil, err + } + r := &Registry{} + if err := json.Unmarshal(data, r); err != nil { + return nil, err + } + return r, nil +} + +func (r *Registry) Save(path string) error { + data, err := json.MarshalIndent(r, "", " ") + if err != nil { + return err + } + return os.WriteFile(path, data, 0644) +} diff --git a/agent/share.go b/agent/share.go index ab633154..7c5c3e8e 100644 --- a/agent/share.go +++ b/agent/share.go @@ -60,6 +60,8 @@ type share struct { closed bool accessGrants []string + request interface{} + process *proctree.Child sub *subordinate.MessageHandler diff --git a/agent/sharePrivate.go b/agent/sharePrivate.go index d4e07778..d5f24155 100644 --- a/agent/sharePrivate.go +++ b/agent/sharePrivate.go @@ -27,6 +27,7 @@ func (a *Agent) SharePrivate(req *SharePrivateRequest) (shareToken string, err e shr := &share{ shareMode: sdk.PrivateShareMode, backendMode: sdk.BackendMode(req.BackendMode), + request: req, sub: subordinate.NewMessageHandler(), agent: a, } diff --git a/agent/sharePublic.go b/agent/sharePublic.go index 33891da3..5db0a521 100644 --- a/agent/sharePublic.go +++ b/agent/sharePublic.go @@ -27,6 +27,7 @@ func (a *Agent) SharePublic(req *SharePublicRequest) (shareToken string, fronten shr := &share{ shareMode: sdk.PublicShareMode, backendMode: sdk.BackendMode(req.BackendMode), + request: req, sub: subordinate.NewMessageHandler(), agent: a, } diff --git a/agent/shareReserved.go b/agent/shareReserved.go index f5e3e588..d3301197 100644 --- a/agent/shareReserved.go +++ b/agent/shareReserved.go @@ -25,6 +25,7 @@ func (a *Agent) ShareReserved(req *ShareReservedRequest) (*ShareReservedResponse shrCmd := []string{os.Args[0], "share", "reserved", "--subordinate"} shr := &share{ reserved: true, + request: req, sub: subordinate.NewMessageHandler(), agent: a, } diff --git a/environment/env_core/model.go b/environment/env_core/model.go index 84a1d98a..f7d5f74c 100644 --- a/environment/env_core/model.go +++ b/environment/env_core/model.go @@ -29,6 +29,7 @@ type Root interface { DeleteZitiIdentityNamed(name string) error AgentSocket() (string, error) + AgentRegistry() (string, error) } type Environment struct { diff --git a/environment/env_v0_3/api.go b/environment/env_v0_3/api.go index cab59ed0..cf0f610a 100644 --- a/environment/env_v0_3/api.go +++ b/environment/env_v0_3/api.go @@ -194,7 +194,11 @@ func (r *Root) DeleteZitiIdentityNamed(name string) error { } func (r *Root) AgentSocket() (string, error) { - return "", errors.Errorf("this environment version does not support agent sockets; please 'zrok update' this environment") + return "", errors.Errorf("this environment version does not support the zrok Agent; please 'zrok update' this environment") +} + +func (r *Root) AgentRegistry() (string, error) { + return "", errors.Errorf("this environment version does not support the zrok Agent; please 'zrok update' this environment") } func (r *Root) Obliterate() error { diff --git a/environment/env_v0_4/api.go b/environment/env_v0_4/api.go index 1f53dc8b..d2aec7e6 100644 --- a/environment/env_v0_4/api.go +++ b/environment/env_v0_4/api.go @@ -196,6 +196,10 @@ func (r *Root) AgentSocket() (string, error) { return agentSocket() } +func (r *Root) AgentRegistry() (string, error) { + return agentRegistry() +} + func (r *Root) Obliterate() error { zrd, err := rootDir() if err != nil { diff --git a/environment/env_v0_4/dirs.go b/environment/env_v0_4/dirs.go index bf18febf..328eeb80 100644 --- a/environment/env_v0_4/dirs.go +++ b/environment/env_v0_4/dirs.go @@ -61,3 +61,11 @@ func agentSocket() (string, error) { } return filepath.Join(zrd, "agent.socket"), nil } + +func agentRegistry() (string, error) { + zrd, err := rootDir() + if err != nil { + return "", err + } + return filepath.Join(zrd, "agent-registry.json"), nil +}