[client] Replace engine probes with direct calls (#3195)

This commit is contained in:
Viktor Liu 2025-01-28 12:25:45 +01:00 committed by GitHub
parent 7335c82553
commit a7ddb8f1f8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 69 additions and 187 deletions

View File

@ -190,7 +190,7 @@ func runInForegroundMode(ctx context.Context, cmd *cobra.Command) error {
r.GetFullStatus() r.GetFullStatus()
connectClient := internal.NewConnectClient(ctx, config, r) connectClient := internal.NewConnectClient(ctx, config, r)
return connectClient.Run() return connectClient.Run(nil)
} }
func runInDaemonMode(ctx context.Context, cmd *cobra.Command) error { func runInDaemonMode(ctx context.Context, cmd *cobra.Command) error {

View File

@ -59,13 +59,8 @@ func NewConnectClient(
} }
// Run with main logic. // Run with main logic.
func (c *ConnectClient) Run() error { func (c *ConnectClient) Run(runningChan chan error) error {
return c.run(MobileDependency{}, nil, nil) return c.run(MobileDependency{}, runningChan)
}
// RunWithProbes runs the client's main logic with probes attached
func (c *ConnectClient) RunWithProbes(probes *ProbeHolder, runningChan chan error) error {
return c.run(MobileDependency{}, probes, runningChan)
} }
// RunOnAndroid with main logic on mobile system // RunOnAndroid with main logic on mobile system
@ -84,7 +79,7 @@ func (c *ConnectClient) RunOnAndroid(
HostDNSAddresses: dnsAddresses, HostDNSAddresses: dnsAddresses,
DnsReadyListener: dnsReadyListener, DnsReadyListener: dnsReadyListener,
} }
return c.run(mobileDependency, nil, nil) return c.run(mobileDependency, nil)
} }
func (c *ConnectClient) RunOniOS( func (c *ConnectClient) RunOniOS(
@ -102,10 +97,10 @@ func (c *ConnectClient) RunOniOS(
DnsManager: dnsManager, DnsManager: dnsManager,
StateFilePath: stateFilePath, StateFilePath: stateFilePath,
} }
return c.run(mobileDependency, nil, nil) return c.run(mobileDependency, nil)
} }
func (c *ConnectClient) run(mobileDependency MobileDependency, probes *ProbeHolder, runningChan chan error) error { func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan error) error {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
log.Panicf("Panic occurred: %v, stack trace: %s", r, string(debug.Stack())) log.Panicf("Panic occurred: %v, stack trace: %s", r, string(debug.Stack()))
@ -261,7 +256,7 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, probes *ProbeHold
checks := loginResp.GetChecks() checks := loginResp.GetChecks()
c.engineMutex.Lock() c.engineMutex.Lock()
c.engine = NewEngineWithProbes(engineCtx, cancel, signalClient, mgmClient, relayManager, engineConfig, mobileDependency, c.statusRecorder, probes, checks) c.engine = NewEngine(engineCtx, cancel, signalClient, mgmClient, relayManager, engineConfig, mobileDependency, c.statusRecorder, checks)
c.engine.SetNetworkMapPersistence(c.persistNetworkMap) c.engine.SetNetworkMapPersistence(c.persistNetworkMap)
c.engineMutex.Unlock() c.engineMutex.Unlock()

View File

@ -175,8 +175,6 @@ type Engine struct {
dnsServer dns.Server dnsServer dns.Server
probes *ProbeHolder
// checks are the client-applied posture checks that need to be evaluated on the client // checks are the client-applied posture checks that need to be evaluated on the client
checks []*mgmProto.Checks checks []*mgmProto.Checks
@ -196,7 +194,7 @@ type Peer struct {
WgAllowedIps string WgAllowedIps string
} }
// NewEngine creates a new Connection Engine // NewEngine creates a new Connection Engine
func NewEngine( func NewEngine(
clientCtx context.Context, clientCtx context.Context,
clientCancel context.CancelFunc, clientCancel context.CancelFunc,
@ -207,33 +205,6 @@ func NewEngine(
mobileDep MobileDependency, mobileDep MobileDependency,
statusRecorder *peer.Status, statusRecorder *peer.Status,
checks []*mgmProto.Checks, checks []*mgmProto.Checks,
) *Engine {
return NewEngineWithProbes(
clientCtx,
clientCancel,
signalClient,
mgmClient,
relayManager,
config,
mobileDep,
statusRecorder,
nil,
checks,
)
}
// NewEngineWithProbes creates a new Connection Engine with probes attached
func NewEngineWithProbes(
clientCtx context.Context,
clientCancel context.CancelFunc,
signalClient signal.Client,
mgmClient mgm.Client,
relayManager *relayClient.Manager,
config *EngineConfig,
mobileDep MobileDependency,
statusRecorder *peer.Status,
probes *ProbeHolder,
checks []*mgmProto.Checks,
) *Engine { ) *Engine {
engine := &Engine{ engine := &Engine{
clientCtx: clientCtx, clientCtx: clientCtx,
@ -251,7 +222,6 @@ func NewEngineWithProbes(
networkSerial: 0, networkSerial: 0,
sshServerFunc: nbssh.DefaultSSHServer, sshServerFunc: nbssh.DefaultSSHServer,
statusRecorder: statusRecorder, statusRecorder: statusRecorder,
probes: probes,
checks: checks, checks: checks,
connSemaphore: semaphoregroup.NewSemaphoreGroup(connInitLimit), connSemaphore: semaphoregroup.NewSemaphoreGroup(connInitLimit),
} }
@ -450,7 +420,6 @@ func (e *Engine) Start() error {
e.receiveSignalEvents() e.receiveSignalEvents()
e.receiveManagementEvents() e.receiveManagementEvents()
e.receiveProbeEvents()
// starting network monitor at the very last to avoid disruptions // starting network monitor at the very last to avoid disruptions
e.startNetworkMonitor() e.startNetworkMonitor()
@ -1513,72 +1482,58 @@ func (e *Engine) getRosenpassAddr() string {
return "" return ""
} }
func (e *Engine) receiveProbeEvents() { // RunHealthProbes executes health checks for Signal, Management, Relay and WireGuard services
if e.probes == nil { // and updates the status recorder with the latest states.
return func (e *Engine) RunHealthProbes() bool {
signalHealthy := e.signal.IsHealthy()
log.Debugf("signal health check: healthy=%t", signalHealthy)
managementHealthy := e.mgmClient.IsHealthy()
log.Debugf("management health check: healthy=%t", managementHealthy)
results := append(e.probeSTUNs(), e.probeTURNs()...)
e.statusRecorder.UpdateRelayStates(results)
relayHealthy := true
for _, res := range results {
if res.Err != nil {
relayHealthy = false
break
}
} }
if e.probes.SignalProbe != nil { log.Debugf("relay health check: healthy=%t", relayHealthy)
go e.probes.SignalProbe.Receive(e.ctx, func() bool {
healthy := e.signal.IsHealthy() for _, key := range e.peerStore.PeersPubKey() {
log.Debugf("received signal probe request, healthy: %t", healthy) wgStats, err := e.wgInterface.GetStats(key)
return healthy if err != nil {
}) log.Debugf("failed to get wg stats for peer %s: %s", key, err)
continue
}
// wgStats could be zero value, in which case we just reset the stats
if err := e.statusRecorder.UpdateWireGuardPeerState(key, wgStats); err != nil {
log.Debugf("failed to update wg stats for peer %s: %s", key, err)
}
} }
if e.probes.MgmProbe != nil { allHealthy := signalHealthy && managementHealthy && relayHealthy
go e.probes.MgmProbe.Receive(e.ctx, func() bool { log.Debugf("all health checks completed: healthy=%t", allHealthy)
healthy := e.mgmClient.IsHealthy() return allHealthy
log.Debugf("received management probe request, healthy: %t", healthy)
return healthy
})
}
if e.probes.RelayProbe != nil {
go e.probes.RelayProbe.Receive(e.ctx, func() bool {
healthy := true
results := append(e.probeSTUNs(), e.probeTURNs()...)
e.statusRecorder.UpdateRelayStates(results)
// A single failed server will result in a "failed" probe
for _, res := range results {
if res.Err != nil {
healthy = false
break
}
}
log.Debugf("received relay probe request, healthy: %t", healthy)
return healthy
})
}
if e.probes.WgProbe != nil {
go e.probes.WgProbe.Receive(e.ctx, func() bool {
log.Debug("received wg probe request")
for _, key := range e.peerStore.PeersPubKey() {
wgStats, err := e.wgInterface.GetStats(key)
if err != nil {
log.Debugf("failed to get wg stats for peer %s: %s", key, err)
}
// wgStats could be zero value, in which case we just reset the stats
if err := e.statusRecorder.UpdateWireGuardPeerState(key, wgStats); err != nil {
log.Debugf("failed to update wg stats for peer %s: %s", key, err)
}
}
return true
})
}
} }
func (e *Engine) probeSTUNs() []relay.ProbeResult { func (e *Engine) probeSTUNs() []relay.ProbeResult {
return relay.ProbeAll(e.ctx, relay.ProbeSTUN, e.STUNs) e.syncMsgMux.Lock()
stuns := slices.Clone(e.STUNs)
e.syncMsgMux.Unlock()
return relay.ProbeAll(e.ctx, relay.ProbeSTUN, stuns)
} }
func (e *Engine) probeTURNs() []relay.ProbeResult { func (e *Engine) probeTURNs() []relay.ProbeResult {
return relay.ProbeAll(e.ctx, relay.ProbeTURN, e.TURNs) e.syncMsgMux.Lock()
turns := slices.Clone(e.TURNs)
e.syncMsgMux.Unlock()
return relay.ProbeAll(e.ctx, relay.ProbeTURN, turns)
} }
func (e *Engine) restartEngine() { func (e *Engine) restartEngine() {

View File

@ -1,58 +0,0 @@
package internal
import "context"
type ProbeHolder struct {
MgmProbe *Probe
SignalProbe *Probe
RelayProbe *Probe
WgProbe *Probe
}
// Probe allows to run on-demand callbacks from different code locations.
// Pass the probe to a receiving and a sending end. The receiving end starts listening
// to requests with Receive and executes a callback when the sending end requests it
// by calling Probe.
type Probe struct {
request chan struct{}
result chan bool
ready bool
}
// NewProbe returns a new initialized probe.
func NewProbe() *Probe {
return &Probe{
request: make(chan struct{}),
result: make(chan bool),
}
}
// Probe requests the callback to be run and returns a bool indicating success.
// It always returns true as long as the receiver is not ready.
func (p *Probe) Probe() bool {
if !p.ready {
return true
}
p.request <- struct{}{}
return <-p.result
}
// Receive starts listening for probe requests. On such a request it runs the supplied
// callback func which must return a bool indicating success.
// Blocks until the passed context is cancelled.
func (p *Probe) Receive(ctx context.Context, callback func() bool) {
p.ready = true
defer func() {
p.ready = false
}()
for {
select {
case <-ctx.Done():
return
case <-p.request:
p.result <- callback()
}
}
}

View File

@ -63,12 +63,7 @@ type Server struct {
statusRecorder *peer.Status statusRecorder *peer.Status
sessionWatcher *internal.SessionWatcher sessionWatcher *internal.SessionWatcher
mgmProbe *internal.Probe lastProbe time.Time
signalProbe *internal.Probe
relayProbe *internal.Probe
wgProbe *internal.Probe
lastProbe time.Time
persistNetworkMap bool persistNetworkMap bool
} }
@ -86,12 +81,7 @@ func New(ctx context.Context, configPath, logFile string) *Server {
latestConfigInput: internal.ConfigInput{ latestConfigInput: internal.ConfigInput{
ConfigPath: configPath, ConfigPath: configPath,
}, },
logFile: logFile, logFile: logFile,
mgmProbe: internal.NewProbe(),
signalProbe: internal.NewProbe(),
relayProbe: internal.NewProbe(),
wgProbe: internal.NewProbe(),
persistNetworkMap: true, persistNetworkMap: true,
} }
} }
@ -202,14 +192,7 @@ func (s *Server) connectWithRetryRuns(ctx context.Context, config *internal.Conf
s.connectClient = internal.NewConnectClient(ctx, config, statusRecorder) s.connectClient = internal.NewConnectClient(ctx, config, statusRecorder)
s.connectClient.SetNetworkMapPersistence(s.persistNetworkMap) s.connectClient.SetNetworkMapPersistence(s.persistNetworkMap)
probes := internal.ProbeHolder{ err := s.connectClient.Run(runningChan)
MgmProbe: s.mgmProbe,
SignalProbe: s.signalProbe,
RelayProbe: s.relayProbe,
WgProbe: s.wgProbe,
}
err := s.connectClient.RunWithProbes(&probes, runningChan)
if err != nil { if err != nil {
log.Debugf("run client connection exited with error: %v. Will retry in the background", err) log.Debugf("run client connection exited with error: %v. Will retry in the background", err)
} }
@ -676,9 +659,13 @@ func (s *Server) Down(ctx context.Context, _ *proto.DownRequest) (*proto.DownRes
// Status returns the daemon status // Status returns the daemon status
func (s *Server) Status( func (s *Server) Status(
_ context.Context, ctx context.Context,
msg *proto.StatusRequest, msg *proto.StatusRequest,
) (*proto.StatusResponse, error) { ) (*proto.StatusResponse, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
s.mutex.Lock() s.mutex.Lock()
defer s.mutex.Unlock() defer s.mutex.Unlock()
@ -707,14 +694,17 @@ func (s *Server) Status(
} }
func (s *Server) runProbes() { func (s *Server) runProbes() {
if time.Since(s.lastProbe) > probeThreshold { if s.connectClient == nil {
managementHealthy := s.mgmProbe.Probe() return
signalHealthy := s.signalProbe.Probe() }
relayHealthy := s.relayProbe.Probe()
wgProbe := s.wgProbe.Probe()
// Update last time only if all probes were successful engine := s.connectClient.Engine()
if managementHealthy && signalHealthy && relayHealthy && wgProbe { if engine == nil {
return
}
if time.Since(s.lastProbe) > probeThreshold {
if engine.RunHealthProbes() {
s.lastProbe = time.Now() s.lastProbe = time.Now()
} }
} }