From ac09ba3982c2094dba135e17ea9b499a061a0e75 Mon Sep 17 00:00:00 2001 From: fatedier Date: Sun, 17 Jul 2016 21:42:21 +0800 Subject: [PATCH] all: add /api/proxies api for statistics info --- src/frp/models/client/client.go | 3 +- src/frp/models/consts/consts.go | 8 + src/frp/models/metric/server.go | 201 +++++++++++++++++++++++++ src/frp/models/msg/process.go | 46 +++++- src/frp/models/server/config.go | 7 + src/frp/models/server/dashboard.go | 1 + src/frp/models/server/dashboard_api.go | 22 +++ src/frp/models/server/server.go | 8 +- 8 files changed, 289 insertions(+), 7 deletions(-) create mode 100644 src/frp/models/metric/server.go diff --git a/src/frp/models/client/client.go b/src/frp/models/client/client.go index bdd42793..aa7b8788 100644 --- a/src/frp/models/client/client.go +++ b/src/frp/models/client/client.go @@ -96,7 +96,8 @@ func (p *ProxyClient) StartTunnel(serverAddr string, serverPort int64) (err erro // l means local, r means remote log.Debug("Join two connections, (l[%s] r[%s]) (l[%s] r[%s])", localConn.GetLocalAddr(), localConn.GetRemoteAddr(), remoteConn.GetLocalAddr(), remoteConn.GetRemoteAddr()) - go msg.JoinMore(localConn, remoteConn, p.BaseConf) + needRecord := false + go msg.JoinMore(localConn, remoteConn, p.BaseConf, needRecord) return nil } diff --git a/src/frp/models/consts/consts.go b/src/frp/models/consts/consts.go index 7bb09443..90ca9036 100644 --- a/src/frp/models/consts/consts.go +++ b/src/frp/models/consts/consts.go @@ -21,6 +21,14 @@ const ( Closed ) +var ( + StatusStr = []string{ + "idle", + "working", + "closed", + } +) + // msg type const ( NewCtlConn = iota diff --git a/src/frp/models/metric/server.go b/src/frp/models/metric/server.go new file mode 100644 index 00000000..51d0e522 --- /dev/null +++ b/src/frp/models/metric/server.go @@ -0,0 +1,201 @@ +// Copyright 2016 fatedier, fatedier@gmail.com +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metric + +import ( + "encoding/json" + "sync" + "time" + + "frp/models/consts" +) + +var ( + DailyDataKeepDays int = 7 + ServerMetricInfoMap map[string]*ServerMetric + smMutex sync.RWMutex +) + +type ServerMetric struct { + Name string `json:"name"` + Type string `json:"type"` + BindAddr string `json:"bind_addr"` + ListenPort int64 `json:"listen_port"` + CustomDomains []string `json:"custom_domains"` + Status string `json:"status"` + UseEncryption bool `json:"use_encryption"` + UseGzip bool `json:"use_gzip"` + PrivilegeMode bool `json:"privilege_mode"` + + // statistics + CurrentConns int64 `json:"current_conns"` + Daily []*DailyServerStats `json:"daily"` + mutex sync.RWMutex +} + +type DailyServerStats struct { + Time string `json:"time"` + FlowIn int64 `json:"flow_in"` + FlowOut int64 `json:"flow_out"` + TotalAcceptConns int64 `json:"total_accept_conns"` +} + +func init() { + ServerMetricInfoMap = make(map[string]*ServerMetric) +} + +func GetAllProxyMetrics() map[string]*ServerMetric { + result := make(map[string]*ServerMetric) + smMutex.RLock() + defer smMutex.RUnlock() + for proxyName, metric := range ServerMetricInfoMap { + metric.mutex.RLock() + byteBuf, _ := json.Marshal(metric) + metric.mutex.RUnlock() + tmpMetric := &ServerMetric{} + json.Unmarshal(byteBuf, &tmpMetric) + result[proxyName] = tmpMetric + } + return result +} + +// if proxyName isn't exist, return nil +func GetProxyMetrics(proxyName string) *ServerMetric { + smMutex.RLock() + defer smMutex.RUnlock() + metric, ok := ServerMetricInfoMap[proxyName] + if ok { + byteBuf, _ := json.Marshal(metric) + tmpMetric := &ServerMetric{} + json.Unmarshal(byteBuf, &tmpMetric) + return tmpMetric + } else { + return nil + } +} + +func SetProxyInfo(proxyName string, proxyType, bindAddr string, + useEncryption, useGzip, privilegeMode bool, customDomains []string, + listenPort int64) { + smMutex.Lock() + info, ok := ServerMetricInfoMap[proxyName] + if !ok { + info = &ServerMetric{} + info.Daily = make([]*DailyServerStats, 0) + } + info.Name = proxyName + info.Type = proxyType + info.UseEncryption = useEncryption + info.UseGzip = useGzip + info.PrivilegeMode = privilegeMode + info.BindAddr = bindAddr + info.ListenPort = listenPort + info.CustomDomains = customDomains + ServerMetricInfoMap[proxyName] = info + smMutex.Unlock() +} + +func SetStatus(proxyName string, status int64) { + smMutex.RLock() + metric, ok := ServerMetricInfoMap[proxyName] + smMutex.RUnlock() + if ok { + metric.mutex.Lock() + metric.Status = consts.StatusStr[status] + metric.mutex.Unlock() + } +} + +type DealFuncType func(*DailyServerStats) + +func DealDailyData(dailyData []*DailyServerStats, fn DealFuncType) (newDailyData []*DailyServerStats) { + now := time.Now().Format("20060102") + dailyLen := len(dailyData) + if dailyLen == 0 { + daily := &DailyServerStats{} + daily.Time = now + fn(daily) + dailyData = append(dailyData, daily) + } else { + daily := dailyData[dailyLen-1] + if daily.Time == now { + fn(daily) + } else { + newDaily := &DailyServerStats{} + newDaily.Time = now + fn(newDaily) + if dailyLen == DailyDataKeepDays { + for i := 0; i < dailyLen-1; i++ { + dailyData[i] = dailyData[i+1] + } + dailyData[dailyLen-1] = newDaily + } else { + dailyData = append(dailyData, newDaily) + } + } + } + return dailyData +} + +func OpenConnection(proxyName string) { + smMutex.RLock() + metric, ok := ServerMetricInfoMap[proxyName] + smMutex.RUnlock() + if ok { + metric.mutex.Lock() + metric.CurrentConns++ + metric.Daily = DealDailyData(metric.Daily, func(stats *DailyServerStats) { + stats.TotalAcceptConns++ + }) + metric.mutex.Unlock() + } +} + +func CloseConnection(proxyName string) { + smMutex.RLock() + metric, ok := ServerMetricInfoMap[proxyName] + smMutex.RUnlock() + if ok { + metric.mutex.Lock() + metric.CurrentConns-- + metric.mutex.Unlock() + } +} + +func AddFlowIn(proxyName string, value int64) { + smMutex.RLock() + metric, ok := ServerMetricInfoMap[proxyName] + smMutex.RUnlock() + if ok { + metric.mutex.Lock() + metric.Daily = DealDailyData(metric.Daily, func(stats *DailyServerStats) { + stats.FlowIn += value + }) + metric.mutex.Unlock() + } +} + +func AddFlowOut(proxyName string, value int64) { + smMutex.RLock() + metric, ok := ServerMetricInfoMap[proxyName] + smMutex.RUnlock() + if ok { + metric.mutex.Lock() + metric.Daily = DealDailyData(metric.Daily, func(stats *DailyServerStats) { + stats.FlowOut += value + }) + metric.mutex.Unlock() + } +} diff --git a/src/frp/models/msg/process.go b/src/frp/models/msg/process.go index cfc782bc..4c7783bd 100644 --- a/src/frp/models/msg/process.go +++ b/src/frp/models/msg/process.go @@ -24,6 +24,7 @@ import ( "sync" "frp/models/config" + "frp/models/metric" "frp/utils/conn" "frp/utils/log" "frp/utils/pcrypto" @@ -52,7 +53,7 @@ func Join(c1 *conn.Conn, c2 *conn.Conn) { } // join two connections and do some operations -func JoinMore(c1 *conn.Conn, c2 *conn.Conn, conf config.BaseConf) { +func JoinMore(c1 *conn.Conn, c2 *conn.Conn, conf config.BaseConf, needRecord bool) { var wait sync.WaitGroup encryptPipe := func(from *conn.Conn, to *conn.Conn) { defer from.Close() @@ -60,7 +61,7 @@ func JoinMore(c1 *conn.Conn, c2 *conn.Conn, conf config.BaseConf) { defer wait.Done() // we don't care about errors here - pipeEncrypt(from.TcpConn, to.TcpConn, conf) + pipeEncrypt(from.TcpConn, to.TcpConn, conf, needRecord) } decryptPipe := func(to *conn.Conn, from *conn.Conn) { @@ -69,13 +70,16 @@ func JoinMore(c1 *conn.Conn, c2 *conn.Conn, conf config.BaseConf) { defer wait.Done() // we don't care about errors here - pipeDecrypt(to.TcpConn, from.TcpConn, conf) + pipeDecrypt(to.TcpConn, from.TcpConn, conf, needRecord) } wait.Add(2) go encryptPipe(c1, c2) go decryptPipe(c2, c1) wait.Wait() + if needRecord { + metric.CloseConnection(conf.Name) + } log.Debug("ProxyName [%s], One tunnel stopped", conf.Name) return } @@ -102,7 +106,7 @@ func unpkgMsg(data []byte) (int, []byte, []byte) { } // decrypt msg from reader, then write into writer -func pipeDecrypt(r net.Conn, w net.Conn, conf config.BaseConf) (err error) { +func pipeDecrypt(r net.Conn, w net.Conn, conf config.BaseConf, needRecord bool) (err error) { laes := new(pcrypto.Pcrypto) key := conf.AuthToken if conf.PrivilegeMode { @@ -116,6 +120,15 @@ func pipeDecrypt(r net.Conn, w net.Conn, conf config.BaseConf) (err error) { buf := make([]byte, 5*1024+4) var left, res []byte var cnt int + + // record + var flowBytes int64 = 0 + if needRecord { + defer func() { + metric.AddFlowOut(conf.Name, flowBytes) + }() + } + nreader := bufio.NewReader(r) for { // there may be more than 1 package in variable @@ -156,12 +169,20 @@ func pipeDecrypt(r net.Conn, w net.Conn, conf config.BaseConf) (err error) { if err != nil { return err } + + if needRecord { + flowBytes += int64(len(res)) + if flowBytes >= 1024*1024 { + metric.AddFlowOut(conf.Name, flowBytes) + flowBytes = 0 + } + } } return nil } // recvive msg from reader, then encrypt msg into writer -func pipeEncrypt(r net.Conn, w net.Conn, conf config.BaseConf) (err error) { +func pipeEncrypt(r net.Conn, w net.Conn, conf config.BaseConf, needRecord bool) (err error) { laes := new(pcrypto.Pcrypto) key := conf.AuthToken if conf.PrivilegeMode { @@ -172,6 +193,14 @@ func pipeEncrypt(r net.Conn, w net.Conn, conf config.BaseConf) (err error) { return fmt.Errorf("Pcrypto Init error: %v", err) } + // record + var flowBytes int64 = 0 + if needRecord { + defer func() { + metric.AddFlowIn(conf.Name, flowBytes) + }() + } + nreader := bufio.NewReader(r) buf := make([]byte, 5*1024) for { @@ -179,6 +208,13 @@ func pipeEncrypt(r net.Conn, w net.Conn, conf config.BaseConf) (err error) { if err != nil { return err } + if needRecord { + flowBytes += int64(n) + if flowBytes >= 1024*1024 { + metric.AddFlowIn(conf.Name, flowBytes) + flowBytes = 0 + } + } res := buf[0:n] // gzip diff --git a/src/frp/models/server/config.go b/src/frp/models/server/config.go index 1d49c661..de96c112 100644 --- a/src/frp/models/server/config.go +++ b/src/frp/models/server/config.go @@ -23,6 +23,7 @@ import ( ini "github.com/vaughan0/go-ini" "frp/models/consts" + "frp/models/metric" "frp/utils/log" "frp/utils/vhost" ) @@ -232,6 +233,12 @@ func loadProxyConf(confFile string) (proxyServers map[string]*ProxyServer, err e proxyServers[proxyServer.Name] = proxyServer } } + + // set metric statistics of all proxies + for name, p := range proxyServers { + metric.SetProxyInfo(name, p.Type, p.BindAddr, p.UseEncryption, p.UseGzip, + p.PrivilegeMode, p.CustomDomains, p.ListenPort) + } return proxyServers, nil } diff --git a/src/frp/models/server/dashboard.go b/src/frp/models/server/dashboard.go index 5ff20f94..5fc5fd55 100644 --- a/src/frp/models/server/dashboard.go +++ b/src/frp/models/server/dashboard.go @@ -30,6 +30,7 @@ func RunDashboardServer(addr string, port int64) (err error) { router := gin.New() //router.LoadHTMLGlob("assets/*") router.GET("/api/reload", apiReload) + router.GET("/api/proxies", apiProxies) go router.Run(fmt.Sprintf("%s:%d", addr, port)) return } diff --git a/src/frp/models/server/dashboard_api.go b/src/frp/models/server/dashboard_api.go index 85a32742..7a2f70bc 100644 --- a/src/frp/models/server/dashboard_api.go +++ b/src/frp/models/server/dashboard_api.go @@ -20,6 +20,7 @@ import ( "github.com/gin-gonic/gin" + "frp/models/metric" "frp/utils/log" ) @@ -44,3 +45,24 @@ func apiReload(c *gin.Context) { } c.JSON(200, res) } + +type ProxiesResponse struct { + Code int64 `json:"code"` + Msg string `json:"msg"` + Proxies []*metric.ServerMetric `json:"proxies"` +} + +func apiProxies(c *gin.Context) { + res := &ProxiesResponse{} + res.Proxies = make([]*metric.ServerMetric, 0) + defer func() { + log.Info("Http response [/api/proxies]: code [%d]", res.Code) + }() + + log.Info("Http request: [/api/proxies]") + serverMetricMap := metric.GetAllProxyMetrics() + for _, metric := range serverMetricMap { + res.Proxies = append(res.Proxies, metric) + } + c.JSON(200, res) +} diff --git a/src/frp/models/server/server.go b/src/frp/models/server/server.go index b15f5cfa..139c9899 100644 --- a/src/frp/models/server/server.go +++ b/src/frp/models/server/server.go @@ -21,6 +21,7 @@ import ( "frp/models/config" "frp/models/consts" + "frp/models/metric" "frp/models/msg" "frp/utils/conn" "frp/utils/log" @@ -69,6 +70,7 @@ func NewProxyServerFromCtlMsg(req *msg.ControlReq) (p *ProxyServer) { func (p *ProxyServer) Init() { p.Lock() p.Status = consts.Idle + metric.SetStatus(p.Name, p.Status) p.workConnChan = make(chan *conn.Conn, 100) p.ctlMsgChan = make(chan int64) p.listeners = make([]Listener, 0) @@ -130,6 +132,7 @@ func (p *ProxyServer) Start(c *conn.Conn) (err error) { p.Lock() p.Status = consts.Working p.Unlock() + metric.SetStatus(p.Name, p.Status) // start a goroutine for every listener to accept user connection for _, listener := range p.listeners { @@ -163,7 +166,9 @@ func (p *ProxyServer) Start(c *conn.Conn) (err error) { log.Debug("Join two connections, (l[%s] r[%s]) (l[%s] r[%s])", workConn.GetLocalAddr(), workConn.GetRemoteAddr(), userConn.GetLocalAddr(), userConn.GetRemoteAddr()) - go msg.JoinMore(userConn, workConn, p.BaseConf) + needRecord := true + go msg.JoinMore(userConn, workConn, p.BaseConf, needRecord) + metric.OpenConnection(p.Name) }() } }(listener) @@ -186,6 +191,7 @@ func (p *ProxyServer) Close() { p.CtlConn.Close() } } + metric.SetStatus(p.Name, p.Status) // if the proxy created by PrivilegeMode, delete it when closed if p.PrivilegeMode { DeleteProxy(p.Name)