Merge pull request #73 from tim-beatham/72-pull-rate-in-configuration

72 pull rate in configuration
This commit is contained in:
Tim Beatham 2023-12-31 14:26:34 +00:00 committed by GitHub
commit 14f335af74
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 380 additions and 233 deletions

View File

@ -22,25 +22,22 @@ type CreateMeshParams struct {
AdvertiseDefault bool AdvertiseDefault bool
} }
func createMesh(params *CreateMeshParams) string { func createMesh(client *ipc.ClientIpc, args *ipc.NewMeshArgs) {
var reply string var reply string
newMeshParams := ipc.NewMeshArgs{ err := client.CreateMesh(args, &reply)
WgArgs: params.WgArgs,
}
err := params.Client.Call("IpcHandler.CreateMesh", &newMeshParams, &reply)
if err != nil { if err != nil {
return err.Error() fmt.Println(err.Error())
return
} }
return reply fmt.Println(reply)
} }
func listMeshes(client *ipcRpc.Client) { func listMeshes(client *ipc.ClientIpc) {
reply := new(ipc.ListMeshReply) reply := new(ipc.ListMeshReply)
err := client.Call("IpcHandler.ListMeshes", "", &reply) err := client.ListMeshes(reply)
if err != nil { if err != nil {
logging.Log.WriteErrorf(err.Error()) logging.Log.WriteErrorf(err.Error())
@ -52,38 +49,22 @@ func listMeshes(client *ipcRpc.Client) {
} }
} }
type JoinMeshParams struct { func joinMesh(client *ipc.ClientIpc, args ipc.JoinMeshArgs) {
Client *ipcRpc.Client
MeshId string
IpAddress string
Endpoint string
WgArgs ipc.WireGuardArgs
AdvertiseRoutes bool
AdvertiseDefault bool
}
func joinMesh(params *JoinMeshParams) string {
var reply string var reply string
args := ipc.JoinMeshArgs{ err := client.JoinMesh(args, &reply)
MeshId: params.MeshId,
IpAdress: params.IpAddress,
WgArgs: params.WgArgs,
}
err := params.Client.Call("IpcHandler.JoinMesh", &args, &reply)
if err != nil { if err != nil {
return err.Error() fmt.Println(err.Error())
} }
return reply fmt.Println(reply)
} }
func leaveMesh(client *ipcRpc.Client, meshId string) { func leaveMesh(client *ipc.ClientIpc, meshId string) {
var reply string var reply string
err := client.Call("IpcHandler.LeaveMesh", &meshId, &reply) err := client.LeaveMesh(meshId, &reply)
if err != nil { if err != nil {
fmt.Println(err.Error()) fmt.Println(err.Error())
@ -93,10 +74,10 @@ func leaveMesh(client *ipcRpc.Client, meshId string) {
fmt.Println(reply) fmt.Println(reply)
} }
func getGraph(client *ipcRpc.Client) { func getGraph(client *ipc.ClientIpc) {
listMeshesReply := new(ipc.ListMeshReply) listMeshesReply := new(ipc.ListMeshReply)
err := client.Call("IpcHandler.ListMeshes", "", &listMeshesReply) err := client.ListMeshes(listMeshesReply)
if err != nil { if err != nil {
fmt.Println(err.Error()) fmt.Println(err.Error())
@ -108,7 +89,7 @@ func getGraph(client *ipcRpc.Client) {
for _, meshId := range listMeshesReply.Meshes { for _, meshId := range listMeshesReply.Meshes {
var meshReply ipc.GetMeshReply var meshReply ipc.GetMeshReply
err := client.Call("IpcHandler.GetMesh", &meshId, &meshReply) err := client.GetMesh(meshId, &meshReply)
if err != nil { if err != nil {
fmt.Println(err.Error()) fmt.Println(err.Error())
@ -129,10 +110,15 @@ func getGraph(client *ipcRpc.Client) {
fmt.Println(dot) fmt.Println(dot)
} }
func queryMesh(client *ipcRpc.Client, meshId, query string) { func queryMesh(client *ipc.ClientIpc, meshId, query string) {
var reply string var reply string
err := client.Call("IpcHandler.Query", &ipc.QueryMesh{MeshId: meshId, Query: query}, &reply) args := ipc.QueryMesh{
MeshId: meshId,
Query: query,
}
err := client.Query(args, &reply)
if err != nil { if err != nil {
fmt.Println(err.Error()) fmt.Println(err.Error())
@ -142,11 +128,13 @@ func queryMesh(client *ipcRpc.Client, meshId, query string) {
fmt.Println(reply) fmt.Println(reply)
} }
// putDescription: puts updates the description about the node to the meshes func putDescription(client *ipc.ClientIpc, meshId, description string) {
func putDescription(client *ipcRpc.Client, description string) {
var reply string var reply string
err := client.Call("IpcHandler.PutDescription", &description, &reply) err := client.PutDescription(ipc.PutDescriptionArgs{
MeshId: meshId,
Description: description,
}, &reply)
if err != nil { if err != nil {
fmt.Println(err.Error()) fmt.Println(err.Error())
@ -157,10 +145,13 @@ func putDescription(client *ipcRpc.Client, description string) {
} }
// putAlias: puts an alias for the node // putAlias: puts an alias for the node
func putAlias(client *ipcRpc.Client, alias string) { func putAlias(client *ipc.ClientIpc, meshid, alias string) {
var reply string var reply string
err := client.Call("IpcHandler.PutAlias", &alias, &reply) err := client.PutAlias(ipc.PutAliasArgs{
MeshId: meshid,
Alias: alias,
}, &reply)
if err != nil { if err != nil {
fmt.Println(err.Error()) fmt.Println(err.Error())
@ -170,15 +161,14 @@ func putAlias(client *ipcRpc.Client, alias string) {
fmt.Println(reply) fmt.Println(reply)
} }
func setService(client *ipcRpc.Client, service, value string) { func setService(client *ipc.ClientIpc, meshId, service, value string) {
var reply string var reply string
serviceArgs := &ipc.PutServiceArgs{ err := client.PutService(ipc.PutServiceArgs{
MeshId: meshId,
Service: service, Service: service,
Value: value, Value: value,
} }, &reply)
err := client.Call("IpcHandler.PutService", serviceArgs, &reply)
if err != nil { if err != nil {
fmt.Println(err.Error()) fmt.Println(err.Error())
@ -188,10 +178,13 @@ func setService(client *ipcRpc.Client, service, value string) {
fmt.Println(reply) fmt.Println(reply)
} }
func deleteService(client *ipcRpc.Client, service string) { func deleteService(client *ipc.ClientIpc, meshId, service string) {
var reply string var reply string
err := client.Call("IpcHandler.PutService", &service, &reply) err := client.DeleteService(ipc.DeleteServiceArgs{
MeshId: meshId,
Service: service,
}, &reply)
if err != nil { if err != nil {
fmt.Println(err.Error()) fmt.Println(err.Error())
@ -226,7 +219,6 @@ func main() {
}) })
var newMeshRole *string = newMeshCmd.Selector("r", "role", []string{"peer", "client"}, &argparse.Options{ var newMeshRole *string = newMeshCmd.Selector("r", "role", []string{"peer", "client"}, &argparse.Options{
Default: "peer",
Help: "Role in the mesh network. A value of peer means that the node is publicly routeable and thus considered" + Help: "Role in the mesh network. A value of peer means that the node is publicly routeable and thus considered" +
" in the gossip protocol. Client means that the node is not publicly routeable and is not a candidate in the gossip" + " in the gossip protocol. Client means that the node is not publicly routeable and is not a candidate in the gossip" +
" protocol", " protocol",
@ -259,7 +251,6 @@ func main() {
}) })
var joinMeshRole *string = joinMeshCmd.Selector("r", "role", []string{"peer", "client"}, &argparse.Options{ var joinMeshRole *string = joinMeshCmd.Selector("r", "role", []string{"peer", "client"}, &argparse.Options{
Default: "peer",
Help: "Role in the mesh network. A value of peer means that the node is publicly routeable and thus considered" + Help: "Role in the mesh network. A value of peer means that the node is publicly routeable and thus considered" +
" in the gossip protocol. Client means that the node is not publicly routeable and is not a candidate in the gossip" + " in the gossip protocol. Client means that the node is not publicly routeable and is not a candidate in the gossip" +
" protocol", " protocol",
@ -302,6 +293,16 @@ func main() {
Help: "Description of the node in the mesh", Help: "Description of the node in the mesh",
}) })
var descriptionMeshId *string = putDescriptionCmd.String("m", "meshid", &argparse.Options{
Required: true,
Help: "MeshID of the mesh network to join",
})
var aliasMeshId *string = putAliasCmd.String("m", "meshid", &argparse.Options{
Required: true,
Help: "MeshID of the mesh network to join",
})
var alias *string = putAliasCmd.String("a", "alias", &argparse.Options{ var alias *string = putAliasCmd.String("a", "alias", &argparse.Options{
Required: true, Required: true,
Help: "Alias of the node to set can be used in DNS to lookup an IP address", Help: "Alias of the node to set can be used in DNS to lookup an IP address",
@ -316,11 +317,21 @@ func main() {
Help: "Value of the service to advertise in the mesh network", Help: "Value of the service to advertise in the mesh network",
}) })
var serviceMeshId *string = setServiceCmd.String("m", "meshid", &argparse.Options{
Required: true,
Help: "MeshID of the mesh network to join",
})
var deleteServiceKey *string = deleteServiceCmd.String("s", "service", &argparse.Options{ var deleteServiceKey *string = deleteServiceCmd.String("s", "service", &argparse.Options{
Required: true, Required: true,
Help: "Key of the service to remove", Help: "Key of the service to remove",
}) })
var deleteServiceMeshid *string = deleteServiceCmd.String("m", "meshid", &argparse.Options{
Required: true,
Help: "MeshID of the mesh network to join",
})
err := parser.Parse(os.Args) err := parser.Parse(os.Args)
if err != nil { if err != nil {
@ -328,16 +339,13 @@ func main() {
return return
} }
client, err := ipcRpc.DialHTTP("unix", SockAddr) client, err := ipc.NewClientIpc()
if err != nil { if err != nil {
fmt.Println(err.Error()) panic(err)
return
} }
if newMeshCmd.Happened() { if newMeshCmd.Happened() {
fmt.Println(createMesh(&CreateMeshParams{ args := &ipc.NewMeshArgs{
Client: client,
Endpoint: *newMeshEndpoint,
WgArgs: ipc.WireGuardArgs{ WgArgs: ipc.WireGuardArgs{
Endpoint: *newMeshEndpoint, Endpoint: *newMeshEndpoint,
Role: *newMeshRole, Role: *newMeshRole,
@ -346,7 +354,9 @@ func main() {
AdvertiseDefaultRoute: *newMeshAdvertiseDefaults, AdvertiseDefaultRoute: *newMeshAdvertiseDefaults,
AdvertiseRoutes: *newMeshAdvertiseRoutes, AdvertiseRoutes: *newMeshAdvertiseRoutes,
}, },
})) }
createMesh(client, args)
} }
if listMeshCmd.Happened() { if listMeshCmd.Happened() {
@ -354,11 +364,9 @@ func main() {
} }
if joinMeshCmd.Happened() { if joinMeshCmd.Happened() {
fmt.Println(joinMesh(&JoinMeshParams{ args := ipc.JoinMeshArgs{
Client: client,
IpAddress: *joinMeshIpAddress, IpAddress: *joinMeshIpAddress,
MeshId: *joinMeshId, MeshId: *joinMeshId,
Endpoint: *joinMeshEndpoint,
WgArgs: ipc.WireGuardArgs{ WgArgs: ipc.WireGuardArgs{
Endpoint: *joinMeshEndpoint, Endpoint: *joinMeshEndpoint,
Role: *joinMeshRole, Role: *joinMeshRole,
@ -367,7 +375,8 @@ func main() {
AdvertiseDefaultRoute: *joinMeshAdvertiseDefaults, AdvertiseDefaultRoute: *joinMeshAdvertiseDefaults,
AdvertiseRoutes: *joinMeshAdvertiseRoutes, AdvertiseRoutes: *joinMeshAdvertiseRoutes,
}, },
})) }
joinMesh(client, args)
} }
if getGraphCmd.Happened() { if getGraphCmd.Happened() {
@ -383,18 +392,18 @@ func main() {
} }
if putDescriptionCmd.Happened() { if putDescriptionCmd.Happened() {
putDescription(client, *description) putDescription(client, *descriptionMeshId, *description)
} }
if putAliasCmd.Happened() { if putAliasCmd.Happened() {
putAlias(client, *alias) putAlias(client, *aliasMeshId, *alias)
} }
if setServiceCmd.Happened() { if setServiceCmd.Happened() {
setService(client, *serviceKey, *serviceValue) setService(client, *serviceMeshId, *serviceKey, *serviceValue)
} }
if deleteServiceCmd.Happened() { if deleteServiceCmd.Happened() {
deleteService(client, *deleteServiceKey) deleteService(client, *deleteServiceMeshid, *deleteServiceKey)
} }
} }

View File

@ -10,5 +10,5 @@ syncRate: 1
interClusterChance: 0.15 interClusterChance: 0.15
branchRate: 3 branchRate: 3
infectionCount: 3 infectionCount: 3
keepAliveTime: 10 heartBeatTime: 10
pruneTime: 20 pruneTime: 20

View File

@ -59,6 +59,11 @@ func main() {
} }
ctrlServer, err := ctrlserver.NewCtrlServer(&ctrlServerParams) ctrlServer, err := ctrlserver.NewCtrlServer(&ctrlServerParams)
if err != nil {
panic(err)
}
syncProvider.Server = ctrlServer syncProvider.Server = ctrlServer
syncRequester = sync.NewSyncRequester(ctrlServer) syncRequester = sync.NewSyncRequester(ctrlServer)
syncer = sync.NewSyncer(ctrlServer.MeshManager, conf, syncRequester) syncer = sync.NewSyncer(ctrlServer.MeshManager, conf, syncRequester)

View File

@ -10,5 +10,5 @@ syncRate: 1
interClusterChance: 0.15 interClusterChance: 0.15
branchRate: 3 branchRate: 3
infectionCount: 3 infectionCount: 3
keepAliveTime: 10 heartBeatTime: 10
pruneTime: 20 pruneTime: 20

View File

@ -10,5 +10,5 @@ syncRate: 1
interClusterChance: 0.15 interClusterChance: 0.15
branchRate: 3 branchRate: 3
infectionCount: 3 infectionCount: 3
keepAliveTime: 10 heartBeatTime: 10
pruneTime: 20 pruneTime: 20

View File

@ -4,8 +4,6 @@ import (
"fmt" "fmt"
"net/http" "net/http"
ipcRpc "net/rpc"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/tim-beatham/wgmesh/pkg/ctrlserver" "github.com/tim-beatham/wgmesh/pkg/ctrlserver"
"github.com/tim-beatham/wgmesh/pkg/ipc" "github.com/tim-beatham/wgmesh/pkg/ipc"
@ -13,8 +11,6 @@ import (
"github.com/tim-beatham/wgmesh/pkg/what8words" "github.com/tim-beatham/wgmesh/pkg/what8words"
) )
const SockAddr = "/tmp/wgmesh_ipc.sock"
type ApiServer interface { type ApiServer interface {
GetMeshes(c *gin.Context) GetMeshes(c *gin.Context)
Run(addr string) error Run(addr string) error
@ -22,7 +18,7 @@ type ApiServer interface {
type SmegServer struct { type SmegServer struct {
router *gin.Engine router *gin.Engine
client *ipcRpc.Client client *ipc.ClientIpc
words *what8words.What8Words words *what8words.What8Words
} }
@ -106,7 +102,7 @@ func (s *SmegServer) CreateMesh(c *gin.Context) {
var reply string var reply string
err := s.client.Call("IpcHandler.CreateMesh", &ipcRequest, &reply) err := s.client.CreateMesh(&ipcRequest, &reply)
if err != nil { if err != nil {
c.JSON(http.StatusBadRequest, &gin.H{ c.JSON(http.StatusBadRequest, &gin.H{
@ -133,7 +129,7 @@ func (s *SmegServer) JoinMesh(c *gin.Context) {
ipcRequest := ipc.JoinMeshArgs{ ipcRequest := ipc.JoinMeshArgs{
MeshId: joinMesh.MeshId, MeshId: joinMesh.MeshId,
IpAdress: joinMesh.Bootstrap, IpAddress: joinMesh.Bootstrap,
WgArgs: ipc.WireGuardArgs{ WgArgs: ipc.WireGuardArgs{
WgPort: joinMesh.WgPort, WgPort: joinMesh.WgPort,
}, },
@ -141,7 +137,7 @@ func (s *SmegServer) JoinMesh(c *gin.Context) {
var reply string var reply string
err := s.client.Call("IpcHandler.JoinMesh", &ipcRequest, &reply) err := s.client.JoinMesh(ipcRequest, &reply)
if err != nil { if err != nil {
c.JSON(http.StatusBadRequest, &gin.H{ c.JSON(http.StatusBadRequest, &gin.H{
@ -164,7 +160,7 @@ func (s *SmegServer) GetMesh(c *gin.Context) {
getMeshReply := new(ipc.GetMeshReply) getMeshReply := new(ipc.GetMeshReply)
err := s.client.Call("IpcHandler.GetMesh", &meshid, &getMeshReply) err := s.client.GetMesh(meshid, getMeshReply)
if err != nil { if err != nil {
c.JSON(http.StatusNotFound, c.JSON(http.StatusNotFound,
@ -182,7 +178,7 @@ func (s *SmegServer) GetMesh(c *gin.Context) {
func (s *SmegServer) GetMeshes(c *gin.Context) { func (s *SmegServer) GetMeshes(c *gin.Context) {
listMeshesReply := new(ipc.ListMeshReply) listMeshesReply := new(ipc.ListMeshReply)
err := s.client.Call("IpcHandler.ListMeshes", "", &listMeshesReply) err := s.client.ListMeshes(listMeshesReply)
if err != nil { if err != nil {
logging.Log.WriteErrorf(err.Error()) logging.Log.WriteErrorf(err.Error())
@ -195,7 +191,7 @@ func (s *SmegServer) GetMeshes(c *gin.Context) {
for _, mesh := range listMeshesReply.Meshes { for _, mesh := range listMeshesReply.Meshes {
getMeshReply := new(ipc.GetMeshReply) getMeshReply := new(ipc.GetMeshReply)
err := s.client.Call("IpcHandler.GetMesh", &mesh, &getMeshReply) err := s.client.GetMesh(mesh, getMeshReply)
if err != nil { if err != nil {
logging.Log.WriteErrorf(err.Error()) logging.Log.WriteErrorf(err.Error())
@ -215,7 +211,7 @@ func (s *SmegServer) Run(addr string) error {
} }
func NewSmegServer(conf ApiServerConf) (ApiServer, error) { func NewSmegServer(conf ApiServerConf) (ApiServer, error) {
client, err := ipcRpc.DialHTTP("unix", SockAddr) client, err := ipc.NewClientIpc()
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -47,7 +47,6 @@ type WgConfiguration struct {
// If the user is globaly accessible they specify themselves as a client. // If the user is globaly accessible they specify themselves as a client.
Role *NodeType `yaml:"role" validate:"required,eq=client|eq=peer"` Role *NodeType `yaml:"role" validate:"required,eq=client|eq=peer"`
// KeepAliveWg configures the implementation so that we send keep alive packets to peers. // KeepAliveWg configures the implementation so that we send keep alive packets to peers.
// KeepAlive can only be set if role is type client
KeepAliveWg *int `yaml:"keepAliveWg" validate:"omitempty,gte=0"` KeepAliveWg *int `yaml:"keepAliveWg" validate:"omitempty,gte=0"`
// PreUp are WireGuard commands to run before adding the WG interface // PreUp are WireGuard commands to run before adding the WG interface
PreUp []string `yaml:"preUp"` PreUp []string `yaml:"preUp"`
@ -77,11 +76,13 @@ type DaemonConfiguration struct {
Profile bool `yaml:"profile"` Profile bool `yaml:"profile"`
// StubWg whether or not to stub the WireGuard types // StubWg whether or not to stub the WireGuard types
StubWg bool `yaml:"stubWg"` StubWg bool `yaml:"stubWg"`
// SyncRate specifies how long the minimum time should be between synchronisation // SyncTime specifies how long the minimum time should be between synchronisation
SyncRate int `yaml:"syncRate" validate:"required,gte=1"` SyncTime int `yaml:"syncTime" validate:"required,gte=1"`
// KeepAliveTime: number of seconds before the leader of the mesh sends an update to // PullTime specifies the interval between checking for configuration changes
PullTime int `yaml:"pullTime" validate:"gte=0"`
// HeartBeat: number of seconds before the leader of the mesh sends an update to
// send to every member in the mesh // send to every member in the mesh
KeepAliveTime int `yaml:"keepAliveTime" validate:"required,gte=1"` HeartBeat int `yaml:"heartBeatTime" validate:"required,gte=1"`
// ClusterSize specifies how many neighbours you should synchronise with per round // ClusterSize specifies how many neighbours you should synchronise with per round
ClusterSize int `yaml:"clusterSize" validate:"gte=1"` ClusterSize int `yaml:"clusterSize" validate:"gte=1"`
// InterClusterChance specifies the probabilityof inter-cluster communication in a sync round // InterClusterChance specifies the probabilityof inter-cluster communication in a sync round

View File

@ -21,11 +21,12 @@ func getExampleConfiguration() *DaemonConfiguration {
Timeout: 5, Timeout: 5,
Profile: false, Profile: false,
StubWg: false, StubWg: false,
SyncRate: 2, SyncTime: 2,
KeepAliveTime: 2, HeartBeat: 2,
ClusterSize: 64, ClusterSize: 64,
InterClusterChance: 0.15, InterClusterChance: 0.15,
BranchRate: 3, BranchRate: 3,
PullTime: 0,
InfectionCount: 2, InfectionCount: 2,
BaseConfiguration: WgConfiguration{ BaseConfiguration: WgConfiguration{
IPDiscovery: &discovery, IPDiscovery: &discovery,
@ -162,9 +163,9 @@ func TestBranchRateZero(t *testing.T) {
} }
} }
func TestSyncRateZero(t *testing.T) { func TestsyncTimeZero(t *testing.T) {
conf := getExampleConfiguration() conf := getExampleConfiguration()
conf.SyncRate = 0 conf.SyncTime = 0
err := ValidateDaemonConfiguration(conf) err := ValidateDaemonConfiguration(conf)
@ -175,7 +176,7 @@ func TestSyncRateZero(t *testing.T) {
func TestKeepAliveTimeZero(t *testing.T) { func TestKeepAliveTimeZero(t *testing.T) {
conf := getExampleConfiguration() conf := getExampleConfiguration()
conf.KeepAliveTime = 0 conf.HeartBeat = 0
err := ValidateDaemonConfiguration(conf) err := ValidateDaemonConfiguration(conf)
if err == nil { if err == nil {
@ -215,6 +216,17 @@ func TestInfectionCountOne(t *testing.T) {
} }
} }
func TestPullTimeNegative(t *testing.T) {
conf := getExampleConfiguration()
conf.PullTime = -1
err := ValidateDaemonConfiguration(conf)
if err == nil {
t.Fatal(`error should be thrown`)
}
}
func TestValidConfiguration(t *testing.T) { func TestValidConfiguration(t *testing.T) {
conf := getExampleConfiguration() conf := getExampleConfiguration()
err := ValidateDaemonConfiguration(conf) err := ValidateDaemonConfiguration(conf)

View File

@ -264,7 +264,7 @@ func (m *TwoPhaseStoreMeshManager) UpdateTimeStamp(nodeId string) error {
peerToUpdate := peers[0] peerToUpdate := peers[0]
if uint64(time.Now().Unix())-m.store.Clock.GetTimestamp(peerToUpdate) > 3*uint64(m.DaemonConf.KeepAliveTime) { if uint64(time.Now().Unix())-m.store.Clock.GetTimestamp(peerToUpdate) > 3*uint64(m.DaemonConf.HeartBeat) {
m.store.Mark(peerToUpdate) m.store.Mark(peerToUpdate)
if len(peers) < 2 { if len(peers) < 2 {

View File

@ -32,8 +32,8 @@ func setUpTests() *TestParams {
GrpcPort: 0, GrpcPort: 0,
Timeout: 20, Timeout: 20,
Profile: false, Profile: false,
SyncRate: 2, SyncTime: 2,
KeepAliveTime: 10, HeartBeat: 10,
ClusterSize: 32, ClusterSize: 32,
InterClusterChance: 0.15, InterClusterChance: 0.15,
BranchRate: 3, BranchRate: 3,

View File

@ -24,7 +24,7 @@ func (f *TwoPhaseMapFactory) CreateMesh(params *mesh.MeshProviderFactoryParams)
h := fnv.New64a() h := fnv.New64a()
h.Write([]byte(s)) h.Write([]byte(s))
return h.Sum64() return h.Sum64()
}, uint64(3*f.Config.KeepAliveTime)), }, uint64(3*f.Config.HeartBeat)),
}, nil }, nil
} }

View File

@ -4,7 +4,6 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"net" "net"
"net/rpc"
"github.com/miekg/dns" "github.com/miekg/dns"
"github.com/tim-beatham/wgmesh/pkg/ipc" "github.com/tim-beatham/wgmesh/pkg/ipc"
@ -18,7 +17,7 @@ const SockAddr = "/tmp/wgmesh_ipc.sock"
const MeshRegularExpression = `(?P<meshId>.+)\.(?P<alias>.+)\.smeg\.` const MeshRegularExpression = `(?P<meshId>.+)\.(?P<alias>.+)\.smeg\.`
type DNSHandler struct { type DNSHandler struct {
client *rpc.Client client *ipc.ClientIpc
server *dns.Server server *dns.Server
} }
@ -27,7 +26,7 @@ type DNSHandler struct {
func (d *DNSHandler) queryMesh(meshId, alias string) net.IP { func (d *DNSHandler) queryMesh(meshId, alias string) net.IP {
var reply string var reply string
err := d.client.Call("IpcHandler.Query", &ipc.QueryMesh{ err := d.client.Query(ipc.QueryMesh{
MeshId: meshId, MeshId: meshId,
Query: fmt.Sprintf("[?alias == '%s'] | [0]", alias), Query: fmt.Sprintf("[?alias == '%s'] | [0]", alias),
}, &reply) }, &reply)
@ -97,7 +96,7 @@ func (h *DNSHandler) Close() error {
} }
func NewDns(udpPort int) (*DNSHandler, error) { func NewDns(udpPort int) (*DNSHandler, error) {
client, err := rpc.DialHTTP("unix", SockAddr) client, err := ipc.NewClientIpc()
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -5,11 +5,27 @@ import (
"net" "net"
"net/http" "net/http"
"net/rpc" "net/rpc"
ipcRpc "net/rpc"
"os" "os"
"github.com/tim-beatham/wgmesh/pkg/ctrlserver" "github.com/tim-beatham/wgmesh/pkg/ctrlserver"
) )
const SockAddr = "/tmp/wgmesh_sock"
type MeshIpc interface {
CreateMesh(args *NewMeshArgs, reply *string) error
ListMeshes(name string, reply *ListMeshReply) error
JoinMesh(args *JoinMeshArgs, reply *string) error
LeaveMesh(meshId string, reply *string) error
GetMesh(meshId string, reply *GetMeshReply) error
Query(query QueryMesh, reply *string) error
PutDescription(args PutDescriptionArgs, reply *string) error
PutAlias(args PutAliasArgs, reply *string) error
PutService(args PutServiceArgs, reply *string) error
DeleteService(args DeleteServiceArgs, reply *string) error
}
// WireGuardArgs are provided args specific to WireGuard // WireGuardArgs are provided args specific to WireGuard
type WireGuardArgs struct { type WireGuardArgs struct {
// WgPort is the WireGuard port to expose // WgPort is the WireGuard port to expose
@ -39,7 +55,7 @@ type JoinMeshArgs struct {
// MeshId is the ID of the mesh to join // MeshId is the ID of the mesh to join
MeshId string MeshId string
// IpAddress is a routable IP in another mesh // IpAddress is a routable IP in another mesh
IpAdress string IpAddress string
// WgArgs is the WireGuard parameters to use. // WgArgs is the WireGuard parameters to use.
WgArgs WireGuardArgs WgArgs WireGuardArgs
} }
@ -47,6 +63,22 @@ type JoinMeshArgs struct {
type PutServiceArgs struct { type PutServiceArgs struct {
Service string Service string
Value string Value string
MeshId string
}
type DeleteServiceArgs struct {
Service string
MeshId string
}
type PutAliasArgs struct {
Alias string
MeshId string
}
type PutDescriptionArgs struct {
Description string
MeshId string
} }
type GetMeshReply struct { type GetMeshReply struct {
@ -62,20 +94,65 @@ type QueryMesh struct {
Query string Query string
} }
type MeshIpc interface { type ClientIpc struct {
CreateMesh(args *NewMeshArgs, reply *string) error client *ipcRpc.Client
ListMeshes(name string, reply *ListMeshReply) error
JoinMesh(args JoinMeshArgs, reply *string) error
LeaveMesh(meshId string, reply *string) error
GetMesh(meshId string, reply *GetMeshReply) error
Query(query QueryMesh, reply *string) error
PutDescription(description string, reply *string) error
PutAlias(alias string, reply *string) error
PutService(args PutServiceArgs, reply *string) error
DeleteService(service string, reply *string) error
} }
const SockAddr = "/tmp/wgmesh_ipc.sock" func NewClientIpc() (*ClientIpc, error) {
client, err := ipcRpc.DialHTTP("unix", SockAddr)
if err != nil {
return nil, err
}
return &ClientIpc{
client: client,
}, nil
}
func (c *ClientIpc) CreateMesh(args *NewMeshArgs, reply *string) error {
return c.client.Call("IpcHandler.CreateMesh", args, reply)
}
func (c *ClientIpc) ListMeshes(reply *ListMeshReply) error {
return c.client.Call("IpcHandler.ListMeshes", "", reply)
}
func (c *ClientIpc) JoinMesh(args JoinMeshArgs, reply *string) error {
return c.client.Call("IpcHandler.JoinMesh", &args, reply)
}
func (c *ClientIpc) LeaveMesh(meshId string, reply *string) error {
return c.client.Call("IpcHandler.LeaveMesh", &meshId, reply)
}
func (c *ClientIpc) GetMesh(meshId string, reply *GetMeshReply) error {
return c.client.Call("IpcHandler.GetMesh", &meshId, reply)
}
func (c *ClientIpc) Query(query QueryMesh, reply *string) error {
return c.client.Call("IpcHandler.Query", &query, reply)
}
func (c *ClientIpc) PutDescription(args PutDescriptionArgs, reply *string) error {
return c.client.Call("IpcHandler.PutDescription", &args, reply)
}
func (c *ClientIpc) PutAlias(args PutAliasArgs, reply *string) error {
return c.client.Call("IpcHandler.PutAlias", &args, reply)
}
func (c *ClientIpc) PutService(args PutServiceArgs, reply *string) error {
return c.client.Call("IpcHandler.PutService", &args, reply)
}
func (c *ClientIpc) DeleteService(args DeleteServiceArgs, reply *string) error {
return c.client.Call("IpcHandler.DeleteService", &args, reply)
}
func (c *ClientIpc) Close() error {
return c.Close()
}
func RunIpcHandler(server MeshIpc) error { func RunIpcHandler(server MeshIpc) error {
if err := os.RemoveAll(SockAddr); err != nil { if err := os.RemoveAll(SockAddr); err != nil {

View File

@ -24,10 +24,10 @@ type MeshManager interface {
LeaveMesh(meshId string) error LeaveMesh(meshId string) error
GetSelf(meshId string) (MeshNode, error) GetSelf(meshId string) (MeshNode, error)
ApplyConfig() error ApplyConfig() error
SetDescription(description string) error SetDescription(meshId, description string) error
SetAlias(alias string) error SetAlias(meshId, alias string) error
SetService(service string, value string) error SetService(meshId, service, value string) error
RemoveService(service string) error RemoveService(meshId, service string) error
UpdateTimeStamp() error UpdateTimeStamp() error
GetClient() *wgctrl.Client GetClient() *wgctrl.Client
GetMeshes() map[string]MeshProvider GetMeshes() map[string]MeshProvider
@ -61,29 +61,33 @@ func (m *MeshManagerImpl) GetRouteManager() RouteManager {
} }
// RemoveService implements MeshManager. // RemoveService implements MeshManager.
func (m *MeshManagerImpl) RemoveService(service string) error { func (m *MeshManagerImpl) RemoveService(meshId, service string) error {
for _, mesh := range m.Meshes { mesh := m.GetMesh(meshId)
err := mesh.RemoveService(m.HostParameters.GetPublicKey(), service)
if err != nil { if mesh == nil {
return err return fmt.Errorf("mesh %s does not exist", meshId)
}
} }
return nil if !mesh.NodeExists(m.HostParameters.GetPublicKey()) {
return fmt.Errorf("node %s does not exist in the mesh", meshId)
}
return mesh.RemoveService(m.HostParameters.GetPublicKey(), service)
} }
// SetService implements MeshManager. // SetService implements MeshManager.
func (m *MeshManagerImpl) SetService(service string, value string) error { func (m *MeshManagerImpl) SetService(meshId, service, value string) error {
for _, mesh := range m.Meshes { mesh := m.GetMesh(meshId)
err := mesh.AddService(m.HostParameters.GetPublicKey(), service, value)
if err != nil { if mesh == nil {
return err return fmt.Errorf("mesh %s does not exist", meshId)
}
} }
return nil if !mesh.NodeExists(m.HostParameters.GetPublicKey()) {
return fmt.Errorf("node %s does not exist in the mesh", meshId)
}
return mesh.AddService(m.HostParameters.GetPublicKey(), service, value)
} }
func (m *MeshManagerImpl) GetNode(meshid, nodeId string) MeshNode { func (m *MeshManagerImpl) GetNode(meshid, nodeId string) MeshNode {
@ -134,6 +138,10 @@ func (m *MeshManagerImpl) CreateMesh(args *CreateMeshParams) (string, error) {
return "", err return "", err
} }
if *meshConfiguration.Role == conf.CLIENT_ROLE {
return "", fmt.Errorf("cannot create mesh as a client")
}
meshId, err := m.idGenerator.GetId() meshId, err := m.idGenerator.GetId()
var ifName string = "" var ifName string = ""
@ -348,7 +356,6 @@ func (s *MeshManagerImpl) LeaveMesh(meshId string) error {
} }
s.cmdRunner.RunCommands(s.conf.BaseConfiguration.PostDown...) s.cmdRunner.RunCommands(s.conf.BaseConfiguration.PostDown...)
return err return err
} }
@ -373,43 +380,36 @@ func (s *MeshManagerImpl) ApplyConfig() error {
return nil return nil
} }
err := s.configApplyer.ApplyConfig() return s.configApplyer.ApplyConfig()
if err != nil {
return err
} }
return nil func (s *MeshManagerImpl) SetDescription(meshId, description string) error {
mesh := s.GetMesh(meshId)
if mesh == nil {
return fmt.Errorf("mesh %s does not exist", meshId)
} }
func (s *MeshManagerImpl) SetDescription(description string) error { if !mesh.NodeExists(s.HostParameters.GetPublicKey()) {
meshes := s.GetMeshes() return fmt.Errorf("node %s does not exist in the mesh", meshId)
for _, mesh := range meshes {
if mesh.NodeExists(s.HostParameters.GetPublicKey()) {
err := mesh.SetDescription(s.HostParameters.GetPublicKey(), description)
if err != nil {
return err
}
}
} }
return nil return mesh.SetDescription(s.HostParameters.GetPublicKey(), description)
} }
// SetAlias implements MeshManager. // SetAlias implements MeshManager.
func (s *MeshManagerImpl) SetAlias(alias string) error { func (s *MeshManagerImpl) SetAlias(meshId, alias string) error {
meshes := s.GetMeshes() mesh := s.GetMesh(meshId)
for _, mesh := range meshes {
if mesh.NodeExists(s.HostParameters.GetPublicKey()) {
err := mesh.SetAlias(s.HostParameters.GetPublicKey(), alias)
if err != nil { if mesh == nil {
return err return fmt.Errorf("mesh %s does not exist", meshId)
} }
if !mesh.NodeExists(s.HostParameters.GetPublicKey()) {
return fmt.Errorf("node %s does not exist in the mesh", meshId)
} }
}
return nil return mesh.SetAlias(s.HostParameters.GetPublicKey(), alias)
} }
// UpdateTimeStamp updates the timestamp of this node in all meshes // UpdateTimeStamp updates the timestamp of this node in all meshes

View File

@ -24,8 +24,8 @@ func getMeshConfiguration() *conf.DaemonConfiguration {
Timeout: 5, Timeout: 5,
Profile: false, Profile: false,
StubWg: true, StubWg: true,
SyncRate: 2, SyncTime: 2,
KeepAliveTime: 60, HeartBeat: 60,
ClusterSize: 64, ClusterSize: 64,
InterClusterChance: 0.15, InterClusterChance: 0.15,
BranchRate: 3, BranchRate: 3,
@ -213,7 +213,7 @@ func TestLeaveMeshDeletesMesh(t *testing.T) {
} }
} }
func TestSetAlias(t *testing.T) { func TestSetAliasUpdatesAliasOfNode(t *testing.T) {
manager := getMeshManager() manager := getMeshManager()
alias := "Firpo" alias := "Firpo"
@ -221,14 +221,13 @@ func TestSetAlias(t *testing.T) {
Port: 5000, Port: 5000,
Conf: &conf.WgConfiguration{}, Conf: &conf.WgConfiguration{},
}) })
manager.AddSelf(&AddSelfParams{ manager.AddSelf(&AddSelfParams{
MeshId: meshId, MeshId: meshId,
WgPort: 5000, WgPort: 5000,
Endpoint: "abc.com:8080", Endpoint: "abc.com:8080",
}) })
err := manager.SetAlias(alias) err := manager.SetAlias(meshId, alias)
if err != nil { if err != nil {
t.Fatalf(`failed to set the alias`) t.Fatalf(`failed to set the alias`)
@ -245,7 +244,7 @@ func TestSetAlias(t *testing.T) {
} }
} }
func TestSetDescription(t *testing.T) { func TestSetDescriptionSetsTheDescriptionOfTheNode(t *testing.T) {
manager := getMeshManager() manager := getMeshManager()
description := "wooooo" description := "wooooo"
@ -254,23 +253,13 @@ func TestSetDescription(t *testing.T) {
Conf: &conf.WgConfiguration{}, Conf: &conf.WgConfiguration{},
}) })
meshId2, _ := manager.CreateMesh(&CreateMeshParams{
Port: 5001,
Conf: &conf.WgConfiguration{},
})
manager.AddSelf(&AddSelfParams{ manager.AddSelf(&AddSelfParams{
MeshId: meshId1, MeshId: meshId1,
WgPort: 5000, WgPort: 5000,
Endpoint: "abc.com:8080", Endpoint: "abc.com:8080",
}) })
manager.AddSelf(&AddSelfParams{
MeshId: meshId2,
WgPort: 5000,
Endpoint: "abc.com:8080",
})
err := manager.SetDescription(description) err := manager.SetDescription(meshId1, description)
if err != nil { if err != nil {
t.Fatalf(`failed to set the descriptions`) t.Fatalf(`failed to set the descriptions`)
@ -285,18 +274,7 @@ func TestSetDescription(t *testing.T) {
if description != self1.GetDescription() { if description != self1.GetDescription() {
t.Fatalf(`description should be %s was %s`, description, self1.GetDescription()) t.Fatalf(`description should be %s was %s`, description, self1.GetDescription())
} }
self2, err := manager.GetSelf(meshId2)
if err != nil {
t.Fatalf(`failed to set the description`)
} }
if description != self2.GetDescription() {
t.Fatalf(`description should be %s was %s`, description, self2.GetDescription())
}
}
func TestUpdateTimeStampUpdatesAllMeshes(t *testing.T) { func TestUpdateTimeStampUpdatesAllMeshes(t *testing.T) {
manager := getMeshManager() manager := getMeshManager()
@ -327,3 +305,68 @@ func TestUpdateTimeStampUpdatesAllMeshes(t *testing.T) {
t.Fatalf(`failed to update the timestamp`) t.Fatalf(`failed to update the timestamp`)
} }
} }
func TestAddServiceAddsServiceToTheMesh(t *testing.T) {
manager := getMeshManager()
meshId1, _ := manager.CreateMesh(&CreateMeshParams{
Port: 5000,
Conf: &conf.WgConfiguration{},
})
manager.AddSelf(&AddSelfParams{
MeshId: meshId1,
WgPort: 5000,
Endpoint: "abc.com:8080",
})
serviceName := "hello"
manager.SetService(meshId1, serviceName, "dave")
self, err := manager.GetSelf(meshId1)
if err != nil {
t.Fatalf(`error thrown %s:`, err.Error())
}
if _, ok := self.GetServices()[serviceName]; !ok {
t.Fatalf(`service not added`)
}
}
func TestRemoveServiceRemovesTheServiceFromTheMesh(t *testing.T) {
manager := getMeshManager()
meshId1, _ := manager.CreateMesh(&CreateMeshParams{
Port: 5000,
Conf: &conf.WgConfiguration{},
})
manager.AddSelf(&AddSelfParams{
MeshId: meshId1,
WgPort: 5000,
Endpoint: "abc.com:8080",
})
serviceName := "hello"
manager.SetService(meshId1, serviceName, "dave")
self, err := manager.GetSelf(meshId1)
if err != nil {
t.Fatalf(`error thrown %s:`, err.Error())
}
if _, ok := self.GetServices()[serviceName]; !ok {
t.Fatalf(`service not added`)
}
manager.RemoveService(meshId1, serviceName)
self, err = manager.GetSelf(meshId1)
if err != nil {
t.Fatalf(`error thrown %s:`, err.Error())
}
if _, ok := self.GetServices()[serviceName]; ok {
t.Fatalf(`service still exists`)
}
}

View File

@ -30,8 +30,8 @@ func (*MeshNodeStub) GetType() conf.NodeType {
} }
// GetServices implements MeshNode. // GetServices implements MeshNode.
func (*MeshNodeStub) GetServices() map[string]string { func (m *MeshNodeStub) GetServices() map[string]string {
return make(map[string]string) return m.services
} }
// GetAlias implements MeshNode. // GetAlias implements MeshNode.
@ -249,6 +249,7 @@ func (s *StubNodeFactory) Build(params *MeshNodeFactoryParams) MeshNode {
routes: make([]Route, 0), routes: make([]Route, 0),
identifier: "abc", identifier: "abc",
description: "A Mesh Node Stub", description: "A Mesh Node Stub",
services: make(map[string]string),
} }
} }
@ -271,32 +272,32 @@ type MeshManagerStub struct {
// GetRouteManager implements MeshManager. // GetRouteManager implements MeshManager.
func (*MeshManagerStub) GetRouteManager() RouteManager { func (*MeshManagerStub) GetRouteManager() RouteManager {
panic("unimplemented") return nil
} }
// GetNode implements MeshManager. // GetNode implements MeshManager.
func (*MeshManagerStub) GetNode(string, string) MeshNode { func (*MeshManagerStub) GetNode(meshId, nodeId string) MeshNode {
panic("unimplemented") return nil
} }
// RemoveService implements MeshManager. // RemoveService implements MeshManager.
func (*MeshManagerStub) RemoveService(service string) error { func (*MeshManagerStub) RemoveService(meshId, service string) error {
panic("unimplemented") return nil
} }
// SetService implements MeshManager. // SetService implements MeshManager.
func (*MeshManagerStub) SetService(service string, value string) error { func (*MeshManagerStub) SetService(meshId, service, value string) error {
panic("unimplemented") return nil
} }
// SetAlias implements MeshManager. // SetAlias implements MeshManager.
func (*MeshManagerStub) SetAlias(alias string) error { func (*MeshManagerStub) SetAlias(meshId, alias string) error {
panic("unimplemented") return nil
} }
// Close implements MeshManager. // Close implements MeshManager.
func (*MeshManagerStub) Close() error { func (*MeshManagerStub) Close() error {
panic("unimplemented") return nil
} }
// Prune implements MeshManager. // Prune implements MeshManager.
@ -348,7 +349,7 @@ func (m *MeshManagerStub) ApplyConfig() error {
return nil return nil
} }
func (m *MeshManagerStub) SetDescription(description string) error { func (m *MeshManagerStub) SetDescription(meshId, description string) error {
return nil return nil
} }

View File

@ -43,10 +43,6 @@ func getOverrideConfiguration(args *ipc.WireGuardArgs) conf.WgConfiguration {
func (n *IpcHandler) CreateMesh(args *ipc.NewMeshArgs, reply *string) error { func (n *IpcHandler) CreateMesh(args *ipc.NewMeshArgs, reply *string) error {
overrideConf := getOverrideConfiguration(&args.WgArgs) overrideConf := getOverrideConfiguration(&args.WgArgs)
if overrideConf.Role != nil && *overrideConf.Role == conf.CLIENT_ROLE {
return fmt.Errorf("cannot create a mesh with no public endpoint")
}
meshId, err := n.Server.GetMeshManager().CreateMesh(&mesh.CreateMeshParams{ meshId, err := n.Server.GetMeshManager().CreateMesh(&mesh.CreateMeshParams{
Port: args.WgArgs.WgPort, Port: args.WgArgs.WgPort,
Conf: &overrideConf, Conf: &overrideConf,
@ -83,10 +79,14 @@ func (n *IpcHandler) ListMeshes(_ string, reply *ipc.ListMeshReply) error {
return nil return nil
} }
func (n *IpcHandler) JoinMesh(args ipc.JoinMeshArgs, reply *string) error { func (n *IpcHandler) JoinMesh(args *ipc.JoinMeshArgs, reply *string) error {
overrideConf := getOverrideConfiguration(&args.WgArgs) overrideConf := getOverrideConfiguration(&args.WgArgs)
peerConnection, err := n.Server.GetConnectionManager().GetConnection(args.IpAdress) if n.Server.GetMeshManager().GetMesh(args.MeshId) != nil {
return fmt.Errorf("user is already apart of the mesh")
}
peerConnection, err := n.Server.GetConnectionManager().GetConnection(args.IpAddress)
if err != nil { if err != nil {
return err return err
@ -147,7 +147,6 @@ func (n *IpcHandler) LeaveMesh(meshId string, reply *string) error {
if err == nil { if err == nil {
*reply = fmt.Sprintf("Left Mesh %s", meshId) *reply = fmt.Sprintf("Left Mesh %s", meshId)
} }
return err return err
} }
@ -193,30 +192,34 @@ func (n *IpcHandler) Query(params ipc.QueryMesh, reply *string) error {
return nil return nil
} }
func (n *IpcHandler) PutDescription(description string, reply *string) error { func (n *IpcHandler) PutDescription(args ipc.PutDescriptionArgs, reply *string) error {
err := n.Server.GetMeshManager().SetDescription(description) err := n.Server.GetMeshManager().SetDescription(args.MeshId, args.Description)
if err != nil { if err != nil {
return err return err
} }
*reply = fmt.Sprintf("Set description to %s", description) *reply = fmt.Sprintf("set description to %s for %s", args.Description, args.MeshId)
return nil return nil
} }
func (n *IpcHandler) PutAlias(alias string, reply *string) error { func (n *IpcHandler) PutAlias(args ipc.PutAliasArgs, reply *string) error {
err := n.Server.GetMeshManager().SetAlias(alias) if args.Alias == "" {
return fmt.Errorf("alias not provided")
}
err := n.Server.GetMeshManager().SetAlias(args.MeshId, args.Alias)
if err != nil { if err != nil {
return err return err
} }
*reply = fmt.Sprintf("Set alias to %s", alias) *reply = fmt.Sprintf("Set alias to %s", args.Alias)
return nil return nil
} }
func (n *IpcHandler) PutService(service ipc.PutServiceArgs, reply *string) error { func (n *IpcHandler) PutService(service ipc.PutServiceArgs, reply *string) error {
err := n.Server.GetMeshManager().SetService(service.Service, service.Value) err := n.Server.GetMeshManager().SetService(service.MeshId, service.Service, service.Value)
if err != nil { if err != nil {
return err return err
@ -226,8 +229,8 @@ func (n *IpcHandler) PutService(service ipc.PutServiceArgs, reply *string) error
return nil return nil
} }
func (n *IpcHandler) DeleteService(service string, reply *string) error { func (n *IpcHandler) DeleteService(service ipc.DeleteServiceArgs, reply *string) error {
err := n.Server.GetMeshManager().RemoveService(service) err := n.Server.GetMeshManager().RemoveService(service.MeshId, service.Service)
if err != nil { if err != nil {
return err return err

View File

@ -27,7 +27,7 @@ type SyncerImpl struct {
syncCount int syncCount int
cluster conn.ConnCluster cluster conn.ConnCluster
conf *conf.DaemonConfiguration conf *conf.DaemonConfiguration
lastSync map[string]uint64 lastSync map[string]int64
} }
// Sync: Sync with random nodes // Sync: Sync with random nodes
@ -54,8 +54,8 @@ func (s *SyncerImpl) Sync(correspondingMesh mesh.MeshProvider) error {
if self.GetType() == conf.PEER_ROLE && !correspondingMesh.HasChanges() && s.infectionCount == 0 { if self.GetType() == conf.PEER_ROLE && !correspondingMesh.HasChanges() && s.infectionCount == 0 {
logging.Log.WriteInfof("no changes for %s", correspondingMesh.GetMeshId()) logging.Log.WriteInfof("no changes for %s", correspondingMesh.GetMeshId())
// If not synchronised in certain pull from random neighbour // If not synchronised in certain time pull from random neighbour
if uint64(time.Now().Unix())-s.lastSync[correspondingMesh.GetMeshId()] > 20 { if s.conf.PullTime != 0 && time.Now().Unix()-s.lastSync[correspondingMesh.GetMeshId()] > int64(s.conf.PullTime) {
return s.Pull(self, correspondingMesh) return s.Pull(self, correspondingMesh)
} }
@ -84,7 +84,9 @@ func (s *SyncerImpl) Sync(correspondingMesh mesh.MeshProvider) error {
return nil return nil
} }
redundancyLength := min(len(neighbours), 3) // Peer with 2 nodes so that there is redundnacy in
// the situation the node leaves pre-emptively
redundancyLength := min(len(neighbours), 2)
gossipNodes = neighbours[:redundancyLength] gossipNodes = neighbours[:redundancyLength]
} else { } else {
neighbours := s.cluster.GetNeighbours(nodeNames, publicKey.String()) neighbours := s.cluster.GetNeighbours(nodeNames, publicKey.String())
@ -113,24 +115,23 @@ func (s *SyncerImpl) Sync(correspondingMesh mesh.MeshProvider) error {
} }
if err != nil { if err != nil {
logging.Log.WriteInfof(err.Error()) logging.Log.WriteErrorf(err.Error())
} }
} }
s.syncCount++ s.syncCount++
logging.Log.WriteInfof("SYNC TIME: %v", time.Since(before)) logging.Log.WriteInfof("sync time: %v", time.Since(before))
logging.Log.WriteInfof("SYNC COUNT: %d", s.syncCount) logging.Log.WriteInfof("number of syncs: %d", s.syncCount)
s.infectionCount = ((s.conf.InfectionCount + s.infectionCount - 1) % s.conf.InfectionCount) s.infectionCount = ((s.conf.InfectionCount + s.infectionCount - 1) % s.conf.InfectionCount)
if !succeeded { if !succeeded {
// If could not gossip with anyone then repeat.
s.infectionCount++ s.infectionCount++
} }
correspondingMesh.SaveChanges() correspondingMesh.SaveChanges()
s.lastSync[correspondingMesh.GetMeshId()] = uint64(time.Now().Unix()) s.lastSync[correspondingMesh.GetMeshId()] = time.Now().Unix()
return nil return nil
} }
@ -148,7 +149,7 @@ func (s *SyncerImpl) Pull(self mesh.MeshNode, mesh mesh.MeshProvider) error {
return nil return nil
} }
logging.Log.WriteInfof("PULLING from node %s", neighbour[0]) logging.Log.WriteInfof("pulling from node %s", neighbour[0])
pullNode, err := mesh.GetNode(neighbour[0]) pullNode, err := mesh.GetNode(neighbour[0])
@ -159,7 +160,7 @@ func (s *SyncerImpl) Pull(self mesh.MeshNode, mesh mesh.MeshProvider) error {
err = s.requester.SyncMesh(mesh.GetMeshId(), pullNode) err = s.requester.SyncMesh(mesh.GetMeshId(), pullNode)
if err == nil || err == io.EOF { if err == nil || err == io.EOF {
s.lastSync[mesh.GetMeshId()] = uint64(time.Now().Unix()) s.lastSync[mesh.GetMeshId()] = time.Now().Unix()
} else { } else {
return err return err
} }
@ -206,5 +207,5 @@ func NewSyncer(m mesh.MeshManager, conf *conf.DaemonConfiguration, r SyncRequest
infectionCount: 0, infectionCount: 0,
syncCount: 0, syncCount: 0,
cluster: cluster, cluster: cluster,
lastSync: make(map[string]uint64)} lastSync: make(map[string]int64)}
} }

View File

@ -91,7 +91,7 @@ func (s *SyncRequesterImpl) SyncMesh(meshId string, meshNode mesh.MeshNode) erro
c := rpc.NewSyncServiceClient(client) c := rpc.NewSyncServiceClient(client)
syncTimeOut := float64(s.server.Conf.SyncRate) * float64(time.Second) syncTimeOut := float64(s.server.Conf.SyncTime) * float64(time.Second)
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(syncTimeOut)) ctx, cancel := context.WithTimeout(context.Background(), time.Duration(syncTimeOut))
defer cancel() defer cancel()
@ -99,11 +99,11 @@ func (s *SyncRequesterImpl) SyncMesh(meshId string, meshNode mesh.MeshNode) erro
err = s.syncMesh(mesh, ctx, c) err = s.syncMesh(mesh, ctx, c)
if err != nil { if err != nil {
return s.handleErr(meshId, pubKey.String(), err) s.handleErr(meshId, pubKey.String(), err)
} }
logging.Log.WriteInfof("Synced with node: %s meshId: %s\n", endpoint, meshId) logging.Log.WriteInfof("Synced with node: %s meshId: %s\n", endpoint, meshId)
return nil return err
} }
func (s *SyncRequesterImpl) syncMesh(mesh mesh.MeshProvider, ctx context.Context, client rpc.SyncServiceClient) error { func (s *SyncRequesterImpl) syncMesh(mesh mesh.MeshProvider, ctx context.Context, client rpc.SyncServiceClient) error {

View File

@ -14,5 +14,5 @@ func syncFunction(syncer Syncer) lib.TimerFunc {
} }
func NewSyncScheduler(s *ctrlserver.MeshCtrlServer, syncRequester SyncRequester, syncer Syncer) *lib.Timer { func NewSyncScheduler(s *ctrlserver.MeshCtrlServer, syncRequester SyncRequester, syncer Syncer) *lib.Timer {
return lib.NewTimer(syncFunction(syncer), s.Conf.SyncRate) return lib.NewTimer(syncFunction(syncer), s.Conf.SyncTime)
} }

View File

@ -11,5 +11,5 @@ func NewTimestampScheduler(ctrlServer *ctrlserver.MeshCtrlServer) lib.Timer {
logging.Log.WriteInfof("Updated Timestamp") logging.Log.WriteInfof("Updated Timestamp")
return ctrlServer.MeshManager.UpdateTimeStamp() return ctrlServer.MeshManager.UpdateTimeStamp()
} }
return *lib.NewTimer(timerFunc, ctrlServer.Conf.KeepAliveTime) return *lib.NewTimer(timerFunc, ctrlServer.Conf.HeartBeat)
} }