From 293003fcdb6a090f9e2919b8e7d1224ab1e41549 Mon Sep 17 00:00:00 2001 From: fatedier Date: Thu, 13 Jan 2022 14:26:07 +0800 Subject: [PATCH] allow to disable application layer heartbeat to reduce traffic cost (#2758) fix #2754 --- client/control.go | 25 ++++++++--- client/service.go | 2 +- conf/frpc_full.ini | 6 ++- conf/frps_full.ini | 5 ++- pkg/config/client.go | 78 +++++++++++++++++----------------- pkg/config/client_test.go | 55 ++++++++++++------------ pkg/config/server.go | 78 ++++++++++++++++++---------------- pkg/config/server_test.go | 62 ++++++++++++++------------- server/control.go | 13 ++++-- server/service.go | 2 +- test/e2e/features/heartbeat.go | 48 +++++++++++++++++++++ 11 files changed, 228 insertions(+), 146 deletions(-) create mode 100644 test/e2e/features/heartbeat.go diff --git a/client/control.go b/client/control.go index b563be44..07a6d226 100644 --- a/client/control.go +++ b/client/control.go @@ -311,16 +311,27 @@ func (ctl *Control) msgHandler() { }() defer ctl.msgHandlerShutdown.Done() - hbSend := time.NewTicker(time.Duration(ctl.clientCfg.HeartbeatInterval) * time.Second) - defer hbSend.Stop() - hbCheck := time.NewTicker(time.Second) - defer hbCheck.Stop() + var hbSendCh <-chan time.Time + // TODO(fatedier): disable heartbeat if TCPMux is enabled. + // Just keep it here to keep compatible with old version frps. + if ctl.clientCfg.HeartbeatInterval > 0 { + hbSend := time.NewTicker(time.Duration(ctl.clientCfg.HeartbeatInterval) * time.Second) + defer hbSend.Stop() + hbSendCh = hbSend.C + } + + var hbCheckCh <-chan time.Time + // Check heartbeat timeout only if TCPMux is not enabled and users don't disable heartbeat feature. + if ctl.clientCfg.HeartbeatInterval > 0 && ctl.clientCfg.HeartbeatTimeout > 0 && !ctl.clientCfg.TCPMux { + hbCheck := time.NewTicker(time.Second) + defer hbCheck.Stop() + hbCheckCh = hbCheck.C + } ctl.lastPong = time.Now() - for { select { - case <-hbSend.C: + case <-hbSendCh: // send heartbeat to server xl.Debug("send heartbeat to server") pingMsg := &msg.Ping{} @@ -329,7 +340,7 @@ func (ctl *Control) msgHandler() { return } ctl.sendCh <- pingMsg - case <-hbCheck.C: + case <-hbCheckCh: if time.Since(ctl.lastPong) > time.Duration(ctl.clientCfg.HeartbeatTimeout)*time.Second { xl.Warn("heartbeat timeout") // let reader() stop diff --git a/client/service.go b/client/service.go index d0f38453..67bf3dd0 100644 --- a/client/service.go +++ b/client/service.go @@ -249,7 +249,7 @@ func (svr *Service) login() (conn net.Conn, session *fmux.Session, err error) { if svr.cfg.TCPMux { fmuxCfg := fmux.DefaultConfig() - fmuxCfg.KeepAliveInterval = 20 * time.Second + fmuxCfg.KeepAliveInterval = time.Duration(svr.cfg.TCPMuxKeepaliveInterval) * time.Second fmuxCfg.LogOutput = io.Discard session, err = fmux.Client(conn, fmuxCfg) if err != nil { diff --git a/conf/frpc_full.ini b/conf/frpc_full.ini index 21fc5351..5aaaecb7 100644 --- a/conf/frpc_full.ini +++ b/conf/frpc_full.ini @@ -61,6 +61,9 @@ pool_count = 5 # if tcp stream multiplexing is used, default is true, it must be same with frps tcp_mux = true +# specify keep alive interval for tcp mux. +# only valid if tcp_mux is true. +# tcp_mux_keepalive_interval = 60 # your proxy name will be changed to {user}.{proxy} user = your_name @@ -89,7 +92,8 @@ tls_enable = true # start = ssh,dns # heartbeat configure, it's not recommended to modify the default value -# the default value of heartbeat_interval is 10 and heartbeat_timeout is 90 +# The default value of heartbeat_interval is 10 and heartbeat_timeout is 90. Set negative value +# to disable it. # heartbeat_interval = 30 # heartbeat_timeout = 90 diff --git a/conf/frps_full.ini b/conf/frps_full.ini index c3da4e2c..4aef9774 100644 --- a/conf/frps_full.ini +++ b/conf/frps_full.ini @@ -92,7 +92,7 @@ oidc_skip_expiry_check = false oidc_skip_issuer_check = false # heartbeat configure, it's not recommended to modify the default value -# the default value of heartbeat_timeout is 90 +# the default value of heartbeat_timeout is 90. Set negative value to disable it. # heartbeat_timeout = 90 # user_conn_timeout configure, it's not recommended to modify the default value @@ -121,6 +121,9 @@ subdomain_host = frps.com # if tcp stream multiplexing is used, default is true tcp_mux = true +# specify keep alive interval for tcp mux. +# only valid if tcp_mux is true. +# tcp_mux_keepalive_interval = 60 # custom 404 page for HTTP requests # custom_404_page = /path/to/404.html diff --git a/pkg/config/client.go b/pkg/config/client.go index b2efb79a..f2d1a07d 100644 --- a/pkg/config/client.go +++ b/pkg/config/client.go @@ -86,6 +86,9 @@ type ClientCommonConf struct { // the server must have TCP multiplexing enabled as well. By default, this // value is true. TCPMux bool `ini:"tcp_mux" json:"tcp_mux"` + // TCPMuxKeepaliveInterval specifies the keep alive interval for TCP stream multipler. + // If TCPMux is true, heartbeat of application layer is unnecessary because it can only rely on heartbeat in TCPMux. + TCPMuxKeepaliveInterval int64 `ini:"tcp_mux_keepalive_interval" json:"tcp_mux_keepalive_interval"` // User specifies a prefix for proxy names to distinguish them from other // clients. If this value is not "", proxy names will automatically be // changed to "{user}.{proxy_name}". By default, this value is "". @@ -129,11 +132,11 @@ type ClientCommonConf struct { DisableCustomTLSFirstByte bool `ini:"disable_custom_tls_first_byte" json:"disable_custom_tls_first_byte"` // HeartBeatInterval specifies at what interval heartbeats are sent to the // server, in seconds. It is not recommended to change this value. By - // default, this value is 30. + // default, this value is 30. Set negative value to disable it. HeartbeatInterval int64 `ini:"heartbeat_interval" json:"heartbeat_interval"` // HeartBeatTimeout specifies the maximum allowed heartbeat response delay // before the connection is terminated, in seconds. It is not recommended - // to change this value. By default, this value is 90. + // to change this value. By default, this value is 90. Set negative value to disable it. HeartbeatTimeout int64 `ini:"heartbeat_timeout" json:"heartbeat_timeout"` // Client meta info Metas map[string]string `ini:"-" json:"metas"` @@ -147,36 +150,37 @@ type ClientCommonConf struct { // GetDefaultClientConf returns a client configuration with default values. func GetDefaultClientConf() ClientCommonConf { return ClientCommonConf{ - ClientConfig: auth.GetDefaultClientConf(), - ServerAddr: "0.0.0.0", - ServerPort: 7000, - HTTPProxy: os.Getenv("http_proxy"), - LogFile: "console", - LogWay: "console", - LogLevel: "info", - LogMaxDays: 3, - DisableLogColor: false, - AdminAddr: "127.0.0.1", - AdminPort: 0, - AdminUser: "", - AdminPwd: "", - AssetsDir: "", - PoolCount: 1, - TCPMux: true, - User: "", - DNSServer: "", - LoginFailExit: true, - Start: make([]string, 0), - Protocol: "tcp", - TLSEnable: false, - TLSCertFile: "", - TLSKeyFile: "", - TLSTrustedCaFile: "", - HeartbeatInterval: 30, - HeartbeatTimeout: 90, - Metas: make(map[string]string), - UDPPacketSize: 1500, - IncludeConfigFiles: make([]string, 0), + ClientConfig: auth.GetDefaultClientConf(), + ServerAddr: "0.0.0.0", + ServerPort: 7000, + HTTPProxy: os.Getenv("http_proxy"), + LogFile: "console", + LogWay: "console", + LogLevel: "info", + LogMaxDays: 3, + DisableLogColor: false, + AdminAddr: "127.0.0.1", + AdminPort: 0, + AdminUser: "", + AdminPwd: "", + AssetsDir: "", + PoolCount: 1, + TCPMux: true, + TCPMuxKeepaliveInterval: 60, + User: "", + DNSServer: "", + LoginFailExit: true, + Start: make([]string, 0), + Protocol: "tcp", + TLSEnable: false, + TLSCertFile: "", + TLSKeyFile: "", + TLSTrustedCaFile: "", + HeartbeatInterval: 30, + HeartbeatTimeout: 90, + Metas: make(map[string]string), + UDPPacketSize: 1500, + IncludeConfigFiles: make([]string, 0), } } @@ -189,12 +193,10 @@ func (cfg *ClientCommonConf) Complete() { } func (cfg *ClientCommonConf) Validate() error { - if cfg.HeartbeatInterval <= 0 { - return fmt.Errorf("invalid heartbeat_interval") - } - - if cfg.HeartbeatTimeout < cfg.HeartbeatInterval { - return fmt.Errorf("invalid heartbeat_timeout, heartbeat_timeout is less than heartbeat_interval") + if cfg.HeartbeatTimeout > 0 && cfg.HeartbeatInterval > 0 { + if cfg.HeartbeatTimeout < cfg.HeartbeatInterval { + return fmt.Errorf("invalid heartbeat_timeout, heartbeat_timeout is less than heartbeat_interval") + } } if cfg.TLSEnable == false { diff --git a/pkg/config/client_test.go b/pkg/config/client_test.go index e9e8e363..c78e3294 100644 --- a/pkg/config/client_test.go +++ b/pkg/config/client_test.go @@ -259,33 +259,34 @@ func Test_LoadClientCommonConf(t *testing.T) { OidcTokenEndpointURL: "endpoint_url", }, }, - ServerAddr: "0.0.0.9", - ServerPort: 7009, - HTTPProxy: "http://user:passwd@192.168.1.128:8080", - LogFile: "./frpc.log9", - LogWay: "file", - LogLevel: "info9", - LogMaxDays: 39, - DisableLogColor: false, - AdminAddr: "127.0.0.9", - AdminPort: 7409, - AdminUser: "admin9", - AdminPwd: "admin9", - AssetsDir: "./static9", - PoolCount: 59, - TCPMux: true, - User: "your_name", - LoginFailExit: true, - Protocol: "tcp", - TLSEnable: true, - TLSCertFile: "client.crt", - TLSKeyFile: "client.key", - TLSTrustedCaFile: "ca.crt", - TLSServerName: "example.com", - DNSServer: "8.8.8.9", - Start: []string{"ssh", "dns"}, - HeartbeatInterval: 39, - HeartbeatTimeout: 99, + ServerAddr: "0.0.0.9", + ServerPort: 7009, + HTTPProxy: "http://user:passwd@192.168.1.128:8080", + LogFile: "./frpc.log9", + LogWay: "file", + LogLevel: "info9", + LogMaxDays: 39, + DisableLogColor: false, + AdminAddr: "127.0.0.9", + AdminPort: 7409, + AdminUser: "admin9", + AdminPwd: "admin9", + AssetsDir: "./static9", + PoolCount: 59, + TCPMux: true, + TCPMuxKeepaliveInterval: 60, + User: "your_name", + LoginFailExit: true, + Protocol: "tcp", + TLSEnable: true, + TLSCertFile: "client.crt", + TLSKeyFile: "client.key", + TLSTrustedCaFile: "ca.crt", + TLSServerName: "example.com", + DNSServer: "8.8.8.9", + Start: []string{"ssh", "dns"}, + HeartbeatInterval: 39, + HeartbeatTimeout: 99, Metas: map[string]string{ "var1": "123", "var2": "234", diff --git a/pkg/config/server.go b/pkg/config/server.go index 8e7f7ad2..d002f9ff 100644 --- a/pkg/config/server.go +++ b/pkg/config/server.go @@ -118,6 +118,9 @@ type ServerCommonConf struct { // from a client to share a single TCP connection. By default, this value // is true. TCPMux bool `ini:"tcp_mux" json:"tcp_mux"` + // TCPMuxKeepaliveInterval specifies the keep alive interval for TCP stream multipler. + // If TCPMux is true, heartbeat of application layer is unnecessary because it can only rely on heartbeat in TCPMux. + TCPMuxKeepaliveInterval int64 `ini:"tcp_mux_keepalive_interval" json:"tcp_mux_keepalive_interval"` // Custom404Page specifies a path to a custom 404 page to display. If this // value is "", a default page will be displayed. By default, this value is // "". @@ -154,7 +157,7 @@ type ServerCommonConf struct { TLSTrustedCaFile string `ini:"tls_trusted_ca_file" json:"tls_trusted_ca_file"` // HeartBeatTimeout specifies the maximum time to wait for a heartbeat // before terminating the connection. It is not recommended to change this - // value. By default, this value is 90. + // value. By default, this value is 90. Set negative value to disable it. HeartbeatTimeout int64 `ini:"heartbeat_timeout" json:"heartbeat_timeout"` // UserConnTimeout specifies the maximum time to wait for a work // connection. By default, this value is 10. @@ -170,42 +173,43 @@ type ServerCommonConf struct { // defaults. func GetDefaultServerConf() ServerCommonConf { return ServerCommonConf{ - ServerConfig: auth.GetDefaultServerConf(), - BindAddr: "0.0.0.0", - BindPort: 7000, - BindUDPPort: 0, - KCPBindPort: 0, - ProxyBindAddr: "", - VhostHTTPPort: 0, - VhostHTTPSPort: 0, - TCPMuxHTTPConnectPort: 0, - VhostHTTPTimeout: 60, - DashboardAddr: "0.0.0.0", - DashboardPort: 0, - DashboardUser: "", - DashboardPwd: "", - EnablePrometheus: false, - AssetsDir: "", - LogFile: "console", - LogWay: "console", - LogLevel: "info", - LogMaxDays: 3, - DisableLogColor: false, - DetailedErrorsToClient: true, - SubDomainHost: "", - TCPMux: true, - AllowPorts: make(map[int]struct{}), - MaxPoolCount: 5, - MaxPortsPerClient: 0, - TLSOnly: false, - TLSCertFile: "", - TLSKeyFile: "", - TLSTrustedCaFile: "", - HeartbeatTimeout: 90, - UserConnTimeout: 10, - Custom404Page: "", - HTTPPlugins: make(map[string]plugin.HTTPPluginOptions), - UDPPacketSize: 1500, + ServerConfig: auth.GetDefaultServerConf(), + BindAddr: "0.0.0.0", + BindPort: 7000, + BindUDPPort: 0, + KCPBindPort: 0, + ProxyBindAddr: "", + VhostHTTPPort: 0, + VhostHTTPSPort: 0, + TCPMuxHTTPConnectPort: 0, + VhostHTTPTimeout: 60, + DashboardAddr: "0.0.0.0", + DashboardPort: 0, + DashboardUser: "", + DashboardPwd: "", + EnablePrometheus: false, + AssetsDir: "", + LogFile: "console", + LogWay: "console", + LogLevel: "info", + LogMaxDays: 3, + DisableLogColor: false, + DetailedErrorsToClient: true, + SubDomainHost: "", + TCPMux: true, + TCPMuxKeepaliveInterval: 60, + AllowPorts: make(map[int]struct{}), + MaxPoolCount: 5, + MaxPortsPerClient: 0, + TLSOnly: false, + TLSCertFile: "", + TLSKeyFile: "", + TLSTrustedCaFile: "", + HeartbeatTimeout: 90, + UserConnTimeout: 10, + Custom404Page: "", + HTTPPlugins: make(map[string]plugin.HTTPPluginOptions), + UDPPacketSize: 1500, } } diff --git a/pkg/config/server_test.go b/pkg/config/server_test.go index 18f6d7ae..bdf20cb4 100644 --- a/pkg/config/server_test.go +++ b/pkg/config/server_test.go @@ -131,15 +131,16 @@ func Test_LoadServerCommonConf(t *testing.T) { 12: struct{}{}, 99: struct{}{}, }, - MaxPoolCount: 59, - MaxPortsPerClient: 9, - TLSOnly: true, - TLSCertFile: "server.crt", - TLSKeyFile: "server.key", - TLSTrustedCaFile: "ca.crt", - SubDomainHost: "frps.com", - TCPMux: true, - UDPPacketSize: 1509, + MaxPoolCount: 59, + MaxPortsPerClient: 9, + TLSOnly: true, + TLSCertFile: "server.crt", + TLSKeyFile: "server.key", + TLSTrustedCaFile: "ca.crt", + SubDomainHost: "frps.com", + TCPMux: true, + TCPMuxKeepaliveInterval: 60, + UDPPacketSize: 1509, HTTPPlugins: map[string]plugin.HTTPPluginOptions{ "user-manager": { @@ -174,27 +175,28 @@ func Test_LoadServerCommonConf(t *testing.T) { AuthenticateNewWorkConns: false, }, }, - BindAddr: "0.0.0.9", - BindPort: 7009, - BindUDPPort: 7008, - ProxyBindAddr: "0.0.0.9", - VhostHTTPTimeout: 60, - DashboardAddr: "0.0.0.0", - DashboardUser: "", - DashboardPwd: "", - EnablePrometheus: false, - LogFile: "console", - LogWay: "console", - LogLevel: "info", - LogMaxDays: 3, - DetailedErrorsToClient: true, - TCPMux: true, - AllowPorts: make(map[int]struct{}), - MaxPoolCount: 5, - HeartbeatTimeout: 90, - UserConnTimeout: 10, - HTTPPlugins: make(map[string]plugin.HTTPPluginOptions), - UDPPacketSize: 1500, + BindAddr: "0.0.0.9", + BindPort: 7009, + BindUDPPort: 7008, + ProxyBindAddr: "0.0.0.9", + VhostHTTPTimeout: 60, + DashboardAddr: "0.0.0.0", + DashboardUser: "", + DashboardPwd: "", + EnablePrometheus: false, + LogFile: "console", + LogWay: "console", + LogLevel: "info", + LogMaxDays: 3, + DetailedErrorsToClient: true, + TCPMux: true, + TCPMuxKeepaliveInterval: 60, + AllowPorts: make(map[int]struct{}), + MaxPoolCount: 5, + HeartbeatTimeout: 90, + UserConnTimeout: 10, + HTTPPlugins: make(map[string]plugin.HTTPPluginOptions), + UDPPacketSize: 1500, }, }, } diff --git a/server/control.go b/server/control.go index 7632bbed..25adc2d2 100644 --- a/server/control.go +++ b/server/control.go @@ -400,12 +400,19 @@ func (ctl *Control) manager() { defer ctl.allShutdown.Start() defer ctl.managerShutdown.Done() - heartbeat := time.NewTicker(time.Second) - defer heartbeat.Stop() + var heartbeatCh <-chan time.Time + if ctl.serverCfg.TCPMux || ctl.serverCfg.HeartbeatTimeout <= 0 { + // Don't need application heartbeat here. + // yamux will do same thing. + } else { + heartbeat := time.NewTicker(time.Second) + defer heartbeat.Stop() + heartbeatCh = heartbeat.C + } for { select { - case <-heartbeat.C: + case <-heartbeatCh: if time.Since(ctl.lastPing) > time.Duration(ctl.serverCfg.HeartbeatTimeout)*time.Second { xl.Warn("heartbeat timeout") return diff --git a/server/service.go b/server/service.go index f6e015a5..bc0d48a6 100644 --- a/server/service.go +++ b/server/service.go @@ -406,7 +406,7 @@ func (svr *Service) HandleListener(l net.Listener) { go func(ctx context.Context, frpConn net.Conn) { if svr.cfg.TCPMux { fmuxCfg := fmux.DefaultConfig() - fmuxCfg.KeepAliveInterval = 20 * time.Second + fmuxCfg.KeepAliveInterval = time.Duration(svr.cfg.TCPMuxKeepaliveInterval) * time.Second fmuxCfg.LogOutput = io.Discard session, err := fmux.Server(frpConn, fmuxCfg) if err != nil { diff --git a/test/e2e/features/heartbeat.go b/test/e2e/features/heartbeat.go new file mode 100644 index 00000000..b0732c37 --- /dev/null +++ b/test/e2e/features/heartbeat.go @@ -0,0 +1,48 @@ +package features + +import ( + "fmt" + "time" + + "github.com/fatedier/frp/test/e2e/framework" + + . "github.com/onsi/ginkgo" +) + +var _ = Describe("[Feature: Heartbeat]", func() { + f := framework.NewDefaultFramework() + + It("disable application layer heartbeat", func() { + serverPort := f.AllocPort() + serverConf := fmt.Sprintf(` + [common] + bind_addr = 0.0.0.0 + bind_port = %d + heartbeat_timeout = -1 + tcp_mux_keepalive_interval = 2 + `, serverPort) + + remotePort := f.AllocPort() + clientConf := fmt.Sprintf(` + [common] + server_port = %d + log_level = trace + heartbeat_interval = -1 + heartbeat_timeout = -1 + tcp_mux_keepalive_interval = 2 + + [tcp] + type = tcp + local_port = %d + remote_port = %d + `, serverPort, f.PortByName(framework.TCPEchoServerPort), remotePort) + + // run frps and frpc + f.RunProcesses([]string{serverConf}, []string{clientConf}) + + framework.NewRequestExpect(f).Protocol("tcp").Port(remotePort).Ensure() + + time.Sleep(5 * time.Second) + framework.NewRequestExpect(f).Protocol("tcp").Port(remotePort).Ensure() + }) +})