Compare commits

...

5 Commits

Author SHA1 Message Date
1f0914e2df bugfix-node-not-leaving
- Add lock when perform synchronisation on concurrent access
2024-01-04 00:23:20 +00:00
27e00196cd main
- Not waiting in the waitgroup
2024-01-02 20:31:24 +00:00
dea6f1a22d main
- error in code invalid check for nil
2024-01-02 20:19:34 +00:00
913de57568 main
- Fixed bug
2024-01-02 20:11:11 +00:00
ce829114b1 bugfix
- on synchornisation node is not leaving mesh
2024-01-02 19:41:20 +00:00
5 changed files with 39 additions and 22 deletions

BIN
api Executable file

Binary file not shown.

View File

@ -235,9 +235,19 @@ func NewSmegServer(conf ApiServerConf) (ApiServer, error) {
words: words, words: words,
} }
router.GET("/meshes", smegServer.GetMeshes) v1 := router.Group("/api/v1")
router.GET("/mesh/:meshid", smegServer.GetMesh) {
router.POST("/mesh/create", smegServer.CreateMesh) meshes := v1.Group("/meshes")
router.POST("/mesh/join", smegServer.JoinMesh) {
meshes.GET("/", smegServer.GetMeshes)
}
mesh := v1.Group("/mesh")
{
mesh.GET("/:meshid", smegServer.GetMesh)
mesh.POST("/create", smegServer.CreateMesh)
mesh.POST("/join", smegServer.JoinMesh)
}
}
return smegServer, nil return smegServer, nil
} }

View File

