[client,relay] Add QUIC support (#2962)

This commit is contained in:
Zoltan Papp
2025-01-15 16:28:19 +01:00
committed by GitHub
parent e4a25b6a60
commit 1ffa519387
25 changed files with 943 additions and 33 deletions

View File

@@ -0,0 +1,101 @@
package quic
import (
"context"
"errors"
"fmt"
"net"
"sync"
"time"
"github.com/quic-go/quic-go"
)
type Conn struct {
session quic.Connection
closed bool
closedMu sync.Mutex
ctx context.Context
ctxCancel context.CancelFunc
}
func NewConn(session quic.Connection) *Conn {
ctx, cancel := context.WithCancel(context.Background())
return &Conn{
session: session,
ctx: ctx,
ctxCancel: cancel,
}
}
func (c *Conn) Read(b []byte) (n int, err error) {
dgram, err := c.session.ReceiveDatagram(c.ctx)
if err != nil {
return 0, c.remoteCloseErrHandling(err)
}
// Copy data to b, ensuring we dont exceed the size of b
n = copy(b, dgram)
return n, nil
}
func (c *Conn) Write(b []byte) (int, error) {
if err := c.session.SendDatagram(b); err != nil {
return 0, c.remoteCloseErrHandling(err)
}
return len(b), nil
}
func (c *Conn) LocalAddr() net.Addr {
return c.session.LocalAddr()
}
func (c *Conn) RemoteAddr() net.Addr {
return c.session.RemoteAddr()
}
func (c *Conn) SetReadDeadline(t time.Time) error {
return nil
}
func (c *Conn) SetWriteDeadline(t time.Time) error {
return fmt.Errorf("SetWriteDeadline is not implemented")
}
func (c *Conn) SetDeadline(t time.Time) error {
return fmt.Errorf("SetDeadline is not implemented")
}
func (c *Conn) Close() error {
c.closedMu.Lock()
if c.closed {
c.closedMu.Unlock()
return nil
}
c.closed = true
c.closedMu.Unlock()
c.ctxCancel() // Cancel the context
sessionErr := c.session.CloseWithError(0, "normal closure")
return sessionErr
}
func (c *Conn) isClosed() bool {
c.closedMu.Lock()
defer c.closedMu.Unlock()
return c.closed
}
func (c *Conn) remoteCloseErrHandling(err error) error {
if c.isClosed() {
return net.ErrClosed
}
// Check if the connection was closed remotely
var appErr *quic.ApplicationError
if errors.As(err, &appErr) && appErr.ErrorCode == 0x0 {
return net.ErrClosed
}
return err
}

View File

@@ -0,0 +1,66 @@
package quic
import (
"context"
"crypto/tls"
"errors"
"fmt"
"net"
"github.com/quic-go/quic-go"
log "github.com/sirupsen/logrus"
)
type Listener struct {
// Address is the address to listen on
Address string
// TLSConfig is the TLS configuration for the server
TLSConfig *tls.Config
listener *quic.Listener
acceptFn func(conn net.Conn)
}
func (l *Listener) Listen(acceptFn func(conn net.Conn)) error {
l.acceptFn = acceptFn
quicCfg := &quic.Config{
EnableDatagrams: true,
}
listener, err := quic.ListenAddr(l.Address, l.TLSConfig, quicCfg)
if err != nil {
return fmt.Errorf("failed to create QUIC listener: %v", err)
}
l.listener = listener
log.Infof("QUIC server listening on address: %s", l.Address)
for {
session, err := listener.Accept(context.Background())
if err != nil {
if errors.Is(err, quic.ErrServerClosed) {
return nil
}
log.Errorf("Failed to accept QUIC session: %v", err)
continue
}
log.Infof("QUIC client connected from: %s", session.RemoteAddr())
conn := NewConn(session)
l.acceptFn(conn)
}
}
func (l *Listener) Shutdown(ctx context.Context) error {
if l.listener == nil {
return nil
}
log.Infof("stopping QUIC listener")
if err := l.listener.Close(); err != nil {
return fmt.Errorf("listener shutdown failed: %v", err)
}
log.Infof("QUIC listener stopped")
return nil
}

View File

@@ -88,6 +88,8 @@ func (l *Listener) onAccept(w http.ResponseWriter, r *http.Request) {
return
}
log.Infof("WS client connected from: %s", rAddr)
conn := NewConn(wsConn, lAddr, rAddr)
l.acceptFn(conn)
}

