SuperParams

This commit is contained in:
Kusakabe Si 2021-12-04 02:32:59 +00:00
parent e949229827
commit c239162159
12 changed files with 608 additions and 331 deletions

View File

@ -70,10 +70,12 @@ type Device struct {
keyMap map[NoisePublicKey]*Peer
IDMap map[mtypes.Vertex]*Peer
SuperPeer map[NoisePublicKey]*Peer
Peer_state [32]byte
LocalV4 net.IP
LocalV6 net.IP
}
state_hashes mtypes.StateHash
event_tryendpoint chan struct{}
EdgeConfigPath string
@ -81,11 +83,12 @@ type Device struct {
SuperConfigPath string
SuperConfig *mtypes.SuperConfig
Event_server_register chan mtypes.RegisterMsg
Event_server_pong chan mtypes.PongMsg
Event_save_config chan struct{}
Event_Supernode_OK chan struct{}
Chan_server_register chan mtypes.RegisterMsg
Chan_server_pong chan mtypes.PongMsg
Chan_save_config chan struct{}
Chan_Supernode_OK chan struct{}
Chan_SendPingStart chan struct{}
Chan_HttpPostStart chan struct{}
indexTable IndexTable
cookieChecker CookieChecker
@ -341,6 +344,10 @@ func NewDevice(tapDevice tap.Device, id mtypes.Vertex, bind conn.Bind, logger *L
device.Version = version
device.JWTSecret = mtypes.ByteSlice2Byte32(mtypes.RandomBytes(32, []byte(fmt.Sprintf("%v", time.Now()))))
device.state_hashes.NhTable.Store("")
device.state_hashes.Peer.Store("")
device.state_hashes.SuperParam.Store("")
device.rate.limiter.Init()
device.indexTable.Init()
device.PopulatePools()
@ -348,8 +355,8 @@ func NewDevice(tapDevice tap.Device, id mtypes.Vertex, bind conn.Bind, logger *L
device.SuperConfigPath = configpath
device.SuperConfig = sconfig
device.EdgeConfig = &mtypes.EdgeConfig{}
device.Event_server_pong = superevents.Event_server_pong
device.Event_server_register = superevents.Event_server_register
device.Chan_server_pong = superevents.Event_server_pong
device.Chan_server_register = superevents.Event_server_register
device.LogLevel = sconfig.LogLevel
} else {
device.EdgeConfigPath = configpath
@ -357,19 +364,21 @@ func NewDevice(tapDevice tap.Device, id mtypes.Vertex, bind conn.Bind, logger *L
device.SuperConfig = &mtypes.SuperConfig{}
device.DupData = *fixed_time_cache.NewCache(mtypes.S2TD(econfig.DynamicRoute.DupCheckTimeout), false, mtypes.S2TD(60))
device.event_tryendpoint = make(chan struct{}, 1<<6)
device.Event_save_config = make(chan struct{}, 1<<5)
device.Event_Supernode_OK = make(chan struct{}, 4)
device.Chan_save_config = make(chan struct{}, 1<<5)
device.Chan_Supernode_OK = make(chan struct{}, 1<<5)
device.Chan_SendPingStart = make(chan struct{}, 1<<5)
device.Chan_HttpPostStart = make(chan struct{}, 1<<5)
device.LogLevel = econfig.LogLevel
go device.RoutineSetEndpoint()
go device.RoutineDetectOfflineAndTryNextEndpoint()
go device.RoutineRegister()
go device.RoutineSendPing()
go device.RoutineSendPing(device.Chan_SendPingStart)
go device.RoutineSpreadAllMyNeighbor()
go device.RoutineResetConn()
go device.RoutineClearL2FIB()
go device.RoutineRecalculateNhTable()
go device.RoutinePostPeerInfo()
go device.RoutinePostPeerInfo(device.Chan_HttpPostStart)
}
// create queues

View File

@ -10,7 +10,6 @@ import (
"io/ioutil"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"syscall"
@ -147,17 +146,9 @@ func (device *Device) process_received(msg_type path.Usage, peer *Peer, body []b
}
} else {
switch msg_type {
case path.UpdatePeer:
if content, err := mtypes.ParseUpdatePeerMsg(body); err == nil {
go device.process_UpdatePeerMsg(peer, content)
}
case path.UpdateNhTable:
if content, err := mtypes.ParseUpdateNhTableMsg(body); err == nil {
go device.process_UpdateNhTableMsg(peer, content)
}
case path.UpdateError:
if content, err := mtypes.ParseUpdateErrorMsg(body); err == nil {
device.process_UpdateErrorMsg(peer, content)
case path.ServerUpdate:
if content, err := mtypes.ParseServerUpdateMsg(body); err == nil {
device.process_ServerUpdateMsg(peer, content)
}
case path.PingPacket:
if content, err := mtypes.ParsePingMsg(body); err == nil {
@ -189,21 +180,11 @@ func (device *Device) sprint_received(msg_type path.Usage, body []byte) string {
return content.ToString()
}
return "RegisterMsg: Parse failed"
case path.UpdatePeer:
if content, err := mtypes.ParseUpdatePeerMsg(body); err == nil {
case path.ServerUpdate:
if content, err := mtypes.ParseServerUpdateMsg(body); err == nil {
return content.ToString()
}
return "UpdatePeerMsg: Parse failed"
case path.UpdateNhTable:
if content, err := mtypes.ParseUpdateNhTableMsg(body); err == nil {
return content.ToString()
}
return "UpdateNhTableMsg: Parse failed"
case path.UpdateError:
if content, err := mtypes.ParseUpdateErrorMsg(body); err == nil {
return content.ToString()
}
return "UpdateErrorMsg: Parse failed"
return "ServerUpdate: Parse failed"
case path.PingPacket:
if content, err := mtypes.ParsePingMsg(body); err == nil {
return content.ToString()
@ -270,30 +251,30 @@ func compareVersion(v1 string, v2 string) bool {
}
func (device *Device) server_process_RegisterMsg(peer *Peer, content mtypes.RegisterMsg) error {
UpdateErrorMsg := mtypes.ServerCommandMsg{
Node_id: peer.ID,
Action: mtypes.NoAction,
ErrorCode: 0,
ErrorMsg: "",
ServerUpdateMsg := mtypes.ServerUpdateMsg{
Node_id: peer.ID,
Action: mtypes.NoAction,
Code: 0,
Params: "",
}
if peer.ID != content.Node_id {
UpdateErrorMsg = mtypes.ServerCommandMsg{
Node_id: peer.ID,
Action: mtypes.ThrowError,
ErrorCode: int(syscall.EPERM),
ErrorMsg: fmt.Sprintf("Your nodeID: %v is not match with registered nodeID: %v", content.Node_id, peer.ID),
ServerUpdateMsg = mtypes.ServerUpdateMsg{
Node_id: peer.ID,
Action: mtypes.ThrowError,
Code: int(syscall.EPERM),
Params: fmt.Sprintf("Your nodeID: %v is not match with registered nodeID: %v", content.Node_id, peer.ID),
}
}
if compareVersion(content.Version, device.Version) == false {
UpdateErrorMsg = mtypes.ServerCommandMsg{
Node_id: peer.ID,
Action: mtypes.ThrowError,
ErrorCode: int(syscall.ENOSYS),
ErrorMsg: fmt.Sprintf("Your version: \"%v\" is not compatible with our version: \"%v\"", content.Version, device.Version),
ServerUpdateMsg = mtypes.ServerUpdateMsg{
Node_id: peer.ID,
Action: mtypes.ThrowError,
Code: int(syscall.ENOSYS),
Params: fmt.Sprintf("Your version: \"%v\" is not compatible with our version: \"%v\"", content.Version, device.Version),
}
}
if UpdateErrorMsg.Action != mtypes.NoAction {
body, err := mtypes.GetByte(&UpdateErrorMsg)
if ServerUpdateMsg.Action != mtypes.NoAction {
body, err := mtypes.GetByte(&ServerUpdateMsg)
if err != nil {
return err
}
@ -304,15 +285,15 @@ func (device *Device) server_process_RegisterMsg(peer *Peer, content mtypes.Regi
header.SetPacketLength(uint16(len(body)))
copy(buf[path.EgHeaderLen:], body)
header.SetDst(mtypes.SuperNodeMessage)
device.SendPacket(peer, path.UpdateError, buf, MessageTransportOffsetContent)
device.SendPacket(peer, path.ServerUpdate, buf, MessageTransportOffsetContent)
return nil
}
device.Event_server_register <- content
device.Chan_server_register <- content
return nil
}
func (device *Device) server_process_Pong(peer *Peer, content mtypes.PongMsg) error {
device.Event_server_pong <- content
device.Chan_server_pong <- content
return nil
}
@ -377,31 +358,31 @@ func (device *Device) process_pong(peer *Peer, content mtypes.PongMsg) error {
return nil
}
func (device *Device) process_UpdatePeerMsg(peer *Peer, content mtypes.UpdatePeerMsg) error {
func (device *Device) process_UpdatePeerMsg(peer *Peer, State_hash string) error {
var send_signal bool
if device.EdgeConfig.DynamicRoute.SuperNode.UseSuperNode {
if peer.ID != mtypes.SuperNodeMessage {
if device.state_hashes.Peer.Load().(string) == State_hash {
if device.LogLevel.LogControl {
fmt.Println("Control: Ignored UpdateErrorMsg. Not from supernode.")
}
return nil
}
if bytes.Equal(device.peers.Peer_state[:], content.State_hash[:]) {
if device.LogLevel.LogControl {
fmt.Println("Control: Same PeerState Hash, skip download nhTable")
fmt.Println("Control: Same Hash, skip download PeerInfo")
}
return nil
}
var peer_infos mtypes.API_Peers
downloadurl := device.EdgeConfig.DynamicRoute.SuperNode.APIUrl + "/peerinfo?NodeID=" + strconv.Itoa(int(device.ID)) + "&PubKey=" + url.QueryEscape(device.staticIdentity.publicKey.ToString()) + "&State=" + url.QueryEscape(string(content.State_hash[:]))
if device.LogLevel.LogControl {
fmt.Println("Control: Download peerinfo from :" + downloadurl)
}
//
client := http.Client{
Timeout: 30 * time.Second,
Timeout: 8 * time.Second,
}
resp, err := client.Get(downloadurl)
downloadurl := device.EdgeConfig.DynamicRoute.SuperNode.APIUrl + "/peerinfo" ////////////////////////////////////////////////////////////////////////////////////////////////
req, err := http.NewRequest("GET", downloadurl, nil)
q := req.URL.Query()
q.Add("NodeID", device.ID.ToString())
q.Add("PubKey", device.staticIdentity.publicKey.ToString())
q.Add("State", State_hash)
req.URL.RawQuery = q.Encode()
if device.LogLevel.LogControl {
fmt.Println("Control: Download PeerInfo from :" + req.URL.RequestURI())
}
resp, err := client.Do(req)
if err != nil {
device.log.Errorf(err.Error())
return err
@ -413,7 +394,7 @@ func (device *Device) process_UpdatePeerMsg(peer *Peer, content mtypes.UpdatePee
return err
}
if resp.StatusCode != 200 {
device.log.Errorf("Control: Download peerinfo result failed: " + strconv.Itoa(resp.StatusCode) + " " + string(allbytes))
device.log.Errorf("Control: Download peerinfo failed: " + strconv.Itoa(resp.StatusCode) + " " + string(allbytes))
return nil
}
if device.LogLevel.LogControl {
@ -482,7 +463,7 @@ func (device *Device) process_UpdatePeerMsg(peer *Peer, content mtypes.UpdatePee
send_signal = true
}
}
device.peers.Peer_state = content.State_hash
device.state_hashes.Peer.Store(State_hash)
if send_signal {
device.event_tryendpoint <- struct{}{}
}
@ -490,33 +471,31 @@ func (device *Device) process_UpdatePeerMsg(peer *Peer, content mtypes.UpdatePee
return nil
}
func (device *Device) process_UpdateNhTableMsg(peer *Peer, content mtypes.UpdateNhTableMsg) error {
func (device *Device) process_UpdateNhTableMsg(peer *Peer, State_hash string) error {
if device.EdgeConfig.DynamicRoute.SuperNode.UseSuperNode {
if peer.ID != mtypes.SuperNodeMessage {
if device.state_hashes.NhTable.Load().(string) == State_hash {
if device.LogLevel.LogControl {
fmt.Println("Control: Ignored UpdateErrorMsg. Not from supernode.")
}
return nil
}
if bytes.Equal(device.graph.NhTableHash[:], content.State_hash[:]) {
if device.LogLevel.LogControl {
fmt.Println("Control: Same nhTable Hash, skip download nhTable")
fmt.Println("Control: Same Hash, skip download nhTable")
}
device.graph.NhTableExpire = time.Now().Add(device.graph.SuperNodeInfoTimeout)
return nil
}
var NhTable mtypes.NextHopTable
if bytes.Equal(device.graph.NhTableHash[:], content.State_hash[:]) {
return nil
// Download from supernode
client := &http.Client{
Timeout: 8 * time.Second,
}
downloadurl := device.EdgeConfig.DynamicRoute.SuperNode.APIUrl + "/nhtable?NodeID=" + strconv.Itoa(int(device.ID)) + "&PubKey=" + url.QueryEscape(device.staticIdentity.publicKey.ToString()) + "&State=" + url.QueryEscape(string(content.State_hash[:]))
downloadurl := device.EdgeConfig.DynamicRoute.SuperNode.APIUrl + "/nhtable" ////////////////////////////////////////////////////////////////////////////////////////////////
req, err := http.NewRequest("GET", downloadurl, nil)
q := req.URL.Query()
q.Add("NodeID", device.ID.ToString())
q.Add("PubKey", device.staticIdentity.publicKey.ToString())
q.Add("State", State_hash)
req.URL.RawQuery = q.Encode()
if device.LogLevel.LogControl {
fmt.Println("Control: Download NhTable from :" + downloadurl)
fmt.Println("Control: Download NhTable from :" + req.URL.RequestURI())
}
client := http.Client{
Timeout: 30 * time.Second,
}
resp, err := client.Get(downloadurl)
resp, err := client.Do(req)
if err != nil {
device.log.Errorf(err.Error())
return err
@ -528,7 +507,7 @@ func (device *Device) process_UpdateNhTableMsg(peer *Peer, content mtypes.Update
return err
}
if resp.StatusCode != 200 {
device.log.Errorf("Control: Download peerinfo result failed: " + strconv.Itoa(resp.StatusCode) + " " + string(allbytes))
device.log.Errorf("Control: Download NhTable failed: " + strconv.Itoa(resp.StatusCode) + " " + string(allbytes))
return nil
}
if device.LogLevel.LogControl {
@ -538,25 +517,120 @@ func (device *Device) process_UpdateNhTableMsg(peer *Peer, content mtypes.Update
device.log.Errorf("JSON decode error:", err.Error())
return err
}
device.graph.SetNHTable(NhTable, content.State_hash)
device.graph.SetNHTable(NhTable)
device.state_hashes.NhTable.Store(State_hash)
}
return nil
}
func (device *Device) process_UpdateErrorMsg(peer *Peer, content mtypes.ServerCommandMsg) error {
func (device *Device) process_UpdateSuperParamsMsg(peer *Peer, State_hash string) error {
if device.EdgeConfig.DynamicRoute.SuperNode.UseSuperNode {
if device.state_hashes.SuperParam.Load().(string) == State_hash {
if device.LogLevel.LogControl {
fmt.Println("Control: Same Hash, skip download SuperParams")
}
device.graph.NhTableExpire = time.Now().Add(device.graph.SuperNodeInfoTimeout)
return nil
}
var SuperParams mtypes.API_SuperParams
client := &http.Client{
Timeout: 8 * time.Second,
}
downloadurl := device.EdgeConfig.DynamicRoute.SuperNode.APIUrl + "/superparams" ////////////////////////////////////////////////////////////////////////////////////////////////
req, err := http.NewRequest("GET", downloadurl, nil)
q := req.URL.Query()
q.Add("NodeID", device.ID.ToString())
q.Add("PubKey", device.staticIdentity.publicKey.ToString())
q.Add("State", State_hash)
req.URL.RawQuery = q.Encode()
if device.LogLevel.LogControl {
fmt.Println("Control: Download SuperParams from :" + req.URL.RequestURI())
}
resp, err := client.Do(req)
if err != nil {
device.log.Errorf(err.Error())
return err
}
defer resp.Body.Close()
allbytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
device.log.Errorf(err.Error())
return err
}
if resp.StatusCode != 200 {
device.log.Errorf("Control: Download SuperParams failed: " + strconv.Itoa(resp.StatusCode) + " " + string(allbytes))
return nil
}
if device.LogLevel.LogControl {
fmt.Println("Control: Download SuperParams result :" + string(allbytes))
}
if err := json.Unmarshal(allbytes, &SuperParams); err != nil {
device.log.Errorf("JSON decode error:", err.Error())
return err
}
if SuperParams.PeerAliveTimeout <= 0 {
device.log.Errorf("SuperParams.PeerAliveTimeout <= 0: %v", SuperParams.PeerAliveTimeout)
return fmt.Errorf("SuperParams.PeerAliveTimeout <= 0: %v", SuperParams.PeerAliveTimeout)
}
if SuperParams.SendPingInterval <= 0 {
device.log.Errorf("SuperParams.SendPingInterval <= 0: %v", SuperParams.SendPingInterval)
return fmt.Errorf("SuperParams.SendPingInterval <= 0: %v", SuperParams.SendPingInterval)
}
if SuperParams.HttpPostInterval <= 0 {
device.log.Errorf("SuperParams.HttpPostInterval <= 0: %v", SuperParams.HttpPostInterval)
return fmt.Errorf("SuperParams.HttpPostInterval <= 0: %v", SuperParams.HttpPostInterval)
}
device.EdgeConfig.DynamicRoute.PeerAliveTimeout = SuperParams.PeerAliveTimeout
if device.EdgeConfig.DynamicRoute.SendPingInterval <= 0 {
device.EdgeConfig.DynamicRoute.SendPingInterval = SuperParams.SendPingInterval
device.Chan_SendPingStart <- struct{}{}
} else {
device.EdgeConfig.DynamicRoute.SendPingInterval = SuperParams.SendPingInterval
}
if device.SuperConfig.HttpPostInterval <= 0 {
device.SuperConfig.HttpPostInterval = SuperParams.HttpPostInterval
device.Chan_HttpPostStart <- struct{}{}
} else {
device.SuperConfig.HttpPostInterval = SuperParams.HttpPostInterval
}
if SuperParams.AdditionalCost >= 0 {
device.EdgeConfig.DynamicRoute.AdditionalCost = SuperParams.AdditionalCost
}
device.state_hashes.SuperParam.Store(State_hash)
}
return nil
}
func (device *Device) process_ServerUpdateMsg(peer *Peer, content mtypes.ServerUpdateMsg) error {
if peer.ID != mtypes.SuperNodeMessage {
if device.LogLevel.LogControl {
fmt.Println("Control: Ignored UpdateErrorMsg. Not from supernode.")
}
return nil
}
device.log.Errorf(strconv.Itoa(int(content.ErrorCode)) + ": " + content.ErrorMsg)
if content.Action == mtypes.Shutdown {
switch content.Action {
case mtypes.Shutdown:
device.closed <- 0
} else if content.Action == mtypes.ThrowError {
device.closed <- content.ErrorCode
} else if content.Action == mtypes.Panic {
case mtypes.ThrowError:
device.log.Errorf(strconv.Itoa(int(content.Code)) + ": " + content.Params)
device.closed <- content.Code
case mtypes.Panic:
device.log.Errorf(strconv.Itoa(int(content.Code)) + ": " + content.Params)
panic(content.ToString())
case mtypes.UpdateNhTable:
return device.process_UpdateNhTableMsg(peer, content.Params)
case mtypes.UpdatePeer:
return device.process_UpdatePeerMsg(peer, content.Params)
case mtypes.UpdateSuperParams:
return device.process_UpdateSuperParamsMsg(peer, content.Params)
default:
device.log.Errorf("Unknown Action: %v", content.ToString())
}
return nil
}
@ -699,15 +773,32 @@ func (device *Device) RoutineDetectOfflineAndTryNextEndpoint() {
}
}
func (device *Device) RoutineSendPing() {
func (device *Device) RoutineSendPing(startchan <-chan struct{}) {
if !(device.EdgeConfig.DynamicRoute.P2P.UseP2P || device.EdgeConfig.DynamicRoute.SuperNode.UseSuperNode) {
return
}
timeout := mtypes.S2TD(device.EdgeConfig.DynamicRoute.SendPingInterval)
waitchan := make(<-chan time.Time)
for {
if device.EdgeConfig.DynamicRoute.SendPingInterval > 0 {
waitchan = time.After(mtypes.S2TD(device.EdgeConfig.DynamicRoute.SendPingInterval))
} else {
waitchan = make(<-chan time.Time)
}
select {
case <-startchan:
if device.LogLevel.LogControl {
fmt.Println("Control: Start RoutineSendPing()")
}
for len(startchan) > 0 {
<-startchan
}
case <-waitchan:
if device.LogLevel.LogControl {
fmt.Println("Control: Start RoutineSendPing() by timer")
}
}
packet, usage, _ := device.GeneratePingPacket(device.ID, 0)
device.SpreadPacket(make(map[mtypes.Vertex]bool), usage, packet, MessageTransportOffsetContent)
time.Sleep(timeout)
}
}
@ -716,15 +807,19 @@ func (device *Device) RoutineRegister() {
return
}
timeout := mtypes.S2TD(device.EdgeConfig.DynamicRoute.SendPingInterval)
_ = <-device.Event_Supernode_OK
_ = <-device.Chan_Supernode_OK
for {
local_PeerStateHash := device.state_hashes.Peer.Load().(string)
local_NhTableHash := device.state_hashes.NhTable.Load().(string)
local_SuperParamState := device.state_hashes.SuperParam.Load().(string)
body, _ := mtypes.GetByte(mtypes.RegisterMsg{
Node_id: device.ID,
PeerStateHash: device.peers.Peer_state,
NhStateHash: device.graph.NhTableHash,
Version: device.Version,
JWTSecret: device.JWTSecret,
HttpPostCount: device.HttpPostCount,
Node_id: device.ID,
PeerStateHash: local_PeerStateHash,
NhStateHash: local_NhTableHash,
SuperParamStateHash: local_SuperParamState,
Version: device.Version,
JWTSecret: device.JWTSecret,
HttpPostCount: device.HttpPostCount,
})
buf := make([]byte, path.EgHeaderLen+len(body))
header, _ := path.NewEgHeader(buf[0:path.EgHeaderLen])
@ -738,15 +833,28 @@ func (device *Device) RoutineRegister() {
}
}
func (device *Device) RoutinePostPeerInfo() {
func (device *Device) RoutinePostPeerInfo(startchan <-chan struct{}) {
if !(device.EdgeConfig.DynamicRoute.SuperNode.UseSuperNode) {
return
}
if device.EdgeConfig.DynamicRoute.SuperNode.HttpPostInterval <= 0 {
return
}
timeout := mtypes.S2TD(device.EdgeConfig.DynamicRoute.SuperNode.HttpPostInterval)
waitchan := make(<-chan time.Time)
for {
if device.SuperConfig.HttpPostInterval > 0 {
waitchan = time.After(mtypes.S2TD(device.SuperConfig.HttpPostInterval))
} else {
waitchan = make(<-chan time.Time)
}
select {
case <-waitchan:
break
case <-startchan:
if device.LogLevel.LogControl {
fmt.Println("Control: Start RoutinePostPeerInfo()")
}
for len(startchan) > 0 {
<-startchan
}
}
// Stat all latency
device.peers.RLock()
pongs := make([]mtypes.PongMsg, 0, len(device.peers.IDMap))
@ -802,17 +910,21 @@ func (device *Device) RoutinePostPeerInfo() {
})
tokenString, err := token.SignedString(device.JWTSecret[:])
// Construct post request
client := &http.Client{}
client := &http.Client{
Timeout: 8 * time.Second,
}
downloadurl := device.EdgeConfig.DynamicRoute.SuperNode.APIUrl + "/post/nodeinfo"
req, err := http.NewRequest("POST", downloadurl, bytes.NewReader(body))
q := req.URL.Query()
q.Add("NodeID", device.ID.ToString())
q.Add("PubKey", device.staticIdentity.publicKey.ToString())
q.Add("JWTSig", tokenString)
req.URL.RawQuery = q.Encode()
req.Header.Set("Content-Type", "application/binary")
req.Header.Set("Content-Type", "application/octet-stream")
req.Header.Set("Content-Encoding", "gzip")
device.HttpPostCount += 1
if device.LogLevel.LogControl {
fmt.Println("Control: Post to " + req.URL.String())
fmt.Println("Control: Post to " + req.URL.RequestURI())
}
resp, err := client.Do(req)
if err != nil {
@ -824,8 +936,6 @@ func (device *Device) RoutinePostPeerInfo() {
}
resp.Body.Close()
}
time.Sleep(timeout)
}
}
@ -833,7 +943,7 @@ func (device *Device) RoutineRecalculateNhTable() {
if device.graph.TimeoutCheckInterval == 0 {
return
}
if !device.EdgeConfig.DynamicRoute.P2P.UseP2P {
return
}

View File

@ -32,14 +32,13 @@ dynamicroute:
savenewpeers: true
supernode:
usesupernode: true
pskey: 'iPM8FXfnHVzwjguZHRW9bLNY+h7+B1O2oTJtktptQkI='
pskey: iPM8FXfnHVzwjguZHRW9bLNY+h7+B1O2oTJtktptQkI=
connurlv4: 127.0.0.1:3000
pubkeyv4: LJ8KKacUcIoACTGB/9Ed9w0osrJ3WWeelzpL2u4oUic=
connurlv6: ''
pubkeyv6: HCfL6YJtpJEGHTlJ2LgVXIWKB/K95P57LHTJ42ZG8VI=
apiurl: http://127.0.0.1:3000/api
skiplocalip: false
httppostinterval: 15
supernodeinfotimeout: 50
p2p:
usep2p: false
@ -48,7 +47,6 @@ dynamicroute:
staticmode: false
jittertolerance: 20
jittertolerancemultiplier: 1.1
nodereporttimeout: 40
timeoutcheckinterval: 5
recalculatecooldown: 5
ntpconfig:

View File

@ -32,14 +32,13 @@ dynamicroute:
savenewpeers: true
supernode:
usesupernode: true
pskey: 'juJMQaGAaeSy8aDsXSKNsPZv/nFiPj4h/1G70tGYygs='
pskey: juJMQaGAaeSy8aDsXSKNsPZv/nFiPj4h/1G70tGYygs=
connurlv4: 127.0.0.1:3000
pubkeyv4: LJ8KKacUcIoACTGB/9Ed9w0osrJ3WWeelzpL2u4oUic=
connurlv6: ''
pubkeyv6: HCfL6YJtpJEGHTlJ2LgVXIWKB/K95P57LHTJ42ZG8VI=
apiurl: http://127.0.0.1:3000/api
skiplocalip: false
httppostinterval: 15
supernodeinfotimeout: 50
p2p:
usep2p: false
@ -48,7 +47,6 @@ dynamicroute:
staticmode: false
jittertolerance: 20
jittertolerancemultiplier: 1.1
nodereporttimeout: 40
timeoutcheckinterval: 5
recalculatecooldown: 5
ntpconfig:

View File

@ -1,16 +1,19 @@
nodename: NodeSuper
postscript: example_config/echo.sh test
postscript: ""
privkeyv4: mL5IW0GuqbjgDeOJuPHBU2iJzBPNKhaNEXbIGwwYWWk=
privkeyv6: +EdOKIoBp/EvIusHDsvXhV1RJYbyN3Qr8nxlz35wl3I=
listenport: 3000
repushconfiginterval: 30
httppostinterval: 50
peeralivetimeout: 70
sendpinginterval: 15
loglevel:
loglevel: normal
logtransit: true
logcontrol: true
lognormal: true
lognormal: false
loginternal: true
logntp: false
repushconfiginterval: 30
passwords:
showstate: passwd
addpeer: passwd_addpeer
@ -19,7 +22,6 @@ graphrecalculatesetting:
staticmode: false
jittertolerance: 5
jittertolerancemultiplier: 1.01
nodereporttimeout: 70
timeoutcheckinterval: 5
recalculatecooldown: 5
nexthoptable:

View File

@ -31,7 +31,7 @@ func printExampleEdgeConf() {
Interface: mtypes.InterfaceConf{
Itype: "stdio",
Name: "tap1",
VPPIfaceID: 5,
VPPIfaceID: 1,
VPPBridgeID: 4242,
MacAddrPrefix: "AA:BB:CC:DD",
MTU: 1416,
@ -45,7 +45,7 @@ func printExampleEdgeConf() {
DefaultTTL: 200,
L2FIBTimeout: 3600,
PrivKey: "6GyDagZKhbm5WNqMiRHhkf43RlbMJ34IieTlIuvfJ1M=",
ListenPort: 3001,
ListenPort: 0,
LogLevel: mtypes.LoggerInfo{
LogLevel: "error",
LogTransit: true,
@ -71,23 +71,21 @@ func printExampleEdgeConf() {
APIUrl: "http://127.0.0.1:3000/api",
SuperNodeInfoTimeout: 50,
SkipLocalIP: false,
HttpPostInterval: 15,
},
P2P: mtypes.P2Pinfo{
UseP2P: true,
UseP2P: false,
SendPeerInterval: 20,
GraphRecalculateSetting: mtypes.GraphRecalculateSetting{
StaticMode: false,
JitterTolerance: 20,
JitterToleranceMultiplier: 1.1,
NodeReportTimeout: 40,
TimeoutCheckInterval: 5,
RecalculateCoolDown: 5,
},
},
NTPconfig: mtypes.NTPinfo{
UseNTP: true,
MaxServerUse: 5,
MaxServerUse: 8,
SyncTimeInterval: 3600,
NTPTimeout: 3,
Servers: []string{"time.google.com",
@ -229,7 +227,7 @@ func Edge(configPath string, useUAPI bool, printExample bool, bindmode string) (
econfig.LogLevel.LogNTP = false // NTP in static mode is useless
}
graph := path.NewGraph(3, false, econfig.DynamicRoute.P2P.GraphRecalculateSetting, econfig.DynamicRoute.NTPconfig, econfig.LogLevel)
graph.SetNHTable(econfig.NextHopTable, [32]byte{})
graph.SetNHTable(econfig.NextHopTable)
the_device := device.NewDevice(thetap, econfig.NodeID, conn.NewDefaultBind(true, true, bindmode), logger, graph, false, configPath, &econfig, nil, nil, Version)
defer the_device.Close()
@ -310,7 +308,7 @@ func Edge(configPath string, useUAPI bool, printExample bool, bindmode string) (
return errors.New("Failed to connect to supernode.")
}
}
the_device.Event_Supernode_OK <- struct{}{}
the_device.Chan_Supernode_OK <- struct{}{}
}
logger.Verbosef("Device started")

View File

@ -1,7 +1,6 @@
package main
import (
"bytes"
"crypto/md5"
"crypto/sha256"
"encoding/base64"
@ -28,15 +27,17 @@ import (
)
type http_shared_objects struct {
http_graph *path.IG
http_device4 *device.Device
http_device6 *device.Device
http_HashSalt []byte
http_NhTable_Hash [32]byte
http_PeerInfo_hash [32]byte
http_NhTableStr []byte
http_PeerInfo mtypes.API_Peers
http_super_chains *mtypes.SUPER_Events
http_graph *path.IG
http_device4 *device.Device
http_device6 *device.Device
http_HashSalt []byte
http_NhTable_Hash string
http_PeerInfo_hash string
http_SuperParams_Hash string
http_SuperParamsStr []byte
http_NhTableStr []byte
http_PeerInfo mtypes.API_Peers
http_super_chains *mtypes.SUPER_Events
http_passwords mtypes.Passwords
http_StateExpire time.Time
@ -78,11 +79,12 @@ type HttpPeerInfo struct {
}
type PeerState struct {
NhTableState [32]byte
PeerInfoState [32]byte
JETSecret mtypes.JWTSecret
httpPostCount uint64
LastSeen time.Time
NhTableState string
PeerInfoState string
SuperParamState string
JETSecret mtypes.JWTSecret
httpPostCount uint64
LastSeen time.Time
}
type client struct {
@ -149,7 +151,7 @@ func extractParamsVertex(params url.Values, key string, w http.ResponseWriter) (
return mtypes.Vertex(val), nil
}
func get_api_peers(old_State_hash [32]byte) (api_peerinfo mtypes.API_Peers, StateHash [32]byte, changed bool) {
func get_api_peers(old_State_hash string) (api_peerinfo mtypes.API_Peers, StateHash string, changed bool) {
// No lock
api_peerinfo = make(mtypes.API_Peers)
for _, peerinfo := range httpobj.http_sconfig.Peers {
@ -158,7 +160,7 @@ func get_api_peers(old_State_hash [32]byte) (api_peerinfo mtypes.API_Peers, Stat
PSKey: peerinfo.PSKey,
Connurl: &mtypes.API_connurl{},
}
if httpobj.http_PeerState[peerinfo.PubKey].LastSeen.Add(mtypes.S2TD(httpobj.http_sconfig.GraphRecalculateSetting.NodeReportTimeout)).After(time.Now()) {
if httpobj.http_PeerState[peerinfo.PubKey].LastSeen.Add(mtypes.S2TD(httpobj.http_sconfig.PeerAliveTimeout)).After(time.Now()) {
connV4 := httpobj.http_device4.GetConnurl(peerinfo.NodeID)
connV6 := httpobj.http_device6.GetConnurl(peerinfo.NodeID)
if connV4 != "" {
@ -177,14 +179,15 @@ func get_api_peers(old_State_hash [32]byte) (api_peerinfo mtypes.API_Peers, Stat
api_peerinfo_str_byte, _ := json.Marshal(&api_peerinfo)
hash_raw := md5.Sum(append(api_peerinfo_str_byte, httpobj.http_HashSalt...))
hash_str := hex.EncodeToString(hash_raw[:])
copy(StateHash[:], []byte(hash_str))
if bytes.Equal(old_State_hash[:], StateHash[:]) == false {
StateHash = hash_str
if old_State_hash != StateHash {
changed = true
}
return
}
func get_peerinfo(w http.ResponseWriter, r *http.Request) {
func get_superparams(w http.ResponseWriter, r *http.Request) {
// Read all params
params := r.URL.Query()
PubKey, err := extractParamsStr(params, "PubKey", w)
if err != nil {
@ -198,58 +201,135 @@ func get_peerinfo(w http.ResponseWriter, r *http.Request) {
if err != nil {
return
}
if NodeID >= mtypes.Special_NodeID {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte("Paramater NodeID: Can't use special nodeID."))
return
}
// Authentication
httpobj.RLock()
defer httpobj.RUnlock()
if _, has := httpobj.http_PeerID2Info[NodeID]; !has {
w.WriteHeader(http.StatusNotFound)
w.Write([]byte("Paramater PubKey: NodeID and PubKey are not match"))
return
}
if httpobj.http_PeerID2Info[NodeID].PubKey != PubKey {
w.WriteHeader(http.StatusNotFound)
w.Write([]byte("Paramater NodeID: NodeID and PubKey are not match"))
w.Write([]byte("Paramater PubKey: NodeID and PubKey are not match"))
return
}
if bytes.Equal(httpobj.http_PeerInfo_hash[:], []byte(State)) {
if state := httpobj.http_PeerState[PubKey]; state != nil {
copy(httpobj.http_PeerState[PubKey].PeerInfoState[:], State)
http_PeerInfo_2peer := make(mtypes.API_Peers)
for PeerPubKey, peerinfo := range httpobj.http_PeerInfo {
if httpobj.http_sconfig.UsePSKForInterEdge {
h := sha256.New()
if NodeID > peerinfo.NodeID {
h.Write([]byte(PubKey))
h.Write([]byte(PeerPubKey))
} else if NodeID < peerinfo.NodeID {
h.Write([]byte(PeerPubKey))
h.Write([]byte(PubKey))
} else {
continue
}
h.Write(httpobj.http_HashSalt)
bs := h.Sum(nil)
var psk device.NoisePresharedKey
copy(psk[:], bs[:])
peerinfo.PSKey = psk.ToString()
} else {
peerinfo.PSKey = ""
}
if httpobj.http_PeerID2Info[NodeID].SkipLocalIP { // Clear all local IP
peerinfo.Connurl.LocalV4 = make(map[string]float64)
peerinfo.Connurl.LocalV6 = make(map[string]float64)
}
http_PeerInfo_2peer[PeerPubKey] = peerinfo
}
api_peerinfo_str_byte, _ := json.Marshal(&http_PeerInfo_2peer)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(api_peerinfo_str_byte)
return
}
if httpobj.http_SuperParams_Hash != State {
w.WriteHeader(http.StatusNotFound)
w.Write([]byte("Paramater State: State not correct"))
return
}
w.WriteHeader(http.StatusNotFound)
w.Write([]byte("Paramater State: State not correct"))
if _, has := httpobj.http_PeerState[PubKey]; has == false {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("Paramater PubKey: Not found in httpobj.http_PeerState, this shouldn't happen. Please report to the author."))
return
}
// Do something
SuperParams := mtypes.API_SuperParams{
SendPingInterval: httpobj.http_sconfig.SendPingInterval,
HttpPostInterval: httpobj.http_sconfig.HttpPostInterval,
PeerAliveTimeout: httpobj.http_sconfig.PeerAliveTimeout,
AdditionalCost: httpobj.http_PeerID2Info[NodeID].AdditionalCost,
}
SuperParamStr, _ := json.Marshal(SuperParams)
httpobj.http_PeerState[PubKey].SuperParamState = State
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write([]byte(SuperParamStr))
return
}
func get_peerinfo(w http.ResponseWriter, r *http.Request) {
// Read all params
params := r.URL.Query()
PubKey, err := extractParamsStr(params, "PubKey", w)
if err != nil {
return
}
State, err := extractParamsStr(params, "State", w)
if err != nil {
return
}
NodeID, err := extractParamsVertex(params, "NodeID", w)
if err != nil {
return
}
if NodeID >= mtypes.Special_NodeID {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte("Paramater NodeID: Can't use special nodeID."))
return
}
// Authentication
httpobj.RLock()
defer httpobj.RUnlock()
if _, has := httpobj.http_PeerID2Info[NodeID]; !has {
w.WriteHeader(http.StatusNotFound)
w.Write([]byte("Paramater PubKey: NodeID and PubKey are not match"))
return
}
if httpobj.http_PeerID2Info[NodeID].PubKey != PubKey {
w.WriteHeader(http.StatusNotFound)
w.Write([]byte("Paramater PubKey: NodeID and PubKey are not match"))
return
}
if httpobj.http_PeerInfo_hash != State {
w.WriteHeader(http.StatusNotFound)
w.Write([]byte("Paramater State: State not correct"))
return
}
if _, has := httpobj.http_PeerState[PubKey]; has == false {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("Paramater PubKey: Not found in httpobj.http_PeerState, this shouldn't happen. Please report to the author."))
return
}
// Do something
httpobj.http_PeerState[PubKey].PeerInfoState = State
http_PeerInfo_2peer := make(mtypes.API_Peers)
for PeerPubKey, peerinfo := range httpobj.http_PeerInfo {
if httpobj.http_sconfig.UsePSKForInterEdge {
h := sha256.New()
if NodeID > peerinfo.NodeID {
h.Write([]byte(PubKey))
h.Write([]byte(PeerPubKey))
} else if NodeID < peerinfo.NodeID {
h.Write([]byte(PeerPubKey))
h.Write([]byte(PubKey))
} else {
continue
}
h.Write(httpobj.http_HashSalt)
bs := h.Sum(nil)
var psk device.NoisePresharedKey
copy(psk[:], bs[:])
peerinfo.PSKey = psk.ToString()
} else {
peerinfo.PSKey = ""
}
if httpobj.http_PeerID2Info[NodeID].SkipLocalIP { // Clear all local IP
peerinfo.Connurl.LocalV4 = make(map[string]float64)
peerinfo.Connurl.LocalV6 = make(map[string]float64)
}
http_PeerInfo_2peer[PeerPubKey] = peerinfo
}
api_peerinfo_str_byte, _ := json.Marshal(&http_PeerInfo_2peer)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(api_peerinfo_str_byte)
return
}
func get_nhtable(w http.ResponseWriter, r *http.Request) {
// Read all params
params := r.URL.Query()
PubKey, err := extractParamsStr(params, "PubKey", w)
if err != nil {
@ -263,25 +343,41 @@ func get_nhtable(w http.ResponseWriter, r *http.Request) {
if err != nil {
return
}
if NodeID >= mtypes.Special_NodeID {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte("Paramater NodeID: Can't use special nodeID."))
return
}
// Authentication
httpobj.RLock()
defer httpobj.RUnlock()
if _, has := httpobj.http_PeerID2Info[NodeID]; !has {
w.WriteHeader(http.StatusNotFound)
w.Write([]byte("Paramater PubKey: NodeID and PubKey are not match"))
return
}
if httpobj.http_PeerID2Info[NodeID].PubKey != PubKey {
w.WriteHeader(http.StatusNotFound)
w.Write([]byte("NodeID and PunKey are not match"))
w.Write([]byte("Paramater PubKey: NodeID and PubKey are not match"))
return
}
if httpobj.http_NhTable_Hash != State {
w.WriteHeader(http.StatusNotFound)
w.Write([]byte("Paramater State: State not correct"))
return
}
if _, has := httpobj.http_PeerState[PubKey]; has == false {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("Paramater PubKey: Not found in httpobj.http_PeerState, this shouldn't happen. Please report to the author."))
return
}
if bytes.Equal(httpobj.http_NhTable_Hash[:], []byte(State)) {
if state := httpobj.http_PeerState[PubKey]; state != nil {
copy(httpobj.http_PeerState[PubKey].NhTableState[:], State)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write([]byte(httpobj.http_NhTableStr))
return
}
}
w.WriteHeader(http.StatusNotFound)
w.Write([]byte("State not correct"))
httpobj.http_PeerState[PubKey].NhTableState = State
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write([]byte(httpobj.http_NhTableStr))
return
}
func get_peerstate(w http.ResponseWriter, r *http.Request) {
@ -329,27 +425,34 @@ func post_nodeinfo(w http.ResponseWriter, r *http.Request) {
if err != nil {
return
}
JWTSig, err := extractParamsStr(params, "JWTSig", w)
PubKey, err := extractParamsStr(params, "PubKey", w)
if err != nil {
return
}
if NodeID >= mtypes.Special_NodeID {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte("Paramater NodeID: Can't use special nodeID."))
return
}
httpobj.RLock()
defer httpobj.RUnlock()
var PubKey string
if peerconf, has := httpobj.http_PeerID2Info[NodeID]; has {
PubKey = peerconf.PubKey
} else {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte("Paramater NodeID: NodeID not exists."))
JWTSig, err := extractParamsStr(params, "JWTSig", w)
if err != nil {
return
}
httpobj.RLock()
defer httpobj.RUnlock()
if _, has := httpobj.http_PeerID2Info[NodeID]; !has {
w.WriteHeader(http.StatusNotFound)
w.Write([]byte("NodeID and PunKey are not match"))
return
}
if httpobj.http_PeerID2Info[NodeID].PubKey != PubKey {
w.WriteHeader(http.StatusNotFound)
w.Write([]byte("NodeID and PunKey are not match"))
return
}
JWTSecret := httpobj.http_PeerState[PubKey].JETSecret
httpPostCount := httpobj.http_PeerState[PubKey].httpPostCount
@ -432,11 +535,9 @@ func post_nodeinfo(w http.ResponseWriter, r *http.Request) {
if changed {
NhTable := httpobj.http_graph.GetNHTable(true)
NhTablestr, _ := json.Marshal(NhTable)
md5_hash_raw := md5.Sum(append(httpobj.http_NhTableStr, httpobj.http_HashSalt...))
md5_hash_raw := md5.Sum(append(NhTablestr, httpobj.http_HashSalt...))
new_hash_str := hex.EncodeToString(md5_hash_raw[:])
new_hash_str_byte := []byte(new_hash_str)
copy(httpobj.http_NhTable_Hash[:], new_hash_str_byte)
copy(httpobj.http_graph.NhTableHash[:], new_hash_str_byte)
httpobj.http_NhTable_Hash = new_hash_str
httpobj.http_NhTableStr = NhTablestr
PushNhTable(false)
}
@ -533,7 +634,7 @@ func peeradd(w http.ResponseWriter, r *http.Request) { //Waiting for test
w.Write([]byte(fmt.Sprintf("Paramater nexthoptable: \"%v\", %v", NhTableStr, err)))
return
}
httpobj.http_graph.SetNHTable(NewNhTable, [32]byte{})
httpobj.http_graph.SetNHTable(NewNhTable)
}
err = super_peeradd(mtypes.SuperPeerInfo{
NodeID: NodeID,
@ -583,14 +684,14 @@ func peerdel(w http.ResponseWriter, r *http.Request) { //Waiting for test
defer httpobj.Unlock()
if pwderr == nil { // user provide the password
if password == httpobj.http_passwords.DelPeer {
NodeID, err = extractParamsVertex(params, "nodeid", w)
NodeID, err = extractParamsVertex(params, "NodeID", w)
if err != nil {
return
}
toDelete = NodeID
if _, has := httpobj.http_PeerID2Info[toDelete]; !has {
w.WriteHeader(http.StatusNotFound)
w.Write([]byte(fmt.Sprintf("Paramater nodeid: \"%v\" not found", PubKey)))
w.Write([]byte(fmt.Sprintf("Paramater NodeID: \"%v\" not found", PubKey)))
return
}
} else {
@ -599,14 +700,14 @@ func peerdel(w http.ResponseWriter, r *http.Request) { //Waiting for test
return
}
} else { // user don't provide the password
PrivKey, err = extractParamsStr(params, "privkey", w)
PrivKey, err = extractParamsStr(params, "PrivKey", w)
if err != nil {
return
}
privk, err := device.Str2PriKey(PrivKey)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(fmt.Sprintf("Paramater privkey: %v", err)))
w.Write([]byte(fmt.Sprintf("Paramater PrivKey: %v", err)))
return
}
pubk := privk.PublicKey()
@ -618,7 +719,7 @@ func peerdel(w http.ResponseWriter, r *http.Request) { //Waiting for test
}
if toDelete == mtypes.Broadcast {
w.WriteHeader(http.StatusNotFound)
w.Write([]byte(fmt.Sprintf("Paramater privkey: \"%v\" not found", PubKey)))
w.Write([]byte(fmt.Sprintf("Paramater PrivKey: \"%v\" not found", PubKey)))
return
}
}
@ -636,7 +737,7 @@ func peerdel(w http.ResponseWriter, r *http.Request) { //Waiting for test
mtypesBytes, _ := yaml.Marshal(httpobj.http_sconfig)
ioutil.WriteFile(httpobj.http_sconfig_path, mtypesBytes, 0644)
w.WriteHeader(http.StatusOK)
w.Write([]byte("Node ID: " + toDelete.ToString() + " deleted."))
w.Write([]byte("NodeID: " + toDelete.ToString() + " deleted."))
return
}
@ -645,6 +746,7 @@ func HttpServer(http_port int, apiprefix string) {
if apiprefix[0] != '/' {
apiprefix = "/" + apiprefix
}
mux.HandleFunc(apiprefix+"/superparams", get_superparams)
mux.HandleFunc(apiprefix+"/peerinfo", get_peerinfo)
mux.HandleFunc(apiprefix+"/nhtable", get_nhtable)
mux.HandleFunc(apiprefix+"/peerstate", get_peerstate)

View File

@ -6,7 +6,6 @@
package main
import (
"bytes"
"crypto/md5"
"encoding/hex"
"encoding/json"
@ -80,6 +79,9 @@ func printExampleSuperConf() {
LogNTP: false,
},
RePushConfigInterval: 30,
PeerAliveTimeout: 70,
HttpPostInterval: 50,
SendPingInterval: 15,
Passwords: mtypes.Passwords{
ShowState: "passwd",
AddPeer: "passwd_addpeer",
@ -89,7 +91,6 @@ func printExampleSuperConf() {
StaticMode: false,
JitterTolerance: 5,
JitterToleranceMultiplier: 1.01,
NodeReportTimeout: 70,
TimeoutCheckInterval: 5,
RecalculateCoolDown: 5,
},
@ -148,6 +149,20 @@ func Super(configPath string, useUAPI bool, printExample bool, bindmode string)
if len(NodeName) > 32 {
return errors.New("Node name can't longer than 32 :" + NodeName)
}
if sconfig.PeerAliveTimeout <= 0 {
return fmt.Errorf("PeerAliveTimeout must > 0 : %v", sconfig.PeerAliveTimeout)
}
if sconfig.HttpPostInterval <= 0 {
return fmt.Errorf("HttpPostInterval must > 0 : %v", sconfig.HttpPostInterval)
} else if sconfig.HttpPostInterval > sconfig.PeerAliveTimeout {
return fmt.Errorf("HttpPostInterval must <= PeerAliveTimeout : %v", sconfig.HttpPostInterval)
}
if sconfig.SendPingInterval <= 0 {
return fmt.Errorf("SendPingInterval must > 0 : %v", sconfig.SendPingInterval)
}
if sconfig.RePushConfigInterval <= 0 {
return fmt.Errorf("RePushConfigInterval must > 0 : %v", sconfig.RePushConfigInterval)
}
var logLevel int
switch sconfig.LogLevel.LogLevel {
@ -182,7 +197,7 @@ func Super(configPath string, useUAPI bool, printExample bool, bindmode string)
Event_server_register: make(chan mtypes.RegisterMsg, 1<<5),
}
httpobj.http_graph = path.NewGraph(3, true, sconfig.GraphRecalculateSetting, mtypes.NTPinfo{}, sconfig.LogLevel)
httpobj.http_graph.SetNHTable(httpobj.http_sconfig.NextHopTable, [32]byte{})
httpobj.http_graph.SetNHTable(httpobj.http_sconfig.NextHopTable)
if sconfig.GraphRecalculateSetting.StaticMode {
err = checkNhTable(httpobj.http_sconfig.NextHopTable, sconfig.Peers)
if err != nil {
@ -242,7 +257,7 @@ func Super(configPath string, useUAPI bool, printExample bool, bindmode string)
}
defer uapi6.Close()
}
prepare_superparams(*httpobj.http_sconfig)
go Event_server_event_hendler(httpobj.http_graph, httpobj.http_super_chains)
go RoutinePushSettings(mtypes.S2TD(sconfig.RePushConfigInterval))
go RoutineTimeoutCheck()
@ -278,6 +293,21 @@ func Super(configPath string, useUAPI bool, printExample bool, bindmode string)
return
}
func prepare_superparams(sconfig mtypes.SuperConfig) {
SuperParams := mtypes.API_SuperParams{
SendPingInterval: sconfig.SendPingInterval,
HttpPostInterval: sconfig.HttpPostInterval,
PeerAliveTimeout: sconfig.PeerAliveTimeout,
AdditionalCost: -1,
}
SuperParamStr, _ := json.Marshal(SuperParams)
httpobj.http_SuperParamsStr = SuperParamStr
md5_hash_raw := md5.Sum(append(httpobj.http_SuperParamsStr, httpobj.http_HashSalt...))
new_hash_str := hex.EncodeToString(md5_hash_raw[:])
httpobj.http_SuperParams_Hash = new_hash_str
}
func super_peeradd(peerconf mtypes.SuperPeerInfo) error {
// No lock, lock before call me
pk, err := device.Str2PubKey(peerconf.PubKey)
@ -337,14 +367,14 @@ func super_peerdel(toDelete mtypes.Vertex) {
}
func super_peerdel_notify(toDelete mtypes.Vertex, PubKey string) {
UpdateErrorMsg := mtypes.ServerCommandMsg{
Node_id: toDelete,
Action: mtypes.Shutdown,
ErrorCode: int(syscall.ENOENT),
ErrorMsg: "You've been removed from supernode.",
ServerUpdateMsg := mtypes.ServerUpdateMsg{
Node_id: toDelete,
Action: mtypes.Shutdown,
Code: int(syscall.ENOENT),
Params: "You've been removed from supernode.",
}
for i := 0; i < 10; i++ {
body, _ := mtypes.GetByte(&UpdateErrorMsg)
body, _ := mtypes.GetByte(&ServerUpdateMsg)
buf := make([]byte, path.EgHeaderLen+len(body))
header, _ := path.NewEgHeader(buf[:path.EgHeaderLen])
header.SetSrc(mtypes.SuperNodeMessage)
@ -354,10 +384,10 @@ func super_peerdel_notify(toDelete mtypes.Vertex, PubKey string) {
header.SetDst(toDelete)
peer4 := httpobj.http_device4.LookupPeerByStr(PubKey)
httpobj.http_device4.SendPacket(peer4, path.UpdateError, buf, device.MessageTransportOffsetContent)
httpobj.http_device4.SendPacket(peer4, path.ServerUpdate, buf, device.MessageTransportOffsetContent)
peer6 := httpobj.http_device6.LookupPeerByStr(PubKey)
httpobj.http_device6.SendPacket(peer6, path.UpdateError, buf, device.MessageTransportOffsetContent)
httpobj.http_device6.SendPacket(peer6, path.ServerUpdate, buf, device.MessageTransportOffsetContent)
time.Sleep(mtypes.S2TD(0.1))
}
httpobj.http_device4.RemovePeerByID(toDelete)
@ -378,12 +408,12 @@ func Event_server_event_hendler(graph *path.IG, events *mtypes.SUPER_Events) {
httpobj.http_PeerState[PubKey].LastSeen = time.Now()
httpobj.http_PeerState[PubKey].JETSecret = reg_msg.JWTSecret
httpobj.http_PeerState[PubKey].httpPostCount = reg_msg.HttpPostCount
if bytes.Equal(httpobj.http_PeerState[PubKey].NhTableState[:], reg_msg.NhStateHash[:]) == false {
copy(httpobj.http_PeerState[PubKey].NhTableState[:], reg_msg.NhStateHash[:])
if httpobj.http_PeerState[PubKey].NhTableState == reg_msg.NhStateHash == false {
httpobj.http_PeerState[PubKey].NhTableState = reg_msg.NhStateHash
should_push_nh = true
}
if bytes.Equal(httpobj.http_PeerState[PubKey].PeerInfoState[:], reg_msg.PeerStateHash[:]) == false {
copy(httpobj.http_PeerState[PubKey].PeerInfoState[:], reg_msg.PeerStateHash[:])
if httpobj.http_PeerState[PubKey].PeerInfoState == reg_msg.PeerStateHash == false {
httpobj.http_PeerState[PubKey].PeerInfoState = reg_msg.PeerStateHash
should_push_peer = true
}
}
@ -414,11 +444,9 @@ func Event_server_event_hendler(graph *path.IG, events *mtypes.SUPER_Events) {
if changed {
NhTable := graph.GetNHTable(true)
NhTablestr, _ := json.Marshal(NhTable)
md5_hash_raw := md5.Sum(append(httpobj.http_NhTableStr, httpobj.http_HashSalt...))
md5_hash_raw := md5.Sum(append(NhTablestr, httpobj.http_HashSalt...))
new_hash_str := hex.EncodeToString(md5_hash_raw[:])
new_hash_str_byte := []byte(new_hash_str)
copy(httpobj.http_NhTable_Hash[:], new_hash_str_byte)
copy(graph.NhTableHash[:], new_hash_str_byte)
httpobj.http_NhTable_Hash = new_hash_str
httpobj.http_NhTableStr = NhTablestr
PushNhTable(false)
}
@ -438,7 +466,8 @@ func RoutinePushSettings(interval time.Duration) {
force = false
}
PushNhTable(force)
PushPeerinfo(force)
PushPeerinfo(false)
PushServerParams(false)
time.Sleep(mtypes.S2TD(1))
}
}
@ -460,8 +489,11 @@ func RoutineTimeoutCheck() {
func PushNhTable(force bool) {
// No lock
body, err := mtypes.GetByte(mtypes.UpdateNhTableMsg{
State_hash: httpobj.http_NhTable_Hash,
body, err := mtypes.GetByte(mtypes.ServerUpdateMsg{
Node_id: mtypes.SuperNodeMessage,
Action: mtypes.UpdateNhTable,
Code: 0,
Params: string(httpobj.http_NhTable_Hash[:]),
})
if err != nil {
fmt.Println("Error get byte")
@ -475,16 +507,16 @@ func PushNhTable(force bool) {
header.SetTTL(0)
copy(buf[path.EgHeaderLen:], body)
for pkstr, peerstate := range httpobj.http_PeerState {
isAlive := peerstate.LastSeen.Add(mtypes.S2TD(httpobj.http_sconfig.GraphRecalculateSetting.NodeReportTimeout)).After(time.Now())
isAlive := peerstate.LastSeen.Add(mtypes.S2TD(httpobj.http_sconfig.PeerAliveTimeout)).After(time.Now())
if !isAlive {
continue
}
if force || peerstate.NhTableState != httpobj.http_NhTable_Hash {
if peer := httpobj.http_device4.LookupPeerByStr(pkstr); peer != nil && peer.GetEndpointDstStr() != "" {
httpobj.http_device4.SendPacket(peer, path.UpdateNhTable, buf, device.MessageTransportOffsetContent)
httpobj.http_device4.SendPacket(peer, path.ServerUpdate, buf, device.MessageTransportOffsetContent)
}
if peer := httpobj.http_device6.LookupPeerByStr(pkstr); peer != nil && peer.GetEndpointDstStr() != "" {
httpobj.http_device6.SendPacket(peer, path.UpdateNhTable, buf, device.MessageTransportOffsetContent)
httpobj.http_device6.SendPacket(peer, path.ServerUpdate, buf, device.MessageTransportOffsetContent)
}
}
}
@ -492,8 +524,11 @@ func PushNhTable(force bool) {
func PushPeerinfo(force bool) {
//No lock
body, err := mtypes.GetByte(mtypes.UpdatePeerMsg{
State_hash: httpobj.http_PeerInfo_hash,
body, err := mtypes.GetByte(mtypes.ServerUpdateMsg{
Node_id: mtypes.SuperNodeMessage,
Action: mtypes.UpdatePeer,
Code: 0,
Params: string(httpobj.http_PeerInfo_hash[:]),
})
if err != nil {
fmt.Println("Error get byte")
@ -507,16 +542,51 @@ func PushPeerinfo(force bool) {
header.SetTTL(0)
copy(buf[path.EgHeaderLen:], body)
for pkstr, peerstate := range httpobj.http_PeerState {
isAlive := peerstate.LastSeen.Add(mtypes.S2TD(httpobj.http_sconfig.GraphRecalculateSetting.NodeReportTimeout)).After(time.Now())
isAlive := peerstate.LastSeen.Add(mtypes.S2TD(httpobj.http_sconfig.PeerAliveTimeout)).After(time.Now())
if !isAlive {
continue
}
if force || peerstate.PeerInfoState != httpobj.http_PeerInfo_hash {
if peer := httpobj.http_device4.LookupPeerByStr(pkstr); peer != nil {
httpobj.http_device4.SendPacket(peer, path.UpdatePeer, buf, device.MessageTransportOffsetContent)
httpobj.http_device4.SendPacket(peer, path.ServerUpdate, buf, device.MessageTransportOffsetContent)
}
if peer := httpobj.http_device6.LookupPeerByStr(pkstr); peer != nil {
httpobj.http_device6.SendPacket(peer, path.UpdatePeer, buf, device.MessageTransportOffsetContent)
httpobj.http_device6.SendPacket(peer, path.ServerUpdate, buf, device.MessageTransportOffsetContent)
}
}
}
}
func PushServerParams(force bool) {
//No lock
body, err := mtypes.GetByte(mtypes.ServerUpdateMsg{
Node_id: mtypes.SuperNodeMessage,
Action: mtypes.UpdateSuperParams,
Code: 0,
Params: string(httpobj.http_SuperParams_Hash[:]),
})
if err != nil {
fmt.Println("Error get byte")
return
}
buf := make([]byte, path.EgHeaderLen+len(body))
header, _ := path.NewEgHeader(buf[:path.EgHeaderLen])
header.SetDst(mtypes.SuperNodeMessage)
header.SetPacketLength(uint16(len(body)))
header.SetSrc(mtypes.SuperNodeMessage)
header.SetTTL(0)
copy(buf[path.EgHeaderLen:], body)
for pkstr, peerstate := range httpobj.http_PeerState {
isAlive := peerstate.LastSeen.Add(mtypes.S2TD(httpobj.http_sconfig.PeerAliveTimeout)).After(time.Now())
if !isAlive {
continue
}
if force || peerstate.SuperParamState != httpobj.http_SuperParams_Hash {
if peer := httpobj.http_device4.LookupPeerByStr(pkstr); peer != nil {
httpobj.http_device4.SendPacket(peer, path.ServerUpdate, buf, device.MessageTransportOffsetContent)
}
if peer := httpobj.http_device6.LookupPeerByStr(pkstr); peer != nil {
httpobj.http_device6.SendPacket(peer, path.ServerUpdate, buf, device.MessageTransportOffsetContent)
}
}
}

View File

@ -3,6 +3,7 @@ package mtypes
import (
"math"
"strconv"
"sync/atomic"
)
// Nonnegative integer ID of vertex
@ -38,8 +39,11 @@ type SuperConfig struct {
PrivKeyV4 string
PrivKeyV6 string
ListenPort int
LogLevel LoggerInfo
RePushConfigInterval float64
HttpPostInterval float64
PeerAliveTimeout float64
SendPingInterval float64
LogLevel LoggerInfo
Passwords Passwords
GraphRecalculateSetting GraphRecalculateSetting
NextHopTable NextHopTable
@ -135,7 +139,6 @@ type SuperInfo struct {
PubKeyV6 string
APIUrl string
SkipLocalIP bool
HttpPostInterval float64
SuperNodeInfoTimeout float64
}
@ -149,7 +152,6 @@ type GraphRecalculateSetting struct {
StaticMode bool
JitterTolerance float64
JitterToleranceMultiplier float64
NodeReportTimeout float64
TimeoutCheckInterval float64
RecalculateCoolDown float64
}
@ -201,6 +203,19 @@ type API_Peerinfo struct {
Connurl *API_connurl
}
type API_SuperParams struct {
SendPingInterval float64
HttpPostInterval float64
PeerAliveTimeout float64
AdditionalCost float64
}
type StateHash struct {
Peer atomic.Value //[32]byte
SuperParam atomic.Value //[32]byte
NhTable atomic.Value //[32]byte
}
type API_Peers map[string]API_Peerinfo // map[PubKey]API_Peerinfo
type JWTSecret [32]byte

View File

@ -22,12 +22,13 @@ func GetByte(structIn interface{}) (bb []byte, err error) {
}
type RegisterMsg struct {
Node_id Vertex
Version string
PeerStateHash [32]byte
NhStateHash [32]byte
JWTSecret JWTSecret
HttpPostCount uint64
Node_id Vertex
Version string
PeerStateHash string
NhStateHash string
SuperParamStateHash string
JWTSecret JWTSecret
HttpPostCount uint64
}
func Hash2Str(h []byte) string {
@ -40,7 +41,7 @@ func Hash2Str(h []byte) string {
}
func (c *RegisterMsg) ToString() string {
return fmt.Sprint("RegisterMsg Node_id:"+c.Node_id.ToString(), " Version:"+c.Version, " PeerHash:"+Hash2Str(c.PeerStateHash[:]), " NhHash:"+Hash2Str(c.NhStateHash[:]))
return fmt.Sprint("RegisterMsg Node_id:"+c.Node_id.ToString(), " Version:"+c.Version, " PeerHash:"+c.PeerStateHash, " NhHash:"+c.NhStateHash, " SuperParamHash:"+c.SuperParamStateHash)
}
func ParseRegisterMsg(bin []byte) (StructPlace RegisterMsg, err error) {
@ -58,27 +59,38 @@ const (
Shutdown
ThrowError
Panic
UpdatePeer
UpdateNhTable
UpdateSuperParams
)
func (a *ServerCommand) ToString() string {
if *a == Shutdown {
switch *a {
case Shutdown:
return "Shutdown"
} else if *a == ThrowError {
case ThrowError:
return "ThrowError"
} else if *a == Panic {
case Panic:
return "Panic"
case UpdatePeer:
return "UpdatePeer"
case UpdateNhTable:
return "UpdateNhTable"
case UpdateSuperParams:
return "UpdateSuperParams"
default:
return "Unknown"
}
return "Unknown"
}
type ServerCommandMsg struct {
Node_id Vertex
Action ServerCommand
ErrorCode int
ErrorMsg string
type ServerUpdateMsg struct {
Node_id Vertex
Action ServerCommand
Code int
Params string
}
func ParseUpdateErrorMsg(bin []byte) (StructPlace ServerCommandMsg, err error) {
func ParseServerUpdateMsg(bin []byte) (StructPlace ServerUpdateMsg, err error) {
var b bytes.Buffer
b.Write(bin)
d := gob.NewDecoder(&b)
@ -86,40 +98,8 @@ func ParseUpdateErrorMsg(bin []byte) (StructPlace ServerCommandMsg, err error) {
return
}
func (c *ServerCommandMsg) ToString() string {
return "ServerCommandMsg Node_id:" + c.Node_id.ToString() + " Action:" + c.Action.ToString() + " ErrorCode:" + strconv.Itoa(int(c.ErrorCode)) + " ErrorMsg " + c.ErrorMsg
}
type UpdatePeerMsg struct {
State_hash [32]byte
}
func (c *UpdatePeerMsg) ToString() string {
return "UpdatePeerMsg State_hash:" + string(c.State_hash[:])
}
func ParseUpdatePeerMsg(bin []byte) (StructPlace UpdatePeerMsg, err error) {
var b bytes.Buffer
b.Write(bin)
d := gob.NewDecoder(&b)
err = d.Decode(&StructPlace)
return
}
type UpdateNhTableMsg struct {
State_hash [32]byte
}
func (c *UpdateNhTableMsg) ToString() string {
return "UpdateNhTableMsg State_hash:" + string(c.State_hash[:])
}
func ParseUpdateNhTableMsg(bin []byte) (StructPlace UpdateNhTableMsg, err error) {
var b bytes.Buffer
b.Write(bin)
d := gob.NewDecoder(&b)
err = d.Decode(&StructPlace)
return
func (c *ServerUpdateMsg) ToString() string {
return "ServerUpdateMsg Node_id:" + c.Node_id.ToString() + " Action:" + c.Action.ToString() + " Code:" + strconv.Itoa(int(c.Code)) + " Params: " + c.Params
}
type PingMsg struct {

View File

@ -22,11 +22,9 @@ const (
MessageTransportType
NormalPacket
Register //Register to server
UpdatePeer //Comes from server
UpdateNhTable
UpdateError
Register //Send to server
ServerUpdate //Comes from server
PingPacket //Comes from other peer
PongPacket //Send to everyone, include server

View File

@ -1,7 +1,6 @@
package path
import (
"bytes"
"errors"
"fmt"
"io/ioutil"
@ -48,7 +47,7 @@ type IG struct {
recalculateTime time.Time
dlTable mtypes.DistTable
nhTable mtypes.NextHopTable
NhTableHash [32]byte
changed bool
NhTableExpire time.Time
IsSuperMode bool
loglevel mtypes.LoggerInfo
@ -122,7 +121,7 @@ func (g *IG) CheckAnyShouldUpdate() bool {
func (g *IG) RecalculateNhTable(checkchange bool) (changed bool) {
if g.StaticMode {
if bytes.Equal(g.NhTableHash[:], make([]byte, 32)) {
if g.changed {
changed = checkchange
}
return
@ -159,7 +158,7 @@ func (g *IG) RemoveVirt(v mtypes.Vertex, recalculate bool, checkchange bool) (ch
delete(g.edges[u], v)
}
g.edgelock.Unlock()
g.NhTableHash = [32]byte{}
g.changed = true
if recalculate {
changed = g.RecalculateNhTable(checkchange)
}
@ -409,9 +408,9 @@ func Path(u, v mtypes.Vertex, next mtypes.NextHopTable) (path []mtypes.Vertex) {
return path
}
func (g *IG) SetNHTable(nh mtypes.NextHopTable, table_hash [32]byte) { // set nhTable from supernode
func (g *IG) SetNHTable(nh mtypes.NextHopTable) { // set nhTable from supernode
g.nhTable = nh
g.NhTableHash = table_hash
g.changed = true
g.NhTableExpire = time.Now().Add(g.SuperNodeInfoTimeout)
}
@ -500,9 +499,7 @@ func Solve(filePath string, pe bool) error {
return nil
}
g := NewGraph(3, false, mtypes.GraphRecalculateSetting{
NodeReportTimeout: 9999,
}, mtypes.NTPinfo{}, mtypes.LoggerInfo{LogInternal: true})
g := NewGraph(3, false, mtypes.GraphRecalculateSetting{ }, mtypes.NTPinfo{}, mtypes.LoggerInfo{LogInternal: true})
inputb, err := ioutil.ReadFile(filePath)
if err != nil {
return err