1
0
forked from extern/smegmesh

Compare commits

...

8 Commits

Author SHA1 Message Date
1b18d89c9f 41-bugfix-fluctuating-ips
Fluctuating ips creating hub and spoke.
2023-12-05 02:00:16 +00:00
245a2c5f58 41-bugfix-fluctuating-ips
If the node is a peer then add the client in the WG
configuration.
2023-12-04 17:40:24 +00:00
c40f7510b8 41-bugfix-fluctuating-ips
IPs of clients fluctuating because there isn't a strict order on
clients. Client's need to be processed before the peers.
2023-12-04 17:32:50 +00:00
78d748770c BUGIX Hash client by public key 2023-12-04 17:13:51 +00:00
0ff2a8eef9 BUGFIX: Allowed IPs fluctuating 2023-12-04 17:11:37 +00:00
fd7bd80485 BUGFIX
Don't get device each time it is an expensive operation.
2023-12-04 16:40:15 +00:00
3ef1b68ba5 BUGFIX: Hashing datastore to work out changes
Changed hashing implementation to work out if there are changes
in the data store
2023-11-30 15:58:26 +00:00
b9ba836ae3 Merge pull request #40 from tim-beatham/39-implement-two-phase-map
39-implement-two-phase-map
2023-11-30 02:03:36 +00:00
6 changed files with 146 additions and 51 deletions

View File

