Merge pull request #1041 from fatedier/update

optimize code
This commit is contained in:
fatedier 2019-01-15 15:58:06 +08:00 committed by GitHub
commit 54916793f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 1601 additions and 1224 deletions

View File

@ -8,11 +8,12 @@
frp is a fast reverse proxy to help you expose a local server behind a NAT or firewall to the internet. As of now, it supports tcp & udp, as well as http and https protocols, where requests can be forwarded to internal services by domain name.
Now it also try to support p2p connect.
## Table of Contents
<!-- vim-markdown-toc GFM -->
* [What can I do with frp?](#what-can-i-do-with-frp)
* [Status](#status)
* [Architecture](#architecture)
* [Example Usage](#example-usage)
@ -37,6 +38,7 @@ frp is a fast reverse proxy to help you expose a local server behind a NAT or fi
* [Support KCP Protocol](#support-kcp-protocol)
* [Connection Pool](#connection-pool)
* [Load balancing](#load-balancing)
* [Health Check](#health-check)
* [Rewriting the Host Header](#rewriting-the-host-header)
* [Set Headers In HTTP Request](#set-headers-in-http-request)
* [Get Real IP](#get-real-ip)
@ -55,11 +57,6 @@ frp is a fast reverse proxy to help you expose a local server behind a NAT or fi
<!-- vim-markdown-toc -->
## What can I do with frp?
* Expose any http and https service behind a NAT or firewall to the internet by a server with public IP address(Name-based Virtual Host Support).
* Expose any tcp or udp service behind a NAT or firewall to the internet by a server with public IP address.
## Status
frp is under development and you can try it with latest release version. Master branch for releasing stable version when dev branch for developing.
@ -394,11 +391,7 @@ Then visit `http://[server_addr]:7500` to see dashboard, default username and pa
### Authentication
Since v0.10.0, you only need to set `token` in frps.ini and frpc.ini.
Note that time duration between server of frpc and frps mustn't exceed 15 minutes because timestamp is used for authentication.
Howerver, this timeout duration can be modified by setting `authentication_timeout` in frps's configure file. It's defalut value is 900, means 15 minutes. If it is equals 0, then frps will not check authentication timeout.
`token` in frps.ini and frpc.ini should be same.
### Encryption and Compression
@ -540,6 +533,52 @@ group_key = 123
Proxies in same group will accept connections from port 80 randomly.
### Health Check
Health check feature can help you achieve high availability with load balancing.
Add `health_check_type = {type}` to enable health check.
**type** can be tcp or http.
Type tcp will dial the service port and type http will send a http rquest to service and require a 200 response.
Type tcp configuration:
```ini
# frpc.ini
[test1]
type = tcp
local_port = 22
remote_port = 6000
# enable tcp health check
health_check_type = tcp
# dial timeout seconds
health_check_timeout_s = 3
# if continuous failed in 3 times, the proxy will be removed from frps
health_check_max_failed = 3
# every 10 seconds will do a health check
health_check_interval_s = 10
```
Type http configuration:
```ini
# frpc.ini
[web]
type = http
local_ip = 127.0.0.1
local_port = 80
custom_domains = test.yourdomain.com
# enable http health check
health_check_type = http
# frpc will send a GET http request '/status' to local http service
# http service is alive when it return 2xx http response code
health_check_url = /status
health_check_interval_s = 10
health_check_max_failed = 3
health_check_timeout_s = 3
```
### Rewriting the Host Header
When forwarding to a local port, frp does not modify the tunneled HTTP requests at all, they are copied to your server byte-for-byte as they are received. Some application servers use the Host header for determining which development site to display. For this reason, frp can rewrite your requests with a modified host header. Use the `host_header_rewrite` switch to rewrite incoming HTTP requests.
@ -578,8 +617,6 @@ Features for http proxy only.
You can get user's real IP from http request header `X-Forwarded-For` and `X-Real-IP`.
**Note that now you can only get these two headers in first request of each user connection.**
### Password protecting your web service
Anyone who can guess your tunnel URL can access your local web server unless you protect it with a password.

View File

@ -4,13 +4,12 @@
[README](README.md) | [中文文档](README_zh.md)
frp 是一个可用于内网穿透的高性能的反向代理应用,支持 tcp, udp, http, https 协议
frp 是一个可用于内网穿透的高性能的反向代理应用,支持 tcp, udp 协议,为 http 和 https 应用协议提供了额外的能力,且尝试性支持了点对点穿透
## 目录
<!-- vim-markdown-toc GFM -->
* [frp 的作用](#frp-的作用)
* [开发状态](#开发状态)
* [架构](#架构)
* [使用示例](#使用示例)
@ -35,6 +34,7 @@ frp 是一个可用于内网穿透的高性能的反向代理应用,支持 tcp
* [底层通信可选 kcp 协议](#底层通信可选-kcp-协议)
* [连接池](#连接池)
* [负载均衡](#负载均衡)
* [健康检查](#健康检查)
* [修改 Host Header](#修改-host-header)
* [设置 HTTP 请求的 header](#设置-http-请求的-header)
* [获取用户真实 IP](#获取用户真实-ip)
@ -53,15 +53,9 @@ frp 是一个可用于内网穿透的高性能的反向代理应用,支持 tcp
<!-- vim-markdown-toc -->
## frp 的作用
* 利用处于内网或防火墙后的机器,对外网环境提供 http 或 https 服务。
* 对于 http, https 服务支持基于域名的虚拟主机支持自定义域名绑定使多个域名可以共用一个80端口。
* 利用处于内网或防火墙后的机器,对外网环境提供 tcp 和 udp 服务,例如在家里通过 ssh 访问处于公司内网环境内的主机。
## 开发状态
frp 仍然处于前期开发阶段,未经充分测试与验证,不推荐用于生产环境。
frp 仍然处于开发阶段,未经充分测试与验证,不推荐用于生产环境。
master 分支用于发布稳定版本dev 分支用于开发,您可以尝试下载最新的 release 版本进行测试。
@ -394,6 +388,8 @@ frpc 会自动使用环境变量渲染配置文件模版,所有环境变量需
通过浏览器查看 frp 的状态以及代理统计信息展示。
**注Dashboard 尚未针对大量的 proxy 数据展示做优化,如果出现 Dashboard 访问较慢的情况,请不要启用此功能。**
需要在 frps.ini 中指定 dashboard 服务使用的端口,即可开启此功能:
```ini
@ -410,11 +406,7 @@ dashboard_pwd = admin
### 身份验证
从 v0.10.0 版本开始,所有 proxy 配置全部放在客户端(也就是之前版本的特权模式),服务端和客户端的 common 配置中的 `token` 参数一致则身份验证通过。
需要注意的是 frpc 所在机器和 frps 所在机器的时间相差不能超过 15 分钟,因为时间戳会被用于加密验证中,防止报文被劫持后被其他人利用。
这个超时时间可以在配置文件中通过 `authentication_timeout` 这个参数来修改,单位为秒,默认值为 900即 15 分钟。如果修改为 0则 frps 将不对身份验证报文的时间戳进行超时校验。
服务端和客户端的 common 配置中的 `token` 参数一致则身份验证通过。
### 加密与压缩
@ -568,6 +560,52 @@ group_key = 123
要求 `group_key` 相同,做权限验证,且 `remote_port` 相同。
### 健康检查
通过给 proxy 加上健康检查的功能,可以在要反向代理的服务出现故障时,将这个服务从 frps 中摘除,搭配负载均衡的功能,可以用来实现高可用的架构,避免服务单点故障。
在每一个 proxy 的配置下加上 `health_check_type = {type}` 来启用健康检查功能。
**type** 目前可选 tcp 和 http。
tcp 只要能够建立连接则认为服务正常http 会发送一个 http 请求,服务需要返回 2xx 的状态码才会被认为正常。
tcp 示例配置如下:
```ini
# frpc.ini
[test1]
type = tcp
local_port = 22
remote_port = 6000
# 启用健康检查,类型为 tcp
health_check_type = tcp
# 建立连接超时时间为 3 秒
health_check_timeout_s = 3
# 连续 3 次检查失败,此 proxy 会被摘除
health_check_max_failed = 3
# 每隔 10 秒进行一次健康检查
health_check_interval_s = 10
```
http 示例配置如下:
```ini
# frpc.ini
[web]
type = http
local_ip = 127.0.0.1
local_port = 80
custom_domains = test.yourdomain.com
# 启用健康检查,类型为 http
health_check_type = http
# 健康检查发送 http 请求的 url后端服务需要返回 2xx 的 http 状态码
health_check_url = /status
health_check_interval_s = 10
health_check_max_failed = 3
health_check_timeout_s = 3
```
### 修改 Host Header
通常情况下 frp 不会修改转发的任何数据。但有一些后端服务会根据 http 请求 header 中的 host 字段来展现不同的网站,例如 nginx 的虚拟主机服务,启用 host-header 的修改功能可以动态修改 http 请求中的 host 字段。该功能仅限于 http 类型的代理。
@ -603,8 +641,6 @@ header_X-From-Where = frp
目前只有 **http** 类型的代理支持这一功能,可以通过用户请求的 header 中的 `X-Forwarded-For``X-Real-IP` 来获取用户真实 IP。
**需要注意的是,目前只在每一个用户连接的第一个 HTTP 请求中添加了这两个 header。**
### 通过密码保护你的 web 服务
由于所有客户端共用一个 frps 的 http 服务端口,任何知道你的域名和 url 的人都能访问到你部署在内网的 web 服务,但是在某些场景下需要确保只有限定的用户才能访问。
@ -651,7 +687,7 @@ subdomain = test
frps 和 frpc 都启动成功后,通过 `test.frps.com` 就可以访问到内网的 web 服务。
需要注意的是如果 frps 配置了 `subdomain_host`,则 `custom_domains` 中不能是属于 `subdomain_host` 的子域名或者泛域名。
**注:如果 frps 配置了 `subdomain_host`,则 `custom_domains` 中不能是属于 `subdomain_host` 的子域名或者泛域名。**
同一个 http 或 https 类型的代理中 `custom_domains``subdomain` 可以同时配置。
@ -740,8 +776,6 @@ plugin_http_passwd = abc
计划在后续版本中加入的功能与优化,排名不分先后,如果有其他功能建议欢迎在 [issues](https://github.com/fatedier/frp/issues) 中反馈。
* frps 记录 http 请求日志。
* frps 支持直接反向代理,类似 haproxy。
* 集成对 k8s 等平台的支持。
## 为 frp 做贡献
@ -762,6 +796,12 @@ frp 是一个免费且开源的项目,我们欢迎任何人为其开发和进
frp 交流群606194980 (QQ 群号)
### 知识星球
如果您想学习 frp 相关的知识和技术,或者寻求任何帮助,都可以通过微信扫描下方的二维码付费加入知识星球的官方社群:
![zsxq](/doc/pic/zsxq.jpg)
### 支付宝扫码捐赠
![donate-alipay](/doc/pic/donate-alipay.png)

View File

@ -1 +1 @@
<!DOCTYPE html> <html lang=en> <head> <meta charset=utf-8> <title>frps dashboard</title> <link rel="shortcut icon" href="favicon.ico"></head> <body> <div id=app></div> <script type="text/javascript" src="manifest.js?bc42bc4eff72df8da372"></script><script type="text/javascript" src="vendor.js?ee403fce53c8757fc931"></script></body> </html>
<!DOCTYPE html> <html lang=en> <head> <meta charset=utf-8> <title>frps dashboard</title> <link rel="shortcut icon" href="favicon.ico"></head> <body> <div id=app></div> <script type="text/javascript" src="manifest.js?14bea8276eef86cc7c61"></script><script type="text/javascript" src="vendor.js?51925ec1a77936b64d61"></script></body> </html>

View File

@ -1 +1 @@
!function(e){function n(r){if(t[r])return t[r].exports;var o=t[r]={i:r,l:!1,exports:{}};return e[r].call(o.exports,o,o.exports,n),o.l=!0,o.exports}var r=window.webpackJsonp;window.webpackJsonp=function(t,c,u){for(var i,a,f,l=0,s=[];l<t.length;l++)a=t[l],o[a]&&s.push(o[a][0]),o[a]=0;for(i in c)Object.prototype.hasOwnProperty.call(c,i)&&(e[i]=c[i]);for(r&&r(t,c,u);s.length;)s.shift()();if(u)for(l=0;l<u.length;l++)f=n(n.s=u[l]);return f};var t={},o={1:0};n.e=function(e){function r(){i.onerror=i.onload=null,clearTimeout(a);var n=o[e];0!==n&&(n&&n[1](new Error("Loading chunk "+e+" failed.")),o[e]=void 0)}var t=o[e];if(0===t)return new Promise(function(e){e()});if(t)return t[2];var c=new Promise(function(n,r){t=o[e]=[n,r]});t[2]=c;var u=document.getElementsByTagName("head")[0],i=document.createElement("script");i.type="text/javascript",i.charset="utf-8",i.async=!0,i.timeout=12e4,n.nc&&i.setAttribute("nonce",n.nc),i.src=n.p+""+e+".js?"+{0:"ee403fce53c8757fc931"}[e];var a=setTimeout(r,12e4);return i.onerror=i.onload=r,u.appendChild(i),c},n.m=e,n.c=t,n.i=function(e){return e},n.d=function(e,r,t){n.o(e,r)||Object.defineProperty(e,r,{configurable:!1,enumerable:!0,get:t})},n.n=function(e){var r=e&&e.__esModule?function(){return e.default}:function(){return e};return n.d(r,"a",r),r},n.o=function(e,n){return Object.prototype.hasOwnProperty.call(e,n)},n.p="",n.oe=function(e){throw console.error(e),e}}([]);
!function(e){function n(r){if(t[r])return t[r].exports;var o=t[r]={i:r,l:!1,exports:{}};return e[r].call(o.exports,o,o.exports,n),o.l=!0,o.exports}var r=window.webpackJsonp;window.webpackJsonp=function(t,c,u){for(var i,a,f,l=0,s=[];l<t.length;l++)a=t[l],o[a]&&s.push(o[a][0]),o[a]=0;for(i in c)Object.prototype.hasOwnProperty.call(c,i)&&(e[i]=c[i]);for(r&&r(t,c,u);s.length;)s.shift()();if(u)for(l=0;l<u.length;l++)f=n(n.s=u[l]);return f};var t={},o={1:0};n.e=function(e){function r(){i.onerror=i.onload=null,clearTimeout(a);var n=o[e];0!==n&&(n&&n[1](new Error("Loading chunk "+e+" failed.")),o[e]=void 0)}var t=o[e];if(0===t)return new Promise(function(e){e()});if(t)return t[2];var c=new Promise(function(n,r){t=o[e]=[n,r]});t[2]=c;var u=document.getElementsByTagName("head")[0],i=document.createElement("script");i.type="text/javascript",i.charset="utf-8",i.async=!0,i.timeout=12e4,n.nc&&i.setAttribute("nonce",n.nc),i.src=n.p+""+e+".js?"+{0:"51925ec1a77936b64d61"}[e];var a=setTimeout(r,12e4);return i.onerror=i.onload=r,u.appendChild(i),c},n.m=e,n.c=t,n.i=function(e){return e},n.d=function(e,r,t){n.o(e,r)||Object.defineProperty(e,r,{configurable:!1,enumerable:!0,get:t})},n.n=function(e){var r=e&&e.__esModule?function(){return e.default}:function(){return e};return n.d(r,"a",r),r},n.o=function(e,n){return Object.prototype.hasOwnProperty.call(e,n)},n.p="",n.oe=function(e){throw console.error(e),e}}([]);

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -54,7 +54,6 @@ var (
logLevel string
logMaxDays int64
token string
authTimeout int64
subDomainHost string
tcpMux bool
allowPorts string
@ -82,7 +81,6 @@ func init() {
rootCmd.PersistentFlags().StringVarP(&logLevel, "log_level", "", "info", "log level")
rootCmd.PersistentFlags().Int64VarP(&logMaxDays, "log_max_days", "", 3, "log_max_days")
rootCmd.PersistentFlags().StringVarP(&token, "token", "t", "", "auth token")
rootCmd.PersistentFlags().Int64VarP(&authTimeout, "auth_timeout", "", 900, "auth timeout")
rootCmd.PersistentFlags().StringVarP(&subDomainHost, "subdomain_host", "", "", "subdomain host")
rootCmd.PersistentFlags().StringVarP(&allowPorts, "allow_ports", "", "", "allow ports")
rootCmd.PersistentFlags().Int64VarP(&maxPortsPerClient, "max_ports_per_client", "", 0, "max ports per client")
@ -173,7 +171,6 @@ func parseServerCommonCfgFromCmd() (err error) {
g.GlbServerCfg.LogLevel = logLevel
g.GlbServerCfg.LogMaxDays = logMaxDays
g.GlbServerCfg.Token = token
g.GlbServerCfg.AuthTimeout = authTimeout
g.GlbServerCfg.SubDomainHost = subDomainHost
if len(allowPorts) > 0 {
// e.g. 1000-2000,2001,2002,3000-4000

View File

@ -76,9 +76,12 @@ group_key = 123456
# enable health check for the backend service, it support 'tcp' and 'http' now
# frpc will connect local service's port to detect it's healthy status
health_check_type = tcp
health_check_interval_s = 10
health_check_max_failed = 1
# health check connection timeout
health_check_timeout_s = 3
# if continuous failed in 3 times, the proxy will be removed from frps
health_check_max_failed = 3
# every 10 seconds will do a health check
health_check_interval_s = 10
[ssh_random]
type = tcp
@ -137,6 +140,8 @@ health_check_type = http
# http service is alive when it return 2xx http response code
health_check_url = /status
health_check_interval_s = 10
health_check_max_failed = 3
health_check_timeout_s = 3
[web02]
type = https

View File

@ -59,10 +59,6 @@ max_pool_count = 5
# max ports can be used for each client, default value is 0 means no limit
max_ports_per_client = 0
# authentication_timeout means the timeout interval (seconds) when the frpc connects frps
# if authentication_timeout is zero, the time is not verified, default is 900s
authentication_timeout = 900
# if subdomain_host is not empty, you can set subdomain when type is http or https in frpc's configure file
# when subdomain is test, the host used by routing is test.frps.com
subdomain_host = frps.com

BIN
doc/pic/zsxq.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 12 KiB

View File

@ -67,7 +67,6 @@ type ServerCommonConf struct {
LogLevel string `json:"log_level"`
LogMaxDays int64 `json:"log_max_days"`
Token string `json:"token"`
AuthTimeout int64 `json:"auth_timeout"`
SubDomainHost string `json:"subdomain_host"`
TcpMux bool `json:"tcp_mux"`
@ -98,7 +97,6 @@ func GetDefaultServerConf() *ServerCommonConf {
LogLevel: "info",
LogMaxDays: 3,
Token: "",
AuthTimeout: 900,
SubDomainHost: "",
TcpMux: true,
AllowPorts: make(map[int]struct{}),
@ -285,16 +283,6 @@ func UnmarshalServerConfFromIni(defaultCfg *ServerCommonConf, content string) (c
}
}
if tmpStr, ok = conf.Get("common", "authentication_timeout"); ok {
v, errRet := strconv.ParseInt(tmpStr, 10, 64)
if errRet != nil {
err = fmt.Errorf("Parse conf error: authentication_timeout is incorrect")
return
} else {
cfg.AuthTimeout = v
}
}
if tmpStr, ok = conf.Get("common", "subdomain_host"); ok {
cfg.SubDomainHost = strings.ToLower(strings.TrimSpace(tmpStr))
}

View File

@ -1,4 +1,4 @@
package server
package nathole
import (
"bytes"

View File

@ -26,6 +26,9 @@ import (
"github.com/fatedier/frp/models/consts"
frpErr "github.com/fatedier/frp/models/errors"
"github.com/fatedier/frp/models/msg"
"github.com/fatedier/frp/server/controller"
"github.com/fatedier/frp/server/proxy"
"github.com/fatedier/frp/server/stats"
"github.com/fatedier/frp/utils/net"
"github.com/fatedier/frp/utils/version"
@ -34,9 +37,53 @@ import (
"github.com/fatedier/golib/errors"
)
type ControlManager struct {
// controls indexed by run id
ctlsByRunId map[string]*Control
mu sync.RWMutex
}
func NewControlManager() *ControlManager {
return &ControlManager{
ctlsByRunId: make(map[string]*Control),
}
}
func (cm *ControlManager) Add(runId string, ctl *Control) (oldCtl *Control) {
cm.mu.Lock()
defer cm.mu.Unlock()
oldCtl, ok := cm.ctlsByRunId[runId]
if ok {
oldCtl.Replaced(ctl)
}
cm.ctlsByRunId[runId] = ctl
return
}
func (cm *ControlManager) Del(runId string) {
cm.mu.Lock()
defer cm.mu.Unlock()
delete(cm.ctlsByRunId, runId)
}
func (cm *ControlManager) GetById(runId string) (ctl *Control, ok bool) {
cm.mu.RLock()
defer cm.mu.RUnlock()
ctl, ok = cm.ctlsByRunId[runId]
return
}
type Control struct {
// frps service
svr *Service
// all resource managers and controllers
rc *controller.ResourceController
// proxy manager
pxyManager *proxy.ProxyManager
// stats collector to store stats info of clients and proxies
statsCollector stats.Collector
// login message
loginMsg *msg.Login
@ -54,7 +101,7 @@ type Control struct {
workConnCh chan net.Conn
// proxies in one client
proxies map[string]Proxy
proxies map[string]proxy.Proxy
// pool count
poolCount int
@ -81,15 +128,19 @@ type Control struct {
mu sync.RWMutex
}
func NewControl(svr *Service, ctlConn net.Conn, loginMsg *msg.Login) *Control {
func NewControl(rc *controller.ResourceController, pxyManager *proxy.ProxyManager,
statsCollector stats.Collector, ctlConn net.Conn, loginMsg *msg.Login) *Control {
return &Control{
svr: svr,
rc: rc,
pxyManager: pxyManager,
statsCollector: statsCollector,
conn: ctlConn,
loginMsg: loginMsg,
sendCh: make(chan msg.Message, 10),
readCh: make(chan msg.Message, 10),
workConnCh: make(chan net.Conn, loginMsg.PoolCount+10),
proxies: make(map[string]Proxy),
proxies: make(map[string]proxy.Proxy),
poolCount: loginMsg.PoolCount,
portsUsedNum: 0,
lastPing: time.Now(),
@ -284,14 +335,22 @@ func (ctl *Control) stoper() {
for _, pxy := range ctl.proxies {
pxy.Close()
ctl.svr.DelProxy(pxy.GetName())
StatsCloseProxy(pxy.GetName(), pxy.GetConf().GetBaseInfo().ProxyType)
ctl.pxyManager.Del(pxy.GetName())
ctl.statsCollector.Mark(stats.TypeCloseProxy, &stats.CloseProxyPayload{
Name: pxy.GetName(),
ProxyType: pxy.GetConf().GetBaseInfo().ProxyType,
})
}
ctl.allShutdown.Done()
ctl.conn.Info("client exit success")
StatsCloseClient()
ctl.statsCollector.Mark(stats.TypeCloseClient, &stats.CloseClientPayload{})
}
// block until Control closed
func (ctl *Control) WaitClosed() {
ctl.allShutdown.WaitDone()
}
func (ctl *Control) manager() {
@ -333,7 +392,10 @@ func (ctl *Control) manager() {
} else {
resp.RemoteAddr = remoteAddr
ctl.conn.Info("new proxy [%s] success", m.ProxyName)
StatsNewProxy(m.ProxyName, m.ProxyType)
ctl.statsCollector.Mark(stats.TypeNewProxy, &stats.NewProxyPayload{
Name: m.ProxyName,
ProxyType: m.ProxyType,
})
}
ctl.sendCh <- resp
case *msg.CloseProxy:
@ -358,7 +420,7 @@ func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err
// NewProxy will return a interface Proxy.
// In fact it create different proxies by different proxy type, we just call run() here.
pxy, err := NewProxy(ctl, pxyConf)
pxy, err := proxy.NewProxy(ctl.runId, ctl.rc, ctl.statsCollector, ctl.poolCount, ctl.GetWorkConn, pxyConf)
if err != nil {
return remoteAddr, err
}
@ -393,7 +455,7 @@ func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err
}
}()
err = ctl.svr.RegisterProxy(pxyMsg.ProxyName, pxy)
err = ctl.pxyManager.Add(pxyMsg.ProxyName, pxy)
if err != nil {
return
}
@ -406,7 +468,6 @@ func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err
func (ctl *Control) CloseProxy(closeMsg *msg.CloseProxy) (err error) {
ctl.mu.Lock()
pxy, ok := ctl.proxies[closeMsg.ProxyName]
if !ok {
ctl.mu.Unlock()
@ -417,10 +478,13 @@ func (ctl *Control) CloseProxy(closeMsg *msg.CloseProxy) (err error) {
ctl.portsUsedNum = ctl.portsUsedNum - pxy.GetUsedPortsNum()
}
pxy.Close()
ctl.svr.DelProxy(pxy.GetName())
ctl.pxyManager.Del(pxy.GetName())
delete(ctl.proxies, closeMsg.ProxyName)
ctl.mu.Unlock()
StatsCloseProxy(pxy.GetName(), pxy.GetConf().GetBaseInfo().ProxyType)
ctl.statsCollector.Mark(stats.TypeCloseProxy, &stats.CloseProxyPayload{
Name: pxy.GetName(),
ProxyType: pxy.GetConf().GetBaseInfo().ProxyType,
})
return
}

View File

@ -0,0 +1,46 @@
// Copyright 2019 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package controller
import (
"github.com/fatedier/frp/models/nathole"
"github.com/fatedier/frp/server/group"
"github.com/fatedier/frp/server/ports"
"github.com/fatedier/frp/utils/vhost"
)
// All resource managers and controllers
type ResourceController struct {
// Manage all visitor listeners
VisitorManager *VisitorManager
// Tcp Group Controller
TcpGroupCtl *group.TcpGroupCtl
// Manage all tcp ports
TcpPortManager *ports.PortManager
// Manage all udp ports
UdpPortManager *ports.PortManager
// For http proxies, forwarding http requests
HttpReverseProxy *vhost.HttpReverseProxy
// For https proxies, route requests to different clients by hostname and other infomation
VhostHttpsMuxer *vhost.HttpsMuxer
// Controller for nat hole connections
NatHoleController *nathole.NatHoleController
}

View File

@ -1,4 +1,4 @@
// Copyright 2017 fatedier, fatedier@gmail.com
// Copyright 2019 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package server
package controller
import (
"fmt"
@ -25,75 +25,6 @@ import (
frpIo "github.com/fatedier/golib/io"
)
type ControlManager struct {
// controls indexed by run id
ctlsByRunId map[string]*Control
mu sync.RWMutex
}
func NewControlManager() *ControlManager {
return &ControlManager{
ctlsByRunId: make(map[string]*Control),
}
}
func (cm *ControlManager) Add(runId string, ctl *Control) (oldCtl *Control) {
cm.mu.Lock()
defer cm.mu.Unlock()
oldCtl, ok := cm.ctlsByRunId[runId]
if ok {
oldCtl.Replaced(ctl)
}
cm.ctlsByRunId[runId] = ctl
return
}
func (cm *ControlManager) GetById(runId string) (ctl *Control, ok bool) {
cm.mu.RLock()
defer cm.mu.RUnlock()
ctl, ok = cm.ctlsByRunId[runId]
return
}
type ProxyManager struct {
// proxies indexed by proxy name
pxys map[string]Proxy
mu sync.RWMutex
}
func NewProxyManager() *ProxyManager {
return &ProxyManager{
pxys: make(map[string]Proxy),
}
}
func (pm *ProxyManager) Add(name string, pxy Proxy) error {
pm.mu.Lock()
defer pm.mu.Unlock()
if _, ok := pm.pxys[name]; ok {
return fmt.Errorf("proxy name [%s] is already in use", name)
}
pm.pxys[name] = pxy
return nil
}
func (pm *ProxyManager) Del(name string) {
pm.mu.Lock()
defer pm.mu.Unlock()
delete(pm.pxys, name)
}
func (pm *ProxyManager) GetByName(name string) (pxy Proxy, ok bool) {
pm.mu.RLock()
defer pm.mu.RUnlock()
pxy, ok = pm.pxys[name]
return
}
// Manager for visitor listeners.
type VisitorManager struct {
visitorListeners map[string]*frpNet.CustomListener

View File

@ -32,7 +32,7 @@ var (
httpServerWriteTimeout = 10 * time.Second
)
func RunDashboardServer(addr string, port int) (err error) {
func (svr *Service) RunDashboardServer(addr string, port int) (err error) {
// url router
router := mux.NewRouter()
@ -40,10 +40,10 @@ func RunDashboardServer(addr string, port int) (err error) {
router.Use(frpNet.NewHttpAuthMiddleware(user, passwd).Middleware)
// api, see dashboard_api.go
router.HandleFunc("/api/serverinfo", apiServerInfo).Methods("GET")
router.HandleFunc("/api/proxy/{type}", apiProxyByType).Methods("GET")
router.HandleFunc("/api/proxy/{type}/{name}", apiProxyByTypeAndName).Methods("GET")
router.HandleFunc("/api/traffic/{name}", apiProxyTraffic).Methods("GET")
router.HandleFunc("/api/serverinfo", svr.ApiServerInfo).Methods("GET")
router.HandleFunc("/api/proxy/{type}", svr.ApiProxyByType).Methods("GET")
router.HandleFunc("/api/proxy/{type}/{name}", svr.ApiProxyByTypeAndName).Methods("GET")
router.HandleFunc("/api/traffic/{name}", svr.ApiProxyTraffic).Methods("GET")
// view
router.Handle("/favicon.ico", http.FileServer(assets.FileSystem)).Methods("GET")

View File

@ -32,7 +32,6 @@ type GeneralResponse struct {
Msg string `json:"msg"`
}
// api/serverinfo
type ServerInfoResp struct {
GeneralResponse
@ -42,7 +41,6 @@ type ServerInfoResp struct {
VhostHttpPort int `json:"vhost_http_port"`
VhostHttpsPort int `json:"vhost_https_port"`
KcpBindPort int `json:"kcp_bind_port"`
AuthTimeout int64 `json:"auth_timeout"`
SubdomainHost string `json:"subdomain_host"`
MaxPoolCount int64 `json:"max_pool_count"`
MaxPortsPerClient int64 `json:"max_ports_per_client"`
@ -55,7 +53,8 @@ type ServerInfoResp struct {
ProxyTypeCounts map[string]int64 `json:"proxy_type_count"`
}
func apiServerInfo(w http.ResponseWriter, r *http.Request) {
// api/serverinfo
func (svr *Service) ApiServerInfo(w http.ResponseWriter, r *http.Request) {
var (
buf []byte
res ServerInfoResp
@ -66,7 +65,7 @@ func apiServerInfo(w http.ResponseWriter, r *http.Request) {
log.Info("Http request: [%s]", r.URL.Path)
cfg := &g.GlbServerCfg.ServerCommonConf
serverStats := StatsGetServer()
serverStats := svr.statsCollector.GetServer()
res = ServerInfoResp{
Version: version.Full(),
BindPort: cfg.BindPort,
@ -74,7 +73,6 @@ func apiServerInfo(w http.ResponseWriter, r *http.Request) {
VhostHttpPort: cfg.VhostHttpPort,
VhostHttpsPort: cfg.VhostHttpsPort,
KcpBindPort: cfg.KcpBindPort,
AuthTimeout: cfg.AuthTimeout,
SubdomainHost: cfg.SubDomainHost,
MaxPoolCount: cfg.MaxPoolCount,
MaxPortsPerClient: cfg.MaxPortsPerClient,
@ -162,7 +160,7 @@ type GetProxyInfoResp struct {
}
// api/proxy/:type
func apiProxyByType(w http.ResponseWriter, r *http.Request) {
func (svr *Service) ApiProxyByType(w http.ResponseWriter, r *http.Request) {
var (
buf []byte
res GetProxyInfoResp
@ -177,19 +175,19 @@ func apiProxyByType(w http.ResponseWriter, r *http.Request) {
}()
log.Info("Http request: [%s]", r.URL.Path)
res.Proxies = getProxyStatsByType(proxyType)
res.Proxies = svr.getProxyStatsByType(proxyType)
buf, _ = json.Marshal(&res)
w.Write(buf)
}
func getProxyStatsByType(proxyType string) (proxyInfos []*ProxyStatsInfo) {
proxyStats := StatsGetProxiesByType(proxyType)
func (svr *Service) getProxyStatsByType(proxyType string) (proxyInfos []*ProxyStatsInfo) {
proxyStats := svr.statsCollector.GetProxiesByType(proxyType)
proxyInfos = make([]*ProxyStatsInfo, 0, len(proxyStats))
for _, ps := range proxyStats {
proxyInfo := &ProxyStatsInfo{}
if pxy, ok := ServerService.pxyManager.GetByName(ps.Name); ok {
if pxy, ok := svr.pxyManager.GetByName(ps.Name); ok {
content, err := json.Marshal(pxy.GetConf())
if err != nil {
log.Warn("marshal proxy [%s] conf info error: %v", ps.Name, err)
@ -230,7 +228,7 @@ type GetProxyStatsResp struct {
}
// api/proxy/:type/:name
func apiProxyByTypeAndName(w http.ResponseWriter, r *http.Request) {
func (svr *Service) ApiProxyByTypeAndName(w http.ResponseWriter, r *http.Request) {
var (
buf []byte
res GetProxyStatsResp
@ -244,20 +242,20 @@ func apiProxyByTypeAndName(w http.ResponseWriter, r *http.Request) {
}()
log.Info("Http request: [%s]", r.URL.Path)
res = getProxyStatsByTypeAndName(proxyType, name)
res = svr.getProxyStatsByTypeAndName(proxyType, name)
buf, _ = json.Marshal(&res)
w.Write(buf)
}
func getProxyStatsByTypeAndName(proxyType string, proxyName string) (proxyInfo GetProxyStatsResp) {
func (svr *Service) getProxyStatsByTypeAndName(proxyType string, proxyName string) (proxyInfo GetProxyStatsResp) {
proxyInfo.Name = proxyName
ps := StatsGetProxiesByTypeAndName(proxyType, proxyName)
ps := svr.statsCollector.GetProxiesByTypeAndName(proxyType, proxyName)
if ps == nil {
proxyInfo.Code = 1
proxyInfo.Msg = "no proxy info found"
} else {
if pxy, ok := ServerService.pxyManager.GetByName(proxyName); ok {
if pxy, ok := svr.pxyManager.GetByName(proxyName); ok {
content, err := json.Marshal(pxy.GetConf())
if err != nil {
log.Warn("marshal proxy [%s] conf info error: %v", ps.Name, err)
@ -295,7 +293,7 @@ type GetProxyTrafficResp struct {
TrafficOut []int64 `json:"traffic_out"`
}
func apiProxyTraffic(w http.ResponseWriter, r *http.Request) {
func (svr *Service) ApiProxyTraffic(w http.ResponseWriter, r *http.Request) {
var (
buf []byte
res GetProxyTrafficResp
@ -309,7 +307,7 @@ func apiProxyTraffic(w http.ResponseWriter, r *http.Request) {
log.Info("Http request: [%s]", r.URL.Path)
res.Name = name
proxyTrafficInfo := StatsGetProxyTraffic(name)
proxyTrafficInfo := svr.statsCollector.GetProxyTraffic(name)
if proxyTrafficInfo == nil {
res.Code = 1
res.Msg = "no proxy info found"

View File

@ -1,316 +0,0 @@
// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"sync"
"time"
"github.com/fatedier/frp/g"
"github.com/fatedier/frp/utils/log"
"github.com/fatedier/frp/utils/metric"
)
const (
ReserveDays = 7
)
var globalStats *ServerStatistics
type ServerStatistics struct {
TotalTrafficIn metric.DateCounter
TotalTrafficOut metric.DateCounter
CurConns metric.Counter
// counter for clients
ClientCounts metric.Counter
// counter for proxy types
ProxyTypeCounts map[string]metric.Counter
// statistics for different proxies
// key is proxy name
ProxyStatistics map[string]*ProxyStatistics
mu sync.Mutex
}
type ProxyStatistics struct {
Name string
ProxyType string
TrafficIn metric.DateCounter
TrafficOut metric.DateCounter
CurConns metric.Counter
LastStartTime time.Time
LastCloseTime time.Time
}
func init() {
globalStats = &ServerStatistics{
TotalTrafficIn: metric.NewDateCounter(ReserveDays),
TotalTrafficOut: metric.NewDateCounter(ReserveDays),
CurConns: metric.NewCounter(),
ClientCounts: metric.NewCounter(),
ProxyTypeCounts: make(map[string]metric.Counter),
ProxyStatistics: make(map[string]*ProxyStatistics),
}
go func() {
for {
time.Sleep(12 * time.Hour)
log.Debug("start to clear useless proxy statistics data...")
StatsClearUselessInfo()
log.Debug("finish to clear useless proxy statistics data")
}
}()
}
func StatsClearUselessInfo() {
// To check if there are proxies that closed than 7 days and drop them.
globalStats.mu.Lock()
defer globalStats.mu.Unlock()
for name, data := range globalStats.ProxyStatistics {
if !data.LastCloseTime.IsZero() && time.Since(data.LastCloseTime) > time.Duration(7*24)*time.Hour {
delete(globalStats.ProxyStatistics, name)
log.Trace("clear proxy [%s]'s statistics data, lastCloseTime: [%s]", name, data.LastCloseTime.String())
}
}
}
func StatsNewClient() {
if g.GlbServerCfg.DashboardPort != 0 {
globalStats.ClientCounts.Inc(1)
}
}
func StatsCloseClient() {
if g.GlbServerCfg.DashboardPort != 0 {
globalStats.ClientCounts.Dec(1)
}
}
func StatsNewProxy(name string, proxyType string) {
if g.GlbServerCfg.DashboardPort != 0 {
globalStats.mu.Lock()
defer globalStats.mu.Unlock()
counter, ok := globalStats.ProxyTypeCounts[proxyType]
if !ok {
counter = metric.NewCounter()
}
counter.Inc(1)
globalStats.ProxyTypeCounts[proxyType] = counter
proxyStats, ok := globalStats.ProxyStatistics[name]
if !(ok && proxyStats.ProxyType == proxyType) {
proxyStats = &ProxyStatistics{
Name: name,
ProxyType: proxyType,
CurConns: metric.NewCounter(),
TrafficIn: metric.NewDateCounter(ReserveDays),
TrafficOut: metric.NewDateCounter(ReserveDays),
}
globalStats.ProxyStatistics[name] = proxyStats
}
proxyStats.LastStartTime = time.Now()
}
}
func StatsCloseProxy(proxyName string, proxyType string) {
if g.GlbServerCfg.DashboardPort != 0 {
globalStats.mu.Lock()
defer globalStats.mu.Unlock()
if counter, ok := globalStats.ProxyTypeCounts[proxyType]; ok {
counter.Dec(1)
}
if proxyStats, ok := globalStats.ProxyStatistics[proxyName]; ok {
proxyStats.LastCloseTime = time.Now()
}
}
}
func StatsOpenConnection(name string) {
if g.GlbServerCfg.DashboardPort != 0 {
globalStats.CurConns.Inc(1)
globalStats.mu.Lock()
defer globalStats.mu.Unlock()
proxyStats, ok := globalStats.ProxyStatistics[name]
if ok {
proxyStats.CurConns.Inc(1)
globalStats.ProxyStatistics[name] = proxyStats
}
}
}
func StatsCloseConnection(name string) {
if g.GlbServerCfg.DashboardPort != 0 {
globalStats.CurConns.Dec(1)
globalStats.mu.Lock()
defer globalStats.mu.Unlock()
proxyStats, ok := globalStats.ProxyStatistics[name]
if ok {
proxyStats.CurConns.Dec(1)
globalStats.ProxyStatistics[name] = proxyStats
}
}
}
func StatsAddTrafficIn(name string, trafficIn int64) {
if g.GlbServerCfg.DashboardPort != 0 {
globalStats.TotalTrafficIn.Inc(trafficIn)
globalStats.mu.Lock()
defer globalStats.mu.Unlock()
proxyStats, ok := globalStats.ProxyStatistics[name]
if ok {
proxyStats.TrafficIn.Inc(trafficIn)
globalStats.ProxyStatistics[name] = proxyStats
}
}
}
func StatsAddTrafficOut(name string, trafficOut int64) {
if g.GlbServerCfg.DashboardPort != 0 {
globalStats.TotalTrafficOut.Inc(trafficOut)
globalStats.mu.Lock()
defer globalStats.mu.Unlock()
proxyStats, ok := globalStats.ProxyStatistics[name]
if ok {
proxyStats.TrafficOut.Inc(trafficOut)
globalStats.ProxyStatistics[name] = proxyStats
}
}
}
// Functions for getting server stats.
type ServerStats struct {
TotalTrafficIn int64
TotalTrafficOut int64
CurConns int64
ClientCounts int64
ProxyTypeCounts map[string]int64
}
func StatsGetServer() *ServerStats {
globalStats.mu.Lock()
defer globalStats.mu.Unlock()
s := &ServerStats{
TotalTrafficIn: globalStats.TotalTrafficIn.TodayCount(),
TotalTrafficOut: globalStats.TotalTrafficOut.TodayCount(),
CurConns: globalStats.CurConns.Count(),
ClientCounts: globalStats.ClientCounts.Count(),
ProxyTypeCounts: make(map[string]int64),
}
for k, v := range globalStats.ProxyTypeCounts {
s.ProxyTypeCounts[k] = v.Count()
}
return s
}
type ProxyStats struct {
Name string
Type string
TodayTrafficIn int64
TodayTrafficOut int64
LastStartTime string
LastCloseTime string
CurConns int64
}
func StatsGetProxiesByType(proxyType string) []*ProxyStats {
res := make([]*ProxyStats, 0)
globalStats.mu.Lock()
defer globalStats.mu.Unlock()
for name, proxyStats := range globalStats.ProxyStatistics {
if proxyStats.ProxyType != proxyType {
continue
}
ps := &ProxyStats{
Name: name,
Type: proxyStats.ProxyType,
TodayTrafficIn: proxyStats.TrafficIn.TodayCount(),
TodayTrafficOut: proxyStats.TrafficOut.TodayCount(),
CurConns: proxyStats.CurConns.Count(),
}
if !proxyStats.LastStartTime.IsZero() {
ps.LastStartTime = proxyStats.LastStartTime.Format("01-02 15:04:05")
}
if !proxyStats.LastCloseTime.IsZero() {
ps.LastCloseTime = proxyStats.LastCloseTime.Format("01-02 15:04:05")
}
res = append(res, ps)
}
return res
}
func StatsGetProxiesByTypeAndName(proxyType string, proxyName string) (res *ProxyStats) {
globalStats.mu.Lock()
defer globalStats.mu.Unlock()
for name, proxyStats := range globalStats.ProxyStatistics {
if proxyStats.ProxyType != proxyType {
continue
}
if name != proxyName {
continue
}
res = &ProxyStats{
Name: name,
Type: proxyStats.ProxyType,
TodayTrafficIn: proxyStats.TrafficIn.TodayCount(),
TodayTrafficOut: proxyStats.TrafficOut.TodayCount(),
CurConns: proxyStats.CurConns.Count(),
}
if !proxyStats.LastStartTime.IsZero() {
res.LastStartTime = proxyStats.LastStartTime.Format("01-02 15:04:05")
}
if !proxyStats.LastCloseTime.IsZero() {
res.LastCloseTime = proxyStats.LastCloseTime.Format("01-02 15:04:05")
}
break
}
return
}
type ProxyTrafficInfo struct {
Name string
TrafficIn []int64
TrafficOut []int64
}
func StatsGetProxyTraffic(name string) (res *ProxyTrafficInfo) {
globalStats.mu.Lock()
defer globalStats.mu.Unlock()
proxyStats, ok := globalStats.ProxyStatistics[name]
if ok {
res = &ProxyTrafficInfo{
Name: name,
}
res.TrafficIn = proxyStats.TrafficIn.GetLastDaysCount(ReserveDays)
res.TrafficOut = proxyStats.TrafficOut.GetLastDaysCount(ReserveDays)
}
return
}

View File

@ -1,687 +0,0 @@
// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"context"
"fmt"
"io"
"net"
"strings"
"sync"
"time"
"github.com/fatedier/frp/g"
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/msg"
"github.com/fatedier/frp/models/proto/udp"
"github.com/fatedier/frp/utils/log"
frpNet "github.com/fatedier/frp/utils/net"
"github.com/fatedier/frp/utils/util"
"github.com/fatedier/frp/utils/vhost"
"github.com/fatedier/golib/errors"
frpIo "github.com/fatedier/golib/io"
)
type Proxy interface {
Run() (remoteAddr string, err error)
GetControl() *Control
GetName() string
GetConf() config.ProxyConf
GetWorkConnFromPool() (workConn frpNet.Conn, err error)
GetUsedPortsNum() int
Close()
log.Logger
}
type BaseProxy struct {
name string
ctl *Control
listeners []frpNet.Listener
usedPortsNum int
mu sync.RWMutex
log.Logger
}
func (pxy *BaseProxy) GetName() string {
return pxy.name
}
func (pxy *BaseProxy) GetControl() *Control {
return pxy.ctl
}
func (pxy *BaseProxy) GetUsedPortsNum() int {
return pxy.usedPortsNum
}
func (pxy *BaseProxy) Close() {
pxy.Info("proxy closing")
for _, l := range pxy.listeners {
l.Close()
}
}
func (pxy *BaseProxy) GetWorkConnFromPool() (workConn frpNet.Conn, err error) {
ctl := pxy.GetControl()
// try all connections from the pool
for i := 0; i < ctl.poolCount+1; i++ {
if workConn, err = ctl.GetWorkConn(); err != nil {
pxy.Warn("failed to get work connection: %v", err)
return
}
pxy.Info("get a new work connection: [%s]", workConn.RemoteAddr().String())
workConn.AddLogPrefix(pxy.GetName())
err := msg.WriteMsg(workConn, &msg.StartWorkConn{
ProxyName: pxy.GetName(),
})
if err != nil {
workConn.Warn("failed to send message to work connection from pool: %v, times: %d", err, i)
workConn.Close()
} else {
break
}
}
if err != nil {
pxy.Error("try to get work connection failed in the end")
return
}
return
}
// startListenHandler start a goroutine handler for each listener.
// p: p will just be passed to handler(Proxy, frpNet.Conn).
// handler: each proxy type can set different handler function to deal with connections accepted from listeners.
func (pxy *BaseProxy) startListenHandler(p Proxy, handler func(Proxy, frpNet.Conn)) {
for _, listener := range pxy.listeners {
go func(l frpNet.Listener) {
for {
// block
// if listener is closed, err returned
c, err := l.Accept()
if err != nil {
pxy.Info("listener is closed")
return
}
pxy.Debug("get a user connection [%s]", c.RemoteAddr().String())
go handler(p, c)
}
}(listener)
}
}
func NewProxy(ctl *Control, pxyConf config.ProxyConf) (pxy Proxy, err error) {
basePxy := BaseProxy{
name: pxyConf.GetBaseInfo().ProxyName,
ctl: ctl,
listeners: make([]frpNet.Listener, 0),
Logger: log.NewPrefixLogger(ctl.runId),
}
switch cfg := pxyConf.(type) {
case *config.TcpProxyConf:
basePxy.usedPortsNum = 1
pxy = &TcpProxy{
BaseProxy: basePxy,
cfg: cfg,
}
case *config.HttpProxyConf:
pxy = &HttpProxy{
BaseProxy: basePxy,
cfg: cfg,
}
case *config.HttpsProxyConf:
pxy = &HttpsProxy{
BaseProxy: basePxy,
cfg: cfg,
}
case *config.UdpProxyConf:
basePxy.usedPortsNum = 1
pxy = &UdpProxy{
BaseProxy: basePxy,
cfg: cfg,
}
case *config.StcpProxyConf:
pxy = &StcpProxy{
BaseProxy: basePxy,
cfg: cfg,
}
case *config.XtcpProxyConf:
pxy = &XtcpProxy{
BaseProxy: basePxy,
cfg: cfg,
}
default:
return pxy, fmt.Errorf("proxy type not support")
}
pxy.AddLogPrefix(pxy.GetName())
return
}
type TcpProxy struct {
BaseProxy
cfg *config.TcpProxyConf
realPort int
}
func (pxy *TcpProxy) Run() (remoteAddr string, err error) {
if pxy.cfg.Group != "" {
l, realPort, errRet := pxy.ctl.svr.tcpGroupCtl.Listen(pxy.name, pxy.cfg.Group, pxy.cfg.GroupKey, g.GlbServerCfg.ProxyBindAddr, pxy.cfg.RemotePort)
if errRet != nil {
err = errRet
return
}
defer func() {
if err != nil {
l.Close()
}
}()
pxy.realPort = realPort
listener := frpNet.WrapLogListener(l)
listener.AddLogPrefix(pxy.name)
pxy.listeners = append(pxy.listeners, listener)
pxy.Info("tcp proxy listen port [%d] in group [%s]", pxy.cfg.RemotePort, pxy.cfg.Group)
} else {
pxy.realPort, err = pxy.ctl.svr.tcpPortManager.Acquire(pxy.name, pxy.cfg.RemotePort)
if err != nil {
return
}
defer func() {
if err != nil {
pxy.ctl.svr.tcpPortManager.Release(pxy.realPort)
}
}()
listener, errRet := frpNet.ListenTcp(g.GlbServerCfg.ProxyBindAddr, pxy.realPort)
if errRet != nil {
err = errRet
return
}
listener.AddLogPrefix(pxy.name)
pxy.listeners = append(pxy.listeners, listener)
pxy.Info("tcp proxy listen port [%d]", pxy.cfg.RemotePort)
}
pxy.cfg.RemotePort = pxy.realPort
remoteAddr = fmt.Sprintf(":%d", pxy.realPort)
pxy.startListenHandler(pxy, HandleUserTcpConnection)
return
}
func (pxy *TcpProxy) GetConf() config.ProxyConf {
return pxy.cfg
}
func (pxy *TcpProxy) Close() {
pxy.BaseProxy.Close()
if pxy.cfg.Group == "" {
pxy.ctl.svr.tcpPortManager.Release(pxy.realPort)
}
}
type HttpProxy struct {
BaseProxy
cfg *config.HttpProxyConf
closeFuncs []func()
}
func (pxy *HttpProxy) Run() (remoteAddr string, err error) {
routeConfig := vhost.VhostRouteConfig{
RewriteHost: pxy.cfg.HostHeaderRewrite,
Headers: pxy.cfg.Headers,
Username: pxy.cfg.HttpUser,
Password: pxy.cfg.HttpPwd,
CreateConnFn: pxy.GetRealConn,
}
locations := pxy.cfg.Locations
if len(locations) == 0 {
locations = []string{""}
}
addrs := make([]string, 0)
for _, domain := range pxy.cfg.CustomDomains {
routeConfig.Domain = domain
for _, location := range locations {
routeConfig.Location = location
err = pxy.ctl.svr.httpReverseProxy.Register(routeConfig)
if err != nil {
return
}
tmpDomain := routeConfig.Domain
tmpLocation := routeConfig.Location
addrs = append(addrs, util.CanonicalAddr(tmpDomain, int(g.GlbServerCfg.VhostHttpPort)))
pxy.closeFuncs = append(pxy.closeFuncs, func() {
pxy.ctl.svr.httpReverseProxy.UnRegister(tmpDomain, tmpLocation)
})
pxy.Info("http proxy listen for host [%s] location [%s]", routeConfig.Domain, routeConfig.Location)
}
}
if pxy.cfg.SubDomain != "" {
routeConfig.Domain = pxy.cfg.SubDomain + "." + g.GlbServerCfg.SubDomainHost
for _, location := range locations {
routeConfig.Location = location
err = pxy.ctl.svr.httpReverseProxy.Register(routeConfig)
if err != nil {
return
}
tmpDomain := routeConfig.Domain
tmpLocation := routeConfig.Location
addrs = append(addrs, util.CanonicalAddr(tmpDomain, g.GlbServerCfg.VhostHttpPort))
pxy.closeFuncs = append(pxy.closeFuncs, func() {
pxy.ctl.svr.httpReverseProxy.UnRegister(tmpDomain, tmpLocation)
})
pxy.Info("http proxy listen for host [%s] location [%s]", routeConfig.Domain, routeConfig.Location)
}
}
remoteAddr = strings.Join(addrs, ",")
return
}
func (pxy *HttpProxy) GetConf() config.ProxyConf {
return pxy.cfg
}
func (pxy *HttpProxy) GetRealConn() (workConn frpNet.Conn, err error) {
tmpConn, errRet := pxy.GetWorkConnFromPool()
if errRet != nil {
err = errRet
return
}
var rwc io.ReadWriteCloser = tmpConn
if pxy.cfg.UseEncryption {
rwc, err = frpIo.WithEncryption(rwc, []byte(g.GlbServerCfg.Token))
if err != nil {
pxy.Error("create encryption stream error: %v", err)
return
}
}
if pxy.cfg.UseCompression {
rwc = frpIo.WithCompression(rwc)
}
workConn = frpNet.WrapReadWriteCloserToConn(rwc, tmpConn)
workConn = frpNet.WrapStatsConn(workConn, pxy.updateStatsAfterClosedConn)
StatsOpenConnection(pxy.GetName())
return
}
func (pxy *HttpProxy) updateStatsAfterClosedConn(totalRead, totalWrite int64) {
name := pxy.GetName()
StatsCloseConnection(name)
StatsAddTrafficIn(name, totalWrite)
StatsAddTrafficOut(name, totalRead)
}
func (pxy *HttpProxy) Close() {
pxy.BaseProxy.Close()
for _, closeFn := range pxy.closeFuncs {
closeFn()
}
}
type HttpsProxy struct {
BaseProxy
cfg *config.HttpsProxyConf
}
func (pxy *HttpsProxy) Run() (remoteAddr string, err error) {
routeConfig := &vhost.VhostRouteConfig{}
addrs := make([]string, 0)
for _, domain := range pxy.cfg.CustomDomains {
routeConfig.Domain = domain
l, errRet := pxy.ctl.svr.VhostHttpsMuxer.Listen(routeConfig)
if errRet != nil {
err = errRet
return
}
l.AddLogPrefix(pxy.name)
pxy.Info("https proxy listen for host [%s]", routeConfig.Domain)
pxy.listeners = append(pxy.listeners, l)
addrs = append(addrs, util.CanonicalAddr(routeConfig.Domain, g.GlbServerCfg.VhostHttpsPort))
}
if pxy.cfg.SubDomain != "" {
routeConfig.Domain = pxy.cfg.SubDomain + "." + g.GlbServerCfg.SubDomainHost
l, errRet := pxy.ctl.svr.VhostHttpsMuxer.Listen(routeConfig)
if errRet != nil {
err = errRet
return
}
l.AddLogPrefix(pxy.name)
pxy.Info("https proxy listen for host [%s]", routeConfig.Domain)
pxy.listeners = append(pxy.listeners, l)
addrs = append(addrs, util.CanonicalAddr(routeConfig.Domain, int(g.GlbServerCfg.VhostHttpsPort)))
}
pxy.startListenHandler(pxy, HandleUserTcpConnection)
remoteAddr = strings.Join(addrs, ",")
return
}
func (pxy *HttpsProxy) GetConf() config.ProxyConf {
return pxy.cfg
}
func (pxy *HttpsProxy) Close() {
pxy.BaseProxy.Close()
}
type StcpProxy struct {
BaseProxy
cfg *config.StcpProxyConf
}
func (pxy *StcpProxy) Run() (remoteAddr string, err error) {
listener, errRet := pxy.ctl.svr.visitorManager.Listen(pxy.GetName(), pxy.cfg.Sk)
if errRet != nil {
err = errRet
return
}
listener.AddLogPrefix(pxy.name)
pxy.listeners = append(pxy.listeners, listener)
pxy.Info("stcp proxy custom listen success")
pxy.startListenHandler(pxy, HandleUserTcpConnection)
return
}
func (pxy *StcpProxy) GetConf() config.ProxyConf {
return pxy.cfg
}
func (pxy *StcpProxy) Close() {
pxy.BaseProxy.Close()
pxy.ctl.svr.visitorManager.CloseListener(pxy.GetName())
}
type XtcpProxy struct {
BaseProxy
cfg *config.XtcpProxyConf
closeCh chan struct{}
}
func (pxy *XtcpProxy) Run() (remoteAddr string, err error) {
if pxy.ctl.svr.natHoleController == nil {
pxy.Error("udp port for xtcp is not specified.")
err = fmt.Errorf("xtcp is not supported in frps")
return
}
sidCh := pxy.ctl.svr.natHoleController.ListenClient(pxy.GetName(), pxy.cfg.Sk)
go func() {
for {
select {
case <-pxy.closeCh:
break
case sid := <-sidCh:
workConn, errRet := pxy.GetWorkConnFromPool()
if errRet != nil {
continue
}
m := &msg.NatHoleSid{
Sid: sid,
}
errRet = msg.WriteMsg(workConn, m)
if errRet != nil {
pxy.Warn("write nat hole sid package error, %v", errRet)
}
}
}
}()
return
}
func (pxy *XtcpProxy) GetConf() config.ProxyConf {
return pxy.cfg
}
func (pxy *XtcpProxy) Close() {
pxy.BaseProxy.Close()
pxy.ctl.svr.natHoleController.CloseClient(pxy.GetName())
errors.PanicToError(func() {
close(pxy.closeCh)
})
}
type UdpProxy struct {
BaseProxy
cfg *config.UdpProxyConf
realPort int
// udpConn is the listener of udp packages
udpConn *net.UDPConn
// there are always only one workConn at the same time
// get another one if it closed
workConn net.Conn
// sendCh is used for sending packages to workConn
sendCh chan *msg.UdpPacket
// readCh is used for reading packages from workConn
readCh chan *msg.UdpPacket
// checkCloseCh is used for watching if workConn is closed
checkCloseCh chan int
isClosed bool
}
func (pxy *UdpProxy) Run() (remoteAddr string, err error) {
pxy.realPort, err = pxy.ctl.svr.udpPortManager.Acquire(pxy.name, pxy.cfg.RemotePort)
if err != nil {
return
}
defer func() {
if err != nil {
pxy.ctl.svr.udpPortManager.Release(pxy.realPort)
}
}()
remoteAddr = fmt.Sprintf(":%d", pxy.realPort)
pxy.cfg.RemotePort = pxy.realPort
addr, errRet := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", g.GlbServerCfg.ProxyBindAddr, pxy.realPort))
if errRet != nil {
err = errRet
return
}
udpConn, errRet := net.ListenUDP("udp", addr)
if errRet != nil {
err = errRet
pxy.Warn("listen udp port error: %v", err)
return
}
pxy.Info("udp proxy listen port [%d]", pxy.cfg.RemotePort)
pxy.udpConn = udpConn
pxy.sendCh = make(chan *msg.UdpPacket, 1024)
pxy.readCh = make(chan *msg.UdpPacket, 1024)
pxy.checkCloseCh = make(chan int)
// read message from workConn, if it returns any error, notify proxy to start a new workConn
workConnReaderFn := func(conn net.Conn) {
for {
var (
rawMsg msg.Message
errRet error
)
pxy.Trace("loop waiting message from udp workConn")
// client will send heartbeat in workConn for keeping alive
conn.SetReadDeadline(time.Now().Add(time.Duration(60) * time.Second))
if rawMsg, errRet = msg.ReadMsg(conn); errRet != nil {
pxy.Warn("read from workConn for udp error: %v", errRet)
conn.Close()
// notify proxy to start a new work connection
// ignore error here, it means the proxy is closed
errors.PanicToError(func() {
pxy.checkCloseCh <- 1
})
return
}
conn.SetReadDeadline(time.Time{})
switch m := rawMsg.(type) {
case *msg.Ping:
pxy.Trace("udp work conn get ping message")
continue
case *msg.UdpPacket:
if errRet := errors.PanicToError(func() {
pxy.Trace("get udp message from workConn: %s", m.Content)
pxy.readCh <- m
StatsAddTrafficOut(pxy.GetName(), int64(len(m.Content)))
}); errRet != nil {
conn.Close()
pxy.Info("reader goroutine for udp work connection closed")
return
}
}
}
}
// send message to workConn
workConnSenderFn := func(conn net.Conn, ctx context.Context) {
var errRet error
for {
select {
case udpMsg, ok := <-pxy.sendCh:
if !ok {
pxy.Info("sender goroutine for udp work connection closed")
return
}
if errRet = msg.WriteMsg(conn, udpMsg); errRet != nil {
pxy.Info("sender goroutine for udp work connection closed: %v", errRet)
conn.Close()
return
} else {
pxy.Trace("send message to udp workConn: %s", udpMsg.Content)
StatsAddTrafficIn(pxy.GetName(), int64(len(udpMsg.Content)))
continue
}
case <-ctx.Done():
pxy.Info("sender goroutine for udp work connection closed")
return
}
}
}
go func() {
// Sleep a while for waiting control send the NewProxyResp to client.
time.Sleep(500 * time.Millisecond)
for {
workConn, err := pxy.GetWorkConnFromPool()
if err != nil {
time.Sleep(1 * time.Second)
// check if proxy is closed
select {
case _, ok := <-pxy.checkCloseCh:
if !ok {
return
}
default:
}
continue
}
// close the old workConn and replac it with a new one
if pxy.workConn != nil {
pxy.workConn.Close()
}
pxy.workConn = workConn
ctx, cancel := context.WithCancel(context.Background())
go workConnReaderFn(workConn)
go workConnSenderFn(workConn, ctx)
_, ok := <-pxy.checkCloseCh
cancel()
if !ok {
return
}
}
}()
// Read from user connections and send wrapped udp message to sendCh (forwarded by workConn).
// Client will transfor udp message to local udp service and waiting for response for a while.
// Response will be wrapped to be forwarded by work connection to server.
// Close readCh and sendCh at the end.
go func() {
udp.ForwardUserConn(udpConn, pxy.readCh, pxy.sendCh)
pxy.Close()
}()
return remoteAddr, nil
}
func (pxy *UdpProxy) GetConf() config.ProxyConf {
return pxy.cfg
}
func (pxy *UdpProxy) Close() {
pxy.mu.Lock()
defer pxy.mu.Unlock()
if !pxy.isClosed {
pxy.isClosed = true
pxy.BaseProxy.Close()
if pxy.workConn != nil {
pxy.workConn.Close()
}
pxy.udpConn.Close()
// all channels only closed here
close(pxy.checkCloseCh)
close(pxy.readCh)
close(pxy.sendCh)
}
pxy.ctl.svr.udpPortManager.Release(pxy.realPort)
}
// HandleUserTcpConnection is used for incoming tcp user connections.
// It can be used for tcp, http, https type.
func HandleUserTcpConnection(pxy Proxy, userConn frpNet.Conn) {
defer userConn.Close()
// try all connections from the pool
workConn, err := pxy.GetWorkConnFromPool()
if err != nil {
return
}
defer workConn.Close()
var local io.ReadWriteCloser = workConn
cfg := pxy.GetConf().GetBaseInfo()
if cfg.UseEncryption {
local, err = frpIo.WithEncryption(local, []byte(g.GlbServerCfg.Token))
if err != nil {
pxy.Error("create encryption stream error: %v", err)
return
}
}
if cfg.UseCompression {
local = frpIo.WithCompression(local)
}
pxy.Debug("join connections, workConn(l[%s] r[%s]) userConn(l[%s] r[%s])", workConn.LocalAddr().String(),
workConn.RemoteAddr().String(), userConn.LocalAddr().String(), userConn.RemoteAddr().String())
StatsOpenConnection(pxy.GetName())
inCount, outCount := frpIo.Join(local, userConn)
StatsCloseConnection(pxy.GetName())
StatsAddTrafficIn(pxy.GetName(), inCount)
StatsAddTrafficOut(pxy.GetName(), outCount)
pxy.Debug("join connections closed")
}

138
server/proxy/http.go Normal file
View File

@ -0,0 +1,138 @@
// Copyright 2019 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package proxy
import (
"io"
"strings"
"github.com/fatedier/frp/g"
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/server/stats"
frpNet "github.com/fatedier/frp/utils/net"
"github.com/fatedier/frp/utils/util"
"github.com/fatedier/frp/utils/vhost"
frpIo "github.com/fatedier/golib/io"
)
type HttpProxy struct {
BaseProxy
cfg *config.HttpProxyConf
closeFuncs []func()
}
func (pxy *HttpProxy) Run() (remoteAddr string, err error) {
routeConfig := vhost.VhostRouteConfig{
RewriteHost: pxy.cfg.HostHeaderRewrite,
Headers: pxy.cfg.Headers,
Username: pxy.cfg.HttpUser,
Password: pxy.cfg.HttpPwd,
CreateConnFn: pxy.GetRealConn,
}
locations := pxy.cfg.Locations
if len(locations) == 0 {
locations = []string{""}
}
addrs := make([]string, 0)
for _, domain := range pxy.cfg.CustomDomains {
routeConfig.Domain = domain
for _, location := range locations {
routeConfig.Location = location
err = pxy.rc.HttpReverseProxy.Register(routeConfig)
if err != nil {
return
}
tmpDomain := routeConfig.Domain
tmpLocation := routeConfig.Location
addrs = append(addrs, util.CanonicalAddr(tmpDomain, int(g.GlbServerCfg.VhostHttpPort)))
pxy.closeFuncs = append(pxy.closeFuncs, func() {
pxy.rc.HttpReverseProxy.UnRegister(tmpDomain, tmpLocation)
})
pxy.Info("http proxy listen for host [%s] location [%s]", routeConfig.Domain, routeConfig.Location)
}
}
if pxy.cfg.SubDomain != "" {
routeConfig.Domain = pxy.cfg.SubDomain + "." + g.GlbServerCfg.SubDomainHost
for _, location := range locations {
routeConfig.Location = location
err = pxy.rc.HttpReverseProxy.Register(routeConfig)
if err != nil {
return
}
tmpDomain := routeConfig.Domain
tmpLocation := routeConfig.Location
addrs = append(addrs, util.CanonicalAddr(tmpDomain, g.GlbServerCfg.VhostHttpPort))
pxy.closeFuncs = append(pxy.closeFuncs, func() {
pxy.rc.HttpReverseProxy.UnRegister(tmpDomain, tmpLocation)
})
pxy.Info("http proxy listen for host [%s] location [%s]", routeConfig.Domain, routeConfig.Location)
}
}
remoteAddr = strings.Join(addrs, ",")
return
}
func (pxy *HttpProxy) GetConf() config.ProxyConf {
return pxy.cfg
}
func (pxy *HttpProxy) GetRealConn() (workConn frpNet.Conn, err error) {
tmpConn, errRet := pxy.GetWorkConnFromPool()
if errRet != nil {
err = errRet
return
}
var rwc io.ReadWriteCloser = tmpConn
if pxy.cfg.UseEncryption {
rwc, err = frpIo.WithEncryption(rwc, []byte(g.GlbServerCfg.Token))
if err != nil {
pxy.Error("create encryption stream error: %v", err)
return
}
}
if pxy.cfg.UseCompression {
rwc = frpIo.WithCompression(rwc)
}
workConn = frpNet.WrapReadWriteCloserToConn(rwc, tmpConn)
workConn = frpNet.WrapStatsConn(workConn, pxy.updateStatsAfterClosedConn)
pxy.statsCollector.Mark(stats.TypeOpenConnection, &stats.OpenConnectionPayload{ProxyName: pxy.GetName()})
return
}
func (pxy *HttpProxy) updateStatsAfterClosedConn(totalRead, totalWrite int64) {
name := pxy.GetName()
pxy.statsCollector.Mark(stats.TypeCloseProxy, &stats.CloseConnectionPayload{ProxyName: name})
pxy.statsCollector.Mark(stats.TypeAddTrafficIn, &stats.AddTrafficInPayload{
ProxyName: name,
TrafficBytes: totalWrite,
})
pxy.statsCollector.Mark(stats.TypeAddTrafficOut, &stats.AddTrafficOutPayload{
ProxyName: name,
TrafficBytes: totalRead,
})
}
func (pxy *HttpProxy) Close() {
pxy.BaseProxy.Close()
for _, closeFn := range pxy.closeFuncs {
closeFn()
}
}

72
server/proxy/https.go Normal file
View File

@ -0,0 +1,72 @@
// Copyright 2019 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package proxy
import (
"strings"
"github.com/fatedier/frp/g"
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/utils/util"
"github.com/fatedier/frp/utils/vhost"
)
type HttpsProxy struct {
BaseProxy
cfg *config.HttpsProxyConf
}
func (pxy *HttpsProxy) Run() (remoteAddr string, err error) {
routeConfig := &vhost.VhostRouteConfig{}
addrs := make([]string, 0)
for _, domain := range pxy.cfg.CustomDomains {
routeConfig.Domain = domain
l, errRet := pxy.rc.VhostHttpsMuxer.Listen(routeConfig)
if errRet != nil {
err = errRet
return
}
l.AddLogPrefix(pxy.name)
pxy.Info("https proxy listen for host [%s]", routeConfig.Domain)
pxy.listeners = append(pxy.listeners, l)
addrs = append(addrs, util.CanonicalAddr(routeConfig.Domain, g.GlbServerCfg.VhostHttpsPort))
}
if pxy.cfg.SubDomain != "" {
routeConfig.Domain = pxy.cfg.SubDomain + "." + g.GlbServerCfg.SubDomainHost
l, errRet := pxy.rc.VhostHttpsMuxer.Listen(routeConfig)
if errRet != nil {
err = errRet
return
}
l.AddLogPrefix(pxy.name)
pxy.Info("https proxy listen for host [%s]", routeConfig.Domain)
pxy.listeners = append(pxy.listeners, l)
addrs = append(addrs, util.CanonicalAddr(routeConfig.Domain, int(g.GlbServerCfg.VhostHttpsPort)))
}
pxy.startListenHandler(pxy, HandleUserTcpConnection)
remoteAddr = strings.Join(addrs, ",")
return
}
func (pxy *HttpsProxy) GetConf() config.ProxyConf {
return pxy.cfg
}
func (pxy *HttpsProxy) Close() {
pxy.BaseProxy.Close()
}

250
server/proxy/proxy.go Normal file
View File

@ -0,0 +1,250 @@
// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package proxy
import (
"fmt"
"io"
"sync"
"github.com/fatedier/frp/g"
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/msg"
"github.com/fatedier/frp/server/controller"
"github.com/fatedier/frp/server/stats"
"github.com/fatedier/frp/utils/log"
frpNet "github.com/fatedier/frp/utils/net"
frpIo "github.com/fatedier/golib/io"
)
type GetWorkConnFn func() (frpNet.Conn, error)
type Proxy interface {
Run() (remoteAddr string, err error)
GetName() string
GetConf() config.ProxyConf
GetWorkConnFromPool() (workConn frpNet.Conn, err error)
GetUsedPortsNum() int
Close()
log.Logger
}
type BaseProxy struct {
name string
rc *controller.ResourceController
statsCollector stats.Collector
listeners []frpNet.Listener
usedPortsNum int
poolCount int
getWorkConnFn GetWorkConnFn
mu sync.RWMutex
log.Logger
}
func (pxy *BaseProxy) GetName() string {
return pxy.name
}
func (pxy *BaseProxy) GetUsedPortsNum() int {
return pxy.usedPortsNum
}
func (pxy *BaseProxy) Close() {
pxy.Info("proxy closing")
for _, l := range pxy.listeners {
l.Close()
}
}
func (pxy *BaseProxy) GetWorkConnFromPool() (workConn frpNet.Conn, err error) {
// try all connections from the pool
for i := 0; i < pxy.poolCount+1; i++ {
if workConn, err = pxy.getWorkConnFn(); err != nil {
pxy.Warn("failed to get work connection: %v", err)
return
}
pxy.Info("get a new work connection: [%s]", workConn.RemoteAddr().String())
workConn.AddLogPrefix(pxy.GetName())
err := msg.WriteMsg(workConn, &msg.StartWorkConn{
ProxyName: pxy.GetName(),
})
if err != nil {
workConn.Warn("failed to send message to work connection from pool: %v, times: %d", err, i)
workConn.Close()
} else {
break
}
}
if err != nil {
pxy.Error("try to get work connection failed in the end")
return
}
return
}
// startListenHandler start a goroutine handler for each listener.
// p: p will just be passed to handler(Proxy, frpNet.Conn).
// handler: each proxy type can set different handler function to deal with connections accepted from listeners.
func (pxy *BaseProxy) startListenHandler(p Proxy, handler func(Proxy, frpNet.Conn, stats.Collector)) {
for _, listener := range pxy.listeners {
go func(l frpNet.Listener) {
for {
// block
// if listener is closed, err returned
c, err := l.Accept()
if err != nil {
pxy.Info("listener is closed")
return
}
pxy.Debug("get a user connection [%s]", c.RemoteAddr().String())
go handler(p, c, pxy.statsCollector)
}
}(listener)
}
}
func NewProxy(runId string, rc *controller.ResourceController, statsCollector stats.Collector, poolCount int,
getWorkConnFn GetWorkConnFn, pxyConf config.ProxyConf) (pxy Proxy, err error) {
basePxy := BaseProxy{
name: pxyConf.GetBaseInfo().ProxyName,
rc: rc,
statsCollector: statsCollector,
listeners: make([]frpNet.Listener, 0),
poolCount: poolCount,
getWorkConnFn: getWorkConnFn,
Logger: log.NewPrefixLogger(runId),
}
switch cfg := pxyConf.(type) {
case *config.TcpProxyConf:
basePxy.usedPortsNum = 1
pxy = &TcpProxy{
BaseProxy: basePxy,
cfg: cfg,
}
case *config.HttpProxyConf:
pxy = &HttpProxy{
BaseProxy: basePxy,
cfg: cfg,
}
case *config.HttpsProxyConf:
pxy = &HttpsProxy{
BaseProxy: basePxy,
cfg: cfg,
}
case *config.UdpProxyConf:
basePxy.usedPortsNum = 1
pxy = &UdpProxy{
BaseProxy: basePxy,
cfg: cfg,
}
case *config.StcpProxyConf:
pxy = &StcpProxy{
BaseProxy: basePxy,
cfg: cfg,
}
case *config.XtcpProxyConf:
pxy = &XtcpProxy{
BaseProxy: basePxy,
cfg: cfg,
}
default:
return pxy, fmt.Errorf("proxy type not support")
}
pxy.AddLogPrefix(pxy.GetName())
return
}
// HandleUserTcpConnection is used for incoming tcp user connections.
// It can be used for tcp, http, https type.
func HandleUserTcpConnection(pxy Proxy, userConn frpNet.Conn, statsCollector stats.Collector) {
defer userConn.Close()
// try all connections from the pool
workConn, err := pxy.GetWorkConnFromPool()
if err != nil {
return
}
defer workConn.Close()
var local io.ReadWriteCloser = workConn
cfg := pxy.GetConf().GetBaseInfo()
if cfg.UseEncryption {
local, err = frpIo.WithEncryption(local, []byte(g.GlbServerCfg.Token))
if err != nil {
pxy.Error("create encryption stream error: %v", err)
return
}
}
if cfg.UseCompression {
local = frpIo.WithCompression(local)
}
pxy.Debug("join connections, workConn(l[%s] r[%s]) userConn(l[%s] r[%s])", workConn.LocalAddr().String(),
workConn.RemoteAddr().String(), userConn.LocalAddr().String(), userConn.RemoteAddr().String())
statsCollector.Mark(stats.TypeOpenConnection, &stats.OpenConnectionPayload{ProxyName: pxy.GetName()})
inCount, outCount := frpIo.Join(local, userConn)
statsCollector.Mark(stats.TypeCloseConnection, &stats.CloseConnectionPayload{ProxyName: pxy.GetName()})
statsCollector.Mark(stats.TypeAddTrafficIn, &stats.AddTrafficInPayload{
ProxyName: pxy.GetName(),
TrafficBytes: inCount,
})
statsCollector.Mark(stats.TypeAddTrafficOut, &stats.AddTrafficOutPayload{
ProxyName: pxy.GetName(),
TrafficBytes: outCount,
})
pxy.Debug("join connections closed")
}
type ProxyManager struct {
// proxies indexed by proxy name
pxys map[string]Proxy
mu sync.RWMutex
}
func NewProxyManager() *ProxyManager {
return &ProxyManager{
pxys: make(map[string]Proxy),
}
}
func (pm *ProxyManager) Add(name string, pxy Proxy) error {
pm.mu.Lock()
defer pm.mu.Unlock()
if _, ok := pm.pxys[name]; ok {
return fmt.Errorf("proxy name [%s] is already in use", name)
}
pm.pxys[name] = pxy
return nil
}
func (pm *ProxyManager) Del(name string) {
pm.mu.Lock()
defer pm.mu.Unlock()
delete(pm.pxys, name)
}
func (pm *ProxyManager) GetByName(name string) (pxy Proxy, ok bool) {
pm.mu.RLock()
defer pm.mu.RUnlock()
pxy, ok = pm.pxys[name]
return
}

47
server/proxy/stcp.go Normal file
View File

@ -0,0 +1,47 @@
// Copyright 2019 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package proxy
import (
"github.com/fatedier/frp/models/config"
)
type StcpProxy struct {
BaseProxy
cfg *config.StcpProxyConf
}
func (pxy *StcpProxy) Run() (remoteAddr string, err error) {
listener, errRet := pxy.rc.VisitorManager.Listen(pxy.GetName(), pxy.cfg.Sk)
if errRet != nil {
err = errRet
return
}
listener.AddLogPrefix(pxy.name)
pxy.listeners = append(pxy.listeners, listener)
pxy.Info("stcp proxy custom listen success")
pxy.startListenHandler(pxy, HandleUserTcpConnection)
return
}
func (pxy *StcpProxy) GetConf() config.ProxyConf {
return pxy.cfg
}
func (pxy *StcpProxy) Close() {
pxy.BaseProxy.Close()
pxy.rc.VisitorManager.CloseListener(pxy.GetName())
}

84
server/proxy/tcp.go Normal file
View File

@ -0,0 +1,84 @@
// Copyright 2019 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package proxy
import (
"fmt"
"github.com/fatedier/frp/g"
"github.com/fatedier/frp/models/config"
frpNet "github.com/fatedier/frp/utils/net"
)
type TcpProxy struct {
BaseProxy
cfg *config.TcpProxyConf
realPort int
}
func (pxy *TcpProxy) Run() (remoteAddr string, err error) {
if pxy.cfg.Group != "" {
l, realPort, errRet := pxy.rc.TcpGroupCtl.Listen(pxy.name, pxy.cfg.Group, pxy.cfg.GroupKey, g.GlbServerCfg.ProxyBindAddr, pxy.cfg.RemotePort)
if errRet != nil {
err = errRet
return
}
defer func() {
if err != nil {
l.Close()
}
}()
pxy.realPort = realPort
listener := frpNet.WrapLogListener(l)
listener.AddLogPrefix(pxy.name)
pxy.listeners = append(pxy.listeners, listener)
pxy.Info("tcp proxy listen port [%d] in group [%s]", pxy.cfg.RemotePort, pxy.cfg.Group)
} else {
pxy.realPort, err = pxy.rc.TcpPortManager.Acquire(pxy.name, pxy.cfg.RemotePort)
if err != nil {
return
}
defer func() {
if err != nil {
pxy.rc.TcpPortManager.Release(pxy.realPort)
}
}()
listener, errRet := frpNet.ListenTcp(g.GlbServerCfg.ProxyBindAddr, pxy.realPort)
if errRet != nil {
err = errRet
return
}
listener.AddLogPrefix(pxy.name)
pxy.listeners = append(pxy.listeners, listener)
pxy.Info("tcp proxy listen port [%d]", pxy.cfg.RemotePort)
}
pxy.cfg.RemotePort = pxy.realPort
remoteAddr = fmt.Sprintf(":%d", pxy.realPort)
pxy.startListenHandler(pxy, HandleUserTcpConnection)
return
}
func (pxy *TcpProxy) GetConf() config.ProxyConf {
return pxy.cfg
}
func (pxy *TcpProxy) Close() {
pxy.BaseProxy.Close()
if pxy.cfg.Group == "" {
pxy.rc.TcpPortManager.Release(pxy.realPort)
}
}

225
server/proxy/udp.go Normal file
View File

@ -0,0 +1,225 @@
// Copyright 2019 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package proxy
import (
"context"
"fmt"
"net"
"time"
"github.com/fatedier/frp/g"
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/msg"
"github.com/fatedier/frp/models/proto/udp"
"github.com/fatedier/frp/server/stats"
"github.com/fatedier/golib/errors"
)
type UdpProxy struct {
BaseProxy
cfg *config.UdpProxyConf
realPort int
// udpConn is the listener of udp packages
udpConn *net.UDPConn
// there are always only one workConn at the same time
// get another one if it closed
workConn net.Conn
// sendCh is used for sending packages to workConn
sendCh chan *msg.UdpPacket
// readCh is used for reading packages from workConn
readCh chan *msg.UdpPacket
// checkCloseCh is used for watching if workConn is closed
checkCloseCh chan int
isClosed bool
}
func (pxy *UdpProxy) Run() (remoteAddr string, err error) {
pxy.realPort, err = pxy.rc.UdpPortManager.Acquire(pxy.name, pxy.cfg.RemotePort)
if err != nil {
return
}
defer func() {
if err != nil {
pxy.rc.UdpPortManager.Release(pxy.realPort)
}
}()
remoteAddr = fmt.Sprintf(":%d", pxy.realPort)
pxy.cfg.RemotePort = pxy.realPort
addr, errRet := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", g.GlbServerCfg.ProxyBindAddr, pxy.realPort))
if errRet != nil {
err = errRet
return
}
udpConn, errRet := net.ListenUDP("udp", addr)
if errRet != nil {
err = errRet
pxy.Warn("listen udp port error: %v", err)
return
}
pxy.Info("udp proxy listen port [%d]", pxy.cfg.RemotePort)
pxy.udpConn = udpConn
pxy.sendCh = make(chan *msg.UdpPacket, 1024)
pxy.readCh = make(chan *msg.UdpPacket, 1024)
pxy.checkCloseCh = make(chan int)
// read message from workConn, if it returns any error, notify proxy to start a new workConn
workConnReaderFn := func(conn net.Conn) {
for {
var (
rawMsg msg.Message
errRet error
)
pxy.Trace("loop waiting message from udp workConn")
// client will send heartbeat in workConn for keeping alive
conn.SetReadDeadline(time.Now().Add(time.Duration(60) * time.Second))
if rawMsg, errRet = msg.ReadMsg(conn); errRet != nil {
pxy.Warn("read from workConn for udp error: %v", errRet)
conn.Close()
// notify proxy to start a new work connection
// ignore error here, it means the proxy is closed
errors.PanicToError(func() {
pxy.checkCloseCh <- 1
})
return
}
conn.SetReadDeadline(time.Time{})
switch m := rawMsg.(type) {
case *msg.Ping:
pxy.Trace("udp work conn get ping message")
continue
case *msg.UdpPacket:
if errRet := errors.PanicToError(func() {
pxy.Trace("get udp message from workConn: %s", m.Content)
pxy.readCh <- m
pxy.statsCollector.Mark(stats.TypeAddTrafficOut, &stats.AddTrafficOutPayload{
ProxyName: pxy.GetName(),
TrafficBytes: int64(len(m.Content)),
})
}); errRet != nil {
conn.Close()
pxy.Info("reader goroutine for udp work connection closed")
return
}
}
}
}
// send message to workConn
workConnSenderFn := func(conn net.Conn, ctx context.Context) {
var errRet error
for {
select {
case udpMsg, ok := <-pxy.sendCh:
if !ok {
pxy.Info("sender goroutine for udp work connection closed")
return
}
if errRet = msg.WriteMsg(conn, udpMsg); errRet != nil {
pxy.Info("sender goroutine for udp work connection closed: %v", errRet)
conn.Close()
return
} else {
pxy.Trace("send message to udp workConn: %s", udpMsg.Content)
pxy.statsCollector.Mark(stats.TypeAddTrafficIn, &stats.AddTrafficInPayload{
ProxyName: pxy.GetName(),
TrafficBytes: int64(len(udpMsg.Content)),
})
continue
}
case <-ctx.Done():
pxy.Info("sender goroutine for udp work connection closed")
return
}
}
}
go func() {
// Sleep a while for waiting control send the NewProxyResp to client.
time.Sleep(500 * time.Millisecond)
for {
workConn, err := pxy.GetWorkConnFromPool()
if err != nil {
time.Sleep(1 * time.Second)
// check if proxy is closed
select {
case _, ok := <-pxy.checkCloseCh:
if !ok {
return
}
default:
}
continue
}
// close the old workConn and replac it with a new one
if pxy.workConn != nil {
pxy.workConn.Close()
}
pxy.workConn = workConn
ctx, cancel := context.WithCancel(context.Background())
go workConnReaderFn(workConn)
go workConnSenderFn(workConn, ctx)
_, ok := <-pxy.checkCloseCh
cancel()
if !ok {
return
}
}
}()
// Read from user connections and send wrapped udp message to sendCh (forwarded by workConn).
// Client will transfor udp message to local udp service and waiting for response for a while.
// Response will be wrapped to be forwarded by work connection to server.
// Close readCh and sendCh at the end.
go func() {
udp.ForwardUserConn(udpConn, pxy.readCh, pxy.sendCh)
pxy.Close()
}()
return remoteAddr, nil
}
func (pxy *UdpProxy) GetConf() config.ProxyConf {
return pxy.cfg
}
func (pxy *UdpProxy) Close() {
pxy.mu.Lock()
defer pxy.mu.Unlock()
if !pxy.isClosed {
pxy.isClosed = true
pxy.BaseProxy.Close()
if pxy.workConn != nil {
pxy.workConn.Close()
}
pxy.udpConn.Close()
// all channels only closed here
close(pxy.checkCloseCh)
close(pxy.readCh)
close(pxy.sendCh)
}
pxy.rc.UdpPortManager.Release(pxy.realPort)
}

73
server/proxy/xtcp.go Normal file
View File

@ -0,0 +1,73 @@
// Copyright 2019 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package proxy
import (
"fmt"
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/msg"
"github.com/fatedier/golib/errors"
)
type XtcpProxy struct {
BaseProxy
cfg *config.XtcpProxyConf
closeCh chan struct{}
}
func (pxy *XtcpProxy) Run() (remoteAddr string, err error) {
if pxy.rc.NatHoleController == nil {
pxy.Error("udp port for xtcp is not specified.")
err = fmt.Errorf("xtcp is not supported in frps")
return
}
sidCh := pxy.rc.NatHoleController.ListenClient(pxy.GetName(), pxy.cfg.Sk)
go func() {
for {
select {
case <-pxy.closeCh:
break
case sid := <-sidCh:
workConn, errRet := pxy.GetWorkConnFromPool()
if errRet != nil {
continue
}
m := &msg.NatHoleSid{
Sid: sid,
}
errRet = msg.WriteMsg(workConn, m)
if errRet != nil {
pxy.Warn("write nat hole sid package error, %v", errRet)
}
}
}
}()
return
}
func (pxy *XtcpProxy) GetConf() config.ProxyConf {
return pxy.cfg
}
func (pxy *XtcpProxy) Close() {
pxy.BaseProxy.Close()
pxy.rc.NatHoleController.CloseClient(pxy.GetName())
errors.PanicToError(func() {
close(pxy.closeCh)
})
}

View File

@ -25,8 +25,12 @@ import (
"github.com/fatedier/frp/assets"
"github.com/fatedier/frp/g"
"github.com/fatedier/frp/models/msg"
"github.com/fatedier/frp/models/nathole"
"github.com/fatedier/frp/server/controller"
"github.com/fatedier/frp/server/group"
"github.com/fatedier/frp/server/ports"
"github.com/fatedier/frp/server/proxy"
"github.com/fatedier/frp/server/stats"
"github.com/fatedier/frp/utils/log"
frpNet "github.com/fatedier/frp/utils/net"
"github.com/fatedier/frp/utils/util"
@ -57,43 +61,33 @@ type Service struct {
// Accept connections using websocket
websocketListener frpNet.Listener
// For https proxies, route requests to different clients by hostname and other infomation
VhostHttpsMuxer *vhost.HttpsMuxer
httpReverseProxy *vhost.HttpReverseProxy
// Manage all controllers
ctlManager *ControlManager
// Manage all proxies
pxyManager *ProxyManager
pxyManager *proxy.ProxyManager
// Manage all visitor listeners
visitorManager *VisitorManager
// All resource managers and controllers
rc *controller.ResourceController
// Manage all tcp ports
tcpPortManager *ports.PortManager
// Manage all udp ports
udpPortManager *ports.PortManager
// Tcp Group Controller
tcpGroupCtl *group.TcpGroupCtl
// Controller for nat hole connections
natHoleController *NatHoleController
// stats collector to store server and proxies stats info
statsCollector stats.Collector
}
func NewService() (svr *Service, err error) {
cfg := &g.GlbServerCfg.ServerCommonConf
svr = &Service{
ctlManager: NewControlManager(),
pxyManager: NewProxyManager(),
visitorManager: NewVisitorManager(),
tcpPortManager: ports.NewPortManager("tcp", cfg.ProxyBindAddr, cfg.AllowPorts),
udpPortManager: ports.NewPortManager("udp", cfg.ProxyBindAddr, cfg.AllowPorts),
ctlManager: NewControlManager(),
pxyManager: proxy.NewProxyManager(),
rc: &controller.ResourceController{
VisitorManager: controller.NewVisitorManager(),
TcpPortManager: ports.NewPortManager("tcp", cfg.ProxyBindAddr, cfg.AllowPorts),
UdpPortManager: ports.NewPortManager("udp", cfg.ProxyBindAddr, cfg.AllowPorts),
},
}
svr.tcpGroupCtl = group.NewTcpGroupCtl(svr.tcpPortManager)
// Init group controller
svr.rc.TcpGroupCtl = group.NewTcpGroupCtl(svr.rc.TcpPortManager)
// Init assets.
err = assets.Load(cfg.AssetsDir)
@ -151,7 +145,7 @@ func NewService() (svr *Service, err error) {
rp := vhost.NewHttpReverseProxy(vhost.HttpReverseProxyOptions{
ResponseHeaderTimeoutS: cfg.VhostHttpTimeout,
})
svr.httpReverseProxy = rp
svr.rc.HttpReverseProxy = rp
address := fmt.Sprintf("%s:%d", cfg.ProxyBindAddr, cfg.VhostHttpPort)
server := &http.Server{
@ -185,7 +179,7 @@ func NewService() (svr *Service, err error) {
}
}
svr.VhostHttpsMuxer, err = vhost.NewHttpsMuxer(frpNet.WrapLogListener(l), 30*time.Second)
svr.rc.VhostHttpsMuxer, err = vhost.NewHttpsMuxer(frpNet.WrapLogListener(l), 30*time.Second)
if err != nil {
err = fmt.Errorf("Create vhost httpsMuxer error, %v", err)
return
@ -195,33 +189,36 @@ func NewService() (svr *Service, err error) {
// Create nat hole controller.
if cfg.BindUdpPort > 0 {
var nc *NatHoleController
var nc *nathole.NatHoleController
addr := fmt.Sprintf("%s:%d", cfg.BindAddr, cfg.BindUdpPort)
nc, err = NewNatHoleController(addr)
nc, err = nathole.NewNatHoleController(addr)
if err != nil {
err = fmt.Errorf("Create nat hole controller error, %v", err)
return
}
svr.natHoleController = nc
svr.rc.NatHoleController = nc
log.Info("nat hole udp service listen on %s:%d", cfg.BindAddr, cfg.BindUdpPort)
}
var statsEnable bool
// Create dashboard web server.
if cfg.DashboardPort > 0 {
err = RunDashboardServer(cfg.DashboardAddr, cfg.DashboardPort)
err = svr.RunDashboardServer(cfg.DashboardAddr, cfg.DashboardPort)
if err != nil {
err = fmt.Errorf("Create dashboard web server error, %v", err)
return
}
log.Info("Dashboard listen on %s:%d", cfg.DashboardAddr, cfg.DashboardPort)
statsEnable = true
}
svr.statsCollector = stats.NewInternalCollector(statsEnable)
return
}
func (svr *Service) Run() {
if svr.natHoleController != nil {
go svr.natHoleController.Run()
if svr.rc.NatHoleController != nil {
go svr.rc.NatHoleController.Run()
}
if g.GlbServerCfg.KcpBindPort > 0 {
go svr.HandleListener(svr.kcpListener)
@ -327,11 +324,6 @@ func (svr *Service) RegisterControl(ctlConn frpNet.Conn, loginMsg *msg.Login) (e
}
// Check auth.
nowTime := time.Now().Unix()
if g.GlbServerCfg.AuthTimeout != 0 && nowTime-loginMsg.Timestamp > g.GlbServerCfg.AuthTimeout {
err = fmt.Errorf("authorization timeout")
return
}
if util.GetAuthKey(g.GlbServerCfg.Token, loginMsg.Timestamp) != loginMsg.PrivilegeKey {
err = fmt.Errorf("authorization failed")
return
@ -346,7 +338,7 @@ func (svr *Service) RegisterControl(ctlConn frpNet.Conn, loginMsg *msg.Login) (e
}
}
ctl := NewControl(svr, ctlConn, loginMsg)
ctl := NewControl(svr.rc, svr.pxyManager, svr.statsCollector, ctlConn, loginMsg)
if oldCtl := svr.ctlManager.Add(loginMsg.RunId, ctl); oldCtl != nil {
oldCtl.allShutdown.WaitDone()
@ -356,7 +348,13 @@ func (svr *Service) RegisterControl(ctlConn frpNet.Conn, loginMsg *msg.Login) (e
ctl.Start()
// for statistics
StatsNewClient()
svr.statsCollector.Mark(stats.TypeNewClient, &stats.NewClientPayload{})
go func() {
// block until control closed
ctl.WaitClosed()
svr.ctlManager.Del(loginMsg.RunId)
}()
return
}
@ -372,14 +370,6 @@ func (svr *Service) RegisterWorkConn(workConn frpNet.Conn, newMsg *msg.NewWorkCo
}
func (svr *Service) RegisterVisitorConn(visitorConn frpNet.Conn, newMsg *msg.NewVisitorConn) error {
return svr.visitorManager.NewConn(newMsg.ProxyName, visitorConn, newMsg.Timestamp, newMsg.SignKey,
return svr.rc.VisitorManager.NewConn(newMsg.ProxyName, visitorConn, newMsg.Timestamp, newMsg.SignKey,
newMsg.UseEncryption, newMsg.UseCompression)
}
func (svr *Service) RegisterProxy(name string, pxy Proxy) error {
return svr.pxyManager.Add(name, pxy)
}
func (svr *Service) DelProxy(name string) {
svr.pxyManager.Del(name)
}

273
server/stats/internal.go Normal file
View File

@ -0,0 +1,273 @@
// Copyright 2019 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package stats
import (
"sync"
"time"
"github.com/fatedier/frp/utils/log"
"github.com/fatedier/frp/utils/metric"
)
type internalCollector struct {
enable bool
info *ServerStatistics
mu sync.Mutex
}
func NewInternalCollector(enable bool) Collector {
return &internalCollector{
enable: enable,
info: &ServerStatistics{
TotalTrafficIn: metric.NewDateCounter(ReserveDays),
TotalTrafficOut: metric.NewDateCounter(ReserveDays),
CurConns: metric.NewCounter(),
ClientCounts: metric.NewCounter(),
ProxyTypeCounts: make(map[string]metric.Counter),
ProxyStatistics: make(map[string]*ProxyStatistics),
},
}
}
func (collector *internalCollector) Run() error {
go func() {
for {
time.Sleep(12 * time.Hour)
log.Debug("start to clear useless proxy statistics data...")
collector.ClearUselessInfo()
log.Debug("finish to clear useless proxy statistics data")
}
}()
return nil
}
func (collector *internalCollector) ClearUselessInfo() {
// To check if there are proxies that closed than 7 days and drop them.
collector.mu.Lock()
defer collector.mu.Unlock()
for name, data := range collector.info.ProxyStatistics {
if !data.LastCloseTime.IsZero() && time.Since(data.LastCloseTime) > time.Duration(7*24)*time.Hour {
delete(collector.info.ProxyStatistics, name)
log.Trace("clear proxy [%s]'s statistics data, lastCloseTime: [%s]", name, data.LastCloseTime.String())
}
}
}
func (collector *internalCollector) Mark(statsType StatsType, payload interface{}) {
if !collector.enable {
return
}
switch v := payload.(type) {
case *NewClientPayload:
collector.newClient(v)
case *CloseClientPayload:
collector.closeClient(v)
case *OpenConnectionPayload:
collector.openConnection(v)
case *CloseConnectionPayload:
collector.closeConnection(v)
case *AddTrafficInPayload:
collector.addTrafficIn(v)
case *AddTrafficOutPayload:
collector.addTrafficOut(v)
}
}
func (collector *internalCollector) newClient(payload *NewClientPayload) {
collector.info.ClientCounts.Inc(1)
}
func (collector *internalCollector) closeClient(payload *CloseClientPayload) {
collector.info.ClientCounts.Dec(1)
}
func (collector *internalCollector) newProxy(payload *NewProxyPayload) {
collector.mu.Lock()
defer collector.mu.Unlock()
counter, ok := collector.info.ProxyTypeCounts[payload.ProxyType]
if !ok {
counter = metric.NewCounter()
}
counter.Inc(1)
collector.info.ProxyTypeCounts[payload.ProxyType] = counter
proxyStats, ok := collector.info.ProxyStatistics[payload.Name]
if !(ok && proxyStats.ProxyType == payload.ProxyType) {
proxyStats = &ProxyStatistics{
Name: payload.Name,
ProxyType: payload.ProxyType,
CurConns: metric.NewCounter(),
TrafficIn: metric.NewDateCounter(ReserveDays),
TrafficOut: metric.NewDateCounter(ReserveDays),
}
collector.info.ProxyStatistics[payload.Name] = proxyStats
}
proxyStats.LastStartTime = time.Now()
}
func (collector *internalCollector) closeProxy(payload *CloseProxyPayload) {
collector.mu.Lock()
defer collector.mu.Unlock()
if counter, ok := collector.info.ProxyTypeCounts[payload.ProxyType]; ok {
counter.Dec(1)
}
if proxyStats, ok := collector.info.ProxyStatistics[payload.Name]; ok {
proxyStats.LastCloseTime = time.Now()
}
}
func (collector *internalCollector) openConnection(payload *OpenConnectionPayload) {
collector.info.CurConns.Inc(1)
collector.mu.Lock()
defer collector.mu.Unlock()
proxyStats, ok := collector.info.ProxyStatistics[payload.ProxyName]
if ok {
proxyStats.CurConns.Inc(1)
collector.info.ProxyStatistics[payload.ProxyName] = proxyStats
}
}
func (collector *internalCollector) closeConnection(payload *CloseConnectionPayload) {
collector.info.CurConns.Dec(1)
collector.mu.Lock()
defer collector.mu.Unlock()
proxyStats, ok := collector.info.ProxyStatistics[payload.ProxyName]
if ok {
proxyStats.CurConns.Dec(1)
collector.info.ProxyStatistics[payload.ProxyName] = proxyStats
}
}
func (collector *internalCollector) addTrafficIn(payload *AddTrafficInPayload) {
collector.info.TotalTrafficIn.Inc(payload.TrafficBytes)
collector.mu.Lock()
defer collector.mu.Unlock()
proxyStats, ok := collector.info.ProxyStatistics[payload.ProxyName]
if ok {
proxyStats.TrafficIn.Inc(payload.TrafficBytes)
collector.info.ProxyStatistics[payload.ProxyName] = proxyStats
}
}
func (collector *internalCollector) addTrafficOut(payload *AddTrafficOutPayload) {
collector.info.TotalTrafficOut.Inc(payload.TrafficBytes)
collector.mu.Lock()
defer collector.mu.Unlock()
proxyStats, ok := collector.info.ProxyStatistics[payload.ProxyName]
if ok {
proxyStats.TrafficOut.Inc(payload.TrafficBytes)
collector.info.ProxyStatistics[payload.ProxyName] = proxyStats
}
}
func (collector *internalCollector) GetServer() *ServerStats {
collector.mu.Lock()
defer collector.mu.Unlock()
s := &ServerStats{
TotalTrafficIn: collector.info.TotalTrafficIn.TodayCount(),
TotalTrafficOut: collector.info.TotalTrafficOut.TodayCount(),
CurConns: collector.info.CurConns.Count(),
ClientCounts: collector.info.ClientCounts.Count(),
ProxyTypeCounts: make(map[string]int64),
}
for k, v := range collector.info.ProxyTypeCounts {
s.ProxyTypeCounts[k] = v.Count()
}
return s
}
func (collector *internalCollector) GetProxiesByType(proxyType string) []*ProxyStats {
res := make([]*ProxyStats, 0)
collector.mu.Lock()
defer collector.mu.Unlock()
for name, proxyStats := range collector.info.ProxyStatistics {
if proxyStats.ProxyType != proxyType {
continue
}
ps := &ProxyStats{
Name: name,
Type: proxyStats.ProxyType,
TodayTrafficIn: proxyStats.TrafficIn.TodayCount(),
TodayTrafficOut: proxyStats.TrafficOut.TodayCount(),
CurConns: proxyStats.CurConns.Count(),
}
if !proxyStats.LastStartTime.IsZero() {
ps.LastStartTime = proxyStats.LastStartTime.Format("01-02 15:04:05")
}
if !proxyStats.LastCloseTime.IsZero() {
ps.LastCloseTime = proxyStats.LastCloseTime.Format("01-02 15:04:05")
}
res = append(res, ps)
}
return res
}
func (collector *internalCollector) GetProxiesByTypeAndName(proxyType string, proxyName string) (res *ProxyStats) {
collector.mu.Lock()
defer collector.mu.Unlock()
for name, proxyStats := range collector.info.ProxyStatistics {
if proxyStats.ProxyType != proxyType {
continue
}
if name != proxyName {
continue
}
res = &ProxyStats{
Name: name,
Type: proxyStats.ProxyType,
TodayTrafficIn: proxyStats.TrafficIn.TodayCount(),
TodayTrafficOut: proxyStats.TrafficOut.TodayCount(),
CurConns: proxyStats.CurConns.Count(),
}
if !proxyStats.LastStartTime.IsZero() {
res.LastStartTime = proxyStats.LastStartTime.Format("01-02 15:04:05")
}
if !proxyStats.LastCloseTime.IsZero() {
res.LastCloseTime = proxyStats.LastCloseTime.Format("01-02 15:04:05")
}
break
}
return
}
func (collector *internalCollector) GetProxyTraffic(name string) (res *ProxyTrafficInfo) {
collector.mu.Lock()
defer collector.mu.Unlock()
proxyStats, ok := collector.info.ProxyStatistics[name]
if ok {
res = &ProxyTrafficInfo{
Name: name,
}
res.TrafficIn = proxyStats.TrafficIn.GetLastDaysCount(ReserveDays)
res.TrafficOut = proxyStats.TrafficOut.GetLastDaysCount(ReserveDays)
}
return
}

129
server/stats/stats.go Normal file
View File

@ -0,0 +1,129 @@
// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package stats
import (
"time"
"github.com/fatedier/frp/utils/metric"
)
const (
ReserveDays = 7
)
type StatsType int
const (
TypeNewClient StatsType = iota
TypeCloseClient
TypeNewProxy
TypeCloseProxy
TypeOpenConnection
TypeCloseConnection
TypeAddTrafficIn
TypeAddTrafficOut
)
type ServerStats struct {
TotalTrafficIn int64
TotalTrafficOut int64
CurConns int64
ClientCounts int64
ProxyTypeCounts map[string]int64
}
type ProxyStats struct {
Name string
Type string
TodayTrafficIn int64
TodayTrafficOut int64
LastStartTime string
LastCloseTime string
CurConns int64
}
type ProxyTrafficInfo struct {
Name string
TrafficIn []int64
TrafficOut []int64
}
type ProxyStatistics struct {
Name string
ProxyType string
TrafficIn metric.DateCounter
TrafficOut metric.DateCounter
CurConns metric.Counter
LastStartTime time.Time
LastCloseTime time.Time
}
type ServerStatistics struct {
TotalTrafficIn metric.DateCounter
TotalTrafficOut metric.DateCounter
CurConns metric.Counter
// counter for clients
ClientCounts metric.Counter
// counter for proxy types
ProxyTypeCounts map[string]metric.Counter
// statistics for different proxies
// key is proxy name
ProxyStatistics map[string]*ProxyStatistics
}
type Collector interface {
Mark(statsType StatsType, payload interface{})
Run() error
GetServer() *ServerStats
GetProxiesByType(proxyType string) []*ProxyStats
GetProxiesByTypeAndName(proxyType string, proxyName string) *ProxyStats
GetProxyTraffic(name string) *ProxyTrafficInfo
}
type NewClientPayload struct{}
type CloseClientPayload struct{}
type NewProxyPayload struct {
Name string
ProxyType string
}
type CloseProxyPayload struct {
Name string
ProxyType string
}
type OpenConnectionPayload struct {
ProxyName string
}
type CloseConnectionPayload struct {
ProxyName string
}
type AddTrafficInPayload struct {
ProxyName string
TrafficBytes int64
}
type AddTrafficOutPayload struct {
ProxyName string
TrafficBytes int64
}

View File

@ -19,7 +19,7 @@ import (
"strings"
)
var version string = "0.22.0"
var version string = "0.23.0"
func Full() string {
return version

View File

@ -19,9 +19,6 @@
<el-form-item label="Https Port">
<span>{{ vhost_https_port }}</span>
</el-form-item>
<el-form-item label="Auth Timeout">
<span>{{ auth_timeout }}</span>
</el-form-item>
<el-form-item label="Subdomain Host">
<span>{{ subdomain_host }}</span>
</el-form-item>
@ -64,7 +61,6 @@
bind_udp_port: '',
vhost_http_port: '',
vhost_https_port: '',
auth_timeout: '',
subdomain_host: '',
max_pool_count: '',
max_ports_per_client: '',
@ -100,7 +96,6 @@
if (this.vhost_https_port == 0) {
this.vhost_https_port = "disable"
}
this.auth_timeout = json.auth_timeout
this.subdomain_host = json.subdomain_host
this.max_pool_count = json.max_pool_count
this.max_ports_per_client = json.max_ports_per_client