mirror of
https://github.com/tim-beatham/smegmesh.git
synced 2025-08-12 06:29:05 +02:00
Compare commits
8 Commits
39-impleme
...
41-bugfix-
Author | SHA1 | Date | |
---|---|---|---|
1b18d89c9f | |||
245a2c5f58 | |||
c40f7510b8 | |||
78d748770c | |||
0ff2a8eef9 | |||
fd7bd80485 | |||
3ef1b68ba5 | |||
b9ba836ae3 |
@ -222,13 +222,13 @@ func (m *TwoPhaseStoreMeshManager) GetDevice() (*wgtypes.Device, error) {
|
|||||||
|
|
||||||
// HasChanges returns true if we have changes since last time we synced
|
// HasChanges returns true if we have changes since last time we synced
|
||||||
func (m *TwoPhaseStoreMeshManager) HasChanges() bool {
|
func (m *TwoPhaseStoreMeshManager) HasChanges() bool {
|
||||||
clockValue := m.store.GetClock()
|
clockValue := m.store.GetHash()
|
||||||
return clockValue != m.LastClock
|
return clockValue != m.LastClock
|
||||||
}
|
}
|
||||||
|
|
||||||
// Record that we have changes and save the corresponding changes
|
// Record that we have changes and save the corresponding changes
|
||||||
func (m *TwoPhaseStoreMeshManager) SaveChanges() {
|
func (m *TwoPhaseStoreMeshManager) SaveChanges() {
|
||||||
clockValue := m.store.GetClock()
|
clockValue := m.store.GetHash()
|
||||||
m.LastClock = clockValue
|
m.LastClock = clockValue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -128,16 +128,19 @@ func (m *TwoPhaseMap[K, D]) incrementClock() uint64 {
|
|||||||
return maxClock
|
return maxClock
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *TwoPhaseMap[K, D]) GetClock() uint64 {
|
// GetHash: Get the hash of the current state of the map
|
||||||
maxClock := uint64(0)
|
// Sums the current values of the vectors. Provides good approximation
|
||||||
|
// of increasing numbers
|
||||||
|
func (m *TwoPhaseMap[K, D]) GetHash() uint64 {
|
||||||
m.lock.RLock()
|
m.lock.RLock()
|
||||||
|
|
||||||
for _, value := range m.vectors {
|
sum := lib.Reduce(uint64(0), lib.MapValues(m.vectors), func(sum uint64, current uint64) uint64 {
|
||||||
maxClock = max(maxClock, value)
|
return current + sum
|
||||||
}
|
})
|
||||||
|
|
||||||
m.lock.RUnlock()
|
m.lock.RUnlock()
|
||||||
return maxClock
|
|
||||||
|
return sum
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetState: get the current vector clock of the add and remove
|
// GetState: get the current vector clock of the add and remove
|
||||||
|
@ -125,7 +125,6 @@ func (t *TwoPhaseSyncer) RecvMessage(msg []byte) error {
|
|||||||
|
|
||||||
func (t *TwoPhaseSyncer) Complete() {
|
func (t *TwoPhaseSyncer) Complete() {
|
||||||
logging.Log.WriteInfof("SYNC COMPLETED")
|
logging.Log.WriteInfof("SYNC COMPLETED")
|
||||||
t.manager.SaveChanges()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTwoPhaseSyncer(manager *TwoPhaseStoreMeshManager) *TwoPhaseSyncer {
|
func NewTwoPhaseSyncer(manager *TwoPhaseStoreMeshManager) *TwoPhaseSyncer {
|
||||||
|
@ -76,3 +76,13 @@ func Contains[V any](list []V, proposition func(V) bool) bool {
|
|||||||
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func Reduce[A any, V any](start A, values []V, reduce func(A, V) A) A {
|
||||||
|
accum := start
|
||||||
|
|
||||||
|
for _, elem := range values {
|
||||||
|
accum = reduce(accum, elem)
|
||||||
|
}
|
||||||
|
|
||||||
|
return accum
|
||||||
|
}
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"slices"
|
"slices"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tim-beatham/wgmesh/pkg/conf"
|
"github.com/tim-beatham/wgmesh/pkg/conf"
|
||||||
@ -52,7 +53,7 @@ func (m *WgMeshConfigApplyer) convertMeshNode(node MeshNode, device *wgtypes.Dev
|
|||||||
allowedips := make([]net.IPNet, 1)
|
allowedips := make([]net.IPNet, 1)
|
||||||
allowedips[0] = *node.GetWgHost()
|
allowedips[0] = *node.GetWgHost()
|
||||||
|
|
||||||
clients, ok := peerToClients[node.GetWgHost().String()]
|
clients, ok := peerToClients[pubKey.String()]
|
||||||
|
|
||||||
if ok {
|
if ok {
|
||||||
allowedips = append(allowedips, clients...)
|
allowedips = append(allowedips, clients...)
|
||||||
@ -154,59 +155,100 @@ func (m *WgMeshConfigApplyer) getRoutes(meshProvider MeshProvider) map[string][]
|
|||||||
return routes
|
return routes
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *WgMeshConfigApplyer) updateWgConf(mesh MeshProvider) error {
|
// getCorrespondignPeer: gets the peer corresponding to the client
|
||||||
snap, err := mesh.GetMesh()
|
func (m *WgMeshConfigApplyer) getCorrespondingPeer(peers []MeshNode, client MeshNode) MeshNode {
|
||||||
|
hashFunc := func(mn MeshNode) int {
|
||||||
if err != nil {
|
pubKey, _ := mn.GetPublicKey()
|
||||||
return err
|
return lib.HashString(pubKey.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
nodes := lib.MapValues(snap.GetNodes())
|
peer := lib.ConsistentHash(peers, client, hashFunc, hashFunc)
|
||||||
peerConfigs := make([]wgtypes.PeerConfig, len(nodes))
|
return peer
|
||||||
|
}
|
||||||
peers := lib.Filter(nodes, func(mn MeshNode) bool {
|
|
||||||
return mn.GetType() == conf.PEER_ROLE
|
|
||||||
})
|
|
||||||
|
|
||||||
var count int = 0
|
|
||||||
|
|
||||||
|
func (m *WgMeshConfigApplyer) getClientConfig(mesh MeshProvider, peers []MeshNode, clients []MeshNode) (*wgtypes.Config, error) {
|
||||||
self, err := m.meshManager.GetSelf(mesh.GetMeshId())
|
self, err := m.meshManager.GetSelf(mesh.GetMeshId())
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
peer := m.getCorrespondingPeer(peers, self)
|
||||||
|
|
||||||
|
pubKey, _ := peer.GetPublicKey()
|
||||||
|
|
||||||
|
keepAlive := time.Duration(m.config.KeepAliveWg) * time.Second
|
||||||
|
endpoint, err := net.ResolveUDPAddr("udp", peer.GetWgEndpoint())
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
allowedips := make([]net.IPNet, 1)
|
||||||
|
_, ipnet, _ := net.ParseCIDR("::/0")
|
||||||
|
allowedips[0] = *ipnet
|
||||||
|
|
||||||
|
peerCfgs := make([]wgtypes.PeerConfig, 1)
|
||||||
|
|
||||||
|
peerCfgs[0] = wgtypes.PeerConfig{
|
||||||
|
PublicKey: pubKey,
|
||||||
|
Endpoint: endpoint,
|
||||||
|
PersistentKeepaliveInterval: &keepAlive,
|
||||||
|
AllowedIPs: allowedips,
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg := wgtypes.Config{
|
||||||
|
Peers: peerCfgs,
|
||||||
|
}
|
||||||
|
|
||||||
|
return &cfg, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *WgMeshConfigApplyer) getPeerConfig(mesh MeshProvider, peers []MeshNode, clients []MeshNode, dev *wgtypes.Device) (*wgtypes.Config, error) {
|
||||||
peerToClients := make(map[string][]net.IPNet)
|
peerToClients := make(map[string][]net.IPNet)
|
||||||
routes := m.getRoutes(mesh)
|
routes := m.getRoutes(mesh)
|
||||||
installedRoutes := make([]lib.Route, 0)
|
installedRoutes := make([]lib.Route, 0)
|
||||||
|
peerConfigs := make([]wgtypes.PeerConfig, 0)
|
||||||
|
self, err := m.meshManager.GetSelf(mesh.GetMeshId())
|
||||||
|
|
||||||
for _, n := range nodes {
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, n := range clients {
|
||||||
|
if len(peers) > 0 {
|
||||||
|
peer := m.getCorrespondingPeer(peers, n)
|
||||||
|
pubKey, _ := peer.GetPublicKey()
|
||||||
|
clients, ok := peerToClients[pubKey.String()]
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
clients = make([]net.IPNet, 0)
|
||||||
|
peerToClients[pubKey.String()] = clients
|
||||||
|
}
|
||||||
|
|
||||||
|
peerToClients[pubKey.String()] = append(clients, *n.GetWgHost())
|
||||||
|
|
||||||
|
if NodeEquals(self, peer) {
|
||||||
|
cfg, err := m.convertMeshNode(n, dev, peerToClients, routes)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
peerConfigs = append(peerConfigs, *cfg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, n := range peers {
|
||||||
if NodeEquals(n, self) {
|
if NodeEquals(n, self) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if n.GetType() == conf.CLIENT_ROLE && len(peers) > 0 && self.GetType() == conf.CLIENT_ROLE {
|
|
||||||
hashFunc := func(mn MeshNode) int {
|
|
||||||
return lib.HashString(mn.GetWgHost().String())
|
|
||||||
}
|
|
||||||
peer := lib.ConsistentHash(peers, n, hashFunc, hashFunc)
|
|
||||||
|
|
||||||
clients, ok := peerToClients[peer.GetWgHost().String()]
|
|
||||||
|
|
||||||
if !ok {
|
|
||||||
clients = make([]net.IPNet, 0)
|
|
||||||
peerToClients[peer.GetWgHost().String()] = clients
|
|
||||||
}
|
|
||||||
|
|
||||||
peerToClients[peer.GetWgHost().String()] = append(clients, *n.GetWgHost())
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
dev, _ := mesh.GetDevice()
|
|
||||||
peer, err := m.convertMeshNode(n, dev, peerToClients, routes)
|
peer, err := m.convertMeshNode(n, dev, peerToClients, routes)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, route := range peer.AllowedIPs {
|
for _, route := range peer.AllowedIPs {
|
||||||
@ -221,27 +263,66 @@ func (m *WgMeshConfigApplyer) updateWgConf(mesh MeshProvider) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
peerConfigs[count] = *peer
|
peerConfigs = append(peerConfigs, *peer)
|
||||||
count++
|
|
||||||
}
|
}
|
||||||
|
|
||||||
cfg := wgtypes.Config{
|
cfg := wgtypes.Config{
|
||||||
Peers: peerConfigs,
|
Peers: peerConfigs,
|
||||||
|
ReplacePeers: true,
|
||||||
}
|
}
|
||||||
|
|
||||||
dev, err := mesh.GetDevice()
|
err = m.routeInstaller.InstallRoutes(dev.Name, installedRoutes...)
|
||||||
|
return &cfg, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *WgMeshConfigApplyer) updateWgConf(mesh MeshProvider) error {
|
||||||
|
snap, err := mesh.GetMesh()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = m.meshManager.GetClient().ConfigureDevice(dev.Name, cfg)
|
nodes := lib.MapValues(snap.GetNodes())
|
||||||
|
dev, _ := mesh.GetDevice()
|
||||||
|
|
||||||
|
slices.SortFunc(nodes, func(a, b MeshNode) int {
|
||||||
|
return strings.Compare(string(a.GetType()), string(b.GetType()))
|
||||||
|
})
|
||||||
|
|
||||||
|
peers := lib.Filter(nodes, func(mn MeshNode) bool {
|
||||||
|
return mn.GetType() == conf.PEER_ROLE
|
||||||
|
})
|
||||||
|
|
||||||
|
clients := lib.Filter(nodes, func(mn MeshNode) bool {
|
||||||
|
return mn.GetType() == conf.CLIENT_ROLE
|
||||||
|
})
|
||||||
|
|
||||||
|
self, err := m.meshManager.GetSelf(mesh.GetMeshId())
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return m.routeInstaller.InstallRoutes(dev.Name, installedRoutes...)
|
var cfg *wgtypes.Config = nil
|
||||||
|
|
||||||
|
switch self.GetType() {
|
||||||
|
case conf.PEER_ROLE:
|
||||||
|
cfg, err = m.getPeerConfig(mesh, peers, clients, dev)
|
||||||
|
case conf.CLIENT_ROLE:
|
||||||
|
cfg, err = m.getClientConfig(mesh, peers, clients)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = m.meshManager.GetClient().ConfigureDevice(dev.Name, *cfg)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *WgMeshConfigApplyer) ApplyConfig() error {
|
func (m *WgMeshConfigApplyer) ApplyConfig() error {
|
||||||
@ -270,7 +351,8 @@ func (m *WgMeshConfigApplyer) RemovePeers(meshId string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
m.meshManager.GetClient().ConfigureDevice(dev.Name, wgtypes.Config{
|
m.meshManager.GetClient().ConfigureDevice(dev.Name, wgtypes.Config{
|
||||||
Peers: make([]wgtypes.PeerConfig, 0),
|
Peers: make([]wgtypes.PeerConfig, 0),
|
||||||
|
ReplacePeers: true,
|
||||||
})
|
})
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -89,6 +89,7 @@ func (s *SyncerImpl) Sync(meshId string) error {
|
|||||||
logging.Log.WriteInfof("SYNC COUNT: %d", s.syncCount)
|
logging.Log.WriteInfof("SYNC COUNT: %d", s.syncCount)
|
||||||
|
|
||||||
s.infectionCount = ((s.conf.InfectionCount + s.infectionCount - 1) % s.conf.InfectionCount)
|
s.infectionCount = ((s.conf.InfectionCount + s.infectionCount - 1) % s.conf.InfectionCount)
|
||||||
|
s.manager.GetMesh(meshId).SaveChanges()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user