From 352648b7cba072c33e69cc23b9a0d00923d2ddcd Mon Sep 17 00:00:00 2001
From: Tim Beatham
Date: Fri, 29 Dec 2023 11:12:40 +0000
Subject: [PATCH] main - Fixed problem where connection not removed on error

---
 pkg/conn/connection.go    | 19 +------------------
 pkg/sync/syncer.go        | 12 +++++++-----
 pkg/sync/syncererror.go   | 13 ++++++++++---
 pkg/sync/syncrequester.go |  2 +-
 4 files changed, 19 insertions(+), 27 deletions(-)

diff --git a/pkg/conn/connection.go b/pkg/conn/connection.go
index 5c0a17c..c169d45 100644
--- a/pkg/conn/connection.go
+++ b/pkg/conn/connection.go
@@ -42,25 +42,8 @@ func NewWgCtrlConnection(clientConfig *tls.Config, server string) (PeerConnectio
 
 // ConnectWithToken: Connects to a new gRPC peer given the address of the other server.
 func (c *WgCtrlConnection) CreateGrpcConnection() error {
-	retryPolicy := `{
-		"methodConfig": [{
-			"name": [
-				{"service": "syncservice.SyncService"},
-				{"service": "ctrlserver.MeshCtrlServer"}
-			],
-			"waitForReady": true,
-			"retryPolicy": {
-				"MaxAttempts": 2,
-				"InitialBackoff": ".1s",
-				"MaxBackoff": ".1s",
-				"BackoffMultiplier": 1.0,
-				"RetryableStatusCodes": [ "UNAVAILABLE", "DEADLINE_EXCEEDED", "UNKNOWN" ]
-			}
-		}]}`
-
 	conn, err := grpc.Dial(c.endpoint,
-		grpc.WithTransportCredentials(credentials.NewTLS(c.clientConfig)),
-		grpc.WithDefaultServiceConfig(retryPolicy))
+		grpc.WithTransportCredentials(credentials.NewTLS(c.clientConfig)))
 
 	if err != nil {
 		logging.Log.WriteErrorf("Could not connect: %s\n", err.Error())
diff --git a/pkg/sync/syncer.go b/pkg/sync/syncer.go
index 9cefe38..896b33f 100644
--- a/pkg/sync/syncer.go
+++ b/pkg/sync/syncer.go
@@ -66,12 +66,14 @@ func (s *SyncerImpl) Sync(meshId string) error {
 
 	// Clients always pings its peer for configuration
 	if self != nil && self.GetType() == conf.CLIENT_ROLE && len(nodeNames) > 1 {
-		keyFunc := lib.HashString
-		bucketFunc := lib.HashString
+		neighbours := s.cluster.GetNeighbours(nodeNames, publicKey.String())
 
-		neighbour := lib.ConsistentHash(nodeNames, publicKey.String(), keyFunc, bucketFunc)
-		gossipNodes = make([]string, 1)
-		gossipNodes[0] = neighbour
+		if len(neighbours) == 0 {
+			return nil
+		}
+
+		redundancyLength := min(len(neighbours), 3)
+		gossipNodes = neighbours[:redundancyLength]
 	} else {
 		neighbours := s.cluster.GetNeighbours(nodeNames, publicKey.String())
 		gossipNodes = lib.RandomSubsetOfLength(neighbours, s.conf.BranchRate)
diff --git a/pkg/sync/syncererror.go b/pkg/sync/syncererror.go
index 412113a..7dd1dc3 100644
--- a/pkg/sync/syncererror.go
+++ b/pkg/sync/syncererror.go
@@ -22,11 +22,18 @@ type SyncErrorHandlerImpl struct {
 func (s *SyncErrorHandlerImpl) handleFailed(meshId string, nodeId string) bool {
 	mesh := s.meshManager.GetMesh(meshId)
 	mesh.Mark(nodeId)
+	node, err := mesh.GetNode(nodeId)
+
+	// node is only meaningful when the lookup succeeded; in that case remove
+	// the failed peer's connection from the connection manager.
+	if err == nil {
+		s.connManager.RemoveConnection(node.GetHostEndpoint())
+	}
 	return true
 }
 
 func (s *SyncErrorHandlerImpl) handleDeadlineExceeded(meshId string, nodeId string) bool {
-	mesh := s.meshManager.GetMesh(nodeId)
+	mesh := s.meshManager.GetMesh(meshId)
 
 	if mesh == nil {
 		return true
@@ -57,6 +64,6 @@ func (s *SyncErrorHandlerImpl) Handle(meshId string, nodeId string, err error) b
 	return false
 }
 
-func NewSyncErrorHandler(m mesh.MeshManager) SyncErrorHandler {
-	return &SyncErrorHandlerImpl{meshManager: m}
+func NewSyncErrorHandler(m mesh.MeshManager, conn conn.ConnectionManager) SyncErrorHandler {
+	return &SyncErrorHandlerImpl{meshManager: m, connManager: conn}
 }
diff --git a/pkg/sync/syncrequester.go b/pkg/sync/syncrequester.go
index 0e91f3b..a3fa6da 100644
--- a/pkg/sync/syncrequester.go
+++ b/pkg/sync/syncrequester.go
@@ -151,6 +151,6 @@ func (s *SyncRequesterImpl) syncMesh(mesh mesh.MeshProvider, ctx context.Context
 }
 
 func NewSyncRequester(s *ctrlserver.MeshCtrlServer) SyncRequester {
-	errorHdlr := NewSyncErrorHandler(s.MeshManager)
+	errorHdlr := NewSyncErrorHandler(s.MeshManager, s.ConnectionManager)
 	return &SyncRequesterImpl{server: s, errorHdlr: errorHdlr}
 }