diff --git a/client/proxy/proxy.go b/client/proxy/proxy.go index 966b5ca4..cb7d5177 100644 --- a/client/proxy/proxy.go +++ b/client/proxy/proxy.go @@ -479,12 +479,25 @@ func (pxy *UDPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) { // close resources releated with old workConn pxy.Close() + var rwc io.ReadWriteCloser = conn + var err error if pxy.limiter != nil { - rwc := frpIo.WrapReadWriteCloser(limit.NewReader(conn, pxy.limiter), limit.NewWriter(conn, pxy.limiter), func() error { + rwc = frpIo.WrapReadWriteCloser(limit.NewReader(conn, pxy.limiter), limit.NewWriter(conn, pxy.limiter), func() error { return conn.Close() }) - conn = frpNet.WrapReadWriteCloserToConn(rwc, conn) } + if pxy.cfg.UseEncryption { + rwc, err = frpIo.WithEncryption(rwc, []byte(pxy.clientCfg.Token)) + if err != nil { + conn.Close() + xl.Error("create encryption stream error: %v", err) + return + } + } + if pxy.cfg.UseCompression { + rwc = frpIo.WithCompression(rwc) + } + conn = frpNet.WrapReadWriteCloserToConn(rwc, conn) pxy.mu.Lock() pxy.workConn = conn @@ -579,12 +592,25 @@ func (pxy *SUDPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) { xl := pxy.xl xl.Info("incoming a new work connection for sudp proxy, %s", conn.RemoteAddr().String()) + var rwc io.ReadWriteCloser = conn + var err error if pxy.limiter != nil { - rwc := frpIo.WrapReadWriteCloser(limit.NewReader(conn, pxy.limiter), limit.NewWriter(conn, pxy.limiter), func() error { + rwc = frpIo.WrapReadWriteCloser(limit.NewReader(conn, pxy.limiter), limit.NewWriter(conn, pxy.limiter), func() error { return conn.Close() }) - conn = frpNet.WrapReadWriteCloserToConn(rwc, conn) } + if pxy.cfg.UseEncryption { + rwc, err = frpIo.WithEncryption(rwc, []byte(pxy.clientCfg.Token)) + if err != nil { + conn.Close() + xl.Error("create encryption stream error: %v", err) + return + } + } + if pxy.cfg.UseCompression { + rwc = frpIo.WithCompression(rwc) + } + conn = frpNet.WrapReadWriteCloserToConn(rwc, conn) workConn := conn readCh := make(chan *msg.UDPPacket, 1024) diff --git a/client/visitor.go b/client/visitor.go index 37491245..ce5625f0 100644 --- a/client/visitor.go +++ b/client/visitor.go @@ -488,8 +488,9 @@ func (sv *SUDPVisitor) worker(workConn net.Conn) { xl.Info("sudp worker is closed") } -func (sv *SUDPVisitor) getNewVisitorConn() (visitorConn net.Conn, err error) { - visitorConn, err = sv.ctl.connectServer() +func (sv *SUDPVisitor) getNewVisitorConn() (net.Conn, error) { + xl := xlog.FromContextSafe(sv.ctx) + visitorConn, err := sv.ctl.connectServer() if err != nil { return nil, fmt.Errorf("frpc connect frps error: %v", err) } @@ -518,7 +519,20 @@ func (sv *SUDPVisitor) getNewVisitorConn() (visitorConn net.Conn, err error) { if newVisitorConnRespMsg.Error != "" { return nil, fmt.Errorf("start new visitor connection error: %s", newVisitorConnRespMsg.Error) } - return + + var remote io.ReadWriteCloser + remote = visitorConn + if sv.cfg.UseEncryption { + remote, err = frpIo.WithEncryption(remote, []byte(sv.cfg.Sk)) + if err != nil { + xl.Error("create encryption stream error: %v", err) + return nil, err + } + } + if sv.cfg.UseCompression { + remote = frpIo.WithCompression(remote) + } + return frpNet.WrapReadWriteCloserToConn(remote, visitorConn), nil } func (sv *SUDPVisitor) Close() { diff --git a/go.mod b/go.mod index 1e2a4703..d681988a 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/klauspost/cpuid v1.2.0 // indirect github.com/klauspost/reedsolomon v1.9.1 // indirect github.com/mattn/go-runewidth v0.0.4 // indirect - github.com/onsi/ginkgo v1.12.2 + github.com/onsi/ginkgo v1.12.3 github.com/onsi/gomega v1.10.1 github.com/pires/go-proxyproto v0.0.0-20190111085350-4d51b51e3bfc github.com/pquerna/cachecontrol v0.0.0-20180517163645-1555304b9b35 // indirect @@ -33,6 +33,7 @@ require ( github.com/xtaci/lossyconn v0.0.0-20190602105132-8df528c0c9ae // indirect golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7 golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d + golang.org/x/sys v0.0.0-20200602225109-6fdc65e7d980 // indirect golang.org/x/time v0.0.0-20191024005414-555d28b269f0 gopkg.in/square/go-jose.v2 v2.4.1 // indirect k8s.io/apimachinery v0.18.3 diff --git a/go.sum b/go.sum index d699b3f6..2b5e4275 100644 --- a/go.sum +++ b/go.sum @@ -115,8 +115,8 @@ github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.11.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= -github.com/onsi/ginkgo v1.12.2 h1:Ke9m3h2Hu0wsZ45yewCqhYr3Z+emcNTuLY2nMWCkrSI= -github.com/onsi/ginkgo v1.12.2/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= +github.com/onsi/ginkgo v1.12.3 h1:+RYp9QczoWz9zfUyLP/5SLXQVhfr6gZOoKGfQqHuLZQ= +github.com/onsi/ginkgo v1.12.3/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= @@ -206,6 +206,8 @@ golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200519105757-fe76b779f299 h1:DYfZAGf2WMFjMxbgTjaC+2HC7NkNAQs+6Q8b9WEB/F4= golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200602225109-6fdc65e7d980 h1:OjiUf46hAmXblsZdnoSXsEUSKU8r1UEzcL5RVZ4gO9Y= +golang.org/x/sys v0.0.0-20200602225109-6fdc65e7d980/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= diff --git a/hack/run-e2e.sh b/hack/run-e2e.sh index 6ac2f27b..68db6bfa 100755 --- a/hack/run-e2e.sh +++ b/hack/run-e2e.sh @@ -8,4 +8,8 @@ if [ $? -ne 0 ]; then go get -u github.com/onsi/ginkgo/ginkgo fi -ginkgo -nodes=1 ${ROOT}/test/e2e -- -frpc-path=${ROOT}/bin/frpc -frps-path=${ROOT}/bin/frps -log-level=debug +debug=false +if [ x${DEBUG} == x"true" ]; then + debug=true +fi +ginkgo -nodes=4 ${ROOT}/test/e2e -- -frpc-path=${ROOT}/bin/frpc -frps-path=${ROOT}/bin/frps -log-level=debug -debug=${debug} diff --git a/server/proxy/udp.go b/server/proxy/udp.go index 87540a99..d87e0efd 100644 --- a/server/proxy/udp.go +++ b/server/proxy/udp.go @@ -17,6 +17,7 @@ package proxy import ( "context" "fmt" + "io" "net" "time" @@ -24,8 +25,10 @@ import ( "github.com/fatedier/frp/models/msg" "github.com/fatedier/frp/models/proto/udp" "github.com/fatedier/frp/server/metrics" + frpNet "github.com/fatedier/frp/utils/net" "github.com/fatedier/golib/errors" + frpIo "github.com/fatedier/golib/io" ) type UDPProxy struct { @@ -174,14 +177,28 @@ func (pxy *UDPProxy) Run() (remoteAddr string, err error) { } continue } - // close the old workConn and replac it with a new one + // close the old workConn and replace it with a new one if pxy.workConn != nil { pxy.workConn.Close() } - pxy.workConn = workConn + + var rwc io.ReadWriteCloser = workConn + if pxy.cfg.UseEncryption { + rwc, err = frpIo.WithEncryption(rwc, []byte(pxy.serverCfg.Token)) + if err != nil { + xl.Error("create encryption stream error: %v", err) + workConn.Close() + continue + } + } + if pxy.cfg.UseCompression { + rwc = frpIo.WithCompression(rwc) + } + + pxy.workConn = frpNet.WrapReadWriteCloserToConn(rwc, workConn) ctx, cancel := context.WithCancel(context.Background()) - go workConnReaderFn(workConn) - go workConnSenderFn(workConn, ctx) + go workConnReaderFn(pxy.workConn) + go workConnSenderFn(pxy.workConn, ctx) _, ok := <-pxy.checkCloseCh cancel() if !ok { diff --git a/test/e2e/basic/basic.go b/test/e2e/basic/basic.go new file mode 100644 index 00000000..ada4df68 --- /dev/null +++ b/test/e2e/basic/basic.go @@ -0,0 +1,198 @@ +package basic + +import ( + "fmt" + "strings" + "time" + + "github.com/fatedier/frp/test/e2e/framework" + "github.com/fatedier/frp/test/e2e/framework/consts" + + . "github.com/onsi/ginkgo" +) + +var connTimeout = 2 * time.Second + +var _ = Describe("[Feature: Basic]", func() { + f := framework.NewDefaultFramework() + + Describe("TCP && UDP", func() { + types := []string{"tcp", "udp"} + for _, t := range types { + proxyType := t + It(fmt.Sprintf("Expose a %s echo server", strings.ToUpper(proxyType)), func() { + serverConf := consts.DefaultServerConfig + clientConf := consts.DefaultClientConfig + + localPortName := "" + protocol := "tcp" + switch proxyType { + case "tcp": + localPortName = framework.TCPEchoServerPort + protocol = "tcp" + case "udp": + localPortName = framework.UDPEchoServerPort + protocol = "udp" + } + getProxyConf := func(proxyName string, portName string, extra string) string { + return fmt.Sprintf(` + [%s] + type = %s + local_port = {{ .%s }} + remote_port = {{ .%s }} + `+extra, proxyName, proxyType, localPortName, portName) + } + + tests := []struct { + proxyName string + portName string + extraConfig string + }{ + { + proxyName: "normal", + portName: framework.GenPortName("Normal"), + }, + { + proxyName: "with-encryption", + portName: framework.GenPortName("WithEncryption"), + extraConfig: "use_encryption = true", + }, + { + proxyName: "with-compression", + portName: framework.GenPortName("WithCompression"), + extraConfig: "use_compression = true", + }, + { + proxyName: "with-encryption-and-compression", + portName: framework.GenPortName("WithEncryptionAndCompression"), + extraConfig: ` + use_encryption = true + use_compression = true + `, + }, + } + + // build all client config + for _, test := range tests { + clientConf += getProxyConf(test.proxyName, test.portName, test.extraConfig) + "\n" + } + // run frps and frpc + f.RunProcesses([]string{serverConf}, []string{clientConf}) + + for _, test := range tests { + framework.ExpectRequest(protocol, f.UsedPorts[test.portName], + []byte(consts.TestString), []byte(consts.TestString), connTimeout, test.proxyName) + } + }) + } + }) + + Describe("STCP && SUDP", func() { + types := []string{"stcp", "sudp"} + for _, t := range types { + proxyType := t + It(fmt.Sprintf("Expose echo server with %s", strings.ToUpper(proxyType)), func() { + serverConf := consts.DefaultServerConfig + clientServerConf := consts.DefaultClientConfig + clientVisitorConf := consts.DefaultClientConfig + + localPortName := "" + protocol := "tcp" + switch proxyType { + case "stcp": + localPortName = framework.TCPEchoServerPort + protocol = "tcp" + case "sudp": + localPortName = framework.UDPEchoServerPort + protocol = "udp" + } + + correctSK := "abc" + wrongSK := "123" + + getProxyServerConf := func(proxyName string, extra string) string { + return fmt.Sprintf(` + [%s] + type = %s + role = server + sk = %s + local_port = {{ .%s }} + `+extra, proxyName, proxyType, correctSK, localPortName) + } + getProxyVisitorConf := func(proxyName string, portName, visitorSK, extra string) string { + return fmt.Sprintf(` + [%s] + type = %s + role = visitor + server_name = %s + sk = %s + bind_port = {{ .%s }} + `+extra, proxyName, proxyType, proxyName, visitorSK, portName) + } + + tests := []struct { + proxyName string + bindPortName string + visitorSK string + extraConfig string + expectError bool + }{ + { + proxyName: "normal", + bindPortName: framework.GenPortName("Normal"), + visitorSK: correctSK, + }, + { + proxyName: "with-encryption", + bindPortName: framework.GenPortName("WithEncryption"), + visitorSK: correctSK, + extraConfig: "use_encryption = true", + }, + { + proxyName: "with-compression", + bindPortName: framework.GenPortName("WithCompression"), + visitorSK: correctSK, + extraConfig: "use_compression = true", + }, + { + proxyName: "with-encryption-and-compression", + bindPortName: framework.GenPortName("WithEncryptionAndCompression"), + visitorSK: correctSK, + extraConfig: ` + use_encryption = true + use_compression = true + `, + }, + { + proxyName: "with-error-sk", + bindPortName: framework.GenPortName("WithErrorSK"), + visitorSK: wrongSK, + expectError: true, + }, + } + + // build all client config + for _, test := range tests { + clientServerConf += getProxyServerConf(test.proxyName, test.extraConfig) + "\n" + } + for _, test := range tests { + clientVisitorConf += getProxyVisitorConf(test.proxyName, test.bindPortName, test.visitorSK, test.extraConfig) + "\n" + } + // run frps and frpc + f.RunProcesses([]string{serverConf}, []string{clientServerConf, clientVisitorConf}) + + for _, test := range tests { + expectResp := []byte(consts.TestString) + if test.expectError { + framework.ExpectRequestError(protocol, f.UsedPorts[test.bindPortName], + []byte(consts.TestString), connTimeout, test.proxyName) + continue + } + + framework.ExpectRequest(protocol, f.UsedPorts[test.bindPortName], + []byte(consts.TestString), expectResp, connTimeout, test.proxyName) + } + }) + } + }) +}) diff --git a/test/e2e/basic/client_server.go b/test/e2e/basic/client_server.go new file mode 100644 index 00000000..3bad04ec --- /dev/null +++ b/test/e2e/basic/client_server.go @@ -0,0 +1,95 @@ +package basic + +import ( + "fmt" + + "github.com/fatedier/frp/test/e2e/framework" + "github.com/fatedier/frp/test/e2e/framework/consts" + + . "github.com/onsi/ginkgo" +) + +type generalTestConfigures struct { + server string + client string + expectError bool +} + +func defineClientServerTest(desc string, f *framework.Framework, configures *generalTestConfigures) { + It(desc, func() { + serverConf := consts.DefaultServerConfig + clientConf := consts.DefaultClientConfig + + serverConf += fmt.Sprintf(` + %s + `, configures.server) + + clientConf += fmt.Sprintf(` + %s + + [tcp] + type = tcp + local_port = {{ .%s }} + remote_port = {{ .%s }} + + [udp] + type = udp + local_port = {{ .%s }} + remote_port = {{ .%s }} + `, configures.client, + framework.TCPEchoServerPort, framework.GenPortName("TCP"), + framework.UDPEchoServerPort, framework.GenPortName("UDP"), + ) + + f.RunProcesses([]string{serverConf}, []string{clientConf}) + + if !configures.expectError { + framework.ExpectTCPRequest(f.UsedPorts[framework.GenPortName("TCP")], + []byte(consts.TestString), []byte(consts.TestString), connTimeout, "tcp proxy") + framework.ExpectUDPRequest(f.UsedPorts[framework.GenPortName("UDP")], + []byte(consts.TestString), []byte(consts.TestString), connTimeout, "udp proxy") + } else { + framework.ExpectTCPRequestError(f.UsedPorts[framework.GenPortName("TCP")], + []byte(consts.TestString), connTimeout, "tcp proxy") + framework.ExpectUDPRequestError(f.UsedPorts[framework.GenPortName("UDP")], + []byte(consts.TestString), connTimeout, "udp proxy") + } + }) +} + +var _ = Describe("[Feature: Client-Server]", func() { + f := framework.NewDefaultFramework() + + Describe("Protocol", func() { + supportProtocols := []string{"tcp", "kcp", "websocket"} + for _, protocol := range supportProtocols { + configures := &generalTestConfigures{ + server: fmt.Sprintf(` + kcp_bind_port = {{ .%s }} + protocol = %s" + `, consts.PortServerName, protocol), + client: "protocol = " + protocol, + } + defineClientServerTest(protocol, f, configures) + } + }) + + Describe("Authentication", func() { + func() { + configures := &generalTestConfigures{ + server: "token = 123456", + client: "token = 123456", + } + defineClientServerTest("Token Correct", f, configures) + }() + + func() { + configures := &generalTestConfigures{ + server: "token = 123456", + client: "token = invalid", + expectError: true, + } + defineClientServerTest("Token Incorrect", f, configures) + }() + }) +}) diff --git a/test/e2e/e2e.go b/test/e2e/e2e.go index 79dcbbf8..60d73c5a 100644 --- a/test/e2e/e2e.go +++ b/test/e2e/e2e.go @@ -33,7 +33,8 @@ var _ = ginkgo.SynchronizedAfterSuite(func() { func RunE2ETests(t *testing.T) { gomega.RegisterFailHandler(framework.Fail) - log.Info("Starting e2e run %q on Ginkgo node %d", framework.RunID, config.GinkgoConfig.ParallelNode) + log.Info("Starting e2e run %q on Ginkgo node %d of total %d", + framework.RunID, config.GinkgoConfig.ParallelNode, config.GinkgoConfig.ParallelTotal) ginkgo.RunSpecs(t, "frp e2e suite") } diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index a1e4fd45..afe77c3d 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -8,6 +8,12 @@ import ( "github.com/fatedier/frp/test/e2e/framework" "github.com/fatedier/frp/utils/log" + + // test source + _ "github.com/fatedier/frp/test/e2e/basic" + _ "github.com/fatedier/frp/test/e2e/plugin" + + _ "github.com/onsi/ginkgo" ) // handleFlags sets up all flags and parses the command line. diff --git a/test/e2e/examples.go b/test/e2e/examples.go index d3e3944b..0628bdcd 100644 --- a/test/e2e/examples.go +++ b/test/e2e/examples.go @@ -10,31 +10,26 @@ import ( . "github.com/onsi/ginkgo" ) -var connTimeout = 5 * time.Second +var connTimeout = 2 * time.Second var _ = Describe("[Feature: Example]", func() { f := framework.NewDefaultFramework() Describe("TCP", func() { It("Expose a TCP echo server", func() { - serverConf := ` - [common] - bind_port = {{ .PortServer }} - ` - - clientConf := fmt.Sprintf(` - [common] - server_port = {{ .PortServer }} + serverConf := consts.DefaultServerConfig + clientConf := consts.DefaultClientConfig + clientConf += fmt.Sprintf(` [tcp] type = tcp local_port = {{ .%s }} - remote_port = {{ .PortTCP }} - `, framework.TCPEchoServerPort) + remote_port = {{ .%s }} + `, framework.TCPEchoServerPort, framework.GenPortName("TCP")) f.RunProcesses([]string{serverConf}, []string{clientConf}) - framework.ExpectTCPReuqest(f.UsedPorts["PortTCP"], []byte(consts.TestString), []byte(consts.TestString), connTimeout) + framework.ExpectTCPRequest(f.UsedPorts[framework.GenPortName("TCP")], []byte(consts.TestString), []byte(consts.TestString), connTimeout) }) }) }) diff --git a/test/e2e/framework/consts/consts.go b/test/e2e/framework/consts/consts.go index cf3499bc..26bc92f0 100644 --- a/test/e2e/framework/consts/consts.go +++ b/test/e2e/framework/consts/consts.go @@ -3,3 +3,19 @@ package consts const ( TestString = "frp is a fast reverse proxy to help you expose a local server behind a NAT or firewall to the internet." ) + +const ( + PortServerName = "PortServer" +) + +const ( + DefaultServerConfig = ` + [common] + bind_port = {{ .PortServer }} + ` + + DefaultClientConfig = ` + [common] + server_port = {{ .PortServer }} + ` +) diff --git a/test/e2e/framework/framework.go b/test/e2e/framework/framework.go index 64daee32..662f80d6 100644 --- a/test/e2e/framework/framework.go +++ b/test/e2e/framework/framework.go @@ -60,10 +60,6 @@ func NewFramework(opt Options) *Framework { f := &Framework{ portAllocator: port.NewAllocator(opt.FromPortIndex, opt.ToPortIndex, opt.TotalParallelNode, opt.CurrentNodeIndex-1), } - f.mockServers = NewMockServers(f.portAllocator) - if err := f.mockServers.Run(); err != nil { - Failf("%v", err) - } ginkgo.BeforeEach(f.BeforeEach) ginkgo.AfterEach(f.AfterEach) @@ -79,6 +75,11 @@ func (f *Framework) BeforeEach() { dir, err := ioutil.TempDir(os.TempDir(), "frpe2e-test-*") ExpectNoError(err) f.TempDirectory = dir + + f.mockServers = NewMockServers(f.portAllocator) + if err := f.mockServers.Run(); err != nil { + Failf("%v", err) + } } func (f *Framework) AfterEach() { @@ -88,19 +89,38 @@ func (f *Framework) AfterEach() { RemoveCleanupAction(f.cleanupHandle) - os.RemoveAll(f.TempDirectory) - f.TempDirectory = "" - f.UsedPorts = nil - f.serverConfPaths = nil - f.clientConfPaths = nil + // stop processor for _, p := range f.serverProcesses { p.Stop() + if TestContext.Debug { + fmt.Println(p.ErrorOutput()) + fmt.Println(p.StdOutput()) + } } for _, p := range f.clientProcesses { p.Stop() + if TestContext.Debug { + fmt.Println(p.ErrorOutput()) + fmt.Println(p.StdOutput()) + } } f.serverProcesses = nil f.clientProcesses = nil + + // close mock servers + f.mockServers.Close() + + // clean directory + os.RemoveAll(f.TempDirectory) + f.TempDirectory = "" + f.serverConfPaths = nil + f.clientConfPaths = nil + + // release used ports + for _, port := range f.UsedPorts { + f.portAllocator.Release(port) + } + f.UsedPorts = nil } var portRegex = regexp.MustCompile(`{{ \.Port.*? }}`) diff --git a/test/e2e/framework/mockservers.go b/test/e2e/framework/mockservers.go index f549d798..3598aac1 100644 --- a/test/e2e/framework/mockservers.go +++ b/test/e2e/framework/mockservers.go @@ -1,6 +1,9 @@ package framework import ( + "fmt" + "os" + "github.com/fatedier/frp/test/e2e/mock/echoserver" "github.com/fatedier/frp/test/e2e/pkg/port" ) @@ -8,11 +11,13 @@ import ( const ( TCPEchoServerPort = "TCPEchoServerPort" UDPEchoServerPort = "UDPEchoServerPort" + UDSEchoServerAddr = "UDSEchoServerAddr" ) type MockServers struct { tcpEchoServer *echoserver.Server udpEchoServer *echoserver.Server + udsEchoServer *echoserver.Server } func NewMockServers(portAllocator *port.Allocator) *MockServers { @@ -31,6 +36,15 @@ func NewMockServers(portAllocator *port.Allocator) *MockServers { BindPort: int32(udpPort), RepeatNum: 1, }) + + udsIndex := portAllocator.Get() + udsAddr := fmt.Sprintf("%s/frp_echo_server_%d.sock", os.TempDir(), udsIndex) + os.Remove(udsAddr) + s.udsEchoServer = echoserver.New(echoserver.Options{ + Type: echoserver.Unix, + BindAddr: udsAddr, + RepeatNum: 1, + }) return s } @@ -41,13 +55,24 @@ func (m *MockServers) Run() error { if err := m.udpEchoServer.Run(); err != nil { return err } + if err := m.udsEchoServer.Run(); err != nil { + return err + } return nil } +func (m *MockServers) Close() { + m.tcpEchoServer.Close() + m.udpEchoServer.Close() + m.udsEchoServer.Close() + os.Remove(m.udsEchoServer.GetOptions().BindAddr) +} + func (m *MockServers) GetTemplateParams() map[string]interface{} { ret := make(map[string]interface{}) ret[TCPEchoServerPort] = m.tcpEchoServer.GetOptions().BindPort ret[UDPEchoServerPort] = m.udpEchoServer.GetOptions().BindPort + ret[UDSEchoServerAddr] = m.udsEchoServer.GetOptions().BindAddr return ret } diff --git a/test/e2e/framework/request.go b/test/e2e/framework/request.go index 0defcb72..72ae7ba3 100644 --- a/test/e2e/framework/request.go +++ b/test/e2e/framework/request.go @@ -6,8 +6,46 @@ import ( "github.com/fatedier/frp/test/e2e/pkg/request" ) -func ExpectTCPReuqest(port int, in, out []byte, timeout time.Duration) { - res, err := request.SendTCPRequest(port, in, timeout) - ExpectNoError(err) - ExpectEqual(string(out), res) +func ExpectRequest(protocol string, port int, in, out []byte, timeout time.Duration, explain ...interface{}) { + switch protocol { + case "tcp": + ExpectTCPRequest(port, in, out, timeout, explain...) + case "udp": + ExpectUDPRequest(port, in, out, timeout, explain...) + default: + Failf("ExpectRequest not support protocol: %s", protocol) + } +} + +func ExpectRequestError(protocol string, port int, in []byte, timeout time.Duration, explain ...interface{}) { + switch protocol { + case "tcp": + ExpectTCPRequestError(port, in, timeout, explain...) + case "udp": + ExpectUDPRequestError(port, in, timeout, explain...) + default: + Failf("ExpectRequestError not support protocol: %s", protocol) + } +} + +func ExpectTCPRequest(port int, in, out []byte, timeout time.Duration, explain ...interface{}) { + res, err := request.SendTCPRequest(port, in, timeout) + ExpectNoError(err, explain...) + ExpectEqual(string(out), res, explain...) +} + +func ExpectTCPRequestError(port int, in []byte, timeout time.Duration, explain ...interface{}) { + _, err := request.SendTCPRequest(port, in, timeout) + ExpectError(err, explain...) +} + +func ExpectUDPRequest(port int, in, out []byte, timeout time.Duration, explain ...interface{}) { + res, err := request.SendUDPRequest(port, in, timeout) + ExpectNoError(err, explain...) + ExpectEqual(string(out), res, explain...) +} + +func ExpectUDPRequestError(port int, in []byte, timeout time.Duration, explain ...interface{}) { + _, err := request.SendUDPRequest(port, in, timeout) + ExpectError(err, explain...) } diff --git a/test/e2e/framework/test_context.go b/test/e2e/framework/test_context.go index e727b44e..958c78f8 100644 --- a/test/e2e/framework/test_context.go +++ b/test/e2e/framework/test_context.go @@ -4,12 +4,15 @@ import ( "flag" "fmt" "os" + + "github.com/onsi/ginkgo/config" ) type TestContextType struct { FRPClientPath string FRPServerPath string LogLevel string + Debug bool } var TestContext TestContextType @@ -23,9 +26,16 @@ var TestContext TestContextType // regardless whether the test is actually in the test suite. // func RegisterCommonFlags(flags *flag.FlagSet) { + // Turn on EmitSpecProgress to get spec progress (especially on interrupt) + config.GinkgoConfig.EmitSpecProgress = true + + // Randomize specs as well as suites + config.GinkgoConfig.RandomizeAllSpecs = true + flags.StringVar(&TestContext.FRPClientPath, "frpc-path", "../../bin/frpc", "The frp client binary to use.") flags.StringVar(&TestContext.FRPServerPath, "frps-path", "../../bin/frps", "The frp server binary to use.") flags.StringVar(&TestContext.LogLevel, "log-level", "debug", "Log level.") + flags.BoolVar(&TestContext.Debug, "debug", false, "Enable debug mode to print detail info.") } func ValidateTestContext(t *TestContextType) error { diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go index 5f99a899..9c2ae83f 100644 --- a/test/e2e/framework/util.go +++ b/test/e2e/framework/util.go @@ -12,3 +12,7 @@ func init() { uuid, _ := uuid.NewUUID() RunID = uuid.String() } + +func GenPortName(name string) string { + return "Port" + name +} diff --git a/test/e2e/pkg/process/process.go b/test/e2e/pkg/process/process.go index 044f6bc6..6417276d 100644 --- a/test/e2e/pkg/process/process.go +++ b/test/e2e/pkg/process/process.go @@ -10,6 +10,7 @@ type Process struct { cmd *exec.Cmd cancel context.CancelFunc errorOutput *bytes.Buffer + stdOutput *bytes.Buffer beforeStopHandler func() } @@ -22,7 +23,9 @@ func New(path string, params []string) *Process { cancel: cancel, } p.errorOutput = bytes.NewBufferString("") + p.stdOutput = bytes.NewBufferString("") cmd.Stderr = p.errorOutput + cmd.Stdout = p.stdOutput return p } @@ -42,6 +45,10 @@ func (p *Process) ErrorOutput() string { return p.errorOutput.String() } +func (p *Process) StdOutput() string { + return p.stdOutput.String() +} + func (p *Process) SetBeforeStopHandler(fn func()) { p.beforeStopHandler = fn } diff --git a/test/e2e/pkg/request/request.go b/test/e2e/pkg/request/request.go index e641ae9c..cc4c347d 100644 --- a/test/e2e/pkg/request/request.go +++ b/test/e2e/pkg/request/request.go @@ -6,26 +6,35 @@ import ( "time" ) -func SendTCPRequest(port int, content []byte, timeout time.Duration) (res string, err error) { +func SendTCPRequest(port int, content []byte, timeout time.Duration) (string, error) { c, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", port)) if err != nil { - err = fmt.Errorf("connect to tcp server error: %v", err) - return + return "", fmt.Errorf("connect to tcp server error: %v", err) } defer c.Close() c.SetDeadline(time.Now().Add(timeout)) - return sendTCPRequestByConn(c, content) + return sendRequestByConn(c, content) } -func sendTCPRequestByConn(c net.Conn, content []byte) (res string, err error) { +func SendUDPRequest(port int, content []byte, timeout time.Duration) (string, error) { + c, err := net.Dial("udp", fmt.Sprintf("127.0.0.1:%d", port)) + if err != nil { + return "", fmt.Errorf("connect to udp server error: %v", err) + } + defer c.Close() + + c.SetDeadline(time.Now().Add(timeout)) + return sendRequestByConn(c, content) +} + +func sendRequestByConn(c net.Conn, content []byte) (string, error) { c.Write(content) buf := make([]byte, 2048) - n, errRet := c.Read(buf) - if errRet != nil { - err = fmt.Errorf("read from tcp error: %v", errRet) - return + n, err := c.Read(buf) + if err != nil { + return "", fmt.Errorf("read error: %v", err) } return string(buf[:n]), nil } diff --git a/test/e2e/plugin/client_plugins.go b/test/e2e/plugin/client_plugins.go new file mode 100644 index 00000000..0c612982 --- /dev/null +++ b/test/e2e/plugin/client_plugins.go @@ -0,0 +1,76 @@ +package plugin + +import ( + "fmt" + "time" + + "github.com/fatedier/frp/test/e2e/framework" + "github.com/fatedier/frp/test/e2e/framework/consts" + + . "github.com/onsi/ginkgo" +) + +var connTimeout = 2 * time.Second + +var _ = Describe("[Feature: Client-Plugins]", func() { + f := framework.NewDefaultFramework() + + Describe("UnixDomainSocket", func() { + It("Expose a unix domain socket echo server", func() { + serverConf := consts.DefaultServerConfig + clientConf := consts.DefaultClientConfig + + getProxyConf := func(proxyName string, portName string, extra string) string { + return fmt.Sprintf(` + [%s] + type = tcp + remote_port = {{ .%s }} + plugin = unix_domain_socket + plugin_unix_path = {{ .%s }} + `+extra, proxyName, portName, framework.UDSEchoServerAddr) + } + + tests := []struct { + proxyName string + portName string + extraConfig string + }{ + { + proxyName: "normal", + portName: framework.GenPortName("Normal"), + }, + { + proxyName: "with-encryption", + portName: framework.GenPortName("WithEncryption"), + extraConfig: "use_encryption = true", + }, + { + proxyName: "with-compression", + portName: framework.GenPortName("WithCompression"), + extraConfig: "use_compression = true", + }, + { + proxyName: "with-encryption-and-compression", + portName: framework.GenPortName("WithEncryptionAndCompression"), + extraConfig: ` + use_encryption = true + use_compression = true + `, + }, + } + + // build all client config + for _, test := range tests { + clientConf += getProxyConf(test.proxyName, test.portName, test.extraConfig) + "\n" + } + // run frps and frpc + f.RunProcesses([]string{serverConf}, []string{clientConf}) + + for _, test := range tests { + framework.ExpectTCPRequest(f.UsedPorts[test.portName], + []byte(consts.TestString), []byte(consts.TestString), + connTimeout, test.proxyName) + } + }) + }) +}) diff --git a/utils/net/udp.go b/utils/net/udp.go index 3755fe94..67d66665 100644 --- a/utils/net/udp.go +++ b/utils/net/udp.go @@ -246,7 +246,9 @@ func (l *UDPListener) Accept() (net.Conn, error) { func (l *UDPListener) Close() error { if !l.closeFlag { l.closeFlag = true - l.readConn.Close() + if l.readConn != nil { + l.readConn.Close() + } } return nil }