- Fixed problem where connection not removed on error
This commit is contained in:
Tim Beatham 2023-12-29 11:12:40 +00:00
parent 2d5df25b1d
commit 352648b7cb
4 changed files with 17 additions and 27 deletions

View File

@ -42,25 +42,8 @@ func NewWgCtrlConnection(clientConfig *tls.Config, server string) (PeerConnectio
// ConnectWithToken: Connects to a new gRPC peer given the address of the other server.
func (c *WgCtrlConnection) CreateGrpcConnection() error {
retryPolicy := `{
"methodConfig": [{
"name": [
{"service": "syncservice.SyncService"},
{"service": "ctrlserver.MeshCtrlServer"}
],
"waitForReady": true,
"retryPolicy": {
"MaxAttempts": 2,
"InitialBackoff": ".1s",
"MaxBackoff": ".1s",
"BackoffMultiplier": 1.0,
"RetryableStatusCodes": [ "UNAVAILABLE", "DEADLINE_EXCEEDED", "UNKNOWN" ]
}
}]}`
conn, err := grpc.Dial(c.endpoint,
grpc.WithTransportCredentials(credentials.NewTLS(c.clientConfig)),
grpc.WithDefaultServiceConfig(retryPolicy))
grpc.WithTransportCredentials(credentials.NewTLS(c.clientConfig)))
if err != nil {
logging.Log.WriteErrorf("Could not connect: %s\n", err.Error())

View File

@ -66,12 +66,14 @@ func (s *SyncerImpl) Sync(meshId string) error {
// Clients always pings its peer for configuration
if self != nil && self.GetType() == conf.CLIENT_ROLE && len(nodeNames) > 1 {
keyFunc := lib.HashString
bucketFunc := lib.HashString
neighbours := s.cluster.GetNeighbours(nodeNames, publicKey.String())
neighbour := lib.ConsistentHash(nodeNames, publicKey.String(), keyFunc, bucketFunc)
gossipNodes = make([]string, 1)
gossipNodes[0] = neighbour
if len(neighbours) == 0 {
return nil
}
redundancyLength := min(len(neighbours), 3)
gossipNodes = neighbours[:redundancyLength]
} else {
neighbours := s.cluster.GetNeighbours(nodeNames, publicKey.String())
gossipNodes = lib.RandomSubsetOfLength(neighbours, s.conf.BranchRate)

View File

@ -22,11 +22,16 @@ type SyncErrorHandlerImpl struct {
func (s *SyncErrorHandlerImpl) handleFailed(meshId string, nodeId string) bool {
mesh := s.meshManager.GetMesh(meshId)
mesh.Mark(nodeId)
node, err := mesh.GetNode(nodeId)
if err != nil {
s.connManager.RemoveConnection(node.GetHostEndpoint())
}
return true
}
func (s *SyncErrorHandlerImpl) handleDeadlineExceeded(meshId string, nodeId string) bool {
mesh := s.meshManager.GetMesh(nodeId)
mesh := s.meshManager.GetMesh(meshId)
if mesh == nil {
return true
@ -57,6 +62,6 @@ func (s *SyncErrorHandlerImpl) Handle(meshId string, nodeId string, err error) b
return false
}
func NewSyncErrorHandler(m mesh.MeshManager) SyncErrorHandler {
return &SyncErrorHandlerImpl{meshManager: m}
func NewSyncErrorHandler(m mesh.MeshManager, conn conn.ConnectionManager) SyncErrorHandler {
return &SyncErrorHandlerImpl{meshManager: m, connManager: conn}
}

View File

@ -151,6 +151,6 @@ func (s *SyncRequesterImpl) syncMesh(mesh mesh.MeshProvider, ctx context.Context
}
func NewSyncRequester(s *ctrlserver.MeshCtrlServer) SyncRequester {
errorHdlr := NewSyncErrorHandler(s.MeshManager)
errorHdlr := NewSyncErrorHandler(s.MeshManager, s.ConnectionManager)
return &SyncRequesterImpl{server: s, errorHdlr: errorHdlr}
}