From 092d9a4af50d5562be00425505a9ff101ea37551 Mon Sep 17 00:00:00 2001 From: Tim Beatham Date: Sun, 17 Dec 2023 09:44:32 +0000 Subject: [PATCH 1/4] checking-latency-for-pull-only --- pkg/crdt/two_phase_map_syncer.go | 13 ++++++++----- pkg/sync/syncer.go | 5 ----- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/pkg/crdt/two_phase_map_syncer.go b/pkg/crdt/two_phase_map_syncer.go index ac38f47..bf0421b 100644 --- a/pkg/crdt/two_phase_map_syncer.go +++ b/pkg/crdt/two_phase_map_syncer.go @@ -68,9 +68,16 @@ func prepare(syncer *TwoPhaseSyncer) ([]byte, bool) { return nil, false } + // Increment the clock here so the clock gets + // distributed to everyone else in the mesh + syncer.manager.store.Clock.IncrementClock() + var buffer bytes.Buffer enc := gob.NewEncoder(&buffer) + mapState := syncer.manager.store.GenerateMessage() + + syncer.mapState = mapState err = enc.Encode(*syncer.mapState) if err != nil { @@ -94,7 +101,7 @@ func present(syncer *TwoPhaseSyncer) ([]byte, bool) { if err != nil { logging.Log.WriteErrorf(err.Error()) - } + } difference := syncer.mapState.Difference(syncer.manager.store.Clock.GetStaleCount(), &mapState) syncer.manager.store.Clock.Merge(mapState.Vectors) @@ -164,9 +171,6 @@ func (t *TwoPhaseSyncer) RecvMessage(msg []byte) error { func (t *TwoPhaseSyncer) Complete() { logging.Log.WriteInfof("SYNC COMPLETED") - if t.state >= MERGE { - t.manager.store.Clock.IncrementClock() - } } func NewTwoPhaseSyncer(manager *TwoPhaseStoreMeshManager) *TwoPhaseSyncer { @@ -181,7 +185,6 @@ func NewTwoPhaseSyncer(manager *TwoPhaseStoreMeshManager) *TwoPhaseSyncer { return &TwoPhaseSyncer{ manager: manager, state: HASH, - mapState: manager.store.GenerateMessage(), generateMessageFSM: generateMessageFsm, } } diff --git a/pkg/sync/syncer.go b/pkg/sync/syncer.go index a74685d..7bfdb69 100644 --- a/pkg/sync/syncer.go +++ b/pkg/sync/syncer.go @@ -37,11 +37,6 @@ func (s *SyncerImpl) Sync(meshId string) error { correspondingMesh.Prune() - if self != nil && self.GetType() == conf.PEER_ROLE && !s.manager.HasChanges(meshId) && s.infectionCount == 0 { - logging.Log.WriteInfof("No changes for %s", meshId) - return nil - } - before := time.Now() s.manager.GetRouteManager().UpdateRoutes() From ad22f04b0d3094ec9adea2951fbb47bc865cecdd Mon Sep 17 00:00:00 2001 From: Tim Beatham Date: Mon, 18 Dec 2023 20:45:56 +0000 Subject: [PATCH 2/4] bugfix-pull-only After certain period of time if no changes have occurred then pull --- pkg/sync/syncer.go | 65 +++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 62 insertions(+), 3 deletions(-) diff --git a/pkg/sync/syncer.go b/pkg/sync/syncer.go index 7bfdb69..01d37bf 100644 --- a/pkg/sync/syncer.go +++ b/pkg/sync/syncer.go @@ -1,6 +1,8 @@ package sync import ( + "errors" + "fmt" "io" "math/rand" "time" @@ -25,7 +27,7 @@ type SyncerImpl struct { syncCount int cluster conn.ConnCluster conf *conf.DaemonConfiguration - lastSync uint64 + lastSync map[string]uint64 } // Sync: Sync random nodes @@ -37,6 +39,17 @@ func (s *SyncerImpl) Sync(meshId string) error { correspondingMesh.Prune() + if self != nil && self.GetType() == conf.PEER_ROLE && !s.manager.HasChanges(meshId) && s.infectionCount == 0 { + logging.Log.WriteInfof("No changes for %s", meshId) + + // If not synchronised in certain pull from random neighbour + if uint64(time.Now().Unix())-s.lastSync[meshId] > 20 { + return s.Pull(meshId) + } + + return nil + } + before := time.Now() s.manager.GetRouteManager().UpdateRoutes() @@ -105,7 +118,7 @@ func (s *SyncerImpl) Sync(meshId string) error { } s.manager.GetMesh(meshId).SaveChanges() - s.lastSync = uint64(time.Now().Unix()) + s.lastSync[meshId] = uint64(time.Now().Unix()) logging.Log.WriteInfof("UPDATING WG CONF") err := s.manager.ApplyConfig() @@ -117,6 +130,51 @@ func (s *SyncerImpl) Sync(meshId string) error { return nil } +// Pull one node in the cluster, if there has not been message dissemination +// in a certain period of time pull a random node within the cluster +func (s *SyncerImpl) Pull(meshId string) error { + mesh := s.manager.GetMesh(meshId) + self, err := s.manager.GetSelf(meshId) + + if err != nil { + return err + } + + pubKey, _ := self.GetPublicKey() + + if mesh == nil { + return errors.New("mesh is nil, invalid operation") + } + + peers := mesh.GetPeers() + neighbours := s.cluster.GetNeighbours(peers, pubKey.String()) + neighbour := lib.RandomSubsetOfLength(neighbours, 1) + + if len(neighbour) == 0 { + logging.Log.WriteInfof("no neighbours") + return nil + } + + logging.Log.WriteInfof("PULLING from node %s", neighbour[0]) + + pullNode, err := mesh.GetNode(neighbour[0]) + + if err != nil || pullNode == nil { + return fmt.Errorf("node %s does not exist in the mesh", neighbour[0]) + } + + err = s.requester.SyncMesh(meshId, pullNode) + + if err == nil || err == io.EOF { + s.lastSync[meshId] = uint64(time.Now().Unix()) + } else { + return err + } + + s.syncCount++ + return nil +} + // SyncMeshes: Sync all meshes func (s *SyncerImpl) SyncMeshes() error { for meshId := range s.manager.GetMeshes() { @@ -138,5 +196,6 @@ func NewSyncer(m mesh.MeshManager, conf *conf.DaemonConfiguration, r SyncRequest requester: r, infectionCount: 0, syncCount: 0, - cluster: cluster} + cluster: cluster, + lastSync: make(map[string]uint64)} } From b6199892f0dfff6dc8a0efb40646aa905d6d8770 Mon Sep 17 00:00:00 2001 From: Tim Beatham Date: Mon, 18 Dec 2023 22:17:46 +0000 Subject: [PATCH 3/4] bugfix-pull-only Bugfix with inter-cluster communication pull not working --- pkg/conn/cluster.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/conn/cluster.go b/pkg/conn/cluster.go index c8c344e..bc7c925 100644 --- a/pkg/conn/cluster.go +++ b/pkg/conn/cluster.go @@ -55,12 +55,14 @@ func (i *ConnClusterImpl) GetNeighbours(global []string, selfId string) []string // you will communicate with a random node that is not in your cluster. func (i *ConnClusterImpl) GetInterCluster(global []string, selfId string) string { // Doesn't matter if not in it. Get index of where the node 'should' be + slices.Sort(global) + index, _ := binarySearch(global, selfId, 1) numClusters := math.Ceil(float64(len(global)) / float64(i.clusterSize)) randomCluster := rand.Intn(int(numClusters)-1) + 1 - neighbourIndex := (index + randomCluster) % len(global) + neighbourIndex := (index + (randomCluster * i.clusterSize)) % len(global) return global[neighbourIndex] } From 6ed32f3a79b72dbb83fa8e6bfe788392cfedfba6 Mon Sep 17 00:00:00 2001 From: Tim Beatham Date: Tue, 19 Dec 2023 00:50:17 +0000 Subject: [PATCH 4/4] bugfix-push-pull Organised groups as a tree so that there isn't a limit to dissemination --- pkg/conn/cluster.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/conn/cluster.go b/pkg/conn/cluster.go index bc7c925..d3d241b 100644 --- a/pkg/conn/cluster.go +++ b/pkg/conn/cluster.go @@ -58,11 +58,11 @@ func (i *ConnClusterImpl) GetInterCluster(global []string, selfId string) string slices.Sort(global) index, _ := binarySearch(global, selfId, 1) - numClusters := math.Ceil(float64(len(global)) / float64(i.clusterSize)) - randomCluster := rand.Intn(int(numClusters)-1) + 1 + randomCluster := rand.Intn(2) + 1 - neighbourIndex := (index + (randomCluster * i.clusterSize)) % len(global) + // cluster is considered a heap + neighbourIndex := (2*index + (randomCluster * i.clusterSize)) % len(global) return global[neighbourIndex] }