Add timeout and lastseen at supernode, add lock to map access

This commit is contained in:
KusakabeSi 2021-09-28 23:32:51 +00:00
parent a60a9e8388
commit 96682579f8
6 changed files with 72 additions and 37 deletions

View File

@ -119,7 +119,7 @@ func (device *Device) NewPeer(pk NoisePublicKey, id config.Vertex, isSuper bool)
peer.cookieGenerator.Init(pk) peer.cookieGenerator.Init(pk)
peer.device = device peer.device = device
peer.endpoint_trylist = orderedmap.New() peer.endpoint_trylist = *orderedmap.New()
peer.queue.outbound = newAutodrainingOutboundQueue(device) peer.queue.outbound = newAutodrainingOutboundQueue(device)
peer.queue.inbound = newAutodrainingInboundQueue(device) peer.queue.inbound = newAutodrainingInboundQueue(device)
peer.queue.staged = make(chan *QueueOutboundElement, QueueStagedSize) peer.queue.staged = make(chan *QueueOutboundElement, QueueStagedSize)

View File

@ -124,7 +124,7 @@ func (device *Device) process_received(msg_type path.Usage, peer *Peer, body []b
} }
case path.PongPacket: case path.PongPacket:
if content, err := path.ParsePongMsg(body); err == nil { if content, err := path.ParsePongMsg(body); err == nil {
return device.server_process_Pong(content) return device.server_process_Pong(peer, content)
} }
default: default:
err = errors.New("Not a valid msg_type") err = errors.New("Not a valid msg_type")
@ -251,11 +251,13 @@ func (device *Device) server_process_RegisterMsg(peer *Peer, content path.Regist
device.SendPacket(peer, path.UpdateError, buf, MessageTransportOffsetContent) device.SendPacket(peer, path.UpdateError, buf, MessageTransportOffsetContent)
return nil return nil
} }
peer.LastPingReceived = time.Now()
device.Event_server_register <- content device.Event_server_register <- content
return nil return nil
} }
func (device *Device) server_process_Pong(content path.PongMsg) error { func (device *Device) server_process_Pong(peer *Peer, content path.PongMsg) error {
peer.LastPingReceived = time.Now()
device.Event_server_pong <- content device.Event_server_pong <- content
return nil return nil
} }

View File

