72-pull-rate-in-configuration

- Added the pull rate to the configuration file
This commit is contained in:
Tim Beatham 2023-12-31 12:17:16 +00:00
parent c29eb197f3
commit 9e1058e0f2
4 changed files with 28 additions and 14 deletions

View File

@ -79,6 +79,8 @@ type DaemonConfiguration struct {
StubWg bool `yaml:"stubWg"` StubWg bool `yaml:"stubWg"`
// SyncRate specifies how long the minimum time should be between synchronisation // SyncRate specifies how long the minimum time should be between synchronisation
SyncRate int `yaml:"syncRate" validate:"required,gte=1"` SyncRate int `yaml:"syncRate" validate:"required,gte=1"`
// PullTime specifies the interval between checking for configuration changes
PullTime int `yaml:"pullTime" validate:"required,gte=0"`
// KeepAliveTime: number of seconds before the leader of the mesh sends an update to // KeepAliveTime: number of seconds before the leader of the mesh sends an update to
// send to every member in the mesh // send to every member in the mesh
KeepAliveTime int `yaml:"keepAliveTime" validate:"required,gte=1"` KeepAliveTime int `yaml:"keepAliveTime" validate:"required,gte=1"`

View File

@ -26,6 +26,7 @@ func getExampleConfiguration() *DaemonConfiguration {
ClusterSize: 64, ClusterSize: 64,
InterClusterChance: 0.15, InterClusterChance: 0.15,
BranchRate: 3, BranchRate: 3,
PullTime: 0,
InfectionCount: 2, InfectionCount: 2,
BaseConfiguration: WgConfiguration{ BaseConfiguration: WgConfiguration{
IPDiscovery: &discovery, IPDiscovery: &discovery,
@ -215,6 +216,17 @@ func TestInfectionCountOne(t *testing.T) {
} }
} }
func TestPullTimeNegative(t *testing.T) {
conf := getExampleConfiguration()
conf.PullTime = -1
err := ValidateDaemonConfiguration(conf)
if err == nil {
t.Fatal(`error should be thrown`)
}
}
func TestValidConfiguration(t *testing.T) { func TestValidConfiguration(t *testing.T) {
conf := getExampleConfiguration() conf := getExampleConfiguration()
err := ValidateDaemonConfiguration(conf) err := ValidateDaemonConfiguration(conf)

View File

@ -27,7 +27,7 @@ type SyncerImpl struct {
syncCount int syncCount int
cluster conn.ConnCluster cluster conn.ConnCluster
conf *conf.DaemonConfiguration conf *conf.DaemonConfiguration
lastSync map[string]uint64 lastSync map[string]int64
} }
// Sync: Sync with random nodes // Sync: Sync with random nodes
@ -54,8 +54,8 @@ func (s *SyncerImpl) Sync(correspondingMesh mesh.MeshProvider) error {
if self.GetType() == conf.PEER_ROLE && !correspondingMesh.HasChanges() && s.infectionCount == 0 { if self.GetType() == conf.PEER_ROLE && !correspondingMesh.HasChanges() && s.infectionCount == 0 {
logging.Log.WriteInfof("no changes for %s", correspondingMesh.GetMeshId()) logging.Log.WriteInfof("no changes for %s", correspondingMesh.GetMeshId())
// If not synchronised in certain pull from random neighbour // If not synchronised in certain time pull from random neighbour
if uint64(time.Now().Unix())-s.lastSync[correspondingMesh.GetMeshId()] > 20 { if s.conf.PullTime != 0 && time.Now().Unix()-s.lastSync[correspondingMesh.GetMeshId()] > int64(s.conf.PullTime) {
return s.Pull(self, correspondingMesh) return s.Pull(self, correspondingMesh)
} }
@ -84,7 +84,8 @@ func (s *SyncerImpl) Sync(correspondingMesh mesh.MeshProvider) error {
return nil return nil
} }
redundancyLength := min(len(neighbours), 3) // Peer with 2 nodes
redundancyLength := min(len(neighbours), 2)
gossipNodes = neighbours[:redundancyLength] gossipNodes = neighbours[:redundancyLength]
} else { } else {
neighbours := s.cluster.GetNeighbours(nodeNames, publicKey.String()) neighbours := s.cluster.GetNeighbours(nodeNames, publicKey.String())
@ -113,24 +114,23 @@ func (s *SyncerImpl) Sync(correspondingMesh mesh.MeshProvider) error {
} }
if err != nil { if err != nil {
logging.Log.WriteInfof(err.Error()) logging.Log.WriteErrorf(err.Error())
} }
} }
s.syncCount++ s.syncCount++
logging.Log.WriteInfof("SYNC TIME: %v", time.Since(before)) logging.Log.WriteInfof("sync time: %v", time.Since(before))
logging.Log.WriteInfof("SYNC COUNT: %d", s.syncCount) logging.Log.WriteInfof("number of syncs: %d", s.syncCount)
s.infectionCount = ((s.conf.InfectionCount + s.infectionCount - 1) % s.conf.InfectionCount) s.infectionCount = ((s.conf.InfectionCount + s.infectionCount - 1) % s.conf.InfectionCount)
if !succeeded { if !succeeded {
// If could not gossip with anyone then repeat.
s.infectionCount++ s.infectionCount++
} }
correspondingMesh.SaveChanges() correspondingMesh.SaveChanges()
s.lastSync[correspondingMesh.GetMeshId()] = uint64(time.Now().Unix()) s.lastSync[correspondingMesh.GetMeshId()] = time.Now().Unix()
return nil return nil
} }
@ -148,7 +148,7 @@ func (s *SyncerImpl) Pull(self mesh.MeshNode, mesh mesh.MeshProvider) error {
return nil return nil
} }
logging.Log.WriteInfof("PULLING from node %s", neighbour[0]) logging.Log.WriteInfof("pulling from node %s", neighbour[0])
pullNode, err := mesh.GetNode(neighbour[0]) pullNode, err := mesh.GetNode(neighbour[0])
@ -159,7 +159,7 @@ func (s *SyncerImpl) Pull(self mesh.MeshNode, mesh mesh.MeshProvider) error {
err = s.requester.SyncMesh(mesh.GetMeshId(), pullNode) err = s.requester.SyncMesh(mesh.GetMeshId(), pullNode)
if err == nil || err == io.EOF { if err == nil || err == io.EOF {
s.lastSync[mesh.GetMeshId()] = uint64(time.Now().Unix()) s.lastSync[mesh.GetMeshId()] = time.Now().Unix()
} else { } else {
return err return err
} }
@ -206,5 +206,5 @@ func NewSyncer(m mesh.MeshManager, conf *conf.DaemonConfiguration, r SyncRequest
infectionCount: 0, infectionCount: 0,
syncCount: 0, syncCount: 0,
cluster: cluster, cluster: cluster,
lastSync: make(map[string]uint64)} lastSync: make(map[string]int64)}
} }

View File

@ -99,11 +99,11 @@ func (s *SyncRequesterImpl) SyncMesh(meshId string, meshNode mesh.MeshNode) erro
err = s.syncMesh(mesh, ctx, c) err = s.syncMesh(mesh, ctx, c)
if err != nil { if err != nil {
return s.handleErr(meshId, pubKey.String(), err) s.handleErr(meshId, pubKey.String(), err)
} }
logging.Log.WriteInfof("Synced with node: %s meshId: %s\n", endpoint, meshId) logging.Log.WriteInfof("Synced with node: %s meshId: %s\n", endpoint, meshId)
return nil return err
} }
func (s *SyncRequesterImpl) syncMesh(mesh mesh.MeshProvider, ctx context.Context, client rpc.SyncServiceClient) error { func (s *SyncRequesterImpl) syncMesh(mesh mesh.MeshProvider, ctx context.Context, client rpc.SyncServiceClient) error {