CRDTs fully incorporated

This commit is contained in:
Tim Beatham 2023-10-06 18:25:38 +01:00
parent 47e260e310
commit e729c5b181
5 changed files with 61 additions and 129 deletions

View File

@ -7,8 +7,8 @@ import (
"os"
"github.com/akamensky/argparse"
"github.com/tim-beatham/wgmesh/pkg/ctrlserver"
"github.com/tim-beatham/wgmesh/pkg/ipc"
logging "github.com/tim-beatham/wgmesh/pkg/log"
)
const SockAddr = "/tmp/wgmesh_ipc.sock"
@ -25,16 +25,17 @@ func createMesh(client *ipcRpc.Client) string {
}
func listMeshes(client *ipcRpc.Client) {
var reply map[string]ctrlserver.Mesh
reply := new(ipc.ListMeshReply)
err := client.Call("RobinIpc.ListMeshes", "", &reply)
if err != nil {
logging.ErrorLog.Println(err.Error())
return
}
for sharedKey := range reply {
fmt.Println(sharedKey)
for _, meshId := range reply.Meshes {
fmt.Println(meshId)
}
}
@ -64,8 +65,8 @@ func getMesh(client *ipcRpc.Client, meshId string) {
for _, node := range reply.Nodes {
fmt.Println("Public Key: " + node.PublicKey)
fmt.Println("WireGuard Endpoint: " + node.WgEndpoint)
fmt.Println("Control Endpoint: " + node.HostEndpoint)
fmt.Println("WireGuard Endpoint: " + node.WgEndpoint)
fmt.Println("Wg IP: " + node.WgHost)
fmt.Println("---")
}

View File

@ -11,7 +11,7 @@ import (
// CrdtNodeManager manages nodes in the crdt mesh
type CrdtNodeManager struct {
meshId string
MeshId string
IfName string
Client *wgctrl.Client
doc *automerge.Doc
@ -72,7 +72,7 @@ func (c *CrdtNodeManager) SaveChanges() []byte {
// NewCrdtNodeManager: Create a new crdt node manager
func NewCrdtNodeManager(meshId, devName string, client *wgctrl.Client) *CrdtNodeManager {
var manager CrdtNodeManager
manager.meshId = meshId
manager.MeshId = meshId
manager.doc = automerge.New()
manager.IfName = devName
manager.Client = client

View File

@ -8,7 +8,6 @@ import (
"os"
crdt "github.com/tim-beatham/wgmesh/pkg/automerge"
"github.com/tim-beatham/wgmesh/pkg/ctrlserver"
)
type JoinMeshArgs struct {
@ -20,9 +19,13 @@ type GetMeshReply struct {
Nodes []crdt.MeshNodeCrdt
}
type ListMeshReply struct {
Meshes []string
}
type MeshIpc interface {
CreateMesh(name string, reply *string) error
ListMeshes(name string, reply *map[string]ctrlserver.Mesh) error
ListMeshes(name string, reply *ListMeshReply) error
JoinMesh(args JoinMeshArgs, reply *string) error
GetMesh(meshId string, reply *GetMeshReply) error
EnableInterface(meshId string, reply *string) error

View File

@ -53,121 +53,17 @@ func (n *RobinIpc) CreateMesh(name string, reply *string) error {
return nil
}
func (n *RobinIpc) ListMeshes(name string, reply *map[string]ctrlserver.Mesh) error {
// *reply = n.Server.Meshes
return nil
}
func (n *RobinIpc) ListMeshes(_ string, reply *ipc.ListMeshReply) error {
meshNames := make([]string, len(n.Server.MeshManager.Meshes))
func updateMesh(n *RobinIpc, meshId string, endPoint string) error {
// peerConn, err := n.Server.ConnectionManager.GetConnection(endPoint)
i := 0
for _, mesh := range n.Server.MeshManager.Meshes {
meshNames[i] = mesh.MeshId
fmt.Println(meshNames[i])
i++
}
// if err != nil {
// return err
// }
// conn, err := peerConn.GetClient()
// c := rpc.NewMeshCtrlServerClient(conn)
// authContext, err := peerConn.CreateAuthContext(meshId)
// if err != nil {
// return err
// }
// ctx, cancel := context.WithTimeout(authContext, time.Second)
// defer cancel()
// getMeshReq := rpc.GetMeshRequest{
// MeshId: meshId,
// }
// r, err := c.GetMesh(ctx, &getMeshReq)
// if err != nil {
// return err
// }
// key, err := wgtypes.ParseKey(meshId)
// if err != nil {
// return err
// }
// err := n.Server.MeshManager.AddMesh(meshId, "wgmesh", r.Mesh)
// if err != nil {
// return err
// }
return nil
}
func updatePeer(n *RobinIpc, node ctrlserver.MeshNode, wgHost string, meshId string) error {
// // err := n.Authenticate(meshId, node.HostEndpoint)
// // if err != nil {
// // return err
// // }
// peerConnection, err := n.Server.ConnectionManager.GetConnection(node.HostEndpoint)
// if err != nil {
// return err
// }
// conn, err := peerConnection.GetClient()
// if err != nil {
// return err
// }
// c := rpc.NewMeshCtrlServerClient(conn)
// authContext, err := peerConnection.CreateAuthContext(meshId)
// if err != nil {
// return err
// }
// ctx, cancel := context.WithTimeout(authContext, time.Second)
// defer cancel()
// dev := n.Server.GetDevice()
// joinMeshReq := rpc.JoinMeshRequest{
// MeshId: meshId,
// HostPort: 8080,
// PublicKey: dev.PublicKey.String(),
// WgPort: int32(dev.ListenPort),
// WgIp: wgHost + "/128",
// }
// r, err := c.JoinMesh(ctx, &joinMeshReq)
// if err != nil {
// return err
// }
// if !r.GetSuccess() {
// return errors.New("Could not join the mesh")
// }
return nil
}
func updatePeers(n *RobinIpc, meshId string, wgHost string, nodesToExclude []string) error {
// for _, node := range n.Server.Meshes[meshId].Nodes {
// nodeEndpoint := node.HostEndpoint
// if !slices.Contains(nodesToExclude, nodeEndpoint) {
// Best effort service
// err := updatePeer(n, node, wgHost, meshId)
// if err != nil {
// return err
// }
// }
// }
*reply = ipc.ListMeshReply{Meshes: meshNames}
return nil
}
@ -187,14 +83,46 @@ func (n *RobinIpc) Authenticate(meshId, endpoint string) error {
return err
}
func (n *RobinIpc) JoinMesh(args ipc.JoinMeshArgs, reply *string) error {
err := n.Authenticate(args.MeshId, args.IpAdress+":8080")
func (n *RobinIpc) updatePeers(meshId string) error {
theMesh := n.Server.MeshManager.GetMesh(meshId)
if theMesh == nil {
return errors.New("the mesh does not exist")
}
snapshot, _ := theMesh.GetCrdt()
publicKey, err := n.Server.MeshManager.GetPublicKey(meshId)
if err != nil {
return err
}
peerConnection, err := n.Server.ConnectionManager.GetConnection(args.IpAdress + ":8080")
for nodeKey, node := range snapshot.Nodes {
logging.InfoLog.Println(nodeKey)
if nodeKey == publicKey.String() {
continue
}
var reply string
err := n.JoinMesh(ipc.JoinMeshArgs{MeshId: meshId, IpAdress: node.HostEndpoint}, &reply)
if err != nil {
logging.InfoLog.Println(err)
return err
}
}
return nil
}
func (n *RobinIpc) JoinMesh(args ipc.JoinMeshArgs, reply *string) error {
err := n.Authenticate(args.MeshId, args.IpAdress)
if err != nil {
return err
}
peerConnection, err := n.Server.ConnectionManager.GetConnection(args.IpAdress)
if err != nil {
return err
@ -273,12 +201,9 @@ func (n *RobinIpc) JoinMesh(args ipc.JoinMeshArgs, reply *string) error {
}
if joinReply.GetSuccess() {
updateMesh(n, args.MeshId, args.IpAdress+":8080")
err = n.updatePeers(args.MeshId)
}
if joinReply.GetSuccess() {
err = updatePeers(n, args.MeshId, ipAddr.String(), make([]string, 0))
}
*reply = strconv.FormatBool(joinReply.GetSuccess())
return nil
}

View File

@ -5,6 +5,7 @@ import (
"errors"
"github.com/tim-beatham/wgmesh/pkg/ctrlserver"
logging "github.com/tim-beatham/wgmesh/pkg/log"
"github.com/tim-beatham/wgmesh/pkg/rpc"
)
@ -55,6 +56,8 @@ func (m *RobinRpc) GetMesh(ctx context.Context, request *rpc.GetMeshRequest) (*r
func (m *RobinRpc) JoinMesh(ctx context.Context, request *rpc.JoinMeshRequest) (*rpc.JoinMeshReply, error) {
mesh := m.Server.MeshManager.GetMesh(request.MeshId)
logging.InfoLog.Println("[JOINING MESH]: " + request.MeshId)
if mesh == nil {
return nil, errors.New("mesh does not exist")
}