@ -222,13 +222,13 @@ func (m *TwoPhaseStoreMeshManager) GetDevice() (*wgtypes.Device, error) {
// HasChanges returns true if we have changes since last time we synced
func (m *TwoPhaseStoreMeshManager) HasChanges() bool {
clockValue := m.store.GetClock()
clockValue := m.store.GetHash()
return clockValue != m.LastClock
}
// Record that we have changes and save the corresponding changes
func (m *TwoPhaseStoreMeshManager) SaveChanges() {
clockValue := m.store.GetClock()
clockValue := m.store.GetHash()
m.LastClock = clockValue
}

View File

@ -128,16 +128,19 @@ func (m *TwoPhaseMap[K, D]) incrementClock() uint64 {
return maxClock
}
func (m *TwoPhaseMap[K, D]) GetClock() uint64 {
maxClock := uint64(0)
// GetHash: Get the hash of the current state of the map
// Sums the current values of the vectors. Provides good approximation
// of increasing numbers
func (m *TwoPhaseMap[K, D]) GetHash() uint64 {
m.lock.RLock()
for _, value := range m.vectors {
maxClock = max(maxClock, value)
}
sum := lib.Reduce(uint64(0), lib.MapValues(m.vectors), func(sum uint64, current uint64) uint64 {
return current + sum
})
m.lock.RUnlock()
return maxClock
return sum
}
// GetState: get the current vector clock of the add and remove

View File

@ -125,7 +125,6 @@ func (t *TwoPhaseSyncer) RecvMessage(msg []byte) error {
func (t *TwoPhaseSyncer) Complete() {
logging.Log.WriteInfof("SYNC COMPLETED")
t.manager.SaveChanges()
}
func NewTwoPhaseSyncer(manager *TwoPhaseStoreMeshManager) *TwoPhaseSyncer {

View File

@ -76,3 +76,13 @@ func Contains[V any](list []V, proposition func(V) bool) bool {
return false
}
func Reduce[A any, V any](start A, values []V, reduce func(A, V) A) A {
accum := start
for _, elem := range values {
accum = reduce(accum, elem)
}
return accum
}

View File

@ -4,6 +4,7 @@ import (
"fmt"
"net"
"slices"
"strings"
"time"
"github.com/tim-beatham/wgmesh/pkg/conf"
@ -52,7 +53,7 @@ func (m *WgMeshConfigApplyer) convertMeshNode(node MeshNode, device *wgtypes.Dev
allowedips := make([]net.IPNet, 1)
allowedips[0] = *node.GetWgHost()
clients, ok := peerToClients[node.GetWgHost().String()]
clients, ok := peerToClients[pubKey.String()]
if ok {
allowedips = append(allowedips, clients...)
@ -154,59 +155,100 @@ func (m *WgMeshConfigApplyer) getRoutes(meshProvider MeshProvider) map[string][]
return routes
}
func (m *WgMeshConfigApplyer) updateWgConf(mesh MeshProvider) error {
snap, err := mesh.GetMesh()
if err != nil {
return err
// getCorrespondignPeer: gets the peer corresponding to the client
func (m *WgMeshConfigApplyer) getCorrespondingPeer(peers []MeshNode, client MeshNode) MeshNode {
hashFunc := func(mn MeshNode) int {
pubKey, _ := mn.GetPublicKey()
return lib.HashString(pubKey.String())
}
nodes := lib.MapValues(snap.GetNodes())
peerConfigs := make([]wgtypes.PeerConfig, len(nodes))
peers := lib.Filter(nodes, func(mn MeshNode) bool {
return mn.GetType() == conf.PEER_ROLE
})
var count int = 0
peer := lib.ConsistentHash(peers, client, hashFunc, hashFunc)
return peer
}
func (m *WgMeshConfigApplyer) getClientConfig(mesh MeshProvider, peers []MeshNode, clients []MeshNode) (*wgtypes.Config, error) {
self, err := m.meshManager.GetSelf(mesh.GetMeshId())
if err != nil {
return err
return nil, err
}
peer := m.getCorrespondingPeer(peers, self)
pubKey, _ := peer.GetPublicKey()
keepAlive := time.Duration(m.config.KeepAliveWg) * time.Second
endpoint, err := net.ResolveUDPAddr("udp", peer.GetWgEndpoint())
if err != nil {
return nil, err
}
allowedips := make([]net.IPNet, 1)
_, ipnet, _ := net.ParseCIDR("::/0")
allowedips[0] = *ipnet
peerCfgs := make([]wgtypes.PeerConfig, 1)
peerCfgs[0] = wgtypes.PeerConfig{
PublicKey: pubKey,
Endpoint: endpoint,
PersistentKeepaliveInterval: &keepAlive,
AllowedIPs: allowedips,
}
cfg := wgtypes.Config{
Peers: peerCfgs,
}
return &cfg, err
}
func (m *WgMeshConfigApplyer) getPeerConfig(mesh MeshProvider, peers []MeshNode, clients []MeshNode, dev *wgtypes.Device) (*wgtypes.Config, error) {
peerToClients := make(map[string][]net.IPNet)
routes := m.getRoutes(mesh)
installedRoutes := make([]lib.Route, 0)
peerConfigs := make([]wgtypes.PeerConfig, 0)
self, err := m.meshManager.GetSelf(mesh.GetMeshId())
for _, n := range nodes {
if err != nil {
return nil, err
}
for _, n := range clients {
if len(peers) > 0 {
peer := m.getCorrespondingPeer(peers, n)
pubKey, _ := peer.GetPublicKey()
clients, ok := peerToClients[pubKey.String()]
if !ok {
clients = make([]net.IPNet, 0)
peerToClients[pubKey.String()] = clients
}
peerToClients[pubKey.String()] = append(clients, *n.GetWgHost())
if NodeEquals(self, peer) {
cfg, err := m.convertMeshNode(n, dev, peerToClients, routes)
if err != nil {
return nil, err
}
peerConfigs = append(peerConfigs, *cfg)
}
}
}
for _, n := range peers {
if NodeEquals(n, self) {
continue
}
if n.GetType() == conf.CLIENT_ROLE && len(peers) > 0 && self.GetType() == conf.CLIENT_ROLE {
hashFunc := func(mn MeshNode) int {
return lib.HashString(mn.GetWgHost().String())
}
peer := lib.ConsistentHash(peers, n, hashFunc, hashFunc)
clients, ok := peerToClients[peer.GetWgHost().String()]
if !ok {
clients = make([]net.IPNet, 0)
peerToClients[peer.GetWgHost().String()] = clients
}
peerToClients[peer.GetWgHost().String()] = append(clients, *n.GetWgHost())
continue
}
dev, _ := mesh.GetDevice()
peer, err := m.convertMeshNode(n, dev, peerToClients, routes)
if err != nil {
return err
return nil, err
}
for _, route := range peer.AllowedIPs {
@ -221,27 +263,66 @@ func (m *WgMeshConfigApplyer) updateWgConf(mesh MeshProvider) error {
}
}
peerConfigs[count] = *peer
count++
peerConfigs = append(peerConfigs, *peer)
}
cfg := wgtypes.Config{
Peers: peerConfigs,
Peers: peerConfigs,
ReplacePeers: true,
}
dev, err := mesh.GetDevice()
err = m.routeInstaller.InstallRoutes(dev.Name, installedRoutes...)
return &cfg, err
}
func (m *WgMeshConfigApplyer) updateWgConf(mesh MeshProvider) error {
snap, err := mesh.GetMesh()
if err != nil {
return err
}
err = m.meshManager.GetClient().ConfigureDevice(dev.Name, cfg)
nodes := lib.MapValues(snap.GetNodes())
dev, _ := mesh.GetDevice()
slices.SortFunc(nodes, func(a, b MeshNode) int {
return strings.Compare(string(a.GetType()), string(b.GetType()))
})
peers := lib.Filter(nodes, func(mn MeshNode) bool {
return mn.GetType() == conf.PEER_ROLE
})
clients := lib.Filter(nodes, func(mn MeshNode) bool {
return mn.GetType() == conf.CLIENT_ROLE
})
self, err := m.meshManager.GetSelf(mesh.GetMeshId())
if err != nil {
return err
}
return m.routeInstaller.InstallRoutes(dev.Name, installedRoutes...)
var cfg *wgtypes.Config = nil
switch self.GetType() {
case conf.PEER_ROLE:
cfg, err = m.getPeerConfig(mesh, peers, clients, dev)
case conf.CLIENT_ROLE:
cfg, err = m.getClientConfig(mesh, peers, clients)
}
if err != nil {
return err
}
err = m.meshManager.GetClient().ConfigureDevice(dev.Name, *cfg)
if err != nil {
return err
}
return nil
}
func (m *WgMeshConfigApplyer) ApplyConfig() error {
@ -270,7 +351,8 @@ func (m *WgMeshConfigApplyer) RemovePeers(meshId string) error {
}
m.meshManager.GetClient().ConfigureDevice(dev.Name, wgtypes.Config{
Peers: make([]wgtypes.PeerConfig, 0),
Peers: make([]wgtypes.PeerConfig, 0),
ReplacePeers: true,
})
return nil

View File

@ -89,6 +89,7 @@ func (s *SyncerImpl) Sync(meshId string) error {
logging.Log.WriteInfof("SYNC COUNT: %d", s.syncCount)
s.infectionCount = ((s.conf.InfectionCount + s.infectionCount - 1) % s.conf.InfectionCount)
s.manager.GetMesh(meshId).SaveChanges()
return nil
}