fix supernode ipv6 problem, add error message

This commit is contained in:
KusakabeSi 2021-09-20 16:27:53 +00:00
parent 612901c16a
commit 7951ba2f6a
8 changed files with 179 additions and 70 deletions

2
.vscode/launch.json vendored
View File

@ -12,7 +12,7 @@
"program": "${workspaceFolder}",
"buildFlags": "-tags=vpp",
"env": {"CGO_CFLAGS":"-I/usr/include/memif"},
"args":["-config","example_config/super_mode/n1.yaml","-mode","edge"/*,"-example"*/],
"args":["-config","example_config/super_mode/s1.yaml","-mode","super"/*,"-example"*/],
}
]
}

View File

@ -129,7 +129,13 @@ again:
// Attempt ipv6 bind, update port if successful.
sock6, newPort, err = create6(port)
if err != nil {
if originalPort == 0 && errors.Is(err, syscall.EADDRINUSE) && tries < 100 {
unix.Close(sock4)
tries++
goto again
}
if !errors.Is(err, syscall.EAFNOSUPPORT) {
unix.Close(sock4)
return nil, 0, err
}
} else {

View File

@ -549,13 +549,18 @@ func (peer *Peer) RoutineSequentialReceiver() {
fmt.Println("Normal: Reveived Normal packet From:" + peer.GetEndpointDstStr() + " SrcID:" + src_nodeID.ToString() + " DstID:" + dst_nodeID.ToString() + " Len:" + strconv.Itoa(len(elem.packet)))
}
if len(elem.packet) <= path.EgHeaderLen+12 {
device.log.Errorf("Invalid normal packet from peer %v", peer)
device.log.Errorf("Invalid normal packet from peer %v", peer.ID.ToString())
goto skip
}
src_macaddr := tap.GetSrcMacAddr(elem.packet[path.EgHeaderLen:])
if !tap.IsNotUnicast(src_macaddr) {
actual, loaded := device.l2fib.LoadOrStore(src_macaddr, src_nodeID)
if loaded {
if actual.(config.Vertex) != src_nodeID {
device.l2fib.Store(src_macaddr, src_nodeID) // Write to l2fib table
}
}
}
_, err = device.tap.device.Write(elem.buffer[:MessageTransportOffsetContent+len(elem.packet)], MessageTransportOffsetContent+path.EgHeaderLen)
if err != nil && !device.isClosed() {
device.log.Errorf("Failed to write packet to TUN device: %v", err)

View File

@ -20,6 +20,8 @@ import (
func (device *Device) SendPacket(peer *Peer, packet []byte, offset int) {
if peer == nil {
return
} else if peer.endpoint == nil {
return
}
if device.LogLevel.LogNormal {
EgHeader, _ := path.NewEgHeader(packet[:path.EgHeaderLen])
@ -118,7 +120,7 @@ func (device *Device) process_received(msg_type path.Usage, peer *Peer, body []b
switch msg_type {
case path.Register:
if content, err := path.ParseRegisterMsg(body); err == nil {
return device.server_process_RegisterMsg(content)
return device.server_process_RegisterMsg(peer, content)
}
case path.PongPacket:
if content, err := path.ParsePongMsg(body); err == nil {
@ -131,11 +133,15 @@ func (device *Device) process_received(msg_type path.Usage, peer *Peer, body []b
switch msg_type {
case path.UpdatePeer:
if content, err := path.ParseUpdatePeerMsg(body); err == nil {
go device.process_UpdatePeerMsg(content)
go device.process_UpdatePeerMsg(peer, content)
}
case path.UpdateNhTable:
if content, err := path.ParseUpdateNhTableMsg(body); err == nil {
go device.process_UpdateNhTableMsg(content)
go device.process_UpdateNhTableMsg(peer, content)
}
case path.UpdateError:
if content, err := path.ParseUpdateErrorMsg(body); err == nil {
device.process_UpdateErrorMsg(peer, content)
}
case path.PingPacket:
if content, err := path.ParsePingMsg(body); err == nil {
@ -174,6 +180,10 @@ func (device *Device) sprint_received(msg_type path.Usage, body []byte) (ret str
if content, err := path.ParseUpdateNhTableMsg(body); err == nil {
ret = content.ToString()
}
case path.UpdateError:
if content, err := path.ParseUpdateErrorMsg(body); err == nil {
ret = content.ToString()
}
case path.PingPacket:
if content, err := path.ParsePingMsg(body); err == nil {
ret = content.ToString()
@ -196,7 +206,29 @@ func (device *Device) sprint_received(msg_type path.Usage, body []byte) (ret str
return
}
func (device *Device) server_process_RegisterMsg(content path.RegisterMsg) error {
func (device *Device) server_process_RegisterMsg(peer *Peer, content path.RegisterMsg) error {
if peer.ID != content.Node_id {
UpdateErrorMsg := path.UpdateErrorMsg{
Node_id: peer.ID,
Action: path.Shutdown,
ErrorCode: 401,
ErrorMsg: "Your node ID is not match with our nodeID",
}
body, err := path.GetByte(&UpdateErrorMsg)
if err != nil {
return err
}
buf := make([]byte, path.EgHeaderLen+len(body))
header, err := path.NewEgHeader(buf[:path.EgHeaderLen])
header.SetSrc(device.ID)
header.SetTTL(200)
header.SetUsage(path.UpdateError)
header.SetPacketLength(uint16(len(body)))
copy(buf[path.EgHeaderLen:], body)
header.SetDst(config.ControlMessage)
device.SendPacket(peer, buf, MessageTransportOffsetContent)
return nil
}
device.Event_server_register <- content
return nil
}
@ -263,9 +295,15 @@ func (device *Device) process_pong(peer *Peer, content path.PongMsg) error {
return nil
}
func (device *Device) process_UpdatePeerMsg(content path.UpdatePeerMsg) error {
func (device *Device) process_UpdatePeerMsg(peer *Peer, content path.UpdatePeerMsg) error {
var send_signal bool
if device.DRoute.SuperNode.UseSuperNode {
if peer.ID != config.SuperNodeMessage {
if device.LogLevel.LogControl {
fmt.Println("Control: Ignored UpdateErrorMsg. Not from supernode.")
}
return nil
}
var peer_infos config.HTTP_Peers
if bytes.Equal(device.peers.Peer_state[:], content.State_hash[:]) {
if device.LogLevel.LogControl {
@ -343,6 +381,71 @@ func (device *Device) process_UpdatePeerMsg(content path.UpdatePeerMsg) error {
return nil
}
func (device *Device) process_UpdateNhTableMsg(peer *Peer, content path.UpdateNhTableMsg) error {
if device.DRoute.SuperNode.UseSuperNode {
if peer.ID != config.SuperNodeMessage {
if device.LogLevel.LogControl {
fmt.Println("Control: Ignored UpdateErrorMsg. Not from supernode.")
}
return nil
}
if bytes.Equal(device.graph.NhTableHash[:], content.State_hash[:]) {
if device.LogLevel.LogControl {
fmt.Println("Control: Same nhTable Hash, skip download nhTable")
}
device.graph.NhTableExpire = time.Now().Add(device.graph.SuperNodeInfoTimeout)
return nil
}
var NhTable config.NextHopTable
if bytes.Equal(device.graph.NhTableHash[:], content.State_hash[:]) {
return nil
}
downloadurl := device.DRoute.SuperNode.APIUrl + "/nhtable?PubKey=" + url.QueryEscape(PubKey2Str(device.staticIdentity.publicKey)) + "&State=" + url.QueryEscape(string(content.State_hash[:]))
if device.LogLevel.LogControl {
fmt.Println("Control: Download NhTable from :" + downloadurl)
}
client := http.Client{
Timeout: 30 * time.Second,
}
resp, err := client.Get(downloadurl)
if err != nil {
device.log.Errorf(err.Error())
return err
}
defer resp.Body.Close()
allbytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
device.log.Errorf(err.Error())
return err
}
if device.LogLevel.LogControl {
fmt.Println("Control: Download NhTable result :" + string(allbytes))
}
if err := json.Unmarshal(allbytes, &NhTable); err != nil {
device.log.Errorf(err.Error())
return err
}
device.graph.SetNHTable(NhTable, content.State_hash)
}
return nil
}
func (device *Device) process_UpdateErrorMsg(peer *Peer, content path.UpdateErrorMsg) error {
if peer.ID != config.SuperNodeMessage {
if device.LogLevel.LogControl {
fmt.Println("Control: Ignored UpdateErrorMsg. Not from supernode.")
}
return nil
}
device.log.Errorf(content.ToString())
if content.Action == path.Shutdown {
device.closed <- struct{}{}
} else if content.Action == path.Panic {
panic(content.ToString())
}
return nil
}
func (device *Device) RoutineSetEndpoint() {
if !(device.DRoute.P2P.UseP2P || device.DRoute.SuperNode.UseSuperNode) {
return
@ -516,49 +619,6 @@ func (device *Device) GeneratePingPacket(src_nodeID config.Vertex) ([]byte, erro
return buf, nil
}
func (device *Device) process_UpdateNhTableMsg(content path.UpdateNhTableMsg) error {
if device.DRoute.SuperNode.UseSuperNode {
if bytes.Equal(device.graph.NhTableHash[:], content.State_hash[:]) {
if device.LogLevel.LogControl {
fmt.Println("Control: Same nhTable Hash, skip download nhTable")
}
device.graph.NhTableExpire = time.Now().Add(device.graph.SuperNodeInfoTimeout)
return nil
}
var NhTable config.NextHopTable
if bytes.Equal(device.graph.NhTableHash[:], content.State_hash[:]) {
return nil
}
downloadurl := device.DRoute.SuperNode.APIUrl + "/nhtable?PubKey=" + url.QueryEscape(PubKey2Str(device.staticIdentity.publicKey)) + "&State=" + url.QueryEscape(string(content.State_hash[:]))
if device.LogLevel.LogControl {
fmt.Println("Control: Download NhTable from :" + downloadurl)
}
client := http.Client{
Timeout: 30 * time.Second,
}
resp, err := client.Get(downloadurl)
if err != nil {
device.log.Errorf(err.Error())
return err
}
defer resp.Body.Close()
allbytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
device.log.Errorf(err.Error())
return err
}
if device.LogLevel.LogControl {
fmt.Println("Control: Download NhTable result :" + string(allbytes))
}
if err := json.Unmarshal(allbytes, &NhTable); err != nil {
device.log.Errorf(err.Error())
return err
}
device.graph.SetNHTable(NhTable, content.State_hash)
}
return nil
}
func (device *Device) process_RequestPeerMsg(content path.QueryPeerMsg) error { //Send all my peers to all my peers
if device.DRoute.P2P.UseP2P {
device.peers.RLock()

View File

@ -113,10 +113,11 @@ func Super(configPath string, useUAPI bool, printExample bool) (err error) {
Event_server_NhTable_changed: make(chan struct{}, 1<<4),
}
thetap, _ := tap.CreateDummyTAP()
thetap4, _ := tap.CreateDummyTAP()
thetap6, _ := tap.CreateDummyTAP()
http_graph = path.NewGraph(3, true, sconfig.GraphRecalculateSetting, config.NTPinfo{}, sconfig.LogLevel.LogNTP)
http_device4 = device.NewDevice(thetap, config.SuperNodeMessage, conn.NewCustomBind(true, false), logger4, http_graph, true, configPath, nil, &sconfig, &super_chains)
http_device6 = device.NewDevice(thetap, config.SuperNodeMessage, conn.NewCustomBind(false, true), logger6, http_graph, true, configPath, nil, &sconfig, &super_chains)
http_device4 = device.NewDevice(thetap4, config.SuperNodeMessage, conn.NewCustomBind(true, false), logger4, http_graph, true, configPath, nil, &sconfig, &super_chains)
http_device6 = device.NewDevice(thetap6, config.SuperNodeMessage, conn.NewCustomBind(false, true), logger6, http_graph, true, configPath, nil, &sconfig, &super_chains)
defer http_device4.Close()
defer http_device6.Close()
var sk [32]byte

View File

@ -26,6 +26,8 @@ const (
PongPacket //Send to everyone, include server
QueryPeer
BoardcastPeer
UpdateError
)
func NewEgHeader(pac []byte) (e EgHeader, err error) {

View File

@ -21,14 +21,14 @@ func GetByte(structIn interface{}) (bb []byte, err error) {
}
type RegisterMsg struct {
Node_id config.Vertex `struc:"uint32"`
Node_id config.Vertex
PeerStateHash [32]byte
NhStateHash [32]byte
Name string
}
func (c *RegisterMsg) ToString() string {
return "RegisterMsg Node_id:" + c.Node_id.ToString() + " Name:" + c.Name + " PeerHash" + base64.StdEncoding.EncodeToString(c.PeerStateHash[:]) + " NhHash:" + base64.StdEncoding.EncodeToString(c.NhStateHash[:])
return "RegisterMsg Node_id:" + c.Node_id.ToString() + " Name:" + c.Name + " PeerHash:" + base64.StdEncoding.EncodeToString(c.PeerStateHash[:]) + " NhHash:" + base64.StdEncoding.EncodeToString(c.NhStateHash[:])
}
func ParseRegisterMsg(bin []byte) (StructPlace RegisterMsg, err error) {
@ -39,8 +39,43 @@ func ParseRegisterMsg(bin []byte) (StructPlace RegisterMsg, err error) {
return
}
type ErrorAction int
const (
Shutdown ErrorAction = iota
Panic
)
func (a *ErrorAction) ToString() string {
if *a == Shutdown {
return "shutdown"
} else if *a == Panic {
return "panic"
}
return "unknow"
}
type UpdateErrorMsg struct {
Node_id config.Vertex
Action ErrorAction
ErrorCode int
ErrorMsg string
}
func ParseUpdateErrorMsg(bin []byte) (StructPlace UpdateErrorMsg, err error) {
var b bytes.Buffer
b.Write(bin)
d := gob.NewDecoder(&b)
err = d.Decode(&StructPlace)
return
}
func (c *UpdateErrorMsg) ToString() string {
return "UpdateErrorMsg Node_id:" + c.Node_id.ToString() + " Action:" + c.Action.ToString() + " ErrorCode:" + strconv.Itoa(c.ErrorCode) + " ErrorMsg " + c.ErrorMsg
}
type UpdatePeerMsg struct {
State_hash [32]byte `struc:"[32]uint8"`
State_hash [32]byte
}
func (c *UpdatePeerMsg) ToString() string {
@ -56,7 +91,7 @@ func ParseUpdatePeerMsg(bin []byte) (StructPlace UpdatePeerMsg, err error) {
}
type UpdateNhTableMsg struct {
State_hash [32]byte `struc:"[32]uint8"`
State_hash [32]byte
}
func (c *UpdateNhTableMsg) ToString() string {
@ -72,9 +107,9 @@ func ParseUpdateNhTableMsg(bin []byte) (StructPlace UpdateNhTableMsg, err error)
}
type PingMsg struct {
RequestID uint32 `struc:"uint32"`
Src_nodeID config.Vertex `struc:"uint32"`
Time time.Time `struc:"uint64"`
RequestID uint32
Src_nodeID config.Vertex
Time time.Time
}
func (c *PingMsg) ToString() string {
@ -91,9 +126,9 @@ func ParsePingMsg(bin []byte) (StructPlace PingMsg, err error) {
type PongMsg struct {
RequestID uint32
Src_nodeID config.Vertex `struc:"uint32"`
Dst_nodeID config.Vertex `struc:"uint32"`
Timediff time.Duration `struc:"int64"`
Src_nodeID config.Vertex
Dst_nodeID config.Vertex
Timediff time.Duration
}
func (c *PongMsg) ToString() string {
@ -109,7 +144,7 @@ func ParsePongMsg(bin []byte) (StructPlace PongMsg, err error) {
}
type QueryPeerMsg struct {
Request_ID uint32 `struc:"uint32"`
Request_ID uint32
}
func (c *QueryPeerMsg) ToString() string {
@ -125,10 +160,10 @@ func ParseQueryPeerMsg(bin []byte) (StructPlace QueryPeerMsg, err error) {
}
type BoardcastPeerMsg struct {
Request_ID uint32 `struc:"uint32"`
NodeID config.Vertex `struc:"uint32"`
PubKey [32]byte `struc:"[32]uint8"`
PSKey [32]byte `struc:"[32]uint8"`
Request_ID uint32
NodeID config.Vertex
PubKey [32]byte
PSKey [32]byte
ConnURL string
}