@ -23,9 +23,10 @@ func binarySearch(global []string, selfId string, groupSize int) (int, int) {
lower := 0 lower := 0
higher := len(global) - 1 higher := len(global) - 1
mid := (lower + higher) / 2
for (higher+1)-lower > groupSize { for (higher+1)-lower > groupSize {
mid := (lower + higher) / 2
if global[mid] < selfId { if global[mid] < selfId {
lower = mid + 1 lower = mid + 1
} else if global[mid] > selfId { } else if global[mid] > selfId {
@ -33,8 +34,6 @@ func binarySearch(global []string, selfId string, groupSize int) (int, int) {
} else { } else {
break break
} }
mid = (lower + higher) / 2
} }
return lower, int(math.Min(float64(lower+groupSize), float64(len(global)))) return lower, int(math.Min(float64(lower+groupSize), float64(len(global))))

View File

@ -10,6 +10,7 @@ import (
"github.com/tim-beatham/smegmesh/pkg/conf" "github.com/tim-beatham/smegmesh/pkg/conf"
"github.com/tim-beatham/smegmesh/pkg/ip" "github.com/tim-beatham/smegmesh/pkg/ip"
"github.com/tim-beatham/smegmesh/pkg/lib" "github.com/tim-beatham/smegmesh/pkg/lib"
logging "github.com/tim-beatham/smegmesh/pkg/log"
"github.com/tim-beatham/smegmesh/pkg/wg" "github.com/tim-beatham/smegmesh/pkg/wg"
"golang.zx2c4.com/wireguard/wgctrl" "golang.zx2c4.com/wireguard/wgctrl"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes" "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
@ -355,7 +356,7 @@ func (s *MeshManagerImpl) LeaveMesh(meshId string) error {
err := mesh.RemoveNode(s.HostParameters.GetPublicKey()) err := mesh.RemoveNode(s.HostParameters.GetPublicKey())
if err != nil { if err != nil {
return err logging.Log.WriteErrorf(err.Error())
} }
if s.OnDelete != nil { if s.OnDelete != nil {

View File

@ -28,6 +28,7 @@ type SyncerImpl struct {
cluster conn.ConnCluster cluster conn.ConnCluster
conf *conf.DaemonConfiguration conf *conf.DaemonConfiguration
lastSync map[string]int64 lastSync map[string]int64
lock sync.RWMutex
} }
// Sync: Sync with random nodes // Sync: Sync with random nodes
@ -38,11 +39,10 @@ func (s *SyncerImpl) Sync(correspondingMesh mesh.MeshProvider) error {
// Self can be nil if the node is removed // Self can be nil if the node is removed
selfID := s.manager.GetPublicKey() selfID := s.manager.GetPublicKey()
self, _ := correspondingMesh.GetNode(selfID.String()) self, err := correspondingMesh.GetNode(selfID.String())
// Mesh has been removed if err != nil {
if self == nil { logging.Log.WriteErrorf(err.Error())
return fmt.Errorf("mesh %s does not exist", correspondingMesh.GetMeshId())
} }
correspondingMesh.Prune() correspondingMesh.Prune()
@ -51,7 +51,8 @@ func (s *SyncerImpl) Sync(correspondingMesh mesh.MeshProvider) error {
logging.Log.WriteInfof("meshes %s has changes", correspondingMesh.GetMeshId()) logging.Log.WriteInfof("meshes %s has changes", correspondingMesh.GetMeshId())
} }
if self.GetType() == conf.PEER_ROLE && !correspondingMesh.HasChanges() && s.infectionCount == 0 { // If removed sync with other nodes to gossip the node is removed
if self != nil && self.GetType() == conf.PEER_ROLE && !correspondingMesh.HasChanges() && s.infectionCount == 0 {
logging.Log.WriteInfof("no changes for %s", correspondingMesh.GetMeshId()) logging.Log.WriteInfof("no changes for %s", correspondingMesh.GetMeshId())
// If not synchronised in certain time pull from random neighbour // If not synchronised in certain time pull from random neighbour
@ -63,16 +64,19 @@ func (s *SyncerImpl) Sync(correspondingMesh mesh.MeshProvider) error {
} }
before := time.Now() before := time.Now()
s.manager.GetRouteManager().UpdateRoutes() err = s.manager.GetRouteManager().UpdateRoutes()
if err != nil {
logging.Log.WriteErrorf(err.Error())
}
publicKey := s.manager.GetPublicKey() publicKey := s.manager.GetPublicKey()
nodeNames := correspondingMesh.GetPeers() nodeNames := correspondingMesh.GetPeers()
if self != nil { nodeNames = lib.Filter(nodeNames, func(s string) bool {
nodeNames = lib.Filter(nodeNames, func(s string) bool { // Filter our only public key out so we dont sync with ourself
return s != mesh.NodeID(self) return s != publicKey.String()
}) })
}
var gossipNodes []string var gossipNodes []string
@ -101,14 +105,14 @@ func (s *SyncerImpl) Sync(correspondingMesh mesh.MeshProvider) error {
// Do this synchronously to conserve bandwidth // Do this synchronously to conserve bandwidth
for _, node := range gossipNodes { for _, node := range gossipNodes {
correspondingPeer := s.manager.GetNode(correspondingMesh.GetMeshId(), node) correspondingPeer, err := correspondingMesh.GetNode(node)
if correspondingPeer == nil { if correspondingPeer == nil || err != nil {
logging.Log.WriteErrorf("node %s does not exist", node) logging.Log.WriteErrorf("node %s does not exist", node)
continue continue
} }
err := s.requester.SyncMesh(correspondingMesh.GetMeshId(), correspondingPeer) err = s.requester.SyncMesh(correspondingMesh.GetMeshId(), correspondingPeer)
if err == nil || err == io.EOF { if err == nil || err == io.EOF {
succeeded = true succeeded = true
@ -131,7 +135,9 @@ func (s *SyncerImpl) Sync(correspondingMesh mesh.MeshProvider) error {
correspondingMesh.SaveChanges() correspondingMesh.SaveChanges()
s.lock.Lock()
s.lastSync[correspondingMesh.GetMeshId()] = time.Now().Unix() s.lastSync[correspondingMesh.GetMeshId()] = time.Now().Unix()
s.lock.Unlock()
return nil return nil
} }
@ -188,6 +194,7 @@ func (s *SyncerImpl) SyncMeshes() error {
go sync() go sync()
} }
wg.Wait()
logging.Log.WriteInfof("updating the WireGuard configuration") logging.Log.WriteInfof("updating the WireGuard configuration")
err := s.manager.ApplyConfig() err := s.manager.ApplyConfig()