From ce2b9401af08913fadf5aeb7e57d9c68ae161262 Mon Sep 17 00:00:00 2001 From: Eugene K Date: Thu, 4 May 2023 10:28:31 -0400 Subject: [PATCH] implement udpTunnel mode: backend and frontend --- endpoints/udpTunnel/backend.go | 81 +++++++++++++ endpoints/udpTunnel/frontend.go | 197 ++++++++++++++++++++++++++++++++ 2 files changed, 278 insertions(+) create mode 100644 endpoints/udpTunnel/backend.go create mode 100644 endpoints/udpTunnel/frontend.go diff --git a/endpoints/udpTunnel/backend.go b/endpoints/udpTunnel/backend.go new file mode 100644 index 00000000..80832481 --- /dev/null +++ b/endpoints/udpTunnel/backend.go @@ -0,0 +1,81 @@ +package udpTunnel + +import ( + "github.com/openziti/sdk-golang/ziti" + "github.com/openziti/sdk-golang/ziti/config" + "github.com/openziti/sdk-golang/ziti/edge" + "github.com/openziti/zrok/endpoints" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "net" + "time" +) + +type BackendConfig struct { + IdentityPath string + EndpointAddress string + ShrToken string + RequestsChan chan *endpoints.Request +} + +type Backend struct { + cfg *BackendConfig + listener edge.Listener +} + +func NewBackend(cfg *BackendConfig) (*Backend, error) { + options := ziti.ListenOptions{ + ConnectTimeout: 5 * time.Minute, + MaxConnections: 64, + } + zcfg, err := config.NewFromFile(cfg.IdentityPath) + if err != nil { + return nil, errors.Wrap(err, "error loading config") + } + listener, err := ziti.NewContextWithConfig(zcfg).ListenWithOptions(cfg.ShrToken, &options) + if err != nil { + return nil, errors.Wrap(err, "error listening") + } + b := &Backend{ + cfg: cfg, + listener: listener, + } + return b, nil +} + +func (b *Backend) Run() error { + logrus.Info("started") + defer logrus.Info("exited") + + for { + if conn, err := b.listener.Accept(); err == nil { + go b.handle(conn) + } else { + return err + } + } +} + +func (b *Backend) handle(conn net.Conn) { + logrus.Debugf("handling '%v'", conn.RemoteAddr()) + if rAddr, err := net.ResolveUDPAddr("udp", b.cfg.EndpointAddress); err == nil { + if rConn, err := net.DialUDP("udp", nil, rAddr); err == nil { + go endpoints.TXer(conn, rConn) + go endpoints.TXer(rConn, conn) + if b.cfg.RequestsChan != nil { + b.cfg.RequestsChan <- &endpoints.Request{ + Stamp: time.Now(), + RemoteAddr: conn.RemoteAddr().String(), + Method: "ACCEPT", + Path: rAddr.String(), + } + } + } else { + logrus.Errorf("error dialing '%v': %v", b.cfg.EndpointAddress, err) + _ = conn.Close() + return + } + } else { + logrus.Errorf("error resolving '%v': %v", b.cfg.EndpointAddress, err) + } +} diff --git a/endpoints/udpTunnel/frontend.go b/endpoints/udpTunnel/frontend.go new file mode 100644 index 00000000..e3e21caf --- /dev/null +++ b/endpoints/udpTunnel/frontend.go @@ -0,0 +1,197 @@ +package udpTunnel + +import ( + "github.com/openziti/sdk-golang/ziti" + "github.com/openziti/sdk-golang/ziti/config" + "github.com/openziti/zrok/endpoints" + "github.com/openziti/zrok/model" + "github.com/openziti/zrok/zrokdir" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "net" + "sync" + "time" +) + +type FrontendConfig struct { + BindAddress string + IdentityName string + ShrToken string + RequestsChan chan *endpoints.Request + IdleTime time.Duration +} + +type Frontend struct { + cfg *FrontendConfig + zCtx ziti.Context + lAddr *net.UDPAddr + clients *sync.Map // map[net.Addr]*clientConn +} + +type clientConn struct { + zitiConn net.Conn + conn *net.UDPConn + addr *net.UDPAddr + closer func(addr *net.UDPAddr) + active chan bool +} + +func (c *clientConn) Read(b []byte) (n int, err error) { + panic("write only connection!") +} + +func (c *clientConn) Write(b []byte) (n int, err error) { + return c.conn.WriteTo(b, c.addr) +} + +func (c *clientConn) Close() error { + select { + case <-c.active: + // if it's here client as already closed + default: + close(c.active) + c.closer(c.addr) + } + return nil +} + +func (c *clientConn) LocalAddr() net.Addr { + return c.conn.LocalAddr() +} + +func (c *clientConn) RemoteAddr() net.Addr { + return c.addr +} + +func (c *clientConn) SetDeadline(t time.Time) error { + return nil +} + +func (c *clientConn) SetReadDeadline(t time.Time) error { + return nil +} + +func (c *clientConn) SetWriteDeadline(t time.Time) error { + return nil +} + +func (c *clientConn) timeout(idle time.Duration) { + t := time.NewTimer(idle) + for { + select { + + case active := <-c.active: + if active { + t.Stop() + t.Reset(idle) + } else { + break + } + + case <-t.C: + _ = c.Close() + return + } + } +} + +func NewFrontend(cfg *FrontendConfig) (*Frontend, error) { + lAddr, err := net.ResolveUDPAddr("udp", cfg.BindAddress) + if err != nil { + return nil, errors.Wrapf(err, "error resolving tcp address '%v'", cfg.BindAddress) + } + zCfgPath, err := zrokdir.ZitiIdentityFile(cfg.IdentityName) + if err != nil { + return nil, errors.Wrapf(err, "error getting ziti identity '%v' from zrokdir", cfg.IdentityName) + } + zCfg, err := config.NewFromFile(zCfgPath) + if err != nil { + return nil, errors.Wrap(err, "error loading config") + } + zCfg.ConfigTypes = []string{model.ZrokProxyConfig} + zCtx := ziti.NewContextWithConfig(zCfg) + logrus.Errorf("creating new frontend") + return &Frontend{ + cfg: cfg, + zCtx: zCtx, + lAddr: lAddr, + clients: new(sync.Map), + }, nil +} + +func (f *Frontend) Run() error { + l, err := net.ListenUDP("udp", f.lAddr) + if err != nil { + return errors.Wrapf(err, "error listening at '%v'", f.lAddr) + } + for { + buf := make([]byte, 16*1024) + count, srcAddr, err := l.ReadFromUDP(buf) + if err != nil { + return err + } + + c, found := f.clients.Load(srcAddr.String()) + if found { + clt := c.(*clientConn) + clt.active <- true + _, err := clt.zitiConn.Write(buf[:count]) + if err != nil { + logrus.Errorf("error writing '%v': %v", f.cfg.ShrToken, err) + f.clients.Delete(srcAddr) + _ = clt.zitiConn.Close() + } + } else { + zitiConn, err := f.zCtx.Dial(f.cfg.ShrToken) + if err != nil { + logrus.Errorf("error dialing '%v': %v", f.cfg.ShrToken, err) + continue + } + + _, err = zitiConn.Write(buf[:count]) + if err != nil { + logrus.Errorf("error writing '%v': %v", f.cfg.ShrToken, err) + _ = zitiConn.Close() + continue + } + + clt := f.makeClient(zitiConn, l, srcAddr) + f.clients.Store(srcAddr.String(), clt) + } + } +} + +func (f *Frontend) notify(msg string, addr *net.UDPAddr) { + if f.cfg.RequestsChan != nil { + f.cfg.RequestsChan <- &endpoints.Request{ + Stamp: time.Now(), + RemoteAddr: addr.String(), + Method: msg, + Path: f.cfg.ShrToken, + } + } +} + +func (f *Frontend) makeClient(zitiConn net.Conn, l *net.UDPConn, addr *net.UDPAddr) *clientConn { + clt := &clientConn{ + zitiConn: zitiConn, + conn: l, + addr: addr, + closer: f.closeClient, + active: make(chan bool), + } + go clt.timeout(f.cfg.IdleTime) + go endpoints.TXer(zitiConn, clt) + + f.notify("ACCEPT", addr) + return clt +} + +func (f *Frontend) closeClient(addr *net.UDPAddr) { + f.notify("CLOSED", addr) + c, found := f.clients.LoadAndDelete(addr.String()) + if found { + clt := c.(*clientConn) + _ = clt.zitiConn.Close() + } +}