From 08c17c3247b7d0c73b06b98d6b5d442034617ec2 Mon Sep 17 00:00:00 2001 From: fatedier Date: Fri, 7 Dec 2018 17:05:36 +0800 Subject: [PATCH 1/4] frpc: support health check --- client/control.go | 51 +++---- client/event.go | 28 ++++ client/health.go | 55 +++++-- client/proxy_manager.go | 294 ++++++++------------------------------ client/proxy_wrapper.go | 219 ++++++++++++++++++++++++++++ client/visitor_manager.go | 16 ++- models/config/proxy.go | 27 +++- server/group/group.go | 1 + server/group/tcp.go | 6 +- 9 files changed, 414 insertions(+), 283 deletions(-) create mode 100644 client/event.go create mode 100644 client/proxy_wrapper.go diff --git a/client/control.go b/client/control.go index 09ca2a02..4a588c5d 100644 --- a/client/control.go +++ b/client/control.go @@ -37,7 +37,8 @@ type Control struct { runId string // manage all proxies - pm *ProxyManager + pxyCfgs map[string]config.ProxyConf + pm *ProxyManager // manage all visitors vm *VisitorManager @@ -76,6 +77,7 @@ func NewControl(runId string, conn frpNet.Conn, session *fmux.Session, pxyCfgs m runId: runId, conn: conn, session: session, + pxyCfgs: pxyCfgs, sendCh: make(chan msg.Message, 100), readCh: make(chan msg.Message, 100), closedCh: make(chan struct{}), @@ -85,8 +87,8 @@ func NewControl(runId string, conn frpNet.Conn, session *fmux.Session, pxyCfgs m msgHandlerShutdown: shutdown.New(), Logger: log.NewPrefixLogger(""), } - ctl.pm = NewProxyManager(ctl.sendCh, "") - ctl.pm.Reload(pxyCfgs, false) + ctl.pm = NewProxyManager(ctl.sendCh, runId) + ctl.vm = NewVisitorManager(ctl) ctl.vm.Reload(visitorCfgs) return ctl @@ -95,10 +97,10 @@ func NewControl(runId string, conn frpNet.Conn, session *fmux.Session, pxyCfgs m func (ctl *Control) Run() { go ctl.worker() - // start all local visitors and send NewProxy message for all configured proxies - ctl.pm.Reset(ctl.sendCh, ctl.runId) - ctl.pm.CheckAndStartProxy([]string{ProxyStatusNew}) + // start all proxies + ctl.pm.Reload(ctl.pxyCfgs) + // start all visitors go ctl.vm.Run() return } @@ -142,7 +144,7 @@ func (ctl *Control) HandleNewProxyResp(inMsg *msg.NewProxyResp) { } func (ctl *Control) Close() error { - ctl.pm.CloseProxies() + ctl.conn.Close() return nil } @@ -275,33 +277,26 @@ func (ctl *Control) worker() { go ctl.reader() go ctl.writer() - checkInterval := 60 * time.Second - checkProxyTicker := time.NewTicker(checkInterval) + select { + case <-ctl.closedCh: + // close related channels and wait until other goroutines done + close(ctl.readCh) + ctl.readerShutdown.WaitDone() + ctl.msgHandlerShutdown.WaitDone() - for { - select { - case <-checkProxyTicker.C: - // check which proxy registered failed and reregister it to server - ctl.pm.CheckAndStartProxy([]string{ProxyStatusStartErr, ProxyStatusClosed}) - case <-ctl.closedCh: - // close related channels and wait until other goroutines done - close(ctl.readCh) - ctl.readerShutdown.WaitDone() - ctl.msgHandlerShutdown.WaitDone() + close(ctl.sendCh) + ctl.writerShutdown.WaitDone() - close(ctl.sendCh) - ctl.writerShutdown.WaitDone() + ctl.pm.Close() + ctl.vm.Close() - ctl.pm.CloseProxies() - - close(ctl.closedDoneCh) - return - } + close(ctl.closedDoneCh) + return } } func (ctl *Control) ReloadConf(pxyCfgs map[string]config.ProxyConf, visitorCfgs map[string]config.VisitorConf) error { ctl.vm.Reload(visitorCfgs) - err := ctl.pm.Reload(pxyCfgs, true) - return err + ctl.pm.Reload(pxyCfgs) + return nil } diff --git a/client/event.go b/client/event.go new file mode 100644 index 00000000..b10b1e48 --- /dev/null +++ b/client/event.go @@ -0,0 +1,28 @@ +package client + +import ( + "errors" + + "github.com/fatedier/frp/models/msg" +) + +type EventType int + +const ( + EvStartProxy EventType = iota + EvCloseProxy +) + +var ( + ErrPayloadType = errors.New("error payload type") +) + +type EventHandler func(evType EventType, payload interface{}) error + +type StartProxyPayload struct { + NewProxyMsg *msg.NewProxy +} + +type CloseProxyPayload struct { + CloseProxyMsg *msg.CloseProxy +} diff --git a/client/health.go b/client/health.go index 8e84a6f8..7002adee 100644 --- a/client/health.go +++ b/client/health.go @@ -16,9 +16,17 @@ package client import ( "context" + "errors" + "fmt" "net" "net/http" "time" + + "github.com/fatedier/frp/utils/log" +) + +var ( + ErrHealthCheckType = errors.New("error health check type") ) type HealthCheckMonitor struct { @@ -40,6 +48,8 @@ type HealthCheckMonitor struct { ctx context.Context cancel context.CancelFunc + + l log.Logger } func NewHealthCheckMonitor(checkType string, intervalS int, timeoutS int, maxFailedTimes int, addr string, url string, @@ -70,6 +80,10 @@ func NewHealthCheckMonitor(checkType string, intervalS int, timeoutS int, maxFai } } +func (monitor *HealthCheckMonitor) SetLogger(l log.Logger) { + monitor.l = l +} + func (monitor *HealthCheckMonitor) Start() { go monitor.checkWorker() } @@ -81,7 +95,7 @@ func (monitor *HealthCheckMonitor) Stop() { func (monitor *HealthCheckMonitor) checkWorker() { for { ctx, cancel := context.WithDeadline(monitor.ctx, time.Now().Add(monitor.timeout)) - ok := monitor.doCheck(ctx) + err := monitor.doCheck(ctx) // check if this monitor has been closed select { @@ -92,14 +106,26 @@ func (monitor *HealthCheckMonitor) checkWorker() { cancel() } - if ok { + if err == nil { + if monitor.l != nil { + monitor.l.Trace("do one health check success") + } if !monitor.statusOK && monitor.statusNormalFn != nil { + if monitor.l != nil { + monitor.l.Info("health check status change to success") + } monitor.statusOK = true monitor.statusNormalFn() } } else { + if monitor.l != nil { + monitor.l.Warn("do one health check failed: %v", err) + } monitor.failedTimes++ if monitor.statusOK && int(monitor.failedTimes) >= monitor.maxFailedTimes && monitor.statusFailedFn != nil { + if monitor.l != nil { + monitor.l.Warn("health check status change to failed") + } monitor.statusOK = false monitor.statusFailedFn() } @@ -109,39 +135,44 @@ func (monitor *HealthCheckMonitor) checkWorker() { } } -func (monitor *HealthCheckMonitor) doCheck(ctx context.Context) bool { +func (monitor *HealthCheckMonitor) doCheck(ctx context.Context) error { switch monitor.checkType { case "tcp": return monitor.doTcpCheck(ctx) case "http": return monitor.doHttpCheck(ctx) default: - return false + return ErrHealthCheckType } } -func (monitor *HealthCheckMonitor) doTcpCheck(ctx context.Context) bool { +func (monitor *HealthCheckMonitor) doTcpCheck(ctx context.Context) error { + // if tcp address is not specified, always return nil + if monitor.addr == "" { + return nil + } + var d net.Dialer conn, err := d.DialContext(ctx, "tcp", monitor.addr) if err != nil { - return false + return err } conn.Close() - return true + return nil } -func (monitor *HealthCheckMonitor) doHttpCheck(ctx context.Context) bool { +func (monitor *HealthCheckMonitor) doHttpCheck(ctx context.Context) error { req, err := http.NewRequest("GET", monitor.url, nil) if err != nil { - return false + return err } resp, err := http.DefaultClient.Do(req) if err != nil { - return false + return err } if resp.StatusCode/100 != 2 { - return false + return fmt.Errorf("do http health check, StatusCode is [%d] not 2xx", resp.StatusCode) } - return true + return nil } diff --git a/client/proxy_manager.go b/client/proxy_manager.go index dc9f350d..42fa9a77 100644 --- a/client/proxy_manager.go +++ b/client/proxy_manager.go @@ -12,126 +12,91 @@ import ( "github.com/fatedier/golib/errors" ) -const ( - ProxyStatusNew = "new" - ProxyStatusStartErr = "start error" - ProxyStatusWaitStart = "wait start" - ProxyStatusRunning = "running" - ProxyStatusCheckFailed = "check failed" - ProxyStatusClosed = "closed" -) - type ProxyManager struct { sendCh chan (msg.Message) proxies map[string]*ProxyWrapper - closed bool - mu sync.RWMutex + closed bool + mu sync.RWMutex + + logPrefix string log.Logger } func NewProxyManager(msgSendCh chan (msg.Message), logPrefix string) *ProxyManager { return &ProxyManager{ - proxies: make(map[string]*ProxyWrapper), - sendCh: msgSendCh, - closed: false, - Logger: log.NewPrefixLogger(logPrefix), + proxies: make(map[string]*ProxyWrapper), + sendCh: msgSendCh, + closed: false, + logPrefix: logPrefix, + Logger: log.NewPrefixLogger(logPrefix), } } -func (pm *ProxyManager) Reset(msgSendCh chan (msg.Message), logPrefix string) { - pm.mu.Lock() - defer pm.mu.Unlock() - pm.closed = false - pm.sendCh = msgSendCh - pm.ClearLogPrefix() - pm.AddLogPrefix(logPrefix) -} - -// Must hold the lock before calling this function. -func (pm *ProxyManager) sendMsg(m msg.Message) error { - err := errors.PanicToError(func() { - pm.sendCh <- m - }) - if err != nil { - pm.closed = true - } - return err -} - func (pm *ProxyManager) StartProxy(name string, remoteAddr string, serverRespErr string) error { - pm.mu.Lock() - defer pm.mu.Unlock() - if pm.closed { - return fmt.Errorf("ProxyManager is closed now") - } - + pm.mu.RLock() pxy, ok := pm.proxies[name] + pm.mu.RUnlock() if !ok { - return fmt.Errorf("no proxy found") + return fmt.Errorf("proxy [%s] not found", name) } - if err := pxy.Start(remoteAddr, serverRespErr); err != nil { - errRet := err - err = pm.sendMsg(&msg.CloseProxy{ - ProxyName: name, - }) - if err != nil { - errRet = fmt.Errorf("send CloseProxy message error") - } - return errRet + err := pxy.SetRunningStatus(remoteAddr, serverRespErr) + if err != nil { + return err } return nil } -func (pm *ProxyManager) CloseProxies() { +func (pm *ProxyManager) Close() { pm.mu.RLock() defer pm.mu.RUnlock() for _, pxy := range pm.proxies { - pxy.Close() + pxy.Stop() } } -// pxyStatus: check and start proxies in which status -func (pm *ProxyManager) CheckAndStartProxy(pxyStatus []string) { +func (pm *ProxyManager) HandleWorkConn(name string, workConn frpNet.Conn) { + pm.mu.RLock() + pw, ok := pm.proxies[name] + pm.mu.RUnlock() + if ok { + pw.InWorkConn(workConn) + } else { + workConn.Close() + } +} + +func (pm *ProxyManager) HandleEvent(evType EventType, payload interface{}) error { + var m msg.Message + switch event := payload.(type) { + case *StartProxyPayload: + m = event.NewProxyMsg + case *CloseProxyPayload: + m = event.CloseProxyMsg + default: + return ErrPayloadType + } + + err := errors.PanicToError(func() { + pm.sendCh <- m + }) + return err +} + +func (pm *ProxyManager) GetAllProxyStatus() []*ProxyStatus { + ps := make([]*ProxyStatus, 0) pm.mu.RLock() defer pm.mu.RUnlock() - if pm.closed { - pm.Warn("CheckAndStartProxy error: ProxyManager is closed now") - return - } - for _, pxy := range pm.proxies { - status := pxy.GetStatusStr() - for _, s := range pxyStatus { - if status == s { - var newProxyMsg msg.NewProxy - pxy.Cfg.MarshalToMsg(&newProxyMsg) - err := pm.sendMsg(&newProxyMsg) - if err != nil { - pm.Warn("[%s] proxy send NewProxy message error") - return - } - pxy.WaitStart() - break - } - } + ps = append(ps, pxy.GetStatus()) } + return ps } -func (pm *ProxyManager) Reload(pxyCfgs map[string]config.ProxyConf, startNow bool) error { +func (pm *ProxyManager) Reload(pxyCfgs map[string]config.ProxyConf) { pm.mu.Lock() - defer func() { - pm.mu.Unlock() - if startNow { - go pm.CheckAndStartProxy([]string{ProxyStatusNew}) - } - }() - if pm.closed { - err := fmt.Errorf("Reload error: ProxyManager is closed now") - pm.Warn(err.Error()) - return err - } + defer pm.mu.Unlock() delPxyNames := make([]string, 0) for name, pxy := range pm.proxies { @@ -149,163 +114,24 @@ func (pm *ProxyManager) Reload(pxyCfgs map[string]config.ProxyConf, startNow boo delPxyNames = append(delPxyNames, name) delete(pm.proxies, name) - pxy.Close() - err := pm.sendMsg(&msg.CloseProxy{ - ProxyName: name, - }) - if err != nil { - err = fmt.Errorf("Reload error: ProxyManager is closed now") - pm.Warn(err.Error()) - return err - } + pxy.Stop() } } - pm.Info("proxy removed: %v", delPxyNames) + if len(delPxyNames) > 0 { + pm.Info("proxy removed: %v", delPxyNames) + } addPxyNames := make([]string, 0) for name, cfg := range pxyCfgs { if _, ok := pm.proxies[name]; !ok { - pxy := NewProxyWrapper(cfg) + pxy := NewProxyWrapper(cfg, pm.HandleEvent, pm.logPrefix) pm.proxies[name] = pxy addPxyNames = append(addPxyNames, name) + + pxy.Start() } } - pm.Info("proxy added: %v", addPxyNames) - return nil -} - -func (pm *ProxyManager) HandleWorkConn(name string, workConn frpNet.Conn) { - pm.mu.RLock() - pw, ok := pm.proxies[name] - pm.mu.RUnlock() - if ok { - pw.InWorkConn(workConn) - } else { - workConn.Close() + if len(addPxyNames) > 0 { + pm.Info("proxy added: %v", addPxyNames) } } - -func (pm *ProxyManager) GetAllProxyStatus() []*ProxyStatus { - ps := make([]*ProxyStatus, 0) - pm.mu.RLock() - defer pm.mu.RUnlock() - for _, pxy := range pm.proxies { - ps = append(ps, pxy.GetStatus()) - } - return ps -} - -type ProxyStatus struct { - Name string `json:"name"` - Type string `json:"type"` - Status string `json:"status"` - Err string `json:"err"` - Cfg config.ProxyConf `json:"cfg"` - - // Got from server. - RemoteAddr string `json:"remote_addr"` -} - -// ProxyWrapper is a wrapper of Proxy interface only used in ProxyManager -// Add additional proxy status info -type ProxyWrapper struct { - Name string - Type string - Status string - Err string - Cfg config.ProxyConf - - RemoteAddr string - - pxy Proxy - - mu sync.RWMutex -} - -func NewProxyWrapper(cfg config.ProxyConf) *ProxyWrapper { - return &ProxyWrapper{ - Name: cfg.GetBaseInfo().ProxyName, - Type: cfg.GetBaseInfo().ProxyType, - Status: ProxyStatusNew, - Cfg: cfg, - pxy: nil, - } -} - -func (pw *ProxyWrapper) GetStatusStr() string { - pw.mu.RLock() - defer pw.mu.RUnlock() - return pw.Status -} - -func (pw *ProxyWrapper) GetStatus() *ProxyStatus { - pw.mu.RLock() - defer pw.mu.RUnlock() - ps := &ProxyStatus{ - Name: pw.Name, - Type: pw.Type, - Status: pw.Status, - Err: pw.Err, - Cfg: pw.Cfg, - RemoteAddr: pw.RemoteAddr, - } - return ps -} - -func (pw *ProxyWrapper) WaitStart() { - pw.mu.Lock() - defer pw.mu.Unlock() - pw.Status = ProxyStatusWaitStart -} - -func (pw *ProxyWrapper) Start(remoteAddr string, serverRespErr string) error { - if pw.pxy != nil { - pw.pxy.Close() - pw.pxy = nil - } - - if serverRespErr != "" { - pw.mu.Lock() - pw.Status = ProxyStatusStartErr - pw.RemoteAddr = remoteAddr - pw.Err = serverRespErr - pw.mu.Unlock() - return fmt.Errorf(serverRespErr) - } - - pxy := NewProxy(pw.Cfg) - pw.mu.Lock() - defer pw.mu.Unlock() - pw.RemoteAddr = remoteAddr - if err := pxy.Run(); err != nil { - pw.Status = ProxyStatusStartErr - pw.Err = err.Error() - return err - } - pw.Status = ProxyStatusRunning - pw.Err = "" - pw.pxy = pxy - return nil -} - -func (pw *ProxyWrapper) InWorkConn(workConn frpNet.Conn) { - pw.mu.RLock() - pxy := pw.pxy - pw.mu.RUnlock() - if pxy != nil { - workConn.Debug("start a new work connection, localAddr: %s remoteAddr: %s", workConn.LocalAddr().String(), workConn.RemoteAddr().String()) - go pxy.InWorkConn(workConn) - } else { - workConn.Close() - } -} - -func (pw *ProxyWrapper) Close() { - pw.mu.Lock() - defer pw.mu.Unlock() - if pw.pxy != nil { - pw.pxy.Close() - pw.pxy = nil - } - pw.Status = ProxyStatusClosed -} diff --git a/client/proxy_wrapper.go b/client/proxy_wrapper.go new file mode 100644 index 00000000..a19d97b2 --- /dev/null +++ b/client/proxy_wrapper.go @@ -0,0 +1,219 @@ +package client + +import ( + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/fatedier/frp/models/config" + "github.com/fatedier/frp/models/msg" + "github.com/fatedier/frp/utils/log" + frpNet "github.com/fatedier/frp/utils/net" +) + +const ( + ProxyStatusNew = "new" + ProxyStatusWaitStart = "wait start" + ProxyStatusStartErr = "start error" + ProxyStatusRunning = "running" + ProxyStatusCheckFailed = "check failed" + ProxyStatusClosed = "closed" +) + +var ( + statusCheckInterval time.Duration = 3 * time.Second + waitResponseTimeout = 20 * time.Second + startErrTimeout = 30 * time.Second +) + +type ProxyStatus struct { + Name string `json:"name"` + Type string `json:"type"` + Status string `json:"status"` + Err string `json:"err"` + Cfg config.ProxyConf `json:"cfg"` + + // Got from server. + RemoteAddr string `json:"remote_addr"` +} + +type ProxyWrapper struct { + ProxyStatus + + // underlying proxy + pxy Proxy + + // if ProxyConf has healcheck config + // monitor will watch if it is alive + monitor *HealthCheckMonitor + + // event handler + handler EventHandler + + health uint32 + lastSendStartMsg time.Time + lastStartErr time.Time + closeCh chan struct{} + mu sync.RWMutex + + log.Logger +} + +func NewProxyWrapper(cfg config.ProxyConf, eventHandler EventHandler, logPrefix string) *ProxyWrapper { + baseInfo := cfg.GetBaseInfo() + pw := &ProxyWrapper{ + ProxyStatus: ProxyStatus{ + Name: baseInfo.ProxyName, + Type: baseInfo.ProxyType, + Status: ProxyStatusNew, + Cfg: cfg, + }, + closeCh: make(chan struct{}), + handler: eventHandler, + Logger: log.NewPrefixLogger(logPrefix), + } + pw.AddLogPrefix(pw.Name) + + if baseInfo.HealthCheckType != "" { + pw.health = 1 // means failed + pw.monitor = NewHealthCheckMonitor(baseInfo.HealthCheckType, baseInfo.HealthCheckIntervalS, + baseInfo.HealthCheckTimeoutS, baseInfo.HealthCheckMaxFailed, baseInfo.HealthCheckAddr, + baseInfo.HealthCheckUrl, pw.statusNormalCallback, pw.statusFailedCallback) + pw.monitor.SetLogger(pw.Logger) + pw.Trace("enable health check monitor") + } + + pw.pxy = NewProxy(pw.Cfg) + return pw +} + +func (pw *ProxyWrapper) SetRunningStatus(remoteAddr string, respErr string) error { + pw.mu.Lock() + defer pw.mu.Unlock() + if pw.Status != ProxyStatusWaitStart { + return fmt.Errorf("status not wait start, ignore start message") + } + + pw.RemoteAddr = remoteAddr + if respErr != "" { + pw.Status = ProxyStatusStartErr + pw.Err = respErr + pw.lastStartErr = time.Now() + return fmt.Errorf(pw.Err) + } + + if err := pw.pxy.Run(); err != nil { + pw.Status = ProxyStatusStartErr + pw.Err = err.Error() + pw.lastStartErr = time.Now() + return err + } + + pw.Status = ProxyStatusRunning + pw.Err = "" + return nil +} + +func (pw *ProxyWrapper) Start() { + go pw.checkWorker() + if pw.monitor != nil { + go pw.monitor.Start() + } +} + +func (pw *ProxyWrapper) Stop() { + pw.mu.Lock() + defer pw.mu.Unlock() + pw.pxy.Close() + if pw.monitor != nil { + pw.monitor.Stop() + } + pw.Status = ProxyStatusClosed + + pw.handler(EvCloseProxy, &CloseProxyPayload{ + CloseProxyMsg: &msg.CloseProxy{ + ProxyName: pw.Name, + }, + }) +} + +func (pw *ProxyWrapper) checkWorker() { + for { + // check proxy status + now := time.Now() + if atomic.LoadUint32(&pw.health) == 0 { + pw.mu.Lock() + if pw.Status == ProxyStatusNew || + pw.Status == ProxyStatusCheckFailed || + (pw.Status == ProxyStatusWaitStart && now.After(pw.lastSendStartMsg.Add(waitResponseTimeout))) || + (pw.Status == ProxyStatusStartErr && now.After(pw.lastStartErr.Add(startErrTimeout))) { + + pw.Trace("change status from [%s] to [%s]", pw.Status, ProxyStatusWaitStart) + pw.Status = ProxyStatusWaitStart + + var newProxyMsg msg.NewProxy + pw.Cfg.MarshalToMsg(&newProxyMsg) + pw.lastSendStartMsg = now + pw.handler(EvStartProxy, &StartProxyPayload{ + NewProxyMsg: &newProxyMsg, + }) + } + pw.mu.Unlock() + } else { + pw.mu.Lock() + if pw.Status == ProxyStatusRunning || pw.Status == ProxyStatusWaitStart { + pw.handler(EvCloseProxy, &CloseProxyPayload{ + CloseProxyMsg: &msg.CloseProxy{ + ProxyName: pw.Name, + }, + }) + pw.Trace("change status from [%s] to [%s]", pw.Status, ProxyStatusCheckFailed) + pw.Status = ProxyStatusCheckFailed + } + pw.mu.Unlock() + } + + select { + case <-pw.closeCh: + return + case <-time.After(statusCheckInterval): + } + } +} + +func (pw *ProxyWrapper) statusNormalCallback() { + atomic.StoreUint32(&pw.health, 0) + pw.Info("health check success") +} + +func (pw *ProxyWrapper) statusFailedCallback() { + atomic.StoreUint32(&pw.health, 1) + pw.Info("health check failed") +} + +func (pw *ProxyWrapper) InWorkConn(workConn frpNet.Conn) { + pw.mu.RLock() + pxy := pw.pxy + pw.mu.RUnlock() + if pxy != nil { + workConn.Debug("start a new work connection, localAddr: %s remoteAddr: %s", workConn.LocalAddr().String(), workConn.RemoteAddr().String()) + go pxy.InWorkConn(workConn) + } else { + workConn.Close() + } +} + +func (pw *ProxyWrapper) GetStatus() *ProxyStatus { + pw.mu.RLock() + defer pw.mu.RUnlock() + ps := &ProxyStatus{ + Name: pw.Name, + Type: pw.Type, + Status: pw.Status, + Err: pw.Err, + Cfg: pw.Cfg, + RemoteAddr: pw.RemoteAddr, + } + return ps +} diff --git a/client/visitor_manager.go b/client/visitor_manager.go index 3e0aa80b..b223d55d 100644 --- a/client/visitor_manager.go +++ b/client/visitor_manager.go @@ -96,7 +96,9 @@ func (vm *VisitorManager) Reload(cfgs map[string]config.VisitorConf) { delete(vm.visitors, name) } } - log.Info("visitor removed: %v", delNames) + if len(delNames) > 0 { + log.Info("visitor removed: %v", delNames) + } addNames := make([]string, 0) for name, cfg := range cfgs { @@ -106,6 +108,16 @@ func (vm *VisitorManager) Reload(cfgs map[string]config.VisitorConf) { vm.startVisitor(cfg) } } - log.Info("visitor added: %v", addNames) + if len(addNames) > 0 { + log.Info("visitor added: %v", addNames) + } return } + +func (vm *VisitorManager) Close() { + vm.mu.Lock() + defer vm.mu.Unlock() + for _, v := range vm.visitors { + v.Close() + } +} diff --git a/models/config/proxy.go b/models/config/proxy.go index 9ea680a4..39e31946 100644 --- a/models/config/proxy.go +++ b/models/config/proxy.go @@ -170,6 +170,10 @@ func (cfg *BaseProxyConf) UnmarshalFromIni(prefix string, name string, section i if err := cfg.HealthCheckConf.UnmarshalFromIni(prefix, name, section); err != nil { return err } + + if cfg.HealthCheckType == "tcp" && cfg.Plugin == "" { + cfg.HealthCheckAddr = cfg.LocalIp + fmt.Sprintf(":%d", cfg.LocalPort) + } return nil } @@ -381,7 +385,7 @@ func (cfg *LocalSvrConf) checkForCli() (err error) { // Health check info type HealthCheckConf struct { HealthCheckType string `json:"health_check_type"` // tcp | http - HealthCheckTimeout int `json:"health_check_timeout"` + HealthCheckTimeoutS int `json:"health_check_timeout_s"` HealthCheckMaxFailed int `json:"health_check_max_failed"` HealthCheckIntervalS int `json:"health_check_interval_s"` HealthCheckUrl string `json:"health_check_url"` @@ -392,8 +396,10 @@ type HealthCheckConf struct { func (cfg *HealthCheckConf) compare(cmp *HealthCheckConf) bool { if cfg.HealthCheckType != cmp.HealthCheckType || - cfg.HealthCheckUrl != cmp.HealthCheckUrl || - cfg.HealthCheckIntervalS != cmp.HealthCheckIntervalS { + cfg.HealthCheckTimeoutS != cmp.HealthCheckTimeoutS || + cfg.HealthCheckMaxFailed != cmp.HealthCheckMaxFailed || + cfg.HealthCheckIntervalS != cmp.HealthCheckIntervalS || + cfg.HealthCheckUrl != cmp.HealthCheckUrl { return false } return true @@ -403,6 +409,18 @@ func (cfg *HealthCheckConf) UnmarshalFromIni(prefix string, name string, section cfg.HealthCheckType = section["health_check_type"] cfg.HealthCheckUrl = section["health_check_url"] + if tmpStr, ok := section["health_check_timeout_s"]; ok { + if cfg.HealthCheckTimeoutS, err = strconv.Atoi(tmpStr); err != nil { + return fmt.Errorf("Parse conf error: proxy [%s] health_check_timeout_s error", name) + } + } + + if tmpStr, ok := section["health_check_max_failed"]; ok { + if cfg.HealthCheckMaxFailed, err = strconv.Atoi(tmpStr); err != nil { + return fmt.Errorf("Parse conf error: proxy [%s] health_check_max_failed error", name) + } + } + if tmpStr, ok := section["health_check_interval_s"]; ok { if cfg.HealthCheckIntervalS, err = strconv.Atoi(tmpStr); err != nil { return fmt.Errorf("Parse conf error: proxy [%s] health_check_interval_s error", name) @@ -419,9 +437,6 @@ func (cfg *HealthCheckConf) checkForCli() error { if cfg.HealthCheckType == "http" && cfg.HealthCheckUrl == "" { return fmt.Errorf("health_check_url is required for health check type 'http'") } - if cfg.HealthCheckIntervalS <= 0 { - return fmt.Errorf("health_check_interval_s is required and should greater than 0") - } } return nil } diff --git a/server/group/group.go b/server/group/group.go index 859239eb..a0dae7cd 100644 --- a/server/group/group.go +++ b/server/group/group.go @@ -22,4 +22,5 @@ var ( ErrGroupAuthFailed = errors.New("group auth failed") ErrGroupParamsInvalid = errors.New("group params invalid") ErrListenerClosed = errors.New("group listener closed") + ErrGroupDifferentPort = errors.New("group should have same remote port") ) diff --git a/server/group/tcp.go b/server/group/tcp.go index 2de05b4c..8c46be65 100644 --- a/server/group/tcp.go +++ b/server/group/tcp.go @@ -114,10 +114,14 @@ func (tg *TcpGroup) Listen(proxyName string, group string, groupKey string, addr } go tg.worker() } else { - if tg.group != group || tg.addr != addr || tg.port != port { + if tg.group != group || tg.addr != addr { err = ErrGroupParamsInvalid return } + if tg.port != port { + err = ErrGroupDifferentPort + return + } if tg.groupKey != groupKey { err = ErrGroupAuthFailed return From aea9f9fbcc8735193c979c684ae360d450d40c3c Mon Sep 17 00:00:00 2001 From: fatedier Date: Sun, 9 Dec 2018 21:56:46 +0800 Subject: [PATCH 2/4] health: add more ci cases and fix bugs --- Makefile | 2 +- client/proxy_wrapper.go | 29 +++- cmd/frpc/sub/root.go | 5 + cmd/frps/root.go | 9 +- conf/frpc_full.ini | 2 + models/config/proxy.go | 7 + tests/ci/health/health_test.go | 247 +++++++++++++++++++++++++++++++++ tests/ci/normal_test.go | 14 +- tests/ci/reconnect_test.go | 2 - tests/ci/reload_test.go | 8 +- tests/consts/consts.go | 3 + tests/mock/echo_server.go | 92 ++++++------ tests/mock/http_server.go | 31 +++++ tests/util/process.go | 24 +++- 14 files changed, 409 insertions(+), 66 deletions(-) create mode 100644 tests/ci/health/health_test.go diff --git a/Makefile b/Makefile index bc450af9..c217eb88 100644 --- a/Makefile +++ b/Makefile @@ -34,7 +34,7 @@ gotest: go test -v --cover ./utils/... ci: - go test -count=1 -v ./tests/... + go test -count=1 -p=1 -v ./tests/... alltest: gotest ci diff --git a/client/proxy_wrapper.go b/client/proxy_wrapper.go index a19d97b2..059d1821 100644 --- a/client/proxy_wrapper.go +++ b/client/proxy_wrapper.go @@ -10,6 +10,8 @@ import ( "github.com/fatedier/frp/models/msg" "github.com/fatedier/frp/utils/log" frpNet "github.com/fatedier/frp/utils/net" + + "github.com/fatedier/golib/errors" ) const ( @@ -55,6 +57,7 @@ type ProxyWrapper struct { lastSendStartMsg time.Time lastStartErr time.Time closeCh chan struct{} + healthNotifyCh chan struct{} mu sync.RWMutex log.Logger @@ -69,9 +72,10 @@ func NewProxyWrapper(cfg config.ProxyConf, eventHandler EventHandler, logPrefix Status: ProxyStatusNew, Cfg: cfg, }, - closeCh: make(chan struct{}), - handler: eventHandler, - Logger: log.NewPrefixLogger(logPrefix), + closeCh: make(chan struct{}), + healthNotifyCh: make(chan struct{}), + handler: eventHandler, + Logger: log.NewPrefixLogger(logPrefix), } pw.AddLogPrefix(pw.Name) @@ -125,6 +129,8 @@ func (pw *ProxyWrapper) Start() { func (pw *ProxyWrapper) Stop() { pw.mu.Lock() defer pw.mu.Unlock() + close(pw.closeCh) + close(pw.healthNotifyCh) pw.pxy.Close() if pw.monitor != nil { pw.monitor.Stop() @@ -139,6 +145,10 @@ func (pw *ProxyWrapper) Stop() { } func (pw *ProxyWrapper) checkWorker() { + if pw.monitor != nil { + // let monitor do check request first + time.Sleep(500 * time.Millisecond) + } for { // check proxy status now := time.Now() @@ -178,17 +188,30 @@ func (pw *ProxyWrapper) checkWorker() { case <-pw.closeCh: return case <-time.After(statusCheckInterval): + case <-pw.healthNotifyCh: } } } func (pw *ProxyWrapper) statusNormalCallback() { atomic.StoreUint32(&pw.health, 0) + errors.PanicToError(func() { + select { + case pw.healthNotifyCh <- struct{}{}: + default: + } + }) pw.Info("health check success") } func (pw *ProxyWrapper) statusFailedCallback() { atomic.StoreUint32(&pw.health, 1) + errors.PanicToError(func() { + select { + case pw.healthNotifyCh <- struct{}{}: + default: + } + }) pw.Info("health check failed") } diff --git a/cmd/frpc/sub/root.go b/cmd/frpc/sub/root.go index 23b3bf9c..ca3b853e 100644 --- a/cmd/frpc/sub/root.go +++ b/cmd/frpc/sub/root.go @@ -166,6 +166,11 @@ func parseClientCommonCfgFromCmd() (err error) { g.GlbClientCfg.LogLevel = logLevel g.GlbClientCfg.LogFile = logFile g.GlbClientCfg.LogMaxDays = int64(logMaxDays) + if logFile == "console" { + g.GlbClientCfg.LogWay = "console" + } else { + g.GlbClientCfg.LogWay = "file" + } return nil } diff --git a/cmd/frps/root.go b/cmd/frps/root.go index 76a1acd9..b4ccff4f 100644 --- a/cmd/frps/root.go +++ b/cmd/frps/root.go @@ -52,7 +52,6 @@ var ( dashboardPwd string assetsDir string logFile string - logWay string logLevel string logMaxDays int64 token string @@ -81,7 +80,6 @@ func init() { rootCmd.PersistentFlags().StringVarP(&dashboardUser, "dashboard_user", "", "admin", "dashboard user") rootCmd.PersistentFlags().StringVarP(&dashboardPwd, "dashboard_pwd", "", "admin", "dashboard password") rootCmd.PersistentFlags().StringVarP(&logFile, "log_file", "", "console", "log file") - rootCmd.PersistentFlags().StringVarP(&logWay, "log_way", "", "console", "log way") rootCmd.PersistentFlags().StringVarP(&logLevel, "log_level", "", "info", "log level") rootCmd.PersistentFlags().Int64VarP(&logMaxDays, "log_max_days", "", 3, "log_max_days") rootCmd.PersistentFlags().StringVarP(&token, "token", "t", "", "auth token") @@ -175,7 +173,6 @@ func parseServerCommonCfgFromCmd() (err error) { g.GlbServerCfg.DashboardUser = dashboardUser g.GlbServerCfg.DashboardPwd = dashboardPwd g.GlbServerCfg.LogFile = logFile - g.GlbServerCfg.LogWay = logWay g.GlbServerCfg.LogLevel = logLevel g.GlbServerCfg.LogMaxDays = logMaxDays g.GlbServerCfg.Token = token @@ -194,6 +191,12 @@ func parseServerCommonCfgFromCmd() (err error) { } } g.GlbServerCfg.MaxPortsPerClient = maxPortsPerClient + + if logFile == "console" { + g.GlbClientCfg.LogWay = "console" + } else { + g.GlbClientCfg.LogWay = "file" + } return } diff --git a/conf/frpc_full.ini b/conf/frpc_full.ini index 2b5d327b..3f9a69da 100644 --- a/conf/frpc_full.ini +++ b/conf/frpc_full.ini @@ -77,6 +77,8 @@ group_key = 123456 # frpc will connect local service's port to detect it's healthy status health_check_type = tcp health_check_interval_s = 10 +health_check_max_failed = 1 +health_check_timeout_s = 3 [ssh_random] type = tcp diff --git a/models/config/proxy.go b/models/config/proxy.go index 39e31946..0766b5ae 100644 --- a/models/config/proxy.go +++ b/models/config/proxy.go @@ -174,6 +174,13 @@ func (cfg *BaseProxyConf) UnmarshalFromIni(prefix string, name string, section i if cfg.HealthCheckType == "tcp" && cfg.Plugin == "" { cfg.HealthCheckAddr = cfg.LocalIp + fmt.Sprintf(":%d", cfg.LocalPort) } + if cfg.HealthCheckType == "http" && cfg.Plugin == "" && cfg.HealthCheckUrl != "" { + s := fmt.Sprintf("http://%s:%d", cfg.LocalIp, cfg.LocalPort) + if !strings.HasPrefix(cfg.HealthCheckUrl, "/") { + s += "/" + } + cfg.HealthCheckUrl = s + cfg.HealthCheckUrl + } return nil } diff --git a/tests/ci/health/health_test.go b/tests/ci/health/health_test.go new file mode 100644 index 00000000..fd1a5286 --- /dev/null +++ b/tests/ci/health/health_test.go @@ -0,0 +1,247 @@ +package health + +import ( + "net/http" + "os" + "strings" + "sync" + "testing" + "time" + + "github.com/fatedier/frp/tests/config" + "github.com/fatedier/frp/tests/consts" + "github.com/fatedier/frp/tests/mock" + "github.com/fatedier/frp/tests/util" + + "github.com/stretchr/testify/assert" +) + +const FRPS_CONF = ` +[common] +bind_addr = 0.0.0.0 +bind_port = 14000 +vhost_http_port = 14000 +log_file = console +log_level = debug +token = 123456 +` + +const FRPC_CONF = ` +[common] +server_addr = 127.0.0.1 +server_port = 14000 +log_file = console +log_level = debug +token = 123456 + +[tcp1] +type = tcp +local_port = 15001 +remote_port = 15000 +group = test +group_key = 123 +health_check_type = tcp +health_check_interval_s = 1 + +[tcp2] +type = tcp +local_port = 15002 +remote_port = 15000 +group = test +group_key = 123 +health_check_type = tcp +health_check_interval_s = 1 + +[http1] +type = http +local_port = 15003 +custom_domains = test1.com +health_check_type = http +health_check_interval_s = 1 +health_check_url = /health + +[http2] +type = http +local_port = 15004 +custom_domains = test2.com +health_check_type = http +health_check_interval_s = 1 +health_check_url = /health +` + +func TestHealthCheck(t *testing.T) { + assert := assert.New(t) + + // ****** start backgroud services ****** + echoSvc1 := mock.NewEchoServer(15001, 1, "echo1") + err := echoSvc1.Start() + if assert.NoError(err) { + defer echoSvc1.Stop() + } + + echoSvc2 := mock.NewEchoServer(15002, 1, "echo2") + err = echoSvc2.Start() + if assert.NoError(err) { + defer echoSvc2.Stop() + } + + var healthMu sync.RWMutex + svc1Health := true + svc2Health := true + httpSvc1 := mock.NewHttpServer(15003, func(w http.ResponseWriter, r *http.Request) { + if strings.Contains(r.URL.Path, "health") { + healthMu.RLock() + defer healthMu.RUnlock() + if svc1Health { + w.WriteHeader(200) + } else { + w.WriteHeader(500) + } + } else { + w.Write([]byte("http1")) + } + }) + err = httpSvc1.Start() + if assert.NoError(err) { + defer httpSvc1.Stop() + } + + httpSvc2 := mock.NewHttpServer(15004, func(w http.ResponseWriter, r *http.Request) { + if strings.Contains(r.URL.Path, "health") { + healthMu.RLock() + defer healthMu.RUnlock() + if svc2Health { + w.WriteHeader(200) + } else { + w.WriteHeader(500) + } + } else { + w.Write([]byte("http2")) + } + }) + err = httpSvc2.Start() + if assert.NoError(err) { + defer httpSvc2.Stop() + } + + time.Sleep(200 * time.Millisecond) + + // ****** start frps and frpc ****** + frpsCfgPath, err := config.GenerateConfigFile(consts.FRPS_NORMAL_CONFIG, FRPS_CONF) + if assert.NoError(err) { + defer os.Remove(frpsCfgPath) + } + + frpcCfgPath, err := config.GenerateConfigFile(consts.FRPC_NORMAL_CONFIG, FRPC_CONF) + if assert.NoError(err) { + defer os.Remove(frpcCfgPath) + } + + frpsProcess := util.NewProcess(consts.FRPS_SUB_BIN_PATH, []string{"-c", frpsCfgPath}) + err = frpsProcess.Start() + if assert.NoError(err) { + defer frpsProcess.Stop() + } + + time.Sleep(100 * time.Millisecond) + + frpcProcess := util.NewProcess(consts.FRPC_SUB_BIN_PATH, []string{"-c", frpcCfgPath}) + err = frpcProcess.Start() + if assert.NoError(err) { + defer frpcProcess.Stop() + } + time.Sleep(1000 * time.Millisecond) + + // ****** healcheck type tcp ****** + // echo1 and echo2 is ok + result := make([]string, 0) + res, err := util.SendTcpMsg("127.0.0.1:15000", "echo") + assert.NoError(err) + result = append(result, res) + + res, err = util.SendTcpMsg("127.0.0.1:15000", "echo") + assert.NoError(err) + result = append(result, res) + + assert.Contains(result, "echo1") + assert.Contains(result, "echo2") + + // close echo2 server, echo1 is work + echoSvc2.Stop() + time.Sleep(1200 * time.Millisecond) + + result = make([]string, 0) + res, err = util.SendTcpMsg("127.0.0.1:15000", "echo") + assert.NoError(err) + result = append(result, res) + + res, err = util.SendTcpMsg("127.0.0.1:15000", "echo") + assert.NoError(err) + result = append(result, res) + + assert.NotContains(result, "echo2") + + // resume echo2 server, all services are ok + echoSvc2 = mock.NewEchoServer(15002, 1, "echo2") + err = echoSvc2.Start() + if assert.NoError(err) { + defer echoSvc2.Stop() + } + + time.Sleep(1200 * time.Millisecond) + + result = make([]string, 0) + res, err = util.SendTcpMsg("127.0.0.1:15000", "echo") + assert.NoError(err) + result = append(result, res) + + res, err = util.SendTcpMsg("127.0.0.1:15000", "echo") + assert.NoError(err) + result = append(result, res) + + assert.Contains(result, "echo1") + assert.Contains(result, "echo2") + + // ****** healcheck type http ****** + // http1 and http2 is ok + code, body, _, err := util.SendHttpMsg("GET", "http://127.0.0.1:14000/xxx", "test1.com", nil, "") + assert.NoError(err) + assert.Equal(200, code) + assert.Equal("http1", body) + + code, body, _, err = util.SendHttpMsg("GET", "http://127.0.0.1:14000/xxx", "test2.com", nil, "") + assert.NoError(err) + assert.Equal(200, code) + assert.Equal("http2", body) + + // http2 health check error + healthMu.Lock() + svc2Health = false + healthMu.Unlock() + time.Sleep(1200 * time.Millisecond) + + code, body, _, err = util.SendHttpMsg("GET", "http://127.0.0.1:14000/xxx", "test1.com", nil, "") + assert.NoError(err) + assert.Equal(200, code) + assert.Equal("http1", body) + + code, _, _, err = util.SendHttpMsg("GET", "http://127.0.0.1:14000/xxx", "test2.com", nil, "") + assert.NoError(err) + assert.Equal(404, code) + + // resume http2 service, http1 and http2 are ok + healthMu.Lock() + svc2Health = true + healthMu.Unlock() + time.Sleep(1200 * time.Millisecond) + + code, body, _, err = util.SendHttpMsg("GET", "http://127.0.0.1:14000/xxx", "test1.com", nil, "") + assert.NoError(err) + assert.Equal(200, code) + assert.Equal("http1", body) + + code, body, _, err = util.SendHttpMsg("GET", "http://127.0.0.1:14000/xxx", "test2.com", nil, "") + assert.NoError(err) + assert.Equal(200, code) + assert.Equal("http2", body) +} diff --git a/tests/ci/normal_test.go b/tests/ci/normal_test.go index c5283460..d76cc2df 100644 --- a/tests/ci/normal_test.go +++ b/tests/ci/normal_test.go @@ -22,13 +22,21 @@ import ( ) func TestMain(m *testing.M) { - go mock.StartTcpEchoServer(consts.TEST_TCP_PORT) - go mock.StartTcpEchoServer2(consts.TEST_TCP2_PORT) + var err error + tcpEcho1 := mock.NewEchoServer(consts.TEST_TCP_PORT, 1, "") + tcpEcho2 := mock.NewEchoServer(consts.TEST_TCP2_PORT, 2, "") + + if err = tcpEcho1.Start(); err != nil { + panic(err) + } + if err = tcpEcho2.Start(); err != nil { + panic(err) + } + go mock.StartUdpEchoServer(consts.TEST_UDP_PORT) go mock.StartUnixDomainServer(consts.TEST_UNIX_DOMAIN_ADDR) go mock.StartHttpServer(consts.TEST_HTTP_PORT) - var err error p1 := util.NewProcess(consts.FRPS_BIN_PATH, []string{"-c", "./auto_test_frps.ini"}) if err = p1.Start(); err != nil { panic(err) diff --git a/tests/ci/reconnect_test.go b/tests/ci/reconnect_test.go index 7974c2c7..114567b2 100644 --- a/tests/ci/reconnect_test.go +++ b/tests/ci/reconnect_test.go @@ -17,7 +17,6 @@ const FRPS_RECONNECT_CONF = ` bind_addr = 0.0.0.0 bind_port = 20000 log_file = console -# debug, info, warn, error log_level = debug token = 123456 ` @@ -27,7 +26,6 @@ const FRPC_RECONNECT_CONF = ` server_addr = 127.0.0.1 server_port = 20000 log_file = console -# debug, info, warn, error log_level = debug token = 123456 admin_port = 21000 diff --git a/tests/ci/reload_test.go b/tests/ci/reload_test.go index 9811db95..f05f2edb 100644 --- a/tests/ci/reload_test.go +++ b/tests/ci/reload_test.go @@ -84,7 +84,8 @@ func TestReload(t *testing.T) { frpcCfgPath, err := config.GenerateConfigFile(consts.FRPC_NORMAL_CONFIG, FRPC_RELOAD_CONF_1) if assert.NoError(err) { - defer os.Remove(frpcCfgPath) + rmFile1 := frpcCfgPath + defer os.Remove(rmFile1) } frpsProcess := util.NewProcess(consts.FRPS_BIN_PATH, []string{"-c", frpsCfgPath}) @@ -120,7 +121,10 @@ func TestReload(t *testing.T) { // reload frpc config frpcCfgPath, err = config.GenerateConfigFile(consts.FRPC_NORMAL_CONFIG, FRPC_RELOAD_CONF_2) - assert.NoError(err) + if assert.NoError(err) { + rmFile2 := frpcCfgPath + defer os.Remove(rmFile2) + } err = util.ReloadConf("127.0.0.1:21000", "abc", "abc") assert.NoError(err) diff --git a/tests/consts/consts.go b/tests/consts/consts.go index 60dcffee..4e1c1a00 100644 --- a/tests/consts/consts.go +++ b/tests/consts/consts.go @@ -6,6 +6,9 @@ var ( FRPS_BIN_PATH = "../../bin/frps" FRPC_BIN_PATH = "../../bin/frpc" + FRPS_SUB_BIN_PATH = "../../../bin/frps" + FRPC_SUB_BIN_PATH = "../../../bin/frpc" + FRPS_NORMAL_CONFIG = "./auto_test_frps.ini" FRPC_NORMAL_CONFIG = "./auto_test_frpc.ini" diff --git a/tests/mock/echo_server.go b/tests/mock/echo_server.go index a24947f5..e029f505 100644 --- a/tests/mock/echo_server.go +++ b/tests/mock/echo_server.go @@ -10,40 +10,48 @@ import ( frpNet "github.com/fatedier/frp/utils/net" ) -func StartTcpEchoServer(port int) { - l, err := frpNet.ListenTcp("127.0.0.1", port) - if err != nil { - fmt.Printf("echo server listen error: %v\n", err) - return +type EchoServer struct { + l frpNet.Listener + + port int + repeatedNum int + specifyStr string +} + +func NewEchoServer(port int, repeatedNum int, specifyStr string) *EchoServer { + if repeatedNum <= 0 { + repeatedNum = 1 } - - for { - c, err := l.Accept() - if err != nil { - fmt.Printf("echo server accept error: %v\n", err) - return - } - - go echoWorker(c) + return &EchoServer{ + port: port, + repeatedNum: repeatedNum, + specifyStr: specifyStr, } } -func StartTcpEchoServer2(port int) { - l, err := frpNet.ListenTcp("127.0.0.1", port) +func (es *EchoServer) Start() error { + l, err := frpNet.ListenTcp("127.0.0.1", es.port) if err != nil { - fmt.Printf("echo server2 listen error: %v\n", err) - return + fmt.Printf("echo server listen error: %v\n", err) + return err } + es.l = l - for { - c, err := l.Accept() - if err != nil { - fmt.Printf("echo server2 accept error: %v\n", err) - return + go func() { + for { + c, err := l.Accept() + if err != nil { + return + } + + go echoWorker(c, es.repeatedNum, es.specifyStr) } + }() + return nil +} - go echoWorker2(c) - } +func (es *EchoServer) Stop() { + es.l.Close() } func StartUdpEchoServer(port int) { @@ -60,7 +68,7 @@ func StartUdpEchoServer(port int) { return } - go echoWorker(c) + go echoWorker(c, 1, "") } } @@ -80,11 +88,11 @@ func StartUnixDomainServer(unixPath string) { return } - go echoWorker(c) + go echoWorker(c, 1, "") } } -func echoWorker(c net.Conn) { +func echoWorker(c net.Conn, repeatedNum int, specifyStr string) { buf := make([]byte, 2048) for { @@ -99,28 +107,14 @@ func echoWorker(c net.Conn) { } } - c.Write(buf[:n]) - } -} - -func echoWorker2(c net.Conn) { - buf := make([]byte, 2048) - - for { - n, err := c.Read(buf) - if err != nil { - if err == io.EOF { - c.Close() - break - } else { - fmt.Printf("echo server read error: %v\n", err) - return + if specifyStr != "" { + c.Write([]byte(specifyStr)) + } else { + var w []byte + for i := 0; i < repeatedNum; i++ { + w = append(w, buf[:n]...) } + c.Write(w) } - - var w []byte - w = append(w, buf[:n]...) - w = append(w, buf[:n]...) - c.Write(w) } } diff --git a/tests/mock/http_server.go b/tests/mock/http_server.go index 7e97ad61..37b2b1e6 100644 --- a/tests/mock/http_server.go +++ b/tests/mock/http_server.go @@ -3,6 +3,7 @@ package mock import ( "fmt" "log" + "net" "net/http" "regexp" "strings" @@ -12,6 +13,36 @@ import ( "github.com/gorilla/websocket" ) +type HttpServer struct { + l net.Listener + + port int + handler http.HandlerFunc +} + +func NewHttpServer(port int, handler http.HandlerFunc) *HttpServer { + return &HttpServer{ + port: port, + handler: handler, + } +} + +func (hs *HttpServer) Start() error { + l, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", hs.port)) + if err != nil { + fmt.Printf("http server listen error: %v\n", err) + return err + } + hs.l = l + + go http.Serve(l, http.HandlerFunc(hs.handler)) + return nil +} + +func (hs *HttpServer) Stop() { + hs.l.Close() +} + var upgrader = websocket.Upgrader{} func StartHttpServer(port int) { diff --git a/tests/util/process.go b/tests/util/process.go index 1e34040d..e7078468 100644 --- a/tests/util/process.go +++ b/tests/util/process.go @@ -1,22 +1,29 @@ package util import ( + "bytes" "context" "os/exec" ) type Process struct { - cmd *exec.Cmd - cancel context.CancelFunc + cmd *exec.Cmd + cancel context.CancelFunc + errorOutput *bytes.Buffer + + beforeStopHandler func() } func NewProcess(path string, params []string) *Process { ctx, cancel := context.WithCancel(context.Background()) cmd := exec.CommandContext(ctx, path, params...) - return &Process{ + p := &Process{ cmd: cmd, cancel: cancel, } + p.errorOutput = bytes.NewBufferString("") + cmd.Stderr = p.errorOutput + return p } func (p *Process) Start() error { @@ -24,6 +31,17 @@ func (p *Process) Start() error { } func (p *Process) Stop() error { + if p.beforeStopHandler != nil { + p.beforeStopHandler() + } p.cancel() return p.cmd.Wait() } + +func (p *Process) ErrorOutput() string { + return p.errorOutput.String() +} + +func (p *Process) SetBeforeStopHandler(fn func()) { + p.beforeStopHandler = fn +} From 35278ad17fe9d658d9a5463ee94315c8cef7bd81 Mon Sep 17 00:00:00 2001 From: fatedier Date: Sun, 9 Dec 2018 22:06:22 +0800 Subject: [PATCH 3/4] mv folders --- client/admin_api.go | 3 ++- client/control.go | 5 +++-- client/{ => event}/event.go | 2 +- client/{ => health}/health.go | 2 +- client/{ => proxy}/proxy.go | 2 +- client/{ => proxy}/proxy_manager.go | 17 +++++++++-------- client/{ => proxy}/proxy_wrapper.go | 18 ++++++++++-------- tests/ci/normal_test.go | 16 ++++++++-------- 8 files changed, 35 insertions(+), 30 deletions(-) rename client/{ => event}/event.go (96%) rename client/{ => health}/health.go (99%) rename client/{ => proxy}/proxy.go (99%) rename client/{ => proxy}/proxy_manager.go (88%) rename client/{ => proxy}/proxy_wrapper.go (90%) diff --git a/client/admin_api.go b/client/admin_api.go index 50745406..893d127d 100644 --- a/client/admin_api.go +++ b/client/admin_api.go @@ -24,6 +24,7 @@ import ( ini "github.com/vaughan0/go-ini" + "github.com/fatedier/frp/client/proxy" "github.com/fatedier/frp/g" "github.com/fatedier/frp/models/config" "github.com/fatedier/frp/utils/log" @@ -121,7 +122,7 @@ func (a ByProxyStatusResp) Len() int { return len(a) } func (a ByProxyStatusResp) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a ByProxyStatusResp) Less(i, j int) bool { return strings.Compare(a[i].Name, a[j].Name) < 0 } -func NewProxyStatusResp(status *ProxyStatus) ProxyStatusResp { +func NewProxyStatusResp(status *proxy.ProxyStatus) ProxyStatusResp { psr := ProxyStatusResp{ Name: status.Name, Type: status.Type, diff --git a/client/control.go b/client/control.go index 4a588c5d..376893f7 100644 --- a/client/control.go +++ b/client/control.go @@ -21,6 +21,7 @@ import ( "sync" "time" + "github.com/fatedier/frp/client/proxy" "github.com/fatedier/frp/g" "github.com/fatedier/frp/models/config" "github.com/fatedier/frp/models/msg" @@ -38,7 +39,7 @@ type Control struct { // manage all proxies pxyCfgs map[string]config.ProxyConf - pm *ProxyManager + pm *proxy.ProxyManager // manage all visitors vm *VisitorManager @@ -87,7 +88,7 @@ func NewControl(runId string, conn frpNet.Conn, session *fmux.Session, pxyCfgs m msgHandlerShutdown: shutdown.New(), Logger: log.NewPrefixLogger(""), } - ctl.pm = NewProxyManager(ctl.sendCh, runId) + ctl.pm = proxy.NewProxyManager(ctl.sendCh, runId) ctl.vm = NewVisitorManager(ctl) ctl.vm.Reload(visitorCfgs) diff --git a/client/event.go b/client/event/event.go similarity index 96% rename from client/event.go rename to client/event/event.go index b10b1e48..e8ea2e20 100644 --- a/client/event.go +++ b/client/event/event.go @@ -1,4 +1,4 @@ -package client +package event import ( "errors" diff --git a/client/health.go b/client/health/health.go similarity index 99% rename from client/health.go rename to client/health/health.go index 7002adee..16c1a598 100644 --- a/client/health.go +++ b/client/health/health.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package client +package health import ( "context" diff --git a/client/proxy.go b/client/proxy/proxy.go similarity index 99% rename from client/proxy.go rename to client/proxy/proxy.go index a89921d2..610a6f88 100644 --- a/client/proxy.go +++ b/client/proxy/proxy.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package client +package proxy import ( "bytes" diff --git a/client/proxy_manager.go b/client/proxy/proxy_manager.go similarity index 88% rename from client/proxy_manager.go rename to client/proxy/proxy_manager.go index 42fa9a77..b65f37a5 100644 --- a/client/proxy_manager.go +++ b/client/proxy/proxy_manager.go @@ -1,9 +1,10 @@ -package client +package proxy import ( "fmt" "sync" + "github.com/fatedier/frp/client/event" "github.com/fatedier/frp/models/config" "github.com/fatedier/frp/models/msg" "github.com/fatedier/frp/utils/log" @@ -67,15 +68,15 @@ func (pm *ProxyManager) HandleWorkConn(name string, workConn frpNet.Conn) { } } -func (pm *ProxyManager) HandleEvent(evType EventType, payload interface{}) error { +func (pm *ProxyManager) HandleEvent(evType event.EventType, payload interface{}) error { var m msg.Message - switch event := payload.(type) { - case *StartProxyPayload: - m = event.NewProxyMsg - case *CloseProxyPayload: - m = event.CloseProxyMsg + switch e := payload.(type) { + case *event.StartProxyPayload: + m = e.NewProxyMsg + case *event.CloseProxyPayload: + m = e.CloseProxyMsg default: - return ErrPayloadType + return event.ErrPayloadType } err := errors.PanicToError(func() { diff --git a/client/proxy_wrapper.go b/client/proxy/proxy_wrapper.go similarity index 90% rename from client/proxy_wrapper.go rename to client/proxy/proxy_wrapper.go index 059d1821..f95144c6 100644 --- a/client/proxy_wrapper.go +++ b/client/proxy/proxy_wrapper.go @@ -1,4 +1,4 @@ -package client +package proxy import ( "fmt" @@ -6,6 +6,8 @@ import ( "sync/atomic" "time" + "github.com/fatedier/frp/client/event" + "github.com/fatedier/frp/client/health" "github.com/fatedier/frp/models/config" "github.com/fatedier/frp/models/msg" "github.com/fatedier/frp/utils/log" @@ -48,10 +50,10 @@ type ProxyWrapper struct { // if ProxyConf has healcheck config // monitor will watch if it is alive - monitor *HealthCheckMonitor + monitor *health.HealthCheckMonitor // event handler - handler EventHandler + handler event.EventHandler health uint32 lastSendStartMsg time.Time @@ -63,7 +65,7 @@ type ProxyWrapper struct { log.Logger } -func NewProxyWrapper(cfg config.ProxyConf, eventHandler EventHandler, logPrefix string) *ProxyWrapper { +func NewProxyWrapper(cfg config.ProxyConf, eventHandler event.EventHandler, logPrefix string) *ProxyWrapper { baseInfo := cfg.GetBaseInfo() pw := &ProxyWrapper{ ProxyStatus: ProxyStatus{ @@ -81,7 +83,7 @@ func NewProxyWrapper(cfg config.ProxyConf, eventHandler EventHandler, logPrefix if baseInfo.HealthCheckType != "" { pw.health = 1 // means failed - pw.monitor = NewHealthCheckMonitor(baseInfo.HealthCheckType, baseInfo.HealthCheckIntervalS, + pw.monitor = health.NewHealthCheckMonitor(baseInfo.HealthCheckType, baseInfo.HealthCheckIntervalS, baseInfo.HealthCheckTimeoutS, baseInfo.HealthCheckMaxFailed, baseInfo.HealthCheckAddr, baseInfo.HealthCheckUrl, pw.statusNormalCallback, pw.statusFailedCallback) pw.monitor.SetLogger(pw.Logger) @@ -137,7 +139,7 @@ func (pw *ProxyWrapper) Stop() { } pw.Status = ProxyStatusClosed - pw.handler(EvCloseProxy, &CloseProxyPayload{ + pw.handler(event.EvCloseProxy, &event.CloseProxyPayload{ CloseProxyMsg: &msg.CloseProxy{ ProxyName: pw.Name, }, @@ -165,7 +167,7 @@ func (pw *ProxyWrapper) checkWorker() { var newProxyMsg msg.NewProxy pw.Cfg.MarshalToMsg(&newProxyMsg) pw.lastSendStartMsg = now - pw.handler(EvStartProxy, &StartProxyPayload{ + pw.handler(event.EvStartProxy, &event.StartProxyPayload{ NewProxyMsg: &newProxyMsg, }) } @@ -173,7 +175,7 @@ func (pw *ProxyWrapper) checkWorker() { } else { pw.mu.Lock() if pw.Status == ProxyStatusRunning || pw.Status == ProxyStatusWaitStart { - pw.handler(EvCloseProxy, &CloseProxyPayload{ + pw.handler(event.EvCloseProxy, &event.CloseProxyPayload{ CloseProxyMsg: &msg.CloseProxy{ ProxyName: pw.Name, }, diff --git a/tests/ci/normal_test.go b/tests/ci/normal_test.go index d76cc2df..24f5795a 100644 --- a/tests/ci/normal_test.go +++ b/tests/ci/normal_test.go @@ -12,7 +12,7 @@ import ( "github.com/gorilla/websocket" "github.com/stretchr/testify/assert" - "github.com/fatedier/frp/client" + "github.com/fatedier/frp/client/proxy" "github.com/fatedier/frp/server/ports" "github.com/fatedier/frp/tests/consts" "github.com/fatedier/frp/tests/mock" @@ -218,31 +218,31 @@ func TestAllowPorts(t *testing.T) { // Port not allowed status, err := util.GetProxyStatus(consts.ADMIN_ADDR, consts.ADMIN_USER, consts.ADMIN_PWD, consts.ProxyTcpPortNotAllowed) if assert.NoError(err) { - assert.Equal(client.ProxyStatusStartErr, status.Status) + assert.Equal(proxy.ProxyStatusStartErr, status.Status) assert.True(strings.Contains(status.Err, ports.ErrPortNotAllowed.Error())) } status, err = util.GetProxyStatus(consts.ADMIN_ADDR, consts.ADMIN_USER, consts.ADMIN_PWD, consts.ProxyUdpPortNotAllowed) if assert.NoError(err) { - assert.Equal(client.ProxyStatusStartErr, status.Status) + assert.Equal(proxy.ProxyStatusStartErr, status.Status) assert.True(strings.Contains(status.Err, ports.ErrPortNotAllowed.Error())) } status, err = util.GetProxyStatus(consts.ADMIN_ADDR, consts.ADMIN_USER, consts.ADMIN_PWD, consts.ProxyTcpPortUnavailable) if assert.NoError(err) { - assert.Equal(client.ProxyStatusStartErr, status.Status) + assert.Equal(proxy.ProxyStatusStartErr, status.Status) assert.True(strings.Contains(status.Err, ports.ErrPortUnAvailable.Error())) } // Port normal status, err = util.GetProxyStatus(consts.ADMIN_ADDR, consts.ADMIN_USER, consts.ADMIN_PWD, consts.ProxyTcpPortNormal) if assert.NoError(err) { - assert.Equal(client.ProxyStatusRunning, status.Status) + assert.Equal(proxy.ProxyStatusRunning, status.Status) } status, err = util.GetProxyStatus(consts.ADMIN_ADDR, consts.ADMIN_USER, consts.ADMIN_PWD, consts.ProxyUdpPortNormal) if assert.NoError(err) { - assert.Equal(client.ProxyStatusRunning, status.Status) + assert.Equal(proxy.ProxyStatusRunning, status.Status) } } @@ -271,7 +271,7 @@ func TestPluginHttpProxy(t *testing.T) { assert := assert.New(t) status, err := util.GetProxyStatus(consts.ADMIN_ADDR, consts.ADMIN_USER, consts.ADMIN_PWD, consts.ProxyHttpProxy) if assert.NoError(err) { - assert.Equal(client.ProxyStatusRunning, status.Status) + assert.Equal(proxy.ProxyStatusRunning, status.Status) // http proxy addr := status.RemoteAddr @@ -299,7 +299,7 @@ func TestRangePortsMapping(t *testing.T) { name := fmt.Sprintf("%s_%d", consts.ProxyRangeTcpPrefix, i) status, err := util.GetProxyStatus(consts.ADMIN_ADDR, consts.ADMIN_USER, consts.ADMIN_PWD, name) if assert.NoError(err) { - assert.Equal(client.ProxyStatusRunning, status.Status) + assert.Equal(proxy.ProxyStatusRunning, status.Status) } } } From 146956ac6eda3e5afa3c0908c9fc36b44d1e2a13 Mon Sep 17 00:00:00 2001 From: fatedier Date: Sun, 9 Dec 2018 22:06:56 +0800 Subject: [PATCH 4/4] bump version to v0.22.0 --- utils/version/version.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/version/version.go b/utils/version/version.go index 120a1acd..0e85dc91 100644 --- a/utils/version/version.go +++ b/utils/version/version.go @@ -19,7 +19,7 @@ import ( "strings" ) -var version string = "0.21.0" +var version string = "0.22.0" func Full() string { return version