diff --git a/.vscode/launch.json b/.vscode/launch.json index fdb146b..c3a3980 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -10,7 +10,7 @@ "request": "launch", "mode": "auto", "program": "${workspaceFolder}", - "args":["-config","example_config/static_mode/n1.yaml","-mode","edge"], + "args":["-config","example_config/p2p_mode/n1.yaml","-mode","edge"/*,"-example"*/], } ] } \ No newline at end of file diff --git a/device/device.go b/device/device.go index 39a70ed..015a51a 100644 --- a/device/device.go +++ b/device/device.go @@ -338,7 +338,7 @@ func NewDevice(tapDevice tap.Device, id config.Vertex, bind conn.Bind, logger *L device.EdgeConfigPath = theconfigpath device.EdgeConfig = econfig device.DRoute = econfig.DynamicRoute - device.DupData = *fixed_time_cache.NewCache(path.S2TD(econfig.DynamicRoute.DupCheckTimeout)) + device.DupData = *fixed_time_cache.NewCache(path.S2TD(econfig.DynamicRoute.DupCheckTimeout), false, path.S2TD(60)) device.event_tryendpoint = make(chan struct{}, 1<<6) device.Event_save_config = make(chan struct{}, 1<<5) device.LogTransit = econfig.LogLevel.LogTransit @@ -347,6 +347,7 @@ func NewDevice(tapDevice tap.Device, id config.Vertex, bind conn.Bind, logger *L go device.RoutineRegister() go device.RoutineSendPing() go device.RoutineRecalculateNhTable() + go device.RoutineSpreadAllMyNeighbor() } // create queues diff --git a/device/peer.go b/device/peer.go index dc538ed..dc3b336 100644 --- a/device/peer.go +++ b/device/peer.go @@ -27,11 +27,12 @@ type Peer struct { handshake Handshake device *Device endpoint conn.Endpoint - endpoint_trylist map[string]time.Time + endpoint_trylist sync.Map //map[string]time.Time LastPingReceived time.Time stopping sync.WaitGroup // routines pending stop - ID config.Vertex + ID config.Vertex + AskedForNeighbor bool // These fields are accessed with atomic operations, which must be // 64-bit aligned even on 32-bit platforms. Go guarantees that an @@ -102,8 +103,6 @@ func (device *Device) NewPeer(pk NoisePublicKey, id config.Vertex) (*Peer, error peer.queue.outbound = newAutodrainingOutboundQueue(device) peer.queue.inbound = newAutodrainingInboundQueue(device) peer.queue.staged = make(chan *QueueOutboundElement, QueueStagedSize) - peer.endpoint_trylist = make(map[string]time.Time) - // map public key _, ok := device.peers.keyMap[pk] if ok { diff --git a/device/receive.go b/device/receive.go index 43bb530..ca4eaf0 100644 --- a/device/receive.go +++ b/device/receive.go @@ -446,6 +446,10 @@ func (peer *Peer) RoutineSequentialReceiver() { } peer.timersDataReceived() + if len(elem.packet) <= path.EgHeaderLen { + device.log.Errorf("Invalid EgHeader from peer %v", peer) + goto skip + } EgHeader, err = path.NewEgHeader(elem.packet[0:path.EgHeaderLen]) // EG header src_nodeID = EgHeader.GetSrc() dst_nodeID = EgHeader.GetDst() @@ -530,7 +534,7 @@ func (peer *Peer) RoutineSequentialReceiver() { fmt.Printf("Received MID:" + strconv.Itoa(int(EgHeader.GetMessageID())) + " From:" + peer.GetEndpointDstStr() + " " + device.sprint_received(packet_type, elem.packet[path.EgHeaderLen:]) + "\n") } } - err = device.process_received(packet_type, elem.packet[path.EgHeaderLen:]) + err = device.process_received(packet_type, peer, elem.packet[path.EgHeaderLen:]) if err != nil { device.log.Errorf(err.Error()) } @@ -539,6 +543,10 @@ func (peer *Peer) RoutineSequentialReceiver() { if should_receive { // Write message to tap device if packet_type == path.NornalPacket { + if len(elem.packet) <= path.EgHeaderLen+12 { + device.log.Errorf("Invalid normal packet from peer %v", peer) + goto skip + } src_macaddr := tap.GetSrcMacAddr(elem.packet[path.EgHeaderLen:]) if !tap.IsBoardCast(src_macaddr) { device.l2fib.Store(src_macaddr, src_nodeID) // Write to l2fib table diff --git a/device/receivesendproc.go b/device/receivesendproc.go index da01df5..759ffaf 100644 --- a/device/receivesendproc.go +++ b/device/receivesendproc.go @@ -47,14 +47,17 @@ func (device *Device) BoardcastPacket(skip_list map[config.Vertex]bool, packet [ for node_id, _ := range skip_list { send_list[node_id] = false } + device.peers.RLock() for node_id, should_send := range send_list { if should_send { device.SendPacket(device.peers.IDMap[node_id], packet, offset) } } + device.peers.RUnlock() } func (device *Device) SpreadPacket(skip_list map[config.Vertex]bool, packet []byte, offset int) { // Send packet to all peers no matter it is alive + device.peers.RLock() for peer_id, peer_out := range device.peers.IDMap { if _, ok := skip_list[peer_id]; ok { if device.LogTransit { @@ -64,10 +67,12 @@ func (device *Device) SpreadPacket(skip_list map[config.Vertex]bool, packet []by } device.SendPacket(peer_out, packet, MessageTransportOffsetContent) } + device.peers.RUnlock() } func (device *Device) TransitBoardcastPacket(src_nodeID config.Vertex, in_id config.Vertex, packet []byte, offset int) { node_boardcast_list := device.graph.GetBoardcastThroughList(device.ID, in_id, src_nodeID) + device.peers.RLock() for peer_id := range node_boardcast_list { peer_out := device.peers.IDMap[peer_id] if device.LogTransit { @@ -75,9 +80,11 @@ func (device *Device) TransitBoardcastPacket(src_nodeID config.Vertex, in_id con } device.SendPacket(peer_out, packet, offset) } + device.peers.RUnlock() } func (device *Device) Send2Super(packet []byte, offset int) { + device.peers.RLock() if device.DRoute.SuperNode.UseSuperNode { for _, peer_out := range device.peers.SuperPeer { if device.LogTransit { @@ -86,6 +93,7 @@ func (device *Device) Send2Super(packet []byte, offset int) { device.SendPacket(peer_out, packet, offset) } } + device.peers.RUnlock() } func (device *Device) CheckNoDup(packet []byte) bool { @@ -97,7 +105,7 @@ func (device *Device) CheckNoDup(packet []byte) bool { return !ok } -func (device *Device) process_received(msg_type path.Usage, body []byte) (err error) { +func (device *Device) process_received(msg_type path.Usage, peer *Peer, body []byte) (err error) { if device.IsSuperNode { switch msg_type { case path.Register: @@ -127,10 +135,10 @@ func (device *Device) process_received(msg_type path.Usage, body []byte) (err er } case path.PongPacket: if content, err := path.ParsePongMsg(body); err == nil { - return device.process_pong(content) + return device.process_pong(peer, content) } - case path.RequestPeer: - if content, err := path.ParseRequestPeerMsg(body); err == nil { + case path.QueryPeer: + if content, err := path.ParseQueryPeerMsg(body); err == nil { return device.process_RequestPeerMsg(content) } case path.BoardcastPeer: @@ -166,8 +174,8 @@ func (device *Device) sprint_received(msg_type path.Usage, body []byte) (ret str if content, err := path.ParsePongMsg(body); err == nil { ret = content.ToString() } - case path.RequestPeer: - if content, err := path.ParseRequestPeerMsg(body); err == nil { + case path.QueryPeer: + if content, err := path.ParseQueryPeerMsg(body); err == nil { ret = content.ToString() } case path.BoardcastPeer: @@ -196,6 +204,7 @@ func (device *Device) process_ping(content path.PingMsg) error { Dst_nodeID: device.ID, Timediff: device.graph.GetCurrentTime().Sub(content.Time), } + device.graph.UpdateLentancy(content.Src_nodeID, device.ID, PongMSG.Timediff, false) body, err := path.GetByte(&PongMSG) if err != nil { return err @@ -218,9 +227,27 @@ func (device *Device) process_ping(content path.PingMsg) error { return nil } -func (device *Device) process_pong(content path.PongMsg) error { +func (device *Device) process_pong(peer *Peer, content path.PongMsg) error { if device.DRoute.P2P.UseP2P { device.graph.UpdateLentancy(content.Src_nodeID, content.Dst_nodeID, content.Timediff, false) + if !peer.AskedForNeighbor { + QueryPeerMsg := path.QueryPeerMsg{ + Request_ID: uint32(device.ID), + } + body, err := path.GetByte(&QueryPeerMsg) + if err != nil { + return err + } + buf := make([]byte, path.EgHeaderLen+len(body)) + header, err := path.NewEgHeader(buf[:path.EgHeaderLen]) + header.SetSrc(device.ID) + header.SetTTL(200) + header.SetUsage(path.QueryPeer) + header.SetPacketLength(uint16(len(body))) + copy(buf[path.EgHeaderLen:], body) + device.SendPacket(peer, buf, MessageTransportOffsetContent) + peer.AskedForNeighbor = true + } } return nil } @@ -277,7 +304,7 @@ func (device *Device) process_UpdatePeerMsg(content path.UpdatePeerMsg) error { if thepeer.LastPingReceived.Add(path.S2TD(device.DRoute.P2P.PeerAliveTimeout)).Before(time.Now()) { //Peer died, try to switch to this new endpoint for url, _ := range peerinfo.Connurl { - thepeer.endpoint_trylist[url] = time.Time{} //another gorouting will process it + thepeer.endpoint_trylist.Store(url, time.Time{}) //another gorouting will process it send_signal = true } } @@ -300,32 +327,36 @@ func (device *Device) RoutineSetEndpoint() { for _, thepeer := range device.peers.IDMap { if thepeer.LastPingReceived.Add(path.S2TD(device.DRoute.P2P.PeerAliveTimeout)).After(time.Now()) { //Peer alives - for url := range thepeer.endpoint_trylist { - delete(thepeer.endpoint_trylist, url) - } + thepeer.endpoint_trylist.Range(func(key, value interface{}) bool { + thepeer.endpoint_trylist.Delete(key) + return true + }) } else { - //Peer died, try to switch to this new endpoint - for url, trytime := range thepeer.endpoint_trylist { + thepeer.endpoint_trylist.Range(func(key interface{}, value interface{}) bool { + url := key.(string) + trytime := value.(time.Time) if trytime.Sub(time.Time{}) != time.Duration(0) && time.Now().Sub(trytime) > path.S2TD(device.DRoute.ConnTimeOut) { - delete(thepeer.endpoint_trylist, url) + thepeer.endpoint_trylist.Delete(key) } else { endpoint, err := device.Bind().ParseEndpoint(url) //trying to bind first url in the list and wait device.DRoute.P2P.PeerAliveTimeout seconds if err != nil { device.log.Errorf("Can't bind " + url) - delete(thepeer.endpoint_trylist, url) + thepeer.endpoint_trylist.Delete(url) + return true } if device.LogControl { fmt.Println("Set endpoint to " + endpoint.DstToString() + " for NodeID:" + strconv.Itoa(int(thepeer.ID))) } thepeer.SetEndpointFromPacket(endpoint) NextRun = true - thepeer.endpoint_trylist[url] = time.Now() + thepeer.endpoint_trylist.Store(key, time.Now()) //Send Ping message to it packet, err := device.GeneratePingPacket(device.ID) device.SendPacket(thepeer, packet, MessageTransportOffsetContent) - break + return false } - } + return true + }) } } ClearChanLoop: @@ -404,7 +435,18 @@ func (device *Device) RoutineRecalculateNhTable() { time.Sleep(device.graph.NodeReportTimeout) } } +} +func (device *Device) RoutineSpreadAllMyNeighbor() { + if !device.DRoute.P2P.UseP2P { + return + } + for { + device.process_RequestPeerMsg(path.QueryPeerMsg{ + Request_ID: 0, + }) + time.Sleep(path.S2TD(device.DRoute.P2P.SendPeerInterval)) + } } func (device *Device) GeneratePingPacket(src_nodeID config.Vertex) ([]byte, error) { @@ -459,12 +501,17 @@ func (device *Device) process_UpdateNhTableMsg(content path.UpdateNhTableMsg) er return nil } -func (device *Device) process_RequestPeerMsg(content path.RequestPeerMsg) error { +func (device *Device) process_RequestPeerMsg(content path.QueryPeerMsg) error { //Send all my peers to all my peers if device.DRoute.P2P.UseP2P { + device.peers.RLock() for pubkey, peer := range device.peers.keyMap { if peer.ID >= path.Special_NodeID { continue } + if peer.endpoint == nil { + continue + } + peer.handshake.mutex.RLock() response := path.BoardcastPeerMsg{ Request_ID: content.Request_ID, NodeID: peer.ID, @@ -472,6 +519,7 @@ func (device *Device) process_RequestPeerMsg(content path.RequestPeerMsg) error PSKey: peer.handshake.presharedKey, ConnURL: peer.endpoint.DstToString(), } + peer.handshake.mutex.RUnlock() body, err := path.GetByte(response) if err != nil { device.log.Errorf("Error at receivesendproc.go line221: ", err) @@ -487,6 +535,7 @@ func (device *Device) process_RequestPeerMsg(content path.RequestPeerMsg) error copy(buf[path.EgHeaderLen:], body) device.SpreadPacket(make(map[config.Vertex]bool), buf, MessageTransportOffsetContent) } + device.peers.RUnlock() } return nil } @@ -494,6 +543,9 @@ func (device *Device) process_RequestPeerMsg(content path.RequestPeerMsg) error func (device *Device) process_BoardcastPeerMsg(content path.BoardcastPeerMsg) error { if device.DRoute.P2P.UseP2P { var sk NoisePublicKey + if bytes.Equal(content.PubKey[:], device.staticIdentity.publicKey[:]) { + return nil + } copy(sk[:], content.PubKey[:]) thepeer := device.LookupPeer(sk) if thepeer == nil { //not exist in local @@ -511,7 +563,8 @@ func (device *Device) process_BoardcastPeerMsg(content path.BoardcastPeerMsg) er } if thepeer.LastPingReceived.Add(path.S2TD(device.DRoute.P2P.PeerAliveTimeout)).Before(time.Now()) { //Peer died, try to switch to this new endpoint - thepeer.endpoint_trylist[content.ConnURL] = time.Time{} //another gorouting will process it + thepeer.endpoint_trylist.Store(content.ConnURL, time.Time{}) //another gorouting will process it + device.event_tryendpoint <- struct{}{} } } diff --git a/example_config/p2p_mode/n1.yaml b/example_config/p2p_mode/n1.yaml index c6a1879..2a26d5b 100644 --- a/example_config/p2p_mode/n1.yaml +++ b/example_config/p2p_mode/n1.yaml @@ -14,12 +14,12 @@ listenport: 3001 loglevel: loglevel: normal` logtransit: true - logcontrol: false + logcontrol: true dynamicroute: sendpinginterval: 20 dupchecktimeout: 40 conntimeout: 30 - savenewpeers: true + savenewpeers: false supernode: usesupernode: false connurlv4: 127.0.0.1:3000 @@ -29,7 +29,7 @@ dynamicroute: apiurl: http://127.0.0.1:3000/api supernodeinfotimeout: 40 p2p: - usep2p: false + usep2p: true sendpeerinterval: 20 peeralivetimeout: 30 graphrecalculatesetting: diff --git a/example_config/p2p_mode/n2.yaml b/example_config/p2p_mode/n2.yaml index 4711e53..f08c80e 100644 --- a/example_config/p2p_mode/n2.yaml +++ b/example_config/p2p_mode/n2.yaml @@ -14,12 +14,12 @@ listenport: 3002 loglevel: loglevel: normal logtransit: true - logcontrol: false + logcontrol: true dynamicroute: sendpinginterval: 20 dupchecktimeout: 40 conntimeout: 30 - savenewpeers: true + savenewpeers: false supernode: usesupernode: false connurlv4: 127.0.0.1:3000 @@ -29,7 +29,7 @@ dynamicroute: apiurl: http://127.0.0.1:3000/api supernodeinfotimeout: 40 p2p: - usep2p: false + usep2p: true sendpeerinterval: 20 peeralivetimeout: 30 graphrecalculatesetting: diff --git a/example_config/p2p_mode/n3.yaml b/example_config/p2p_mode/n3.yaml index 94ff448..869ac9b 100644 --- a/example_config/p2p_mode/n3.yaml +++ b/example_config/p2p_mode/n3.yaml @@ -14,12 +14,12 @@ listenport: 3003 loglevel: loglevel: normal logtransit: true - logcontrol: false + logcontrol: true dynamicroute: sendpinginterval: 20 dupchecktimeout: 40 conntimeout: 30 - savenewpeers: true + savenewpeers: false supernode: usesupernode: false connurlv4: 127.0.0.1:3000 @@ -29,7 +29,7 @@ dynamicroute: apiurl: http://127.0.0.1:3000/api supernodeinfotimeout: 40 p2p: - usep2p: false + usep2p: true sendpeerinterval: 20 peeralivetimeout: 30 graphrecalculatesetting: diff --git a/example_config/p2p_mode/n4.yaml b/example_config/p2p_mode/n4.yaml index 4cb926a..41df1ba 100644 --- a/example_config/p2p_mode/n4.yaml +++ b/example_config/p2p_mode/n4.yaml @@ -14,12 +14,12 @@ listenport: 3004 loglevel: loglevel: normal logtransit: true - logcontrol: false + logcontrol: true dynamicroute: sendpinginterval: 20 dupchecktimeout: 40 conntimeout: 30 - savenewpeers: true + savenewpeers: false supernode: usesupernode: false connurlv4: 127.0.0.1:3000 @@ -29,7 +29,7 @@ dynamicroute: apiurl: http://127.0.0.1:3000/api supernodeinfotimeout: 40 p2p: - usep2p: false + usep2p: true sendpeerinterval: 20 peeralivetimeout: 30 graphrecalculatesetting: diff --git a/example_config/p2p_mode/n5.yaml b/example_config/p2p_mode/n5.yaml index 0b54f22..26aacf6 100644 --- a/example_config/p2p_mode/n5.yaml +++ b/example_config/p2p_mode/n5.yaml @@ -14,12 +14,12 @@ listenport: 3005 loglevel: loglevel: normal logtransit: true - logcontrol: false + logcontrol: true dynamicroute: sendpinginterval: 20 dupchecktimeout: 40 conntimeout: 30 - savenewpeers: true + savenewpeers: false supernode: usesupernode: false connurlv4: 127.0.0.1:3000 @@ -29,7 +29,7 @@ dynamicroute: apiurl: http://127.0.0.1:3000/api supernodeinfotimeout: 40 p2p: - usep2p: false + usep2p: true sendpeerinterval: 20 peeralivetimeout: 30 graphrecalculatesetting: diff --git a/example_config/p2p_mode/n6.yaml b/example_config/p2p_mode/n6.yaml index 51b9c87..daf6f9a 100644 --- a/example_config/p2p_mode/n6.yaml +++ b/example_config/p2p_mode/n6.yaml @@ -14,12 +14,12 @@ listenport: 3006 loglevel: loglevel: normal logtransit: true - logcontrol: false + logcontrol: true dynamicroute: sendpinginterval: 20 dupchecktimeout: 40 conntimeout: 30 - savenewpeers: true + savenewpeers: false supernode: usesupernode: false connurlv4: 127.0.0.1:3000 @@ -29,7 +29,7 @@ dynamicroute: apiurl: http://127.0.0.1:3000/api supernodeinfotimeout: 40 p2p: - usep2p: false + usep2p: true sendpeerinterval: 20 peeralivetimeout: 30 graphrecalculatesetting: diff --git a/example_config/static_mode/n1.yaml b/example_config/static_mode/n1.yaml index c6a1879..fd86ecc 100644 --- a/example_config/static_mode/n1.yaml +++ b/example_config/static_mode/n1.yaml @@ -14,7 +14,7 @@ listenport: 3001 loglevel: loglevel: normal` logtransit: true - logcontrol: false + logcontrol: true dynamicroute: sendpinginterval: 20 dupchecktimeout: 40 diff --git a/example_config/static_mode/n2.yaml b/example_config/static_mode/n2.yaml index 4711e53..e5dbcc0 100644 --- a/example_config/static_mode/n2.yaml +++ b/example_config/static_mode/n2.yaml @@ -14,7 +14,7 @@ listenport: 3002 loglevel: loglevel: normal logtransit: true - logcontrol: false + logcontrol: true dynamicroute: sendpinginterval: 20 dupchecktimeout: 40 diff --git a/example_config/static_mode/n3.yaml b/example_config/static_mode/n3.yaml index 94ff448..19d8274 100644 --- a/example_config/static_mode/n3.yaml +++ b/example_config/static_mode/n3.yaml @@ -14,7 +14,7 @@ listenport: 3003 loglevel: loglevel: normal logtransit: true - logcontrol: false + logcontrol: true dynamicroute: sendpinginterval: 20 dupchecktimeout: 40 diff --git a/example_config/static_mode/n4.yaml b/example_config/static_mode/n4.yaml index 4cb926a..6710b57 100644 --- a/example_config/static_mode/n4.yaml +++ b/example_config/static_mode/n4.yaml @@ -14,7 +14,7 @@ listenport: 3004 loglevel: loglevel: normal logtransit: true - logcontrol: false + logcontrol: true dynamicroute: sendpinginterval: 20 dupchecktimeout: 40 diff --git a/example_config/static_mode/n5.yaml b/example_config/static_mode/n5.yaml index 0b54f22..7154ef9 100644 --- a/example_config/static_mode/n5.yaml +++ b/example_config/static_mode/n5.yaml @@ -14,7 +14,7 @@ listenport: 3005 loglevel: loglevel: normal logtransit: true - logcontrol: false + logcontrol: true dynamicroute: sendpinginterval: 20 dupchecktimeout: 40 diff --git a/example_config/static_mode/n6.yaml b/example_config/static_mode/n6.yaml index 51b9c87..6b3fcdb 100644 --- a/example_config/static_mode/n6.yaml +++ b/example_config/static_mode/n6.yaml @@ -14,7 +14,7 @@ listenport: 3006 loglevel: loglevel: normal logtransit: true - logcontrol: false + logcontrol: true dynamicroute: sendpinginterval: 20 dupchecktimeout: 40 diff --git a/go.mod b/go.mod index ddf3574..9af1242 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/KusakabeSi/EtherGuardVPN go 1.16 require ( - github.com/KusakabeSi/go-cache v0.0.0-20210817164551-57817be43e28 + github.com/KusakabeSi/go-cache v0.0.0-20210823132304-22b5b1d22b41 golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2 golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57 gopkg.in/yaml.v2 v2.4.0 diff --git a/path/header.go b/path/header.go index 89eece2..1732f60 100644 --- a/path/header.go +++ b/path/header.go @@ -24,7 +24,7 @@ const ( PingPacket //Comes from other peer PongPacket //Send to everyone, include server - RequestPeer + QueryPeer BoardcastPeer ) diff --git a/path/metamessage.go b/path/metamessage.go index 03bfaf4..35dd1d4 100644 --- a/path/metamessage.go +++ b/path/metamessage.go @@ -105,15 +105,15 @@ func ParsePongMsg(bin []byte) (StructPlace PongMsg, err error) { return } -type RequestPeerMsg struct { +type QueryPeerMsg struct { Request_ID uint32 `struc:"uint32"` } -func (c *RequestPeerMsg) ToString() string { - return "RequestPeerMsg Request_ID:" + strconv.Itoa(int(c.Request_ID)) +func (c *QueryPeerMsg) ToString() string { + return "QueryPeerMsg Request_ID:" + strconv.Itoa(int(c.Request_ID)) } -func ParseRequestPeerMsg(bin []byte) (StructPlace RequestPeerMsg, err error) { +func ParseQueryPeerMsg(bin []byte) (StructPlace QueryPeerMsg, err error) { var b bytes.Buffer b.Write(bin) d := gob.NewDecoder(&b) diff --git a/path/path.go b/path/path.go index a6d2788..18992f5 100644 --- a/path/path.go +++ b/path/path.go @@ -46,7 +46,7 @@ type Fullroute struct { type IG struct { Vert map[config.Vertex]bool edges map[config.Vertex]map[config.Vertex]Latency - edgelock sync.RWMutex + edgelock *sync.RWMutex JitterTolerance float64 JitterToleranceMultiplier float64 NodeReportTimeout time.Duration @@ -66,6 +66,7 @@ func S2TD(secs float64) time.Duration { func NewGraph(num_node int, IsSuperMode bool, theconfig config.GraphRecalculateSetting) *IG { g := IG{ + edgelock: &sync.RWMutex{}, JitterTolerance: theconfig.JitterTolerance, JitterToleranceMultiplier: theconfig.JitterToleranceMultiplier, NodeReportTimeout: S2TD(theconfig.NodeReportTimeout), @@ -126,13 +127,11 @@ func (g *IG) UpdateLentancy(u, v config.Vertex, dt time.Duration, checkchange bo g.edgelock.Lock() g.Vert[u] = true g.Vert[v] = true - g.edgelock.Unlock() w := float64(dt) / float64(time.Second) if _, ok := g.edges[u]; !ok { - g.edgelock.Lock() g.edges[u] = make(map[config.Vertex]Latency) - g.edgelock.Unlock() } + g.edgelock.Unlock() if g.ShouldUpdate(u, v, w) { changed = g.RecalculateNhTable(checkchange) } @@ -145,10 +144,16 @@ func (g *IG) UpdateLentancy(u, v config.Vertex, dt time.Duration, checkchange bo return } func (g IG) Vertices() map[config.Vertex]bool { - return g.Vert + vr := make(map[config.Vertex]bool) + for k, v := range g.Vert { //copy a new list + vr[k] = v + } + return vr } func (g IG) Neighbors(v config.Vertex) (vs []config.Vertex) { - for k := range g.edges[v] { + g.edgelock.RLock() + defer g.edgelock.RUnlock() + for k := range g.edges[v] { //copy a new list vs = append(vs, k) } return vs @@ -165,10 +170,14 @@ func (g IG) Next(u, v config.Vertex) *config.Vertex { } func (g IG) Weight(u, v config.Vertex) float64 { + g.edgelock.RLock() + defer g.edgelock.RUnlock() if _, ok := g.edges[u]; !ok { + g.edgelock.RUnlock() g.edgelock.Lock() g.edges[u] = make(map[config.Vertex]Latency) g.edgelock.Unlock() + g.edgelock.RLock() return Infinity } if _, ok := g.edges[u][v]; !ok { diff --git a/tap/tap_stdio.go b/tap/tap_stdio.go index 38d301d..7f71590 100644 --- a/tap/tap_stdio.go +++ b/tap/tap_stdio.go @@ -92,6 +92,7 @@ func (tap *StdIOTap) Events() chan Event { func (tap *StdIOTap) Close() error { tap.events <- EventDown os.Stdin.Close() + os.Stdin.WriteString("end\n") close(tap.events) return nil } // stops the device and closes the event channel