Fix bug: remove unnecessary nhTable update

This commit is contained in:
KusakabeSi 2021-08-25 09:18:54 +00:00
parent 89f3069e7f
commit eae0dc1aa5
9 changed files with 69 additions and 52 deletions

View File

@ -495,7 +495,7 @@ func (peer *Peer) RoutineSequentialReceiver() {
should_process = true should_process = true
} }
default: default:
if _, ok := device.graph.NhTable[device.ID][dst_nodeID]; ok { if device.graph.Next(device.ID, dst_nodeID) != nil {
should_transfer = true should_transfer = true
} else { } else {
device.log.Verbosef("No route to peer ID %v", dst_nodeID) device.log.Verbosef("No route to peer ID %v", dst_nodeID)
@ -517,12 +517,14 @@ func (peer *Peer) RoutineSequentialReceiver() {
device.SpreadPacket(skip_list, elem.packet, MessageTransportOffsetContent) device.SpreadPacket(skip_list, elem.packet, MessageTransportOffsetContent)
} else { } else {
next_id := *device.graph.NhTable[device.ID][dst_nodeID] next_id := device.graph.Next(device.ID, dst_nodeID)
peer_out = device.peers.IDMap[next_id] if next_id != nil {
if device.LogLevel.LogTransit { peer_out = device.peers.IDMap[*next_id]
fmt.Printf("Transfer packet from %d through %d to %d\n", peer.ID, device.ID, peer_out.ID) if device.LogLevel.LogTransit {
fmt.Printf("Transfer packet from %d through %d to %d\n", peer.ID, device.ID, peer_out.ID)
}
device.SendPacket(peer_out, elem.packet, MessageTransportOffsetContent)
} }
device.SendPacket(peer_out, elem.packet, MessageTransportOffsetContent)
} }
} }
} }
@ -531,7 +533,7 @@ func (peer *Peer) RoutineSequentialReceiver() {
if packet_type != path.NornalPacket { if packet_type != path.NornalPacket {
if device.LogLevel.LogControl { if device.LogLevel.LogControl {
if peer.GetEndpointDstStr() != "" { if peer.GetEndpointDstStr() != "" {
fmt.Printf("Received MID:" + strconv.Itoa(int(EgHeader.GetMessageID())) + " From:" + peer.GetEndpointDstStr() + " " + device.sprint_received(packet_type, elem.packet[path.EgHeaderLen:]) + "\n") fmt.Println("Received MID:" + strconv.Itoa(int(EgHeader.GetMessageID())) + " From:" + peer.GetEndpointDstStr() + " " + device.sprint_received(packet_type, elem.packet[path.EgHeaderLen:]))
} }
} }
err = device.process_received(packet_type, peer, elem.packet[path.EgHeaderLen:]) err = device.process_received(packet_type, peer, elem.packet[path.EgHeaderLen:])

View File

@ -2,6 +2,7 @@ package device
import ( import (
"bytes" "bytes"
"encoding/base64"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@ -33,7 +34,7 @@ func (device *Device) SendPacket(peer *Peer, packet []byte, offset int) {
device.MsgCount += 1 device.MsgCount += 1
EgHeader.SetMessageID(device.MsgCount) EgHeader.SetMessageID(device.MsgCount)
if peer.GetEndpointDstStr() != "" { if peer.GetEndpointDstStr() != "" {
fmt.Printf("Send MID:" + strconv.Itoa(int(device.MsgCount)) + " To:" + peer.GetEndpointDstStr() + " " + device.sprint_received(EgHeader.GetUsage(), packet[path.EgHeaderLen:]) + "\n") fmt.Println("Send MID:" + strconv.Itoa(int(device.MsgCount)) + " To:" + peer.GetEndpointDstStr() + " " + device.sprint_received(EgHeader.GetUsage(), packet[path.EgHeaderLen:]))
} }
} }
} }
@ -211,7 +212,9 @@ func (device *Device) process_ping(content path.PingMsg) error {
Dst_nodeID: device.ID, Dst_nodeID: device.ID,
Timediff: device.graph.GetCurrentTime().Sub(content.Time), Timediff: device.graph.GetCurrentTime().Sub(content.Time),
} }
device.graph.UpdateLentancy(content.Src_nodeID, device.ID, PongMSG.Timediff, false) if device.DRoute.P2P.UseP2P && time.Now().After(device.graph.NhTableExpire) {
device.graph.UpdateLentancy(content.Src_nodeID, device.ID, PongMSG.Timediff, false)
}
body, err := path.GetByte(&PongMSG) body, err := path.GetByte(&PongMSG)
if err != nil { if err != nil {
return err return err
@ -236,7 +239,9 @@ func (device *Device) process_ping(content path.PingMsg) error {
func (device *Device) process_pong(peer *Peer, content path.PongMsg) error { func (device *Device) process_pong(peer *Peer, content path.PongMsg) error {
if device.DRoute.P2P.UseP2P { if device.DRoute.P2P.UseP2P {
device.graph.UpdateLentancy(content.Src_nodeID, content.Dst_nodeID, content.Timediff, false) if time.Now().After(device.graph.NhTableExpire) {
device.graph.UpdateLentancy(content.Src_nodeID, content.Dst_nodeID, content.Timediff, false)
}
if !peer.AskedForNeighbor { if !peer.AskedForNeighbor {
QueryPeerMsg := path.QueryPeerMsg{ QueryPeerMsg := path.QueryPeerMsg{
Request_ID: uint32(device.ID), Request_ID: uint32(device.ID),
@ -302,6 +307,9 @@ func (device *Device) process_UpdatePeerMsg(content path.UpdatePeerMsg) error {
} }
thepeer := device.LookupPeer(sk) thepeer := device.LookupPeer(sk)
if thepeer == nil { //not exist in local if thepeer == nil { //not exist in local
if device.LogLevel.LogControl {
fmt.Println("Add new peer to local ID:" + peerinfo.NodeID.ToString() + " PubKey:" + pubkey)
}
if device.graph.Weight(device.ID, peerinfo.NodeID) == path.Infinity { // add node to graph if device.graph.Weight(device.ID, peerinfo.NodeID) == path.Infinity { // add node to graph
device.graph.UpdateLentancy(device.ID, peerinfo.NodeID, path.S2TD(path.Infinity), false) device.graph.UpdateLentancy(device.ID, peerinfo.NodeID, path.S2TD(path.Infinity), false)
} }
@ -508,6 +516,10 @@ func (device *Device) GeneratePingPacket(src_nodeID config.Vertex) ([]byte, erro
func (device *Device) process_UpdateNhTableMsg(content path.UpdateNhTableMsg) error { func (device *Device) process_UpdateNhTableMsg(content path.UpdateNhTableMsg) error {
if device.DRoute.SuperNode.UseSuperNode { if device.DRoute.SuperNode.UseSuperNode {
if bytes.Equal(device.graph.NhTableHash[:], content.State_hash[:]) { if bytes.Equal(device.graph.NhTableHash[:], content.State_hash[:]) {
if device.LogLevel.LogControl {
fmt.Println("Same State_hash, skip download nhTable")
}
device.graph.NhTableExpire = time.Now().Add(device.graph.SuperNodeInfoTimeout)
return nil return nil
} }
var NhTable config.NextHopTable var NhTable config.NextHopTable
@ -595,6 +607,9 @@ func (device *Device) process_BoardcastPeerMsg(peer *Peer, content path.Boardcas
copy(sk[:], content.PubKey[:]) copy(sk[:], content.PubKey[:])
thepeer := device.LookupPeer(sk) thepeer := device.LookupPeer(sk)
if thepeer == nil { //not exist in local if thepeer == nil { //not exist in local
if device.LogLevel.LogControl {
fmt.Println("Add new peer to local ID:" + content.NodeID.ToString() + " PubKey:" + base64.StdEncoding.EncodeToString(content.PubKey[:]))
}
if device.graph.Weight(device.ID, content.NodeID) == path.Infinity { // add node to graph if device.graph.Weight(device.ID, content.NodeID) == path.Infinity { // add node to graph
device.graph.UpdateLentancy(device.ID, content.NodeID, path.S2TD(path.Infinity), false) device.graph.UpdateLentancy(device.ID, content.NodeID, path.S2TD(path.Infinity), false)
} }

View File

@ -267,20 +267,22 @@ func (device *Device) RoutineReadFromTUN() {
if dst_nodeID != config.Boardcast { if dst_nodeID != config.Boardcast {
var peer *Peer var peer *Peer
next_id := *device.graph.NhTable[device.ID][dst_nodeID] next_id := device.graph.Next(device.ID, dst_nodeID)
peer = device.peers.IDMap[next_id] if next_id != nil {
if peer == nil { peer = device.peers.IDMap[*next_id]
continue if peer == nil {
} continue
if device.LogLevel.LogNormal { }
if device.LogLevel.LogNormal { if device.LogLevel.LogNormal {
fmt.Println("Send Normal packet To:" + peer.GetEndpointDstStr() + " SrcID:" + device.ID.ToString() + " DstID:" + dst_nodeID.ToString() + " Len:" + strconv.Itoa(len(elem.packet))) if device.LogLevel.LogNormal {
fmt.Println("Send Normal packet To:" + peer.GetEndpointDstStr() + " SrcID:" + device.ID.ToString() + " DstID:" + dst_nodeID.ToString() + " Len:" + strconv.Itoa(len(elem.packet)))
}
}
if peer.isRunning.Get() {
peer.StagePacket(elem)
elem = nil
peer.SendStagedPackets()
} }
}
if peer.isRunning.Get() {
peer.StagePacket(elem)
elem = nil
peer.SendStagedPackets()
} }
} else { } else {
device.BoardcastPacket(make(map[config.Vertex]bool, 0), elem.packet, offset) device.BoardcastPacket(make(map[config.Vertex]bool, 0), elem.packet, offset)

View File

@ -1,5 +1,5 @@
interface: interface:
itype: udpsock itype: stdio
name: tap1 name: tap1
vppifaceid: 1 vppifaceid: 1
vppbridgeid: 0 vppbridgeid: 0
@ -15,7 +15,7 @@ listenport: 3001
loglevel: loglevel:
loglevel: normal loglevel: normal
logtransit: true logtransit: true
logcontrol: false logcontrol: true
lognormal: true lognormal: true
dynamicroute: dynamicroute:
sendpinginterval: 20 sendpinginterval: 20
@ -26,7 +26,7 @@ dynamicroute:
usesupernode: true usesupernode: true
connurlv4: 127.0.0.1:3000 connurlv4: 127.0.0.1:3000
pubkeyv4: LJ8KKacUcIoACTGB/9Ed9w0osrJ3WWeelzpL2u4oUic= pubkeyv4: LJ8KKacUcIoACTGB/9Ed9w0osrJ3WWeelzpL2u4oUic=
connurlv6: '[::1]:3000' connurlv6: ''
pubkeyv6: HCfL6YJtpJEGHTlJ2LgVXIWKB/K95P57LHTJ42ZG8VI= pubkeyv6: HCfL6YJtpJEGHTlJ2LgVXIWKB/K95P57LHTJ42ZG8VI=
apiurl: http://127.0.0.1:3000/api apiurl: http://127.0.0.1:3000/api
supernodeinfotimeout: 40 supernodeinfotimeout: 40

View File

@ -15,7 +15,7 @@ listenport: 3002
loglevel: loglevel:
loglevel: normal loglevel: normal
logtransit: true logtransit: true
logcontrol: false logcontrol: true
lognormal: true lognormal: true
dynamicroute: dynamicroute:
sendpinginterval: 20 sendpinginterval: 20
@ -26,7 +26,7 @@ dynamicroute:
usesupernode: true usesupernode: true
connurlv4: 127.0.0.1:3000 connurlv4: 127.0.0.1:3000
pubkeyv4: LJ8KKacUcIoACTGB/9Ed9w0osrJ3WWeelzpL2u4oUic= pubkeyv4: LJ8KKacUcIoACTGB/9Ed9w0osrJ3WWeelzpL2u4oUic=
connurlv6: '[::1]:3000' connurlv6: ''
pubkeyv6: HCfL6YJtpJEGHTlJ2LgVXIWKB/K95P57LHTJ42ZG8VI= pubkeyv6: HCfL6YJtpJEGHTlJ2LgVXIWKB/K95P57LHTJ42ZG8VI=
apiurl: http://127.0.0.1:3000/api apiurl: http://127.0.0.1:3000/api
supernodeinfotimeout: 40 supernodeinfotimeout: 40

View File

@ -287,15 +287,14 @@ func PushNhTable() {
header.SetTTL(0) header.SetTTL(0)
header.SetUsage(path.UpdateNhTable) header.SetUsage(path.UpdateNhTable)
copy(buf[path.EgHeaderLen:], body) copy(buf[path.EgHeaderLen:], body)
for pkstr, peerstate := range http_PeerState { for pkstr, _ := range http_PeerState {
if peerstate.NhTableState != http_NhTable_Hash { if peer := http_device4.LookupPeerByStr(pkstr); peer != nil && peer.GetEndpointDstStr() != "" {
if peer := http_device4.LookupPeerByStr(pkstr); peer != nil { http_device4.SendPacket(peer, buf, device.MessageTransportOffsetContent)
http_device4.SendPacket(peer, buf, device.MessageTransportOffsetContent)
}
if peer := http_device6.LookupPeerByStr(pkstr); peer != nil {
http_device6.SendPacket(peer, buf, device.MessageTransportOffsetContent)
}
} }
if peer := http_device6.LookupPeerByStr(pkstr); peer != nil && peer.GetEndpointDstStr() != "" {
http_device6.SendPacket(peer, buf, device.MessageTransportOffsetContent)
}
} }
} }

View File

@ -2,6 +2,7 @@ package path
import ( import (
"bytes" "bytes"
"encoding/base64"
"encoding/gob" "encoding/gob"
"strconv" "strconv"
"time" "time"
@ -27,7 +28,7 @@ type RegisterMsg struct {
} }
func (c *RegisterMsg) ToString() string { func (c *RegisterMsg) ToString() string {
return "RegisterMsg Node_id:" + c.Node_id.ToString() return "RegisterMsg Node_id:" + c.Node_id.ToString() + " Name:" + c.Name + " PeerHash" + base64.StdEncoding.EncodeToString(c.PeerStateHash[:]) + " NhHash:" + base64.StdEncoding.EncodeToString(c.NhStateHash[:])
} }
func ParseRegisterMsg(bin []byte) (StructPlace RegisterMsg, err error) { func ParseRegisterMsg(bin []byte) (StructPlace RegisterMsg, err error) {

View File

@ -46,7 +46,7 @@ type IG struct {
RecalculateCoolDown time.Duration RecalculateCoolDown time.Duration
RecalculateTime time.Time RecalculateTime time.Time
dlTable config.DistTable dlTable config.DistTable
NhTable config.NextHopTable nhTable config.NextHopTable
NhTableHash [32]byte NhTableHash [32]byte
NhTableExpire time.Time NhTableExpire time.Time
IsSuperMode bool IsSuperMode bool
@ -115,7 +115,7 @@ func (g *IG) RecalculateNhTable(checkchange bool) (changed bool) {
} }
} }
} }
g.dlTable, g.NhTable = dist, next g.dlTable, g.nhTable = dist, next
g.NhTableExpire = time.Now().Add(g.NodeReportTimeout) g.NhTableExpire = time.Now().Add(g.NodeReportTimeout)
g.RecalculateTime = time.Now() g.RecalculateTime = time.Now()
} }
@ -130,16 +130,14 @@ func (g *IG) UpdateLentancy(u, v config.Vertex, dt time.Duration, checkchange bo
if _, ok := g.edges[u]; !ok { if _, ok := g.edges[u]; !ok {
g.edges[u] = make(map[config.Vertex]Latency) g.edges[u] = make(map[config.Vertex]Latency)
} }
g.edgelock.Unlock()
if g.ShouldUpdate(u, v, w) {
changed = g.RecalculateNhTable(checkchange)
}
g.edgelock.Lock()
g.edges[u][v] = Latency{ g.edges[u][v] = Latency{
ping: w, ping: w,
time: time.Now(), time: time.Now(),
} }
g.edgelock.Unlock() g.edgelock.Unlock()
if g.ShouldUpdate(u, v, w) {
changed = g.RecalculateNhTable(checkchange)
}
return return
} }
func (g IG) Vertices() map[config.Vertex]bool { func (g IG) Vertices() map[config.Vertex]bool {
@ -161,13 +159,13 @@ func (g IG) Neighbors(v config.Vertex) (vs []config.Vertex) {
} }
func (g IG) Next(u, v config.Vertex) *config.Vertex { func (g IG) Next(u, v config.Vertex) *config.Vertex {
if _, ok := g.NhTable[u]; !ok { if _, ok := g.nhTable[u]; !ok {
return nil return nil
} }
if _, ok := g.NhTable[u][v]; !ok { if _, ok := g.nhTable[u][v]; !ok {
return nil return nil
} }
return g.NhTable[u][v] return g.nhTable[u][v]
} }
func (g IG) Weight(u, v config.Vertex) float64 { func (g IG) Weight(u, v config.Vertex) float64 {
@ -238,7 +236,7 @@ func Path(u, v config.Vertex, next config.NextHopTable) (path []config.Vertex) {
} }
func (g *IG) SetNHTable(nh config.NextHopTable, table_hash [32]byte) { // set nhTable from supernode func (g *IG) SetNHTable(nh config.NextHopTable, table_hash [32]byte) { // set nhTable from supernode
g.NhTable = nh g.nhTable = nh
g.NhTableHash = table_hash g.NhTableHash = table_hash
g.NhTableExpire = time.Now().Add(g.SuperNodeInfoTimeout) g.NhTableExpire = time.Now().Add(g.SuperNodeInfoTimeout)
} }
@ -247,7 +245,7 @@ func (g *IG) GetNHTable(checkChange bool) config.NextHopTable {
if time.Now().After(g.NhTableExpire) { if time.Now().After(g.NhTableExpire) {
g.RecalculateNhTable(checkChange) g.RecalculateNhTable(checkChange)
} }
return g.NhTable return g.nhTable
} }
func (g *IG) GetDtst() config.DistTable { func (g *IG) GetDtst() config.DistTable {
@ -270,7 +268,7 @@ func (g *IG) GetEdges() (edges map[config.Vertex]map[config.Vertex]float64) {
func (g *IG) GetBoardcastList(id config.Vertex) (tosend map[config.Vertex]bool) { func (g *IG) GetBoardcastList(id config.Vertex) (tosend map[config.Vertex]bool) {
tosend = make(map[config.Vertex]bool) tosend = make(map[config.Vertex]bool)
for _, element := range g.NhTable[id] { for _, element := range g.nhTable[id] {
tosend[*element] = true tosend[*element] = true
} }
return return
@ -279,7 +277,7 @@ func (g *IG) GetBoardcastList(id config.Vertex) (tosend map[config.Vertex]bool)
func (g *IG) GetBoardcastThroughList(self_id config.Vertex, in_id config.Vertex, src_id config.Vertex) (tosend map[config.Vertex]bool) { func (g *IG) GetBoardcastThroughList(self_id config.Vertex, in_id config.Vertex, src_id config.Vertex) (tosend map[config.Vertex]bool) {
tosend = make(map[config.Vertex]bool) tosend = make(map[config.Vertex]bool)
for check_id, _ := range g.GetBoardcastList(self_id) { for check_id, _ := range g.GetBoardcastList(self_id) {
for _, path_node := range Path(src_id, check_id, g.NhTable) { for _, path_node := range Path(src_id, check_id, g.nhTable) {
if path_node == self_id && check_id != in_id { if path_node == self_id && check_id != in_id {
tosend[check_id] = true tosend[check_id] = true
continue continue

View File

@ -134,5 +134,5 @@ func (tap *StdIOTap) Close() error {
os.Stdin.Close() os.Stdin.Close()
os.Stdin.WriteString("end\n") os.Stdin.WriteString("end\n")
close(tap.events) close(tap.events)
return nil panic("No solution for this issue: https://stackoverflow.com/questions/44270803/is-there-a-good-way-to-cancel-a-blocking-read , I'm panic!")
} // stops the device and closes the event channel } // stops the device and closes the event channel