diff --git a/pkg/conn/cluster.go b/pkg/conn/cluster.go index c8c344e..d3d241b 100644 --- a/pkg/conn/cluster.go +++ b/pkg/conn/cluster.go @@ -55,12 +55,14 @@ func (i *ConnClusterImpl) GetNeighbours(global []string, selfId string) []string // you will communicate with a random node that is not in your cluster. func (i *ConnClusterImpl) GetInterCluster(global []string, selfId string) string { // Doesn't matter if not in it. Get index of where the node 'should' be + slices.Sort(global) + index, _ := binarySearch(global, selfId, 1) - numClusters := math.Ceil(float64(len(global)) / float64(i.clusterSize)) - randomCluster := rand.Intn(int(numClusters)-1) + 1 + randomCluster := rand.Intn(2) + 1 - neighbourIndex := (index + randomCluster) % len(global) + // cluster is considered a heap + neighbourIndex := (2*index + (randomCluster * i.clusterSize)) % len(global) return global[neighbourIndex] } diff --git a/pkg/crdt/two_phase_map_syncer.go b/pkg/crdt/two_phase_map_syncer.go index ac38f47..bf0421b 100644 --- a/pkg/crdt/two_phase_map_syncer.go +++ b/pkg/crdt/two_phase_map_syncer.go @@ -68,9 +68,16 @@ func prepare(syncer *TwoPhaseSyncer) ([]byte, bool) { return nil, false } + // Increment the clock here so the clock gets + // distributed to everyone else in the mesh + syncer.manager.store.Clock.IncrementClock() + var buffer bytes.Buffer enc := gob.NewEncoder(&buffer) + mapState := syncer.manager.store.GenerateMessage() + + syncer.mapState = mapState err = enc.Encode(*syncer.mapState) if err != nil { @@ -94,7 +101,7 @@ func present(syncer *TwoPhaseSyncer) ([]byte, bool) { if err != nil { logging.Log.WriteErrorf(err.Error()) - } + } difference := syncer.mapState.Difference(syncer.manager.store.Clock.GetStaleCount(), &mapState) syncer.manager.store.Clock.Merge(mapState.Vectors) @@ -164,9 +171,6 @@ func (t *TwoPhaseSyncer) RecvMessage(msg []byte) error { func (t *TwoPhaseSyncer) Complete() { logging.Log.WriteInfof("SYNC COMPLETED") - if t.state >= MERGE { - t.manager.store.Clock.IncrementClock() - } } func NewTwoPhaseSyncer(manager *TwoPhaseStoreMeshManager) *TwoPhaseSyncer { @@ -181,7 +185,6 @@ func NewTwoPhaseSyncer(manager *TwoPhaseStoreMeshManager) *TwoPhaseSyncer { return &TwoPhaseSyncer{ manager: manager, state: HASH, - mapState: manager.store.GenerateMessage(), generateMessageFSM: generateMessageFsm, } } diff --git a/pkg/sync/syncer.go b/pkg/sync/syncer.go index a74685d..01d37bf 100644 --- a/pkg/sync/syncer.go +++ b/pkg/sync/syncer.go @@ -1,6 +1,8 @@ package sync import ( + "errors" + "fmt" "io" "math/rand" "time" @@ -25,7 +27,7 @@ type SyncerImpl struct { syncCount int cluster conn.ConnCluster conf *conf.DaemonConfiguration - lastSync uint64 + lastSync map[string]uint64 } // Sync: Sync random nodes @@ -39,6 +41,12 @@ func (s *SyncerImpl) Sync(meshId string) error { if self != nil && self.GetType() == conf.PEER_ROLE && !s.manager.HasChanges(meshId) && s.infectionCount == 0 { logging.Log.WriteInfof("No changes for %s", meshId) + + // If not synchronised in certain pull from random neighbour + if uint64(time.Now().Unix())-s.lastSync[meshId] > 20 { + return s.Pull(meshId) + } + return nil } @@ -110,7 +118,7 @@ func (s *SyncerImpl) Sync(meshId string) error { } s.manager.GetMesh(meshId).SaveChanges() - s.lastSync = uint64(time.Now().Unix()) + s.lastSync[meshId] = uint64(time.Now().Unix()) logging.Log.WriteInfof("UPDATING WG CONF") err := s.manager.ApplyConfig() @@ -122,6 +130,51 @@ func (s *SyncerImpl) Sync(meshId string) error { return nil } +// Pull one node in the cluster, if there has not been message dissemination +// in a certain period of time pull a random node within the cluster +func (s *SyncerImpl) Pull(meshId string) error { + mesh := s.manager.GetMesh(meshId) + self, err := s.manager.GetSelf(meshId) + + if err != nil { + return err + } + + pubKey, _ := self.GetPublicKey() + + if mesh == nil { + return errors.New("mesh is nil, invalid operation") + } + + peers := mesh.GetPeers() + neighbours := s.cluster.GetNeighbours(peers, pubKey.String()) + neighbour := lib.RandomSubsetOfLength(neighbours, 1) + + if len(neighbour) == 0 { + logging.Log.WriteInfof("no neighbours") + return nil + } + + logging.Log.WriteInfof("PULLING from node %s", neighbour[0]) + + pullNode, err := mesh.GetNode(neighbour[0]) + + if err != nil || pullNode == nil { + return fmt.Errorf("node %s does not exist in the mesh", neighbour[0]) + } + + err = s.requester.SyncMesh(meshId, pullNode) + + if err == nil || err == io.EOF { + s.lastSync[meshId] = uint64(time.Now().Unix()) + } else { + return err + } + + s.syncCount++ + return nil +} + // SyncMeshes: Sync all meshes func (s *SyncerImpl) SyncMeshes() error { for meshId := range s.manager.GetMeshes() { @@ -143,5 +196,6 @@ func NewSyncer(m mesh.MeshManager, conf *conf.DaemonConfiguration, r SyncRequest requester: r, infectionCount: 0, syncCount: 0, - cluster: cluster} + cluster: cluster, + lastSync: make(map[string]uint64)} }