Added sync

This commit is contained in:
Tim Beatham
2023-10-10 20:14:40 +01:00
parent e729c5b181
commit ec87afc235
16 changed files with 794 additions and 41 deletions

20
pkg/sync/syncer.go Normal file
View File

@@ -0,0 +1,20 @@
package sync
// Syncer: picks random nodes from the mesh
type Syncer interface {
Sync(meshId string) error
SyncMeshes() error
}
type SyncerImpl struct {
}
// Sync: Sync random nodes
func (s *SyncerImpl) Sync(meshId string) error {
return nil
}
// SyncMeshes:
func (s *SyncerImpl) SyncMeshes() error {
return nil
}

111
pkg/sync/syncrequester.go Normal file
View File

@@ -0,0 +1,111 @@
package sync
import (
"context"
"errors"
"time"
"github.com/tim-beatham/wgmesh/pkg/ctrlserver"
"github.com/tim-beatham/wgmesh/pkg/rpc"
)
// SyncRequester: coordinates the syncing of meshes
type SyncRequester interface {
GetMesh(meshId string) error
SyncMesh(meshid string) error
}
type SyncRequesterImpl struct {
server *ctrlserver.MeshCtrlServer
}
// GetMesh: Retrieves the local state of the mesh at the endpoint
func (s *SyncRequesterImpl) GetMesh(meshId string, endPoint string) error {
peerConnection, err := s.server.ConnectionManager.GetConnection(endPoint)
if err != nil {
return err
}
err = peerConnection.Connect()
if err != nil {
return err
}
client, err := peerConnection.GetClient()
if err != nil {
return err
}
c := rpc.NewSyncServiceClient(client)
authContext, err := peerConnection.CreateAuthContext(meshId)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(authContext, time.Second)
defer cancel()
reply, err := c.GetConf(ctx, &rpc.GetConfRequest{MeshId: meshId})
if err != nil {
return err
}
err = s.server.MeshManager.AddMesh(meshId, s.server.Conf.IfName, reply.Mesh)
return err
}
// SyncMesh: Proactively send a sync request to the other mesh
func (s *SyncRequesterImpl) SyncMesh(meshId string, endpoint string) error {
peerConnection, err := s.server.ConnectionManager.GetConnection(endpoint)
if err != nil {
return err
}
err = peerConnection.Connect()
if err != nil {
return err
}
client, err := peerConnection.GetClient()
if err != nil {
return err
}
authContext, err := peerConnection.CreateAuthContext(meshId)
if err != nil {
return err
}
mesh := s.server.MeshManager.GetMesh(meshId)
if mesh == nil {
return errors.New("mesh does not exist")
}
syncMeshRequest := rpc.SyncMeshRequest{
MeshId: meshId,
Changes: mesh.SaveChanges(),
}
c := rpc.NewSyncServiceClient(client)
ctx, cancel := context.WithTimeout(authContext, time.Second)
defer cancel()
_, err = c.SyncMesh(ctx, &syncMeshRequest)
if err != nil {
return err
}
return nil
}

46
pkg/sync/syncscheduler.go Normal file
View File

@@ -0,0 +1,46 @@
package sync
import (
"time"
"github.com/tim-beatham/wgmesh/pkg/ctrlserver"
)
// SyncScheduler: Loops through all nodes in the mesh and runs a schedule to
// sync each event
type SyncScheduler interface {
Run() error
Stop() error
}
type SyncSchedulerImpl struct {
quit chan struct{}
server *ctrlserver.MeshCtrlServer
}
// Run implements SyncScheduler.
func (s *SyncSchedulerImpl) Run() error {
ticker := time.NewTicker(time.Second)
quit := make(chan struct{})
s.quit = quit
for {
select {
case <-ticker.C:
break
case <-quit:
break
}
}
}
// Stop implements SyncScheduler.
func (s *SyncSchedulerImpl) Stop() error {
close(s.quit)
return nil
}
func NewSyncScheduler(s *ctrlserver.MeshCtrlServer) SyncScheduler {
return &SyncSchedulerImpl{server: s}
}

49
pkg/sync/syncservice.go Normal file
View File

@@ -0,0 +1,49 @@
// sync merges shared state between two nodes
package sync
import (
"context"
"errors"
"github.com/tim-beatham/wgmesh/pkg/ctrlserver"
"github.com/tim-beatham/wgmesh/pkg/rpc"
)
type SyncServiceImpl struct {
server *ctrlserver.MeshCtrlServer
}
// GetMesh: Gets a nodes local mesh configuration as a CRDT
func (s *SyncServiceImpl) GetConf(context context.Context, request *rpc.GetConfRequest) (*rpc.GetConfReply, error) {
mesh := s.server.MeshManager.GetMesh(request.MeshId)
if mesh == nil {
return nil, errors.New("mesh does not exist")
}
meshBytes := mesh.Save()
reply := rpc.GetConfReply{
Mesh: meshBytes,
}
return &reply, nil
}
// Sync: Pings a node and syncs the mesh configuration with the other node
func (s *SyncServiceImpl) SyncMesh(conext context.Context, request *rpc.SyncMeshRequest) (*rpc.SyncMeshReply, error) {
mesh := s.server.MeshManager.GetMesh(request.MeshId)
if mesh == nil {
return nil, errors.New("mesh does not exist")
}
err := s.server.MeshManager.UpdateMesh(request.MeshId, request.Changes)
if err != nil {
return nil, err
}
return &rpc.SyncMeshReply{Success: true}, nil
}