Added health system to count how many times a node

fails to conenct.
This commit is contained in:
Tim Beatham 2023-11-06 09:54:06 +00:00
parent 4dc85f3861
commit c88012cf71
15 changed files with 161 additions and 23 deletions

View File

@ -2,6 +2,7 @@ certificatePath: "/wgmesh/cert/cert.pem"
privateKeyPath: "/wgmesh/cert/priv.pem" privateKeyPath: "/wgmesh/cert/priv.pem"
caCertificatePath: "/wgmesh/cert/cacert.pem" caCertificatePath: "/wgmesh/cert/cacert.pem"
skipCertVerification: true skipCertVerification: true
timeout: 5
gRPCPort: "21906" gRPCPort: "21906"
advertiseRoutes: true advertiseRoutes: true
clusterSize: 32 clusterSize: 32
@ -9,4 +10,4 @@ syncRate: 1
interClusterChance: 0.15 interClusterChance: 0.15
branchRate: 3 branchRate: 3
infectionCount: 3 infectionCount: 3
keepAliveRate: 60 keepAliveTime: 60

View File

@ -212,6 +212,68 @@ func (m *CrdtMeshManager) GetSyncer() mesh.MeshSyncer {
return NewAutomergeSync(m) return NewAutomergeSync(m)
} }
// getHealthMap returns the health map from the automerge CRDT
func (m *CrdtMeshManager) getHealthMap(nodeId string) (*automerge.Map, error) {
node, err := m.doc.Path("nodes").Map().Get(nodeId)
if err != nil {
return nil, err
}
if node.Kind() != automerge.KindMap {
return nil, errors.New("node should be a map")
}
nodeMap := node.Map()
health, err := nodeMap.Get("health")
if err != nil {
return nil, err
}
if health.Kind() != automerge.KindMap {
return nil, errors.New("health should be a map")
}
healthMap := health.Map()
return healthMap, nil
}
// DecrementHealth: indicates that the current node has voted that the health is down
func (m *CrdtMeshManager) DecrementHealth(nodeId string, selfId string) error {
healthMap, err := m.getHealthMap(nodeId)
if err != nil {
return err
}
err = healthMap.Set(selfId, struct{}{})
if err != nil {
logging.Log.WriteErrorf(err.Error())
}
return nil
}
// IncrementHealth: indicates that the current node thinks that the noden is up
func (m *CrdtMeshManager) IncrementHealth(nodeId string, selfId string) error {
healthMap, err := m.getHealthMap(nodeId)
if err != nil {
return err
}
err = healthMap.Delete(selfId)
if err != nil {
logging.Log.WriteErrorf(err.Error())
}
return nil
}
func (m1 *MeshNodeCrdt) Compare(m2 *MeshNodeCrdt) int { func (m1 *MeshNodeCrdt) Compare(m2 *MeshNodeCrdt) int {
return strings.Compare(m1.PublicKey, m2.PublicKey) return strings.Compare(m1.PublicKey, m2.PublicKey)
} }
@ -260,6 +322,10 @@ func (m *MeshNodeCrdt) GetIdentifier() string {
return strings.Join(constituents, ":") return strings.Join(constituents, ":")
} }
func (m *MeshNodeCrdt) GetHealth() int {
return len(m.Health)
}
func (m *MeshCrdt) GetNodes() map[string]mesh.MeshNode { func (m *MeshCrdt) GetNodes() map[string]mesh.MeshNode {
nodes := make(map[string]mesh.MeshNode) nodes := make(map[string]mesh.MeshNode)

View File

@ -9,6 +9,7 @@ type MeshNodeCrdt struct {
Timestamp int64 `automerge:"timestamp"` Timestamp int64 `automerge:"timestamp"`
Routes map[string]interface{} `automerge:"routes"` Routes map[string]interface{} `automerge:"routes"`
Description string `automerge:"description"` Description string `automerge:"description"`
Health map[string]interface{} `automerge:"health"`
} }
// MeshCrdt: Represents the mesh network as a whole // MeshCrdt: Represents the mesh network as a whole

View File

@ -32,13 +32,21 @@ type WgMeshConfiguration struct {
AdvertiseRoutes bool `yaml:"advertiseRoutes"` AdvertiseRoutes bool `yaml:"advertiseRoutes"`
// Endpoint is the IP in which this computer is publicly reachable. // Endpoint is the IP in which this computer is publicly reachable.
// usecase is when the node has multiple IP addresses // usecase is when the node has multiple IP addresses
Endpoint string `yaml:"publicEndpoint"` Endpoint string `yaml:"publicEndpoint"`
ClusterSize int `yaml:"clusterSize"` // ClusterSize size of the cluster to split on
SyncRate float64 `yaml:"syncRate"` ClusterSize int `yaml:"clusterSize"`
// SyncRate number of times per second to perform a sync
SyncRate float64 `yaml:"syncRate"`
// InterClusterChance proability of inter-cluster communication in a sync round
InterClusterChance float64 `yaml:"interClusterChance"` InterClusterChance float64 `yaml:"interClusterChance"`
BranchRate int `yaml:"branchRate"` // BranchRate number of nodes to randomly communicate with
InfectionCount int `yaml:"infectionCount"` BranchRate int `yaml:"branchRate"`
KeepAliveRate int `yaml:"keepAliveRate"` // InfectionCount number of times we sync before we can no longer catch the udpate
InfectionCount int `yaml:"infectionCount"`
// KeepAliveTime
KeepAliveTime int `yaml:"keepAliveTime"`
// Timeout number of seconds before we update node indicating that we are still alive
Timeout int `yaml:"timeout"`
} }
func ValidateConfiguration(c *WgMeshConfiguration) error { func ValidateConfiguration(c *WgMeshConfiguration) error {
@ -90,7 +98,7 @@ func ValidateConfiguration(c *WgMeshConfiguration) error {
} }
} }
if c.KeepAliveRate <= 0 { if c.KeepAliveTime <= 0 {
return &WgMeshConfigurationError{ return &WgMeshConfigurationError{
msg: "KeepAliveRate cannot be less than negative", msg: "KeepAliveRate cannot be less than negative",
} }
@ -102,6 +110,12 @@ func ValidateConfiguration(c *WgMeshConfiguration) error {
} }
} }
if c.Timeout <= 1 {
return &WgMeshConfigurationError{
msg: "Timeout should be less than or equal to 1",
}
}
return nil return nil
} }

