mirror of
https://github.com/netbirdio/netbird.git
synced 2025-01-21 05:19:33 +01:00
refactor: move answer and offer handling to peer agent
This commit is contained in:
parent
4ba9c958b4
commit
6fa4077f04
@ -26,13 +26,9 @@ type PeerAgent struct {
|
||||
signal *signal.Client
|
||||
// a connection to a local Wireguard instance to proxy data
|
||||
wgConn net.Conn
|
||||
// an address of local Wireguard instance
|
||||
wgAddr string
|
||||
//
|
||||
wgIface string
|
||||
}
|
||||
|
||||
// NewPeerAgent creates a new PeerAgent with give local and remote Wireguard public keys and initializes an ICE Agent
|
||||
// NewPeerAgent creates a new PeerAgent with given local and remote Wireguard public keys and initializes an ICE Agent
|
||||
func NewPeerAgent(localKey string, remoteKey string, stunTurnURLS []*ice.URL, wgAddr string, signal *signal.Client,
|
||||
wgIface string) (*PeerAgent, error) {
|
||||
|
||||
@ -61,22 +57,21 @@ func NewPeerAgent(localKey string, remoteKey string, stunTurnURLS []*ice.URL, wg
|
||||
LocalKey: localKey,
|
||||
RemoteKey: remoteKey,
|
||||
iceAgent: iceAgent,
|
||||
wgAddr: wgAddr,
|
||||
conn: nil,
|
||||
wgConn: wgConn,
|
||||
signal: signal,
|
||||
wgIface: wgIface,
|
||||
}
|
||||
|
||||
err = peerAgent.onConnectionStateChange()
|
||||
if err != nil {
|
||||
//todo close agent
|
||||
log.Errorf("failed starting listener on ICE connection state change %s", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = peerAgent.onCandidate()
|
||||
if err != nil {
|
||||
log.Errorf("failed listening on ICE connection state changes %s", err)
|
||||
log.Errorf("failed starting listener on ICE Candidate %s", err)
|
||||
//todo close agent
|
||||
return nil, err
|
||||
}
|
||||
@ -130,21 +125,20 @@ func (pa *PeerAgent) proxyToLocalWireguard() {
|
||||
// 4. after connection has been established peer starts to:
|
||||
// - proxy all local Wireguard's packets to the remote peer
|
||||
// - proxy all incoming data from the remote peer to local Wireguard
|
||||
func (pa *PeerAgent) OpenConnection(initiator bool) error {
|
||||
func (pa *PeerAgent) OpenConnection(initiator bool) (*ice.Conn, error) {
|
||||
|
||||
// start gathering candidates
|
||||
err := pa.iceAgent.GatherCandidates()
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// by that time it should be already set
|
||||
frag, pwd, err := pa.iceAgent.GetRemoteUserCredentials()
|
||||
if err != nil {
|
||||
log.Errorf("remote credentials are not set for remote peer %s", pa.RemoteKey)
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// initiate remote connection
|
||||
// will block until connection was established
|
||||
var conn *ice.Conn = nil
|
||||
@ -156,17 +150,59 @@ func (pa *PeerAgent) OpenConnection(initiator bool) error {
|
||||
|
||||
if err != nil {
|
||||
log.Fatalf("failed listening on local port %s", err)
|
||||
return err
|
||||
}
|
||||
|
||||
log.Infof("Local addr %s, remote addr %s", conn.LocalAddr(), conn.RemoteAddr())
|
||||
pa.conn = conn
|
||||
|
||||
return conn, err
|
||||
}
|
||||
|
||||
func (pa *PeerAgent) prepareConnection(msg *sProto.Message, initiator bool) (*signal.Credential, error) {
|
||||
remoteCred, err := signal.UnMarshalCredential(msg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cred, err := pa.Authenticate(remoteCred)
|
||||
if err != nil {
|
||||
log.Errorf("error authenticating remote peer %s", msg.Key)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
go func() {
|
||||
pa.conn, err = pa.OpenConnection(initiator)
|
||||
if err != nil {
|
||||
log.Errorf("error opening connection to remote peer %s %s", msg.Key, err)
|
||||
}
|
||||
}()
|
||||
|
||||
return cred, nil
|
||||
}
|
||||
|
||||
func (pa *PeerAgent) OnOffer(msg *sProto.Message) error {
|
||||
|
||||
cred, err := pa.prepareConnection(msg, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// notify the remote peer about our credentials
|
||||
answer := signal.MarshalCredential(pa.LocalKey, pa.RemoteKey, &signal.Credential{
|
||||
UFrag: cred.UFrag,
|
||||
Pwd: cred.Pwd,
|
||||
}, sProto.Message_ANSWER)
|
||||
|
||||
err = pa.signal.Send(answer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pa *PeerAgent) OnAnswer(msg *sProto.Message) error {
|
||||
return nil
|
||||
_, err := pa.prepareConnection(msg, true)
|
||||
return err
|
||||
}
|
||||
|
||||
func (pa *PeerAgent) OnRemoteCandidate(msg *sProto.Message) error {
|
||||
@ -236,7 +272,8 @@ func (pa *PeerAgent) onConnectionStateChange() error {
|
||||
})
|
||||
}
|
||||
|
||||
func (pa *PeerAgent) Start() error {
|
||||
// OfferConnection starts sending a connection offer to a remote peer
|
||||
func (pa *PeerAgent) OfferConnection() error {
|
||||
localUFrag, localPwd, err := pa.iceAgent.GetLocalUserCredentials()
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -61,6 +61,8 @@ func (e *Engine) Start(privateKey string, peers []string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
e.receiveSignal(myPubKey)
|
||||
|
||||
// initialize peer agents
|
||||
for _, peer := range peers {
|
||||
peerAgent, err := NewPeerAgent(myPubKey, peer, e.stunsTurns, fmt.Sprintf("127.0.0.1:%d", *wgPort), e.signal, e.wgIface)
|
||||
@ -69,12 +71,8 @@ func (e *Engine) Start(privateKey string, peers []string) error {
|
||||
return err
|
||||
}
|
||||
e.agents[myPubKey] = peerAgent
|
||||
}
|
||||
|
||||
e.receiveSignal(myPubKey)
|
||||
|
||||
for _, pa := range e.agents {
|
||||
err := pa.Start()
|
||||
err = peerAgent.OfferConnection()
|
||||
if err != nil {
|
||||
log.Fatalf("failed starting agent %s %s", myPubKey, err)
|
||||
return err
|
||||
@ -97,34 +95,20 @@ func (e *Engine) receiveSignal(localKey string) {
|
||||
return fmt.Errorf("unknown peer %s", msg.Key)
|
||||
}
|
||||
|
||||
// the one who send offer (expects answer) is the initiator of teh connection
|
||||
initiator := msg.Type == sProto.Message_ANSWER
|
||||
|
||||
switch msg.Type {
|
||||
case sProto.Message_OFFER:
|
||||
|
||||
cred, err := e.handle(msg, peerAgent, initiator)
|
||||
err := peerAgent.OnOffer(msg)
|
||||
if err != nil {
|
||||
log.Errorf("error handling OFFER from %s", msg.Key)
|
||||
return err
|
||||
}
|
||||
// notify the remote peer about our credentials
|
||||
answer := signal.MarshalCredential(peerAgent.LocalKey, peerAgent.RemoteKey, &signal.Credential{
|
||||
UFrag: cred.UFrag,
|
||||
Pwd: cred.Pwd,
|
||||
}, sProto.Message_ANSWER)
|
||||
|
||||
err = e.signal.Send(answer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
case sProto.Message_ANSWER:
|
||||
_, err := e.handle(msg, peerAgent, initiator)
|
||||
err := peerAgent.OnAnswer(msg)
|
||||
if err != nil {
|
||||
log.Errorf("error handling ANSWER from %s", msg.Key)
|
||||
return err
|
||||
}
|
||||
|
||||
case sProto.Message_CANDIDATE:
|
||||
err := peerAgent.OnRemoteCandidate(msg)
|
||||
if err != nil {
|
||||
@ -138,26 +122,3 @@ func (e *Engine) receiveSignal(localKey string) {
|
||||
|
||||
e.signal.WaitConnected()
|
||||
}
|
||||
|
||||
func (e *Engine) handle(msg *sProto.Message, peerAgent *PeerAgent, initiator bool) (*signal.Credential, error) {
|
||||
remoteCred, err := signal.UnMarshalCredential(msg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cred, err := peerAgent.Authenticate(remoteCred)
|
||||
if err != nil {
|
||||
log.Errorf("error authenticating remote peer %s", msg.Key)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
go func() {
|
||||
|
||||
err = peerAgent.OpenConnection(initiator)
|
||||
if err != nil {
|
||||
log.Errorf("error opening connection to remote peer %s %s", msg.Key, err)
|
||||
}
|
||||
}()
|
||||
|
||||
return cred, nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user