From 96682579f8c03dfdf3479b824034bf8ffe55a8cf Mon Sep 17 00:00:00 2001 From: KusakabeSi Date: Tue, 28 Sep 2021 23:32:51 +0000 Subject: [PATCH] Add timeout and lastseen at supernode, add lock to map access --- device/peer.go | 2 +- device/receivesendproc.go | 6 +++-- main_httpserver.go | 54 +++++++++++++++++++++++---------------- main_super.go | 3 +++ orderdmap/orderdmap.go | 42 ++++++++++++++++++++++-------- path/ntp.go | 2 +- 6 files changed, 72 insertions(+), 37 deletions(-) diff --git a/device/peer.go b/device/peer.go index cb80fee..cc8caed 100644 --- a/device/peer.go +++ b/device/peer.go @@ -119,7 +119,7 @@ func (device *Device) NewPeer(pk NoisePublicKey, id config.Vertex, isSuper bool) peer.cookieGenerator.Init(pk) peer.device = device - peer.endpoint_trylist = orderedmap.New() + peer.endpoint_trylist = *orderedmap.New() peer.queue.outbound = newAutodrainingOutboundQueue(device) peer.queue.inbound = newAutodrainingInboundQueue(device) peer.queue.staged = make(chan *QueueOutboundElement, QueueStagedSize) diff --git a/device/receivesendproc.go b/device/receivesendproc.go index 9f16938..fe5f052 100644 --- a/device/receivesendproc.go +++ b/device/receivesendproc.go @@ -124,7 +124,7 @@ func (device *Device) process_received(msg_type path.Usage, peer *Peer, body []b } case path.PongPacket: if content, err := path.ParsePongMsg(body); err == nil { - return device.server_process_Pong(content) + return device.server_process_Pong(peer, content) } default: err = errors.New("Not a valid msg_type") @@ -251,11 +251,13 @@ func (device *Device) server_process_RegisterMsg(peer *Peer, content path.Regist device.SendPacket(peer, path.UpdateError, buf, MessageTransportOffsetContent) return nil } + peer.LastPingReceived = time.Now() device.Event_server_register <- content return nil } -func (device *Device) server_process_Pong(content path.PongMsg) error { +func (device *Device) server_process_Pong(peer *Peer, content path.PongMsg) error { + peer.LastPingReceived = time.Now() device.Event_server_pong <- content return nil } diff --git a/main_httpserver.go b/main_httpserver.go index 71ed829..cefbb0e 100644 --- a/main_httpserver.go +++ b/main_httpserver.go @@ -10,6 +10,7 @@ import ( "io/ioutil" "net" "strconv" + "sync" "time" "net/http" @@ -37,9 +38,10 @@ var ( http_StateExpire time.Time http_StateString_tmp []byte - http_PeerState map[string]*PeerState //the state hash reported by peer - http_PeerIPs map[string]*HttpPeerLocalIP - http_sconfig *config.SuperConfig + http_PeerState map[string]*PeerState //the state hash reported by peer + http_PeerIPs map[string]*HttpPeerLocalIP + http_PeerLastSeen sync.Map // ID -> time.Time + http_sconfig *config.SuperConfig http_sconfig_path string http_econfig_tmp *config.EdgeConfig @@ -59,7 +61,8 @@ type HttpState struct { } type HttpPeerInfo struct { - Name string + Name string + LastSeen string } type PeerState struct { @@ -84,24 +87,26 @@ func get_api_peers(old_State_hash [32]byte) (api_peerinfo config.API_Peers, Stat PSKey: peerinfo.PSKey, Connurl: make(map[string]int), } - connV4 := http_device4.GetConnurl(peerinfo.NodeID) - connV6 := http_device6.GetConnurl(peerinfo.NodeID) - api_peerinfo[peerinfo.PubKey].Connurl[connV4] = 4 - api_peerinfo[peerinfo.PubKey].Connurl[connV6] = 6 - L4Addr := http_PeerIPs[peerinfo.PubKey].IPv4 - L4IP := L4Addr.IP - L4str := L4Addr.String() - if L4str != connV4 && conn.ValidIP(L4IP) { - api_peerinfo[peerinfo.PubKey].Connurl[L4str] = 14 + lastSeen, has := http_PeerLastSeen.Load(peerinfo.NodeID) + if has && lastSeen.(time.Time).Add(path.S2TD(http_sconfig.GraphRecalculateSetting.NodeReportTimeout)).After(time.Now()) { + connV4 := http_device4.GetConnurl(peerinfo.NodeID) + connV6 := http_device6.GetConnurl(peerinfo.NodeID) + api_peerinfo[peerinfo.PubKey].Connurl[connV4] = 4 + api_peerinfo[peerinfo.PubKey].Connurl[connV6] = 6 + L4Addr := http_PeerIPs[peerinfo.PubKey].IPv4 + L4IP := L4Addr.IP + L4str := L4Addr.String() + if L4str != connV4 && conn.ValidIP(L4IP) { + api_peerinfo[peerinfo.PubKey].Connurl[L4str] = 14 + } + L6Addr := http_PeerIPs[peerinfo.PubKey].IPv6 + L6IP := L6Addr.IP + L6str := L6Addr.String() + if L6str != connV6 && conn.ValidIP(L6IP) { + api_peerinfo[peerinfo.PubKey].Connurl[L6str] = 16 + } + delete(api_peerinfo[peerinfo.PubKey].Connurl, "") } - L6Addr := http_PeerIPs[peerinfo.PubKey].IPv6 - L6IP := L6Addr.IP - L6str := L6Addr.String() - if L6str != connV6 && conn.ValidIP(L6IP) { - api_peerinfo[peerinfo.PubKey].Connurl[L6str] = 16 - } - - delete(api_peerinfo[peerinfo.PubKey].Connurl, "") } api_peerinfo_str_byte, _ := json.Marshal(&api_peerinfo) hash_raw := md5.Sum(append(api_peerinfo_str_byte, http_HashSalt...)) @@ -254,8 +259,13 @@ func get_info(w http.ResponseWriter, r *http.Request) { Dist: http_graph.GetDtst(), } for _, peerinfo := range http_sconfig.Peers { + LastSeenStr := "" + if lastseen, has := http_PeerLastSeen.Load(peerinfo.NodeID); has { + LastSeenStr = lastseen.(time.Time).String() + } hs.PeerInfo[peerinfo.NodeID] = HttpPeerInfo{ - Name: peerinfo.Name, + Name: peerinfo.Name, + LastSeen: LastSeenStr, } } http_StateExpire = time.Now().Add(5 * time.Second) diff --git a/main_super.go b/main_super.go index 81a96d5..1ab7a8a 100644 --- a/main_super.go +++ b/main_super.go @@ -230,6 +230,7 @@ func super_peeradd(peerconf config.SuperPeerInfo) error { http_PeerID2PubKey[peerconf.NodeID] = peerconf.PubKey http_PeerState[peerconf.PubKey] = &PeerState{} http_PeerIPs[peerconf.PubKey] = &HttpPeerLocalIP{} + http_PeerLastSeen.Store(peerconf.NodeID, time.Time{}) return nil } @@ -261,6 +262,7 @@ func super_peerdel(toDelete config.Vertex) { http_device4.RemovePeerByID(toDelete) http_device6.RemovePeerByID(toDelete) http_graph.RemoveVirt(toDelete, true, false) + http_PeerLastSeen.Delete(toDelete) delete(http_PeerState, PubKey) delete(http_PeerIPs, PubKey) @@ -273,6 +275,7 @@ func Event_server_event_hendler(graph *path.IG, events path.SUPER_Events) { case reg_msg := <-events.Event_server_register: var should_push_peer bool var should_push_nh bool + http_PeerLastSeen.Store(reg_msg.Node_id, time.Now()) if reg_msg.Node_id < config.Special_NodeID { PubKey := http_PeerID2PubKey[reg_msg.Node_id] if bytes.Equal(http_PeerState[PubKey].NhTableState[:], reg_msg.NhStateHash[:]) == false { diff --git a/orderdmap/orderdmap.go b/orderdmap/orderdmap.go index dba6378..54fec54 100644 --- a/orderdmap/orderdmap.go +++ b/orderdmap/orderdmap.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/json" "sort" + "sync" ) type Pair struct { @@ -32,14 +33,15 @@ type OrderedMap struct { keys []string values map[string]interface{} escapeHTML bool + rwLock sync.RWMutex } -func New() OrderedMap { +func New() *OrderedMap { o := OrderedMap{} o.keys = []string{} o.values = map[string]interface{}{} o.escapeHTML = true - return o + return &o } func (o *OrderedMap) SetEscapeHTML(on bool) { @@ -47,11 +49,15 @@ func (o *OrderedMap) SetEscapeHTML(on bool) { } func (o *OrderedMap) Get(key string) (interface{}, bool) { + o.rwLock.RLock() + defer o.rwLock.RUnlock() val, exists := o.values[key] return val, exists } func (o *OrderedMap) Set(key string, value interface{}) { + o.rwLock.Lock() + defer o.rwLock.Unlock() _, exists := o.values[key] if !exists { o.keys = append(o.keys, key) @@ -61,6 +67,8 @@ func (o *OrderedMap) Set(key string, value interface{}) { func (o *OrderedMap) Delete(key string) { // check key is in use + o.rwLock.Lock() + defer o.rwLock.Unlock() _, ok := o.values[key] if !ok { return @@ -77,12 +85,16 @@ func (o *OrderedMap) Delete(key string) { } func (o *OrderedMap) Clear() { // delete whole orderdmap + o.rwLock.Lock() + defer o.rwLock.Unlock() for _, key := range o.Keys() { o.Delete(key) } } func (o *OrderedMap) Keys() []string { + o.rwLock.RLock() + defer o.rwLock.RUnlock() ret := make([]string, len(o.keys)) for i, v := range o.keys { ret[i] = v @@ -92,11 +104,15 @@ func (o *OrderedMap) Keys() []string { // SortKeys Sort the map keys using your sort func func (o *OrderedMap) SortKeys(sortFunc func(keys []string)) { + o.rwLock.Lock() + defer o.rwLock.Unlock() sortFunc(o.keys) } // Sort Sort the map using your sort func func (o *OrderedMap) Sort(lessFunc func(a *Pair, b *Pair) bool) { + o.rwLock.Lock() + defer o.rwLock.Unlock() pairs := make([]*Pair, len(o.keys)) for i, key := range o.keys { pairs[i] = &Pair{key, o.values[key]} @@ -110,6 +126,8 @@ func (o *OrderedMap) Sort(lessFunc func(a *Pair, b *Pair) bool) { } func (o *OrderedMap) UnmarshalJSON(b []byte) error { + o.rwLock.RLock() + defer o.rwLock.RUnlock() if o.values == nil { o.values = map[string]interface{}{} } @@ -158,22 +176,22 @@ func decodeOrderedMap(dec *json.Decoder, o *OrderedMap) error { switch delim { case '{': if values, ok := o.values[key].(map[string]interface{}); ok { - newMap := OrderedMap{ + newMap := &OrderedMap{ keys: make([]string, 0, len(values)), values: values, escapeHTML: o.escapeHTML, } - if err = decodeOrderedMap(dec, &newMap); err != nil { + if err = decodeOrderedMap(dec, newMap); err != nil { return err } o.values[key] = newMap } else if oldMap, ok := o.values[key].(OrderedMap); ok { - newMap := OrderedMap{ + newMap := &OrderedMap{ keys: make([]string, 0, len(oldMap.values)), values: oldMap.values, escapeHTML: o.escapeHTML, } - if err = decodeOrderedMap(dec, &newMap); err != nil { + if err = decodeOrderedMap(dec, newMap); err != nil { return err } o.values[key] = newMap @@ -204,22 +222,22 @@ func decodeSlice(dec *json.Decoder, s []interface{}, escapeHTML bool) error { case '{': if index < len(s) { if values, ok := s[index].(map[string]interface{}); ok { - newMap := OrderedMap{ + newMap := &OrderedMap{ keys: make([]string, 0, len(values)), values: values, escapeHTML: escapeHTML, } - if err = decodeOrderedMap(dec, &newMap); err != nil { + if err = decodeOrderedMap(dec, newMap); err != nil { return err } s[index] = newMap } else if oldMap, ok := s[index].(OrderedMap); ok { - newMap := OrderedMap{ + newMap := &OrderedMap{ keys: make([]string, 0, len(oldMap.values)), values: oldMap.values, escapeHTML: escapeHTML, } - if err = decodeOrderedMap(dec, &newMap); err != nil { + if err = decodeOrderedMap(dec, newMap); err != nil { return err } s[index] = newMap @@ -248,7 +266,9 @@ func decodeSlice(dec *json.Decoder, s []interface{}, escapeHTML bool) error { } } -func (o OrderedMap) MarshalJSON() ([]byte, error) { +func (o *OrderedMap) MarshalJSON() ([]byte, error) { + o.rwLock.RLock() + defer o.rwLock.RUnlock() var buf bytes.Buffer buf.WriteByte('{') encoder := json.NewEncoder(&buf) diff --git a/path/ntp.go b/path/ntp.go index 1ff13be..b396b74 100644 --- a/path/ntp.go +++ b/path/ntp.go @@ -17,7 +17,7 @@ func (g *IG) InitNTP() { g.ntp_info.UseNTP = false return } - g.ntp_servers = orderedmap.New() + g.ntp_servers = *orderedmap.New() for _, url := range g.ntp_info.Servers { g.ntp_servers.Set(url, ntp.Response{ RTT: forever,