diff --git a/cmd/wgmeshd/configuration.yaml b/cmd/wgmeshd/configuration.yaml index 343a69c..103e958 100644 --- a/cmd/wgmeshd/configuration.yaml +++ b/cmd/wgmeshd/configuration.yaml @@ -2,6 +2,7 @@ certificatePath: "/wgmesh/cert/cert.pem" privateKeyPath: "/wgmesh/cert/priv.pem" caCertificatePath: "/wgmesh/cert/cacert.pem" skipCertVerification: true +timeout: 5 gRPCPort: "21906" advertiseRoutes: true clusterSize: 32 @@ -9,4 +10,4 @@ syncRate: 1 interClusterChance: 0.15 branchRate: 3 infectionCount: 3 -keepAliveRate: 60 \ No newline at end of file +keepAliveTime: 60 \ No newline at end of file diff --git a/pkg/automerge/automerge.go b/pkg/automerge/automerge.go index c21de6f..1443a9a 100644 --- a/pkg/automerge/automerge.go +++ b/pkg/automerge/automerge.go @@ -212,6 +212,68 @@ func (m *CrdtMeshManager) GetSyncer() mesh.MeshSyncer { return NewAutomergeSync(m) } +// getHealthMap returns the health map from the automerge CRDT +func (m *CrdtMeshManager) getHealthMap(nodeId string) (*automerge.Map, error) { + node, err := m.doc.Path("nodes").Map().Get(nodeId) + + if err != nil { + return nil, err + } + + if node.Kind() != automerge.KindMap { + return nil, errors.New("node should be a map") + } + + nodeMap := node.Map() + + health, err := nodeMap.Get("health") + + if err != nil { + return nil, err + } + + if health.Kind() != automerge.KindMap { + return nil, errors.New("health should be a map") + } + + healthMap := health.Map() + return healthMap, nil +} + +// DecrementHealth: indicates that the current node has voted that the health is down +func (m *CrdtMeshManager) DecrementHealth(nodeId string, selfId string) error { + healthMap, err := m.getHealthMap(nodeId) + + if err != nil { + return err + } + + err = healthMap.Set(selfId, struct{}{}) + + if err != nil { + logging.Log.WriteErrorf(err.Error()) + } + + return nil +} + +// IncrementHealth: indicates that the current node thinks that the noden is up +func (m *CrdtMeshManager) IncrementHealth(nodeId string, selfId string) error { + healthMap, err := m.getHealthMap(nodeId) + + if err != nil { + return err + } + + err = healthMap.Delete(selfId) + + if err != nil { + logging.Log.WriteErrorf(err.Error()) + } + + return nil +} + func (m1 *MeshNodeCrdt) Compare(m2 *MeshNodeCrdt) int { return strings.Compare(m1.PublicKey, m2.PublicKey) } @@ -260,6 +322,10 @@ func (m *MeshNodeCrdt) GetIdentifier() string { return strings.Join(constituents, ":") } +func (m *MeshNodeCrdt) GetHealth() int { + return len(m.Health) +} + func (m *MeshCrdt) GetNodes() map[string]mesh.MeshNode { nodes := make(map[string]mesh.MeshNode) diff --git a/pkg/automerge/types.go b/pkg/automerge/types.go index 60315fe..f4216e9 100644 --- a/pkg/automerge/types.go +++ b/pkg/automerge/types.go @@ -9,6 +9,7 @@ type MeshNodeCrdt struct { Timestamp int64 `automerge:"timestamp"` Routes map[string]interface{} `automerge:"routes"` Description string `automerge:"description"` + Health map[string]interface{} `automerge:"health"` } // MeshCrdt: Represents the mesh network as a whole diff --git a/pkg/conf/conf.go b/pkg/conf/conf.go index 8ea0f01..146d479 100644 --- a/pkg/conf/conf.go +++ b/pkg/conf/conf.go @@ -32,13 +32,21 @@ type WgMeshConfiguration struct { AdvertiseRoutes bool `yaml:"advertiseRoutes"` // Endpoint is the IP in which this computer is publicly reachable. // usecase is when the node has multiple IP addresses - Endpoint string `yaml:"publicEndpoint"` - ClusterSize int `yaml:"clusterSize"` - SyncRate float64 `yaml:"syncRate"` + Endpoint string `yaml:"publicEndpoint"` + // ClusterSize size of the cluster to split on + ClusterSize int `yaml:"clusterSize"` + // SyncRate number of times per second to perform a sync + SyncRate float64 `yaml:"syncRate"` + // InterClusterChance proability of inter-cluster communication in a sync round InterClusterChance float64 `yaml:"interClusterChance"` - BranchRate int `yaml:"branchRate"` - InfectionCount int `yaml:"infectionCount"` - KeepAliveRate int `yaml:"keepAliveRate"` + // BranchRate number of nodes to randomly communicate with + BranchRate int `yaml:"branchRate"` + // InfectionCount number of times we sync before we can no longer catch the udpate + InfectionCount int `yaml:"infectionCount"` + // KeepAliveTime + KeepAliveTime int `yaml:"keepAliveTime"` + // Timeout number of seconds before we update node indicating that we are still alive + Timeout int `yaml:"timeout"` } func ValidateConfiguration(c *WgMeshConfiguration) error { @@ -90,7 +98,7 @@ func ValidateConfiguration(c *WgMeshConfiguration) error { } } - if c.KeepAliveRate <= 0 { + if c.KeepAliveTime <= 0 { return &WgMeshConfigurationError{ msg: "KeepAliveRate cannot be less than negative", } @@ -102,6 +110,12 @@ func ValidateConfiguration(c *WgMeshConfiguration) error { } } + if c.Timeout <= 1 { + return &WgMeshConfigurationError{ + msg: "Timeout should be less than or equal to 1", + } + } + return nil } diff --git a/pkg/conf/conf_test.go b/pkg/conf/conf_test.go index 04d85e9..ba7b665 100644 --- a/pkg/conf/conf_test.go +++ b/pkg/conf/conf_test.go @@ -15,7 +15,7 @@ func getExampleConfiguration() *WgMeshConfiguration { SyncRate: 1, InterClusterChance: 0.1, BranchRate: 2, - KeepAliveRate: 1, + KeepAliveTime: 1, InfectionCount: 1, } } @@ -110,7 +110,7 @@ func InfectionCountZero(t *testing.T) { func KeepAliveRateZero(t *testing.T) { conf := getExampleConfiguration() - conf.KeepAliveRate = 0 + conf.KeepAliveTime = 0 err := ValidateConfiguration(conf) diff --git a/pkg/ctrlserver/ctrlserver.go b/pkg/ctrlserver/ctrlserver.go index 0a6e760..a3ed638 100644 --- a/pkg/ctrlserver/ctrlserver.go +++ b/pkg/ctrlserver/ctrlserver.go @@ -35,6 +35,9 @@ func NewCtrlServer(params *NewCtrlServerParams) (*MeshCtrlServer, error) { ipAllocator := &ip.ULABuilder{} interfaceManipulator := wg.NewWgInterfaceManipulator(params.Client) + var meshManager mesh.MeshManagerImpl + configApplyer := mesh.NewWgMeshConfigApplyer() + meshManagerParams := &mesh.NewMeshManagerParams{ Conf: *params.Conf, Client: params.Client, @@ -43,8 +46,10 @@ func NewCtrlServer(params *NewCtrlServerParams) (*MeshCtrlServer, error) { IdGenerator: idGenerator, IPAllocator: ipAllocator, InterfaceManipulator: interfaceManipulator, - ConfigApplyer: mesh.NewWgMeshConfigApplyer(ctrlServer.MeshManager), + ConfigApplyer: configApplyer, } + + configApplyer.SetMeshManager(&meshManager) ctrlServer.MeshManager = mesh.NewMeshManager(meshManagerParams) ctrlServer.Conf = params.Conf diff --git a/pkg/mesh/config.go b/pkg/mesh/config.go index c269d88..35e9258 100644 --- a/pkg/mesh/config.go +++ b/pkg/mesh/config.go @@ -1,7 +1,6 @@ package mesh import ( - "errors" "fmt" "net" @@ -12,6 +11,7 @@ import ( type MeshConfigApplyer interface { ApplyConfig() error RemovePeers(meshId string) error + SetMeshManager(manager MeshManager) } // WgMeshConfigApplyer applies WireGuard configuration @@ -101,7 +101,7 @@ func (m *WgMeshConfigApplyer) RemovePeers(meshId string) error { mesh := m.meshManager.GetMesh(meshId) if mesh == nil { - return errors.New(fmt.Sprintf("mesh %s does not exist", meshId)) + return fmt.Errorf("mesh %s does not exist", meshId) } dev, err := mesh.GetDevice() @@ -118,6 +118,10 @@ func (m *WgMeshConfigApplyer) RemovePeers(meshId string) error { return nil } -func NewWgMeshConfigApplyer(manager MeshManager) MeshConfigApplyer { - return &WgMeshConfigApplyer{meshManager: manager} +func (m *WgMeshConfigApplyer) SetMeshManager(manager MeshManager) { + m.meshManager = manager +} + +func NewWgMeshConfigApplyer() MeshConfigApplyer { + return &WgMeshConfigApplyer{} } diff --git a/pkg/mesh/manager.go b/pkg/mesh/manager.go index fc0140d..5b85f6c 100644 --- a/pkg/mesh/manager.go +++ b/pkg/mesh/manager.go @@ -76,7 +76,6 @@ func (m *MeshManagerImpl) CreateMesh(devName string, port int) (string, error) { } m.Meshes[meshId] = nodeManager - err = m.configApplyer.RemovePeers(meshId) if err != nil { logging.Log.WriteErrorf(err.Error()) @@ -327,6 +326,7 @@ func NewMeshManager(params *NewMeshManagerParams) *MeshManagerImpl { Client: params.Client, conf: ¶ms.Conf, } + m.configApplyer = params.ConfigApplyer m.RouteManager = NewRouteManager(m) m.idGenerator = params.IdGenerator diff --git a/pkg/mesh/manager_test.go b/pkg/mesh/manager_test.go index 9760683..8393b88 100644 --- a/pkg/mesh/manager_test.go +++ b/pkg/mesh/manager_test.go @@ -18,7 +18,7 @@ func getMeshConfiguration() *conf.WgMeshConfiguration { BranchRate: 3, InterClusterChance: 0.15, InfectionCount: 2, - KeepAliveRate: 60, + KeepAliveTime: 60, } } diff --git a/pkg/mesh/stub_types.go b/pkg/mesh/stub_types.go index 8d1ed14..9605d88 100644 --- a/pkg/mesh/stub_types.go +++ b/pkg/mesh/stub_types.go @@ -21,6 +21,11 @@ type MeshNodeStub struct { description string } +// GetHealth implements MeshNode. +func (*MeshNodeStub) GetHealth() int { + return 5 +} + func (m *MeshNodeStub) GetHostEndpoint() string { return m.hostEndpoint } @@ -66,6 +71,16 @@ type MeshProviderStub struct { snapshot *MeshSnapshotStub } +// DecrementHealth implements MeshProvider. +func (*MeshProviderStub) DecrementHealth(nodeId string, selfId string) error { + return nil +} + +// IncrementHealth implements MeshProvider. +func (*MeshProviderStub) IncrementHealth(nodeId string, selfId string) error { + return nil +} + // UpdateTimeStamp implements MeshProvider. func (*MeshProviderStub) UpdateTimeStamp(nodeId string) error { return nil diff --git a/pkg/mesh/types.go b/pkg/mesh/types.go index fd0b0e2..ef6e1e5 100644 --- a/pkg/mesh/types.go +++ b/pkg/mesh/types.go @@ -28,6 +28,8 @@ type MeshNode interface { GetIdentifier() string // GetDescription: returns the description for this node GetDescription() string + // GetHealth: returns the health score for this mesh node + GetHealth() int } type MeshSnapshot interface { @@ -58,20 +60,27 @@ type MeshProvider interface { GetDevice() (*wgtypes.Device, error) // HasChanges returns true if we have changes since last time we synced HasChanges() bool - // Record that we have changges and save the corresponding changes + // Record that we have changes and save the corresponding changes SaveChanges() // UpdateTimeStamp: update the timestamp of the given node UpdateTimeStamp(nodeId string) error // AddRoutes: adds routes to the given node AddRoutes(nodeId string, route ...string) error + // GetSyncer: returns the automerge syncer for sync GetSyncer() MeshSyncer + // SetDescription: sets the description of this automerge data type SetDescription(nodeId string, description string) error + // DecrementHealth: indicates that the node with selfId thinks that the node + // is down + DecrementHealth(nodeId string, selfId string) error + // IncrementHealth: indicates that the node is up and so increment the health of the + // node + IncrementHealth(nodeId string, selfId string) error } // HostParameters contains the IDs of a node type HostParameters struct { HostEndpoint string - // TODO: Contain the WireGungracefullyuard identifier in this } // MeshProviderFactoryParams parameters required to build a mesh provider diff --git a/pkg/query/query.go b/pkg/query/query.go index 0978f08..95372b3 100644 --- a/pkg/query/query.go +++ b/pkg/query/query.go @@ -31,6 +31,7 @@ type QueryNode struct { Timestamp int64 `json:"timestmap"` Description string `json:"description"` Routes []string `json:"routes"` + Health int `json:"health"` } func (m *QueryError) Error() string { @@ -76,6 +77,7 @@ func meshNodeToQueryNode(node mesh.MeshNode) *QueryNode { queryNode.Timestamp = node.GetTimeStamp() queryNode.Routes = node.GetRoutes() queryNode.Description = node.GetDescription() + queryNode.Health = node.GetHealth() return queryNode } diff --git a/pkg/sync/syncererror.go b/pkg/sync/syncererror.go index e2d2260..7a5b529 100644 --- a/pkg/sync/syncererror.go +++ b/pkg/sync/syncererror.go @@ -24,6 +24,13 @@ func (s *SyncErrorHandlerImpl) incrementFailedCount(meshId string, endpoint stri return false } + self, err := s.meshManager.GetSelf(meshId) + + if err != nil { + return false + } + + mesh.DecrementHealth(meshId, self.GetHostEndpoint()) return true } diff --git a/pkg/sync/syncrequester.go b/pkg/sync/syncrequester.go index 75d037c..221544e 100644 --- a/pkg/sync/syncrequester.go +++ b/pkg/sync/syncrequester.go @@ -89,20 +89,34 @@ func (s *SyncRequesterImpl) SyncMesh(meshId, endpoint string) error { c := rpc.NewSyncServiceClient(client) - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + syncTimeOut := s.server.Conf.SyncRate * float64(time.Second) + + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(syncTimeOut)) defer cancel() - err = syncMesh(mesh, ctx, c) + err = s.syncMesh(mesh, ctx, c) if err != nil { return s.handleErr(meshId, endpoint, err) } + self, err := s.server.MeshManager.GetSelf(mesh.GetMeshId()) + + if err != nil { + return err + } + + err = mesh.IncrementHealth(meshId, self.GetHostEndpoint()) + + if err != nil { + return err + } + logging.Log.WriteInfof("Synced with node: %s meshId: %s\n", endpoint, meshId) return nil } -func syncMesh(mesh mesh.MeshProvider, ctx context.Context, client rpc.SyncServiceClient) error { +func (s *SyncRequesterImpl) syncMesh(mesh mesh.MeshProvider, ctx context.Context, client rpc.SyncServiceClient) error { stream, err := client.SyncMesh(ctx) syncer := mesh.GetSyncer() diff --git a/pkg/timestamp/timestamp.go b/pkg/timestamp/timestamp.go index d03dbe8..0ca38e9 100644 --- a/pkg/timestamp/timestamp.go +++ b/pkg/timestamp/timestamp.go @@ -41,7 +41,7 @@ func (s *TimeStampSchedulerImpl) Run() error { func NewTimestampScheduler(ctrlServer *ctrlserver.MeshCtrlServer) TimestampScheduler { return &TimeStampSchedulerImpl{ meshManager: ctrlServer.MeshManager, - updateRate: ctrlServer.Conf.KeepAliveRate, + updateRate: ctrlServer.Conf.KeepAliveTime, } }