diff --git a/client/control.go b/client/control.go index 80dca9ad..d618680b 100644 --- a/client/control.go +++ b/client/control.go @@ -144,8 +144,8 @@ func (ctl *Control) NewWorkConn() { // dispatch this work connection to related proxy if pxy, ok := ctl.proxies[startMsg.ProxyName]; ok { - go pxy.InWorkConn(workConn) workConn.Info("start a new work connection") + go pxy.InWorkConn(workConn) } else { workConn.Close() } @@ -288,7 +288,7 @@ func (ctl *Control) manager() { } cfg, ok := ctl.pxyCfgs[m.ProxyName] if !ok { - // it will never go to this branch + // it will never go to this branch now ctl.Warn("[%s] no proxy conf found", m.ProxyName) continue } @@ -317,12 +317,12 @@ func (ctl *Control) controler() { maxDelayTime := 30 * time.Second delayTime := time.Second - checkInterval := 60 * time.Second + checkInterval := 30 * time.Second checkProxyTicker := time.NewTicker(checkInterval) for { select { case <-checkProxyTicker.C: - // Every 60 seconds, check which proxy registered failed and reregister it to server. + // Every 30 seconds, check which proxy registered failed and reregister it to server. for _, cfg := range ctl.pxyCfgs { if _, exist := ctl.proxies[cfg.GetName()]; !exist { ctl.Info("try to reregister proxy [%s]", cfg.GetName()) @@ -337,6 +337,10 @@ func (ctl *Control) controler() { // close related channels close(ctl.readCh) close(ctl.sendCh) + + for _, pxy := range ctl.proxies { + pxy.Close() + } time.Sleep(time.Second) // loop util reconnect to server success diff --git a/client/proxy.go b/client/proxy.go index d618e198..754f070c 100644 --- a/client/proxy.go +++ b/client/proxy.go @@ -18,6 +18,7 @@ import ( "fmt" "io" "net" + "sync" "github.com/fatedier/frp/models/config" "github.com/fatedier/frp/models/msg" @@ -69,7 +70,9 @@ func NewProxy(ctl *Control, pxyConf config.ProxyConf) (pxy Proxy) { } type BaseProxy struct { - ctl *Control + ctl *Control + closed bool + mu sync.RWMutex log.Logger } @@ -151,20 +154,34 @@ func (pxy *UdpProxy) Run() (err error) { } func (pxy *UdpProxy) Close() { - pxy.workConn.Close() - close(pxy.readCh) - close(pxy.sendCh) + pxy.mu.Lock() + defer pxy.mu.Unlock() + + if !pxy.closed { + pxy.closed = true + if pxy.workConn != nil { + pxy.workConn.Close() + } + if pxy.readCh != nil { + close(pxy.readCh) + } + if pxy.sendCh != nil { + close(pxy.sendCh) + } + } } func (pxy *UdpProxy) InWorkConn(conn frpNet.Conn) { - if pxy.workConn != nil { - pxy.workConn.Close() - close(pxy.readCh) - close(pxy.sendCh) - } + pxy.Info("incoming a new work connection for udp proxy") + // close resources releated with old workConn + pxy.Close() + + pxy.mu.Lock() pxy.workConn = conn pxy.readCh = make(chan *msg.UdpPacket, 64) pxy.sendCh = make(chan *msg.UdpPacket, 64) + pxy.closed = false + pxy.mu.Unlock() workConnReaderFn := func(conn net.Conn) { for { @@ -174,9 +191,10 @@ func (pxy *UdpProxy) InWorkConn(conn frpNet.Conn) { return } if errRet := errors.PanicToError(func() { + pxy.Trace("get udp package from workConn: %s", udpMsg.Content) pxy.readCh <- &udpMsg }); errRet != nil { - pxy.Info("reader goroutine for udp work connection closed") + pxy.Info("reader goroutine for udp work connection closed: %v", errRet) return } } @@ -184,6 +202,7 @@ func (pxy *UdpProxy) InWorkConn(conn frpNet.Conn) { workConnSenderFn := func(conn net.Conn) { var errRet error for udpMsg := range pxy.sendCh { + pxy.Trace("send udp package to workConn: %s", udpMsg.Content) if errRet = msg.WriteMsg(conn, udpMsg); errRet != nil { pxy.Info("sender goroutine for udp work connection closed") return diff --git a/conf/frpc.ini b/conf/frpc.ini index b93639d1..f32d35f1 100644 --- a/conf/frpc.ini +++ b/conf/frpc.ini @@ -11,7 +11,7 @@ server_port = 7000 # console or real logFile path like ./frpc.log log_file = ./frpc.log -# debug, info, warn, error +# trace, debug, info, warn, error log_level = info log_max_days = 3 diff --git a/conf/frps.ini b/conf/frps.ini index c19b46d9..cd8ca011 100644 --- a/conf/frps.ini +++ b/conf/frps.ini @@ -21,7 +21,7 @@ dashboard_pwd = admin # console or real logFile path like ./frps.log log_file = ./frps.log -# debug, info, warn, error +# trace, debug, info, warn, error log_level = info log_max_days = 3 diff --git a/models/proto/udp/udp.go b/models/proto/udp/udp.go index 062bdfbf..a7d42c16 100644 --- a/models/proto/udp/udp.go +++ b/models/proto/udp/udp.go @@ -51,22 +51,22 @@ func ForwardUserConn(udpConn *net.UDPConn, readCh <-chan *msg.UdpPacket, sendCh }() // write - go func() { - buf := pool.GetBuf(1500) - defer pool.PutBuf(buf) - for { - n, remoteAddr, err := udpConn.ReadFromUDP(buf) - if err != nil { - udpConn.Close() - return - } - udpMsg := NewUdpPacket(buf[:n], nil, remoteAddr) - select { - case sendCh <- udpMsg: - default: - } + buf := pool.GetBuf(1500) + defer pool.PutBuf(buf) + for { + n, remoteAddr, err := udpConn.ReadFromUDP(buf) + if err != nil { + udpConn.Close() + return } - }() + // buf[:n] will be encoded to string, so the bytes can be reused + udpMsg := NewUdpPacket(buf[:n], nil, remoteAddr) + select { + case sendCh <- udpMsg: + default: + } + } + return } func Forwarder(dstAddr *net.UDPAddr, readCh <-chan *msg.UdpPacket, sendCh chan<- *msg.UdpPacket) { diff --git a/server/control.go b/server/control.go index a0fe1861..9e772f33 100644 --- a/server/control.go +++ b/server/control.go @@ -258,6 +258,7 @@ func (ctl *Control) stoper() { ctl.writerShutdown.WaitDown() ctl.conn.Close() + ctl.readerShutdown.WaitDown() close(ctl.workConnCh) for workConn := range ctl.workConnCh { diff --git a/server/proxy.go b/server/proxy.go index 901bac91..62cd37c1 100644 --- a/server/proxy.go +++ b/server/proxy.go @@ -19,6 +19,7 @@ import ( "fmt" "io" "net" + "sync" "time" "github.com/fatedier/frp/models/config" @@ -45,6 +46,7 @@ type BaseProxy struct { name string ctl *Control listeners []frpNet.Listener + mu sync.RWMutex log.Logger } @@ -276,11 +278,23 @@ type UdpProxy struct { BaseProxy cfg *config.UdpProxyConf - udpConn *net.UDPConn - workConn net.Conn - sendCh chan *msg.UdpPacket - readCh chan *msg.UdpPacket + // udpConn is the listener of udp packages + udpConn *net.UDPConn + + // there are always only one workConn at the same time + // get another one if it closed + workConn net.Conn + + // sendCh is used for sending packages to workConn + sendCh chan *msg.UdpPacket + + // readCh is used for reading packages from workConn + readCh chan *msg.UdpPacket + + // checkCloseCh is used for watching if workConn is closed checkCloseCh chan int + + isClosed bool } func (pxy *UdpProxy) Run() (err error) { @@ -300,39 +314,49 @@ func (pxy *UdpProxy) Run() (err error) { pxy.readCh = make(chan *msg.UdpPacket, 64) pxy.checkCloseCh = make(chan int) + // read message from workConn, if it returns any error, notify proxy to start a new workConn workConnReaderFn := func(conn net.Conn) { for { var udpMsg msg.UdpPacket + pxy.Trace("loop waiting message from udp workConn") if errRet := msg.ReadMsgInto(conn, &udpMsg); errRet != nil { pxy.Warn("read from workConn for udp error: %v", errRet) conn.Close() // notify proxy to start a new work connection + // ignore error here, it means the proxy is closed errors.PanicToError(func() { pxy.checkCloseCh <- 1 }) return } if errRet := errors.PanicToError(func() { + pxy.Trace("get udp message from workConn: %s", udpMsg.Content) pxy.readCh <- &udpMsg StatsAddTrafficOut(pxy.GetName(), int64(len(udpMsg.Content))) }); errRet != nil { + conn.Close() pxy.Info("reader goroutine for udp work connection closed") return } } } + + // send message to workConn workConnSenderFn := func(conn net.Conn, ctx context.Context) { var errRet error for { select { case udpMsg, ok := <-pxy.sendCh: if !ok { + pxy.Info("sender goroutine for udp work condition closed") return } if errRet = msg.WriteMsg(conn, udpMsg); errRet != nil { pxy.Info("sender goroutine for udp work connection closed: %v", errRet) + conn.Close() return } else { + pxy.Trace("send message to udp workConn: %s", udpMsg.Content) StatsAddTrafficIn(pxy.GetName(), int64(len(udpMsg.Content))) continue } @@ -344,12 +368,12 @@ func (pxy *UdpProxy) Run() (err error) { } go func() { + // Sleep a while for waiting control send the NewProxyResp to client. + time.Sleep(500 * time.Millisecond) for { - // Sleep a while for waiting control send the NewProxyResp to client. - time.Sleep(500 * time.Millisecond) workConn, err := pxy.GetWorkConnFromPool() if err != nil { - time.Sleep(5 * time.Second) + time.Sleep(1 * time.Second) // check if proxy is closed select { case _, ok := <-pxy.checkCloseCh: @@ -360,6 +384,10 @@ func (pxy *UdpProxy) Run() (err error) { } continue } + // close the old workConn and replac it with a new one + if pxy.workConn != nil { + pxy.workConn.Close() + } pxy.workConn = workConn ctx, cancel := context.WithCancel(context.Background()) go workConnReaderFn(workConn) @@ -372,10 +400,14 @@ func (pxy *UdpProxy) Run() (err error) { } }() - // Read from user connections and send wrapped udp message to sendCh. + // Read from user connections and send wrapped udp message to sendCh (forwarded by workConn). // Client will transfor udp message to local udp service and waiting for response for a while. - // Response will be wrapped to be transfored in work connection to server. - udp.ForwardUserConn(udpConn, pxy.readCh, pxy.sendCh) + // Response will be wrapped to be forwarded by work connection to server. + // Close readCh and sendCh at the end. + go func() { + udp.ForwardUserConn(udpConn, pxy.readCh, pxy.sendCh) + pxy.Close() + }() return nil } @@ -384,12 +416,20 @@ func (pxy *UdpProxy) GetConf() config.ProxyConf { } func (pxy *UdpProxy) Close() { - pxy.BaseProxy.Close() - pxy.workConn.Close() - pxy.udpConn.Close() - close(pxy.checkCloseCh) - close(pxy.readCh) - close(pxy.sendCh) + pxy.mu.Lock() + defer pxy.mu.Unlock() + if !pxy.isClosed { + pxy.isClosed = true + + pxy.BaseProxy.Close() + pxy.workConn.Close() + pxy.udpConn.Close() + + // all channels only closed here + close(pxy.checkCloseCh) + close(pxy.readCh) + close(pxy.sendCh) + } } // HandleUserTcpConnection is used for incoming tcp user connections. diff --git a/utils/log/log.go b/utils/log/log.go index 95192210..241ddb27 100644 --- a/utils/log/log.go +++ b/utils/log/log.go @@ -55,6 +55,8 @@ func SetLogLevel(logLevel string) { level = 6 case "debug": level = 7 + case "trace": + level = 8 default: level = 4 } @@ -79,6 +81,10 @@ func Debug(format string, v ...interface{}) { Log.Debug(format, v...) } +func Trace(format string, v ...interface{}) { + Log.Trace(format, v...) +} + // Logger type Logger interface { AddLogPrefix(string) @@ -88,6 +94,7 @@ type Logger interface { Warn(string, ...interface{}) Info(string, ...interface{}) Debug(string, ...interface{}) + Trace(string, ...interface{}) } type PrefixLogger struct { @@ -136,3 +143,7 @@ func (pl *PrefixLogger) Info(format string, v ...interface{}) { func (pl *PrefixLogger) Debug(format string, v ...interface{}) { Log.Debug(pl.prefix+format, v...) } + +func (pl *PrefixLogger) Trace(format string, v ...interface{}) { + Log.Trace(pl.prefix+format, v...) +}