From e71c623e33953eca61e9ae3fe1281607c3c1ebe7 Mon Sep 17 00:00:00 2001 From: braginini Date: Mon, 19 Apr 2021 22:37:49 +0200 Subject: [PATCH] feat: add timeout while initiating conenction --- connection/connection.go | 94 ++++++++++++++++++---------------------- connection/engine.go | 20 ++++----- 2 files changed, 50 insertions(+), 64 deletions(-) diff --git a/connection/connection.go b/connection/connection.go index 9a410bdc6..b21aaeef1 100644 --- a/connection/connection.go +++ b/connection/connection.go @@ -2,6 +2,7 @@ package connection import ( "context" + "fmt" "github.com/pion/ice/v2" log "github.com/sirupsen/logrus" "github.com/wiretrustee/wiretrustee/iface" @@ -52,9 +53,8 @@ type Connection struct { // remoteAuthChannel is a channel used to wait for remote credentials to proceed with the connection remoteAuthChannel chan IceCredentials - closeChannel chan bool - connectedChannel chan struct{} - closedChannel chan struct{} + closeChannel chan bool + closedChannel chan struct{} // agent is an actual ice.Agent that is used to negotiate and maintain a connection to a remote peer agent *ice.Agent @@ -70,8 +70,6 @@ func NewConnection(config ConnConfig, signalCandidate func(candidate ice.Candidate) error, signalOffer func(uFrag string, pwd string) error, signalAnswer func(uFrag string, pwd string) error, - closedChannel chan struct{}, - connectedChannel chan struct{}, ) *Connection { return &Connection{ @@ -81,8 +79,7 @@ func NewConnection(config ConnConfig, signalAnswer: signalAnswer, remoteAuthChannel: make(chan IceCredentials, 1), closeChannel: make(chan bool, 2), - connectedChannel: connectedChannel, - closedChannel: closedChannel, + closedChannel: make(chan struct{}), agent: nil, isActive: false, mux: sync.Mutex{}, @@ -123,7 +120,7 @@ func (conn *Connection) Close() error { // Open opens connection to a remote peer. // Will block until the connection has successfully established -func (conn *Connection) Open() error { +func (conn *Connection) Open(timeout time.Duration) (chan struct{}, error) { log.Debugf("1: opening connection to peer %s", conn.Config.RemoteWgKey.String()) @@ -140,58 +137,62 @@ func (conn *Connection) Open() error { log.Debugf("2: opening connection to peer %s", conn.Config.RemoteWgKey.String()) if err != nil { - return err + return nil, err } err = conn.listenOnLocalCandidates() if err != nil { - return err + return nil, err } log.Debugf("3: opening connection to peer %s", conn.Config.RemoteWgKey.String()) err = conn.listenOnConnectionStateChanges() if err != nil { - return err + return nil, err } log.Debugf("4: opening connection to peer %s", conn.Config.RemoteWgKey.String()) err = conn.signalCredentials() if err != nil { - return err + return nil, err } log.Debugf("5: opening connection to peer %s", conn.Config.RemoteWgKey.String()) // wait until credentials have been sent from the remote peer (will arrive via a signal server) - remoteAuth := <-conn.remoteAuthChannel + select { + case remoteAuth := <-conn.remoteAuthChannel: - err = conn.agent.GatherCandidates() - if err != nil { - return err + err = conn.agent.GatherCandidates() + if err != nil { + return nil, err + } + + remoteConn, err := conn.openConnectionToRemote(remoteAuth.isControlling, remoteAuth) + if err != nil { + log.Errorf("failed establishing connection with the remote peer %s %s", conn.Config.RemoteWgKey.String(), err) + return nil, err + } + + wgConn, err := conn.createWireguardProxy() + if err != nil { + return nil, err + } + conn.wgConn = *wgConn + + go conn.proxyToRemotePeer(*wgConn, remoteConn) + go conn.proxyToLocalWireguard(*wgConn, remoteConn) + + log.Debugf("opened connection to peer %s", conn.Config.RemoteWgKey.String()) + + conn.isActive = true + + return conn.closedChannel, nil + case <-time.After(timeout): + return nil, fmt.Errorf("timeout of %vs exceeded while waiting for connection to peer %s", timeout.Seconds(), conn.Config.RemoteWgKey.String()) } - - remoteConn, err := conn.openConnectionToRemote(remoteAuth.isControlling, remoteAuth) - if err != nil { - log.Errorf("failed establishing connection with the remote peer %s %s", conn.Config.RemoteWgKey.String(), err) - return err - } - - wgConn, err := conn.createWireguardProxy() - if err != nil { - return err - } - conn.wgConn = *wgConn - - go conn.proxyToRemotePeer(*wgConn, remoteConn) - go conn.proxyToLocalWireguard(*wgConn, remoteConn) - - log.Debugf("opened connection to peer %s", conn.Config.RemoteWgKey.String()) - - conn.isActive = true - - return nil } func (conn *Connection) OnAnswer(remoteAuth IceCredentials) error { @@ -311,6 +312,10 @@ func (conn *Connection) listenOnConnectionStateChanges() error { log.Debugf("connected to peer %s via selected candidate pair %s", conn.Config.RemoteWgKey.String(), pair) } else if state == ice.ConnectionStateDisconnected || state == ice.ConnectionStateFailed { // todo do we really wanna have a connection restart within connection itself? Think of moving it outside + err := conn.Close() + if err != nil { + log.Errorf("failed closing connection fo peer %s %s", conn.Config.RemoteWgKey.String(), err.Error()) + } close(conn.closedChannel) } }) @@ -322,21 +327,6 @@ func (conn *Connection) listenOnConnectionStateChanges() error { return nil } -func (conn *Connection) Restart() error { - err := conn.Close() - if err != nil { - log.Errorf("failed closing connection to peer %s %s", conn.Config.RemoteWgKey.String(), err) - return err - } - err = conn.Open() - if err != nil { - log.Errorf("failed reopenning connection to peer %s %s", conn.Config.RemoteWgKey.String(), err) - return err - } - - return nil -} - // createWireguardProxy opens connection to a local Wireguard instance (proxy) and sets Wireguard's peer endpoint to point // to a local address of a proxy func (conn *Connection) createWireguardProxy() (*net.Conn, error) { diff --git a/connection/engine.go b/connection/engine.go index bd40fc67a..b41e05027 100644 --- a/connection/engine.go +++ b/connection/engine.go @@ -9,6 +9,7 @@ import ( "github.com/wiretrustee/wiretrustee/signal" sProto "github.com/wiretrustee/wiretrustee/signal/proto" "golang.zx2c4.com/wireguard/wgctrl/wgtypes" + "time" ) type Engine struct { @@ -76,7 +77,7 @@ func (e *Engine) Start(privateKey string, peers []Peer) error { go func() { operation := func() error { - conn, closed, err := e.openConnection(*wgPort, myKey, peer) + _, closed, err := e.openConnection(*wgPort, myKey, peer) if err != nil { return err } @@ -84,12 +85,9 @@ func (e *Engine) Start(privateKey string, peers []Peer) error { select { case _, ok := <-closed: if !ok { - err = conn.Close() - if err != nil { - return err - } + return fmt.Errorf("connection to peer %s has been closed", peer.WgPubKey) } - return fmt.Errorf("connection to peer %s has been closed", peer.WgPubKey) + return nil } } @@ -128,17 +126,15 @@ func (e *Engine) openConnection(wgPort int, myKey wgtypes.Key, peer Peer) (*Conn return signalCandidate(candidate, myKey, remoteKey, e.signal) } - connected := make(chan struct{}, 1) - closed := make(chan struct{}) - conn := NewConnection(*connConfig, signalCandidate, signalOffer, signalAnswer, closed, connected) + conn := NewConnection(*connConfig, signalCandidate, signalOffer, signalAnswer) e.conns[remoteKey.String()] = conn - // blocks until the connection is open (or timeout??) - err := conn.Open() + // blocks until the connection is open (or timeout) + closedCh, err := conn.Open(60 * time.Second) if err != nil { log.Errorf("error openning connection to a remote peer %s %s", remoteKey.String(), err.Error()) return nil, nil, err } - return conn, closed, nil + return conn, closedCh, nil } func signalCandidate(candidate ice.Candidate, myKey wgtypes.Key, remoteKey wgtypes.Key, s *signal.Client) error {