EtherGuard-VPN/device/receivesendproc.go

796 lines
24 KiB
Go
Raw Normal View History

2021-08-20 19:32:50 +02:00
package device
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"hash/crc32"
"io/ioutil"
"net"
2021-08-20 19:32:50 +02:00
"net/http"
2021-08-21 16:54:24 +02:00
"net/url"
"strconv"
"strings"
2021-08-20 19:32:50 +02:00
"time"
"github.com/KusakabeSi/EtherGuardVPN/config"
orderedmap "github.com/KusakabeSi/EtherGuardVPN/orderdmap"
2021-08-20 19:32:50 +02:00
"github.com/KusakabeSi/EtherGuardVPN/path"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
2021-08-20 19:32:50 +02:00
)
2021-09-21 03:15:23 +02:00
func (device *Device) SendPacket(peer *Peer, usage path.Usage, packet []byte, offset int) {
2021-08-20 19:32:50 +02:00
if peer == nil {
return
} else if peer.endpoint == nil {
return
2021-08-20 19:32:50 +02:00
}
2021-08-25 10:13:53 +02:00
if device.LogLevel.LogNormal {
EgHeader, _ := path.NewEgHeader(packet[:path.EgHeaderLen])
if usage == path.NormalPacket {
2021-08-25 10:13:53 +02:00
dst_nodeID := EgHeader.GetDst()
2021-08-25 13:54:13 +02:00
fmt.Println("Normal: Send Normal packet To:" + peer.GetEndpointDstStr() + " SrcID:" + device.ID.ToString() + " DstID:" + dst_nodeID.ToString() + " Len:" + strconv.Itoa(len(packet)))
packet := gopacket.NewPacket(packet[path.EgHeaderLen:], layers.LayerTypeEthernet, gopacket.Default)
fmt.Println(packet.Dump())
2021-08-25 10:13:53 +02:00
}
}
if device.LogLevel.LogControl {
if usage != path.NormalPacket {
2021-08-21 16:54:24 +02:00
if peer.GetEndpointDstStr() != "" {
2021-09-21 03:15:23 +02:00
fmt.Println("Control: Send To:" + peer.GetEndpointDstStr() + " " + device.sprint_received(usage, packet[path.EgHeaderLen:]))
2021-08-21 16:54:24 +02:00
}
}
}
2021-08-25 10:13:53 +02:00
2021-08-20 19:32:50 +02:00
var elem *QueueOutboundElement
elem = device.NewOutboundElement()
copy(elem.buffer[offset:offset+len(packet)], packet)
2021-09-21 03:15:23 +02:00
elem.Type = usage
2021-08-20 19:32:50 +02:00
elem.packet = elem.buffer[offset : offset+len(packet)]
if peer.isRunning.Get() {
peer.StagePacket(elem)
elem = nil
peer.SendStagedPackets()
}
}
2021-09-21 03:15:23 +02:00
func (device *Device) BoardcastPacket(skip_list map[config.Vertex]bool, usage path.Usage, packet []byte, offset int) { // Send packet to all connected peers
2021-08-20 19:32:50 +02:00
send_list := device.graph.GetBoardcastList(device.ID)
for node_id, _ := range skip_list {
send_list[node_id] = false
}
2021-08-23 18:39:04 +02:00
device.peers.RLock()
2021-08-20 19:32:50 +02:00
for node_id, should_send := range send_list {
if should_send {
2021-09-21 22:03:11 +02:00
peer_out, _ := device.peers.IDMap[node_id]
device.SendPacket(peer_out, usage, packet, offset)
2021-08-20 19:32:50 +02:00
}
}
2021-08-23 18:39:04 +02:00
device.peers.RUnlock()
2021-08-20 19:32:50 +02:00
}
2021-09-21 03:15:23 +02:00
func (device *Device) SpreadPacket(skip_list map[config.Vertex]bool, usage path.Usage, packet []byte, offset int) { // Send packet to all peers no matter it is alive
2021-08-23 18:39:04 +02:00
device.peers.RLock()
2021-08-20 19:32:50 +02:00
for peer_id, peer_out := range device.peers.IDMap {
if _, ok := skip_list[peer_id]; ok {
2021-08-25 10:13:53 +02:00
if device.LogLevel.LogTransit {
2021-08-25 13:54:13 +02:00
fmt.Printf("Transit: Skipped Spread Packet packet through %d to %d\n", device.ID, peer_out.ID)
2021-08-20 19:32:50 +02:00
}
continue
}
2021-09-21 03:15:23 +02:00
device.SendPacket(peer_out, usage, packet, MessageTransportOffsetContent)
2021-08-20 19:32:50 +02:00
}
2021-08-23 18:39:04 +02:00
device.peers.RUnlock()
2021-08-20 19:32:50 +02:00
}
2021-09-21 03:15:23 +02:00
func (device *Device) TransitBoardcastPacket(src_nodeID config.Vertex, in_id config.Vertex, usage path.Usage, packet []byte, offset int) {
2021-08-20 19:32:50 +02:00
node_boardcast_list := device.graph.GetBoardcastThroughList(device.ID, in_id, src_nodeID)
2021-08-23 18:39:04 +02:00
device.peers.RLock()
2021-08-20 19:32:50 +02:00
for peer_id := range node_boardcast_list {
peer_out := device.peers.IDMap[peer_id]
2021-08-25 10:13:53 +02:00
if device.LogLevel.LogTransit {
2021-08-25 13:54:13 +02:00
fmt.Printf("Transit: Transfer packet from %d through %d to %d\n", in_id, device.ID, peer_out.ID)
2021-08-20 19:32:50 +02:00
}
2021-09-21 03:15:23 +02:00
device.SendPacket(peer_out, usage, packet, offset)
2021-08-20 19:32:50 +02:00
}
2021-08-23 18:39:04 +02:00
device.peers.RUnlock()
2021-08-20 19:32:50 +02:00
}
2021-09-21 03:15:23 +02:00
func (device *Device) Send2Super(usage path.Usage, packet []byte, offset int) {
2021-08-23 18:39:04 +02:00
device.peers.RLock()
2021-08-20 19:32:50 +02:00
if device.DRoute.SuperNode.UseSuperNode {
for _, peer_out := range device.peers.SuperPeer {
2021-08-25 10:13:53 +02:00
/*if device.LogTransit {
2021-08-20 19:32:50 +02:00
fmt.Printf("Send to supernode %s\n", peer_out.endpoint.DstToString())
2021-08-25 10:13:53 +02:00
}*/
2021-09-21 03:15:23 +02:00
device.SendPacket(peer_out, usage, packet, offset)
2021-08-20 19:32:50 +02:00
}
}
2021-08-23 18:39:04 +02:00
device.peers.RUnlock()
2021-08-20 19:32:50 +02:00
}
func (device *Device) CheckNoDup(packet []byte) bool {
hasher := crc32.New(crc32.MakeTable(crc32.Castagnoli))
hasher.Write(packet)
crc32result := hasher.Sum32()
_, ok := device.DupData.Get(crc32result)
device.DupData.Set(crc32result, true)
return !ok
}
2021-08-23 18:39:04 +02:00
func (device *Device) process_received(msg_type path.Usage, peer *Peer, body []byte) (err error) {
2021-08-20 19:32:50 +02:00
if device.IsSuperNode {
switch msg_type {
case path.Register:
if content, err := path.ParseRegisterMsg(body); err == nil {
return device.server_process_RegisterMsg(peer, content)
2021-08-20 19:32:50 +02:00
}
case path.PongPacket:
if content, err := path.ParsePongMsg(body); err == nil {
return device.server_process_Pong(peer, content)
2021-08-20 19:32:50 +02:00
}
default:
err = errors.New("Not a valid msg_type")
}
} else {
switch msg_type {
case path.UpdatePeer:
if content, err := path.ParseUpdatePeerMsg(body); err == nil {
go device.process_UpdatePeerMsg(peer, content)
2021-08-20 19:32:50 +02:00
}
case path.UpdateNhTable:
if content, err := path.ParseUpdateNhTableMsg(body); err == nil {
go device.process_UpdateNhTableMsg(peer, content)
}
case path.UpdateError:
if content, err := path.ParseUpdateErrorMsg(body); err == nil {
device.process_UpdateErrorMsg(peer, content)
2021-08-20 19:32:50 +02:00
}
case path.PingPacket:
if content, err := path.ParsePingMsg(body); err == nil {
2021-09-20 22:20:00 +02:00
return device.process_ping(peer, content)
2021-08-20 19:32:50 +02:00
}
case path.PongPacket:
if content, err := path.ParsePongMsg(body); err == nil {
2021-08-23 18:39:04 +02:00
return device.process_pong(peer, content)
2021-08-20 19:32:50 +02:00
}
2021-08-23 18:39:04 +02:00
case path.QueryPeer:
if content, err := path.ParseQueryPeerMsg(body); err == nil {
2021-08-20 19:32:50 +02:00
return device.process_RequestPeerMsg(content)
}
case path.BroadcastPeer:
2021-08-20 19:32:50 +02:00
if content, err := path.ParseBoardcastPeerMsg(body); err == nil {
2021-08-24 10:43:55 +02:00
return device.process_BoardcastPeerMsg(peer, content)
2021-08-20 19:32:50 +02:00
}
default:
err = errors.New("Not a valid msg_type")
}
}
return
}
func (device *Device) sprint_received(msg_type path.Usage, body []byte) string {
2021-08-21 16:54:24 +02:00
switch msg_type {
case path.Register:
if content, err := path.ParseRegisterMsg(body); err == nil {
return content.ToString()
2021-08-21 16:54:24 +02:00
}
return "RegisterMsg: Parse failed"
2021-08-21 16:54:24 +02:00
case path.UpdatePeer:
if content, err := path.ParseUpdatePeerMsg(body); err == nil {
return content.ToString()
2021-08-21 16:54:24 +02:00
}
return "UpdatePeerMsg: Parse failed"
2021-08-21 16:54:24 +02:00
case path.UpdateNhTable:
if content, err := path.ParseUpdateNhTableMsg(body); err == nil {
return content.ToString()
2021-08-21 16:54:24 +02:00
}
return "UpdateNhTableMsg: Parse failed"
case path.UpdateError:
if content, err := path.ParseUpdateErrorMsg(body); err == nil {
return content.ToString()
}
return "UpdateErrorMsg: Parse failed"
2021-08-21 16:54:24 +02:00
case path.PingPacket:
if content, err := path.ParsePingMsg(body); err == nil {
return content.ToString()
2021-08-21 16:54:24 +02:00
}
return "PingPacketMsg: Parse failed"
2021-08-21 16:54:24 +02:00
case path.PongPacket:
if content, err := path.ParsePongMsg(body); err == nil {
return content.ToString()
2021-08-21 16:54:24 +02:00
}
return "PongPacketMsg: Parse failed"
2021-08-23 18:39:04 +02:00
case path.QueryPeer:
if content, err := path.ParseQueryPeerMsg(body); err == nil {
return content.ToString()
2021-08-21 16:54:24 +02:00
}
return "QueryPeerMsg: Parse failed"
case path.BroadcastPeer:
2021-08-21 16:54:24 +02:00
if content, err := path.ParseBoardcastPeerMsg(body); err == nil {
return content.ToString()
2021-08-21 16:54:24 +02:00
}
return "BoardcastPeerMsg: Parse failed"
2021-08-21 16:54:24 +02:00
default:
return "UnknowMsg: Not a valid msg_type"
2021-08-21 16:54:24 +02:00
}
}
func compareVersion(v1 string, v2 string) bool {
if strings.Contains(v1, "-") {
v1 = strings.Split(v1, "-")[0]
}
if strings.Contains(v2, "-") {
v2 = strings.Split(v2, "-")[0]
}
return v1 == v2
}
func (device *Device) server_process_RegisterMsg(peer *Peer, content path.RegisterMsg) error {
2021-09-20 22:20:00 +02:00
UpdateErrorMsg := path.UpdateErrorMsg{
Node_id: peer.ID,
Action: path.NoAction,
ErrorCode: 0,
ErrorMsg: "",
}
if peer.ID != content.Node_id {
2021-09-20 22:20:00 +02:00
UpdateErrorMsg = path.UpdateErrorMsg{
Node_id: peer.ID,
Action: path.Shutdown,
ErrorCode: 401,
2021-09-20 22:20:00 +02:00
ErrorMsg: "Your node ID is not match with our registered nodeID",
}
2021-09-20 22:20:00 +02:00
}
if compareVersion(content.Version, device.Version) == false {
2021-09-20 22:20:00 +02:00
UpdateErrorMsg = path.UpdateErrorMsg{
Node_id: peer.ID,
Action: path.Shutdown,
ErrorCode: 400,
ErrorMsg: "Your version is not match with our version: " + device.Version,
}
}
if UpdateErrorMsg.Action != path.NoAction {
body, err := path.GetByte(&UpdateErrorMsg)
if err != nil {
return err
}
buf := make([]byte, path.EgHeaderLen+len(body))
header, err := path.NewEgHeader(buf[:path.EgHeaderLen])
header.SetSrc(device.ID)
2021-09-20 22:20:00 +02:00
header.SetTTL(device.DefaultTTL)
header.SetPacketLength(uint16(len(body)))
copy(buf[path.EgHeaderLen:], body)
2021-09-20 22:20:00 +02:00
header.SetDst(peer.ID)
2021-09-21 03:15:23 +02:00
device.SendPacket(peer, path.UpdateError, buf, MessageTransportOffsetContent)
return nil
}
peer.LastPingReceived = time.Now()
2021-08-20 19:32:50 +02:00
device.Event_server_register <- content
return nil
}
func (device *Device) server_process_Pong(peer *Peer, content path.PongMsg) error {
peer.LastPingReceived = time.Now()
2021-08-20 19:32:50 +02:00
device.Event_server_pong <- content
return nil
}
2021-09-20 22:20:00 +02:00
func (device *Device) process_ping(peer *Peer, content path.PingMsg) error {
peer.LastPingReceived = time.Now()
peer.Lock()
2021-09-26 14:59:57 +02:00
//remove peer.endpoint_trylist
peer.Unlock()
2021-08-20 19:32:50 +02:00
PongMSG := path.PongMsg{
Src_nodeID: content.Src_nodeID,
Dst_nodeID: device.ID,
Timediff: device.graph.GetCurrentTime().Sub(content.Time),
}
if device.DRoute.P2P.UseP2P && time.Now().After(device.graph.NhTableExpire) {
2021-08-25 15:21:26 +02:00
device.graph.UpdateLentancy(content.Src_nodeID, device.ID, PongMSG.Timediff, true, false)
}
2021-08-20 19:32:50 +02:00
body, err := path.GetByte(&PongMSG)
if err != nil {
return err
}
buf := make([]byte, path.EgHeaderLen+len(body))
header, err := path.NewEgHeader(buf[:path.EgHeaderLen])
header.SetSrc(device.ID)
2021-09-20 22:20:00 +02:00
header.SetTTL(device.DefaultTTL)
2021-08-20 19:32:50 +02:00
header.SetPacketLength(uint16(len(body)))
2021-08-21 16:54:24 +02:00
copy(buf[path.EgHeaderLen:], body)
2021-08-20 19:32:50 +02:00
if device.DRoute.SuperNode.UseSuperNode {
2021-08-25 10:13:53 +02:00
header.SetDst(config.SuperNodeMessage)
2021-09-21 03:15:23 +02:00
device.Send2Super(path.PongPacket, buf, MessageTransportOffsetContent)
2021-08-20 19:32:50 +02:00
}
if device.DRoute.P2P.UseP2P {
2021-08-25 10:13:53 +02:00
header.SetDst(config.ControlMessage)
2021-09-21 03:15:23 +02:00
device.SpreadPacket(make(map[config.Vertex]bool), path.PongPacket, buf, MessageTransportOffsetContent)
2021-08-20 19:32:50 +02:00
}
go device.SendPing(peer, content.RequestReply, 0, 3)
2021-08-20 19:32:50 +02:00
return nil
}
func (device *Device) SendPing(peer *Peer, times int, replies int, interval float64) {
for i := 0; i < times; i++ {
packet, usage, _ := device.GeneratePingPacket(device.ID, replies)
device.SendPacket(peer, usage, packet, MessageTransportOffsetContent)
time.Sleep(path.S2TD(interval))
}
}
2021-08-23 18:39:04 +02:00
func (device *Device) process_pong(peer *Peer, content path.PongMsg) error {
2021-08-20 19:32:50 +02:00
if device.DRoute.P2P.UseP2P {
if time.Now().After(device.graph.NhTableExpire) {
2021-08-25 15:21:26 +02:00
device.graph.UpdateLentancy(content.Src_nodeID, content.Dst_nodeID, content.Timediff, true, false)
}
2021-08-23 18:39:04 +02:00
if !peer.AskedForNeighbor {
QueryPeerMsg := path.QueryPeerMsg{
Request_ID: uint32(device.ID),
}
body, err := path.GetByte(&QueryPeerMsg)
if err != nil {
return err
}
buf := make([]byte, path.EgHeaderLen+len(body))
header, err := path.NewEgHeader(buf[:path.EgHeaderLen])
header.SetSrc(device.ID)
2021-09-20 22:20:00 +02:00
header.SetTTL(device.DefaultTTL)
2021-08-23 18:39:04 +02:00
header.SetPacketLength(uint16(len(body)))
copy(buf[path.EgHeaderLen:], body)
2021-09-21 03:15:23 +02:00
device.SendPacket(peer, path.QueryPeer, buf, MessageTransportOffsetContent)
2021-08-23 18:39:04 +02:00
}
2021-08-20 19:32:50 +02:00
}
return nil
}
func (device *Device) process_UpdatePeerMsg(peer *Peer, content path.UpdatePeerMsg) error {
2021-08-20 19:32:50 +02:00
var send_signal bool
if device.DRoute.SuperNode.UseSuperNode {
if peer.ID != config.SuperNodeMessage {
if device.LogLevel.LogControl {
fmt.Println("Control: Ignored UpdateErrorMsg. Not from supernode.")
}
return nil
}
2021-09-21 22:03:11 +02:00
var peer_infos config.API_Peers
2021-08-20 19:32:50 +02:00
if bytes.Equal(device.peers.Peer_state[:], content.State_hash[:]) {
2021-08-25 13:54:13 +02:00
if device.LogLevel.LogControl {
fmt.Println("Control: Same PeerState Hash, skip download nhTable")
}
2021-08-20 19:32:50 +02:00
return nil
}
2021-08-21 16:54:24 +02:00
downloadurl := device.DRoute.SuperNode.APIUrl + "/peerinfo?NodeID=" + strconv.Itoa(int(device.ID)) + "&PubKey=" + url.QueryEscape(device.staticIdentity.publicKey.ToString()) + "&State=" + url.QueryEscape(string(content.State_hash[:]))
2021-08-25 10:13:53 +02:00
if device.LogLevel.LogControl {
2021-08-25 13:54:13 +02:00
fmt.Println("Control: Download peerinfo from :" + downloadurl)
2021-08-21 16:54:24 +02:00
}
2021-08-24 20:16:21 +02:00
client := http.Client{
Timeout: 30 * time.Second,
}
resp, err := client.Get(downloadurl)
2021-08-20 19:32:50 +02:00
if err != nil {
2021-08-24 20:16:21 +02:00
device.log.Errorf(err.Error())
2021-08-20 19:32:50 +02:00
return err
}
defer resp.Body.Close()
allbytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
2021-08-24 20:16:21 +02:00
device.log.Errorf(err.Error())
2021-08-20 19:32:50 +02:00
return err
}
2021-09-26 14:59:57 +02:00
if resp.StatusCode != 200 {
device.log.Errorf("Control: Download peerinfo result failed: " + strconv.Itoa(resp.StatusCode) + " " + string(allbytes))
return nil
}
2021-08-25 10:13:53 +02:00
if device.LogLevel.LogControl {
2021-08-25 13:54:13 +02:00
fmt.Println("Control: Download peerinfo result :" + string(allbytes))
2021-08-25 10:13:53 +02:00
}
2021-08-20 19:32:50 +02:00
if err := json.Unmarshal(allbytes, &peer_infos); err != nil {
2021-09-26 14:59:57 +02:00
device.log.Errorf("JSON decode error:", err.Error())
2021-08-20 19:32:50 +02:00
return err
}
2021-09-21 03:15:23 +02:00
for nodeID, thepeer := range device.peers.IDMap {
pk := thepeer.handshake.remoteStatic
psk := thepeer.handshake.presharedKey
if val, ok := peer_infos[pk.ToString()]; ok {
if val.NodeID != nodeID {
device.RemovePeer(pk)
continue
} else if val.PSKey != psk.ToString() {
device.RemovePeer(pk)
continue
}
} else {
device.RemovePeer(pk)
continue
}
}
2021-09-21 22:03:11 +02:00
for PubKey, peerinfo := range peer_infos {
2021-08-20 19:32:50 +02:00
if len(peerinfo.Connurl) == 0 {
continue
2021-08-20 19:32:50 +02:00
}
sk, err := Str2PubKey(PubKey)
2021-09-21 03:15:23 +02:00
if err != nil {
device.log.Errorf("Error decode base64:", err)
continue
2021-09-21 03:15:23 +02:00
}
2021-08-20 19:32:50 +02:00
if bytes.Equal(sk[:], device.staticIdentity.publicKey[:]) {
continue
}
thepeer := device.LookupPeer(sk)
if thepeer == nil { //not exist in local
if device.LogLevel.LogControl {
2021-09-21 22:03:11 +02:00
fmt.Println("Control: Add new peer to local ID:" + peerinfo.NodeID.ToString() + " PubKey:" + PubKey)
}
2021-08-20 19:32:50 +02:00
if device.graph.Weight(device.ID, peerinfo.NodeID) == path.Infinity { // add node to graph
2021-08-25 15:21:26 +02:00
device.graph.UpdateLentancy(device.ID, peerinfo.NodeID, path.S2TD(path.Infinity), true, false)
2021-08-20 19:32:50 +02:00
}
if device.graph.Weight(peerinfo.NodeID, device.ID) == path.Infinity { // add node to graph
2021-08-25 15:21:26 +02:00
device.graph.UpdateLentancy(peerinfo.NodeID, device.ID, path.S2TD(path.Infinity), true, false)
2021-08-20 19:32:50 +02:00
}
2021-09-21 22:03:11 +02:00
device.NewPeer(sk, peerinfo.NodeID, false)
2021-08-20 19:32:50 +02:00
thepeer = device.LookupPeer(sk)
}
if peerinfo.PSKey != "" {
pk, err := Str2PSKey(peerinfo.PSKey)
if err != nil {
device.log.Errorf("Error decode base64:", err)
continue
2021-08-20 19:32:50 +02:00
}
thepeer.SetPSK(pk)
2021-08-20 19:32:50 +02:00
}
if thepeer.LastPingReceived.Add(path.S2TD(device.DRoute.PeerAliveTimeout)).Before(time.Now()) {
2021-08-20 19:32:50 +02:00
//Peer died, try to switch to this new endpoint
for url, _ := range peerinfo.Connurl {
2021-09-21 03:15:23 +02:00
thepeer.Lock()
thepeer.endpoint_trylist.Set(url, time.Time{}) //another gorouting will process it
thepeer.Unlock()
2021-08-20 19:32:50 +02:00
send_signal = true
}
}
}
device.peers.Peer_state = content.State_hash
if send_signal {
device.event_tryendpoint <- struct{}{}
}
}
return nil
}
func (device *Device) process_UpdateNhTableMsg(peer *Peer, content path.UpdateNhTableMsg) error {
if device.DRoute.SuperNode.UseSuperNode {
if peer.ID != config.SuperNodeMessage {
if device.LogLevel.LogControl {
fmt.Println("Control: Ignored UpdateErrorMsg. Not from supernode.")
}
return nil
}
if bytes.Equal(device.graph.NhTableHash[:], content.State_hash[:]) {
if device.LogLevel.LogControl {
fmt.Println("Control: Same nhTable Hash, skip download nhTable")
}
device.graph.NhTableExpire = time.Now().Add(device.graph.SuperNodeInfoTimeout)
return nil
}
var NhTable config.NextHopTable
if bytes.Equal(device.graph.NhTableHash[:], content.State_hash[:]) {
return nil
}
downloadurl := device.DRoute.SuperNode.APIUrl + "/nhtable?NodeID=" + strconv.Itoa(int(device.ID)) + "&PubKey=" + url.QueryEscape(device.staticIdentity.publicKey.ToString()) + "&State=" + url.QueryEscape(string(content.State_hash[:]))
if device.LogLevel.LogControl {
fmt.Println("Control: Download NhTable from :" + downloadurl)
}
client := http.Client{
Timeout: 30 * time.Second,
}
resp, err := client.Get(downloadurl)
if err != nil {
device.log.Errorf(err.Error())
return err
}
defer resp.Body.Close()
allbytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
device.log.Errorf(err.Error())
return err
}
2021-09-26 14:59:57 +02:00
if resp.StatusCode != 200 {
device.log.Errorf("Control: Download peerinfo result failed: " + strconv.Itoa(resp.StatusCode) + " " + string(allbytes))
return nil
}
if device.LogLevel.LogControl {
fmt.Println("Control: Download NhTable result :" + string(allbytes))
}
if err := json.Unmarshal(allbytes, &NhTable); err != nil {
2021-09-26 14:59:57 +02:00
device.log.Errorf("JSON decode error:", err.Error())
return err
}
device.graph.SetNHTable(NhTable, content.State_hash)
}
return nil
}
func (device *Device) process_UpdateErrorMsg(peer *Peer, content path.UpdateErrorMsg) error {
if peer.ID != config.SuperNodeMessage {
if device.LogLevel.LogControl {
fmt.Println("Control: Ignored UpdateErrorMsg. Not from supernode.")
}
return nil
}
device.log.Errorf(strconv.Itoa(content.ErrorCode) + ": " + content.ErrorMsg)
if content.Action == path.Shutdown {
device.closed <- struct{}{}
} else if content.Action == path.Panic {
panic(content.ToString())
}
return nil
}
2021-08-20 19:32:50 +02:00
func (device *Device) RoutineSetEndpoint() {
if !(device.DRoute.P2P.UseP2P || device.DRoute.SuperNode.UseSuperNode) {
return
}
for {
NextRun := false
<-device.event_tryendpoint
for _, thepeer := range device.peers.IDMap {
if thepeer.LastPingReceived.Add(path.S2TD(device.DRoute.PeerAliveTimeout)).After(time.Now()) {
2021-08-20 19:32:50 +02:00
//Peer alives
2021-09-26 14:59:57 +02:00
continue
2021-08-20 19:32:50 +02:00
} else {
2021-09-21 03:15:23 +02:00
thepeer.RLock()
thepeer.endpoint_trylist.Sort(func(a *orderedmap.Pair, b *orderedmap.Pair) bool {
return a.Value().(time.Time).Before(b.Value().(time.Time))
})
2021-09-21 03:15:23 +02:00
trylist := thepeer.endpoint_trylist.Keys()
thepeer.RUnlock()
for _, key := range trylist { // try next endpoint
connurl := key
thepeer.RLock()
val, hasval := thepeer.endpoint_trylist.Get(key)
thepeer.RUnlock()
if !hasval {
continue
}
trytime := val.(time.Time)
if trytime.Sub(time.Time{}) != time.Duration(0) && time.Now().Sub(trytime) > path.S2TD(device.DRoute.ConnTimeOut) { // tried before, but no response
thepeer.Lock()
2021-08-23 18:39:04 +02:00
thepeer.endpoint_trylist.Delete(key)
2021-09-21 03:15:23 +02:00
thepeer.Unlock()
2021-08-20 19:32:50 +02:00
} else {
if device.LogLevel.LogControl {
fmt.Println("Control: Set endpoint to " + connurl + " for NodeID:" + thepeer.ID.ToString())
}
err := thepeer.SetEndpointFromConnURL(connurl, thepeer.ConnAF, thepeer.StaticConn) //trying to bind first url in the list and wait device.DRoute.P2P.PeerAliveTimeout seconds
2021-08-20 19:32:50 +02:00
if err != nil {
device.log.Errorf("Bind " + connurl + " failed!")
2021-09-21 03:15:23 +02:00
thepeer.Lock()
thepeer.endpoint_trylist.Delete(connurl)
thepeer.Unlock()
continue
2021-08-20 19:32:50 +02:00
}
NextRun = true
2021-09-21 03:15:23 +02:00
thepeer.Lock()
thepeer.endpoint_trylist.Set(key, time.Now())
thepeer.Unlock()
2021-08-20 19:32:50 +02:00
//Send Ping message to it
go device.SendPing(thepeer, int(device.DRoute.ConnNextTry+1), 1, 1)
2021-09-21 03:15:23 +02:00
break
2021-08-20 19:32:50 +02:00
}
2021-09-21 03:15:23 +02:00
}
2021-08-20 19:32:50 +02:00
}
}
ClearChanLoop:
for {
select {
case <-device.event_tryendpoint:
default:
break ClearChanLoop
}
}
time.Sleep(path.S2TD(device.DRoute.ConnNextTry))
2021-08-20 19:32:50 +02:00
if NextRun {
device.event_tryendpoint <- struct{}{}
}
}
}
func (device *Device) RoutineSendPing() {
if !(device.DRoute.P2P.UseP2P || device.DRoute.SuperNode.UseSuperNode) {
return
}
for {
packet, usage, _ := device.GeneratePingPacket(device.ID, 0)
2021-09-21 03:15:23 +02:00
device.SpreadPacket(make(map[config.Vertex]bool), usage, packet, MessageTransportOffsetContent)
2021-08-20 19:32:50 +02:00
time.Sleep(path.S2TD(device.DRoute.SendPingInterval))
}
}
func (device *Device) RoutineRegister() {
if !(device.DRoute.SuperNode.UseSuperNode) {
return
}
2021-08-25 10:13:53 +02:00
_ = <-device.Event_Supernode_OK
2021-08-20 19:32:50 +02:00
for {
2021-08-20 19:32:50 +02:00
body, _ := path.GetByte(path.RegisterMsg{
2021-08-23 21:11:01 +02:00
Node_id: device.ID,
PeerStateHash: device.peers.Peer_state,
NhStateHash: device.graph.NhTableHash,
2021-09-20 22:20:00 +02:00
Version: device.Version,
LocalV4: net.UDPAddr{
IP: device.peers.LocalV4,
Port: int(device.net.port),
},
LocalV6: net.UDPAddr{
IP: device.peers.LocalV6,
Port: int(device.net.port),
},
2021-08-20 19:32:50 +02:00
})
buf := make([]byte, path.EgHeaderLen+len(body))
header, _ := path.NewEgHeader(buf[0:path.EgHeaderLen])
2021-08-25 10:13:53 +02:00
header.SetDst(config.SuperNodeMessage)
2021-08-20 19:32:50 +02:00
header.SetTTL(0)
header.SetSrc(device.ID)
header.SetPacketLength(uint16(len(body)))
copy(buf[path.EgHeaderLen:], body)
2021-09-21 03:15:23 +02:00
device.Send2Super(path.Register, buf, MessageTransportOffsetContent)
2021-08-20 19:32:50 +02:00
time.Sleep(path.S2TD(device.DRoute.SendPingInterval))
}
}
func (device *Device) RoutineRecalculateNhTable() {
if device.IsSuperNode {
for {
changed := device.graph.RecalculateNhTable(true)
if changed {
device.Event_server_NhTable_changed <- struct{}{}
}
time.Sleep(device.graph.NodeReportTimeout)
}
} else {
if !device.DRoute.P2P.UseP2P {
return
}
for {
2021-08-24 20:16:21 +02:00
if time.Now().After(device.graph.NhTableExpire) {
device.graph.RecalculateNhTable(false)
}
2021-08-20 19:32:50 +02:00
time.Sleep(device.graph.NodeReportTimeout)
}
}
2021-08-23 18:39:04 +02:00
}
2021-08-20 19:32:50 +02:00
2021-08-23 18:39:04 +02:00
func (device *Device) RoutineSpreadAllMyNeighbor() {
if !device.DRoute.P2P.UseP2P {
return
}
for {
device.process_RequestPeerMsg(path.QueryPeerMsg{
Request_ID: uint32(config.Broadcast),
2021-08-23 18:39:04 +02:00
})
time.Sleep(path.S2TD(device.DRoute.P2P.SendPeerInterval))
}
2021-08-20 19:32:50 +02:00
}
2021-08-24 10:43:55 +02:00
func (device *Device) RoutineResetConn() {
if device.ResetConnInterval <= 0.01 {
return
}
for {
for _, peer := range device.peers.keyMap {
if !peer.StaticConn { //Do not reset connecton for dynamic peer
2021-08-24 10:43:55 +02:00
continue
}
if peer.ConnURL == "" {
continue
}
err := peer.SetEndpointFromConnURL(peer.ConnURL, peer.ConnAF, peer.StaticConn)
2021-08-24 10:43:55 +02:00
if err != nil {
device.log.Errorf("Failed to bind "+peer.ConnURL, err)
continue
}
}
time.Sleep(path.S2TD(device.ResetConnInterval))
2021-08-24 10:43:55 +02:00
}
}
func (device *Device) GeneratePingPacket(src_nodeID config.Vertex, request_reply int) ([]byte, path.Usage, error) {
2021-08-20 19:32:50 +02:00
body, err := path.GetByte(&path.PingMsg{
Src_nodeID: src_nodeID,
Time: device.graph.GetCurrentTime(),
RequestReply: request_reply,
2021-08-20 19:32:50 +02:00
})
if err != nil {
2021-09-21 03:15:23 +02:00
return nil, path.PingPacket, err
2021-08-20 19:32:50 +02:00
}
buf := make([]byte, path.EgHeaderLen+len(body))
header, _ := path.NewEgHeader(buf[0:path.EgHeaderLen])
if err != nil {
2021-09-21 03:15:23 +02:00
return nil, path.PingPacket, err
2021-08-20 19:32:50 +02:00
}
2021-09-20 22:20:00 +02:00
header.SetDst(config.ControlMessage)
2021-08-20 19:32:50 +02:00
header.SetTTL(0)
header.SetSrc(device.ID)
header.SetPacketLength(uint16(len(body)))
copy(buf[path.EgHeaderLen:], body)
2021-09-21 03:15:23 +02:00
return buf, path.PingPacket, nil
2021-08-20 19:32:50 +02:00
}
2021-08-23 18:39:04 +02:00
func (device *Device) process_RequestPeerMsg(content path.QueryPeerMsg) error { //Send all my peers to all my peers
2021-08-20 19:32:50 +02:00
if device.DRoute.P2P.UseP2P {
2021-08-23 18:39:04 +02:00
device.peers.RLock()
2021-08-20 19:32:50 +02:00
for pubkey, peer := range device.peers.keyMap {
2021-08-25 10:13:53 +02:00
if peer.ID >= config.Special_NodeID {
2021-08-20 19:32:50 +02:00
continue
}
2021-08-23 18:39:04 +02:00
if peer.endpoint == nil {
continue
}
peer.handshake.mutex.RLock()
2021-08-20 19:32:50 +02:00
response := path.BoardcastPeerMsg{
2021-08-21 16:54:24 +02:00
Request_ID: content.Request_ID,
NodeID: peer.ID,
PubKey: pubkey,
ConnURL: peer.endpoint.DstToString(),
2021-08-20 19:32:50 +02:00
}
2021-08-23 18:39:04 +02:00
peer.handshake.mutex.RUnlock()
2021-08-20 19:32:50 +02:00
body, err := path.GetByte(response)
if err != nil {
device.log.Errorf("Error at receivesendproc.go line221: ", err)
continue
}
buf := make([]byte, path.EgHeaderLen+len(body))
header, _ := path.NewEgHeader(buf[0:path.EgHeaderLen])
2021-08-25 10:13:53 +02:00
header.SetDst(config.ControlMessage)
2021-09-20 22:20:00 +02:00
header.SetTTL(device.DefaultTTL)
2021-08-20 19:32:50 +02:00
header.SetSrc(device.ID)
header.SetPacketLength(uint16(len(body)))
copy(buf[path.EgHeaderLen:], body)
device.SpreadPacket(make(map[config.Vertex]bool), path.BroadcastPeer, buf, MessageTransportOffsetContent)
2021-08-20 19:32:50 +02:00
}
2021-08-23 18:39:04 +02:00
device.peers.RUnlock()
2021-08-20 19:32:50 +02:00
}
return nil
}
2021-08-24 10:43:55 +02:00
func (device *Device) process_BoardcastPeerMsg(peer *Peer, content path.BoardcastPeerMsg) error {
2021-08-20 19:32:50 +02:00
if device.DRoute.P2P.UseP2P {
2021-09-21 03:15:23 +02:00
var pk NoisePublicKey
2021-08-24 10:43:55 +02:00
if content.Request_ID == uint32(device.ID) {
peer.AskedForNeighbor = true
}
2021-08-23 18:39:04 +02:00
if bytes.Equal(content.PubKey[:], device.staticIdentity.publicKey[:]) {
return nil
}
2021-09-21 03:15:23 +02:00
copy(pk[:], content.PubKey[:])
thepeer := device.LookupPeer(pk)
2021-08-20 19:32:50 +02:00
if thepeer == nil { //not exist in local
if device.LogLevel.LogControl {
2021-09-21 03:15:23 +02:00
fmt.Println("Control: Add new peer to local ID:" + content.NodeID.ToString() + " PubKey:" + pk.ToString())
}
2021-08-20 19:32:50 +02:00
if device.graph.Weight(device.ID, content.NodeID) == path.Infinity { // add node to graph
2021-08-25 15:21:26 +02:00
device.graph.UpdateLentancy(device.ID, content.NodeID, path.S2TD(path.Infinity), true, false)
2021-08-20 19:32:50 +02:00
}
if device.graph.Weight(content.NodeID, device.ID) == path.Infinity { // add node to graph
2021-08-25 15:21:26 +02:00
device.graph.UpdateLentancy(content.NodeID, device.ID, path.S2TD(path.Infinity), true, false)
2021-08-20 19:32:50 +02:00
}
2021-09-21 22:03:11 +02:00
device.NewPeer(pk, content.NodeID, false)
2021-08-20 19:32:50 +02:00
}
if thepeer.LastPingReceived.Add(path.S2TD(device.DRoute.PeerAliveTimeout)).Before(time.Now()) {
2021-08-20 19:32:50 +02:00
//Peer died, try to switch to this new endpoint
2021-09-21 03:15:23 +02:00
thepeer.Lock()
thepeer.endpoint_trylist.Set(content.ConnURL, time.Time{}) //another gorouting will process it
thepeer.Unlock()
2021-08-23 18:39:04 +02:00
device.event_tryendpoint <- struct{}{}
2021-08-20 19:32:50 +02:00
}
}
return nil
}