From e63edea763e55d1efa274b383d290efdae4fe0c7 Mon Sep 17 00:00:00 2001 From: Tim Beatham Date: Wed, 1 Nov 2023 10:39:46 +0000 Subject: [PATCH] Fixing an issue where packets are dropped each time we change wg configuration --- go.mod | 2 +- pkg/lib/conv.go | 6 ------ pkg/mesh/config.go | 7 +++---- pkg/mesh/manager.go | 4 ++++ pkg/sync/syncer.go | 12 ++++++++++-- pkg/wg/wg.go | 13 ++++++++++++- 6 files changed, 30 insertions(+), 14 deletions(-) diff --git a/go.mod b/go.mod index f6805d8..ee1d7c4 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/akamensky/argparse v1.4.0 github.com/automerge/automerge-go v0.0.0-20230903201930-b80ce8aadbb9 github.com/google/uuid v1.3.0 + github.com/jmespath/go-jmespath v0.4.0 github.com/sirupsen/logrus v1.9.3 golang.zx2c4.com/wireguard/wgctrl v0.0.0-20230429144221-925a1e7659e6 google.golang.org/grpc v1.58.1 @@ -16,7 +17,6 @@ require ( require ( github.com/golang/protobuf v1.5.3 // indirect github.com/google/go-cmp v0.5.9 // indirect - github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/josharian/native v1.1.0 // indirect github.com/mdlayher/genetlink v1.3.2 // indirect github.com/mdlayher/netlink v1.7.2 // indirect diff --git a/pkg/lib/conv.go b/pkg/lib/conv.go index 0536ae4..7fb47d0 100644 --- a/pkg/lib/conv.go +++ b/pkg/lib/conv.go @@ -1,9 +1,5 @@ package lib -import ( - logging "github.com/tim-beatham/wgmesh/pkg/log" -) - // MapToSlice converts a map to a slice in go func MapValues[K comparable, V any](m map[K]V) []V { return MapValuesWithExclude(m, map[K]struct{}{}) @@ -23,8 +19,6 @@ func MapValuesWithExclude[K comparable, V any](m map[K]V, exclude map[K]struct{} continue } - logging.Log.WriteInfof("Key %s", k) - values[i] = v i++ } diff --git a/pkg/mesh/config.go b/pkg/mesh/config.go index f42ab33..aea7f86 100644 --- a/pkg/mesh/config.go +++ b/pkg/mesh/config.go @@ -16,7 +16,7 @@ type WgMeshConfigApplyer struct { meshManager *MeshManager } -func ConvertMeshNode(node MeshNode) (*wgtypes.PeerConfig, error) { +func convertMeshNode(node MeshNode) (*wgtypes.PeerConfig, error) { endpoint, err := net.ResolveUDPAddr("udp", node.GetWgEndpoint()) if err != nil { @@ -59,7 +59,7 @@ func (m *WgMeshConfigApplyer) updateWgConf(mesh MeshProvider) error { var count int = 0 for _, n := range nodes { - peer, err := ConvertMeshNode(n) + peer, err := convertMeshNode(n) if err != nil { return err @@ -70,8 +70,7 @@ func (m *WgMeshConfigApplyer) updateWgConf(mesh MeshProvider) error { } cfg := wgtypes.Config{ - Peers: peerConfigs, - ReplacePeers: true, + Peers: peerConfigs, } dev, err := mesh.GetDevice() diff --git a/pkg/mesh/manager.go b/pkg/mesh/manager.go index 165fb62..5d76b0b 100644 --- a/pkg/mesh/manager.go +++ b/pkg/mesh/manager.go @@ -218,6 +218,10 @@ func (s *MeshManager) GetSelf(meshId string) (MeshNode, error) { return node, nil } +func (s *MeshManager) ApplyConfig() error { + return s.configApplyer.ApplyConfig() +} + // UpdateTimeStamp updates the timestamp of this node in all meshes func (s *MeshManager) UpdateTimeStamp() error { for _, mesh := range s.Meshes { diff --git a/pkg/sync/syncer.go b/pkg/sync/syncer.go index 45bb94b..9333206 100644 --- a/pkg/sync/syncer.go +++ b/pkg/sync/syncer.go @@ -21,13 +21,18 @@ type SyncerImpl struct { manager *mesh.MeshManager requester SyncRequester authenticatedNodes []crdt.MeshNodeCrdt + infectionCount int } const subSetLength = 3 +const infectionCount = 3 // Sync: Sync random nodes func (s *SyncerImpl) Sync(meshId string) error { - if !s.manager.HasChanges(meshId) { + logging.Log.WriteInfof("UPDATING WG CONF") + s.manager.ApplyConfig() + + if !s.manager.HasChanges(meshId) && s.infectionCount == 0 { logging.Log.WriteInfof("No changes for %s", meshId) return nil } @@ -76,6 +81,9 @@ func (s *SyncerImpl) Sync(meshId string) error { waitGroup.Wait() logging.Log.WriteInfof("SYNC TIME: %v", time.Now().Sub(before)) + + s.infectionCount = ((infectionCount + s.infectionCount - 1) % infectionCount) + return nil } @@ -93,5 +101,5 @@ func (s *SyncerImpl) SyncMeshes() error { } func NewSyncer(m *mesh.MeshManager, r SyncRequester) Syncer { - return &SyncerImpl{manager: m, requester: r} + return &SyncerImpl{manager: m, requester: r, infectionCount: 0} } diff --git a/pkg/wg/wg.go b/pkg/wg/wg.go index accf93f..f080563 100644 --- a/pkg/wg/wg.go +++ b/pkg/wg/wg.go @@ -1,6 +1,7 @@ package wg import ( + "errors" "fmt" "net" "os/exec" @@ -75,19 +76,29 @@ func flushInterface(ifName string) error { // EnableInterface flushes the interface and sets the ip address of the // interface func (m *WgInterfaceManipulatorImpl) EnableInterface(ifName string, ip string) error { + if len(ifName) == 0 { + return errors.New("ifName not provided") + } + err := flushInterface(ifName) if err != nil { return err } + cmd := exec.Command("/usr/bin/ip", "link", "set", "up", "dev", ifName) + + if err := cmd.Run(); err != nil { + return err + } + hostIp, _, err := net.ParseCIDR(ip) if err != nil { return err } - cmd := exec.Command("/usr/bin/ip", "addr", "add", hostIp.String()+"/64", "dev", ifName) + cmd = exec.Command("/usr/bin/ip", "addr", "add", hostIp.String()+"/64", "dev", ifName) if err := cmd.Run(); err != nil { return err