[relay] Add health check attempt threshold (#2609)

* Add health check attempt threshold for receiver

* Add health check attempt threshold for sender
This commit is contained in:
Maycon Santos 2024-09-17 10:04:17 +02:00 committed by GitHub
parent b74951f29e
commit 5bc601111d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 240 additions and 29 deletions

View File

@ -306,7 +306,7 @@ func (c *Client) handShake() error {
func (c *Client) readLoop(relayConn net.Conn) { func (c *Client) readLoop(relayConn net.Conn) {
internallyStoppedFlag := newInternalStopFlag() internallyStoppedFlag := newInternalStopFlag()
hc := healthcheck.NewReceiver() hc := healthcheck.NewReceiver(c.log)
go c.listenForStopEvents(hc, relayConn, internallyStoppedFlag) go c.listenForStopEvents(hc, relayConn, internallyStoppedFlag)
var ( var (

View File

@ -3,6 +3,8 @@ package healthcheck
import ( import (
"context" "context"
"time" "time"
log "github.com/sirupsen/logrus"
) )
var ( var (
@ -15,22 +17,25 @@ var (
// The heartbeat timeout is a bit longer than the sender's healthcheck interval // The heartbeat timeout is a bit longer than the sender's healthcheck interval
type Receiver struct { type Receiver struct {
OnTimeout chan struct{} OnTimeout chan struct{}
log *log.Entry
ctx context.Context ctx context.Context
ctxCancel context.CancelFunc ctxCancel context.CancelFunc
heartbeat chan struct{} heartbeat chan struct{}
alive bool alive bool
attemptThreshold int
} }
// NewReceiver creates a new healthcheck receiver and start the timer in the background // NewReceiver creates a new healthcheck receiver and start the timer in the background
func NewReceiver() *Receiver { func NewReceiver(log *log.Entry) *Receiver {
ctx, ctxCancel := context.WithCancel(context.Background()) ctx, ctxCancel := context.WithCancel(context.Background())
r := &Receiver{ r := &Receiver{
OnTimeout: make(chan struct{}, 1), OnTimeout: make(chan struct{}, 1),
log: log,
ctx: ctx, ctx: ctx,
ctxCancel: ctxCancel, ctxCancel: ctxCancel,
heartbeat: make(chan struct{}, 1), heartbeat: make(chan struct{}, 1),
attemptThreshold: getAttemptThresholdFromEnv(),
} }
go r.waitForHealthcheck() go r.waitForHealthcheck()
@ -56,16 +61,23 @@ func (r *Receiver) waitForHealthcheck() {
defer r.ctxCancel() defer r.ctxCancel()
defer close(r.OnTimeout) defer close(r.OnTimeout)
failureCounter := 0
for { for {
select { select {
case <-r.heartbeat: case <-r.heartbeat:
r.alive = true r.alive = true
failureCounter = 0
case <-ticker.C: case <-ticker.C:
if r.alive { if r.alive {
r.alive = false r.alive = false
continue continue
} }
failureCounter++
if failureCounter < r.attemptThreshold {
r.log.Warnf("healthcheck failed, attempt %d", failureCounter)
continue
}
r.notifyTimeout() r.notifyTimeout()
return return
case <-r.ctx.Done(): case <-r.ctx.Done():

View File

@ -1,13 +1,18 @@
package healthcheck package healthcheck
import ( import (
"context"
"fmt"
"os"
"testing" "testing"
"time" "time"
log "github.com/sirupsen/logrus"
) )
func TestNewReceiver(t *testing.T) { func TestNewReceiver(t *testing.T) {
heartbeatTimeout = 5 * time.Second heartbeatTimeout = 5 * time.Second
r := NewReceiver() r := NewReceiver(log.WithContext(context.Background()))
select { select {
case <-r.OnTimeout: case <-r.OnTimeout:
@ -19,7 +24,7 @@ func TestNewReceiver(t *testing.T) {
func TestNewReceiverNotReceive(t *testing.T) { func TestNewReceiverNotReceive(t *testing.T) {
heartbeatTimeout = 1 * time.Second heartbeatTimeout = 1 * time.Second
r := NewReceiver() r := NewReceiver(log.WithContext(context.Background()))
select { select {
case <-r.OnTimeout: case <-r.OnTimeout:
@ -30,7 +35,7 @@ func TestNewReceiverNotReceive(t *testing.T) {
func TestNewReceiverAck(t *testing.T) { func TestNewReceiverAck(t *testing.T) {
heartbeatTimeout = 2 * time.Second heartbeatTimeout = 2 * time.Second
r := NewReceiver() r := NewReceiver(log.WithContext(context.Background()))
r.Heartbeat() r.Heartbeat()
@ -40,3 +45,53 @@ func TestNewReceiverAck(t *testing.T) {
case <-time.After(3 * time.Second): case <-time.After(3 * time.Second):
} }
} }
func TestReceiverHealthCheckAttemptThreshold(t *testing.T) {
testsCases := []struct {
name string
threshold int
resetCounterOnce bool
}{
{"Default attempt threshold", defaultAttemptThreshold, false},
{"Custom attempt threshold", 3, false},
{"Should reset threshold once", 2, true},
}
for _, tc := range testsCases {
t.Run(tc.name, func(t *testing.T) {
originalInterval := healthCheckInterval
originalTimeout := heartbeatTimeout
healthCheckInterval = 1 * time.Second
heartbeatTimeout = healthCheckInterval + 500*time.Millisecond
defer func() {
healthCheckInterval = originalInterval
heartbeatTimeout = originalTimeout
}()
//nolint:tenv
os.Setenv(defaultAttemptThresholdEnv, fmt.Sprintf("%d", tc.threshold))
defer os.Unsetenv(defaultAttemptThresholdEnv)
receiver := NewReceiver(log.WithField("test_name", tc.name))
testTimeout := heartbeatTimeout*time.Duration(tc.threshold) + healthCheckInterval
if tc.resetCounterOnce {
receiver.Heartbeat()
t.Logf("reset counter once")
}
select {
case <-receiver.OnTimeout:
if tc.resetCounterOnce {
t.Fatalf("should not have timed out before %s", testTimeout)
}
case <-time.After(testTimeout):
if tc.resetCounterOnce {
return
}
t.Fatalf("should have timed out before %s", testTimeout)
}
})
}
}

View File

@ -2,7 +2,16 @@ package healthcheck
import ( import (
"context" "context"
"os"
"strconv"
"time" "time"
log "github.com/sirupsen/logrus"
)
const (
defaultAttemptThreshold = 1
defaultAttemptThresholdEnv = "NB_RELAY_HC_ATTEMPT_THRESHOLD"
) )
var ( var (
@ -15,20 +24,25 @@ var (
// If the receiver does not receive the signal in a certain time, it will send a timeout signal and stop to work // If the receiver does not receive the signal in a certain time, it will send a timeout signal and stop to work
// It will also stop if the context is canceled // It will also stop if the context is canceled
type Sender struct { type Sender struct {
log *log.Entry
// HealthCheck is a channel to send health check signal to the peer // HealthCheck is a channel to send health check signal to the peer
HealthCheck chan struct{} HealthCheck chan struct{}
// Timeout is a channel to the health check signal is not received in a certain time // Timeout is a channel to the health check signal is not received in a certain time
Timeout chan struct{} Timeout chan struct{}
ack chan struct{} ack chan struct{}
alive bool
attemptThreshold int
} }
// NewSender creates a new healthcheck sender // NewSender creates a new healthcheck sender
func NewSender() *Sender { func NewSender(log *log.Entry) *Sender {
hc := &Sender{ hc := &Sender{
log: log,
HealthCheck: make(chan struct{}, 1), HealthCheck: make(chan struct{}, 1),
Timeout: make(chan struct{}, 1), Timeout: make(chan struct{}, 1),
ack: make(chan struct{}, 1), ack: make(chan struct{}, 1),
attemptThreshold: getAttemptThresholdFromEnv(),
} }
return hc return hc
@ -46,23 +60,51 @@ func (hc *Sender) StartHealthCheck(ctx context.Context) {
ticker := time.NewTicker(healthCheckInterval) ticker := time.NewTicker(healthCheckInterval)
defer ticker.Stop() defer ticker.Stop()
timeoutTimer := time.NewTimer(healthCheckInterval + healthCheckTimeout) timeoutTicker := time.NewTicker(hc.getTimeoutTime())
defer timeoutTimer.Stop() defer timeoutTicker.Stop()
defer close(hc.HealthCheck) defer close(hc.HealthCheck)
defer close(hc.Timeout) defer close(hc.Timeout)
failureCounter := 0
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
hc.HealthCheck <- struct{}{} hc.HealthCheck <- struct{}{}
case <-timeoutTimer.C: case <-timeoutTicker.C:
if hc.alive {
hc.alive = false
continue
}
failureCounter++
if failureCounter < hc.attemptThreshold {
hc.log.Warnf("Health check failed attempt %d.", failureCounter)
continue
}
hc.Timeout <- struct{}{} hc.Timeout <- struct{}{}
return return
case <-hc.ack: case <-hc.ack:
timeoutTimer.Reset(healthCheckInterval + healthCheckTimeout) failureCounter = 0
hc.alive = true
case <-ctx.Done(): case <-ctx.Done():
return return
} }
} }
} }
func (hc *Sender) getTimeoutTime() time.Duration {
return healthCheckInterval + healthCheckTimeout
}
func getAttemptThresholdFromEnv() int {
if attemptThreshold := os.Getenv(defaultAttemptThresholdEnv); attemptThreshold != "" {
threshold, err := strconv.ParseInt(attemptThreshold, 10, 64)
if err != nil {
log.Errorf("Failed to parse attempt threshold from environment variable \"%s\" should be an integer. Using default value", attemptThreshold)
return defaultAttemptThreshold
}
return int(threshold)
}
return defaultAttemptThreshold
}

View File

@ -2,9 +2,12 @@ package healthcheck
import ( import (
"context" "context"
"fmt"
"os" "os"
"testing" "testing"
"time" "time"
log "github.com/sirupsen/logrus"
) )
func TestMain(m *testing.M) { func TestMain(m *testing.M) {
@ -18,7 +21,7 @@ func TestMain(m *testing.M) {
func TestNewHealthPeriod(t *testing.T) { func TestNewHealthPeriod(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
hc := NewSender() hc := NewSender(log.WithContext(ctx))
go hc.StartHealthCheck(ctx) go hc.StartHealthCheck(ctx)
iterations := 0 iterations := 0
@ -38,7 +41,7 @@ func TestNewHealthPeriod(t *testing.T) {
func TestNewHealthFailed(t *testing.T) { func TestNewHealthFailed(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
hc := NewSender() hc := NewSender(log.WithContext(ctx))
go hc.StartHealthCheck(ctx) go hc.StartHealthCheck(ctx)
select { select {
@ -50,7 +53,7 @@ func TestNewHealthFailed(t *testing.T) {
func TestNewHealthcheckStop(t *testing.T) { func TestNewHealthcheckStop(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
hc := NewSender() hc := NewSender(log.WithContext(ctx))
go hc.StartHealthCheck(ctx) go hc.StartHealthCheck(ctx)
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
@ -75,7 +78,7 @@ func TestNewHealthcheckStop(t *testing.T) {
func TestTimeoutReset(t *testing.T) { func TestTimeoutReset(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
hc := NewSender() hc := NewSender(log.WithContext(ctx))
go hc.StartHealthCheck(ctx) go hc.StartHealthCheck(ctx)
iterations := 0 iterations := 0
@ -101,3 +104,102 @@ func TestTimeoutReset(t *testing.T) {
t.Fatalf("is not exited") t.Fatalf("is not exited")
} }
} }
func TestSenderHealthCheckAttemptThreshold(t *testing.T) {
testsCases := []struct {
name string
threshold int
resetCounterOnce bool
}{
{"Default attempt threshold", defaultAttemptThreshold, false},
{"Custom attempt threshold", 3, false},
{"Should reset threshold once", 2, true},
}
for _, tc := range testsCases {
t.Run(tc.name, func(t *testing.T) {
originalInterval := healthCheckInterval
originalTimeout := healthCheckTimeout
healthCheckInterval = 1 * time.Second
healthCheckTimeout = 500 * time.Millisecond
defer func() {
healthCheckInterval = originalInterval
healthCheckTimeout = originalTimeout
}()
//nolint:tenv
os.Setenv(defaultAttemptThresholdEnv, fmt.Sprintf("%d", tc.threshold))
defer os.Unsetenv(defaultAttemptThresholdEnv)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sender := NewSender(log.WithField("test_name", tc.name))
go sender.StartHealthCheck(ctx)
go func() {
responded := false
for {
select {
case <-ctx.Done():
return
case _, ok := <-sender.HealthCheck:
if !ok {
return
}
if tc.resetCounterOnce && !responded {
responded = true
sender.OnHCResponse()
}
}
}
}()
testTimeout := sender.getTimeoutTime()*time.Duration(tc.threshold) + healthCheckInterval
select {
case <-sender.Timeout:
if tc.resetCounterOnce {
t.Fatalf("should not have timed out before %s", testTimeout)
}
case <-time.After(testTimeout):
if tc.resetCounterOnce {
return
}
t.Fatalf("should have timed out before %s", testTimeout)
}
})
}
}
//nolint:tenv
func TestGetAttemptThresholdFromEnv(t *testing.T) {
tests := []struct {
name string
envValue string
expected int
}{
{"Default attempt threshold when env is not set", "", defaultAttemptThreshold},
{"Custom attempt threshold when env is set to a valid integer", "3", 3},
{"Default attempt threshold when env is set to an invalid value", "invalid", defaultAttemptThreshold},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.envValue == "" {
os.Unsetenv(defaultAttemptThresholdEnv)
} else {
os.Setenv(defaultAttemptThresholdEnv, tt.envValue)
}
result := getAttemptThresholdFromEnv()
if result != tt.expected {
t.Fatalf("Expected %d, got %d", tt.expected, result)
}
os.Unsetenv(defaultAttemptThresholdEnv)
})
}
}

View File

@ -49,7 +49,7 @@ func (p *Peer) Work() {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
hc := healthcheck.NewSender() hc := healthcheck.NewSender(p.log)
go hc.StartHealthCheck(ctx) go hc.StartHealthCheck(ctx)
go p.handleHealthcheckEvents(ctx, hc) go p.handleHealthcheckEvents(ctx, hc)