From 584e098e8e90c665dfa32ec1317734ab27fef5a4 Mon Sep 17 00:00:00 2001 From: fatedier Date: Wed, 17 Jan 2018 01:09:33 +0800 Subject: [PATCH] frpc: add status command --- client/admin.go | 1 + client/admin_api.go | 125 ++++++++++++- client/control.go | 348 +++++++++---------------------------- client/proxy.go | 5 +- client/proxy_manager.go | 340 ++++++++++++++++++++++++++++++++++++ client/service.go | 4 +- cmd/frpc/main.go | 176 +++++++++++++++---- models/config/proxy.go | 5 + server/control.go | 6 +- server/service.go | 2 +- utils/shutdown/shutdown.go | 28 +-- 11 files changed, 722 insertions(+), 318 deletions(-) create mode 100644 client/proxy_manager.go diff --git a/client/admin.go b/client/admin.go index f728483e..37cdf4c1 100644 --- a/client/admin.go +++ b/client/admin.go @@ -39,6 +39,7 @@ func (svr *Service) RunAdminServer(addr string, port int64) (err error) { // api, see dashboard_api.go router.GET("/api/reload", frpNet.HttprouterBasicAuth(svr.apiReload, user, passwd)) + router.GET("/api/status", frpNet.HttprouterBasicAuth(svr.apiStatus, user, passwd)) address := fmt.Sprintf("%s:%d", addr, port) server := &http.Server{ diff --git a/client/admin_api.go b/client/admin_api.go index 70842e65..fae1737e 100644 --- a/client/admin_api.go +++ b/client/admin_api.go @@ -16,7 +16,10 @@ package client import ( "encoding/json" + "fmt" "net/http" + "sort" + "strings" "github.com/julienschmidt/httprouter" ini "github.com/vaughan0/go-ini" @@ -72,7 +75,127 @@ func (svr *Service) apiReload(w http.ResponseWriter, r *http.Request, _ httprout return } - svr.ctl.reloadConf(pxyCfgs, visitorCfgs) + err = svr.ctl.reloadConf(pxyCfgs, visitorCfgs) + if err != nil { + res.Code = 4 + res.Msg = err.Error() + log.Error("reload frpc proxy config error: %v", err) + return + } log.Info("success reload conf") return } + +type StatusResp struct { + Tcp []ProxyStatusResp `json:"tcp"` + Udp []ProxyStatusResp `json:"udp"` + Http []ProxyStatusResp `json:"http"` + Https []ProxyStatusResp `json:"https"` + Stcp []ProxyStatusResp `json:"stcp"` + Xtcp []ProxyStatusResp `json:"xtcp"` +} + +type ProxyStatusResp struct { + Name string `json:"name"` + Type string `json:"type"` + Status string `json:"status"` + Err string `json:"err"` + LocalAddr string `json:"local_addr"` + Plugin string `json:"plugin"` + RemoteAddr string `json:"remote_addr"` +} + +type ByProxyStatusResp []ProxyStatusResp + +func (a ByProxyStatusResp) Len() int { return len(a) } +func (a ByProxyStatusResp) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a ByProxyStatusResp) Less(i, j int) bool { return strings.Compare(a[i].Name, a[j].Name) < 0 } + +func NewProxyStatusResp(status *ProxyStatus) ProxyStatusResp { + psr := ProxyStatusResp{ + Name: status.Name, + Type: status.Type, + Status: status.Status, + Err: status.Err, + } + switch cfg := status.Cfg.(type) { + case *config.TcpProxyConf: + if cfg.LocalPort != 0 { + psr.LocalAddr = fmt.Sprintf("%s:%d", cfg.LocalIp, cfg.LocalPort) + } + psr.Plugin = cfg.Plugin + psr.RemoteAddr = fmt.Sprintf(":%d", cfg.RemotePort) + case *config.UdpProxyConf: + if cfg.LocalPort != 0 { + psr.LocalAddr = fmt.Sprintf("%s:%d", cfg.LocalIp, cfg.LocalPort) + } + psr.RemoteAddr = fmt.Sprintf(":%d", cfg.RemotePort) + case *config.HttpProxyConf: + if cfg.LocalPort != 0 { + psr.LocalAddr = fmt.Sprintf("%s:%d", cfg.LocalIp, cfg.LocalPort) + } + psr.Plugin = cfg.Plugin + case *config.HttpsProxyConf: + if cfg.LocalPort != 0 { + psr.LocalAddr = fmt.Sprintf("%s:%d", cfg.LocalIp, cfg.LocalPort) + } + psr.Plugin = cfg.Plugin + case *config.StcpProxyConf: + if cfg.LocalPort != 0 { + psr.LocalAddr = fmt.Sprintf("%s:%d", cfg.LocalIp, cfg.LocalPort) + } + psr.Plugin = cfg.Plugin + case *config.XtcpProxyConf: + if cfg.LocalPort != 0 { + psr.LocalAddr = fmt.Sprintf("%s:%d", cfg.LocalIp, cfg.LocalPort) + } + psr.Plugin = cfg.Plugin + } + return psr +} + +// api/status +func (svr *Service) apiStatus(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { + var ( + buf []byte + res StatusResp + ) + res.Tcp = make([]ProxyStatusResp, 0) + res.Udp = make([]ProxyStatusResp, 0) + res.Http = make([]ProxyStatusResp, 0) + res.Https = make([]ProxyStatusResp, 0) + res.Stcp = make([]ProxyStatusResp, 0) + res.Xtcp = make([]ProxyStatusResp, 0) + defer func() { + log.Info("Http response [/api/status]") + buf, _ = json.Marshal(&res) + w.Write(buf) + }() + + log.Info("Http request: [/api/status]") + + ps := svr.ctl.pm.GetAllProxyStatus() + for _, status := range ps { + switch status.Type { + case "tcp": + res.Tcp = append(res.Tcp, NewProxyStatusResp(status)) + case "udp": + res.Udp = append(res.Udp, NewProxyStatusResp(status)) + case "http": + res.Http = append(res.Http, NewProxyStatusResp(status)) + case "https": + res.Https = append(res.Https, NewProxyStatusResp(status)) + case "stcp": + res.Stcp = append(res.Stcp, NewProxyStatusResp(status)) + case "xtcp": + res.Xtcp = append(res.Xtcp, NewProxyStatusResp(status)) + } + } + sort.Sort(ByProxyStatusResp(res.Tcp)) + sort.Sort(ByProxyStatusResp(res.Udp)) + sort.Sort(ByProxyStatusResp(res.Http)) + sort.Sort(ByProxyStatusResp(res.Https)) + sort.Sort(ByProxyStatusResp(res.Stcp)) + sort.Sort(ByProxyStatusResp(res.Xtcp)) + return +} diff --git a/client/control.go b/client/control.go index 788621cd..65bec393 100644 --- a/client/control.go +++ b/client/control.go @@ -24,9 +24,9 @@ import ( "github.com/fatedier/frp/models/config" "github.com/fatedier/frp/models/msg" "github.com/fatedier/frp/utils/crypto" - "github.com/fatedier/frp/utils/errors" "github.com/fatedier/frp/utils/log" frpNet "github.com/fatedier/frp/utils/net" + "github.com/fatedier/frp/utils/shutdown" "github.com/fatedier/frp/utils/util" "github.com/fatedier/frp/utils/version" "github.com/xtaci/smux" @@ -40,20 +40,10 @@ type Control struct { // frpc service svr *Service - // login message to server + // login message to server, only used loginMsg *msg.Login - // proxy configures - pxyCfgs map[string]config.ProxyConf - - // proxies - proxies map[string]Proxy - - // visitor configures - visitorCfgs map[string]config.ProxyConf - - // visitors - visitors map[string]Visitor + pm *ProxyManager // control connection conn frpNet.Conn @@ -79,6 +69,10 @@ type Control struct { // last time got the Pong message lastPong time.Time + readerShutdown *shutdown.Shutdown + writerShutdown *shutdown.Shutdown + msgHandlerShutdown *shutdown.Shutdown + mu sync.RWMutex log.Logger @@ -92,28 +86,22 @@ func NewControl(svr *Service, pxyCfgs map[string]config.ProxyConf, visitorCfgs m User: config.ClientCommonCfg.User, Version: version.Full(), } - return &Control{ - svr: svr, - loginMsg: loginMsg, - pxyCfgs: pxyCfgs, - visitorCfgs: visitorCfgs, - proxies: make(map[string]Proxy), - visitors: make(map[string]Visitor), - sendCh: make(chan msg.Message, 10), - readCh: make(chan msg.Message, 10), - closedCh: make(chan int), - Logger: log.NewPrefixLogger(""), + ctl := &Control{ + svr: svr, + loginMsg: loginMsg, + sendCh: make(chan msg.Message, 10), + readCh: make(chan msg.Message, 10), + closedCh: make(chan int), + readerShutdown: shutdown.New(), + writerShutdown: shutdown.New(), + msgHandlerShutdown: shutdown.New(), + Logger: log.NewPrefixLogger(""), } + ctl.pm = NewProxyManager(ctl, ctl.sendCh, "") + ctl.pm.Reload(pxyCfgs, visitorCfgs) + return ctl } -// 1. login -// 2. start reader() writer() manager() -// 3. connection closed -// 4. In reader(): close closedCh and exit, controler() get it -// 5. In controler(): close readCh and sendCh, manager() and writer() will exit -// 6. In controler(): ini readCh, sendCh, closedCh -// 7. In controler(): start new reader(), writer(), manager() -// controler() will keep running func (ctl *Control) Run() (err error) { for { err = ctl.login() @@ -125,47 +113,29 @@ func (ctl *Control) Run() (err error) { if config.ClientCommonCfg.LoginFailExit { return } else { - time.Sleep(30 * time.Second) + time.Sleep(10 * time.Second) } } else { break } } - go ctl.controler() - go ctl.manager() - go ctl.writer() - go ctl.reader() + go ctl.worker() - // start all local visitors - for _, cfg := range ctl.visitorCfgs { - visitor := NewVisitor(ctl, cfg) - err = visitor.Run() - if err != nil { - visitor.Warn("start error: %v", err) - continue - } - ctl.visitors[cfg.GetName()] = visitor - visitor.Info("start visitor success") - } - - // send NewProxy message for all configured proxies - for _, cfg := range ctl.pxyCfgs { - var newProxyMsg msg.NewProxy - cfg.UnMarshalToMsg(&newProxyMsg) - ctl.sendCh <- &newProxyMsg - } + // start all local visitors and send NewProxy message for all configured proxies + ctl.pm.Reset(ctl.sendCh, ctl.runId) + ctl.pm.CheckAndStartProxy() return nil } -func (ctl *Control) NewWorkConn() { +func (ctl *Control) HandleReqWorkConn(inMsg *msg.ReqWorkConn) { workConn, err := ctl.connectServer() if err != nil { return } m := &msg.NewWorkConn{ - RunId: ctl.getRunId(), + RunId: ctl.runId, } if err = msg.WriteMsg(workConn, m); err != nil { ctl.Warn("work connection write to server error: %v", err) @@ -182,33 +152,26 @@ func (ctl *Control) NewWorkConn() { workConn.AddLogPrefix(startMsg.ProxyName) // dispatch this work connection to related proxy - pxy, ok := ctl.getProxy(startMsg.ProxyName) - if ok { - workConn.Debug("start a new work connection, localAddr: %s remoteAddr: %s", workConn.LocalAddr().String(), workConn.RemoteAddr().String()) - go pxy.InWorkConn(workConn) + ctl.pm.HandleWorkConn(startMsg.ProxyName, workConn) +} + +func (ctl *Control) HandleNewProxyResp(inMsg *msg.NewProxyResp) { + // Server will return NewProxyResp message to each NewProxy message. + // Start a new proxy handler if no error got + err := ctl.pm.StartProxy(inMsg.ProxyName, inMsg.Error) + if err != nil { + ctl.Warn("[%s] start error: %v", inMsg.ProxyName, err) } else { - workConn.Close() + ctl.Info("[%s] start proxy success", inMsg.ProxyName) } } func (ctl *Control) Close() error { ctl.mu.Lock() + defer ctl.mu.Unlock() ctl.exit = true - err := errors.PanicToError(func() { - for name, _ := range ctl.proxies { - ctl.sendCh <- &msg.CloseProxy{ - ProxyName: name, - } - } - }) - ctl.mu.Unlock() - return err -} - -func (ctl *Control) init() { - ctl.sendCh = make(chan msg.Message, 10) - ctl.readCh = make(chan msg.Message, 10) - ctl.closedCh = make(chan int) + ctl.pm.CloseProxies() + return nil } // login send a login message to server and wait for a loginResp message. @@ -249,7 +212,7 @@ func (ctl *Control) login() (err error) { now := time.Now().Unix() ctl.loginMsg.PrivilegeKey = util.GetAuthKey(config.ClientCommonCfg.PrivilegeToken, now) ctl.loginMsg.Timestamp = now - ctl.loginMsg.RunId = ctl.getRunId() + ctl.loginMsg.RunId = ctl.runId if err = msg.WriteMsg(conn, ctl.loginMsg); err != nil { return err @@ -270,16 +233,11 @@ func (ctl *Control) login() (err error) { ctl.conn = conn // update runId got from server - ctl.setRunId(loginRespMsg.RunId) + ctl.runId = loginRespMsg.RunId config.ClientCommonCfg.ServerUdpPort = loginRespMsg.ServerUdpPort ctl.ClearLogPrefix() ctl.AddLogPrefix(loginRespMsg.RunId) ctl.Info("login to server success, get run id [%s], server udp port [%d]", loginRespMsg.RunId, loginRespMsg.ServerUdpPort) - - // login success, so we let closedCh available again - ctl.closedCh = make(chan int) - ctl.lastPong = time.Now() - return nil } @@ -292,7 +250,6 @@ func (ctl *Control) connectServer() (conn frpNet.Conn, err error) { return } conn = frpNet.WrapConn(stream) - } else { conn, err = frpNet.ConnectServerByHttpProxy(config.ClientCommonCfg.HttpProxy, config.ClientCommonCfg.Protocol, fmt.Sprintf("%s:%d", config.ClientCommonCfg.ServerAddr, config.ClientCommonCfg.ServerPort)) @@ -304,12 +261,14 @@ func (ctl *Control) connectServer() (conn frpNet.Conn, err error) { return } +// reader read all messages from frps and send to readCh func (ctl *Control) reader() { defer func() { if err := recover(); err != nil { ctl.Error("panic error: %v", err) } }() + defer ctl.readerShutdown.Done() defer close(ctl.closedCh) encReader := crypto.NewReader(ctl.conn, []byte(config.ClientCommonCfg.PrivilegeToken)) @@ -328,7 +287,9 @@ func (ctl *Control) reader() { } } +// writer writes messages got from sendCh to frps func (ctl *Control) writer() { + defer ctl.writerShutdown.Done() encWriter, err := crypto.NewWriter(ctl.conn, []byte(config.ClientCommonCfg.PrivilegeToken)) if err != nil { ctl.conn.Error("crypto new writer error: %v", err) @@ -348,19 +309,22 @@ func (ctl *Control) writer() { } } -// manager handles all channel events and do corresponding process -func (ctl *Control) manager() { +// msgHandler handles all channel events and do corresponding operations. +func (ctl *Control) msgHandler() { defer func() { if err := recover(); err != nil { ctl.Error("panic error: %v", err) } }() + defer ctl.msgHandlerShutdown.Done() hbSend := time.NewTicker(time.Duration(config.ClientCommonCfg.HeartBeatInterval) * time.Second) defer hbSend.Stop() hbCheck := time.NewTicker(time.Second) defer hbCheck.Stop() + ctl.lastPong = time.Now() + for { select { case <-hbSend.C: @@ -381,35 +345,9 @@ func (ctl *Control) manager() { switch m := rawMsg.(type) { case *msg.ReqWorkConn: - go ctl.NewWorkConn() + go ctl.HandleReqWorkConn(m) case *msg.NewProxyResp: - // Server will return NewProxyResp message to each NewProxy message. - // Start a new proxy handler if no error got - if m.Error != "" { - ctl.Warn("[%s] start error: %s", m.ProxyName, m.Error) - continue - } - cfg, ok := ctl.getProxyConf(m.ProxyName) - if !ok { - // it will never go to this branch now - ctl.Warn("[%s] no proxy conf found", m.ProxyName) - continue - } - - oldPxy, ok := ctl.getProxy(m.ProxyName) - if ok { - oldPxy.Close() - } - pxy := NewProxy(ctl, cfg) - if err := pxy.Run(); err != nil { - ctl.Warn("[%s] proxy start running error: %v", m.ProxyName, err) - ctl.sendCh <- &msg.CloseProxy{ - ProxyName: m.ProxyName, - } - continue - } - ctl.addProxy(m.ProxyName, pxy) - ctl.Info("[%s] start proxy success", m.ProxyName) + ctl.HandleNewProxyResp(m) case *msg.Pong: ctl.lastPong = time.Now() ctl.Debug("receive heartbeat from server") @@ -419,10 +357,14 @@ func (ctl *Control) manager() { } // controler keep watching closedCh, start a new connection if previous control connection is closed. -// If controler is notified by closedCh, reader and writer and manager will exit, then recall these functions. -func (ctl *Control) controler() { +// If controler is notified by closedCh, reader and writer and handler will exit, then recall these functions. +func (ctl *Control) worker() { + go ctl.msgHandler() + go ctl.writer() + go ctl.reader() + var err error - maxDelayTime := 30 * time.Second + maxDelayTime := 20 * time.Second delayTime := time.Second checkInterval := 10 * time.Second @@ -430,41 +372,20 @@ func (ctl *Control) controler() { for { select { case <-checkProxyTicker.C: - // Every 10 seconds, check which proxy registered failed and reregister it to server. - ctl.mu.RLock() - for _, cfg := range ctl.pxyCfgs { - if _, exist := ctl.proxies[cfg.GetName()]; !exist { - ctl.Info("try to register proxy [%s]", cfg.GetName()) - var newProxyMsg msg.NewProxy - cfg.UnMarshalToMsg(&newProxyMsg) - ctl.sendCh <- &newProxyMsg - } - } - - for _, cfg := range ctl.visitorCfgs { - if _, exist := ctl.visitors[cfg.GetName()]; !exist { - ctl.Info("try to start visitor [%s]", cfg.GetName()) - visitor := NewVisitor(ctl, cfg) - err = visitor.Run() - if err != nil { - visitor.Warn("start error: %v", err) - continue - } - ctl.visitors[cfg.GetName()] = visitor - visitor.Info("start visitor success") - } - } - ctl.mu.RUnlock() + // every 10 seconds, check which proxy registered failed and reregister it to server + ctl.pm.CheckAndStartProxy() case _, ok := <-ctl.closedCh: // we won't get any variable from this channel if !ok { - // close related channels + // close related channels and wait until other goroutines done close(ctl.readCh) - close(ctl.sendCh) + ctl.readerShutdown.WaitDone() + ctl.msgHandlerShutdown.WaitDone() - for _, pxy := range ctl.proxies { - pxy.Close() - } + close(ctl.sendCh) + ctl.writerShutdown.WaitDone() + + ctl.pm.CloseProxies() // if ctl.exit is true, just exit ctl.mu.RLock() exit := ctl.exit @@ -473,9 +394,7 @@ func (ctl *Control) controler() { return } - time.Sleep(time.Second) - - // loop util reconnect to server success + // loop util reconnecting to server success for { ctl.Info("try to reconnect to server...") err = ctl.login() @@ -488,27 +407,27 @@ func (ctl *Control) controler() { } continue } - // reconnect success, init the delayTime + // reconnect success, init delayTime delayTime = time.Second break } // init related channels and variables - ctl.init() + ctl.sendCh = make(chan msg.Message, 10) + ctl.readCh = make(chan msg.Message, 10) + ctl.closedCh = make(chan int) + ctl.readerShutdown = shutdown.New() + ctl.writerShutdown = shutdown.New() + ctl.msgHandlerShutdown = shutdown.New() + ctl.pm.Reset(ctl.sendCh, ctl.runId) // previous work goroutines should be closed and start them here - go ctl.manager() + go ctl.msgHandler() go ctl.writer() go ctl.reader() - // send NewProxy message for all configured proxies - ctl.mu.RLock() - for _, cfg := range ctl.pxyCfgs { - var newProxyMsg msg.NewProxy - cfg.UnMarshalToMsg(&newProxyMsg) - ctl.sendCh <- &newProxyMsg - } - ctl.mu.RUnlock() + // start all configured proxies + ctl.pm.CheckAndStartProxy() checkProxyTicker.Stop() checkProxyTicker = time.NewTicker(checkInterval) @@ -517,106 +436,7 @@ func (ctl *Control) controler() { } } -func (ctl *Control) setRunId(runId string) { - ctl.mu.Lock() - defer ctl.mu.Unlock() - ctl.runId = runId -} - -func (ctl *Control) getRunId() string { - ctl.mu.RLock() - defer ctl.mu.RUnlock() - return ctl.runId -} - -func (ctl *Control) getProxy(name string) (pxy Proxy, ok bool) { - ctl.mu.RLock() - defer ctl.mu.RUnlock() - pxy, ok = ctl.proxies[name] - return -} - -func (ctl *Control) addProxy(name string, pxy Proxy) { - ctl.mu.Lock() - defer ctl.mu.Unlock() - ctl.proxies[name] = pxy -} - -func (ctl *Control) getProxyConf(name string) (conf config.ProxyConf, ok bool) { - ctl.mu.RLock() - defer ctl.mu.RUnlock() - conf, ok = ctl.pxyCfgs[name] - return -} - -func (ctl *Control) reloadConf(pxyCfgs map[string]config.ProxyConf, visitorCfgs map[string]config.ProxyConf) { - ctl.mu.Lock() - defer ctl.mu.Unlock() - - removedPxyNames := make([]string, 0) - for name, oldCfg := range ctl.pxyCfgs { - del := false - cfg, ok := pxyCfgs[name] - if !ok { - del = true - } else { - if !oldCfg.Compare(cfg) { - del = true - } - } - - if del { - removedPxyNames = append(removedPxyNames, name) - delete(ctl.pxyCfgs, name) - if pxy, ok := ctl.proxies[name]; ok { - pxy.Close() - } - delete(ctl.proxies, name) - ctl.sendCh <- &msg.CloseProxy{ - ProxyName: name, - } - } - } - ctl.Info("proxy removed: %v", removedPxyNames) - - addedPxyNames := make([]string, 0) - for name, cfg := range pxyCfgs { - if _, ok := ctl.pxyCfgs[name]; !ok { - ctl.pxyCfgs[name] = cfg - addedPxyNames = append(addedPxyNames, name) - } - } - ctl.Info("proxy added: %v", addedPxyNames) - - removedVisitorName := make([]string, 0) - for name, oldVisitorCfg := range ctl.visitorCfgs { - del := false - cfg, ok := visitorCfgs[name] - if !ok { - del = true - } else { - if !oldVisitorCfg.Compare(cfg) { - del = true - } - } - - if del { - removedVisitorName = append(removedVisitorName, name) - delete(ctl.visitorCfgs, name) - if visitor, ok := ctl.visitors[name]; ok { - visitor.Close() - } - delete(ctl.visitors, name) - } - } - ctl.Info("visitor removed: %v", removedVisitorName) - - addedVisitorName := make([]string, 0) - for name, visitorCfg := range visitorCfgs { - if _, ok := ctl.visitorCfgs[name]; !ok { - ctl.visitorCfgs[name] = visitorCfg - addedVisitorName = append(addedVisitorName, name) - } - } - ctl.Info("visitor added: %v", addedVisitorName) +func (ctl *Control) reloadConf(pxyCfgs map[string]config.ProxyConf, visitorCfgs map[string]config.ProxyConf) error { + err := ctl.pm.Reload(pxyCfgs, visitorCfgs) + return err } diff --git a/client/proxy.go b/client/proxy.go index 0b26bf41..4d07cc63 100644 --- a/client/proxy.go +++ b/client/proxy.go @@ -39,13 +39,13 @@ type Proxy interface { // InWorkConn accept work connections registered to server. InWorkConn(conn frpNet.Conn) + Close() log.Logger } -func NewProxy(ctl *Control, pxyConf config.ProxyConf) (pxy Proxy) { +func NewProxy(pxyConf config.ProxyConf) (pxy Proxy) { baseProxy := BaseProxy{ - ctl: ctl, Logger: log.NewPrefixLogger(pxyConf.GetName()), } switch cfg := pxyConf.(type) { @@ -84,7 +84,6 @@ func NewProxy(ctl *Control, pxyConf config.ProxyConf) (pxy Proxy) { } type BaseProxy struct { - ctl *Control closed bool mu sync.RWMutex log.Logger diff --git a/client/proxy_manager.go b/client/proxy_manager.go new file mode 100644 index 00000000..d756b5ba --- /dev/null +++ b/client/proxy_manager.go @@ -0,0 +1,340 @@ +package client + +import ( + "fmt" + "sync" + + "github.com/fatedier/frp/models/config" + "github.com/fatedier/frp/models/msg" + "github.com/fatedier/frp/utils/errors" + "github.com/fatedier/frp/utils/log" + frpNet "github.com/fatedier/frp/utils/net" +) + +const ( + ProxyStatusNew = "new" + ProxyStatusStartErr = "start error" + ProxyStatusRunning = "running" + ProxyStatusClosed = "closed" +) + +type ProxyManager struct { + ctl *Control + + proxies map[string]*ProxyWrapper + + visitorCfgs map[string]config.ProxyConf + visitors map[string]Visitor + + sendCh chan (msg.Message) + + closed bool + mu sync.RWMutex + + log.Logger +} + +type ProxyWrapper struct { + Name string + Type string + Status string + Err string + Cfg config.ProxyConf + + pxy Proxy + + mu sync.RWMutex +} + +type ProxyStatus struct { + Name string `json:"name"` + Type string `json:"type"` + Status string `json:"status"` + Err string `json:"err"` + Cfg config.ProxyConf `json:"cfg"` +} + +func NewProxyWrapper(cfg config.ProxyConf) *ProxyWrapper { + return &ProxyWrapper{ + Name: cfg.GetName(), + Type: cfg.GetType(), + Status: ProxyStatusNew, + Cfg: cfg, + pxy: nil, + } +} + +func (pw *ProxyWrapper) IsRunning() bool { + pw.mu.RLock() + defer pw.mu.RUnlock() + if pw.Status == ProxyStatusRunning { + return true + } else { + return false + } +} + +func (pw *ProxyWrapper) GetStatus() *ProxyStatus { + pw.mu.RLock() + defer pw.mu.RUnlock() + ps := &ProxyStatus{ + Name: pw.Name, + Type: pw.Type, + Status: pw.Status, + Err: pw.Err, + Cfg: pw.Cfg, + } + return ps +} + +func (pw *ProxyWrapper) Start(serverRespErr string) error { + if pw.pxy != nil { + pw.pxy.Close() + pw.pxy = nil + } + + if serverRespErr != "" { + pw.mu.Lock() + pw.Status = ProxyStatusStartErr + pw.Err = serverRespErr + pw.mu.Unlock() + return fmt.Errorf(serverRespErr) + } + + pxy := NewProxy(pw.Cfg) + pw.mu.Lock() + defer pw.mu.Unlock() + if err := pxy.Run(); err != nil { + pw.Status = ProxyStatusStartErr + pw.Err = err.Error() + return err + } + pw.Status = ProxyStatusRunning + pw.Err = "" + pw.pxy = pxy + return nil +} + +func (pw *ProxyWrapper) InWorkConn(workConn frpNet.Conn) { + pw.mu.RLock() + pxy := pw.pxy + pw.mu.RUnlock() + if pxy != nil { + workConn.Debug("start a new work connection, localAddr: %s remoteAddr: %s", workConn.LocalAddr().String(), workConn.RemoteAddr().String()) + go pxy.InWorkConn(workConn) + } else { + workConn.Close() + } +} + +func (pw *ProxyWrapper) Close() { + pw.mu.Lock() + defer pw.mu.Unlock() + if pw.pxy != nil { + pw.pxy.Close() + pw.pxy = nil + } + pw.Status = ProxyStatusClosed +} + +func NewProxyManager(ctl *Control, msgSendCh chan (msg.Message), logPrefix string) *ProxyManager { + return &ProxyManager{ + proxies: make(map[string]*ProxyWrapper), + visitorCfgs: make(map[string]config.ProxyConf), + visitors: make(map[string]Visitor), + sendCh: msgSendCh, + closed: false, + Logger: log.NewPrefixLogger(logPrefix), + } +} + +func (pm *ProxyManager) Reset(msgSendCh chan (msg.Message), logPrefix string) { + pm.mu.Lock() + defer pm.mu.Unlock() + pm.closed = false + pm.sendCh = msgSendCh + pm.ClearLogPrefix() + pm.AddLogPrefix(logPrefix) +} + +// Must hold the lock before calling this function. +func (pm *ProxyManager) sendMsg(m msg.Message) error { + err := errors.PanicToError(func() { + pm.sendCh <- m + }) + if err != nil { + pm.closed = true + } + return err +} + +func (pm *ProxyManager) StartProxy(name string, serverRespErr string) error { + pm.mu.Lock() + defer pm.mu.Unlock() + if pm.closed { + return fmt.Errorf("ProxyManager is closed now") + } + + pxy, ok := pm.proxies[name] + if !ok { + return fmt.Errorf("no proxy found") + } + + if err := pxy.Start(serverRespErr); err != nil { + errRet := err + err = pm.sendMsg(&msg.CloseProxy{ + ProxyName: name, + }) + if err != nil { + errRet = fmt.Errorf("send CloseProxy message error") + } + return errRet + } + return nil +} + +func (pm *ProxyManager) CloseProxies() { + pm.mu.RLock() + defer pm.mu.RUnlock() + for _, pxy := range pm.proxies { + pxy.Close() + } +} + +func (pm *ProxyManager) CheckAndStartProxy() { + pm.mu.RLock() + defer pm.mu.RUnlock() + if pm.closed { + pm.Warn("CheckAndStartProxy error: ProxyManager is closed now") + return + } + + for _, pxy := range pm.proxies { + if !pxy.IsRunning() { + var newProxyMsg msg.NewProxy + pxy.Cfg.UnMarshalToMsg(&newProxyMsg) + err := pm.sendMsg(&newProxyMsg) + if err != nil { + pm.Warn("[%s] proxy send NewProxy message error") + return + } + } + } + + for _, cfg := range pm.visitorCfgs { + if _, exist := pm.visitors[cfg.GetName()]; !exist { + pm.Info("try to start visitor [%s]", cfg.GetName()) + visitor := NewVisitor(pm.ctl, cfg) + err := visitor.Run() + if err != nil { + visitor.Warn("start error: %v", err) + continue + } + pm.visitors[cfg.GetName()] = visitor + visitor.Info("start visitor success") + } + } +} + +func (pm *ProxyManager) Reload(pxyCfgs map[string]config.ProxyConf, visitorCfgs map[string]config.ProxyConf) error { + pm.mu.Lock() + defer pm.mu.Unlock() + if pm.closed { + err := fmt.Errorf("Reload error: ProxyManager is closed now") + pm.Warn(err.Error()) + return err + } + + delPxyNames := make([]string, 0) + for name, pxy := range pm.proxies { + del := false + cfg, ok := pxyCfgs[name] + if !ok { + del = true + } else { + if !pxy.Cfg.Compare(cfg) { + del = true + } + } + + if del { + delPxyNames = append(delPxyNames, name) + delete(pm.proxies, name) + + pxy.Close() + err := pm.sendMsg(&msg.CloseProxy{ + ProxyName: name, + }) + if err != nil { + err = fmt.Errorf("Reload error: ProxyManager is closed now") + pm.Warn(err.Error()) + return err + } + } + } + pm.Info("proxy removed: %v", delPxyNames) + + addPxyNames := make([]string, 0) + for name, cfg := range pxyCfgs { + if _, ok := pm.proxies[name]; !ok { + pxy := NewProxyWrapper(cfg) + pm.proxies[name] = pxy + addPxyNames = append(addPxyNames, name) + } + } + pm.Info("proxy added: %v", addPxyNames) + + delVisitorName := make([]string, 0) + for name, oldVisitorCfg := range pm.visitorCfgs { + del := false + cfg, ok := visitorCfgs[name] + if !ok { + del = true + } else { + if !oldVisitorCfg.Compare(cfg) { + del = true + } + } + + if del { + delVisitorName = append(delVisitorName, name) + delete(pm.visitorCfgs, name) + if visitor, ok := pm.visitors[name]; ok { + visitor.Close() + } + delete(pm.visitors, name) + } + } + pm.Info("visitor removed: %v", delVisitorName) + + addVisitorName := make([]string, 0) + for name, visitorCfg := range visitorCfgs { + if _, ok := pm.visitorCfgs[name]; !ok { + pm.visitorCfgs[name] = visitorCfg + addVisitorName = append(addVisitorName, name) + } + } + pm.Info("visitor added: %v", addVisitorName) + return nil +} + +func (pm *ProxyManager) HandleWorkConn(name string, workConn frpNet.Conn) { + pm.mu.RLock() + pw, ok := pm.proxies[name] + pm.mu.RUnlock() + if ok { + pw.InWorkConn(workConn) + } else { + workConn.Close() + } +} + +func (pm *ProxyManager) GetAllProxyStatus() []*ProxyStatus { + ps := make([]*ProxyStatus, 0) + pm.mu.RLock() + defer pm.mu.RUnlock() + for _, pxy := range pm.proxies { + ps = append(ps, pxy.GetStatus()) + } + return ps +} diff --git a/client/service.go b/client/service.go index 49c78486..c5a2f1e4 100644 --- a/client/service.go +++ b/client/service.go @@ -53,6 +53,6 @@ func (svr *Service) Run() error { return nil } -func (svr *Service) Close() error { - return svr.ctl.Close() +func (svr *Service) Close() { + svr.ctl.Close() } diff --git a/cmd/frpc/main.go b/cmd/frpc/main.go index f0d438f8..f1836db2 100644 --- a/cmd/frpc/main.go +++ b/cmd/frpc/main.go @@ -28,6 +28,7 @@ import ( "time" docopt "github.com/docopt/docopt-go" + "github.com/rodaine/table" ini "github.com/vaughan0/go-ini" "github.com/fatedier/frp/client" @@ -44,7 +45,8 @@ var usage string = `frpc is the client of frp Usage: frpc [-c config_file] [-L log_file] [--log-level=] [--server-addr=] - frpc [-c config_file] --reload + frpc reload [-c config_file] + frpc status [-c config_file] frpc -h | --help frpc -v | --version @@ -53,7 +55,6 @@ Options: -L log_file set output log file, including console --log-level= set log level: debug, info, warn, error --server-addr= addr which frps is listening for, example: 0.0.0.0:7000 - --reload reload configure file without program exit -h --help show this screen -v --version show version ` @@ -82,40 +83,25 @@ func main() { config.ClientCommonCfg.ConfigFile = confFile // check if reload command - if args["--reload"] != nil { - if args["--reload"].(bool) { - req, err := http.NewRequest("GET", "http://"+ - config.ClientCommonCfg.AdminAddr+":"+fmt.Sprintf("%d", config.ClientCommonCfg.AdminPort)+"/api/reload", nil) - if err != nil { + if args["reload"] != nil { + if args["reload"].(bool) { + if err = CmdReload(); err != nil { fmt.Printf("frps reload error: %v\n", err) os.Exit(1) + } else { + fmt.Printf("reload success\n") + os.Exit(0) } + } + } - authStr := "Basic " + base64.StdEncoding.EncodeToString([]byte(config.ClientCommonCfg.AdminUser+":"+ - config.ClientCommonCfg.AdminPwd)) - - req.Header.Add("Authorization", authStr) - resp, err := http.DefaultClient.Do(req) - if err != nil { - fmt.Printf("frpc reload error: %v\n", err) + // check if status command + if args["status"] != nil { + if args["status"].(bool) { + if err = CmdStatus(); err != nil { + fmt.Println("frps get status error: %v\n", err) os.Exit(1) } else { - defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - fmt.Printf("frpc reload error: %v\n", err) - os.Exit(1) - } - res := &client.GeneralResponse{} - err = json.Unmarshal(body, &res) - if err != nil { - fmt.Printf("http response error: %s\n", strings.TrimSpace(string(body))) - os.Exit(1) - } else if res.Code != 0 { - fmt.Printf("reload error: %s\n", res.Msg) - os.Exit(1) - } - fmt.Printf("reload success\n") os.Exit(0) } } @@ -187,3 +173,133 @@ func HandleSignal(svr *client.Service) { time.Sleep(250 * time.Millisecond) os.Exit(0) } + +func CmdReload() error { + if config.ClientCommonCfg.AdminPort == 0 { + return fmt.Errorf("admin_port shoud be set if you want to use reload feature") + } + + req, err := http.NewRequest("GET", "http://"+ + config.ClientCommonCfg.AdminAddr+":"+fmt.Sprintf("%d", config.ClientCommonCfg.AdminPort)+"/api/reload", nil) + if err != nil { + return err + } + + authStr := "Basic " + base64.StdEncoding.EncodeToString([]byte(config.ClientCommonCfg.AdminUser+":"+ + config.ClientCommonCfg.AdminPwd)) + + req.Header.Add("Authorization", authStr) + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } else { + if resp.StatusCode != 200 { + return fmt.Errorf("admin api status code [%d]", resp.StatusCode) + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + res := &client.GeneralResponse{} + err = json.Unmarshal(body, &res) + if err != nil { + return fmt.Errorf("unmarshal http response error: %s", strings.TrimSpace(string(body))) + } else if res.Code != 0 { + return fmt.Errorf(res.Msg) + } + } + return nil +} + +func CmdStatus() error { + if config.ClientCommonCfg.AdminPort == 0 { + return fmt.Errorf("admin_port shoud be set if you want to get proxy status") + } + + req, err := http.NewRequest("GET", "http://"+ + config.ClientCommonCfg.AdminAddr+":"+fmt.Sprintf("%d", config.ClientCommonCfg.AdminPort)+"/api/status", nil) + if err != nil { + return err + } + + authStr := "Basic " + base64.StdEncoding.EncodeToString([]byte(config.ClientCommonCfg.AdminUser+":"+ + config.ClientCommonCfg.AdminPwd)) + + req.Header.Add("Authorization", authStr) + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } else { + if resp.StatusCode != 200 { + return fmt.Errorf("admin api status code [%d]", resp.StatusCode) + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + res := &client.StatusResp{} + err = json.Unmarshal(body, &res) + if err != nil { + return fmt.Errorf("unmarshal http response error: %s", strings.TrimSpace(string(body))) + } + + fmt.Println("Proxy Status...") + if len(res.Tcp) > 0 { + fmt.Printf("TCP") + tbl := table.New("Name", "Status", "LocalAddr", "Plugin", "RemoteAddr", "Error") + for _, ps := range res.Tcp { + tbl.AddRow(ps.Name, ps.Status, ps.LocalAddr, ps.Plugin, ps.RemoteAddr, ps.Err) + } + tbl.Print() + fmt.Println("") + } + if len(res.Udp) > 0 { + fmt.Printf("UDP") + tbl := table.New("Name", "Status", "LocalAddr", "Plugin", "RemoteAddr", "Error") + for _, ps := range res.Udp { + tbl.AddRow(ps.Name, ps.Status, ps.LocalAddr, ps.Plugin, ps.RemoteAddr, ps.Err) + } + tbl.Print() + fmt.Println("") + } + if len(res.Http) > 0 { + fmt.Printf("HTTP") + tbl := table.New("Name", "Status", "LocalAddr", "Plugin", "RemoteAddr", "Error") + for _, ps := range res.Http { + tbl.AddRow(ps.Name, ps.Status, ps.LocalAddr, ps.Plugin, ps.RemoteAddr, ps.Err) + } + tbl.Print() + fmt.Println("") + } + if len(res.Https) > 0 { + fmt.Printf("HTTPS") + tbl := table.New("Name", "Status", "LocalAddr", "Plugin", "RemoteAddr", "Error") + for _, ps := range res.Https { + tbl.AddRow(ps.Name, ps.Status, ps.LocalAddr, ps.Plugin, ps.RemoteAddr, ps.Err) + } + tbl.Print() + fmt.Println("") + } + if len(res.Stcp) > 0 { + fmt.Printf("STCP") + tbl := table.New("Name", "Status", "LocalAddr", "Plugin", "RemoteAddr", "Error") + for _, ps := range res.Stcp { + tbl.AddRow(ps.Name, ps.Status, ps.LocalAddr, ps.Plugin, ps.RemoteAddr, ps.Err) + } + tbl.Print() + fmt.Println("") + } + if len(res.Xtcp) > 0 { + fmt.Printf("XTCP") + tbl := table.New("Name", "Status", "LocalAddr", "Plugin", "RemoteAddr", "Error") + for _, ps := range res.Xtcp { + tbl.AddRow(ps.Name, ps.Status, ps.LocalAddr, ps.Plugin, ps.RemoteAddr, ps.Err) + } + tbl.Print() + fmt.Println("") + } + } + return nil +} diff --git a/models/config/proxy.go b/models/config/proxy.go index ce4c1c2d..e87b7eca 100644 --- a/models/config/proxy.go +++ b/models/config/proxy.go @@ -52,6 +52,7 @@ func NewConfByType(proxyType string) ProxyConf { type ProxyConf interface { GetName() string + GetType() string GetBaseInfo() *BaseProxyConf LoadFromMsg(pMsg *msg.NewProxy) LoadFromFile(name string, conf ini.Section) error @@ -103,6 +104,10 @@ func (cfg *BaseProxyConf) GetName() string { return cfg.ProxyName } +func (cfg *BaseProxyConf) GetType() string { + return cfg.ProxyType +} + func (cfg *BaseProxyConf) GetBaseInfo() *BaseProxyConf { return cfg } diff --git a/server/control.go b/server/control.go index 2833277b..5e8fe95e 100644 --- a/server/control.go +++ b/server/control.go @@ -253,13 +253,13 @@ func (ctl *Control) stoper() { ctl.allShutdown.WaitStart() close(ctl.readCh) - ctl.managerShutdown.WaitDown() + ctl.managerShutdown.WaitDone() close(ctl.sendCh) - ctl.writerShutdown.WaitDown() + ctl.writerShutdown.WaitDone() ctl.conn.Close() - ctl.readerShutdown.WaitDown() + ctl.readerShutdown.WaitDone() close(ctl.workConnCh) for workConn := range ctl.workConnCh { diff --git a/server/service.go b/server/service.go index 5997f3dc..a510b179 100644 --- a/server/service.go +++ b/server/service.go @@ -283,7 +283,7 @@ func (svr *Service) RegisterControl(ctlConn frpNet.Conn, loginMsg *msg.Login) (e ctl := NewControl(svr, ctlConn, loginMsg) if oldCtl := svr.ctlManager.Add(loginMsg.RunId, ctl); oldCtl != nil { - oldCtl.allShutdown.WaitDown() + oldCtl.allShutdown.WaitDone() } ctlConn.AddLogPrefix(loginMsg.RunId) diff --git a/utils/shutdown/shutdown.go b/utils/shutdown/shutdown.go index cdd87268..7fd7bfcc 100644 --- a/utils/shutdown/shutdown.go +++ b/utils/shutdown/shutdown.go @@ -19,19 +19,19 @@ import ( ) type Shutdown struct { - doing bool - ending bool - start chan struct{} - down chan struct{} - mu sync.Mutex + doing bool + ending bool + startCh chan struct{} + doneCh chan struct{} + mu sync.Mutex } func New() *Shutdown { return &Shutdown{ - doing: false, - ending: false, - start: make(chan struct{}), - down: make(chan struct{}), + doing: false, + ending: false, + startCh: make(chan struct{}), + doneCh: make(chan struct{}), } } @@ -40,12 +40,12 @@ func (s *Shutdown) Start() { defer s.mu.Unlock() if !s.doing { s.doing = true - close(s.start) + close(s.startCh) } } func (s *Shutdown) WaitStart() { - <-s.start + <-s.startCh } func (s *Shutdown) Done() { @@ -53,10 +53,10 @@ func (s *Shutdown) Done() { defer s.mu.Unlock() if !s.ending { s.ending = true - close(s.down) + close(s.doneCh) } } -func (s *Shutdown) WaitDown() { - <-s.down +func (s *Shutdown) WaitDone() { + <-s.doneCh }