mirror of
https://github.com/tim-beatham/smegmesh.git
synced 2025-06-21 20:41:48 +02:00
main - bugfix
- Nodes not being removed when deleted because when node gossips again it is readded. - Keep track of highest vector clock we have removed and used this as a mark for determining if something is stale.
This commit is contained in:
parent
3222d7e388
commit
13bea10638
@ -151,7 +151,7 @@ func (m *TwoPhaseMap[K, D]) GenerateMessage() *TwoPhaseMapState[K] {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *TwoPhaseMapState[K]) Difference(state *TwoPhaseMapState[K]) *TwoPhaseMapState[K] {
|
func (m *TwoPhaseMapState[K]) Difference(highestStale uint64, state *TwoPhaseMapState[K]) *TwoPhaseMapState[K] {
|
||||||
mapState := &TwoPhaseMapState[K]{
|
mapState := &TwoPhaseMapState[K]{
|
||||||
AddContents: make(map[uint64]uint64),
|
AddContents: make(map[uint64]uint64),
|
||||||
RemoveContents: make(map[uint64]uint64),
|
RemoveContents: make(map[uint64]uint64),
|
||||||
@ -160,7 +160,7 @@ func (m *TwoPhaseMapState[K]) Difference(state *TwoPhaseMapState[K]) *TwoPhaseMa
|
|||||||
for key, value := range state.AddContents {
|
for key, value := range state.AddContents {
|
||||||
otherValue, ok := m.AddContents[key]
|
otherValue, ok := m.AddContents[key]
|
||||||
|
|
||||||
if !ok || otherValue < value {
|
if value > highestStale && (!ok || otherValue < value) {
|
||||||
mapState.AddContents[key] = value
|
mapState.AddContents[key] = value
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -168,7 +168,7 @@ func (m *TwoPhaseMapState[K]) Difference(state *TwoPhaseMapState[K]) *TwoPhaseMa
|
|||||||
for key, value := range state.RemoveContents {
|
for key, value := range state.RemoveContents {
|
||||||
otherValue, ok := m.RemoveContents[key]
|
otherValue, ok := m.RemoveContents[key]
|
||||||
|
|
||||||
if !ok || otherValue < value {
|
if value > highestStale && (!ok || otherValue < value) {
|
||||||
mapState.RemoveContents[key] = value
|
mapState.RemoveContents[key] = value
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -94,9 +94,9 @@ func present(syncer *TwoPhaseSyncer) ([]byte, bool) {
|
|||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logging.Log.WriteErrorf(err.Error())
|
logging.Log.WriteErrorf(err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
difference := syncer.mapState.Difference(&mapState)
|
difference := syncer.mapState.Difference(syncer.manager.store.Clock.GetStaleCount(), &mapState)
|
||||||
syncer.manager.store.Clock.Merge(mapState.Vectors)
|
syncer.manager.store.Clock.Merge(mapState.Vectors)
|
||||||
|
|
||||||
var sendBuffer bytes.Buffer
|
var sendBuffer bytes.Buffer
|
||||||
|
@ -23,6 +23,8 @@ type VectorClock[K cmp.Ordered] struct {
|
|||||||
processID K
|
processID K
|
||||||
staleTime uint64
|
staleTime uint64
|
||||||
hashFunc func(K) uint64
|
hashFunc func(K) uint64
|
||||||
|
// highest update that's been garbage collected
|
||||||
|
highestStale uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
// IncrementClock: increments the node's value in the vector clock
|
// IncrementClock: increments the node's value in the vector clock
|
||||||
@ -78,6 +80,7 @@ func (m *VectorClock[K]) getStale() []uint64 {
|
|||||||
for key, bucket := range m.vectors {
|
for key, bucket := range m.vectors {
|
||||||
if maxTimeStamp-bucket.lastUpdate > m.staleTime {
|
if maxTimeStamp-bucket.lastUpdate > m.staleTime {
|
||||||
toRemove = append(toRemove, key)
|
toRemove = append(toRemove, key)
|
||||||
|
m.highestStale = max(bucket.clock, m.highestStale)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -85,6 +88,15 @@ func (m *VectorClock[K]) getStale() []uint64 {
|
|||||||
return toRemove
|
return toRemove
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetStaleCount: returns a vector clock which is considered to be stale.
|
||||||
|
// all updates must be greater than this
|
||||||
|
func (m *VectorClock[K]) GetStaleCount() uint64 {
|
||||||
|
m.lock.RLock()
|
||||||
|
staleCount := m.highestStale
|
||||||
|
m.lock.RUnlock()
|
||||||
|
return staleCount
|
||||||
|
}
|
||||||
|
|
||||||
func (m *VectorClock[K]) Prune() {
|
func (m *VectorClock[K]) Prune() {
|
||||||
stale := m.getStale()
|
stale := m.getStale()
|
||||||
|
|
||||||
@ -120,7 +132,9 @@ func (m *VectorClock[K]) put(key uint64, value uint64) {
|
|||||||
clockValue = bucket.clock
|
clockValue = bucket.clock
|
||||||
}
|
}
|
||||||
|
|
||||||
if value > clockValue {
|
// Make sure that entries that were garbage collected don't get
|
||||||
|
// addded back
|
||||||
|
if value > clockValue && value > m.highestStale {
|
||||||
newBucket := VectorBucket{
|
newBucket := VectorBucket{
|
||||||
clock: value,
|
clock: value,
|
||||||
lastUpdate: uint64(time.Now().Unix()),
|
lastUpdate: uint64(time.Now().Unix()),
|
||||||
|
Loading…
x
Reference in New Issue
Block a user