From fd29af73e3680296c2a1c2930a0d2d832df49b2f Mon Sep 17 00:00:00 2001 From: Tim Beatham Date: Sun, 31 Dec 2023 12:44:50 +0000 Subject: [PATCH] 72-pull-rate-in-configuration - Added pull rate to configuration (finally) so this can be modified by an administrator. --- cmd/wg-mesh/main.go | 2 - cmd/wgmeshd/main.go | 5 ++ pkg/conf/conf.go | 9 ++-- pkg/conf/conf_test.go | 10 ++-- pkg/crdt/datastore.go | 2 +- pkg/crdt/datastore_test.go | 4 +- pkg/crdt/factory.go | 2 +- pkg/libp2p/advertiser.go | 105 +++++++++++++++++++++++++++++++++++++ pkg/mesh/manager.go | 4 ++ pkg/mesh/manager_test.go | 4 +- pkg/robin/requester.go | 9 ++-- pkg/sync/syncer.go | 3 +- pkg/sync/syncrequester.go | 2 +- pkg/sync/syncscheduler.go | 2 +- pkg/timers/timers.go | 2 +- 15 files changed, 138 insertions(+), 27 deletions(-) create mode 100644 pkg/libp2p/advertiser.go diff --git a/cmd/wg-mesh/main.go b/cmd/wg-mesh/main.go index 7f879f7..2894326 100644 --- a/cmd/wg-mesh/main.go +++ b/cmd/wg-mesh/main.go @@ -226,7 +226,6 @@ func main() { }) var newMeshRole *string = newMeshCmd.Selector("r", "role", []string{"peer", "client"}, &argparse.Options{ - Default: "peer", Help: "Role in the mesh network. A value of peer means that the node is publicly routeable and thus considered" + " in the gossip protocol. Client means that the node is not publicly routeable and is not a candidate in the gossip" + " protocol", @@ -259,7 +258,6 @@ func main() { }) var joinMeshRole *string = joinMeshCmd.Selector("r", "role", []string{"peer", "client"}, &argparse.Options{ - Default: "peer", Help: "Role in the mesh network. A value of peer means that the node is publicly routeable and thus considered" + " in the gossip protocol. Client means that the node is not publicly routeable and is not a candidate in the gossip" + " protocol", diff --git a/cmd/wgmeshd/main.go b/cmd/wgmeshd/main.go index 95aa9b3..226b2d7 100644 --- a/cmd/wgmeshd/main.go +++ b/cmd/wgmeshd/main.go @@ -59,6 +59,11 @@ func main() { } ctrlServer, err := ctrlserver.NewCtrlServer(&ctrlServerParams) + + if err != nil { + panic(err) + } + syncProvider.Server = ctrlServer syncRequester = sync.NewSyncRequester(ctrlServer) syncer = sync.NewSyncer(ctrlServer.MeshManager, conf, syncRequester) diff --git a/pkg/conf/conf.go b/pkg/conf/conf.go index c38f85c..3276a78 100644 --- a/pkg/conf/conf.go +++ b/pkg/conf/conf.go @@ -47,7 +47,6 @@ type WgConfiguration struct { // If the user is globaly accessible they specify themselves as a client. Role *NodeType `yaml:"role" validate:"required,eq=client|eq=peer"` // KeepAliveWg configures the implementation so that we send keep alive packets to peers. - // KeepAlive can only be set if role is type client KeepAliveWg *int `yaml:"keepAliveWg" validate:"omitempty,gte=0"` // PreUp are WireGuard commands to run before adding the WG interface PreUp []string `yaml:"preUp"` @@ -77,13 +76,13 @@ type DaemonConfiguration struct { Profile bool `yaml:"profile"` // StubWg whether or not to stub the WireGuard types StubWg bool `yaml:"stubWg"` - // SyncRate specifies how long the minimum time should be between synchronisation - SyncRate int `yaml:"syncRate" validate:"required,gte=1"` + // SyncTime specifies how long the minimum time should be between synchronisation + SyncTime int `yaml:"syncTime" validate:"required,gte=1"` // PullTime specifies the interval between checking for configuration changes PullTime int `yaml:"pullTime" validate:"required,gte=0"` - // KeepAliveTime: number of seconds before the leader of the mesh sends an update to + // HeartBeat: number of seconds before the leader of the mesh sends an update to // send to every member in the mesh - KeepAliveTime int `yaml:"keepAliveTime" validate:"required,gte=1"` + HeartBeat int `yaml:"heartBeatTime" validate:"required,gte=1"` // ClusterSize specifies how many neighbours you should synchronise with per round ClusterSize int `yaml:"clusterSize" validate:"gte=1"` // InterClusterChance specifies the probabilityof inter-cluster communication in a sync round diff --git a/pkg/conf/conf_test.go b/pkg/conf/conf_test.go index 5f6aa62..4189470 100644 --- a/pkg/conf/conf_test.go +++ b/pkg/conf/conf_test.go @@ -21,8 +21,8 @@ func getExampleConfiguration() *DaemonConfiguration { Timeout: 5, Profile: false, StubWg: false, - SyncRate: 2, - KeepAliveTime: 2, + SyncTime: 2, + HeartBeat: 2, ClusterSize: 64, InterClusterChance: 0.15, BranchRate: 3, @@ -163,9 +163,9 @@ func TestBranchRateZero(t *testing.T) { } } -func TestSyncRateZero(t *testing.T) { +func TestsyncTimeZero(t *testing.T) { conf := getExampleConfiguration() - conf.SyncRate = 0 + conf.SyncTime = 0 err := ValidateDaemonConfiguration(conf) @@ -176,7 +176,7 @@ func TestSyncRateZero(t *testing.T) { func TestKeepAliveTimeZero(t *testing.T) { conf := getExampleConfiguration() - conf.KeepAliveTime = 0 + conf.HeartBeat = 0 err := ValidateDaemonConfiguration(conf) if err == nil { diff --git a/pkg/crdt/datastore.go b/pkg/crdt/datastore.go index 229f5ed..d5ef623 100644 --- a/pkg/crdt/datastore.go +++ b/pkg/crdt/datastore.go @@ -264,7 +264,7 @@ func (m *TwoPhaseStoreMeshManager) UpdateTimeStamp(nodeId string) error { peerToUpdate := peers[0] - if uint64(time.Now().Unix())-m.store.Clock.GetTimestamp(peerToUpdate) > 3*uint64(m.DaemonConf.KeepAliveTime) { + if uint64(time.Now().Unix())-m.store.Clock.GetTimestamp(peerToUpdate) > 3*uint64(m.DaemonConf.HeartBeat) { m.store.Mark(peerToUpdate) if len(peers) < 2 { diff --git a/pkg/crdt/datastore_test.go b/pkg/crdt/datastore_test.go index 4b8fa87..c86ec81 100644 --- a/pkg/crdt/datastore_test.go +++ b/pkg/crdt/datastore_test.go @@ -32,8 +32,8 @@ func setUpTests() *TestParams { GrpcPort: 0, Timeout: 20, Profile: false, - SyncRate: 2, - KeepAliveTime: 10, + SyncTime: 2, + HeartBeat: 10, ClusterSize: 32, InterClusterChance: 0.15, BranchRate: 3, diff --git a/pkg/crdt/factory.go b/pkg/crdt/factory.go index d2d782c..9cefe05 100644 --- a/pkg/crdt/factory.go +++ b/pkg/crdt/factory.go @@ -24,7 +24,7 @@ func (f *TwoPhaseMapFactory) CreateMesh(params *mesh.MeshProviderFactoryParams) h := fnv.New64a() h.Write([]byte(s)) return h.Sum64() - }, uint64(3*f.Config.KeepAliveTime)), + }, uint64(3*f.Config.HeartBeat)), }, nil } diff --git a/pkg/libp2p/advertiser.go b/pkg/libp2p/advertiser.go new file mode 100644 index 0000000..3f1457f --- /dev/null +++ b/pkg/libp2p/advertiser.go @@ -0,0 +1,105 @@ +package main + +import ( + "bufio" + "context" + "sync" + + "github.com/libp2p/go-libp2p" + dht "github.com/libp2p/go-libp2p-kad-dht" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + drouting "github.com/libp2p/go-libp2p/p2p/discovery/routing" + dutil "github.com/libp2p/go-libp2p/p2p/discovery/util" + logging "github.com/tim-beatham/wgmesh/pkg/log" +) + +const PROTOCOL_ID = "/smegmesh/1.0" + +type Advertiser interface { +} + +type Libp2pAdvertiser struct { + host host.Host + dht *dht.IpfsDHT + *drouting.RoutingDiscovery +} + +func readData(bf *bufio.ReadWriter) { +} + +func writeData(bf *bufio.ReadWriter) { + bf.Write([]byte("My name is Tim")) +} + +func handleStream(stream network.Stream) { + logging.Log.WriteInfof("Received a new stream!") + + rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream)) + + go readData(rw) + go writeData(rw) +} + +func NewLibP2PAdvertiser() (Advertiser, error) { + logging.Log.WriteInfof("setting up") + + addrs := libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0") + host, err := libp2p.New(addrs) + + if err != nil { + return nil, err + } + + logging.Log.WriteInfof("Host created. We are: ", host.ID()) + + ctx := context.Background() + + logging.Log.WriteInfof("creating DHT") + + kDHT, err := dht.New(ctx, host) + + if err != nil { + return nil, err + } + + logging.Log.WriteInfof("bootstrapping the DHT") + if err := kDHT.Bootstrap(ctx); err != nil { + return nil, err + } + + var wg sync.WaitGroup + for _, peerAddr := range dht.DefaultBootstrapPeers { + peerinfo, _ := peer.AddrInfoFromP2pAddr(peerAddr) + wg.Add(1) + go func() { + defer wg.Done() + if err := host.Connect(ctx, *peerinfo); err != nil { + logging.Log.WriteErrorf(err.Error()) + } else { + logging.Log.WriteInfof("Connection established with bootstrap node:", *peerinfo) + } + }() + } + wg.Wait() + + // We use a rendezvous point "meet me here" to announce our location. + // This is like telling your friends to meet you at the Eiffel Tower. + logging.Log.WriteInfof("Announcing ourselves...") + routingDiscovery := drouting.NewRoutingDiscovery(kDHT) + dutil.Advertise(ctx, routingDiscovery, "bobmarley") + logging.Log.WriteInfof("Successfully announced!") + + select {} + + return nil, err +} + +func main() { + _, err := NewLibP2PAdvertiser() + + if err != nil { + logging.Log.WriteInfof(err.Error()) + } +} diff --git a/pkg/mesh/manager.go b/pkg/mesh/manager.go index 0069b03..ee82101 100644 --- a/pkg/mesh/manager.go +++ b/pkg/mesh/manager.go @@ -134,6 +134,10 @@ func (m *MeshManagerImpl) CreateMesh(args *CreateMeshParams) (string, error) { return "", err } + if *meshConfiguration.Role == conf.CLIENT_ROLE { + return "", fmt.Errorf("cannot create mesh as a client") + } + meshId, err := m.idGenerator.GetId() var ifName string = "" diff --git a/pkg/mesh/manager_test.go b/pkg/mesh/manager_test.go index 90b0059..46af9c2 100644 --- a/pkg/mesh/manager_test.go +++ b/pkg/mesh/manager_test.go @@ -24,8 +24,8 @@ func getMeshConfiguration() *conf.DaemonConfiguration { Timeout: 5, Profile: false, StubWg: true, - SyncRate: 2, - KeepAliveTime: 60, + SyncTime: 2, + HeartBeat: 60, ClusterSize: 64, InterClusterChance: 0.15, BranchRate: 3, diff --git a/pkg/robin/requester.go b/pkg/robin/requester.go index 81bb82b..5d86f80 100644 --- a/pkg/robin/requester.go +++ b/pkg/robin/requester.go @@ -43,10 +43,6 @@ func getOverrideConfiguration(args *ipc.WireGuardArgs) conf.WgConfiguration { func (n *IpcHandler) CreateMesh(args *ipc.NewMeshArgs, reply *string) error { overrideConf := getOverrideConfiguration(&args.WgArgs) - if overrideConf.Role != nil && *overrideConf.Role == conf.CLIENT_ROLE { - return fmt.Errorf("cannot create a mesh with no public endpoint") - } - meshId, err := n.Server.GetMeshManager().CreateMesh(&mesh.CreateMeshParams{ Port: args.WgArgs.WgPort, Conf: &overrideConf, @@ -86,6 +82,10 @@ func (n *IpcHandler) ListMeshes(_ string, reply *ipc.ListMeshReply) error { func (n *IpcHandler) JoinMesh(args ipc.JoinMeshArgs, reply *string) error { overrideConf := getOverrideConfiguration(&args.WgArgs) + if n.Server.GetMeshManager().GetMesh(args.MeshId) != nil { + return fmt.Errorf("user is already apart of the mesh") + } + peerConnection, err := n.Server.GetConnectionManager().GetConnection(args.IpAdress) if err != nil { @@ -147,7 +147,6 @@ func (n *IpcHandler) LeaveMesh(meshId string, reply *string) error { if err == nil { *reply = fmt.Sprintf("Left Mesh %s", meshId) } - return err } diff --git a/pkg/sync/syncer.go b/pkg/sync/syncer.go index c64a86e..f3b0c27 100644 --- a/pkg/sync/syncer.go +++ b/pkg/sync/syncer.go @@ -84,7 +84,8 @@ func (s *SyncerImpl) Sync(correspondingMesh mesh.MeshProvider) error { return nil } - // Peer with 2 nodes + // Peer with 2 nodes so that there is redundnacy in + // the situation the node leaves pre-emptively redundancyLength := min(len(neighbours), 2) gossipNodes = neighbours[:redundancyLength] } else { diff --git a/pkg/sync/syncrequester.go b/pkg/sync/syncrequester.go index 4caf029..dc9e77e 100644 --- a/pkg/sync/syncrequester.go +++ b/pkg/sync/syncrequester.go @@ -91,7 +91,7 @@ func (s *SyncRequesterImpl) SyncMesh(meshId string, meshNode mesh.MeshNode) erro c := rpc.NewSyncServiceClient(client) - syncTimeOut := float64(s.server.Conf.SyncRate) * float64(time.Second) + syncTimeOut := float64(s.server.Conf.SyncTime) * float64(time.Second) ctx, cancel := context.WithTimeout(context.Background(), time.Duration(syncTimeOut)) defer cancel() diff --git a/pkg/sync/syncscheduler.go b/pkg/sync/syncscheduler.go index 35c1c19..54d073a 100644 --- a/pkg/sync/syncscheduler.go +++ b/pkg/sync/syncscheduler.go @@ -14,5 +14,5 @@ func syncFunction(syncer Syncer) lib.TimerFunc { } func NewSyncScheduler(s *ctrlserver.MeshCtrlServer, syncRequester SyncRequester, syncer Syncer) *lib.Timer { - return lib.NewTimer(syncFunction(syncer), s.Conf.SyncRate) + return lib.NewTimer(syncFunction(syncer), s.Conf.SyncTime) } diff --git a/pkg/timers/timers.go b/pkg/timers/timers.go index e26a644..3522775 100644 --- a/pkg/timers/timers.go +++ b/pkg/timers/timers.go @@ -11,5 +11,5 @@ func NewTimestampScheduler(ctrlServer *ctrlserver.MeshCtrlServer) lib.Timer { logging.Log.WriteInfof("Updated Timestamp") return ctrlServer.MeshManager.UpdateTimeStamp() } - return *lib.NewTimer(timerFunc, ctrlServer.Conf.KeepAliveTime) + return *lib.NewTimer(timerFunc, ctrlServer.Conf.HeartBeat) }