mirror of
https://github.com/netbirdio/netbird.git
synced 2025-06-20 09:47:49 +02:00
[client] Prevent to block channel writing (#3474)
The "runningChan" provides feedback to the UI or any client about whether the service is up and running. If the client exits earlier than when the service successfully starts, then this channel causes a block. - Added timeout for reading the channel to ensure we don't cause blocks for too long for the caller - Modified channel writing operations to be non-blocking
This commit is contained in:
parent
6bef474e9e
commit
aaa23beeec
@ -134,10 +134,11 @@ func (c *Client) Start(startCtx context.Context) error {
|
|||||||
|
|
||||||
// either startup error (permanent backoff err) or nil err (successful engine up)
|
// either startup error (permanent backoff err) or nil err (successful engine up)
|
||||||
// TODO: make after-startup backoff err available
|
// TODO: make after-startup backoff err available
|
||||||
run := make(chan error, 1)
|
run := make(chan struct{}, 1)
|
||||||
|
clientErr := make(chan error, 1)
|
||||||
go func() {
|
go func() {
|
||||||
if err := client.Run(run); err != nil {
|
if err := client.Run(run); err != nil {
|
||||||
run <- err
|
clientErr <- err
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -147,13 +148,9 @@ func (c *Client) Start(startCtx context.Context) error {
|
|||||||
return fmt.Errorf("stop error after context done. Stop error: %w. Context done: %w", stopErr, startCtx.Err())
|
return fmt.Errorf("stop error after context done. Stop error: %w. Context done: %w", stopErr, startCtx.Err())
|
||||||
}
|
}
|
||||||
return startCtx.Err()
|
return startCtx.Err()
|
||||||
case err := <-run:
|
case err := <-clientErr:
|
||||||
if err != nil {
|
|
||||||
if stopErr := client.Stop(); stopErr != nil {
|
|
||||||
return fmt.Errorf("stop error after failed to startup. Stop error: %w. Start error: %w", stopErr, err)
|
|
||||||
}
|
|
||||||
return fmt.Errorf("startup: %w", err)
|
return fmt.Errorf("startup: %w", err)
|
||||||
}
|
case <-run:
|
||||||
}
|
}
|
||||||
|
|
||||||
c.connect = client
|
c.connect = client
|
||||||
|
@ -61,7 +61,7 @@ func NewConnectClient(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Run with main logic.
|
// Run with main logic.
|
||||||
func (c *ConnectClient) Run(runningChan chan error) error {
|
func (c *ConnectClient) Run(runningChan chan struct{}) error {
|
||||||
return c.run(MobileDependency{}, runningChan)
|
return c.run(MobileDependency{}, runningChan)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -102,7 +102,7 @@ func (c *ConnectClient) RunOniOS(
|
|||||||
return c.run(mobileDependency, nil)
|
return c.run(mobileDependency, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan error) error {
|
func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan struct{}) error {
|
||||||
defer func() {
|
defer func() {
|
||||||
if r := recover(); r != nil {
|
if r := recover(); r != nil {
|
||||||
rec := c.statusRecorder
|
rec := c.statusRecorder
|
||||||
@ -159,7 +159,6 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan
|
|||||||
}
|
}
|
||||||
|
|
||||||
defer c.statusRecorder.ClientStop()
|
defer c.statusRecorder.ClientStop()
|
||||||
runningChanOpen := true
|
|
||||||
operation := func() error {
|
operation := func() error {
|
||||||
// if context cancelled we not start new backoff cycle
|
// if context cancelled we not start new backoff cycle
|
||||||
if c.isContextCancelled() {
|
if c.isContextCancelled() {
|
||||||
@ -282,10 +281,11 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan
|
|||||||
log.Infof("Netbird engine started, the IP is: %s", peerConfig.GetAddress())
|
log.Infof("Netbird engine started, the IP is: %s", peerConfig.GetAddress())
|
||||||
state.Set(StatusConnected)
|
state.Set(StatusConnected)
|
||||||
|
|
||||||
if runningChan != nil && runningChanOpen {
|
if runningChan != nil {
|
||||||
runningChan <- nil
|
select {
|
||||||
close(runningChan)
|
case runningChan <- struct{}{}:
|
||||||
runningChanOpen = false
|
default:
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
<-engineCtx.Done()
|
<-engineCtx.Done()
|
||||||
|
@ -160,7 +160,7 @@ func (s *Server) Start() error {
|
|||||||
// mechanism to keep the client connected even when the connection is lost.
|
// mechanism to keep the client connected even when the connection is lost.
|
||||||
// we cancel retry if the client receive a stop or down command, or if disable auto connect is configured.
|
// we cancel retry if the client receive a stop or down command, or if disable auto connect is configured.
|
||||||
func (s *Server) connectWithRetryRuns(ctx context.Context, config *internal.Config, statusRecorder *peer.Status,
|
func (s *Server) connectWithRetryRuns(ctx context.Context, config *internal.Config, statusRecorder *peer.Status,
|
||||||
runningChan chan error,
|
runningChan chan struct{},
|
||||||
) {
|
) {
|
||||||
backOff := getConnectWithBackoff(ctx)
|
backOff := getConnectWithBackoff(ctx)
|
||||||
retryStarted := false
|
retryStarted := false
|
||||||
@ -628,20 +628,21 @@ func (s *Server) Up(callerCtx context.Context, _ *proto.UpRequest) (*proto.UpRes
|
|||||||
s.statusRecorder.UpdateManagementAddress(s.config.ManagementURL.String())
|
s.statusRecorder.UpdateManagementAddress(s.config.ManagementURL.String())
|
||||||
s.statusRecorder.UpdateRosenpass(s.config.RosenpassEnabled, s.config.RosenpassPermissive)
|
s.statusRecorder.UpdateRosenpass(s.config.RosenpassEnabled, s.config.RosenpassPermissive)
|
||||||
|
|
||||||
runningChan := make(chan error)
|
timeoutCtx, cancel := context.WithTimeout(callerCtx, 10*time.Second)
|
||||||
go s.connectWithRetryRuns(ctx, s.config, s.statusRecorder, runningChan)
|
defer cancel()
|
||||||
|
|
||||||
|
runningChan := make(chan struct{}, 1) // buffered channel to do not lose the signal
|
||||||
|
go s.connectWithRetryRuns(ctx, s.config, s.statusRecorder, runningChan)
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case err := <-runningChan:
|
case <-runningChan:
|
||||||
if err != nil {
|
|
||||||
log.Debugf("waiting for engine to become ready failed: %s", err)
|
|
||||||
} else {
|
|
||||||
return &proto.UpResponse{}, nil
|
return &proto.UpResponse{}, nil
|
||||||
}
|
|
||||||
case <-callerCtx.Done():
|
case <-callerCtx.Done():
|
||||||
log.Debug("context done, stopping the wait for engine to become ready")
|
log.Debug("context done, stopping the wait for engine to become ready")
|
||||||
return nil, callerCtx.Err()
|
return nil, callerCtx.Err()
|
||||||
|
case <-timeoutCtx.Done():
|
||||||
|
log.Debug("up is timed out, stopping the wait for engine to become ready")
|
||||||
|
return nil, timeoutCtx.Err()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user