[client] Set info logs (#3504)

collect and log connection stats per peer every 10 minutes
This commit is contained in:
Zoltan Papp 2025-03-14 22:34:41 +01:00 committed by GitHub
parent a2faae5d62
commit 6f82e96d6a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 147 additions and 18 deletions

View File

@ -114,6 +114,9 @@ type Conn struct {
guard *guard.Guard guard *guard.Guard
semaphore *semaphoregroup.SemaphoreGroup semaphore *semaphoregroup.SemaphoreGroup
// debug purpose
dumpState *stateDump
} }
// NewConn creates a new not opened Conn to the remote peer. // NewConn creates a new not opened Conn to the remote peer.
@ -140,7 +143,7 @@ func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Statu
} }
ctrl := isController(config) ctrl := isController(config)
conn.workerRelay = NewWorkerRelay(connLog, ctrl, config, conn, relayManager) conn.workerRelay = NewWorkerRelay(connLog, ctrl, config, conn, relayManager, conn.dumpState)
relayIsSupportedLocally := conn.workerRelay.RelayIsSupportedLocally() relayIsSupportedLocally := conn.workerRelay.RelayIsSupportedLocally()
workerICE, err := NewWorkerICE(ctx, connLog, config, conn, signaler, iFaceDiscover, statusRecorder, relayIsSupportedLocally) workerICE, err := NewWorkerICE(ctx, connLog, config, conn, signaler, iFaceDiscover, statusRecorder, relayIsSupportedLocally)
@ -160,6 +163,8 @@ func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Statu
go conn.handshaker.Listen() go conn.handshaker.Listen()
conn.dumpState = newStateDump(connLog)
go conn.dumpState.Start(ctx)
return conn, nil return conn, nil
} }
@ -193,6 +198,7 @@ func (conn *Conn) startHandshakeAndReconnect(ctx context.Context) {
defer conn.semaphore.Done(conn.ctx) defer conn.semaphore.Done(conn.ctx)
conn.waitInitialRandomSleepTime(ctx) conn.waitInitialRandomSleepTime(ctx)
conn.dumpState.SendOffer()
err := conn.handshaker.sendOffer() err := conn.handshaker.sendOffer()
if err != nil { if err != nil {
conn.log.Errorf("failed to send initial offer: %v", err) conn.log.Errorf("failed to send initial offer: %v", err)
@ -251,12 +257,14 @@ func (conn *Conn) Close() {
// OnRemoteAnswer handles an offer from the remote peer and returns true if the message was accepted, false otherwise // OnRemoteAnswer handles an offer from the remote peer and returns true if the message was accepted, false otherwise
// doesn't block, discards the message if connection wasn't ready // doesn't block, discards the message if connection wasn't ready
func (conn *Conn) OnRemoteAnswer(answer OfferAnswer) bool { func (conn *Conn) OnRemoteAnswer(answer OfferAnswer) bool {
conn.log.Debugf("OnRemoteAnswer, status ICE: %s, status relay: %s", conn.statusICE, conn.statusRelay) conn.dumpState.RemoteAnswer()
conn.log.Infof("OnRemoteAnswer, status ICE: %s, status relay: %s", conn.statusICE, conn.statusRelay)
return conn.handshaker.OnRemoteAnswer(answer) return conn.handshaker.OnRemoteAnswer(answer)
} }
// OnRemoteCandidate Handles ICE connection Candidate provided by the remote peer. // OnRemoteCandidate Handles ICE connection Candidate provided by the remote peer.
func (conn *Conn) OnRemoteCandidate(candidate ice.Candidate, haRoutes route.HAMap) { func (conn *Conn) OnRemoteCandidate(candidate ice.Candidate, haRoutes route.HAMap) {
conn.dumpState.RemoteCandidate()
conn.workerICE.OnRemoteCandidate(candidate, haRoutes) conn.workerICE.OnRemoteCandidate(candidate, haRoutes)
} }
@ -278,7 +286,8 @@ func (conn *Conn) SetOnDisconnected(handler func(remotePeer string)) {
} }
func (conn *Conn) OnRemoteOffer(offer OfferAnswer) bool { func (conn *Conn) OnRemoteOffer(offer OfferAnswer) bool {
conn.log.Debugf("OnRemoteOffer, on status ICE: %s, status Relay: %s", conn.statusICE, conn.statusRelay) conn.dumpState.RemoteOffer()
conn.log.Infof("OnRemoteOffer, on status ICE: %s, status Relay: %s", conn.statusICE, conn.statusRelay)
return conn.handshaker.OnRemoteOffer(offer) return conn.handshaker.OnRemoteOffer(offer)
} }
@ -322,6 +331,7 @@ func (conn *Conn) onICEConnectionIsReady(priority ConnPriority, iceConnInfo ICEC
} }
conn.log.Infof("set ICE to active connection") conn.log.Infof("set ICE to active connection")
conn.dumpState.P2PConnected()
var ( var (
ep *net.UDPAddr ep *net.UDPAddr
@ -329,6 +339,7 @@ func (conn *Conn) onICEConnectionIsReady(priority ConnPriority, iceConnInfo ICEC
err error err error
) )
if iceConnInfo.RelayedOnLocal { if iceConnInfo.RelayedOnLocal {
conn.dumpState.NewLocalProxy()
wgProxy, err = conn.newProxy(iceConnInfo.RemoteConn) wgProxy, err = conn.newProxy(iceConnInfo.RemoteConn)
if err != nil { if err != nil {
conn.log.Errorf("failed to add turn net.Conn to local proxy: %v", err) conn.log.Errorf("failed to add turn net.Conn to local proxy: %v", err)
@ -390,6 +401,7 @@ func (conn *Conn) onICEStateDisconnected() {
// switch back to relay connection // switch back to relay connection
if conn.isReadyToUpgrade() { if conn.isReadyToUpgrade() {
conn.log.Infof("ICE disconnected, set Relay to active connection") conn.log.Infof("ICE disconnected, set Relay to active connection")
conn.dumpState.SwitchToRelay()
conn.wgProxyRelay.Work() conn.wgProxyRelay.Work()
if err := conn.configureWGEndpoint(conn.wgProxyRelay.EndpointAddr()); err != nil { if err := conn.configureWGEndpoint(conn.wgProxyRelay.EndpointAddr()); err != nil {
@ -432,6 +444,7 @@ func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) {
return return
} }
conn.dumpState.RelayConnected()
conn.log.Debugf("Relay connection has been established, setup the WireGuard") conn.log.Debugf("Relay connection has been established, setup the WireGuard")
wgProxy, err := conn.newProxy(rci.relayedConn) wgProxy, err := conn.newProxy(rci.relayedConn)
@ -439,6 +452,7 @@ func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) {
conn.log.Errorf("failed to add relayed net.Conn to local proxy: %v", err) conn.log.Errorf("failed to add relayed net.Conn to local proxy: %v", err)
return return
} }
conn.dumpState.NewLocalProxy()
conn.log.Infof("created new wgProxy for relay connection: %s", wgProxy.EndpointAddr().String()) conn.log.Infof("created new wgProxy for relay connection: %s", wgProxy.EndpointAddr().String())
@ -481,10 +495,10 @@ func (conn *Conn) onRelayDisconnected() {
return return
} }
conn.log.Debugf("relay connection is disconnected") conn.log.Infof("relay connection is disconnected")
if conn.currentConnPriority == connPriorityRelay { if conn.currentConnPriority == connPriorityRelay {
conn.log.Debugf("clean up WireGuard config") conn.log.Infof("clean up WireGuard config")
if err := conn.removeWgPeer(); err != nil { if err := conn.removeWgPeer(); err != nil {
conn.log.Errorf("failed to remove wg endpoint: %v", err) conn.log.Errorf("failed to remove wg endpoint: %v", err)
} }
@ -516,7 +530,8 @@ func (conn *Conn) listenGuardEvent(ctx context.Context) {
for { for {
select { select {
case <-conn.guard.Reconnect: case <-conn.guard.Reconnect:
conn.log.Debugf("send offer to peer") conn.log.Infof("send offer to peer")
conn.dumpState.SendOffer()
if err := conn.handshaker.SendOffer(); err != nil { if err := conn.handshaker.SendOffer(); err != nil {
conn.log.Errorf("failed to send offer: %v", err) conn.log.Errorf("failed to send offer: %v", err)
} }

View File

@ -76,19 +76,19 @@ func (h *Handshaker) AddOnNewOfferListener(offer func(remoteOfferAnswer *OfferAn
func (h *Handshaker) Listen() { func (h *Handshaker) Listen() {
for { for {
h.log.Debugf("wait for remote offer confirmation") h.log.Info("wait for remote offer confirmation")
remoteOfferAnswer, err := h.waitForRemoteOfferConfirmation() remoteOfferAnswer, err := h.waitForRemoteOfferConfirmation()
if err != nil { if err != nil {
var connectionClosedError *ConnectionClosedError var connectionClosedError *ConnectionClosedError
if errors.As(err, &connectionClosedError) { if errors.As(err, &connectionClosedError) {
h.log.Tracef("stop handshaker") h.log.Info("exit from handshaker")
return return
} }
h.log.Errorf("failed to received remote offer confirmation: %s", err) h.log.Errorf("failed to received remote offer confirmation: %s", err)
continue continue
} }
h.log.Debugf("received connection confirmation, running version %s and with remote WireGuard listen port %d", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort) h.log.Infof("received connection confirmation, running version %s and with remote WireGuard listen port %d", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort)
for _, listener := range h.onNewOfferListeners { for _, listener := range h.onNewOfferListeners {
go listener(remoteOfferAnswer) go listener(remoteOfferAnswer)
} }
@ -108,7 +108,7 @@ func (h *Handshaker) OnRemoteOffer(offer OfferAnswer) bool {
case h.remoteOffersCh <- offer: case h.remoteOffersCh <- offer:
return true return true
default: default:
h.log.Debugf("OnRemoteOffer skipping message because is not ready") h.log.Warnf("OnRemoteOffer skipping message because is not ready")
// connection might not be ready yet to receive so we ignore the message // connection might not be ready yet to receive so we ignore the message
return false return false
} }
@ -131,8 +131,7 @@ func (h *Handshaker) waitForRemoteOfferConfirmation() (*OfferAnswer, error) {
select { select {
case remoteOfferAnswer := <-h.remoteOffersCh: case remoteOfferAnswer := <-h.remoteOffersCh:
// received confirmation from the remote peer -> ready to proceed // received confirmation from the remote peer -> ready to proceed
err := h.sendAnswer() if err := h.sendAnswer(); err != nil {
if err != nil {
return nil, err return nil, err
} }
return &remoteOfferAnswer, nil return &remoteOfferAnswer, nil
@ -168,7 +167,7 @@ func (h *Handshaker) sendOffer() error {
} }
func (h *Handshaker) sendAnswer() error { func (h *Handshaker) sendAnswer() error {
h.log.Debugf("sending answer") h.log.Infof("sending answer")
uFrag, pwd := h.ice.GetLocalUserCredentials() uFrag, pwd := h.ice.GetLocalUserCredentials()
answer := OfferAnswer{ answer := OfferAnswer{

View File

@ -0,0 +1,112 @@
package peer
import (
"context"
"sync"
"time"
log "github.com/sirupsen/logrus"
)
type stateDump struct {
log *log.Entry
sentOffer int
remoteOffer int
remoteAnswer int
remoteCandidate int
p2pConnected int
switchToRelay int
wgCheckSuccess int
relayConnected int
localProxies int
mu sync.Mutex
}
func newStateDump(log *log.Entry) *stateDump {
return &stateDump{
log: log,
}
}
func (s *stateDump) Start(ctx context.Context) {
ticker := time.NewTicker(10 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
s.dumpState()
case <-ctx.Done():
return
}
}
}
func (s *stateDump) RemoteOffer() {
s.mu.Lock()
defer s.mu.Unlock()
s.remoteOffer++
}
func (s *stateDump) RemoteCandidate() {
s.mu.Lock()
defer s.mu.Unlock()
s.remoteCandidate++
}
func (s *stateDump) SendOffer() {
s.mu.Lock()
defer s.mu.Unlock()
s.sentOffer++
}
func (s *stateDump) dumpState() {
s.mu.Lock()
defer s.mu.Unlock()
s.log.Infof("Dump stat: SentOffer: %d, RemoteOffer: %d, RemoteAnswer: %d, RemoteCandidate: %d, P2PConnected: %d, SwitchToRelay: %d, WGCheckSuccess: %d, RelayConnected: %d, LocalProxies: %d",
s.sentOffer, s.remoteOffer, s.remoteAnswer, s.remoteCandidate, s.p2pConnected, s.switchToRelay, s.wgCheckSuccess, s.relayConnected, s.localProxies)
}
func (s *stateDump) RemoteAnswer() {
s.mu.Lock()
defer s.mu.Unlock()
s.remoteAnswer++
}
func (s *stateDump) P2PConnected() {
s.mu.Lock()
defer s.mu.Unlock()
s.p2pConnected++
}
func (s *stateDump) SwitchToRelay() {
s.mu.Lock()
defer s.mu.Unlock()
s.switchToRelay++
}
func (s *stateDump) WGcheckSuccess() {
s.mu.Lock()
defer s.mu.Unlock()
s.wgCheckSuccess++
}
func (s *stateDump) RelayConnected() {
s.mu.Lock()
defer s.mu.Unlock()
s.relayConnected++
}
func (s *stateDump) NewLocalProxy() {
s.mu.Lock()
defer s.mu.Unlock()
s.localProxies++
}

View File

@ -27,6 +27,7 @@ type WGWatcher struct {
log *log.Entry log *log.Entry
wgIfaceStater WGInterfaceStater wgIfaceStater WGInterfaceStater
peerKey string peerKey string
stateDump *stateDump
ctx context.Context ctx context.Context
ctxCancel context.CancelFunc ctxCancel context.CancelFunc
@ -34,11 +35,12 @@ type WGWatcher struct {
waitGroup sync.WaitGroup waitGroup sync.WaitGroup
} }
func NewWGWatcher(log *log.Entry, wgIfaceStater WGInterfaceStater, peerKey string) *WGWatcher { func NewWGWatcher(log *log.Entry, wgIfaceStater WGInterfaceStater, peerKey string, stateDump *stateDump) *WGWatcher {
return &WGWatcher{ return &WGWatcher{
log: log, log: log,
wgIfaceStater: wgIfaceStater, wgIfaceStater: wgIfaceStater,
peerKey: peerKey, peerKey: peerKey,
stateDump: stateDump,
} }
} }
@ -105,6 +107,7 @@ func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, ctxCancel contex
resetTime := time.Until(handshake.Add(checkPeriod)) resetTime := time.Until(handshake.Add(checkPeriod))
timer.Reset(resetTime) timer.Reset(resetTime)
w.stateDump.WGcheckSuccess()
w.log.Debugf("WireGuard watcher reset timer: %v", resetTime) w.log.Debugf("WireGuard watcher reset timer: %v", resetTime)
case <-ctx.Done(): case <-ctx.Done():

View File

@ -43,7 +43,7 @@ func TestWGWatcher_EnableWgWatcher(t *testing.T) {
mlog := log.WithField("peer", "tet") mlog := log.WithField("peer", "tet")
mocWgIface := &MocWgIface{} mocWgIface := &MocWgIface{}
watcher := NewWGWatcher(mlog, mocWgIface, "") watcher := NewWGWatcher(mlog, mocWgIface, "", newStateDump(mlog))
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
@ -72,7 +72,7 @@ func TestWGWatcher_ReEnable(t *testing.T) {
mlog := log.WithField("peer", "tet") mlog := log.WithField("peer", "tet")
mocWgIface := &MocWgIface{} mocWgIface := &MocWgIface{}
watcher := NewWGWatcher(mlog, mocWgIface, "") watcher := NewWGWatcher(mlog, mocWgIface, "", newStateDump(mlog))
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()

View File

@ -33,14 +33,14 @@ type WorkerRelay struct {
wgWatcher *WGWatcher wgWatcher *WGWatcher
} }
func NewWorkerRelay(log *log.Entry, ctrl bool, config ConnConfig, conn *Conn, relayManager relayClient.ManagerService) *WorkerRelay { func NewWorkerRelay(log *log.Entry, ctrl bool, config ConnConfig, conn *Conn, relayManager relayClient.ManagerService, stateDump *stateDump) *WorkerRelay {
r := &WorkerRelay{ r := &WorkerRelay{
log: log, log: log,
isController: ctrl, isController: ctrl,
config: config, config: config,
conn: conn, conn: conn,
relayManager: relayManager, relayManager: relayManager,
wgWatcher: NewWGWatcher(log, config.WgConfig.WgInterface, config.Key), wgWatcher: NewWGWatcher(log, config.WgConfig.WgInterface, config.Key, stateDump),
} }
return r return r
} }