diff --git a/pkg/crdt/two_phase_map.go b/pkg/crdt/two_phase_map.go index f735244..88e8b24 100644 --- a/pkg/crdt/two_phase_map.go +++ b/pkg/crdt/two_phase_map.go @@ -151,7 +151,7 @@ func (m *TwoPhaseMap[K, D]) GenerateMessage() *TwoPhaseMapState[K] { } } -func (m *TwoPhaseMapState[K]) Difference(state *TwoPhaseMapState[K]) *TwoPhaseMapState[K] { +func (m *TwoPhaseMapState[K]) Difference(highestStale uint64, state *TwoPhaseMapState[K]) *TwoPhaseMapState[K] { mapState := &TwoPhaseMapState[K]{ AddContents: make(map[uint64]uint64), RemoveContents: make(map[uint64]uint64), @@ -160,7 +160,7 @@ func (m *TwoPhaseMapState[K]) Difference(state *TwoPhaseMapState[K]) *TwoPhaseMa for key, value := range state.AddContents { otherValue, ok := m.AddContents[key] - if !ok || otherValue < value { + if value > highestStale && (!ok || otherValue < value) { mapState.AddContents[key] = value } } @@ -168,7 +168,7 @@ func (m *TwoPhaseMapState[K]) Difference(state *TwoPhaseMapState[K]) *TwoPhaseMa for key, value := range state.RemoveContents { otherValue, ok := m.RemoveContents[key] - if !ok || otherValue < value { + if value > highestStale && (!ok || otherValue < value) { mapState.RemoveContents[key] = value } } diff --git a/pkg/crdt/two_phase_map_syncer.go b/pkg/crdt/two_phase_map_syncer.go index 1247823..ac38f47 100644 --- a/pkg/crdt/two_phase_map_syncer.go +++ b/pkg/crdt/two_phase_map_syncer.go @@ -94,9 +94,9 @@ func present(syncer *TwoPhaseSyncer) ([]byte, bool) { if err != nil { logging.Log.WriteErrorf(err.Error()) - } + } - difference := syncer.mapState.Difference(&mapState) + difference := syncer.mapState.Difference(syncer.manager.store.Clock.GetStaleCount(), &mapState) syncer.manager.store.Clock.Merge(mapState.Vectors) var sendBuffer bytes.Buffer diff --git a/pkg/crdt/vector_clock.go b/pkg/crdt/vector_clock.go index 584cef3..278a5c1 100644 --- a/pkg/crdt/vector_clock.go +++ b/pkg/crdt/vector_clock.go @@ -23,6 +23,8 @@ type VectorClock[K cmp.Ordered] struct { processID K staleTime uint64 hashFunc func(K) uint64 + // highest update that's been garbage collected + highestStale uint64 } // IncrementClock: increments the node's value in the vector clock @@ -78,6 +80,7 @@ func (m *VectorClock[K]) getStale() []uint64 { for key, bucket := range m.vectors { if maxTimeStamp-bucket.lastUpdate > m.staleTime { toRemove = append(toRemove, key) + m.highestStale = max(bucket.clock, m.highestStale) } } @@ -85,6 +88,15 @@ func (m *VectorClock[K]) getStale() []uint64 { return toRemove } +// GetStaleCount: returns a vector clock which is considered to be stale. +// all updates must be greater than this +func (m *VectorClock[K]) GetStaleCount() uint64 { + m.lock.RLock() + staleCount := m.highestStale + m.lock.RUnlock() + return staleCount +} + func (m *VectorClock[K]) Prune() { stale := m.getStale() @@ -120,7 +132,9 @@ func (m *VectorClock[K]) put(key uint64, value uint64) { clockValue = bucket.clock } - if value > clockValue { + // Make sure that entries that were garbage collected don't get + // addded back + if value > clockValue && value > m.highestStale { newBucket := VectorBucket{ clock: value, lastUpdate: uint64(time.Now().Unix()),