1
0
forked from extern/smegmesh

Compare commits

...

7 Commits

Author SHA1 Message Date
843caddf6b Implemented clustering betweeen nodes 2023-11-03 15:24:18 +00:00
8d8a13d6ff Build error forgot to add query.go 2023-11-01 13:17:58 +00:00
5183edc592 Merge pull request #6 from tim-beatham/5-ability-to-add-alias-and-description-to-services
Ability for each node to set their description describing what service they provide.
2023-11-01 11:58:52 +00:00
d462d95d6d Ability for each node to set their description describing what
service they provide.
2023-11-01 11:58:10 +00:00
8e50848043 Merge pull request #4 from tim-beatham/3-periodically-update-the-wg-configuration
Fixing an issue where packets are dropped each time
2023-11-01 10:42:56 +00:00
e63edea763 Fixing an issue where packets are dropped each time
we change wg configuration
2023-11-01 10:39:46 +00:00
a1caf2e8ae Merge pull request #2 from tim-beatham/1-log-key-events-in-the-mesh
1 log key events in the mesh
2023-10-31 10:37:59 +00:00
23 changed files with 358 additions and 134 deletions

View File

@ -157,6 +157,20 @@ func queryMesh(client *ipcRpc.Client, meshId, query string) {
fmt.Println(reply)
}
// putDescription: puts updates the description about the node to the meshes
func putDescription(client *ipcRpc.Client, description string) {
var reply string
err := client.Call("IpcHandler.PutDescription", &description, &reply)
if err != nil {
fmt.Println(err.Error())
return
}
fmt.Println(reply)
}
func main() {
parser := argparse.NewParser("wg-mesh",
"wg-mesh Manipulate WireGuard meshes")
@ -164,11 +178,12 @@ func main() {
newMeshCmd := parser.NewCommand("new-mesh", "Create a new mesh")
listMeshCmd := parser.NewCommand("list-meshes", "List meshes the node is connected to")
joinMeshCmd := parser.NewCommand("join-mesh", "Join a mesh network")
getMeshCmd := parser.NewCommand("get-mesh", "Get a mesh network")
// getMeshCmd := parser.NewCommand("get-mesh", "Get a mesh network")
enableInterfaceCmd := parser.NewCommand("enable-interface", "Enable A Specific Mesh Interface")
getGraphCmd := parser.NewCommand("get-graph", "Convert a mesh into DOT format")
leaveMeshCmd := parser.NewCommand("leave-mesh", "Leave a mesh network")
queryMeshCmd := parser.NewCommand("query-mesh", "Query a mesh network using JMESPath")
putDescriptionCmd := parser.NewCommand("put-description", "Place a description for the node")
var newMeshIfName *string = newMeshCmd.String("f", "ifname", &argparse.Options{Required: true})
var newMeshPort *int = newMeshCmd.Int("p", "wgport", &argparse.Options{Required: true})
@ -180,8 +195,10 @@ func main() {
var joinMeshPort *int = joinMeshCmd.Int("p", "wgport", &argparse.Options{Required: true})
var joinMeshEndpoint *string = joinMeshCmd.String("e", "endpoint", &argparse.Options{})
var getMeshId *string = getMeshCmd.String("m", "mesh", &argparse.Options{Required: true})
// var getMeshId *string = getMeshCmd.String("m", "mesh", &argparse.Options{Required: true})
var enableInterfaceMeshId *string = enableInterfaceCmd.String("m", "mesh", &argparse.Options{Required: true})
var getGraphMeshId *string = getGraphCmd.String("m", "mesh", &argparse.Options{Required: true})
var leaveMeshMeshId *string = leaveMeshCmd.String("m", "mesh", &argparse.Options{Required: true})
@ -189,6 +206,8 @@ func main() {
var queryMeshMeshId *string = queryMeshCmd.String("m", "mesh", &argparse.Options{Required: true})
var queryMeshQuery *string = queryMeshCmd.String("q", "query", &argparse.Options{Required: true})
var description *string = putDescriptionCmd.String("d", "description", &argparse.Options{Required: true})
err := parser.Parse(os.Args)
if err != nil {
@ -226,9 +245,9 @@ func main() {
}))
}
if getMeshCmd.Happened() {
getMesh(client, *getMeshId)
}
// if getMeshCmd.Happened() {
// getMesh(client, *getMeshId)
// }
if getGraphCmd.Happened() {
getGraph(client, *getGraphMeshId)
@ -245,4 +264,8 @@ func main() {
if queryMeshCmd.Happened() {
queryMesh(client, *queryMeshMeshId, *queryMeshQuery)
}
if putDescriptionCmd.Happened() {
putDescription(client, *description)
}
}

View File

@ -2,5 +2,11 @@ certificatePath: "/wgmesh/cert/cert.pem"
privateKeyPath: "/wgmesh/cert/priv.pem"
caCertificatePath: "/wgmesh/cert/cacert.pem"
skipCertVerification: true
gRPCPort: "8080"
advertiseRoutes: true
gRPCPort: "21906"
advertiseRoutes: true
clusterSize: 32
syncRate: 1
interClusterChance: 0.15
branchRate: 3
infectionCount: 3
keepAliveRate: 60

View File

@ -3,6 +3,7 @@ package main
import (
"log"
"os"
"os/signal"
"github.com/tim-beatham/wgmesh/pkg/conf"
ctrlserver "github.com/tim-beatham/wgmesh/pkg/ctrlserver"
@ -50,8 +51,8 @@ func main() {
ctrlServer, err := ctrlserver.NewCtrlServer(&ctrlServerParams)
syncProvider.Server = ctrlServer
syncRequester := sync.NewSyncRequester(ctrlServer)
syncScheduler := sync.NewSyncScheduler(ctrlServer, syncRequester, 2)
timestampScheduler := timestamp.NewTimestampScheduler(ctrlServer, 60)
syncScheduler := sync.NewSyncScheduler(ctrlServer, syncRequester)
timestampScheduler := timestamp.NewTimestampScheduler(ctrlServer)
robinIpcParams := robin.RobinIpcParams{
CtrlServer: ctrlServer,
@ -71,16 +72,30 @@ func main() {
go syncScheduler.Run()
go timestampScheduler.Run()
closeResources := func() {
logging.Log.WriteInfof("Closing resources")
syncScheduler.Stop()
timestampScheduler.Stop()
ctrlServer.Close()
client.Close()
}
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
for range c {
closeResources()
os.Exit(0)
}
}()
err = ctrlServer.ConnectionServer.Listen()
if err != nil {
logging.Log.WriteErrorf(err.Error())
return
}
defer syncScheduler.Stop()
defer timestampScheduler.Stop()
defer ctrlServer.Close()
defer client.Close()
go closeResources()
}

2
go.mod
View File

@ -6,6 +6,7 @@ require (
github.com/akamensky/argparse v1.4.0
github.com/automerge/automerge-go v0.0.0-20230903201930-b80ce8aadbb9
github.com/google/uuid v1.3.0
github.com/jmespath/go-jmespath v0.4.0
github.com/sirupsen/logrus v1.9.3
golang.zx2c4.com/wireguard/wgctrl v0.0.0-20230429144221-925a1e7659e6
google.golang.org/grpc v1.58.1
@ -16,7 +17,6 @@ require (
require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/native v1.1.0 // indirect
github.com/mdlayher/genetlink v1.3.2 // indirect
github.com/mdlayher/netlink v1.7.2 // indirect

62
go.sum
View File

@ -1,62 +0,0 @@
github.com/akamensky/argparse v1.4.0 h1:YGzvsTqCvbEZhL8zZu2AiA5nq805NZh75JNj4ajn1xc=
github.com/akamensky/argparse v1.4.0/go.mod h1:S5kwC7IuDcEr5VeXtGPRVZ5o/FdhcMlQz4IZQuw64xA=
github.com/automerge/automerge-go v0.0.0-20230903201930-b80ce8aadbb9 h1:+6JSfuxZgmURoIlGdnYnY/FLRGWGagLyiBjt/VLtwi4=
github.com/automerge/automerge-go v0.0.0-20230903201930-b80ce8aadbb9/go.mod h1:6UxoDE+thWsISXK93pxaOuOfkcAfCvDbg0eAnFmxL5E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/josharian/native v1.1.0 h1:uuaP0hAbW7Y4l0ZRQ6C9zfb7Mg1mbFKry/xzDAfmtLA=
github.com/josharian/native v1.1.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w=
github.com/mdlayher/genetlink v1.3.2 h1:KdrNKe+CTu+IbZnm/GVUMXSqBBLqcGpRDa0xkQy56gw=
github.com/mdlayher/genetlink v1.3.2/go.mod h1:tcC3pkCrPUGIKKsCsp0B3AdaaKuHtaxoJRz3cc+528o=
github.com/mdlayher/netlink v1.7.2 h1:/UtM3ofJap7Vl4QWCPDGXY8d3GIY2UGSDbK+QWmY8/g=
github.com/mdlayher/netlink v1.7.2/go.mod h1:xraEF7uJbxLhc5fpHL4cPe221LI2bdttWlU+ZGLfQSw=
github.com/mdlayher/socket v0.5.0 h1:ilICZmJcQz70vrWVes1MFera4jGiWNocSkykwwoy3XI=
github.com/mdlayher/socket v0.5.0/go.mod h1:WkcBFfvyG8QENs5+hfQPl1X6Jpd2yeLIYgrGFmJiJxI=
github.com/mikioh/ipaddr v0.0.0-20190404000644-d465c8ab6721 h1:RlZweED6sbSArvlE924+mUcZuXKLBHA35U7LN621Bws=
github.com/mikioh/ipaddr v0.0.0-20190404000644-d465c8ab6721/go.mod h1:Ickgr2WtCLZ2MDGd4Gr0geeCH5HybhRJbonOgQpvSxc=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck=
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
golang.org/x/net v0.15.0 h1:ugBLEUaxABaB5AJqW9enI0ACdci2RUd4eP51NTBvuJ8=
golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.zx2c4.com/wireguard v0.0.0-20230704135630-469159ecf7d1 h1:EY138uSo1JYlDq+97u1FtcOUwPpIU6WL1Lkt7WpYjPA=
golang.zx2c4.com/wireguard v0.0.0-20230704135630-469159ecf7d1/go.mod h1:tqur9LnfstdR9ep2LaJT4lFUl0EjlHtge+gAjmsHUG4=
golang.zx2c4.com/wireguard/wgctrl v0.0.0-20230429144221-925a1e7659e6 h1:CawjfCvYQH2OU3/TnxLx97WDSUDRABfT18pCOYwc2GE=
golang.zx2c4.com/wireguard/wgctrl v0.0.0-20230429144221-925a1e7659e6/go.mod h1:3rxYc4HtVcSG9gVaTs2GEBdehh+sYPOwKtyUWEOTb80=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 h1:bVf09lpb+OJbByTj913DRJioFFAjf/ZGxEz7MajTp2U=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98/go.mod h1:TUfxEVdsvPg18p6AslUXFoLdpED4oBnGwyqk3dV1XzM=
google.golang.org/grpc v1.58.1 h1:OL+Vz23DTtrrldqHK49FUOPHyY75rvFqJfXC84NYW58=
google.golang.org/grpc v1.58.1/go.mod h1:tgX3ZQDlNJGU96V6yHh1T/JeoBQ2TXdr43YbYSsCJk0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -2,6 +2,7 @@ package crdt
import (
"errors"
"fmt"
"net"
"strings"
"time"
@ -161,7 +162,27 @@ func (m *CrdtMeshManager) UpdateTimeStamp(nodeId string) error {
err = node.Map().Set("timestamp", time.Now().Unix())
if err == nil {
logging.Log.WriteInfof("Timestamp Updated for %s", m.MeshId)
logging.Log.WriteInfof("Timestamp Updated for %s", nodeId)
}
return err
}
func (m *CrdtMeshManager) SetDescription(nodeId string, description string) error {
node, err := m.doc.Path("nodes").Map().Get(nodeId)
if err != nil {
return err
}
if node.Kind() != automerge.KindMap {
return errors.New(fmt.Sprintf("%s does not exist", nodeId))
}
err = node.Map().Set("description", description)
if err == nil {
logging.Log.WriteInfof("Description Updated for %s", nodeId)
}
return err
@ -232,6 +253,10 @@ func (m *MeshNodeCrdt) GetRoutes() []string {
return lib.MapKeys(m.Routes)
}
func (m *MeshNodeCrdt) GetDescription() string {
return m.Description
}
func (m *MeshNodeCrdt) GetIdentifier() string {
ipv6 := m.WgHost[:len(m.WgHost)-4]
@ -252,6 +277,7 @@ func (m *MeshCrdt) GetNodes() map[string]mesh.MeshNode {
WgHost: node.WgHost,
Timestamp: node.Timestamp,
Routes: node.Routes,
Description: node.Description,
}
}

View File

@ -8,6 +8,7 @@ type MeshNodeCrdt struct {
WgHost string `automerge:"wgHost"`
Timestamp int64 `automerge:"timestamp"`
Routes map[string]interface{} `automerge:"routes"`
Description string `automerge:"description"`
}
// MeshCrdt: Represents the mesh network as a whole

View File

@ -24,7 +24,13 @@ type WgMeshConfiguration struct {
AdvertiseRoutes bool `yaml:"advertiseRoutes"`
// Endpoint is the IP in which this computer is publicly reachable.
// usecase is when the node has multiple IP addresses
Endpoint string `yaml:"publicEndpoint"`
Endpoint string `yaml:"publicEndpoint"`
ClusterSize int `yaml:"clusterSize"`
SyncRate float64 `yaml:"syncRate"`
InterClusterChance float64 `yaml:"interClusterChance"`
BranchRate int `yaml:"branchRate"`
InfectionCount int `yaml:"infectionCount"`
KeepAliveRate int `yaml:"keepAliveRate"`
}
// ParseConfiguration parses the mesh configuration

View File

@ -5,12 +5,10 @@ package conn
import (
"crypto/tls"
"errors"
"time"
logging "github.com/tim-beatham/wgmesh/pkg/log"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
)
// PeerConnection represents a client-side connection between two
@ -43,11 +41,7 @@ func NewWgCtrlConnection(clientConfig *tls.Config, server string) (*WgCtrlConnec
// ConnectWithToken: Connects to a new gRPC peer given the address of the other server.
func (c *WgCtrlConnection) createGrpcConn() error {
conn, err := grpc.Dial(c.endpoint,
grpc.WithTransportCredentials(credentials.NewTLS(c.clientConfig)),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Minute,
Timeout: 30 * time.Minute,
}))
grpc.WithTransportCredentials(credentials.NewTLS(c.clientConfig)))
if err != nil {
logging.Log.WriteErrorf("Could not connect: %s\n", err.Error())

View File

@ -94,7 +94,12 @@ func NewConnectionManager(params *NewConnectionManageParams) (ConnectionManager,
}
connections := make(map[string]PeerConnection)
connMgr := ConnectionManagerImpl{sync.RWMutex{}, connections, serverConfig, clientConfig}
connMgr := ConnectionManagerImpl{sync.RWMutex{},
connections,
serverConfig,
clientConfig,
}
return &connMgr, nil
}
@ -131,6 +136,7 @@ func (m *ConnectionManagerImpl) AddConnection(endPoint string) (PeerConnection,
m.conLoc.Lock()
m.clientConnections[endPoint] = connections
m.conLoc.Unlock()
return connections, nil
}

View File

@ -6,6 +6,7 @@ import (
"github.com/tim-beatham/wgmesh/pkg/conn"
"github.com/tim-beatham/wgmesh/pkg/ip"
"github.com/tim-beatham/wgmesh/pkg/lib"
logging "github.com/tim-beatham/wgmesh/pkg/log"
"github.com/tim-beatham/wgmesh/pkg/mesh"
"github.com/tim-beatham/wgmesh/pkg/query"
"github.com/tim-beatham/wgmesh/pkg/rpc"
@ -84,11 +85,11 @@ func NewCtrlServer(params *NewCtrlServerParams) (*MeshCtrlServer, error) {
// Close closes the ctrl server tearing down any connections that exist
func (s *MeshCtrlServer) Close() error {
if err := s.ConnectionManager.Close(); err != nil {
return err
logging.Log.WriteErrorf(err.Error())
}
if err := s.ConnectionServer.Close(); err != nil {
return err
logging.Log.WriteErrorf(err.Error())
}
return nil

View File

@ -56,6 +56,7 @@ type MeshIpc interface {
EnableInterface(meshId string, reply *string) error
GetDOT(meshId string, reply *string) error
Query(query QueryMesh, reply *string) error
PutDescription(description string, reply *string) error
}
const SockAddr = "/tmp/wgmesh_ipc.sock"

View File

@ -1,9 +1,5 @@
package lib
import (
logging "github.com/tim-beatham/wgmesh/pkg/log"
)
// MapToSlice converts a map to a slice in go
func MapValues[K comparable, V any](m map[K]V) []V {
return MapValuesWithExclude(m, map[K]struct{}{})
@ -23,8 +19,6 @@ func MapValuesWithExclude[K comparable, V any](m map[K]V, exclude map[K]struct{}
continue
}
logging.Log.WriteInfof("Key %s", k)
values[i] = v
i++
}
@ -46,6 +40,7 @@ func MapKeys[K comparable, V any](m map[K]V) []K {
type convert[V1 any, V2 any] func(V1) V2
// Map turns a list of type V1 into type V2
func Map[V1 any, V2 any](list []V1, f convert[V1, V2]) []V2 {
newList := make([]V2, len(list))
@ -55,3 +50,19 @@ func Map[V1 any, V2 any](list []V1, f convert[V1, V2]) []V2 {
return newList
}
type filterFunc[V any] func(V) bool
// Filter filters out elements given a filter function.
// If filter function is true keep it in otherwise leave it out
func Filter[V any](list []V, f filterFunc[V]) []V {
newList := make([]V, 0)
for _, elem := range newList {
if f(elem) {
newList = append(newList, elem)
}
}
return newList
}

View File

@ -1,6 +1,8 @@
package lib
import "math/rand"
import (
"math/rand"
)
// RandomSubsetOfLength: Given an array of nodes generate of random
// subset of 'num' length.
@ -17,6 +19,7 @@ func RandomSubsetOfLength[V any](vs []V, num int) []V {
if _, ok := selectedIndices[randomIndex]; !ok {
randomSubset = append(randomSubset, vs[randomIndex])
selectedIndices[randomIndex] = struct{}{}
i++
}
}

View File

@ -1,6 +1,8 @@
package mesh
import (
"errors"
"fmt"
"net"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
@ -9,6 +11,7 @@ import (
// MeshConfigApplyer abstracts applying the mesh configuration
type MeshConfigApplyer interface {
ApplyConfig() error
RemovePeers(meshId string) error
}
// WgMeshConfigApplyer applies WireGuard configuration
@ -16,7 +19,7 @@ type WgMeshConfigApplyer struct {
meshManager *MeshManager
}
func ConvertMeshNode(node MeshNode) (*wgtypes.PeerConfig, error) {
func convertMeshNode(node MeshNode) (*wgtypes.PeerConfig, error) {
endpoint, err := net.ResolveUDPAddr("udp", node.GetWgEndpoint())
if err != nil {
@ -59,7 +62,7 @@ func (m *WgMeshConfigApplyer) updateWgConf(mesh MeshProvider) error {
var count int = 0
for _, n := range nodes {
peer, err := ConvertMeshNode(n)
peer, err := convertMeshNode(n)
if err != nil {
return err
@ -70,8 +73,7 @@ func (m *WgMeshConfigApplyer) updateWgConf(mesh MeshProvider) error {
}
cfg := wgtypes.Config{
Peers: peerConfigs,
ReplacePeers: true,
Peers: peerConfigs,
}
dev, err := mesh.GetDevice()
@ -95,6 +97,27 @@ func (m *WgMeshConfigApplyer) ApplyConfig() error {
return nil
}
func (m *WgMeshConfigApplyer) RemovePeers(meshId string) error {
mesh := m.meshManager.GetMesh(meshId)
if mesh == nil {
return errors.New(fmt.Sprintf("mesh %s does not exist", meshId))
}
dev, err := mesh.GetDevice()
if err != nil {
return err
}
m.meshManager.Client.ConfigureDevice(dev.Name, wgtypes.Config{
ReplacePeers: true,
Peers: make([]wgtypes.PeerConfig, 1),
})
return nil
}
func NewWgMeshConfigApplyer(manager *MeshManager) MeshConfigApplyer {
return &WgMeshConfigApplyer{meshManager: manager}
}

View File

@ -49,11 +49,23 @@ func (m *MeshManager) CreateMesh(devName string, port int) (string, error) {
return "", err
}
m.Meshes[meshId] = nodeManager
return meshId, m.interfaceManipulator.CreateInterface(&wg.CreateInterfaceParams{
err = m.interfaceManipulator.CreateInterface(&wg.CreateInterfaceParams{
IfName: devName,
Port: port,
})
if err != nil {
return "", nil
}
m.Meshes[meshId] = nodeManager
err = m.configApplyer.RemovePeers(meshId)
if err != nil {
logging.Log.WriteErrorf(err.Error())
}
return meshId, nil
}
type AddMeshParams struct {
@ -218,6 +230,22 @@ func (s *MeshManager) GetSelf(meshId string) (MeshNode, error) {
return node, nil
}
func (s *MeshManager) ApplyConfig() error {
return s.configApplyer.ApplyConfig()
}
func (s *MeshManager) SetDescription(description string) error {
for _, mesh := range s.Meshes {
err := mesh.SetDescription(s.HostParameters.HostEndpoint, description)
if err != nil {
return err
}
}
return nil
}
// UpdateTimeStamp updates the timestamp of this node in all meshes
func (s *MeshManager) UpdateTimeStamp() error {
for _, mesh := range s.Meshes {

View File

@ -26,6 +26,8 @@ type MeshNode interface {
GetRoutes() []string
// GetIdentifier: returns the identifier of the node
GetIdentifier() string
// GetDescription: returns the description for this node
GetDescription() string
}
type MeshSnapshot interface {
@ -63,6 +65,7 @@ type MeshProvider interface {
// AddRoutes: adds routes to the given node
AddRoutes(nodeId string, route ...string) error
GetSyncer() MeshSyncer
SetDescription(nodeId string, description string) error
}
// HostParameters contains the IDs of a node

84
pkg/query/query.go Normal file
View File

@ -0,0 +1,84 @@
package query
import (
"encoding/json"
"fmt"
"github.com/jmespath/go-jmespath"
"github.com/tim-beatham/wgmesh/pkg/lib"
"github.com/tim-beatham/wgmesh/pkg/mesh"
)
// Querier queries a data store for the given data
// and returns data in the corresponding encoding
type Querier interface {
Query(meshId string, queryParams string) ([]byte, error)
}
type JmesQuerier struct {
manager *mesh.MeshManager
}
type QueryError struct {
msg string
}
type QueryNode struct {
HostEndpoint string `json:"hostEndpoint"`
PublicKey string `json:"publicKey"`
WgEndpoint string `json:"wgEndpoint"`
WgHost string `json:"wgHost"`
Timestamp int64 `json:"timestmap"`
Description string `json:"description"`
Routes []string `json:"routes"`
}
func (m *QueryError) Error() string {
return m.msg
}
// Query: queries the data
func (j *JmesQuerier) Query(meshId, queryParams string) ([]byte, error) {
mesh, ok := j.manager.Meshes[meshId]
if !ok {
return nil, &QueryError{msg: fmt.Sprintf("%s does not exist", meshId)}
}
snapshot, err := mesh.GetMesh()
if err != nil {
return nil, err
}
nodes := lib.Map(lib.MapValues(snapshot.GetNodes()), meshNodeToQueryNode)
result, err := jmespath.Search(queryParams, nodes)
if err != nil {
return nil, err
}
bytes, err := json.Marshal(result)
return bytes, err
}
func meshNodeToQueryNode(node mesh.MeshNode) *QueryNode {
queryNode := new(QueryNode)
queryNode.HostEndpoint = node.GetHostEndpoint()
pubKey, _ := node.GetPublicKey()
queryNode.PublicKey = pubKey.String()
queryNode.WgEndpoint = node.GetWgEndpoint()
queryNode.WgHost = node.GetWgHost().String()
queryNode.Timestamp = node.GetTimeStamp()
queryNode.Routes = node.GetRoutes()
queryNode.Description = node.GetDescription()
return queryNode
}
func NewJmesQuerier(manager *mesh.MeshManager) Querier {
return &JmesQuerier{manager: manager}
}

View File

@ -187,6 +187,17 @@ func (n *IpcHandler) Query(params ipc.QueryMesh, reply *string) error {
return nil
}
func (n *IpcHandler) PutDescription(description string, reply *string) error {
err := n.Server.MeshManager.SetDescription(description)
if err != nil {
return err
}
*reply = fmt.Sprintf("Set description to %s", description)
return nil
}
type RobinIpcParams struct {
CtrlServer *ctrlserver.MeshCtrlServer
}

View File

@ -2,10 +2,12 @@ package sync
import (
"errors"
"math/rand"
"sync"
"time"
crdt "github.com/tim-beatham/wgmesh/pkg/automerge"
"github.com/tim-beatham/wgmesh/pkg/conf"
"github.com/tim-beatham/wgmesh/pkg/conn"
"github.com/tim-beatham/wgmesh/pkg/lib"
logging "github.com/tim-beatham/wgmesh/pkg/log"
"github.com/tim-beatham/wgmesh/pkg/mesh"
@ -18,27 +20,31 @@ type Syncer interface {
}
type SyncerImpl struct {
manager *mesh.MeshManager
requester SyncRequester
authenticatedNodes []crdt.MeshNodeCrdt
manager *mesh.MeshManager
requester SyncRequester
infectionCount int
syncCount int
cluster conn.ConnCluster
conf *conf.WgMeshConfiguration
}
const subSetLength = 3
// Sync: Sync random nodes
func (s *SyncerImpl) Sync(meshId string) error {
if !s.manager.HasChanges(meshId) {
logging.Log.WriteInfof("UPDATING WG CONF")
s.manager.ApplyConfig()
if !s.manager.HasChanges(meshId) && s.infectionCount == 0 {
logging.Log.WriteInfof("No changes for %s", meshId)
return nil
}
mesh := s.manager.GetMesh(meshId)
theMesh := s.manager.GetMesh(meshId)
if mesh == nil {
if theMesh == nil {
return errors.New("the provided mesh does not exist")
}
snapshot, err := mesh.GetMesh()
snapshot, err := theMesh.GetMesh()
if err != nil {
return err
@ -53,29 +59,49 @@ func (s *SyncerImpl) Sync(meshId string) error {
excludedNodes := map[string]struct{}{
s.manager.HostParameters.HostEndpoint: {},
}
meshNodes := lib.MapValuesWithExclude(nodes, excludedNodes)
randomSubset := lib.RandomSubsetOfLength(meshNodes, subSetLength)
getNames := func(node mesh.MeshNode) string {
return node.GetHostEndpoint()
}
nodeNames := lib.Map(meshNodes, getNames)
neighbours := s.cluster.GetNeighbours(nodeNames, s.manager.HostParameters.HostEndpoint)
randomSubset := lib.RandomSubsetOfLength(neighbours, s.conf.BranchRate)
for _, node := range randomSubset {
logging.Log.WriteInfof("Random node: %s", node)
}
before := time.Now()
if len(meshNodes) > s.conf.ClusterSize && rand.Float64() < s.conf.InterClusterChance {
logging.Log.WriteInfof("Sending to random cluster")
interCluster := s.cluster.GetInterCluster(nodeNames, s.manager.HostParameters.HostEndpoint)
randomSubset = append(randomSubset, interCluster)
}
var waitGroup sync.WaitGroup
for _, n := range randomSubset {
for index := range randomSubset {
waitGroup.Add(1)
syncMeshFunc := func() error {
go func(i int) error {
defer waitGroup.Done()
err := s.requester.SyncMesh(meshId, n.GetHostEndpoint())
err := s.requester.SyncMesh(meshId, randomSubset[i])
return err
}
go syncMeshFunc()
}(index)
}
waitGroup.Wait()
s.syncCount++
logging.Log.WriteInfof("SYNC TIME: %v", time.Now().Sub(before))
logging.Log.WriteInfof("SYNC COUNT: %d", s.syncCount)
s.infectionCount = ((s.conf.InfectionCount + s.infectionCount - 1) % s.conf.InfectionCount)
return nil
}
@ -92,6 +118,13 @@ func (s *SyncerImpl) SyncMeshes() error {
return nil
}
func NewSyncer(m *mesh.MeshManager, r SyncRequester) Syncer {
return &SyncerImpl{manager: m, requester: r}
func NewSyncer(m *mesh.MeshManager, conf *conf.WgMeshConfiguration, r SyncRequester) Syncer {
cluster, _ := conn.NewConnCluster(conf.ClusterSize)
return &SyncerImpl{
manager: m,
conf: conf,
requester: r,
infectionCount: 0,
syncCount: 0,
cluster: cluster}
}

View File

@ -16,15 +16,14 @@ type SyncScheduler interface {
// SyncSchedulerImpl scheduler for sync scheduling
type SyncSchedulerImpl struct {
syncRate int
quit chan struct{}
server *ctrlserver.MeshCtrlServer
syncer Syncer
quit chan struct{}
server *ctrlserver.MeshCtrlServer
syncer Syncer
}
// Run implements SyncScheduler.
func (s *SyncSchedulerImpl) Run() error {
ticker := time.NewTicker(time.Duration(s.syncRate) * time.Second)
ticker := time.NewTicker(time.Duration(s.server.Conf.SyncRate) * time.Second)
quit := make(chan struct{})
s.quit = quit
@ -50,7 +49,7 @@ func (s *SyncSchedulerImpl) Stop() error {
return nil
}
func NewSyncScheduler(s *ctrlserver.MeshCtrlServer, syncRequester SyncRequester, syncRate int) SyncScheduler {
syncer := NewSyncer(s.MeshManager, syncRequester)
return &SyncSchedulerImpl{server: s, syncRate: syncRate, syncer: syncer}
func NewSyncScheduler(s *ctrlserver.MeshCtrlServer, syncRequester SyncRequester) SyncScheduler {
syncer := NewSyncer(s.MeshManager, s.Conf, syncRequester)
return &SyncSchedulerImpl{server: s, syncer: syncer}
}

View File

@ -38,8 +38,8 @@ func (s *TimeStampSchedulerImpl) Run() error {
}
}
func NewTimestampScheduler(ctrlServer *ctrlserver.MeshCtrlServer, updateRate int) TimestampScheduler {
return &TimeStampSchedulerImpl{meshManager: ctrlServer.MeshManager, updateRate: updateRate}
func NewTimestampScheduler(ctrlServer *ctrlserver.MeshCtrlServer) TimestampScheduler {
return &TimeStampSchedulerImpl{meshManager: ctrlServer.MeshManager, updateRate: ctrlServer.Conf.KeepAliveRate}
}
func (s *TimeStampSchedulerImpl) Stop() error {

View File

@ -1,6 +1,7 @@
package wg
import (
"errors"
"fmt"
"net"
"os/exec"
@ -16,7 +17,8 @@ func createInterface(ifName string) error {
_, err := net.InterfaceByName(ifName)
if err == nil {
return &WgError{msg: fmt.Sprintf("Interface %s already exists", ifName)}
err = flushInterface(ifName)
return err
}
// Check if the interface exists
@ -75,19 +77,29 @@ func flushInterface(ifName string) error {
// EnableInterface flushes the interface and sets the ip address of the
// interface
func (m *WgInterfaceManipulatorImpl) EnableInterface(ifName string, ip string) error {
if len(ifName) == 0 {
return errors.New("ifName not provided")
}
err := flushInterface(ifName)
if err != nil {
return err
}
cmd := exec.Command("/usr/bin/ip", "link", "set", "up", "dev", ifName)
if err := cmd.Run(); err != nil {
return err
}
hostIp, _, err := net.ParseCIDR(ip)
if err != nil {
return err
}
cmd := exec.Command("/usr/bin/ip", "addr", "add", hostIp.String()+"/64", "dev", ifName)
cmd = exec.Command("/usr/bin/ip", "addr", "add", hostIp.String()+"/64", "dev", ifName)
if err := cmd.Run(); err != nil {
return err