diff --git a/relay/client/dialer/quic/conn.go b/relay/client/dialer/quic/conn.go deleted file mode 100644 index 2c8b29a3c..000000000 --- a/relay/client/dialer/quic/conn.go +++ /dev/null @@ -1,52 +0,0 @@ -package quic - -import ( - "net" - - "github.com/quic-go/quic-go" - log "github.com/sirupsen/logrus" -) - -type Conn struct { - quic.Stream - qConn quic.Connection -} - -func NewConn(stream quic.Stream, qConn quic.Connection) net.Conn { - return &Conn{ - Stream: stream, - qConn: qConn, - } -} - -func (q *Conn) Write(b []byte) (n int, err error) { - log.Debugf("writing: %d, %x\n", len(b), b) - n, err = q.Stream.Write(b) - if n != len(b) { - log.Errorf("failed to write out the full message") - } - return -} - -func (q *Conn) Close() error { - err := q.Stream.Close() - if err != nil { - log.Errorf("failed to close stream: %s", err) - return err - } - err = q.qConn.CloseWithError(0, "") - if err != nil { - log.Errorf("failed to close connection: %s", err) - return err - - } - return err -} - -func (c *Conn) LocalAddr() net.Addr { - return c.qConn.LocalAddr() -} - -func (c *Conn) RemoteAddr() net.Addr { - return c.qConn.RemoteAddr() -} diff --git a/relay/client/dialer/quic/quic.go b/relay/client/dialer/quic/quic.go deleted file mode 100644 index 4863ed7bd..000000000 --- a/relay/client/dialer/quic/quic.go +++ /dev/null @@ -1,32 +0,0 @@ -package quic - -import ( - "context" - "crypto/tls" - "net" - - "github.com/quic-go/quic-go" - log "github.com/sirupsen/logrus" -) - -func Dial(address string) (net.Conn, error) { - tlsConf := &tls.Config{ - InsecureSkipVerify: true, - NextProtos: []string{"quic-echo-example"}, - } - qConn, err := quic.DialAddr(context.Background(), address, tlsConf, &quic.Config{ - EnableDatagrams: true, - }) - if err != nil { - log.Errorf("dial quic address %s failed: %s", address, err) - return nil, err - } - - stream, err := qConn.OpenStreamSync(context.Background()) - if err != nil { - return nil, err - } - - conn := NewConn(stream, qConn) - return conn, nil -} diff --git a/relay/client/dialer/tcp/tcp.go b/relay/client/dialer/tcp/tcp.go deleted file mode 100644 index 47b0a31d5..000000000 --- a/relay/client/dialer/tcp/tcp.go +++ /dev/null @@ -1,7 +0,0 @@ -package tcp - -import "net" - -func Dial(address string) (net.Conn, error) { - return net.Dial("tcp", address) -} diff --git a/relay/client/dialer/udp/udp.go b/relay/client/dialer/udp/udp.go deleted file mode 100644 index ff0fa9c83..000000000 --- a/relay/client/dialer/udp/udp.go +++ /dev/null @@ -1,14 +0,0 @@ -package udp - -import ( - "net" -) - -func Dial(address string) (net.Conn, error) { - udpAddr, err := net.ResolveUDPAddr("udp", address) - if err != nil { - return nil, err - } - - return net.DialUDP("udp", nil, udpAddr) -} diff --git a/relay/server/listener/quic/conn.go b/relay/server/listener/quic/conn.go deleted file mode 100644 index 5414f9eee..000000000 --- a/relay/server/listener/quic/conn.go +++ /dev/null @@ -1,36 +0,0 @@ -package quic - -import ( - "net" - - "github.com/quic-go/quic-go" - log "github.com/sirupsen/logrus" -) - -type Conn struct { - quic.Stream - qConn quic.Connection -} - -func NewConn(stream quic.Stream, qConn quic.Connection) net.Conn { - return &Conn{ - Stream: stream, - qConn: qConn, - } -} - -func (q Conn) Write(b []byte) (n int, err error) { - n, err = q.Stream.Write(b) - if n != len(b) { - log.Errorf("failed to write out the full message") - } - return -} - -func (q Conn) LocalAddr() net.Addr { - return q.qConn.LocalAddr() -} - -func (q Conn) RemoteAddr() net.Addr { - return q.qConn.RemoteAddr() -} diff --git a/relay/server/listener/quic/listener.go b/relay/server/listener/quic/listener.go deleted file mode 100644 index 55107e7ba..000000000 --- a/relay/server/listener/quic/listener.go +++ /dev/null @@ -1,110 +0,0 @@ -package quic - -import ( - "context" - "crypto/rand" - "crypto/rsa" - "crypto/tls" - "crypto/x509" - "encoding/pem" - "math/big" - "net" - "sync" - - "github.com/quic-go/quic-go" - log "github.com/sirupsen/logrus" - - "github.com/netbirdio/netbird/relay/server/listener" -) - -type Listener struct { - address string - - listener *quic.Listener - quit chan struct{} - wg sync.WaitGroup -} - -func NewListener(address string) listener.Listener { - return &Listener{ - address: address, - } -} - -func (l *Listener) Listen(onAcceptFn func(conn net.Conn)) error { - ql, err := quic.ListenAddr(l.address, generateTLSConfig(), &quic.Config{ - EnableDatagrams: true, - }) - if err != nil { - return err - } - l.listener = ql - l.quit = make(chan struct{}) - - log.Infof("quic server is listening on address: %s", l.address) - l.wg.Add(1) - go l.acceptLoop(onAcceptFn) - - <-l.quit - return nil -} - -func (l *Listener) Close() error { - close(l.quit) - err := l.listener.Close() - l.wg.Wait() - return err -} - -func (l *Listener) acceptLoop(acceptFn func(conn net.Conn)) { - defer l.wg.Done() - - for { - qConn, err := l.listener.Accept(context.Background()) - if err != nil { - select { - case <-l.quit: - return - default: - log.Errorf("failed to accept connection: %s", err) - continue - } - } - - log.Infof("new connection from: %s", qConn.RemoteAddr()) - - stream, err := qConn.AcceptStream(context.Background()) - if err != nil { - log.Errorf("failed to open stream: %s", err) - continue - } - - conn := NewConn(stream, qConn) - - go acceptFn(conn) - } -} - -// Setup a bare-bones TLS config for the server -func generateTLSConfig() *tls.Config { - key, err := rsa.GenerateKey(rand.Reader, 2048) - if err != nil { - panic(err) - } - template := x509.Certificate{SerialNumber: big.NewInt(1)} - certDER, err := x509.CreateCertificate(rand.Reader, &template, &template, &key.PublicKey, key) - if err != nil { - panic(err) - } - keyPEM := pem.EncodeToMemory(&pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(key)}) - certPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certDER}) - - tlsCert, err := tls.X509KeyPair(certPEM, keyPEM) - if err != nil { - panic(err) - } - return &tls.Config{ - Certificates: []tls.Certificate{tlsCert}, - NextProtos: []string{"quic-echo-example"}, - } -} diff --git a/relay/server/listener/tcp/listener.go b/relay/server/listener/tcp/listener.go deleted file mode 100644 index fb26b0fc4..000000000 --- a/relay/server/listener/tcp/listener.go +++ /dev/null @@ -1,80 +0,0 @@ -package tcp - -import ( - "net" - "sync" - - log "github.com/sirupsen/logrus" - - "github.com/netbirdio/netbird/relay/server/listener" -) - -// Listener -// Is it just demo code. It does not work in real life environment because the TCP is a streaming protocol, and -// it does not handle framing. -type Listener struct { - address string - - onAcceptFn func(conn net.Conn) - wg sync.WaitGroup - quit chan struct{} - listener net.Listener - lock sync.Mutex -} - -func NewListener(address string) listener.Listener { - return &Listener{ - address: address, - } -} - -func (l *Listener) Listen(onAcceptFn func(conn net.Conn)) error { - l.lock.Lock() - - l.onAcceptFn = onAcceptFn - l.quit = make(chan struct{}) - - li, err := net.Listen("tcp", l.address) - if err != nil { - log.Errorf("failed to listen on address: %s, %s", l.address, err) - l.lock.Unlock() - return err - } - log.Debugf("TCP server is listening on address: %s", l.address) - l.listener = li - l.wg.Add(1) - go l.acceptLoop() - - l.lock.Unlock() - <-l.quit - return nil -} - -// Close todo: prevent multiple call (do not close two times the channel) -func (l *Listener) Close() error { - l.lock.Lock() - defer l.lock.Unlock() - - close(l.quit) - err := l.listener.Close() - l.wg.Wait() - return err -} - -func (l *Listener) acceptLoop() { - defer l.wg.Done() - - for { - conn, err := l.listener.Accept() - if err != nil { - select { - case <-l.quit: - return - default: - log.Errorf("failed to accept connection: %s", err) - continue - } - } - go l.onAcceptFn(conn) - } -} diff --git a/relay/server/listener/udp/conn.go b/relay/server/listener/udp/conn.go deleted file mode 100644 index 1c811064c..000000000 --- a/relay/server/listener/udp/conn.go +++ /dev/null @@ -1,68 +0,0 @@ -package udp - -import ( - "io" - "net" - "time" -) - -type Conn struct { - *net.UDPConn - addr *net.UDPAddr - msgChannel chan []byte -} - -func NewConn(conn *net.UDPConn, addr *net.UDPAddr) *Conn { - return &Conn{ - UDPConn: conn, - addr: addr, - msgChannel: make(chan []byte), - } -} - -func (u *Conn) Read(b []byte) (n int, err error) { - msg, ok := <-u.msgChannel - if !ok { - return 0, io.EOF - } - - n = copy(b, msg) - return n, nil -} - -func (u *Conn) Write(b []byte) (n int, err error) { - return u.UDPConn.WriteTo(b, u.addr) -} - -func (u *Conn) Close() error { - //TODO implement me - //panic("implement me") - return nil -} - -func (u *Conn) LocalAddr() net.Addr { - return u.UDPConn.LocalAddr() -} - -func (u *Conn) RemoteAddr() net.Addr { - return u.addr -} - -func (u *Conn) SetDeadline(t time.Time) error { - //TODO implement me - panic("implement SetDeadline") -} - -func (u *Conn) SetReadDeadline(t time.Time) error { - //TODO implement me - panic("implement SetReadDeadline") -} - -func (u *Conn) SetWriteDeadline(t time.Time) error { - //TODO implement me - panic("implement SetWriteDeadline") -} - -func (u *Conn) onNewMsg(b []byte) { - u.msgChannel <- b -} diff --git a/relay/server/listener/udp/listener.go b/relay/server/listener/udp/listener.go deleted file mode 100644 index a895cdc32..000000000 --- a/relay/server/listener/udp/listener.go +++ /dev/null @@ -1,104 +0,0 @@ -package udp - -import ( - "net" - "sync" - - log "github.com/sirupsen/logrus" - - "github.com/netbirdio/netbird/relay/server/listener" -) - -type Listener struct { - address string - conns map[string]*Conn - onAcceptFn func(conn net.Conn) - - listener *net.UDPConn - - wg sync.WaitGroup - quit chan struct{} - lock sync.Mutex -} - -func NewListener(address string) listener.Listener { - return &Listener{ - address: address, - conns: make(map[string]*Conn), - } -} - -func (l *Listener) Listen(onAcceptFn func(conn net.Conn)) error { - l.lock.Lock() - - l.onAcceptFn = onAcceptFn - l.quit = make(chan struct{}) - - addr, err := net.ResolveUDPAddr("udp", l.address) - if err != nil { - log.Errorf("invalid listen address '%s': %s", l.address, err) - l.lock.Unlock() - return err - } - - li, err := net.ListenUDP("udp", addr) - if err != nil { - log.Fatalf("%s", err) - l.lock.Unlock() - return err - } - log.Debugf("udp server is listening on address: %s", addr.String()) - l.listener = li - l.wg.Add(1) - go l.readLoop() - - l.lock.Unlock() - <-l.quit - return nil -} - -func (l *Listener) Close() error { - l.lock.Lock() - defer l.lock.Unlock() - - if l.listener == nil { - return nil - } - - log.Infof("closing UDP listener") - close(l.quit) - err := l.listener.Close() - l.wg.Wait() - l.listener = nil - return err -} - -func (l *Listener) readLoop() { - defer l.wg.Done() - - for { - buf := make([]byte, 1500) - n, addr, err := l.listener.ReadFromUDP(buf) - if err != nil { - select { - case <-l.quit: - return - default: - log.Errorf("failed to accept connection: %s", err) - continue - } - } - - pConn, ok := l.conns[addr.String()] - if ok { - pConn.onNewMsg(buf[:n]) - continue - } - - pConn = NewConn(l.listener, addr) - log.Infof("new connection from: %s", pConn.RemoteAddr()) - l.conns[addr.String()] = pConn - go l.onAcceptFn(pConn) - pConn.onNewMsg(buf[:n]) - } -} diff --git a/relay/server/server.go b/relay/server/server.go index 80d1ba00d..c34151c9d 100644 --- a/relay/server/server.go +++ b/relay/server/server.go @@ -3,8 +3,6 @@ package server import ( "context" "crypto/tls" - "errors" - "sync" "time" log "github.com/sirupsen/logrus" @@ -12,7 +10,6 @@ import ( "github.com/netbirdio/netbird/relay/auth" "github.com/netbirdio/netbird/relay/server/listener" - "github.com/netbirdio/netbird/relay/server/listener/udp" "github.com/netbirdio/netbird/relay/server/listener/ws" ) @@ -22,9 +19,8 @@ type ListenerConfig struct { } type Server struct { - relay *Relay - uDPListener listener.Listener - wSListener listener.Listener + relay *Relay + wSListener listener.Listener } func NewServer(meter metric.Meter, exposedAddress string, tlsSupport bool, authValidator auth.Validator) (*Server, error) { @@ -38,9 +34,6 @@ func NewServer(meter metric.Meter, exposedAddress string, tlsSupport bool, authV } func (r *Server) Listen(cfg ListenerConfig) error { - wg := sync.WaitGroup{} - wg.Add(2) - r.wSListener = &ws.Listener{ Address: cfg.Address, TLSConfig: cfg.TLSConfig, @@ -48,46 +41,26 @@ func (r *Server) Listen(cfg ListenerConfig) error { var wslErr error go func() { - defer wg.Done() wslErr = r.wSListener.Listen(r.relay.Accept) if wslErr != nil { log.Errorf("failed to bind ws server: %s", wslErr) } }() - r.uDPListener = udp.NewListener(cfg.Address) - var udpLErr error - go func() { - defer wg.Done() - udpLErr = r.uDPListener.Listen(r.relay.Accept) - if udpLErr != nil { - log.Errorf("failed to bind ws server: %s", udpLErr) - } - }() - - err := errors.Join(wslErr, udpLErr) - return err + return wslErr } -func (r *Server) Close() error { - var wErr error +func (r *Server) Close() (err error) { // stop service new connections if r.wSListener != nil { - wErr = r.wSListener.Close() - } - - var uErr error - if r.uDPListener != nil { - uErr = r.uDPListener.Close() + err = r.wSListener.Close() } // close accepted connections gracefully ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() r.relay.Close(ctx) - - err := errors.Join(wErr, uErr) - return err + return } func (r *Server) InstanceURL() string {