1
0
forked from extern/smegmesh

Merge pull request #8 from tim-beatham/7-create-rotating-window-of-connections

Implemented clustering betweeen nodes
This commit is contained in:
Tim Beatham 2023-11-03 15:25:30 +00:00 committed by GitHub
commit e2c6db3a4f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 102 additions and 46 deletions

View File

@ -2,5 +2,11 @@ certificatePath: "/wgmesh/cert/cert.pem"
privateKeyPath: "/wgmesh/cert/priv.pem" privateKeyPath: "/wgmesh/cert/priv.pem"
caCertificatePath: "/wgmesh/cert/cacert.pem" caCertificatePath: "/wgmesh/cert/cacert.pem"
skipCertVerification: true skipCertVerification: true
gRPCPort: "8080" gRPCPort: "21906"
advertiseRoutes: true advertiseRoutes: true
clusterSize: 32
syncRate: 1
interClusterChance: 0.15
branchRate: 3
infectionCount: 3
keepAliveRate: 60

View File

@ -51,8 +51,8 @@ func main() {
ctrlServer, err := ctrlserver.NewCtrlServer(&ctrlServerParams) ctrlServer, err := ctrlserver.NewCtrlServer(&ctrlServerParams)
syncProvider.Server = ctrlServer syncProvider.Server = ctrlServer
syncRequester := sync.NewSyncRequester(ctrlServer) syncRequester := sync.NewSyncRequester(ctrlServer)
syncScheduler := sync.NewSyncScheduler(ctrlServer, syncRequester, 2) syncScheduler := sync.NewSyncScheduler(ctrlServer, syncRequester)
timestampScheduler := timestamp.NewTimestampScheduler(ctrlServer, 60) timestampScheduler := timestamp.NewTimestampScheduler(ctrlServer)
robinIpcParams := robin.RobinIpcParams{ robinIpcParams := robin.RobinIpcParams{
CtrlServer: ctrlServer, CtrlServer: ctrlServer,

View File

@ -25,6 +25,12 @@ type WgMeshConfiguration struct {
// Endpoint is the IP in which this computer is publicly reachable. // Endpoint is the IP in which this computer is publicly reachable.
// usecase is when the node has multiple IP addresses // usecase is when the node has multiple IP addresses
Endpoint string `yaml:"publicEndpoint"` Endpoint string `yaml:"publicEndpoint"`
ClusterSize int `yaml:"clusterSize"`
SyncRate float64 `yaml:"syncRate"`
InterClusterChance float64 `yaml:"interClusterChance"`
BranchRate int `yaml:"branchRate"`
InfectionCount int `yaml:"infectionCount"`
KeepAliveRate int `yaml:"keepAliveRate"`
} }
// ParseConfiguration parses the mesh configuration // ParseConfiguration parses the mesh configuration

View File

@ -5,12 +5,10 @@ package conn
import ( import (
"crypto/tls" "crypto/tls"
"errors" "errors"
"time"
logging "github.com/tim-beatham/wgmesh/pkg/log" logging "github.com/tim-beatham/wgmesh/pkg/log"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
) )
// PeerConnection represents a client-side connection between two // PeerConnection represents a client-side connection between two
@ -43,11 +41,7 @@ func NewWgCtrlConnection(clientConfig *tls.Config, server string) (*WgCtrlConnec
// ConnectWithToken: Connects to a new gRPC peer given the address of the other server. // ConnectWithToken: Connects to a new gRPC peer given the address of the other server.
func (c *WgCtrlConnection) createGrpcConn() error { func (c *WgCtrlConnection) createGrpcConn() error {
conn, err := grpc.Dial(c.endpoint, conn, err := grpc.Dial(c.endpoint,
grpc.WithTransportCredentials(credentials.NewTLS(c.clientConfig)), grpc.WithTransportCredentials(credentials.NewTLS(c.clientConfig)))
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Minute,
Timeout: 30 * time.Minute,
}))
if err != nil { if err != nil {
logging.Log.WriteErrorf("Could not connect: %s\n", err.Error()) logging.Log.WriteErrorf("Could not connect: %s\n", err.Error())

View File

@ -94,7 +94,12 @@ func NewConnectionManager(params *NewConnectionManageParams) (ConnectionManager,
} }
connections := make(map[string]PeerConnection) connections := make(map[string]PeerConnection)
connMgr := ConnectionManagerImpl{sync.RWMutex{}, connections, serverConfig, clientConfig} connMgr := ConnectionManagerImpl{sync.RWMutex{},
connections,
serverConfig,
clientConfig,
}
return &connMgr, nil return &connMgr, nil
} }
@ -131,6 +136,7 @@ func (m *ConnectionManagerImpl) AddConnection(endPoint string) (PeerConnection,
m.conLoc.Lock() m.conLoc.Lock()
m.clientConnections[endPoint] = connections m.clientConnections[endPoint] = connections
m.conLoc.Unlock() m.conLoc.Unlock()
return connections, nil return connections, nil
} }

View File

@ -40,6 +40,7 @@ func MapKeys[K comparable, V any](m map[K]V) []K {
type convert[V1 any, V2 any] func(V1) V2 type convert[V1 any, V2 any] func(V1) V2
// Map turns a list of type V1 into type V2
func Map[V1 any, V2 any](list []V1, f convert[V1, V2]) []V2 { func Map[V1 any, V2 any](list []V1, f convert[V1, V2]) []V2 {
newList := make([]V2, len(list)) newList := make([]V2, len(list))
@ -49,3 +50,19 @@ func Map[V1 any, V2 any](list []V1, f convert[V1, V2]) []V2 {
return newList return newList
} }
type filterFunc[V any] func(V) bool
// Filter filters out elements given a filter function.
// If filter function is true keep it in otherwise leave it out
func Filter[V any](list []V, f filterFunc[V]) []V {
newList := make([]V, 0)
for _, elem := range newList {
if f(elem) {
newList = append(newList, elem)
}
}
return newList
}

View File

@ -1,6 +1,8 @@
package lib package lib
import "math/rand" import (
"math/rand"
)
// RandomSubsetOfLength: Given an array of nodes generate of random // RandomSubsetOfLength: Given an array of nodes generate of random
// subset of 'num' length. // subset of 'num' length.
@ -17,6 +19,7 @@ func RandomSubsetOfLength[V any](vs []V, num int) []V {
if _, ok := selectedIndices[randomIndex]; !ok { if _, ok := selectedIndices[randomIndex]; !ok {
randomSubset = append(randomSubset, vs[randomIndex]) randomSubset = append(randomSubset, vs[randomIndex])
selectedIndices[randomIndex] = struct{}{}
i++ i++
} }
} }

View File

@ -2,10 +2,12 @@ package sync
import ( import (
"errors" "errors"
"math/rand"
"sync" "sync"
"time" "time"
crdt "github.com/tim-beatham/wgmesh/pkg/automerge" "github.com/tim-beatham/wgmesh/pkg/conf"
"github.com/tim-beatham/wgmesh/pkg/conn"
"github.com/tim-beatham/wgmesh/pkg/lib" "github.com/tim-beatham/wgmesh/pkg/lib"
logging "github.com/tim-beatham/wgmesh/pkg/log" logging "github.com/tim-beatham/wgmesh/pkg/log"
"github.com/tim-beatham/wgmesh/pkg/mesh" "github.com/tim-beatham/wgmesh/pkg/mesh"
@ -20,13 +22,12 @@ type Syncer interface {
type SyncerImpl struct { type SyncerImpl struct {
manager *mesh.MeshManager manager *mesh.MeshManager
requester SyncRequester requester SyncRequester
authenticatedNodes []crdt.MeshNodeCrdt
infectionCount int infectionCount int
syncCount int
cluster conn.ConnCluster
conf *conf.WgMeshConfiguration
} }
const subSetLength = 3
const infectionCount = 3
// Sync: Sync random nodes // Sync: Sync random nodes
func (s *SyncerImpl) Sync(meshId string) error { func (s *SyncerImpl) Sync(meshId string) error {
logging.Log.WriteInfof("UPDATING WG CONF") logging.Log.WriteInfof("UPDATING WG CONF")
@ -37,13 +38,13 @@ func (s *SyncerImpl) Sync(meshId string) error {
return nil return nil
} }
mesh := s.manager.GetMesh(meshId) theMesh := s.manager.GetMesh(meshId)
if mesh == nil { if theMesh == nil {
return errors.New("the provided mesh does not exist") return errors.New("the provided mesh does not exist")
} }
snapshot, err := mesh.GetMesh() snapshot, err := theMesh.GetMesh()
if err != nil { if err != nil {
return err return err
@ -58,31 +59,48 @@ func (s *SyncerImpl) Sync(meshId string) error {
excludedNodes := map[string]struct{}{ excludedNodes := map[string]struct{}{
s.manager.HostParameters.HostEndpoint: {}, s.manager.HostParameters.HostEndpoint: {},
} }
meshNodes := lib.MapValuesWithExclude(nodes, excludedNodes) meshNodes := lib.MapValuesWithExclude(nodes, excludedNodes)
randomSubset := lib.RandomSubsetOfLength(meshNodes, subSetLength)
getNames := func(node mesh.MeshNode) string {
return node.GetHostEndpoint()
}
nodeNames := lib.Map(meshNodes, getNames)
neighbours := s.cluster.GetNeighbours(nodeNames, s.manager.HostParameters.HostEndpoint)
randomSubset := lib.RandomSubsetOfLength(neighbours, s.conf.BranchRate)
for _, node := range randomSubset {
logging.Log.WriteInfof("Random node: %s", node)
}
before := time.Now() before := time.Now()
var waitGroup sync.WaitGroup if len(meshNodes) > s.conf.ClusterSize && rand.Float64() < s.conf.InterClusterChance {
logging.Log.WriteInfof("Sending to random cluster")
for _, n := range randomSubset { interCluster := s.cluster.GetInterCluster(nodeNames, s.manager.HostParameters.HostEndpoint)
waitGroup.Add(1) randomSubset = append(randomSubset, interCluster)
syncMeshFunc := func() error {
defer waitGroup.Done()
err := s.requester.SyncMesh(meshId, n.GetHostEndpoint())
return err
} }
go syncMeshFunc() var waitGroup sync.WaitGroup
for index := range randomSubset {
waitGroup.Add(1)
go func(i int) error {
defer waitGroup.Done()
err := s.requester.SyncMesh(meshId, randomSubset[i])
return err
}(index)
} }
waitGroup.Wait() waitGroup.Wait()
s.syncCount++
logging.Log.WriteInfof("SYNC TIME: %v", time.Now().Sub(before)) logging.Log.WriteInfof("SYNC TIME: %v", time.Now().Sub(before))
logging.Log.WriteInfof("SYNC COUNT: %d", s.syncCount)
s.infectionCount = ((infectionCount + s.infectionCount - 1) % infectionCount) s.infectionCount = ((s.conf.InfectionCount + s.infectionCount - 1) % s.conf.InfectionCount)
return nil return nil
} }
@ -100,6 +118,13 @@ func (s *SyncerImpl) SyncMeshes() error {
return nil return nil
} }
func NewSyncer(m *mesh.MeshManager, r SyncRequester) Syncer { func NewSyncer(m *mesh.MeshManager, conf *conf.WgMeshConfiguration, r SyncRequester) Syncer {
return &SyncerImpl{manager: m, requester: r, infectionCount: 0} cluster, _ := conn.NewConnCluster(conf.ClusterSize)
return &SyncerImpl{
manager: m,
conf: conf,
requester: r,
infectionCount: 0,
syncCount: 0,
cluster: cluster}
} }

View File

@ -16,7 +16,6 @@ type SyncScheduler interface {
// SyncSchedulerImpl scheduler for sync scheduling // SyncSchedulerImpl scheduler for sync scheduling
type SyncSchedulerImpl struct { type SyncSchedulerImpl struct {
syncRate int
quit chan struct{} quit chan struct{}
server *ctrlserver.MeshCtrlServer server *ctrlserver.MeshCtrlServer
syncer Syncer syncer Syncer
@ -24,7 +23,7 @@ type SyncSchedulerImpl struct {
// Run implements SyncScheduler. // Run implements SyncScheduler.
func (s *SyncSchedulerImpl) Run() error { func (s *SyncSchedulerImpl) Run() error {
ticker := time.NewTicker(time.Duration(s.syncRate) * time.Second) ticker := time.NewTicker(time.Duration(s.server.Conf.SyncRate) * time.Second)
quit := make(chan struct{}) quit := make(chan struct{})
s.quit = quit s.quit = quit
@ -50,7 +49,7 @@ func (s *SyncSchedulerImpl) Stop() error {
return nil return nil
} }
func NewSyncScheduler(s *ctrlserver.MeshCtrlServer, syncRequester SyncRequester, syncRate int) SyncScheduler { func NewSyncScheduler(s *ctrlserver.MeshCtrlServer, syncRequester SyncRequester) SyncScheduler {
syncer := NewSyncer(s.MeshManager, syncRequester) syncer := NewSyncer(s.MeshManager, s.Conf, syncRequester)
return &SyncSchedulerImpl{server: s, syncRate: syncRate, syncer: syncer} return &SyncSchedulerImpl{server: s, syncer: syncer}
} }

View File

@ -38,8 +38,8 @@ func (s *TimeStampSchedulerImpl) Run() error {
} }
} }
func NewTimestampScheduler(ctrlServer *ctrlserver.MeshCtrlServer, updateRate int) TimestampScheduler { func NewTimestampScheduler(ctrlServer *ctrlserver.MeshCtrlServer) TimestampScheduler {
return &TimeStampSchedulerImpl{meshManager: ctrlServer.MeshManager, updateRate: updateRate} return &TimeStampSchedulerImpl{meshManager: ctrlServer.MeshManager, updateRate: ctrlServer.Conf.KeepAliveRate}
} }
func (s *TimeStampSchedulerImpl) Stop() error { func (s *TimeStampSchedulerImpl) Stop() error {