fix sendpacket race condition

This commit is contained in:
testscript 2023-02-21 20:51:28 +08:00
parent 1356780d7d
commit ec63f38ba2
9 changed files with 72 additions and 44 deletions

5
.vscode/launch.json vendored
View File

@ -22,7 +22,9 @@
"program": "${workspaceFolder}", "program": "${workspaceFolder}",
"buildFlags": "-tags 'novpp'", "buildFlags": "-tags 'novpp'",
"env": {"CGO_CFLAGS":"-I/usr/include/memif"}, "env": {"CGO_CFLAGS":"-I/usr/include/memif"},
"args":["-config","example_config/p2p_mode/EgNet_edge4.yaml","-mode","edge"/*,"-example"*/], "args":["-config","example_config/super_mode/EgNet_edge002.yaml","-mode","edge"/*,"-example"*/],
"asRoot": true,
"console": "integratedTerminal",
}, },
{ {
"name": "Launch GenCfg", "name": "Launch GenCfg",
@ -44,5 +46,6 @@
"env": {"CGO_CFLAGS":"-I/usr/include/memif"}, "env": {"CGO_CFLAGS":"-I/usr/include/memif"},
"args":["-config","example_config/static_mode/path.txt","-mode","solve"], "args":["-config","example_config/static_mode/path.txt","-mode","solve"],
} }
] ]
} }

View File

