From 7951ba2f6af7770e2abc6a6a1a88a44d1cd0507f Mon Sep 17 00:00:00 2001 From: KusakabeSi Date: Mon, 20 Sep 2021 16:27:53 +0000 Subject: [PATCH] fix supernode ipv6 problem, add error message --- .vscode/launch.json | 2 +- conn/bind_linux.go | 6 ++ device/receive.go | 9 +- device/receivesendproc.go | 156 +++++++++++++++++++++--------- example_config/super_mode/n1.yaml | 2 +- main_super.go | 7 +- path/header.go | 2 + path/metamessage.go | 65 ++++++++++--- 8 files changed, 179 insertions(+), 70 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index c18040f..5a6bdde 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -12,7 +12,7 @@ "program": "${workspaceFolder}", "buildFlags": "-tags=vpp", "env": {"CGO_CFLAGS":"-I/usr/include/memif"}, - "args":["-config","example_config/super_mode/n1.yaml","-mode","edge"/*,"-example"*/], + "args":["-config","example_config/super_mode/s1.yaml","-mode","super"/*,"-example"*/], } ] } \ No newline at end of file diff --git a/conn/bind_linux.go b/conn/bind_linux.go index b78c272..0880a7a 100644 --- a/conn/bind_linux.go +++ b/conn/bind_linux.go @@ -129,7 +129,13 @@ again: // Attempt ipv6 bind, update port if successful. sock6, newPort, err = create6(port) if err != nil { + if originalPort == 0 && errors.Is(err, syscall.EADDRINUSE) && tries < 100 { + unix.Close(sock4) + tries++ + goto again + } if !errors.Is(err, syscall.EAFNOSUPPORT) { + unix.Close(sock4) return nil, 0, err } } else { diff --git a/device/receive.go b/device/receive.go index 36139a2..3a83084 100644 --- a/device/receive.go +++ b/device/receive.go @@ -549,12 +549,17 @@ func (peer *Peer) RoutineSequentialReceiver() { fmt.Println("Normal: Reveived Normal packet From:" + peer.GetEndpointDstStr() + " SrcID:" + src_nodeID.ToString() + " DstID:" + dst_nodeID.ToString() + " Len:" + strconv.Itoa(len(elem.packet))) } if len(elem.packet) <= path.EgHeaderLen+12 { - device.log.Errorf("Invalid normal packet from peer %v", peer) + device.log.Errorf("Invalid normal packet from peer %v", peer.ID.ToString()) goto skip } src_macaddr := tap.GetSrcMacAddr(elem.packet[path.EgHeaderLen:]) if !tap.IsNotUnicast(src_macaddr) { - device.l2fib.Store(src_macaddr, src_nodeID) // Write to l2fib table + actual, loaded := device.l2fib.LoadOrStore(src_macaddr, src_nodeID) + if loaded { + if actual.(config.Vertex) != src_nodeID { + device.l2fib.Store(src_macaddr, src_nodeID) // Write to l2fib table + } + } } _, err = device.tap.device.Write(elem.buffer[:MessageTransportOffsetContent+len(elem.packet)], MessageTransportOffsetContent+path.EgHeaderLen) if err != nil && !device.isClosed() { diff --git a/device/receivesendproc.go b/device/receivesendproc.go index f708bbd..ce3ab3e 100644 --- a/device/receivesendproc.go +++ b/device/receivesendproc.go @@ -20,6 +20,8 @@ import ( func (device *Device) SendPacket(peer *Peer, packet []byte, offset int) { if peer == nil { return + } else if peer.endpoint == nil { + return } if device.LogLevel.LogNormal { EgHeader, _ := path.NewEgHeader(packet[:path.EgHeaderLen]) @@ -118,7 +120,7 @@ func (device *Device) process_received(msg_type path.Usage, peer *Peer, body []b switch msg_type { case path.Register: if content, err := path.ParseRegisterMsg(body); err == nil { - return device.server_process_RegisterMsg(content) + return device.server_process_RegisterMsg(peer, content) } case path.PongPacket: if content, err := path.ParsePongMsg(body); err == nil { @@ -131,11 +133,15 @@ func (device *Device) process_received(msg_type path.Usage, peer *Peer, body []b switch msg_type { case path.UpdatePeer: if content, err := path.ParseUpdatePeerMsg(body); err == nil { - go device.process_UpdatePeerMsg(content) + go device.process_UpdatePeerMsg(peer, content) } case path.UpdateNhTable: if content, err := path.ParseUpdateNhTableMsg(body); err == nil { - go device.process_UpdateNhTableMsg(content) + go device.process_UpdateNhTableMsg(peer, content) + } + case path.UpdateError: + if content, err := path.ParseUpdateErrorMsg(body); err == nil { + device.process_UpdateErrorMsg(peer, content) } case path.PingPacket: if content, err := path.ParsePingMsg(body); err == nil { @@ -174,6 +180,10 @@ func (device *Device) sprint_received(msg_type path.Usage, body []byte) (ret str if content, err := path.ParseUpdateNhTableMsg(body); err == nil { ret = content.ToString() } + case path.UpdateError: + if content, err := path.ParseUpdateErrorMsg(body); err == nil { + ret = content.ToString() + } case path.PingPacket: if content, err := path.ParsePingMsg(body); err == nil { ret = content.ToString() @@ -196,7 +206,29 @@ func (device *Device) sprint_received(msg_type path.Usage, body []byte) (ret str return } -func (device *Device) server_process_RegisterMsg(content path.RegisterMsg) error { +func (device *Device) server_process_RegisterMsg(peer *Peer, content path.RegisterMsg) error { + if peer.ID != content.Node_id { + UpdateErrorMsg := path.UpdateErrorMsg{ + Node_id: peer.ID, + Action: path.Shutdown, + ErrorCode: 401, + ErrorMsg: "Your node ID is not match with our nodeID", + } + body, err := path.GetByte(&UpdateErrorMsg) + if err != nil { + return err + } + buf := make([]byte, path.EgHeaderLen+len(body)) + header, err := path.NewEgHeader(buf[:path.EgHeaderLen]) + header.SetSrc(device.ID) + header.SetTTL(200) + header.SetUsage(path.UpdateError) + header.SetPacketLength(uint16(len(body))) + copy(buf[path.EgHeaderLen:], body) + header.SetDst(config.ControlMessage) + device.SendPacket(peer, buf, MessageTransportOffsetContent) + return nil + } device.Event_server_register <- content return nil } @@ -263,9 +295,15 @@ func (device *Device) process_pong(peer *Peer, content path.PongMsg) error { return nil } -func (device *Device) process_UpdatePeerMsg(content path.UpdatePeerMsg) error { +func (device *Device) process_UpdatePeerMsg(peer *Peer, content path.UpdatePeerMsg) error { var send_signal bool if device.DRoute.SuperNode.UseSuperNode { + if peer.ID != config.SuperNodeMessage { + if device.LogLevel.LogControl { + fmt.Println("Control: Ignored UpdateErrorMsg. Not from supernode.") + } + return nil + } var peer_infos config.HTTP_Peers if bytes.Equal(device.peers.Peer_state[:], content.State_hash[:]) { if device.LogLevel.LogControl { @@ -343,6 +381,71 @@ func (device *Device) process_UpdatePeerMsg(content path.UpdatePeerMsg) error { return nil } +func (device *Device) process_UpdateNhTableMsg(peer *Peer, content path.UpdateNhTableMsg) error { + if device.DRoute.SuperNode.UseSuperNode { + if peer.ID != config.SuperNodeMessage { + if device.LogLevel.LogControl { + fmt.Println("Control: Ignored UpdateErrorMsg. Not from supernode.") + } + return nil + } + if bytes.Equal(device.graph.NhTableHash[:], content.State_hash[:]) { + if device.LogLevel.LogControl { + fmt.Println("Control: Same nhTable Hash, skip download nhTable") + } + device.graph.NhTableExpire = time.Now().Add(device.graph.SuperNodeInfoTimeout) + return nil + } + var NhTable config.NextHopTable + if bytes.Equal(device.graph.NhTableHash[:], content.State_hash[:]) { + return nil + } + downloadurl := device.DRoute.SuperNode.APIUrl + "/nhtable?PubKey=" + url.QueryEscape(PubKey2Str(device.staticIdentity.publicKey)) + "&State=" + url.QueryEscape(string(content.State_hash[:])) + if device.LogLevel.LogControl { + fmt.Println("Control: Download NhTable from :" + downloadurl) + } + client := http.Client{ + Timeout: 30 * time.Second, + } + resp, err := client.Get(downloadurl) + if err != nil { + device.log.Errorf(err.Error()) + return err + } + defer resp.Body.Close() + allbytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + device.log.Errorf(err.Error()) + return err + } + if device.LogLevel.LogControl { + fmt.Println("Control: Download NhTable result :" + string(allbytes)) + } + if err := json.Unmarshal(allbytes, &NhTable); err != nil { + device.log.Errorf(err.Error()) + return err + } + device.graph.SetNHTable(NhTable, content.State_hash) + } + return nil +} + +func (device *Device) process_UpdateErrorMsg(peer *Peer, content path.UpdateErrorMsg) error { + if peer.ID != config.SuperNodeMessage { + if device.LogLevel.LogControl { + fmt.Println("Control: Ignored UpdateErrorMsg. Not from supernode.") + } + return nil + } + device.log.Errorf(content.ToString()) + if content.Action == path.Shutdown { + device.closed <- struct{}{} + } else if content.Action == path.Panic { + panic(content.ToString()) + } + return nil +} + func (device *Device) RoutineSetEndpoint() { if !(device.DRoute.P2P.UseP2P || device.DRoute.SuperNode.UseSuperNode) { return @@ -516,49 +619,6 @@ func (device *Device) GeneratePingPacket(src_nodeID config.Vertex) ([]byte, erro return buf, nil } -func (device *Device) process_UpdateNhTableMsg(content path.UpdateNhTableMsg) error { - if device.DRoute.SuperNode.UseSuperNode { - if bytes.Equal(device.graph.NhTableHash[:], content.State_hash[:]) { - if device.LogLevel.LogControl { - fmt.Println("Control: Same nhTable Hash, skip download nhTable") - } - device.graph.NhTableExpire = time.Now().Add(device.graph.SuperNodeInfoTimeout) - return nil - } - var NhTable config.NextHopTable - if bytes.Equal(device.graph.NhTableHash[:], content.State_hash[:]) { - return nil - } - downloadurl := device.DRoute.SuperNode.APIUrl + "/nhtable?PubKey=" + url.QueryEscape(PubKey2Str(device.staticIdentity.publicKey)) + "&State=" + url.QueryEscape(string(content.State_hash[:])) - if device.LogLevel.LogControl { - fmt.Println("Control: Download NhTable from :" + downloadurl) - } - client := http.Client{ - Timeout: 30 * time.Second, - } - resp, err := client.Get(downloadurl) - if err != nil { - device.log.Errorf(err.Error()) - return err - } - defer resp.Body.Close() - allbytes, err := ioutil.ReadAll(resp.Body) - if err != nil { - device.log.Errorf(err.Error()) - return err - } - if device.LogLevel.LogControl { - fmt.Println("Control: Download NhTable result :" + string(allbytes)) - } - if err := json.Unmarshal(allbytes, &NhTable); err != nil { - device.log.Errorf(err.Error()) - return err - } - device.graph.SetNHTable(NhTable, content.State_hash) - } - return nil -} - func (device *Device) process_RequestPeerMsg(content path.QueryPeerMsg) error { //Send all my peers to all my peers if device.DRoute.P2P.UseP2P { device.peers.RLock() diff --git a/example_config/super_mode/n1.yaml b/example_config/super_mode/n1.yaml index 31e3ecf..f25c42f 100644 --- a/example_config/super_mode/n1.yaml +++ b/example_config/super_mode/n1.yaml @@ -63,4 +63,4 @@ dynamicroute: - time.windows.com nexthoptable: {} resetconninterval: 86400 -peers: [] \ No newline at end of file +peers: [] diff --git a/main_super.go b/main_super.go index 16ac17b..bd43bbf 100644 --- a/main_super.go +++ b/main_super.go @@ -113,10 +113,11 @@ func Super(configPath string, useUAPI bool, printExample bool) (err error) { Event_server_NhTable_changed: make(chan struct{}, 1<<4), } - thetap, _ := tap.CreateDummyTAP() + thetap4, _ := tap.CreateDummyTAP() + thetap6, _ := tap.CreateDummyTAP() http_graph = path.NewGraph(3, true, sconfig.GraphRecalculateSetting, config.NTPinfo{}, sconfig.LogLevel.LogNTP) - http_device4 = device.NewDevice(thetap, config.SuperNodeMessage, conn.NewCustomBind(true, false), logger4, http_graph, true, configPath, nil, &sconfig, &super_chains) - http_device6 = device.NewDevice(thetap, config.SuperNodeMessage, conn.NewCustomBind(false, true), logger6, http_graph, true, configPath, nil, &sconfig, &super_chains) + http_device4 = device.NewDevice(thetap4, config.SuperNodeMessage, conn.NewCustomBind(true, false), logger4, http_graph, true, configPath, nil, &sconfig, &super_chains) + http_device6 = device.NewDevice(thetap6, config.SuperNodeMessage, conn.NewCustomBind(false, true), logger6, http_graph, true, configPath, nil, &sconfig, &super_chains) defer http_device4.Close() defer http_device6.Close() var sk [32]byte diff --git a/path/header.go b/path/header.go index 1732f60..a10816a 100644 --- a/path/header.go +++ b/path/header.go @@ -26,6 +26,8 @@ const ( PongPacket //Send to everyone, include server QueryPeer BoardcastPeer + + UpdateError ) func NewEgHeader(pac []byte) (e EgHeader, err error) { diff --git a/path/metamessage.go b/path/metamessage.go index 76ef170..d691420 100644 --- a/path/metamessage.go +++ b/path/metamessage.go @@ -21,14 +21,14 @@ func GetByte(structIn interface{}) (bb []byte, err error) { } type RegisterMsg struct { - Node_id config.Vertex `struc:"uint32"` + Node_id config.Vertex PeerStateHash [32]byte NhStateHash [32]byte Name string } func (c *RegisterMsg) ToString() string { - return "RegisterMsg Node_id:" + c.Node_id.ToString() + " Name:" + c.Name + " PeerHash" + base64.StdEncoding.EncodeToString(c.PeerStateHash[:]) + " NhHash:" + base64.StdEncoding.EncodeToString(c.NhStateHash[:]) + return "RegisterMsg Node_id:" + c.Node_id.ToString() + " Name:" + c.Name + " PeerHash:" + base64.StdEncoding.EncodeToString(c.PeerStateHash[:]) + " NhHash:" + base64.StdEncoding.EncodeToString(c.NhStateHash[:]) } func ParseRegisterMsg(bin []byte) (StructPlace RegisterMsg, err error) { @@ -39,8 +39,43 @@ func ParseRegisterMsg(bin []byte) (StructPlace RegisterMsg, err error) { return } +type ErrorAction int + +const ( + Shutdown ErrorAction = iota + Panic +) + +func (a *ErrorAction) ToString() string { + if *a == Shutdown { + return "shutdown" + } else if *a == Panic { + return "panic" + } + return "unknow" +} + +type UpdateErrorMsg struct { + Node_id config.Vertex + Action ErrorAction + ErrorCode int + ErrorMsg string +} + +func ParseUpdateErrorMsg(bin []byte) (StructPlace UpdateErrorMsg, err error) { + var b bytes.Buffer + b.Write(bin) + d := gob.NewDecoder(&b) + err = d.Decode(&StructPlace) + return +} + +func (c *UpdateErrorMsg) ToString() string { + return "UpdateErrorMsg Node_id:" + c.Node_id.ToString() + " Action:" + c.Action.ToString() + " ErrorCode:" + strconv.Itoa(c.ErrorCode) + " ErrorMsg " + c.ErrorMsg +} + type UpdatePeerMsg struct { - State_hash [32]byte `struc:"[32]uint8"` + State_hash [32]byte } func (c *UpdatePeerMsg) ToString() string { @@ -56,7 +91,7 @@ func ParseUpdatePeerMsg(bin []byte) (StructPlace UpdatePeerMsg, err error) { } type UpdateNhTableMsg struct { - State_hash [32]byte `struc:"[32]uint8"` + State_hash [32]byte } func (c *UpdateNhTableMsg) ToString() string { @@ -72,9 +107,9 @@ func ParseUpdateNhTableMsg(bin []byte) (StructPlace UpdateNhTableMsg, err error) } type PingMsg struct { - RequestID uint32 `struc:"uint32"` - Src_nodeID config.Vertex `struc:"uint32"` - Time time.Time `struc:"uint64"` + RequestID uint32 + Src_nodeID config.Vertex + Time time.Time } func (c *PingMsg) ToString() string { @@ -91,9 +126,9 @@ func ParsePingMsg(bin []byte) (StructPlace PingMsg, err error) { type PongMsg struct { RequestID uint32 - Src_nodeID config.Vertex `struc:"uint32"` - Dst_nodeID config.Vertex `struc:"uint32"` - Timediff time.Duration `struc:"int64"` + Src_nodeID config.Vertex + Dst_nodeID config.Vertex + Timediff time.Duration } func (c *PongMsg) ToString() string { @@ -109,7 +144,7 @@ func ParsePongMsg(bin []byte) (StructPlace PongMsg, err error) { } type QueryPeerMsg struct { - Request_ID uint32 `struc:"uint32"` + Request_ID uint32 } func (c *QueryPeerMsg) ToString() string { @@ -125,10 +160,10 @@ func ParseQueryPeerMsg(bin []byte) (StructPlace QueryPeerMsg, err error) { } type BoardcastPeerMsg struct { - Request_ID uint32 `struc:"uint32"` - NodeID config.Vertex `struc:"uint32"` - PubKey [32]byte `struc:"[32]uint8"` - PSKey [32]byte `struc:"[32]uint8"` + Request_ID uint32 + NodeID config.Vertex + PubKey [32]byte + PSKey [32]byte ConnURL string }