frp/client/visitor.go

554 lines
13 KiB
Go
Raw Normal View History

2017-06-25 21:02:33 +02:00
// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package client
import (
2017-10-24 16:53:20 +02:00
"bytes"
2019-10-12 14:13:12 +02:00
"context"
2017-10-24 12:20:07 +02:00
"fmt"
2017-06-25 21:02:33 +02:00
"io"
2019-03-05 04:18:17 +01:00
"io/ioutil"
2017-10-24 12:20:07 +02:00
"net"
2017-06-25 21:02:33 +02:00
"sync"
"time"
2020-09-23 07:49:14 +02:00
"github.com/fatedier/frp/pkg/config"
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/proto/udp"
frpNet "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/util"
"github.com/fatedier/frp/pkg/util/xlog"
2018-05-07 20:13:30 +02:00
2020-04-22 15:37:45 +02:00
"github.com/fatedier/golib/errors"
2018-05-08 17:42:04 +02:00
frpIo "github.com/fatedier/golib/io"
2018-05-07 20:13:30 +02:00
"github.com/fatedier/golib/pool"
2019-03-05 04:18:17 +01:00
fmux "github.com/hashicorp/yamux"
2017-06-25 21:02:33 +02:00
)
2017-12-04 18:34:33 +01:00
// Visitor forwards traffic from a local port to a remote service.
type Visitor interface {
	// Run starts the visitor and reports an error if it could not be started.
	Run() error
	// Close shuts the visitor down.
	Close()
}
2019-10-12 14:13:12 +02:00
func NewVisitor(ctx context.Context, ctl *Control, cfg config.VisitorConf) (visitor Visitor) {
xl := xlog.FromContextSafe(ctx).Spawn().AppendPrefix(cfg.GetBaseInfo().ProxyName)
2017-12-04 18:34:33 +01:00
baseVisitor := BaseVisitor{
2019-10-12 14:13:12 +02:00
ctl: ctl,
ctx: xlog.NewContext(ctx, xl),
2017-06-25 21:02:33 +02:00
}
switch cfg := cfg.(type) {
2020-05-24 11:48:37 +02:00
case *config.STCPVisitorConf:
visitor = &STCPVisitor{
2019-01-31 09:49:23 +01:00
BaseVisitor: &baseVisitor,
2017-12-04 18:34:33 +01:00
cfg: cfg,
2017-06-25 21:02:33 +02:00
}
2020-05-24 11:48:37 +02:00
case *config.XTCPVisitorConf:
visitor = &XTCPVisitor{
2019-01-31 09:49:23 +01:00
BaseVisitor: &baseVisitor,
2017-12-04 18:34:33 +01:00
cfg: cfg,
2017-10-24 12:20:07 +02:00
}
2020-05-24 11:48:37 +02:00
case *config.SUDPVisitorConf:
visitor = &SUDPVisitor{
2020-04-22 15:37:45 +02:00
BaseVisitor: &baseVisitor,
cfg: cfg,
checkCloseCh: make(chan struct{}),
}
2017-06-25 21:02:33 +02:00
}
return
}
2017-12-04 18:34:33 +01:00
type BaseVisitor struct {
2017-06-25 21:02:33 +02:00
ctl *Control
2019-10-12 14:13:12 +02:00
l net.Listener
2017-06-25 21:02:33 +02:00
closed bool
2019-10-12 14:13:12 +02:00
mu sync.RWMutex
ctx context.Context
2017-06-25 21:02:33 +02:00
}
2020-05-24 11:48:37 +02:00
type STCPVisitor struct {
2019-01-31 09:49:23 +01:00
*BaseVisitor
2017-06-25 21:02:33 +02:00
2020-05-24 11:48:37 +02:00
cfg *config.STCPVisitorConf
2017-06-25 21:02:33 +02:00
}
2020-05-24 11:48:37 +02:00
func (sv *STCPVisitor) Run() (err error) {
2019-10-12 14:13:12 +02:00
sv.l, err = net.Listen("tcp", fmt.Sprintf("%s:%d", sv.cfg.BindAddr, sv.cfg.BindPort))
2017-06-25 21:02:33 +02:00
if err != nil {
return
}
go sv.worker()
return
}
2020-05-24 11:48:37 +02:00
func (sv *STCPVisitor) Close() {
2017-06-25 21:02:33 +02:00
sv.l.Close()
}
2020-05-24 11:48:37 +02:00
func (sv *STCPVisitor) worker() {
2019-10-12 14:13:12 +02:00
xl := xlog.FromContextSafe(sv.ctx)
2017-06-25 21:02:33 +02:00
for {
conn, err := sv.l.Accept()
if err != nil {
2019-10-12 14:13:12 +02:00
xl.Warn("stcp local listener closed")
2017-06-25 21:02:33 +02:00
return
}
go sv.handleConn(conn)
}
}
2020-05-24 11:48:37 +02:00
func (sv *STCPVisitor) handleConn(userConn net.Conn) {
2019-10-12 14:13:12 +02:00
xl := xlog.FromContextSafe(sv.ctx)
2017-06-25 21:02:33 +02:00
defer userConn.Close()
2019-10-12 14:13:12 +02:00
xl.Debug("get a new stcp user connection")
2017-12-04 18:34:33 +01:00
visitorConn, err := sv.ctl.connectServer()
2017-06-25 21:02:33 +02:00
if err != nil {
return
}
2017-12-04 18:34:33 +01:00
defer visitorConn.Close()
2017-06-25 21:02:33 +02:00
now := time.Now().Unix()
2017-12-04 18:34:33 +01:00
newVisitorConnMsg := &msg.NewVisitorConn{
2017-06-25 21:02:33 +02:00
ProxyName: sv.cfg.ServerName,
SignKey: util.GetAuthKey(sv.cfg.Sk, now),
Timestamp: now,
UseEncryption: sv.cfg.UseEncryption,
UseCompression: sv.cfg.UseCompression,
}
2017-12-04 18:34:33 +01:00
err = msg.WriteMsg(visitorConn, newVisitorConnMsg)
2017-06-25 21:02:33 +02:00
if err != nil {
2019-10-12 14:13:12 +02:00
xl.Warn("send newVisitorConnMsg to server error: %v", err)
2017-06-25 21:02:33 +02:00
return
}
2017-12-04 18:34:33 +01:00
var newVisitorConnRespMsg msg.NewVisitorConnResp
visitorConn.SetReadDeadline(time.Now().Add(10 * time.Second))
err = msg.ReadMsgInto(visitorConn, &newVisitorConnRespMsg)
2017-06-25 21:02:33 +02:00
if err != nil {
2019-10-12 14:13:12 +02:00
xl.Warn("get newVisitorConnRespMsg error: %v", err)
2017-06-25 21:02:33 +02:00
return
}
2017-12-04 18:34:33 +01:00
visitorConn.SetReadDeadline(time.Time{})
2017-06-25 21:02:33 +02:00
2017-12-04 18:34:33 +01:00
if newVisitorConnRespMsg.Error != "" {
2019-10-12 14:13:12 +02:00
xl.Warn("start new visitor connection error: %s", newVisitorConnRespMsg.Error)
2017-06-25 21:02:33 +02:00
return
}
var remote io.ReadWriteCloser
2017-12-04 18:34:33 +01:00
remote = visitorConn
2017-06-25 21:02:33 +02:00
if sv.cfg.UseEncryption {
remote, err = frpIo.WithEncryption(remote, []byte(sv.cfg.Sk))
if err != nil {
2019-10-12 14:13:12 +02:00
xl.Error("create encryption stream error: %v", err)
2017-06-25 21:02:33 +02:00
return
}
}
if sv.cfg.UseCompression {
remote = frpIo.WithCompression(remote)
}
frpIo.Join(userConn, remote)
}
2017-10-24 12:20:07 +02:00
2020-05-24 11:48:37 +02:00
type XTCPVisitor struct {
2019-01-31 09:49:23 +01:00
*BaseVisitor
2017-10-24 12:20:07 +02:00
2020-05-24 11:48:37 +02:00
cfg *config.XTCPVisitorConf
2017-10-24 12:20:07 +02:00
}
2020-05-24 11:48:37 +02:00
func (sv *XTCPVisitor) Run() (err error) {
2019-10-12 14:13:12 +02:00
sv.l, err = net.Listen("tcp", fmt.Sprintf("%s:%d", sv.cfg.BindAddr, sv.cfg.BindPort))
2017-10-24 12:20:07 +02:00
if err != nil {
return
}
go sv.worker()
return
}
2020-05-24 11:48:37 +02:00
func (sv *XTCPVisitor) Close() {
2017-10-24 12:20:07 +02:00
sv.l.Close()
}
2020-05-24 11:48:37 +02:00
func (sv *XTCPVisitor) worker() {
2019-10-12 14:13:12 +02:00
xl := xlog.FromContextSafe(sv.ctx)
2017-10-24 12:20:07 +02:00
for {
conn, err := sv.l.Accept()
if err != nil {
2019-10-12 14:13:12 +02:00
xl.Warn("xtcp local listener closed")
2017-10-24 12:20:07 +02:00
return
}
go sv.handleConn(conn)
}
}
2020-05-24 11:48:37 +02:00
func (sv *XTCPVisitor) handleConn(userConn net.Conn) {
2019-10-12 14:13:12 +02:00
xl := xlog.FromContextSafe(sv.ctx)
2017-10-24 12:20:07 +02:00
defer userConn.Close()
2019-10-12 14:13:12 +02:00
xl.Debug("get a new xtcp user connection")
if sv.ctl.serverUDPPort == 0 {
2019-10-12 14:13:12 +02:00
xl.Error("xtcp is not supported by server")
2017-10-24 12:20:07 +02:00
return
}
raddr, err := net.ResolveUDPAddr("udp",
fmt.Sprintf("%s:%d", sv.ctl.clientCfg.ServerAddr, sv.ctl.serverUDPPort))
2018-10-18 07:55:51 +02:00
if err != nil {
2019-10-12 14:13:12 +02:00
xl.Error("resolve server UDP addr error")
2018-10-18 07:55:51 +02:00
return
}
2017-12-04 18:34:33 +01:00
visitorConn, err := net.DialUDP("udp", nil, raddr)
2018-10-18 07:55:51 +02:00
if err != nil {
2019-10-12 14:13:12 +02:00
xl.Warn("dial server udp addr error: %v", err)
2018-10-18 07:55:51 +02:00
return
}
2017-12-04 18:34:33 +01:00
defer visitorConn.Close()
2017-10-24 12:20:07 +02:00
now := time.Now().Unix()
2017-12-04 18:34:33 +01:00
natHoleVisitorMsg := &msg.NatHoleVisitor{
2017-10-24 12:20:07 +02:00
ProxyName: sv.cfg.ServerName,
SignKey: util.GetAuthKey(sv.cfg.Sk, now),
Timestamp: now,
}
2017-12-04 18:34:33 +01:00
err = msg.WriteMsg(visitorConn, natHoleVisitorMsg)
2017-10-24 12:20:07 +02:00
if err != nil {
2019-10-12 14:13:12 +02:00
xl.Warn("send natHoleVisitorMsg to server error: %v", err)
2017-10-24 12:20:07 +02:00
return
}
// Wait for client address at most 10 seconds.
2017-10-24 16:53:20 +02:00
var natHoleRespMsg msg.NatHoleResp
2017-12-04 18:34:33 +01:00
visitorConn.SetReadDeadline(time.Now().Add(10 * time.Second))
2017-10-24 16:53:20 +02:00
buf := pool.GetBuf(1024)
2017-12-04 18:34:33 +01:00
n, err := visitorConn.Read(buf)
2017-10-24 16:53:20 +02:00
if err != nil {
2019-10-12 14:13:12 +02:00
xl.Warn("get natHoleRespMsg error: %v", err)
2017-10-24 16:53:20 +02:00
return
}
err = msg.ReadMsgInto(bytes.NewReader(buf[:n]), &natHoleRespMsg)
2017-10-24 12:20:07 +02:00
if err != nil {
2019-10-12 14:13:12 +02:00
xl.Warn("get natHoleRespMsg error: %v", err)
2017-10-24 12:20:07 +02:00
return
}
2017-12-04 18:34:33 +01:00
visitorConn.SetReadDeadline(time.Time{})
2017-10-24 16:53:20 +02:00
pool.PutBuf(buf)
if natHoleRespMsg.Error != "" {
2019-10-12 14:13:12 +02:00
xl.Error("natHoleRespMsg get error info: %s", natHoleRespMsg.Error)
return
}
2019-10-12 14:13:12 +02:00
xl.Trace("get natHoleRespMsg, sid [%s], client address [%s], visitor address [%s]", natHoleRespMsg.Sid, natHoleRespMsg.ClientAddr, natHoleRespMsg.VisitorAddr)
2017-10-24 12:20:07 +02:00
2017-12-04 18:34:33 +01:00
// Close visitorConn, so we can use it's local address.
visitorConn.Close()
2017-10-24 12:20:07 +02:00
2019-03-11 08:53:58 +01:00
// send sid message to client
2017-12-04 18:34:33 +01:00
laddr, _ := net.ResolveUDPAddr("udp", visitorConn.LocalAddr().String())
2019-03-11 08:53:58 +01:00
daddr, err := net.ResolveUDPAddr("udp", natHoleRespMsg.ClientAddr)
2017-10-24 16:53:20 +02:00
if err != nil {
2019-10-12 14:13:12 +02:00
xl.Error("resolve client udp address error: %v", err)
2017-10-24 16:53:20 +02:00
return
2017-10-24 12:20:07 +02:00
}
2019-03-11 08:53:58 +01:00
lConn, err := net.DialUDP("udp", laddr, daddr)
2017-12-05 15:26:53 +01:00
if err != nil {
2019-10-12 14:13:12 +02:00
xl.Error("dial client udp address error: %v", err)
2017-12-05 15:26:53 +01:00
return
}
2019-03-03 16:44:44 +01:00
defer lConn.Close()
2019-03-11 08:53:58 +01:00
lConn.Write([]byte(natHoleRespMsg.Sid))
// read ack sid from client
2017-10-24 12:20:07 +02:00
sidBuf := pool.GetBuf(1024)
2019-03-11 08:53:58 +01:00
lConn.SetReadDeadline(time.Now().Add(8 * time.Second))
n, err = lConn.Read(sidBuf)
2017-10-24 12:20:07 +02:00
if err != nil {
2019-10-12 14:13:12 +02:00
xl.Warn("get sid from client error: %v", err)
2017-10-24 12:20:07 +02:00
return
}
lConn.SetReadDeadline(time.Time{})
2017-10-24 16:53:20 +02:00
if string(sidBuf[:n]) != natHoleRespMsg.Sid {
2019-10-12 14:13:12 +02:00
xl.Warn("incorrect sid from client")
2017-10-24 12:20:07 +02:00
return
}
pool.PutBuf(sidBuf)
2019-10-12 14:13:12 +02:00
xl.Info("nat hole connection make success, sid [%s]", natHoleRespMsg.Sid)
2019-03-11 08:53:58 +01:00
// wrap kcp connection
2017-10-24 12:20:07 +02:00
var remote io.ReadWriteCloser
2020-05-24 11:48:37 +02:00
remote, err = frpNet.NewKCPConnFromUDP(lConn, true, natHoleRespMsg.ClientAddr)
2017-10-24 12:20:07 +02:00
if err != nil {
2019-10-12 14:13:12 +02:00
xl.Error("create kcp connection from udp connection error: %v", err)
2017-10-24 12:20:07 +02:00
return
}
2019-03-05 04:18:17 +01:00
fmuxCfg := fmux.DefaultConfig()
fmuxCfg.KeepAliveInterval = 5 * time.Second
fmuxCfg.LogOutput = ioutil.Discard
sess, err := fmux.Client(remote, fmuxCfg)
if err != nil {
2019-10-12 14:13:12 +02:00
xl.Error("create yamux session error: %v", err)
2019-03-05 04:18:17 +01:00
return
}
defer sess.Close()
muxConn, err := sess.Open()
if err != nil {
2019-10-12 14:13:12 +02:00
xl.Error("open yamux stream error: %v", err)
2019-03-05 04:18:17 +01:00
return
}
var muxConnRWCloser io.ReadWriteCloser = muxConn
if sv.cfg.UseEncryption {
muxConnRWCloser, err = frpIo.WithEncryption(muxConnRWCloser, []byte(sv.cfg.Sk))
if err != nil {
2019-10-12 14:13:12 +02:00
xl.Error("create encryption stream error: %v", err)
return
}
}
if sv.cfg.UseCompression {
muxConnRWCloser = frpIo.WithCompression(muxConnRWCloser)
}
frpIo.Join(userConn, muxConnRWCloser)
2019-10-12 14:13:12 +02:00
xl.Debug("join connections closed")
2017-10-24 12:20:07 +02:00
}
2020-04-22 15:37:45 +02:00
2020-05-24 11:48:37 +02:00
type SUDPVisitor struct {
2020-04-22 15:37:45 +02:00
*BaseVisitor
checkCloseCh chan struct{}
// udpConn is the listener of udp packet
udpConn *net.UDPConn
2020-05-24 11:48:37 +02:00
readCh chan *msg.UDPPacket
sendCh chan *msg.UDPPacket
2020-04-22 15:37:45 +02:00
2020-05-24 11:48:37 +02:00
cfg *config.SUDPVisitorConf
2020-04-22 15:37:45 +02:00
}
// SUDP Run start listen a udp port
2020-05-24 11:48:37 +02:00
func (sv *SUDPVisitor) Run() (err error) {
2020-04-22 15:37:45 +02:00
xl := xlog.FromContextSafe(sv.ctx)
addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", sv.cfg.BindAddr, sv.cfg.BindPort))
if err != nil {
return fmt.Errorf("sudp ResolveUDPAddr error: %v", err)
}
sv.udpConn, err = net.ListenUDP("udp", addr)
if err != nil {
return fmt.Errorf("listen udp port %s error: %v", addr.String(), err)
}
2020-05-24 11:48:37 +02:00
sv.sendCh = make(chan *msg.UDPPacket, 1024)
sv.readCh = make(chan *msg.UDPPacket, 1024)
2020-04-22 15:37:45 +02:00
xl.Info("sudp start to work")
go sv.dispatcher()
2020-05-24 11:48:37 +02:00
go udp.ForwardUserConn(sv.udpConn, sv.readCh, sv.sendCh, int(sv.ctl.clientCfg.UDPPacketSize))
2020-04-22 15:37:45 +02:00
return
}
2020-05-24 11:48:37 +02:00
func (sv *SUDPVisitor) dispatcher() {
2020-04-22 15:37:45 +02:00
xl := xlog.FromContextSafe(sv.ctx)
for {
// loop for get frpc to frps tcp conn
// setup worker
// wait worker to finished
// retry or exit
visitorConn, err := sv.getNewVisitorConn()
if err != nil {
// check if proxy is closed
// if checkCloseCh is close, we will return, other case we will continue to reconnect
select {
case <-sv.checkCloseCh:
xl.Info("frpc sudp visitor proxy is closed")
return
default:
}
time.Sleep(3 * time.Second)
xl.Warn("newVisitorConn to frps error: %v, try to reconnect", err)
continue
}
sv.worker(visitorConn)
select {
case <-sv.checkCloseCh:
return
default:
}
}
}
2020-05-24 11:48:37 +02:00
func (sv *SUDPVisitor) worker(workConn net.Conn) {
2020-04-22 15:37:45 +02:00
xl := xlog.FromContextSafe(sv.ctx)
xl.Debug("starting sudp proxy worker")
wg := &sync.WaitGroup{}
wg.Add(2)
closeCh := make(chan struct{})
// udp service -> frpc -> frps -> frpc visitor -> user
workConnReaderFn := func(conn net.Conn) {
defer func() {
conn.Close()
close(closeCh)
wg.Done()
}()
for {
var (
rawMsg msg.Message
errRet error
)
// frpc will send heartbeat in workConn to frpc visitor for keeping alive
conn.SetReadDeadline(time.Now().Add(60 * time.Second))
if rawMsg, errRet = msg.ReadMsg(conn); errRet != nil {
xl.Warn("read from workconn for user udp conn error: %v", errRet)
return
}
conn.SetReadDeadline(time.Time{})
switch m := rawMsg.(type) {
case *msg.Ping:
xl.Debug("frpc visitor get ping message from frpc")
continue
2020-05-24 11:48:37 +02:00
case *msg.UDPPacket:
2020-04-22 15:37:45 +02:00
if errRet := errors.PanicToError(func() {
sv.readCh <- m
xl.Trace("frpc visitor get udp packet from frpc")
}); errRet != nil {
xl.Info("reader goroutine for udp work connection closed")
return
}
}
}
}
// udp service <- frpc <- frps <- frpc visitor <- user
workConnSenderFn := func(conn net.Conn) {
defer func() {
conn.Close()
wg.Done()
}()
var errRet error
for {
select {
case udpMsg, ok := <-sv.sendCh:
if !ok {
xl.Info("sender goroutine for udp work connection closed")
return
}
if errRet = msg.WriteMsg(conn, udpMsg); errRet != nil {
xl.Warn("sender goroutine for udp work connection closed: %v", errRet)
return
}
case <-closeCh:
return
}
}
}
go workConnReaderFn(workConn)
go workConnSenderFn(workConn)
wg.Wait()
xl.Info("sudp worker is closed")
}
2020-09-07 08:57:23 +02:00
func (sv *SUDPVisitor) getNewVisitorConn() (net.Conn, error) {
xl := xlog.FromContextSafe(sv.ctx)
visitorConn, err := sv.ctl.connectServer()
2020-04-22 15:37:45 +02:00
if err != nil {
return nil, fmt.Errorf("frpc connect frps error: %v", err)
}
now := time.Now().Unix()
newVisitorConnMsg := &msg.NewVisitorConn{
ProxyName: sv.cfg.ServerName,
SignKey: util.GetAuthKey(sv.cfg.Sk, now),
Timestamp: now,
UseEncryption: sv.cfg.UseEncryption,
UseCompression: sv.cfg.UseCompression,
}
err = msg.WriteMsg(visitorConn, newVisitorConnMsg)
if err != nil {
return nil, fmt.Errorf("frpc send newVisitorConnMsg to frps error: %v", err)
}
var newVisitorConnRespMsg msg.NewVisitorConnResp
visitorConn.SetReadDeadline(time.Now().Add(10 * time.Second))
err = msg.ReadMsgInto(visitorConn, &newVisitorConnRespMsg)
if err != nil {
return nil, fmt.Errorf("frpc read newVisitorConnRespMsg error: %v", err)
}
visitorConn.SetReadDeadline(time.Time{})
if newVisitorConnRespMsg.Error != "" {
return nil, fmt.Errorf("start new visitor connection error: %s", newVisitorConnRespMsg.Error)
}
2020-09-07 08:57:23 +02:00
var remote io.ReadWriteCloser
remote = visitorConn
if sv.cfg.UseEncryption {
remote, err = frpIo.WithEncryption(remote, []byte(sv.cfg.Sk))
if err != nil {
xl.Error("create encryption stream error: %v", err)
return nil, err
}
}
if sv.cfg.UseCompression {
remote = frpIo.WithCompression(remote)
}
return frpNet.WrapReadWriteCloserToConn(remote, visitorConn), nil
2020-04-22 15:37:45 +02:00
}
2020-05-24 11:48:37 +02:00
func (sv *SUDPVisitor) Close() {
2020-04-22 15:37:45 +02:00
sv.mu.Lock()
defer sv.mu.Unlock()
select {
case <-sv.checkCloseCh:
return
default:
close(sv.checkCloseCh)
}
if sv.udpConn != nil {
sv.udpConn.Close()
}
close(sv.readCh)
close(sv.sendCh)
}