@ -77,6 +77,7 @@ type Device struct {
state_hashes mtypes.StateHash state_hashes mtypes.StateHash
event_tryendpoint chan struct{} event_tryendpoint chan struct{}
chan_send_packet chan *packet_send_params
EdgeConfigPath string EdgeConfigPath string
EdgeConfig *mtypes.EdgeConfig EdgeConfig *mtypes.EdgeConfig
@ -137,10 +138,9 @@ type IdAndTime struct {
// There are three states: down, up, closed. // There are three states: down, up, closed.
// Transitions: // Transitions:
// //
// down -----+ // down -----+
// ↑↓ ↓ // ↑↓ ↓
// up -> closed // up -> closed
//
type deviceState uint32 type deviceState uint32
//go:generate go run golang.org/x/tools/cmd/stringer -type deviceState -trimprefix=deviceState //go:generate go run golang.org/x/tools/cmd/stringer -type deviceState -trimprefix=deviceState
@ -355,6 +355,7 @@ func NewDevice(tapDevice tap.Device, id mtypes.Vertex, bind conn.Bind, logger *L
device.indexTable.Init() device.indexTable.Init()
device.PopulatePools() device.PopulatePools()
device.Chan_Device_Initialized = make(chan struct{}, 1<<5) device.Chan_Device_Initialized = make(chan struct{}, 1<<5)
device.chan_send_packet = make(chan *packet_send_params, 1<<15)
if IsSuperNode { if IsSuperNode {
device.SuperConfigPath = configpath device.SuperConfigPath = configpath
device.SuperConfig = sconfig device.SuperConfig = sconfig
@ -378,7 +379,7 @@ func NewDevice(tapDevice tap.Device, id mtypes.Vertex, bind conn.Bind, logger *L
device.SuperConfig.DampingFilterRadius = device.EdgeConfig.DynamicRoute.DampingFilterRadius device.SuperConfig.DampingFilterRadius = device.EdgeConfig.DynamicRoute.DampingFilterRadius
} }
go device.RoutineSendPacket()
go func() { go func() {
<-device.Chan_Device_Initialized <-device.Chan_Device_Initialized
if device.LogLevel.LogInternal { if device.LogLevel.LogInternal {

View File

@ -17,14 +17,14 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"golang.org/x/crypto/chacha20poly1305" "golang.org/x/crypto/chacha20poly1305"
"github.com/KusakabeSi/EtherGuard-VPN/conn" "github.com/KusakabeSi/EtherGuard-VPN/conn"
"github.com/KusakabeSi/EtherGuard-VPN/mtypes" "github.com/KusakabeSi/EtherGuard-VPN/mtypes"
"github.com/KusakabeSi/EtherGuard-VPN/path" "github.com/KusakabeSi/EtherGuard-VPN/path"
"github.com/KusakabeSi/EtherGuard-VPN/tap" "github.com/KusakabeSi/EtherGuard-VPN/tap"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
) )
type QueueHandshakeElement struct { type QueueHandshakeElement struct {
@ -563,12 +563,12 @@ func (peer *Peer) RoutineSequentialReceiver() {
} else { } else {
l2ttl = l2ttl - 1 l2ttl = l2ttl - 1
if dst_nodeID == mtypes.NodeID_Broadcast { //Regular transfer algorithm if dst_nodeID == mtypes.NodeID_Broadcast { //Regular transfer algorithm
device.TransitBoardcastPacket(src_nodeID, peer.ID, elem.Type, l2ttl, elem.packet, MessageTransportOffsetContent) go device.TransitBoardcastPacket(src_nodeID, peer.ID, elem.Type, l2ttl, elem.packet, MessageTransportOffsetContent)
} else if dst_nodeID == mtypes.NodeID_Spread { // Control Message will try send to every know node regardless the connectivity } else if dst_nodeID == mtypes.NodeID_Spread { // Control Message will try send to every know node regardless the connectivity
skip_list := make(map[mtypes.Vertex]bool) skip_list := make(map[mtypes.Vertex]bool)
skip_list[src_nodeID] = true //Don't send to conimg peer and source peer skip_list[src_nodeID] = true //Don't send to conimg peer and source peer
skip_list[peer.ID] = true skip_list[peer.ID] = true
device.SpreadPacket(skip_list, elem.Type, l2ttl, elem.packet, MessageTransportOffsetContent) go device.SpreadPacket(skip_list, elem.Type, l2ttl, elem.packet, MessageTransportOffsetContent)
} else { } else {
next_id := device.graph.Next(device.ID, dst_nodeID) next_id := device.graph.Next(device.ID, dst_nodeID)
@ -612,8 +612,10 @@ func (peer *Peer) RoutineSequentialReceiver() {
if device.LogLevel.LogNormal { if device.LogLevel.LogNormal {
packet_len := len(elem.packet) - path.EgHeaderLen packet_len := len(elem.packet) - path.EgHeaderLen
fmt.Printf("Normal: Recv Len:%v S:%v D:%v TTL:%v From:%v IP:%v:\n", strconv.Itoa(packet_len), src_nodeID.ToString(), dst_nodeID.ToString(), elem.TTL, peer.ID.ToString(), peer.GetEndpointDstStr()) fmt.Printf("Normal: Recv Len:%v S:%v D:%v TTL:%v From:%v IP:%v:\n", strconv.Itoa(packet_len), src_nodeID.ToString(), dst_nodeID.ToString(), elem.TTL, peer.ID.ToString(), peer.GetEndpointDstStr())
packet := gopacket.NewPacket(elem.packet[path.EgHeaderLen:], layers.LayerTypeEthernet, gopacket.Default) if device.LogLevel.DumpNormal {
fmt.Println(packet.Dump()) packet := gopacket.NewPacket(elem.packet[path.EgHeaderLen:], layers.LayerTypeEthernet, gopacket.Default)
fmt.Println(packet.Dump())
}
} }
src_macaddr := tap.GetSrcMacAddr(elem.packet[path.EgHeaderLen:]) src_macaddr := tap.GetSrcMacAddr(elem.packet[path.EgHeaderLen:])
if !tap.IsNotUnicast(src_macaddr) { if !tap.IsNotUnicast(src_macaddr) {

View File

@ -29,6 +29,14 @@ import (
"github.com/google/gopacket/layers" "github.com/google/gopacket/layers"
) )
type packet_send_params struct {
peer *Peer
usage path.Usage
ttl uint8
packet []byte
offset int
}
func (device *Device) SendPacket(peer *Peer, usage path.Usage, ttl uint8, packet []byte, offset int) { func (device *Device) SendPacket(peer *Peer, usage path.Usage, ttl uint8, packet []byte, offset int) {
if peer == nil { if peer == nil {
return return
@ -48,8 +56,10 @@ func (device *Device) SendPacket(peer *Peer, usage path.Usage, ttl uint8, packet
dst_nodeID := EgHeader.GetDst() dst_nodeID := EgHeader.GetDst()
packet_len := len(packet) - path.EgHeaderLen packet_len := len(packet) - path.EgHeaderLen
fmt.Printf("Normal: Send Len:%v S:%v D:%v TTL:%v To:%v IP:%v:\n", packet_len, device.ID.ToString(), dst_nodeID.ToString(), ttl, peer.ID.ToString(), peer.GetEndpointDstStr()) fmt.Printf("Normal: Send Len:%v S:%v D:%v TTL:%v To:%v IP:%v:\n", packet_len, device.ID.ToString(), dst_nodeID.ToString(), ttl, peer.ID.ToString(), peer.GetEndpointDstStr())
packet := gopacket.NewPacket(packet[path.EgHeaderLen:], layers.LayerTypeEthernet, gopacket.Default) if device.LogLevel.DumpNormal {
fmt.Println(packet.Dump()) packet_dump := gopacket.NewPacket(packet[path.EgHeaderLen:], layers.LayerTypeEthernet, gopacket.Default)
fmt.Println(packet_dump.Dump())
}
} }
} }
if device.LogLevel.LogControl { if device.LogLevel.LogControl {
@ -63,16 +73,32 @@ func (device *Device) SendPacket(peer *Peer, usage path.Usage, ttl uint8, packet
} }
} }
var elem *QueueOutboundElement device.chan_send_packet <- &packet_send_params{
elem = device.NewOutboundElement() peer: peer,
copy(elem.buffer[offset:offset+len(packet)], packet) usage: usage,
elem.Type = usage ttl: ttl,
elem.TTL = ttl packet: packet,
elem.packet = elem.buffer[offset : offset+len(packet)] offset: offset,
if peer.isRunning.Get() { }
peer.StagePacket(elem) }
elem = nil
peer.SendStagedPackets() func (device *Device) RoutineSendPacket() {
for {
var elem *QueueOutboundElement
elem = device.NewOutboundElement()
params := <-device.chan_send_packet
offset := params.offset
packet := params.packet
peer := params.peer
copy(elem.buffer[offset:offset+len(packet)], packet)
elem.Type = params.usage
elem.TTL = params.ttl
elem.packet = elem.buffer[offset : offset+len(packet)]
if peer.isRunning.Get() {
peer.StagePacket(elem)
elem = nil
peer.SendStagedPackets()
}
} }
} }
@ -82,6 +108,7 @@ func (device *Device) BoardcastPacket(skip_list map[mtypes.Vertex]bool, usage pa
send_list[node_id] = false send_list[node_id] = false
} }
device.peers.RLock() device.peers.RLock()
fmt.Printf("Transit: Boardcast to %v\n", send_list)
for node_id, should_send := range send_list { for node_id, should_send := range send_list {
if should_send { if should_send {
peer_out := device.peers.IDMap[node_id] peer_out := device.peers.IDMap[node_id]

View File

@ -219,15 +219,11 @@ func (device *Device) RoutineReadFromTUN() {
device.log.Verbosef("Routine: TUN reader - started") device.log.Verbosef("Routine: TUN reader - started")
var elem *QueueOutboundElement elem := &QueueOutboundElement{
buffer: &[MaxMessageSize]byte{},
}
for { for {
if elem != nil {
device.PutMessageBuffer(elem.buffer)
device.PutOutboundElement(elem)
}
elem = device.NewOutboundElement()
// read packet // read packet
offset := MessageTransportHeaderSize offset := MessageTransportHeaderSize
@ -288,14 +284,12 @@ func (device *Device) RoutineReadFromTUN() {
if device.LogLevel.LogNormal { if device.LogLevel.LogNormal {
packet_len := len(elem.packet) - path.EgHeaderLen packet_len := len(elem.packet) - path.EgHeaderLen
fmt.Printf("Normal: Send Len:%v S:%v D:%v TTL:%v To:%v IP:%v:\n", packet_len, device.ID.ToString(), dst_nodeID.ToString(), elem.TTL, peer.ID.ToString(), peer.GetEndpointDstStr()) fmt.Printf("Normal: Send Len:%v S:%v D:%v TTL:%v To:%v IP:%v:\n", packet_len, device.ID.ToString(), dst_nodeID.ToString(), elem.TTL, peer.ID.ToString(), peer.GetEndpointDstStr())
packet := gopacket.NewPacket(elem.packet[path.EgHeaderLen:], layers.LayerTypeEthernet, gopacket.Default) if device.LogLevel.DumpNormal {
fmt.Println(packet.Dump()) packet_dump := gopacket.NewPacket(elem.packet[path.EgHeaderLen:], layers.LayerTypeEthernet, gopacket.Default)
} fmt.Println(packet_dump.Dump())
if peer.isRunning.Get() { }
peer.StagePacket(elem)
elem = nil
peer.SendStagedPackets()
} }
device.SendPacket(peer, elem.Type, elem.TTL, elem.packet, offset)
} }
} else { } else {
device.BoardcastPacket(make(map[mtypes.Vertex]bool, 0), elem.Type, elem.TTL, elem.packet, offset) device.BoardcastPacket(make(map[mtypes.Vertex]bool, 0), elem.Type, elem.TTL, elem.packet, offset)

Binary file not shown.

View File

@ -400,9 +400,9 @@ To avoid this issue, please use the external IP of the supernode in the edge con
## Quick start ## Quick start
Run this example_config (please open three terminals): Run this example_config (please open three terminals):
```bash ```bash
./etherguard-go -config example_config/super_mode/Node_super.yaml -mode super ./etherguard-go -config example_config/super_mode/EgNet_super.yaml -mode super
./etherguard-go -config example_config/super_mode/Node_edge001.yaml -mode edge ./etherguard-go -config example_config/super_mode/EgNet_edge001.yaml -mode edge
./etherguard-go -config example_config/super_mode/Node_edge002.yaml -mode edge ./etherguard-go -config example_config/super_mode/EgNet_edge002.yaml -mode edge
``` ```
Because it is in `stdio` mode, stdin will be read into the VPN network Because it is in `stdio` mode, stdin will be read into the VPN network
Please type in one of the edge windows Please type in one of the edge windows

View File

@ -414,9 +414,9 @@ Relay node其實也是一個edge node只不過被設定成為interface=dummy
在**不同terminal**分別執行以下命令 在**不同terminal**分別執行以下命令
```bash ```bash
./etherguard-go -config example_config/super_mode/Node_super.yaml -mode super ./etherguard-go -config example_config/super_mode/EgNet_super.yaml -mode super
./etherguard-go -config example_config/super_mode/Node_edge001.yaml -mode edge ./etherguard-go -config example_config/super_mode/EgNet_edge001.yaml -mode edge
./etherguard-go -config example_config/super_mode/Node_edge002.yaml -mode edge ./etherguard-go -config example_config/super_mode/EgNet_edge002.yaml -mode edge
``` ```
因為是stdio模式stdin會讀入VPN網路 因為是stdio模式stdin會讀入VPN網路
請在其中一個edge視窗中鍵入 請在其中一個edge視窗中鍵入

View File

@ -108,6 +108,7 @@ type LoggerInfo struct {
LogLevel string `yaml:"LogLevel"` LogLevel string `yaml:"LogLevel"`
LogTransit bool `yaml:"LogTransit"` LogTransit bool `yaml:"LogTransit"`
LogNormal bool `yaml:"LogNormal"` LogNormal bool `yaml:"LogNormal"`
DumpNormal bool `yaml:"DumpNormal"`
LogControl bool `yaml:"LogControl"` LogControl bool `yaml:"LogControl"`
LogInternal bool `yaml:"LogInternal"` LogInternal bool `yaml:"LogInternal"`
LogNTP bool `yaml:"LogNTP"` LogNTP bool `yaml:"LogNTP"`