EtherGuard-VPN/device/receivesendproc.go

1089 lines
35 KiB
Go
Raw Normal View History

2021-12-04 15:46:36 +01:00
/* SPDX-License-Identifier: MIT
*
* Copyright (C) 2017-2021 Kusakabe Si. All Rights Reserved.
*/
2021-08-20 19:32:50 +02:00
package device
import (
"bytes"
2021-12-02 18:13:48 +01:00
"encoding/base64"
2021-08-20 19:32:50 +02:00
"encoding/json"
"errors"
"fmt"
"hash/crc32"
"io/ioutil"
"net"
2021-08-20 19:32:50 +02:00
"net/http"
2021-08-21 16:54:24 +02:00
"strconv"
"strings"
2021-12-02 18:13:48 +01:00
"syscall"
2021-08-20 19:32:50 +02:00
"time"
2021-12-15 08:55:28 +01:00
"github.com/KusakabeSi/EtherGuard-VPN/conn"
2021-12-02 18:13:48 +01:00
"github.com/KusakabeSi/EtherGuard-VPN/mtypes"
"github.com/KusakabeSi/EtherGuard-VPN/path"
"github.com/KusakabeSi/EtherGuard-VPN/tap"
"github.com/golang-jwt/jwt"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
2021-08-20 19:32:50 +02:00
)
func (device *Device) SendPacket(peer *Peer, usage path.Usage, ttl uint8, packet []byte, offset int) {
2021-08-20 19:32:50 +02:00
if peer == nil {
return
} else if peer.endpoint == nil {
return
2021-08-20 19:32:50 +02:00
}
2021-10-01 10:56:42 +02:00
if usage == path.NormalPacket && len(packet)-path.EgHeaderLen <= 12 {
if device.LogLevel.LogNormal {
2021-12-12 09:29:05 +01:00
fmt.Printf("Normal: Send Len:%v Invalid packet: Ethernet packet too small\n", len(packet)-path.EgHeaderLen)
2021-10-01 10:56:42 +02:00
}
return
}
2021-08-25 10:13:53 +02:00
if device.LogLevel.LogNormal {
EgHeader, _ := path.NewEgHeader(packet[:path.EgHeaderLen], device.EdgeConfig.Interface.MTU)
2021-10-01 10:56:42 +02:00
if usage == path.NormalPacket && EgHeader.GetSrc() == device.ID {
2021-08-25 10:13:53 +02:00
dst_nodeID := EgHeader.GetDst()
2021-10-01 10:56:42 +02:00
packet_len := len(packet) - path.EgHeaderLen
2021-12-13 05:20:58 +01:00
fmt.Printf("Normal: Send Len:%v S:%v D:%v TTL:%v To:%v IP:%v:\n", packet_len, device.ID.ToString(), dst_nodeID.ToString(), ttl, peer.ID.ToString(), peer.GetEndpointDstStr())
packet := gopacket.NewPacket(packet[path.EgHeaderLen:], layers.LayerTypeEthernet, gopacket.Default)
fmt.Println(packet.Dump())
2021-08-25 10:13:53 +02:00
}
}
if device.LogLevel.LogControl {
EgHeader, _ := path.NewEgHeader(packet[:path.EgHeaderLen], device.EdgeConfig.Interface.MTU)
if usage != path.NormalPacket {
2021-08-21 16:54:24 +02:00
if peer.GetEndpointDstStr() != "" {
src_nodeID := EgHeader.GetSrc()
2021-12-09 23:39:37 +01:00
dst_nodeID := EgHeader.GetDst()
2021-12-13 05:20:58 +01:00
fmt.Printf("Control: Send %v S:%v D:%v TTL:%v To:%v IP:%v\n", device.sprint_received(usage, packet[path.EgHeaderLen:]), src_nodeID.ToString(), dst_nodeID.ToString(), ttl, peer.ID.ToString(), peer.GetEndpointDstStr())
2021-08-21 16:54:24 +02:00
}
}
}
2021-08-25 10:13:53 +02:00
2021-08-20 19:32:50 +02:00
var elem *QueueOutboundElement
elem = device.NewOutboundElement()
copy(elem.buffer[offset:offset+len(packet)], packet)
2021-09-21 03:15:23 +02:00
elem.Type = usage
elem.TTL = ttl
2021-08-20 19:32:50 +02:00
elem.packet = elem.buffer[offset : offset+len(packet)]
if peer.isRunning.Get() {
peer.StagePacket(elem)
elem = nil
peer.SendStagedPackets()
}
}
func (device *Device) BoardcastPacket(skip_list map[mtypes.Vertex]bool, usage path.Usage, ttl uint8, packet []byte, offset int) { // Send packet to all connected peers
2021-08-20 19:32:50 +02:00
send_list := device.graph.GetBoardcastList(device.ID)
2021-12-09 08:46:15 +01:00
for node_id := range skip_list {
2021-08-20 19:32:50 +02:00
send_list[node_id] = false
}
2021-08-23 18:39:04 +02:00
device.peers.RLock()
2021-08-20 19:32:50 +02:00
for node_id, should_send := range send_list {
if should_send {
2021-12-09 08:46:15 +01:00
peer_out := device.peers.IDMap[node_id]
go device.SendPacket(peer_out, usage, ttl, packet, offset)
2021-08-20 19:32:50 +02:00
}
}
2021-08-23 18:39:04 +02:00
device.peers.RUnlock()
2021-08-20 19:32:50 +02:00
}
func (device *Device) SpreadPacket(skip_list map[mtypes.Vertex]bool, usage path.Usage, ttl uint8, packet []byte, offset int) { // Send packet to all peers no matter it is alive
2021-08-23 18:39:04 +02:00
device.peers.RLock()
2021-08-20 19:32:50 +02:00
for peer_id, peer_out := range device.peers.IDMap {
if _, ok := skip_list[peer_id]; ok {
2021-12-13 05:20:58 +01:00
if device.LogLevel.LogTransit && peer_out.endpoint != nil {
fmt.Printf("Transit: Skipped Spread Packet packet Me:%v To:%d TTL:%v\n", device.ID, peer_out.ID, ttl)
2021-08-20 19:32:50 +02:00
}
continue
}
go device.SendPacket(peer_out, usage, ttl, packet, offset)
2021-08-20 19:32:50 +02:00
}
2021-08-23 18:39:04 +02:00
device.peers.RUnlock()
2021-08-20 19:32:50 +02:00
}
func (device *Device) TransitBoardcastPacket(src_nodeID mtypes.Vertex, in_id mtypes.Vertex, usage path.Usage, ttl uint8, packet []byte, offset int) {
2021-12-09 08:46:15 +01:00
node_boardcast_list, errs := device.graph.GetBoardcastThroughList(device.ID, in_id, src_nodeID)
if device.LogLevel.LogControl {
for _, err := range errs {
fmt.Printf("Internal: Can't boardcast: %v", err)
}
}
2021-08-23 18:39:04 +02:00
device.peers.RLock()
2021-08-20 19:32:50 +02:00
for peer_id := range node_boardcast_list {
peer_out := device.peers.IDMap[peer_id]
2021-08-25 10:13:53 +02:00
if device.LogLevel.LogTransit {
2021-12-13 05:20:58 +01:00
fmt.Printf("Transit: Transfer From:%v Me:%v To:%v S:%v D:%v TTL:%v\n", in_id, device.ID, peer_out.ID, src_nodeID.ToString(), peer_out.ID.ToString(), ttl)
2021-08-20 19:32:50 +02:00
}
go device.SendPacket(peer_out, usage, ttl, packet, offset)
2021-08-20 19:32:50 +02:00
}
2021-08-23 18:39:04 +02:00
device.peers.RUnlock()
2021-08-20 19:32:50 +02:00
}
func (device *Device) Send2Super(usage path.Usage, ttl uint8, packet []byte, offset int) {
2021-08-23 18:39:04 +02:00
device.peers.RLock()
2021-12-03 23:46:58 +01:00
if device.EdgeConfig.DynamicRoute.SuperNode.UseSuperNode {
2021-08-20 19:32:50 +02:00
for _, peer_out := range device.peers.SuperPeer {
2021-08-25 10:13:53 +02:00
/*if device.LogTransit {
2021-08-20 19:32:50 +02:00
fmt.Printf("Send to supernode %s\n", peer_out.endpoint.DstToString())
2021-08-25 10:13:53 +02:00
}*/
go device.SendPacket(peer_out, usage, ttl, packet, offset)
2021-08-20 19:32:50 +02:00
}
}
2021-08-23 18:39:04 +02:00
device.peers.RUnlock()
2021-08-20 19:32:50 +02:00
}
// CheckNoDup reports whether packet has not been recorded before.
//
// The packet is identified by its CRC-32 (Castagnoli) checksum, which is
// looked up in device.DupData and then stored there. It returns true when the
// checksum was not already present.
func (device *Device) CheckNoDup(packet []byte) bool {
	sum := crc32.Checksum(packet, crc32.MakeTable(crc32.Castagnoli))
	_, seen := device.DupData.Get(sum)
	device.DupData.Set(sum, true)
	return !seen
}
2021-08-23 18:39:04 +02:00
func (device *Device) process_received(msg_type path.Usage, peer *Peer, body []byte) (err error) {
2021-08-20 19:32:50 +02:00
if device.IsSuperNode {
switch msg_type {
case path.Register:
2021-12-02 18:13:48 +01:00
if content, err := mtypes.ParseRegisterMsg(body); err == nil {
return device.server_process_RegisterMsg(peer, content)
2021-12-09 23:39:37 +01:00
} else {
return err
2021-08-20 19:32:50 +02:00
}
case path.PongPacket:
2021-12-02 18:13:48 +01:00
if content, err := mtypes.ParsePongMsg(body); err == nil {
return device.server_process_Pong(peer, content)
2021-12-09 23:39:37 +01:00
} else {
return err
2021-08-20 19:32:50 +02:00
}
default:
2021-12-09 08:46:15 +01:00
err = errors.New("not a valid msg_type")
2021-08-20 19:32:50 +02:00
}
} else {
switch msg_type {
2021-12-04 03:32:59 +01:00
case path.ServerUpdate:
if content, err := mtypes.ParseServerUpdateMsg(body); err == nil {
device.process_ServerUpdateMsg(peer, content)
2021-12-09 23:39:37 +01:00
} else {
return err
2021-08-20 19:32:50 +02:00
}
case path.PingPacket:
2021-12-02 18:13:48 +01:00
if content, err := mtypes.ParsePingMsg(body); err == nil {
2021-09-20 22:20:00 +02:00
return device.process_ping(peer, content)
2021-12-09 23:39:37 +01:00
} else {
return err
2021-08-20 19:32:50 +02:00
}
case path.PongPacket:
2021-12-02 18:13:48 +01:00
if content, err := mtypes.ParsePongMsg(body); err == nil {
2021-08-23 18:39:04 +02:00
return device.process_pong(peer, content)
2021-12-09 23:39:37 +01:00
} else {
return err
2021-08-20 19:32:50 +02:00
}
2021-08-23 18:39:04 +02:00
case path.QueryPeer:
2021-12-02 18:13:48 +01:00
if content, err := mtypes.ParseQueryPeerMsg(body); err == nil {
2021-08-20 19:32:50 +02:00
return device.process_RequestPeerMsg(content)
2021-12-09 23:39:37 +01:00
} else {
return err
2021-08-20 19:32:50 +02:00
}
case path.BroadcastPeer:
2021-12-02 18:13:48 +01:00
if content, err := mtypes.ParseBoardcastPeerMsg(body); err == nil {
2021-08-24 10:43:55 +02:00
return device.process_BoardcastPeerMsg(peer, content)
2021-12-09 23:39:37 +01:00
} else {
return err
2021-08-20 19:32:50 +02:00
}
default:
2021-12-09 08:46:15 +01:00
err = errors.New("not a valid msg_type")
2021-08-20 19:32:50 +02:00
}
}
return
}
func (device *Device) sprint_received(msg_type path.Usage, body []byte) string {
2021-08-21 16:54:24 +02:00
switch msg_type {
case path.Register:
2021-12-02 18:13:48 +01:00
if content, err := mtypes.ParseRegisterMsg(body); err == nil {
return content.ToString()
2021-08-21 16:54:24 +02:00
}
return "RegisterMsg: Parse failed"
2021-12-04 03:32:59 +01:00
case path.ServerUpdate:
if content, err := mtypes.ParseServerUpdateMsg(body); err == nil {
return content.ToString()
2021-08-21 16:54:24 +02:00
}
2021-12-04 03:32:59 +01:00
return "ServerUpdate: Parse failed"
2021-08-21 16:54:24 +02:00
case path.PingPacket:
2021-12-02 18:13:48 +01:00
if content, err := mtypes.ParsePingMsg(body); err == nil {
return content.ToString()
2021-08-21 16:54:24 +02:00
}
return "PingPacketMsg: Parse failed"
2021-08-21 16:54:24 +02:00
case path.PongPacket:
2021-12-02 18:13:48 +01:00
if content, err := mtypes.ParsePongMsg(body); err == nil {
return content.ToString()
2021-08-21 16:54:24 +02:00
}
return "PongPacketMsg: Parse failed"
2021-08-23 18:39:04 +02:00
case path.QueryPeer:
2021-12-02 18:13:48 +01:00
if content, err := mtypes.ParseQueryPeerMsg(body); err == nil {
return content.ToString()
2021-08-21 16:54:24 +02:00
}
return "QueryPeerMsg: Parse failed"
case path.BroadcastPeer:
2021-12-02 18:13:48 +01:00
if content, err := mtypes.ParseBoardcastPeerMsg(body); err == nil {
return content.ToString()
2021-08-21 16:54:24 +02:00
}
return "BoardcastPeerMsg: Parse failed"
2021-08-21 16:54:24 +02:00
default:
2021-10-01 10:56:42 +02:00
return "UnknownMsg: Not a valid msg_type"
}
}
func (device *Device) GeneratePingPacket(src_nodeID mtypes.Vertex, request_reply int) ([]byte, path.Usage, uint8, error) {
2021-12-02 18:13:48 +01:00
body, err := mtypes.GetByte(&mtypes.PingMsg{
2021-10-01 10:56:42 +02:00
Src_nodeID: src_nodeID,
Time: device.graph.GetCurrentTime(),
RequestReply: request_reply,
})
if err != nil {
return nil, path.PingPacket, 0, err
2021-10-01 10:56:42 +02:00
}
buf := make([]byte, path.EgHeaderLen+len(body))
header, _ := path.NewEgHeader(buf[0:path.EgHeaderLen], device.EdgeConfig.Interface.MTU)
2021-10-01 10:56:42 +02:00
if err != nil {
return nil, path.PingPacket, 0, err
2021-10-01 10:56:42 +02:00
}
2021-12-12 09:29:05 +01:00
header.SetDst(mtypes.NodeID_Spread)
2021-10-01 10:56:42 +02:00
header.SetSrc(device.ID)
copy(buf[path.EgHeaderLen:], body)
return buf, path.PingPacket, 0, nil
2021-10-01 10:56:42 +02:00
}
func (device *Device) SendPing(peer *Peer, times int, replies int, interval float64) {
for i := 0; i < times; i++ {
packet, usage, ttl, _ := device.GeneratePingPacket(device.ID, replies)
device.SendPacket(peer, usage, ttl, packet, MessageTransportOffsetContent)
2021-12-03 23:46:58 +01:00
time.Sleep(mtypes.S2TD(interval))
2021-08-21 16:54:24 +02:00
}
}
// compareVersion reports whether v1 and v2 are equal once everything from
// the first "-" onwards has been stripped from each of them.
func compareVersion(v1 string, v2 string) bool {
	base := func(v string) string {
		if cut := strings.Index(v, "-"); cut >= 0 {
			return v[:cut]
		}
		return v
	}
	return base(v1) == base(v2)
}
2021-12-02 18:13:48 +01:00
func (device *Device) server_process_RegisterMsg(peer *Peer, content mtypes.RegisterMsg) error {
2021-12-04 03:32:59 +01:00
ServerUpdateMsg := mtypes.ServerUpdateMsg{
Node_id: peer.ID,
Action: mtypes.NoAction,
Code: 0,
Params: "",
2021-09-20 22:20:00 +02:00
}
if peer.ID != content.Node_id {
2021-12-04 03:32:59 +01:00
ServerUpdateMsg = mtypes.ServerUpdateMsg{
Node_id: peer.ID,
Action: mtypes.ThrowError,
Code: int(syscall.EPERM),
Params: fmt.Sprintf("Your nodeID: %v is not match with registered nodeID: %v", content.Node_id, peer.ID),
}
2021-09-20 22:20:00 +02:00
}
2021-12-09 08:46:15 +01:00
if !compareVersion(content.Version, device.Version) {
2021-12-04 03:32:59 +01:00
ServerUpdateMsg = mtypes.ServerUpdateMsg{
Node_id: peer.ID,
Action: mtypes.ThrowError,
Code: int(syscall.ENOSYS),
Params: fmt.Sprintf("Your version: \"%v\" is not compatible with our version: \"%v\"", content.Version, device.Version),
2021-09-20 22:20:00 +02:00
}
}
2021-12-04 03:32:59 +01:00
if ServerUpdateMsg.Action != mtypes.NoAction {
body, err := mtypes.GetByte(&ServerUpdateMsg)
if err != nil {
return err
}
buf := make([]byte, path.EgHeaderLen+len(body))
header, _ := path.NewEgHeader(buf[:path.EgHeaderLen], device.EdgeConfig.Interface.MTU)
header.SetSrc(device.ID)
copy(buf[path.EgHeaderLen:], body)
2021-12-09 08:46:15 +01:00
header.SetDst(mtypes.NodeID_SuperNode)
device.SendPacket(peer, path.ServerUpdate, 0, buf, MessageTransportOffsetContent)
return nil
}
2021-12-04 03:32:59 +01:00
device.Chan_server_register <- content
2021-08-20 19:32:50 +02:00
return nil
}
2021-12-02 18:13:48 +01:00
func (device *Device) server_process_Pong(peer *Peer, content mtypes.PongMsg) error {
2021-12-04 03:32:59 +01:00
device.Chan_server_pong <- content
2021-08-20 19:32:50 +02:00
return nil
}
2021-12-02 18:13:48 +01:00
func (device *Device) process_ping(peer *Peer, content mtypes.PingMsg) error {
Timediff := device.graph.GetCurrentTime().Sub(content.Time).Seconds()
NewTimediff := peer.SingleWayLatency.Push(Timediff)
2021-10-27 03:02:44 +02:00
2021-12-02 18:13:48 +01:00
PongMSG := mtypes.PongMsg{
2021-10-27 03:02:44 +02:00
Src_nodeID: content.Src_nodeID,
Dst_nodeID: device.ID,
2021-12-09 21:23:02 +01:00
Timediff: NewTimediff,
2021-12-03 23:46:58 +01:00
TimeToAlive: device.EdgeConfig.DynamicRoute.PeerAliveTimeout,
AdditionalCost: device.EdgeConfig.DynamicRoute.AdditionalCost,
2021-08-20 19:32:50 +02:00
}
2021-12-03 23:46:58 +01:00
if device.EdgeConfig.DynamicRoute.P2P.UseP2P && time.Now().After(device.graph.NhTableExpire) {
2021-12-09 21:23:02 +01:00
device.graph.UpdateLatencyMulti([]mtypes.PongMsg{PongMSG}, true, false)
}
2021-12-02 18:13:48 +01:00
body, err := mtypes.GetByte(&PongMSG)
2021-08-20 19:32:50 +02:00
if err != nil {
return err
}
buf := make([]byte, path.EgHeaderLen+len(body))
header, _ := path.NewEgHeader(buf[:path.EgHeaderLen], device.EdgeConfig.Interface.MTU)
2021-08-20 19:32:50 +02:00
header.SetSrc(device.ID)
2021-08-21 16:54:24 +02:00
copy(buf[path.EgHeaderLen:], body)
2021-12-03 23:46:58 +01:00
if device.EdgeConfig.DynamicRoute.SuperNode.UseSuperNode {
2021-12-09 08:46:15 +01:00
header.SetDst(mtypes.NodeID_SuperNode)
device.Send2Super(path.PongPacket, 0, buf, MessageTransportOffsetContent)
2021-08-20 19:32:50 +02:00
}
2021-12-03 23:46:58 +01:00
if device.EdgeConfig.DynamicRoute.P2P.UseP2P {
2021-12-12 09:29:05 +01:00
header.SetDst(mtypes.NodeID_Spread)
device.SpreadPacket(make(map[mtypes.Vertex]bool), path.PongPacket, device.EdgeConfig.DefaultTTL, buf, MessageTransportOffsetContent)
2021-08-20 19:32:50 +02:00
}
go device.SendPing(peer, content.RequestReply, 0, 3)
2021-08-20 19:32:50 +02:00
return nil
}
2021-12-02 18:13:48 +01:00
func (device *Device) process_pong(peer *Peer, content mtypes.PongMsg) error {
2021-12-03 23:46:58 +01:00
if device.EdgeConfig.DynamicRoute.P2P.UseP2P {
if time.Now().After(device.graph.NhTableExpire) {
2021-12-03 23:46:58 +01:00
device.graph.UpdateLatency(content.Src_nodeID, content.Dst_nodeID, content.Timediff, device.EdgeConfig.DynamicRoute.PeerAliveTimeout, content.AdditionalCost, true, false)
}
2021-08-23 18:39:04 +02:00
if !peer.AskedForNeighbor {
2021-12-02 18:13:48 +01:00
QueryPeerMsg := mtypes.QueryPeerMsg{
2021-08-23 18:39:04 +02:00
Request_ID: uint32(device.ID),
}
2021-12-02 18:13:48 +01:00
body, err := mtypes.GetByte(&QueryPeerMsg)
2021-08-23 18:39:04 +02:00
if err != nil {
return err
}
buf := make([]byte, path.EgHeaderLen+len(body))
header, _ := path.NewEgHeader(buf[:path.EgHeaderLen], device.EdgeConfig.Interface.MTU)
2021-08-23 18:39:04 +02:00
header.SetSrc(device.ID)
2021-12-13 05:20:58 +01:00
header.SetDst(mtypes.NodeID_Spread)
2021-08-23 18:39:04 +02:00
copy(buf[path.EgHeaderLen:], body)
device.SendPacket(peer, path.QueryPeer, device.EdgeConfig.DefaultTTL, buf, MessageTransportOffsetContent)
2021-08-23 18:39:04 +02:00
}
2021-08-20 19:32:50 +02:00
}
return nil
}
2021-12-04 03:32:59 +01:00
func (device *Device) process_UpdatePeerMsg(peer *Peer, State_hash string) error {
2021-08-20 19:32:50 +02:00
var send_signal bool
2021-12-03 23:46:58 +01:00
if device.EdgeConfig.DynamicRoute.SuperNode.UseSuperNode {
2021-12-04 03:32:59 +01:00
if device.state_hashes.Peer.Load().(string) == State_hash {
if device.LogLevel.LogControl {
2021-12-04 03:32:59 +01:00
fmt.Println("Control: Same Hash, skip download PeerInfo")
2021-10-27 03:02:44 +02:00
}
return nil
}
2021-12-02 18:13:48 +01:00
var peer_infos mtypes.API_Peers
2021-12-04 03:32:59 +01:00
//
2021-08-24 20:16:21 +02:00
client := http.Client{
2021-12-04 03:32:59 +01:00
Timeout: 8 * time.Second,
2021-08-24 20:16:21 +02:00
}
2021-12-04 15:46:36 +01:00
downloadurl := device.EdgeConfig.DynamicRoute.SuperNode.EndpointEdgeAPIUrl + "/edge/peerinfo" ////////////////////////////////////////////////////////////////////////////////////////////////
2021-12-04 03:32:59 +01:00
req, err := http.NewRequest("GET", downloadurl, nil)
2021-12-09 08:46:15 +01:00
if err != nil {
device.log.Errorf(err.Error())
return err
}
2021-12-04 03:32:59 +01:00
q := req.URL.Query()
q.Add("NodeID", device.ID.ToString())
q.Add("PubKey", device.staticIdentity.publicKey.ToString())
q.Add("State", State_hash)
req.URL.RawQuery = q.Encode()
if device.LogLevel.LogControl {
fmt.Println("Control: Download PeerInfo from :" + req.URL.RequestURI())
}
resp, err := client.Do(req)
2021-08-20 19:32:50 +02:00
if err != nil {
2021-08-24 20:16:21 +02:00
device.log.Errorf(err.Error())
2021-08-20 19:32:50 +02:00
return err
}
defer resp.Body.Close()
allbytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
2021-08-24 20:16:21 +02:00
device.log.Errorf(err.Error())
2021-08-20 19:32:50 +02:00
return err
}
2021-09-26 14:59:57 +02:00
if resp.StatusCode != 200 {
2021-12-04 03:32:59 +01:00
device.log.Errorf("Control: Download peerinfo failed: " + strconv.Itoa(resp.StatusCode) + " " + string(allbytes))
2021-09-26 14:59:57 +02:00
return nil
}
2021-08-25 10:13:53 +02:00
if device.LogLevel.LogControl {
2021-08-25 13:54:13 +02:00
fmt.Println("Control: Download peerinfo result :" + string(allbytes))
2021-08-25 10:13:53 +02:00
}
2021-08-20 19:32:50 +02:00
if err := json.Unmarshal(allbytes, &peer_infos); err != nil {
2021-09-26 14:59:57 +02:00
device.log.Errorf("JSON decode error:", err.Error())
2021-08-20 19:32:50 +02:00
return err
}
2021-09-21 03:15:23 +02:00
for nodeID, thepeer := range device.peers.IDMap {
pk := thepeer.handshake.remoteStatic
psk := thepeer.handshake.presharedKey
if val, ok := peer_infos[pk.ToString()]; ok {
if val.NodeID != nodeID {
device.RemovePeer(pk)
continue
} else if val.PSKey != psk.ToString() {
device.RemovePeer(pk)
continue
}
} else {
device.RemovePeer(pk)
continue
}
}
2021-09-21 22:03:11 +02:00
for PubKey, peerinfo := range peer_infos {
sk, err := Str2PubKey(PubKey)
2021-09-21 03:15:23 +02:00
if err != nil {
device.log.Errorf("Error decode base64:", err)
continue
2021-09-21 03:15:23 +02:00
}
2021-08-20 19:32:50 +02:00
if bytes.Equal(sk[:], device.staticIdentity.publicKey[:]) {
continue
}
thepeer := device.LookupPeer(sk)
if thepeer == nil { //not exist in local
2021-12-02 18:13:48 +01:00
if len(peerinfo.Connurl.ExternalV4)+len(peerinfo.Connurl.ExternalV6)+len(peerinfo.Connurl.LocalV4)+len(peerinfo.Connurl.LocalV6) == 0 {
2021-10-27 03:02:44 +02:00
continue
}
if device.LogLevel.LogControl {
2021-09-21 22:03:11 +02:00
fmt.Println("Control: Add new peer to local ID:" + peerinfo.NodeID.ToString() + " PubKey:" + PubKey)
}
2021-12-09 08:46:15 +01:00
if device.graph.Weight(device.ID, peerinfo.NodeID, false) == mtypes.Infinity { // add node to graph
device.graph.UpdateLatency(device.ID, peerinfo.NodeID, mtypes.Infinity, 0, device.EdgeConfig.DynamicRoute.AdditionalCost, true, false)
2021-08-20 19:32:50 +02:00
}
2021-12-09 08:46:15 +01:00
if device.graph.Weight(peerinfo.NodeID, device.ID, false) == mtypes.Infinity { // add node to graph
device.graph.UpdateLatency(peerinfo.NodeID, device.ID, mtypes.Infinity, 0, device.EdgeConfig.DynamicRoute.AdditionalCost, true, false)
2021-08-20 19:32:50 +02:00
}
thepeer, err = device.NewPeer(sk, peerinfo.NodeID, false, 0)
if err != nil {
device.log.Errorf("Failed to create peer with ID:%v PunKey:%v :%v", peerinfo.NodeID.ToString(), PubKey, err)
continue
}
}
if peerinfo.PSKey != "" {
pk, err := Str2PSKey(peerinfo.PSKey)
if err != nil {
device.log.Errorf("Error decode base64:", err)
continue
2021-08-20 19:32:50 +02:00
}
thepeer.SetPSK(pk)
2021-08-20 19:32:50 +02:00
}
2021-12-17 08:02:42 +01:00
thepeer.endpoint_trylist.UpdateSuper(*peerinfo.Connurl, !device.EdgeConfig.DynamicRoute.SuperNode.SkipLocalIP, device.EdgeConfig.AfPrefer)
2021-10-27 03:02:44 +02:00
if !thepeer.IsPeerAlive() {
2021-08-20 19:32:50 +02:00
//Peer died, try to switch to this new endpoint
2021-10-27 03:02:44 +02:00
send_signal = true
2021-08-20 19:32:50 +02:00
}
}
2021-12-04 03:32:59 +01:00
device.state_hashes.Peer.Store(State_hash)
2021-08-20 19:32:50 +02:00
if send_signal {
device.event_tryendpoint <- struct{}{}
}
}
return nil
}
2021-12-04 03:32:59 +01:00
func (device *Device) process_UpdateNhTableMsg(peer *Peer, State_hash string) error {
2021-12-03 23:46:58 +01:00
if device.EdgeConfig.DynamicRoute.SuperNode.UseSuperNode {
2021-12-04 03:32:59 +01:00
if device.state_hashes.NhTable.Load().(string) == State_hash {
if device.LogLevel.LogControl {
2021-12-04 03:32:59 +01:00
fmt.Println("Control: Same Hash, skip download nhTable")
}
2021-12-04 03:32:59 +01:00
device.graph.NhTableExpire = time.Now().Add(device.graph.SuperNodeInfoTimeout)
return nil
}
var NhTable mtypes.NextHopTable
// Download from supernode
client := &http.Client{
Timeout: 8 * time.Second,
}
2021-12-04 15:46:36 +01:00
downloadurl := device.EdgeConfig.DynamicRoute.SuperNode.EndpointEdgeAPIUrl + "/edge/nhtable" ////////////////////////////////////////////////////////////////////////////////////////////////
2021-12-04 03:32:59 +01:00
req, err := http.NewRequest("GET", downloadurl, nil)
2021-12-09 08:46:15 +01:00
if err != nil {
device.log.Errorf(err.Error())
return err
}
2021-12-04 03:32:59 +01:00
q := req.URL.Query()
q.Add("NodeID", device.ID.ToString())
q.Add("PubKey", device.staticIdentity.publicKey.ToString())
q.Add("State", State_hash)
req.URL.RawQuery = q.Encode()
if device.LogLevel.LogControl {
fmt.Println("Control: Download NhTable from :" + req.URL.RequestURI())
}
resp, err := client.Do(req)
if err != nil {
device.log.Errorf(err.Error())
return err
}
defer resp.Body.Close()
allbytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
device.log.Errorf(err.Error())
return err
}
if resp.StatusCode != 200 {
device.log.Errorf("Control: Download NhTable failed: " + strconv.Itoa(resp.StatusCode) + " " + string(allbytes))
return nil
}
2021-12-04 03:32:59 +01:00
if device.LogLevel.LogControl {
fmt.Println("Control: Download NhTable result :" + string(allbytes))
}
if err := json.Unmarshal(allbytes, &NhTable); err != nil {
device.log.Errorf("JSON decode error:", err.Error())
return err
}
device.graph.SetNHTable(NhTable)
device.state_hashes.NhTable.Store(State_hash)
}
return nil
}
func (device *Device) process_UpdateSuperParamsMsg(peer *Peer, State_hash string) error {
if device.EdgeConfig.DynamicRoute.SuperNode.UseSuperNode {
if device.state_hashes.SuperParam.Load().(string) == State_hash {
if device.LogLevel.LogControl {
2021-12-04 03:32:59 +01:00
fmt.Println("Control: Same Hash, skip download SuperParams")
}
device.graph.NhTableExpire = time.Now().Add(device.graph.SuperNodeInfoTimeout)
return nil
}
2021-12-04 03:32:59 +01:00
var SuperParams mtypes.API_SuperParams
client := &http.Client{
Timeout: 8 * time.Second,
}
2021-12-04 15:46:36 +01:00
downloadurl := device.EdgeConfig.DynamicRoute.SuperNode.EndpointEdgeAPIUrl + "/edge/superparams" ////////////////////////////////////////////////////////////////////////////////////////////////
2021-12-04 03:32:59 +01:00
req, err := http.NewRequest("GET", downloadurl, nil)
2021-12-09 08:46:15 +01:00
if err != nil {
device.log.Errorf(err.Error())
return err
}
2021-12-04 03:32:59 +01:00
q := req.URL.Query()
q.Add("NodeID", device.ID.ToString())
q.Add("PubKey", device.staticIdentity.publicKey.ToString())
q.Add("State", State_hash)
req.URL.RawQuery = q.Encode()
if device.LogLevel.LogControl {
2021-12-04 03:32:59 +01:00
fmt.Println("Control: Download SuperParams from :" + req.URL.RequestURI())
}
2021-12-04 03:32:59 +01:00
resp, err := client.Do(req)
if err != nil {
device.log.Errorf(err.Error())
return err
}
defer resp.Body.Close()
allbytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
device.log.Errorf(err.Error())
return err
}
2021-09-26 14:59:57 +02:00
if resp.StatusCode != 200 {
2021-12-04 03:32:59 +01:00
device.log.Errorf("Control: Download SuperParams failed: " + strconv.Itoa(resp.StatusCode) + " " + string(allbytes))
2021-09-26 14:59:57 +02:00
return nil
}
if device.LogLevel.LogControl {
2021-12-04 03:32:59 +01:00
fmt.Println("Control: Download SuperParams result :" + string(allbytes))
}
2021-12-04 03:32:59 +01:00
if err := json.Unmarshal(allbytes, &SuperParams); err != nil {
2021-09-26 14:59:57 +02:00
device.log.Errorf("JSON decode error:", err.Error())
return err
}
2021-12-04 03:32:59 +01:00
if SuperParams.PeerAliveTimeout <= 0 {
2021-12-04 04:51:50 +01:00
device.log.Errorf("SuperParams.PeerAliveTimeout <= 0: %v, please check the config of the supernode", SuperParams.PeerAliveTimeout)
return fmt.Errorf("SuperParams.PeerAliveTimeout <= 0: %v, please check the config of the supernode", SuperParams.PeerAliveTimeout)
2021-12-04 03:32:59 +01:00
}
if SuperParams.SendPingInterval <= 0 {
2021-12-04 04:51:50 +01:00
device.log.Errorf("SuperParams.SendPingInterval <= 0: %v, please check the config of the supernode", SuperParams.SendPingInterval)
return fmt.Errorf("SuperParams.SendPingInterval <= 0: %v, please check the config of the supernode", SuperParams.SendPingInterval)
2021-12-04 03:32:59 +01:00
}
2021-12-04 04:51:50 +01:00
if SuperParams.HttpPostInterval < 0 {
device.log.Errorf("SuperParams.HttpPostInterval < 0: %v, please check the config of the supernode", SuperParams.HttpPostInterval)
return fmt.Errorf("SuperParams.HttpPostInterval < 0: %v, please check the config of the supernode", SuperParams.HttpPostInterval)
2021-12-04 03:32:59 +01:00
}
device.EdgeConfig.DynamicRoute.PeerAliveTimeout = SuperParams.PeerAliveTimeout
2021-12-04 04:51:50 +01:00
device.EdgeConfig.DynamicRoute.SendPingInterval = SuperParams.SendPingInterval
device.SuperConfig.HttpPostInterval = SuperParams.HttpPostInterval
device.SuperConfig.DampingFilterRadius = SuperParams.DampingFilterRadius
2021-12-04 04:51:50 +01:00
device.Chan_SendPingStart <- struct{}{}
device.Chan_HttpPostStart <- struct{}{}
2021-12-04 03:32:59 +01:00
if SuperParams.AdditionalCost >= 0 {
device.EdgeConfig.DynamicRoute.AdditionalCost = SuperParams.AdditionalCost
}
device.state_hashes.SuperParam.Store(State_hash)
}
return nil
}
2021-12-04 03:32:59 +01:00
func (device *Device) process_ServerUpdateMsg(peer *Peer, content mtypes.ServerUpdateMsg) error {
2021-12-09 08:46:15 +01:00
if peer.ID != mtypes.NodeID_SuperNode {
if device.LogLevel.LogControl {
fmt.Println("Control: Ignored UpdateErrorMsg. Not from supernode.")
}
return nil
}
2021-12-04 03:32:59 +01:00
switch content.Action {
case mtypes.Shutdown:
2021-12-05 22:36:50 +01:00
device.log.Errorf("Shutdown: " + content.Params)
2021-12-02 18:13:48 +01:00
device.closed <- 0
2021-12-04 03:32:59 +01:00
case mtypes.ThrowError:
device.log.Errorf(strconv.Itoa(int(content.Code)) + ": " + content.Params)
device.closed <- content.Code
case mtypes.Panic:
device.log.Errorf(strconv.Itoa(int(content.Code)) + ": " + content.Params)
panic(content.ToString())
2021-12-04 03:32:59 +01:00
case mtypes.UpdateNhTable:
return device.process_UpdateNhTableMsg(peer, content.Params)
case mtypes.UpdatePeer:
return device.process_UpdatePeerMsg(peer, content.Params)
case mtypes.UpdateSuperParams:
return device.process_UpdateSuperParamsMsg(peer, content.Params)
default:
device.log.Errorf("Unknown Action: %v", content.ToString())
}
return nil
}
2021-12-02 18:13:48 +01:00
func (device *Device) process_RequestPeerMsg(content mtypes.QueryPeerMsg) error { //Send all my peers to all my peers
2021-12-03 23:46:58 +01:00
if device.EdgeConfig.DynamicRoute.P2P.UseP2P {
2021-10-01 10:56:42 +02:00
device.peers.RLock()
for pubkey, peer := range device.peers.keyMap {
2021-12-09 08:46:15 +01:00
if peer.ID >= mtypes.NodeID_Special {
2021-10-01 10:56:42 +02:00
continue
}
if peer.endpoint == nil {
2021-10-27 03:02:44 +02:00
// I don't have the infomation of this peer, skip
continue
}
if !peer.IsPeerAlive() {
// peer died, skip
2021-10-01 10:56:42 +02:00
continue
}
2021-10-27 03:02:44 +02:00
2021-10-01 10:56:42 +02:00
peer.handshake.mutex.RLock()
2021-12-02 18:13:48 +01:00
response := mtypes.BoardcastPeerMsg{
2021-10-01 10:56:42 +02:00
Request_ID: content.Request_ID,
NodeID: peer.ID,
PubKey: pubkey,
ConnURL: peer.endpoint.DstToString(),
}
peer.handshake.mutex.RUnlock()
2021-12-02 18:13:48 +01:00
body, err := mtypes.GetByte(response)
2021-10-01 10:56:42 +02:00
if err != nil {
device.log.Errorf("Error at receivesendproc.go line221: ", err)
continue
}
buf := make([]byte, path.EgHeaderLen+len(body))
header, _ := path.NewEgHeader(buf[0:path.EgHeaderLen], device.EdgeConfig.Interface.MTU)
2021-12-12 09:29:05 +01:00
header.SetDst(mtypes.NodeID_Spread)
2021-10-01 10:56:42 +02:00
header.SetSrc(device.ID)
copy(buf[path.EgHeaderLen:], body)
device.SpreadPacket(make(map[mtypes.Vertex]bool), path.BroadcastPeer, device.EdgeConfig.DefaultTTL, buf, MessageTransportOffsetContent)
2021-10-01 10:56:42 +02:00
}
device.peers.RUnlock()
}
return nil
}
2021-12-09 23:39:37 +01:00
func (device *Device) process_BoardcastPeerMsg(peer *Peer, content mtypes.BoardcastPeerMsg) (err error) {
2021-12-03 23:46:58 +01:00
if device.EdgeConfig.DynamicRoute.P2P.UseP2P {
2021-10-01 10:56:42 +02:00
var pk NoisePublicKey
if content.Request_ID == uint32(device.ID) {
peer.AskedForNeighbor = true
}
if bytes.Equal(content.PubKey[:], device.staticIdentity.publicKey[:]) {
return nil
}
copy(pk[:], content.PubKey[:])
thepeer := device.LookupPeer(pk)
if thepeer == nil { //not exist in local
if device.LogLevel.LogControl {
fmt.Println("Control: Add new peer to local ID:" + content.NodeID.ToString() + " PubKey:" + pk.ToString())
}
2021-12-09 08:46:15 +01:00
if device.graph.Weight(device.ID, content.NodeID, false) == mtypes.Infinity { // add node to graph
device.graph.UpdateLatency(device.ID, content.NodeID, mtypes.Infinity, 0, device.EdgeConfig.DynamicRoute.AdditionalCost, true, false)
2021-10-01 10:56:42 +02:00
}
2021-12-09 08:46:15 +01:00
if device.graph.Weight(content.NodeID, device.ID, false) == mtypes.Infinity { // add node to graph
device.graph.UpdateLatency(content.NodeID, device.ID, mtypes.Infinity, 0, device.EdgeConfig.DynamicRoute.AdditionalCost, true, false)
2021-10-01 10:56:42 +02:00
}
2021-12-09 23:39:37 +01:00
thepeer, err = device.NewPeer(pk, content.NodeID, false, 0)
if err != nil {
return err
}
2021-10-01 10:56:42 +02:00
}
2021-10-27 03:02:44 +02:00
if !thepeer.IsPeerAlive() {
2021-10-01 10:56:42 +02:00
//Peer died, try to switch to this new endpoint
2021-10-27 03:02:44 +02:00
thepeer.endpoint_trylist.UpdateP2P(content.ConnURL) //another gorouting will process it
2021-10-01 10:56:42 +02:00
device.event_tryendpoint <- struct{}{}
}
}
return nil
}
2021-12-16 03:56:02 +01:00
func (device *Device) RoutineTryReceivedEndpoint() {
2021-12-03 23:46:58 +01:00
if !(device.EdgeConfig.DynamicRoute.P2P.UseP2P || device.EdgeConfig.DynamicRoute.SuperNode.UseSuperNode) {
2021-08-20 19:32:50 +02:00
return
}
2021-12-03 23:46:58 +01:00
timeout := mtypes.S2TD(device.EdgeConfig.DynamicRoute.ConnNextTry)
2021-08-20 19:32:50 +02:00
for {
NextRun := false
<-device.event_tryendpoint
for _, thepeer := range device.peers.IDMap {
2021-12-03 23:46:58 +01:00
if thepeer.LastPacketReceivedAdd1Sec.Load().(*time.Time).Add(mtypes.S2TD(device.EdgeConfig.DynamicRoute.PeerAliveTimeout)).After(time.Now()) {
2021-08-20 19:32:50 +02:00
//Peer alives
2021-09-26 14:59:57 +02:00
continue
2021-08-20 19:32:50 +02:00
} else {
2021-10-27 03:41:13 +02:00
FastTry, connurl := thepeer.endpoint_trylist.GetNextTry()
2021-10-27 03:02:44 +02:00
if connurl == "" {
continue
}
2021-12-17 08:02:42 +01:00
err := thepeer.SetEndpointFromConnURL(connurl, thepeer.ConnAF, device.EdgeConfig.AfPrefer, thepeer.StaticConn) //trying to bind first url in the list and wait ConnNextTry seconds
2021-10-27 03:02:44 +02:00
if err != nil {
device.log.Errorf("Bind " + connurl + " failed!")
thepeer.endpoint_trylist.Delete(connurl)
continue
2021-09-21 03:15:23 +02:00
}
2021-10-27 03:41:13 +02:00
if FastTry {
NextRun = true
if device.LogLevel.LogControl {
fmt.Printf("Control: First try for peer %v at endpoint %v, sending hole-punching ping\n", thepeer.ID.ToString(), connurl)
}
2021-12-03 23:46:58 +01:00
go device.SendPing(thepeer, int(device.EdgeConfig.DynamicRoute.ConnNextTry+1), 1, 1)
2021-10-27 03:41:13 +02:00
}
2021-10-27 03:02:44 +02:00
2021-08-20 19:32:50 +02:00
}
}
ClearChanLoop:
for {
select {
case <-device.event_tryendpoint:
default:
break ClearChanLoop
}
}
2021-12-03 23:46:58 +01:00
time.Sleep(timeout)
2021-10-27 03:02:44 +02:00
if device.LogLevel.LogInternal {
fmt.Printf("Internal: RoutineSetEndpoint: NextRun:%v\n", NextRun)
}
2021-08-20 19:32:50 +02:00
if NextRun {
device.event_tryendpoint <- struct{}{}
}
}
}
2021-10-27 03:02:44 +02:00
func (device *Device) RoutineDetectOfflineAndTryNextEndpoint() {
2021-12-03 23:46:58 +01:00
if !(device.EdgeConfig.DynamicRoute.P2P.UseP2P || device.EdgeConfig.DynamicRoute.SuperNode.UseSuperNode) {
2021-10-27 03:02:44 +02:00
return
}
2021-12-09 23:39:37 +01:00
if device.EdgeConfig.DynamicRoute.TimeoutCheckInterval == 0 {
2021-10-27 03:02:44 +02:00
return
}
2021-12-09 23:39:37 +01:00
timeout := mtypes.S2TD(device.EdgeConfig.DynamicRoute.TimeoutCheckInterval)
2021-10-27 03:02:44 +02:00
for {
device.event_tryendpoint <- struct{}{}
2021-12-03 23:46:58 +01:00
time.Sleep(timeout)
2021-10-27 03:02:44 +02:00
}
}
2021-12-13 05:20:58 +01:00
func (device *Device) RoutineSendPing(startchan chan struct{}) {
2021-12-03 23:46:58 +01:00
if !(device.EdgeConfig.DynamicRoute.P2P.UseP2P || device.EdgeConfig.DynamicRoute.SuperNode.UseSuperNode) {
2021-08-20 19:32:50 +02:00
return
}
2021-12-09 08:46:15 +01:00
var waitchan <-chan time.Time
2021-12-13 05:20:58 +01:00
startchan <- struct{}{}
2021-08-20 19:32:50 +02:00
for {
2021-12-04 03:32:59 +01:00
if device.EdgeConfig.DynamicRoute.SendPingInterval > 0 {
waitchan = time.After(mtypes.S2TD(device.EdgeConfig.DynamicRoute.SendPingInterval))
} else {
waitchan = make(<-chan time.Time)
}
select {
case <-startchan:
if device.LogLevel.LogControl {
fmt.Println("Control: Start RoutineSendPing()")
}
for len(startchan) > 0 {
<-startchan
}
case <-waitchan:
}
packet, usage, ttl, _ := device.GeneratePingPacket(device.ID, 0)
device.SpreadPacket(make(map[mtypes.Vertex]bool), usage, ttl, packet, MessageTransportOffsetContent)
2021-08-20 19:32:50 +02:00
}
}
2021-12-05 22:36:50 +01:00
func (device *Device) RoutineRegister(startchan chan struct{}) {
2021-12-03 23:46:58 +01:00
if !(device.EdgeConfig.DynamicRoute.SuperNode.UseSuperNode) {
2021-08-20 19:32:50 +02:00
return
}
2021-12-09 08:46:15 +01:00
var waitchan <-chan time.Time
2021-12-05 22:36:50 +01:00
startchan <- struct{}{}
2021-08-20 19:32:50 +02:00
for {
2021-12-05 22:36:50 +01:00
if device.EdgeConfig.DynamicRoute.SendPingInterval > 0 {
waitchan = time.After(mtypes.S2TD(device.EdgeConfig.DynamicRoute.SendPingInterval))
} else {
waitchan = time.After(8 * time.Second)
}
select {
case <-startchan:
if device.LogLevel.LogControl {
fmt.Println("Control: Start RoutineRegister()")
}
for len(startchan) > 0 {
<-startchan
}
case <-waitchan:
}
2021-12-04 03:32:59 +01:00
local_PeerStateHash := device.state_hashes.Peer.Load().(string)
local_NhTableHash := device.state_hashes.NhTable.Load().(string)
local_SuperParamState := device.state_hashes.SuperParam.Load().(string)
2021-12-02 18:13:48 +01:00
body, _ := mtypes.GetByte(mtypes.RegisterMsg{
2021-12-04 03:32:59 +01:00
Node_id: device.ID,
PeerStateHash: local_PeerStateHash,
NhStateHash: local_NhTableHash,
SuperParamStateHash: local_SuperParamState,
Version: device.Version,
JWTSecret: device.JWTSecret,
HttpPostCount: device.HttpPostCount,
2021-08-20 19:32:50 +02:00
})
buf := make([]byte, path.EgHeaderLen+len(body))
header, _ := path.NewEgHeader(buf[0:path.EgHeaderLen], device.EdgeConfig.Interface.MTU)
2021-12-09 08:46:15 +01:00
header.SetDst(mtypes.NodeID_SuperNode)
2021-08-20 19:32:50 +02:00
header.SetSrc(device.ID)
copy(buf[path.EgHeaderLen:], body)
device.Send2Super(path.Register, 0, buf, MessageTransportOffsetContent)
2021-08-20 19:32:50 +02:00
}
}
2021-12-04 03:32:59 +01:00
func (device *Device) RoutinePostPeerInfo(startchan <-chan struct{}) {
2021-12-03 23:46:58 +01:00
if !(device.EdgeConfig.DynamicRoute.SuperNode.UseSuperNode) {
2021-12-02 18:13:48 +01:00
return
}
2021-12-09 08:46:15 +01:00
var waitchan <-chan time.Time
2021-12-02 18:13:48 +01:00
for {
2021-12-04 03:32:59 +01:00
if device.SuperConfig.HttpPostInterval > 0 {
waitchan = time.After(mtypes.S2TD(device.SuperConfig.HttpPostInterval))
} else {
waitchan = make(<-chan time.Time)
}
select {
case <-waitchan:
case <-startchan:
if device.LogLevel.LogControl {
fmt.Println("Control: Start RoutinePostPeerInfo()")
}
for len(startchan) > 0 {
<-startchan
}
}
2021-12-02 18:13:48 +01:00
// Stat all latency
device.peers.RLock()
pongs := make([]mtypes.PongMsg, 0, len(device.peers.IDMap))
for id, peer := range device.peers.IDMap {
device.peers.RUnlock()
if peer.IsPeerAlive() {
pong := mtypes.PongMsg{
RequestID: 0,
2021-12-24 20:28:01 +01:00
Src_nodeID: id,
Dst_nodeID: device.ID,
Timediff: peer.SingleWayLatency.GetVal(),
2021-12-09 08:46:15 +01:00
TimeToAlive: time.Since(*peer.LastPacketReceivedAdd1Sec.Load().(*time.Time)).Seconds() + device.EdgeConfig.DynamicRoute.PeerAliveTimeout,
2021-12-02 18:13:48 +01:00
}
pongs = append(pongs, pong)
if device.LogLevel.LogControl {
2021-12-12 09:29:05 +01:00
fmt.Printf("Control: Pack %v S:%v D:%v To:Post body\n", pong.ToString(), pong.Src_nodeID.ToString(), pong.Dst_nodeID.ToString())
2021-12-02 18:13:48 +01:00
}
}
device.peers.RLock()
}
device.peers.RUnlock()
// Prepare post paramater and post body
LocalV4s := make(map[string]float64)
LocalV6s := make(map[string]float64)
2021-12-03 23:46:58 +01:00
if !device.EdgeConfig.DynamicRoute.SuperNode.SkipLocalIP {
if !device.peers.LocalV4.Equal(net.IP{}) {
LocalV4 := net.UDPAddr{
IP: device.peers.LocalV4,
Port: int(device.net.port),
}
2021-12-02 18:13:48 +01:00
2021-12-03 23:46:58 +01:00
LocalV4s[LocalV4.String()] = 100
}
if !device.peers.LocalV6.Equal(net.IP{}) {
LocalV6 := net.UDPAddr{
IP: device.peers.LocalV6,
Port: int(device.net.port),
}
2021-12-15 08:55:28 +01:00
LocalV6s[LocalV6.String()] = 100
}
}
for _, AIP := range device.EdgeConfig.DynamicRoute.SuperNode.AdditionalLocalIP {
success := false
2021-12-17 08:02:42 +01:00
_, ipstr, err := conn.LookupIP(AIP, 4, 0)
2021-12-15 08:55:28 +01:00
if err == nil {
success = true
LocalV4s[ipstr] = 50
}
2021-12-17 08:02:42 +01:00
_, ipstr, err = conn.LookupIP(AIP, 6, 0)
2021-12-15 08:55:28 +01:00
if err == nil {
success = true
LocalV6s[ipstr] = 50
}
if !success {
device.log.Errorf("AdditionalLocalIP: Failed to LookupIP %v", AIP)
2021-12-02 18:13:48 +01:00
}
}
body, _ := mtypes.GetByte(mtypes.API_report_peerinfo{
Pongs: pongs,
LocalV4s: LocalV4s,
LocalV6s: LocalV6s,
})
body = mtypes.Gzip(body)
bodyhash := base64.StdEncoding.EncodeToString(body)
token := jwt.NewWithClaims(jwt.SigningMethodHS256, mtypes.API_report_peerinfo_jwt_claims{
PostCount: device.HttpPostCount,
BodyHash: bodyhash,
})
2021-12-09 08:46:15 +01:00
tokenString, _ := token.SignedString(device.JWTSecret[:])
2021-12-02 18:13:48 +01:00
// Construct post request
2021-12-04 03:32:59 +01:00
client := &http.Client{
Timeout: 8 * time.Second,
}
2021-12-04 15:46:36 +01:00
downloadurl := device.EdgeConfig.DynamicRoute.SuperNode.EndpointEdgeAPIUrl + "/edge/post/nodeinfo"
2021-12-02 18:13:48 +01:00
req, err := http.NewRequest("POST", downloadurl, bytes.NewReader(body))
2021-12-09 08:46:15 +01:00
if err != nil {
device.log.Errorf(err.Error())
continue
}
2021-12-02 18:13:48 +01:00
q := req.URL.Query()
q.Add("NodeID", device.ID.ToString())
2021-12-04 03:32:59 +01:00
q.Add("PubKey", device.staticIdentity.publicKey.ToString())
2021-12-02 18:13:48 +01:00
q.Add("JWTSig", tokenString)
req.URL.RawQuery = q.Encode()
2021-12-04 03:32:59 +01:00
req.Header.Set("Content-Type", "application/octet-stream")
req.Header.Set("Content-Encoding", "gzip")
2021-12-02 18:13:48 +01:00
device.HttpPostCount += 1
if device.LogLevel.LogControl {
2021-12-12 09:29:05 +01:00
fmt.Printf("Control: Post to %v\n", downloadurl)
2021-12-02 18:13:48 +01:00
}
resp, err := client.Do(req)
if err != nil {
device.log.Errorf("RoutinePostPeerInfo: " + err.Error())
} else {
if device.LogLevel.LogControl {
2021-12-12 09:29:05 +01:00
res, err := ioutil.ReadAll(resp.Body)
if err == nil {
fmt.Printf("Control: Post result %v\n", string(res))
} else {
fmt.Printf("Control: Post error %v %v\n", err, string(res))
}
2021-12-02 18:13:48 +01:00
}
resp.Body.Close()
}
}
}
2021-08-20 19:32:50 +02:00
func (device *Device) RoutineRecalculateNhTable() {
2021-10-12 10:05:23 +02:00
if device.graph.TimeoutCheckInterval == 0 {
return
}
2021-12-04 03:32:59 +01:00
2021-12-03 23:46:58 +01:00
if !device.EdgeConfig.DynamicRoute.P2P.UseP2P {
2021-10-27 03:02:44 +02:00
return
}
for {
if time.Now().After(device.graph.NhTableExpire) {
2021-12-09 08:46:15 +01:00
device.graph.RecalculateNhTable(false)
2021-08-20 19:32:50 +02:00
}
2021-10-27 03:02:44 +02:00
time.Sleep(device.graph.TimeoutCheckInterval)
2021-08-20 19:32:50 +02:00
}
2021-10-27 03:02:44 +02:00
2021-08-23 18:39:04 +02:00
}
2021-08-20 19:32:50 +02:00
2021-08-23 18:39:04 +02:00
func (device *Device) RoutineSpreadAllMyNeighbor() {
2021-12-03 23:46:58 +01:00
if !device.EdgeConfig.DynamicRoute.P2P.UseP2P {
2021-08-23 18:39:04 +02:00
return
}
2021-12-03 23:46:58 +01:00
timeout := mtypes.S2TD(device.EdgeConfig.DynamicRoute.P2P.SendPeerInterval)
2021-08-23 18:39:04 +02:00
for {
2021-12-02 18:13:48 +01:00
device.process_RequestPeerMsg(mtypes.QueryPeerMsg{
2021-12-09 21:23:02 +01:00
Request_ID: uint32(mtypes.NodeID_Broadcast),
2021-08-23 18:39:04 +02:00
})
2021-12-03 23:46:58 +01:00
time.Sleep(timeout)
2021-08-23 18:39:04 +02:00
}
2021-08-20 19:32:50 +02:00
}
2021-12-16 03:56:02 +01:00
func (device *Device) RoutineResetEndpoint() {
var ResetEndPointInterval float64
if device.IsSuperNode {
ResetEndPointInterval = device.SuperConfig.ResetEndPointInterval
} else {
ResetEndPointInterval = device.EdgeConfig.ResetEndPointInterval
}
if ResetEndPointInterval <= 0.01 {
2021-08-24 10:43:55 +02:00
return
}
2021-12-16 03:56:02 +01:00
timeout := mtypes.S2TD(ResetEndPointInterval)
2021-08-24 10:43:55 +02:00
for {
for _, peer := range device.peers.keyMap {
if !peer.StaticConn { //Do not reset connecton for dynamic peer
2021-08-24 10:43:55 +02:00
continue
}
if peer.ConnURL == "" {
continue
}
2021-12-16 03:56:02 +01:00
if peer.IsPeerAlive() {
continue
}
2021-12-17 08:02:42 +01:00
err := peer.SetEndpointFromConnURL(peer.ConnURL, peer.ConnAF, device.EdgeConfig.AfPrefer, peer.StaticConn)
2021-08-24 10:43:55 +02:00
if err != nil {
device.log.Errorf("Failed to bind "+peer.ConnURL, err)
continue
}
}
2021-12-03 23:46:58 +01:00
time.Sleep(timeout)
2021-08-24 10:43:55 +02:00
}
}
2021-10-01 10:56:42 +02:00
func (device *Device) RoutineClearL2FIB() {
2021-12-03 23:46:58 +01:00
if device.EdgeConfig.L2FIBTimeout <= 0.01 {
2021-10-01 10:56:42 +02:00
return
2021-08-20 19:32:50 +02:00
}
2021-12-03 23:46:58 +01:00
timeout := mtypes.S2TD(device.EdgeConfig.L2FIBTimeout)
2021-10-01 10:56:42 +02:00
for {
device.l2fib.Range(func(k interface{}, v interface{}) bool {
val := v.(*IdAndTime)
if time.Now().After(val.Time.Add(timeout)) {
mac := k.(tap.MacAddress)
device.l2fib.Delete(k)
2021-10-09 12:22:27 +02:00
if device.LogLevel.LogInternal {
fmt.Printf("Internal: L2FIB [%v -> %v] deleted.\n", mac.String(), val.ID)
2021-10-01 10:56:42 +02:00
}
2021-08-20 19:32:50 +02:00
}
2021-10-01 10:56:42 +02:00
return true
})
time.Sleep(timeout)
2021-08-20 19:32:50 +02:00
}
}