@ -10,6 +10,7 @@ import (
"io/ioutil" "io/ioutil"
"net" "net"
"strconv" "strconv"
"sync"
"time" "time"
"net/http" "net/http"
@ -37,9 +38,10 @@ var (
http_StateExpire time.Time http_StateExpire time.Time
http_StateString_tmp []byte http_StateString_tmp []byte
http_PeerState map[string]*PeerState //the state hash reported by peer http_PeerState map[string]*PeerState //the state hash reported by peer
http_PeerIPs map[string]*HttpPeerLocalIP http_PeerIPs map[string]*HttpPeerLocalIP
http_sconfig *config.SuperConfig http_PeerLastSeen sync.Map // ID -> time.Time
http_sconfig *config.SuperConfig
http_sconfig_path string http_sconfig_path string
http_econfig_tmp *config.EdgeConfig http_econfig_tmp *config.EdgeConfig
@ -59,7 +61,8 @@ type HttpState struct {
} }
type HttpPeerInfo struct { type HttpPeerInfo struct {
Name string Name string
LastSeen string
} }
type PeerState struct { type PeerState struct {
@ -84,24 +87,26 @@ func get_api_peers(old_State_hash [32]byte) (api_peerinfo config.API_Peers, Stat
PSKey: peerinfo.PSKey, PSKey: peerinfo.PSKey,
Connurl: make(map[string]int), Connurl: make(map[string]int),
} }
connV4 := http_device4.GetConnurl(peerinfo.NodeID) lastSeen, has := http_PeerLastSeen.Load(peerinfo.NodeID)
connV6 := http_device6.GetConnurl(peerinfo.NodeID) if has && lastSeen.(time.Time).Add(path.S2TD(http_sconfig.GraphRecalculateSetting.NodeReportTimeout)).After(time.Now()) {
api_peerinfo[peerinfo.PubKey].Connurl[connV4] = 4 connV4 := http_device4.GetConnurl(peerinfo.NodeID)
api_peerinfo[peerinfo.PubKey].Connurl[connV6] = 6 connV6 := http_device6.GetConnurl(peerinfo.NodeID)
L4Addr := http_PeerIPs[peerinfo.PubKey].IPv4 api_peerinfo[peerinfo.PubKey].Connurl[connV4] = 4
L4IP := L4Addr.IP api_peerinfo[peerinfo.PubKey].Connurl[connV6] = 6
L4str := L4Addr.String() L4Addr := http_PeerIPs[peerinfo.PubKey].IPv4
if L4str != connV4 && conn.ValidIP(L4IP) { L4IP := L4Addr.IP
api_peerinfo[peerinfo.PubKey].Connurl[L4str] = 14 L4str := L4Addr.String()
if L4str != connV4 && conn.ValidIP(L4IP) {
api_peerinfo[peerinfo.PubKey].Connurl[L4str] = 14
}
L6Addr := http_PeerIPs[peerinfo.PubKey].IPv6
L6IP := L6Addr.IP
L6str := L6Addr.String()
if L6str != connV6 && conn.ValidIP(L6IP) {
api_peerinfo[peerinfo.PubKey].Connurl[L6str] = 16
}
delete(api_peerinfo[peerinfo.PubKey].Connurl, "")
} }
L6Addr := http_PeerIPs[peerinfo.PubKey].IPv6
L6IP := L6Addr.IP
L6str := L6Addr.String()
if L6str != connV6 && conn.ValidIP(L6IP) {
api_peerinfo[peerinfo.PubKey].Connurl[L6str] = 16
}
delete(api_peerinfo[peerinfo.PubKey].Connurl, "")
} }
api_peerinfo_str_byte, _ := json.Marshal(&api_peerinfo) api_peerinfo_str_byte, _ := json.Marshal(&api_peerinfo)
hash_raw := md5.Sum(append(api_peerinfo_str_byte, http_HashSalt...)) hash_raw := md5.Sum(append(api_peerinfo_str_byte, http_HashSalt...))
@ -254,8 +259,13 @@ func get_info(w http.ResponseWriter, r *http.Request) {
Dist: http_graph.GetDtst(), Dist: http_graph.GetDtst(),
} }
for _, peerinfo := range http_sconfig.Peers { for _, peerinfo := range http_sconfig.Peers {
LastSeenStr := ""
if lastseen, has := http_PeerLastSeen.Load(peerinfo.NodeID); has {
LastSeenStr = lastseen.(time.Time).String()
}
hs.PeerInfo[peerinfo.NodeID] = HttpPeerInfo{ hs.PeerInfo[peerinfo.NodeID] = HttpPeerInfo{
Name: peerinfo.Name, Name: peerinfo.Name,
LastSeen: LastSeenStr,
} }
} }
http_StateExpire = time.Now().Add(5 * time.Second) http_StateExpire = time.Now().Add(5 * time.Second)

View File

