1
0
forked from extern/smegmesh

Implemented clustering betweeen nodes

This commit is contained in:
Tim Beatham 2023-11-03 15:24:18 +00:00
parent 8d8a13d6ff
commit 843caddf6b
10 changed files with 102 additions and 46 deletions

View File

@ -2,5 +2,11 @@ certificatePath: "/wgmesh/cert/cert.pem"
privateKeyPath: "/wgmesh/cert/priv.pem"
caCertificatePath: "/wgmesh/cert/cacert.pem"
skipCertVerification: true
gRPCPort: "8080"
advertiseRoutes: true
gRPCPort: "21906"
advertiseRoutes: true
clusterSize: 32
syncRate: 1
interClusterChance: 0.15
branchRate: 3
infectionCount: 3
keepAliveRate: 60

View File

@ -51,8 +51,8 @@ func main() {
ctrlServer, err := ctrlserver.NewCtrlServer(&ctrlServerParams)
syncProvider.Server = ctrlServer
syncRequester := sync.NewSyncRequester(ctrlServer)
syncScheduler := sync.NewSyncScheduler(ctrlServer, syncRequester, 2)
timestampScheduler := timestamp.NewTimestampScheduler(ctrlServer, 60)
syncScheduler := sync.NewSyncScheduler(ctrlServer, syncRequester)
timestampScheduler := timestamp.NewTimestampScheduler(ctrlServer)
robinIpcParams := robin.RobinIpcParams{
CtrlServer: ctrlServer,

View File

@ -24,7 +24,13 @@ type WgMeshConfiguration struct {
AdvertiseRoutes bool `yaml:"advertiseRoutes"`
// Endpoint is the IP in which this computer is publicly reachable.
// usecase is when the node has multiple IP addresses
Endpoint string `yaml:"publicEndpoint"`
Endpoint string `yaml:"publicEndpoint"`
ClusterSize int `yaml:"clusterSize"`
SyncRate float64 `yaml:"syncRate"`
InterClusterChance float64 `yaml:"interClusterChance"`
BranchRate int `yaml:"branchRate"`
InfectionCount int `yaml:"infectionCount"`
KeepAliveRate int `yaml:"keepAliveRate"`
}
// ParseConfiguration parses the mesh configuration

View File

@ -5,12 +5,10 @@ package conn
import (
"crypto/tls"
"errors"
"time"
logging "github.com/tim-beatham/wgmesh/pkg/log"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
)
// PeerConnection represents a client-side connection between two
@ -43,11 +41,7 @@ func NewWgCtrlConnection(clientConfig *tls.Config, server string) (*WgCtrlConnec
// ConnectWithToken: Connects to a new gRPC peer given the address of the other server.
func (c *WgCtrlConnection) createGrpcConn() error {
conn, err := grpc.Dial(c.endpoint,
grpc.WithTransportCredentials(credentials.NewTLS(c.clientConfig)),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Minute,
Timeout: 30 * time.Minute,
}))
grpc.WithTransportCredentials(credentials.NewTLS(c.clientConfig)))
if err != nil {
logging.Log.WriteErrorf("Could not connect: %s\n", err.Error())

View File

@ -94,7 +94,12 @@ func NewConnectionManager(params *NewConnectionManageParams) (ConnectionManager,
}
connections := make(map[string]PeerConnection)
connMgr := ConnectionManagerImpl{sync.RWMutex{}, connections, serverConfig, clientConfig}
connMgr := ConnectionManagerImpl{sync.RWMutex{},
connections,
serverConfig,
clientConfig,
}
return &connMgr, nil
}
@ -131,6 +136,7 @@ func (m *ConnectionManagerImpl) AddConnection(endPoint string) (PeerConnection,
m.conLoc.Lock()
m.clientConnections[endPoint] = connections
m.conLoc.Unlock()
return connections, nil
}

View File

@ -40,6 +40,7 @@ func MapKeys[K comparable, V any](m map[K]V) []K {
type convert[V1 any, V2 any] func(V1) V2
// Map turns a list of type V1 into type V2
func Map[V1 any, V2 any](list []V1, f convert[V1, V2]) []V2 {
newList := make([]V2, len(list))
@ -49,3 +50,19 @@ func Map[V1 any, V2 any](list []V1, f convert[V1, V2]) []V2 {
return newList
}
type filterFunc[V any] func(V) bool
// Filter filters out elements given a filter function.
// If filter function is true keep it in otherwise leave it out
func Filter[V any](list []V, f filterFunc[V]) []V {
newList := make([]V, 0)
for _, elem := range newList {
if f(elem) {
newList = append(newList, elem)
}
}
return newList
}

View File

@ -1,6 +1,8 @@
package lib
import "math/rand"
import (
"math/rand"
)
// RandomSubsetOfLength: Given an array of nodes generate of random
// subset of 'num' length.
@ -17,6 +19,7 @@ func RandomSubsetOfLength[V any](vs []V, num int) []V {
if _, ok := selectedIndices[randomIndex]; !ok {
randomSubset = append(randomSubset, vs[randomIndex])
selectedIndices[randomIndex] = struct{}{}
i++
}
}

View File

@ -2,10 +2,12 @@ package sync
import (
"errors"
"math/rand"
"sync"
"time"
crdt "github.com/tim-beatham/wgmesh/pkg/automerge"
"github.com/tim-beatham/wgmesh/pkg/conf"
"github.com/tim-beatham/wgmesh/pkg/conn"
"github.com/tim-beatham/wgmesh/pkg/lib"
logging "github.com/tim-beatham/wgmesh/pkg/log"
"github.com/tim-beatham/wgmesh/pkg/mesh"
@ -18,15 +20,14 @@ type Syncer interface {
}
type SyncerImpl struct {
manager *mesh.MeshManager
requester SyncRequester
authenticatedNodes []crdt.MeshNodeCrdt
infectionCount int
manager *mesh.MeshManager
requester SyncRequester
infectionCount int
syncCount int
cluster conn.ConnCluster
conf *conf.WgMeshConfiguration
}
const subSetLength = 3
const infectionCount = 3
// Sync: Sync random nodes
func (s *SyncerImpl) Sync(meshId string) error {
logging.Log.WriteInfof("UPDATING WG CONF")
@ -37,13 +38,13 @@ func (s *SyncerImpl) Sync(meshId string) error {
return nil
}
mesh := s.manager.GetMesh(meshId)
theMesh := s.manager.GetMesh(meshId)
if mesh == nil {
if theMesh == nil {
return errors.New("the provided mesh does not exist")
}
snapshot, err := mesh.GetMesh()
snapshot, err := theMesh.GetMesh()
if err != nil {
return err
@ -58,31 +59,48 @@ func (s *SyncerImpl) Sync(meshId string) error {
excludedNodes := map[string]struct{}{
s.manager.HostParameters.HostEndpoint: {},
}
meshNodes := lib.MapValuesWithExclude(nodes, excludedNodes)
randomSubset := lib.RandomSubsetOfLength(meshNodes, subSetLength)
getNames := func(node mesh.MeshNode) string {
return node.GetHostEndpoint()
}
nodeNames := lib.Map(meshNodes, getNames)
neighbours := s.cluster.GetNeighbours(nodeNames, s.manager.HostParameters.HostEndpoint)
randomSubset := lib.RandomSubsetOfLength(neighbours, s.conf.BranchRate)
for _, node := range randomSubset {
logging.Log.WriteInfof("Random node: %s", node)
}
before := time.Now()
if len(meshNodes) > s.conf.ClusterSize && rand.Float64() < s.conf.InterClusterChance {
logging.Log.WriteInfof("Sending to random cluster")
interCluster := s.cluster.GetInterCluster(nodeNames, s.manager.HostParameters.HostEndpoint)
randomSubset = append(randomSubset, interCluster)
}
var waitGroup sync.WaitGroup
for _, n := range randomSubset {
for index := range randomSubset {
waitGroup.Add(1)
syncMeshFunc := func() error {
go func(i int) error {
defer waitGroup.Done()
err := s.requester.SyncMesh(meshId, n.GetHostEndpoint())
err := s.requester.SyncMesh(meshId, randomSubset[i])
return err
}
go syncMeshFunc()
}(index)
}
waitGroup.Wait()
s.syncCount++
logging.Log.WriteInfof("SYNC TIME: %v", time.Now().Sub(before))
logging.Log.WriteInfof("SYNC COUNT: %d", s.syncCount)
s.infectionCount = ((infectionCount + s.infectionCount - 1) % infectionCount)
s.infectionCount = ((s.conf.InfectionCount + s.infectionCount - 1) % s.conf.InfectionCount)
return nil
}
@ -100,6 +118,13 @@ func (s *SyncerImpl) SyncMeshes() error {
return nil
}
func NewSyncer(m *mesh.MeshManager, r SyncRequester) Syncer {
return &SyncerImpl{manager: m, requester: r, infectionCount: 0}
func NewSyncer(m *mesh.MeshManager, conf *conf.WgMeshConfiguration, r SyncRequester) Syncer {
cluster, _ := conn.NewConnCluster(conf.ClusterSize)
return &SyncerImpl{
manager: m,
conf: conf,
requester: r,
infectionCount: 0,
syncCount: 0,
cluster: cluster}
}

View File

@ -16,15 +16,14 @@ type SyncScheduler interface {
// SyncSchedulerImpl scheduler for sync scheduling
type SyncSchedulerImpl struct {
syncRate int
quit chan struct{}
server *ctrlserver.MeshCtrlServer
syncer Syncer
quit chan struct{}
server *ctrlserver.MeshCtrlServer
syncer Syncer
}
// Run implements SyncScheduler.
func (s *SyncSchedulerImpl) Run() error {
ticker := time.NewTicker(time.Duration(s.syncRate) * time.Second)
ticker := time.NewTicker(time.Duration(s.server.Conf.SyncRate) * time.Second)
quit := make(chan struct{})
s.quit = quit
@ -50,7 +49,7 @@ func (s *SyncSchedulerImpl) Stop() error {
return nil
}
func NewSyncScheduler(s *ctrlserver.MeshCtrlServer, syncRequester SyncRequester, syncRate int) SyncScheduler {
syncer := NewSyncer(s.MeshManager, syncRequester)
return &SyncSchedulerImpl{server: s, syncRate: syncRate, syncer: syncer}
func NewSyncScheduler(s *ctrlserver.MeshCtrlServer, syncRequester SyncRequester) SyncScheduler {
syncer := NewSyncer(s.MeshManager, s.Conf, syncRequester)
return &SyncSchedulerImpl{server: s, syncer: syncer}
}

View File

@ -38,8 +38,8 @@ func (s *TimeStampSchedulerImpl) Run() error {
}
}
func NewTimestampScheduler(ctrlServer *ctrlserver.MeshCtrlServer, updateRate int) TimestampScheduler {
return &TimeStampSchedulerImpl{meshManager: ctrlServer.MeshManager, updateRate: updateRate}
func NewTimestampScheduler(ctrlServer *ctrlserver.MeshCtrlServer) TimestampScheduler {
return &TimeStampSchedulerImpl{meshManager: ctrlServer.MeshManager, updateRate: ctrlServer.Conf.KeepAliveRate}
}
func (s *TimeStampSchedulerImpl) Stop() error {