From ee8786a6b33d14634b40e6866db94aefe1441b3d Mon Sep 17 00:00:00 2001 From: fatedier Date: Thu, 19 May 2016 20:50:19 +0800 Subject: [PATCH] models/server: fix one possible error when frps accept too many user connections in a short time --- src/frp/cmd/frps/control.go | 2 +- src/frp/models/server/server.go | 113 +++++++++++++++----------------- 2 files changed, 53 insertions(+), 62 deletions(-) diff --git a/src/frp/cmd/frps/control.go b/src/frp/cmd/frps/control.go index 7b8bed7a..08da7feb 100644 --- a/src/frp/cmd/frps/control.go +++ b/src/frp/cmd/frps/control.go @@ -243,7 +243,7 @@ func doLogin(req *msg.ControlReq, c *conn.Conn) (ret int64, info string) { return } // the connection will close after join over - s.RecvNewWorkConn(c) + s.RegisterNewWorkConn(c) } else { info = fmt.Sprintf("Unsupport login message type [%d]", req.Type) log.Warn("Unsupport login message type [%d]", req.Type) diff --git a/src/frp/models/server/server.go b/src/frp/models/server/server.go index adcbaecf..1e8f9bde 100644 --- a/src/frp/models/server/server.go +++ b/src/frp/models/server/server.go @@ -15,7 +15,7 @@ package server import ( - "container/list" + "fmt" "sync" "time" @@ -42,15 +42,13 @@ type ProxyServer struct { listeners []Listener // accept new connection from remote users ctlMsgChan chan int64 // every time accept a new user conn, put "1" to the channel workConnChan chan *conn.Conn // get new work conns from control goroutine - userConnList *list.List // store user conns mutex sync.Mutex } func (p *ProxyServer) Init() { p.Status = consts.Idle - p.workConnChan = make(chan *conn.Conn) + p.workConnChan = make(chan *conn.Conn, 100) p.ctlMsgChan = make(chan int64) - p.userConnList = list.New() p.listeners = make([]Listener, 0) } @@ -96,74 +94,34 @@ func (p *ProxyServer) Start() (err error) { } log.Debug("ProxyName [%s], get one new user conn [%s]", p.Name, c.GetRemoteAddr()) - // insert into list - p.Lock() if p.Status != consts.Working { log.Debug("ProxyName [%s] is not working, new user conn close", p.Name) c.Close() - p.Unlock() return } - p.userConnList.PushBack(c) - p.Unlock() - // put msg to control conn - p.ctlMsgChan <- 1 - - // set timeout - time.AfterFunc(time.Duration(UserConnTimeout)*time.Second, func() { - p.Lock() - element := p.userConnList.Front() - p.Unlock() - if element == nil { + // start another goroutine for join two conns from frpc and user + go func() { + workConn, err := p.getWorkConn() + if err != nil { return } - userConn := element.Value.(*conn.Conn) - if userConn == c { - log.Warn("ProxyName [%s], user conn [%s] timeout", p.Name, c.GetRemoteAddr()) - userConn.Close() + userConn := c + // msg will transfer to another without modifying + // l means local, r means remote + log.Debug("Join two connections, (l[%s] r[%s]) (l[%s] r[%s])", workConn.GetLocalAddr(), workConn.GetRemoteAddr(), + userConn.GetLocalAddr(), userConn.GetRemoteAddr()) + + if p.UseEncryption { + go conn.JoinMore(userConn, workConn, p.AuthToken) + } else { + go conn.Join(userConn, workConn) } - }) + }() } }(listener) } - - // start another goroutine for join two conns from frpc and user - go func() { - for { - workConn, ok := <-p.workConnChan - if !ok { - return - } - - p.Lock() - element := p.userConnList.Front() - - var userConn *conn.Conn - if element != nil { - userConn = element.Value.(*conn.Conn) - p.userConnList.Remove(element) - } else { - workConn.Close() - p.Unlock() - continue - } - p.Unlock() - - // msg will transfer to another without modifying - // l means local, r means remote - log.Debug("Join two connections, (l[%s] r[%s]) (l[%s] r[%s])", workConn.GetLocalAddr(), workConn.GetRemoteAddr(), - userConn.GetLocalAddr(), userConn.GetRemoteAddr()) - - if p.UseEncryption { - go conn.JoinMore(userConn, workConn, p.AuthToken) - } else { - go conn.Join(userConn, workConn) - } - } - }() - return nil } @@ -180,7 +138,6 @@ func (p *ProxyServer) Close() { } close(p.ctlMsgChan) close(p.workConnChan) - p.userConnList = list.New() } p.Unlock() } @@ -195,6 +152,40 @@ func (p *ProxyServer) WaitUserConn() (closeFlag bool) { return } -func (p *ProxyServer) RecvNewWorkConn(c *conn.Conn) { +func (p *ProxyServer) RegisterNewWorkConn(c *conn.Conn) { p.workConnChan <- c } + +// when frps get one user connection, we get one work connection from the pool and return it +// if no workConn available in the pool, send message to frpc to get one or more +// and wait until it is available +// return an error if wait timeout +func (p *ProxyServer) getWorkConn() (workConn *conn.Conn, err error) { + var ok bool + + // get a work connection from the pool + select { + case workConn, ok = <-p.workConnChan: + if !ok { + err = fmt.Errorf("ProxyName [%s], no work connections available, control is closing", p.Name) + return + } + default: + // no work connections available in the poll, send message to frpc to get one + p.ctlMsgChan <- 1 + + select { + case workConn, ok = <-p.workConnChan: + if !ok { + err = fmt.Errorf("ProxyName [%s], no work connections available, control is closing", p.Name) + return + } + + case <-time.After(time.Duration(UserConnTimeout) * time.Second): + log.Warn("ProxyName [%s], timeout trying to get work connection", p.Name) + err = fmt.Errorf("ProxyName [%s], timeout trying to get work connection", p.Name) + return + } + } + return +}