frp/client/proxy/proxy.go

807 lines
20 KiB
Go
Raw Normal View History

2017-03-08 19:03:47 +01:00
// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2018-12-09 15:06:22 +01:00
package proxy
2017-03-08 19:03:47 +01:00
import (
2017-10-24 16:53:20 +02:00
"bytes"
2019-10-12 14:13:12 +02:00
"context"
2017-03-08 19:03:47 +01:00
"fmt"
"io"
2019-03-05 04:18:17 +01:00
"io/ioutil"
2017-03-12 19:44:47 +01:00
"net"
2019-03-11 08:53:58 +01:00
"strconv"
"strings"
2017-04-24 18:34:14 +02:00
"sync"
"time"
2017-03-08 19:03:47 +01:00
2020-09-23 07:49:14 +02:00
"github.com/fatedier/frp/pkg/config"
"github.com/fatedier/frp/pkg/msg"
plugin "github.com/fatedier/frp/pkg/plugin/client"
"github.com/fatedier/frp/pkg/proto/udp"
"github.com/fatedier/frp/pkg/util/limit"
frpNet "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/xlog"
2018-05-07 20:13:30 +02:00
"github.com/fatedier/golib/errors"
2018-05-08 17:42:04 +02:00
frpIo "github.com/fatedier/golib/io"
2018-05-07 20:13:30 +02:00
"github.com/fatedier/golib/pool"
2019-03-05 04:18:17 +01:00
fmux "github.com/hashicorp/yamux"
2019-03-29 12:01:18 +01:00
pp "github.com/pires/go-proxyproto"
2019-11-02 18:20:49 +01:00
"golang.org/x/time/rate"
2017-03-08 19:03:47 +01:00
)
2018-11-06 11:35:05 +01:00
// Proxy defines how to handle work connections for different proxy type.
2017-03-08 19:03:47 +01:00
type Proxy interface {
2017-03-11 19:03:24 +01:00
Run() error
2017-03-09 18:42:06 +01:00
// InWorkConn accept work connections registered to server.
2019-10-12 14:13:12 +02:00
InWorkConn(net.Conn, *msg.StartWorkConn)
2018-01-16 18:09:33 +01:00
2017-03-08 19:03:47 +01:00
Close()
}
2019-10-12 14:13:12 +02:00
func NewProxy(ctx context.Context, pxyConf config.ProxyConf, clientCfg config.ClientCommonConf, serverUDPPort int) (pxy Proxy) {
2019-11-02 18:20:49 +01:00
var limiter *rate.Limiter
2019-11-08 18:13:30 +01:00
limitBytes := pxyConf.GetBaseInfo().BandwidthLimit.Bytes()
2019-11-02 18:20:49 +01:00
if limitBytes > 0 {
limiter = rate.NewLimiter(rate.Limit(float64(limitBytes)), int(limitBytes))
}
2017-03-12 19:44:47 +01:00
baseProxy := BaseProxy{
clientCfg: clientCfg,
serverUDPPort: serverUDPPort,
2019-11-02 18:20:49 +01:00
limiter: limiter,
2019-10-12 14:13:12 +02:00
xl: xlog.FromContextSafe(ctx),
ctx: ctx,
2017-03-12 19:44:47 +01:00
}
2017-03-08 19:03:47 +01:00
switch cfg := pxyConf.(type) {
2020-05-24 11:48:37 +02:00
case *config.TCPProxyConf:
pxy = &TCPProxy{
2019-01-31 09:49:23 +01:00
BaseProxy: &baseProxy,
2017-03-12 19:44:47 +01:00
cfg: cfg,
2017-03-08 19:03:47 +01:00
}
2020-05-24 11:48:37 +02:00
case *config.TCPMuxProxyConf:
pxy = &TCPMuxProxy{
BaseProxy: &baseProxy,
cfg: cfg,
}
2020-05-24 11:48:37 +02:00
case *config.UDPProxyConf:
pxy = &UDPProxy{
2019-01-31 09:49:23 +01:00
BaseProxy: &baseProxy,
2017-03-12 19:44:47 +01:00
cfg: cfg,
2017-03-08 19:03:47 +01:00
}
2020-05-24 11:48:37 +02:00
case *config.HTTPProxyConf:
pxy = &HTTPProxy{
2019-01-31 09:49:23 +01:00
BaseProxy: &baseProxy,
2017-03-12 19:44:47 +01:00
cfg: cfg,
2017-03-08 19:03:47 +01:00
}
2020-05-24 11:48:37 +02:00
case *config.HTTPSProxyConf:
pxy = &HTTPSProxy{
2019-01-31 09:49:23 +01:00
BaseProxy: &baseProxy,
2017-03-12 19:44:47 +01:00
cfg: cfg,
2017-03-08 19:03:47 +01:00
}
2020-05-24 11:48:37 +02:00
case *config.STCPProxyConf:
pxy = &STCPProxy{
2019-01-31 09:49:23 +01:00
BaseProxy: &baseProxy,
2017-06-25 21:02:33 +02:00
cfg: cfg,
}
2020-05-24 11:48:37 +02:00
case *config.XTCPProxyConf:
pxy = &XTCPProxy{
2019-01-31 09:49:23 +01:00
BaseProxy: &baseProxy,
2017-10-24 12:20:07 +02:00
cfg: cfg,
}
2020-05-24 11:48:37 +02:00
case *config.SUDPProxyConf:
pxy = &SUDPProxy{
2020-04-22 15:37:45 +02:00
BaseProxy: &baseProxy,
cfg: cfg,
closeCh: make(chan struct{}),
}
2017-03-08 19:03:47 +01:00
}
return
}
2017-03-12 19:44:47 +01:00
type BaseProxy struct {
closed bool
clientCfg config.ClientCommonConf
serverUDPPort int
2019-11-02 18:20:49 +01:00
limiter *rate.Limiter
2019-10-12 14:13:12 +02:00
mu sync.RWMutex
xl *xlog.Logger
ctx context.Context
2017-03-12 19:44:47 +01:00
}
2017-03-08 19:03:47 +01:00
// TCP
2020-05-24 11:48:37 +02:00
type TCPProxy struct {
2019-01-31 09:49:23 +01:00
*BaseProxy
2017-03-12 19:44:47 +01:00
2020-05-24 11:48:37 +02:00
cfg *config.TCPProxyConf
proxyPlugin plugin.Plugin
2017-03-08 19:03:47 +01:00
}
2020-05-24 11:48:37 +02:00
func (pxy *TCPProxy) Run() (err error) {
if pxy.cfg.Plugin != "" {
pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
if err != nil {
return
}
}
2017-03-11 19:03:24 +01:00
return
2017-03-08 19:03:47 +01:00
}
2020-05-24 11:48:37 +02:00
func (pxy *TCPProxy) Close() {
if pxy.proxyPlugin != nil {
pxy.proxyPlugin.Close()
}
2017-03-08 19:03:47 +01:00
}
2020-05-24 11:48:37 +02:00
func (pxy *TCPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, pxy.limiter,
2019-11-02 18:20:49 +01:00
conn, []byte(pxy.clientCfg.Token), m)
2017-03-08 19:03:47 +01:00
}
// TCP Multiplexer
2020-05-24 11:48:37 +02:00
type TCPMuxProxy struct {
*BaseProxy
2020-05-24 11:48:37 +02:00
cfg *config.TCPMuxProxyConf
proxyPlugin plugin.Plugin
}
2020-05-24 11:48:37 +02:00
// Run creates the plugin named in the configuration, if any, and returns the
// error reported by plugin.Create. Without a configured plugin it does nothing.
func (pxy *TCPMuxProxy) Run() (err error) {
	if pxy.cfg.Plugin == "" {
		return nil
	}
	pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
	return err
}
2020-05-24 11:48:37 +02:00
// Close closes the plugin created by Run; it is a no-op when there is none.
func (pxy *TCPMuxProxy) Close() {
	if pxy.proxyPlugin == nil {
		return
	}
	pxy.proxyPlugin.Close()
}
2020-05-24 11:48:37 +02:00
// InWorkConn passes the work connection to HandleTCPWorkConnection, using the
// client token as the encryption key.
func (pxy *TCPMuxProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
	encKey := []byte(pxy.clientCfg.Token)
	HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf,
		pxy.limiter, conn, encKey, m)
}
2017-03-08 19:03:47 +01:00
// HTTP
2020-05-24 11:48:37 +02:00
type HTTPProxy struct {
2019-01-31 09:49:23 +01:00
*BaseProxy
2017-03-12 19:44:47 +01:00
2020-05-24 11:48:37 +02:00
cfg *config.HTTPProxyConf
proxyPlugin plugin.Plugin
2017-03-08 19:03:47 +01:00
}
2020-05-24 11:48:37 +02:00
func (pxy *HTTPProxy) Run() (err error) {
if pxy.cfg.Plugin != "" {
pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
if err != nil {
return
}
}
2017-03-11 19:03:24 +01:00
return
2017-03-08 19:03:47 +01:00
}
2020-05-24 11:48:37 +02:00
func (pxy *HTTPProxy) Close() {
if pxy.proxyPlugin != nil {
pxy.proxyPlugin.Close()
}
2017-03-08 19:03:47 +01:00
}
2020-05-24 11:48:37 +02:00
func (pxy *HTTPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, pxy.limiter,
2019-11-02 18:20:49 +01:00
conn, []byte(pxy.clientCfg.Token), m)
2017-03-08 19:03:47 +01:00
}
// HTTPS
2020-05-24 11:48:37 +02:00
type HTTPSProxy struct {
2019-01-31 09:49:23 +01:00
*BaseProxy
2017-03-12 19:44:47 +01:00
2020-05-24 11:48:37 +02:00
cfg *config.HTTPSProxyConf
proxyPlugin plugin.Plugin
2017-03-08 19:03:47 +01:00
}
2020-05-24 11:48:37 +02:00
func (pxy *HTTPSProxy) Run() (err error) {
if pxy.cfg.Plugin != "" {
pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
if err != nil {
return
}
}
2017-03-11 19:03:24 +01:00
return
2017-03-08 19:03:47 +01:00
}
2020-05-24 11:48:37 +02:00
func (pxy *HTTPSProxy) Close() {
if pxy.proxyPlugin != nil {
pxy.proxyPlugin.Close()
}
2017-03-08 19:03:47 +01:00
}
2020-05-24 11:48:37 +02:00
func (pxy *HTTPSProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, pxy.limiter,
2019-11-02 18:20:49 +01:00
conn, []byte(pxy.clientCfg.Token), m)
2017-03-09 19:01:17 +01:00
}
2017-06-25 21:02:33 +02:00
// STCP
2020-05-24 11:48:37 +02:00
type STCPProxy struct {
2019-01-31 09:49:23 +01:00
*BaseProxy
2017-06-25 21:02:33 +02:00
2020-05-24 11:48:37 +02:00
cfg *config.STCPProxyConf
2017-06-25 21:02:33 +02:00
proxyPlugin plugin.Plugin
}
2020-05-24 11:48:37 +02:00
func (pxy *STCPProxy) Run() (err error) {
2017-06-25 21:02:33 +02:00
if pxy.cfg.Plugin != "" {
pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
if err != nil {
return
}
}
return
}
2020-05-24 11:48:37 +02:00
func (pxy *STCPProxy) Close() {
2017-06-25 21:02:33 +02:00
if pxy.proxyPlugin != nil {
pxy.proxyPlugin.Close()
}
}
2020-05-24 11:48:37 +02:00
func (pxy *STCPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, pxy.limiter,
2019-11-02 18:20:49 +01:00
conn, []byte(pxy.clientCfg.Token), m)
2017-06-25 21:02:33 +02:00
}
2017-10-24 12:20:07 +02:00
// XTCP
2020-05-24 11:48:37 +02:00
type XTCPProxy struct {
2019-01-31 09:49:23 +01:00
*BaseProxy
2017-10-24 12:20:07 +02:00
2020-05-24 11:48:37 +02:00
cfg *config.XTCPProxyConf
2017-10-24 12:20:07 +02:00
proxyPlugin plugin.Plugin
}
2020-05-24 11:48:37 +02:00
func (pxy *XTCPProxy) Run() (err error) {
2017-10-24 12:20:07 +02:00
if pxy.cfg.Plugin != "" {
pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
if err != nil {
return
}
}
return
}
2020-05-24 11:48:37 +02:00
func (pxy *XTCPProxy) Close() {
2017-10-24 12:20:07 +02:00
if pxy.proxyPlugin != nil {
pxy.proxyPlugin.Close()
}
}
2020-05-24 11:48:37 +02:00
func (pxy *XTCPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
2019-10-12 14:13:12 +02:00
xl := pxy.xl
2017-10-24 12:20:07 +02:00
defer conn.Close()
var natHoleSidMsg msg.NatHoleSid
err := msg.ReadMsgInto(conn, &natHoleSidMsg)
if err != nil {
2019-10-12 14:13:12 +02:00
xl.Error("xtcp read from workConn error: %v", err)
2017-10-24 12:20:07 +02:00
return
}
natHoleClientMsg := &msg.NatHoleClient{
ProxyName: pxy.cfg.ProxyName,
Sid: natHoleSidMsg.Sid,
}
raddr, _ := net.ResolveUDPAddr("udp",
fmt.Sprintf("%s:%d", pxy.clientCfg.ServerAddr, pxy.serverUDPPort))
2017-10-24 12:20:07 +02:00
clientConn, err := net.DialUDP("udp", nil, raddr)
defer clientConn.Close()
err = msg.WriteMsg(clientConn, natHoleClientMsg)
if err != nil {
2019-10-12 14:13:12 +02:00
xl.Error("send natHoleClientMsg to server error: %v", err)
2017-10-24 12:20:07 +02:00
return
}
2017-10-24 16:53:20 +02:00
// Wait for client address at most 5 seconds.
2017-10-24 12:20:07 +02:00
var natHoleRespMsg msg.NatHoleResp
2017-10-24 16:53:20 +02:00
clientConn.SetReadDeadline(time.Now().Add(5 * time.Second))
buf := pool.GetBuf(1024)
n, err := clientConn.Read(buf)
if err != nil {
2019-10-12 14:13:12 +02:00
xl.Error("get natHoleRespMsg error: %v", err)
2017-10-24 16:53:20 +02:00
return
}
err = msg.ReadMsgInto(bytes.NewReader(buf[:n]), &natHoleRespMsg)
2017-10-24 12:20:07 +02:00
if err != nil {
2019-10-12 14:13:12 +02:00
xl.Error("get natHoleRespMsg error: %v", err)
2017-10-24 12:20:07 +02:00
return
}
clientConn.SetReadDeadline(time.Time{})
clientConn.Close()
if natHoleRespMsg.Error != "" {
2019-10-12 14:13:12 +02:00
xl.Error("natHoleRespMsg get error info: %s", natHoleRespMsg.Error)
return
}
2019-10-12 14:13:12 +02:00
xl.Trace("get natHoleRespMsg, sid [%s], client address [%s] visitor address [%s]", natHoleRespMsg.Sid, natHoleRespMsg.ClientAddr, natHoleRespMsg.VisitorAddr)
2017-10-24 12:20:07 +02:00
2019-03-11 08:53:58 +01:00
// Send detect message
array := strings.Split(natHoleRespMsg.VisitorAddr, ":")
if len(array) <= 1 {
2019-10-12 14:13:12 +02:00
xl.Error("get NatHoleResp visitor address error: %v", natHoleRespMsg.VisitorAddr)
2019-03-11 08:53:58 +01:00
}
2017-10-24 12:20:07 +02:00
laddr, _ := net.ResolveUDPAddr("udp", clientConn.LocalAddr().String())
2019-03-11 08:53:58 +01:00
/*
for i := 1000; i < 65000; i++ {
pxy.sendDetectMsg(array[0], int64(i), laddr, "a")
}
*/
port, err := strconv.ParseInt(array[1], 10, 64)
if err != nil {
2019-10-12 14:13:12 +02:00
xl.Error("get natHoleResp visitor address error: %v", natHoleRespMsg.VisitorAddr)
2019-03-11 08:53:58 +01:00
return
}
pxy.sendDetectMsg(array[0], int(port), laddr, []byte(natHoleRespMsg.Sid))
2019-10-12 14:13:12 +02:00
xl.Trace("send all detect msg done")
2019-03-11 08:53:58 +01:00
msg.WriteMsg(conn, &msg.NatHoleClientDetectOK{})
// Listen for clientConn's address and wait for visitor connection
lConn, err := net.ListenUDP("udp", laddr)
2017-10-24 12:20:07 +02:00
if err != nil {
2019-10-12 14:13:12 +02:00
xl.Error("listen on visitorConn's local adress error: %v", err)
2017-10-24 12:20:07 +02:00
return
}
2019-03-11 08:53:58 +01:00
defer lConn.Close()
2017-10-24 12:20:07 +02:00
2019-03-11 08:53:58 +01:00
lConn.SetReadDeadline(time.Now().Add(8 * time.Second))
sidBuf := pool.GetBuf(1024)
var uAddr *net.UDPAddr
n, uAddr, err = lConn.ReadFromUDP(sidBuf)
2017-10-24 12:20:07 +02:00
if err != nil {
2019-10-12 14:13:12 +02:00
xl.Warn("get sid from visitor error: %v", err)
2017-10-24 12:20:07 +02:00
return
}
2019-03-11 08:53:58 +01:00
lConn.SetReadDeadline(time.Time{})
if string(sidBuf[:n]) != natHoleRespMsg.Sid {
2019-10-12 14:13:12 +02:00
xl.Warn("incorrect sid from visitor")
2019-03-11 08:53:58 +01:00
return
}
pool.PutBuf(sidBuf)
2019-10-12 14:13:12 +02:00
xl.Info("nat hole connection make success, sid [%s]", natHoleRespMsg.Sid)
2019-03-11 08:53:58 +01:00
lConn.WriteToUDP(sidBuf[:n], uAddr)
2017-10-24 12:20:07 +02:00
2020-05-24 11:48:37 +02:00
kcpConn, err := frpNet.NewKCPConnFromUDP(lConn, false, uAddr.String())
2017-10-24 12:20:07 +02:00
if err != nil {
2019-10-12 14:13:12 +02:00
xl.Error("create kcp connection from udp connection error: %v", err)
2017-10-24 12:20:07 +02:00
return
}
2019-03-05 04:18:17 +01:00
fmuxCfg := fmux.DefaultConfig()
fmuxCfg.KeepAliveInterval = 5 * time.Second
fmuxCfg.LogOutput = ioutil.Discard
sess, err := fmux.Server(kcpConn, fmuxCfg)
if err != nil {
2019-10-12 14:13:12 +02:00
xl.Error("create yamux server from kcp connection error: %v", err)
2019-03-05 04:18:17 +01:00
return
}
defer sess.Close()
muxConn, err := sess.Accept()
if err != nil {
2019-10-12 14:13:12 +02:00
xl.Error("accept for yamux connection error: %v", err)
2019-03-05 04:18:17 +01:00
return
}
2020-05-24 11:48:37 +02:00
HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, pxy.limiter,
2019-10-12 14:13:12 +02:00
muxConn, []byte(pxy.cfg.Sk), m)
2017-10-24 12:20:07 +02:00
}
2020-05-24 11:48:37 +02:00
func (pxy *XTCPProxy) sendDetectMsg(addr string, port int, laddr *net.UDPAddr, content []byte) (err error) {
2019-03-11 08:53:58 +01:00
daddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", addr, port))
if err != nil {
return err
}
tConn, err := net.DialUDP("udp", laddr, daddr)
if err != nil {
return err
}
//uConn := ipv4.NewConn(tConn)
//uConn.SetTTL(3)
tConn.Write(content)
tConn.Close()
return nil
}
2017-03-09 19:01:17 +01:00
// UDP
2020-05-24 11:48:37 +02:00
type UDPProxy struct {
2019-01-31 09:49:23 +01:00
*BaseProxy
2017-03-12 19:44:47 +01:00
2020-05-24 11:48:37 +02:00
cfg *config.UDPProxyConf
2017-03-12 19:44:47 +01:00
localAddr *net.UDPAddr
2020-05-24 11:48:37 +02:00
readCh chan *msg.UDPPacket
2020-05-24 11:48:37 +02:00
// include msg.UDPPacket and msg.Ping
sendCh chan msg.Message
2019-10-12 14:13:12 +02:00
workConn net.Conn
2017-03-09 19:01:17 +01:00
}
2020-05-24 11:48:37 +02:00
func (pxy *UDPProxy) Run() (err error) {
pxy.localAddr, err = net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", pxy.cfg.LocalIP, pxy.cfg.LocalPort))
2017-03-12 19:44:47 +01:00
if err != nil {
return
}
2017-03-11 19:03:24 +01:00
return
2017-03-09 19:01:17 +01:00
}
2020-05-24 11:48:37 +02:00
func (pxy *UDPProxy) Close() {
2017-04-24 18:34:14 +02:00
pxy.mu.Lock()
defer pxy.mu.Unlock()
if !pxy.closed {
pxy.closed = true
if pxy.workConn != nil {
pxy.workConn.Close()
}
if pxy.readCh != nil {
close(pxy.readCh)
}
if pxy.sendCh != nil {
close(pxy.sendCh)
}
}
2017-03-09 19:01:17 +01:00
}
2020-05-24 11:48:37 +02:00
func (pxy *UDPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
2019-10-12 14:13:12 +02:00
xl := pxy.xl
xl.Info("incoming a new work connection for udp proxy, %s", conn.RemoteAddr().String())
2017-04-24 18:34:14 +02:00
// close resources releated with old workConn
pxy.Close()
2020-09-07 08:57:23 +02:00
var rwc io.ReadWriteCloser = conn
var err error
2019-11-02 18:20:49 +01:00
if pxy.limiter != nil {
2020-09-07 08:57:23 +02:00
rwc = frpIo.WrapReadWriteCloser(limit.NewReader(conn, pxy.limiter), limit.NewWriter(conn, pxy.limiter), func() error {
2019-11-02 18:20:49 +01:00
return conn.Close()
})
}
2020-09-07 08:57:23 +02:00
if pxy.cfg.UseEncryption {
rwc, err = frpIo.WithEncryption(rwc, []byte(pxy.clientCfg.Token))
if err != nil {
conn.Close()
xl.Error("create encryption stream error: %v", err)
return
}
}
if pxy.cfg.UseCompression {
rwc = frpIo.WithCompression(rwc)
}
conn = frpNet.WrapReadWriteCloserToConn(rwc, conn)
2019-11-02 18:20:49 +01:00
2017-04-24 18:34:14 +02:00
pxy.mu.Lock()
2017-03-12 19:44:47 +01:00
pxy.workConn = conn
2020-05-24 11:48:37 +02:00
pxy.readCh = make(chan *msg.UDPPacket, 1024)
pxy.sendCh = make(chan msg.Message, 1024)
2017-04-24 18:34:14 +02:00
pxy.closed = false
pxy.mu.Unlock()
2017-03-12 19:44:47 +01:00
2020-05-24 11:48:37 +02:00
workConnReaderFn := func(conn net.Conn, readCh chan *msg.UDPPacket) {
2017-03-12 19:44:47 +01:00
for {
2020-05-24 11:48:37 +02:00
var udpMsg msg.UDPPacket
2017-03-12 19:44:47 +01:00
if errRet := msg.ReadMsgInto(conn, &udpMsg); errRet != nil {
2019-10-12 14:13:12 +02:00
xl.Warn("read from workConn for udp error: %v", errRet)
2017-03-12 19:44:47 +01:00
return
}
if errRet := errors.PanicToError(func() {
2019-10-12 14:13:12 +02:00
xl.Trace("get udp package from workConn: %s", udpMsg.Content)
readCh <- &udpMsg
2017-03-12 19:44:47 +01:00
}); errRet != nil {
2019-10-12 14:13:12 +02:00
xl.Info("reader goroutine for udp work connection closed: %v", errRet)
2017-03-12 19:44:47 +01:00
return
}
}
}
workConnSenderFn := func(conn net.Conn, sendCh chan msg.Message) {
defer func() {
2019-10-12 14:13:12 +02:00
xl.Info("writer goroutine for udp work connection closed")
}()
2017-03-12 19:44:47 +01:00
var errRet error
for rawMsg := range sendCh {
switch m := rawMsg.(type) {
2020-05-24 11:48:37 +02:00
case *msg.UDPPacket:
2019-10-12 14:13:12 +02:00
xl.Trace("send udp package to workConn: %s", m.Content)
case *msg.Ping:
2019-10-12 14:13:12 +02:00
xl.Trace("send ping message to udp workConn")
}
if errRet = msg.WriteMsg(conn, rawMsg); errRet != nil {
2019-10-12 14:13:12 +02:00
xl.Error("udp work write error: %v", errRet)
2017-03-12 19:44:47 +01:00
return
}
}
}
heartbeatFn := func(conn net.Conn, sendCh chan msg.Message) {
var errRet error
for {
time.Sleep(time.Duration(30) * time.Second)
if errRet = errors.PanicToError(func() {
sendCh <- &msg.Ping{}
}); errRet != nil {
2019-10-12 14:13:12 +02:00
xl.Trace("heartbeat goroutine for udp work connection closed")
2017-05-17 10:02:31 +02:00
break
}
}
}
2017-03-12 19:44:47 +01:00
go workConnSenderFn(pxy.workConn, pxy.sendCh)
go workConnReaderFn(pxy.workConn, pxy.readCh)
go heartbeatFn(pxy.workConn, pxy.sendCh)
2020-05-24 11:48:37 +02:00
udp.Forwarder(pxy.localAddr, pxy.readCh, pxy.sendCh, int(pxy.clientCfg.UDPPacketSize))
2017-03-09 19:01:17 +01:00
}
2020-05-24 11:48:37 +02:00
type SUDPProxy struct {
2020-04-22 15:37:45 +02:00
*BaseProxy
2020-05-24 11:48:37 +02:00
cfg *config.SUDPProxyConf
2020-04-22 15:37:45 +02:00
localAddr *net.UDPAddr
closeCh chan struct{}
}
2020-05-24 11:48:37 +02:00
func (pxy *SUDPProxy) Run() (err error) {
pxy.localAddr, err = net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", pxy.cfg.LocalIP, pxy.cfg.LocalPort))
2020-04-22 15:37:45 +02:00
if err != nil {
return
}
return
}
2020-05-24 11:48:37 +02:00
func (pxy *SUDPProxy) Close() {
2020-04-22 15:37:45 +02:00
pxy.mu.Lock()
defer pxy.mu.Unlock()
select {
case <-pxy.closeCh:
return
default:
close(pxy.closeCh)
}
}
2020-05-24 11:48:37 +02:00
func (pxy *SUDPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
2020-04-22 15:37:45 +02:00
xl := pxy.xl
xl.Info("incoming a new work connection for sudp proxy, %s", conn.RemoteAddr().String())
2020-09-07 08:57:23 +02:00
var rwc io.ReadWriteCloser = conn
var err error
2020-04-22 15:37:45 +02:00
if pxy.limiter != nil {
2020-09-07 08:57:23 +02:00
rwc = frpIo.WrapReadWriteCloser(limit.NewReader(conn, pxy.limiter), limit.NewWriter(conn, pxy.limiter), func() error {
2020-04-22 15:37:45 +02:00
return conn.Close()
})
}
2020-09-07 08:57:23 +02:00
if pxy.cfg.UseEncryption {
rwc, err = frpIo.WithEncryption(rwc, []byte(pxy.clientCfg.Token))
if err != nil {
conn.Close()
xl.Error("create encryption stream error: %v", err)
return
}
}
if pxy.cfg.UseCompression {
rwc = frpIo.WithCompression(rwc)
}
conn = frpNet.WrapReadWriteCloserToConn(rwc, conn)
2020-04-22 15:37:45 +02:00
workConn := conn
2020-05-24 11:48:37 +02:00
readCh := make(chan *msg.UDPPacket, 1024)
2020-04-22 15:37:45 +02:00
sendCh := make(chan msg.Message, 1024)
isClose := false
mu := &sync.Mutex{}
closeFn := func() {
mu.Lock()
defer mu.Unlock()
if isClose {
return
}
isClose = true
if workConn != nil {
workConn.Close()
}
close(readCh)
close(sendCh)
}
// udp service <- frpc <- frps <- frpc visitor <- user
2020-05-24 11:48:37 +02:00
workConnReaderFn := func(conn net.Conn, readCh chan *msg.UDPPacket) {
2020-04-22 15:37:45 +02:00
defer closeFn()
for {
// first to check sudp proxy is closed or not
select {
case <-pxy.closeCh:
xl.Trace("frpc sudp proxy is closed")
return
default:
}
2020-05-24 11:48:37 +02:00
var udpMsg msg.UDPPacket
2020-04-22 15:37:45 +02:00
if errRet := msg.ReadMsgInto(conn, &udpMsg); errRet != nil {
xl.Warn("read from workConn for sudp error: %v", errRet)
return
}
if errRet := errors.PanicToError(func() {
readCh <- &udpMsg
}); errRet != nil {
xl.Warn("reader goroutine for sudp work connection closed: %v", errRet)
return
}
}
}
// udp service -> frpc -> frps -> frpc visitor -> user
workConnSenderFn := func(conn net.Conn, sendCh chan msg.Message) {
defer func() {
closeFn()
xl.Info("writer goroutine for sudp work connection closed")
}()
var errRet error
for rawMsg := range sendCh {
switch m := rawMsg.(type) {
2020-05-24 11:48:37 +02:00
case *msg.UDPPacket:
2020-04-22 15:37:45 +02:00
xl.Trace("frpc send udp package to frpc visitor, [udp local: %v, remote: %v], [tcp work conn local: %v, remote: %v]",
m.LocalAddr.String(), m.RemoteAddr.String(), conn.LocalAddr().String(), conn.RemoteAddr().String())
case *msg.Ping:
xl.Trace("frpc send ping message to frpc visitor")
}
if errRet = msg.WriteMsg(conn, rawMsg); errRet != nil {
xl.Error("sudp work write error: %v", errRet)
return
}
}
}
heartbeatFn := func(conn net.Conn, sendCh chan msg.Message) {
ticker := time.NewTicker(30 * time.Second)
defer func() {
ticker.Stop()
closeFn()
}()
var errRet error
for {
select {
case <-ticker.C:
if errRet = errors.PanicToError(func() {
sendCh <- &msg.Ping{}
}); errRet != nil {
xl.Warn("heartbeat goroutine for sudp work connection closed")
return
}
case <-pxy.closeCh:
xl.Trace("frpc sudp proxy is closed")
return
}
}
}
go workConnSenderFn(workConn, sendCh)
go workConnReaderFn(workConn, readCh)
go heartbeatFn(workConn, sendCh)
2020-05-24 11:48:37 +02:00
udp.Forwarder(pxy.localAddr, readCh, sendCh, int(pxy.clientCfg.UDPPacketSize))
2020-04-22 15:37:45 +02:00
}
2017-03-09 19:01:17 +01:00
// Common handler for tcp work connections.
2020-05-24 11:48:37 +02:00
func HandleTCPWorkConnection(ctx context.Context, localInfo *config.LocalSvrConf, proxyPlugin plugin.Plugin,
2019-11-02 18:20:49 +01:00
baseInfo *config.BaseProxyConf, limiter *rate.Limiter, workConn net.Conn, encKey []byte, m *msg.StartWorkConn) {
2019-10-12 14:13:12 +02:00
xl := xlog.FromContextSafe(ctx)
var (
remote io.ReadWriteCloser
err error
)
2017-03-09 19:01:17 +01:00
remote = workConn
2019-11-02 18:20:49 +01:00
if limiter != nil {
remote = frpIo.WrapReadWriteCloser(limit.NewReader(workConn, limiter), limit.NewWriter(workConn, limiter), func() error {
return workConn.Close()
})
}
2018-01-23 09:31:59 +01:00
2019-10-12 14:13:12 +02:00
xl.Trace("handle tcp work connection, use_encryption: %t, use_compression: %t",
baseInfo.UseEncryption, baseInfo.UseCompression)
2017-03-09 19:01:17 +01:00
if baseInfo.UseEncryption {
2017-10-24 20:49:56 +02:00
remote, err = frpIo.WithEncryption(remote, encKey)
2017-03-09 19:01:17 +01:00
if err != nil {
2018-01-23 09:31:59 +01:00
workConn.Close()
2019-10-12 14:13:12 +02:00
xl.Error("create encryption stream error: %v", err)
2017-03-09 19:01:17 +01:00
return
}
}
if baseInfo.UseCompression {
2017-06-06 12:48:40 +02:00
remote = frpIo.WithCompression(remote)
2017-03-09 19:01:17 +01:00
}
// check if we need to send proxy protocol info
var extraInfo []byte
if baseInfo.ProxyProtocolVersion != "" {
if m.SrcAddr != "" && m.SrcPort != 0 {
if m.DstAddr == "" {
m.DstAddr = "127.0.0.1"
}
h := &pp.Header{
Command: pp.PROXY,
SourceAddress: net.ParseIP(m.SrcAddr),
SourcePort: m.SrcPort,
DestinationAddress: net.ParseIP(m.DstAddr),
DestinationPort: m.DstPort,
}
if strings.Contains(m.SrcAddr, ".") {
h.TransportProtocol = pp.TCPv4
} else {
h.TransportProtocol = pp.TCPv6
}
if baseInfo.ProxyProtocolVersion == "v1" {
h.Version = 1
} else if baseInfo.ProxyProtocolVersion == "v2" {
h.Version = 2
}
buf := bytes.NewBuffer(nil)
h.WriteTo(buf)
extraInfo = buf.Bytes()
}
}
if proxyPlugin != nil {
// if plugin is set, let plugin handle connections first
2019-10-12 14:13:12 +02:00
xl.Debug("handle by plugin: %s", proxyPlugin.Name())
proxyPlugin.Handle(remote, workConn, extraInfo)
2019-10-12 14:13:12 +02:00
xl.Debug("handle by plugin finished")
return
2020-05-24 11:48:37 +02:00
}
2020-05-24 11:48:37 +02:00
localConn, err := frpNet.ConnectServer("tcp", fmt.Sprintf("%s:%d", localInfo.LocalIP, localInfo.LocalPort))
if err != nil {
workConn.Close()
xl.Error("connect to local service [%s:%d] error: %v", localInfo.LocalIP, localInfo.LocalPort, err)
return
}
2019-03-29 12:01:18 +01:00
2020-05-24 11:48:37 +02:00
xl.Debug("join connections, localConn(l[%s] r[%s]) workConn(l[%s] r[%s])", localConn.LocalAddr().String(),
localConn.RemoteAddr().String(), workConn.LocalAddr().String(), workConn.RemoteAddr().String())
2019-03-29 12:01:18 +01:00
2020-05-24 11:48:37 +02:00
if len(extraInfo) > 0 {
localConn.Write(extraInfo)
}
2020-05-24 11:48:37 +02:00
frpIo.Join(localConn, remote)
xl.Debug("join connections closed")
2017-03-08 19:03:47 +01:00
}