From aaa23beeecc461d7ac92838ffca91d56989f81dc Mon Sep 17 00:00:00 2001 From: Zoltan Papp Date: Mon, 10 Mar 2025 13:17:09 +0100 Subject: [PATCH] [client] Prevent to block channel writing (#3474) The "runningChan" provides feedback to the UI or any client about whether the service is up and running. If the client exits earlier than when the service successfully starts, then this channel causes a block. - Added timeout for reading the channel to ensure we don't cause blocks for too long for the caller - Modified channel writing operations to be non-blocking --- client/embed/embed.go | 15 ++++++--------- client/internal/connect.go | 14 +++++++------- client/server/server.go | 19 ++++++++++--------- 3 files changed, 23 insertions(+), 25 deletions(-) diff --git a/client/embed/embed.go b/client/embed/embed.go index 9ded618c5..fe95b1942 100644 --- a/client/embed/embed.go +++ b/client/embed/embed.go @@ -134,10 +134,11 @@ func (c *Client) Start(startCtx context.Context) error { // either startup error (permanent backoff err) or nil err (successful engine up) // TODO: make after-startup backoff err available - run := make(chan error, 1) + run := make(chan struct{}, 1) + clientErr := make(chan error, 1) go func() { if err := client.Run(run); err != nil { - run <- err + clientErr <- err } }() @@ -147,13 +148,9 @@ func (c *Client) Start(startCtx context.Context) error { return fmt.Errorf("stop error after context done. Stop error: %w. Context done: %w", stopErr, startCtx.Err()) } return startCtx.Err() - case err := <-run: - if err != nil { - if stopErr := client.Stop(); stopErr != nil { - return fmt.Errorf("stop error after failed to startup. Stop error: %w. Start error: %w", stopErr, err) - } - return fmt.Errorf("startup: %w", err) - } + case err := <-clientErr: + return fmt.Errorf("startup: %w", err) + case <-run: } c.connect = client diff --git a/client/internal/connect.go b/client/internal/connect.go index bf513ed39..7cbe47b74 100644 --- a/client/internal/connect.go +++ b/client/internal/connect.go @@ -61,7 +61,7 @@ func NewConnectClient( } // Run with main logic. -func (c *ConnectClient) Run(runningChan chan error) error { +func (c *ConnectClient) Run(runningChan chan struct{}) error { return c.run(MobileDependency{}, runningChan) } @@ -102,7 +102,7 @@ func (c *ConnectClient) RunOniOS( return c.run(mobileDependency, nil) } -func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan error) error { +func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan struct{}) error { defer func() { if r := recover(); r != nil { rec := c.statusRecorder @@ -159,7 +159,6 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan } defer c.statusRecorder.ClientStop() - runningChanOpen := true operation := func() error { // if context cancelled we not start new backoff cycle if c.isContextCancelled() { @@ -282,10 +281,11 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan log.Infof("Netbird engine started, the IP is: %s", peerConfig.GetAddress()) state.Set(StatusConnected) - if runningChan != nil && runningChanOpen { - runningChan <- nil - close(runningChan) - runningChanOpen = false + if runningChan != nil { + select { + case runningChan <- struct{}{}: + default: + } } <-engineCtx.Done() diff --git a/client/server/server.go b/client/server/server.go index 8907f541f..b112a994f 100644 --- a/client/server/server.go +++ b/client/server/server.go @@ -160,7 +160,7 @@ func (s *Server) Start() error { // mechanism to keep the client connected even when the connection is lost. // we cancel retry if the client receive a stop or down command, or if disable auto connect is configured. func (s *Server) connectWithRetryRuns(ctx context.Context, config *internal.Config, statusRecorder *peer.Status, - runningChan chan error, + runningChan chan struct{}, ) { backOff := getConnectWithBackoff(ctx) retryStarted := false @@ -628,20 +628,21 @@ func (s *Server) Up(callerCtx context.Context, _ *proto.UpRequest) (*proto.UpRes s.statusRecorder.UpdateManagementAddress(s.config.ManagementURL.String()) s.statusRecorder.UpdateRosenpass(s.config.RosenpassEnabled, s.config.RosenpassPermissive) - runningChan := make(chan error) - go s.connectWithRetryRuns(ctx, s.config, s.statusRecorder, runningChan) + timeoutCtx, cancel := context.WithTimeout(callerCtx, 10*time.Second) + defer cancel() + runningChan := make(chan struct{}, 1) // buffered channel to do not lose the signal + go s.connectWithRetryRuns(ctx, s.config, s.statusRecorder, runningChan) for { select { - case err := <-runningChan: - if err != nil { - log.Debugf("waiting for engine to become ready failed: %s", err) - } else { - return &proto.UpResponse{}, nil - } + case <-runningChan: + return &proto.UpResponse{}, nil case <-callerCtx.Done(): log.Debug("context done, stopping the wait for engine to become ready") return nil, callerCtx.Err() + case <-timeoutCtx.Done(): + log.Debug("up is timed out, stopping the wait for engine to become ready") + return nil, timeoutCtx.Err() } } }