View File

@@ -150,6 +150,8 @@ func (r *Relay) Accept(conn net.Conn) {
func (r *Relay) Shutdown(ctx context.Context) {
log.Infof("close connection with all peers")
r.closeMu.Lock()
defer r.closeMu.Unlock()
wg := sync.WaitGroup{}
peers := r.store.Peers()
for _, peer := range peers {
@@ -161,7 +163,7 @@ func (r *Relay) Shutdown(ctx context.Context) {
}
wg.Wait()
r.metricsCancel()
r.closeMu.Unlock()
r.closed = true
}
// InstanceURL returns the instance URL of the relay server

View File

@@ -3,13 +3,17 @@ package server
import (
"context"
"crypto/tls"
"sync"
log "github.com/sirupsen/logrus"
"github.com/hashicorp/go-multierror"
"go.opentelemetry.io/otel/metric"
nberrors "github.com/netbirdio/netbird/client/errors"
"github.com/netbirdio/netbird/relay/auth"
"github.com/netbirdio/netbird/relay/server/listener"
"github.com/netbirdio/netbird/relay/server/listener/quic"
"github.com/netbirdio/netbird/relay/server/listener/ws"
quictls "github.com/netbirdio/netbird/relay/tls"
)
// ListenerConfig is the configuration for the listener.
@@ -24,8 +28,8 @@ type ListenerConfig struct {
// It is the gate between the WebSocket listener and the Relay server logic.
// In a new HTTP connection, the server will accept the connection and pass it to the Relay server via the Accept method.
type Server struct {
relay *Relay
wSListener listener.Listener
relay *Relay
listeners []listener.Listener
}
// NewServer creates a new relay server instance.
@@ -39,35 +43,63 @@ func NewServer(meter metric.Meter, exposedAddress string, tlsSupport bool, authV
return nil, err
}
return &Server{
relay: relay,
relay: relay,
listeners: make([]listener.Listener, 0, 2),
}, nil
}
// Listen starts the relay server.
func (r *Server) Listen(cfg ListenerConfig) error {
r.wSListener = &ws.Listener{
wSListener := &ws.Listener{
Address: cfg.Address,
TLSConfig: cfg.TLSConfig,
}
r.listeners = append(r.listeners, wSListener)
wslErr := r.wSListener.Listen(r.relay.Accept)
if wslErr != nil {
log.Errorf("failed to bind ws server: %s", wslErr)
tlsConfigQUIC, err := quictls.ServerQUICTLSConfig(cfg.TLSConfig)
if err != nil {
return err
}
return wslErr
quicListener := &quic.Listener{
Address: cfg.Address,
TLSConfig: tlsConfigQUIC,
}
r.listeners = append(r.listeners, quicListener)
errChan := make(chan error, len(r.listeners))
wg := sync.WaitGroup{}
for _, l := range r.listeners {
wg.Add(1)
go func(listener listener.Listener) {
defer wg.Done()
errChan <- listener.Listen(r.relay.Accept)
}(l)
}
wg.Wait()
close(errChan)
var multiErr *multierror.Error
for err := range errChan {
multiErr = multierror.Append(multiErr, err)
}
return nberrors.FormatErrorOrNil(multiErr)
}
// Shutdown stops the relay server. If there are active connections, they will be closed gracefully. In case of a context,
// the connections will be forcefully closed.
func (r *Server) Shutdown(ctx context.Context) (err error) {
// stop service new connections
if r.wSListener != nil {
err = r.wSListener.Shutdown(ctx)
}
func (r *Server) Shutdown(ctx context.Context) error {
r.relay.Shutdown(ctx)
return
var multiErr *multierror.Error
for _, l := range r.listeners {
if err := l.Shutdown(ctx); err != nil {
multiErr = multierror.Append(multiErr, err)
}
}
return nberrors.FormatErrorOrNil(multiErr)
}
// InstanceURL returns the instance URL of the relay server.