feat: add timeout while initiating conenction

This commit is contained in:
braginini 2021-04-19 22:37:49 +02:00
parent c96b63b956
commit e71c623e33
2 changed files with 50 additions and 64 deletions

View File

@ -2,6 +2,7 @@ package connection
import ( import (
"context" "context"
"fmt"
"github.com/pion/ice/v2" "github.com/pion/ice/v2"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/wiretrustee/wiretrustee/iface" "github.com/wiretrustee/wiretrustee/iface"
@ -52,9 +53,8 @@ type Connection struct {
// remoteAuthChannel is a channel used to wait for remote credentials to proceed with the connection // remoteAuthChannel is a channel used to wait for remote credentials to proceed with the connection
remoteAuthChannel chan IceCredentials remoteAuthChannel chan IceCredentials
closeChannel chan bool closeChannel chan bool
connectedChannel chan struct{} closedChannel chan struct{}
closedChannel chan struct{}
// agent is an actual ice.Agent that is used to negotiate and maintain a connection to a remote peer // agent is an actual ice.Agent that is used to negotiate and maintain a connection to a remote peer
agent *ice.Agent agent *ice.Agent
@ -70,8 +70,6 @@ func NewConnection(config ConnConfig,
signalCandidate func(candidate ice.Candidate) error, signalCandidate func(candidate ice.Candidate) error,
signalOffer func(uFrag string, pwd string) error, signalOffer func(uFrag string, pwd string) error,
signalAnswer func(uFrag string, pwd string) error, signalAnswer func(uFrag string, pwd string) error,
closedChannel chan struct{},
connectedChannel chan struct{},
) *Connection { ) *Connection {
return &Connection{ return &Connection{
@ -81,8 +79,7 @@ func NewConnection(config ConnConfig,
signalAnswer: signalAnswer, signalAnswer: signalAnswer,
remoteAuthChannel: make(chan IceCredentials, 1), remoteAuthChannel: make(chan IceCredentials, 1),
closeChannel: make(chan bool, 2), closeChannel: make(chan bool, 2),
connectedChannel: connectedChannel, closedChannel: make(chan struct{}),
closedChannel: closedChannel,
agent: nil, agent: nil,
isActive: false, isActive: false,
mux: sync.Mutex{}, mux: sync.Mutex{},
@ -123,7 +120,7 @@ func (conn *Connection) Close() error {
// Open opens connection to a remote peer. // Open opens connection to a remote peer.
// Will block until the connection has successfully established // Will block until the connection has successfully established
func (conn *Connection) Open() error { func (conn *Connection) Open(timeout time.Duration) (chan struct{}, error) {
log.Debugf("1: opening connection to peer %s", conn.Config.RemoteWgKey.String()) log.Debugf("1: opening connection to peer %s", conn.Config.RemoteWgKey.String())
@ -140,58 +137,62 @@ func (conn *Connection) Open() error {
log.Debugf("2: opening connection to peer %s", conn.Config.RemoteWgKey.String()) log.Debugf("2: opening connection to peer %s", conn.Config.RemoteWgKey.String())
if err != nil { if err != nil {
return err return nil, err
} }
err = conn.listenOnLocalCandidates() err = conn.listenOnLocalCandidates()
if err != nil { if err != nil {
return err return nil, err
} }
log.Debugf("3: opening connection to peer %s", conn.Config.RemoteWgKey.String()) log.Debugf("3: opening connection to peer %s", conn.Config.RemoteWgKey.String())
err = conn.listenOnConnectionStateChanges() err = conn.listenOnConnectionStateChanges()
if err != nil { if err != nil {
return err return nil, err
} }
log.Debugf("4: opening connection to peer %s", conn.Config.RemoteWgKey.String()) log.Debugf("4: opening connection to peer %s", conn.Config.RemoteWgKey.String())
err = conn.signalCredentials() err = conn.signalCredentials()
if err != nil { if err != nil {
return err return nil, err
} }
log.Debugf("5: opening connection to peer %s", conn.Config.RemoteWgKey.String()) log.Debugf("5: opening connection to peer %s", conn.Config.RemoteWgKey.String())
// wait until credentials have been sent from the remote peer (will arrive via a signal server) // wait until credentials have been sent from the remote peer (will arrive via a signal server)
remoteAuth := <-conn.remoteAuthChannel select {
case remoteAuth := <-conn.remoteAuthChannel:
err = conn.agent.GatherCandidates() err = conn.agent.GatherCandidates()
if err != nil { if err != nil {
return err return nil, err
}
remoteConn, err := conn.openConnectionToRemote(remoteAuth.isControlling, remoteAuth)
if err != nil {
log.Errorf("failed establishing connection with the remote peer %s %s", conn.Config.RemoteWgKey.String(), err)
return nil, err
}
wgConn, err := conn.createWireguardProxy()
if err != nil {
return nil, err
}
conn.wgConn = *wgConn
go conn.proxyToRemotePeer(*wgConn, remoteConn)
go conn.proxyToLocalWireguard(*wgConn, remoteConn)
log.Debugf("opened connection to peer %s", conn.Config.RemoteWgKey.String())
conn.isActive = true
return conn.closedChannel, nil
case <-time.After(timeout):
return nil, fmt.Errorf("timeout of %vs exceeded while waiting for connection to peer %s", timeout.Seconds(), conn.Config.RemoteWgKey.String())
} }
remoteConn, err := conn.openConnectionToRemote(remoteAuth.isControlling, remoteAuth)
if err != nil {
log.Errorf("failed establishing connection with the remote peer %s %s", conn.Config.RemoteWgKey.String(), err)
return err
}
wgConn, err := conn.createWireguardProxy()
if err != nil {
return err
}
conn.wgConn = *wgConn
go conn.proxyToRemotePeer(*wgConn, remoteConn)
go conn.proxyToLocalWireguard(*wgConn, remoteConn)
log.Debugf("opened connection to peer %s", conn.Config.RemoteWgKey.String())
conn.isActive = true
return nil
} }
func (conn *Connection) OnAnswer(remoteAuth IceCredentials) error { func (conn *Connection) OnAnswer(remoteAuth IceCredentials) error {
@ -311,6 +312,10 @@ func (conn *Connection) listenOnConnectionStateChanges() error {
log.Debugf("connected to peer %s via selected candidate pair %s", conn.Config.RemoteWgKey.String(), pair) log.Debugf("connected to peer %s via selected candidate pair %s", conn.Config.RemoteWgKey.String(), pair)
} else if state == ice.ConnectionStateDisconnected || state == ice.ConnectionStateFailed { } else if state == ice.ConnectionStateDisconnected || state == ice.ConnectionStateFailed {
// todo do we really wanna have a connection restart within connection itself? Think of moving it outside // todo do we really wanna have a connection restart within connection itself? Think of moving it outside
err := conn.Close()
if err != nil {
log.Errorf("failed closing connection fo peer %s %s", conn.Config.RemoteWgKey.String(), err.Error())
}
close(conn.closedChannel) close(conn.closedChannel)
} }
}) })
@ -322,21 +327,6 @@ func (conn *Connection) listenOnConnectionStateChanges() error {
return nil return nil
} }
func (conn *Connection) Restart() error {
err := conn.Close()
if err != nil {
log.Errorf("failed closing connection to peer %s %s", conn.Config.RemoteWgKey.String(), err)
return err
}
err = conn.Open()
if err != nil {
log.Errorf("failed reopenning connection to peer %s %s", conn.Config.RemoteWgKey.String(), err)
return err
}
return nil
}
// createWireguardProxy opens connection to a local Wireguard instance (proxy) and sets Wireguard's peer endpoint to point // createWireguardProxy opens connection to a local Wireguard instance (proxy) and sets Wireguard's peer endpoint to point
// to a local address of a proxy // to a local address of a proxy
func (conn *Connection) createWireguardProxy() (*net.Conn, error) { func (conn *Connection) createWireguardProxy() (*net.Conn, error) {

View File

@ -9,6 +9,7 @@ import (
"github.com/wiretrustee/wiretrustee/signal" "github.com/wiretrustee/wiretrustee/signal"
sProto "github.com/wiretrustee/wiretrustee/signal/proto" sProto "github.com/wiretrustee/wiretrustee/signal/proto"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes" "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"time"
) )
type Engine struct { type Engine struct {
@ -76,7 +77,7 @@ func (e *Engine) Start(privateKey string, peers []Peer) error {
go func() { go func() {
operation := func() error { operation := func() error {
conn, closed, err := e.openConnection(*wgPort, myKey, peer) _, closed, err := e.openConnection(*wgPort, myKey, peer)
if err != nil { if err != nil {
return err return err
} }
@ -84,12 +85,9 @@ func (e *Engine) Start(privateKey string, peers []Peer) error {
select { select {
case _, ok := <-closed: case _, ok := <-closed:
if !ok { if !ok {
err = conn.Close() return fmt.Errorf("connection to peer %s has been closed", peer.WgPubKey)
if err != nil {
return err
}
} }
return fmt.Errorf("connection to peer %s has been closed", peer.WgPubKey) return nil
} }
} }
@ -128,17 +126,15 @@ func (e *Engine) openConnection(wgPort int, myKey wgtypes.Key, peer Peer) (*Conn
return signalCandidate(candidate, myKey, remoteKey, e.signal) return signalCandidate(candidate, myKey, remoteKey, e.signal)
} }
connected := make(chan struct{}, 1) conn := NewConnection(*connConfig, signalCandidate, signalOffer, signalAnswer)
closed := make(chan struct{})
conn := NewConnection(*connConfig, signalCandidate, signalOffer, signalAnswer, closed, connected)
e.conns[remoteKey.String()] = conn e.conns[remoteKey.String()] = conn
// blocks until the connection is open (or timeout??) // blocks until the connection is open (or timeout)
err := conn.Open() closedCh, err := conn.Open(60 * time.Second)
if err != nil { if err != nil {
log.Errorf("error openning connection to a remote peer %s %s", remoteKey.String(), err.Error()) log.Errorf("error openning connection to a remote peer %s %s", remoteKey.String(), err.Error())
return nil, nil, err return nil, nil, err
} }
return conn, closed, nil return conn, closedCh, nil
} }
func signalCandidate(candidate ice.Candidate, myKey wgtypes.Key, remoteKey wgtypes.Key, s *signal.Client) error { func signalCandidate(candidate ice.Candidate, myKey wgtypes.Key, remoteKey wgtypes.Key, s *signal.Client) error {