View File

@ -15,7 +15,7 @@ func getExampleConfiguration() *WgMeshConfiguration {
SyncRate: 1, SyncRate: 1,
InterClusterChance: 0.1, InterClusterChance: 0.1,
BranchRate: 2, BranchRate: 2,
KeepAliveRate: 1, KeepAliveTime: 1,
InfectionCount: 1, InfectionCount: 1,
} }
} }
@ -110,7 +110,7 @@ func InfectionCountZero(t *testing.T) {
func KeepAliveRateZero(t *testing.T) { func KeepAliveRateZero(t *testing.T) {
conf := getExampleConfiguration() conf := getExampleConfiguration()
conf.KeepAliveRate = 0 conf.KeepAliveTime = 0
err := ValidateConfiguration(conf) err := ValidateConfiguration(conf)

View File

@ -35,6 +35,9 @@ func NewCtrlServer(params *NewCtrlServerParams) (*MeshCtrlServer, error) {
ipAllocator := &ip.ULABuilder{} ipAllocator := &ip.ULABuilder{}
interfaceManipulator := wg.NewWgInterfaceManipulator(params.Client) interfaceManipulator := wg.NewWgInterfaceManipulator(params.Client)
var meshManager mesh.MeshManagerImpl
configApplyer := mesh.NewWgMeshConfigApplyer()
meshManagerParams := &mesh.NewMeshManagerParams{ meshManagerParams := &mesh.NewMeshManagerParams{
Conf: *params.Conf, Conf: *params.Conf,
Client: params.Client, Client: params.Client,
@ -43,8 +46,10 @@ func NewCtrlServer(params *NewCtrlServerParams) (*MeshCtrlServer, error) {
IdGenerator: idGenerator, IdGenerator: idGenerator,
IPAllocator: ipAllocator, IPAllocator: ipAllocator,
InterfaceManipulator: interfaceManipulator, InterfaceManipulator: interfaceManipulator,
ConfigApplyer: mesh.NewWgMeshConfigApplyer(ctrlServer.MeshManager), ConfigApplyer: configApplyer,
} }
configApplyer.SetMeshManager(&meshManager)
ctrlServer.MeshManager = mesh.NewMeshManager(meshManagerParams) ctrlServer.MeshManager = mesh.NewMeshManager(meshManagerParams)
ctrlServer.Conf = params.Conf ctrlServer.Conf = params.Conf

View File

@ -1,7 +1,6 @@
package mesh package mesh
import ( import (
"errors"
"fmt" "fmt"
"net" "net"
@ -12,6 +11,7 @@ import (
type MeshConfigApplyer interface { type MeshConfigApplyer interface {
ApplyConfig() error ApplyConfig() error
RemovePeers(meshId string) error RemovePeers(meshId string) error
SetMeshManager(manager MeshManager)
} }
// WgMeshConfigApplyer applies WireGuard configuration // WgMeshConfigApplyer applies WireGuard configuration
@ -101,7 +101,7 @@ func (m *WgMeshConfigApplyer) RemovePeers(meshId string) error {
mesh := m.meshManager.GetMesh(meshId) mesh := m.meshManager.GetMesh(meshId)
if mesh == nil { if mesh == nil {
return errors.New(fmt.Sprintf("mesh %s does not exist", meshId)) return fmt.Errorf("mesh %s does not exist", meshId)
} }
dev, err := mesh.GetDevice() dev, err := mesh.GetDevice()
@ -118,6 +118,10 @@ func (m *WgMeshConfigApplyer) RemovePeers(meshId string) error {
return nil return nil
} }
func NewWgMeshConfigApplyer(manager MeshManager) MeshConfigApplyer { func (m *WgMeshConfigApplyer) SetMeshManager(manager MeshManager) {
return &WgMeshConfigApplyer{meshManager: manager} m.meshManager = manager
}
func NewWgMeshConfigApplyer() MeshConfigApplyer {
return &WgMeshConfigApplyer{}
} }

View File

@ -76,7 +76,6 @@ func (m *MeshManagerImpl) CreateMesh(devName string, port int) (string, error) {
} }
m.Meshes[meshId] = nodeManager m.Meshes[meshId] = nodeManager
err = m.configApplyer.RemovePeers(meshId)
if err != nil { if err != nil {
logging.Log.WriteErrorf(err.Error()) logging.Log.WriteErrorf(err.Error())
@ -327,6 +326,7 @@ func NewMeshManager(params *NewMeshManagerParams) *MeshManagerImpl {
Client: params.Client, Client: params.Client,
conf: &params.Conf, conf: &params.Conf,
} }
m.configApplyer = params.ConfigApplyer m.configApplyer = params.ConfigApplyer
m.RouteManager = NewRouteManager(m) m.RouteManager = NewRouteManager(m)
m.idGenerator = params.IdGenerator m.idGenerator = params.IdGenerator

View File

@ -18,7 +18,7 @@ func getMeshConfiguration() *conf.WgMeshConfiguration {
BranchRate: 3, BranchRate: 3,
InterClusterChance: 0.15, InterClusterChance: 0.15,
InfectionCount: 2, InfectionCount: 2,
KeepAliveRate: 60, KeepAliveTime: 60,
} }
} }

View File

@ -21,6 +21,11 @@ type MeshNodeStub struct {
description string description string
} }
// GetHealth implements MeshNode.
func (*MeshNodeStub) GetHealth() int {
return 5
}
func (m *MeshNodeStub) GetHostEndpoint() string { func (m *MeshNodeStub) GetHostEndpoint() string {
return m.hostEndpoint return m.hostEndpoint
} }
@ -66,6 +71,16 @@ type MeshProviderStub struct {
snapshot *MeshSnapshotStub snapshot *MeshSnapshotStub
} }
// DecrementHealth implements MeshProvider.
func (*MeshProviderStub) DecrementHealth(nodeId string, selfId string) error {
return nil
}
// IncrementHealth implements MeshProvider.
func (*MeshProviderStub) IncrementHealth(nodeId string, selfId string) error {
return nil
}
// UpdateTimeStamp implements MeshProvider. // UpdateTimeStamp implements MeshProvider.
func (*MeshProviderStub) UpdateTimeStamp(nodeId string) error { func (*MeshProviderStub) UpdateTimeStamp(nodeId string) error {
return nil return nil

View File

@ -28,6 +28,8 @@ type MeshNode interface {
GetIdentifier() string GetIdentifier() string
// GetDescription: returns the description for this node // GetDescription: returns the description for this node
GetDescription() string GetDescription() string
// GetHealth: returns the health score for this mesh node
GetHealth() int
} }
type MeshSnapshot interface { type MeshSnapshot interface {
@ -58,20 +60,27 @@ type MeshProvider interface {
GetDevice() (*wgtypes.Device, error) GetDevice() (*wgtypes.Device, error)
// HasChanges returns true if we have changes since last time we synced // HasChanges returns true if we have changes since last time we synced
HasChanges() bool HasChanges() bool
// Record that we have changges and save the corresponding changes // Record that we have changes and save the corresponding changes
SaveChanges() SaveChanges()
// UpdateTimeStamp: update the timestamp of the given node // UpdateTimeStamp: update the timestamp of the given node
UpdateTimeStamp(nodeId string) error UpdateTimeStamp(nodeId string) error
// AddRoutes: adds routes to the given node // AddRoutes: adds routes to the given node
AddRoutes(nodeId string, route ...string) error AddRoutes(nodeId string, route ...string) error
// GetSyncer: returns the automerge syncer for sync
GetSyncer() MeshSyncer GetSyncer() MeshSyncer
// SetDescription: sets the description of this automerge data type
SetDescription(nodeId string, description string) error SetDescription(nodeId string, description string) error
// DecrementHealth: indicates that the node with selfId thinks that the node
// is down
DecrementHealth(nodeId string, selfId string) error
// IncrementHealth: indicates that the node is up and so increment the health of the
// node
IncrementHealth(nodeId string, selfId string) error
} }
// HostParameters contains the IDs of a node // HostParameters contains the IDs of a node
type HostParameters struct { type HostParameters struct {
HostEndpoint string HostEndpoint string
// TODO: Contain the WireGungracefullyuard identifier in this
} }
// MeshProviderFactoryParams parameters required to build a mesh provider // MeshProviderFactoryParams parameters required to build a mesh provider

View File

@ -31,6 +31,7 @@ type QueryNode struct {
Timestamp int64 `json:"timestmap"` Timestamp int64 `json:"timestmap"`
Description string `json:"description"` Description string `json:"description"`
Routes []string `json:"routes"` Routes []string `json:"routes"`
Health int `json:"health"`
} }
func (m *QueryError) Error() string { func (m *QueryError) Error() string {
@ -76,6 +77,7 @@ func meshNodeToQueryNode(node mesh.MeshNode) *QueryNode {
queryNode.Timestamp = node.GetTimeStamp() queryNode.Timestamp = node.GetTimeStamp()
queryNode.Routes = node.GetRoutes() queryNode.Routes = node.GetRoutes()
queryNode.Description = node.GetDescription() queryNode.Description = node.GetDescription()
queryNode.Health = node.GetHealth()
return queryNode return queryNode
} }

View File

@ -24,6 +24,13 @@ func (s *SyncErrorHandlerImpl) incrementFailedCount(meshId string, endpoint stri
return false return false
} }
self, err := s.meshManager.GetSelf(meshId)
if err != nil {
return false
}
mesh.DecrementHealth(meshId, self.GetHostEndpoint())
return true return true
} }

View File

@ -89,20 +89,34 @@ func (s *SyncRequesterImpl) SyncMesh(meshId, endpoint string) error {
c := rpc.NewSyncServiceClient(client) c := rpc.NewSyncServiceClient(client)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) syncTimeOut := s.server.Conf.SyncRate * float64(time.Second)
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(syncTimeOut))
defer cancel() defer cancel()
err = syncMesh(mesh, ctx, c) err = s.syncMesh(mesh, ctx, c)
if err != nil { if err != nil {
return s.handleErr(meshId, endpoint, err) return s.handleErr(meshId, endpoint, err)
} }
self, err := s.server.MeshManager.GetSelf(mesh.GetMeshId())
if err != nil {
return err
}
err = mesh.IncrementHealth(meshId, self.GetHostEndpoint())
if err != nil {
return err
}
logging.Log.WriteInfof("Synced with node: %s meshId: %s\n", endpoint, meshId) logging.Log.WriteInfof("Synced with node: %s meshId: %s\n", endpoint, meshId)
return nil return nil
} }
func syncMesh(mesh mesh.MeshProvider, ctx context.Context, client rpc.SyncServiceClient) error { func (s *SyncRequesterImpl) syncMesh(mesh mesh.MeshProvider, ctx context.Context, client rpc.SyncServiceClient) error {
stream, err := client.SyncMesh(ctx) stream, err := client.SyncMesh(ctx)
syncer := mesh.GetSyncer() syncer := mesh.GetSyncer()

View File

@ -41,7 +41,7 @@ func (s *TimeStampSchedulerImpl) Run() error {
func NewTimestampScheduler(ctrlServer *ctrlserver.MeshCtrlServer) TimestampScheduler { func NewTimestampScheduler(ctrlServer *ctrlserver.MeshCtrlServer) TimestampScheduler {
return &TimeStampSchedulerImpl{ return &TimeStampSchedulerImpl{
meshManager: ctrlServer.MeshManager, meshManager: ctrlServer.MeshManager,
updateRate: ctrlServer.Conf.KeepAliveRate, updateRate: ctrlServer.Conf.KeepAliveTime,
} }
} }