diff --git a/pkg/sync/syncer.go b/pkg/sync/syncer.go index 7bfdb69..01d37bf 100644 --- a/pkg/sync/syncer.go +++ b/pkg/sync/syncer.go @@ -1,6 +1,8 @@ package sync import ( + "errors" + "fmt" "io" "math/rand" "time" @@ -25,7 +27,7 @@ type SyncerImpl struct { syncCount int cluster conn.ConnCluster conf *conf.DaemonConfiguration - lastSync uint64 + lastSync map[string]uint64 } // Sync: Sync random nodes @@ -37,6 +39,17 @@ func (s *SyncerImpl) Sync(meshId string) error { correspondingMesh.Prune() + if self != nil && self.GetType() == conf.PEER_ROLE && !s.manager.HasChanges(meshId) && s.infectionCount == 0 { + logging.Log.WriteInfof("No changes for %s", meshId) + + // If not synchronised in certain pull from random neighbour + if uint64(time.Now().Unix())-s.lastSync[meshId] > 20 { + return s.Pull(meshId) + } + + return nil + } + before := time.Now() s.manager.GetRouteManager().UpdateRoutes() @@ -105,7 +118,7 @@ func (s *SyncerImpl) Sync(meshId string) error { } s.manager.GetMesh(meshId).SaveChanges() - s.lastSync = uint64(time.Now().Unix()) + s.lastSync[meshId] = uint64(time.Now().Unix()) logging.Log.WriteInfof("UPDATING WG CONF") err := s.manager.ApplyConfig() @@ -117,6 +130,51 @@ func (s *SyncerImpl) Sync(meshId string) error { return nil } +// Pull one node in the cluster, if there has not been message dissemination +// in a certain period of time pull a random node within the cluster +func (s *SyncerImpl) Pull(meshId string) error { + mesh := s.manager.GetMesh(meshId) + self, err := s.manager.GetSelf(meshId) + + if err != nil { + return err + } + + pubKey, _ := self.GetPublicKey() + + if mesh == nil { + return errors.New("mesh is nil, invalid operation") + } + + peers := mesh.GetPeers() + neighbours := s.cluster.GetNeighbours(peers, pubKey.String()) + neighbour := lib.RandomSubsetOfLength(neighbours, 1) + + if len(neighbour) == 0 { + logging.Log.WriteInfof("no neighbours") + return nil + } + + logging.Log.WriteInfof("PULLING from node %s", neighbour[0]) + + pullNode, err := mesh.GetNode(neighbour[0]) + + if err != nil || pullNode == nil { + return fmt.Errorf("node %s does not exist in the mesh", neighbour[0]) + } + + err = s.requester.SyncMesh(meshId, pullNode) + + if err == nil || err == io.EOF { + s.lastSync[meshId] = uint64(time.Now().Unix()) + } else { + return err + } + + s.syncCount++ + return nil +} + // SyncMeshes: Sync all meshes func (s *SyncerImpl) SyncMeshes() error { for meshId := range s.manager.GetMeshes() { @@ -138,5 +196,6 @@ func NewSyncer(m mesh.MeshManager, conf *conf.DaemonConfiguration, r SyncRequest requester: r, infectionCount: 0, syncCount: 0, - cluster: cluster} + cluster: cluster, + lastSync: make(map[string]uint64)} }