DampingResistance to DampingFilterRadius

This commit is contained in:
Kusakabe Si 2021-12-26 16:06:25 +00:00
parent 78322227ef
commit cdb096b8e4
27 changed files with 151 additions and 77 deletions

View File

@ -372,7 +372,7 @@ func NewDevice(tapDevice tap.Device, id mtypes.Vertex, bind conn.Bind, logger *L
device.Chan_SendRegisterStart = make(chan struct{}, 1<<5)
device.Chan_HttpPostStart = make(chan struct{}, 1<<5)
device.LogLevel = econfig.LogLevel
device.SuperConfig.DampingResistance = device.EdgeConfig.DynamicRoute.DampingResistance
device.SuperConfig.DampingFilterRadius = device.EdgeConfig.DynamicRoute.DampingFilterRadius
}

View File

@ -11,7 +11,9 @@ import (
"errors"
"fmt"
"io/ioutil"
"math"
"net"
"sort"
"sync"
"sync/atomic"
"time"
@ -155,6 +157,96 @@ func (et *endpoint_trylist) GetNextTry() (bool, string) {
return FastTry, smallest.URL
}
type filterwindow struct {
sync.RWMutex
device *Device
size int
element []float64
value float64
}
func (f *filterwindow) Push(e float64) float64 {
f.Resize(f.device.SuperConfig.DampingFilterRadius*2 + 1)
f.Lock()
defer f.Unlock()
if f.size < 3 || e >= mtypes.Infinity {
f.value = e
return f.value
}
f.element = append(f.element, e)
if len(f.element) > f.size {
f.element = f.element[1:]
}
elemlen := len(f.element)
window := f.element
if elemlen%2 == 0 {
window = window[1:]
elemlen -= 1
}
if elemlen < 3 {
f.value = e
return f.value
}
pivot := ((elemlen + 1) / 2) - 1
left := window[:pivot+1]
right := window[pivot:]
lm := f.filter(left, 1)
rm := f.filter(right, 2)
pv := window[pivot]
ldiff := math.Abs(lm - pv)
rdiff := math.Abs(rm - pv)
if ldiff < rdiff {
f.value = lm
} else {
f.value = rm
}
return f.value
}
func (f *filterwindow) filter(w []float64, lr int) float64 { // find the medium
elemlen := len(w)
if elemlen == 0 {
return mtypes.Infinity
}
if elemlen%2 == 0 {
switch lr {
case 1:
w = w[:len(w)-1]
case 2:
w = w[1:]
}
elemlen -= 1
}
if elemlen < 3 {
return w[0]
}
pivot := ((elemlen + 1) / 2) - 1
w2 := make([]float64, elemlen)
copy(w2, w)
sort.Float64s(w2)
return w2[pivot]
}
func (f *filterwindow) Resize(s uint64) {
size := int(s)
f.Lock()
defer f.Unlock()
if f.size == size {
return
}
f.size = size
elemlen := len(f.element)
if elemlen > f.size {
f.element = f.element[elemlen-size:]
}
}
func (f *filterwindow) GetVal() float64 {
f.RLock()
defer f.RUnlock()
return f.value
}
type Peer struct {
isRunning AtomicBool
sync.RWMutex // Mostly protects endpoint, but is generally taken whenever we modify peer
@ -166,8 +258,9 @@ type Peer struct {
LastPacketReceivedAdd1Sec atomic.Value // *time.Time
SingleWayLatency atomic.Value
stopping sync.WaitGroup // routines pending stop
SingleWayLatency filterwindow
stopping sync.WaitGroup // routines pending stop
ID mtypes.Vertex
AskedForNeighbor bool
@ -257,7 +350,8 @@ func (device *Device) NewPeer(pk NoisePublicKey, id mtypes.Vertex, isSuper bool,
peer.cookieGenerator.Init(pk)
peer.device = device
peer.endpoint_trylist = NewEndpoint_trylist(peer, mtypes.S2TD(device.EdgeConfig.DynamicRoute.PeerAliveTimeout))
peer.SingleWayLatency.Store(mtypes.Infinity)
peer.SingleWayLatency.device = device
peer.SingleWayLatency.Push(mtypes.Infinity)
peer.queue.outbound = newAutodrainingOutboundQueue(device)
peer.queue.inbound = newAutodrainingInboundQueue(device)
peer.queue.staged = make(chan *QueueOutboundElement, QueueStagedSize)

View File

@ -324,13 +324,7 @@ func (device *Device) server_process_Pong(peer *Peer, content mtypes.PongMsg) er
func (device *Device) process_ping(peer *Peer, content mtypes.PingMsg) error {
Timediff := device.graph.GetCurrentTime().Sub(content.Time).Seconds()
OldTimediff := peer.SingleWayLatency.Load().(float64)
NewTimediff := Timediff
if (OldTimediff < mtypes.Infinity) == (NewTimediff < mtypes.Infinity) {
DR := device.SuperConfig.DampingResistance
NewTimediff = OldTimediff*DR + Timediff*(1-DR)
}
peer.SingleWayLatency.Store(NewTimediff)
NewTimediff := peer.SingleWayLatency.Push(Timediff)
PongMSG := mtypes.PongMsg{
Src_nodeID: content.Src_nodeID,
@ -627,9 +621,7 @@ func (device *Device) process_UpdateSuperParamsMsg(peer *Peer, State_hash string
device.EdgeConfig.DynamicRoute.PeerAliveTimeout = SuperParams.PeerAliveTimeout
device.EdgeConfig.DynamicRoute.SendPingInterval = SuperParams.SendPingInterval
device.SuperConfig.HttpPostInterval = SuperParams.HttpPostInterval
if SuperParams.DampingResistance > 0 && SuperParams.DampingResistance <= 1 {
device.SuperConfig.DampingResistance = SuperParams.DampingResistance
}
device.SuperConfig.DampingFilterRadius = SuperParams.DampingFilterRadius
device.Chan_SendPingStart <- struct{}{}
device.Chan_HttpPostStart <- struct{}{}
if SuperParams.AdditionalCost >= 0 {
@ -914,7 +906,7 @@ func (device *Device) RoutinePostPeerInfo(startchan <-chan struct{}) {
RequestID: 0,
Src_nodeID: id,
Dst_nodeID: device.ID,
Timediff: peer.SingleWayLatency.Load().(float64),
Timediff: peer.SingleWayLatency.GetVal(),
TimeToAlive: time.Since(*peer.LastPacketReceivedAdd1Sec.Load().(*time.Time)).Seconds() + device.EdgeConfig.DynamicRoute.PeerAliveTimeout,
}
pongs = append(pongs, pong)

View File

@ -32,7 +32,7 @@ DynamicRoute:
ConnNextTry: 5
DupCheckTimeout: 40
AdditionalCost: 1000
DampingResistance: 0.95
DampingFilterRadius: 4
SaveNewPeers: false
SuperNode:
UseSuperNode: false

View File

@ -32,7 +32,7 @@ DynamicRoute:
ConnNextTry: 5
DupCheckTimeout: 40
AdditionalCost: 1000
DampingResistance: 0.95
DampingFilterRadius: 4
SaveNewPeers: false
SuperNode:
UseSuperNode: false

View File

@ -32,7 +32,7 @@ DynamicRoute:
ConnNextTry: 5
DupCheckTimeout: 40
AdditionalCost: 1000
DampingResistance: 0.95
DampingFilterRadius: 4
SaveNewPeers: false
SuperNode:
UseSuperNode: false

View File

@ -32,7 +32,7 @@ DynamicRoute:
ConnNextTry: 5
DupCheckTimeout: 40
AdditionalCost: 1000
DampingResistance: 0.95
DampingFilterRadius: 4
SaveNewPeers: false
SuperNode:
UseSuperNode: false

View File

@ -32,7 +32,7 @@ DynamicRoute:
ConnNextTry: 5
DupCheckTimeout: 40
AdditionalCost: 1000
DampingResistance: 0.95
DampingFilterRadius: 4
SaveNewPeers: false
SuperNode:
UseSuperNode: false

View File

@ -32,7 +32,7 @@ DynamicRoute:
ConnNextTry: 5
DupCheckTimeout: 40
AdditionalCost: 1000
DampingResistance: 0.95
DampingFilterRadius: 4
SaveNewPeers: false
SuperNode:
UseSuperNode: false

View File

@ -32,7 +32,7 @@ DynamicRoute:
ConnNextTry: 5
DupCheckTimeout: 40
AdditionalCost: 10
DampingResistance: 0.9
DampingFilterRadius: 4
SaveNewPeers: true
SuperNode:
UseSuperNode: false

View File

@ -32,7 +32,7 @@ DynamicRoute:
ConnNextTry: 5
DupCheckTimeout: 40
AdditionalCost: 10
DampingResistance: 0.9
DampingFilterRadius: 4
SaveNewPeers: true
SuperNode:
UseSuperNode: false

View File

@ -32,7 +32,7 @@ DynamicRoute:
ConnNextTry: 5
DupCheckTimeout: 40
AdditionalCost: 10
DampingResistance: 0.9
DampingFilterRadius: 4
SaveNewPeers: true
SuperNode:
UseSuperNode: false

View File

@ -32,7 +32,7 @@ DynamicRoute:
ConnNextTry: 5
DupCheckTimeout: 40
AdditionalCost: 10
DampingResistance: 0.9
DampingFilterRadius: 4
SaveNewPeers: true
SuperNode:
UseSuperNode: false

View File

@ -32,7 +32,7 @@ DynamicRoute:
ConnNextTry: 5
DupCheckTimeout: 40
AdditionalCost: 10
DampingResistance: 0.9
DampingFilterRadius: 4
SaveNewPeers: true
SuperNode:
UseSuperNode: false

View File

@ -32,7 +32,7 @@ DynamicRoute:
ConnNextTry: 5
DupCheckTimeout: 40
AdditionalCost: 10
DampingResistance: 0.9
DampingFilterRadius: 4
SaveNewPeers: true
SuperNode:
UseSuperNode: false

View File

@ -33,7 +33,7 @@ DynamicRoute:
ConnNextTry: 5
DupCheckTimeout: 40
AdditionalCost: 10
DampingResistance: 0.9
DampingFilterRadius: 4
SaveNewPeers: true
SuperNode:
UseSuperNode: true

View File

@ -33,7 +33,7 @@ DynamicRoute:
ConnNextTry: 5
DupCheckTimeout: 40
AdditionalCost: 10
DampingResistance: 0.9
DampingFilterRadius: 4
SaveNewPeers: true
SuperNode:
UseSuperNode: true

View File

@ -33,7 +33,7 @@ DynamicRoute:
ConnNextTry: 5
DupCheckTimeout: 40
AdditionalCost: 10
DampingResistance: 0.9
DampingFilterRadius: 4
SaveNewPeers: true
SuperNode:
UseSuperNode: true

View File

@ -10,7 +10,7 @@ RePushConfigInterval: 30
HttpPostInterval: 50
PeerAliveTimeout: 70
SendPingInterval: 15
DampingResistance: 0.9
DampingFilterRadius: 4
LogLevel:
LogLevel: error
LogTransit: false

View File

@ -268,7 +268,7 @@ curl -X POST "http://127.0.0.1:3456/eg_net/eg_api/manage/peer/update?Password=pa
```bash
curl -X POST "http://127.0.0.1:3456/eg_net/eg_api/manage/super/update?Password=passwd_updatesuper" \
-H "Content-Type: application/x-www-form-urlencoded" \
-d "SendPingInterval=15&HttpPostInterval=60&PeerAliveTimeout=70&DampingResistance=0.9"
-d "SendPingInterval=15&HttpPostInterval=60&PeerAliveTimeout=70&DampingFilterRadius=3"
```
@ -311,7 +311,7 @@ StaticMode | Disable `Floyd-Warshall`, use `NextHopTable`in the
ManualLatency | Set latency manually, ignore Edge reported latency.
JitterTolerance | Jitter tolerance, after receiving Pong, one 37ms and one 39ms will not trigger recalculation<br>Compared to last calculation
JitterToleranceMultiplier | high ping allows more errors<br>https://www.desmos.com/calculator/raoti16r5n
DampingResistance | Damping resistance<br>`latency = latency_old * resistance + latency_in * (1-resistance)`
DampingFilterRadius | Windows radius for the low pass filter for latency damping prevention
TimeoutCheckInterval | The interval to check if there any `Pong` packet timed out, and recalculate the NhTable
RecalculateCoolDown | Floyd-Warshal is an O(n^3)time complexity algorithm<br>This option set a cooldown, and prevent it cost too many CPU<br>Connect/Disconnect event ignores this cooldown.

View File

@ -283,7 +283,7 @@ curl -X POST "http://127.0.0.1:3456/eg_net/eg_api/manage/peer/update?Password=pa
```bash
curl -X POST "http://127.0.0.1:3456/eg_net/eg_api/manage/super/update?Password=passwd_updatesuper" \
-H "Content-Type: application/x-www-form-urlencoded" \
-d "SendPingInterval=15&HttpPostInterval=60&PeerAliveTimeout=70&DampingResistance=0.9"
-d "SendPingInterval=15&HttpPostInterval=60&PeerAliveTimeout=70&DampingFilterRadius=3"
```
### SuperNode Config Parameter
@ -324,7 +324,7 @@ StaticMode | 關閉`Floyd-Warshall`演算法,只使用設定
ManualLatency | 手動設定延遲不採用EdgeNode回報的延遲(單位: 毫秒)
JitterTolerance | 抖動容許誤差收到Pong以後一個37ms一個39ms不會觸發重新計算<br>比較對象是上次更新使用的值。如果37 37 41 43 .. 100 ,每次變動一點點,總變動量超過域值還是會更新
JitterToleranceMultiplier | 抖動容許誤差的放大係數高ping的話允許更多誤差<br>https://www.desmos.com/calculator/raoti16r5n
DampingResistance | 防抖阻尼系數<br>`latency = latency_old * resistance + latency_in * (1-resistance)`
DampingFilterRadius | 防抖用低通濾波器的window半徑
TimeoutCheckInterval | 週期性檢查節點的連線狀況,是否斷線需要重新規劃線路
RecalculateCoolDown | Floyd-Warshal是O(n^3)時間複雜度,不能太常算。<br>設個冷卻時間<br>有節點加入/斷線觸發的重新計算無視這個CoolDown

View File

@ -32,7 +32,7 @@ DynamicRoute:
ConnNextTry: 5
DupCheckTimeout: 40
AdditionalCost: 10
DampingResistance: 0.9
DampingFilterRadius: 4
SaveNewPeers: true
SuperNode:
UseSuperNode: true

View File

@ -60,7 +60,7 @@ func GetExampleEdgeConf(templatePath string, getDemo bool) (mtypes.EdgeConfig, e
TimeoutCheckInterval: 20,
ConnNextTry: 5,
AdditionalCost: 10,
DampingResistance: 0.95,
DampingFilterRadius: 4,
SaveNewPeers: true,
SuperNode: mtypes.SuperInfo{
UseSuperNode: true,
@ -204,7 +204,7 @@ func GetExampleSuperConf(templatePath string, getDemo bool) (mtypes.SuperConfig,
},
RePushConfigInterval: 30,
PeerAliveTimeout: 70,
DampingResistance: 0.9,
DampingFilterRadius: 4,
HttpPostInterval: 50,
SendPingInterval: 15,
ResetEndPointInterval: 600,

View File

@ -52,9 +52,6 @@ func Edge(configPath string, useUAPI bool, printExample bool, bindmode string) (
if len(NodeName) > 32 {
return errors.New("Node name can't longer than 32 :" + NodeName)
}
if econfig.DynamicRoute.DampingResistance < 0 || econfig.DynamicRoute.DampingResistance >= 1 {
return fmt.Errorf("DampingResistance must in range [0,1) : %v", econfig.DynamicRoute.DampingResistance)
}
var logLevel int
switch econfig.LogLevel.LogLevel {
case "verbose", "debug":

View File

@ -255,11 +255,11 @@ func edge_get_superparams(w http.ResponseWriter, r *http.Request) {
}
// Do something
SuperParams := mtypes.API_SuperParams{
SendPingInterval: httpobj.http_sconfig.SendPingInterval,
HttpPostInterval: httpobj.http_sconfig.HttpPostInterval,
PeerAliveTimeout: httpobj.http_sconfig.PeerAliveTimeout,
AdditionalCost: httpobj.http_PeerID2Info[NodeID].AdditionalCost,
DampingResistance: httpobj.http_sconfig.DampingResistance,
SendPingInterval: httpobj.http_sconfig.SendPingInterval,
HttpPostInterval: httpobj.http_sconfig.HttpPostInterval,
PeerAliveTimeout: httpobj.http_sconfig.PeerAliveTimeout,
AdditionalCost: httpobj.http_PeerID2Info[NodeID].AdditionalCost,
DampingFilterRadius: httpobj.http_sconfig.DampingFilterRadius,
}
SuperParamStr, _ := json.Marshal(SuperParams)
httpobj.http_PeerState[PubKey].SuperParamStateClient.Store(State)
@ -763,11 +763,11 @@ func manage_peerupdate(w http.ResponseWriter, r *http.Request) {
httpobj.http_PeerID2Info[toUpdate] = new_superpeerinfo
SuperParams := mtypes.API_SuperParams{
SendPingInterval: httpobj.http_sconfig.SendPingInterval,
HttpPostInterval: httpobj.http_sconfig.HttpPostInterval,
PeerAliveTimeout: httpobj.http_sconfig.PeerAliveTimeout,
DampingResistance: httpobj.http_sconfig.DampingResistance,
AdditionalCost: new_superpeerinfo.AdditionalCost,
SendPingInterval: httpobj.http_sconfig.SendPingInterval,
HttpPostInterval: httpobj.http_sconfig.HttpPostInterval,
PeerAliveTimeout: httpobj.http_sconfig.PeerAliveTimeout,
DampingFilterRadius: httpobj.http_sconfig.DampingFilterRadius,
AdditionalCost: new_superpeerinfo.AdditionalCost,
}
SuperParamStr, _ := json.Marshal(SuperParams)
@ -815,7 +815,7 @@ func manage_superupdate(w http.ResponseWriter, r *http.Request) {
sconfig_temp.PeerAliveTimeout = httpobj.http_sconfig.PeerAliveTimeout
sconfig_temp.SendPingInterval = httpobj.http_sconfig.SendPingInterval
sconfig_temp.HttpPostInterval = httpobj.http_sconfig.HttpPostInterval
sconfig_temp.DampingResistance = httpobj.http_sconfig.DampingResistance
sconfig_temp.DampingFilterRadius = httpobj.http_sconfig.DampingFilterRadius
PeerAliveTimeout, err := extractParamsFloat(r.Form, "PeerAliveTimeout", 64, nil)
if err == nil {
@ -828,15 +828,10 @@ func manage_superupdate(w http.ResponseWriter, r *http.Request) {
sconfig_temp.PeerAliveTimeout = PeerAliveTimeout
}
DampingResistance, err := extractParamsFloat(r.Form, "DampingResistance", 64, nil)
DampingFilterRadius, err := extractParamsUint(r.Form, "DampingFilterRadius", 64, nil)
if err == nil {
if DampingResistance < 0 || DampingResistance >= 1 {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(fmt.Sprintf("Paramater DampingResistance %v: Must in range [0,1)\n", DampingResistance)))
return
}
Updated_params["DampingResistance"] = fmt.Sprintf("%v", DampingResistance)
sconfig_temp.DampingResistance = DampingResistance
Updated_params["DampingFilterRadius"] = fmt.Sprintf("%v", DampingFilterRadius)
sconfig_temp.DampingFilterRadius = DampingFilterRadius
}
SendPingInterval, err := extractParamsFloat(r.Form, "SendPingInterval", 64, nil)
@ -869,14 +864,14 @@ func manage_superupdate(w http.ResponseWriter, r *http.Request) {
httpobj.http_sconfig.PeerAliveTimeout = sconfig_temp.PeerAliveTimeout
httpobj.http_sconfig.SendPingInterval = sconfig_temp.SendPingInterval
httpobj.http_sconfig.HttpPostInterval = sconfig_temp.HttpPostInterval
httpobj.http_sconfig.DampingResistance = sconfig_temp.DampingResistance
httpobj.http_sconfig.DampingFilterRadius = sconfig_temp.DampingFilterRadius
SuperParams := mtypes.API_SuperParams{
SendPingInterval: httpobj.http_sconfig.SendPingInterval,
HttpPostInterval: httpobj.http_sconfig.HttpPostInterval,
PeerAliveTimeout: httpobj.http_sconfig.PeerAliveTimeout,
DampingResistance: httpobj.http_sconfig.DampingResistance,
AdditionalCost: 10,
SendPingInterval: httpobj.http_sconfig.SendPingInterval,
HttpPostInterval: httpobj.http_sconfig.HttpPostInterval,
PeerAliveTimeout: httpobj.http_sconfig.PeerAliveTimeout,
DampingFilterRadius: httpobj.http_sconfig.DampingFilterRadius,
AdditionalCost: 10,
}
httpobj.Lock()
defer httpobj.Unlock()

View File

@ -100,10 +100,6 @@ func Super(configPath string, useUAPI bool, printExample bool, bindmode string)
if sconfig.RePushConfigInterval <= 0 {
return fmt.Errorf("RePushConfigInterval must > 0 : %v", sconfig.RePushConfigInterval)
}
if sconfig.DampingResistance < 0 || sconfig.DampingResistance >= 1 {
return fmt.Errorf("DampingResistance must in range [0,1) : %v", sconfig.DampingResistance)
}
var logLevel int
switch sconfig.LogLevel.LogLevel {
case "verbose", "debug":

View File

@ -47,7 +47,7 @@ type SuperConfig struct {
HttpPostInterval float64 `yaml:"HttpPostInterval"`
PeerAliveTimeout float64 `yaml:"PeerAliveTimeout"`
SendPingInterval float64 `yaml:"SendPingInterval"`
DampingResistance float64 `yaml:"DampingResistance"`
DampingFilterRadius uint64 `yaml:"DampingFilterRadius"`
LogLevel LoggerInfo `yaml:"LogLevel"`
Passwords Passwords `yaml:"Passwords"`
GraphRecalculateSetting GraphRecalculateSetting `yaml:"GraphRecalculateSetting"`
@ -132,7 +132,7 @@ type DynamicRouteInfo struct {
ConnNextTry float64 `yaml:"ConnNextTry"`
DupCheckTimeout float64 `yaml:"DupCheckTimeout"`
AdditionalCost float64 `yaml:"AdditionalCost"`
DampingResistance float64 `yaml:"DampingResistance"`
DampingFilterRadius uint64 `yaml:"DampingFilterRadius"`
SaveNewPeers bool `yaml:"SaveNewPeers"`
SuperNode SuperInfo `yaml:"SuperNode"`
P2P P2PInfo `yaml:"P2P"`
@ -223,11 +223,11 @@ type API_Peerinfo struct {
}
type API_SuperParams struct {
SendPingInterval float64
HttpPostInterval float64
PeerAliveTimeout float64
DampingResistance float64
AdditionalCost float64
SendPingInterval float64
HttpPostInterval float64
PeerAliveTimeout float64
DampingFilterRadius uint64
AdditionalCost float64
}
type StateHash struct {