diff --git a/cmd/wgmeshd/configuration.yaml b/cmd/wgmeshd/configuration.yaml index fc74dfd..343a69c 100644 --- a/cmd/wgmeshd/configuration.yaml +++ b/cmd/wgmeshd/configuration.yaml @@ -2,5 +2,11 @@ certificatePath: "/wgmesh/cert/cert.pem" privateKeyPath: "/wgmesh/cert/priv.pem" caCertificatePath: "/wgmesh/cert/cacert.pem" skipCertVerification: true -gRPCPort: "8080" -advertiseRoutes: true \ No newline at end of file +gRPCPort: "21906" +advertiseRoutes: true +clusterSize: 32 +syncRate: 1 +interClusterChance: 0.15 +branchRate: 3 +infectionCount: 3 +keepAliveRate: 60 \ No newline at end of file diff --git a/cmd/wgmeshd/main.go b/cmd/wgmeshd/main.go index a345755..f101994 100644 --- a/cmd/wgmeshd/main.go +++ b/cmd/wgmeshd/main.go @@ -51,8 +51,8 @@ func main() { ctrlServer, err := ctrlserver.NewCtrlServer(&ctrlServerParams) syncProvider.Server = ctrlServer syncRequester := sync.NewSyncRequester(ctrlServer) - syncScheduler := sync.NewSyncScheduler(ctrlServer, syncRequester, 2) - timestampScheduler := timestamp.NewTimestampScheduler(ctrlServer, 60) + syncScheduler := sync.NewSyncScheduler(ctrlServer, syncRequester) + timestampScheduler := timestamp.NewTimestampScheduler(ctrlServer) robinIpcParams := robin.RobinIpcParams{ CtrlServer: ctrlServer, diff --git a/pkg/conf/conf.go b/pkg/conf/conf.go index 138a9e4..6a8a928 100644 --- a/pkg/conf/conf.go +++ b/pkg/conf/conf.go @@ -24,7 +24,13 @@ type WgMeshConfiguration struct { AdvertiseRoutes bool `yaml:"advertiseRoutes"` // Endpoint is the IP in which this computer is publicly reachable. // usecase is when the node has multiple IP addresses - Endpoint string `yaml:"publicEndpoint"` + Endpoint string `yaml:"publicEndpoint"` + ClusterSize int `yaml:"clusterSize"` + SyncRate float64 `yaml:"syncRate"` + InterClusterChance float64 `yaml:"interClusterChance"` + BranchRate int `yaml:"branchRate"` + InfectionCount int `yaml:"infectionCount"` + KeepAliveRate int `yaml:"keepAliveRate"` } // ParseConfiguration parses the mesh configuration diff --git a/pkg/conn/connection.go b/pkg/conn/connection.go index 57d3571..36d30f6 100644 --- a/pkg/conn/connection.go +++ b/pkg/conn/connection.go @@ -5,12 +5,10 @@ package conn import ( "crypto/tls" "errors" - "time" logging "github.com/tim-beatham/wgmesh/pkg/log" "google.golang.org/grpc" "google.golang.org/grpc/credentials" - "google.golang.org/grpc/keepalive" ) // PeerConnection represents a client-side connection between two @@ -43,11 +41,7 @@ func NewWgCtrlConnection(clientConfig *tls.Config, server string) (*WgCtrlConnec // ConnectWithToken: Connects to a new gRPC peer given the address of the other server. func (c *WgCtrlConnection) createGrpcConn() error { conn, err := grpc.Dial(c.endpoint, - grpc.WithTransportCredentials(credentials.NewTLS(c.clientConfig)), - grpc.WithKeepaliveParams(keepalive.ClientParameters{ - Time: 10 * time.Minute, - Timeout: 30 * time.Minute, - })) + grpc.WithTransportCredentials(credentials.NewTLS(c.clientConfig))) if err != nil { logging.Log.WriteErrorf("Could not connect: %s\n", err.Error()) diff --git a/pkg/conn/connectionmanager.go b/pkg/conn/connectionmanager.go index 717913f..784482f 100644 --- a/pkg/conn/connectionmanager.go +++ b/pkg/conn/connectionmanager.go @@ -94,7 +94,12 @@ func NewConnectionManager(params *NewConnectionManageParams) (ConnectionManager, } connections := make(map[string]PeerConnection) - connMgr := ConnectionManagerImpl{sync.RWMutex{}, connections, serverConfig, clientConfig} + connMgr := ConnectionManagerImpl{sync.RWMutex{}, + connections, + serverConfig, + clientConfig, + } + return &connMgr, nil } @@ -131,6 +136,7 @@ func (m *ConnectionManagerImpl) AddConnection(endPoint string) (PeerConnection, m.conLoc.Lock() m.clientConnections[endPoint] = connections m.conLoc.Unlock() + return connections, nil } diff --git a/pkg/lib/conv.go b/pkg/lib/conv.go index 7fb47d0..65b29bb 100644 --- a/pkg/lib/conv.go +++ b/pkg/lib/conv.go @@ -40,6 +40,7 @@ func MapKeys[K comparable, V any](m map[K]V) []K { type convert[V1 any, V2 any] func(V1) V2 +// Map turns a list of type V1 into type V2 func Map[V1 any, V2 any](list []V1, f convert[V1, V2]) []V2 { newList := make([]V2, len(list)) @@ -49,3 +50,19 @@ func Map[V1 any, V2 any](list []V1, f convert[V1, V2]) []V2 { return newList } + +type filterFunc[V any] func(V) bool + +// Filter filters out elements given a filter function. +// If filter function is true keep it in otherwise leave it out +func Filter[V any](list []V, f filterFunc[V]) []V { + newList := make([]V, 0) + + for _, elem := range newList { + if f(elem) { + newList = append(newList, elem) + } + } + + return newList +} diff --git a/pkg/lib/random.go b/pkg/lib/random.go index 581c99c..e7659b1 100644 --- a/pkg/lib/random.go +++ b/pkg/lib/random.go @@ -1,6 +1,8 @@ package lib -import "math/rand" +import ( + "math/rand" +) // RandomSubsetOfLength: Given an array of nodes generate of random // subset of 'num' length. @@ -17,6 +19,7 @@ func RandomSubsetOfLength[V any](vs []V, num int) []V { if _, ok := selectedIndices[randomIndex]; !ok { randomSubset = append(randomSubset, vs[randomIndex]) + selectedIndices[randomIndex] = struct{}{} i++ } } diff --git a/pkg/sync/syncer.go b/pkg/sync/syncer.go index 9333206..dffd3d3 100644 --- a/pkg/sync/syncer.go +++ b/pkg/sync/syncer.go @@ -2,10 +2,12 @@ package sync import ( "errors" + "math/rand" "sync" "time" - crdt "github.com/tim-beatham/wgmesh/pkg/automerge" + "github.com/tim-beatham/wgmesh/pkg/conf" + "github.com/tim-beatham/wgmesh/pkg/conn" "github.com/tim-beatham/wgmesh/pkg/lib" logging "github.com/tim-beatham/wgmesh/pkg/log" "github.com/tim-beatham/wgmesh/pkg/mesh" @@ -18,15 +20,14 @@ type Syncer interface { } type SyncerImpl struct { - manager *mesh.MeshManager - requester SyncRequester - authenticatedNodes []crdt.MeshNodeCrdt - infectionCount int + manager *mesh.MeshManager + requester SyncRequester + infectionCount int + syncCount int + cluster conn.ConnCluster + conf *conf.WgMeshConfiguration } -const subSetLength = 3 -const infectionCount = 3 - // Sync: Sync random nodes func (s *SyncerImpl) Sync(meshId string) error { logging.Log.WriteInfof("UPDATING WG CONF") @@ -37,13 +38,13 @@ func (s *SyncerImpl) Sync(meshId string) error { return nil } - mesh := s.manager.GetMesh(meshId) + theMesh := s.manager.GetMesh(meshId) - if mesh == nil { + if theMesh == nil { return errors.New("the provided mesh does not exist") } - snapshot, err := mesh.GetMesh() + snapshot, err := theMesh.GetMesh() if err != nil { return err @@ -58,31 +59,48 @@ func (s *SyncerImpl) Sync(meshId string) error { excludedNodes := map[string]struct{}{ s.manager.HostParameters.HostEndpoint: {}, } - meshNodes := lib.MapValuesWithExclude(nodes, excludedNodes) - randomSubset := lib.RandomSubsetOfLength(meshNodes, subSetLength) + + getNames := func(node mesh.MeshNode) string { + return node.GetHostEndpoint() + } + + nodeNames := lib.Map(meshNodes, getNames) + + neighbours := s.cluster.GetNeighbours(nodeNames, s.manager.HostParameters.HostEndpoint) + randomSubset := lib.RandomSubsetOfLength(neighbours, s.conf.BranchRate) + + for _, node := range randomSubset { + logging.Log.WriteInfof("Random node: %s", node) + } before := time.Now() + if len(meshNodes) > s.conf.ClusterSize && rand.Float64() < s.conf.InterClusterChance { + logging.Log.WriteInfof("Sending to random cluster") + interCluster := s.cluster.GetInterCluster(nodeNames, s.manager.HostParameters.HostEndpoint) + randomSubset = append(randomSubset, interCluster) + } + var waitGroup sync.WaitGroup - for _, n := range randomSubset { + for index := range randomSubset { waitGroup.Add(1) - syncMeshFunc := func() error { + go func(i int) error { defer waitGroup.Done() - err := s.requester.SyncMesh(meshId, n.GetHostEndpoint()) + err := s.requester.SyncMesh(meshId, randomSubset[i]) return err - } - - go syncMeshFunc() + }(index) } waitGroup.Wait() + s.syncCount++ logging.Log.WriteInfof("SYNC TIME: %v", time.Now().Sub(before)) + logging.Log.WriteInfof("SYNC COUNT: %d", s.syncCount) - s.infectionCount = ((infectionCount + s.infectionCount - 1) % infectionCount) + s.infectionCount = ((s.conf.InfectionCount + s.infectionCount - 1) % s.conf.InfectionCount) return nil } @@ -100,6 +118,13 @@ func (s *SyncerImpl) SyncMeshes() error { return nil } -func NewSyncer(m *mesh.MeshManager, r SyncRequester) Syncer { - return &SyncerImpl{manager: m, requester: r, infectionCount: 0} +func NewSyncer(m *mesh.MeshManager, conf *conf.WgMeshConfiguration, r SyncRequester) Syncer { + cluster, _ := conn.NewConnCluster(conf.ClusterSize) + return &SyncerImpl{ + manager: m, + conf: conf, + requester: r, + infectionCount: 0, + syncCount: 0, + cluster: cluster} } diff --git a/pkg/sync/syncscheduler.go b/pkg/sync/syncscheduler.go index e58a500..8d0856e 100644 --- a/pkg/sync/syncscheduler.go +++ b/pkg/sync/syncscheduler.go @@ -16,15 +16,14 @@ type SyncScheduler interface { // SyncSchedulerImpl scheduler for sync scheduling type SyncSchedulerImpl struct { - syncRate int - quit chan struct{} - server *ctrlserver.MeshCtrlServer - syncer Syncer + quit chan struct{} + server *ctrlserver.MeshCtrlServer + syncer Syncer } // Run implements SyncScheduler. func (s *SyncSchedulerImpl) Run() error { - ticker := time.NewTicker(time.Duration(s.syncRate) * time.Second) + ticker := time.NewTicker(time.Duration(s.server.Conf.SyncRate) * time.Second) quit := make(chan struct{}) s.quit = quit @@ -50,7 +49,7 @@ func (s *SyncSchedulerImpl) Stop() error { return nil } -func NewSyncScheduler(s *ctrlserver.MeshCtrlServer, syncRequester SyncRequester, syncRate int) SyncScheduler { - syncer := NewSyncer(s.MeshManager, syncRequester) - return &SyncSchedulerImpl{server: s, syncRate: syncRate, syncer: syncer} +func NewSyncScheduler(s *ctrlserver.MeshCtrlServer, syncRequester SyncRequester) SyncScheduler { + syncer := NewSyncer(s.MeshManager, s.Conf, syncRequester) + return &SyncSchedulerImpl{server: s, syncer: syncer} } diff --git a/pkg/timestamp/timestamp.go b/pkg/timestamp/timestamp.go index 36218b5..971d138 100644 --- a/pkg/timestamp/timestamp.go +++ b/pkg/timestamp/timestamp.go @@ -38,8 +38,8 @@ func (s *TimeStampSchedulerImpl) Run() error { } } -func NewTimestampScheduler(ctrlServer *ctrlserver.MeshCtrlServer, updateRate int) TimestampScheduler { - return &TimeStampSchedulerImpl{meshManager: ctrlServer.MeshManager, updateRate: updateRate} +func NewTimestampScheduler(ctrlServer *ctrlserver.MeshCtrlServer) TimestampScheduler { + return &TimeStampSchedulerImpl{meshManager: ctrlServer.MeshManager, updateRate: ctrlServer.Conf.KeepAliveRate} } func (s *TimeStampSchedulerImpl) Stop() error {