diff --git a/CHANGELOG.md b/CHANGELOG.md index d633bcce..efeb7170 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ FEATURE: New `tcpTunnel` backend mode allowing for private sharing of local TCP sockets with other `zrok` users (https://github.com/openziti/zrok/issues/170) +FEATURE: New `udpTunnel` backend mode allowing for private sharing of local UDP sockets with other `zrok` users (https://github.com/openziti/zrok/issues/306) + FEATURE: New metrics infrastructure based on OpenZiti usage events (https://github.com/openziti/zrok/issues/128). See the [v0.4 Metrics Guide](docs/guides/metrics-and-limits/configuring-metrics.md) for more information. FEATURE: New limits implementation based on the new metrics infrastructure (https://github.com/openziti/zrok/issues/235). See the [v0.4 Limits Guide](docs/guides/metrics-and-limits/configuring-limits.md) for more information. diff --git a/cmd/zrok/accessPrivate.go b/cmd/zrok/accessPrivate.go index 8c106b84..df2f33f4 100644 --- a/cmd/zrok/accessPrivate.go +++ b/cmd/zrok/accessPrivate.go @@ -7,6 +7,7 @@ import ( "github.com/openziti/zrok/endpoints" "github.com/openziti/zrok/endpoints/proxy" "github.com/openziti/zrok/endpoints/tcpTunnel" + "github.com/openziti/zrok/endpoints/udpTunnel" "github.com/openziti/zrok/rest_client_zrok" "github.com/openziti/zrok/rest_client_zrok/share" "github.com/openziti/zrok/rest_model_zrok" @@ -18,6 +19,7 @@ import ( "os" "os/signal" "syscall" + "time" ) var accessPrivateCmd *accessPrivateCommand @@ -84,6 +86,8 @@ func (cmd *accessPrivateCommand) run(_ *cobra.Command, args []string) { switch accessResp.Payload.BackendMode { case "tcpTunnel": protocol = "tcp://" + case "udpTunnel": + protocol = "udp://" } endpointUrl, err := url.Parse(protocol + cmd.bindAddress) @@ -95,7 +99,8 @@ func (cmd *accessPrivateCommand) run(_ *cobra.Command, args []string) { } requests := make(chan *endpoints.Request, 1024) - if accessResp.Payload.BackendMode == "tcpTunnel" { + switch accessResp.Payload.BackendMode { + case "tcpTunnel": fe, err := tcpTunnel.NewFrontend(&tcpTunnel.FrontendConfig{ BindAddress: cmd.bindAddress, IdentityName: "backend", @@ -116,7 +121,31 @@ func (cmd *accessPrivateCommand) run(_ *cobra.Command, args []string) { panic(err) } }() - } else { + + case "udpTunnel": + fe, err := udpTunnel.NewFrontend(&udpTunnel.FrontendConfig{ + BindAddress: cmd.bindAddress, + IdentityName: "backend", + ShrToken: args[0], + RequestsChan: requests, + IdleTime: time.Minute, + }) + if err != nil { + if !panicInstead { + tui.Error("unable to create private frontend", err) + } + panic(err) + } + go func() { + if err := fe.Run(); err != nil { + if !panicInstead { + tui.Error("error starting frontend", err) + } + panic(err) + } + }() + + default: cfg := proxy.DefaultFrontendConfig("backend") cfg.ShrToken = shrToken cfg.Address = cmd.bindAddress diff --git a/cmd/zrok/sharePrivate.go b/cmd/zrok/sharePrivate.go index 3d32a4b5..049d91a5 100644 --- a/cmd/zrok/sharePrivate.go +++ b/cmd/zrok/sharePrivate.go @@ -8,6 +8,7 @@ import ( "github.com/openziti/zrok/endpoints" "github.com/openziti/zrok/endpoints/proxy" "github.com/openziti/zrok/endpoints/tcpTunnel" + "github.com/openziti/zrok/endpoints/udpTunnel" "github.com/openziti/zrok/model" "github.com/openziti/zrok/rest_client_zrok" "github.com/openziti/zrok/rest_client_zrok/share" @@ -43,7 +44,7 @@ func newSharePrivateCommand() *sharePrivateCommand { } command := &sharePrivateCommand{cmd: cmd} cmd.Flags().StringArrayVar(&command.basicAuth, "basic-auth", []string{}, "Basic authentication users (,...") - cmd.Flags().StringVar(&command.backendMode, "backend-mode", "proxy", "The backend mode {proxy, web, tcpTunnel}") + cmd.Flags().StringVar(&command.backendMode, "backend-mode", "proxy", "The backend mode {proxy, web, tcpTunnel, udpTunnel}") cmd.Flags().BoolVar(&command.headless, "headless", false, "Disable TUI and run headless") cmd.Flags().BoolVar(&command.insecure, "insecure", false, "Enable insecure TLS certificate validation for ") cmd.Run = command.run @@ -70,6 +71,9 @@ func (cmd *sharePrivateCommand) run(_ *cobra.Command, args []string) { case "tcpTunnel": target = args[0] + case "udpTunnel": + target = args[0] + default: tui.Error(fmt.Sprintf("invalid backend mode '%v'; expected {proxy, web, tcpTunnel}", cmd.backendMode), nil) } @@ -192,6 +196,26 @@ func (cmd *sharePrivateCommand) run(_ *cobra.Command, args []string) { } }() + case "udpTunnel": + cfg := &udpTunnel.BackendConfig{ + IdentityPath: zif, + EndpointAddress: target, + ShrToken: resp.Payload.ShrToken, + RequestsChan: requestsChan, + } + be, err := udpTunnel.NewBackend(cfg) + if err != nil { + if !panicInstead { + tui.Error("unable to create udpTunnel backend", err) + } + panic(err) + } + go func() { + if err := be.Run(); err != nil { + logrus.Errorf("error running udpTunnel backend: %v", err) + } + }() + default: tui.Error("invalid backend mode", nil) } diff --git a/endpoints/udpTunnel/backend.go b/endpoints/udpTunnel/backend.go new file mode 100644 index 00000000..80832481 --- /dev/null +++ b/endpoints/udpTunnel/backend.go @@ -0,0 +1,81 @@ +package udpTunnel + +import ( + "github.com/openziti/sdk-golang/ziti" + "github.com/openziti/sdk-golang/ziti/config" + "github.com/openziti/sdk-golang/ziti/edge" + "github.com/openziti/zrok/endpoints" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "net" + "time" +) + +type BackendConfig struct { + IdentityPath string + EndpointAddress string + ShrToken string + RequestsChan chan *endpoints.Request +} + +type Backend struct { + cfg *BackendConfig + listener edge.Listener +} + +func NewBackend(cfg *BackendConfig) (*Backend, error) { + options := ziti.ListenOptions{ + ConnectTimeout: 5 * time.Minute, + MaxConnections: 64, + } + zcfg, err := config.NewFromFile(cfg.IdentityPath) + if err != nil { + return nil, errors.Wrap(err, "error loading config") + } + listener, err := ziti.NewContextWithConfig(zcfg).ListenWithOptions(cfg.ShrToken, &options) + if err != nil { + return nil, errors.Wrap(err, "error listening") + } + b := &Backend{ + cfg: cfg, + listener: listener, + } + return b, nil +} + +func (b *Backend) Run() error { + logrus.Info("started") + defer logrus.Info("exited") + + for { + if conn, err := b.listener.Accept(); err == nil { + go b.handle(conn) + } else { + return err + } + } +} + +func (b *Backend) handle(conn net.Conn) { + logrus.Debugf("handling '%v'", conn.RemoteAddr()) + if rAddr, err := net.ResolveUDPAddr("udp", b.cfg.EndpointAddress); err == nil { + if rConn, err := net.DialUDP("udp", nil, rAddr); err == nil { + go endpoints.TXer(conn, rConn) + go endpoints.TXer(rConn, conn) + if b.cfg.RequestsChan != nil { + b.cfg.RequestsChan <- &endpoints.Request{ + Stamp: time.Now(), + RemoteAddr: conn.RemoteAddr().String(), + Method: "ACCEPT", + Path: rAddr.String(), + } + } + } else { + logrus.Errorf("error dialing '%v': %v", b.cfg.EndpointAddress, err) + _ = conn.Close() + return + } + } else { + logrus.Errorf("error resolving '%v': %v", b.cfg.EndpointAddress, err) + } +} diff --git a/endpoints/udpTunnel/frontend.go b/endpoints/udpTunnel/frontend.go new file mode 100644 index 00000000..c8f11f51 --- /dev/null +++ b/endpoints/udpTunnel/frontend.go @@ -0,0 +1,197 @@ +package udpTunnel + +import ( + "github.com/openziti/sdk-golang/ziti" + "github.com/openziti/sdk-golang/ziti/config" + "github.com/openziti/zrok/endpoints" + "github.com/openziti/zrok/model" + "github.com/openziti/zrok/zrokdir" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "net" + "sync" + "time" +) + +type FrontendConfig struct { + BindAddress string + IdentityName string + ShrToken string + RequestsChan chan *endpoints.Request + IdleTime time.Duration +} + +type Frontend struct { + cfg *FrontendConfig + zCtx ziti.Context + lAddr *net.UDPAddr + clients *sync.Map // map[net.Addr]*clientConn +} + +type clientConn struct { + zitiConn net.Conn + conn *net.UDPConn + addr *net.UDPAddr + closer func(addr *net.UDPAddr) + active chan bool +} + +func (c *clientConn) Read(b []byte) (n int, err error) { + panic("write only connection!") +} + +func (c *clientConn) Write(b []byte) (n int, err error) { + return c.conn.WriteTo(b, c.addr) +} + +func (c *clientConn) Close() error { + select { + case <-c.active: + // if it's here client as already closed + default: + close(c.active) + c.closer(c.addr) + } + return nil +} + +func (c *clientConn) LocalAddr() net.Addr { + return c.conn.LocalAddr() +} + +func (c *clientConn) RemoteAddr() net.Addr { + return c.addr +} + +func (c *clientConn) SetDeadline(t time.Time) error { + return nil +} + +func (c *clientConn) SetReadDeadline(t time.Time) error { + return nil +} + +func (c *clientConn) SetWriteDeadline(t time.Time) error { + return nil +} + +func (c *clientConn) timeout(idle time.Duration) { + t := time.NewTimer(idle) + for { + select { + + case active := <-c.active: + if active { + t.Stop() + t.Reset(idle) + } else { + break + } + + case <-t.C: + _ = c.Close() + return + } + } +} + +func NewFrontend(cfg *FrontendConfig) (*Frontend, error) { + lAddr, err := net.ResolveUDPAddr("udp", cfg.BindAddress) + if err != nil { + return nil, errors.Wrapf(err, "error resolving udp address '%v'", cfg.BindAddress) + } + zCfgPath, err := zrokdir.ZitiIdentityFile(cfg.IdentityName) + if err != nil { + return nil, errors.Wrapf(err, "error getting ziti identity '%v' from zrokdir", cfg.IdentityName) + } + zCfg, err := config.NewFromFile(zCfgPath) + if err != nil { + return nil, errors.Wrap(err, "error loading config") + } + zCfg.ConfigTypes = []string{model.ZrokProxyConfig} + zCtx := ziti.NewContextWithConfig(zCfg) + logrus.Errorf("creating new frontend") + return &Frontend{ + cfg: cfg, + zCtx: zCtx, + lAddr: lAddr, + clients: new(sync.Map), + }, nil +} + +func (f *Frontend) Run() error { + l, err := net.ListenUDP("udp", f.lAddr) + if err != nil { + return errors.Wrapf(err, "error listening at '%v'", f.lAddr) + } + for { + buf := make([]byte, 16*1024) + count, srcAddr, err := l.ReadFromUDP(buf) + if err != nil { + return err + } + + c, found := f.clients.Load(srcAddr.String()) + if found { + clt := c.(*clientConn) + clt.active <- true + _, err := clt.zitiConn.Write(buf[:count]) + if err != nil { + logrus.Errorf("error writing '%v': %v", f.cfg.ShrToken, err) + f.clients.Delete(srcAddr) + _ = clt.zitiConn.Close() + } + } else { + zitiConn, err := f.zCtx.Dial(f.cfg.ShrToken) + if err != nil { + logrus.Errorf("error dialing '%v': %v", f.cfg.ShrToken, err) + continue + } + + _, err = zitiConn.Write(buf[:count]) + if err != nil { + logrus.Errorf("error writing '%v': %v", f.cfg.ShrToken, err) + _ = zitiConn.Close() + continue + } + + clt := f.makeClient(zitiConn, l, srcAddr) + f.clients.Store(srcAddr.String(), clt) + } + } +} + +func (f *Frontend) notify(msg string, addr *net.UDPAddr) { + if f.cfg.RequestsChan != nil { + f.cfg.RequestsChan <- &endpoints.Request{ + Stamp: time.Now(), + RemoteAddr: addr.String(), + Method: msg, + Path: f.cfg.ShrToken, + } + } +} + +func (f *Frontend) makeClient(zitiConn net.Conn, l *net.UDPConn, addr *net.UDPAddr) *clientConn { + clt := &clientConn{ + zitiConn: zitiConn, + conn: l, + addr: addr, + closer: f.closeClient, + active: make(chan bool), + } + go clt.timeout(f.cfg.IdleTime) + go endpoints.TXer(zitiConn, clt) + + f.notify("ACCEPT", addr) + return clt +} + +func (f *Frontend) closeClient(addr *net.UDPAddr) { + f.notify("CLOSED", addr) + c, found := f.clients.LoadAndDelete(addr.String()) + if found { + clt := c.(*clientConn) + _ = clt.zitiConn.Close() + } +}