Merge branch 'main' into v1_0_0_docs_update

This commit is contained in:
Michael Quigley
2025-04-01 12:43:58 -04:00
15 changed files with 326 additions and 57 deletions

View File

@ -2,6 +2,8 @@
## v1.0.1
FEATURE: The zrok Agent now persists private accesses and reserved shares between executions. Any `zrok access private` instances or `zrok share reserved` instances created using the agent are now persisted to a registry stored in `${HOME}/.zrok`. When restarting the agent these accesses and reserved shares are re-created from the data in this registry (https://github.com/openziti/zrok/pull/922)
FEATURE: zrok-agent Linux package runs the agent as a user service (https://github.com/openziti/zrok/issues/883)
CHANGE: let the Docker instance set the Caddy HTTPS port (https://github.com/openziti/zrok/pull/920)

View File

@ -6,6 +6,16 @@ import (
"github.com/openziti/zrok/cmd/zrok/subordinate"
)
type AccessPrivateRequest struct {
Token string `json:"token"`
BindAddress string `json:"bind_address"`
AutoMode bool `json:"auto_mode"`
AutoAddress string `json:"auto_address"`
AutoStartPort uint16 `json:"auto_start_port"`
AutoEndPort uint16 `json:"auto_end_port"`
ResponseHeaders []string `json:"response_headers"`
}
type access struct {
frontendToken string
token string
@ -16,6 +26,8 @@ type access struct {
autoEndPort uint16
responseHeaders []string
request *AccessPrivateRequest
process *proctree.Child
sub *subordinate.MessageHandler

View File

@ -12,14 +12,14 @@ import (
"os"
)
func (i *agentGrpcImpl) AccessPrivate(_ context.Context, req *agentGrpc.AccessPrivateRequest) (*agentGrpc.AccessPrivateResponse, error) {
func (a *Agent) AccessPrivate(req *AccessPrivateRequest) (frontendToken string, err error) {
root, err := environment.LoadRoot()
if err != nil {
return nil, err
return "", err
}
if !root.IsEnabled() {
return nil, errors.New("unable to load environment; did you 'zrok enable'?")
return "", errors.New("unable to load environment; did you 'zrok enable'?")
}
accCmd := []string{os.Args[0], "access", "private", "--subordinate", "-b", req.BindAddress, req.Token}
@ -34,11 +34,12 @@ func (i *agentGrpcImpl) AccessPrivate(_ context.Context, req *agentGrpc.AccessPr
bindAddress: req.BindAddress,
autoMode: req.AutoMode,
autoAddress: req.AutoAddress,
autoStartPort: uint16(req.AutoStartPort),
autoEndPort: uint16(req.AutoEndPort),
autoStartPort: req.AutoStartPort,
autoEndPort: req.AutoEndPort,
responseHeaders: req.ResponseHeaders,
request: req,
sub: subordinate.NewMessageHandler(),
agent: i.agent,
agent: a,
}
acc.sub.MessageHandler = func(msg subordinate.Message) {
logrus.Info(msg)
@ -74,20 +75,36 @@ func (i *agentGrpcImpl) AccessPrivate(_ context.Context, req *agentGrpc.AccessPr
acc.process, err = proctree.StartChild(acc.sub.Tail, accCmd...)
if err != nil {
return nil, err
return "", err
}
<-acc.sub.BootComplete
if bootErr == nil {
go acc.monitor()
i.agent.addAccess <- acc
return &agentGrpc.AccessPrivateResponse{FrontendToken: acc.frontendToken}, nil
a.addAccess <- acc
return acc.frontendToken, nil
} else {
if err := proctree.WaitChild(acc.process); err != nil {
logrus.Errorf("error joining: %v", err)
}
return nil, fmt.Errorf("unable to start access: %v", bootErr)
return "", fmt.Errorf("unable to start access: %v", bootErr)
}
}
func (i *agentGrpcImpl) AccessPrivate(_ context.Context, req *agentGrpc.AccessPrivateRequest) (*agentGrpc.AccessPrivateResponse, error) {
if frontendToken, err := i.agent.AccessPrivate(&AccessPrivateRequest{
Token: req.Token,
BindAddress: req.BindAddress,
AutoMode: req.AutoMode,
AutoAddress: req.AutoAddress,
AutoStartPort: uint16(req.AutoStartPort),
AutoEndPort: uint16(req.AutoEndPort),
ResponseHeaders: req.ResponseHeaders,
}); err == nil {
return &agentGrpc.AccessPrivateResponse{FrontendToken: frontendToken}, nil
} else {
return nil, err
}
}

View File

@ -19,16 +19,17 @@ import (
)
type Agent struct {
cfg *AgentConfig
httpEndpoint string
root env_core.Root
agentSocket string
shares map[string]*share
addShare chan *share
rmShare chan *share
accesses map[string]*access
addAccess chan *access
rmAccess chan *access
cfg *AgentConfig
httpEndpoint string
root env_core.Root
agentSocket string
shares map[string]*share
addShare chan *share
rmShare chan *share
accesses map[string]*access
addAccess chan *access
rmAccess chan *access
persistRegistry bool
}
func NewAgent(cfg *AgentConfig, root env_core.Root) (*Agent, error) {
@ -67,6 +68,12 @@ func (a *Agent) Run() error {
go a.manager()
go a.gateway(a.cfg)
a.persistRegistry = false
if err := a.ReloadRegistry(); err != nil {
logrus.Errorf("error reloading registry '%v'", err)
}
a.persistRegistry = true
srv := grpc.NewServer()
agentGrpc.RegisterAgentServer(srv, &agentGrpcImpl{agent: a})
if err := srv.Serve(l); err != nil {
@ -79,6 +86,7 @@ func (a *Agent) Run() error {
func (a *Agent) Shutdown() {
logrus.Infof("stopping")
a.persistRegistry = false
if err := os.Remove(a.agentSocket); err != nil {
logrus.Warnf("unable to remove agent socket: %v", err)
}
@ -96,6 +104,60 @@ func (a *Agent) Config() *AgentConfig {
return a.cfg
}
func (a *Agent) ReloadRegistry() error {
registryPath, err := a.root.AgentRegistry()
if err != nil {
return err
}
registry, err := LoadRegistry(registryPath)
if err != nil {
return err
}
logrus.Infof("loaded %d reserved shares, %d accesses", len(registry.ReservedShares), len(registry.PrivateAccesses))
for _, req := range registry.ReservedShares {
if resp, err := a.ShareReserved(req); err == nil {
logrus.Infof("restarted reserved share '%v' -> '%v'", req, resp)
} else {
logrus.Errorf("error restarting reserved share '%v': %v", req, err)
}
}
for _, req := range registry.PrivateAccesses {
if resp, err := a.AccessPrivate(req); err == nil {
logrus.Infof("restarted private access '%v' -> '%v'", req, resp)
} else {
logrus.Errorf("error restarting private access '%v': %v", req, err)
}
}
logrus.Infof("reload complete")
return nil
}
func (a *Agent) SaveRegistry() error {
r := &Registry{}
for _, shr := range a.shares {
if shr.request != nil {
switch shr.request.(type) {
case *ShareReservedRequest:
logrus.Infof("persisting reserved share '%v'", shr.token)
r.ReservedShares = append(r.ReservedShares, shr.request.(*ShareReservedRequest))
}
}
}
for _, acc := range a.accesses {
if acc.request != nil {
r.PrivateAccesses = append(r.PrivateAccesses, acc.request)
}
}
registryPath, err := a.root.AgentRegistry()
if err != nil {
return err
}
if err := r.Save(registryPath); err != nil {
return err
}
return nil
}
func (a *Agent) gateway(cfg *AgentConfig) {
logrus.Info("started")
defer logrus.Warn("exited")
@ -132,6 +194,12 @@ func (a *Agent) manager() {
logrus.Infof("adding new share '%v'", inShare.token)
a.shares[inShare.token] = inShare
if a.persistRegistry {
if err := a.SaveRegistry(); err != nil {
logrus.Errorf("unable to persist registry: %v", err)
}
}
case outShare := <-a.rmShare:
if shr, found := a.shares[outShare.token]; found {
logrus.Infof("removing share '%v'", shr.token)
@ -147,6 +215,13 @@ func (a *Agent) manager() {
}
}
delete(a.shares, shr.token)
if a.persistRegistry {
if err := a.SaveRegistry(); err != nil {
logrus.Errorf("unable to persist registry: %v", err)
}
}
} else {
logrus.Debug("skipping unidentified (orphaned) share removal")
}
@ -155,6 +230,12 @@ func (a *Agent) manager() {
logrus.Infof("adding new access '%v'", inAccess.frontendToken)
a.accesses[inAccess.frontendToken] = inAccess
if a.persistRegistry {
if err := a.SaveRegistry(); err != nil {
logrus.Errorf("unable to persist registry: %v", err)
}
}
case outAccess := <-a.rmAccess:
if acc, found := a.accesses[outAccess.frontendToken]; found {
logrus.Infof("removing access '%v'", acc.frontendToken)
@ -168,6 +249,13 @@ func (a *Agent) manager() {
logrus.Errorf("error deleting access '%v': %v", acc.frontendToken, err)
}
delete(a.accesses, acc.frontendToken)
if a.persistRegistry {
if err := a.SaveRegistry(); err != nil {
logrus.Errorf("unable to persist registry: %v", err)
}
}
} else {
logrus.Debug("skipping unidentified (orphaned) access removal")
}

39
agent/registry.go Normal file
View File

@ -0,0 +1,39 @@
package agent
import (
"encoding/json"
"fmt"
"os"
)
const RegistryV = "1"
type Registry struct {
V string `json:"v"`
ReservedShares []*ShareReservedRequest `json:"reserved_shares"`
PrivateAccesses []*AccessPrivateRequest `json:"private_accesses"`
}
func LoadRegistry(path string) (*Registry, error) {
data, err := os.ReadFile(path)
if err != nil {
return nil, err
}
r := &Registry{}
if err := json.Unmarshal(data, r); err != nil {
return nil, err
}
if r.V != RegistryV {
return nil, fmt.Errorf("invalid registry version '%v'; expected '%v", r.V, RegistryV)
}
return r, nil
}
func (r *Registry) Save(path string) error {
r.V = RegistryV
data, err := json.MarshalIndent(r, "", " ")
if err != nil {
return err
}
return os.WriteFile(path, data, 0644)
}

View File

@ -7,13 +7,16 @@ import (
"github.com/sirupsen/logrus"
)
func (i *agentGrpcImpl) ReleaseAccess(_ context.Context, req *agentGrpc.ReleaseAccessRequest) (*agentGrpc.ReleaseAccessResponse, error) {
if acc, found := i.agent.accesses[req.FrontendToken]; found {
i.agent.rmAccess <- acc
func (a *Agent) ReleaseAccess(frontendToken string) error {
if acc, found := a.accesses[frontendToken]; found {
a.rmAccess <- acc
logrus.Infof("released access '%v'", acc.frontendToken)
} else {
return nil, errors.Errorf("agent has no access with frontend token '%v'", req.FrontendToken)
return errors.Errorf("agent has no access with frontend token '%v'", frontendToken)
}
return nil, nil
return nil
}
func (i *agentGrpcImpl) ReleaseAccess(_ context.Context, req *agentGrpc.ReleaseAccessRequest) (*agentGrpc.ReleaseAccessResponse, error) {
return nil, i.agent.ReleaseAccess(req.FrontendToken)
}

View File

@ -7,13 +7,16 @@ import (
"github.com/sirupsen/logrus"
)
func (i *agentGrpcImpl) ReleaseShare(_ context.Context, req *agentGrpc.ReleaseShareRequest) (*agentGrpc.ReleaseShareResponse, error) {
if shr, found := i.agent.shares[req.Token]; found {
i.agent.rmShare <- shr
func (a *Agent) ReleaseShare(shareToken string) error {
if shr, found := a.shares[shareToken]; found {
a.rmShare <- shr
logrus.Infof("released share '%v'", shr.token)
} else {
return nil, errors.Errorf("agent has no share with token '%v'", req.Token)
errors.Errorf("agent has no share with token '%v'", shareToken)
}
return nil, nil
return nil
}
func (i *agentGrpcImpl) ReleaseShare(_ context.Context, req *agentGrpc.ReleaseShareRequest) (*agentGrpc.ReleaseShareResponse, error) {
return nil, i.agent.ReleaseShare(req.Token)
}

View File

@ -9,6 +9,41 @@ import (
"time"
)
type SharePrivateRequest struct {
Target string `json:"target"`
BackendMode string `json:"backend_mode"`
Insecure bool `json:"insecure"`
Closed bool `json:"closed"`
AccessGrants []string `json:"access_grants"`
}
type SharePublicRequest struct {
Target string `json:"target"`
BasicAuth []string `json:"basic_auth"`
FrontendSelection []string `json:"frontend_selection"`
BackendMode string `json:"backend_mode"`
Insecure bool `json:"insecure"`
OauthProvider string `json:"oauth_provider"`
OauthEmailAddressPatterns []string `json:"oauth_email_address_patterns"`
OauthCheckInterval string `json:"oauth_check_interval"`
Closed bool `json:"closed"`
AccessGrants []string `json:"access_grants"`
}
type ShareReservedRequest struct {
Token string `json:"token"`
OverrideEndpoint string `json:"override_endpoint"`
Insecure bool `json:"insecure"`
}
type ShareReservedResponse struct {
Token string
BackendMode string
ShareMode string
FrontendEndpoints []string
Target string
}
type share struct {
token string
frontendEndpoints []string
@ -25,6 +60,8 @@ type share struct {
closed bool
accessGrants []string
request interface{}
process *proctree.Child
sub *subordinate.MessageHandler

View File

@ -13,22 +13,23 @@ import (
"os"
)
func (i *agentGrpcImpl) SharePrivate(_ context.Context, req *agentGrpc.SharePrivateRequest) (*agentGrpc.SharePrivateResponse, error) {
func (a *Agent) SharePrivate(req *SharePrivateRequest) (shareToken string, err error) {
root, err := environment.LoadRoot()
if err != nil {
return nil, err
return "", err
}
if !root.IsEnabled() {
return nil, errors.New("unable to load environment; did you 'zrok enable'?")
return "", errors.New("unable to load environment; did you 'zrok enable'?")
}
shrCmd := []string{os.Args[0], "share", "private", "--subordinate", "-b", req.BackendMode}
shr := &share{
shareMode: sdk.PrivateShareMode,
backendMode: sdk.BackendMode(req.BackendMode),
request: req,
sub: subordinate.NewMessageHandler(),
agent: i.agent,
agent: a,
}
shr.sub.MessageHandler = func(msg subordinate.Message) {
logrus.Info(msg)
@ -63,20 +64,34 @@ func (i *agentGrpcImpl) SharePrivate(_ context.Context, req *agentGrpc.SharePriv
shr.process, err = proctree.StartChild(shr.sub.Tail, shrCmd...)
if err != nil {
return nil, err
return "", err
}
<-shr.sub.BootComplete
if bootErr == nil {
go shr.monitor()
i.agent.addShare <- shr
return &agentGrpc.SharePrivateResponse{Token: shr.token}, nil
a.addShare <- shr
return shr.token, nil
} else {
if err := proctree.WaitChild(shr.process); err != nil {
logrus.Errorf("error joining: %v", err)
}
return nil, fmt.Errorf("unable to start share: %v", bootErr)
return "", fmt.Errorf("unable to start share: %v", bootErr)
}
}
func (i *agentGrpcImpl) SharePrivate(_ context.Context, req *agentGrpc.SharePrivateRequest) (*agentGrpc.SharePrivateResponse, error) {
if shareToken, err := i.agent.SharePrivate(&SharePrivateRequest{
Target: req.Target,
BackendMode: req.BackendMode,
Insecure: req.Insecure,
Closed: req.Closed,
AccessGrants: req.AccessGrants,
}); err == nil {
return &agentGrpc.SharePrivateResponse{Token: shareToken}, nil
} else {
return nil, err
}
}

View File

@ -13,22 +13,23 @@ import (
"os"
)
func (i *agentGrpcImpl) SharePublic(_ context.Context, req *agentGrpc.SharePublicRequest) (*agentGrpc.SharePublicResponse, error) {
func (a *Agent) SharePublic(req *SharePublicRequest) (shareToken string, frontendEndpoint []string, err error) {
root, err := environment.LoadRoot()
if err != nil {
return nil, err
return "", nil, err
}
if !root.IsEnabled() {
return nil, errors.New("unable to load environment; did you 'zrok enable'?")
return "", nil, errors.New("unable to load environment; did you 'zrok enable'?")
}
shrCmd := []string{os.Args[0], "share", "public", "--subordinate", "-b", req.BackendMode}
shr := &share{
shareMode: sdk.PublicShareMode,
backendMode: sdk.BackendMode(req.BackendMode),
request: req,
sub: subordinate.NewMessageHandler(),
agent: i.agent,
agent: a,
}
shr.sub.MessageHandler = func(msg subordinate.Message) {
logrus.Info(msg)
@ -87,23 +88,39 @@ func (i *agentGrpcImpl) SharePublic(_ context.Context, req *agentGrpc.SharePubli
shr.process, err = proctree.StartChild(shr.sub.Tail, shrCmd...)
if err != nil {
return nil, err
return "", nil, err
}
<-shr.sub.BootComplete
if bootErr == nil {
go shr.monitor()
i.agent.addShare <- shr
return &agentGrpc.SharePublicResponse{
Token: shr.token,
FrontendEndpoints: shr.frontendEndpoints,
}, nil
a.addShare <- shr
return shr.token, shr.frontendEndpoints, nil
} else {
if err := proctree.WaitChild(shr.process); err != nil {
logrus.Errorf("error joining: %v", err)
}
return nil, fmt.Errorf("unable to start share: %v", bootErr)
return "", nil, fmt.Errorf("unable to start share: %v", bootErr)
}
}
func (i *agentGrpcImpl) SharePublic(_ context.Context, req *agentGrpc.SharePublicRequest) (*agentGrpc.SharePublicResponse, error) {
if shareToken, frontendEndpoints, err := i.agent.SharePublic(&SharePublicRequest{
Target: req.Target,
BasicAuth: req.BasicAuth,
FrontendSelection: req.FrontendSelection,
BackendMode: req.BackendMode,
Insecure: req.Insecure,
OauthProvider: req.OauthProvider,
OauthEmailAddressPatterns: req.OauthEmailAddressPatterns,
OauthCheckInterval: req.OauthCheckInterval,
Closed: req.Closed,
AccessGrants: req.AccessGrants,
}); err == nil {
return &agentGrpc.SharePublicResponse{Token: shareToken, FrontendEndpoints: frontendEndpoints}, nil
} else {
return nil, err
}
}

View File

@ -12,7 +12,7 @@ import (
"os"
)
func (i *agentGrpcImpl) ShareReserved(_ context.Context, req *agentGrpc.ShareReservedRequest) (*agentGrpc.ShareReservedResponse, error) {
func (a *Agent) ShareReserved(req *ShareReservedRequest) (*ShareReservedResponse, error) {
root, err := environment.LoadRoot()
if err != nil {
return nil, err
@ -25,8 +25,9 @@ func (i *agentGrpcImpl) ShareReserved(_ context.Context, req *agentGrpc.ShareRes
shrCmd := []string{os.Args[0], "share", "reserved", "--subordinate"}
shr := &share{
reserved: true,
request: req,
sub: subordinate.NewMessageHandler(),
agent: i.agent,
agent: a,
}
shr.sub.MessageHandler = func(msg subordinate.Message) {
logrus.Info(msg)
@ -60,8 +61,8 @@ func (i *agentGrpcImpl) ShareReserved(_ context.Context, req *agentGrpc.ShareRes
if bootErr == nil {
go shr.monitor()
i.agent.addShare <- shr
return &agentGrpc.ShareReservedResponse{
a.addShare <- shr
return &ShareReservedResponse{
Token: shr.token,
BackendMode: string(shr.backendMode),
ShareMode: string(shr.shareMode),
@ -76,3 +77,21 @@ func (i *agentGrpcImpl) ShareReserved(_ context.Context, req *agentGrpc.ShareRes
return nil, fmt.Errorf("unable to start share: %v", bootErr)
}
}
func (i *agentGrpcImpl) ShareReserved(_ context.Context, req *agentGrpc.ShareReservedRequest) (*agentGrpc.ShareReservedResponse, error) {
if resp, err := i.agent.ShareReserved(&ShareReservedRequest{
Token: req.Token,
OverrideEndpoint: req.OverrideEndpoint,
Insecure: req.Insecure,
}); err == nil {
return &agentGrpc.ShareReservedResponse{
Token: resp.Token,
BackendMode: resp.BackendMode,
ShareMode: resp.ShareMode,
FrontendEndpoints: resp.FrontendEndpoints,
Target: resp.Target,
}, nil
} else {
return nil, err
}
}

View File

@ -29,6 +29,7 @@ type Root interface {
DeleteZitiIdentityNamed(name string) error
AgentSocket() (string, error)
AgentRegistry() (string, error)
}
type Environment struct {

View File

@ -194,7 +194,11 @@ func (r *Root) DeleteZitiIdentityNamed(name string) error {
}
func (r *Root) AgentSocket() (string, error) {
return "", errors.Errorf("this environment version does not support agent sockets; please 'zrok update' this environment")
return "", errors.Errorf("this environment version does not support the zrok Agent; please 'zrok update' this environment")
}
func (r *Root) AgentRegistry() (string, error) {
return "", errors.Errorf("this environment version does not support the zrok Agent; please 'zrok update' this environment")
}
func (r *Root) Obliterate() error {

View File

@ -196,6 +196,10 @@ func (r *Root) AgentSocket() (string, error) {
return agentSocket()
}
func (r *Root) AgentRegistry() (string, error) {
return agentRegistry()
}
func (r *Root) Obliterate() error {
zrd, err := rootDir()
if err != nil {

View File

@ -61,3 +61,11 @@ func agentSocket() (string, error) {
}
return filepath.Join(zrd, "agent.socket"), nil
}
func agentRegistry() (string, error) {
zrd, err := rootDir()
if err != nil {
return "", err
}
return filepath.Join(zrd, "agent-registry.json"), nil
}