server: adjust code structure

This commit is contained in:
fatedier 2019-01-15 00:11:08 +08:00
parent 0c7d778896
commit 611d063e1f
17 changed files with 1458 additions and 1141 deletions

View File

@ -1,4 +1,4 @@
package server
package nathole
import (
"bytes"

View File

@ -26,6 +26,9 @@ import (
"github.com/fatedier/frp/models/consts"
frpErr "github.com/fatedier/frp/models/errors"
"github.com/fatedier/frp/models/msg"
"github.com/fatedier/frp/server/controller"
"github.com/fatedier/frp/server/proxy"
"github.com/fatedier/frp/server/stats"
"github.com/fatedier/frp/utils/net"
"github.com/fatedier/frp/utils/version"
@ -34,9 +37,53 @@ import (
"github.com/fatedier/golib/errors"
)
type ControlManager struct {
// controls indexed by run id
ctlsByRunId map[string]*Control
mu sync.RWMutex
}
func NewControlManager() *ControlManager {
return &ControlManager{
ctlsByRunId: make(map[string]*Control),
}
}
func (cm *ControlManager) Add(runId string, ctl *Control) (oldCtl *Control) {
cm.mu.Lock()
defer cm.mu.Unlock()
oldCtl, ok := cm.ctlsByRunId[runId]
if ok {
oldCtl.Replaced(ctl)
}
cm.ctlsByRunId[runId] = ctl
return
}
func (cm *ControlManager) Del(runId string) {
cm.mu.Lock()
defer cm.mu.Unlock()
delete(cm.ctlsByRunId, runId)
}
func (cm *ControlManager) GetById(runId string) (ctl *Control, ok bool) {
cm.mu.RLock()
defer cm.mu.RUnlock()
ctl, ok = cm.ctlsByRunId[runId]
return
}
type Control struct {
// all resource managers and controllers
rc *ResourceController
rc *controller.ResourceController
// proxy manager
pxyManager *proxy.ProxyManager
// stats collector to store stats info of clients and proxies
statsCollector stats.Collector
// login message
loginMsg *msg.Login
@ -54,7 +101,7 @@ type Control struct {
workConnCh chan net.Conn
// proxies in one client
proxies map[string]Proxy
proxies map[string]proxy.Proxy
// pool count
poolCount int
@ -81,15 +128,19 @@ type Control struct {
mu sync.RWMutex
}
func NewControl(rc *ResourceController, ctlConn net.Conn, loginMsg *msg.Login) *Control {
func NewControl(rc *controller.ResourceController, pxyManager *proxy.ProxyManager,
statsCollector stats.Collector, ctlConn net.Conn, loginMsg *msg.Login) *Control {
return &Control{
rc: rc,
pxyManager: pxyManager,
statsCollector: statsCollector,
conn: ctlConn,
loginMsg: loginMsg,
sendCh: make(chan msg.Message, 10),
readCh: make(chan msg.Message, 10),
workConnCh: make(chan net.Conn, loginMsg.PoolCount+10),
proxies: make(map[string]Proxy),
proxies: make(map[string]proxy.Proxy),
poolCount: loginMsg.PoolCount,
portsUsedNum: 0,
lastPing: time.Now(),
@ -284,15 +335,22 @@ func (ctl *Control) stoper() {
for _, pxy := range ctl.proxies {
pxy.Close()
ctl.rc.PxyManager.Del(pxy.GetName())
StatsCloseProxy(pxy.GetName(), pxy.GetConf().GetBaseInfo().ProxyType)
ctl.pxyManager.Del(pxy.GetName())
ctl.statsCollector.Mark(stats.TypeCloseProxy, &stats.CloseProxyPayload{
Name: pxy.GetName(),
ProxyType: pxy.GetConf().GetBaseInfo().ProxyType,
})
}
ctl.allShutdown.Done()
ctl.rc.CtlManager.Del(ctl.runId)
ctl.conn.Info("client exit success")
StatsCloseClient()
ctl.statsCollector.Mark(stats.TypeCloseClient, &stats.CloseClientPayload{})
}
// block until Control closed
func (ctl *Control) WaitClosed() {
ctl.allShutdown.WaitDone()
}
func (ctl *Control) manager() {
@ -334,7 +392,10 @@ func (ctl *Control) manager() {
} else {
resp.RemoteAddr = remoteAddr
ctl.conn.Info("new proxy [%s] success", m.ProxyName)
StatsNewProxy(m.ProxyName, m.ProxyType)
ctl.statsCollector.Mark(stats.TypeNewProxy, &stats.NewProxyPayload{
Name: m.ProxyName,
ProxyType: m.ProxyType,
})
}
ctl.sendCh <- resp
case *msg.CloseProxy:
@ -359,7 +420,7 @@ func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err
// NewProxy will return a interface Proxy.
// In fact it create different proxies by different proxy type, we just call run() here.
pxy, err := NewProxy(ctl.runId, ctl.rc, ctl.poolCount, ctl.GetWorkConn, pxyConf)
pxy, err := proxy.NewProxy(ctl.runId, ctl.rc, ctl.statsCollector, ctl.poolCount, ctl.GetWorkConn, pxyConf)
if err != nil {
return remoteAddr, err
}
@ -394,7 +455,7 @@ func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err
}
}()
err = ctl.rc.PxyManager.Add(pxyMsg.ProxyName, pxy)
err = ctl.pxyManager.Add(pxyMsg.ProxyName, pxy)
if err != nil {
return
}
@ -417,10 +478,13 @@ func (ctl *Control) CloseProxy(closeMsg *msg.CloseProxy) (err error) {
ctl.portsUsedNum = ctl.portsUsedNum - pxy.GetUsedPortsNum()
}
pxy.Close()
ctl.rc.PxyManager.Del(pxy.GetName())
ctl.pxyManager.Del(pxy.GetName())
delete(ctl.proxies, closeMsg.ProxyName)
ctl.mu.Unlock()
StatsCloseProxy(pxy.GetName(), pxy.GetConf().GetBaseInfo().ProxyType)
ctl.statsCollector.Mark(stats.TypeCloseProxy, &stats.CloseProxyPayload{
Name: pxy.GetName(),
ProxyType: pxy.GetConf().GetBaseInfo().ProxyType,
})
return
}

View File

@ -0,0 +1,46 @@
// Copyright 2019 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package controller
import (
"github.com/fatedier/frp/models/nathole"
"github.com/fatedier/frp/server/group"
"github.com/fatedier/frp/server/ports"
"github.com/fatedier/frp/utils/vhost"
)
// All resource managers and controllers
type ResourceController struct {
// Manage all visitor listeners
VisitorManager *VisitorManager
// Tcp Group Controller
TcpGroupCtl *group.TcpGroupCtl
// Manage all tcp ports
TcpPortManager *ports.PortManager
// Manage all udp ports
UdpPortManager *ports.PortManager
// For http proxies, forwarding http requests
HttpReverseProxy *vhost.HttpReverseProxy
// For https proxies, route requests to different clients by hostname and other infomation
VhostHttpsMuxer *vhost.HttpsMuxer
// Controller for nat hole connections
NatHoleController *nathole.NatHoleController
}

View File

@ -1,4 +1,4 @@
// Copyright 2017 fatedier, fatedier@gmail.com
// Copyright 2019 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package server
package controller
import (
"fmt"
@ -25,81 +25,6 @@ import (
frpIo "github.com/fatedier/golib/io"
)
type ControlManager struct {
// controls indexed by run id
ctlsByRunId map[string]*Control
mu sync.RWMutex
}
func NewControlManager() *ControlManager {
return &ControlManager{
ctlsByRunId: make(map[string]*Control),
}
}
func (cm *ControlManager) Add(runId string, ctl *Control) (oldCtl *Control) {
cm.mu.Lock()
defer cm.mu.Unlock()
oldCtl, ok := cm.ctlsByRunId[runId]
if ok {
oldCtl.Replaced(ctl)
}
cm.ctlsByRunId[runId] = ctl
return
}
func (cm *ControlManager) Del(runId string) {
cm.mu.Lock()
defer cm.mu.Unlock()
delete(cm.ctlsByRunId, runId)
}
func (cm *ControlManager) GetById(runId string) (ctl *Control, ok bool) {
cm.mu.RLock()
defer cm.mu.RUnlock()
ctl, ok = cm.ctlsByRunId[runId]
return
}
type ProxyManager struct {
// proxies indexed by proxy name
pxys map[string]Proxy
mu sync.RWMutex
}
func NewProxyManager() *ProxyManager {
return &ProxyManager{
pxys: make(map[string]Proxy),
}
}
func (pm *ProxyManager) Add(name string, pxy Proxy) error {
pm.mu.Lock()
defer pm.mu.Unlock()
if _, ok := pm.pxys[name]; ok {
return fmt.Errorf("proxy name [%s] is already in use", name)
}
pm.pxys[name] = pxy
return nil
}
func (pm *ProxyManager) Del(name string) {
pm.mu.Lock()
defer pm.mu.Unlock()
delete(pm.pxys, name)
}
func (pm *ProxyManager) GetByName(name string) (pxy Proxy, ok bool) {
pm.mu.RLock()
defer pm.mu.RUnlock()
pxy, ok = pm.pxys[name]
return
}
// Manager for visitor listeners.
type VisitorManager struct {
visitorListeners map[string]*frpNet.CustomListener

View File

@ -66,7 +66,7 @@ func (svr *Service) ApiServerInfo(w http.ResponseWriter, r *http.Request) {
log.Info("Http request: [%s]", r.URL.Path)
cfg := &g.GlbServerCfg.ServerCommonConf
serverStats := StatsGetServer()
serverStats := svr.statsCollector.GetServer()
res = ServerInfoResp{
Version: version.Full(),
BindPort: cfg.BindPort,
@ -185,11 +185,11 @@ func (svr *Service) ApiProxyByType(w http.ResponseWriter, r *http.Request) {
}
func (svr *Service) getProxyStatsByType(proxyType string) (proxyInfos []*ProxyStatsInfo) {
proxyStats := StatsGetProxiesByType(proxyType)
proxyStats := svr.statsCollector.GetProxiesByType(proxyType)
proxyInfos = make([]*ProxyStatsInfo, 0, len(proxyStats))
for _, ps := range proxyStats {
proxyInfo := &ProxyStatsInfo{}
if pxy, ok := svr.rc.PxyManager.GetByName(ps.Name); ok {
if pxy, ok := svr.pxyManager.GetByName(ps.Name); ok {
content, err := json.Marshal(pxy.GetConf())
if err != nil {
log.Warn("marshal proxy [%s] conf info error: %v", ps.Name, err)
@ -252,12 +252,12 @@ func (svr *Service) ApiProxyByTypeAndName(w http.ResponseWriter, r *http.Request
func (svr *Service) getProxyStatsByTypeAndName(proxyType string, proxyName string) (proxyInfo GetProxyStatsResp) {
proxyInfo.Name = proxyName
ps := StatsGetProxiesByTypeAndName(proxyType, proxyName)
ps := svr.statsCollector.GetProxiesByTypeAndName(proxyType, proxyName)
if ps == nil {
proxyInfo.Code = 1
proxyInfo.Msg = "no proxy info found"
} else {
if pxy, ok := svr.rc.PxyManager.GetByName(proxyName); ok {
if pxy, ok := svr.pxyManager.GetByName(proxyName); ok {
content, err := json.Marshal(pxy.GetConf())
if err != nil {
log.Warn("marshal proxy [%s] conf info error: %v", ps.Name, err)
@ -309,7 +309,7 @@ func (svr *Service) ApiProxyTraffic(w http.ResponseWriter, r *http.Request) {
log.Info("Http request: [%s]", r.URL.Path)
res.Name = name
proxyTrafficInfo := StatsGetProxyTraffic(name)
proxyTrafficInfo := svr.statsCollector.GetProxyTraffic(name)
if proxyTrafficInfo == nil {
res.Code = 1
res.Msg = "no proxy info found"

View File

@ -1,316 +0,0 @@
// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"sync"
"time"
"github.com/fatedier/frp/g"
"github.com/fatedier/frp/utils/log"
"github.com/fatedier/frp/utils/metric"
)
const (
ReserveDays = 7
)
var globalStats *ServerStatistics
type ServerStatistics struct {
TotalTrafficIn metric.DateCounter
TotalTrafficOut metric.DateCounter
CurConns metric.Counter
// counter for clients
ClientCounts metric.Counter
// counter for proxy types
ProxyTypeCounts map[string]metric.Counter
// statistics for different proxies
// key is proxy name
ProxyStatistics map[string]*ProxyStatistics
mu sync.Mutex
}
type ProxyStatistics struct {
Name string
ProxyType string
TrafficIn metric.DateCounter
TrafficOut metric.DateCounter
CurConns metric.Counter
LastStartTime time.Time
LastCloseTime time.Time
}
func init() {
globalStats = &ServerStatistics{
TotalTrafficIn: metric.NewDateCounter(ReserveDays),
TotalTrafficOut: metric.NewDateCounter(ReserveDays),
CurConns: metric.NewCounter(),
ClientCounts: metric.NewCounter(),
ProxyTypeCounts: make(map[string]metric.Counter),
ProxyStatistics: make(map[string]*ProxyStatistics),
}
go func() {
for {
time.Sleep(12 * time.Hour)
log.Debug("start to clear useless proxy statistics data...")
StatsClearUselessInfo()
log.Debug("finish to clear useless proxy statistics data")
}
}()
}
func StatsClearUselessInfo() {
// To check if there are proxies that closed than 7 days and drop them.
globalStats.mu.Lock()
defer globalStats.mu.Unlock()
for name, data := range globalStats.ProxyStatistics {
if !data.LastCloseTime.IsZero() && time.Since(data.LastCloseTime) > time.Duration(7*24)*time.Hour {
delete(globalStats.ProxyStatistics, name)
log.Trace("clear proxy [%s]'s statistics data, lastCloseTime: [%s]", name, data.LastCloseTime.String())
}
}
}
func StatsNewClient() {
if g.GlbServerCfg.DashboardPort != 0 {
globalStats.ClientCounts.Inc(1)
}
}
func StatsCloseClient() {
if g.GlbServerCfg.DashboardPort != 0 {
globalStats.ClientCounts.Dec(1)
}
}
func StatsNewProxy(name string, proxyType string) {
if g.GlbServerCfg.DashboardPort != 0 {
globalStats.mu.Lock()
defer globalStats.mu.Unlock()
counter, ok := globalStats.ProxyTypeCounts[proxyType]
if !ok {
counter = metric.NewCounter()
}
counter.Inc(1)
globalStats.ProxyTypeCounts[proxyType] = counter
proxyStats, ok := globalStats.ProxyStatistics[name]
if !(ok && proxyStats.ProxyType == proxyType) {
proxyStats = &ProxyStatistics{
Name: name,
ProxyType: proxyType,
CurConns: metric.NewCounter(),
TrafficIn: metric.NewDateCounter(ReserveDays),
TrafficOut: metric.NewDateCounter(ReserveDays),
}
globalStats.ProxyStatistics[name] = proxyStats
}
proxyStats.LastStartTime = time.Now()
}
}
func StatsCloseProxy(proxyName string, proxyType string) {
if g.GlbServerCfg.DashboardPort != 0 {
globalStats.mu.Lock()
defer globalStats.mu.Unlock()
if counter, ok := globalStats.ProxyTypeCounts[proxyType]; ok {
counter.Dec(1)
}
if proxyStats, ok := globalStats.ProxyStatistics[proxyName]; ok {
proxyStats.LastCloseTime = time.Now()
}
}
}
func StatsOpenConnection(name string) {
if g.GlbServerCfg.DashboardPort != 0 {
globalStats.CurConns.Inc(1)
globalStats.mu.Lock()
defer globalStats.mu.Unlock()
proxyStats, ok := globalStats.ProxyStatistics[name]
if ok {
proxyStats.CurConns.Inc(1)
globalStats.ProxyStatistics[name] = proxyStats
}
}
}
func StatsCloseConnection(name string) {
if g.GlbServerCfg.DashboardPort != 0 {
globalStats.CurConns.Dec(1)
globalStats.mu.Lock()
defer globalStats.mu.Unlock()
proxyStats, ok := globalStats.ProxyStatistics[name]
if ok {
proxyStats.CurConns.Dec(1)
globalStats.ProxyStatistics[name] = proxyStats
}
}
}
func StatsAddTrafficIn(name string, trafficIn int64) {
if g.GlbServerCfg.DashboardPort != 0 {
globalStats.TotalTrafficIn.Inc(trafficIn)
globalStats.mu.Lock()
defer globalStats.mu.Unlock()
proxyStats, ok := globalStats.ProxyStatistics[name]
if ok {
proxyStats.TrafficIn.Inc(trafficIn)
globalStats.ProxyStatistics[name] = proxyStats
}
}
}
func StatsAddTrafficOut(name string, trafficOut int64) {
if g.GlbServerCfg.DashboardPort != 0 {
globalStats.TotalTrafficOut.Inc(trafficOut)
globalStats.mu.Lock()
defer globalStats.mu.Unlock()
proxyStats, ok := globalStats.ProxyStatistics[name]
if ok {
proxyStats.TrafficOut.Inc(trafficOut)
globalStats.ProxyStatistics[name] = proxyStats
}
}
}
// Functions for getting server stats.
type ServerStats struct {
TotalTrafficIn int64
TotalTrafficOut int64
CurConns int64
ClientCounts int64
ProxyTypeCounts map[string]int64
}
func StatsGetServer() *ServerStats {
globalStats.mu.Lock()
defer globalStats.mu.Unlock()
s := &ServerStats{
TotalTrafficIn: globalStats.TotalTrafficIn.TodayCount(),
TotalTrafficOut: globalStats.TotalTrafficOut.TodayCount(),
CurConns: globalStats.CurConns.Count(),
ClientCounts: globalStats.ClientCounts.Count(),
ProxyTypeCounts: make(map[string]int64),
}
for k, v := range globalStats.ProxyTypeCounts {
s.ProxyTypeCounts[k] = v.Count()
}
return s
}
type ProxyStats struct {
Name string
Type string
TodayTrafficIn int64
TodayTrafficOut int64
LastStartTime string
LastCloseTime string
CurConns int64
}
func StatsGetProxiesByType(proxyType string) []*ProxyStats {
res := make([]*ProxyStats, 0)
globalStats.mu.Lock()
defer globalStats.mu.Unlock()
for name, proxyStats := range globalStats.ProxyStatistics {
if proxyStats.ProxyType != proxyType {
continue
}
ps := &ProxyStats{
Name: name,
Type: proxyStats.ProxyType,
TodayTrafficIn: proxyStats.TrafficIn.TodayCount(),
TodayTrafficOut: proxyStats.TrafficOut.TodayCount(),
CurConns: proxyStats.CurConns.Count(),
}
if !proxyStats.LastStartTime.IsZero() {
ps.LastStartTime = proxyStats.LastStartTime.Format("01-02 15:04:05")
}
if !proxyStats.LastCloseTime.IsZero() {
ps.LastCloseTime = proxyStats.LastCloseTime.Format("01-02 15:04:05")
}
res = append(res, ps)
}
return res
}
func StatsGetProxiesByTypeAndName(proxyType string, proxyName string) (res *ProxyStats) {
globalStats.mu.Lock()
defer globalStats.mu.Unlock()
for name, proxyStats := range globalStats.ProxyStatistics {
if proxyStats.ProxyType != proxyType {
continue
}
if name != proxyName {
continue
}
res = &ProxyStats{
Name: name,
Type: proxyStats.ProxyType,
TodayTrafficIn: proxyStats.TrafficIn.TodayCount(),
TodayTrafficOut: proxyStats.TrafficOut.TodayCount(),
CurConns: proxyStats.CurConns.Count(),
}
if !proxyStats.LastStartTime.IsZero() {
res.LastStartTime = proxyStats.LastStartTime.Format("01-02 15:04:05")
}
if !proxyStats.LastCloseTime.IsZero() {
res.LastCloseTime = proxyStats.LastCloseTime.Format("01-02 15:04:05")
}
break
}
return
}
type ProxyTrafficInfo struct {
Name string
TrafficIn []int64
TrafficOut []int64
}
func StatsGetProxyTraffic(name string) (res *ProxyTrafficInfo) {
globalStats.mu.Lock()
defer globalStats.mu.Unlock()
proxyStats, ok := globalStats.ProxyStatistics[name]
if ok {
res = &ProxyTrafficInfo{
Name: name,
}
res.TrafficIn = proxyStats.TrafficIn.GetLastDaysCount(ReserveDays)
res.TrafficOut = proxyStats.TrafficOut.GetLastDaysCount(ReserveDays)
}
return
}

View File

@ -1,687 +0,0 @@
// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"context"
"fmt"
"io"
"net"
"strings"
"sync"
"time"
"github.com/fatedier/frp/g"
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/msg"
"github.com/fatedier/frp/models/proto/udp"
"github.com/fatedier/frp/utils/log"
frpNet "github.com/fatedier/frp/utils/net"
"github.com/fatedier/frp/utils/util"
"github.com/fatedier/frp/utils/vhost"
"github.com/fatedier/golib/errors"
frpIo "github.com/fatedier/golib/io"
)
type GetWorkConnFn func() (frpNet.Conn, error)
type Proxy interface {
Run() (remoteAddr string, err error)
GetName() string
GetConf() config.ProxyConf
GetWorkConnFromPool() (workConn frpNet.Conn, err error)
GetUsedPortsNum() int
Close()
log.Logger
}
type BaseProxy struct {
name string
rc *ResourceController
listeners []frpNet.Listener
usedPortsNum int
poolCount int
getWorkConnFn GetWorkConnFn
mu sync.RWMutex
log.Logger
}
func (pxy *BaseProxy) GetName() string {
return pxy.name
}
func (pxy *BaseProxy) GetUsedPortsNum() int {
return pxy.usedPortsNum
}
func (pxy *BaseProxy) Close() {
pxy.Info("proxy closing")
for _, l := range pxy.listeners {
l.Close()
}
}
func (pxy *BaseProxy) GetWorkConnFromPool() (workConn frpNet.Conn, err error) {
// try all connections from the pool
for i := 0; i < pxy.poolCount+1; i++ {
if workConn, err = pxy.getWorkConnFn(); err != nil {
pxy.Warn("failed to get work connection: %v", err)
return
}
pxy.Info("get a new work connection: [%s]", workConn.RemoteAddr().String())
workConn.AddLogPrefix(pxy.GetName())
err := msg.WriteMsg(workConn, &msg.StartWorkConn{
ProxyName: pxy.GetName(),
})
if err != nil {
workConn.Warn("failed to send message to work connection from pool: %v, times: %d", err, i)
workConn.Close()
} else {
break
}
}
if err != nil {
pxy.Error("try to get work connection failed in the end")
return
}
return
}
// startListenHandler start a goroutine handler for each listener.
// p: p will just be passed to handler(Proxy, frpNet.Conn).
// handler: each proxy type can set different handler function to deal with connections accepted from listeners.
func (pxy *BaseProxy) startListenHandler(p Proxy, handler func(Proxy, frpNet.Conn)) {
for _, listener := range pxy.listeners {
go func(l frpNet.Listener) {
for {
// block
// if listener is closed, err returned
c, err := l.Accept()
if err != nil {
pxy.Info("listener is closed")
return
}
pxy.Debug("get a user connection [%s]", c.RemoteAddr().String())
go handler(p, c)
}
}(listener)
}
}
func NewProxy(runId string, rc *ResourceController, poolCount int, getWorkConnFn GetWorkConnFn, pxyConf config.ProxyConf) (pxy Proxy, err error) {
basePxy := BaseProxy{
name: pxyConf.GetBaseInfo().ProxyName,
rc: rc,
listeners: make([]frpNet.Listener, 0),
poolCount: poolCount,
getWorkConnFn: getWorkConnFn,
Logger: log.NewPrefixLogger(runId),
}
switch cfg := pxyConf.(type) {
case *config.TcpProxyConf:
basePxy.usedPortsNum = 1
pxy = &TcpProxy{
BaseProxy: basePxy,
cfg: cfg,
}
case *config.HttpProxyConf:
pxy = &HttpProxy{
BaseProxy: basePxy,
cfg: cfg,
}
case *config.HttpsProxyConf:
pxy = &HttpsProxy{
BaseProxy: basePxy,
cfg: cfg,
}
case *config.UdpProxyConf:
basePxy.usedPortsNum = 1
pxy = &UdpProxy{
BaseProxy: basePxy,
cfg: cfg,
}
case *config.StcpProxyConf:
pxy = &StcpProxy{
BaseProxy: basePxy,
cfg: cfg,
}
case *config.XtcpProxyConf:
pxy = &XtcpProxy{
BaseProxy: basePxy,
cfg: cfg,
}
default:
return pxy, fmt.Errorf("proxy type not support")
}
pxy.AddLogPrefix(pxy.GetName())
return
}
type TcpProxy struct {
BaseProxy
cfg *config.TcpProxyConf
realPort int
}
func (pxy *TcpProxy) Run() (remoteAddr string, err error) {
if pxy.cfg.Group != "" {
l, realPort, errRet := pxy.rc.TcpGroupCtl.Listen(pxy.name, pxy.cfg.Group, pxy.cfg.GroupKey, g.GlbServerCfg.ProxyBindAddr, pxy.cfg.RemotePort)
if errRet != nil {
err = errRet
return
}
defer func() {
if err != nil {
l.Close()
}
}()
pxy.realPort = realPort
listener := frpNet.WrapLogListener(l)
listener.AddLogPrefix(pxy.name)
pxy.listeners = append(pxy.listeners, listener)
pxy.Info("tcp proxy listen port [%d] in group [%s]", pxy.cfg.RemotePort, pxy.cfg.Group)
} else {
pxy.realPort, err = pxy.rc.TcpPortManager.Acquire(pxy.name, pxy.cfg.RemotePort)
if err != nil {
return
}
defer func() {
if err != nil {
pxy.rc.TcpPortManager.Release(pxy.realPort)
}
}()
listener, errRet := frpNet.ListenTcp(g.GlbServerCfg.ProxyBindAddr, pxy.realPort)
if errRet != nil {
err = errRet
return
}
listener.AddLogPrefix(pxy.name)
pxy.listeners = append(pxy.listeners, listener)
pxy.Info("tcp proxy listen port [%d]", pxy.cfg.RemotePort)
}
pxy.cfg.RemotePort = pxy.realPort
remoteAddr = fmt.Sprintf(":%d", pxy.realPort)
pxy.startListenHandler(pxy, HandleUserTcpConnection)
return
}
func (pxy *TcpProxy) GetConf() config.ProxyConf {
return pxy.cfg
}
func (pxy *TcpProxy) Close() {
pxy.BaseProxy.Close()
if pxy.cfg.Group == "" {
pxy.rc.TcpPortManager.Release(pxy.realPort)
}
}
type HttpProxy struct {
BaseProxy
cfg *config.HttpProxyConf
closeFuncs []func()
}
func (pxy *HttpProxy) Run() (remoteAddr string, err error) {
routeConfig := vhost.VhostRouteConfig{
RewriteHost: pxy.cfg.HostHeaderRewrite,
Headers: pxy.cfg.Headers,
Username: pxy.cfg.HttpUser,
Password: pxy.cfg.HttpPwd,
CreateConnFn: pxy.GetRealConn,
}
locations := pxy.cfg.Locations
if len(locations) == 0 {
locations = []string{""}
}
addrs := make([]string, 0)
for _, domain := range pxy.cfg.CustomDomains {
routeConfig.Domain = domain
for _, location := range locations {
routeConfig.Location = location
err = pxy.rc.HttpReverseProxy.Register(routeConfig)
if err != nil {
return
}
tmpDomain := routeConfig.Domain
tmpLocation := routeConfig.Location
addrs = append(addrs, util.CanonicalAddr(tmpDomain, int(g.GlbServerCfg.VhostHttpPort)))
pxy.closeFuncs = append(pxy.closeFuncs, func() {
pxy.rc.HttpReverseProxy.UnRegister(tmpDomain, tmpLocation)
})
pxy.Info("http proxy listen for host [%s] location [%s]", routeConfig.Domain, routeConfig.Location)
}
}
if pxy.cfg.SubDomain != "" {
routeConfig.Domain = pxy.cfg.SubDomain + "." + g.GlbServerCfg.SubDomainHost
for _, location := range locations {
routeConfig.Location = location
err = pxy.rc.HttpReverseProxy.Register(routeConfig)
if err != nil {
return
}
tmpDomain := routeConfig.Domain
tmpLocation := routeConfig.Location
addrs = append(addrs, util.CanonicalAddr(tmpDomain, g.GlbServerCfg.VhostHttpPort))
pxy.closeFuncs = append(pxy.closeFuncs, func() {
pxy.rc.HttpReverseProxy.UnRegister(tmpDomain, tmpLocation)
})
pxy.Info("http proxy listen for host [%s] location [%s]", routeConfig.Domain, routeConfig.Location)
}
}
remoteAddr = strings.Join(addrs, ",")
return
}
func (pxy *HttpProxy) GetConf() config.ProxyConf {
return pxy.cfg
}
func (pxy *HttpProxy) GetRealConn() (workConn frpNet.Conn, err error) {
tmpConn, errRet := pxy.GetWorkConnFromPool()
if errRet != nil {
err = errRet
return
}
var rwc io.ReadWriteCloser = tmpConn
if pxy.cfg.UseEncryption {
rwc, err = frpIo.WithEncryption(rwc, []byte(g.GlbServerCfg.Token))
if err != nil {
pxy.Error("create encryption stream error: %v", err)
return
}
}
if pxy.cfg.UseCompression {
rwc = frpIo.WithCompression(rwc)
}
workConn = frpNet.WrapReadWriteCloserToConn(rwc, tmpConn)
workConn = frpNet.WrapStatsConn(workConn, pxy.updateStatsAfterClosedConn)
StatsOpenConnection(pxy.GetName())
return
}
func (pxy *HttpProxy) updateStatsAfterClosedConn(totalRead, totalWrite int64) {
name := pxy.GetName()
StatsCloseConnection(name)
StatsAddTrafficIn(name, totalWrite)
StatsAddTrafficOut(name, totalRead)
}
func (pxy *HttpProxy) Close() {
pxy.BaseProxy.Close()
for _, closeFn := range pxy.closeFuncs {
closeFn()
}
}
type HttpsProxy struct {
BaseProxy
cfg *config.HttpsProxyConf
}
func (pxy *HttpsProxy) Run() (remoteAddr string, err error) {
routeConfig := &vhost.VhostRouteConfig{}
addrs := make([]string, 0)
for _, domain := range pxy.cfg.CustomDomains {
routeConfig.Domain = domain
l, errRet := pxy.rc.VhostHttpsMuxer.Listen(routeConfig)
if errRet != nil {
err = errRet
return
}
l.AddLogPrefix(pxy.name)
pxy.Info("https proxy listen for host [%s]", routeConfig.Domain)
pxy.listeners = append(pxy.listeners, l)
addrs = append(addrs, util.CanonicalAddr(routeConfig.Domain, g.GlbServerCfg.VhostHttpsPort))
}
if pxy.cfg.SubDomain != "" {
routeConfig.Domain = pxy.cfg.SubDomain + "." + g.GlbServerCfg.SubDomainHost
l, errRet := pxy.rc.VhostHttpsMuxer.Listen(routeConfig)
if errRet != nil {
err = errRet
return
}
l.AddLogPrefix(pxy.name)
pxy.Info("https proxy listen for host [%s]", routeConfig.Domain)
pxy.listeners = append(pxy.listeners, l)
addrs = append(addrs, util.CanonicalAddr(routeConfig.Domain, int(g.GlbServerCfg.VhostHttpsPort)))
}
pxy.startListenHandler(pxy, HandleUserTcpConnection)
remoteAddr = strings.Join(addrs, ",")
return
}
func (pxy *HttpsProxy) GetConf() config.ProxyConf {
return pxy.cfg
}
func (pxy *HttpsProxy) Close() {
pxy.BaseProxy.Close()
}
type StcpProxy struct {
BaseProxy
cfg *config.StcpProxyConf
}
func (pxy *StcpProxy) Run() (remoteAddr string, err error) {
listener, errRet := pxy.rc.VisitorManager.Listen(pxy.GetName(), pxy.cfg.Sk)
if errRet != nil {
err = errRet
return
}
listener.AddLogPrefix(pxy.name)
pxy.listeners = append(pxy.listeners, listener)
pxy.Info("stcp proxy custom listen success")
pxy.startListenHandler(pxy, HandleUserTcpConnection)
return
}
func (pxy *StcpProxy) GetConf() config.ProxyConf {
return pxy.cfg
}
func (pxy *StcpProxy) Close() {
pxy.BaseProxy.Close()
pxy.rc.VisitorManager.CloseListener(pxy.GetName())
}
type XtcpProxy struct {
BaseProxy
cfg *config.XtcpProxyConf
closeCh chan struct{}
}
func (pxy *XtcpProxy) Run() (remoteAddr string, err error) {
if pxy.rc.NatHoleController == nil {
pxy.Error("udp port for xtcp is not specified.")
err = fmt.Errorf("xtcp is not supported in frps")
return
}
sidCh := pxy.rc.NatHoleController.ListenClient(pxy.GetName(), pxy.cfg.Sk)
go func() {
for {
select {
case <-pxy.closeCh:
break
case sid := <-sidCh:
workConn, errRet := pxy.GetWorkConnFromPool()
if errRet != nil {
continue
}
m := &msg.NatHoleSid{
Sid: sid,
}
errRet = msg.WriteMsg(workConn, m)
if errRet != nil {
pxy.Warn("write nat hole sid package error, %v", errRet)
}
}
}
}()
return
}
func (pxy *XtcpProxy) GetConf() config.ProxyConf {
return pxy.cfg
}
func (pxy *XtcpProxy) Close() {
pxy.BaseProxy.Close()
pxy.rc.NatHoleController.CloseClient(pxy.GetName())
errors.PanicToError(func() {
close(pxy.closeCh)
})
}
type UdpProxy struct {
BaseProxy
cfg *config.UdpProxyConf
realPort int
// udpConn is the listener of udp packages
udpConn *net.UDPConn
// there are always only one workConn at the same time
// get another one if it closed
workConn net.Conn
// sendCh is used for sending packages to workConn
sendCh chan *msg.UdpPacket
// readCh is used for reading packages from workConn
readCh chan *msg.UdpPacket
// checkCloseCh is used for watching if workConn is closed
checkCloseCh chan int
isClosed bool
}
func (pxy *UdpProxy) Run() (remoteAddr string, err error) {
pxy.realPort, err = pxy.rc.UdpPortManager.Acquire(pxy.name, pxy.cfg.RemotePort)
if err != nil {
return
}
defer func() {
if err != nil {
pxy.rc.UdpPortManager.Release(pxy.realPort)
}
}()
remoteAddr = fmt.Sprintf(":%d", pxy.realPort)
pxy.cfg.RemotePort = pxy.realPort
addr, errRet := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", g.GlbServerCfg.ProxyBindAddr, pxy.realPort))
if errRet != nil {
err = errRet
return
}
udpConn, errRet := net.ListenUDP("udp", addr)
if errRet != nil {
err = errRet
pxy.Warn("listen udp port error: %v", err)
return
}
pxy.Info("udp proxy listen port [%d]", pxy.cfg.RemotePort)
pxy.udpConn = udpConn
pxy.sendCh = make(chan *msg.UdpPacket, 1024)
pxy.readCh = make(chan *msg.UdpPacket, 1024)
pxy.checkCloseCh = make(chan int)
// read message from workConn, if it returns any error, notify proxy to start a new workConn
workConnReaderFn := func(conn net.Conn) {
for {
var (
rawMsg msg.Message
errRet error
)
pxy.Trace("loop waiting message from udp workConn")
// client will send heartbeat in workConn for keeping alive
conn.SetReadDeadline(time.Now().Add(time.Duration(60) * time.Second))
if rawMsg, errRet = msg.ReadMsg(conn); errRet != nil {
pxy.Warn("read from workConn for udp error: %v", errRet)
conn.Close()
// notify proxy to start a new work connection
// ignore error here, it means the proxy is closed
errors.PanicToError(func() {
pxy.checkCloseCh <- 1
})
return
}
conn.SetReadDeadline(time.Time{})
switch m := rawMsg.(type) {
case *msg.Ping:
pxy.Trace("udp work conn get ping message")
continue
case *msg.UdpPacket:
if errRet := errors.PanicToError(func() {
pxy.Trace("get udp message from workConn: %s", m.Content)
pxy.readCh <- m
StatsAddTrafficOut(pxy.GetName(), int64(len(m.Content)))
}); errRet != nil {
conn.Close()
pxy.Info("reader goroutine for udp work connection closed")
return
}
}
}
}
// send message to workConn
workConnSenderFn := func(conn net.Conn, ctx context.Context) {
var errRet error
for {
select {
case udpMsg, ok := <-pxy.sendCh:
if !ok {
pxy.Info("sender goroutine for udp work connection closed")
return
}
if errRet = msg.WriteMsg(conn, udpMsg); errRet != nil {
pxy.Info("sender goroutine for udp work connection closed: %v", errRet)
conn.Close()
return
} else {
pxy.Trace("send message to udp workConn: %s", udpMsg.Content)
StatsAddTrafficIn(pxy.GetName(), int64(len(udpMsg.Content)))
continue
}
case <-ctx.Done():
pxy.Info("sender goroutine for udp work connection closed")
return
}
}
}
go func() {
// Sleep a while for waiting control send the NewProxyResp to client.
time.Sleep(500 * time.Millisecond)
for {
workConn, err := pxy.GetWorkConnFromPool()
if err != nil {
time.Sleep(1 * time.Second)
// check if proxy is closed
select {
case _, ok := <-pxy.checkCloseCh:
if !ok {
return
}
default:
}
continue
}
// close the old workConn and replac it with a new one
if pxy.workConn != nil {
pxy.workConn.Close()
}
pxy.workConn = workConn
ctx, cancel := context.WithCancel(context.Background())
go workConnReaderFn(workConn)
go workConnSenderFn(workConn, ctx)
_, ok := <-pxy.checkCloseCh
cancel()
if !ok {
return
}
}
}()
// Read from user connections and send wrapped udp message to sendCh (forwarded by workConn).
// Client will transfor udp message to local udp service and waiting for response for a while.
// Response will be wrapped to be forwarded by work connection to server.
// Close readCh and sendCh at the end.
go func() {
udp.ForwardUserConn(udpConn, pxy.readCh, pxy.sendCh)
pxy.Close()
}()
return remoteAddr, nil
}
func (pxy *UdpProxy) GetConf() config.ProxyConf {
return pxy.cfg
}
func (pxy *UdpProxy) Close() {
pxy.mu.Lock()
defer pxy.mu.Unlock()
if !pxy.isClosed {
pxy.isClosed = true
pxy.BaseProxy.Close()
if pxy.workConn != nil {
pxy.workConn.Close()
}
pxy.udpConn.Close()
// all channels only closed here
close(pxy.checkCloseCh)
close(pxy.readCh)
close(pxy.sendCh)
}
pxy.rc.UdpPortManager.Release(pxy.realPort)
}
// HandleUserTcpConnection is used for incoming tcp user connections.
// It can be used for tcp, http, https type.
func HandleUserTcpConnection(pxy Proxy, userConn frpNet.Conn) {
defer userConn.Close()
// try all connections from the pool
workConn, err := pxy.GetWorkConnFromPool()
if err != nil {
return
}
defer workConn.Close()
var local io.ReadWriteCloser = workConn
cfg := pxy.GetConf().GetBaseInfo()
if cfg.UseEncryption {
local, err = frpIo.WithEncryption(local, []byte(g.GlbServerCfg.Token))
if err != nil {
pxy.Error("create encryption stream error: %v", err)
return
}
}
if cfg.UseCompression {
local = frpIo.WithCompression(local)
}
pxy.Debug("join connections, workConn(l[%s] r[%s]) userConn(l[%s] r[%s])", workConn.LocalAddr().String(),
workConn.RemoteAddr().String(), userConn.LocalAddr().String(), userConn.RemoteAddr().String())
StatsOpenConnection(pxy.GetName())
inCount, outCount := frpIo.Join(local, userConn)
StatsCloseConnection(pxy.GetName())
StatsAddTrafficIn(pxy.GetName(), inCount)
StatsAddTrafficOut(pxy.GetName(), outCount)
pxy.Debug("join connections closed")
}

138
server/proxy/http.go Normal file
View File

@ -0,0 +1,138 @@
// Copyright 2019 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package proxy
import (
"io"
"strings"
"github.com/fatedier/frp/g"
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/server/stats"
frpNet "github.com/fatedier/frp/utils/net"
"github.com/fatedier/frp/utils/util"
"github.com/fatedier/frp/utils/vhost"
frpIo "github.com/fatedier/golib/io"
)
type HttpProxy struct {
BaseProxy
cfg *config.HttpProxyConf
closeFuncs []func()
}
func (pxy *HttpProxy) Run() (remoteAddr string, err error) {
routeConfig := vhost.VhostRouteConfig{
RewriteHost: pxy.cfg.HostHeaderRewrite,
Headers: pxy.cfg.Headers,
Username: pxy.cfg.HttpUser,
Password: pxy.cfg.HttpPwd,
CreateConnFn: pxy.GetRealConn,
}
locations := pxy.cfg.Locations
if len(locations) == 0 {
locations = []string{""}
}
addrs := make([]string, 0)
for _, domain := range pxy.cfg.CustomDomains {
routeConfig.Domain = domain
for _, location := range locations {
routeConfig.Location = location
err = pxy.rc.HttpReverseProxy.Register(routeConfig)
if err != nil {
return
}
tmpDomain := routeConfig.Domain
tmpLocation := routeConfig.Location
addrs = append(addrs, util.CanonicalAddr(tmpDomain, int(g.GlbServerCfg.VhostHttpPort)))
pxy.closeFuncs = append(pxy.closeFuncs, func() {
pxy.rc.HttpReverseProxy.UnRegister(tmpDomain, tmpLocation)
})
pxy.Info("http proxy listen for host [%s] location [%s]", routeConfig.Domain, routeConfig.Location)
}
}
if pxy.cfg.SubDomain != "" {
routeConfig.Domain = pxy.cfg.SubDomain + "." + g.GlbServerCfg.SubDomainHost
for _, location := range locations {
routeConfig.Location = location
err = pxy.rc.HttpReverseProxy.Register(routeConfig)
if err != nil {
return
}
tmpDomain := routeConfig.Domain
tmpLocation := routeConfig.Location
addrs = append(addrs, util.CanonicalAddr(tmpDomain, g.GlbServerCfg.VhostHttpPort))
pxy.closeFuncs = append(pxy.closeFuncs, func() {
pxy.rc.HttpReverseProxy.UnRegister(tmpDomain, tmpLocation)
})
pxy.Info("http proxy listen for host [%s] location [%s]", routeConfig.Domain, routeConfig.Location)
}
}
remoteAddr = strings.Join(addrs, ",")
return
}
func (pxy *HttpProxy) GetConf() config.ProxyConf {
return pxy.cfg
}
func (pxy *HttpProxy) GetRealConn() (workConn frpNet.Conn, err error) {
tmpConn, errRet := pxy.GetWorkConnFromPool()
if errRet != nil {
err = errRet
return
}
var rwc io.ReadWriteCloser = tmpConn
if pxy.cfg.UseEncryption {
rwc, err = frpIo.WithEncryption(rwc, []byte(g.GlbServerCfg.Token))
if err != nil {
pxy.Error("create encryption stream error: %v", err)
return
}
}
if pxy.cfg.UseCompression {
rwc = frpIo.WithCompression(rwc)
}
workConn = frpNet.WrapReadWriteCloserToConn(rwc, tmpConn)
workConn = frpNet.WrapStatsConn(workConn, pxy.updateStatsAfterClosedConn)
pxy.statsCollector.Mark(stats.TypeOpenConnection, &stats.OpenConnectionPayload{ProxyName: pxy.GetName()})
return
}
func (pxy *HttpProxy) updateStatsAfterClosedConn(totalRead, totalWrite int64) {
name := pxy.GetName()
pxy.statsCollector.Mark(stats.TypeCloseProxy, &stats.CloseConnectionPayload{ProxyName: name})
pxy.statsCollector.Mark(stats.TypeAddTrafficIn, &stats.AddTrafficInPayload{
ProxyName: name,
TrafficBytes: totalWrite,
})
pxy.statsCollector.Mark(stats.TypeAddTrafficOut, &stats.AddTrafficOutPayload{
ProxyName: name,
TrafficBytes: totalRead,
})
}
func (pxy *HttpProxy) Close() {
pxy.BaseProxy.Close()
for _, closeFn := range pxy.closeFuncs {
closeFn()
}
}

72
server/proxy/https.go Normal file
View File

@ -0,0 +1,72 @@
// Copyright 2019 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package proxy
import (
"strings"
"github.com/fatedier/frp/g"
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/utils/util"
"github.com/fatedier/frp/utils/vhost"
)
type HttpsProxy struct {
BaseProxy
cfg *config.HttpsProxyConf
}
func (pxy *HttpsProxy) Run() (remoteAddr string, err error) {
routeConfig := &vhost.VhostRouteConfig{}
addrs := make([]string, 0)
for _, domain := range pxy.cfg.CustomDomains {
routeConfig.Domain = domain
l, errRet := pxy.rc.VhostHttpsMuxer.Listen(routeConfig)
if errRet != nil {
err = errRet
return
}
l.AddLogPrefix(pxy.name)
pxy.Info("https proxy listen for host [%s]", routeConfig.Domain)
pxy.listeners = append(pxy.listeners, l)
addrs = append(addrs, util.CanonicalAddr(routeConfig.Domain, g.GlbServerCfg.VhostHttpsPort))
}
if pxy.cfg.SubDomain != "" {
routeConfig.Domain = pxy.cfg.SubDomain + "." + g.GlbServerCfg.SubDomainHost
l, errRet := pxy.rc.VhostHttpsMuxer.Listen(routeConfig)
if errRet != nil {
err = errRet
return
}
l.AddLogPrefix(pxy.name)
pxy.Info("https proxy listen for host [%s]", routeConfig.Domain)
pxy.listeners = append(pxy.listeners, l)
addrs = append(addrs, util.CanonicalAddr(routeConfig.Domain, int(g.GlbServerCfg.VhostHttpsPort)))
}
pxy.startListenHandler(pxy, HandleUserTcpConnection)
remoteAddr = strings.Join(addrs, ",")
return
}
func (pxy *HttpsProxy) GetConf() config.ProxyConf {
return pxy.cfg
}
func (pxy *HttpsProxy) Close() {
pxy.BaseProxy.Close()
}

250
server/proxy/proxy.go Normal file
View File

@ -0,0 +1,250 @@
// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package proxy
import (
"fmt"
"io"
"sync"
"github.com/fatedier/frp/g"
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/msg"
"github.com/fatedier/frp/server/controller"
"github.com/fatedier/frp/server/stats"
"github.com/fatedier/frp/utils/log"
frpNet "github.com/fatedier/frp/utils/net"
frpIo "github.com/fatedier/golib/io"
)
type GetWorkConnFn func() (frpNet.Conn, error)
type Proxy interface {
Run() (remoteAddr string, err error)
GetName() string
GetConf() config.ProxyConf
GetWorkConnFromPool() (workConn frpNet.Conn, err error)
GetUsedPortsNum() int
Close()
log.Logger
}
type BaseProxy struct {
name string
rc *controller.ResourceController
statsCollector stats.Collector
listeners []frpNet.Listener
usedPortsNum int
poolCount int
getWorkConnFn GetWorkConnFn
mu sync.RWMutex
log.Logger
}
func (pxy *BaseProxy) GetName() string {
return pxy.name
}
func (pxy *BaseProxy) GetUsedPortsNum() int {
return pxy.usedPortsNum
}
func (pxy *BaseProxy) Close() {
pxy.Info("proxy closing")
for _, l := range pxy.listeners {
l.Close()
}
}
func (pxy *BaseProxy) GetWorkConnFromPool() (workConn frpNet.Conn, err error) {
// try all connections from the pool
for i := 0; i < pxy.poolCount+1; i++ {
if workConn, err = pxy.getWorkConnFn(); err != nil {
pxy.Warn("failed to get work connection: %v", err)
return
}
pxy.Info("get a new work connection: [%s]", workConn.RemoteAddr().String())
workConn.AddLogPrefix(pxy.GetName())
err := msg.WriteMsg(workConn, &msg.StartWorkConn{
ProxyName: pxy.GetName(),
})
if err != nil {
workConn.Warn("failed to send message to work connection from pool: %v, times: %d", err, i)
workConn.Close()
} else {
break
}
}
if err != nil {
pxy.Error("try to get work connection failed in the end")
return
}
return
}
// startListenHandler start a goroutine handler for each listener.
// p: p will just be passed to handler(Proxy, frpNet.Conn).
// handler: each proxy type can set different handler function to deal with connections accepted from listeners.
func (pxy *BaseProxy) startListenHandler(p Proxy, handler func(Proxy, frpNet.Conn, stats.Collector)) {
for _, listener := range pxy.listeners {
go func(l frpNet.Listener) {
for {
// block
// if listener is closed, err returned
c, err := l.Accept()
if err != nil {
pxy.Info("listener is closed")
return
}
pxy.Debug("get a user connection [%s]", c.RemoteAddr().String())
go handler(p, c, pxy.statsCollector)
}
}(listener)
}
}
func NewProxy(runId string, rc *controller.ResourceController, statsCollector stats.Collector, poolCount int,
getWorkConnFn GetWorkConnFn, pxyConf config.ProxyConf) (pxy Proxy, err error) {
basePxy := BaseProxy{
name: pxyConf.GetBaseInfo().ProxyName,
rc: rc,
statsCollector: statsCollector,
listeners: make([]frpNet.Listener, 0),
poolCount: poolCount,
getWorkConnFn: getWorkConnFn,
Logger: log.NewPrefixLogger(runId),
}
switch cfg := pxyConf.(type) {
case *config.TcpProxyConf:
basePxy.usedPortsNum = 1
pxy = &TcpProxy{
BaseProxy: basePxy,
cfg: cfg,
}
case *config.HttpProxyConf:
pxy = &HttpProxy{
BaseProxy: basePxy,
cfg: cfg,
}
case *config.HttpsProxyConf:
pxy = &HttpsProxy{
BaseProxy: basePxy,
cfg: cfg,
}
case *config.UdpProxyConf:
basePxy.usedPortsNum = 1
pxy = &UdpProxy{
BaseProxy: basePxy,
cfg: cfg,
}
case *config.StcpProxyConf:
pxy = &StcpProxy{
BaseProxy: basePxy,
cfg: cfg,
}
case *config.XtcpProxyConf:
pxy = &XtcpProxy{
BaseProxy: basePxy,
cfg: cfg,
}
default:
return pxy, fmt.Errorf("proxy type not support")
}
pxy.AddLogPrefix(pxy.GetName())
return
}
// HandleUserTcpConnection is used for incoming tcp user connections.
// It can be used for tcp, http, https type.
func HandleUserTcpConnection(pxy Proxy, userConn frpNet.Conn, statsCollector stats.Collector) {
defer userConn.Close()
// try all connections from the pool
workConn, err := pxy.GetWorkConnFromPool()
if err != nil {
return
}
defer workConn.Close()
var local io.ReadWriteCloser = workConn
cfg := pxy.GetConf().GetBaseInfo()
if cfg.UseEncryption {
local, err = frpIo.WithEncryption(local, []byte(g.GlbServerCfg.Token))
if err != nil {
pxy.Error("create encryption stream error: %v", err)
return
}
}
if cfg.UseCompression {
local = frpIo.WithCompression(local)
}
pxy.Debug("join connections, workConn(l[%s] r[%s]) userConn(l[%s] r[%s])", workConn.LocalAddr().String(),
workConn.RemoteAddr().String(), userConn.LocalAddr().String(), userConn.RemoteAddr().String())
statsCollector.Mark(stats.TypeOpenConnection, &stats.OpenConnectionPayload{ProxyName: pxy.GetName()})
inCount, outCount := frpIo.Join(local, userConn)
statsCollector.Mark(stats.TypeCloseConnection, &stats.CloseConnectionPayload{ProxyName: pxy.GetName()})
statsCollector.Mark(stats.TypeAddTrafficIn, &stats.AddTrafficInPayload{
ProxyName: pxy.GetName(),
TrafficBytes: inCount,
})
statsCollector.Mark(stats.TypeAddTrafficOut, &stats.AddTrafficOutPayload{
ProxyName: pxy.GetName(),
TrafficBytes: outCount,
})
pxy.Debug("join connections closed")
}
type ProxyManager struct {
// proxies indexed by proxy name
pxys map[string]Proxy
mu sync.RWMutex
}
func NewProxyManager() *ProxyManager {
return &ProxyManager{
pxys: make(map[string]Proxy),
}
}
func (pm *ProxyManager) Add(name string, pxy Proxy) error {
pm.mu.Lock()
defer pm.mu.Unlock()
if _, ok := pm.pxys[name]; ok {
return fmt.Errorf("proxy name [%s] is already in use", name)
}
pm.pxys[name] = pxy
return nil
}
func (pm *ProxyManager) Del(name string) {
pm.mu.Lock()
defer pm.mu.Unlock()
delete(pm.pxys, name)
}
func (pm *ProxyManager) GetByName(name string) (pxy Proxy, ok bool) {
pm.mu.RLock()
defer pm.mu.RUnlock()
pxy, ok = pm.pxys[name]
return
}

47
server/proxy/stcp.go Normal file
View File

@ -0,0 +1,47 @@
// Copyright 2019 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package proxy
import (
"github.com/fatedier/frp/models/config"
)
type StcpProxy struct {
BaseProxy
cfg *config.StcpProxyConf
}
func (pxy *StcpProxy) Run() (remoteAddr string, err error) {
listener, errRet := pxy.rc.VisitorManager.Listen(pxy.GetName(), pxy.cfg.Sk)
if errRet != nil {
err = errRet
return
}
listener.AddLogPrefix(pxy.name)
pxy.listeners = append(pxy.listeners, listener)
pxy.Info("stcp proxy custom listen success")
pxy.startListenHandler(pxy, HandleUserTcpConnection)
return
}
func (pxy *StcpProxy) GetConf() config.ProxyConf {
return pxy.cfg
}
func (pxy *StcpProxy) Close() {
pxy.BaseProxy.Close()
pxy.rc.VisitorManager.CloseListener(pxy.GetName())
}

84
server/proxy/tcp.go Normal file
View File

@ -0,0 +1,84 @@
// Copyright 2019 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package proxy
import (
"fmt"
"github.com/fatedier/frp/g"
"github.com/fatedier/frp/models/config"
frpNet "github.com/fatedier/frp/utils/net"
)
type TcpProxy struct {
BaseProxy
cfg *config.TcpProxyConf
realPort int
}
func (pxy *TcpProxy) Run() (remoteAddr string, err error) {
if pxy.cfg.Group != "" {
l, realPort, errRet := pxy.rc.TcpGroupCtl.Listen(pxy.name, pxy.cfg.Group, pxy.cfg.GroupKey, g.GlbServerCfg.ProxyBindAddr, pxy.cfg.RemotePort)
if errRet != nil {
err = errRet
return
}
defer func() {
if err != nil {
l.Close()
}
}()
pxy.realPort = realPort
listener := frpNet.WrapLogListener(l)
listener.AddLogPrefix(pxy.name)
pxy.listeners = append(pxy.listeners, listener)
pxy.Info("tcp proxy listen port [%d] in group [%s]", pxy.cfg.RemotePort, pxy.cfg.Group)
} else {
pxy.realPort, err = pxy.rc.TcpPortManager.Acquire(pxy.name, pxy.cfg.RemotePort)
if err != nil {
return
}
defer func() {
if err != nil {
pxy.rc.TcpPortManager.Release(pxy.realPort)
}
}()
listener, errRet := frpNet.ListenTcp(g.GlbServerCfg.ProxyBindAddr, pxy.realPort)
if errRet != nil {
err = errRet
return
}
listener.AddLogPrefix(pxy.name)
pxy.listeners = append(pxy.listeners, listener)
pxy.Info("tcp proxy listen port [%d]", pxy.cfg.RemotePort)
}
pxy.cfg.RemotePort = pxy.realPort
remoteAddr = fmt.Sprintf(":%d", pxy.realPort)
pxy.startListenHandler(pxy, HandleUserTcpConnection)
return
}
func (pxy *TcpProxy) GetConf() config.ProxyConf {
return pxy.cfg
}
func (pxy *TcpProxy) Close() {
pxy.BaseProxy.Close()
if pxy.cfg.Group == "" {
pxy.rc.TcpPortManager.Release(pxy.realPort)
}
}

225
server/proxy/udp.go Normal file
View File

@ -0,0 +1,225 @@
// Copyright 2019 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package proxy
import (
"context"
"fmt"
"net"
"time"
"github.com/fatedier/frp/g"
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/msg"
"github.com/fatedier/frp/models/proto/udp"
"github.com/fatedier/frp/server/stats"
"github.com/fatedier/golib/errors"
)
type UdpProxy struct {
BaseProxy
cfg *config.UdpProxyConf
realPort int
// udpConn is the listener of udp packages
udpConn *net.UDPConn
// there are always only one workConn at the same time
// get another one if it closed
workConn net.Conn
// sendCh is used for sending packages to workConn
sendCh chan *msg.UdpPacket
// readCh is used for reading packages from workConn
readCh chan *msg.UdpPacket
// checkCloseCh is used for watching if workConn is closed
checkCloseCh chan int
isClosed bool
}
func (pxy *UdpProxy) Run() (remoteAddr string, err error) {
pxy.realPort, err = pxy.rc.UdpPortManager.Acquire(pxy.name, pxy.cfg.RemotePort)
if err != nil {
return
}
defer func() {
if err != nil {
pxy.rc.UdpPortManager.Release(pxy.realPort)
}
}()
remoteAddr = fmt.Sprintf(":%d", pxy.realPort)
pxy.cfg.RemotePort = pxy.realPort
addr, errRet := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", g.GlbServerCfg.ProxyBindAddr, pxy.realPort))
if errRet != nil {
err = errRet
return
}
udpConn, errRet := net.ListenUDP("udp", addr)
if errRet != nil {
err = errRet
pxy.Warn("listen udp port error: %v", err)
return
}
pxy.Info("udp proxy listen port [%d]", pxy.cfg.RemotePort)
pxy.udpConn = udpConn
pxy.sendCh = make(chan *msg.UdpPacket, 1024)
pxy.readCh = make(chan *msg.UdpPacket, 1024)
pxy.checkCloseCh = make(chan int)
// read message from workConn, if it returns any error, notify proxy to start a new workConn
workConnReaderFn := func(conn net.Conn) {
for {
var (
rawMsg msg.Message
errRet error
)
pxy.Trace("loop waiting message from udp workConn")
// client will send heartbeat in workConn for keeping alive
conn.SetReadDeadline(time.Now().Add(time.Duration(60) * time.Second))
if rawMsg, errRet = msg.ReadMsg(conn); errRet != nil {
pxy.Warn("read from workConn for udp error: %v", errRet)
conn.Close()
// notify proxy to start a new work connection
// ignore error here, it means the proxy is closed
errors.PanicToError(func() {
pxy.checkCloseCh <- 1
})
return
}
conn.SetReadDeadline(time.Time{})
switch m := rawMsg.(type) {
case *msg.Ping:
pxy.Trace("udp work conn get ping message")
continue
case *msg.UdpPacket:
if errRet := errors.PanicToError(func() {
pxy.Trace("get udp message from workConn: %s", m.Content)
pxy.readCh <- m
pxy.statsCollector.Mark(stats.TypeAddTrafficOut, &stats.AddTrafficOutPayload{
ProxyName: pxy.GetName(),
TrafficBytes: int64(len(m.Content)),
})
}); errRet != nil {
conn.Close()
pxy.Info("reader goroutine for udp work connection closed")
return
}
}
}
}
// send message to workConn
workConnSenderFn := func(conn net.Conn, ctx context.Context) {
var errRet error
for {
select {
case udpMsg, ok := <-pxy.sendCh:
if !ok {
pxy.Info("sender goroutine for udp work connection closed")
return
}
if errRet = msg.WriteMsg(conn, udpMsg); errRet != nil {
pxy.Info("sender goroutine for udp work connection closed: %v", errRet)
conn.Close()
return
} else {
pxy.Trace("send message to udp workConn: %s", udpMsg.Content)
pxy.statsCollector.Mark(stats.TypeAddTrafficIn, &stats.AddTrafficInPayload{
ProxyName: pxy.GetName(),
TrafficBytes: int64(len(udpMsg.Content)),
})
continue
}
case <-ctx.Done():
pxy.Info("sender goroutine for udp work connection closed")
return
}
}
}
go func() {
// Sleep a while for waiting control send the NewProxyResp to client.
time.Sleep(500 * time.Millisecond)
for {
workConn, err := pxy.GetWorkConnFromPool()
if err != nil {
time.Sleep(1 * time.Second)
// check if proxy is closed
select {
case _, ok := <-pxy.checkCloseCh:
if !ok {
return
}
default:
}
continue
}
// close the old workConn and replac it with a new one
if pxy.workConn != nil {
pxy.workConn.Close()
}
pxy.workConn = workConn
ctx, cancel := context.WithCancel(context.Background())
go workConnReaderFn(workConn)
go workConnSenderFn(workConn, ctx)
_, ok := <-pxy.checkCloseCh
cancel()
if !ok {
return
}
}
}()
// Read from user connections and send wrapped udp message to sendCh (forwarded by workConn).
// Client will transfor udp message to local udp service and waiting for response for a while.
// Response will be wrapped to be forwarded by work connection to server.
// Close readCh and sendCh at the end.
go func() {
udp.ForwardUserConn(udpConn, pxy.readCh, pxy.sendCh)
pxy.Close()
}()
return remoteAddr, nil
}
func (pxy *UdpProxy) GetConf() config.ProxyConf {
return pxy.cfg
}
func (pxy *UdpProxy) Close() {
pxy.mu.Lock()
defer pxy.mu.Unlock()
if !pxy.isClosed {
pxy.isClosed = true
pxy.BaseProxy.Close()
if pxy.workConn != nil {
pxy.workConn.Close()
}
pxy.udpConn.Close()
// all channels only closed here
close(pxy.checkCloseCh)
close(pxy.readCh)
close(pxy.sendCh)
}
pxy.rc.UdpPortManager.Release(pxy.realPort)
}

73
server/proxy/xtcp.go Normal file
View File

@ -0,0 +1,73 @@
// Copyright 2019 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package proxy
import (
"fmt"
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/msg"
"github.com/fatedier/golib/errors"
)
type XtcpProxy struct {
BaseProxy
cfg *config.XtcpProxyConf
closeCh chan struct{}
}
func (pxy *XtcpProxy) Run() (remoteAddr string, err error) {
if pxy.rc.NatHoleController == nil {
pxy.Error("udp port for xtcp is not specified.")
err = fmt.Errorf("xtcp is not supported in frps")
return
}
sidCh := pxy.rc.NatHoleController.ListenClient(pxy.GetName(), pxy.cfg.Sk)
go func() {
for {
select {
case <-pxy.closeCh:
break
case sid := <-sidCh:
workConn, errRet := pxy.GetWorkConnFromPool()
if errRet != nil {
continue
}
m := &msg.NatHoleSid{
Sid: sid,
}
errRet = msg.WriteMsg(workConn, m)
if errRet != nil {
pxy.Warn("write nat hole sid package error, %v", errRet)
}
}
}
}()
return
}
func (pxy *XtcpProxy) GetConf() config.ProxyConf {
return pxy.cfg
}
func (pxy *XtcpProxy) Close() {
pxy.BaseProxy.Close()
pxy.rc.NatHoleController.CloseClient(pxy.GetName())
errors.PanicToError(func() {
close(pxy.closeCh)
})
}

View File

@ -25,8 +25,12 @@ import (
"github.com/fatedier/frp/assets"
"github.com/fatedier/frp/g"
"github.com/fatedier/frp/models/msg"
"github.com/fatedier/frp/models/nathole"
"github.com/fatedier/frp/server/controller"
"github.com/fatedier/frp/server/group"
"github.com/fatedier/frp/server/ports"
"github.com/fatedier/frp/server/proxy"
"github.com/fatedier/frp/server/stats"
"github.com/fatedier/frp/utils/log"
frpNet "github.com/fatedier/frp/utils/net"
"github.com/fatedier/frp/utils/util"
@ -43,36 +47,6 @@ const (
var ServerService *Service
// All resource managers and controllers
type ResourceController struct {
// Manage all controllers
CtlManager *ControlManager
// Manage all proxies
PxyManager *ProxyManager
// Manage all visitor listeners
VisitorManager *VisitorManager
// Tcp Group Controller
TcpGroupCtl *group.TcpGroupCtl
// Manage all tcp ports
TcpPortManager *ports.PortManager
// Manage all udp ports
UdpPortManager *ports.PortManager
// For http proxies, forwarding http requests
HttpReverseProxy *vhost.HttpReverseProxy
// For https proxies, route requests to different clients by hostname and other infomation
VhostHttpsMuxer *vhost.HttpsMuxer
// Controller for nat hole connections
NatHoleController *NatHoleController
}
// Server service
type Service struct {
// Dispatch connections to different handlers listen on same port
@ -87,21 +61,32 @@ type Service struct {
// Accept connections using websocket
websocketListener frpNet.Listener
// Manage all controllers
ctlManager *ControlManager
// Manage all proxies
pxyManager *proxy.ProxyManager
// All resource managers and controllers
rc *ResourceController
rc *controller.ResourceController
// stats collector to store server and proxies stats info
statsCollector stats.Collector
}
func NewService() (svr *Service, err error) {
cfg := &g.GlbServerCfg.ServerCommonConf
svr = &Service{
rc: &ResourceController{
CtlManager: NewControlManager(),
PxyManager: NewProxyManager(),
VisitorManager: NewVisitorManager(),
ctlManager: NewControlManager(),
pxyManager: proxy.NewProxyManager(),
rc: &controller.ResourceController{
VisitorManager: controller.NewVisitorManager(),
TcpPortManager: ports.NewPortManager("tcp", cfg.ProxyBindAddr, cfg.AllowPorts),
UdpPortManager: ports.NewPortManager("udp", cfg.ProxyBindAddr, cfg.AllowPorts),
},
}
// Init group controller
svr.rc.TcpGroupCtl = group.NewTcpGroupCtl(svr.rc.TcpPortManager)
// Init assets.
@ -204,9 +189,9 @@ func NewService() (svr *Service, err error) {
// Create nat hole controller.
if cfg.BindUdpPort > 0 {
var nc *NatHoleController
var nc *nathole.NatHoleController
addr := fmt.Sprintf("%s:%d", cfg.BindAddr, cfg.BindUdpPort)
nc, err = NewNatHoleController(addr)
nc, err = nathole.NewNatHoleController(addr)
if err != nil {
err = fmt.Errorf("Create nat hole controller error, %v", err)
return
@ -215,6 +200,7 @@ func NewService() (svr *Service, err error) {
log.Info("nat hole udp service listen on %s:%d", cfg.BindAddr, cfg.BindUdpPort)
}
var statsEnable bool
// Create dashboard web server.
if cfg.DashboardPort > 0 {
err = svr.RunDashboardServer(cfg.DashboardAddr, cfg.DashboardPort)
@ -223,8 +209,10 @@ func NewService() (svr *Service, err error) {
return
}
log.Info("Dashboard listen on %s:%d", cfg.DashboardAddr, cfg.DashboardPort)
statsEnable = true
}
svr.statsCollector = stats.NewInternalCollector(statsEnable)
return
}
@ -355,9 +343,9 @@ func (svr *Service) RegisterControl(ctlConn frpNet.Conn, loginMsg *msg.Login) (e
}
}
ctl := NewControl(svr.rc, ctlConn, loginMsg)
ctl := NewControl(svr.rc, svr.pxyManager, svr.statsCollector, ctlConn, loginMsg)
if oldCtl := svr.rc.CtlManager.Add(loginMsg.RunId, ctl); oldCtl != nil {
if oldCtl := svr.ctlManager.Add(loginMsg.RunId, ctl); oldCtl != nil {
oldCtl.allShutdown.WaitDone()
}
@ -365,13 +353,19 @@ func (svr *Service) RegisterControl(ctlConn frpNet.Conn, loginMsg *msg.Login) (e
ctl.Start()
// for statistics
StatsNewClient()
svr.statsCollector.Mark(stats.TypeNewClient, &stats.NewClientPayload{})
go func() {
// block until control closed
ctl.WaitClosed()
svr.ctlManager.Del(loginMsg.RunId)
}()
return
}
// RegisterWorkConn register a new work connection to control and proxies need it.
func (svr *Service) RegisterWorkConn(workConn frpNet.Conn, newMsg *msg.NewWorkConn) {
ctl, exist := svr.rc.CtlManager.GetById(newMsg.RunId)
ctl, exist := svr.ctlManager.GetById(newMsg.RunId)
if !exist {
workConn.Warn("No client control found for run id [%s]", newMsg.RunId)
return

273
server/stats/internal.go Normal file
View File

@ -0,0 +1,273 @@
// Copyright 2019 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package stats
import (
"sync"
"time"
"github.com/fatedier/frp/utils/log"
"github.com/fatedier/frp/utils/metric"
)
type internalCollector struct {
enable bool
info *ServerStatistics
mu sync.Mutex
}
func NewInternalCollector(enable bool) Collector {
return &internalCollector{
enable: enable,
info: &ServerStatistics{
TotalTrafficIn: metric.NewDateCounter(ReserveDays),
TotalTrafficOut: metric.NewDateCounter(ReserveDays),
CurConns: metric.NewCounter(),
ClientCounts: metric.NewCounter(),
ProxyTypeCounts: make(map[string]metric.Counter),
ProxyStatistics: make(map[string]*ProxyStatistics),
},
}
}
func (collector *internalCollector) Run() error {
go func() {
for {
time.Sleep(12 * time.Hour)
log.Debug("start to clear useless proxy statistics data...")
collector.ClearUselessInfo()
log.Debug("finish to clear useless proxy statistics data")
}
}()
return nil
}
func (collector *internalCollector) ClearUselessInfo() {
// To check if there are proxies that closed than 7 days and drop them.
collector.mu.Lock()
defer collector.mu.Unlock()
for name, data := range collector.info.ProxyStatistics {
if !data.LastCloseTime.IsZero() && time.Since(data.LastCloseTime) > time.Duration(7*24)*time.Hour {
delete(collector.info.ProxyStatistics, name)
log.Trace("clear proxy [%s]'s statistics data, lastCloseTime: [%s]", name, data.LastCloseTime.String())
}
}
}
func (collector *internalCollector) Mark(statsType StatsType, payload interface{}) {
if !collector.enable {
return
}
switch v := payload.(type) {
case *NewClientPayload:
collector.newClient(v)
case *CloseClientPayload:
collector.closeClient(v)
case *OpenConnectionPayload:
collector.openConnection(v)
case *CloseConnectionPayload:
collector.closeConnection(v)
case *AddTrafficInPayload:
collector.addTrafficIn(v)
case *AddTrafficOutPayload:
collector.addTrafficOut(v)
}
}
func (collector *internalCollector) newClient(payload *NewClientPayload) {
collector.info.ClientCounts.Inc(1)
}
func (collector *internalCollector) closeClient(payload *CloseClientPayload) {
collector.info.ClientCounts.Dec(1)
}
func (collector *internalCollector) newProxy(payload *NewProxyPayload) {
collector.mu.Lock()
defer collector.mu.Unlock()
counter, ok := collector.info.ProxyTypeCounts[payload.ProxyType]
if !ok {
counter = metric.NewCounter()
}
counter.Inc(1)
collector.info.ProxyTypeCounts[payload.ProxyType] = counter
proxyStats, ok := collector.info.ProxyStatistics[payload.Name]
if !(ok && proxyStats.ProxyType == payload.ProxyType) {
proxyStats = &ProxyStatistics{
Name: payload.Name,
ProxyType: payload.ProxyType,
CurConns: metric.NewCounter(),
TrafficIn: metric.NewDateCounter(ReserveDays),
TrafficOut: metric.NewDateCounter(ReserveDays),
}
collector.info.ProxyStatistics[payload.Name] = proxyStats
}
proxyStats.LastStartTime = time.Now()
}
func (collector *internalCollector) closeProxy(payload *CloseProxyPayload) {
collector.mu.Lock()
defer collector.mu.Unlock()
if counter, ok := collector.info.ProxyTypeCounts[payload.ProxyType]; ok {
counter.Dec(1)
}
if proxyStats, ok := collector.info.ProxyStatistics[payload.Name]; ok {
proxyStats.LastCloseTime = time.Now()
}
}
func (collector *internalCollector) openConnection(payload *OpenConnectionPayload) {
collector.info.CurConns.Inc(1)
collector.mu.Lock()
defer collector.mu.Unlock()
proxyStats, ok := collector.info.ProxyStatistics[payload.ProxyName]
if ok {
proxyStats.CurConns.Inc(1)
collector.info.ProxyStatistics[payload.ProxyName] = proxyStats
}
}
func (collector *internalCollector) closeConnection(payload *CloseConnectionPayload) {
collector.info.CurConns.Dec(1)
collector.mu.Lock()
defer collector.mu.Unlock()
proxyStats, ok := collector.info.ProxyStatistics[payload.ProxyName]
if ok {
proxyStats.CurConns.Dec(1)
collector.info.ProxyStatistics[payload.ProxyName] = proxyStats
}
}
func (collector *internalCollector) addTrafficIn(payload *AddTrafficInPayload) {
collector.info.TotalTrafficIn.Inc(payload.TrafficBytes)
collector.mu.Lock()
defer collector.mu.Unlock()
proxyStats, ok := collector.info.ProxyStatistics[payload.ProxyName]
if ok {
proxyStats.TrafficIn.Inc(payload.TrafficBytes)
collector.info.ProxyStatistics[payload.ProxyName] = proxyStats
}
}
func (collector *internalCollector) addTrafficOut(payload *AddTrafficOutPayload) {
collector.info.TotalTrafficOut.Inc(payload.TrafficBytes)
collector.mu.Lock()
defer collector.mu.Unlock()
proxyStats, ok := collector.info.ProxyStatistics[payload.ProxyName]
if ok {
proxyStats.TrafficOut.Inc(payload.TrafficBytes)
collector.info.ProxyStatistics[payload.ProxyName] = proxyStats
}
}
func (collector *internalCollector) GetServer() *ServerStats {
collector.mu.Lock()
defer collector.mu.Unlock()
s := &ServerStats{
TotalTrafficIn: collector.info.TotalTrafficIn.TodayCount(),
TotalTrafficOut: collector.info.TotalTrafficOut.TodayCount(),
CurConns: collector.info.CurConns.Count(),
ClientCounts: collector.info.ClientCounts.Count(),
ProxyTypeCounts: make(map[string]int64),
}
for k, v := range collector.info.ProxyTypeCounts {
s.ProxyTypeCounts[k] = v.Count()
}
return s
}
func (collector *internalCollector) GetProxiesByType(proxyType string) []*ProxyStats {
res := make([]*ProxyStats, 0)
collector.mu.Lock()
defer collector.mu.Unlock()
for name, proxyStats := range collector.info.ProxyStatistics {
if proxyStats.ProxyType != proxyType {
continue
}
ps := &ProxyStats{
Name: name,
Type: proxyStats.ProxyType,
TodayTrafficIn: proxyStats.TrafficIn.TodayCount(),
TodayTrafficOut: proxyStats.TrafficOut.TodayCount(),
CurConns: proxyStats.CurConns.Count(),
}
if !proxyStats.LastStartTime.IsZero() {
ps.LastStartTime = proxyStats.LastStartTime.Format("01-02 15:04:05")
}
if !proxyStats.LastCloseTime.IsZero() {
ps.LastCloseTime = proxyStats.LastCloseTime.Format("01-02 15:04:05")
}
res = append(res, ps)
}
return res
}
func (collector *internalCollector) GetProxiesByTypeAndName(proxyType string, proxyName string) (res *ProxyStats) {
collector.mu.Lock()
defer collector.mu.Unlock()
for name, proxyStats := range collector.info.ProxyStatistics {
if proxyStats.ProxyType != proxyType {
continue
}
if name != proxyName {
continue
}
res = &ProxyStats{
Name: name,
Type: proxyStats.ProxyType,
TodayTrafficIn: proxyStats.TrafficIn.TodayCount(),
TodayTrafficOut: proxyStats.TrafficOut.TodayCount(),
CurConns: proxyStats.CurConns.Count(),
}
if !proxyStats.LastStartTime.IsZero() {
res.LastStartTime = proxyStats.LastStartTime.Format("01-02 15:04:05")
}
if !proxyStats.LastCloseTime.IsZero() {
res.LastCloseTime = proxyStats.LastCloseTime.Format("01-02 15:04:05")
}
break
}
return
}
func (collector *internalCollector) GetProxyTraffic(name string) (res *ProxyTrafficInfo) {
collector.mu.Lock()
defer collector.mu.Unlock()
proxyStats, ok := collector.info.ProxyStatistics[name]
if ok {
res = &ProxyTrafficInfo{
Name: name,
}
res.TrafficIn = proxyStats.TrafficIn.GetLastDaysCount(ReserveDays)
res.TrafficOut = proxyStats.TrafficOut.GetLastDaysCount(ReserveDays)
}
return
}

129
server/stats/stats.go Normal file
View File

@ -0,0 +1,129 @@
// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package stats
import (
"time"
"github.com/fatedier/frp/utils/metric"
)
const (
ReserveDays = 7
)
type StatsType int
const (
TypeNewClient StatsType = iota
TypeCloseClient
TypeNewProxy
TypeCloseProxy
TypeOpenConnection
TypeCloseConnection
TypeAddTrafficIn
TypeAddTrafficOut
)
type ServerStats struct {
TotalTrafficIn int64
TotalTrafficOut int64
CurConns int64
ClientCounts int64
ProxyTypeCounts map[string]int64
}
type ProxyStats struct {
Name string
Type string
TodayTrafficIn int64
TodayTrafficOut int64
LastStartTime string
LastCloseTime string
CurConns int64
}
type ProxyTrafficInfo struct {
Name string
TrafficIn []int64
TrafficOut []int64
}
type ProxyStatistics struct {
Name string
ProxyType string
TrafficIn metric.DateCounter
TrafficOut metric.DateCounter
CurConns metric.Counter
LastStartTime time.Time
LastCloseTime time.Time
}
type ServerStatistics struct {
TotalTrafficIn metric.DateCounter
TotalTrafficOut metric.DateCounter
CurConns metric.Counter
// counter for clients
ClientCounts metric.Counter
// counter for proxy types
ProxyTypeCounts map[string]metric.Counter
// statistics for different proxies
// key is proxy name
ProxyStatistics map[string]*ProxyStatistics
}
type Collector interface {
Mark(statsType StatsType, payload interface{})
Run() error
GetServer() *ServerStats
GetProxiesByType(proxyType string) []*ProxyStats
GetProxiesByTypeAndName(proxyType string, proxyName string) *ProxyStats
GetProxyTraffic(name string) *ProxyTrafficInfo
}
type NewClientPayload struct{}
type CloseClientPayload struct{}
type NewProxyPayload struct {
Name string
ProxyType string
}
type CloseProxyPayload struct {
Name string
ProxyType string
}
type OpenConnectionPayload struct {
ProxyName string
}
type CloseConnectionPayload struct {
ProxyName string
}
type AddTrafficInPayload struct {
ProxyName string
TrafficBytes int64
}
type AddTrafficOutPayload struct {
ProxyName string
TrafficBytes int64
}