From c239162159cf83803a7f97027f512e28c04196c0 Mon Sep 17 00:00:00 2001 From: Kusakabe Si Date: Sat, 4 Dec 2021 02:32:59 +0000 Subject: [PATCH] SuperParams --- device/device.go | 33 +-- device/receivesendproc.go | 328 ++++++++++++++++++++---------- example_config/super_mode/n1.yaml | 4 +- example_config/super_mode/n2.yaml | 4 +- example_config/super_mode/s1.yaml | 10 +- main_edge.go | 14 +- main_httpserver.go | 290 +++++++++++++++++--------- main_super.go | 132 +++++++++--- mtypes/config.go | 21 +- mtypes/metamessage.go | 82 +++----- path/header.go | 6 +- path/path.go | 15 +- 12 files changed, 608 insertions(+), 331 deletions(-) diff --git a/device/device.go b/device/device.go index 36ea4d0..181742e 100644 --- a/device/device.go +++ b/device/device.go @@ -70,10 +70,12 @@ type Device struct { keyMap map[NoisePublicKey]*Peer IDMap map[mtypes.Vertex]*Peer SuperPeer map[NoisePublicKey]*Peer - Peer_state [32]byte LocalV4 net.IP LocalV6 net.IP } + + state_hashes mtypes.StateHash + event_tryendpoint chan struct{} EdgeConfigPath string @@ -81,11 +83,12 @@ type Device struct { SuperConfigPath string SuperConfig *mtypes.SuperConfig - Event_server_register chan mtypes.RegisterMsg - Event_server_pong chan mtypes.PongMsg - Event_save_config chan struct{} - - Event_Supernode_OK chan struct{} + Chan_server_register chan mtypes.RegisterMsg + Chan_server_pong chan mtypes.PongMsg + Chan_save_config chan struct{} + Chan_Supernode_OK chan struct{} + Chan_SendPingStart chan struct{} + Chan_HttpPostStart chan struct{} indexTable IndexTable cookieChecker CookieChecker @@ -341,6 +344,10 @@ func NewDevice(tapDevice tap.Device, id mtypes.Vertex, bind conn.Bind, logger *L device.Version = version device.JWTSecret = mtypes.ByteSlice2Byte32(mtypes.RandomBytes(32, []byte(fmt.Sprintf("%v", time.Now())))) + device.state_hashes.NhTable.Store("") + device.state_hashes.Peer.Store("") + device.state_hashes.SuperParam.Store("") + device.rate.limiter.Init() device.indexTable.Init() device.PopulatePools() @@ -348,8 +355,8 @@ func NewDevice(tapDevice tap.Device, id mtypes.Vertex, bind conn.Bind, logger *L device.SuperConfigPath = configpath device.SuperConfig = sconfig device.EdgeConfig = &mtypes.EdgeConfig{} - device.Event_server_pong = superevents.Event_server_pong - device.Event_server_register = superevents.Event_server_register + device.Chan_server_pong = superevents.Event_server_pong + device.Chan_server_register = superevents.Event_server_register device.LogLevel = sconfig.LogLevel } else { device.EdgeConfigPath = configpath @@ -357,19 +364,21 @@ func NewDevice(tapDevice tap.Device, id mtypes.Vertex, bind conn.Bind, logger *L device.SuperConfig = &mtypes.SuperConfig{} device.DupData = *fixed_time_cache.NewCache(mtypes.S2TD(econfig.DynamicRoute.DupCheckTimeout), false, mtypes.S2TD(60)) device.event_tryendpoint = make(chan struct{}, 1<<6) - device.Event_save_config = make(chan struct{}, 1<<5) - device.Event_Supernode_OK = make(chan struct{}, 4) + device.Chan_save_config = make(chan struct{}, 1<<5) + device.Chan_Supernode_OK = make(chan struct{}, 1<<5) + device.Chan_SendPingStart = make(chan struct{}, 1<<5) + device.Chan_HttpPostStart = make(chan struct{}, 1<<5) device.LogLevel = econfig.LogLevel go device.RoutineSetEndpoint() go device.RoutineDetectOfflineAndTryNextEndpoint() go device.RoutineRegister() - go device.RoutineSendPing() + go device.RoutineSendPing(device.Chan_SendPingStart) go device.RoutineSpreadAllMyNeighbor() go device.RoutineResetConn() go device.RoutineClearL2FIB() go device.RoutineRecalculateNhTable() - go device.RoutinePostPeerInfo() + go device.RoutinePostPeerInfo(device.Chan_HttpPostStart) } // create queues diff --git a/device/receivesendproc.go b/device/receivesendproc.go index 4220bbe..cc13a50 100644 --- a/device/receivesendproc.go +++ b/device/receivesendproc.go @@ -10,7 +10,6 @@ import ( "io/ioutil" "net" "net/http" - "net/url" "strconv" "strings" "syscall" @@ -147,17 +146,9 @@ func (device *Device) process_received(msg_type path.Usage, peer *Peer, body []b } } else { switch msg_type { - case path.UpdatePeer: - if content, err := mtypes.ParseUpdatePeerMsg(body); err == nil { - go device.process_UpdatePeerMsg(peer, content) - } - case path.UpdateNhTable: - if content, err := mtypes.ParseUpdateNhTableMsg(body); err == nil { - go device.process_UpdateNhTableMsg(peer, content) - } - case path.UpdateError: - if content, err := mtypes.ParseUpdateErrorMsg(body); err == nil { - device.process_UpdateErrorMsg(peer, content) + case path.ServerUpdate: + if content, err := mtypes.ParseServerUpdateMsg(body); err == nil { + device.process_ServerUpdateMsg(peer, content) } case path.PingPacket: if content, err := mtypes.ParsePingMsg(body); err == nil { @@ -189,21 +180,11 @@ func (device *Device) sprint_received(msg_type path.Usage, body []byte) string { return content.ToString() } return "RegisterMsg: Parse failed" - case path.UpdatePeer: - if content, err := mtypes.ParseUpdatePeerMsg(body); err == nil { + case path.ServerUpdate: + if content, err := mtypes.ParseServerUpdateMsg(body); err == nil { return content.ToString() } - return "UpdatePeerMsg: Parse failed" - case path.UpdateNhTable: - if content, err := mtypes.ParseUpdateNhTableMsg(body); err == nil { - return content.ToString() - } - return "UpdateNhTableMsg: Parse failed" - case path.UpdateError: - if content, err := mtypes.ParseUpdateErrorMsg(body); err == nil { - return content.ToString() - } - return "UpdateErrorMsg: Parse failed" + return "ServerUpdate: Parse failed" case path.PingPacket: if content, err := mtypes.ParsePingMsg(body); err == nil { return content.ToString() @@ -270,30 +251,30 @@ func compareVersion(v1 string, v2 string) bool { } func (device *Device) server_process_RegisterMsg(peer *Peer, content mtypes.RegisterMsg) error { - UpdateErrorMsg := mtypes.ServerCommandMsg{ - Node_id: peer.ID, - Action: mtypes.NoAction, - ErrorCode: 0, - ErrorMsg: "", + ServerUpdateMsg := mtypes.ServerUpdateMsg{ + Node_id: peer.ID, + Action: mtypes.NoAction, + Code: 0, + Params: "", } if peer.ID != content.Node_id { - UpdateErrorMsg = mtypes.ServerCommandMsg{ - Node_id: peer.ID, - Action: mtypes.ThrowError, - ErrorCode: int(syscall.EPERM), - ErrorMsg: fmt.Sprintf("Your nodeID: %v is not match with registered nodeID: %v", content.Node_id, peer.ID), + ServerUpdateMsg = mtypes.ServerUpdateMsg{ + Node_id: peer.ID, + Action: mtypes.ThrowError, + Code: int(syscall.EPERM), + Params: fmt.Sprintf("Your nodeID: %v is not match with registered nodeID: %v", content.Node_id, peer.ID), } } if compareVersion(content.Version, device.Version) == false { - UpdateErrorMsg = mtypes.ServerCommandMsg{ - Node_id: peer.ID, - Action: mtypes.ThrowError, - ErrorCode: int(syscall.ENOSYS), - ErrorMsg: fmt.Sprintf("Your version: \"%v\" is not compatible with our version: \"%v\"", content.Version, device.Version), + ServerUpdateMsg = mtypes.ServerUpdateMsg{ + Node_id: peer.ID, + Action: mtypes.ThrowError, + Code: int(syscall.ENOSYS), + Params: fmt.Sprintf("Your version: \"%v\" is not compatible with our version: \"%v\"", content.Version, device.Version), } } - if UpdateErrorMsg.Action != mtypes.NoAction { - body, err := mtypes.GetByte(&UpdateErrorMsg) + if ServerUpdateMsg.Action != mtypes.NoAction { + body, err := mtypes.GetByte(&ServerUpdateMsg) if err != nil { return err } @@ -304,15 +285,15 @@ func (device *Device) server_process_RegisterMsg(peer *Peer, content mtypes.Regi header.SetPacketLength(uint16(len(body))) copy(buf[path.EgHeaderLen:], body) header.SetDst(mtypes.SuperNodeMessage) - device.SendPacket(peer, path.UpdateError, buf, MessageTransportOffsetContent) + device.SendPacket(peer, path.ServerUpdate, buf, MessageTransportOffsetContent) return nil } - device.Event_server_register <- content + device.Chan_server_register <- content return nil } func (device *Device) server_process_Pong(peer *Peer, content mtypes.PongMsg) error { - device.Event_server_pong <- content + device.Chan_server_pong <- content return nil } @@ -377,31 +358,31 @@ func (device *Device) process_pong(peer *Peer, content mtypes.PongMsg) error { return nil } -func (device *Device) process_UpdatePeerMsg(peer *Peer, content mtypes.UpdatePeerMsg) error { +func (device *Device) process_UpdatePeerMsg(peer *Peer, State_hash string) error { var send_signal bool if device.EdgeConfig.DynamicRoute.SuperNode.UseSuperNode { - if peer.ID != mtypes.SuperNodeMessage { + if device.state_hashes.Peer.Load().(string) == State_hash { if device.LogLevel.LogControl { - fmt.Println("Control: Ignored UpdateErrorMsg. Not from supernode.") - } - return nil - } - if bytes.Equal(device.peers.Peer_state[:], content.State_hash[:]) { - if device.LogLevel.LogControl { - fmt.Println("Control: Same PeerState Hash, skip download nhTable") + fmt.Println("Control: Same Hash, skip download PeerInfo") } return nil } var peer_infos mtypes.API_Peers - - downloadurl := device.EdgeConfig.DynamicRoute.SuperNode.APIUrl + "/peerinfo?NodeID=" + strconv.Itoa(int(device.ID)) + "&PubKey=" + url.QueryEscape(device.staticIdentity.publicKey.ToString()) + "&State=" + url.QueryEscape(string(content.State_hash[:])) - if device.LogLevel.LogControl { - fmt.Println("Control: Download peerinfo from :" + downloadurl) - } + // client := http.Client{ - Timeout: 30 * time.Second, + Timeout: 8 * time.Second, } - resp, err := client.Get(downloadurl) + downloadurl := device.EdgeConfig.DynamicRoute.SuperNode.APIUrl + "/peerinfo" //////////////////////////////////////////////////////////////////////////////////////////////// + req, err := http.NewRequest("GET", downloadurl, nil) + q := req.URL.Query() + q.Add("NodeID", device.ID.ToString()) + q.Add("PubKey", device.staticIdentity.publicKey.ToString()) + q.Add("State", State_hash) + req.URL.RawQuery = q.Encode() + if device.LogLevel.LogControl { + fmt.Println("Control: Download PeerInfo from :" + req.URL.RequestURI()) + } + resp, err := client.Do(req) if err != nil { device.log.Errorf(err.Error()) return err @@ -413,7 +394,7 @@ func (device *Device) process_UpdatePeerMsg(peer *Peer, content mtypes.UpdatePee return err } if resp.StatusCode != 200 { - device.log.Errorf("Control: Download peerinfo result failed: " + strconv.Itoa(resp.StatusCode) + " " + string(allbytes)) + device.log.Errorf("Control: Download peerinfo failed: " + strconv.Itoa(resp.StatusCode) + " " + string(allbytes)) return nil } if device.LogLevel.LogControl { @@ -482,7 +463,7 @@ func (device *Device) process_UpdatePeerMsg(peer *Peer, content mtypes.UpdatePee send_signal = true } } - device.peers.Peer_state = content.State_hash + device.state_hashes.Peer.Store(State_hash) if send_signal { device.event_tryendpoint <- struct{}{} } @@ -490,33 +471,31 @@ func (device *Device) process_UpdatePeerMsg(peer *Peer, content mtypes.UpdatePee return nil } -func (device *Device) process_UpdateNhTableMsg(peer *Peer, content mtypes.UpdateNhTableMsg) error { +func (device *Device) process_UpdateNhTableMsg(peer *Peer, State_hash string) error { if device.EdgeConfig.DynamicRoute.SuperNode.UseSuperNode { - if peer.ID != mtypes.SuperNodeMessage { + if device.state_hashes.NhTable.Load().(string) == State_hash { if device.LogLevel.LogControl { - fmt.Println("Control: Ignored UpdateErrorMsg. Not from supernode.") - } - return nil - } - if bytes.Equal(device.graph.NhTableHash[:], content.State_hash[:]) { - if device.LogLevel.LogControl { - fmt.Println("Control: Same nhTable Hash, skip download nhTable") + fmt.Println("Control: Same Hash, skip download nhTable") } device.graph.NhTableExpire = time.Now().Add(device.graph.SuperNodeInfoTimeout) return nil } var NhTable mtypes.NextHopTable - if bytes.Equal(device.graph.NhTableHash[:], content.State_hash[:]) { - return nil + // Download from supernode + client := &http.Client{ + Timeout: 8 * time.Second, } - downloadurl := device.EdgeConfig.DynamicRoute.SuperNode.APIUrl + "/nhtable?NodeID=" + strconv.Itoa(int(device.ID)) + "&PubKey=" + url.QueryEscape(device.staticIdentity.publicKey.ToString()) + "&State=" + url.QueryEscape(string(content.State_hash[:])) + downloadurl := device.EdgeConfig.DynamicRoute.SuperNode.APIUrl + "/nhtable" //////////////////////////////////////////////////////////////////////////////////////////////// + req, err := http.NewRequest("GET", downloadurl, nil) + q := req.URL.Query() + q.Add("NodeID", device.ID.ToString()) + q.Add("PubKey", device.staticIdentity.publicKey.ToString()) + q.Add("State", State_hash) + req.URL.RawQuery = q.Encode() if device.LogLevel.LogControl { - fmt.Println("Control: Download NhTable from :" + downloadurl) + fmt.Println("Control: Download NhTable from :" + req.URL.RequestURI()) } - client := http.Client{ - Timeout: 30 * time.Second, - } - resp, err := client.Get(downloadurl) + resp, err := client.Do(req) if err != nil { device.log.Errorf(err.Error()) return err @@ -528,7 +507,7 @@ func (device *Device) process_UpdateNhTableMsg(peer *Peer, content mtypes.Update return err } if resp.StatusCode != 200 { - device.log.Errorf("Control: Download peerinfo result failed: " + strconv.Itoa(resp.StatusCode) + " " + string(allbytes)) + device.log.Errorf("Control: Download NhTable failed: " + strconv.Itoa(resp.StatusCode) + " " + string(allbytes)) return nil } if device.LogLevel.LogControl { @@ -538,25 +517,120 @@ func (device *Device) process_UpdateNhTableMsg(peer *Peer, content mtypes.Update device.log.Errorf("JSON decode error:", err.Error()) return err } - device.graph.SetNHTable(NhTable, content.State_hash) + device.graph.SetNHTable(NhTable) + device.state_hashes.NhTable.Store(State_hash) } return nil } -func (device *Device) process_UpdateErrorMsg(peer *Peer, content mtypes.ServerCommandMsg) error { +func (device *Device) process_UpdateSuperParamsMsg(peer *Peer, State_hash string) error { + if device.EdgeConfig.DynamicRoute.SuperNode.UseSuperNode { + if device.state_hashes.SuperParam.Load().(string) == State_hash { + if device.LogLevel.LogControl { + fmt.Println("Control: Same Hash, skip download SuperParams") + } + device.graph.NhTableExpire = time.Now().Add(device.graph.SuperNodeInfoTimeout) + return nil + } + var SuperParams mtypes.API_SuperParams + client := &http.Client{ + Timeout: 8 * time.Second, + } + downloadurl := device.EdgeConfig.DynamicRoute.SuperNode.APIUrl + "/superparams" //////////////////////////////////////////////////////////////////////////////////////////////// + req, err := http.NewRequest("GET", downloadurl, nil) + q := req.URL.Query() + q.Add("NodeID", device.ID.ToString()) + q.Add("PubKey", device.staticIdentity.publicKey.ToString()) + q.Add("State", State_hash) + req.URL.RawQuery = q.Encode() + if device.LogLevel.LogControl { + fmt.Println("Control: Download SuperParams from :" + req.URL.RequestURI()) + } + resp, err := client.Do(req) + if err != nil { + device.log.Errorf(err.Error()) + return err + } + defer resp.Body.Close() + allbytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + device.log.Errorf(err.Error()) + return err + } + if resp.StatusCode != 200 { + device.log.Errorf("Control: Download SuperParams failed: " + strconv.Itoa(resp.StatusCode) + " " + string(allbytes)) + return nil + } + if device.LogLevel.LogControl { + fmt.Println("Control: Download SuperParams result :" + string(allbytes)) + } + if err := json.Unmarshal(allbytes, &SuperParams); err != nil { + device.log.Errorf("JSON decode error:", err.Error()) + return err + } + if SuperParams.PeerAliveTimeout <= 0 { + device.log.Errorf("SuperParams.PeerAliveTimeout <= 0: %v", SuperParams.PeerAliveTimeout) + return fmt.Errorf("SuperParams.PeerAliveTimeout <= 0: %v", SuperParams.PeerAliveTimeout) + } + if SuperParams.SendPingInterval <= 0 { + device.log.Errorf("SuperParams.SendPingInterval <= 0: %v", SuperParams.SendPingInterval) + return fmt.Errorf("SuperParams.SendPingInterval <= 0: %v", SuperParams.SendPingInterval) + } + if SuperParams.HttpPostInterval <= 0 { + device.log.Errorf("SuperParams.HttpPostInterval <= 0: %v", SuperParams.HttpPostInterval) + return fmt.Errorf("SuperParams.HttpPostInterval <= 0: %v", SuperParams.HttpPostInterval) + } + + device.EdgeConfig.DynamicRoute.PeerAliveTimeout = SuperParams.PeerAliveTimeout + + if device.EdgeConfig.DynamicRoute.SendPingInterval <= 0 { + device.EdgeConfig.DynamicRoute.SendPingInterval = SuperParams.SendPingInterval + device.Chan_SendPingStart <- struct{}{} + } else { + device.EdgeConfig.DynamicRoute.SendPingInterval = SuperParams.SendPingInterval + } + + if device.SuperConfig.HttpPostInterval <= 0 { + device.SuperConfig.HttpPostInterval = SuperParams.HttpPostInterval + device.Chan_HttpPostStart <- struct{}{} + } else { + device.SuperConfig.HttpPostInterval = SuperParams.HttpPostInterval + } + + if SuperParams.AdditionalCost >= 0 { + device.EdgeConfig.DynamicRoute.AdditionalCost = SuperParams.AdditionalCost + } + + device.state_hashes.SuperParam.Store(State_hash) + } + return nil +} + +func (device *Device) process_ServerUpdateMsg(peer *Peer, content mtypes.ServerUpdateMsg) error { if peer.ID != mtypes.SuperNodeMessage { if device.LogLevel.LogControl { fmt.Println("Control: Ignored UpdateErrorMsg. Not from supernode.") } return nil } - device.log.Errorf(strconv.Itoa(int(content.ErrorCode)) + ": " + content.ErrorMsg) - if content.Action == mtypes.Shutdown { + + switch content.Action { + case mtypes.Shutdown: device.closed <- 0 - } else if content.Action == mtypes.ThrowError { - device.closed <- content.ErrorCode - } else if content.Action == mtypes.Panic { + case mtypes.ThrowError: + device.log.Errorf(strconv.Itoa(int(content.Code)) + ": " + content.Params) + device.closed <- content.Code + case mtypes.Panic: + device.log.Errorf(strconv.Itoa(int(content.Code)) + ": " + content.Params) panic(content.ToString()) + case mtypes.UpdateNhTable: + return device.process_UpdateNhTableMsg(peer, content.Params) + case mtypes.UpdatePeer: + return device.process_UpdatePeerMsg(peer, content.Params) + case mtypes.UpdateSuperParams: + return device.process_UpdateSuperParamsMsg(peer, content.Params) + default: + device.log.Errorf("Unknown Action: %v", content.ToString()) } return nil } @@ -699,15 +773,32 @@ func (device *Device) RoutineDetectOfflineAndTryNextEndpoint() { } } -func (device *Device) RoutineSendPing() { +func (device *Device) RoutineSendPing(startchan <-chan struct{}) { if !(device.EdgeConfig.DynamicRoute.P2P.UseP2P || device.EdgeConfig.DynamicRoute.SuperNode.UseSuperNode) { return } - timeout := mtypes.S2TD(device.EdgeConfig.DynamicRoute.SendPingInterval) + waitchan := make(<-chan time.Time) for { + if device.EdgeConfig.DynamicRoute.SendPingInterval > 0 { + waitchan = time.After(mtypes.S2TD(device.EdgeConfig.DynamicRoute.SendPingInterval)) + } else { + waitchan = make(<-chan time.Time) + } + select { + case <-startchan: + if device.LogLevel.LogControl { + fmt.Println("Control: Start RoutineSendPing()") + } + for len(startchan) > 0 { + <-startchan + } + case <-waitchan: + if device.LogLevel.LogControl { + fmt.Println("Control: Start RoutineSendPing() by timer") + } + } packet, usage, _ := device.GeneratePingPacket(device.ID, 0) device.SpreadPacket(make(map[mtypes.Vertex]bool), usage, packet, MessageTransportOffsetContent) - time.Sleep(timeout) } } @@ -716,15 +807,19 @@ func (device *Device) RoutineRegister() { return } timeout := mtypes.S2TD(device.EdgeConfig.DynamicRoute.SendPingInterval) - _ = <-device.Event_Supernode_OK + _ = <-device.Chan_Supernode_OK for { + local_PeerStateHash := device.state_hashes.Peer.Load().(string) + local_NhTableHash := device.state_hashes.NhTable.Load().(string) + local_SuperParamState := device.state_hashes.SuperParam.Load().(string) body, _ := mtypes.GetByte(mtypes.RegisterMsg{ - Node_id: device.ID, - PeerStateHash: device.peers.Peer_state, - NhStateHash: device.graph.NhTableHash, - Version: device.Version, - JWTSecret: device.JWTSecret, - HttpPostCount: device.HttpPostCount, + Node_id: device.ID, + PeerStateHash: local_PeerStateHash, + NhStateHash: local_NhTableHash, + SuperParamStateHash: local_SuperParamState, + Version: device.Version, + JWTSecret: device.JWTSecret, + HttpPostCount: device.HttpPostCount, }) buf := make([]byte, path.EgHeaderLen+len(body)) header, _ := path.NewEgHeader(buf[0:path.EgHeaderLen]) @@ -738,15 +833,28 @@ func (device *Device) RoutineRegister() { } } -func (device *Device) RoutinePostPeerInfo() { +func (device *Device) RoutinePostPeerInfo(startchan <-chan struct{}) { if !(device.EdgeConfig.DynamicRoute.SuperNode.UseSuperNode) { return } - if device.EdgeConfig.DynamicRoute.SuperNode.HttpPostInterval <= 0 { - return - } - timeout := mtypes.S2TD(device.EdgeConfig.DynamicRoute.SuperNode.HttpPostInterval) + waitchan := make(<-chan time.Time) for { + if device.SuperConfig.HttpPostInterval > 0 { + waitchan = time.After(mtypes.S2TD(device.SuperConfig.HttpPostInterval)) + } else { + waitchan = make(<-chan time.Time) + } + select { + case <-waitchan: + break + case <-startchan: + if device.LogLevel.LogControl { + fmt.Println("Control: Start RoutinePostPeerInfo()") + } + for len(startchan) > 0 { + <-startchan + } + } // Stat all latency device.peers.RLock() pongs := make([]mtypes.PongMsg, 0, len(device.peers.IDMap)) @@ -802,17 +910,21 @@ func (device *Device) RoutinePostPeerInfo() { }) tokenString, err := token.SignedString(device.JWTSecret[:]) // Construct post request - client := &http.Client{} + client := &http.Client{ + Timeout: 8 * time.Second, + } downloadurl := device.EdgeConfig.DynamicRoute.SuperNode.APIUrl + "/post/nodeinfo" req, err := http.NewRequest("POST", downloadurl, bytes.NewReader(body)) q := req.URL.Query() q.Add("NodeID", device.ID.ToString()) + q.Add("PubKey", device.staticIdentity.publicKey.ToString()) q.Add("JWTSig", tokenString) req.URL.RawQuery = q.Encode() - req.Header.Set("Content-Type", "application/binary") + req.Header.Set("Content-Type", "application/octet-stream") + req.Header.Set("Content-Encoding", "gzip") device.HttpPostCount += 1 if device.LogLevel.LogControl { - fmt.Println("Control: Post to " + req.URL.String()) + fmt.Println("Control: Post to " + req.URL.RequestURI()) } resp, err := client.Do(req) if err != nil { @@ -824,8 +936,6 @@ func (device *Device) RoutinePostPeerInfo() { } resp.Body.Close() } - - time.Sleep(timeout) } } @@ -833,7 +943,7 @@ func (device *Device) RoutineRecalculateNhTable() { if device.graph.TimeoutCheckInterval == 0 { return } - + if !device.EdgeConfig.DynamicRoute.P2P.UseP2P { return } diff --git a/example_config/super_mode/n1.yaml b/example_config/super_mode/n1.yaml index 833f14e..4a72681 100644 --- a/example_config/super_mode/n1.yaml +++ b/example_config/super_mode/n1.yaml @@ -32,14 +32,13 @@ dynamicroute: savenewpeers: true supernode: usesupernode: true - pskey: 'iPM8FXfnHVzwjguZHRW9bLNY+h7+B1O2oTJtktptQkI=' + pskey: iPM8FXfnHVzwjguZHRW9bLNY+h7+B1O2oTJtktptQkI= connurlv4: 127.0.0.1:3000 pubkeyv4: LJ8KKacUcIoACTGB/9Ed9w0osrJ3WWeelzpL2u4oUic= connurlv6: '' pubkeyv6: HCfL6YJtpJEGHTlJ2LgVXIWKB/K95P57LHTJ42ZG8VI= apiurl: http://127.0.0.1:3000/api skiplocalip: false - httppostinterval: 15 supernodeinfotimeout: 50 p2p: usep2p: false @@ -48,7 +47,6 @@ dynamicroute: staticmode: false jittertolerance: 20 jittertolerancemultiplier: 1.1 - nodereporttimeout: 40 timeoutcheckinterval: 5 recalculatecooldown: 5 ntpconfig: diff --git a/example_config/super_mode/n2.yaml b/example_config/super_mode/n2.yaml index c5e509e..d011299 100644 --- a/example_config/super_mode/n2.yaml +++ b/example_config/super_mode/n2.yaml @@ -32,14 +32,13 @@ dynamicroute: savenewpeers: true supernode: usesupernode: true - pskey: 'juJMQaGAaeSy8aDsXSKNsPZv/nFiPj4h/1G70tGYygs=' + pskey: juJMQaGAaeSy8aDsXSKNsPZv/nFiPj4h/1G70tGYygs= connurlv4: 127.0.0.1:3000 pubkeyv4: LJ8KKacUcIoACTGB/9Ed9w0osrJ3WWeelzpL2u4oUic= connurlv6: '' pubkeyv6: HCfL6YJtpJEGHTlJ2LgVXIWKB/K95P57LHTJ42ZG8VI= apiurl: http://127.0.0.1:3000/api skiplocalip: false - httppostinterval: 15 supernodeinfotimeout: 50 p2p: usep2p: false @@ -48,7 +47,6 @@ dynamicroute: staticmode: false jittertolerance: 20 jittertolerancemultiplier: 1.1 - nodereporttimeout: 40 timeoutcheckinterval: 5 recalculatecooldown: 5 ntpconfig: diff --git a/example_config/super_mode/s1.yaml b/example_config/super_mode/s1.yaml index f12a9ef..afb5c06 100644 --- a/example_config/super_mode/s1.yaml +++ b/example_config/super_mode/s1.yaml @@ -1,16 +1,19 @@ nodename: NodeSuper -postscript: example_config/echo.sh test +postscript: "" privkeyv4: mL5IW0GuqbjgDeOJuPHBU2iJzBPNKhaNEXbIGwwYWWk= privkeyv6: +EdOKIoBp/EvIusHDsvXhV1RJYbyN3Qr8nxlz35wl3I= listenport: 3000 +repushconfiginterval: 30 +httppostinterval: 50 +peeralivetimeout: 70 +sendpinginterval: 15 loglevel: loglevel: normal logtransit: true logcontrol: true - lognormal: true + lognormal: false loginternal: true logntp: false -repushconfiginterval: 30 passwords: showstate: passwd addpeer: passwd_addpeer @@ -19,7 +22,6 @@ graphrecalculatesetting: staticmode: false jittertolerance: 5 jittertolerancemultiplier: 1.01 - nodereporttimeout: 70 timeoutcheckinterval: 5 recalculatecooldown: 5 nexthoptable: diff --git a/main_edge.go b/main_edge.go index cf0bf13..b96478f 100644 --- a/main_edge.go +++ b/main_edge.go @@ -31,7 +31,7 @@ func printExampleEdgeConf() { Interface: mtypes.InterfaceConf{ Itype: "stdio", Name: "tap1", - VPPIfaceID: 5, + VPPIfaceID: 1, VPPBridgeID: 4242, MacAddrPrefix: "AA:BB:CC:DD", MTU: 1416, @@ -45,7 +45,7 @@ func printExampleEdgeConf() { DefaultTTL: 200, L2FIBTimeout: 3600, PrivKey: "6GyDagZKhbm5WNqMiRHhkf43RlbMJ34IieTlIuvfJ1M=", - ListenPort: 3001, + ListenPort: 0, LogLevel: mtypes.LoggerInfo{ LogLevel: "error", LogTransit: true, @@ -71,23 +71,21 @@ func printExampleEdgeConf() { APIUrl: "http://127.0.0.1:3000/api", SuperNodeInfoTimeout: 50, SkipLocalIP: false, - HttpPostInterval: 15, }, P2P: mtypes.P2Pinfo{ - UseP2P: true, + UseP2P: false, SendPeerInterval: 20, GraphRecalculateSetting: mtypes.GraphRecalculateSetting{ StaticMode: false, JitterTolerance: 20, JitterToleranceMultiplier: 1.1, - NodeReportTimeout: 40, TimeoutCheckInterval: 5, RecalculateCoolDown: 5, }, }, NTPconfig: mtypes.NTPinfo{ UseNTP: true, - MaxServerUse: 5, + MaxServerUse: 8, SyncTimeInterval: 3600, NTPTimeout: 3, Servers: []string{"time.google.com", @@ -229,7 +227,7 @@ func Edge(configPath string, useUAPI bool, printExample bool, bindmode string) ( econfig.LogLevel.LogNTP = false // NTP in static mode is useless } graph := path.NewGraph(3, false, econfig.DynamicRoute.P2P.GraphRecalculateSetting, econfig.DynamicRoute.NTPconfig, econfig.LogLevel) - graph.SetNHTable(econfig.NextHopTable, [32]byte{}) + graph.SetNHTable(econfig.NextHopTable) the_device := device.NewDevice(thetap, econfig.NodeID, conn.NewDefaultBind(true, true, bindmode), logger, graph, false, configPath, &econfig, nil, nil, Version) defer the_device.Close() @@ -310,7 +308,7 @@ func Edge(configPath string, useUAPI bool, printExample bool, bindmode string) ( return errors.New("Failed to connect to supernode.") } } - the_device.Event_Supernode_OK <- struct{}{} + the_device.Chan_Supernode_OK <- struct{}{} } logger.Verbosef("Device started") diff --git a/main_httpserver.go b/main_httpserver.go index 5018343..70164a7 100644 --- a/main_httpserver.go +++ b/main_httpserver.go @@ -1,7 +1,6 @@ package main import ( - "bytes" "crypto/md5" "crypto/sha256" "encoding/base64" @@ -28,15 +27,17 @@ import ( ) type http_shared_objects struct { - http_graph *path.IG - http_device4 *device.Device - http_device6 *device.Device - http_HashSalt []byte - http_NhTable_Hash [32]byte - http_PeerInfo_hash [32]byte - http_NhTableStr []byte - http_PeerInfo mtypes.API_Peers - http_super_chains *mtypes.SUPER_Events + http_graph *path.IG + http_device4 *device.Device + http_device6 *device.Device + http_HashSalt []byte + http_NhTable_Hash string + http_PeerInfo_hash string + http_SuperParams_Hash string + http_SuperParamsStr []byte + http_NhTableStr []byte + http_PeerInfo mtypes.API_Peers + http_super_chains *mtypes.SUPER_Events http_passwords mtypes.Passwords http_StateExpire time.Time @@ -78,11 +79,12 @@ type HttpPeerInfo struct { } type PeerState struct { - NhTableState [32]byte - PeerInfoState [32]byte - JETSecret mtypes.JWTSecret - httpPostCount uint64 - LastSeen time.Time + NhTableState string + PeerInfoState string + SuperParamState string + JETSecret mtypes.JWTSecret + httpPostCount uint64 + LastSeen time.Time } type client struct { @@ -149,7 +151,7 @@ func extractParamsVertex(params url.Values, key string, w http.ResponseWriter) ( return mtypes.Vertex(val), nil } -func get_api_peers(old_State_hash [32]byte) (api_peerinfo mtypes.API_Peers, StateHash [32]byte, changed bool) { +func get_api_peers(old_State_hash string) (api_peerinfo mtypes.API_Peers, StateHash string, changed bool) { // No lock api_peerinfo = make(mtypes.API_Peers) for _, peerinfo := range httpobj.http_sconfig.Peers { @@ -158,7 +160,7 @@ func get_api_peers(old_State_hash [32]byte) (api_peerinfo mtypes.API_Peers, Stat PSKey: peerinfo.PSKey, Connurl: &mtypes.API_connurl{}, } - if httpobj.http_PeerState[peerinfo.PubKey].LastSeen.Add(mtypes.S2TD(httpobj.http_sconfig.GraphRecalculateSetting.NodeReportTimeout)).After(time.Now()) { + if httpobj.http_PeerState[peerinfo.PubKey].LastSeen.Add(mtypes.S2TD(httpobj.http_sconfig.PeerAliveTimeout)).After(time.Now()) { connV4 := httpobj.http_device4.GetConnurl(peerinfo.NodeID) connV6 := httpobj.http_device6.GetConnurl(peerinfo.NodeID) if connV4 != "" { @@ -177,14 +179,15 @@ func get_api_peers(old_State_hash [32]byte) (api_peerinfo mtypes.API_Peers, Stat api_peerinfo_str_byte, _ := json.Marshal(&api_peerinfo) hash_raw := md5.Sum(append(api_peerinfo_str_byte, httpobj.http_HashSalt...)) hash_str := hex.EncodeToString(hash_raw[:]) - copy(StateHash[:], []byte(hash_str)) - if bytes.Equal(old_State_hash[:], StateHash[:]) == false { + StateHash = hash_str + if old_State_hash != StateHash { changed = true } return } -func get_peerinfo(w http.ResponseWriter, r *http.Request) { +func get_superparams(w http.ResponseWriter, r *http.Request) { + // Read all params params := r.URL.Query() PubKey, err := extractParamsStr(params, "PubKey", w) if err != nil { @@ -198,58 +201,135 @@ func get_peerinfo(w http.ResponseWriter, r *http.Request) { if err != nil { return } + if NodeID >= mtypes.Special_NodeID { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte("Paramater NodeID: Can't use special nodeID.")) + return + } + // Authentication httpobj.RLock() defer httpobj.RUnlock() + if _, has := httpobj.http_PeerID2Info[NodeID]; !has { + w.WriteHeader(http.StatusNotFound) + w.Write([]byte("Paramater PubKey: NodeID and PubKey are not match")) + return + } if httpobj.http_PeerID2Info[NodeID].PubKey != PubKey { w.WriteHeader(http.StatusNotFound) - w.Write([]byte("Paramater NodeID: NodeID and PubKey are not match")) + w.Write([]byte("Paramater PubKey: NodeID and PubKey are not match")) return } - if bytes.Equal(httpobj.http_PeerInfo_hash[:], []byte(State)) { - if state := httpobj.http_PeerState[PubKey]; state != nil { - copy(httpobj.http_PeerState[PubKey].PeerInfoState[:], State) - http_PeerInfo_2peer := make(mtypes.API_Peers) - - for PeerPubKey, peerinfo := range httpobj.http_PeerInfo { - if httpobj.http_sconfig.UsePSKForInterEdge { - h := sha256.New() - if NodeID > peerinfo.NodeID { - h.Write([]byte(PubKey)) - h.Write([]byte(PeerPubKey)) - } else if NodeID < peerinfo.NodeID { - h.Write([]byte(PeerPubKey)) - h.Write([]byte(PubKey)) - } else { - continue - } - h.Write(httpobj.http_HashSalt) - bs := h.Sum(nil) - var psk device.NoisePresharedKey - copy(psk[:], bs[:]) - peerinfo.PSKey = psk.ToString() - } else { - peerinfo.PSKey = "" - } - if httpobj.http_PeerID2Info[NodeID].SkipLocalIP { // Clear all local IP - peerinfo.Connurl.LocalV4 = make(map[string]float64) - peerinfo.Connurl.LocalV6 = make(map[string]float64) - } - http_PeerInfo_2peer[PeerPubKey] = peerinfo - } - api_peerinfo_str_byte, _ := json.Marshal(&http_PeerInfo_2peer) - - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - w.Write(api_peerinfo_str_byte) - return - } + if httpobj.http_SuperParams_Hash != State { + w.WriteHeader(http.StatusNotFound) + w.Write([]byte("Paramater State: State not correct")) + return } - w.WriteHeader(http.StatusNotFound) - w.Write([]byte("Paramater State: State not correct")) + + if _, has := httpobj.http_PeerState[PubKey]; has == false { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte("Paramater PubKey: Not found in httpobj.http_PeerState, this shouldn't happen. Please report to the author.")) + return + } + // Do something + SuperParams := mtypes.API_SuperParams{ + SendPingInterval: httpobj.http_sconfig.SendPingInterval, + HttpPostInterval: httpobj.http_sconfig.HttpPostInterval, + PeerAliveTimeout: httpobj.http_sconfig.PeerAliveTimeout, + AdditionalCost: httpobj.http_PeerID2Info[NodeID].AdditionalCost, + } + SuperParamStr, _ := json.Marshal(SuperParams) + httpobj.http_PeerState[PubKey].SuperParamState = State + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(SuperParamStr)) + return +} + +func get_peerinfo(w http.ResponseWriter, r *http.Request) { + // Read all params + params := r.URL.Query() + PubKey, err := extractParamsStr(params, "PubKey", w) + if err != nil { + return + } + State, err := extractParamsStr(params, "State", w) + if err != nil { + return + } + NodeID, err := extractParamsVertex(params, "NodeID", w) + if err != nil { + return + } + if NodeID >= mtypes.Special_NodeID { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte("Paramater NodeID: Can't use special nodeID.")) + return + } + // Authentication + httpobj.RLock() + defer httpobj.RUnlock() + if _, has := httpobj.http_PeerID2Info[NodeID]; !has { + w.WriteHeader(http.StatusNotFound) + w.Write([]byte("Paramater PubKey: NodeID and PubKey are not match")) + return + } + if httpobj.http_PeerID2Info[NodeID].PubKey != PubKey { + w.WriteHeader(http.StatusNotFound) + w.Write([]byte("Paramater PubKey: NodeID and PubKey are not match")) + return + } + if httpobj.http_PeerInfo_hash != State { + w.WriteHeader(http.StatusNotFound) + w.Write([]byte("Paramater State: State not correct")) + return + } + if _, has := httpobj.http_PeerState[PubKey]; has == false { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte("Paramater PubKey: Not found in httpobj.http_PeerState, this shouldn't happen. Please report to the author.")) + return + } + + // Do something + httpobj.http_PeerState[PubKey].PeerInfoState = State + http_PeerInfo_2peer := make(mtypes.API_Peers) + + for PeerPubKey, peerinfo := range httpobj.http_PeerInfo { + if httpobj.http_sconfig.UsePSKForInterEdge { + h := sha256.New() + if NodeID > peerinfo.NodeID { + h.Write([]byte(PubKey)) + h.Write([]byte(PeerPubKey)) + } else if NodeID < peerinfo.NodeID { + h.Write([]byte(PeerPubKey)) + h.Write([]byte(PubKey)) + } else { + continue + } + h.Write(httpobj.http_HashSalt) + bs := h.Sum(nil) + var psk device.NoisePresharedKey + copy(psk[:], bs[:]) + peerinfo.PSKey = psk.ToString() + } else { + peerinfo.PSKey = "" + } + if httpobj.http_PeerID2Info[NodeID].SkipLocalIP { // Clear all local IP + peerinfo.Connurl.LocalV4 = make(map[string]float64) + peerinfo.Connurl.LocalV6 = make(map[string]float64) + } + http_PeerInfo_2peer[PeerPubKey] = peerinfo + } + api_peerinfo_str_byte, _ := json.Marshal(&http_PeerInfo_2peer) + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write(api_peerinfo_str_byte) + return } func get_nhtable(w http.ResponseWriter, r *http.Request) { + // Read all params params := r.URL.Query() PubKey, err := extractParamsStr(params, "PubKey", w) if err != nil { @@ -263,25 +343,41 @@ func get_nhtable(w http.ResponseWriter, r *http.Request) { if err != nil { return } + if NodeID >= mtypes.Special_NodeID { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte("Paramater NodeID: Can't use special nodeID.")) + return + } + // Authentication httpobj.RLock() defer httpobj.RUnlock() + if _, has := httpobj.http_PeerID2Info[NodeID]; !has { + w.WriteHeader(http.StatusNotFound) + w.Write([]byte("Paramater PubKey: NodeID and PubKey are not match")) + return + } if httpobj.http_PeerID2Info[NodeID].PubKey != PubKey { w.WriteHeader(http.StatusNotFound) - w.Write([]byte("NodeID and PunKey are not match")) + w.Write([]byte("Paramater PubKey: NodeID and PubKey are not match")) + return + } + if httpobj.http_NhTable_Hash != State { + w.WriteHeader(http.StatusNotFound) + w.Write([]byte("Paramater State: State not correct")) + return + } + if _, has := httpobj.http_PeerState[PubKey]; has == false { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte("Paramater PubKey: Not found in httpobj.http_PeerState, this shouldn't happen. Please report to the author.")) return } - if bytes.Equal(httpobj.http_NhTable_Hash[:], []byte(State)) { - if state := httpobj.http_PeerState[PubKey]; state != nil { - copy(httpobj.http_PeerState[PubKey].NhTableState[:], State) - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - w.Write([]byte(httpobj.http_NhTableStr)) - return - } - } - w.WriteHeader(http.StatusNotFound) - w.Write([]byte("State not correct")) + httpobj.http_PeerState[PubKey].NhTableState = State + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(httpobj.http_NhTableStr)) + return + } func get_peerstate(w http.ResponseWriter, r *http.Request) { @@ -329,27 +425,34 @@ func post_nodeinfo(w http.ResponseWriter, r *http.Request) { if err != nil { return } - JWTSig, err := extractParamsStr(params, "JWTSig", w) + PubKey, err := extractParamsStr(params, "PubKey", w) if err != nil { return } - if NodeID >= mtypes.Special_NodeID { w.WriteHeader(http.StatusBadRequest) w.Write([]byte("Paramater NodeID: Can't use special nodeID.")) return } - httpobj.RLock() - defer httpobj.RUnlock() - var PubKey string - if peerconf, has := httpobj.http_PeerID2Info[NodeID]; has { - PubKey = peerconf.PubKey - } else { - w.WriteHeader(http.StatusBadRequest) - w.Write([]byte("Paramater NodeID: NodeID not exists.")) + JWTSig, err := extractParamsStr(params, "JWTSig", w) + if err != nil { return } + + httpobj.RLock() + defer httpobj.RUnlock() + if _, has := httpobj.http_PeerID2Info[NodeID]; !has { + w.WriteHeader(http.StatusNotFound) + w.Write([]byte("NodeID and PunKey are not match")) + return + } + if httpobj.http_PeerID2Info[NodeID].PubKey != PubKey { + w.WriteHeader(http.StatusNotFound) + w.Write([]byte("NodeID and PunKey are not match")) + return + } + JWTSecret := httpobj.http_PeerState[PubKey].JETSecret httpPostCount := httpobj.http_PeerState[PubKey].httpPostCount @@ -432,11 +535,9 @@ func post_nodeinfo(w http.ResponseWriter, r *http.Request) { if changed { NhTable := httpobj.http_graph.GetNHTable(true) NhTablestr, _ := json.Marshal(NhTable) - md5_hash_raw := md5.Sum(append(httpobj.http_NhTableStr, httpobj.http_HashSalt...)) + md5_hash_raw := md5.Sum(append(NhTablestr, httpobj.http_HashSalt...)) new_hash_str := hex.EncodeToString(md5_hash_raw[:]) - new_hash_str_byte := []byte(new_hash_str) - copy(httpobj.http_NhTable_Hash[:], new_hash_str_byte) - copy(httpobj.http_graph.NhTableHash[:], new_hash_str_byte) + httpobj.http_NhTable_Hash = new_hash_str httpobj.http_NhTableStr = NhTablestr PushNhTable(false) } @@ -533,7 +634,7 @@ func peeradd(w http.ResponseWriter, r *http.Request) { //Waiting for test w.Write([]byte(fmt.Sprintf("Paramater nexthoptable: \"%v\", %v", NhTableStr, err))) return } - httpobj.http_graph.SetNHTable(NewNhTable, [32]byte{}) + httpobj.http_graph.SetNHTable(NewNhTable) } err = super_peeradd(mtypes.SuperPeerInfo{ NodeID: NodeID, @@ -583,14 +684,14 @@ func peerdel(w http.ResponseWriter, r *http.Request) { //Waiting for test defer httpobj.Unlock() if pwderr == nil { // user provide the password if password == httpobj.http_passwords.DelPeer { - NodeID, err = extractParamsVertex(params, "nodeid", w) + NodeID, err = extractParamsVertex(params, "NodeID", w) if err != nil { return } toDelete = NodeID if _, has := httpobj.http_PeerID2Info[toDelete]; !has { w.WriteHeader(http.StatusNotFound) - w.Write([]byte(fmt.Sprintf("Paramater nodeid: \"%v\" not found", PubKey))) + w.Write([]byte(fmt.Sprintf("Paramater NodeID: \"%v\" not found", PubKey))) return } } else { @@ -599,14 +700,14 @@ func peerdel(w http.ResponseWriter, r *http.Request) { //Waiting for test return } } else { // user don't provide the password - PrivKey, err = extractParamsStr(params, "privkey", w) + PrivKey, err = extractParamsStr(params, "PrivKey", w) if err != nil { return } privk, err := device.Str2PriKey(PrivKey) if err != nil { w.WriteHeader(http.StatusInternalServerError) - w.Write([]byte(fmt.Sprintf("Paramater privkey: %v", err))) + w.Write([]byte(fmt.Sprintf("Paramater PrivKey: %v", err))) return } pubk := privk.PublicKey() @@ -618,7 +719,7 @@ func peerdel(w http.ResponseWriter, r *http.Request) { //Waiting for test } if toDelete == mtypes.Broadcast { w.WriteHeader(http.StatusNotFound) - w.Write([]byte(fmt.Sprintf("Paramater privkey: \"%v\" not found", PubKey))) + w.Write([]byte(fmt.Sprintf("Paramater PrivKey: \"%v\" not found", PubKey))) return } } @@ -636,7 +737,7 @@ func peerdel(w http.ResponseWriter, r *http.Request) { //Waiting for test mtypesBytes, _ := yaml.Marshal(httpobj.http_sconfig) ioutil.WriteFile(httpobj.http_sconfig_path, mtypesBytes, 0644) w.WriteHeader(http.StatusOK) - w.Write([]byte("Node ID: " + toDelete.ToString() + " deleted.")) + w.Write([]byte("NodeID: " + toDelete.ToString() + " deleted.")) return } @@ -645,6 +746,7 @@ func HttpServer(http_port int, apiprefix string) { if apiprefix[0] != '/' { apiprefix = "/" + apiprefix } + mux.HandleFunc(apiprefix+"/superparams", get_superparams) mux.HandleFunc(apiprefix+"/peerinfo", get_peerinfo) mux.HandleFunc(apiprefix+"/nhtable", get_nhtable) mux.HandleFunc(apiprefix+"/peerstate", get_peerstate) diff --git a/main_super.go b/main_super.go index c01899f..a8c4736 100644 --- a/main_super.go +++ b/main_super.go @@ -6,7 +6,6 @@ package main import ( - "bytes" "crypto/md5" "encoding/hex" "encoding/json" @@ -80,6 +79,9 @@ func printExampleSuperConf() { LogNTP: false, }, RePushConfigInterval: 30, + PeerAliveTimeout: 70, + HttpPostInterval: 50, + SendPingInterval: 15, Passwords: mtypes.Passwords{ ShowState: "passwd", AddPeer: "passwd_addpeer", @@ -89,7 +91,6 @@ func printExampleSuperConf() { StaticMode: false, JitterTolerance: 5, JitterToleranceMultiplier: 1.01, - NodeReportTimeout: 70, TimeoutCheckInterval: 5, RecalculateCoolDown: 5, }, @@ -148,6 +149,20 @@ func Super(configPath string, useUAPI bool, printExample bool, bindmode string) if len(NodeName) > 32 { return errors.New("Node name can't longer than 32 :" + NodeName) } + if sconfig.PeerAliveTimeout <= 0 { + return fmt.Errorf("PeerAliveTimeout must > 0 : %v", sconfig.PeerAliveTimeout) + } + if sconfig.HttpPostInterval <= 0 { + return fmt.Errorf("HttpPostInterval must > 0 : %v", sconfig.HttpPostInterval) + } else if sconfig.HttpPostInterval > sconfig.PeerAliveTimeout { + return fmt.Errorf("HttpPostInterval must <= PeerAliveTimeout : %v", sconfig.HttpPostInterval) + } + if sconfig.SendPingInterval <= 0 { + return fmt.Errorf("SendPingInterval must > 0 : %v", sconfig.SendPingInterval) + } + if sconfig.RePushConfigInterval <= 0 { + return fmt.Errorf("RePushConfigInterval must > 0 : %v", sconfig.RePushConfigInterval) + } var logLevel int switch sconfig.LogLevel.LogLevel { @@ -182,7 +197,7 @@ func Super(configPath string, useUAPI bool, printExample bool, bindmode string) Event_server_register: make(chan mtypes.RegisterMsg, 1<<5), } httpobj.http_graph = path.NewGraph(3, true, sconfig.GraphRecalculateSetting, mtypes.NTPinfo{}, sconfig.LogLevel) - httpobj.http_graph.SetNHTable(httpobj.http_sconfig.NextHopTable, [32]byte{}) + httpobj.http_graph.SetNHTable(httpobj.http_sconfig.NextHopTable) if sconfig.GraphRecalculateSetting.StaticMode { err = checkNhTable(httpobj.http_sconfig.NextHopTable, sconfig.Peers) if err != nil { @@ -242,7 +257,7 @@ func Super(configPath string, useUAPI bool, printExample bool, bindmode string) } defer uapi6.Close() } - + prepare_superparams(*httpobj.http_sconfig) go Event_server_event_hendler(httpobj.http_graph, httpobj.http_super_chains) go RoutinePushSettings(mtypes.S2TD(sconfig.RePushConfigInterval)) go RoutineTimeoutCheck() @@ -278,6 +293,21 @@ func Super(configPath string, useUAPI bool, printExample bool, bindmode string) return } +func prepare_superparams(sconfig mtypes.SuperConfig) { + SuperParams := mtypes.API_SuperParams{ + SendPingInterval: sconfig.SendPingInterval, + HttpPostInterval: sconfig.HttpPostInterval, + PeerAliveTimeout: sconfig.PeerAliveTimeout, + AdditionalCost: -1, + } + SuperParamStr, _ := json.Marshal(SuperParams) + httpobj.http_SuperParamsStr = SuperParamStr + md5_hash_raw := md5.Sum(append(httpobj.http_SuperParamsStr, httpobj.http_HashSalt...)) + new_hash_str := hex.EncodeToString(md5_hash_raw[:]) + + httpobj.http_SuperParams_Hash = new_hash_str +} + func super_peeradd(peerconf mtypes.SuperPeerInfo) error { // No lock, lock before call me pk, err := device.Str2PubKey(peerconf.PubKey) @@ -337,14 +367,14 @@ func super_peerdel(toDelete mtypes.Vertex) { } func super_peerdel_notify(toDelete mtypes.Vertex, PubKey string) { - UpdateErrorMsg := mtypes.ServerCommandMsg{ - Node_id: toDelete, - Action: mtypes.Shutdown, - ErrorCode: int(syscall.ENOENT), - ErrorMsg: "You've been removed from supernode.", + ServerUpdateMsg := mtypes.ServerUpdateMsg{ + Node_id: toDelete, + Action: mtypes.Shutdown, + Code: int(syscall.ENOENT), + Params: "You've been removed from supernode.", } for i := 0; i < 10; i++ { - body, _ := mtypes.GetByte(&UpdateErrorMsg) + body, _ := mtypes.GetByte(&ServerUpdateMsg) buf := make([]byte, path.EgHeaderLen+len(body)) header, _ := path.NewEgHeader(buf[:path.EgHeaderLen]) header.SetSrc(mtypes.SuperNodeMessage) @@ -354,10 +384,10 @@ func super_peerdel_notify(toDelete mtypes.Vertex, PubKey string) { header.SetDst(toDelete) peer4 := httpobj.http_device4.LookupPeerByStr(PubKey) - httpobj.http_device4.SendPacket(peer4, path.UpdateError, buf, device.MessageTransportOffsetContent) + httpobj.http_device4.SendPacket(peer4, path.ServerUpdate, buf, device.MessageTransportOffsetContent) peer6 := httpobj.http_device6.LookupPeerByStr(PubKey) - httpobj.http_device6.SendPacket(peer6, path.UpdateError, buf, device.MessageTransportOffsetContent) + httpobj.http_device6.SendPacket(peer6, path.ServerUpdate, buf, device.MessageTransportOffsetContent) time.Sleep(mtypes.S2TD(0.1)) } httpobj.http_device4.RemovePeerByID(toDelete) @@ -378,12 +408,12 @@ func Event_server_event_hendler(graph *path.IG, events *mtypes.SUPER_Events) { httpobj.http_PeerState[PubKey].LastSeen = time.Now() httpobj.http_PeerState[PubKey].JETSecret = reg_msg.JWTSecret httpobj.http_PeerState[PubKey].httpPostCount = reg_msg.HttpPostCount - if bytes.Equal(httpobj.http_PeerState[PubKey].NhTableState[:], reg_msg.NhStateHash[:]) == false { - copy(httpobj.http_PeerState[PubKey].NhTableState[:], reg_msg.NhStateHash[:]) + if httpobj.http_PeerState[PubKey].NhTableState == reg_msg.NhStateHash == false { + httpobj.http_PeerState[PubKey].NhTableState = reg_msg.NhStateHash should_push_nh = true } - if bytes.Equal(httpobj.http_PeerState[PubKey].PeerInfoState[:], reg_msg.PeerStateHash[:]) == false { - copy(httpobj.http_PeerState[PubKey].PeerInfoState[:], reg_msg.PeerStateHash[:]) + if httpobj.http_PeerState[PubKey].PeerInfoState == reg_msg.PeerStateHash == false { + httpobj.http_PeerState[PubKey].PeerInfoState = reg_msg.PeerStateHash should_push_peer = true } } @@ -414,11 +444,9 @@ func Event_server_event_hendler(graph *path.IG, events *mtypes.SUPER_Events) { if changed { NhTable := graph.GetNHTable(true) NhTablestr, _ := json.Marshal(NhTable) - md5_hash_raw := md5.Sum(append(httpobj.http_NhTableStr, httpobj.http_HashSalt...)) + md5_hash_raw := md5.Sum(append(NhTablestr, httpobj.http_HashSalt...)) new_hash_str := hex.EncodeToString(md5_hash_raw[:]) - new_hash_str_byte := []byte(new_hash_str) - copy(httpobj.http_NhTable_Hash[:], new_hash_str_byte) - copy(graph.NhTableHash[:], new_hash_str_byte) + httpobj.http_NhTable_Hash = new_hash_str httpobj.http_NhTableStr = NhTablestr PushNhTable(false) } @@ -438,7 +466,8 @@ func RoutinePushSettings(interval time.Duration) { force = false } PushNhTable(force) - PushPeerinfo(force) + PushPeerinfo(false) + PushServerParams(false) time.Sleep(mtypes.S2TD(1)) } } @@ -460,8 +489,11 @@ func RoutineTimeoutCheck() { func PushNhTable(force bool) { // No lock - body, err := mtypes.GetByte(mtypes.UpdateNhTableMsg{ - State_hash: httpobj.http_NhTable_Hash, + body, err := mtypes.GetByte(mtypes.ServerUpdateMsg{ + Node_id: mtypes.SuperNodeMessage, + Action: mtypes.UpdateNhTable, + Code: 0, + Params: string(httpobj.http_NhTable_Hash[:]), }) if err != nil { fmt.Println("Error get byte") @@ -475,16 +507,16 @@ func PushNhTable(force bool) { header.SetTTL(0) copy(buf[path.EgHeaderLen:], body) for pkstr, peerstate := range httpobj.http_PeerState { - isAlive := peerstate.LastSeen.Add(mtypes.S2TD(httpobj.http_sconfig.GraphRecalculateSetting.NodeReportTimeout)).After(time.Now()) + isAlive := peerstate.LastSeen.Add(mtypes.S2TD(httpobj.http_sconfig.PeerAliveTimeout)).After(time.Now()) if !isAlive { continue } if force || peerstate.NhTableState != httpobj.http_NhTable_Hash { if peer := httpobj.http_device4.LookupPeerByStr(pkstr); peer != nil && peer.GetEndpointDstStr() != "" { - httpobj.http_device4.SendPacket(peer, path.UpdateNhTable, buf, device.MessageTransportOffsetContent) + httpobj.http_device4.SendPacket(peer, path.ServerUpdate, buf, device.MessageTransportOffsetContent) } if peer := httpobj.http_device6.LookupPeerByStr(pkstr); peer != nil && peer.GetEndpointDstStr() != "" { - httpobj.http_device6.SendPacket(peer, path.UpdateNhTable, buf, device.MessageTransportOffsetContent) + httpobj.http_device6.SendPacket(peer, path.ServerUpdate, buf, device.MessageTransportOffsetContent) } } } @@ -492,8 +524,11 @@ func PushNhTable(force bool) { func PushPeerinfo(force bool) { //No lock - body, err := mtypes.GetByte(mtypes.UpdatePeerMsg{ - State_hash: httpobj.http_PeerInfo_hash, + body, err := mtypes.GetByte(mtypes.ServerUpdateMsg{ + Node_id: mtypes.SuperNodeMessage, + Action: mtypes.UpdatePeer, + Code: 0, + Params: string(httpobj.http_PeerInfo_hash[:]), }) if err != nil { fmt.Println("Error get byte") @@ -507,16 +542,51 @@ func PushPeerinfo(force bool) { header.SetTTL(0) copy(buf[path.EgHeaderLen:], body) for pkstr, peerstate := range httpobj.http_PeerState { - isAlive := peerstate.LastSeen.Add(mtypes.S2TD(httpobj.http_sconfig.GraphRecalculateSetting.NodeReportTimeout)).After(time.Now()) + isAlive := peerstate.LastSeen.Add(mtypes.S2TD(httpobj.http_sconfig.PeerAliveTimeout)).After(time.Now()) if !isAlive { continue } if force || peerstate.PeerInfoState != httpobj.http_PeerInfo_hash { if peer := httpobj.http_device4.LookupPeerByStr(pkstr); peer != nil { - httpobj.http_device4.SendPacket(peer, path.UpdatePeer, buf, device.MessageTransportOffsetContent) + httpobj.http_device4.SendPacket(peer, path.ServerUpdate, buf, device.MessageTransportOffsetContent) } if peer := httpobj.http_device6.LookupPeerByStr(pkstr); peer != nil { - httpobj.http_device6.SendPacket(peer, path.UpdatePeer, buf, device.MessageTransportOffsetContent) + httpobj.http_device6.SendPacket(peer, path.ServerUpdate, buf, device.MessageTransportOffsetContent) + } + } + } +} + +func PushServerParams(force bool) { + //No lock + body, err := mtypes.GetByte(mtypes.ServerUpdateMsg{ + Node_id: mtypes.SuperNodeMessage, + Action: mtypes.UpdateSuperParams, + Code: 0, + Params: string(httpobj.http_SuperParams_Hash[:]), + }) + if err != nil { + fmt.Println("Error get byte") + return + } + buf := make([]byte, path.EgHeaderLen+len(body)) + header, _ := path.NewEgHeader(buf[:path.EgHeaderLen]) + header.SetDst(mtypes.SuperNodeMessage) + header.SetPacketLength(uint16(len(body))) + header.SetSrc(mtypes.SuperNodeMessage) + header.SetTTL(0) + copy(buf[path.EgHeaderLen:], body) + for pkstr, peerstate := range httpobj.http_PeerState { + isAlive := peerstate.LastSeen.Add(mtypes.S2TD(httpobj.http_sconfig.PeerAliveTimeout)).After(time.Now()) + if !isAlive { + continue + } + if force || peerstate.SuperParamState != httpobj.http_SuperParams_Hash { + if peer := httpobj.http_device4.LookupPeerByStr(pkstr); peer != nil { + httpobj.http_device4.SendPacket(peer, path.ServerUpdate, buf, device.MessageTransportOffsetContent) + } + if peer := httpobj.http_device6.LookupPeerByStr(pkstr); peer != nil { + httpobj.http_device6.SendPacket(peer, path.ServerUpdate, buf, device.MessageTransportOffsetContent) } } } diff --git a/mtypes/config.go b/mtypes/config.go index 8e35784..71a0026 100644 --- a/mtypes/config.go +++ b/mtypes/config.go @@ -3,6 +3,7 @@ package mtypes import ( "math" "strconv" + "sync/atomic" ) // Nonnegative integer ID of vertex @@ -38,8 +39,11 @@ type SuperConfig struct { PrivKeyV4 string PrivKeyV6 string ListenPort int - LogLevel LoggerInfo RePushConfigInterval float64 + HttpPostInterval float64 + PeerAliveTimeout float64 + SendPingInterval float64 + LogLevel LoggerInfo Passwords Passwords GraphRecalculateSetting GraphRecalculateSetting NextHopTable NextHopTable @@ -135,7 +139,6 @@ type SuperInfo struct { PubKeyV6 string APIUrl string SkipLocalIP bool - HttpPostInterval float64 SuperNodeInfoTimeout float64 } @@ -149,7 +152,6 @@ type GraphRecalculateSetting struct { StaticMode bool JitterTolerance float64 JitterToleranceMultiplier float64 - NodeReportTimeout float64 TimeoutCheckInterval float64 RecalculateCoolDown float64 } @@ -201,6 +203,19 @@ type API_Peerinfo struct { Connurl *API_connurl } +type API_SuperParams struct { + SendPingInterval float64 + HttpPostInterval float64 + PeerAliveTimeout float64 + AdditionalCost float64 +} + +type StateHash struct { + Peer atomic.Value //[32]byte + SuperParam atomic.Value //[32]byte + NhTable atomic.Value //[32]byte +} + type API_Peers map[string]API_Peerinfo // map[PubKey]API_Peerinfo type JWTSecret [32]byte diff --git a/mtypes/metamessage.go b/mtypes/metamessage.go index 4eeedaa..e2d2666 100644 --- a/mtypes/metamessage.go +++ b/mtypes/metamessage.go @@ -22,12 +22,13 @@ func GetByte(structIn interface{}) (bb []byte, err error) { } type RegisterMsg struct { - Node_id Vertex - Version string - PeerStateHash [32]byte - NhStateHash [32]byte - JWTSecret JWTSecret - HttpPostCount uint64 + Node_id Vertex + Version string + PeerStateHash string + NhStateHash string + SuperParamStateHash string + JWTSecret JWTSecret + HttpPostCount uint64 } func Hash2Str(h []byte) string { @@ -40,7 +41,7 @@ func Hash2Str(h []byte) string { } func (c *RegisterMsg) ToString() string { - return fmt.Sprint("RegisterMsg Node_id:"+c.Node_id.ToString(), " Version:"+c.Version, " PeerHash:"+Hash2Str(c.PeerStateHash[:]), " NhHash:"+Hash2Str(c.NhStateHash[:])) + return fmt.Sprint("RegisterMsg Node_id:"+c.Node_id.ToString(), " Version:"+c.Version, " PeerHash:"+c.PeerStateHash, " NhHash:"+c.NhStateHash, " SuperParamHash:"+c.SuperParamStateHash) } func ParseRegisterMsg(bin []byte) (StructPlace RegisterMsg, err error) { @@ -58,27 +59,38 @@ const ( Shutdown ThrowError Panic + UpdatePeer + UpdateNhTable + UpdateSuperParams ) func (a *ServerCommand) ToString() string { - if *a == Shutdown { + switch *a { + case Shutdown: return "Shutdown" - } else if *a == ThrowError { + case ThrowError: return "ThrowError" - } else if *a == Panic { + case Panic: return "Panic" + case UpdatePeer: + return "UpdatePeer" + case UpdateNhTable: + return "UpdateNhTable" + case UpdateSuperParams: + return "UpdateSuperParams" + default: + return "Unknown" } - return "Unknown" } -type ServerCommandMsg struct { - Node_id Vertex - Action ServerCommand - ErrorCode int - ErrorMsg string +type ServerUpdateMsg struct { + Node_id Vertex + Action ServerCommand + Code int + Params string } -func ParseUpdateErrorMsg(bin []byte) (StructPlace ServerCommandMsg, err error) { +func ParseServerUpdateMsg(bin []byte) (StructPlace ServerUpdateMsg, err error) { var b bytes.Buffer b.Write(bin) d := gob.NewDecoder(&b) @@ -86,40 +98,8 @@ func ParseUpdateErrorMsg(bin []byte) (StructPlace ServerCommandMsg, err error) { return } -func (c *ServerCommandMsg) ToString() string { - return "ServerCommandMsg Node_id:" + c.Node_id.ToString() + " Action:" + c.Action.ToString() + " ErrorCode:" + strconv.Itoa(int(c.ErrorCode)) + " ErrorMsg " + c.ErrorMsg -} - -type UpdatePeerMsg struct { - State_hash [32]byte -} - -func (c *UpdatePeerMsg) ToString() string { - return "UpdatePeerMsg State_hash:" + string(c.State_hash[:]) -} - -func ParseUpdatePeerMsg(bin []byte) (StructPlace UpdatePeerMsg, err error) { - var b bytes.Buffer - b.Write(bin) - d := gob.NewDecoder(&b) - err = d.Decode(&StructPlace) - return -} - -type UpdateNhTableMsg struct { - State_hash [32]byte -} - -func (c *UpdateNhTableMsg) ToString() string { - return "UpdateNhTableMsg State_hash:" + string(c.State_hash[:]) -} - -func ParseUpdateNhTableMsg(bin []byte) (StructPlace UpdateNhTableMsg, err error) { - var b bytes.Buffer - b.Write(bin) - d := gob.NewDecoder(&b) - err = d.Decode(&StructPlace) - return +func (c *ServerUpdateMsg) ToString() string { + return "ServerUpdateMsg Node_id:" + c.Node_id.ToString() + " Action:" + c.Action.ToString() + " Code:" + strconv.Itoa(int(c.Code)) + " Params: " + c.Params } type PingMsg struct { diff --git a/path/header.go b/path/header.go index feafe30..113b228 100644 --- a/path/header.go +++ b/path/header.go @@ -22,11 +22,9 @@ const ( MessageTransportType NormalPacket - Register //Register to server - UpdatePeer //Comes from server - UpdateNhTable - UpdateError + Register //Send to server + ServerUpdate //Comes from server PingPacket //Comes from other peer PongPacket //Send to everyone, include server diff --git a/path/path.go b/path/path.go index 455c3be..96d94de 100644 --- a/path/path.go +++ b/path/path.go @@ -1,7 +1,6 @@ package path import ( - "bytes" "errors" "fmt" "io/ioutil" @@ -48,7 +47,7 @@ type IG struct { recalculateTime time.Time dlTable mtypes.DistTable nhTable mtypes.NextHopTable - NhTableHash [32]byte + changed bool NhTableExpire time.Time IsSuperMode bool loglevel mtypes.LoggerInfo @@ -122,7 +121,7 @@ func (g *IG) CheckAnyShouldUpdate() bool { func (g *IG) RecalculateNhTable(checkchange bool) (changed bool) { if g.StaticMode { - if bytes.Equal(g.NhTableHash[:], make([]byte, 32)) { + if g.changed { changed = checkchange } return @@ -159,7 +158,7 @@ func (g *IG) RemoveVirt(v mtypes.Vertex, recalculate bool, checkchange bool) (ch delete(g.edges[u], v) } g.edgelock.Unlock() - g.NhTableHash = [32]byte{} + g.changed = true if recalculate { changed = g.RecalculateNhTable(checkchange) } @@ -409,9 +408,9 @@ func Path(u, v mtypes.Vertex, next mtypes.NextHopTable) (path []mtypes.Vertex) { return path } -func (g *IG) SetNHTable(nh mtypes.NextHopTable, table_hash [32]byte) { // set nhTable from supernode +func (g *IG) SetNHTable(nh mtypes.NextHopTable) { // set nhTable from supernode g.nhTable = nh - g.NhTableHash = table_hash + g.changed = true g.NhTableExpire = time.Now().Add(g.SuperNodeInfoTimeout) } @@ -500,9 +499,7 @@ func Solve(filePath string, pe bool) error { return nil } - g := NewGraph(3, false, mtypes.GraphRecalculateSetting{ - NodeReportTimeout: 9999, - }, mtypes.NTPinfo{}, mtypes.LoggerInfo{LogInternal: true}) + g := NewGraph(3, false, mtypes.GraphRecalculateSetting{ }, mtypes.NTPinfo{}, mtypes.LoggerInfo{LogInternal: true}) inputb, err := ioutil.ReadFile(filePath) if err != nil { return err