2021-08-20 19:32:50 +02:00
package device
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"hash/crc32"
"io/ioutil"
"net/http"
2021-08-21 16:54:24 +02:00
"net/url"
"strconv"
2021-08-20 19:32:50 +02:00
"time"
"github.com/KusakabeSi/EtherGuardVPN/config"
"github.com/KusakabeSi/EtherGuardVPN/path"
)
2021-09-21 03:15:23 +02:00
func ( device * Device ) SendPacket ( peer * Peer , usage path . Usage , packet [ ] byte , offset int ) {
2021-08-20 19:32:50 +02:00
if peer == nil {
return
2021-09-20 18:27:53 +02:00
} else if peer . endpoint == nil {
return
2021-08-20 19:32:50 +02:00
}
2021-08-25 10:13:53 +02:00
if device . LogLevel . LogNormal {
EgHeader , _ := path . NewEgHeader ( packet [ : path . EgHeaderLen ] )
2021-09-21 03:15:23 +02:00
if usage == path . NornalPacket {
2021-08-25 10:13:53 +02:00
dst_nodeID := EgHeader . GetDst ( )
2021-08-25 13:54:13 +02:00
fmt . Println ( "Normal: Send Normal packet To:" + peer . GetEndpointDstStr ( ) + " SrcID:" + device . ID . ToString ( ) + " DstID:" + dst_nodeID . ToString ( ) + " Len:" + strconv . Itoa ( len ( packet ) ) )
2021-08-25 10:13:53 +02:00
}
}
if device . LogLevel . LogControl {
2021-09-21 03:15:23 +02:00
if usage != path . NornalPacket {
2021-08-21 16:54:24 +02:00
if peer . GetEndpointDstStr ( ) != "" {
2021-09-21 03:15:23 +02:00
fmt . Println ( "Control: Send To:" + peer . GetEndpointDstStr ( ) + " " + device . sprint_received ( usage , packet [ path . EgHeaderLen : ] ) )
2021-08-21 16:54:24 +02:00
}
}
}
2021-08-25 10:13:53 +02:00
2021-08-20 19:32:50 +02:00
var elem * QueueOutboundElement
elem = device . NewOutboundElement ( )
copy ( elem . buffer [ offset : offset + len ( packet ) ] , packet )
2021-09-21 03:15:23 +02:00
elem . Type = usage
2021-08-20 19:32:50 +02:00
elem . packet = elem . buffer [ offset : offset + len ( packet ) ]
if peer . isRunning . Get ( ) {
peer . StagePacket ( elem )
elem = nil
peer . SendStagedPackets ( )
}
}
2021-09-21 03:15:23 +02:00
func ( device * Device ) BoardcastPacket ( skip_list map [ config . Vertex ] bool , usage path . Usage , packet [ ] byte , offset int ) { // Send packet to all connected peers
2021-08-20 19:32:50 +02:00
send_list := device . graph . GetBoardcastList ( device . ID )
for node_id , _ := range skip_list {
send_list [ node_id ] = false
}
2021-08-23 18:39:04 +02:00
device . peers . RLock ( )
2021-08-20 19:32:50 +02:00
for node_id , should_send := range send_list {
if should_send {
2021-09-21 03:15:23 +02:00
device . SendPacket ( device . peers . IDMap [ node_id ] , usage , packet , offset )
2021-08-20 19:32:50 +02:00
}
}
2021-08-23 18:39:04 +02:00
device . peers . RUnlock ( )
2021-08-20 19:32:50 +02:00
}
2021-09-21 03:15:23 +02:00
func ( device * Device ) SpreadPacket ( skip_list map [ config . Vertex ] bool , usage path . Usage , packet [ ] byte , offset int ) { // Send packet to all peers no matter it is alive
2021-08-23 18:39:04 +02:00
device . peers . RLock ( )
2021-08-20 19:32:50 +02:00
for peer_id , peer_out := range device . peers . IDMap {
if _ , ok := skip_list [ peer_id ] ; ok {
2021-08-25 10:13:53 +02:00
if device . LogLevel . LogTransit {
2021-08-25 13:54:13 +02:00
fmt . Printf ( "Transit: Skipped Spread Packet packet through %d to %d\n" , device . ID , peer_out . ID )
2021-08-20 19:32:50 +02:00
}
continue
}
2021-09-21 03:15:23 +02:00
device . SendPacket ( peer_out , usage , packet , MessageTransportOffsetContent )
2021-08-20 19:32:50 +02:00
}
2021-08-23 18:39:04 +02:00
device . peers . RUnlock ( )
2021-08-20 19:32:50 +02:00
}
2021-09-21 03:15:23 +02:00
func ( device * Device ) TransitBoardcastPacket ( src_nodeID config . Vertex , in_id config . Vertex , usage path . Usage , packet [ ] byte , offset int ) {
2021-08-20 19:32:50 +02:00
node_boardcast_list := device . graph . GetBoardcastThroughList ( device . ID , in_id , src_nodeID )
2021-08-23 18:39:04 +02:00
device . peers . RLock ( )
2021-08-20 19:32:50 +02:00
for peer_id := range node_boardcast_list {
peer_out := device . peers . IDMap [ peer_id ]
2021-08-25 10:13:53 +02:00
if device . LogLevel . LogTransit {
2021-08-25 13:54:13 +02:00
fmt . Printf ( "Transit: Transfer packet from %d through %d to %d\n" , in_id , device . ID , peer_out . ID )
2021-08-20 19:32:50 +02:00
}
2021-09-21 03:15:23 +02:00
device . SendPacket ( peer_out , usage , packet , offset )
2021-08-20 19:32:50 +02:00
}
2021-08-23 18:39:04 +02:00
device . peers . RUnlock ( )
2021-08-20 19:32:50 +02:00
}
2021-09-21 03:15:23 +02:00
func ( device * Device ) Send2Super ( usage path . Usage , packet [ ] byte , offset int ) {
2021-08-23 18:39:04 +02:00
device . peers . RLock ( )
2021-08-20 19:32:50 +02:00
if device . DRoute . SuperNode . UseSuperNode {
for _ , peer_out := range device . peers . SuperPeer {
2021-08-25 10:13:53 +02:00
/ * if device . LogTransit {
2021-08-20 19:32:50 +02:00
fmt . Printf ( "Send to supernode %s\n" , peer_out . endpoint . DstToString ( ) )
2021-08-25 10:13:53 +02:00
} * /
2021-09-21 03:15:23 +02:00
device . SendPacket ( peer_out , usage , packet , offset )
2021-08-20 19:32:50 +02:00
}
}
2021-08-23 18:39:04 +02:00
device . peers . RUnlock ( )
2021-08-20 19:32:50 +02:00
}
func ( device * Device ) CheckNoDup ( packet [ ] byte ) bool {
hasher := crc32 . New ( crc32 . MakeTable ( crc32 . Castagnoli ) )
hasher . Write ( packet )
crc32result := hasher . Sum32 ( )
_ , ok := device . DupData . Get ( crc32result )
device . DupData . Set ( crc32result , true )
return ! ok
}
2021-08-23 18:39:04 +02:00
func ( device * Device ) process_received ( msg_type path . Usage , peer * Peer , body [ ] byte ) ( err error ) {
2021-08-20 19:32:50 +02:00
if device . IsSuperNode {
switch msg_type {
case path . Register :
if content , err := path . ParseRegisterMsg ( body ) ; err == nil {
2021-09-20 18:27:53 +02:00
return device . server_process_RegisterMsg ( peer , content )
2021-08-20 19:32:50 +02:00
}
case path . PongPacket :
if content , err := path . ParsePongMsg ( body ) ; err == nil {
return device . server_process_Pong ( content )
}
default :
err = errors . New ( "Not a valid msg_type" )
}
} else {
switch msg_type {
case path . UpdatePeer :
if content , err := path . ParseUpdatePeerMsg ( body ) ; err == nil {
2021-09-20 18:27:53 +02:00
go device . process_UpdatePeerMsg ( peer , content )
2021-08-20 19:32:50 +02:00
}
case path . UpdateNhTable :
if content , err := path . ParseUpdateNhTableMsg ( body ) ; err == nil {
2021-09-20 18:27:53 +02:00
go device . process_UpdateNhTableMsg ( peer , content )
}
case path . UpdateError :
if content , err := path . ParseUpdateErrorMsg ( body ) ; err == nil {
device . process_UpdateErrorMsg ( peer , content )
2021-08-20 19:32:50 +02:00
}
case path . PingPacket :
if content , err := path . ParsePingMsg ( body ) ; err == nil {
2021-09-20 22:20:00 +02:00
return device . process_ping ( peer , content )
2021-08-20 19:32:50 +02:00
}
case path . PongPacket :
if content , err := path . ParsePongMsg ( body ) ; err == nil {
2021-08-23 18:39:04 +02:00
return device . process_pong ( peer , content )
2021-08-20 19:32:50 +02:00
}
2021-08-23 18:39:04 +02:00
case path . QueryPeer :
if content , err := path . ParseQueryPeerMsg ( body ) ; err == nil {
2021-08-20 19:32:50 +02:00
return device . process_RequestPeerMsg ( content )
}
case path . BoardcastPeer :
if content , err := path . ParseBoardcastPeerMsg ( body ) ; err == nil {
2021-08-24 10:43:55 +02:00
return device . process_BoardcastPeerMsg ( peer , content )
2021-08-20 19:32:50 +02:00
}
default :
err = errors . New ( "Not a valid msg_type" )
}
}
return
}
2021-08-21 16:54:24 +02:00
func ( device * Device ) sprint_received ( msg_type path . Usage , body [ ] byte ) ( ret string ) {
switch msg_type {
case path . Register :
if content , err := path . ParseRegisterMsg ( body ) ; err == nil {
ret = content . ToString ( )
}
case path . UpdatePeer :
if content , err := path . ParseUpdatePeerMsg ( body ) ; err == nil {
ret = content . ToString ( )
}
case path . UpdateNhTable :
if content , err := path . ParseUpdateNhTableMsg ( body ) ; err == nil {
ret = content . ToString ( )
}
2021-09-20 18:27:53 +02:00
case path . UpdateError :
if content , err := path . ParseUpdateErrorMsg ( body ) ; err == nil {
ret = content . ToString ( )
}
2021-08-21 16:54:24 +02:00
case path . PingPacket :
if content , err := path . ParsePingMsg ( body ) ; err == nil {
ret = content . ToString ( )
}
case path . PongPacket :
if content , err := path . ParsePongMsg ( body ) ; err == nil {
ret = content . ToString ( )
}
2021-08-23 18:39:04 +02:00
case path . QueryPeer :
if content , err := path . ParseQueryPeerMsg ( body ) ; err == nil {
2021-08-21 16:54:24 +02:00
ret = content . ToString ( )
}
case path . BoardcastPeer :
if content , err := path . ParseBoardcastPeerMsg ( body ) ; err == nil {
ret = content . ToString ( )
}
default :
ret = "Not a valid msg_type"
}
return
}
2021-09-20 18:27:53 +02:00
func ( device * Device ) server_process_RegisterMsg ( peer * Peer , content path . RegisterMsg ) error {
2021-09-20 22:20:00 +02:00
UpdateErrorMsg := path . UpdateErrorMsg {
Node_id : peer . ID ,
Action : path . NoAction ,
ErrorCode : 0 ,
ErrorMsg : "" ,
}
2021-09-20 18:27:53 +02:00
if peer . ID != content . Node_id {
2021-09-20 22:20:00 +02:00
UpdateErrorMsg = path . UpdateErrorMsg {
2021-09-20 18:27:53 +02:00
Node_id : peer . ID ,
Action : path . Shutdown ,
ErrorCode : 401 ,
2021-09-20 22:20:00 +02:00
ErrorMsg : "Your node ID is not match with our registered nodeID" ,
2021-09-20 18:27:53 +02:00
}
2021-09-20 22:20:00 +02:00
}
if content . Version != device . Version {
UpdateErrorMsg = path . UpdateErrorMsg {
Node_id : peer . ID ,
Action : path . Shutdown ,
ErrorCode : 400 ,
ErrorMsg : "Your version is not match with our version: " + device . Version ,
}
}
if UpdateErrorMsg . Action != path . NoAction {
2021-09-20 18:27:53 +02:00
body , err := path . GetByte ( & UpdateErrorMsg )
if err != nil {
return err
}
buf := make ( [ ] byte , path . EgHeaderLen + len ( body ) )
header , err := path . NewEgHeader ( buf [ : path . EgHeaderLen ] )
header . SetSrc ( device . ID )
2021-09-20 22:20:00 +02:00
header . SetTTL ( device . DefaultTTL )
2021-09-20 18:27:53 +02:00
header . SetPacketLength ( uint16 ( len ( body ) ) )
copy ( buf [ path . EgHeaderLen : ] , body )
2021-09-20 22:20:00 +02:00
header . SetDst ( peer . ID )
2021-09-21 03:15:23 +02:00
device . SendPacket ( peer , path . UpdateError , buf , MessageTransportOffsetContent )
2021-09-20 18:27:53 +02:00
return nil
}
2021-08-20 19:32:50 +02:00
device . Event_server_register <- content
return nil
}
func ( device * Device ) server_process_Pong ( content path . PongMsg ) error {
device . Event_server_pong <- content
return nil
}
2021-09-20 22:20:00 +02:00
func ( device * Device ) process_ping ( peer * Peer , content path . PingMsg ) error {
peer . LastPingReceived = time . Now ( )
2021-08-20 19:32:50 +02:00
PongMSG := path . PongMsg {
Src_nodeID : content . Src_nodeID ,
Dst_nodeID : device . ID ,
Timediff : device . graph . GetCurrentTime ( ) . Sub ( content . Time ) ,
}
2021-08-25 11:18:54 +02:00
if device . DRoute . P2P . UseP2P && time . Now ( ) . After ( device . graph . NhTableExpire ) {
2021-08-25 15:21:26 +02:00
device . graph . UpdateLentancy ( content . Src_nodeID , device . ID , PongMSG . Timediff , true , false )
2021-08-25 11:18:54 +02:00
}
2021-08-20 19:32:50 +02:00
body , err := path . GetByte ( & PongMSG )
if err != nil {
return err
}
buf := make ( [ ] byte , path . EgHeaderLen + len ( body ) )
header , err := path . NewEgHeader ( buf [ : path . EgHeaderLen ] )
header . SetSrc ( device . ID )
2021-09-20 22:20:00 +02:00
header . SetTTL ( device . DefaultTTL )
2021-08-20 19:32:50 +02:00
header . SetPacketLength ( uint16 ( len ( body ) ) )
2021-08-21 16:54:24 +02:00
copy ( buf [ path . EgHeaderLen : ] , body )
2021-08-20 19:32:50 +02:00
if device . DRoute . SuperNode . UseSuperNode {
2021-08-25 10:13:53 +02:00
header . SetDst ( config . SuperNodeMessage )
2021-09-21 03:15:23 +02:00
device . Send2Super ( path . PongPacket , buf , MessageTransportOffsetContent )
2021-08-20 19:32:50 +02:00
}
if device . DRoute . P2P . UseP2P {
2021-08-25 10:13:53 +02:00
header . SetDst ( config . ControlMessage )
2021-09-21 03:15:23 +02:00
device . SpreadPacket ( make ( map [ config . Vertex ] bool ) , path . PongPacket , buf , MessageTransportOffsetContent )
2021-08-20 19:32:50 +02:00
}
return nil
}
2021-08-23 18:39:04 +02:00
func ( device * Device ) process_pong ( peer * Peer , content path . PongMsg ) error {
2021-08-20 19:32:50 +02:00
if device . DRoute . P2P . UseP2P {
2021-08-25 11:18:54 +02:00
if time . Now ( ) . After ( device . graph . NhTableExpire ) {
2021-08-25 15:21:26 +02:00
device . graph . UpdateLentancy ( content . Src_nodeID , content . Dst_nodeID , content . Timediff , true , false )
2021-08-25 11:18:54 +02:00
}
2021-08-23 18:39:04 +02:00
if ! peer . AskedForNeighbor {
QueryPeerMsg := path . QueryPeerMsg {
Request_ID : uint32 ( device . ID ) ,
}
body , err := path . GetByte ( & QueryPeerMsg )
if err != nil {
return err
}
buf := make ( [ ] byte , path . EgHeaderLen + len ( body ) )
header , err := path . NewEgHeader ( buf [ : path . EgHeaderLen ] )
header . SetSrc ( device . ID )
2021-09-20 22:20:00 +02:00
header . SetTTL ( device . DefaultTTL )
2021-08-23 18:39:04 +02:00
header . SetPacketLength ( uint16 ( len ( body ) ) )
copy ( buf [ path . EgHeaderLen : ] , body )
2021-09-21 03:15:23 +02:00
device . SendPacket ( peer , path . QueryPeer , buf , MessageTransportOffsetContent )
2021-08-23 18:39:04 +02:00
}
2021-08-20 19:32:50 +02:00
}
return nil
}
2021-09-20 18:27:53 +02:00
func ( device * Device ) process_UpdatePeerMsg ( peer * Peer , content path . UpdatePeerMsg ) error {
2021-08-20 19:32:50 +02:00
var send_signal bool
if device . DRoute . SuperNode . UseSuperNode {
2021-09-20 18:27:53 +02:00
if peer . ID != config . SuperNodeMessage {
if device . LogLevel . LogControl {
fmt . Println ( "Control: Ignored UpdateErrorMsg. Not from supernode." )
}
return nil
}
2021-08-20 19:32:50 +02:00
var peer_infos config . HTTP_Peers
if bytes . Equal ( device . peers . Peer_state [ : ] , content . State_hash [ : ] ) {
2021-08-25 13:54:13 +02:00
if device . LogLevel . LogControl {
fmt . Println ( "Control: Same PeerState Hash, skip download nhTable" )
}
2021-08-20 19:32:50 +02:00
return nil
}
2021-08-21 16:54:24 +02:00
2021-09-21 03:15:23 +02:00
downloadurl := device . DRoute . SuperNode . APIUrl + "/peerinfo?PubKey=" + url . QueryEscape ( device . staticIdentity . publicKey . ToString ( ) ) + "&State=" + url . QueryEscape ( string ( content . State_hash [ : ] ) )
2021-08-25 10:13:53 +02:00
if device . LogLevel . LogControl {
2021-08-25 13:54:13 +02:00
fmt . Println ( "Control: Download peerinfo from :" + downloadurl )
2021-08-21 16:54:24 +02:00
}
2021-08-24 20:16:21 +02:00
client := http . Client {
Timeout : 30 * time . Second ,
}
resp , err := client . Get ( downloadurl )
2021-08-20 19:32:50 +02:00
if err != nil {
2021-08-24 20:16:21 +02:00
device . log . Errorf ( err . Error ( ) )
2021-08-20 19:32:50 +02:00
return err
}
defer resp . Body . Close ( )
allbytes , err := ioutil . ReadAll ( resp . Body )
if err != nil {
2021-08-24 20:16:21 +02:00
device . log . Errorf ( err . Error ( ) )
2021-08-20 19:32:50 +02:00
return err
}
2021-08-25 10:13:53 +02:00
if device . LogLevel . LogControl {
2021-08-25 13:54:13 +02:00
fmt . Println ( "Control: Download peerinfo result :" + string ( allbytes ) )
2021-08-25 10:13:53 +02:00
}
2021-08-20 19:32:50 +02:00
if err := json . Unmarshal ( allbytes , & peer_infos ) ; err != nil {
2021-08-24 20:16:21 +02:00
device . log . Errorf ( err . Error ( ) )
2021-08-20 19:32:50 +02:00
return err
}
2021-09-21 03:15:23 +02:00
for nodeID , thepeer := range device . peers . IDMap {
pk := thepeer . handshake . remoteStatic
psk := thepeer . handshake . presharedKey
if val , ok := peer_infos [ pk . ToString ( ) ] ; ok {
if val . NodeID != nodeID {
device . RemovePeer ( pk )
continue
} else if val . PSKey != psk . ToString ( ) {
device . RemovePeer ( pk )
continue
}
} else {
device . RemovePeer ( pk )
continue
}
}
2021-08-24 20:16:21 +02:00
for pubkey , peerinfo := range peer_infos {
2021-08-20 19:32:50 +02:00
if len ( peerinfo . Connurl ) == 0 {
return nil
}
2021-09-21 03:15:23 +02:00
sk , err := Str2PubKey ( pubkey )
if err != nil {
device . log . Errorf ( "Error decode base64:" , err )
return err
}
2021-08-20 19:32:50 +02:00
if bytes . Equal ( sk [ : ] , device . staticIdentity . publicKey [ : ] ) {
continue
}
thepeer := device . LookupPeer ( sk )
if thepeer == nil { //not exist in local
2021-08-25 11:18:54 +02:00
if device . LogLevel . LogControl {
2021-08-25 13:54:13 +02:00
fmt . Println ( "Control: Add new peer to local ID:" + peerinfo . NodeID . ToString ( ) + " PubKey:" + pubkey )
2021-08-25 11:18:54 +02:00
}
2021-08-20 19:32:50 +02:00
if device . graph . Weight ( device . ID , peerinfo . NodeID ) == path . Infinity { // add node to graph
2021-08-25 15:21:26 +02:00
device . graph . UpdateLentancy ( device . ID , peerinfo . NodeID , path . S2TD ( path . Infinity ) , true , false )
2021-08-20 19:32:50 +02:00
}
if device . graph . Weight ( peerinfo . NodeID , device . ID ) == path . Infinity { // add node to graph
2021-08-25 15:21:26 +02:00
device . graph . UpdateLentancy ( peerinfo . NodeID , device . ID , path . S2TD ( path . Infinity ) , true , false )
2021-08-20 19:32:50 +02:00
}
device . NewPeer ( sk , peerinfo . NodeID )
thepeer = device . LookupPeer ( sk )
if peerinfo . PSKey != "" {
2021-09-21 03:15:23 +02:00
pk , err := Str2PSKey ( peerinfo . PSKey )
if err != nil {
device . log . Errorf ( "Error decode base64:" , err )
return err
}
2021-08-20 19:32:50 +02:00
thepeer . handshake . presharedKey = pk
}
}
if thepeer . LastPingReceived . Add ( path . S2TD ( device . DRoute . P2P . PeerAliveTimeout ) ) . Before ( time . Now ( ) ) {
//Peer died, try to switch to this new endpoint
for url , _ := range peerinfo . Connurl {
2021-09-21 03:15:23 +02:00
thepeer . Lock ( )
thepeer . endpoint_trylist . Set ( url , time . Time { } ) //another gorouting will process it
thepeer . Unlock ( )
2021-08-20 19:32:50 +02:00
send_signal = true
}
}
}
device . peers . Peer_state = content . State_hash
if send_signal {
device . event_tryendpoint <- struct { } { }
}
}
return nil
}
2021-09-20 18:27:53 +02:00
func ( device * Device ) process_UpdateNhTableMsg ( peer * Peer , content path . UpdateNhTableMsg ) error {
if device . DRoute . SuperNode . UseSuperNode {
if peer . ID != config . SuperNodeMessage {
if device . LogLevel . LogControl {
fmt . Println ( "Control: Ignored UpdateErrorMsg. Not from supernode." )
}
return nil
}
if bytes . Equal ( device . graph . NhTableHash [ : ] , content . State_hash [ : ] ) {
if device . LogLevel . LogControl {
fmt . Println ( "Control: Same nhTable Hash, skip download nhTable" )
}
device . graph . NhTableExpire = time . Now ( ) . Add ( device . graph . SuperNodeInfoTimeout )
return nil
}
var NhTable config . NextHopTable
if bytes . Equal ( device . graph . NhTableHash [ : ] , content . State_hash [ : ] ) {
return nil
}
2021-09-21 03:15:23 +02:00
downloadurl := device . DRoute . SuperNode . APIUrl + "/nhtable?PubKey=" + url . QueryEscape ( device . staticIdentity . publicKey . ToString ( ) ) + "&State=" + url . QueryEscape ( string ( content . State_hash [ : ] ) )
2021-09-20 18:27:53 +02:00
if device . LogLevel . LogControl {
fmt . Println ( "Control: Download NhTable from :" + downloadurl )
}
client := http . Client {
Timeout : 30 * time . Second ,
}
resp , err := client . Get ( downloadurl )
if err != nil {
device . log . Errorf ( err . Error ( ) )
return err
}
defer resp . Body . Close ( )
allbytes , err := ioutil . ReadAll ( resp . Body )
if err != nil {
device . log . Errorf ( err . Error ( ) )
return err
}
if device . LogLevel . LogControl {
fmt . Println ( "Control: Download NhTable result :" + string ( allbytes ) )
}
if err := json . Unmarshal ( allbytes , & NhTable ) ; err != nil {
device . log . Errorf ( err . Error ( ) )
return err
}
device . graph . SetNHTable ( NhTable , content . State_hash )
}
return nil
}
func ( device * Device ) process_UpdateErrorMsg ( peer * Peer , content path . UpdateErrorMsg ) error {
if peer . ID != config . SuperNodeMessage {
if device . LogLevel . LogControl {
fmt . Println ( "Control: Ignored UpdateErrorMsg. Not from supernode." )
}
return nil
}
device . log . Errorf ( content . ToString ( ) )
if content . Action == path . Shutdown {
device . closed <- struct { } { }
} else if content . Action == path . Panic {
panic ( content . ToString ( ) )
}
return nil
}
2021-08-20 19:32:50 +02:00
func ( device * Device ) RoutineSetEndpoint ( ) {
if ! ( device . DRoute . P2P . UseP2P || device . DRoute . SuperNode . UseSuperNode ) {
return
}
for {
NextRun := false
<- device . event_tryendpoint
for _ , thepeer := range device . peers . IDMap {
if thepeer . LastPingReceived . Add ( path . S2TD ( device . DRoute . P2P . PeerAliveTimeout ) ) . After ( time . Now ( ) ) {
//Peer alives
2021-09-21 03:15:23 +02:00
thepeer . Lock ( )
for _ , key := range thepeer . endpoint_trylist . Keys ( ) { // delete whole endpoint_trylist
2021-08-23 18:39:04 +02:00
thepeer . endpoint_trylist . Delete ( key )
2021-09-21 03:15:23 +02:00
}
thepeer . Unlock ( )
2021-08-20 19:32:50 +02:00
} else {
2021-09-21 03:15:23 +02:00
thepeer . RLock ( )
trylist := thepeer . endpoint_trylist . Keys ( )
thepeer . RUnlock ( )
for _ , key := range trylist { // try next endpoint
connurl := key
thepeer . RLock ( )
val , hasval := thepeer . endpoint_trylist . Get ( key )
thepeer . RUnlock ( )
if ! hasval {
continue
}
trytime := val . ( time . Time )
if trytime . Sub ( time . Time { } ) != time . Duration ( 0 ) && time . Now ( ) . Sub ( trytime ) > path . S2TD ( device . DRoute . ConnTimeOut ) { // tried before, but no response
thepeer . Lock ( )
2021-08-23 18:39:04 +02:00
thepeer . endpoint_trylist . Delete ( key )
2021-09-21 03:15:23 +02:00
thepeer . Unlock ( )
2021-08-20 19:32:50 +02:00
} else {
2021-09-21 03:15:23 +02:00
endpoint , err := device . Bind ( ) . ParseEndpoint ( connurl ) //trying to bind first url in the list and wait device.DRoute.P2P.PeerAliveTimeout seconds
2021-08-20 19:32:50 +02:00
if err != nil {
2021-09-21 03:15:23 +02:00
device . log . Errorf ( "Can't bind " + connurl )
thepeer . Lock ( )
thepeer . endpoint_trylist . Delete ( connurl )
thepeer . Unlock ( )
continue
2021-08-20 19:32:50 +02:00
}
2021-08-25 10:13:53 +02:00
if device . LogLevel . LogControl {
2021-08-25 13:54:13 +02:00
fmt . Println ( "Control: Set endpoint to " + endpoint . DstToString ( ) + " for NodeID:" + thepeer . ID . ToString ( ) )
2021-08-21 16:54:24 +02:00
}
2021-08-20 19:32:50 +02:00
thepeer . SetEndpointFromPacket ( endpoint )
NextRun = true
2021-09-21 03:15:23 +02:00
thepeer . Lock ( )
thepeer . endpoint_trylist . Set ( key , time . Now ( ) )
thepeer . Unlock ( )
2021-08-20 19:32:50 +02:00
//Send Ping message to it
2021-09-21 03:15:23 +02:00
packet , usage , err := device . GeneratePingPacket ( device . ID )
device . SendPacket ( thepeer , usage , packet , MessageTransportOffsetContent )
break
2021-08-20 19:32:50 +02:00
}
2021-09-21 03:15:23 +02:00
}
2021-08-20 19:32:50 +02:00
}
}
ClearChanLoop :
for {
select {
case <- device . event_tryendpoint :
default :
break ClearChanLoop
}
}
time . Sleep ( path . S2TD ( device . DRoute . P2P . PeerAliveTimeout ) )
if NextRun {
device . event_tryendpoint <- struct { } { }
}
}
}
func ( device * Device ) RoutineSendPing ( ) {
if ! ( device . DRoute . P2P . UseP2P || device . DRoute . SuperNode . UseSuperNode ) {
return
}
for {
2021-09-21 03:15:23 +02:00
packet , usage , _ := device . GeneratePingPacket ( device . ID )
device . SpreadPacket ( make ( map [ config . Vertex ] bool ) , usage , packet , MessageTransportOffsetContent )
2021-08-20 19:32:50 +02:00
time . Sleep ( path . S2TD ( device . DRoute . SendPingInterval ) )
}
}
func ( device * Device ) RoutineRegister ( ) {
if ! ( device . DRoute . SuperNode . UseSuperNode ) {
return
}
2021-08-25 10:13:53 +02:00
_ = <- device . Event_Supernode_OK
2021-08-20 19:32:50 +02:00
for {
body , _ := path . GetByte ( path . RegisterMsg {
2021-08-23 21:11:01 +02:00
Node_id : device . ID ,
PeerStateHash : device . peers . Peer_state ,
NhStateHash : device . graph . NhTableHash ,
2021-09-20 22:20:00 +02:00
Version : device . Version ,
2021-08-20 19:32:50 +02:00
} )
buf := make ( [ ] byte , path . EgHeaderLen + len ( body ) )
header , _ := path . NewEgHeader ( buf [ 0 : path . EgHeaderLen ] )
2021-08-25 10:13:53 +02:00
header . SetDst ( config . SuperNodeMessage )
2021-08-20 19:32:50 +02:00
header . SetTTL ( 0 )
header . SetSrc ( device . ID )
header . SetPacketLength ( uint16 ( len ( body ) ) )
copy ( buf [ path . EgHeaderLen : ] , body )
2021-09-21 03:15:23 +02:00
device . Send2Super ( path . Register , buf , MessageTransportOffsetContent )
2021-08-20 19:32:50 +02:00
time . Sleep ( path . S2TD ( device . DRoute . SendPingInterval ) )
}
}
func ( device * Device ) RoutineRecalculateNhTable ( ) {
if device . IsSuperNode {
for {
changed := device . graph . RecalculateNhTable ( true )
if changed {
device . Event_server_NhTable_changed <- struct { } { }
}
time . Sleep ( device . graph . NodeReportTimeout )
}
} else {
if ! device . DRoute . P2P . UseP2P {
return
}
for {
2021-08-24 20:16:21 +02:00
if time . Now ( ) . After ( device . graph . NhTableExpire ) {
device . graph . RecalculateNhTable ( false )
}
2021-08-20 19:32:50 +02:00
time . Sleep ( device . graph . NodeReportTimeout )
}
}
2021-08-23 18:39:04 +02:00
}
2021-08-20 19:32:50 +02:00
2021-08-23 18:39:04 +02:00
func ( device * Device ) RoutineSpreadAllMyNeighbor ( ) {
if ! device . DRoute . P2P . UseP2P {
return
}
for {
device . process_RequestPeerMsg ( path . QueryPeerMsg {
2021-08-25 10:13:53 +02:00
Request_ID : uint32 ( config . Boardcast ) ,
2021-08-23 18:39:04 +02:00
} )
time . Sleep ( path . S2TD ( device . DRoute . P2P . SendPeerInterval ) )
}
2021-08-20 19:32:50 +02:00
}
2021-08-24 10:43:55 +02:00
func ( device * Device ) RoutineResetConn ( ) {
if device . ResetConnInterval <= 0.01 {
return
}
for {
for _ , peer := range device . peers . keyMap {
if peer . StaticConn {
continue
}
if peer . ConnURL == "" {
continue
}
endpoint , err := device . Bind ( ) . ParseEndpoint ( peer . ConnURL )
if err != nil {
device . log . Errorf ( "Failed to bind " + peer . ConnURL , err )
continue
}
peer . SetEndpointFromPacket ( endpoint )
}
time . Sleep ( time . Duration ( device . ResetConnInterval ) )
}
}
2021-09-21 03:15:23 +02:00
func ( device * Device ) GeneratePingPacket ( src_nodeID config . Vertex ) ( [ ] byte , path . Usage , error ) {
2021-08-20 19:32:50 +02:00
body , err := path . GetByte ( & path . PingMsg {
Src_nodeID : src_nodeID ,
Time : device . graph . GetCurrentTime ( ) ,
} )
if err != nil {
2021-09-21 03:15:23 +02:00
return nil , path . PingPacket , err
2021-08-20 19:32:50 +02:00
}
buf := make ( [ ] byte , path . EgHeaderLen + len ( body ) )
header , _ := path . NewEgHeader ( buf [ 0 : path . EgHeaderLen ] )
if err != nil {
2021-09-21 03:15:23 +02:00
return nil , path . PingPacket , err
2021-08-20 19:32:50 +02:00
}
2021-09-20 22:20:00 +02:00
header . SetDst ( config . ControlMessage )
2021-08-20 19:32:50 +02:00
header . SetTTL ( 0 )
header . SetSrc ( device . ID )
header . SetPacketLength ( uint16 ( len ( body ) ) )
copy ( buf [ path . EgHeaderLen : ] , body )
2021-09-21 03:15:23 +02:00
return buf , path . PingPacket , nil
2021-08-20 19:32:50 +02:00
}
2021-08-23 18:39:04 +02:00
func ( device * Device ) process_RequestPeerMsg ( content path . QueryPeerMsg ) error { //Send all my peers to all my peers
2021-08-20 19:32:50 +02:00
if device . DRoute . P2P . UseP2P {
2021-08-23 18:39:04 +02:00
device . peers . RLock ( )
2021-08-20 19:32:50 +02:00
for pubkey , peer := range device . peers . keyMap {
2021-08-25 10:13:53 +02:00
if peer . ID >= config . Special_NodeID {
2021-08-20 19:32:50 +02:00
continue
}
2021-08-23 18:39:04 +02:00
if peer . endpoint == nil {
continue
}
peer . handshake . mutex . RLock ( )
2021-08-20 19:32:50 +02:00
response := path . BoardcastPeerMsg {
2021-08-21 16:54:24 +02:00
Request_ID : content . Request_ID ,
NodeID : peer . ID ,
PubKey : pubkey ,
PSKey : peer . handshake . presharedKey ,
ConnURL : peer . endpoint . DstToString ( ) ,
2021-08-20 19:32:50 +02:00
}
2021-08-23 18:39:04 +02:00
peer . handshake . mutex . RUnlock ( )
2021-08-20 19:32:50 +02:00
body , err := path . GetByte ( response )
if err != nil {
device . log . Errorf ( "Error at receivesendproc.go line221: " , err )
continue
}
buf := make ( [ ] byte , path . EgHeaderLen + len ( body ) )
header , _ := path . NewEgHeader ( buf [ 0 : path . EgHeaderLen ] )
2021-08-25 10:13:53 +02:00
header . SetDst ( config . ControlMessage )
2021-09-20 22:20:00 +02:00
header . SetTTL ( device . DefaultTTL )
2021-08-20 19:32:50 +02:00
header . SetSrc ( device . ID )
header . SetPacketLength ( uint16 ( len ( body ) ) )
copy ( buf [ path . EgHeaderLen : ] , body )
2021-09-21 03:15:23 +02:00
device . SpreadPacket ( make ( map [ config . Vertex ] bool ) , path . BoardcastPeer , buf , MessageTransportOffsetContent )
2021-08-20 19:32:50 +02:00
}
2021-08-23 18:39:04 +02:00
device . peers . RUnlock ( )
2021-08-20 19:32:50 +02:00
}
return nil
}
2021-08-24 10:43:55 +02:00
func ( device * Device ) process_BoardcastPeerMsg ( peer * Peer , content path . BoardcastPeerMsg ) error {
2021-08-20 19:32:50 +02:00
if device . DRoute . P2P . UseP2P {
2021-09-21 03:15:23 +02:00
var pk NoisePublicKey
2021-08-24 10:43:55 +02:00
if content . Request_ID == uint32 ( device . ID ) {
peer . AskedForNeighbor = true
}
2021-08-23 18:39:04 +02:00
if bytes . Equal ( content . PubKey [ : ] , device . staticIdentity . publicKey [ : ] ) {
return nil
}
2021-09-21 03:15:23 +02:00
copy ( pk [ : ] , content . PubKey [ : ] )
thepeer := device . LookupPeer ( pk )
2021-08-20 19:32:50 +02:00
if thepeer == nil { //not exist in local
2021-08-25 11:18:54 +02:00
if device . LogLevel . LogControl {
2021-09-21 03:15:23 +02:00
fmt . Println ( "Control: Add new peer to local ID:" + content . NodeID . ToString ( ) + " PubKey:" + pk . ToString ( ) )
2021-08-25 11:18:54 +02:00
}
2021-08-20 19:32:50 +02:00
if device . graph . Weight ( device . ID , content . NodeID ) == path . Infinity { // add node to graph
2021-08-25 15:21:26 +02:00
device . graph . UpdateLentancy ( device . ID , content . NodeID , path . S2TD ( path . Infinity ) , true , false )
2021-08-20 19:32:50 +02:00
}
if device . graph . Weight ( content . NodeID , device . ID ) == path . Infinity { // add node to graph
2021-08-25 15:21:26 +02:00
device . graph . UpdateLentancy ( content . NodeID , device . ID , path . S2TD ( path . Infinity ) , true , false )
2021-08-20 19:32:50 +02:00
}
2021-09-21 03:15:23 +02:00
device . NewPeer ( pk , content . NodeID )
thepeer = device . LookupPeer ( pk )
2021-08-20 19:32:50 +02:00
var pk NoisePresharedKey
copy ( pk [ : ] , content . PSKey [ : ] )
thepeer . handshake . presharedKey = pk
}
if thepeer . LastPingReceived . Add ( path . S2TD ( device . DRoute . P2P . PeerAliveTimeout ) ) . Before ( time . Now ( ) ) {
//Peer died, try to switch to this new endpoint
2021-09-21 03:15:23 +02:00
thepeer . Lock ( )
thepeer . endpoint_trylist . Set ( content . ConnURL , time . Time { } ) //another gorouting will process it
thepeer . Unlock ( )
2021-08-23 18:39:04 +02:00
device . event_tryendpoint <- struct { } { }
2021-08-20 19:32:50 +02:00
}
}
return nil
}