mirror of
https://github.com/netbirdio/netbird.git
synced 2025-02-23 13:41:19 +01:00
Update calculate server state (#796)
Refactored updateServerStates and calculateState, added some checks to ensure we do not send the connecting state when the context is canceled, and removed some state updates from the RunClient function.
This commit is contained in:
parent
8375491708
commit
306e02d32b
@ -59,9 +59,6 @@ func RunClient(ctx context.Context, config *Config, statusRecorder *peer.Status,
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
statusRecorder.MarkManagementDisconnected()
|
|
||||||
|
|
||||||
statusRecorder.ClientStart()
|
|
||||||
defer statusRecorder.ClientStop()
|
defer statusRecorder.ClientStop()
|
||||||
operation := func() error {
|
operation := func() error {
|
||||||
// if context cancelled we not start new backoff cycle
|
// if context cancelled we not start new backoff cycle
|
||||||
@ -163,6 +160,8 @@ func RunClient(ctx context.Context, config *Config, statusRecorder *peer.Status,
|
|||||||
log.Print("Netbird engine started, my IP is: ", peerConfig.Address)
|
log.Print("Netbird engine started, my IP is: ", peerConfig.Address)
|
||||||
state.Set(StatusConnected)
|
state.Set(StatusConnected)
|
||||||
|
|
||||||
|
statusRecorder.ClientStart()
|
||||||
|
|
||||||
<-engineCtx.Done()
|
<-engineCtx.Done()
|
||||||
statusRecorder.ClientTeardown()
|
statusRecorder.ClientTeardown()
|
||||||
|
|
||||||
|
@ -15,7 +15,6 @@ type notifier struct {
|
|||||||
serverStateLock sync.Mutex
|
serverStateLock sync.Mutex
|
||||||
listenersLock sync.Mutex
|
listenersLock sync.Mutex
|
||||||
listener Listener
|
listener Listener
|
||||||
currentServerState bool
|
|
||||||
currentClientState bool
|
currentClientState bool
|
||||||
lastNotification int
|
lastNotification int
|
||||||
}
|
}
|
||||||
@ -45,24 +44,14 @@ func (n *notifier) updateServerStates(mgmState bool, signalState bool) {
|
|||||||
n.serverStateLock.Lock()
|
n.serverStateLock.Lock()
|
||||||
defer n.serverStateLock.Unlock()
|
defer n.serverStateLock.Unlock()
|
||||||
|
|
||||||
var newState bool
|
calculatedState := n.calculateState(mgmState, signalState)
|
||||||
if mgmState && signalState {
|
|
||||||
newState = true
|
|
||||||
} else {
|
|
||||||
newState = false
|
|
||||||
}
|
|
||||||
|
|
||||||
if !n.isServerStateChanged(newState) {
|
if !n.isServerStateChanged(calculatedState) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
n.currentServerState = newState
|
n.lastNotification = calculatedState
|
||||||
|
|
||||||
if n.lastNotification == stateDisconnecting {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
n.lastNotification = n.calculateState(newState, n.currentClientState)
|
|
||||||
n.notify(n.lastNotification)
|
n.notify(n.lastNotification)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -70,7 +59,7 @@ func (n *notifier) clientStart() {
|
|||||||
n.serverStateLock.Lock()
|
n.serverStateLock.Lock()
|
||||||
defer n.serverStateLock.Unlock()
|
defer n.serverStateLock.Unlock()
|
||||||
n.currentClientState = true
|
n.currentClientState = true
|
||||||
n.lastNotification = n.calculateState(n.currentServerState, true)
|
n.lastNotification = stateConnected
|
||||||
n.notify(n.lastNotification)
|
n.notify(n.lastNotification)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -78,7 +67,7 @@ func (n *notifier) clientStop() {
|
|||||||
n.serverStateLock.Lock()
|
n.serverStateLock.Lock()
|
||||||
defer n.serverStateLock.Unlock()
|
defer n.serverStateLock.Unlock()
|
||||||
n.currentClientState = false
|
n.currentClientState = false
|
||||||
n.lastNotification = n.calculateState(n.currentServerState, false)
|
n.lastNotification = stateDisconnected
|
||||||
n.notify(n.lastNotification)
|
n.notify(n.lastNotification)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -90,8 +79,8 @@ func (n *notifier) clientTearDown() {
|
|||||||
n.notify(n.lastNotification)
|
n.notify(n.lastNotification)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *notifier) isServerStateChanged(newState bool) bool {
|
func (n *notifier) isServerStateChanged(newState int) bool {
|
||||||
return n.currentServerState != newState
|
return n.lastNotification != newState
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *notifier) notify(state int) {
|
func (n *notifier) notify(state int) {
|
||||||
@ -118,15 +107,19 @@ func (n *notifier) notifyListener(l Listener, state int) {
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *notifier) calculateState(serverState bool, clientState bool) int {
|
func (n *notifier) calculateState(managementConn, signalConn bool) int {
|
||||||
if serverState && clientState {
|
if managementConn && signalConn {
|
||||||
return stateConnected
|
return stateConnected
|
||||||
}
|
}
|
||||||
|
|
||||||
if !clientState {
|
if !managementConn && !signalConn {
|
||||||
return stateDisconnected
|
return stateDisconnected
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if n.lastNotification == stateDisconnecting {
|
||||||
|
return stateDisconnecting
|
||||||
|
}
|
||||||
|
|
||||||
return stateConnecting
|
return stateConnecting
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -47,25 +47,24 @@ func Test_notifier_serverState(t *testing.T) {
|
|||||||
|
|
||||||
type scenario struct {
|
type scenario struct {
|
||||||
name string
|
name string
|
||||||
expected bool
|
expected int
|
||||||
mgmState bool
|
mgmState bool
|
||||||
signalState bool
|
signalState bool
|
||||||
}
|
}
|
||||||
scenarios := []scenario{
|
scenarios := []scenario{
|
||||||
{"connected", true, true, true},
|
{"connected", stateConnected, true, true},
|
||||||
{"mgm down", false, false, true},
|
{"mgm down", stateConnecting, false, true},
|
||||||
{"signal down", false, true, false},
|
{"signal down", stateConnecting, true, false},
|
||||||
{"disconnected", false, false, false},
|
{"disconnected", stateDisconnected, false, false},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tt := range scenarios {
|
for _, tt := range scenarios {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
n := newNotifier()
|
n := newNotifier()
|
||||||
n.updateServerStates(tt.mgmState, tt.signalState)
|
n.updateServerStates(tt.mgmState, tt.signalState)
|
||||||
if n.currentServerState != tt.expected {
|
if n.lastNotification != tt.expected {
|
||||||
t.Errorf("invalid serverstate: %t, expected: %t", n.currentServerState, tt.expected)
|
t.Errorf("invalid serverstate: %d, expected: %d", n.lastNotification, tt.expected)
|
||||||
}
|
}
|
||||||
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -20,6 +20,7 @@ import (
|
|||||||
"google.golang.org/grpc/keepalive"
|
"google.golang.org/grpc/keepalive"
|
||||||
|
|
||||||
"github.com/cenkalti/backoff/v4"
|
"github.com/cenkalti/backoff/v4"
|
||||||
|
|
||||||
"github.com/netbirdio/netbird/client/system"
|
"github.com/netbirdio/netbird/client/system"
|
||||||
"github.com/netbirdio/netbird/encryption"
|
"github.com/netbirdio/netbird/encryption"
|
||||||
"github.com/netbirdio/netbird/management/proto"
|
"github.com/netbirdio/netbird/management/proto"
|
||||||
@ -144,15 +145,19 @@ func (c *GrpcClient) Sync(msgHandler func(msg *proto.SyncResponse) error) error
|
|||||||
// blocking until error
|
// blocking until error
|
||||||
err = c.receiveEvents(stream, *serverPubKey, msgHandler)
|
err = c.receiveEvents(stream, *serverPubKey, msgHandler)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if s, ok := gstatus.FromError(err); ok && s.Code() == codes.PermissionDenied {
|
s, _ := gstatus.FromError(err)
|
||||||
|
switch s.Code() {
|
||||||
|
case codes.PermissionDenied:
|
||||||
return backoff.Permanent(err) // unrecoverable error, propagate to the upper layer
|
return backoff.Permanent(err) // unrecoverable error, propagate to the upper layer
|
||||||
|
case codes.Canceled:
|
||||||
|
log.Debugf("management connection context has been canceled, this usually indicates shutdown")
|
||||||
|
return nil
|
||||||
|
default:
|
||||||
|
backOff.Reset() // reset backoff counter after successful connection
|
||||||
|
c.notifyDisconnected()
|
||||||
|
log.Warnf("disconnected from the Management service but will retry silently. Reason: %v", err)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
// we need this reset because after a successful connection and a consequent error, backoff lib doesn't
|
|
||||||
// reset times and next try will start with a long delay
|
|
||||||
backOff.Reset()
|
|
||||||
c.notifyDisconnected()
|
|
||||||
log.Warnf("disconnected from the Management service but will retry silently. Reason: %v", err)
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -4,9 +4,11 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/cenkalti/backoff/v4"
|
"github.com/cenkalti/backoff/v4"
|
||||||
"github.com/netbirdio/netbird/encryption"
|
|
||||||
"github.com/netbirdio/netbird/signal/proto"
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
@ -17,9 +19,9 @@ import (
|
|||||||
"google.golang.org/grpc/keepalive"
|
"google.golang.org/grpc/keepalive"
|
||||||
"google.golang.org/grpc/metadata"
|
"google.golang.org/grpc/metadata"
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
"io"
|
|
||||||
"sync"
|
"github.com/netbirdio/netbird/encryption"
|
||||||
"time"
|
"github.com/netbirdio/netbird/signal/proto"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ConnStateNotifier is a wrapper interface of the status recorder
|
// ConnStateNotifier is a wrapper interface of the status recorder
|
||||||
@ -155,6 +157,10 @@ func (c *GrpcClient) Receive(msgHandler func(msg *proto.Message) error) error {
|
|||||||
// start receiving messages from the Signal stream (from other peers through signal)
|
// start receiving messages from the Signal stream (from other peers through signal)
|
||||||
err = c.receive(stream, msgHandler)
|
err = c.receive(stream, msgHandler)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled {
|
||||||
|
log.Debugf("signal connection context has been canceled, this usually indicates shutdown")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
// we need this reset because after a successful connection and a consequent error, backoff lib doesn't
|
// we need this reset because after a successful connection and a consequent error, backoff lib doesn't
|
||||||
// reset times and next try will start with a long delay
|
// reset times and next try will start with a long delay
|
||||||
backOff.Reset()
|
backOff.Reset()
|
||||||
|
Loading…
Reference in New Issue
Block a user