mirror of
https://github.com/netbirdio/netbird.git
synced 2025-08-23 12:41:20 +02:00
refactor: reimplement the general flow
This commit is contained in:
@@ -6,7 +6,7 @@ import (
|
|||||||
"github.com/pion/ice/v2"
|
"github.com/pion/ice/v2"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
"github.com/wiretrustee/wiretrustee/engine"
|
"github.com/wiretrustee/wiretrustee/connection"
|
||||||
sig "github.com/wiretrustee/wiretrustee/signal"
|
sig "github.com/wiretrustee/wiretrustee/signal"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
@@ -42,7 +42,7 @@ var (
|
|||||||
turnURL.Username = config.TurnUser
|
turnURL.Username = config.TurnUser
|
||||||
urls := []*ice.URL{turnURL, stunURL}
|
urls := []*ice.URL{turnURL, stunURL}
|
||||||
|
|
||||||
engine := engine.NewEngine(signalClient, urls, config.WgIface, config.WgAddr)
|
engine := connection.NewEngine(signalClient, urls, config.WgIface, config.WgAddr)
|
||||||
|
|
||||||
err = engine.Start(config.PrivateKey, strings.Split(config.Peers, ","))
|
err = engine.Start(config.PrivateKey, strings.Split(config.Peers, ","))
|
||||||
|
|
||||||
|
295
connection/connection.go
Normal file
295
connection/connection.go
Normal file
@@ -0,0 +1,295 @@
|
|||||||
|
package connection
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"github.com/pion/ice/v2"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
"github.com/wiretrustee/wiretrustee/iface"
|
||||||
|
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||||
|
"net"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
DefaultAllowedIps = "0.0.0.0/0"
|
||||||
|
DefaultWgKeepAlive = 20 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
|
type Config struct {
|
||||||
|
// Local Wireguard listening address e.g. 127.0.0.1:51820
|
||||||
|
WgListenAddr string
|
||||||
|
// A Local Wireguard Peer IP address in CIDR notation e.g. 10.30.30.1/24
|
||||||
|
WgPeerIp string
|
||||||
|
// Local Wireguard Interface name (e.g. wg0)
|
||||||
|
WgIface string
|
||||||
|
// Local Wireguard private key
|
||||||
|
WgKey wgtypes.Key
|
||||||
|
// Remote Wireguard public key
|
||||||
|
RemoteWgKey wgtypes.Key
|
||||||
|
|
||||||
|
StunTurnURLS []*ice.URL
|
||||||
|
}
|
||||||
|
|
||||||
|
type IceCredentials struct {
|
||||||
|
uFrag string
|
||||||
|
pwd string
|
||||||
|
isControlling bool //todo think of better solution??
|
||||||
|
}
|
||||||
|
|
||||||
|
type Connection struct {
|
||||||
|
Config Config
|
||||||
|
// signalCandidate is a handler function to signal remote peer about local connection candidate
|
||||||
|
signalCandidate func(candidate ice.Candidate) error
|
||||||
|
|
||||||
|
// signalOffer is a handler function to signal remote peer our connection offer (credentials)
|
||||||
|
signalOffer func(uFrag string, pwd string) error
|
||||||
|
|
||||||
|
// signalOffer is a handler function to signal remote peer our connection answer (credentials)
|
||||||
|
signalAnswer func(uFrag string, pwd string) error
|
||||||
|
|
||||||
|
// remoteAuthChannel is a channel used to wait for remote credentials to proceed with the connection
|
||||||
|
remoteAuthChannel chan IceCredentials
|
||||||
|
|
||||||
|
closeChannel chan bool
|
||||||
|
|
||||||
|
// agent is an actual ice.Agent that is used to negotiate and maintain a connection to a remote peer
|
||||||
|
agent *ice.Agent
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewConnection(config Config,
|
||||||
|
signalCandidate func(candidate ice.Candidate) error,
|
||||||
|
signalOffer func(uFrag string, pwd string) error,
|
||||||
|
signalAnswer func(uFrag string, pwd string) error,
|
||||||
|
) *Connection {
|
||||||
|
|
||||||
|
return &Connection{
|
||||||
|
Config: config,
|
||||||
|
signalCandidate: signalCandidate,
|
||||||
|
signalOffer: signalOffer,
|
||||||
|
signalAnswer: signalAnswer,
|
||||||
|
remoteAuthChannel: make(chan IceCredentials, 1),
|
||||||
|
closeChannel: make(chan bool, 1),
|
||||||
|
agent: nil,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Open opens connection to a remote peer.
|
||||||
|
// Will block until the connection has successfully established
|
||||||
|
func (conn *Connection) Open() error {
|
||||||
|
|
||||||
|
wgConn, err := conn.createWireguardProxy()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// create an ice Agent that will be responsible for negotiating and establishing actual peer-to-peer connection
|
||||||
|
conn.agent, err = ice.NewAgent(&ice.AgentConfig{
|
||||||
|
NetworkTypes: []ice.NetworkType{ice.NetworkTypeUDP4},
|
||||||
|
Urls: conn.Config.StunTurnURLS,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = conn.listenOnLocalCandidates()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = conn.listenOnConnectionStateChanges()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = conn.signalCredentials()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// wait until credentials have been sent from the remote peer (will arrive via signal channel)
|
||||||
|
remoteAuth := <-conn.remoteAuthChannel
|
||||||
|
|
||||||
|
err = conn.agent.GatherCandidates()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
remoteConn, err := conn.openConnectionToRemote(remoteAuth.isControlling, remoteAuth)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed establishing connection with the remote peer %s %s", conn.Config.RemoteWgKey.String(), err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
go conn.proxyToRemotePeer(*wgConn, remoteConn)
|
||||||
|
go conn.proxyToLocalWireguard(*wgConn, remoteConn)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (conn *Connection) OnAnswer(remoteAuth IceCredentials) error {
|
||||||
|
log.Debugf("onAnswer from peer %s", conn.Config.RemoteWgKey.String())
|
||||||
|
conn.remoteAuthChannel <- remoteAuth
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (conn *Connection) OnOffer(remoteAuth IceCredentials) error {
|
||||||
|
|
||||||
|
uFrag, pwd, err := conn.agent.GetLocalUserCredentials()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = conn.signalAnswer(uFrag, pwd)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
conn.remoteAuthChannel <- remoteAuth
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (conn *Connection) OnRemoteCandidate(candidate ice.Candidate) error {
|
||||||
|
|
||||||
|
log.Debugf("onRemoteCandidate from peer %s -> %s", conn.Config.RemoteWgKey.String(), candidate.String())
|
||||||
|
|
||||||
|
err := conn.agent.AddRemoteCandidate(candidate)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// openConnectionToRemote opens an ice.Conn to the remote peer. This is a real peer-to-peer connection
|
||||||
|
func (conn *Connection) openConnectionToRemote(isControlling bool, credentials IceCredentials) (*ice.Conn, error) {
|
||||||
|
var realConn *ice.Conn
|
||||||
|
var err error
|
||||||
|
|
||||||
|
if isControlling {
|
||||||
|
realConn, err = conn.agent.Dial(context.TODO(), credentials.uFrag, credentials.pwd)
|
||||||
|
} else {
|
||||||
|
realConn, err = conn.agent.Accept(context.TODO(), credentials.uFrag, credentials.pwd)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return realConn, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// signalCredentials prepares local user credentials and signals them to the remote peer
|
||||||
|
func (conn *Connection) signalCredentials() error {
|
||||||
|
localUFrag, localPwd, err := conn.agent.GetLocalUserCredentials()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = conn.signalOffer(localUFrag, localPwd)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// listenOnLocalCandidates registers callback of an ICE Agent to new local connection candidates and
|
||||||
|
// signal them to the remote peer
|
||||||
|
func (conn *Connection) listenOnLocalCandidates() error {
|
||||||
|
err := conn.agent.OnCandidate(func(candidate ice.Candidate) {
|
||||||
|
if candidate != nil {
|
||||||
|
|
||||||
|
log.Debugf("discovered local candidate %s", candidate.String())
|
||||||
|
err := conn.signalCandidate(candidate)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed signaling candidate to the remote peer %s %s", conn.Config.RemoteWgKey.String(), err)
|
||||||
|
//todo ??
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// listenOnConnectionStateChanges registers callback of an ICE Agent to track connection state
|
||||||
|
func (conn *Connection) listenOnConnectionStateChanges() error {
|
||||||
|
err := conn.agent.OnConnectionStateChange(func(state ice.ConnectionState) {
|
||||||
|
log.Debugf("ICE Connection State has changed: %s", state.String())
|
||||||
|
if state == ice.ConnectionStateConnected {
|
||||||
|
// once the connection has been established we can check the selected candidate pair
|
||||||
|
pair, err := conn.agent.GetSelectedCandidatePair()
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed selecting active ICE candidate pair %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.Debugf("connected to peer %s via selected candidate pair %s", conn.Config.RemoteWgKey.String(), pair)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// createWireguardProxy opens connection to the local Wireguard instance (proxy) and sets peer endpoint of Wireguard to point
|
||||||
|
// to the local address of a proxy
|
||||||
|
func (conn *Connection) createWireguardProxy() (*net.Conn, error) {
|
||||||
|
wgConn, err := net.Dial("udp", conn.Config.WgListenAddr)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("failed dialing to local Wireguard port %s", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// add local proxy connection as a Wireguard peer
|
||||||
|
err = iface.UpdatePeer(conn.Config.WgIface, conn.Config.RemoteWgKey.String(), DefaultAllowedIps, DefaultWgKeepAlive,
|
||||||
|
wgConn.LocalAddr().String())
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("error while configuring Wireguard peer [%s] %s", conn.Config.RemoteWgKey.String(), err.Error())
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &wgConn, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// proxyToRemotePeer proxies everything from Wireguard to the remote peer
|
||||||
|
// blocks
|
||||||
|
func (conn *Connection) proxyToRemotePeer(wgConn net.Conn, remoteConn *ice.Conn) {
|
||||||
|
|
||||||
|
buf := make([]byte, 1500)
|
||||||
|
for {
|
||||||
|
n, err := wgConn.Read(buf)
|
||||||
|
if err != nil {
|
||||||
|
log.Warnln("Error reading from peer: ", err.Error())
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
n, err = remoteConn.Write(buf[:n])
|
||||||
|
if err != nil {
|
||||||
|
log.Warnln("Error writing to remote peer: ", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// proxyToLocalWireguard proxies everything from the remote peer to local Wireguard
|
||||||
|
// blocks
|
||||||
|
func (conn *Connection) proxyToLocalWireguard(wgConn net.Conn, remoteConn *ice.Conn) {
|
||||||
|
|
||||||
|
buf := make([]byte, 1500)
|
||||||
|
for {
|
||||||
|
n, err := remoteConn.Read(buf)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed reading from remote connection %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
n, err = wgConn.Write(buf[:n])
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed writing to local Wireguard instance %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
203
connection/engine.go
Normal file
203
connection/engine.go
Normal file
@@ -0,0 +1,203 @@
|
|||||||
|
package connection
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"github.com/pion/ice/v2"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
"github.com/wiretrustee/wiretrustee/iface"
|
||||||
|
"github.com/wiretrustee/wiretrustee/signal"
|
||||||
|
sProto "github.com/wiretrustee/wiretrustee/signal/proto"
|
||||||
|
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Engine struct {
|
||||||
|
// a list of STUN and TURN servers
|
||||||
|
stunsTurns []*ice.URL
|
||||||
|
// signal server client
|
||||||
|
signal *signal.Client
|
||||||
|
// peer agents indexed by local public key of the remote peers
|
||||||
|
conns map[string]*Connection
|
||||||
|
// Wireguard interface
|
||||||
|
wgIface string
|
||||||
|
// Wireguard local address
|
||||||
|
wgIp string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewEngine(signal *signal.Client, stunsTurns []*ice.URL, wgIface string, wgAddr string) *Engine {
|
||||||
|
return &Engine{
|
||||||
|
stunsTurns: stunsTurns,
|
||||||
|
signal: signal,
|
||||||
|
wgIface: wgIface,
|
||||||
|
wgIp: wgAddr,
|
||||||
|
conns: map[string]*Connection{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *Engine) Start(privateKey string, peers []string) error {
|
||||||
|
|
||||||
|
// setup wireguard
|
||||||
|
myKey, err := wgtypes.ParseKey(privateKey)
|
||||||
|
myPubKey := myKey.PublicKey().String()
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("error parsing Wireguard key %s: [%s]", privateKey, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = iface.Create(e.wgIface, e.wgIp)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("error while creating interface %s: [%s]", e.wgIface, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = iface.Configure(e.wgIface, myKey.String())
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("error while configuring Wireguard interface [%s]: %s", e.wgIface, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
wgPort, err := iface.GetListenPort(e.wgIface)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("error while getting Wireguard interface port [%s]: %s", e.wgIface, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
e.receiveSignal(myPubKey)
|
||||||
|
|
||||||
|
// initialize peer agents
|
||||||
|
for _, peer := range peers {
|
||||||
|
remoteKey, _ := wgtypes.ParseKey(peer)
|
||||||
|
connConfig := &Config{
|
||||||
|
WgListenAddr: fmt.Sprintf("127.0.0.1:%d", *wgPort),
|
||||||
|
WgPeerIp: e.wgIp,
|
||||||
|
WgIface: e.wgIface,
|
||||||
|
WgKey: myKey,
|
||||||
|
RemoteWgKey: remoteKey,
|
||||||
|
StunTurnURLS: e.stunsTurns,
|
||||||
|
}
|
||||||
|
|
||||||
|
signalOffer := func(uFrag string, pwd string) error {
|
||||||
|
return signalAuth(uFrag, pwd, myKey, remoteKey, e.signal, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
signalAnswer := func(uFrag string, pwd string) error {
|
||||||
|
return signalAuth(uFrag, pwd, myKey, remoteKey, e.signal, true)
|
||||||
|
}
|
||||||
|
signalCandidate := func(candidate ice.Candidate) error {
|
||||||
|
return signalCandidate(candidate, myKey, remoteKey, e.signal)
|
||||||
|
}
|
||||||
|
|
||||||
|
conn := NewConnection(*connConfig, signalCandidate, signalOffer, signalAnswer)
|
||||||
|
e.conns[myPubKey] = conn
|
||||||
|
|
||||||
|
err = conn.Open()
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("error openning connection to a remote peer %s %s", remoteKey.String(), err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func signalCandidate(candidate ice.Candidate, myKey wgtypes.Key, remoteKey wgtypes.Key, s *signal.Client) error {
|
||||||
|
err := s.Send(&sProto.Message{
|
||||||
|
Type: sProto.Message_CANDIDATE,
|
||||||
|
Key: myKey.PublicKey().String(),
|
||||||
|
RemoteKey: remoteKey.String(),
|
||||||
|
Body: candidate.Marshal(),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed signaling candidate to the remote peer %s %s", remoteKey.String(), err)
|
||||||
|
//todo ??
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func signalAuth(uFrag string, pwd string, myKey wgtypes.Key, remoteKey wgtypes.Key, s *signal.Client, isAnswer bool) error {
|
||||||
|
|
||||||
|
var t sProto.Message_Type
|
||||||
|
if isAnswer {
|
||||||
|
t = sProto.Message_ANSWER
|
||||||
|
} else {
|
||||||
|
t = sProto.Message_OFFER
|
||||||
|
}
|
||||||
|
|
||||||
|
msg := signal.MarshalCredential(myKey.PublicKey().String(), remoteKey.String(), &signal.Credential{
|
||||||
|
UFrag: uFrag,
|
||||||
|
Pwd: pwd}, t)
|
||||||
|
|
||||||
|
err := s.Send(msg)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *Engine) receiveSignal(localKey string) {
|
||||||
|
// connect to a stream of messages coming from the signal server
|
||||||
|
e.signal.Receive(localKey, func(msg *sProto.Message) error {
|
||||||
|
|
||||||
|
conn := e.conns[msg.RemoteKey]
|
||||||
|
if conn == nil {
|
||||||
|
return fmt.Errorf("wrongly addressed message %s", msg.Key)
|
||||||
|
}
|
||||||
|
|
||||||
|
if conn.Config.RemoteWgKey.String() != msg.Key {
|
||||||
|
return fmt.Errorf("unknown peer %s", msg.Key)
|
||||||
|
}
|
||||||
|
|
||||||
|
switch msg.Type {
|
||||||
|
case sProto.Message_OFFER:
|
||||||
|
remoteCred, err := signal.UnMarshalCredential(msg)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = conn.OnOffer(IceCredentials{
|
||||||
|
uFrag: remoteCred.UFrag,
|
||||||
|
pwd: remoteCred.Pwd,
|
||||||
|
isControlling: false,
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
case sProto.Message_ANSWER:
|
||||||
|
remoteCred, err := signal.UnMarshalCredential(msg)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = conn.OnAnswer(IceCredentials{
|
||||||
|
uFrag: remoteCred.UFrag,
|
||||||
|
pwd: remoteCred.Pwd,
|
||||||
|
isControlling: true,
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
case sProto.Message_CANDIDATE:
|
||||||
|
|
||||||
|
candidate, err := ice.UnmarshalCandidate(msg.Body)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed on parsing remote candidate %s -> %s", candidate, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = conn.OnRemoteCandidate(candidate)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("error handling CANDIATE from %s", msg.Key)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
e.signal.WaitConnected()
|
||||||
|
}
|
312
engine/agent.go
312
engine/agent.go
@@ -1,312 +0,0 @@
|
|||||||
package engine
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"github.com/pion/ice/v2"
|
|
||||||
log "github.com/sirupsen/logrus"
|
|
||||||
"github.com/wiretrustee/wiretrustee/iface"
|
|
||||||
"github.com/wiretrustee/wiretrustee/signal"
|
|
||||||
sProto "github.com/wiretrustee/wiretrustee/signal/proto"
|
|
||||||
"net"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
// PeerAgent is responsible for establishing and maintaining of the connection between two peers (local and remote)
|
|
||||||
// It uses underlying ice.Agent and ice.Conn
|
|
||||||
type PeerAgent struct {
|
|
||||||
// a Wireguard public key of the peer
|
|
||||||
LocalKey string
|
|
||||||
// a Wireguard public key of the remote peer
|
|
||||||
RemoteKey string
|
|
||||||
// ICE iceAgent that actually negotiates and maintains peer-to-peer connection
|
|
||||||
iceAgent *ice.Agent
|
|
||||||
// Actual peer-to-peer connection
|
|
||||||
conn *ice.Conn
|
|
||||||
// a signal.Client to negotiate initial connection
|
|
||||||
signal *signal.Client
|
|
||||||
// a connection to a local Wireguard instance to proxy data
|
|
||||||
wgConn net.Conn
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewPeerAgent creates a new PeerAgent with given local and remote Wireguard public keys and initializes an ICE Agent
|
|
||||||
func NewPeerAgent(localKey string, remoteKey string, stunTurnURLS []*ice.URL, wgAddr string, signal *signal.Client,
|
|
||||||
wgIface string) (*PeerAgent, error) {
|
|
||||||
|
|
||||||
// connect to local Wireguard instance
|
|
||||||
wgConn, err := net.Dial("udp", wgAddr)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("failed dialing to local Wireguard port %s", err)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
// add local proxy connection as a Wireguard peer
|
|
||||||
err = iface.UpdatePeer(wgIface, remoteKey, "0.0.0.0/0", 15*time.Second, wgConn.LocalAddr().String())
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("error while configuring Wireguard peer [%s] %s", remoteKey, err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
// init ICE Agent
|
|
||||||
iceAgent, err := ice.NewAgent(&ice.AgentConfig{
|
|
||||||
NetworkTypes: []ice.NetworkType{ice.NetworkTypeUDP4},
|
|
||||||
Urls: stunTurnURLS,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
peerAgent := &PeerAgent{
|
|
||||||
LocalKey: localKey,
|
|
||||||
RemoteKey: remoteKey,
|
|
||||||
iceAgent: iceAgent,
|
|
||||||
conn: nil,
|
|
||||||
wgConn: wgConn,
|
|
||||||
signal: signal,
|
|
||||||
}
|
|
||||||
|
|
||||||
err = peerAgent.onConnectionStateChange()
|
|
||||||
if err != nil {
|
|
||||||
//todo close agent
|
|
||||||
log.Errorf("failed starting listener on ICE connection state change %s", err)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = peerAgent.onCandidate()
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("failed starting listener on ICE Candidate %s", err)
|
|
||||||
//todo close agent
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return peerAgent, nil
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// proxyToRemotePeer proxies everything from Wireguard to the remote peer
|
|
||||||
// blocks
|
|
||||||
func (pa *PeerAgent) proxyToRemotePeer() {
|
|
||||||
|
|
||||||
buf := make([]byte, 1500)
|
|
||||||
for {
|
|
||||||
n, err := pa.wgConn.Read(buf)
|
|
||||||
if err != nil {
|
|
||||||
log.Warnln("Error reading from peer: ", err.Error())
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
n, err = pa.conn.Write(buf[:n])
|
|
||||||
if err != nil {
|
|
||||||
log.Warnln("Error writing to remote peer: ", err.Error())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// proxyToLocalWireguard proxies everything from the remote peer to local Wireguard
|
|
||||||
// blocks
|
|
||||||
func (pa *PeerAgent) proxyToLocalWireguard() {
|
|
||||||
|
|
||||||
buf := make([]byte, 1500)
|
|
||||||
for {
|
|
||||||
n, err := pa.conn.Read(buf)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("failed reading from remote connection %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
n, err = pa.wgConn.Write(buf[:n])
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("failed writing to local Wireguard instance %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// OpenConnection opens connection to remote peer. Flow:
|
|
||||||
// 1. start gathering connection candidates
|
|
||||||
// 2. if the peer was an initiator then it dials to the remote peer
|
|
||||||
// 3. if the peer wasn't an initiator then it waits for incoming connection from the remote peer
|
|
||||||
// 4. after connection has been established peer starts to:
|
|
||||||
// - proxy all local Wireguard's packets to the remote peer
|
|
||||||
// - proxy all incoming data from the remote peer to local Wireguard
|
|
||||||
func (pa *PeerAgent) OpenConnection(initiator bool) (*ice.Conn, error) {
|
|
||||||
|
|
||||||
// start gathering candidates
|
|
||||||
err := pa.iceAgent.GatherCandidates()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// by that time it should be already set
|
|
||||||
frag, pwd, err := pa.iceAgent.GetRemoteUserCredentials()
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("remote credentials are not set for remote peer %s", pa.RemoteKey)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
// initiate remote connection
|
|
||||||
// will block until connection was established
|
|
||||||
var conn *ice.Conn = nil
|
|
||||||
if initiator {
|
|
||||||
conn, err = pa.iceAgent.Dial(context.TODO(), frag, pwd)
|
|
||||||
} else {
|
|
||||||
conn, err = pa.iceAgent.Accept(context.TODO(), frag, pwd)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("failed listening on local port %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Infof("Local addr %s, remote addr %s", conn.LocalAddr(), conn.RemoteAddr())
|
|
||||||
|
|
||||||
return conn, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pa *PeerAgent) prepareConnection(msg *sProto.Message, initiator bool) (*signal.Credential, error) {
|
|
||||||
remoteCred, err := signal.UnMarshalCredential(msg)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
cred, err := pa.Authenticate(remoteCred)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("error authenticating remote peer %s", msg.Key)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
pa.conn, err = pa.OpenConnection(initiator)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("error opening connection to remote peer %s %s", msg.Key, err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
return cred, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pa *PeerAgent) OnOffer(msg *sProto.Message) error {
|
|
||||||
|
|
||||||
cred, err := pa.prepareConnection(msg, false)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// notify the remote peer about our credentials
|
|
||||||
answer := signal.MarshalCredential(pa.LocalKey, pa.RemoteKey, &signal.Credential{
|
|
||||||
UFrag: cred.UFrag,
|
|
||||||
Pwd: cred.Pwd,
|
|
||||||
}, sProto.Message_ANSWER)
|
|
||||||
|
|
||||||
err = pa.signal.Send(answer)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pa *PeerAgent) OnAnswer(msg *sProto.Message) error {
|
|
||||||
_, err := pa.prepareConnection(msg, true)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pa *PeerAgent) OnRemoteCandidate(msg *sProto.Message) error {
|
|
||||||
|
|
||||||
log.Debugf("received remote candidate %s", msg.Body)
|
|
||||||
|
|
||||||
candidate, err := ice.UnmarshalCandidate(msg.Body)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("failed on parsing remote candidate %s -> %s", candidate, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = pa.iceAgent.AddRemoteCandidate(candidate)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("failed on adding remote candidate %s -> %s", candidate, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// onCandidate detects new local ice.Candidate and sends it to the remote peer via signal server
|
|
||||||
func (pa *PeerAgent) onCandidate() error {
|
|
||||||
return pa.iceAgent.OnCandidate(func(candidate ice.Candidate) {
|
|
||||||
if candidate != nil {
|
|
||||||
|
|
||||||
log.Debugf("discovered local candidate %s", candidate.String())
|
|
||||||
|
|
||||||
err := pa.signal.Send(&sProto.Message{
|
|
||||||
Type: sProto.Message_CANDIDATE,
|
|
||||||
Key: pa.LocalKey,
|
|
||||||
RemoteKey: pa.RemoteKey,
|
|
||||||
Body: candidate.Marshal(),
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("failed signaling candidate to the remote peer %s %s", pa.RemoteKey, err)
|
|
||||||
//todo ??
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// onConnectionStateChange listens on ice.Agent connection state change events and once connected checks a Candidate pair
|
|
||||||
// the ice.Conn was established with
|
|
||||||
func (pa *PeerAgent) onConnectionStateChange() error {
|
|
||||||
return pa.iceAgent.OnConnectionStateChange(func(state ice.ConnectionState) {
|
|
||||||
log.Debugf("ICE Connection State has changed: %s", state.String())
|
|
||||||
if state == ice.ConnectionStateConnected {
|
|
||||||
// once the connection has been established we can check the selected candidate pair
|
|
||||||
pair, err := pa.iceAgent.GetSelectedCandidatePair()
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("failed selecting active ICE candidate pair %s", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
log.Debugf("connected to peer %s via selected candidate pair %s", pa.RemoteKey, pair)
|
|
||||||
|
|
||||||
// start proxying data between local Wireguard and remote peer
|
|
||||||
go func() {
|
|
||||||
pa.proxyToRemotePeer()
|
|
||||||
}()
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
pa.proxyToLocalWireguard()
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// OfferConnection starts sending a connection offer to a remote peer
|
|
||||||
func (pa *PeerAgent) OfferConnection() error {
|
|
||||||
localUFrag, localPwd, err := pa.iceAgent.GetLocalUserCredentials()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
offer := signal.MarshalCredential(pa.LocalKey, pa.RemoteKey, &signal.Credential{
|
|
||||||
UFrag: localUFrag,
|
|
||||||
Pwd: localPwd}, sProto.Message_OFFER)
|
|
||||||
|
|
||||||
err = pa.signal.Send(offer)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// authenticate sets the signal.Credential of the remote peer
|
|
||||||
// and returns local Credentials
|
|
||||||
func (pa *PeerAgent) Authenticate(credential *signal.Credential) (*signal.Credential, error) {
|
|
||||||
|
|
||||||
err := pa.iceAgent.SetRemoteCredentials(credential.UFrag, credential.Pwd)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
localUFrag, localPwd, err := pa.iceAgent.GetLocalUserCredentials()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return &signal.Credential{
|
|
||||||
UFrag: localUFrag,
|
|
||||||
Pwd: localPwd}, nil
|
|
||||||
|
|
||||||
}
|
|
124
engine/engine.go
124
engine/engine.go
@@ -1,124 +0,0 @@
|
|||||||
package engine
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"github.com/pion/ice/v2"
|
|
||||||
log "github.com/sirupsen/logrus"
|
|
||||||
"github.com/wiretrustee/wiretrustee/iface"
|
|
||||||
signal "github.com/wiretrustee/wiretrustee/signal"
|
|
||||||
sProto "github.com/wiretrustee/wiretrustee/signal/proto"
|
|
||||||
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Engine struct {
|
|
||||||
// a list of STUN and TURN servers
|
|
||||||
stunsTurns []*ice.URL
|
|
||||||
// signal server client
|
|
||||||
signal *signal.Client
|
|
||||||
// peer agents indexed by local public key of the remote peers
|
|
||||||
agents map[string]*PeerAgent
|
|
||||||
// Wireguard interface
|
|
||||||
wgIface string
|
|
||||||
// Wireguard local address
|
|
||||||
wgAddr string
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewEngine(signal *signal.Client, stunsTurns []*ice.URL, wgIface string, wgAddr string) *Engine {
|
|
||||||
return &Engine{
|
|
||||||
stunsTurns: stunsTurns,
|
|
||||||
signal: signal,
|
|
||||||
wgIface: wgIface,
|
|
||||||
wgAddr: wgAddr,
|
|
||||||
agents: map[string]*PeerAgent{},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *Engine) Start(privateKey string, peers []string) error {
|
|
||||||
|
|
||||||
// setup wireguard
|
|
||||||
myKey, err := wgtypes.ParseKey(privateKey)
|
|
||||||
myPubKey := myKey.PublicKey().String()
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("error parsing Wireguard key %s: [%s]", privateKey, err.Error())
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = iface.Create(e.wgIface, e.wgAddr)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("error while creating interface %s: [%s]", e.wgIface, err.Error())
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = iface.Configure(e.wgIface, myKey.String())
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("error while configuring Wireguard interface [%s]: %s", e.wgIface, err.Error())
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
wgPort, err := iface.GetListenPort(e.wgIface)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("error while getting Wireguard interface port [%s]: %s", e.wgIface, err.Error())
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
e.receiveSignal(myPubKey)
|
|
||||||
|
|
||||||
// initialize peer agents
|
|
||||||
for _, peer := range peers {
|
|
||||||
peerAgent, err := NewPeerAgent(myPubKey, peer, e.stunsTurns, fmt.Sprintf("127.0.0.1:%d", *wgPort), e.signal, e.wgIface)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("failed creating peer agent for pair %s - %s", myPubKey, peer)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
e.agents[myPubKey] = peerAgent
|
|
||||||
|
|
||||||
err = peerAgent.OfferConnection()
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("failed starting agent %s %s", myPubKey, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *Engine) receiveSignal(localKey string) {
|
|
||||||
// connect to a stream of messages coming from the signal server
|
|
||||||
e.signal.Receive(localKey, func(msg *sProto.Message) error {
|
|
||||||
|
|
||||||
peerAgent := e.agents[msg.RemoteKey]
|
|
||||||
if peerAgent == nil {
|
|
||||||
return fmt.Errorf("wrongly addressed message %s", msg.Key)
|
|
||||||
}
|
|
||||||
|
|
||||||
if peerAgent.RemoteKey != msg.Key {
|
|
||||||
return fmt.Errorf("unknown peer %s", msg.Key)
|
|
||||||
}
|
|
||||||
|
|
||||||
switch msg.Type {
|
|
||||||
case sProto.Message_OFFER:
|
|
||||||
err := peerAgent.OnOffer(msg)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("error handling OFFER from %s", msg.Key)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
case sProto.Message_ANSWER:
|
|
||||||
err := peerAgent.OnAnswer(msg)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("error handling ANSWER from %s", msg.Key)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
case sProto.Message_CANDIDATE:
|
|
||||||
err := peerAgent.OnRemoteCandidate(msg)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("error handling CANDIATE from %s", msg.Key)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
|
|
||||||
e.signal.WaitConnected()
|
|
||||||
}
|
|
Reference in New Issue
Block a user