43-use-gravestones

Change of approach from keepalive to a noiseless protocol
This commit is contained in:
Tim Beatham
2023-12-06 22:45:04 +00:00
parent b78d96986c
commit a3ceff019d
6 changed files with 113 additions and 27 deletions

View File

@@ -1,8 +1,8 @@
package sync
import (
"io"
"math/rand"
"sync"
"time"
"github.com/tim-beatham/wgmesh/pkg/conf"
@@ -59,36 +59,42 @@ func (s *SyncerImpl) Sync(meshId string) error {
if len(nodeNames) > s.conf.ClusterSize && rand.Float64() < s.conf.InterClusterChance {
logging.Log.WriteInfof("Sending to random cluster")
interCluster := s.cluster.GetInterCluster(nodeNames, publicKey.String())
randomSubset = append(randomSubset, interCluster)
randomSubset[len(randomSubset)-1] = s.cluster.GetInterCluster(nodeNames, publicKey.String())
}
var waitGroup sync.WaitGroup
var succeeded bool = false
for index := range randomSubset {
waitGroup.Add(1)
// Do this synchronously to conserve bandwidth
for _, node := range randomSubset {
correspondingPeer := s.manager.GetNode(meshId, node)
go func(i int) error {
defer waitGroup.Done()
if correspondingPeer == nil {
logging.Log.WriteErrorf("node %s does not exist", node)
}
correspondingPeer := s.manager.GetNode(meshId, randomSubset[i])
err = s.requester.SyncMesh(meshId, correspondingPeer.GetHostEndpoint())
if correspondingPeer == nil {
logging.Log.WriteErrorf("node %s does not exist", randomSubset[i])
}
err := s.requester.SyncMesh(meshId, correspondingPeer.GetHostEndpoint())
return err
}(index)
if err == nil || err == io.EOF {
succeeded = true
} else {
// If the synchronisation operation has failed them mark a gravestone
// preventing the peer from being re-contacted until it has updated
// itself
s.manager.GetMesh(meshId).Mark(node)
}
}
waitGroup.Wait()
s.syncCount++
logging.Log.WriteInfof("SYNC TIME: %v", time.Since(before))
logging.Log.WriteInfof("SYNC COUNT: %d", s.syncCount)
s.infectionCount = ((s.conf.InfectionCount + s.infectionCount - 1) % s.conf.InfectionCount)
if !succeeded {
// If could not gossip with anyone then repeat.
s.infectionCount++
}
s.manager.GetMesh(meshId).SaveChanges()
return nil
}