From 3ef1b68ba5b0c02979ca31fe79b84f0a352f2faf Mon Sep 17 00:00:00 2001 From: Tim Beatham Date: Thu, 30 Nov 2023 15:58:26 +0000 Subject: [PATCH] BUGFIX: Hashing datastore to work out changes Changed hashing implementation to work out if there are changes in the data store --- pkg/crdt/datastore.go | 4 ++-- pkg/crdt/two_phase_map.go | 15 +++++++++------ pkg/crdt/two_phase_map_syncer.go | 1 - pkg/lib/conv.go | 10 ++++++++++ pkg/sync/syncer.go | 1 + 5 files changed, 22 insertions(+), 9 deletions(-) diff --git a/pkg/crdt/datastore.go b/pkg/crdt/datastore.go index 599224a..343b8a9 100644 --- a/pkg/crdt/datastore.go +++ b/pkg/crdt/datastore.go @@ -222,13 +222,13 @@ func (m *TwoPhaseStoreMeshManager) GetDevice() (*wgtypes.Device, error) { // HasChanges returns true if we have changes since last time we synced func (m *TwoPhaseStoreMeshManager) HasChanges() bool { - clockValue := m.store.GetClock() + clockValue := m.store.GetHash() return clockValue != m.LastClock } // Record that we have changes and save the corresponding changes func (m *TwoPhaseStoreMeshManager) SaveChanges() { - clockValue := m.store.GetClock() + clockValue := m.store.GetHash() m.LastClock = clockValue } diff --git a/pkg/crdt/two_phase_map.go b/pkg/crdt/two_phase_map.go index 25ec162..931deec 100644 --- a/pkg/crdt/two_phase_map.go +++ b/pkg/crdt/two_phase_map.go @@ -128,16 +128,19 @@ func (m *TwoPhaseMap[K, D]) incrementClock() uint64 { return maxClock } -func (m *TwoPhaseMap[K, D]) GetClock() uint64 { - maxClock := uint64(0) +// GetHash: Get the hash of the current state of the map +// Sums the current values of the vectors. Provides good approximation +// of increasing numbers +func (m *TwoPhaseMap[K, D]) GetHash() uint64 { m.lock.RLock() - for _, value := range m.vectors { - maxClock = max(maxClock, value) - } + sum := lib.Reduce(uint64(0), lib.MapValues(m.vectors), func(sum uint64, current uint64) uint64 { + return current + sum + }) m.lock.RUnlock() - return maxClock + + return sum } // GetState: get the current vector clock of the add and remove diff --git a/pkg/crdt/two_phase_map_syncer.go b/pkg/crdt/two_phase_map_syncer.go index 77645bd..ff43183 100644 --- a/pkg/crdt/two_phase_map_syncer.go +++ b/pkg/crdt/two_phase_map_syncer.go @@ -125,7 +125,6 @@ func (t *TwoPhaseSyncer) RecvMessage(msg []byte) error { func (t *TwoPhaseSyncer) Complete() { logging.Log.WriteInfof("SYNC COMPLETED") - t.manager.SaveChanges() } func NewTwoPhaseSyncer(manager *TwoPhaseStoreMeshManager) *TwoPhaseSyncer { diff --git a/pkg/lib/conv.go b/pkg/lib/conv.go index 28e1b33..b540088 100644 --- a/pkg/lib/conv.go +++ b/pkg/lib/conv.go @@ -76,3 +76,13 @@ func Contains[V any](list []V, proposition func(V) bool) bool { return false } + +func Reduce[A any, V any](start A, values []V, reduce func(A, V) A) A { + accum := start + + for _, elem := range values { + accum = reduce(accum, elem) + } + + return accum +} diff --git a/pkg/sync/syncer.go b/pkg/sync/syncer.go index be7d836..0090d63 100644 --- a/pkg/sync/syncer.go +++ b/pkg/sync/syncer.go @@ -89,6 +89,7 @@ func (s *SyncerImpl) Sync(meshId string) error { logging.Log.WriteInfof("SYNC COUNT: %d", s.syncCount) s.infectionCount = ((s.conf.InfectionCount + s.infectionCount - 1) % s.conf.InfectionCount) + s.manager.GetMesh(meshId).SaveChanges() return nil }