mirror of
https://github.com/tim-beatham/smegmesh.git
synced 2025-01-22 13:38:35 +01:00
Merge pull request #44 from tim-beatham/43-gravestones
43-use-gravestones
This commit is contained in:
commit
2169f7796f
@ -48,6 +48,13 @@ type MeshNode struct {
|
|||||||
Description string
|
Description string
|
||||||
Services map[string]string
|
Services map[string]string
|
||||||
Type string
|
Type string
|
||||||
|
Tombstone bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mark: marks the node is unreachable. This is not broadcast on
|
||||||
|
// syncrhonisation
|
||||||
|
func (m *TwoPhaseStoreMeshManager) Mark(nodeId string) {
|
||||||
|
m.store.Mark(nodeId)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetHostEndpoint: gets the gRPC endpoint of the node
|
// GetHostEndpoint: gets the gRPC endpoint of the node
|
||||||
@ -200,11 +207,11 @@ func (m *TwoPhaseStoreMeshManager) Save() []byte {
|
|||||||
// Load() loads a mesh network
|
// Load() loads a mesh network
|
||||||
func (m *TwoPhaseStoreMeshManager) Load(bs []byte) error {
|
func (m *TwoPhaseStoreMeshManager) Load(bs []byte) error {
|
||||||
buf := bytes.NewBuffer(bs)
|
buf := bytes.NewBuffer(bs)
|
||||||
|
|
||||||
dec := gob.NewDecoder(buf)
|
dec := gob.NewDecoder(buf)
|
||||||
|
|
||||||
var snapshot TwoPhaseMapSnapshot[string, MeshNode]
|
var snapshot TwoPhaseMapSnapshot[string, MeshNode]
|
||||||
err := dec.Decode(&snapshot)
|
err := dec.Decode(&snapshot)
|
||||||
|
|
||||||
m.store.Merge(snapshot)
|
m.store.Merge(snapshot)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -256,14 +263,25 @@ func (m *TwoPhaseStoreMeshManager) AddRoutes(nodeId string, routes ...mesh.Route
|
|||||||
|
|
||||||
node := m.store.Get(nodeId)
|
node := m.store.Get(nodeId)
|
||||||
|
|
||||||
|
changes := false
|
||||||
|
|
||||||
for _, route := range routes {
|
for _, route := range routes {
|
||||||
|
prevRoute, ok := node.Routes[route.GetDestination().String()]
|
||||||
|
|
||||||
|
if !ok || route.GetHopCount() < prevRoute.GetHopCount() {
|
||||||
|
changes = true
|
||||||
|
|
||||||
node.Routes[route.GetDestination().String()] = Route{
|
node.Routes[route.GetDestination().String()] = Route{
|
||||||
Destination: route.GetDestination().String(),
|
Destination: route.GetDestination().String(),
|
||||||
Path: route.GetPath(),
|
Path: route.GetPath(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if changes {
|
||||||
m.store.Put(nodeId, node)
|
m.store.Put(nodeId, node)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -357,8 +375,18 @@ func (m *TwoPhaseStoreMeshManager) RemoveService(nodeId string, key string) erro
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Prune: prunes all nodes that have not updated their timestamp in
|
// Prune: prunes all nodes that have not updated their timestamp in
|
||||||
// pruneAmount seconds
|
// pruneAmount of seconds
|
||||||
func (m *TwoPhaseStoreMeshManager) Prune(pruneAmount int) error {
|
func (m *TwoPhaseStoreMeshManager) Prune(pruneAmount int) error {
|
||||||
|
nodes := lib.MapValues(m.store.AsMap())
|
||||||
|
nodes = lib.Filter(nodes, func(mn MeshNode) bool {
|
||||||
|
return time.Now().Unix()-mn.Timestamp > int64(pruneAmount)
|
||||||
|
})
|
||||||
|
|
||||||
|
for _, node := range nodes {
|
||||||
|
key, _ := node.GetPublicKey()
|
||||||
|
m.store.Remove(key.String())
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -370,6 +398,13 @@ func (m *TwoPhaseStoreMeshManager) GetPeers() []string {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If the node is marked as unreachable don't consider it a peer.
|
||||||
|
// this help to optimize convergence time for unreachable nodes.
|
||||||
|
// However advertising it to other nodes could result in flapping.
|
||||||
|
if m.store.IsMarked(mn.PublicKey) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
return time.Now().Unix()-mn.Timestamp < int64(m.conf.DeadTime)
|
return time.Now().Unix()-mn.Timestamp < int64(m.conf.DeadTime)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -8,6 +8,7 @@ import (
|
|||||||
type Bucket[D any] struct {
|
type Bucket[D any] struct {
|
||||||
Vector uint64
|
Vector uint64
|
||||||
Contents D
|
Contents D
|
||||||
|
Gravestone bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// GMap is a set that can only grow in size
|
// GMap is a set that can only grow in size
|
||||||
@ -62,6 +63,30 @@ func (g *GMap[K, D]) Get(key K) D {
|
|||||||
return g.get(key).Contents
|
return g.get(key).Contents
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (g *GMap[K, D]) Mark(key K) {
|
||||||
|
g.lock.Lock()
|
||||||
|
bucket := g.contents[key]
|
||||||
|
bucket.Gravestone = true
|
||||||
|
g.lock.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsMarked: returns true if the node is marked
|
||||||
|
func (g *GMap[K, D]) IsMarked(key K) bool {
|
||||||
|
marked := false
|
||||||
|
|
||||||
|
g.lock.RLock()
|
||||||
|
|
||||||
|
bucket, ok := g.contents[key]
|
||||||
|
|
||||||
|
if ok {
|
||||||
|
marked = bucket.Gravestone
|
||||||
|
}
|
||||||
|
|
||||||
|
g.lock.RUnlock()
|
||||||
|
|
||||||
|
return marked
|
||||||
|
}
|
||||||
|
|
||||||
func (g *GMap[K, D]) Keys() []K {
|
func (g *GMap[K, D]) Keys() []K {
|
||||||
g.lock.RLock()
|
g.lock.RLock()
|
||||||
|
|
||||||
|
@ -60,6 +60,10 @@ func (m *TwoPhaseMap[K, D]) Put(key K, data D) {
|
|||||||
m.addMap.Put(key, data)
|
m.addMap.Put(key, data)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *TwoPhaseMap[K, D]) Mark(key K) {
|
||||||
|
m.addMap.Mark(key)
|
||||||
|
}
|
||||||
|
|
||||||
// Remove removes the value from the map
|
// Remove removes the value from the map
|
||||||
func (m *TwoPhaseMap[K, D]) Remove(key K) {
|
func (m *TwoPhaseMap[K, D]) Remove(key K) {
|
||||||
m.removeMap.Put(key, true)
|
m.removeMap.Put(key, true)
|
||||||
@ -115,6 +119,10 @@ type TwoPhaseMapState[K comparable] struct {
|
|||||||
RemoveContents map[K]uint64
|
RemoveContents map[K]uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *TwoPhaseMap[K, D]) IsMarked(key K) bool {
|
||||||
|
return m.addMap.IsMarked(key)
|
||||||
|
}
|
||||||
|
|
||||||
func (m *TwoPhaseMap[K, D]) incrementClock() uint64 {
|
func (m *TwoPhaseMap[K, D]) incrementClock() uint64 {
|
||||||
maxClock := uint64(0)
|
maxClock := uint64(0)
|
||||||
m.lock.Lock()
|
m.lock.Lock()
|
||||||
@ -184,11 +192,15 @@ func (m *TwoPhaseMap[K, D]) Merge(snapshot TwoPhaseMapSnapshot[K, D]) {
|
|||||||
m.lock.Lock()
|
m.lock.Lock()
|
||||||
|
|
||||||
for key, value := range snapshot.Add {
|
for key, value := range snapshot.Add {
|
||||||
|
// Gravestone is local only to that node.
|
||||||
|
// Discover ourselves if the node is alive
|
||||||
|
value.Gravestone = false
|
||||||
m.addMap.put(key, value)
|
m.addMap.put(key, value)
|
||||||
m.vectors[key] = max(value.Vector, m.vectors[key])
|
m.vectors[key] = max(value.Vector, m.vectors[key])
|
||||||
}
|
}
|
||||||
|
|
||||||
for key, value := range snapshot.Remove {
|
for key, value := range snapshot.Remove {
|
||||||
|
value.Gravestone = false
|
||||||
m.removeMap.put(key, value)
|
m.removeMap.put(key, value)
|
||||||
m.vectors[key] = max(value.Vector, m.vectors[key])
|
m.vectors[key] = max(value.Vector, m.vectors[key])
|
||||||
}
|
}
|
||||||
|
@ -81,6 +81,11 @@ type MeshProviderStub struct {
|
|||||||
snapshot *MeshSnapshotStub
|
snapshot *MeshSnapshotStub
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Mark implements MeshProvider.
|
||||||
|
func (*MeshProviderStub) Mark(nodeId string) {
|
||||||
|
panic("unimplemented")
|
||||||
|
}
|
||||||
|
|
||||||
// RemoveNode implements MeshProvider.
|
// RemoveNode implements MeshProvider.
|
||||||
func (*MeshProviderStub) RemoveNode(nodeId string) error {
|
func (*MeshProviderStub) RemoveNode(nodeId string) error {
|
||||||
panic("unimplemented")
|
panic("unimplemented")
|
||||||
@ -117,7 +122,7 @@ func (*MeshProviderStub) RemoveService(nodeId string, key string) error {
|
|||||||
|
|
||||||
// SetAlias implements MeshProvider.
|
// SetAlias implements MeshProvider.
|
||||||
func (*MeshProviderStub) SetAlias(nodeId string, alias string) error {
|
func (*MeshProviderStub) SetAlias(nodeId string, alias string) error {
|
||||||
panic("unimplemented")
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// RemoveRoutes implements MeshProvider.
|
// RemoveRoutes implements MeshProvider.
|
||||||
|
@ -140,6 +140,9 @@ type MeshProvider interface {
|
|||||||
GetRoutes(targetNode string) (map[string]Route, error)
|
GetRoutes(targetNode string) (map[string]Route, error)
|
||||||
// RemoveNode(): remove the node from the mesh
|
// RemoveNode(): remove the node from the mesh
|
||||||
RemoveNode(nodeId string) error
|
RemoveNode(nodeId string) error
|
||||||
|
// Mark: marks the node as unreachable. This is not broadcast to the entire
|
||||||
|
// this is not considered when syncing node state
|
||||||
|
Mark(nodeId string)
|
||||||
}
|
}
|
||||||
|
|
||||||
// HostParameters contains the IDs of a node
|
// HostParameters contains the IDs of a node
|
||||||
|
@ -1,8 +1,8 @@
|
|||||||
package sync
|
package sync
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"io"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tim-beatham/wgmesh/pkg/conf"
|
"github.com/tim-beatham/wgmesh/pkg/conf"
|
||||||
@ -59,36 +59,42 @@ func (s *SyncerImpl) Sync(meshId string) error {
|
|||||||
|
|
||||||
if len(nodeNames) > s.conf.ClusterSize && rand.Float64() < s.conf.InterClusterChance {
|
if len(nodeNames) > s.conf.ClusterSize && rand.Float64() < s.conf.InterClusterChance {
|
||||||
logging.Log.WriteInfof("Sending to random cluster")
|
logging.Log.WriteInfof("Sending to random cluster")
|
||||||
interCluster := s.cluster.GetInterCluster(nodeNames, publicKey.String())
|
randomSubset[len(randomSubset)-1] = s.cluster.GetInterCluster(nodeNames, publicKey.String())
|
||||||
randomSubset = append(randomSubset, interCluster)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var waitGroup sync.WaitGroup
|
var succeeded bool = false
|
||||||
|
|
||||||
for index := range randomSubset {
|
// Do this synchronously to conserve bandwidth
|
||||||
waitGroup.Add(1)
|
for _, node := range randomSubset {
|
||||||
|
correspondingPeer := s.manager.GetNode(meshId, node)
|
||||||
go func(i int) error {
|
|
||||||
defer waitGroup.Done()
|
|
||||||
|
|
||||||
correspondingPeer := s.manager.GetNode(meshId, randomSubset[i])
|
|
||||||
|
|
||||||
if correspondingPeer == nil {
|
if correspondingPeer == nil {
|
||||||
logging.Log.WriteErrorf("node %s does not exist", randomSubset[i])
|
logging.Log.WriteErrorf("node %s does not exist", node)
|
||||||
}
|
}
|
||||||
|
|
||||||
err := s.requester.SyncMesh(meshId, correspondingPeer.GetHostEndpoint())
|
err = s.requester.SyncMesh(meshId, correspondingPeer.GetHostEndpoint())
|
||||||
return err
|
|
||||||
}(index)
|
|
||||||
}
|
|
||||||
|
|
||||||
waitGroup.Wait()
|
if err == nil || err == io.EOF {
|
||||||
|
succeeded = true
|
||||||
|
} else {
|
||||||
|
// If the synchronisation operation has failed them mark a gravestone
|
||||||
|
// preventing the peer from being re-contacted until it has updated
|
||||||
|
// itself
|
||||||
|
s.manager.GetMesh(meshId).Mark(node)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
s.syncCount++
|
s.syncCount++
|
||||||
logging.Log.WriteInfof("SYNC TIME: %v", time.Since(before))
|
logging.Log.WriteInfof("SYNC TIME: %v", time.Since(before))
|
||||||
logging.Log.WriteInfof("SYNC COUNT: %d", s.syncCount)
|
logging.Log.WriteInfof("SYNC COUNT: %d", s.syncCount)
|
||||||
|
|
||||||
s.infectionCount = ((s.conf.InfectionCount + s.infectionCount - 1) % s.conf.InfectionCount)
|
s.infectionCount = ((s.conf.InfectionCount + s.infectionCount - 1) % s.conf.InfectionCount)
|
||||||
|
|
||||||
|
if !succeeded {
|
||||||
|
// If could not gossip with anyone then repeat.
|
||||||
|
s.infectionCount++
|
||||||
|
}
|
||||||
|
|
||||||
s.manager.GetMesh(meshId).SaveChanges()
|
s.manager.GetMesh(meshId).SaveChanges()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user