Fix msg delivery timeouts

This commit is contained in:
Zoltán Papp 2024-06-26 16:22:26 +02:00
parent f0eb004582
commit c0efce6556
7 changed files with 40 additions and 36 deletions

View File

@ -250,13 +250,6 @@ func (c *Client) connect() error {
} }
func (c *Client) handShake() error { func (c *Client) handShake() error {
defer func() {
err := c.relayConn.SetReadDeadline(time.Time{})
if err != nil {
log.Errorf("failed to reset read deadline: %s", err)
}
}()
msg, err := messages.MarshalHelloMsg(c.hashedID) msg, err := messages.MarshalHelloMsg(c.hashedID)
if err != nil { if err != nil {
log.Errorf("failed to marshal hello message: %s", err) log.Errorf("failed to marshal hello message: %s", err)
@ -267,15 +260,8 @@ func (c *Client) handShake() error {
log.Errorf("failed to send hello message: %s", err) log.Errorf("failed to send hello message: %s", err)
return err return err
} }
buf := make([]byte, messages.MaxHandshakeSize)
err = c.relayConn.SetReadDeadline(time.Now().Add(serverResponseTimeout)) n, err := c.readWithTimeout(buf)
if err != nil {
log.Errorf("failed to set read deadline: %s", err)
return err
}
buf := make([]byte, 1500) // todo: optimise buffer size
n, err := c.relayConn.Read(buf)
if err != nil { if err != nil {
log.Errorf("failed to read hello response: %s", err) log.Errorf("failed to read hello response: %s", err)
return err return err
@ -391,6 +377,29 @@ func (c *Client) closeAllConns() {
c.conns = make(map[string]*connContainer) c.conns = make(map[string]*connContainer)
} }
func (c *Client) readWithTimeout(buf []byte) (int, error) {
ctx, cancel := context.WithTimeout(c.parentCtx, serverResponseTimeout)
defer cancel()
readDone := make(chan struct{})
var (
n int
err error
)
go func() {
n, err = c.relayConn.Read(buf)
close(readDone)
}()
select {
case <-ctx.Done():
return 0, fmt.Errorf("read operation timed out")
case <-readDone:
return n, err
}
}
// todo check by reference too, the id is not enought because the id come from the outer conn // todo check by reference too, the id is not enought because the id come from the outer conn
func (c *Client) closeConn(id string) error { func (c *Client) closeConn(id string) error {
c.mu.Lock() c.mu.Lock()

View File

@ -51,27 +51,14 @@ func (c *Conn) LocalAddr() net.Addr {
} }
func (c *Conn) SetReadDeadline(t time.Time) error { func (c *Conn) SetReadDeadline(t time.Time) error {
// todo: implement me
return nil return nil
} }
func (c *Conn) SetWriteDeadline(t time.Time) error { func (c *Conn) SetWriteDeadline(t time.Time) error {
// todo: implement me
return nil return nil
} }
func (c *Conn) SetDeadline(t time.Time) error { func (c *Conn) SetDeadline(t time.Time) error {
// todo: implement me
errR := c.SetReadDeadline(t)
errW := c.SetWriteDeadline(t)
if errR != nil {
return errR
}
if errW != nil {
return errW
}
return nil return nil
} }

View File

@ -194,7 +194,6 @@ func (m *Manager) isForeignServer(address string) (bool, error) {
if err != nil { if err != nil {
return false, fmt.Errorf("relay client not connected") return false, fmt.Errorf("relay client not connected")
} }
log.Debugf("check if foreign server: %s != %s", rAddr.String(), address)
return rAddr.String() != address, nil return rAddr.String() != address, nil
} }

View File

@ -15,6 +15,8 @@ const (
headerSizeTransport = 1 + IDSize // 1 byte for msg type, IDSize for peerID headerSizeTransport = 1 + IDSize // 1 byte for msg type, IDSize for peerID
headerSizeHello = 1 + 4 + IDSize // 1 byte for msg type, 4 byte for magic header, IDSize for peerID headerSizeHello = 1 + 4 + IDSize // 1 byte for msg type, 4 byte for magic header, IDSize for peerID
MaxHandshakeSize = 90
) )
var ( var (

View File

@ -13,6 +13,10 @@ import (
"nhooyr.io/websocket" "nhooyr.io/websocket"
) )
const (
writeTimeout = 10 * time.Second
)
type Conn struct { type Conn struct {
*websocket.Conn *websocket.Conn
lAddr *net.TCPAddr lAddr *net.TCPAddr
@ -50,8 +54,14 @@ func (c *Conn) Read(b []byte) (n int, err error) {
return n, err return n, err
} }
// Write writes a binary message with the given payload.
// It does not block until fill the internal buffer.
// If the buffer filled up, wait until the buffer is drained or timeout.
func (c *Conn) Write(b []byte) (int, error) { func (c *Conn) Write(b []byte) (int, error) {
err := c.Conn.Write(c.ctx, websocket.MessageBinary, b) ctx, ctxCancel := context.WithTimeout(c.ctx, writeTimeout)
defer ctxCancel()
err := c.Conn.Write(ctx, websocket.MessageBinary, b)
return len(b), err return len(b), err
} }

View File

@ -29,7 +29,6 @@ func NewListener(address string) listener.Listener {
} }
} }
// Listen todo: prevent multiple call
func (l *Listener) Listen(acceptFn func(conn net.Conn)) error { func (l *Listener) Listen(acceptFn func(conn net.Conn)) error {
l.acceptFn = acceptFn l.acceptFn = acceptFn
mux := http.NewServeMux() mux := http.NewServeMux()

View File

@ -17,7 +17,6 @@ import (
const ( const (
bufferSize = 8820 bufferSize = 8820
maxHandshakeSize = 90
) )
type Server struct { type Server struct {
@ -135,7 +134,6 @@ func (r *Server) accept(conn net.Conn) {
peer.Log.Errorf("failed to update transport message: %s", err) peer.Log.Errorf("failed to update transport message: %s", err)
continue continue
} }
peer.Log.Infof("write transport msg!!!!")
_, err = dp.conn.Write(msg) _, err = dp.conn.Write(msg)
if err != nil { if err != nil {
peer.Log.Errorf("failed to write transport message to: %s", dp.String()) peer.Log.Errorf("failed to write transport message to: %s", dp.String())
@ -168,7 +166,7 @@ func (r *Server) sendCloseMsgs() {
} }
func handShake(conn net.Conn) (*Peer, error) { func handShake(conn net.Conn) (*Peer, error) {
buf := make([]byte, maxHandshakeSize) buf := make([]byte, messages.MaxHandshakeSize)
n, err := conn.Read(buf) n, err := conn.Read(buf)
if err != nil { if err != nil {
log.Errorf("failed to read message: %s", err) log.Errorf("failed to read message: %s", err)