diff --git a/client/internal/connect.go b/client/internal/connect.go index 2c0c2657b..49544a921 100644 --- a/client/internal/connect.go +++ b/client/internal/connect.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "net" - "os" "runtime" "runtime/debug" "strings" @@ -309,8 +308,8 @@ func (c *ConnectClient) run( } func relayAddress(resp *mgmProto.LoginResponse) string { - if envRelay := os.Getenv("NB_RELAY_ADDRESS"); envRelay != "" { - return envRelay + if ra := peer.ForcedRelayAddress(); ra != "" { + return ra } if resp.GetWiretrusteeConfig().GetRelayAddress() != "" { diff --git a/client/internal/peer/conn.go b/client/internal/peer/conn.go index f467d5c0e..29218c03a 100644 --- a/client/internal/peer/conn.go +++ b/client/internal/peer/conn.go @@ -111,9 +111,10 @@ func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Statu } ctx, ctxCancel := context.WithCancel(engineCtx) + connLog := log.WithField("peer", config.Key) var conn = &Conn{ - log: log.WithField("peer", config.Key), + log: connLog, ctx: ctx, ctxCancel: ctxCancel, config: config, @@ -121,7 +122,7 @@ func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Statu wgProxyFactory: wgProxyFactory, signaler: signaler, allowedIPsIP: allowedIPsIP.String(), - handshaker: NewHandshaker(ctx, config, signaler), + handshaker: NewHandshaker(ctx, connLog, config, signaler), statusRelay: StatusDisconnected, statusICE: StatusDisconnected, } @@ -138,8 +139,8 @@ func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Statu DoHandshake: conn.doHandshake, } - conn.workerRelay = NewWorkerRelay(ctx, conn.log, relayManager, config, rFns) - conn.workerICE = NewWorkerICE(ctx, conn.log, config, config.ICEConfig, signaler, iFaceDiscover, statusRecorder, wFns) + conn.workerRelay = NewWorkerRelay(ctx, connLog, relayManager, config, rFns) + conn.workerICE = NewWorkerICE(ctx, connLog, config, config.ICEConfig, signaler, iFaceDiscover, statusRecorder, wFns) return conn, nil } @@ -148,7 +149,7 @@ func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Statu // be used. // todo implement on disconnected event from ICE and relay too. func (conn *Conn) Open() { - conn.log.Debugf("trying to connect to peer") + conn.log.Debugf("open connection to peer") peerState := State{ PubKey: conn.config.Key, @@ -333,12 +334,18 @@ func (conn *Conn) relayConnectionIsReady(rci RelayConnInfo) { return } + conn.log.Debugf("relay connection is ready") + conn.statusRelay = stateConnected if conn.currentConnType > connPriorityRelay { return } + if conn.currentConnType != 0 { + conn.log.Infof("update connection to Relay type") + } + wgProxy := conn.wgProxyFactory.GetProxy(conn.ctx) endpoint, err := wgProxy.AddTurnConn(rci.relayedConn) if err != nil { @@ -391,12 +398,18 @@ func (conn *Conn) iCEConnectionIsReady(priority ConnPriority, iceConnInfo ICECon return } + conn.log.Debugf("ICE connection is ready") + conn.statusICE = stateConnected if conn.currentConnType > priority { return } + if conn.currentConnType != 0 { + conn.log.Infof("update connection to ICE type") + } + var ( endpoint net.Addr wgProxy wgproxy.Proxy diff --git a/client/internal/peer/env_config.go b/client/internal/peer/env_config.go index 87b626df7..21269168f 100644 --- a/client/internal/peer/env_config.go +++ b/client/internal/peer/env_config.go @@ -16,6 +16,13 @@ const ( envICEForceRelayConn = "NB_ICE_FORCE_RELAY_CONN" ) +func ForcedRelayAddress() string { + if envRelay := os.Getenv("NB_RELAY_ADDRESS"); envRelay != "" { + return envRelay + } + return "" +} + func iceKeepAlive() time.Duration { keepAliveEnv := os.Getenv(envICEKeepAliveIntervalSec) if keepAliveEnv == "" { diff --git a/client/internal/peer/handshaker.go b/client/internal/peer/handshaker.go index 797bbb82e..8e5ad7120 100644 --- a/client/internal/peer/handshaker.go +++ b/client/internal/peer/handshaker.go @@ -57,6 +57,7 @@ type HandshakeArgs struct { type Handshaker struct { mu sync.Mutex ctx context.Context + log *log.Entry config ConnConfig signaler *Signaler @@ -71,9 +72,10 @@ type Handshaker struct { handshakeArgs HandshakeArgs } -func NewHandshaker(ctx context.Context, config ConnConfig, signaler *Signaler) *Handshaker { +func NewHandshaker(ctx context.Context, log *log.Entry, config ConnConfig, signaler *Signaler) *Handshaker { return &Handshaker{ ctx: ctx, + log: log, config: config, signaler: signaler, remoteOffersCh: make(chan OfferAnswer), @@ -92,6 +94,7 @@ func (h *Handshaker) Handshake(args HandshakeArgs) (*OfferAnswer, error) { return cachedOfferAnswer, nil } + h.log.Debugf("send offer") err := h.sendOffer(args) if err != nil { return nil, err @@ -106,8 +109,8 @@ func (h *Handshaker) Handshake(args HandshakeArgs) (*OfferAnswer, error) { } h.storeRemoteOfferAnswer(remoteOfferAnswer) - log.Debugf("received connection confirmation from peer %s running version %s and with remote WireGuard listen port %d", - h.config.Key, remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort) + h.log.Debugf("received connection confirmation, running version %s and with remote WireGuard listen port %d", + remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort) return remoteOfferAnswer, nil } @@ -119,7 +122,7 @@ func (h *Handshaker) OnRemoteOffer(offer OfferAnswer) bool { case h.remoteOffersCh <- offer: return true default: - log.Debugf("OnRemoteOffer skipping message from peer %s because is not ready", h.config.Key) + h.log.Debugf("OnRemoteOffer skipping message because is not ready") // connection might not be ready yet to receive so we ignore the message return false } @@ -133,7 +136,7 @@ func (h *Handshaker) OnRemoteAnswer(answer OfferAnswer) bool { return true default: // connection might not be ready yet to receive so we ignore the message - log.Debugf("OnRemoteAnswer skipping message from peer %s because is not ready", h.config.Key) + h.log.Debugf("OnRemoteAnswer skipping message because is not ready") return false } } @@ -153,7 +156,7 @@ func (h *Handshaker) sendOffer(args HandshakeArgs) error { } func (h *Handshaker) sendAnswer() error { - log.Debugf("sending answer to %s", h.config.Key) + h.log.Debugf("sending answer") answer := OfferAnswer{ IceCredentials: IceCredentials{h.handshakeArgs.IceUFrag, h.handshakeArgs.IcePwd}, WgListenPort: h.config.LocalWgPort, diff --git a/client/internal/peer/worker_ice.go b/client/internal/peer/worker_ice.go index ade33fd1a..5c49b4033 100644 --- a/client/internal/peer/worker_ice.go +++ b/client/internal/peer/worker_ice.go @@ -215,6 +215,7 @@ func (w *WorkerICE) OnRemoteCandidate(candidate ice.Candidate, haRoutes route.HA defer w.muxAgent.Unlock() w.log.Debugf("OnRemoteCandidate from peer %s -> %s", w.config.Key, candidate.String()) if w.agent == nil { + w.log.Warnf("ICE Agent is not initialized yet") return } diff --git a/client/internal/peer/worker_relay.go b/client/internal/peer/worker_relay.go index dce9cb423..25eb41738 100644 --- a/client/internal/peer/worker_relay.go +++ b/client/internal/peer/worker_relay.go @@ -54,12 +54,14 @@ func (w *WorkerRelay) SetupRelayConnection() { if errors.Is(err, ErrSignalIsNotReady) { w.log.Infof("signal client isn't ready, skipping connection attempt") } - w.log.Errorf("failed to do handshake: %v", err) + w.log.Errorf("%s", err) continue } if !w.isRelaySupported(remoteOfferAnswer) { // todo should we retry? + // if the remote peer doesn't support relay make no sense to retry infinity + // but if the remote peer supports relay just the connection is lost we should retry continue } @@ -81,7 +83,7 @@ func (w *WorkerRelay) SetupRelayConnection() { rosenpassAddr: remoteOfferAnswer.RosenpassAddr, }) - // todo: waitForDisconnection() + <-w.ctx.Done() } } diff --git a/relay/client/client.go b/relay/client/client.go index 4e6e63265..d63629030 100644 --- a/relay/client/client.go +++ b/relay/client/client.go @@ -111,6 +111,7 @@ func NewClient(ctx context.Context, serverAddress, peerID string) *Client { // Connect establishes a connection to the relay server. It blocks until the connection is established or an error occurs. func (c *Client) Connect() error { + log.Infof("connecting to relay server: %s", c.serverAddress) c.readLoopMutex.Lock() defer c.readLoopMutex.Unlock() @@ -139,6 +140,7 @@ func (c *Client) Connect() error { c.wgReadLoop.Add(1) go c.readLoop(c.relayConn) + log.Infof("relay connection established with: %s", c.serverAddress) return nil } diff --git a/relay/cmd/main.go b/relay/cmd/main.go index ba3ff47f7..dc5cfe69d 100644 --- a/relay/cmd/main.go +++ b/relay/cmd/main.go @@ -6,13 +6,27 @@ import ( "syscall" log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" "github.com/netbirdio/netbird/relay/server" "github.com/netbirdio/netbird/util" ) +var ( + listenAddress string + + rootCmd = &cobra.Command{ + Use: "relay", + Short: "Relay service", + Long: "Relay service for Netbird agents", + Run: execute, + } +) + func init() { - util.InitLog("trace", "console") + _ = util.InitLog("trace", "console") + rootCmd.PersistentFlags().StringVarP(&listenAddress, "listen-address", "l", ":1235", "listen address") + } func waitForExitSignal() { @@ -21,10 +35,9 @@ func waitForExitSignal() { _ = <-osSigs } -func main() { - address := "10.145.236.1:1235" +func execute(cmd *cobra.Command, args []string) { srv := server.NewServer() - err := srv.Listen(address) + err := srv.Listen(listenAddress) if err != nil { log.Errorf("failed to bind server: %s", err) os.Exit(1) @@ -38,3 +51,10 @@ func main() { os.Exit(1) } } + +func main() { + err := rootCmd.Execute() + if err != nil { + os.Exit(1) + } +}