diff --git a/pkg/conf/conf.go b/pkg/conf/conf.go index 86ca077..c38f85c 100644 --- a/pkg/conf/conf.go +++ b/pkg/conf/conf.go @@ -79,6 +79,8 @@ type DaemonConfiguration struct { StubWg bool `yaml:"stubWg"` // SyncRate specifies how long the minimum time should be between synchronisation SyncRate int `yaml:"syncRate" validate:"required,gte=1"` + // PullTime specifies the interval between checking for configuration changes + PullTime int `yaml:"pullTime" validate:"required,gte=0"` // KeepAliveTime: number of seconds before the leader of the mesh sends an update to // send to every member in the mesh KeepAliveTime int `yaml:"keepAliveTime" validate:"required,gte=1"` diff --git a/pkg/conf/conf_test.go b/pkg/conf/conf_test.go index 45c8138..5f6aa62 100644 --- a/pkg/conf/conf_test.go +++ b/pkg/conf/conf_test.go @@ -26,6 +26,7 @@ func getExampleConfiguration() *DaemonConfiguration { ClusterSize: 64, InterClusterChance: 0.15, BranchRate: 3, + PullTime: 0, InfectionCount: 2, BaseConfiguration: WgConfiguration{ IPDiscovery: &discovery, @@ -215,6 +216,17 @@ func TestInfectionCountOne(t *testing.T) { } } +func TestPullTimeNegative(t *testing.T) { + conf := getExampleConfiguration() + conf.PullTime = -1 + + err := ValidateDaemonConfiguration(conf) + + if err == nil { + t.Fatal(`error should be thrown`) + } +} + func TestValidConfiguration(t *testing.T) { conf := getExampleConfiguration() err := ValidateDaemonConfiguration(conf) diff --git a/pkg/sync/syncer.go b/pkg/sync/syncer.go index a052bd4..c64a86e 100644 --- a/pkg/sync/syncer.go +++ b/pkg/sync/syncer.go @@ -27,7 +27,7 @@ type SyncerImpl struct { syncCount int cluster conn.ConnCluster conf *conf.DaemonConfiguration - lastSync map[string]uint64 + lastSync map[string]int64 } // Sync: Sync with random nodes @@ -54,8 +54,8 @@ func (s *SyncerImpl) Sync(correspondingMesh mesh.MeshProvider) error { if self.GetType() == conf.PEER_ROLE && !correspondingMesh.HasChanges() && s.infectionCount == 0 { logging.Log.WriteInfof("no changes for %s", correspondingMesh.GetMeshId()) - // If not synchronised in certain pull from random neighbour - if uint64(time.Now().Unix())-s.lastSync[correspondingMesh.GetMeshId()] > 20 { + // If not synchronised in certain time pull from random neighbour + if s.conf.PullTime != 0 && time.Now().Unix()-s.lastSync[correspondingMesh.GetMeshId()] > int64(s.conf.PullTime) { return s.Pull(self, correspondingMesh) } @@ -84,7 +84,8 @@ func (s *SyncerImpl) Sync(correspondingMesh mesh.MeshProvider) error { return nil } - redundancyLength := min(len(neighbours), 3) + // Peer with 2 nodes + redundancyLength := min(len(neighbours), 2) gossipNodes = neighbours[:redundancyLength] } else { neighbours := s.cluster.GetNeighbours(nodeNames, publicKey.String()) @@ -113,24 +114,23 @@ func (s *SyncerImpl) Sync(correspondingMesh mesh.MeshProvider) error { } if err != nil { - logging.Log.WriteInfof(err.Error()) + logging.Log.WriteErrorf(err.Error()) } } s.syncCount++ - logging.Log.WriteInfof("SYNC TIME: %v", time.Since(before)) - logging.Log.WriteInfof("SYNC COUNT: %d", s.syncCount) + logging.Log.WriteInfof("sync time: %v", time.Since(before)) + logging.Log.WriteInfof("number of syncs: %d", s.syncCount) s.infectionCount = ((s.conf.InfectionCount + s.infectionCount - 1) % s.conf.InfectionCount) if !succeeded { - // If could not gossip with anyone then repeat. s.infectionCount++ } correspondingMesh.SaveChanges() - s.lastSync[correspondingMesh.GetMeshId()] = uint64(time.Now().Unix()) + s.lastSync[correspondingMesh.GetMeshId()] = time.Now().Unix() return nil } @@ -148,7 +148,7 @@ func (s *SyncerImpl) Pull(self mesh.MeshNode, mesh mesh.MeshProvider) error { return nil } - logging.Log.WriteInfof("PULLING from node %s", neighbour[0]) + logging.Log.WriteInfof("pulling from node %s", neighbour[0]) pullNode, err := mesh.GetNode(neighbour[0]) @@ -159,7 +159,7 @@ func (s *SyncerImpl) Pull(self mesh.MeshNode, mesh mesh.MeshProvider) error { err = s.requester.SyncMesh(mesh.GetMeshId(), pullNode) if err == nil || err == io.EOF { - s.lastSync[mesh.GetMeshId()] = uint64(time.Now().Unix()) + s.lastSync[mesh.GetMeshId()] = time.Now().Unix() } else { return err } @@ -206,5 +206,5 @@ func NewSyncer(m mesh.MeshManager, conf *conf.DaemonConfiguration, r SyncRequest infectionCount: 0, syncCount: 0, cluster: cluster, - lastSync: make(map[string]uint64)} + lastSync: make(map[string]int64)} } diff --git a/pkg/sync/syncrequester.go b/pkg/sync/syncrequester.go index a3fa6da..4caf029 100644 --- a/pkg/sync/syncrequester.go +++ b/pkg/sync/syncrequester.go @@ -99,11 +99,11 @@ func (s *SyncRequesterImpl) SyncMesh(meshId string, meshNode mesh.MeshNode) erro err = s.syncMesh(mesh, ctx, c) if err != nil { - return s.handleErr(meshId, pubKey.String(), err) + s.handleErr(meshId, pubKey.String(), err) } logging.Log.WriteInfof("Synced with node: %s meshId: %s\n", endpoint, meshId) - return nil + return err } func (s *SyncRequesterImpl) syncMesh(mesh mesh.MeshProvider, ctx context.Context, client rpc.SyncServiceClient) error {