72-pull-rate-in-configuration

- Added pull rate to configuration (finally) so this can be modified by an administrator.
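
For context, the daemon configuration an administrator edits might now look something like this. The key names are taken from the yaml tags in the diff below; the values are purely illustrative:

syncTime: 2        # minimum seconds between synchronisation rounds
pullTime: 5        # seconds between checks for configuration changes (new in this commit)
heartBeatTime: 60  # seconds between updates from the mesh leader
clusterSize: 64    # neighbours to synchronise with per round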
This commit is contained in:
Tim Beatham 2023-12-31 12:44:50 +00:00
parent 9e1058e0f2
commit fd29af73e3
15 changed files with 138 additions and 27 deletions

View File

@@ -226,7 +226,6 @@ func main() {
})
var newMeshRole *string = newMeshCmd.Selector("r", "role", []string{"peer", "client"}, &argparse.Options{
Default: "peer",
Help: "Role in the mesh network. A value of peer means that the node is publicly routeable and thus considered" +
" in the gossip protocol. Client means that the node is not publicly routeable and is not a candidate in the gossip" +
" protocol",
@@ -259,7 +258,6 @@ func main() {
})
var joinMeshRole *string = joinMeshCmd.Selector("r", "role", []string{"peer", "client"}, &argparse.Options{
Default: "peer",
Help: "Role in the mesh network. A value of peer means that the node is publicly routeable and thus considered" +
" in the gossip protocol. Client means that the node is not publicly routeable and is not a candidate in the gossip" +
" protocol",

View File

@@ -59,6 +59,11 @@ func main() {
}
ctrlServer, err := ctrlserver.NewCtrlServer(&ctrlServerParams)
if err != nil {
panic(err)
}
syncProvider.Server = ctrlServer
syncRequester = sync.NewSyncRequester(ctrlServer)
syncer = sync.NewSyncer(ctrlServer.MeshManager, conf, syncRequester)

View File

@@ -47,7 +47,6 @@ type WgConfiguration struct {
// If the user is not globally accessible they specify themselves as a client.
Role *NodeType `yaml:"role" validate:"required,eq=client|eq=peer"`
// KeepAliveWg configures the implementation so that we send keep-alive packets to peers.
// KeepAliveWg can only be set if the role is of type client
KeepAliveWg *int `yaml:"keepAliveWg" validate:"omitempty,gte=0"`
// PreUp are WireGuard commands to run before adding the WG interface
PreUp []string `yaml:"preUp"`
@@ -77,13 +76,13 @@ type DaemonConfiguration struct {
Profile bool `yaml:"profile"`
// StubWg whether or not to stub the WireGuard types
StubWg bool `yaml:"stubWg"`
// SyncRate specifies the minimum time between synchronisation rounds
SyncRate int `yaml:"syncRate" validate:"required,gte=1"`
// SyncTime specifies the minimum time between synchronisation rounds
SyncTime int `yaml:"syncTime" validate:"required,gte=1"`
// PullTime specifies the interval between checking for configuration changes
PullTime int `yaml:"pullTime" validate:"required,gte=0"`
// KeepAliveTime: number of seconds before the leader of the mesh sends an update
// HeartBeat: number of seconds before the leader of the mesh sends an update
// to every member in the mesh
KeepAliveTime int `yaml:"keepAliveTime" validate:"required,gte=1"`
HeartBeat int `yaml:"heartBeatTime" validate:"required,gte=1"`
// ClusterSize specifies how many neighbours you should synchronise with per round
ClusterSize int `yaml:"clusterSize" validate:"gte=1"`
// InterClusterChance specifies the probability of inter-cluster communication in a sync round
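
To illustrate what a knob like interClusterChance typically controls, a sync round might pick its gossip partner along these lines. This is a sketch only, not code from this repository:

import "math/rand"

// pickGossipPartner chooses a peer for this sync round: with
// probability interClusterChance it gossips outside the local
// cluster, otherwise within it.
func pickGossipPartner(inCluster, outCluster []string, interClusterChance float64) string {
	if len(outCluster) > 0 && rand.Float64() < interClusterChance {
		return outCluster[rand.Intn(len(outCluster))]
	}
	return inCluster[rand.Intn(len(inCluster))]
}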

View File

@@ -21,8 +21,8 @@ func getExampleConfiguration() *DaemonConfiguration {
Timeout: 5,
Profile: false,
StubWg: false,
SyncRate: 2,
KeepAliveTime: 2,
SyncTime: 2,
HeartBeat: 2,
ClusterSize: 64,
InterClusterChance: 0.15,
BranchRate: 3,
@@ -163,9 +163,9 @@ func TestBranchRateZero(t *testing.T) {
}
}
func TestSyncRateZero(t *testing.T) {
func TestSyncTimeZero(t *testing.T) {
conf := getExampleConfiguration()
conf.SyncRate = 0
conf.SyncTime = 0
err := ValidateDaemonConfiguration(conf)
@@ -176,7 +176,7 @@ func TestSyncRateZero(t *testing.T) {
func TestKeepAliveTimeZero(t *testing.T) {
conf := getExampleConfiguration()
conf.KeepAliveTime = 0
conf.HeartBeat = 0
err := ValidateDaemonConfiguration(conf)
if err == nil {

View File

@@ -264,7 +264,7 @@ func (m *TwoPhaseStoreMeshManager) UpdateTimeStamp(nodeId string) error {
peerToUpdate := peers[0]
if uint64(time.Now().Unix())-m.store.Clock.GetTimestamp(peerToUpdate) > 3*uint64(m.DaemonConf.KeepAliveTime) {
if uint64(time.Now().Unix())-m.store.Clock.GetTimestamp(peerToUpdate) > 3*uint64(m.DaemonConf.HeartBeat) {
m.store.Mark(peerToUpdate)
if len(peers) < 2 {
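
The rewritten comparison above is effectively a heartbeat-based failure detector: a peer is marked once its last timestamp is more than three heartbeat intervals old. As a standalone sketch of the same rule (names are illustrative, not repository code):

import "time"

// stale reports whether a peer that last updated its timestamp at
// lastSeen (Unix seconds) has missed three heartbeat intervals.
func stale(lastSeen uint64, heartBeatSeconds int) bool {
	return uint64(time.Now().Unix())-lastSeen > 3*uint64(heartBeatSeconds)
}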

View File

@@ -32,8 +32,8 @@ func setUpTests() *TestParams {
GrpcPort: 0,
Timeout: 20,
Profile: false,
SyncRate: 2,
KeepAliveTime: 10,
SyncTime: 2,
HeartBeat: 10,
ClusterSize: 32,
InterClusterChance: 0.15,
BranchRate: 3,

View File

@@ -24,7 +24,7 @@ func (f *TwoPhaseMapFactory) CreateMesh(params *mesh.MeshProviderFactoryParams)
h := fnv.New64a()
h.Write([]byte(s))
return h.Sum64()
}, uint64(3*f.Config.KeepAliveTime)),
}, uint64(3*f.Config.HeartBeat)),
}, nil
}

pkg/libp2p/advertiser.go Normal file (105 lines)
View File

@@ -0,0 +1,105 @@
package main
import (
"bufio"
"context"
"sync"
"github.com/libp2p/go-libp2p"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
drouting "github.com/libp2p/go-libp2p/p2p/discovery/routing"
dutil "github.com/libp2p/go-libp2p/p2p/discovery/util"
logging "github.com/tim-beatham/wgmesh/pkg/log"
)
const PROTOCOL_ID = "/smegmesh/1.0"
type Advertiser interface {
}
type Libp2pAdvertiser struct {
host host.Host
dht *dht.IpfsDHT
*drouting.RoutingDiscovery
}
// readData is a stub; incoming stream data is currently ignored.
func readData(bf *bufio.ReadWriter) {
}
func writeData(bf *bufio.ReadWriter) {
bf.Write([]byte("My name is Tim"))
// Flush the buffered writer, otherwise the bytes never reach the stream.
bf.Flush()
}
func handleStream(stream network.Stream) {
logging.Log.WriteInfof("Received a new stream!")
rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))
go readData(rw)
go writeData(rw)
}
func NewLibP2PAdvertiser() (Advertiser, error) {
logging.Log.WriteInfof("setting up")
addrs := libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")
host, err := libp2p.New(addrs)
if err != nil {
return nil, err
}
logging.Log.WriteInfof("Host created. We are: ", host.ID())
ctx := context.Background()
logging.Log.WriteInfof("creating DHT")
kDHT, err := dht.New(ctx, host)
if err != nil {
return nil, err
}
logging.Log.WriteInfof("bootstrapping the DHT")
if err := kDHT.Bootstrap(ctx); err != nil {
return nil, err
}
var wg sync.WaitGroup
for _, peerAddr := range dht.DefaultBootstrapPeers {
peerinfo, _ := peer.AddrInfoFromP2pAddr(peerAddr)
wg.Add(1)
go func() {
defer wg.Done()
if err := host.Connect(ctx, *peerinfo); err != nil {
logging.Log.WriteErrorf(err.Error())
} else {
logging.Log.WriteInfof("Connection established with bootstrap node:", *peerinfo)
}
}()
}
wg.Wait()
// We use a rendezvous point "meet me here" to announce our location.
// This is like telling your friends to meet you at the Eiffel Tower.
logging.Log.WriteInfof("Announcing ourselves...")
routingDiscovery := drouting.NewRoutingDiscovery(kDHT)
dutil.Advertise(ctx, routingDiscovery, "bobmarley")
logging.Log.WriteInfof("Successfully announced!")
// Block forever so the host stays online to serve the DHT.
select {}
}
func main() {
_, err := NewLibP2PAdvertiser()
if err != nil {
logging.Log.WriteErrorf(err.Error())
}
}
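
The new file only advertises under the rendezvous string; the discovery side would look peers up under the same string. A sketch of the counterpart using dutil.FindPeers from the same discovery util package, assuming the same routingDiscovery, host, and rendezvous string as above:

// Find peers that advertised under the same rendezvous string.
peers, err := dutil.FindPeers(ctx, routingDiscovery, "bobmarley")
if err != nil {
	return nil, err
}
for _, p := range peers {
	if p.ID == host.ID() {
		continue // skip our own advertisement
	}
	logging.Log.WriteInfof("discovered peer: %s", p.ID)
}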

View File

@@ -134,6 +134,10 @@ func (m *MeshManagerImpl) CreateMesh(args *CreateMeshParams) (string, error) {
return "", err
}
if *meshConfiguration.Role == conf.CLIENT_ROLE {
return "", fmt.Errorf("cannot create mesh as a client")
}
meshId, err := m.idGenerator.GetId()
var ifName string = ""

View File

@@ -24,8 +24,8 @@ func getMeshConfiguration() *conf.DaemonConfiguration {
Timeout: 5,
Profile: false,
StubWg: true,
SyncRate: 2,
KeepAliveTime: 60,
SyncTime: 2,
HeartBeat: 60,
ClusterSize: 64,
InterClusterChance: 0.15,
BranchRate: 3,

View File

@@ -43,10 +43,6 @@ func getOverrideConfiguration(args *ipc.WireGuardArgs) conf.WgConfiguration {
func (n *IpcHandler) CreateMesh(args *ipc.NewMeshArgs, reply *string) error {
overrideConf := getOverrideConfiguration(&args.WgArgs)
if overrideConf.Role != nil && *overrideConf.Role == conf.CLIENT_ROLE {
return fmt.Errorf("cannot create a mesh with no public endpoint")
}
meshId, err := n.Server.GetMeshManager().CreateMesh(&mesh.CreateMeshParams{
Port: args.WgArgs.WgPort,
Conf: &overrideConf,
@@ -86,6 +82,10 @@ func (n *IpcHandler) ListMeshes(_ string, reply *ipc.ListMeshReply) error {
func (n *IpcHandler) JoinMesh(args ipc.JoinMeshArgs, reply *string) error {
overrideConf := getOverrideConfiguration(&args.WgArgs)
if n.Server.GetMeshManager().GetMesh(args.MeshId) != nil {
return fmt.Errorf("user is already apart of the mesh")
}
peerConnection, err := n.Server.GetConnectionManager().GetConnection(args.IpAdress)
if err != nil {
@@ -147,7 +147,6 @@ func (n *IpcHandler) LeaveMesh(meshId string, reply *string) error {
if err == nil {
*reply = fmt.Sprintf("Left Mesh %s", meshId)
}
return err
}

View File

@@ -84,7 +84,8 @@ func (s *SyncerImpl) Sync(correspondingMesh mesh.MeshProvider) error {
return nil
}
// Peer with 2 nodes
// Peer with 2 nodes so that there is redundancy in
// the situation where the node leaves pre-emptively
redundancyLength := min(len(neighbours), 2)
gossipNodes = neighbours[:redundancyLength]
} else {

View File

@@ -91,7 +91,7 @@ func (s *SyncRequesterImpl) SyncMesh(meshId string, meshNode mesh.MeshNode) erro
c := rpc.NewSyncServiceClient(client)
syncTimeOut := float64(s.server.Conf.SyncRate) * float64(time.Second)
syncTimeOut := float64(s.server.Conf.SyncTime) * float64(time.Second)
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(syncTimeOut))
defer cancel()
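
An aside on the timeout computed above: the float64 round-trip works, but Go's duration arithmetic expresses it directly. A sketch of the equivalent, more idiomatic form:

// Equivalent timeout without converting through float64.
syncTimeOut := time.Duration(s.server.Conf.SyncTime) * time.Second
ctx, cancel := context.WithTimeout(context.Background(), syncTimeOut)
defer cancel()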

View File

@@ -14,5 +14,5 @@ func syncFunction(syncer Syncer) lib.TimerFunc {
}
func NewSyncScheduler(s *ctrlserver.MeshCtrlServer, syncRequester SyncRequester, syncer Syncer) *lib.Timer {
return lib.NewTimer(syncFunction(syncer), s.Conf.SyncRate)
return lib.NewTimer(syncFunction(syncer), s.Conf.SyncTime)
}
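
PullTime is added to the configuration in this commit, but no consumer of it appears in the diff; a pull scheduler would presumably mirror NewSyncScheduler above. A hypothetical sketch (pullFunction is not part of this commit):

// Hypothetical: run the configuration pull at the configured rate,
// mirroring how NewSyncScheduler wires up the sync timer.
func NewPullScheduler(s *ctrlserver.MeshCtrlServer, pullFunction lib.TimerFunc) *lib.Timer {
	return lib.NewTimer(pullFunction, s.Conf.PullTime)
}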

View File

@@ -11,5 +11,5 @@ func NewTimestampScheduler(ctrlServer *ctrlserver.MeshCtrlServer) lib.Timer {
logging.Log.WriteInfof("Updated Timestamp")
return ctrlServer.MeshManager.UpdateTimeStamp()
}
return *lib.NewTimer(timerFunc, ctrlServer.Conf.KeepAliveTime)
return *lib.NewTimer(timerFunc, ctrlServer.Conf.HeartBeat)
}