Bugfix, p2p mode ok

KusakabeSi 2021-08-23 16:39:04 +00:00
parent 88ef721c1d
commit 8a1126dcbf
22 changed files with 134 additions and 63 deletions

.vscode/launch.json (vendored, 2 changed lines)
View File

@ -10,7 +10,7 @@
"request": "launch",
"mode": "auto",
"program": "${workspaceFolder}",
"args":["-config","example_config/static_mode/n1.yaml","-mode","edge"],
"args":["-config","example_config/p2p_mode/n1.yaml","-mode","edge"/*,"-example"*/],
}
]
}

View File

@ -338,7 +338,7 @@ func NewDevice(tapDevice tap.Device, id config.Vertex, bind conn.Bind, logger *L
device.EdgeConfigPath = theconfigpath
device.EdgeConfig = econfig
device.DRoute = econfig.DynamicRoute
device.DupData = *fixed_time_cache.NewCache(path.S2TD(econfig.DynamicRoute.DupCheckTimeout))
device.DupData = *fixed_time_cache.NewCache(path.S2TD(econfig.DynamicRoute.DupCheckTimeout), false, path.S2TD(60))
device.event_tryendpoint = make(chan struct{}, 1<<6)
device.Event_save_config = make(chan struct{}, 1<<5)
device.LogTransit = econfig.LogLevel.LogTransit
@ -347,6 +347,7 @@ func NewDevice(tapDevice tap.Device, id config.Vertex, bind conn.Bind, logger *L
go device.RoutineRegister()
go device.RoutineSendPing()
go device.RoutineRecalculateNhTable()
go device.RoutineSpreadAllMyNeighbor()
}
// create queues
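
The duplicate-check cache (DupData) is now constructed with three arguments instead of one, and NewDevice also starts RoutineSpreadAllMyNeighbor alongside the existing routing goroutines. Below is a minimal stand-in for what a time-bounded duplicate cache does, written against the standard library only; it is not the API of github.com/KusakabeSi/go-cache, whose constructor arguments are not documented in this diff.

package main

// Illustrative sketch: remember a packet ID for DupCheckTimeout, then forget it.
// The real code uses fixed_time_cache from github.com/KusakabeSi/go-cache.

import (
	"fmt"
	"sync"
	"time"
)

type dupCache struct {
	mu      sync.Mutex
	seen    map[uint64]time.Time
	timeout time.Duration
}

func newDupCache(timeout time.Duration) *dupCache {
	return &dupCache{seen: make(map[uint64]time.Time), timeout: timeout}
}

// CheckNoDup reports true the first time an ID is seen inside the timeout window.
func (c *dupCache) CheckNoDup(id uint64) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	now := time.Now()
	// Drop expired entries so the map does not grow without bound.
	for k, t := range c.seen {
		if now.Sub(t) > c.timeout {
			delete(c.seen, k)
		}
	}
	if _, ok := c.seen[id]; ok {
		return false
	}
	c.seen[id] = now
	return true
}

func main() {
	c := newDupCache(40 * time.Second)
	fmt.Println(c.CheckNoDup(1)) // true: first sighting inside the window
	fmt.Println(c.CheckNoDup(1)) // false: duplicate, dropped by the caller
}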

View File

@ -27,11 +27,12 @@ type Peer struct {
handshake Handshake
device *Device
endpoint conn.Endpoint
endpoint_trylist map[string]time.Time
endpoint_trylist sync.Map //map[string]time.Time
LastPingReceived time.Time
stopping sync.WaitGroup // routines pending stop
ID config.Vertex
ID config.Vertex
AskedForNeighbor bool
// These fields are accessed with atomic operations, which must be
// 64-bit aligned even on 32-bit platforms. Go guarantees that an
@ -102,8 +103,6 @@ func (device *Device) NewPeer(pk NoisePublicKey, id config.Vertex) (*Peer, error
peer.queue.outbound = newAutodrainingOutboundQueue(device)
peer.queue.inbound = newAutodrainingInboundQueue(device)
peer.queue.staged = make(chan *QueueOutboundElement, QueueStagedSize)
peer.endpoint_trylist = make(map[string]time.Time)
// map public key
_, ok := device.peers.keyMap[pk]
if ok {
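
endpoint_trylist changes from a plain map[string]time.Time to a sync.Map because it is now touched by several goroutines at once (RoutineSetEndpoint, process_UpdatePeerMsg, process_BoardcastPeerMsg), and the make() call in NewPeer is dropped since a sync.Map's zero value is ready to use. A small sketch of the idioms the rest of the commit switches to, assuming string keys and time.Time values as in the field comment:

package main

// Minimal sketch of the sync.Map idioms replacing the old map: Store replaces map
// assignment, Delete replaces delete(), and Range replaces the for-range loop, with
// the callback returning false to stop early (the old "break").

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var trylist sync.Map // key: endpoint URL (string), value: last attempt (time.Time)

	trylist.Store("192.0.2.1:3001", time.Time{}) // zero time: not tried yet
	trylist.Store("192.0.2.2:3001", time.Now())

	trylist.Range(func(key, value interface{}) bool {
		url := key.(string)
		tried := value.(time.Time)
		if tried.IsZero() {
			fmt.Println("would try endpoint", url)
			return false // stop ranging, like break
		}
		trylist.Delete(key) // already tried: drop it
		return true // keep ranging, like continue
	})
}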

View File

@ -446,6 +446,10 @@ func (peer *Peer) RoutineSequentialReceiver() {
}
peer.timersDataReceived()
if len(elem.packet) <= path.EgHeaderLen {
device.log.Errorf("Invalid EgHeader from peer %v", peer)
goto skip
}
EgHeader, err = path.NewEgHeader(elem.packet[0:path.EgHeaderLen]) // EG header
src_nodeID = EgHeader.GetSrc()
dst_nodeID = EgHeader.GetDst()
@ -530,7 +534,7 @@ func (peer *Peer) RoutineSequentialReceiver() {
fmt.Printf("Received MID:" + strconv.Itoa(int(EgHeader.GetMessageID())) + " From:" + peer.GetEndpointDstStr() + " " + device.sprint_received(packet_type, elem.packet[path.EgHeaderLen:]) + "\n")
}
}
err = device.process_received(packet_type, elem.packet[path.EgHeaderLen:])
err = device.process_received(packet_type, peer, elem.packet[path.EgHeaderLen:])
if err != nil {
device.log.Errorf(err.Error())
}
@ -539,6 +543,10 @@ func (peer *Peer) RoutineSequentialReceiver() {
if should_receive { // Write message to tap device
if packet_type == path.NornalPacket {
if len(elem.packet) <= path.EgHeaderLen+12 {
device.log.Errorf("Invalid normal packet from peer %v", peer)
goto skip
}
src_macaddr := tap.GetSrcMacAddr(elem.packet[path.EgHeaderLen:])
if !tap.IsBoardCast(src_macaddr) {
device.l2fib.Store(src_macaddr, src_nodeID) // Write to l2fib table
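
The two added length checks guard the receive path against short or malformed packets: parsing the EgHeader or reading the source MAC out of an undersized slice would otherwise panic the RoutineSequentialReceiver goroutine. A minimal sketch of the pattern, with a placeholder header length (the real value is path.EgHeaderLen) and both checks folded into one helper for brevity:

package main

// Sketch of the bounds-check-before-slice pattern added in this hunk. Offsets are
// illustrative only; they are not the project's wire format.

import (
	"errors"
	"fmt"
)

const egHeaderLen = 7 // placeholder value; the real constant is path.EgHeaderLen

func parsePacket(packet []byte) error {
	if len(packet) <= egHeaderLen {
		return errors.New("invalid EgHeader: packet too short")
	}
	header := packet[:egHeaderLen] // safe: length checked above
	_ = header                     // the real code hands this to path.NewEgHeader
	if len(packet) <= egHeaderLen+12 {
		return errors.New("invalid normal packet: no room for dst/src MAC")
	}
	srcMAC := packet[egHeaderLen : egHeaderLen+6] // illustrative offsets only
	_ = srcMAC
	return nil
}

func main() {
	fmt.Println(parsePacket([]byte{1, 2, 3})) // rejected instead of panicking
}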

View File

@ -47,14 +47,17 @@ func (device *Device) BoardcastPacket(skip_list map[config.Vertex]bool, packet [
for node_id, _ := range skip_list {
send_list[node_id] = false
}
device.peers.RLock()
for node_id, should_send := range send_list {
if should_send {
device.SendPacket(device.peers.IDMap[node_id], packet, offset)
}
}
device.peers.RUnlock()
}
func (device *Device) SpreadPacket(skip_list map[config.Vertex]bool, packet []byte, offset int) { // Send packet to all peers no matter it is alive
device.peers.RLock()
for peer_id, peer_out := range device.peers.IDMap {
if _, ok := skip_list[peer_id]; ok {
if device.LogTransit {
@ -64,10 +67,12 @@ func (device *Device) SpreadPacket(skip_list map[config.Vertex]bool, packet []by
}
device.SendPacket(peer_out, packet, MessageTransportOffsetContent)
}
device.peers.RUnlock()
}
func (device *Device) TransitBoardcastPacket(src_nodeID config.Vertex, in_id config.Vertex, packet []byte, offset int) {
node_boardcast_list := device.graph.GetBoardcastThroughList(device.ID, in_id, src_nodeID)
device.peers.RLock()
for peer_id := range node_boardcast_list {
peer_out := device.peers.IDMap[peer_id]
if device.LogTransit {
@ -75,9 +80,11 @@ func (device *Device) TransitBoardcastPacket(src_nodeID config.Vertex, in_id con
}
device.SendPacket(peer_out, packet, offset)
}
device.peers.RUnlock()
}
func (device *Device) Send2Super(packet []byte, offset int) {
device.peers.RLock()
if device.DRoute.SuperNode.UseSuperNode {
for _, peer_out := range device.peers.SuperPeer {
if device.LogTransit {
@ -86,6 +93,7 @@ func (device *Device) Send2Super(packet []byte, offset int) {
device.SendPacket(peer_out, packet, offset)
}
}
device.peers.RUnlock()
}
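
Every walk over device.peers.IDMap and device.peers.SuperPeer is now bracketed by peers.RLock()/RUnlock(), so broadcasting and transit cannot race with a goroutine that is adding or removing peers. A minimal sketch of the pattern with stand-in types (peerTable here is not the project's struct):

package main

// Sketch of the read-lock-around-iteration pattern: many readers may hold RLock at
// once, while a writer adding a peer takes the exclusive Lock.

import (
	"fmt"
	"sync"
)

type peerTable struct {
	sync.RWMutex
	IDMap map[uint16]string // stand-in for map[config.Vertex]*Peer
}

func (t *peerTable) broadcast(send func(string)) {
	t.RLock() // shared: safe to hold in several goroutines at once
	defer t.RUnlock()
	for _, p := range t.IDMap {
		send(p)
	}
}

func (t *peerTable) addPeer(id uint16, p string) {
	t.Lock() // exclusive: blocks until all readers release RLock
	defer t.Unlock()
	t.IDMap[id] = p
}

func main() {
	t := &peerTable{IDMap: map[uint16]string{1: "peer-1"}}
	t.addPeer(2, "peer-2")
	t.broadcast(func(p string) { fmt.Println("send to", p) })
}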
func (device *Device) CheckNoDup(packet []byte) bool {
@ -97,7 +105,7 @@ func (device *Device) CheckNoDup(packet []byte) bool {
return !ok
}
func (device *Device) process_received(msg_type path.Usage, body []byte) (err error) {
func (device *Device) process_received(msg_type path.Usage, peer *Peer, body []byte) (err error) {
if device.IsSuperNode {
switch msg_type {
case path.Register:
@ -127,10 +135,10 @@ func (device *Device) process_received(msg_type path.Usage, body []byte) (err er
}
case path.PongPacket:
if content, err := path.ParsePongMsg(body); err == nil {
return device.process_pong(content)
return device.process_pong(peer, content)
}
case path.RequestPeer:
if content, err := path.ParseRequestPeerMsg(body); err == nil {
case path.QueryPeer:
if content, err := path.ParseQueryPeerMsg(body); err == nil {
return device.process_RequestPeerMsg(content)
}
case path.BoardcastPeer:
@ -166,8 +174,8 @@ func (device *Device) sprint_received(msg_type path.Usage, body []byte) (ret str
if content, err := path.ParsePongMsg(body); err == nil {
ret = content.ToString()
}
case path.RequestPeer:
if content, err := path.ParseRequestPeerMsg(body); err == nil {
case path.QueryPeer:
if content, err := path.ParseQueryPeerMsg(body); err == nil {
ret = content.ToString()
}
case path.BoardcastPeer:
@ -196,6 +204,7 @@ func (device *Device) process_ping(content path.PingMsg) error {
Dst_nodeID: device.ID,
Timediff: device.graph.GetCurrentTime().Sub(content.Time),
}
device.graph.UpdateLentancy(content.Src_nodeID, device.ID, PongMSG.Timediff, false)
body, err := path.GetByte(&PongMSG)
if err != nil {
return err
@ -218,9 +227,27 @@ func (device *Device) process_ping(content path.PingMsg) error {
return nil
}
func (device *Device) process_pong(content path.PongMsg) error {
func (device *Device) process_pong(peer *Peer, content path.PongMsg) error {
if device.DRoute.P2P.UseP2P {
device.graph.UpdateLentancy(content.Src_nodeID, content.Dst_nodeID, content.Timediff, false)
if !peer.AskedForNeighbor {
QueryPeerMsg := path.QueryPeerMsg{
Request_ID: uint32(device.ID),
}
body, err := path.GetByte(&QueryPeerMsg)
if err != nil {
return err
}
buf := make([]byte, path.EgHeaderLen+len(body))
header, err := path.NewEgHeader(buf[:path.EgHeaderLen])
header.SetSrc(device.ID)
header.SetTTL(200)
header.SetUsage(path.QueryPeer)
header.SetPacketLength(uint16(len(body)))
copy(buf[path.EgHeaderLen:], body)
device.SendPacket(peer, buf, MessageTransportOffsetContent)
peer.AskedForNeighbor = true
}
}
return nil
}
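
process_pong now does two things in p2p mode: it records the measured latency, and, once per peer (guarded by AskedForNeighbor), it sends a QueryPeer message back so a freshly reachable peer is asked for its own neighbor list. The packet is assembled as a fixed-size EgHeader followed by the encoded body. A sketch of that assembly pattern with an invented field layout (the real header is path.EgHeader and its length is path.EgHeaderLen):

package main

// Sketch of the allocate-header-then-copy-body pattern used when sending QueryPeer.
// The byte layout below is illustrative, not the project's wire format.

import (
	"encoding/binary"
	"fmt"
)

const headerLen = 8 // illustrative, not path.EgHeaderLen

func buildPacket(src uint16, ttl uint8, usage uint8, body []byte) []byte {
	buf := make([]byte, headerLen+len(body))
	binary.BigEndian.PutUint16(buf[0:2], src)               // SetSrc
	buf[2] = ttl                                            // SetTTL
	buf[3] = usage                                          // SetUsage
	binary.BigEndian.PutUint16(buf[4:6], uint16(len(body))) // SetPacketLength
	copy(buf[headerLen:], body)                             // body goes right after the header
	return buf
}

func main() {
	pkt := buildPacket(1, 200, 7, []byte("gob-encoded QueryPeerMsg"))
	fmt.Println(len(pkt), "bytes")
}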
@ -277,7 +304,7 @@ func (device *Device) process_UpdatePeerMsg(content path.UpdatePeerMsg) error {
if thepeer.LastPingReceived.Add(path.S2TD(device.DRoute.P2P.PeerAliveTimeout)).Before(time.Now()) {
//Peer died, try to switch to this new endpoint
for url, _ := range peerinfo.Connurl {
thepeer.endpoint_trylist[url] = time.Time{} //another goroutine will process it
thepeer.endpoint_trylist.Store(url, time.Time{}) //another goroutine will process it
send_signal = true
}
}
@ -300,32 +327,36 @@ func (device *Device) RoutineSetEndpoint() {
for _, thepeer := range device.peers.IDMap {
if thepeer.LastPingReceived.Add(path.S2TD(device.DRoute.P2P.PeerAliveTimeout)).After(time.Now()) {
//Peer alives
for url := range thepeer.endpoint_trylist {
delete(thepeer.endpoint_trylist, url)
}
thepeer.endpoint_trylist.Range(func(key, value interface{}) bool {
thepeer.endpoint_trylist.Delete(key)
return true
})
} else {
//Peer died, try to switch to this new endpoint
for url, trytime := range thepeer.endpoint_trylist {
thepeer.endpoint_trylist.Range(func(key interface{}, value interface{}) bool {
url := key.(string)
trytime := value.(time.Time)
if trytime.Sub(time.Time{}) != time.Duration(0) && time.Now().Sub(trytime) > path.S2TD(device.DRoute.ConnTimeOut) {
delete(thepeer.endpoint_trylist, url)
thepeer.endpoint_trylist.Delete(key)
} else {
endpoint, err := device.Bind().ParseEndpoint(url) //trying to bind first url in the list and wait device.DRoute.P2P.PeerAliveTimeout seconds
if err != nil {
device.log.Errorf("Can't bind " + url)
delete(thepeer.endpoint_trylist, url)
thepeer.endpoint_trylist.Delete(url)
return true
}
if device.LogControl {
fmt.Println("Set endpoint to " + endpoint.DstToString() + " for NodeID:" + strconv.Itoa(int(thepeer.ID)))
}
thepeer.SetEndpointFromPacket(endpoint)
NextRun = true
thepeer.endpoint_trylist[url] = time.Now()
thepeer.endpoint_trylist.Store(key, time.Now())
//Send Ping message to it
packet, err := device.GeneratePingPacket(device.ID)
device.SendPacket(thepeer, packet, MessageTransportOffsetContent)
break
return false
}
}
return true
})
}
}
ClearChanLoop:
@ -404,7 +435,18 @@ func (device *Device) RoutineRecalculateNhTable() {
time.Sleep(device.graph.NodeReportTimeout)
}
}
}
func (device *Device) RoutineSpreadAllMyNeighbor() {
if !device.DRoute.P2P.UseP2P {
return
}
for {
device.process_RequestPeerMsg(path.QueryPeerMsg{
Request_ID: 0,
})
time.Sleep(path.S2TD(device.DRoute.P2P.SendPeerInterval))
}
}
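
RoutineSpreadAllMyNeighbor is the p2p counterpart of the supernode's peer distribution: it periodically re-broadcasts the local peer list by calling process_RequestPeerMsg with a zero Request_ID. A stripped-down sketch of that loop shape, with the device methods replaced by a callback:

package main

// Sketch of the periodic-gossip loop: exit immediately if p2p routing is disabled,
// otherwise broadcast the peer list every SendPeerInterval.

import (
	"fmt"
	"time"
)

func spreadAllMyNeighbor(useP2P bool, interval time.Duration, spread func()) {
	if !useP2P {
		return // nothing to gossip when p2p routing is disabled
	}
	for {
		spread() // real code: device.process_RequestPeerMsg(path.QueryPeerMsg{Request_ID: 0})
		time.Sleep(interval)
	}
}

func main() {
	go spreadAllMyNeighbor(true, 20*time.Second, func() { fmt.Println("broadcast peer list") })
	time.Sleep(50 * time.Millisecond) // let the first gossip round run in this demo
}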
func (device *Device) GeneratePingPacket(src_nodeID config.Vertex) ([]byte, error) {
@ -459,12 +501,17 @@ func (device *Device) process_UpdateNhTableMsg(content path.UpdateNhTableMsg) er
return nil
}
func (device *Device) process_RequestPeerMsg(content path.RequestPeerMsg) error {
func (device *Device) process_RequestPeerMsg(content path.QueryPeerMsg) error { //Send all my peers to all my peers
if device.DRoute.P2P.UseP2P {
device.peers.RLock()
for pubkey, peer := range device.peers.keyMap {
if peer.ID >= path.Special_NodeID {
continue
}
if peer.endpoint == nil {
continue
}
peer.handshake.mutex.RLock()
response := path.BoardcastPeerMsg{
Request_ID: content.Request_ID,
NodeID: peer.ID,
@ -472,6 +519,7 @@ func (device *Device) process_RequestPeerMsg(content path.RequestPeerMsg) error
PSKey: peer.handshake.presharedKey,
ConnURL: peer.endpoint.DstToString(),
}
peer.handshake.mutex.RUnlock()
body, err := path.GetByte(response)
if err != nil {
device.log.Errorf("Error at receivesendproc.go line221: ", err)
@ -487,6 +535,7 @@ func (device *Device) process_RequestPeerMsg(content path.RequestPeerMsg) error
copy(buf[path.EgHeaderLen:], body)
device.SpreadPacket(make(map[config.Vertex]bool), buf, MessageTransportOffsetContent)
}
device.peers.RUnlock()
}
return nil
}
@ -494,6 +543,9 @@ func (device *Device) process_RequestPeerMsg(content path.RequestPeerMsg) error
func (device *Device) process_BoardcastPeerMsg(content path.BoardcastPeerMsg) error {
if device.DRoute.P2P.UseP2P {
var sk NoisePublicKey
if bytes.Equal(content.PubKey[:], device.staticIdentity.publicKey[:]) {
return nil
}
copy(sk[:], content.PubKey[:])
thepeer := device.LookupPeer(sk)
if thepeer == nil { //not exist in local
@ -511,7 +563,8 @@ func (device *Device) process_BoardcastPeerMsg(content path.BoardcastPeerMsg) er
}
if thepeer.LastPingReceived.Add(path.S2TD(device.DRoute.P2P.PeerAliveTimeout)).Before(time.Now()) {
//Peer died, try to switch to this new endpoint
thepeer.endpoint_trylist[content.ConnURL] = time.Time{} //another goroutine will process it
thepeer.endpoint_trylist.Store(content.ConnURL, time.Time{}) //another goroutine will process it
device.event_tryendpoint <- struct{}{}
}
}
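
process_BoardcastPeerMsg now drops any advertisement whose public key matches the local static key, so a node's own broadcast, echoed back through the mesh, is never added as a peer; for known but dead peers the advertised ConnURL is pushed into endpoint_trylist and RoutineSetEndpoint is woken through event_tryendpoint. A tiny sketch of the self-filter, assuming the WireGuard-style 32-byte key:

package main

// Sketch of the self-advertisement check: compare the advertised public key against
// our own static key and ignore the message if they match.

import (
	"bytes"
	"fmt"
)

type NoisePublicKey [32]byte

func shouldIgnore(advertised, mine NoisePublicKey) bool {
	return bytes.Equal(advertised[:], mine[:])
}

func main() {
	var me NoisePublicKey
	me[0] = 0xAB
	fmt.Println(shouldIgnore(me, me))               // true: our own key echoed back
	fmt.Println(shouldIgnore(NoisePublicKey{}, me)) // false: someone else's key
}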

View File

@ -14,12 +14,12 @@ listenport: 3001
loglevel:
loglevel: normal`
logtransit: true
logcontrol: false
logcontrol: true
dynamicroute:
sendpinginterval: 20
dupchecktimeout: 40
conntimeout: 30
savenewpeers: true
savenewpeers: false
supernode:
usesupernode: false
connurlv4: 127.0.0.1:3000
@ -29,7 +29,7 @@ dynamicroute:
apiurl: http://127.0.0.1:3000/api
supernodeinfotimeout: 40
p2p:
usep2p: false
usep2p: true
sendpeerinterval: 20
peeralivetimeout: 30
graphrecalculatesetting:

View File

@ -14,12 +14,12 @@ listenport: 3002
loglevel:
loglevel: normal
logtransit: true
logcontrol: false
logcontrol: true
dynamicroute:
sendpinginterval: 20
dupchecktimeout: 40
conntimeout: 30
savenewpeers: true
savenewpeers: false
supernode:
usesupernode: false
connurlv4: 127.0.0.1:3000
@ -29,7 +29,7 @@ dynamicroute:
apiurl: http://127.0.0.1:3000/api
supernodeinfotimeout: 40
p2p:
usep2p: false
usep2p: true
sendpeerinterval: 20
peeralivetimeout: 30
graphrecalculatesetting:

View File

@ -14,12 +14,12 @@ listenport: 3003
loglevel:
loglevel: normal
logtransit: true
logcontrol: false
logcontrol: true
dynamicroute:
sendpinginterval: 20
dupchecktimeout: 40
conntimeout: 30
savenewpeers: true
savenewpeers: false
supernode:
usesupernode: false
connurlv4: 127.0.0.1:3000
@ -29,7 +29,7 @@ dynamicroute:
apiurl: http://127.0.0.1:3000/api
supernodeinfotimeout: 40
p2p:
usep2p: false
usep2p: true
sendpeerinterval: 20
peeralivetimeout: 30
graphrecalculatesetting:

View File

@ -14,12 +14,12 @@ listenport: 3004
loglevel:
loglevel: normal
logtransit: true
logcontrol: false
logcontrol: true
dynamicroute:
sendpinginterval: 20
dupchecktimeout: 40
conntimeout: 30
savenewpeers: true
savenewpeers: false
supernode:
usesupernode: false
connurlv4: 127.0.0.1:3000
@ -29,7 +29,7 @@ dynamicroute:
apiurl: http://127.0.0.1:3000/api
supernodeinfotimeout: 40
p2p:
usep2p: false
usep2p: true
sendpeerinterval: 20
peeralivetimeout: 30
graphrecalculatesetting:

View File

@ -14,12 +14,12 @@ listenport: 3005
loglevel:
loglevel: normal
logtransit: true
logcontrol: false
logcontrol: true
dynamicroute:
sendpinginterval: 20
dupchecktimeout: 40
conntimeout: 30
savenewpeers: true
savenewpeers: false
supernode:
usesupernode: false
connurlv4: 127.0.0.1:3000
@ -29,7 +29,7 @@ dynamicroute:
apiurl: http://127.0.0.1:3000/api
supernodeinfotimeout: 40
p2p:
usep2p: false
usep2p: true
sendpeerinterval: 20
peeralivetimeout: 30
graphrecalculatesetting:

View File

@ -14,12 +14,12 @@ listenport: 3006
loglevel:
loglevel: normal
logtransit: true
logcontrol: false
logcontrol: true
dynamicroute:
sendpinginterval: 20
dupchecktimeout: 40
conntimeout: 30
savenewpeers: true
savenewpeers: false
supernode:
usesupernode: false
connurlv4: 127.0.0.1:3000
@ -29,7 +29,7 @@ dynamicroute:
apiurl: http://127.0.0.1:3000/api
supernodeinfotimeout: 40
p2p:
usep2p: false
usep2p: true
sendpeerinterval: 20
peeralivetimeout: 30
graphrecalculatesetting:

View File

@ -14,7 +14,7 @@ listenport: 3001
loglevel:
loglevel: normal`
logtransit: true
logcontrol: false
logcontrol: true
dynamicroute:
sendpinginterval: 20
dupchecktimeout: 40

View File

@ -14,7 +14,7 @@ listenport: 3002
loglevel:
loglevel: normal
logtransit: true
logcontrol: false
logcontrol: true
dynamicroute:
sendpinginterval: 20
dupchecktimeout: 40

View File

@ -14,7 +14,7 @@ listenport: 3003
loglevel:
loglevel: normal
logtransit: true
logcontrol: false
logcontrol: true
dynamicroute:
sendpinginterval: 20
dupchecktimeout: 40

View File

@ -14,7 +14,7 @@ listenport: 3004
loglevel:
loglevel: normal
logtransit: true
logcontrol: false
logcontrol: true
dynamicroute:
sendpinginterval: 20
dupchecktimeout: 40

View File

@ -14,7 +14,7 @@ listenport: 3005
loglevel:
loglevel: normal
logtransit: true
logcontrol: false
logcontrol: true
dynamicroute:
sendpinginterval: 20
dupchecktimeout: 40

View File

@ -14,7 +14,7 @@ listenport: 3006
loglevel:
loglevel: normal
logtransit: true
logcontrol: false
logcontrol: true
dynamicroute:
sendpinginterval: 20
dupchecktimeout: 40

go.mod (2 changed lines)
View File

@ -3,7 +3,7 @@ module github.com/KusakabeSi/EtherGuardVPN
go 1.16
require (
github.com/KusakabeSi/go-cache v0.0.0-20210817164551-57817be43e28
github.com/KusakabeSi/go-cache v0.0.0-20210823132304-22b5b1d22b41
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2
golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57
gopkg.in/yaml.v2 v2.4.0

View File

@ -24,7 +24,7 @@ const (
PingPacket //Comes from other peer
PongPacket //Send to everyone, include server
RequestPeer
QueryPeer
BoardcastPeer
)
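
RequestPeer is renamed to QueryPeer in place inside the iota block. Because the constant keeps its position, it keeps its numeric value, so the rename does not change what goes on the wire; reordering the block would. A sketch using only the constants visible in this hunk (their absolute values here are illustrative):

package main

// Sketch of why an in-place rename inside an iota block is wire-safe: the value comes
// from the position, not the name.

import "fmt"

type Usage int

const (
	PingPacket Usage = iota // starting at 0 here only for illustration
	PongPacket
	QueryPeer // formerly RequestPeer: same slot, so the same on-wire value
	BoardcastPeer
)

func main() {
	fmt.Println(QueryPeer) // prints 2 with this illustrative ordering
}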

View File

@ -105,15 +105,15 @@ func ParsePongMsg(bin []byte) (StructPlace PongMsg, err error) {
return
}
type RequestPeerMsg struct {
type QueryPeerMsg struct {
Request_ID uint32 `struc:"uint32"`
}
func (c *RequestPeerMsg) ToString() string {
return "RequestPeerMsg Request_ID:" + strconv.Itoa(int(c.Request_ID))
func (c *QueryPeerMsg) ToString() string {
return "QueryPeerMsg Request_ID:" + strconv.Itoa(int(c.Request_ID))
}
func ParseRequestPeerMsg(bin []byte) (StructPlace RequestPeerMsg, err error) {
func ParseQueryPeerMsg(bin []byte) (StructPlace QueryPeerMsg, err error) {
var b bytes.Buffer
b.Write(bin)
d := gob.NewDecoder(&b)
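
The message struct and its parser are renamed to QueryPeerMsg/ParseQueryPeerMsg; the body remains a gob-encoded struct carrying a single Request_ID. A self-contained sketch of that round trip, assuming gob on both sides (only the struct itself is taken from the diff; the helper names are mine):

package main

// Sketch of the encode/decode round trip behind GetByte and ParseQueryPeerMsg.

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"log"
)

type QueryPeerMsg struct {
	Request_ID uint32
}

func encode(msg QueryPeerMsg) ([]byte, error) {
	var b bytes.Buffer
	if err := gob.NewEncoder(&b).Encode(msg); err != nil {
		return nil, err
	}
	return b.Bytes(), nil
}

func decode(bin []byte) (QueryPeerMsg, error) {
	var out QueryPeerMsg
	err := gob.NewDecoder(bytes.NewReader(bin)).Decode(&out)
	return out, err
}

func main() {
	bin, err := encode(QueryPeerMsg{Request_ID: 42})
	if err != nil {
		log.Fatal(err)
	}
	msg, err := decode(bin)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(msg.Request_ID) // 42
}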

View File

@ -46,7 +46,7 @@ type Fullroute struct {
type IG struct {
Vert map[config.Vertex]bool
edges map[config.Vertex]map[config.Vertex]Latency
edgelock sync.RWMutex
edgelock *sync.RWMutex
JitterTolerance float64
JitterToleranceMultiplier float64
NodeReportTimeout time.Duration
@ -66,6 +66,7 @@ func S2TD(secs float64) time.Duration {
func NewGraph(num_node int, IsSuperMode bool, theconfig config.GraphRecalculateSetting) *IG {
g := IG{
edgelock: &sync.RWMutex{},
JitterTolerance: theconfig.JitterTolerance,
JitterToleranceMultiplier: theconfig.JitterToleranceMultiplier,
NodeReportTimeout: S2TD(theconfig.NodeReportTimeout),
@ -126,13 +127,11 @@ func (g *IG) UpdateLentancy(u, v config.Vertex, dt time.Duration, checkchange bo
g.edgelock.Lock()
g.Vert[u] = true
g.Vert[v] = true
g.edgelock.Unlock()
w := float64(dt) / float64(time.Second)
if _, ok := g.edges[u]; !ok {
g.edgelock.Lock()
g.edges[u] = make(map[config.Vertex]Latency)
g.edgelock.Unlock()
}
g.edgelock.Unlock()
if g.ShouldUpdate(u, v, w) {
changed = g.RecalculateNhTable(checkchange)
}
@ -145,10 +144,16 @@ func (g *IG) UpdateLentancy(u, v config.Vertex, dt time.Duration, checkchange bo
return
}
func (g IG) Vertices() map[config.Vertex]bool {
return g.Vert
vr := make(map[config.Vertex]bool)
for k, v := range g.Vert { //copy a new list
vr[k] = v
}
return vr
}
func (g IG) Neighbors(v config.Vertex) (vs []config.Vertex) {
for k := range g.edges[v] {
g.edgelock.RLock()
defer g.edgelock.RUnlock()
for k := range g.edges[v] { //copy a new list
vs = append(vs, k)
}
return vs
@ -165,10 +170,14 @@ func (g IG) Next(u, v config.Vertex) *config.Vertex {
}
func (g IG) Weight(u, v config.Vertex) float64 {
g.edgelock.RLock()
defer g.edgelock.RUnlock()
if _, ok := g.edges[u]; !ok {
g.edgelock.RUnlock()
g.edgelock.Lock()
g.edges[u] = make(map[config.Vertex]Latency)
g.edgelock.Unlock()
g.edgelock.RLock()
return Infinity
}
if _, ok := g.edges[u][v]; !ok {
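
The most significant change in this file is edgelock becoming *sync.RWMutex. IG's methods use value receivers (func (g IG) ...), so a mutex stored by value is copied on every call and each call would lock its own private copy; a pointer mutex is shared by every copy of the struct. The same concern drives Vertices() returning a copy of the Vert map and Neighbors()/Weight() taking the lock before touching g.edges. A sketch of the value-versus-pointer difference:

package main

// Sketch of the copied-mutex pitfall: with a value receiver, a sync.RWMutex field is
// duplicated per call (go vet reports "passes lock by value"), so it serializes
// nothing; a *sync.RWMutex field keeps one shared lock.

import (
	"fmt"
	"sync"
)

type graphByValue struct {
	mu sync.RWMutex // copied together with the struct: broken under value receivers
}

func (g graphByValue) touch() { g.mu.Lock(); defer g.mu.Unlock() } // locks a private copy

type graphByPointer struct {
	mu *sync.RWMutex // all copies point at the same lock
}

func (g graphByPointer) touch() { g.mu.Lock(); defer g.mu.Unlock() } // locks the shared lock

func main() {
	gp := graphByPointer{mu: &sync.RWMutex{}}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); gp.touch() }()
	}
	wg.Wait()
	fmt.Println("pointer-mutex copies share one lock")
}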

View File

@ -92,6 +92,7 @@ func (tap *StdIOTap) Events() chan Event {
func (tap *StdIOTap) Close() error {
tap.events <- EventDown
os.Stdin.Close()
os.Stdin.WriteString("end\n")
close(tap.events)
return nil
} // stops the device and closes the event channel