L2FIB Timeout

This commit is contained in:
KusakabeSi 2021-10-01 08:56:42 +00:00
parent 06b7ea1edb
commit b99b0254ea
25 changed files with 205 additions and 122 deletions

View File

@ -76,7 +76,8 @@ Usage of ./etherguard-go:
2. `nodeid`: NodeID. Must be unique in the whole Etherguard network.
3. `nodename`: Node Name.
4. `defaultttl`: Default TTL(etherguard layer. not affect ethernet layer)
4. `privkey`: Private key. Same spec as wireguard.
5. `l2fibtimeout`: The timeout(in seconds) of the MacAddr-> NodeID lookup table
5. `privkey`: Private key. Same spec as wireguard.
5. `listenport`: UDP lesten port
6. `loglevel`: Log Level
1. `loglevel`: `debug`,`error`,`slient` for wirefuard logger.

View File

@ -83,7 +83,8 @@ Usage of ./etherguard-go-vpp:
2. `nodeid`: 節點ID。節點之間辨識身分用的同一網路內節點ID不能重複
3. `nodename`: 節點名稱
4. `defaultttl`: 預設ttl(etherguard層使用和乙太層不共通)
4. `privkey`: 私鑰和wireguard規格一樣
5. `l2fibtimeout`: MacAddr-> NodeID 查找表的 timeout(秒)
5. `privkey`: 私鑰和wireguard規格一樣
5. `listenport`: 監聽的udp埠
6. `loglevel`: 紀錄log
1. `loglevel`: wireguard原本的log紀錄器的loglevel。

View File

