diff --git a/pkg/crdt/two_phase_map_syncer.go b/pkg/crdt/two_phase_map_syncer.go index ac38f47..bf0421b 100644 --- a/pkg/crdt/two_phase_map_syncer.go +++ b/pkg/crdt/two_phase_map_syncer.go @@ -68,9 +68,16 @@ func prepare(syncer *TwoPhaseSyncer) ([]byte, bool) { return nil, false } + // Increment the clock here so the clock gets + // distributed to everyone else in the mesh + syncer.manager.store.Clock.IncrementClock() + var buffer bytes.Buffer enc := gob.NewEncoder(&buffer) + mapState := syncer.manager.store.GenerateMessage() + + syncer.mapState = mapState err = enc.Encode(*syncer.mapState) if err != nil { @@ -94,7 +101,7 @@ func present(syncer *TwoPhaseSyncer) ([]byte, bool) { if err != nil { logging.Log.WriteErrorf(err.Error()) - } + } difference := syncer.mapState.Difference(syncer.manager.store.Clock.GetStaleCount(), &mapState) syncer.manager.store.Clock.Merge(mapState.Vectors) @@ -164,9 +171,6 @@ func (t *TwoPhaseSyncer) RecvMessage(msg []byte) error { func (t *TwoPhaseSyncer) Complete() { logging.Log.WriteInfof("SYNC COMPLETED") - if t.state >= MERGE { - t.manager.store.Clock.IncrementClock() - } } func NewTwoPhaseSyncer(manager *TwoPhaseStoreMeshManager) *TwoPhaseSyncer { @@ -181,7 +185,6 @@ func NewTwoPhaseSyncer(manager *TwoPhaseStoreMeshManager) *TwoPhaseSyncer { return &TwoPhaseSyncer{ manager: manager, state: HASH, - mapState: manager.store.GenerateMessage(), generateMessageFSM: generateMessageFsm, } } diff --git a/pkg/sync/syncer.go b/pkg/sync/syncer.go index a74685d..7bfdb69 100644 --- a/pkg/sync/syncer.go +++ b/pkg/sync/syncer.go @@ -37,11 +37,6 @@ func (s *SyncerImpl) Sync(meshId string) error { correspondingMesh.Prune() - if self != nil && self.GetType() == conf.PEER_ROLE && !s.manager.HasChanges(meshId) && s.infectionCount == 0 { - logging.Log.WriteInfof("No changes for %s", meshId) - return nil - } - before := time.Now() s.manager.GetRouteManager().UpdateRoutes()