diff --git a/relay/client/client.go b/relay/client/client.go index 3e7fe185a..b2235ec46 100644 --- a/relay/client/client.go +++ b/relay/client/client.go @@ -146,6 +146,11 @@ func (c *Client) readLoop() { log := log.WithField("client_id", c.peerID) var errExit error var n int + err := c.relayConn.SetReadDeadline(time.Time{}) + if err != nil { + log.Errorf("failed to set read deadline: %s", err) + return + } for { buf := make([]byte, bufferSize) // todo optimise buffer size, use pool n, errExit = c.relayConn.Read(buf) @@ -175,7 +180,13 @@ func (c *Client) readLoop() { log.Errorf("failed to parse transport message: %v", err) continue } - go c.handleTransport(channelId, buf[:n]) + container, ok := c.channels[channelId] + if !ok { + log.Errorf("unexpected transport message for channel: %d", channelId) + return + } + + container.messages <- buf[:n] } } @@ -194,7 +205,7 @@ func (c *Client) handleBindResponse(channelId uint16, peerId string) { } delete(c.channelsPending, peerId) - messageBuffer := make(chan []byte, 10) + messageBuffer := make(chan []byte, 2) conn := NewConn(c, channelId, c.generateConnReaderFN(messageBuffer)) c.channels[channelId] = &connContainer{ @@ -206,20 +217,6 @@ func (c *Client) handleBindResponse(channelId uint16, peerId string) { bindSuccessChan <- conn } -func (c *Client) handleTransport(channelId uint16, msg []byte) { - container, ok := c.channels[channelId] - if !ok { - log.Errorf("unexpected transport message for channel: %d", channelId) - return - } - - select { - case container.messages <- msg: - default: - log.Errorf("dropping message for channel: %d", channelId) - } -} - func (c *Client) writeTo(channelID uint16, payload []byte) (int, error) { msg := messages.MarshalTransportMsg(channelID, payload) n, err := c.relayConn.Write(msg)