mirror of
https://github.com/netbirdio/netbird.git
synced 2025-06-21 18:22:37 +02:00
Single thread on server sending
This commit is contained in:
parent
0a59f12012
commit
f0eb004582
@ -371,6 +371,7 @@ func (c *Client) writeTo(id string, dstID []byte, payload []byte) (int, error) {
|
|||||||
return 0, io.EOF
|
return 0, io.EOF
|
||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
|
// todo: use buffer pool instead of create new transport msg.
|
||||||
msg, err := messages.MarshalTransportMsg(dstID, payload)
|
msg, err := messages.MarshalTransportMsg(dstID, payload)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("failed to marshal transport message: %s", err)
|
log.Errorf("failed to marshal transport message: %s", err)
|
||||||
|
@ -15,6 +15,11 @@ import (
|
|||||||
ws "github.com/netbirdio/netbird/relay/server/listener/wsnhooyr"
|
ws "github.com/netbirdio/netbird/relay/server/listener/wsnhooyr"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
bufferSize = 8820
|
||||||
|
maxHandshakeSize = 90
|
||||||
|
)
|
||||||
|
|
||||||
type Server struct {
|
type Server struct {
|
||||||
store *Store
|
store *Store
|
||||||
storeMu sync.RWMutex
|
storeMu sync.RWMutex
|
||||||
@ -95,8 +100,8 @@ func (r *Server) accept(conn net.Conn) {
|
|||||||
peer.Log.Infof("relay connection closed")
|
peer.Log.Infof("relay connection closed")
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
buf := make([]byte, bufferSize)
|
||||||
for {
|
for {
|
||||||
buf := make([]byte, 1500) // todo: optimize buffer size
|
|
||||||
n, err := conn.Read(buf)
|
n, err := conn.Read(buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err != io.EOF {
|
if err != io.EOF {
|
||||||
@ -105,36 +110,36 @@ func (r *Server) accept(conn net.Conn) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
msgType, err := messages.DetermineClientMsgType(buf[:n])
|
msg := buf[:n]
|
||||||
|
|
||||||
|
msgType, err := messages.DetermineClientMsgType(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
peer.Log.Errorf("failed to determine message type: %s", err)
|
peer.Log.Errorf("failed to determine message type: %s", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
switch msgType {
|
switch msgType {
|
||||||
case messages.MsgTypeTransport:
|
case messages.MsgTypeTransport:
|
||||||
msg := buf[:n]
|
|
||||||
peerID, err := messages.UnmarshalTransportID(msg)
|
peerID, err := messages.UnmarshalTransportID(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
peer.Log.Errorf("failed to unmarshal transport message: %s", err)
|
peer.Log.Errorf("failed to unmarshal transport message: %s", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
go func() {
|
|
||||||
stringPeerID := messages.HashIDToString(peerID)
|
stringPeerID := messages.HashIDToString(peerID)
|
||||||
dp, ok := r.store.Peer(stringPeerID)
|
dp, ok := r.store.Peer(stringPeerID)
|
||||||
if !ok {
|
if !ok {
|
||||||
peer.Log.Errorf("peer not found: %s", stringPeerID)
|
peer.Log.Errorf("peer not found: %s", stringPeerID)
|
||||||
return
|
continue
|
||||||
}
|
}
|
||||||
err := messages.UpdateTransportMsg(msg, peer.ID())
|
err = messages.UpdateTransportMsg(msg, peer.ID())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
peer.Log.Errorf("failed to update transport message: %s", err)
|
peer.Log.Errorf("failed to update transport message: %s", err)
|
||||||
return
|
continue
|
||||||
}
|
}
|
||||||
|
peer.Log.Infof("write transport msg!!!!")
|
||||||
_, err = dp.conn.Write(msg)
|
_, err = dp.conn.Write(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
peer.Log.Errorf("failed to write transport message to: %s", dp.String())
|
peer.Log.Errorf("failed to write transport message to: %s", dp.String())
|
||||||
}
|
}
|
||||||
}()
|
|
||||||
case messages.MsgClose:
|
case messages.MsgClose:
|
||||||
peer.Log.Infof("peer disconnected gracefully")
|
peer.Log.Infof("peer disconnected gracefully")
|
||||||
_ = conn.Close()
|
_ = conn.Close()
|
||||||
@ -163,7 +168,7 @@ func (r *Server) sendCloseMsgs() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func handShake(conn net.Conn) (*Peer, error) {
|
func handShake(conn net.Conn) (*Peer, error) {
|
||||||
buf := make([]byte, 1500)
|
buf := make([]byte, maxHandshakeSize)
|
||||||
n, err := conn.Read(buf)
|
n, err := conn.Read(buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("failed to read message: %s", err)
|
log.Errorf("failed to read message: %s", err)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user