@ -21,6 +21,7 @@ type EdgeConfig struct {
NodeID Vertex
NodeName string
DefaultTTL uint8
L2FIBTimeout float64
PrivKey string
ListenPort int
LogLevel LoggerInfo

View File

@ -96,6 +96,7 @@ type Device struct {
DefaultTTL uint8
graph *path.IG
l2fib sync.Map
fibTimeout float64
LogLevel config.LoggerInfo
DRoute config.DynamicRouteInfo
DupData fixed_time_cache.Cache
@ -123,6 +124,11 @@ type Device struct {
log *Logger
}
type IdAndTime struct {
ID config.Vertex
Time time.Time
}
// deviceState represents the state of a Device.
// There are three states: down, up, closed.
// Transitions:
@ -357,12 +363,14 @@ func NewDevice(tapDevice tap.Device, id config.Vertex, bind conn.Bind, logger *L
device.LogLevel = econfig.LogLevel
device.ResetConnInterval = device.EdgeConfig.ResetConnInterval
device.DefaultTTL = econfig.DefaultTTL
device.fibTimeout = econfig.L2FIBTimeout
go device.RoutineSetEndpoint()
go device.RoutineRegister()
go device.RoutineSendPing()
go device.RoutineRecalculateNhTable()
go device.RoutineSpreadAllMyNeighbor()
go device.RoutineResetConn()
go device.RoutineClearL2FIB()
}
// create queues

View File

@ -553,20 +553,34 @@ func (peer *Peer) RoutineSequentialReceiver() {
if should_receive { // Write message to tap device
if packet_type == path.NormalPacket {
if len(elem.packet) <= path.EgHeaderLen+12 {
device.log.Errorf("Invalid normal packet from peer %v", peer.ID.ToString())
device.log.Errorf("Invalid normal packet: Ethernet packet too small from peer %v", peer.ID.ToString())
goto skip
}
if device.LogLevel.LogNormal {
fmt.Println("Normal: Reveived Normal packet From:" + peer.GetEndpointDstStr() + " SrcID:" + src_nodeID.ToString() + " DstID:" + dst_nodeID.ToString() + " Len:" + strconv.Itoa(len(elem.packet)))
packet_len := len(elem.packet) - path.EgHeaderLen
fmt.Println("Normal: Reveived Normal packet From:" + peer.GetEndpointDstStr() + " SrcID:" + src_nodeID.ToString() + " DstID:" + dst_nodeID.ToString() + " Len:" + strconv.Itoa(packet_len))
packet := gopacket.NewPacket(elem.packet[path.EgHeaderLen:], layers.LayerTypeEthernet, gopacket.Default)
fmt.Println(packet.Dump())
}
src_macaddr := tap.GetSrcMacAddr(elem.packet[path.EgHeaderLen:])
if !tap.IsNotUnicast(src_macaddr) {
actual, loaded := device.l2fib.LoadOrStore(src_macaddr, src_nodeID)
if loaded {
if actual.(config.Vertex) != src_nodeID {
device.l2fib.Store(src_macaddr, src_nodeID) // Write to l2fib table
val, ok := device.l2fib.Load(src_macaddr)
if ok {
idtime := val.(*IdAndTime)
if idtime.ID != src_nodeID {
idtime.ID = src_nodeID
if device.LogLevel.LogNormal {
fmt.Printf("Normal: L2FIB [%v -> %v] updated.\n", src_macaddr.String(), src_nodeID)
}
}
idtime.Time = time.Now()
} else {
device.l2fib.Store(src_macaddr, &IdAndTime{
ID: src_nodeID,
Time: time.Now(),
}) // Write to l2fib table
if device.LogLevel.LogNormal {
fmt.Printf("Normal: L2FIB [%v -> %v] added.\n", src_macaddr.String(), src_nodeID)
}
}
}

View File

@ -17,6 +17,7 @@ import (
"github.com/KusakabeSi/EtherGuardVPN/config"
orderedmap "github.com/KusakabeSi/EtherGuardVPN/orderdmap"
"github.com/KusakabeSi/EtherGuardVPN/path"
"github.com/KusakabeSi/EtherGuardVPN/tap"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
)
@ -27,11 +28,19 @@ func (device *Device) SendPacket(peer *Peer, usage path.Usage, packet []byte, of
} else if peer.endpoint == nil {
return
}
if usage == path.NormalPacket && len(packet)-path.EgHeaderLen <= 12 {
if device.LogLevel.LogNormal {
fmt.Println("Normal: Invalid packet: Ethernet packet too small")
}
return
}
if device.LogLevel.LogNormal {
EgHeader, _ := path.NewEgHeader(packet[:path.EgHeaderLen])
if usage == path.NormalPacket {
if usage == path.NormalPacket && EgHeader.GetSrc() == device.ID {
dst_nodeID := EgHeader.GetDst()
fmt.Println("Normal: Send Normal packet To:" + peer.GetEndpointDstStr() + " SrcID:" + device.ID.ToString() + " DstID:" + dst_nodeID.ToString() + " Len:" + strconv.Itoa(len(packet)))
packet_len := len(packet) - path.EgHeaderLen
fmt.Println("Normal: Send Normal packet To:" + peer.GetEndpointDstStr() + " SrcID:" + device.ID.ToString() + " DstID:" + dst_nodeID.ToString() + " Len:" + strconv.Itoa(packet_len))
packet := gopacket.NewPacket(packet[path.EgHeaderLen:], layers.LayerTypeEthernet, gopacket.Default)
fmt.Println(packet.Dump())
}
@ -214,7 +223,37 @@ func (device *Device) sprint_received(msg_type path.Usage, body []byte) string {
}
return "BoardcastPeerMsg: Parse failed"
default:
return "UnknowMsg: Not a valid msg_type"
return "UnknownMsg: Not a valid msg_type"
}
}
func (device *Device) GeneratePingPacket(src_nodeID config.Vertex, request_reply int) ([]byte, path.Usage, error) {
body, err := path.GetByte(&path.PingMsg{
Src_nodeID: src_nodeID,
Time: device.graph.GetCurrentTime(),
RequestReply: request_reply,
})
if err != nil {
return nil, path.PingPacket, err
}
buf := make([]byte, path.EgHeaderLen+len(body))
header, _ := path.NewEgHeader(buf[0:path.EgHeaderLen])
if err != nil {
return nil, path.PingPacket, err
}
header.SetDst(config.ControlMessage)
header.SetTTL(0)
header.SetSrc(device.ID)
header.SetPacketLength(uint16(len(body)))
copy(buf[path.EgHeaderLen:], body)
return buf, path.PingPacket, nil
}
func (device *Device) SendPing(peer *Peer, times int, replies int, interval float64) {
for i := 0; i < times; i++ {
packet, usage, _ := device.GeneratePingPacket(device.ID, replies)
device.SendPacket(peer, usage, packet, MessageTransportOffsetContent)
time.Sleep(path.S2TD(interval))
}
}
@ -248,7 +287,7 @@ func (device *Device) server_process_RegisterMsg(peer *Peer, content path.Regist
Node_id: peer.ID,
Action: path.Shutdown,
ErrorCode: 400,
ErrorMsg: "Your version is not match with our version: " + device.Version,
ErrorMsg: fmt.Sprintf("Your version: \"%v\" is not compatible with our version: \"%v\"", content.Version, device.Version),
}
}
if UpdateErrorMsg.Action != path.NoAction {
@ -312,14 +351,6 @@ func (device *Device) process_ping(peer *Peer, content path.PingMsg) error {
return nil
}
func (device *Device) SendPing(peer *Peer, times int, replies int, interval float64) {
for i := 0; i < times; i++ {
packet, usage, _ := device.GeneratePingPacket(device.ID, replies)
device.SendPacket(peer, usage, packet, MessageTransportOffsetContent)
time.Sleep(path.S2TD(interval))
}
}
func (device *Device) process_pong(peer *Peer, content path.PongMsg) error {
if device.DRoute.P2P.UseP2P {
if time.Now().After(device.graph.NhTableExpire) {
@ -531,6 +562,78 @@ func (device *Device) process_UpdateErrorMsg(peer *Peer, content path.UpdateErro
return nil
}
func (device *Device) process_RequestPeerMsg(content path.QueryPeerMsg) error { //Send all my peers to all my peers
if device.DRoute.P2P.UseP2P {
device.peers.RLock()
for pubkey, peer := range device.peers.keyMap {
if peer.ID >= config.Special_NodeID {
continue
}
if peer.endpoint == nil {
continue
}
peer.handshake.mutex.RLock()
response := path.BoardcastPeerMsg{
Request_ID: content.Request_ID,
NodeID: peer.ID,
PubKey: pubkey,
ConnURL: peer.endpoint.DstToString(),
}
peer.handshake.mutex.RUnlock()
body, err := path.GetByte(response)
if err != nil {
device.log.Errorf("Error at receivesendproc.go line221: ", err)
continue
}
buf := make([]byte, path.EgHeaderLen+len(body))
header, _ := path.NewEgHeader(buf[0:path.EgHeaderLen])
header.SetDst(config.ControlMessage)
header.SetTTL(device.DefaultTTL)
header.SetSrc(device.ID)
header.SetPacketLength(uint16(len(body)))
copy(buf[path.EgHeaderLen:], body)
device.SpreadPacket(make(map[config.Vertex]bool), path.BroadcastPeer, buf, MessageTransportOffsetContent)
}
device.peers.RUnlock()
}
return nil
}
func (device *Device) process_BoardcastPeerMsg(peer *Peer, content path.BoardcastPeerMsg) error {
if device.DRoute.P2P.UseP2P {
var pk NoisePublicKey
if content.Request_ID == uint32(device.ID) {
peer.AskedForNeighbor = true
}
if bytes.Equal(content.PubKey[:], device.staticIdentity.publicKey[:]) {
return nil
}
copy(pk[:], content.PubKey[:])
thepeer := device.LookupPeer(pk)
if thepeer == nil { //not exist in local
if device.LogLevel.LogControl {
fmt.Println("Control: Add new peer to local ID:" + content.NodeID.ToString() + " PubKey:" + pk.ToString())
}
if device.graph.Weight(device.ID, content.NodeID) == path.Infinity { // add node to graph
device.graph.UpdateLentancy(device.ID, content.NodeID, path.S2TD(path.Infinity), true, false)
}
if device.graph.Weight(content.NodeID, device.ID) == path.Infinity { // add node to graph
device.graph.UpdateLentancy(content.NodeID, device.ID, path.S2TD(path.Infinity), true, false)
}
device.NewPeer(pk, content.NodeID, false)
}
if thepeer.LastPingReceived.Add(path.S2TD(device.DRoute.PeerAliveTimeout)).Before(time.Now()) {
//Peer died, try to switch to this new endpoint
thepeer.Lock()
thepeer.endpoint_trylist.Set(content.ConnURL, time.Time{}) //another gorouting will process it
thepeer.Unlock()
device.event_tryendpoint <- struct{}{}
}
}
return nil
}
func (device *Device) RoutineSetEndpoint() {
if !(device.DRoute.P2P.UseP2P || device.DRoute.SuperNode.UseSuperNode) {
return
@ -700,96 +803,23 @@ func (device *Device) RoutineResetConn() {
}
}
func (device *Device) GeneratePingPacket(src_nodeID config.Vertex, request_reply int) ([]byte, path.Usage, error) {
body, err := path.GetByte(&path.PingMsg{
Src_nodeID: src_nodeID,
Time: device.graph.GetCurrentTime(),
RequestReply: request_reply,
func (device *Device) RoutineClearL2FIB() {
if device.fibTimeout <= 0.01 {
return
}
timeout := path.S2TD(device.fibTimeout)
for {
device.l2fib.Range(func(k interface{}, v interface{}) bool {
val := v.(*IdAndTime)
if time.Now().After(val.Time.Add(timeout)) {
mac := k.(tap.MacAddress)
device.l2fib.Delete(k)
if device.LogLevel.LogNormal {
fmt.Printf("Normal: L2FIB [%v -> %v] deleted.\n", mac.String(), val.ID)
}
}
return true
})
if err != nil {
return nil, path.PingPacket, err
time.Sleep(timeout)
}
buf := make([]byte, path.EgHeaderLen+len(body))
header, _ := path.NewEgHeader(buf[0:path.EgHeaderLen])
if err != nil {
return nil, path.PingPacket, err
}
header.SetDst(config.ControlMessage)
header.SetTTL(0)
header.SetSrc(device.ID)
header.SetPacketLength(uint16(len(body)))
copy(buf[path.EgHeaderLen:], body)
return buf, path.PingPacket, nil
}
func (device *Device) process_RequestPeerMsg(content path.QueryPeerMsg) error { //Send all my peers to all my peers
if device.DRoute.P2P.UseP2P {
device.peers.RLock()
for pubkey, peer := range device.peers.keyMap {
if peer.ID >= config.Special_NodeID {
continue
}
if peer.endpoint == nil {
continue
}
peer.handshake.mutex.RLock()
response := path.BoardcastPeerMsg{
Request_ID: content.Request_ID,
NodeID: peer.ID,
PubKey: pubkey,
ConnURL: peer.endpoint.DstToString(),
}
peer.handshake.mutex.RUnlock()
body, err := path.GetByte(response)
if err != nil {
device.log.Errorf("Error at receivesendproc.go line221: ", err)
continue
}
buf := make([]byte, path.EgHeaderLen+len(body))
header, _ := path.NewEgHeader(buf[0:path.EgHeaderLen])
header.SetDst(config.ControlMessage)
header.SetTTL(device.DefaultTTL)
header.SetSrc(device.ID)
header.SetPacketLength(uint16(len(body)))
copy(buf[path.EgHeaderLen:], body)
device.SpreadPacket(make(map[config.Vertex]bool), path.BroadcastPeer, buf, MessageTransportOffsetContent)
}
device.peers.RUnlock()
}
return nil
}
func (device *Device) process_BoardcastPeerMsg(peer *Peer, content path.BoardcastPeerMsg) error {
if device.DRoute.P2P.UseP2P {
var pk NoisePublicKey
if content.Request_ID == uint32(device.ID) {
peer.AskedForNeighbor = true
}
if bytes.Equal(content.PubKey[:], device.staticIdentity.publicKey[:]) {
return nil
}
copy(pk[:], content.PubKey[:])
thepeer := device.LookupPeer(pk)
if thepeer == nil { //not exist in local
if device.LogLevel.LogControl {
fmt.Println("Control: Add new peer to local ID:" + content.NodeID.ToString() + " PubKey:" + pk.ToString())
}
if device.graph.Weight(device.ID, content.NodeID) == path.Infinity { // add node to graph
device.graph.UpdateLentancy(device.ID, content.NodeID, path.S2TD(path.Infinity), true, false)
}
if device.graph.Weight(content.NodeID, device.ID) == path.Infinity { // add node to graph
device.graph.UpdateLentancy(content.NodeID, device.ID, path.S2TD(path.Infinity), true, false)
}
device.NewPeer(pk, content.NodeID, false)
}
if thepeer.LastPingReceived.Add(path.S2TD(device.DRoute.PeerAliveTimeout)).Before(time.Now()) {
//Peer died, try to switch to this new endpoint
thepeer.Lock()
thepeer.endpoint_trylist.Set(content.ConnURL, time.Time{}) //another gorouting will process it
thepeer.Unlock()
device.event_tryendpoint <- struct{}{}
}
}
return nil
}

View File

@ -260,13 +260,20 @@ func (device *Device) RoutineReadFromTUN() {
} else if val, ok := device.l2fib.Load(dstMacAddr); !ok { //Lookup failed
dst_nodeID = config.Broadcast
} else {
dst_nodeID = val.(config.Vertex)
dst_nodeID = val.(*IdAndTime).ID
}
packet_len := len(elem.packet) - path.EgHeaderLen
EgBody.SetSrc(device.ID)
EgBody.SetDst(dst_nodeID)
EgBody.SetPacketLength(uint16(len(elem.packet) - path.EgHeaderLen))
EgBody.SetPacketLength(uint16(packet_len))
EgBody.SetTTL(device.DefaultTTL)
elem.Type = path.NormalPacket
if packet_len <= 12 {
if device.LogLevel.LogNormal {
fmt.Println("Normal: Invalid packet: Ethernet packet too small." + " Len:" + strconv.Itoa(packet_len))
}
continue
}
if dst_nodeID != config.Broadcast {
var peer *Peer
@ -279,7 +286,7 @@ func (device *Device) RoutineReadFromTUN() {
continue
}
if device.LogLevel.LogNormal {
fmt.Println("Normal: Send Normal packet To:" + peer.GetEndpointDstStr() + " SrcID:" + device.ID.ToString() + " DstID:" + dst_nodeID.ToString() + " Len:" + strconv.Itoa(len(elem.packet)))
fmt.Println("Normal: Send packet To:" + peer.GetEndpointDstStr() + " SrcID:" + device.ID.ToString() + " DstID:" + dst_nodeID.ToString() + " Len:" + strconv.Itoa(len(elem.packet)))
packet := gopacket.NewPacket(elem.packet[path.EgHeaderLen:], layers.LayerTypeEthernet, gopacket.Default)
fmt.Println(packet.Dump())
}

View File

@ -11,6 +11,7 @@ interface:
nodeid: 1
nodename: Node01
defaultttl: 200
l2fibtimeout: 3600
privkey: aABzjKhWdkFfQ29ZuijtMp1h1TNJe66SDCwvfmvQznw=
listenport: 3001
loglevel:

View File

@ -11,6 +11,7 @@ interface:
nodeid: 2
nodename: Node02
defaultttl: 200
l2fibtimeout: 3600
privkey: UNZMzPX5fG/8yGC8edVj/ksF9N6ARRqdq7fqE/PD7ls=
listenport: 3002
loglevel:

View File

@ -11,6 +11,7 @@ interface:
nodeid: 3
nodename: Node03
defaultttl: 200
l2fibtimeout: 3600
privkey: gJy35nbsd8FuuxyWHjsefN+U+oM7RkuIB1EanNLSVHg=
listenport: 3003
loglevel:

View File

@ -11,6 +11,7 @@ interface:
nodeid: 4
nodename: Node04
defaultttl: 200
l2fibtimeout: 3600
privkey: wAdLgCk0SHiO11/aUf9944focD1BUCH5b6Pe+cRHHXQ=
listenport: 3004
loglevel:

View File

@ -11,6 +11,7 @@ interface:
nodeid: 5
nodename: Node05
defaultttl: 200
l2fibtimeout: 3600
privkey: gLmzeCbmN/hjiE+ehNXL9IxuG9hhWIYv2s16/DOW6FE=
listenport: 3005
loglevel:

View File

@ -11,6 +11,7 @@ interface:
nodeid: 6
nodename: Node06
defaultttl: 200
l2fibtimeout: 3600
privkey: IIX5F6oWZUS2dlhxWFJ7TxdJtDCr5jzeuhxUB6YM7Us=
listenport: 3006
loglevel:

View File

@ -11,10 +11,11 @@ interface:
nodeid: 1
nodename: Node01
defaultttl: 200
l2fibtimeout: 3600
privkey: aABzjKhWdkFfQ29ZuijtMp1h1TNJe66SDCwvfmvQznw=
listenport: 3001
loglevel:
loglevel: normal
loglevel: error
logtransit: true
logcontrol: true
lognormal: true

View File

@ -11,10 +11,11 @@ interface:
nodeid: 2
nodename: Node02
defaultttl: 200
l2fibtimeout: 3600
privkey: UNZMzPX5fG/8yGC8edVj/ksF9N6ARRqdq7fqE/PD7ls=
listenport: 3002
loglevel:
loglevel: normal
loglevel: error
logtransit: true
logcontrol: true
lognormal: true

View File

@ -11,10 +11,11 @@ interface:
nodeid: 3
nodename: Node03
defaultttl: 200
l2fibtimeout: 3600
privkey: gJy35nbsd8FuuxyWHjsefN+U+oM7RkuIB1EanNLSVHg=
listenport: 3003
loglevel:
loglevel: normal
loglevel: error
logtransit: true
logcontrol: true
lognormal: true

View File

@ -11,10 +11,11 @@ interface:
nodeid: 4
nodename: Node04
defaultttl: 200
l2fibtimeout: 3600
privkey: wAdLgCk0SHiO11/aUf9944focD1BUCH5b6Pe+cRHHXQ=
listenport: 3004
loglevel:
loglevel: normal
loglevel: error
logtransit: true
logcontrol: true
lognormal: true

View File

@ -11,10 +11,11 @@ interface:
nodeid: 5
nodename: Node05
defaultttl: 200
l2fibtimeout: 3600
privkey: gLmzeCbmN/hjiE+ehNXL9IxuG9hhWIYv2s16/DOW6FE=
listenport: 3005
loglevel:
loglevel: normal
loglevel: error
logtransit: true
logcontrol: true
lognormal: true

View File

@ -11,10 +11,11 @@ interface:
nodeid: 6
nodename: Node06
defaultttl: 200
l2fibtimeout: 3600
privkey: IIX5F6oWZUS2dlhxWFJ7TxdJtDCr5jzeuhxUB6YM7Us=
listenport: 3006
loglevel:
loglevel: normal
loglevel: error
logtransit: true
logcontrol: true
lognormal: true

View File

@ -290,6 +290,7 @@ Parameter:
1. nodeid: Node ID
1. pubkey: Public Key
1. pskey: Pre shared Key
1. nexthoptable: If the `graphrecalculatesetting` of your super node is in static mode, you need to provide a new `NextHopTable` in json format in this parameter.
Return value:
1. http code != 200: Error reason

View File

@ -296,6 +296,7 @@ curl -X POST "http://127.0.0.1:3000/api/peer/add?Password=passwd_addpeer" \
1. nodeid: Node ID
1. pubkey: Public Key
1. pskey: Preshared Key
1. nexthoptable: 如果你的super node的`graphrecalculatesetting`是static mode那麼你需要在這提供一張新的`NextHopTable`json格式
返回值:
1. http code != 200: 出錯原因

View File

@ -11,6 +11,7 @@ interface:
nodeid: 1
nodename: Node01
defaultttl: 200
l2fibtimeout: 3600
privkey: 6GyDagZKhbm5WNqMiRHhkf43RlbMJ34IieTlIuvfJ1M=
listenport: 3001
loglevel:

View File

@ -11,6 +11,7 @@ interface:
nodeid: 2
nodename: Node02
defaultttl: 200
l2fibtimeout: 3600
privkey: OH8BsVUU2Rqzeu9B2J5GPG8PUmxWfX8uVvNFZKhVF3o=
listenport: 3002
loglevel:

View File

@ -45,7 +45,7 @@ func printExampleEdgeConf() {
PrivKey: "6GyDagZKhbm5WNqMiRHhkf43RlbMJ34IieTlIuvfJ1M=",
ListenPort: 3001,
LogLevel: config.LoggerInfo{
LogLevel: "normal",
LogLevel: "error",
LogTransit: true,
LogControl: true,
LogNormal: true,

View File

@ -8,12 +8,17 @@ package tap
import (
"encoding/binary"
"errors"
"net"
"strconv"
"strings"
)
type Event int
type MacAddress [6]uint8
type MacAddress [6]byte
func (mac *MacAddress) String() string {
return net.HardwareAddr((*mac)[:]).String()
}
func GetDstMacAddr(packet []byte) (dstMacAddr MacAddress) {
copy(dstMacAddr[:], packet[0:6])