mirror of
https://github.com/openziti/zrok.git
synced 2025-06-21 18:21:34 +02:00
full persistence for private accesses and reserved shares (#922)
This commit is contained in:
parent
b5df385da4
commit
d319f32ea2
@ -26,6 +26,8 @@ type access struct {
|
||||
autoEndPort uint16
|
||||
responseHeaders []string
|
||||
|
||||
request *AccessPrivateRequest
|
||||
|
||||
process *proctree.Child
|
||||
sub *subordinate.MessageHandler
|
||||
|
||||
|
@ -34,9 +34,10 @@ func (a *Agent) AccessPrivate(req *AccessPrivateRequest) (frontendToken string,
|
||||
bindAddress: req.BindAddress,
|
||||
autoMode: req.AutoMode,
|
||||
autoAddress: req.AutoAddress,
|
||||
autoStartPort: uint16(req.AutoStartPort),
|
||||
autoEndPort: uint16(req.AutoEndPort),
|
||||
autoStartPort: req.AutoStartPort,
|
||||
autoEndPort: req.AutoEndPort,
|
||||
responseHeaders: req.ResponseHeaders,
|
||||
request: req,
|
||||
sub: subordinate.NewMessageHandler(),
|
||||
agent: a,
|
||||
}
|
||||
|
108
agent/agent.go
108
agent/agent.go
@ -19,16 +19,17 @@ import (
|
||||
)
|
||||
|
||||
type Agent struct {
|
||||
cfg *AgentConfig
|
||||
httpEndpoint string
|
||||
root env_core.Root
|
||||
agentSocket string
|
||||
shares map[string]*share
|
||||
addShare chan *share
|
||||
rmShare chan *share
|
||||
accesses map[string]*access
|
||||
addAccess chan *access
|
||||
rmAccess chan *access
|
||||
cfg *AgentConfig
|
||||
httpEndpoint string
|
||||
root env_core.Root
|
||||
agentSocket string
|
||||
shares map[string]*share
|
||||
addShare chan *share
|
||||
rmShare chan *share
|
||||
accesses map[string]*access
|
||||
addAccess chan *access
|
||||
rmAccess chan *access
|
||||
persistRegistry bool
|
||||
}
|
||||
|
||||
func NewAgent(cfg *AgentConfig, root env_core.Root) (*Agent, error) {
|
||||
@ -67,6 +68,12 @@ func (a *Agent) Run() error {
|
||||
go a.manager()
|
||||
go a.gateway(a.cfg)
|
||||
|
||||
a.persistRegistry = false
|
||||
if err := a.ReloadRegistry(); err != nil {
|
||||
logrus.Errorf("error reloading registry '%v'", err)
|
||||
}
|
||||
a.persistRegistry = true
|
||||
|
||||
srv := grpc.NewServer()
|
||||
agentGrpc.RegisterAgentServer(srv, &agentGrpcImpl{agent: a})
|
||||
if err := srv.Serve(l); err != nil {
|
||||
@ -79,6 +86,7 @@ func (a *Agent) Run() error {
|
||||
func (a *Agent) Shutdown() {
|
||||
logrus.Infof("stopping")
|
||||
|
||||
a.persistRegistry = false
|
||||
if err := os.Remove(a.agentSocket); err != nil {
|
||||
logrus.Warnf("unable to remove agent socket: %v", err)
|
||||
}
|
||||
@ -96,6 +104,60 @@ func (a *Agent) Config() *AgentConfig {
|
||||
return a.cfg
|
||||
}
|
||||
|
||||
func (a *Agent) ReloadRegistry() error {
|
||||
registryPath, err := a.root.AgentRegistry()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
registry, err := LoadRegistry(registryPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
logrus.Infof("loaded %d reserved shares, %d accesses", len(registry.ReservedShares), len(registry.PrivateAccesses))
|
||||
for _, req := range registry.ReservedShares {
|
||||
if resp, err := a.ShareReserved(req); err == nil {
|
||||
logrus.Infof("restarted reserved share '%v' -> '%v'", req, resp)
|
||||
} else {
|
||||
logrus.Errorf("error restarting reserved share '%v': %v", req, err)
|
||||
}
|
||||
}
|
||||
for _, req := range registry.PrivateAccesses {
|
||||
if resp, err := a.AccessPrivate(req); err == nil {
|
||||
logrus.Infof("restarted private access '%v' -> '%v'", req, resp)
|
||||
} else {
|
||||
logrus.Errorf("error restarting private access '%v': %v", req, err)
|
||||
}
|
||||
}
|
||||
logrus.Infof("reload complete")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *Agent) SaveRegistry() error {
|
||||
r := &Registry{}
|
||||
for _, shr := range a.shares {
|
||||
if shr.request != nil {
|
||||
switch shr.request.(type) {
|
||||
case *ShareReservedRequest:
|
||||
logrus.Infof("persisting reserved share '%v'", shr.token)
|
||||
r.ReservedShares = append(r.ReservedShares, shr.request.(*ShareReservedRequest))
|
||||
}
|
||||
}
|
||||
}
|
||||
for _, acc := range a.accesses {
|
||||
if acc.request != nil {
|
||||
r.PrivateAccesses = append(r.PrivateAccesses, acc.request)
|
||||
}
|
||||
}
|
||||
registryPath, err := a.root.AgentRegistry()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := r.Save(registryPath); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *Agent) gateway(cfg *AgentConfig) {
|
||||
logrus.Info("started")
|
||||
defer logrus.Warn("exited")
|
||||
@ -132,6 +194,12 @@ func (a *Agent) manager() {
|
||||
logrus.Infof("adding new share '%v'", inShare.token)
|
||||
a.shares[inShare.token] = inShare
|
||||
|
||||
if a.persistRegistry {
|
||||
if err := a.SaveRegistry(); err != nil {
|
||||
logrus.Errorf("unable to persist registry: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
case outShare := <-a.rmShare:
|
||||
if shr, found := a.shares[outShare.token]; found {
|
||||
logrus.Infof("removing share '%v'", shr.token)
|
||||
@ -147,6 +215,13 @@ func (a *Agent) manager() {
|
||||
}
|
||||
}
|
||||
delete(a.shares, shr.token)
|
||||
|
||||
if a.persistRegistry {
|
||||
if err := a.SaveRegistry(); err != nil {
|
||||
logrus.Errorf("unable to persist registry: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
logrus.Debug("skipping unidentified (orphaned) share removal")
|
||||
}
|
||||
@ -155,6 +230,12 @@ func (a *Agent) manager() {
|
||||
logrus.Infof("adding new access '%v'", inAccess.frontendToken)
|
||||
a.accesses[inAccess.frontendToken] = inAccess
|
||||
|
||||
if a.persistRegistry {
|
||||
if err := a.SaveRegistry(); err != nil {
|
||||
logrus.Errorf("unable to persist registry: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
case outAccess := <-a.rmAccess:
|
||||
if acc, found := a.accesses[outAccess.frontendToken]; found {
|
||||
logrus.Infof("removing access '%v'", acc.frontendToken)
|
||||
@ -168,6 +249,13 @@ func (a *Agent) manager() {
|
||||
logrus.Errorf("error deleting access '%v': %v", acc.frontendToken, err)
|
||||
}
|
||||
delete(a.accesses, acc.frontendToken)
|
||||
|
||||
if a.persistRegistry {
|
||||
if err := a.SaveRegistry(); err != nil {
|
||||
logrus.Errorf("unable to persist registry: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
logrus.Debug("skipping unidentified (orphaned) access removal")
|
||||
}
|
||||
|
31
agent/registry.go
Normal file
31
agent/registry.go
Normal file
@ -0,0 +1,31 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"os"
|
||||
)
|
||||
|
||||
type Registry struct {
|
||||
ReservedShares []*ShareReservedRequest
|
||||
PrivateAccesses []*AccessPrivateRequest
|
||||
}
|
||||
|
||||
func LoadRegistry(path string) (*Registry, error) {
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r := &Registry{}
|
||||
if err := json.Unmarshal(data, r); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return r, nil
|
||||
}
|
||||
|
||||
func (r *Registry) Save(path string) error {
|
||||
data, err := json.MarshalIndent(r, "", " ")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return os.WriteFile(path, data, 0644)
|
||||
}
|
@ -60,6 +60,8 @@ type share struct {
|
||||
closed bool
|
||||
accessGrants []string
|
||||
|
||||
request interface{}
|
||||
|
||||
process *proctree.Child
|
||||
sub *subordinate.MessageHandler
|
||||
|
||||
|
@ -27,6 +27,7 @@ func (a *Agent) SharePrivate(req *SharePrivateRequest) (shareToken string, err e
|
||||
shr := &share{
|
||||
shareMode: sdk.PrivateShareMode,
|
||||
backendMode: sdk.BackendMode(req.BackendMode),
|
||||
request: req,
|
||||
sub: subordinate.NewMessageHandler(),
|
||||
agent: a,
|
||||
}
|
||||
|
@ -27,6 +27,7 @@ func (a *Agent) SharePublic(req *SharePublicRequest) (shareToken string, fronten
|
||||
shr := &share{
|
||||
shareMode: sdk.PublicShareMode,
|
||||
backendMode: sdk.BackendMode(req.BackendMode),
|
||||
request: req,
|
||||
sub: subordinate.NewMessageHandler(),
|
||||
agent: a,
|
||||
}
|
||||
|
@ -25,6 +25,7 @@ func (a *Agent) ShareReserved(req *ShareReservedRequest) (*ShareReservedResponse
|
||||
shrCmd := []string{os.Args[0], "share", "reserved", "--subordinate"}
|
||||
shr := &share{
|
||||
reserved: true,
|
||||
request: req,
|
||||
sub: subordinate.NewMessageHandler(),
|
||||
agent: a,
|
||||
}
|
||||
|
@ -29,6 +29,7 @@ type Root interface {
|
||||
DeleteZitiIdentityNamed(name string) error
|
||||
|
||||
AgentSocket() (string, error)
|
||||
AgentRegistry() (string, error)
|
||||
}
|
||||
|
||||
type Environment struct {
|
||||
|
@ -194,7 +194,11 @@ func (r *Root) DeleteZitiIdentityNamed(name string) error {
|
||||
}
|
||||
|
||||
func (r *Root) AgentSocket() (string, error) {
|
||||
return "", errors.Errorf("this environment version does not support agent sockets; please 'zrok update' this environment")
|
||||
return "", errors.Errorf("this environment version does not support the zrok Agent; please 'zrok update' this environment")
|
||||
}
|
||||
|
||||
func (r *Root) AgentRegistry() (string, error) {
|
||||
return "", errors.Errorf("this environment version does not support the zrok Agent; please 'zrok update' this environment")
|
||||
}
|
||||
|
||||
func (r *Root) Obliterate() error {
|
||||
|
@ -196,6 +196,10 @@ func (r *Root) AgentSocket() (string, error) {
|
||||
return agentSocket()
|
||||
}
|
||||
|
||||
func (r *Root) AgentRegistry() (string, error) {
|
||||
return agentRegistry()
|
||||
}
|
||||
|
||||
func (r *Root) Obliterate() error {
|
||||
zrd, err := rootDir()
|
||||
if err != nil {
|
||||
|
@ -61,3 +61,11 @@ func agentSocket() (string, error) {
|
||||
}
|
||||
return filepath.Join(zrd, "agent.socket"), nil
|
||||
}
|
||||
|
||||
func agentRegistry() (string, error) {
|
||||
zrd, err := rootDir()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return filepath.Join(zrd, "agent-registry.json"), nil
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user