diff --git a/device/receive.go b/device/receive.go index 912d1dd..c968396 100644 --- a/device/receive.go +++ b/device/receive.go @@ -495,7 +495,7 @@ func (peer *Peer) RoutineSequentialReceiver() { should_process = true } default: - if _, ok := device.graph.NhTable[device.ID][dst_nodeID]; ok { + if device.graph.Next(device.ID, dst_nodeID) != nil { should_transfer = true } else { device.log.Verbosef("No route to peer ID %v", dst_nodeID) @@ -517,12 +517,14 @@ func (peer *Peer) RoutineSequentialReceiver() { device.SpreadPacket(skip_list, elem.packet, MessageTransportOffsetContent) } else { - next_id := *device.graph.NhTable[device.ID][dst_nodeID] - peer_out = device.peers.IDMap[next_id] - if device.LogLevel.LogTransit { - fmt.Printf("Transfer packet from %d through %d to %d\n", peer.ID, device.ID, peer_out.ID) + next_id := device.graph.Next(device.ID, dst_nodeID) + if next_id != nil { + peer_out = device.peers.IDMap[*next_id] + if device.LogLevel.LogTransit { + fmt.Printf("Transfer packet from %d through %d to %d\n", peer.ID, device.ID, peer_out.ID) + } + device.SendPacket(peer_out, elem.packet, MessageTransportOffsetContent) } - device.SendPacket(peer_out, elem.packet, MessageTransportOffsetContent) } } } @@ -531,7 +533,7 @@ func (peer *Peer) RoutineSequentialReceiver() { if packet_type != path.NornalPacket { if device.LogLevel.LogControl { if peer.GetEndpointDstStr() != "" { - fmt.Printf("Received MID:" + strconv.Itoa(int(EgHeader.GetMessageID())) + " From:" + peer.GetEndpointDstStr() + " " + device.sprint_received(packet_type, elem.packet[path.EgHeaderLen:]) + "\n") + fmt.Println("Received MID:" + strconv.Itoa(int(EgHeader.GetMessageID())) + " From:" + peer.GetEndpointDstStr() + " " + device.sprint_received(packet_type, elem.packet[path.EgHeaderLen:])) } } err = device.process_received(packet_type, peer, elem.packet[path.EgHeaderLen:]) diff --git a/device/receivesendproc.go b/device/receivesendproc.go index 722bc4d..4420499 100644 --- a/device/receivesendproc.go +++ b/device/receivesendproc.go @@ -2,6 +2,7 @@ package device import ( "bytes" + "encoding/base64" "encoding/json" "errors" "fmt" @@ -33,7 +34,7 @@ func (device *Device) SendPacket(peer *Peer, packet []byte, offset int) { device.MsgCount += 1 EgHeader.SetMessageID(device.MsgCount) if peer.GetEndpointDstStr() != "" { - fmt.Printf("Send MID:" + strconv.Itoa(int(device.MsgCount)) + " To:" + peer.GetEndpointDstStr() + " " + device.sprint_received(EgHeader.GetUsage(), packet[path.EgHeaderLen:]) + "\n") + fmt.Println("Send MID:" + strconv.Itoa(int(device.MsgCount)) + " To:" + peer.GetEndpointDstStr() + " " + device.sprint_received(EgHeader.GetUsage(), packet[path.EgHeaderLen:])) } } } @@ -211,7 +212,9 @@ func (device *Device) process_ping(content path.PingMsg) error { Dst_nodeID: device.ID, Timediff: device.graph.GetCurrentTime().Sub(content.Time), } - device.graph.UpdateLentancy(content.Src_nodeID, device.ID, PongMSG.Timediff, false) + if device.DRoute.P2P.UseP2P && time.Now().After(device.graph.NhTableExpire) { + device.graph.UpdateLentancy(content.Src_nodeID, device.ID, PongMSG.Timediff, false) + } body, err := path.GetByte(&PongMSG) if err != nil { return err @@ -236,7 +239,9 @@ func (device *Device) process_ping(content path.PingMsg) error { func (device *Device) process_pong(peer *Peer, content path.PongMsg) error { if device.DRoute.P2P.UseP2P { - device.graph.UpdateLentancy(content.Src_nodeID, content.Dst_nodeID, content.Timediff, false) + if time.Now().After(device.graph.NhTableExpire) { + device.graph.UpdateLentancy(content.Src_nodeID, content.Dst_nodeID, content.Timediff, false) + } if !peer.AskedForNeighbor { QueryPeerMsg := path.QueryPeerMsg{ Request_ID: uint32(device.ID), @@ -302,6 +307,9 @@ func (device *Device) process_UpdatePeerMsg(content path.UpdatePeerMsg) error { } thepeer := device.LookupPeer(sk) if thepeer == nil { //not exist in local + if device.LogLevel.LogControl { + fmt.Println("Add new peer to local ID:" + peerinfo.NodeID.ToString() + " PubKey:" + pubkey) + } if device.graph.Weight(device.ID, peerinfo.NodeID) == path.Infinity { // add node to graph device.graph.UpdateLentancy(device.ID, peerinfo.NodeID, path.S2TD(path.Infinity), false) } @@ -508,6 +516,10 @@ func (device *Device) GeneratePingPacket(src_nodeID config.Vertex) ([]byte, erro func (device *Device) process_UpdateNhTableMsg(content path.UpdateNhTableMsg) error { if device.DRoute.SuperNode.UseSuperNode { if bytes.Equal(device.graph.NhTableHash[:], content.State_hash[:]) { + if device.LogLevel.LogControl { + fmt.Println("Same State_hash, skip download nhTable") + } + device.graph.NhTableExpire = time.Now().Add(device.graph.SuperNodeInfoTimeout) return nil } var NhTable config.NextHopTable @@ -595,6 +607,9 @@ func (device *Device) process_BoardcastPeerMsg(peer *Peer, content path.Boardcas copy(sk[:], content.PubKey[:]) thepeer := device.LookupPeer(sk) if thepeer == nil { //not exist in local + if device.LogLevel.LogControl { + fmt.Println("Add new peer to local ID:" + content.NodeID.ToString() + " PubKey:" + base64.StdEncoding.EncodeToString(content.PubKey[:])) + } if device.graph.Weight(device.ID, content.NodeID) == path.Infinity { // add node to graph device.graph.UpdateLentancy(device.ID, content.NodeID, path.S2TD(path.Infinity), false) } diff --git a/device/send.go b/device/send.go index f78c0a5..c6bac83 100644 --- a/device/send.go +++ b/device/send.go @@ -267,20 +267,22 @@ func (device *Device) RoutineReadFromTUN() { if dst_nodeID != config.Boardcast { var peer *Peer - next_id := *device.graph.NhTable[device.ID][dst_nodeID] - peer = device.peers.IDMap[next_id] - if peer == nil { - continue - } - if device.LogLevel.LogNormal { - if device.LogLevel.LogNormal { - fmt.Println("Send Normal packet To:" + peer.GetEndpointDstStr() + " SrcID:" + device.ID.ToString() + " DstID:" + dst_nodeID.ToString() + " Len:" + strconv.Itoa(len(elem.packet))) + next_id := device.graph.Next(device.ID, dst_nodeID) + if next_id != nil { + peer = device.peers.IDMap[*next_id] + if peer == nil { + continue + } + if device.LogLevel.LogNormal { + if device.LogLevel.LogNormal { + fmt.Println("Send Normal packet To:" + peer.GetEndpointDstStr() + " SrcID:" + device.ID.ToString() + " DstID:" + dst_nodeID.ToString() + " Len:" + strconv.Itoa(len(elem.packet))) + } + } + if peer.isRunning.Get() { + peer.StagePacket(elem) + elem = nil + peer.SendStagedPackets() } - } - if peer.isRunning.Get() { - peer.StagePacket(elem) - elem = nil - peer.SendStagedPackets() } } else { device.BoardcastPacket(make(map[config.Vertex]bool, 0), elem.packet, offset) diff --git a/example_config/super_mode/n1.yaml b/example_config/super_mode/n1.yaml index 7c91246..a7d126a 100644 --- a/example_config/super_mode/n1.yaml +++ b/example_config/super_mode/n1.yaml @@ -1,5 +1,5 @@ interface: - itype: udpsock + itype: stdio name: tap1 vppifaceid: 1 vppbridgeid: 0 @@ -15,7 +15,7 @@ listenport: 3001 loglevel: loglevel: normal logtransit: true - logcontrol: false + logcontrol: true lognormal: true dynamicroute: sendpinginterval: 20 @@ -26,7 +26,7 @@ dynamicroute: usesupernode: true connurlv4: 127.0.0.1:3000 pubkeyv4: LJ8KKacUcIoACTGB/9Ed9w0osrJ3WWeelzpL2u4oUic= - connurlv6: '[::1]:3000' + connurlv6: '' pubkeyv6: HCfL6YJtpJEGHTlJ2LgVXIWKB/K95P57LHTJ42ZG8VI= apiurl: http://127.0.0.1:3000/api supernodeinfotimeout: 40 diff --git a/example_config/super_mode/n2.yaml b/example_config/super_mode/n2.yaml index ebb5b32..5af0b91 100644 --- a/example_config/super_mode/n2.yaml +++ b/example_config/super_mode/n2.yaml @@ -15,7 +15,7 @@ listenport: 3002 loglevel: loglevel: normal logtransit: true - logcontrol: false + logcontrol: true lognormal: true dynamicroute: sendpinginterval: 20 @@ -26,7 +26,7 @@ dynamicroute: usesupernode: true connurlv4: 127.0.0.1:3000 pubkeyv4: LJ8KKacUcIoACTGB/9Ed9w0osrJ3WWeelzpL2u4oUic= - connurlv6: '[::1]:3000' + connurlv6: '' pubkeyv6: HCfL6YJtpJEGHTlJ2LgVXIWKB/K95P57LHTJ42ZG8VI= apiurl: http://127.0.0.1:3000/api supernodeinfotimeout: 40 diff --git a/main_super.go b/main_super.go index cea1db2..f52523b 100644 --- a/main_super.go +++ b/main_super.go @@ -287,15 +287,14 @@ func PushNhTable() { header.SetTTL(0) header.SetUsage(path.UpdateNhTable) copy(buf[path.EgHeaderLen:], body) - for pkstr, peerstate := range http_PeerState { - if peerstate.NhTableState != http_NhTable_Hash { - if peer := http_device4.LookupPeerByStr(pkstr); peer != nil { - http_device4.SendPacket(peer, buf, device.MessageTransportOffsetContent) - } - if peer := http_device6.LookupPeerByStr(pkstr); peer != nil { - http_device6.SendPacket(peer, buf, device.MessageTransportOffsetContent) - } + for pkstr, _ := range http_PeerState { + if peer := http_device4.LookupPeerByStr(pkstr); peer != nil && peer.GetEndpointDstStr() != "" { + http_device4.SendPacket(peer, buf, device.MessageTransportOffsetContent) } + if peer := http_device6.LookupPeerByStr(pkstr); peer != nil && peer.GetEndpointDstStr() != "" { + http_device6.SendPacket(peer, buf, device.MessageTransportOffsetContent) + } + } } diff --git a/path/metamessage.go b/path/metamessage.go index 8c795cb..76ef170 100644 --- a/path/metamessage.go +++ b/path/metamessage.go @@ -2,6 +2,7 @@ package path import ( "bytes" + "encoding/base64" "encoding/gob" "strconv" "time" @@ -27,7 +28,7 @@ type RegisterMsg struct { } func (c *RegisterMsg) ToString() string { - return "RegisterMsg Node_id:" + c.Node_id.ToString() + return "RegisterMsg Node_id:" + c.Node_id.ToString() + " Name:" + c.Name + " PeerHash" + base64.StdEncoding.EncodeToString(c.PeerStateHash[:]) + " NhHash:" + base64.StdEncoding.EncodeToString(c.NhStateHash[:]) } func ParseRegisterMsg(bin []byte) (StructPlace RegisterMsg, err error) { diff --git a/path/path.go b/path/path.go index 152c5d6..88a0f0e 100644 --- a/path/path.go +++ b/path/path.go @@ -46,7 +46,7 @@ type IG struct { RecalculateCoolDown time.Duration RecalculateTime time.Time dlTable config.DistTable - NhTable config.NextHopTable + nhTable config.NextHopTable NhTableHash [32]byte NhTableExpire time.Time IsSuperMode bool @@ -115,7 +115,7 @@ func (g *IG) RecalculateNhTable(checkchange bool) (changed bool) { } } } - g.dlTable, g.NhTable = dist, next + g.dlTable, g.nhTable = dist, next g.NhTableExpire = time.Now().Add(g.NodeReportTimeout) g.RecalculateTime = time.Now() } @@ -130,16 +130,14 @@ func (g *IG) UpdateLentancy(u, v config.Vertex, dt time.Duration, checkchange bo if _, ok := g.edges[u]; !ok { g.edges[u] = make(map[config.Vertex]Latency) } - g.edgelock.Unlock() - if g.ShouldUpdate(u, v, w) { - changed = g.RecalculateNhTable(checkchange) - } - g.edgelock.Lock() g.edges[u][v] = Latency{ ping: w, time: time.Now(), } g.edgelock.Unlock() + if g.ShouldUpdate(u, v, w) { + changed = g.RecalculateNhTable(checkchange) + } return } func (g IG) Vertices() map[config.Vertex]bool { @@ -161,13 +159,13 @@ func (g IG) Neighbors(v config.Vertex) (vs []config.Vertex) { } func (g IG) Next(u, v config.Vertex) *config.Vertex { - if _, ok := g.NhTable[u]; !ok { + if _, ok := g.nhTable[u]; !ok { return nil } - if _, ok := g.NhTable[u][v]; !ok { + if _, ok := g.nhTable[u][v]; !ok { return nil } - return g.NhTable[u][v] + return g.nhTable[u][v] } func (g IG) Weight(u, v config.Vertex) float64 { @@ -238,7 +236,7 @@ func Path(u, v config.Vertex, next config.NextHopTable) (path []config.Vertex) { } func (g *IG) SetNHTable(nh config.NextHopTable, table_hash [32]byte) { // set nhTable from supernode - g.NhTable = nh + g.nhTable = nh g.NhTableHash = table_hash g.NhTableExpire = time.Now().Add(g.SuperNodeInfoTimeout) } @@ -247,7 +245,7 @@ func (g *IG) GetNHTable(checkChange bool) config.NextHopTable { if time.Now().After(g.NhTableExpire) { g.RecalculateNhTable(checkChange) } - return g.NhTable + return g.nhTable } func (g *IG) GetDtst() config.DistTable { @@ -270,7 +268,7 @@ func (g *IG) GetEdges() (edges map[config.Vertex]map[config.Vertex]float64) { func (g *IG) GetBoardcastList(id config.Vertex) (tosend map[config.Vertex]bool) { tosend = make(map[config.Vertex]bool) - for _, element := range g.NhTable[id] { + for _, element := range g.nhTable[id] { tosend[*element] = true } return @@ -279,7 +277,7 @@ func (g *IG) GetBoardcastList(id config.Vertex) (tosend map[config.Vertex]bool) func (g *IG) GetBoardcastThroughList(self_id config.Vertex, in_id config.Vertex, src_id config.Vertex) (tosend map[config.Vertex]bool) { tosend = make(map[config.Vertex]bool) for check_id, _ := range g.GetBoardcastList(self_id) { - for _, path_node := range Path(src_id, check_id, g.NhTable) { + for _, path_node := range Path(src_id, check_id, g.nhTable) { if path_node == self_id && check_id != in_id { tosend[check_id] = true continue diff --git a/tap/tap_stdio.go b/tap/tap_stdio.go index 7920bcc..ef57aa8 100644 --- a/tap/tap_stdio.go +++ b/tap/tap_stdio.go @@ -134,5 +134,5 @@ func (tap *StdIOTap) Close() error { os.Stdin.Close() os.Stdin.WriteString("end\n") close(tap.events) - return nil + panic("No solution for this issue: https://stackoverflow.com/questions/44270803/is-there-a-good-way-to-cancel-a-blocking-read , I'm panic!") } // stops the device and closes the event channel