@ -230,6 +230,7 @@ func super_peeradd(peerconf config.SuperPeerInfo) error {
http_PeerID2PubKey[peerconf.NodeID] = peerconf.PubKey http_PeerID2PubKey[peerconf.NodeID] = peerconf.PubKey
http_PeerState[peerconf.PubKey] = &PeerState{} http_PeerState[peerconf.PubKey] = &PeerState{}
http_PeerIPs[peerconf.PubKey] = &HttpPeerLocalIP{} http_PeerIPs[peerconf.PubKey] = &HttpPeerLocalIP{}
http_PeerLastSeen.Store(peerconf.NodeID, time.Time{})
return nil return nil
} }
@ -261,6 +262,7 @@ func super_peerdel(toDelete config.Vertex) {
http_device4.RemovePeerByID(toDelete) http_device4.RemovePeerByID(toDelete)
http_device6.RemovePeerByID(toDelete) http_device6.RemovePeerByID(toDelete)
http_graph.RemoveVirt(toDelete, true, false) http_graph.RemoveVirt(toDelete, true, false)
http_PeerLastSeen.Delete(toDelete)
delete(http_PeerState, PubKey) delete(http_PeerState, PubKey)
delete(http_PeerIPs, PubKey) delete(http_PeerIPs, PubKey)
@ -273,6 +275,7 @@ func Event_server_event_hendler(graph *path.IG, events path.SUPER_Events) {
case reg_msg := <-events.Event_server_register: case reg_msg := <-events.Event_server_register:
var should_push_peer bool var should_push_peer bool
var should_push_nh bool var should_push_nh bool
http_PeerLastSeen.Store(reg_msg.Node_id, time.Now())
if reg_msg.Node_id < config.Special_NodeID { if reg_msg.Node_id < config.Special_NodeID {
PubKey := http_PeerID2PubKey[reg_msg.Node_id] PubKey := http_PeerID2PubKey[reg_msg.Node_id]
if bytes.Equal(http_PeerState[PubKey].NhTableState[:], reg_msg.NhStateHash[:]) == false { if bytes.Equal(http_PeerState[PubKey].NhTableState[:], reg_msg.NhStateHash[:]) == false {

View File

@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"sort" "sort"
"sync"
) )
type Pair struct { type Pair struct {
@ -32,14 +33,15 @@ type OrderedMap struct {
keys []string keys []string
values map[string]interface{} values map[string]interface{}
escapeHTML bool escapeHTML bool
rwLock sync.RWMutex
} }
func New() OrderedMap { func New() *OrderedMap {
o := OrderedMap{} o := OrderedMap{}
o.keys = []string{} o.keys = []string{}
o.values = map[string]interface{}{} o.values = map[string]interface{}{}
o.escapeHTML = true o.escapeHTML = true
return o return &o
} }
func (o *OrderedMap) SetEscapeHTML(on bool) { func (o *OrderedMap) SetEscapeHTML(on bool) {
@ -47,11 +49,15 @@ func (o *OrderedMap) SetEscapeHTML(on bool) {
} }
func (o *OrderedMap) Get(key string) (interface{}, bool) { func (o *OrderedMap) Get(key string) (interface{}, bool) {
o.rwLock.RLock()
defer o.rwLock.RUnlock()
val, exists := o.values[key] val, exists := o.values[key]
return val, exists return val, exists
} }
func (o *OrderedMap) Set(key string, value interface{}) { func (o *OrderedMap) Set(key string, value interface{}) {
o.rwLock.Lock()
defer o.rwLock.Unlock()
_, exists := o.values[key] _, exists := o.values[key]
if !exists { if !exists {
o.keys = append(o.keys, key) o.keys = append(o.keys, key)
@ -61,6 +67,8 @@ func (o *OrderedMap) Set(key string, value interface{}) {
func (o *OrderedMap) Delete(key string) { func (o *OrderedMap) Delete(key string) {
// check key is in use // check key is in use
o.rwLock.Lock()
defer o.rwLock.Unlock()
_, ok := o.values[key] _, ok := o.values[key]
if !ok { if !ok {
return return
@ -77,12 +85,16 @@ func (o *OrderedMap) Delete(key string) {
} }
func (o *OrderedMap) Clear() { // delete whole orderdmap func (o *OrderedMap) Clear() { // delete whole orderdmap
o.rwLock.Lock()
defer o.rwLock.Unlock()
for _, key := range o.Keys() { for _, key := range o.Keys() {
o.Delete(key) o.Delete(key)
} }
} }
func (o *OrderedMap) Keys() []string { func (o *OrderedMap) Keys() []string {
o.rwLock.RLock()
defer o.rwLock.RUnlock()
ret := make([]string, len(o.keys)) ret := make([]string, len(o.keys))
for i, v := range o.keys { for i, v := range o.keys {
ret[i] = v ret[i] = v
@ -92,11 +104,15 @@ func (o *OrderedMap) Keys() []string {
// SortKeys Sort the map keys using your sort func // SortKeys Sort the map keys using your sort func
func (o *OrderedMap) SortKeys(sortFunc func(keys []string)) { func (o *OrderedMap) SortKeys(sortFunc func(keys []string)) {
o.rwLock.Lock()
defer o.rwLock.Unlock()
sortFunc(o.keys) sortFunc(o.keys)
} }
// Sort Sort the map using your sort func // Sort Sort the map using your sort func
func (o *OrderedMap) Sort(lessFunc func(a *Pair, b *Pair) bool) { func (o *OrderedMap) Sort(lessFunc func(a *Pair, b *Pair) bool) {
o.rwLock.Lock()
defer o.rwLock.Unlock()
pairs := make([]*Pair, len(o.keys)) pairs := make([]*Pair, len(o.keys))
for i, key := range o.keys { for i, key := range o.keys {
pairs[i] = &Pair{key, o.values[key]} pairs[i] = &Pair{key, o.values[key]}
@ -110,6 +126,8 @@ func (o *OrderedMap) Sort(lessFunc func(a *Pair, b *Pair) bool) {
} }
func (o *OrderedMap) UnmarshalJSON(b []byte) error { func (o *OrderedMap) UnmarshalJSON(b []byte) error {
o.rwLock.RLock()
defer o.rwLock.RUnlock()
if o.values == nil { if o.values == nil {
o.values = map[string]interface{}{} o.values = map[string]interface{}{}
} }
@ -158,22 +176,22 @@ func decodeOrderedMap(dec *json.Decoder, o *OrderedMap) error {
switch delim { switch delim {
case '{': case '{':
if values, ok := o.values[key].(map[string]interface{}); ok { if values, ok := o.values[key].(map[string]interface{}); ok {
newMap := OrderedMap{ newMap := &OrderedMap{
keys: make([]string, 0, len(values)), keys: make([]string, 0, len(values)),
values: values, values: values,
escapeHTML: o.escapeHTML, escapeHTML: o.escapeHTML,
} }
if err = decodeOrderedMap(dec, &newMap); err != nil { if err = decodeOrderedMap(dec, newMap); err != nil {
return err return err
} }
o.values[key] = newMap o.values[key] = newMap
} else if oldMap, ok := o.values[key].(OrderedMap); ok { } else if oldMap, ok := o.values[key].(OrderedMap); ok {
newMap := OrderedMap{ newMap := &OrderedMap{
keys: make([]string, 0, len(oldMap.values)), keys: make([]string, 0, len(oldMap.values)),
values: oldMap.values, values: oldMap.values,
escapeHTML: o.escapeHTML, escapeHTML: o.escapeHTML,
} }
if err = decodeOrderedMap(dec, &newMap); err != nil { if err = decodeOrderedMap(dec, newMap); err != nil {
return err return err
} }
o.values[key] = newMap o.values[key] = newMap
@ -204,22 +222,22 @@ func decodeSlice(dec *json.Decoder, s []interface{}, escapeHTML bool) error {
case '{': case '{':
if index < len(s) { if index < len(s) {
if values, ok := s[index].(map[string]interface{}); ok { if values, ok := s[index].(map[string]interface{}); ok {
newMap := OrderedMap{ newMap := &OrderedMap{
keys: make([]string, 0, len(values)), keys: make([]string, 0, len(values)),
values: values, values: values,
escapeHTML: escapeHTML, escapeHTML: escapeHTML,
} }
if err = decodeOrderedMap(dec, &newMap); err != nil { if err = decodeOrderedMap(dec, newMap); err != nil {
return err return err
} }
s[index] = newMap s[index] = newMap
} else if oldMap, ok := s[index].(OrderedMap); ok { } else if oldMap, ok := s[index].(OrderedMap); ok {
newMap := OrderedMap{ newMap := &OrderedMap{
keys: make([]string, 0, len(oldMap.values)), keys: make([]string, 0, len(oldMap.values)),
values: oldMap.values, values: oldMap.values,
escapeHTML: escapeHTML, escapeHTML: escapeHTML,
} }
if err = decodeOrderedMap(dec, &newMap); err != nil { if err = decodeOrderedMap(dec, newMap); err != nil {
return err return err
} }
s[index] = newMap s[index] = newMap
@ -248,7 +266,9 @@ func decodeSlice(dec *json.Decoder, s []interface{}, escapeHTML bool) error {
} }
} }
func (o OrderedMap) MarshalJSON() ([]byte, error) { func (o *OrderedMap) MarshalJSON() ([]byte, error) {
o.rwLock.RLock()
defer o.rwLock.RUnlock()
var buf bytes.Buffer var buf bytes.Buffer
buf.WriteByte('{') buf.WriteByte('{')
encoder := json.NewEncoder(&buf) encoder := json.NewEncoder(&buf)

View File

@ -17,7 +17,7 @@ func (g *IG) InitNTP() {
g.ntp_info.UseNTP = false g.ntp_info.UseNTP = false
return return
} }
g.ntp_servers = orderedmap.New() g.ntp_servers = *orderedmap.New()
for _, url := range g.ntp_info.Servers { for _, url := range g.ntp_info.Servers {
g.ntp_servers.Set(url, ntp.Response{ g.ntp_servers.Set(url, ntp.Response{
RTT: forever, RTT: forever,