From 19fe84cf0cfe28d0f2b427cd15483406fa3d9b8e Mon Sep 17 00:00:00 2001 From: KusakabeSi Date: Sat, 21 Aug 2021 14:54:24 +0000 Subject: [PATCH] bunch of bugfix, supermode OK --- Makefile | 2 + config/config.go | 4 +- conn/bind_linux.go | 4 +- device/device.go | 15 ++- device/peer.go | 20 +++- device/receive.go | 29 +++-- device/receivesendproc.go | 79 ++++++++++-- main_edge.go | 3 +- main_httpserver.go | 27 ++--- main_super.go | 245 +++++++++++++++++++++++--------------- path/header.go | 9 +- path/metamessage.go | 116 +++++++++++------- path/path.go | 51 ++++---- 13 files changed, 403 insertions(+), 201 deletions(-) diff --git a/Makefile b/Makefile index ff33562..f296836 100644 --- a/Makefile +++ b/Makefile @@ -8,6 +8,8 @@ all: generate-version-and-build MAKEFLAGS += --no-print-directory generate-version-and-build: + go mod download && \ + go mod tidy && \ @export GIT_CEILING_DIRECTORIES="$(realpath $(CURDIR)/..)" && \ tag="$$(git describe --dirty 2>/dev/null)" && \ ver="$$(printf 'package main\n\nconst Version = "%s"\n' "$$tag")" && \ diff --git a/config/config.go b/config/config.go index 584e418..c5bc7fc 100644 --- a/config/config.go +++ b/config/config.go @@ -18,6 +18,7 @@ type SuperConfig struct { PrivKeyV6 string ListenPort int LogLevel LoggerInfo + RePushConfigInterval float64 GraphRecalculateSetting GraphRecalculateSetting Peers []PeerInfo } @@ -36,6 +37,7 @@ type InterfaceConf struct { type PeerInfo struct { NodeID Vertex PubKey string + PSKey string EndPoint string Static bool } @@ -43,6 +45,7 @@ type PeerInfo struct { type LoggerInfo struct { LogLevel string LogTransit bool + LogControl bool } // Nonnegative integer ID of vertex @@ -100,4 +103,3 @@ type HTTP_Peerinfo struct { type HTTP_Peers struct { Peers map[string]HTTP_Peerinfo } - diff --git a/conn/bind_linux.go b/conn/bind_linux.go index 575bb8c..b78c272 100644 --- a/conn/bind_linux.go +++ b/conn/bind_linux.go @@ -155,11 +155,11 @@ again: } } var fns []ReceiveFunc - if sock4 != -1 { + if sock4 != -1 && bind.use4 { bind.sock4 = sock4 fns = append(fns, bind.receiveIPv4) } - if sock6 != -1 { + if sock6 != -1 && bind.use6 { bind.sock6 = sock6 fns = append(fns, bind.receiveIPv6) } diff --git a/device/device.go b/device/device.go index b4af106..8479190 100644 --- a/device/device.go +++ b/device/device.go @@ -81,11 +81,13 @@ type Device struct { indexTable IndexTable cookieChecker CookieChecker + MsgCount uint32 IsSuperNode bool ID config.Vertex graph *path.IG l2fib map[tap.MacAddress]config.Vertex LogTransit bool + LogControl bool DRoute config.DynamicRouteInfo DupData fixed_time_cache.Cache @@ -302,7 +304,7 @@ func (device *Device) SetPrivateKey(sk NoisePrivateKey) error { return nil } -func NewDevice(tapDevice tap.Device, id config.Vertex, bind conn.Bind, logger *Logger, graph *path.IG, IsSuperNode bool, theconfigpath string, theconfig *config.EdgeConfig, superevents *path.SUPER_Events) *Device { +func NewDevice(tapDevice tap.Device, id config.Vertex, bind conn.Bind, logger *Logger, graph *path.IG, IsSuperNode bool, theconfigpath string, econfig *config.EdgeConfig, sconfig *config.SuperConfig, superevents *path.SUPER_Events) *Device { device := new(Device) device.state.state = uint32(deviceStateDown) device.closed = make(chan struct{}) @@ -317,6 +319,7 @@ func NewDevice(tapDevice tap.Device, id config.Vertex, bind conn.Bind, logger *L device.tap.mtu = int32(mtu) device.peers.keyMap = make(map[NoisePublicKey]*Peer) device.peers.IDMap = make(map[config.Vertex]*Peer) + device.peers.SuperPeer = make(map[NoisePublicKey]*Peer) device.IsSuperNode = IsSuperNode device.ID = id device.graph = graph @@ -329,14 +332,18 @@ func NewDevice(tapDevice tap.Device, id config.Vertex, bind conn.Bind, logger *L device.Event_server_pong = superevents.Event_server_pong device.Event_server_register = superevents.Event_server_register device.Event_server_NhTable_changed = superevents.Event_server_NhTable_changed + device.LogTransit = sconfig.LogLevel.LogTransit + device.LogControl = sconfig.LogLevel.LogControl go device.RoutineRecalculateNhTable() } else { device.EdgeConfigPath = theconfigpath - device.EdgeConfig = theconfig - device.DRoute = theconfig.DynamicRoute - device.DupData = *fixed_time_cache.NewCache(path.S2TD(theconfig.DynamicRoute.DupCheckTimeout)) + device.EdgeConfig = econfig + device.DRoute = econfig.DynamicRoute + device.DupData = *fixed_time_cache.NewCache(path.S2TD(econfig.DynamicRoute.DupCheckTimeout)) device.event_tryendpoint = make(chan struct{}, 1<<6) device.Event_save_config = make(chan struct{}, 1<<5) + device.LogTransit = econfig.LogLevel.LogTransit + device.LogControl = econfig.LogLevel.LogControl go device.RoutineSetEndpoint() go device.RoutineRegister() go device.RoutineSendPing() diff --git a/device/peer.go b/device/peer.go index 921200a..dc538ed 100644 --- a/device/peer.go +++ b/device/peer.go @@ -7,8 +7,10 @@ package device import ( "container/list" + "encoding/base64" "errors" "fmt" + "strconv" "sync" "sync/atomic" "time" @@ -88,6 +90,9 @@ func (device *Device) NewPeer(pk NoisePublicKey, id config.Vertex) (*Peer, error } // create peer + if device.LogControl { + fmt.Println("Create peer with ID : " + strconv.Itoa(int(id)) + " and PubKey:" + base64.StdEncoding.EncodeToString(pk[:])) + } peer := new(Peer) peer.Lock() defer peer.Unlock() @@ -97,6 +102,7 @@ func (device *Device) NewPeer(pk NoisePublicKey, id config.Vertex) (*Peer, error peer.queue.outbound = newAutodrainingOutboundQueue(device) peer.queue.inbound = newAutodrainingInboundQueue(device) peer.queue.staged = make(chan *QueueOutboundElement, QueueStagedSize) + peer.endpoint_trylist = make(map[string]time.Time) // map public key _, ok := device.peers.keyMap[pk] @@ -122,8 +128,8 @@ func (device *Device) NewPeer(pk NoisePublicKey, id config.Vertex) (*Peer, error // add if id == path.SuperNodeMessage { // To communicate with supernode device.peers.SuperPeer[pk] = peer + device.peers.keyMap[pk] = peer } else { // Regular peer, other edgenodes - _, ok = device.peers.IDMap[id] device.peers.keyMap[pk] = peer device.peers.IDMap[id] = peer } @@ -288,6 +294,12 @@ func (peer *Peer) Stop() { peer.ZeroAndFlushAll() } +func (peer *Peer) SetPSK(psk NoisePresharedKey) { + peer.handshake.mutex.Lock() + peer.handshake.presharedKey = psk + peer.handshake.mutex.Unlock() +} + func (peer *Peer) SetEndpointFromPacket(endpoint conn.Endpoint) { if peer.disableRoaming { return @@ -298,9 +310,15 @@ func (peer *Peer) SetEndpointFromPacket(endpoint conn.Endpoint) { } func (peer *Peer) GetEndpointSrcStr() string { + if peer.endpoint == nil { + return "" + } return peer.endpoint.SrcToString() } func (peer *Peer) GetEndpointDstStr() string { + if peer.endpoint == nil { + return "" + } return peer.endpoint.DstToString() } diff --git a/device/receive.go b/device/receive.go index 2290849..d057f89 100644 --- a/device/receive.go +++ b/device/receive.go @@ -11,6 +11,7 @@ import ( "errors" "fmt" "net" + "strconv" "sync" "sync/atomic" "time" @@ -410,7 +411,7 @@ func (peer *Peer) RoutineSequentialReceiver() { if elem == nil { return } - var EgBody path.EgHeader + var EgHeader path.EgHeader var err error var src_nodeID config.Vertex var dst_nodeID config.Vertex @@ -445,11 +446,11 @@ func (peer *Peer) RoutineSequentialReceiver() { } peer.timersDataReceived() - EgBody, err = path.NewEgHeader(elem.packet[0:path.EgHeaderLen]) - src_nodeID = EgBody.GetSrc() - dst_nodeID = EgBody.GetDst() - elem.packet = elem.packet[:EgBody.GetPacketLength()] - packet_type = EgBody.GetUsage() + EgHeader, err = path.NewEgHeader(elem.packet[0:path.EgHeaderLen]) // EG header + src_nodeID = EgHeader.GetSrc() + dst_nodeID = EgHeader.GetDst() + elem.packet = elem.packet[:EgHeader.GetPacketLength()+path.EgHeaderLen] // EG header + true packet + packet_type = EgHeader.GetUsage() if device.IsSuperNode { peer.LastPingReceived = time.Now() @@ -472,7 +473,7 @@ func (peer *Peer) RoutineSequentialReceiver() { case path.SuperNodeMessage: should_process = true case path.ControlMessage: - packet := elem.packet[path.EgHeaderLen:] + packet := elem.packet[path.EgHeaderLen:] //true packet if device.CheckNoDup(packet) { should_process = true should_transfer = true @@ -498,11 +499,11 @@ func (peer *Peer) RoutineSequentialReceiver() { } } if should_transfer { - l2ttl := EgBody.GetTTL() + l2ttl := EgHeader.GetTTL() if l2ttl == 0 { device.log.Verbosef("TTL is 0 %v", dst_nodeID) } else { - EgBody.SetTTL(l2ttl - 1) + EgHeader.SetTTL(l2ttl - 1) if dst_nodeID == path.Boardcast { //Regular transfer algorithm device.TransitBoardcastPacket(src_nodeID, peer.ID, elem.packet, MessageTransportOffsetContent) } else if dst_nodeID == path.ControlMessage { // Control Message will try send to every know node regardless the connectivity @@ -524,7 +525,15 @@ func (peer *Peer) RoutineSequentialReceiver() { if should_process { if packet_type != path.NornalPacket { - device.process_received(packet_type, elem.packet[path.EgHeaderLen:]) + if device.LogControl { + if peer.GetEndpointDstStr() != "" { + fmt.Printf("Received MID:" + strconv.Itoa(int(EgHeader.GetMessageID())) + " From:" + peer.GetEndpointDstStr() + " " + device.sprint_received(packet_type, elem.packet[path.EgHeaderLen:]) + "\n") + } + } + err = device.process_received(packet_type, elem.packet[path.EgHeaderLen:]) + if err != nil { + device.log.Errorf(err.Error()) + } } } diff --git a/device/receivesendproc.go b/device/receivesendproc.go index 2e6e3db..deaf22d 100644 --- a/device/receivesendproc.go +++ b/device/receivesendproc.go @@ -8,6 +8,8 @@ import ( "hash/crc32" "io/ioutil" "net/http" + "net/url" + "strconv" "time" "github.com/KusakabeSi/EtherGuardVPN/config" @@ -19,6 +21,16 @@ func (device *Device) SendPacket(peer *Peer, packet []byte, offset int) { if peer == nil { return } + if device.LogControl { + EgHeader, _ := path.NewEgHeader(packet[:path.EgHeaderLen]) + if EgHeader.GetUsage() != path.NornalPacket { + device.MsgCount += 1 + EgHeader.SetMessageID(device.MsgCount) + if peer.GetEndpointDstStr() != "" { + fmt.Printf("Send MID:" + strconv.Itoa(int(device.MsgCount)) + " To:" + peer.GetEndpointDstStr() + " " + device.sprint_received(EgHeader.GetUsage(), packet[path.EgHeaderLen:]) + "\n") + } + } + } var elem *QueueOutboundElement elem = device.NewOutboundElement() copy(elem.buffer[offset:offset+len(packet)], packet) @@ -135,6 +147,42 @@ func (device *Device) process_received(msg_type path.Usage, body []byte) (err er return } +func (device *Device) sprint_received(msg_type path.Usage, body []byte) (ret string) { + switch msg_type { + case path.Register: + if content, err := path.ParseRegisterMsg(body); err == nil { + ret = content.ToString() + } + case path.UpdatePeer: + if content, err := path.ParseUpdatePeerMsg(body); err == nil { + ret = content.ToString() + } + case path.UpdateNhTable: + if content, err := path.ParseUpdateNhTableMsg(body); err == nil { + ret = content.ToString() + } + case path.PingPacket: + if content, err := path.ParsePingMsg(body); err == nil { + ret = content.ToString() + } + case path.PongPacket: + if content, err := path.ParsePongMsg(body); err == nil { + ret = content.ToString() + } + case path.RequestPeer: + if content, err := path.ParseRequestPeerMsg(body); err == nil { + ret = content.ToString() + } + case path.BoardcastPeer: + if content, err := path.ParseBoardcastPeerMsg(body); err == nil { + ret = content.ToString() + } + default: + ret = "Not a valid msg_type" + } + return +} + func (device *Device) server_process_RegisterMsg(content path.RegisterMsg) error { device.Event_server_register <- content return nil @@ -161,6 +209,7 @@ func (device *Device) process_ping(content path.PingMsg) error { header.SetTTL(200) header.SetUsage(path.PongPacket) header.SetPacketLength(uint16(len(body))) + copy(buf[path.EgHeaderLen:], body) if device.DRoute.SuperNode.UseSuperNode { header.SetDst(path.SuperNodeMessage) device.Send2Super(buf, MessageTransportOffsetContent) @@ -186,7 +235,12 @@ func (device *Device) process_UpdatePeerMsg(content path.UpdatePeerMsg) error { if bytes.Equal(device.peers.Peer_state[:], content.State_hash[:]) { return nil } - resp, err := http.Get(device.DRoute.SuperNode.APIUrl + "/peerinfo?PubKey=" + PubKey2Str(device.staticIdentity.publicKey) + "?State=" + string(content.State_hash[:])) + + downloadurl := device.DRoute.SuperNode.APIUrl + "/peerinfo?PubKey=" + url.QueryEscape(PubKey2Str(device.staticIdentity.publicKey)) + "&State=" + url.QueryEscape(string(content.State_hash[:])) + if device.LogControl { + fmt.Println("Download peerinfo from :" + downloadurl) + } + resp, err := http.Get(downloadurl) if err != nil { return err } @@ -263,6 +317,9 @@ func (device *Device) RoutineSetEndpoint() { device.log.Errorf("Can't bind " + url) delete(thepeer.endpoint_trylist, url) } + if device.LogControl { + fmt.Println("Set endpoint to " + endpoint.DstToString() + " for NodeID:" + strconv.Itoa(int(thepeer.ID))) + } thepeer.SetEndpointFromPacket(endpoint) NextRun = true thepeer.endpoint_trylist[url] = time.Now() @@ -312,9 +369,11 @@ func (device *Device) RoutineRegister() { if !(device.DRoute.SuperNode.UseSuperNode) { return } + first := true for { body, _ := path.GetByte(path.RegisterMsg{ Node_id: device.ID, + Init: first, }) buf := make([]byte, path.EgHeaderLen+len(body)) header, _ := path.NewEgHeader(buf[0:path.EgHeaderLen]) @@ -326,6 +385,7 @@ func (device *Device) RoutineRegister() { copy(buf[path.EgHeaderLen:], body) device.Send2Super(buf, MessageTransportOffsetContent) time.Sleep(path.S2TD(device.DRoute.SendPingInterval)) + first = false } } @@ -343,7 +403,6 @@ func (device *Device) RoutineRecalculateNhTable() { return } for { - device.graph.RecalculateNhTable(false) time.Sleep(device.graph.NodeReportTimeout) } @@ -382,7 +441,11 @@ func (device *Device) process_UpdateNhTableMsg(content path.UpdateNhTableMsg) er if bytes.Equal(device.graph.NhTableHash[:], content.State_hash[:]) { return nil } - resp, err := http.Get(device.DRoute.SuperNode.APIUrl + "/nhtable?PubKey=" + PubKey2Str(device.staticIdentity.publicKey) + "?State=" + string(content.State_hash[:])) + downloadurl := device.DRoute.SuperNode.APIUrl + "/nhtable?PubKey=" + url.QueryEscape(PubKey2Str(device.staticIdentity.publicKey)) + "&State=" + url.QueryEscape(string(content.State_hash[:])) + if device.LogControl { + fmt.Println("Download NhTable from :" + downloadurl) + } + resp, err := http.Get(downloadurl) if err != nil { return err } @@ -407,11 +470,11 @@ func (device *Device) process_RequestPeerMsg(content path.RequestPeerMsg) error } response := path.BoardcastPeerMsg{ - RequestID: content.Request_ID, - NodeID: peer.ID, - PubKey: pubkey, - PSKey: peer.handshake.presharedKey, - ConnURL: peer.endpoint.DstToString(), + Request_ID: content.Request_ID, + NodeID: peer.ID, + PubKey: pubkey, + PSKey: peer.handshake.presharedKey, + ConnURL: peer.endpoint.DstToString(), } body, err := path.GetByte(response) if err != nil { diff --git a/main_edge.go b/main_edge.go index abf1916..8976e44 100644 --- a/main_edge.go +++ b/main_edge.go @@ -183,8 +183,7 @@ func Edge(configPath string, useUAPI bool, printExample bool) (err error) { graph := path.NewGraph(3, false, tconfig.DynamicRoute.P2P.GraphRecalculateSetting) graph.SetNHTable(tconfig.NextHopTable, [32]byte{}) - the_device := device.NewDevice(thetap, tconfig.NodeID, conn.NewDefaultBind(), logger, &graph, false, configPath, &tconfig, nil) - the_device.LogTransit = tconfig.LogLevel.LogTransit + the_device := device.NewDevice(thetap, tconfig.NodeID, conn.NewDefaultBind(), logger, graph, false, configPath, &tconfig, nil, nil) defer the_device.Close() var sk [32]byte sk_slice, _ := base64.StdEncoding.DecodeString(tconfig.PrivKey) diff --git a/main_httpserver.go b/main_httpserver.go index e86a11b..32147b5 100644 --- a/main_httpserver.go +++ b/main_httpserver.go @@ -15,8 +15,8 @@ var ( http_graph *path.IG http_device4 *device.Device http_device6 *device.Device - http_NhTable_Hash string - http_PeerInfo_hash string + http_NhTable_Hash [32]byte + http_PeerInfo_hash [32]byte http_NhTableStr []byte http_PeerInfoStr []byte http_PeerState map[string]*PeerState @@ -25,8 +25,8 @@ var ( ) type PeerState struct { - NhTableState string - PeerInfoState string + NhTableState [32]byte + PeerInfoState [32]byte } type client struct { @@ -43,7 +43,7 @@ func get_peerinfo(w http.ResponseWriter, r *http.Request) { PubKey, _ := params["PubKey"] State, _ := params["State"] if state := http_PeerState[PubKey[0]]; state != nil { - http_PeerState[PubKey[0]].PeerInfoState = State[0] + copy(http_PeerState[PubKey[0]].PeerInfoState[:], State[0]) } w.WriteHeader(http.StatusOK) w.Write([]byte(http_PeerInfoStr)) @@ -54,19 +54,18 @@ func get_nhtable(w http.ResponseWriter, r *http.Request) { PubKey, _ := params["PubKey"] State, _ := params["State"] if state := http_PeerState[PubKey[0]]; state != nil { - http_PeerState[PubKey[0]].NhTableState = State[0] + copy(http_PeerState[PubKey[0]].NhTableState[:], State[0]) } w.WriteHeader(http.StatusOK) w.Write([]byte(http_NhTableStr)) } -func HttpServer(http_port int, apiprefix string, graph *path.IG, device4 *device.Device, device6 *device.Device) { - http_graph = graph - http_device4 = device4 - http_device6 = device6 - http_PeerState = make(map[string]*PeerState) +func HttpServer(http_port int, apiprefix string) { mux := http.NewServeMux() - mux.HandleFunc("/"+apiprefix+"/peerinfo", get_peerinfo) - mux.HandleFunc("/"+apiprefix+"/nhtable", get_nhtable) - go http.ListenAndServe(":"+strconv.Itoa(http_port), mux) + if apiprefix[0] != '/' { + apiprefix = "/" + apiprefix + } + mux.HandleFunc(apiprefix+"/peerinfo", get_peerinfo) + mux.HandleFunc(apiprefix+"/nhtable", get_nhtable) + http.ListenAndServe(":"+strconv.Itoa(http_port), mux) } diff --git a/main_super.go b/main_super.go index 5e8b5f6..04f7dbd 100644 --- a/main_super.go +++ b/main_super.go @@ -8,6 +8,7 @@ package main import ( + "bytes" "crypto/md5" "encoding/base64" "encoding/hex" @@ -19,6 +20,7 @@ import ( "os/signal" "strconv" "syscall" + "time" "github.com/KusakabeSi/EtherGuardVPN/config" "github.com/KusakabeSi/EtherGuardVPN/conn" @@ -92,6 +94,10 @@ func Super(configPath string, useUAPI bool, printExample bool) (err error) { fmt.Sprintf("(%s) ", interfaceName+"_v6"), ) + http_PeerState = make(map[string]*PeerState) + http_PeerID2Map = make(map[config.Vertex]string) + http_PeerInfos.Peers = make(map[string]config.HTTP_Peerinfo) + super_chains := path.SUPER_Events{ Event_server_pong: make(chan path.PongMsg, 1<<5), Event_server_register: make(chan path.RegisterMsg, 1<<5), @@ -99,31 +105,55 @@ func Super(configPath string, useUAPI bool, printExample bool) (err error) { } thetap, _ := tap.CreateDummyTAP() - graph := path.NewGraph(3, true, sconfig.GraphRecalculateSetting) - device_v4 := device.NewDevice(thetap, path.SuperNodeMessage, conn.NewCustomBind(true, false), logger4, &graph, true, "", nil, &super_chains) - device_v6 := device.NewDevice(thetap, path.SuperNodeMessage, conn.NewCustomBind(false, true), logger6, &graph, true, "", nil, &super_chains) - defer device_v4.Close() - defer device_v6.Close() + http_graph = path.NewGraph(3, true, sconfig.GraphRecalculateSetting) + http_device4 = device.NewDevice(thetap, path.SuperNodeMessage, conn.NewCustomBind(true, false), logger4, http_graph, true, "", nil, &sconfig, &super_chains) + http_device6 = device.NewDevice(thetap, path.SuperNodeMessage, conn.NewCustomBind(false, true), logger6, http_graph, true, "", nil, &sconfig, &super_chains) + defer http_device4.Close() + defer http_device6.Close() var sk [32]byte sk_slice, _ := base64.StdEncoding.DecodeString(sconfig.PrivKeyV4) copy(sk[:], sk_slice) - device_v4.SetPrivateKey(sk) + http_device4.SetPrivateKey(sk) sk_slice, _ = base64.StdEncoding.DecodeString(sconfig.PrivKeyV6) copy(sk[:], sk_slice) - device_v6.SetPrivateKey(sk) - device_v4.IpcSet("listen_port=" + strconv.Itoa(sconfig.ListenPort) + "\n") - device_v6.IpcSet("listen_port=" + strconv.Itoa(sconfig.ListenPort) + "\n") - device_v4.IpcSet("replace_peers=true\n") - device_v6.IpcSet("listen_port=" + strconv.Itoa(sconfig.ListenPort) + "\n") + http_device6.SetPrivateKey(sk) + http_device4.IpcSet("fwmark=0\n") + http_device6.IpcSet("fwmark=0\n") + http_device4.IpcSet("listen_port=" + strconv.Itoa(sconfig.ListenPort) + "\n") + http_device6.IpcSet("listen_port=" + strconv.Itoa(sconfig.ListenPort-1) + "\n") + http_device4.IpcSet("replace_peers=true\n") + http_device6.IpcSet("replace_peers=true\n") for _, peerconf := range sconfig.Peers { - sk_slice, _ = base64.StdEncoding.DecodeString(peerconf.PubKey) - copy(sk[:], sk_slice) + var pk device.NoisePublicKey + + pk_slice, err := base64.StdEncoding.DecodeString(peerconf.PubKey) + if err != nil { + fmt.Println("Error decode base64 ", err) + } + copy(pk[:], pk_slice) if peerconf.NodeID >= path.SuperNodeMessage { return errors.New(fmt.Sprintf("Invalid Node_id at peer %s\n", peerconf.PubKey)) } - device_v4.NewPeer(sk, peerconf.NodeID) - device_v6.NewPeer(sk, peerconf.NodeID) + http_PeerID2Map[peerconf.NodeID] = peerconf.PubKey + http_PeerInfos.Peers[peerconf.PubKey] = config.HTTP_Peerinfo{ + NodeID: peerconf.NodeID, + PubKey: peerconf.PubKey, + PSKey: peerconf.PSKey, + Connurl: make(map[string]bool), + } + peer4, _ := http_device4.NewPeer(pk, peerconf.NodeID) + peer6, _ := http_device6.NewPeer(pk, peerconf.NodeID) + if peerconf.PSKey != "" { + var psk device.NoisePresharedKey + psk_slice, err := base64.StdEncoding.DecodeString(peerconf.PSKey) + if err != nil { + fmt.Println("Error decode base64 ", err) + } + copy(psk[:], psk_slice) + peer4.SetPSK(psk) + peer6.SetPSK(psk) + } http_PeerState[peerconf.PubKey] = &PeerState{} } logger4.Verbosef("Device started") @@ -131,12 +161,12 @@ func Super(configPath string, useUAPI bool, printExample bool) (err error) { errs := make(chan error, 1<<3) term := make(chan os.Signal, 1) if useUAPI { - uapi4, err := startUAPI(interfaceName+"_v4", logger4, device_v4, errs) + uapi4, err := startUAPI(interfaceName+"_v4", logger4, http_device4, errs) if err != nil { return err } defer uapi4.Close() - uapi6, err := startUAPI(interfaceName+"_v6", logger6, device_v6, errs) + uapi6, err := startUAPI(interfaceName+"_v6", logger6, http_device6, errs) if err != nil { return err } @@ -145,106 +175,131 @@ func Super(configPath string, useUAPI bool, printExample bool) (err error) { signal.Notify(term, syscall.SIGTERM) signal.Notify(term, os.Interrupt) + go Event_server_event_hendler(http_graph, super_chains) + go RoutinePushSettings(path.S2TD(sconfig.RePushConfigInterval)) + go HttpServer(sconfig.ListenPort, "/api") + select { case <-term: case <-errs: - case <-device_v4.Wait(): - case <-device_v6.Wait(): + case <-http_device4.Wait(): + case <-http_device6.Wait(): } logger4.Verbosef("Shutting down") return } -func Event_server_pong_hendler(graph path.IG, events path.SUPER_Events) { +func Event_server_event_hendler(graph *path.IG, events path.SUPER_Events) { for { - pongmsg := <-events.Event_server_pong - changed := graph.UpdateLentancy(pongmsg.Src_nodeID, pongmsg.Dst_nodeID, pongmsg.Timediff, true) - if changed { + select { + case reg_msg := <-events.Event_server_register: + if reg_msg.Init == true { + copy(http_PeerState[http_PeerID2Map[reg_msg.Node_id]].NhTableState[:], make([]byte, 32)) + copy(http_PeerState[http_PeerID2Map[reg_msg.Node_id]].PeerInfoState[:], make([]byte, 32)) + } + PubKey := http_PeerID2Map[reg_msg.Node_id] + if peer := http_device4.LookupPeerByStr(PubKey); peer != nil { + if connstr := peer.GetEndpointDstStr(); connstr != "" { + http_PeerInfos.Peers[PubKey].Connurl[connstr] = true + } + } + if peer := http_device6.LookupPeerByStr(PubKey); peer != nil { + if connstr := peer.GetEndpointDstStr(); connstr != "" { + http_PeerInfos.Peers[PubKey].Connurl[connstr] = true + } + } + http_PeerInfoStr, _ = json.Marshal(&http_PeerInfos) + PeerInfo_hash_raw := md5.Sum(http_PeerInfoStr) + PeerInfo_hash_str := hex.EncodeToString(PeerInfo_hash_raw[:]) + PeerInfo_hash_str_byte := []byte(PeerInfo_hash_str) + if bytes.Equal(http_PeerInfo_hash[:], PeerInfo_hash_str_byte) == false { + copy(http_PeerInfo_hash[:], PeerInfo_hash_str_byte) + PushUpdate() + } + case <-events.Event_server_NhTable_changed: NhTable := graph.GetNHTable(false) NhTablestr, _ := json.Marshal(NhTable) md5_hash_raw := md5.Sum(http_NhTableStr) - new_hash := hex.EncodeToString(md5_hash_raw[:]) - if http_NhTable_Hash != new_hash { - http_NhTable_Hash = new_hash + new_hash_str := hex.EncodeToString(md5_hash_raw[:]) + new_hash_str_byte := []byte(new_hash_str) + copy(http_NhTable_Hash[:], new_hash_str_byte) + http_NhTableStr = NhTablestr + PushUpdate() + case pong_msg := <-events.Event_server_pong: + changed := graph.UpdateLentancy(pong_msg.Src_nodeID, pong_msg.Dst_nodeID, pong_msg.Timediff, true) + if changed { + NhTable := graph.GetNHTable(false) + NhTablestr, _ := json.Marshal(NhTable) + md5_hash_raw := md5.Sum(http_NhTableStr) + new_hash_str := hex.EncodeToString(md5_hash_raw[:]) + new_hash_str_byte := []byte(new_hash_str) + copy(http_NhTable_Hash[:], new_hash_str_byte) http_NhTableStr = NhTablestr - NhTable_Hash_fixbyte := [32]byte{} - copy(NhTable_Hash_fixbyte[:], []byte(http_NhTable_Hash)) - body, err := path.GetByte(path.UpdateNhTableMsg{ - State_hash: NhTable_Hash_fixbyte, - }) - if err != nil { - fmt.Println("Error get byte") - continue - } - buf := make([]byte, path.EgHeaderLen+len(body)) - header, _ := path.NewEgHeader(buf[:path.EgHeaderLen]) - header.SetDst(path.SuperNodeMessage) - header.SetPacketLength(uint16(len(body))) - header.SetSrc(path.SuperNodeMessage) - header.SetTTL(0) - header.SetUsage(path.UpdateNhTable) - copy(buf[path.EgHeaderLen:], body) - for pkstr, peerstate := range http_PeerState { - if peerstate.NhTableState != http_NhTable_Hash { - if peer := http_device4.LookupPeerByStr(pkstr); peer != nil { - http_device4.SendPacket(peer, buf, device.MessageTransportOffsetContent) - } - if peer := http_device6.LookupPeerByStr(pkstr); peer != nil { - http_device6.SendPacket(peer, buf, device.MessageTransportOffsetContent) - } - } - - } + PushNhTable() } } } } -func Event_server_register_hendler(raph path.IG, events path.SUPER_Events) { +func RoutinePushSettings(interval time.Duration) { for { - reg_msg := <-events.Event_server_register - PubKey := http_PeerID2Map[reg_msg.Node_id] - if peer := http_device4.LookupPeerByStr(PubKey); peer != nil { - if connstr := peer.GetEndpointDstStr(); connstr != "" { - http_PeerInfos.Peers[PubKey].Connurl[connstr] = true + time.Sleep(interval) + PushNhTable() + PushUpdate() + } +} + +func PushNhTable() { + body, err := path.GetByte(path.UpdateNhTableMsg{ + State_hash: http_NhTable_Hash, + }) + if err != nil { + fmt.Println("Error get byte") + return + } + buf := make([]byte, path.EgHeaderLen+len(body)) + header, _ := path.NewEgHeader(buf[:path.EgHeaderLen]) + header.SetDst(path.SuperNodeMessage) + header.SetPacketLength(uint16(len(body))) + header.SetSrc(path.SuperNodeMessage) + header.SetTTL(0) + header.SetUsage(path.UpdateNhTable) + copy(buf[path.EgHeaderLen:], body) + for pkstr, peerstate := range http_PeerState { + if peerstate.NhTableState != http_NhTable_Hash { + if peer := http_device4.LookupPeerByStr(pkstr); peer != nil { + http_device4.SendPacket(peer, buf, device.MessageTransportOffsetContent) + } + if peer := http_device6.LookupPeerByStr(pkstr); peer != nil { + http_device6.SendPacket(peer, buf, device.MessageTransportOffsetContent) } } - if peer := http_device6.LookupPeerByStr(PubKey); peer != nil { - if connstr := peer.GetEndpointDstStr(); connstr != "" { - http_PeerInfos.Peers[PubKey].Connurl[connstr] = true + } +} + +func PushUpdate() { + body, err := path.GetByte(path.UpdatePeerMsg{ + State_hash: http_PeerInfo_hash, + }) + if err != nil { + fmt.Println("Error get byte") + return + } + buf := make([]byte, path.EgHeaderLen+len(body)) + header, _ := path.NewEgHeader(buf[:path.EgHeaderLen]) + header.SetDst(path.SuperNodeMessage) + header.SetPacketLength(uint16(len(body))) + header.SetSrc(path.SuperNodeMessage) + header.SetTTL(0) + header.SetUsage(path.UpdatePeer) + copy(buf[path.EgHeaderLen:], body) + for pkstr, peerstate := range http_PeerState { + if peerstate.PeerInfoState != http_PeerInfo_hash { + if peer := http_device4.LookupPeerByStr(pkstr); peer != nil { + http_device4.SendPacket(peer, buf, device.MessageTransportOffsetContent) } - } - http_PeerInfoStr, _ = json.Marshal(&http_PeerInfos) - PeerInfo_hash_raw := md5.Sum(http_PeerInfoStr) - PeerInfo_hash := hex.EncodeToString(PeerInfo_hash_raw[:]) - http_PeerInfo_hash_fixbyte := [32]byte{} - copy(http_PeerInfo_hash_fixbyte[:], []byte(PeerInfo_hash)) - if http_PeerInfo_hash != PeerInfo_hash { - http_PeerInfo_hash = PeerInfo_hash - body, err := path.GetByte(path.UpdatePeerMsg{ - State_hash: http_PeerInfo_hash_fixbyte, - }) - if err != nil { - fmt.Println("Error get byte") - continue - } - buf := make([]byte, path.EgHeaderLen+len(body)) - header, _ := path.NewEgHeader(buf[:path.EgHeaderLen]) - header.SetDst(path.SuperNodeMessage) - header.SetPacketLength(uint16(len(body))) - header.SetSrc(path.SuperNodeMessage) - header.SetTTL(0) - header.SetUsage(path.UpdatePeer) - copy(buf[path.EgHeaderLen:], body) - for pkstr, peerstate := range http_PeerState { - if peerstate.PeerInfoState != PeerInfo_hash { - if peer := http_device4.LookupPeerByStr(pkstr); peer != nil { - http_device4.SendPacket(peer, buf, device.MessageTransportOffsetContent) - } - if peer := http_device6.LookupPeerByStr(pkstr); peer != nil { - http_device6.SendPacket(peer, buf, device.MessageTransportOffsetContent) - } - } + if peer := http_device6.LookupPeerByStr(pkstr); peer != nil { + http_device6.SendPacket(peer, buf, device.MessageTransportOffsetContent) } } } diff --git a/path/header.go b/path/header.go index 1801614..89eece2 100644 --- a/path/header.go +++ b/path/header.go @@ -7,7 +7,7 @@ import ( "github.com/KusakabeSi/EtherGuardVPN/config" ) -const EgHeaderLen = 12 +const EgHeaderLen = 16 type EgHeader struct { buf []byte @@ -72,3 +72,10 @@ func (e EgHeader) GetPacketLength() uint16 { func (e EgHeader) SetPacketLength(length uint16) { binary.BigEndian.PutUint16(e.buf[10:12], length) } + +func (e EgHeader) GetMessageID() uint32 { + return binary.BigEndian.Uint32(e.buf[12:16]) +} +func (e EgHeader) SetMessageID(MessageID uint32) { + binary.BigEndian.PutUint32(e.buf[12:16], MessageID) +} diff --git a/path/metamessage.go b/path/metamessage.go index 11cf2f1..03bfaf4 100644 --- a/path/metamessage.go +++ b/path/metamessage.go @@ -3,6 +3,7 @@ package path import ( "bytes" "encoding/gob" + "strconv" "time" "github.com/KusakabeSi/EtherGuardVPN/config" @@ -19,94 +20,125 @@ func GetByte(structIn interface{}) (bb []byte, err error) { } type RegisterMsg struct { - Node_id config.Vertex + Node_id config.Vertex `struc:"uint32"` + Init bool } -func ParseRegisterMsg(bin []byte) (RegisterMsg, error) { - var StructPlace RegisterMsg +func (c *RegisterMsg) ToString() string { + return "RegisterMsg Node_id:" + strconv.Itoa(int(c.Node_id)) +} + +func ParseRegisterMsg(bin []byte) (StructPlace RegisterMsg, err error) { var b bytes.Buffer + b.Write(bin) d := gob.NewDecoder(&b) - err := d.Decode(&StructPlace) - return StructPlace, err + err = d.Decode(&StructPlace) + return } type UpdatePeerMsg struct { - State_hash [32]byte + State_hash [32]byte `struc:"[32]uint8"` } -func ParseUpdatePeerMsg(bin []byte) (UpdatePeerMsg, error) { - var StructPlace UpdatePeerMsg +func (c *UpdatePeerMsg) ToString() string { + return "UpdatePeerMsg State_hash:" + string(c.State_hash[:]) +} + +func ParseUpdatePeerMsg(bin []byte) (StructPlace UpdatePeerMsg, err error) { var b bytes.Buffer + b.Write(bin) d := gob.NewDecoder(&b) - err := d.Decode(&StructPlace) - return StructPlace, err + err = d.Decode(&StructPlace) + return } type UpdateNhTableMsg struct { - State_hash [32]byte + State_hash [32]byte `struc:"[32]uint8"` } -func ParseUpdateNhTableMsg(bin []byte) (UpdateNhTableMsg, error) { - var StructPlace UpdateNhTableMsg +func (c *UpdateNhTableMsg) ToString() string { + return "UpdateNhTableMsg State_hash:" + string(c.State_hash[:]) +} + +func ParseUpdateNhTableMsg(bin []byte) (StructPlace UpdateNhTableMsg, err error) { var b bytes.Buffer + b.Write(bin) d := gob.NewDecoder(&b) - err := d.Decode(&StructPlace) - return StructPlace, err + err = d.Decode(&StructPlace) + return } type PingMsg struct { - Src_nodeID config.Vertex - Time time.Time + RequestID uint32 `struc:"uint32"` + Src_nodeID config.Vertex `struc:"uint32"` + Time time.Time `struc:"uint64"` } -func ParsePingMsg(bin []byte) (PingMsg, error) { - var StructPlace PingMsg +func (c *PingMsg) ToString() string { + return "PingMsg SID:" + strconv.Itoa(int(c.Src_nodeID)) + " Time:" + c.Time.String() + " RequestID:" + strconv.Itoa(int(c.RequestID)) +} + +func ParsePingMsg(bin []byte) (StructPlace PingMsg, err error) { var b bytes.Buffer + b.Write(bin) d := gob.NewDecoder(&b) - err := d.Decode(&StructPlace) - return StructPlace, err + err = d.Decode(&StructPlace) + return } type PongMsg struct { - Src_nodeID config.Vertex - Dst_nodeID config.Vertex - Timediff time.Duration + RequestID uint32 + Src_nodeID config.Vertex `struc:"uint32"` + Dst_nodeID config.Vertex `struc:"uint32"` + Timediff time.Duration `struc:"int64"` } -func ParsePongMsg(bin []byte) (PongMsg, error) { - var StructPlace PongMsg +func (c *PongMsg) ToString() string { + return "PongMsg SID:" + strconv.Itoa(int(c.Src_nodeID)) + " DID:" + strconv.Itoa(int(c.Dst_nodeID)) + " Timediff:" + c.Timediff.String() + " RequestID:" + strconv.Itoa(int(c.RequestID)) +} + +func ParsePongMsg(bin []byte) (StructPlace PongMsg, err error) { var b bytes.Buffer + b.Write(bin) d := gob.NewDecoder(&b) - err := d.Decode(&StructPlace) - return StructPlace, err + err = d.Decode(&StructPlace) + return } type RequestPeerMsg struct { - Request_ID uint32 + Request_ID uint32 `struc:"uint32"` } -func ParseRequestPeerMsg(bin []byte) (RequestPeerMsg, error) { - var StructPlace RequestPeerMsg +func (c *RequestPeerMsg) ToString() string { + return "RequestPeerMsg Request_ID:" + strconv.Itoa(int(c.Request_ID)) +} + +func ParseRequestPeerMsg(bin []byte) (StructPlace RequestPeerMsg, err error) { var b bytes.Buffer + b.Write(bin) d := gob.NewDecoder(&b) - err := d.Decode(&StructPlace) - return StructPlace, err + err = d.Decode(&StructPlace) + return } type BoardcastPeerMsg struct { - RequestID uint32 - NodeID config.Vertex - PubKey [32]byte - PSKey [32]byte - ConnURL string + Request_ID uint32 `struc:"uint32"` + NodeID config.Vertex `struc:"uint32"` + PubKey [32]byte `struc:"[32]uint8"` + PSKey [32]byte `struc:"[32]uint8"` + ConnURL string } -func ParseBoardcastPeerMsg(bin []byte) (BoardcastPeerMsg, error) { - var StructPlace BoardcastPeerMsg +func (c *BoardcastPeerMsg) ToString() string { + return "BoardcastPeerMsg Request_ID:" + strconv.Itoa(int(c.Request_ID)) + " NodeID:" + strconv.Itoa(int(c.NodeID)) + " ConnURL:" + c.ConnURL +} + +func ParseBoardcastPeerMsg(bin []byte) (StructPlace BoardcastPeerMsg, err error) { var b bytes.Buffer + b.Write(bin) d := gob.NewDecoder(&b) - err := d.Decode(&StructPlace) - return StructPlace, err + err = d.Decode(&StructPlace) + return } type SUPER_Events struct { diff --git a/path/path.go b/path/path.go index 1d51098..a6d2788 100644 --- a/path/path.go +++ b/path/path.go @@ -3,6 +3,7 @@ package path import ( "fmt" "math" + "sync" "time" "github.com/KusakabeSi/EtherGuardVPN/config" @@ -20,7 +21,7 @@ const ( ) func (g *IG) GetCurrentTime() time.Time { - return time.Now() + return time.Now().Round(0) } // A Graph is the interface implemented by graphs that @@ -44,7 +45,8 @@ type Fullroute struct { // IG is a graph of integers that satisfies the Graph interface. type IG struct { Vert map[config.Vertex]bool - Edges map[config.Vertex]map[config.Vertex]Latency + edges map[config.Vertex]map[config.Vertex]Latency + edgelock sync.RWMutex JitterTolerance float64 JitterToleranceMultiplier float64 NodeReportTimeout time.Duration @@ -62,7 +64,7 @@ func S2TD(secs float64) time.Duration { return time.Duration(secs * float64(time.Second)) } -func NewGraph(num_node int, IsSuperMode bool, theconfig config.GraphRecalculateSetting) IG { +func NewGraph(num_node int, IsSuperMode bool, theconfig config.GraphRecalculateSetting) *IG { g := IG{ JitterTolerance: theconfig.JitterTolerance, JitterToleranceMultiplier: theconfig.JitterToleranceMultiplier, @@ -70,10 +72,10 @@ func NewGraph(num_node int, IsSuperMode bool, theconfig config.GraphRecalculateS RecalculateCoolDown: S2TD(theconfig.RecalculateCoolDown), } g.Vert = make(map[config.Vertex]bool, num_node) - g.Edges = make(map[config.Vertex]map[config.Vertex]Latency, num_node) + g.edges = make(map[config.Vertex]map[config.Vertex]Latency, num_node) g.IsSuperMode = IsSuperMode - return g + return &g } func (g *IG) GetWeightType(x float64) float64 { @@ -91,7 +93,7 @@ func (g *IG) ShouldUpdate(u config.Vertex, v config.Vertex, newval float64) bool oldval := g.Weight(u, v) * 1000 newval *= 1000 if g.IsSuperMode { - return (oldval-newval)*(oldval*g.JitterToleranceMultiplier) <= g.JitterTolerance + return (oldval-newval)*(oldval*g.JitterToleranceMultiplier) >= g.JitterTolerance } else { return g.GetWeightType(oldval) == g.GetWeightType(newval) } @@ -100,16 +102,15 @@ func (g *IG) ShouldUpdate(u config.Vertex, v config.Vertex, newval float64) bool func (g *IG) RecalculateNhTable(checkchange bool) (changed bool) { if g.RecalculateTime.Add(g.RecalculateCoolDown).Before(time.Now()) { dist, next := FloydWarshall(g) + changed = false if checkchange { CheckLoop: for src, dsts := range next { - for dst, cost := range dsts { + for dst, old_next := range dsts { nexthop := g.Next(src, dst) - if nexthop != nil { - changed = cost == nexthop - if changed { - break CheckLoop - } + if old_next != nexthop { + changed = true + break CheckLoop } } } @@ -122,26 +123,32 @@ func (g *IG) RecalculateNhTable(checkchange bool) (changed bool) { } func (g *IG) UpdateLentancy(u, v config.Vertex, dt time.Duration, checkchange bool) (changed bool) { + g.edgelock.Lock() g.Vert[u] = true g.Vert[v] = true + g.edgelock.Unlock() w := float64(dt) / float64(time.Second) - if _, ok := g.Edges[u]; !ok { - g.Edges[u] = make(map[config.Vertex]Latency) + if _, ok := g.edges[u]; !ok { + g.edgelock.Lock() + g.edges[u] = make(map[config.Vertex]Latency) + g.edgelock.Unlock() } if g.ShouldUpdate(u, v, w) { changed = g.RecalculateNhTable(checkchange) } - g.Edges[u][v] = Latency{ + g.edgelock.Lock() + g.edges[u][v] = Latency{ ping: w, time: time.Now(), } + g.edgelock.Unlock() return } func (g IG) Vertices() map[config.Vertex]bool { return g.Vert } func (g IG) Neighbors(v config.Vertex) (vs []config.Vertex) { - for k := range g.Edges[v] { + for k := range g.edges[v] { vs = append(vs, k) } return vs @@ -158,17 +165,19 @@ func (g IG) Next(u, v config.Vertex) *config.Vertex { } func (g IG) Weight(u, v config.Vertex) float64 { - if _, ok := g.Edges[u]; !ok { - g.Edges[u] = make(map[config.Vertex]Latency) + if _, ok := g.edges[u]; !ok { + g.edgelock.Lock() + g.edges[u] = make(map[config.Vertex]Latency) + g.edgelock.Unlock() return Infinity } - if _, ok := g.Edges[u][v]; !ok { + if _, ok := g.edges[u][v]; !ok { return Infinity } - if time.Now().After(g.Edges[u][v].time.Add(g.NodeReportTimeout)) { + if time.Now().After(g.edges[u][v].time.Add(g.NodeReportTimeout)) { return Infinity } - return g.Edges[u][v].ping + return g.edges[u][v].ping } func FloydWarshall(g Graph) (dist config.DistTable, next config.NextHopTable) {