Add new AdditionalCost, fix some bugs

KusakabeSi 2021-10-27 01:02:44 +00:00
parent b95646576f
commit 056f7173a0
12 changed files with 373 additions and 219 deletions
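In short: latency measurements can now carry a per-peer AdditionalCost. The cost is configured per peer on the supernode (and per edge via dynamicroute.p2p.additionalcost), travels inside PongMsg, is stored on each graph edge (scaled by /1000), and is added to the measured ping whenever an edge weight is queried with withAC=true. The bug fixes visible below: the orderedmap-based endpoint try-list is replaced by a dedicated endpoint_trylist type, peer info is no longer re-downloaded when the reported state hash is unchanged, and the supernode's next-hop-table recalculation moves out of the edge device routine into the supernode's own event handler, driven by the new RoutineTimeoutCheck.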

View File

@ -73,10 +73,11 @@ type PeerInfo struct {
}
type SuperPeerInfo struct {
- NodeID Vertex
- Name string
- PubKey string
- PSKey string
+ NodeID         Vertex
+ Name           string
+ PubKey         string
+ PSKey          string
+ AdditionalCost float64
}
type LoggerInfo struct {
@ -135,6 +136,7 @@ type SuperInfo struct {
type P2Pinfo struct {
UseP2P bool
SendPeerInterval float64
AdditionalCost float64
GraphRecalculateSetting GraphRecalculateSetting
}
@ -153,7 +155,7 @@ type NextHopTable map[Vertex]map[Vertex]*Vertex
type API_Peerinfo struct {
NodeID Vertex
PSKey string
- Connurl map[string]int
+ Connurl map[string]float64
}
type API_Peers map[string]API_Peerinfo // map[PubKey]API_Peerinfo

View File

@ -83,7 +83,6 @@ type Device struct {
Event_server_register chan path.RegisterMsg
Event_server_pong chan path.PongMsg
- Event_server_NhTable_changed chan struct{}
Event_save_config chan struct{}
Event_Supernode_OK chan struct{}
@ -91,16 +90,17 @@ type Device struct {
indexTable IndexTable
cookieChecker CookieChecker
- IsSuperNode bool
- ID config.Vertex
- DefaultTTL uint8
- graph *path.IG
- l2fib sync.Map
- fibTimeout float64
- LogLevel config.LoggerInfo
- DRoute config.DynamicRouteInfo
- DupData fixed_time_cache.Cache
- Version string
+ IsSuperNode    bool
+ ID             config.Vertex
+ DefaultTTL     uint8
+ graph          *path.IG
+ l2fib          sync.Map
+ fibTimeout     float64
+ LogLevel       config.LoggerInfo
+ DRoute         config.DynamicRouteInfo
+ DupData        fixed_time_cache.Cache
+ Version        string
+ AdditionalCost float64
pool struct {
messageBuffers *WaitPool
@ -362,14 +362,17 @@ func NewDevice(tapDevice tap.Device, id config.Vertex, bind conn.Bind, logger *L
device.ResetConnInterval = device.EdgeConfig.ResetConnInterval
device.DefaultTTL = econfig.DefaultTTL
device.fibTimeout = econfig.L2FIBTimeout
+ device.AdditionalCost = device.DRoute.P2P.AdditionalCost
go device.RoutineSetEndpoint()
+ go device.RoutineDetectOfflineAndTryNextEndpoint()
go device.RoutineRegister()
go device.RoutineSendPing()
go device.RoutineSpreadAllMyNeighbor()
go device.RoutineResetConn()
go device.RoutineClearL2FIB()
- go device.RoutineRecalculateNhTable()
}
+ go device.RoutineRecalculateNhTable()
// create queues
device.queue.handshake = newHandshakeQueue()

View File

@ -18,10 +18,125 @@ import (
"github.com/KusakabeSi/EtherGuardVPN/config"
"github.com/KusakabeSi/EtherGuardVPN/conn"
orderedmap "github.com/KusakabeSi/EtherGuardVPN/orderdmap"
"github.com/KusakabeSi/EtherGuardVPN/path"
"gopkg.in/yaml.v2"
)
type endpoint_tryitem struct {
URL string
lastTry time.Time
firstTry time.Time
}
type endpoint_trylist struct {
sync.RWMutex
timeout time.Duration
peer *Peer
trymap_super map[string]*endpoint_tryitem
trymap_p2p map[string]*endpoint_tryitem
}
func NewEndpoint_trylist(peer *Peer, timeout time.Duration) *endpoint_trylist {
return &endpoint_trylist{
timeout: timeout,
peer: peer,
trymap_super: make(map[string]*endpoint_tryitem),
trymap_p2p: make(map[string]*endpoint_tryitem),
}
}
func (et *endpoint_trylist) UpdateSuper(urls map[string]float64) {
et.Lock()
defer et.Unlock()
newmap_super := make(map[string]*endpoint_tryitem)
if len(urls) == 0 {
if et.peer.device.LogLevel.LogInternal {
fmt.Println(fmt.Sprintf("Internal: Peer %v : Reset trylist(super) %v", et.peer.ID.ToString(), "nil"))
}
}
for url, it := range urls {
_, err := conn.LookupIP(url, 0)
if err != nil {
if et.peer.device.LogLevel.LogInternal {
fmt.Println(fmt.Sprintf("Internal: Peer %v : Update trylist(super) %v error: %v", et.peer.ID.ToString(), url, err))
}
continue
}
if val, ok := et.trymap_super[url]; ok {
if et.peer.device.LogLevel.LogInternal {
fmt.Println(fmt.Sprintf("Internal: Peer %v : Update trylist(super) %v", et.peer.ID.ToString(), url))
}
newmap_super[url] = val
} else {
if et.peer.device.LogLevel.LogInternal {
fmt.Println(fmt.Sprintf("Internal: Peer %v : New trylist(super) %v", et.peer.ID.ToString(), url))
}
newmap_super[url] = &endpoint_tryitem{
URL: url,
lastTry: time.Time{}.Add(path.S2TD(it)),
firstTry: time.Time{},
}
}
}
et.trymap_super = newmap_super
}
func (et *endpoint_trylist) UpdateP2P(url string) {
_, err := conn.LookupIP(url, 0)
if err != nil {
return
}
et.Lock()
defer et.Unlock()
if _, ok := et.trymap_p2p[url]; !ok {
if et.peer.device.LogLevel.LogInternal {
fmt.Println(fmt.Sprintf("Internal: Peer %v : Add trylist(p2p) %v", et.peer.ID.ToString(), url))
}
et.trymap_p2p[url] = &endpoint_tryitem{
URL: url,
lastTry: time.Now(),
firstTry: time.Time{},
}
}
}
func (et *endpoint_trylist) Delete(url string) {
et.Lock()
defer et.Unlock()
delete(et.trymap_super, url)
delete(et.trymap_p2p, url)
}
func (et *endpoint_trylist) GetNextTry() string {
et.RLock()
defer et.RUnlock()
var smallest *endpoint_tryitem
for _, v := range et.trymap_super {
if smallest == nil || smallest.lastTry.After(v.lastTry) {
smallest = v
}
}
for url, v := range et.trymap_p2p {
if v.firstTry.After(time.Time{}) && v.firstTry.Add(et.timeout).Before(time.Now()) {
if et.peer.device.LogLevel.LogInternal {
fmt.Println(fmt.Sprintf("Internal: Peer %v : Delete trylist(p2p) %v", et.peer.ID.ToString(), url))
}
delete(et.trymap_p2p, url)
}
if smallest == nil || smallest.lastTry.After(v.lastTry) { // guard: trymap_super may be empty
smallest = v
}
}
if smallest == nil {
return ""
}
smallest.lastTry = time.Now()
if smallest.firstTry.IsZero() { // record the time of the first attempt
smallest.firstTry = time.Now()
}
return smallest.URL
}
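
The endpoint_trylist above replaces the old orderedmap of time.Time. Supernode-supplied URLs arrive with a float64 value that seeds lastTry through path.S2TD, so URLs with smaller values get tried first; P2P-learned URLs are dropped once their first attempt is older than the timeout. A minimal, self-contained sketch of just the selection policy (simplified names, not the project's real types):

// Sketch only: models GetNextTry above, assuming the oldest lastTry wins
// and P2P candidates expire once their first attempt exceeds the timeout.
package main

import (
	"fmt"
	"time"
)

type tryItem struct {
	url      string
	lastTry  time.Time
	firstTry time.Time
}

func nextTry(items []*tryItem, timeout time.Duration) *tryItem {
	var oldest *tryItem
	for _, it := range items {
		// a non-zero firstTry older than timeout means this candidate expired
		if !it.firstTry.IsZero() && time.Since(it.firstTry) > timeout {
			continue
		}
		if oldest == nil || oldest.lastTry.After(it.lastTry) {
			oldest = it
		}
	}
	return oldest
}

func main() {
	items := []*tryItem{
		{url: "a.example.com:3001", lastTry: time.Now()},
		{url: "b.example.com:3001"}, // never tried: zero lastTry sorts first
	}
	fmt.Println(nextTry(items, 30*time.Second).url) // prints b.example.com:3001
}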
type Peer struct {
isRunning AtomicBool
sync.RWMutex // Mostly protects endpoint, but is generally taken whenever we modify peer
@ -29,7 +144,7 @@ type Peer struct {
handshake Handshake
device *Device
endpoint conn.Endpoint
- endpoint_trylist orderedmap.OrderedMap //map[string]time.Time
+ endpoint_trylist *endpoint_trylist
LastPingReceived time.Time
stopping sync.WaitGroup // routines pending stop
@ -110,8 +225,8 @@ func (device *Device) NewPeer(pk NoisePublicKey, id config.Vertex, isSuper bool)
}
// create peer
- if device.LogLevel.LogControl {
- fmt.Println("Control: Create peer with ID : " + id.ToString() + " and PubKey:" + pk.ToString())
+ if device.LogLevel.LogInternal {
+ fmt.Println("Internal: Create peer with ID : " + id.ToString() + " and PubKey:" + pk.ToString())
}
peer := new(Peer)
peer.Lock()
@ -119,7 +234,7 @@ func (device *Device) NewPeer(pk NoisePublicKey, id config.Vertex, isSuper bool)
peer.cookieGenerator.Init(pk)
peer.device = device
- peer.endpoint_trylist = *orderedmap.New()
+ peer.endpoint_trylist = NewEndpoint_trylist(peer, path.S2TD(device.DRoute.PeerAliveTimeout))
peer.queue.outbound = newAutodrainingOutboundQueue(device)
peer.queue.inbound = newAutodrainingInboundQueue(device)
peer.queue.staged = make(chan *QueueOutboundElement, QueueStagedSize)
@ -162,6 +277,17 @@ func (device *Device) NewPeer(pk NoisePublicKey, id config.Vertex, isSuper bool)
return peer, nil
}
func (peer *Peer) IsPeerAlive() bool {
PeerAliveTimeout := path.S2TD(peer.device.DRoute.PeerAliveTimeout)
if peer.endpoint == nil {
return false
}
if peer.LastPingReceived.Add(PeerAliveTimeout).Before(time.Now()) {
return false
}
return true
}
func (peer *Peer) SendBuffer(buffer []byte) error {
peer.device.net.RLock()
defer peer.device.net.RUnlock()
@ -328,6 +454,9 @@ func (peer *Peer) SetEndpointFromConnURL(connurl string, af int, static bool) er
peer.ConnURL = connurl
peer.ConnAF = af
if peer.device.LogLevel.LogInternal {
fmt.Println("Internal: Set endpoint to " + connurl + " for NodeID:" + peer.ID.ToString())
}
var err error
connurl, err = conn.LookupIP(connurl, af)
if err != nil {

View File

@ -15,7 +15,6 @@ import (
"time"
"github.com/KusakabeSi/EtherGuardVPN/config"
orderedmap "github.com/KusakabeSi/EtherGuardVPN/orderdmap"
"github.com/KusakabeSi/EtherGuardVPN/path"
"github.com/KusakabeSi/EtherGuardVPN/tap"
"github.com/google/gopacket"
@ -321,13 +320,15 @@ func (device *Device) process_ping(peer *Peer, content path.PingMsg) error {
//peer.Lock()
//remove peer.endpoint_trylist
//peer.Unlock()
PongMSG := path.PongMsg{
- Src_nodeID: content.Src_nodeID,
- Dst_nodeID: device.ID,
- Timediff: device.graph.GetCurrentTime().Sub(content.Time),
+ Src_nodeID:     content.Src_nodeID,
+ Dst_nodeID:     device.ID,
+ Timediff:       device.graph.GetCurrentTime().Sub(content.Time),
+ AdditionalCost: device.AdditionalCost,
}
if device.DRoute.P2P.UseP2P && time.Now().After(device.graph.NhTableExpire) {
- device.graph.UpdateLatency(content.Src_nodeID, device.ID, PongMSG.Timediff, true, false)
+ device.graph.UpdateLatency(content.Src_nodeID, device.ID, PongMSG.Timediff, device.AdditionalCost, true, false)
}
body, err := path.GetByte(&PongMSG)
if err != nil {
@ -354,7 +355,7 @@ func (device *Device) process_ping(peer *Peer, content path.PingMsg) error {
func (device *Device) process_pong(peer *Peer, content path.PongMsg) error {
if device.DRoute.P2P.UseP2P {
if time.Now().After(device.graph.NhTableExpire) {
- device.graph.UpdateLatency(content.Src_nodeID, content.Dst_nodeID, content.Timediff, true, false)
+ device.graph.UpdateLatency(content.Src_nodeID, content.Dst_nodeID, content.Timediff, content.AdditionalCost, true, false)
}
if !peer.AskedForNeighbor {
QueryPeerMsg := path.QueryPeerMsg{
@ -385,6 +386,12 @@ func (device *Device) process_UpdatePeerMsg(peer *Peer, content path.UpdatePeerM
}
return nil
}
if bytes.Equal(device.peers.Peer_state[:], content.State_hash[:]) {
if device.LogLevel.LogControl {
fmt.Println("Control: Same PeerState Hash, skip download nhTable")
}
return nil
}
var peer_infos config.API_Peers
downloadurl := device.DRoute.SuperNode.APIUrl + "/peerinfo?NodeID=" + strconv.Itoa(int(device.ID)) + "&PubKey=" + url.QueryEscape(device.staticIdentity.publicKey.ToString()) + "&State=" + url.QueryEscape(string(content.State_hash[:]))
@ -435,9 +442,6 @@ func (device *Device) process_UpdatePeerMsg(peer *Peer, content path.UpdatePeerM
}
for PubKey, peerinfo := range peer_infos {
- if len(peerinfo.Connurl) == 0 {
- continue
- }
sk, err := Str2PubKey(PubKey)
if err != nil {
device.log.Errorf("Error decode base64:", err)
@ -448,14 +452,17 @@ func (device *Device) process_UpdatePeerMsg(peer *Peer, content path.UpdatePeerM
}
thepeer := device.LookupPeer(sk)
if thepeer == nil { //not exist in local
if len(peerinfo.Connurl) == 0 {
continue
}
if device.LogLevel.LogControl {
fmt.Println("Control: Add new peer to local ID:" + peerinfo.NodeID.ToString() + " PubKey:" + PubKey)
}
- if device.graph.Weight(device.ID, peerinfo.NodeID) == path.Infinity { // add node to graph
- device.graph.UpdateLatency(device.ID, peerinfo.NodeID, path.S2TD(path.Infinity), true, false)
+ if device.graph.Weight(device.ID, peerinfo.NodeID, false) == path.Infinity { // add node to graph
+ device.graph.UpdateLatency(device.ID, peerinfo.NodeID, path.S2TD(path.Infinity), device.AdditionalCost, true, false)
}
- if device.graph.Weight(peerinfo.NodeID, device.ID) == path.Infinity { // add node to graph
- device.graph.UpdateLatency(peerinfo.NodeID, device.ID, path.S2TD(path.Infinity), true, false)
+ if device.graph.Weight(peerinfo.NodeID, device.ID, false) == path.Infinity { // add node to graph
+ device.graph.UpdateLatency(peerinfo.NodeID, device.ID, path.S2TD(path.Infinity), device.AdditionalCost, true, false)
}
device.NewPeer(sk, peerinfo.NodeID, false)
thepeer = device.LookupPeer(sk)
@ -469,14 +476,10 @@ func (device *Device) process_UpdatePeerMsg(peer *Peer, content path.UpdatePeerM
thepeer.SetPSK(pk)
}
- if thepeer.LastPingReceived.Add(path.S2TD(device.DRoute.PeerAliveTimeout)).Before(time.Now()) {
+ thepeer.endpoint_trylist.UpdateSuper(peerinfo.Connurl)
+ if !thepeer.IsPeerAlive() {
//Peer died, try to switch to this new endpoint
- for url, _ := range peerinfo.Connurl {
- thepeer.Lock()
- thepeer.endpoint_trylist.LoadOrStore(url, time.Time{}) //another gorouting will process it
- thepeer.Unlock()
- send_signal = true
- }
+ send_signal = true
}
}
device.peers.Peer_state = content.State_hash
@ -564,8 +567,14 @@ func (device *Device) process_RequestPeerMsg(content path.QueryPeerMsg) error {
continue
}
if peer.endpoint == nil {
// I don't have the information of this peer, skip
continue
}
if !peer.IsPeerAlive() {
// peer died, skip
continue
}
peer.handshake.mutex.RLock()
response := path.BoardcastPeerMsg{
Request_ID: content.Request_ID,
@ -608,19 +617,17 @@ func (device *Device) process_BoardcastPeerMsg(peer *Peer, content path.Boardcas
if device.LogLevel.LogControl {
fmt.Println("Control: Add new peer to local ID:" + content.NodeID.ToString() + " PubKey:" + pk.ToString())
}
- if device.graph.Weight(device.ID, content.NodeID) == path.Infinity { // add node to graph
- device.graph.UpdateLatency(device.ID, content.NodeID, path.S2TD(path.Infinity), true, false)
+ if device.graph.Weight(device.ID, content.NodeID, false) == path.Infinity { // add node to graph
+ device.graph.UpdateLatency(device.ID, content.NodeID, path.S2TD(path.Infinity), device.AdditionalCost, true, false)
}
- if device.graph.Weight(content.NodeID, device.ID) == path.Infinity { // add node to graph
- device.graph.UpdateLatency(content.NodeID, device.ID, path.S2TD(path.Infinity), true, false)
+ if device.graph.Weight(content.NodeID, device.ID, false) == path.Infinity { // add node to graph
+ device.graph.UpdateLatency(content.NodeID, device.ID, path.S2TD(path.Infinity), device.AdditionalCost, true, false)
device.NewPeer(pk, content.NodeID, false)
}
- if thepeer.LastPingReceived.Add(path.S2TD(device.DRoute.PeerAliveTimeout)).Before(time.Now()) {
+ if !thepeer.IsPeerAlive() {
//Peer died, try to switch to this new endpoint
- thepeer.Lock()
- thepeer.endpoint_trylist.LoadOrStore(content.ConnURL, time.Time{}) //another gorouting will process it
- thepeer.Unlock()
+ thepeer.endpoint_trylist.UpdateP2P(content.ConnURL) //another goroutine will process it
device.event_tryendpoint <- struct{}{}
}
@ -640,46 +647,19 @@ func (device *Device) RoutineSetEndpoint() {
//Peer alives
continue
} else {
- thepeer.RLock()
- thepeer.endpoint_trylist.Sort(func(a *orderedmap.Pair, b *orderedmap.Pair) bool {
- return a.Value().(time.Time).Before(b.Value().(time.Time))
- })
- trylist := thepeer.endpoint_trylist.Keys()
- thepeer.RUnlock()
- for _, key := range trylist { // try next endpoint
- connurl := key
- thepeer.RLock()
- val, hasval := thepeer.endpoint_trylist.Get(key)
- thepeer.RUnlock()
- if !hasval {
- continue
- }
- trytime := val.(time.Time)
- if trytime.Sub(time.Time{}) != time.Duration(0) && time.Now().Sub(trytime) > path.S2TD(device.DRoute.ConnTimeOut) { // tried before, but no response
- thepeer.Lock()
- thepeer.endpoint_trylist.Delete(key)
- thepeer.Unlock()
- } else {
- if device.LogLevel.LogControl {
- fmt.Println("Control: Set endpoint to " + connurl + " for NodeID:" + thepeer.ID.ToString())
- }
- err := thepeer.SetEndpointFromConnURL(connurl, thepeer.ConnAF, thepeer.StaticConn) //trying to bind first url in the list and wait device.DRoute.P2P.PeerAliveTimeout seconds
- if err != nil {
- device.log.Errorf("Bind " + connurl + " failed!")
- thepeer.Lock()
- thepeer.endpoint_trylist.Delete(connurl)
- thepeer.Unlock()
- continue
- }
- NextRun = true
- thepeer.Lock()
- thepeer.endpoint_trylist.Set(key, time.Now())
- thepeer.Unlock()
- //Send Ping message to it
- go device.SendPing(thepeer, int(device.DRoute.ConnNextTry+1), 1, 1)
- break
- }
- }
+ connurl := thepeer.endpoint_trylist.GetNextTry()
+ if connurl == "" {
+ continue
+ }
+ err := thepeer.SetEndpointFromConnURL(connurl, thepeer.ConnAF, thepeer.StaticConn) //trying to bind first url in the list and wait ConnNextTry seconds
+ if err != nil {
+ device.log.Errorf("Bind " + connurl + " failed!")
+ thepeer.endpoint_trylist.Delete(connurl)
+ continue
+ }
+ NextRun = true
+ go device.SendPing(thepeer, int(device.DRoute.ConnNextTry+1), 1, 1)
}
}
ClearChanLoop:
@ -691,12 +671,28 @@ func (device *Device) RoutineSetEndpoint() {
}
}
time.Sleep(path.S2TD(device.DRoute.ConnNextTry))
if device.LogLevel.LogInternal {
fmt.Printf("Internal: RoutineSetEndpoint: NextRun:%v\n", NextRun)
}
if NextRun {
device.event_tryendpoint <- struct{}{}
}
}
}
func (device *Device) RoutineDetectOfflineAndTryNextEndpoint() {
if !(device.DRoute.P2P.UseP2P || device.DRoute.SuperNode.UseSuperNode) {
return
}
if device.DRoute.ConnTimeOut == 0 {
return
}
for {
device.event_tryendpoint <- struct{}{}
time.Sleep(path.S2TD(device.DRoute.ConnTimeOut))
}
}
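
RoutineDetectOfflineAndTryNextEndpoint is a new watchdog: every ConnTimeOut seconds it pokes event_tryendpoint so RoutineSetEndpoint re-evaluates peers that have gone quiet; a ConnTimeOut of 0 disables it.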
func (device *Device) RoutineSendPing() {
if !(device.DRoute.P2P.UseP2P || device.DRoute.SuperNode.UseSuperNode) {
return
@ -745,29 +741,19 @@ func (device *Device) RoutineRecalculateNhTable() {
if device.graph.TimeoutCheckInterval == 0 {
return
}
- if device.IsSuperNode {
- for {
- if device.graph.CheckAnyShouldUpdate() {
- changed := device.graph.RecalculateNhTable(true)
- if changed {
- device.Event_server_NhTable_changed <- struct{}{}
- }
- }
- time.Sleep(device.graph.TimeoutCheckInterval)
- }
- } else {
- if !device.DRoute.P2P.UseP2P {
- return
- }
- for {
- if time.Now().After(device.graph.NhTableExpire) {
- if device.graph.CheckAnyShouldUpdate() {
- device.graph.RecalculateNhTable(false)
- }
- }
- time.Sleep(device.graph.TimeoutCheckInterval)
- }
- }
+ if !device.DRoute.P2P.UseP2P {
+ return
+ }
+ for {
+ if time.Now().After(device.graph.NhTableExpire) {
+ if device.graph.CheckAnyShouldUpdate() {
+ device.graph.RecalculateNhTable(false)
+ }
+ }
+ time.Sleep(device.graph.TimeoutCheckInterval)
+ }
}
func (device *Device) RoutineSpreadAllMyNeighbor() {

View File

@ -13,7 +13,7 @@ nodename: Node01
defaultttl: 200
l2fibtimeout: 3600
privkey: 6GyDagZKhbm5WNqMiRHhkf43RlbMJ34IieTlIuvfJ1M=
- listenport: 3001
+ listenport: 0
loglevel:
loglevel: normal
logtransit: true
@ -40,6 +40,7 @@ dynamicroute:
p2p:
usep2p: false
sendpeerinterval: 20
additionalcost: 0
graphrecalculatesetting:
staticmode: false
jittertolerance: 20

View File

@ -13,7 +13,7 @@ nodename: Node02
defaultttl: 200
l2fibtimeout: 3600
privkey: OH8BsVUU2Rqzeu9B2J5GPG8PUmxWfX8uVvNFZKhVF3o=
- listenport: 3002
+ listenport: 0
loglevel:
loglevel: normal
logtransit: true
@ -40,6 +40,7 @@ dynamicroute:
p2p:
usep2p: false
sendpeerinterval: 20
additionalcost: 0
graphrecalculatesetting:
staticmode: false
jittertolerance: 20
@ -70,4 +71,4 @@ dynamicroute:
- time.windows.com
nexthoptable: {}
resetconninterval: 86400
- peers: []
\ No newline at end of file
+ peers: []

View File

@ -7,6 +7,7 @@ loglevel:
logtransit: true
logcontrol: true
lognormal: true
loginternal: true
logntp: false
repushconfiginterval: 30
passwords:
@ -32,7 +33,9 @@ peers:
name: Node_01
pubkey: ZqzLVSbXzjppERslwbf2QziWruW3V/UIx9oqwU8Fn3I=
pskey: iPM8FXfnHVzwjguZHRW9bLNY+h7+B1O2oTJtktptQkI=
additionalcost: 1000
- nodeid: 2
name: Node_02
pubkey: dHeWQtlTPQGy87WdbUARS4CtwVaR2y7IQ1qcX4GKSXk=
pskey: juJMQaGAaeSy8aDsXSKNsPZv/nFiPj4h/1G70tGYygs=
additionalcost: 1000
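
Note on units: given the additionalCost / 1000 scaling in path.UpdateLatency (where ping is kept in seconds), additionalcost appears to be expressed in milliseconds, so additionalcost: 1000 here should add roughly one second of cost to every edge that terminates at this node.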

View File

@ -38,11 +38,12 @@ func printExampleEdgeConf() {
SendAddr: "127.0.0.1:5001",
L2HeaderMode: "nochg",
},
- NodeID: 1,
- NodeName: "Node01",
- DefaultTTL: 200,
- PrivKey: "6GyDagZKhbm5WNqMiRHhkf43RlbMJ34IieTlIuvfJ1M=",
- ListenPort: 3001,
+ NodeID:       1,
+ NodeName:     "Node01",
+ DefaultTTL:   200,
+ L2FIBTimeout: 3600,
+ PrivKey:      "6GyDagZKhbm5WNqMiRHhkf43RlbMJ34IieTlIuvfJ1M=",
+ ListenPort:   3001,
LogLevel: config.LoggerInfo{
LogLevel: "error",
LogTransit: true,
@ -76,6 +77,7 @@ func printExampleEdgeConf() {
JitterTolerance: 20,
JitterToleranceMultiplier: 1.1,
NodeReportTimeout: 40,
TimeoutCheckInterval: 5,
RecalculateCoolDown: 5,
},
},
@ -122,18 +124,18 @@ func printExampleEdgeConf() {
}
g := path.NewGraph(3, false, tconfig.DynamicRoute.P2P.GraphRecalculateSetting, tconfig.DynamicRoute.NTPconfig, tconfig.LogLevel)
- g.UpdateLatency(1, 2, path.S2TD(0.5), false, false)
- g.UpdateLatency(2, 1, path.S2TD(0.5), false, false)
- g.UpdateLatency(2, 3, path.S2TD(0.5), false, false)
- g.UpdateLatency(3, 2, path.S2TD(0.5), false, false)
- g.UpdateLatency(2, 4, path.S2TD(0.5), false, false)
- g.UpdateLatency(4, 2, path.S2TD(0.5), false, false)
- g.UpdateLatency(3, 4, path.S2TD(0.5), false, false)
- g.UpdateLatency(4, 3, path.S2TD(0.5), false, false)
- g.UpdateLatency(5, 3, path.S2TD(0.5), false, false)
- g.UpdateLatency(3, 5, path.S2TD(0.5), false, false)
- g.UpdateLatency(6, 4, path.S2TD(0.5), false, false)
- g.UpdateLatency(4, 6, path.S2TD(0.5), false, false)
+ g.UpdateLatency(1, 2, path.S2TD(0.5), 0, false, false)
+ g.UpdateLatency(2, 1, path.S2TD(0.5), 0, false, false)
+ g.UpdateLatency(2, 3, path.S2TD(0.5), 0, false, false)
+ g.UpdateLatency(3, 2, path.S2TD(0.5), 0, false, false)
+ g.UpdateLatency(2, 4, path.S2TD(0.5), 0, false, false)
+ g.UpdateLatency(4, 2, path.S2TD(0.5), 0, false, false)
+ g.UpdateLatency(3, 4, path.S2TD(0.5), 0, false, false)
+ g.UpdateLatency(4, 3, path.S2TD(0.5), 0, false, false)
+ g.UpdateLatency(5, 3, path.S2TD(0.5), 0, false, false)
+ g.UpdateLatency(3, 5, path.S2TD(0.5), 0, false, false)
+ g.UpdateLatency(6, 4, path.S2TD(0.5), 0, false, false)
+ g.UpdateLatency(4, 6, path.S2TD(0.5), 0, false, false)
_, next, _ := g.FloydWarshall(false)
tconfig.NextHopTable = next
toprint, _ := yaml.Marshal(tconfig)

View File

@ -31,15 +31,16 @@ var (
http_PeerInfo_hash [32]byte
http_NhTableStr []byte
http_PeerInfo config.API_Peers
+ http_super_chains *path.SUPER_Events
http_passwords config.Passwords
http_StateExpire time.Time
http_StateString_tmp []byte
- http_maps_lock sync.RWMutex
- http_PeerID2PubKey map[config.Vertex]string
- http_PeerState map[string]*PeerState //the state hash reported by peer
- http_PeerIPs map[string]*HttpPeerLocalIP
+ http_maps_lock     sync.RWMutex
+ http_PeerID2Info   map[config.Vertex]config.SuperPeerInfo
+ http_PeerState     map[string]*PeerState //the state hash reported by peer
+ http_PeerIPs       map[string]*HttpPeerLocalIP
http_sconfig *config.SuperConfig
@ -87,7 +88,7 @@ func get_api_peers(old_State_hash [32]byte) (api_peerinfo config.API_Peers, Stat
api_peerinfo[peerinfo.PubKey] = config.API_Peerinfo{
NodeID: peerinfo.NodeID,
PSKey: peerinfo.PSKey,
- Connurl: make(map[string]int),
+ Connurl: make(map[string]float64),
}
http_maps_lock.RLock()
if http_PeerState[peerinfo.PubKey].LastSeen.Add(path.S2TD(http_sconfig.GraphRecalculateSetting.NodeReportTimeout)).After(time.Now()) {
@ -152,7 +153,7 @@ func get_peerinfo(w http.ResponseWriter, r *http.Request) {
NodeID := config.Vertex(NID2)
http_maps_lock.RLock()
defer http_maps_lock.RUnlock()
- if http_PeerID2PubKey[NodeID] != PubKey {
+ if http_PeerID2Info[NodeID].PubKey != PubKey {
w.WriteHeader(http.StatusNotFound)
w.Write([]byte("NodeID and PunKey are not match"))
return
@ -228,7 +229,7 @@ func get_nhtable(w http.ResponseWriter, r *http.Request) {
NodeID := config.Vertex(NID2)
http_maps_lock.RLock()
defer http_maps_lock.RUnlock()
- if http_PeerID2PubKey[NodeID] != PubKey {
+ if http_PeerID2Info[NodeID].PubKey != PubKey {
w.WriteHeader(http.StatusNotFound)
w.Write([]byte("NodeID and PunKey are not match"))
return
@ -266,8 +267,8 @@ func get_info(w http.ResponseWriter, r *http.Request) {
PeerInfo: make(map[config.Vertex]HttpPeerInfo),
NhTable: http_graph.GetNHTable(false),
Infinity: path.Infinity,
- Edges: http_graph.GetEdges(false),
- Edges_Nh: http_graph.GetEdges(true),
+ Edges:    http_graph.GetEdges(false, false),
+ Edges_Nh: http_graph.GetEdges(true, true),
Dist: http_graph.GetDtst(),
}
http_maps_lock.RLock()

View File

@ -70,9 +70,12 @@ func printExampleSuperConf() {
PrivKeyV6: "+EdOKIoBp/EvIusHDsvXhV1RJYbyN3Qr8nxlz35wl3I=",
ListenPort: 3000,
LogLevel: config.LoggerInfo{
LogLevel: "normal",
LogTransit: true,
LogControl: true,
LogLevel: "normal",
LogTransit: true,
LogControl: true,
LogNormal: false,
LogInternal: true,
LogNTP: false,
},
RePushConfigInterval: 30,
Passwords: config.Passwords{
@ -85,6 +88,7 @@ func printExampleSuperConf() {
JitterTolerance: 5,
JitterToleranceMultiplier: 1.01,
NodeReportTimeout: 40,
TimeoutCheckInterval: 5,
RecalculateCoolDown: 5,
},
NextHopTable: config.NextHopTable{
@ -99,16 +103,18 @@ func printExampleSuperConf() {
UsePSKForInterEdge: true,
Peers: []config.SuperPeerInfo{
{
- NodeID: 1,
- Name: "Node_01",
- PubKey: "ZqzLVSbXzjppERslwbf2QziWruW3V/UIx9oqwU8Fn3I=",
- PSKey: "iPM8FXfnHVzwjguZHRW9bLNY+h7+B1O2oTJtktptQkI=",
+ NodeID:         1,
+ Name:           "Node_01",
+ PubKey:         "ZqzLVSbXzjppERslwbf2QziWruW3V/UIx9oqwU8Fn3I=",
+ PSKey:          "iPM8FXfnHVzwjguZHRW9bLNY+h7+B1O2oTJtktptQkI=",
+ AdditionalCost: 0,
},
{
- NodeID: 2,
- Name: "Node_02",
- PubKey: "dHeWQtlTPQGy87WdbUARS4CtwVaR2y7IQ1qcX4GKSXk=",
- PSKey: "juJMQaGAaeSy8aDsXSKNsPZv/nFiPj4h/1G70tGYygs=",
+ NodeID:         2,
+ Name:           "Node_02",
+ PubKey:         "dHeWQtlTPQGy87WdbUARS4CtwVaR2y7IQ1qcX4GKSXk=",
+ PSKey:          "juJMQaGAaeSy8aDsXSKNsPZv/nFiPj4h/1G70tGYygs=",
+ AdditionalCost: 0,
},
},
}
@ -165,14 +171,13 @@ func Super(configPath string, useUAPI bool, printExample bool, bindmode string)
http_sconfig_path = configPath
http_PeerState = make(map[string]*PeerState)
http_PeerIPs = make(map[string]*HttpPeerLocalIP)
- http_PeerID2PubKey = make(map[config.Vertex]string)
+ http_PeerID2Info = make(map[config.Vertex]config.SuperPeerInfo)
http_HashSalt = []byte(config.RandomStr(32, "Salt generate failed"))
http_passwords = sconfig.Passwords
- super_chains := path.SUPER_Events{
- Event_server_pong: make(chan path.PongMsg, 1<<5),
- Event_server_register: make(chan path.RegisterMsg, 1<<5),
- Event_server_NhTable_changed: make(chan struct{}, 1<<4),
+ http_super_chains = &path.SUPER_Events{
+ Event_server_pong:     make(chan path.PongMsg, 1<<5),
+ Event_server_register: make(chan path.RegisterMsg, 1<<5),
}
http_graph = path.NewGraph(3, true, sconfig.GraphRecalculateSetting, config.NTPinfo{}, sconfig.LogLevel)
http_graph.SetNHTable(http_sconfig.NextHopTable, [32]byte{})
@ -183,10 +188,10 @@ func Super(configPath string, useUAPI bool, printExample bool, bindmode string)
}
}
thetap4, _ := tap.CreateDummyTAP()
- http_device4 = device.NewDevice(thetap4, config.SuperNodeMessage, conn.NewDefaultBind(true, false, bindmode), logger4, http_graph, true, configPath, nil, &sconfig, &super_chains, Version)
+ http_device4 = device.NewDevice(thetap4, config.SuperNodeMessage, conn.NewDefaultBind(true, false, bindmode), logger4, http_graph, true, configPath, nil, &sconfig, http_super_chains, Version)
defer http_device4.Close()
thetap6, _ := tap.CreateDummyTAP()
- http_device6 = device.NewDevice(thetap6, config.SuperNodeMessage, conn.NewDefaultBind(false, true, bindmode), logger6, http_graph, true, configPath, nil, &sconfig, &super_chains, Version)
+ http_device6 = device.NewDevice(thetap6, config.SuperNodeMessage, conn.NewDefaultBind(false, true, bindmode), logger6, http_graph, true, configPath, nil, &sconfig, http_super_chains, Version)
defer http_device6.Close()
if sconfig.PrivKeyV4 != "" {
pk4, err := device.Str2PriKey(sconfig.PrivKeyV4)
@ -235,8 +240,9 @@ func Super(configPath string, useUAPI bool, printExample bool, bindmode string)
signal.Notify(term, syscall.SIGTERM)
signal.Notify(term, os.Interrupt)
- go Event_server_event_hendler(http_graph, super_chains)
+ go Event_server_event_hendler(http_graph, http_super_chains)
go RoutinePushSettings(path.S2TD(sconfig.RePushConfigInterval))
+ go RoutineTimeoutCheck()
go HttpServer(sconfig.ListenPort, "/api")
select {
@ -288,7 +294,7 @@ func super_peeradd(peerconf config.SuperPeerInfo) error {
}
}
http_maps_lock.Lock()
- http_PeerID2PubKey[peerconf.NodeID] = peerconf.PubKey
+ http_PeerID2Info[peerconf.NodeID] = peerconf
http_PeerState[peerconf.PubKey] = &PeerState{}
http_PeerIPs[peerconf.PubKey] = &HttpPeerLocalIP{}
http_maps_lock.Unlock()
@ -297,7 +303,7 @@ func super_peeradd(peerconf config.SuperPeerInfo) error {
func super_peerdel(toDelete config.Vertex) {
http_maps_lock.RLock()
- PubKey := http_PeerID2PubKey[toDelete]
+ PubKey := http_PeerID2Info[toDelete].PubKey
http_maps_lock.RUnlock()
UpdateErrorMsg := path.UpdateErrorMsg{
Node_id: toDelete,
@ -328,20 +334,20 @@ func super_peerdel(toDelete config.Vertex) {
http_maps_lock.Lock()
delete(http_PeerState, PubKey)
delete(http_PeerIPs, PubKey)
- delete(http_PeerID2PubKey, toDelete)
+ delete(http_PeerID2Info, toDelete)
http_maps_lock.Unlock()
}
- func Event_server_event_hendler(graph *path.IG, events path.SUPER_Events) {
+ func Event_server_event_hendler(graph *path.IG, events *path.SUPER_Events) {
for {
select {
case reg_msg := <-events.Event_server_register:
var should_push_peer bool
var should_push_nh bool
http_maps_lock.RLock()
- http_PeerState[http_PeerID2PubKey[reg_msg.Node_id]].LastSeen = time.Now()
- PubKey := http_PeerID2PubKey[reg_msg.Node_id]
+ if reg_msg.Node_id < config.Special_NodeID {
+ http_PeerState[http_PeerID2Info[reg_msg.Node_id].PubKey].LastSeen = time.Now()
+ PubKey := http_PeerID2Info[reg_msg.Node_id].PubKey
if bytes.Equal(http_PeerState[PubKey].NhTableState[:], reg_msg.NhStateHash[:]) == false {
copy(http_PeerState[PubKey].NhTableState[:], reg_msg.NhStateHash[:])
should_push_nh = true
@ -363,17 +369,17 @@ func Event_server_event_hendler(graph *path.IG, events path.SUPER_Events) {
if should_push_nh {
PushNhTable(false)
}
- case <-events.Event_server_NhTable_changed:
- NhTable := graph.GetNHTable(true)
- NhTablestr, _ := json.Marshal(NhTable)
- md5_hash_raw := md5.Sum(http_NhTableStr)
- new_hash_str := hex.EncodeToString(md5_hash_raw[:])
- new_hash_str_byte := []byte(new_hash_str)
- copy(http_NhTable_Hash[:], new_hash_str_byte)
- http_NhTableStr = NhTablestr
- PushNhTable(false)
case pong_msg := <-events.Event_server_pong:
- changed := graph.UpdateLatency(pong_msg.Src_nodeID, pong_msg.Dst_nodeID, pong_msg.Timediff, true, true)
+ var changed bool
+ http_maps_lock.RLock()
+ if pong_msg.Src_nodeID < config.Special_NodeID && pong_msg.Dst_nodeID < config.Special_NodeID {
+ changed = graph.UpdateLatency(pong_msg.Src_nodeID, pong_msg.Dst_nodeID, pong_msg.Timediff, http_PeerID2Info[pong_msg.Dst_nodeID].AdditionalCost, true, true)
+ } else {
+ if http_graph.CheckAnyShouldUpdate() {
+ changed = http_graph.RecalculateNhTable(true)
+ }
+ }
+ http_maps_lock.RUnlock()
if changed {
NhTable := graph.GetNHTable(true)
NhTablestr, _ := json.Marshal(NhTable)
@ -405,6 +411,21 @@ func RoutinePushSettings(interval time.Duration) {
}
}
func RoutineTimeoutCheck() {
for {
http_super_chains.Event_server_register <- path.RegisterMsg{
Node_id: config.SuperNodeMessage,
Version: "dummy",
}
http_super_chains.Event_server_pong <- path.PongMsg{
RequestID: 0,
Src_nodeID: config.SuperNodeMessage,
Dst_nodeID: config.SuperNodeMessage,
}
time.Sleep(http_graph.TimeoutCheckInterval)
}
}
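
RoutineTimeoutCheck drives the supernode's periodic recalculation by injecting sentinel messages (IDs set to config.SuperNodeMessage) into the same channels the real events use; the pong handler's else branch then runs CheckAnyShouldUpdate and RecalculateNhTable. The pattern in miniature (illustrative names only, not the project's API):

package main

import (
	"fmt"
	"time"
)

// tickle feeds a sentinel into the channel a select loop already consumes,
// so the loop does its timeout housekeeping even when no real event arrives.
func tickle(events chan<- string, every time.Duration) {
	for {
		events <- "recheck" // sentinel: no new data, just re-check timeouts
		time.Sleep(every)
	}
}

func main() {
	events := make(chan string, 1)
	go tickle(events, 100*time.Millisecond)
	fmt.Println(<-events) // the event loop would do its housekeeping here
}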
func PushNhTable(force bool) {
body, err := path.GetByte(path.UpdateNhTableMsg{
State_hash: http_NhTable_Hash,

View File

@ -140,10 +140,11 @@ func ParsePingMsg(bin []byte) (StructPlace PingMsg, err error) {
}
type PongMsg struct {
- RequestID uint32
- Src_nodeID config.Vertex
- Dst_nodeID config.Vertex
- Timediff time.Duration
+ RequestID      uint32
+ Src_nodeID     config.Vertex
+ Dst_nodeID     config.Vertex
+ Timediff       time.Duration
+ AdditionalCost float64
}
func (c *PongMsg) ToString() string {
@ -194,7 +195,6 @@ func ParseBoardcastPeerMsg(bin []byte) (StructPlace BoardcastPeerMsg, err error)
}
type SUPER_Events struct {
- Event_server_pong chan PongMsg
- Event_server_register chan RegisterMsg
- Event_server_NhTable_changed chan struct{}
+ Event_server_pong     chan PongMsg
+ Event_server_register chan RegisterMsg
}

View File

@ -23,9 +23,10 @@ func (g *IG) GetCurrentTime() time.Time {
}
type Latency struct {
- ping float64
- ping_old float64
- time time.Time
+ ping           float64
+ ping_old       float64
+ additionalCost float64
+ time           time.Time
}
type Fullroute struct {
@ -94,7 +95,7 @@ func (g *IG) GetWeightType(x float64) (y float64) {
}
func (g *IG) ShouldUpdate(u config.Vertex, v config.Vertex, newval float64) bool {
- oldval := math.Abs(g.OldWeight(u, v) * 1000)
+ oldval := math.Abs(g.OldWeight(u, v, false) * 1000)
newval = math.Abs(newval * 1000)
if g.IsSuperMode {
if g.JitterTolerance > 0.001 && g.JitterToleranceMultiplier >= 1 {
@ -114,9 +115,11 @@ func (g *IG) CheckAnyShouldUpdate() bool {
vert := g.Vertices()
for u, _ := range vert {
for v, _ := range vert {
- newVal := g.Weight(u, v)
- if g.ShouldUpdate(u, v, newVal) {
- return true
- }
+ if u != v {
+ newVal := g.Weight(u, v, false)
+ if g.ShouldUpdate(u, v, newVal) {
+ return true
+ }
+ }
}
}
@ -130,7 +133,7 @@ func (g *IG) RecalculateNhTable(checkchange bool) (changed bool) {
}
return
}
- if !g.ShouldCalculate() {
+ if !g.CheckAnyShouldUpdate() {
return
}
if g.recalculateTime.Add(g.RecalculateCoolDown).Before(time.Now()) {
@ -169,7 +172,7 @@ func (g *IG) RemoveVirt(v config.Vertex, recalculate bool, checkchange bool) (ch
return
}
- func (g *IG) UpdateLatency(u, v config.Vertex, dt time.Duration, recalculate bool, checkchange bool) (changed bool) {
+ func (g *IG) UpdateLatency(u, v config.Vertex, dt time.Duration, additionalCost float64, recalculate bool, checkchange bool) (changed bool) {
g.edgelock.Lock()
g.Vert[u] = true
g.Vert[v] = true
@ -184,11 +187,13 @@ func (g *IG) UpdateLatency(u, v config.Vertex, dt time.Duration, recalculate boo
if _, ok := g.edges[u][v]; ok {
g.edges[u][v].ping = w
g.edges[u][v].time = time.Now()
+ g.edges[u][v].additionalCost = additionalCost / 1000
} else {
g.edges[u][v] = &Latency{
- ping: w,
- ping_old: Infinity,
- time: time.Now(),
+ ping:           w,
+ ping_old:       Infinity,
+ time:           time.Now(),
+ additionalCost: additionalCost / 1000,
}
}
g.edgelock.Unlock()
@ -225,7 +230,7 @@ func (g *IG) Next(u, v config.Vertex) *config.Vertex {
return g.nhTable[u][v]
}
- func (g *IG) Weight(u, v config.Vertex) (ret float64) {
+ func (g *IG) Weight(u, v config.Vertex, withAC bool) (ret float64) {
g.edgelock.RLock()
defer g.edgelock.RUnlock()
//defer func() { fmt.Println(u, v, ret) }()
@ -241,10 +246,17 @@ func (g *IG) Weight(u, v config.Vertex) (ret float64) {
if time.Now().After(g.edges[u][v].time.Add(g.NodeReportTimeout)) {
return Infinity
}
- return g.edges[u][v].ping
+ if withAC {
+ ret = g.edges[u][v].ping + g.edges[u][v].additionalCost
+ } else { // plain ping when the additional cost is not requested
+ ret = g.edges[u][v].ping
+ }
+ if ret >= Infinity {
+ return Infinity
+ }
+ return
}
- func (g *IG) OldWeight(u, v config.Vertex) (ret float64) {
+ func (g *IG) OldWeight(u, v config.Vertex, withAC bool) (ret float64) {
g.edgelock.RLock()
defer g.edgelock.RUnlock()
if u == v {
@ -256,22 +268,14 @@ func (g *IG) OldWeight(u, v config.Vertex) (ret float64) {
if _, ok := g.edges[u][v]; !ok {
return Infinity
}
- return g.edges[u][v].ping_old
- }
- func (g *IG) ShouldCalculate() bool {
- vert := g.Vertices()
- for u, _ := range vert {
- for v, _ := range vert {
- if u != v {
- w := g.Weight(u, v)
- if g.ShouldUpdate(u, v, w) {
- return true
- }
- }
- }
- }
- return false
- }
+ if withAC {
+ ret = g.edges[u][v].ping_old + g.edges[u][v].additionalCost
+ } else {
+ ret = g.edges[u][v].ping_old
+ }
+ if ret >= Infinity {
+ return Infinity
+ }
+ return
}
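
Taken together, Weight and OldWeight fold the stored additionalCost (already divided by 1000 at insertion time) into the result only when withAC is true: FloydWarshall below routes on the cost-inclusive weight, while change detection (ShouldUpdate) compares raw pings. A self-contained summary of that arithmetic (effectiveWeight is illustrative, not part of the codebase):

package main

import "fmt"

// effectiveWeight mirrors the withAC logic above: ping is in seconds, while
// AdditionalCost is configured in milliseconds and divided by 1000 when stored.
func effectiveWeight(pingSec, additionalCostMs float64, withAC bool) float64 {
	if !withAC {
		return pingSec
	}
	return pingSec + additionalCostMs/1000
}

func main() {
	fmt.Println(effectiveWeight(0.5, 1000, true))  // 1.5: cost used for routing
	fmt.Println(effectiveWeight(0.5, 1000, false)) // 0.5: raw measured ping
}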
func (g *IG) SetWeight(u, v config.Vertex, weight float64) {
@ -302,7 +306,7 @@ func (g *IG) RemoveAllNegativeValue() {
vert := g.Vertices()
for u, _ := range vert {
for v, _ := range vert {
- if g.Weight(u, v) < 0 {
+ if g.Weight(u, v, true) < 0 {
if g.loglevel.LogInternal {
fmt.Printf("Internal: Remove negative value : edge[%v][%v] = 0\n", u, v)
}
@ -332,13 +336,14 @@ func (g *IG) FloydWarshall(again bool) (dist config.DistTable, next config.NextH
}
dist[u][u] = 0
for _, v := range g.Neighbors(u) {
- w := g.Weight(u, v)
+ w := g.Weight(u, v, true)
+ wo := g.Weight(u, v, false)
if w < Infinity {
v := v
dist[u][v] = w
next[u][v] = &v
}
- g.SetOldWeight(u, v, w)
+ g.SetOldWeight(u, v, wo)
}
}
for k, _ := range vert {
@ -406,7 +411,7 @@ func (g *IG) GetDtst() config.DistTable {
return g.dlTable
}
- func (g *IG) GetEdges(isOld bool) (edges map[config.Vertex]map[config.Vertex]float64) {
+ func (g *IG) GetEdges(isOld bool, withAC bool) (edges map[config.Vertex]map[config.Vertex]float64) {
vert := g.Vertices()
edges = make(map[config.Vertex]map[config.Vertex]float64, len(vert))
for src, _ := range vert {
@ -414,9 +419,9 @@ func (g *IG) GetEdges(isOld bool) (edges map[config.Vertex]map[config.Vertex]flo
for dst, _ := range vert {
if src != dst {
if isOld {
- edges[src][dst] = g.OldWeight(src, dst)
+ edges[src][dst] = g.OldWeight(src, dst, withAC)
} else {
- edges[src][dst] = g.Weight(src, dst)
+ edges[src][dst] = g.Weight(src, dst, withAC)
}
}
}
@ -498,7 +503,7 @@ func Solve(filePath string, pe bool) error {
val := a2n(sval)
dst := a2v(verts[index+1])
if src != dst && val != Infinity {
- g.UpdateLatency(src, dst, S2TD(val), false, false)
+ g.UpdateLatency(src, dst, S2TD(val), 0, false, false)
}
}
}