netbird/signal/client/client.go

296 lines
8.4 KiB
Go
Raw Normal View History

package client
2021-05-01 12:45:37 +02:00
import (
"context"
"crypto/tls"
2021-05-01 12:45:37 +02:00
"fmt"
"github.com/cenkalti/backoff/v4"
log "github.com/sirupsen/logrus"
"github.com/wiretrustee/wiretrustee/encryption"
2021-05-01 12:45:37 +02:00
"github.com/wiretrustee/wiretrustee/signal/proto"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
2021-05-01 12:45:37 +02:00
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
2021-05-01 12:45:37 +02:00
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"io"
"strings"
"sync"
"time"
)
// A set of tools to exchange connection details (Wireguard endpoints) with the remote peer.
// Client Wraps the Signal Exchange Service gRpc client
2021-05-01 12:45:37 +02:00
type Client struct {
2021-05-19 10:58:21 +02:00
key wgtypes.Key
realClient proto.SignalExchangeClient
signalConn *grpc.ClientConn
ctx context.Context
stream proto.SignalExchange_ConnectStreamClient
2021-05-01 12:45:37 +02:00
//waiting group to notify once stream is connected
connWg *sync.WaitGroup //todo use a channel instead??
2021-05-01 12:45:37 +02:00
}
// Close Closes underlying connections to the Signal Exchange
func (c *Client) Close() error {
return c.signalConn.Close()
2021-05-01 12:45:37 +02:00
}
// NewClient creates a new Signal client
func NewClient(ctx context.Context, addr string, key wgtypes.Key, tlsEnabled bool) (*Client, error) {
transportOption := grpc.WithInsecure()
if tlsEnabled {
transportOption = grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{}))
}
2021-05-01 12:45:37 +02:00
sigCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
2021-05-01 12:45:37 +02:00
conn, err := grpc.DialContext(
sigCtx,
2021-05-01 12:45:37 +02:00
addr,
transportOption,
2021-05-01 12:45:37 +02:00
grpc.WithBlock(),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 15 * time.Second,
Timeout: 10 * time.Second,
2021-05-01 12:45:37 +02:00
}))
if err != nil {
log.Errorf("failed to connect to the signalling server %v", err)
return nil, err
}
var wg sync.WaitGroup
2021-05-01 12:45:37 +02:00
return &Client{
realClient: proto.NewSignalExchangeClient(conn),
ctx: ctx,
signalConn: conn,
key: key,
connWg: &wg,
2021-05-01 12:45:37 +02:00
}, nil
}
//defaultBackoff is a basic backoff mechanism for general issues
func defaultBackoff(ctx context.Context) backoff.BackOff {
return backoff.WithContext(&backoff.ExponentialBackOff{
InitialInterval: 800 * time.Millisecond,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: 30 * time.Second,
MaxElapsedTime: 24 * 3 * time.Hour, //stop after 3 days trying
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}, ctx)
}
// Receive Connects to the Signal Exchange message stream and starts receiving messages.
2021-05-01 12:45:37 +02:00
// The messages will be handled by msgHandler function provided.
// This function runs a goroutine underneath and reconnects to the Signal Exchange if errors occur (e.g. Exchange restart)
// The key is the identifier of our Peer (could be Wireguard public key)
2021-05-05 10:40:53 +02:00
func (c *Client) Receive(msgHandler func(msg *proto.Message) error) {
c.connWg.Add(1)
2021-05-01 12:45:37 +02:00
go func() {
var backOff = defaultBackoff(c.ctx)
2021-05-01 12:45:37 +02:00
operation := func() error {
2021-05-05 10:40:53 +02:00
err := c.connect(c.key.PublicKey().String(), msgHandler)
2021-05-01 12:45:37 +02:00
if err != nil {
log.Warnf("disconnected from the Signal Exchange due to an error: %v", err)
c.connWg.Add(1)
2021-05-01 12:45:37 +02:00
return err
}
backOff.Reset()
return nil
}
err := backoff.Retry(operation, backOff)
if err != nil {
log.Errorf("exiting Signal Service connection retry loop due to unrecoverable error: %s", err)
2021-05-01 12:45:37 +02:00
return
}
}()
}
func (c *Client) connect(key string, msgHandler func(msg *proto.Message) error) error {
c.stream = nil
2021-05-01 12:45:37 +02:00
// add key fingerprint to the request header to be identified on the server side
md := metadata.New(map[string]string{proto.HeaderId: key})
ctx := metadata.NewOutgoingContext(c.ctx, md)
2021-05-01 12:45:37 +02:00
stream, err := c.realClient.ConnectStream(ctx, grpc.WaitForReady(true))
2021-05-01 12:45:37 +02:00
c.stream = stream
2021-05-01 12:45:37 +02:00
if err != nil {
return err
}
// blocks
header, err := c.stream.Header()
if err != nil {
return err
}
registered := header.Get(proto.HeaderRegistered)
if len(registered) == 0 {
return fmt.Errorf("didn't receive a registration header from the Signal server whille connecting to the streams")
}
2021-05-01 12:45:37 +02:00
//connection established we are good to use the stream
c.connWg.Done()
2021-05-01 12:45:37 +02:00
log.Infof("connected to the Signal Exchange Stream")
return c.receive(stream, msgHandler)
2021-05-01 12:45:37 +02:00
}
// WaitConnected waits until the client is connected to the message stream
func (c *Client) WaitConnected() {
c.connWg.Wait()
2021-05-01 12:45:37 +02:00
}
// SendToStream sends a message to the remote Peer through the Signal Exchange using established stream connection to the Signal Server
2021-05-01 12:45:37 +02:00
// The Client.Receive method must be called before sending messages to establish initial connection to the Signal Exchange
// Client.connWg can be used to wait
func (c *Client) SendToStream(msg *proto.EncryptedMessage) error {
2021-05-01 12:45:37 +02:00
if c.stream == nil {
2021-05-01 12:45:37 +02:00
return fmt.Errorf("connection to the Signal Exchnage has not been established yet. Please call Client.Receive before sending messages")
}
err := c.stream.Send(msg)
2021-05-01 12:45:37 +02:00
if err != nil {
log.Errorf("error while sending message to peer [%s] [error: %v]", msg.RemoteKey, err)
return err
}
return nil
}
// decryptMessage decrypts the body of the msg using Wireguard private key and Remote peer's public key
func (c *Client) decryptMessage(msg *proto.EncryptedMessage) (*proto.Message, error) {
remoteKey, err := wgtypes.ParseKey(msg.GetKey())
if err != nil {
return nil, err
}
body := &proto.Body{}
err = encryption.DecryptMessage(remoteKey, c.key, msg.GetBody(), body)
if err != nil {
return nil, err
}
return &proto.Message{
Key: msg.Key,
RemoteKey: msg.RemoteKey,
Body: body,
}, nil
}
// encryptMessage encrypts the body of the msg using Wireguard private key and Remote peer's public key
func (c *Client) encryptMessage(msg *proto.Message) (*proto.EncryptedMessage, error) {
remoteKey, err := wgtypes.ParseKey(msg.RemoteKey)
if err != nil {
return nil, err
}
encryptedBody, err := encryption.EncryptMessage(remoteKey, c.key, msg.Body)
if err != nil {
return nil, err
}
return &proto.EncryptedMessage{
Key: msg.GetKey(),
RemoteKey: msg.GetRemoteKey(),
Body: encryptedBody,
}, nil
}
// Send sends a message to the remote Peer through the Signal Exchange.
func (c *Client) Send(msg *proto.Message) error {
2021-05-01 12:45:37 +02:00
encryptedMessage, err := c.encryptMessage(msg)
if err != nil {
return err
}
_, err = c.realClient.Send(context.TODO(), encryptedMessage)
2021-05-01 12:45:37 +02:00
if err != nil {
log.Errorf("error while sending message to peer [%s] [error: %v]", msg.RemoteKey, err)
return err
}
return nil
}
// receive receives messages from other peers coming through the Signal Exchange
func (c *Client) receive(stream proto.SignalExchange_ConnectStreamClient,
2021-05-01 12:45:37 +02:00
msgHandler func(msg *proto.Message) error) error {
for {
msg, err := stream.Recv()
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled {
log.Warnf("stream canceled (usually indicates shutdown)")
return err
} else if s.Code() == codes.Unavailable {
log.Warnf("server has been stopped")
return err
} else if err == io.EOF {
log.Warnf("stream closed by server")
return err
} else if err != nil {
return err
}
log.Debugf("received a new message from Peer [fingerprint: %s]", msg.Key)
decryptedMessage, err := c.decryptMessage(msg)
if err != nil {
log.Errorf("failed decrypting message of Peer [key: %s] error: [%s]", msg.Key, err.Error())
}
2021-05-01 12:45:37 +02:00
err = msgHandler(decryptedMessage)
2021-05-01 12:45:37 +02:00
if err != nil {
log.Errorf("error while handling message of Peer [key: %s] error: [%s]", msg.Key, err.Error())
//todo send something??
}
}
}
// UnMarshalCredential parses the credentials from the message and returns a Credential instance
2021-05-01 12:45:37 +02:00
func UnMarshalCredential(msg *proto.Message) (*Credential, error) {
credential := strings.Split(msg.GetBody().GetPayload(), ":")
2021-05-01 12:45:37 +02:00
if len(credential) != 2 {
return nil, fmt.Errorf("error parsing message body %s", msg.Body)
}
return &Credential{
UFrag: credential[0],
Pwd: credential[1],
}, nil
}
// MarshalCredential marsharl a Credential instance and returns a Message object
func MarshalCredential(myKey wgtypes.Key, remoteKey wgtypes.Key, credential *Credential, t proto.Body_Type) (*proto.Message, error) {
2021-05-01 12:45:37 +02:00
return &proto.Message{
Key: myKey.PublicKey().String(),
RemoteKey: remoteKey.String(),
Body: &proto.Body{
Type: t,
Payload: fmt.Sprintf("%s:%s", credential.UFrag, credential.Pwd),
},
}, nil
2021-05-01 12:45:37 +02:00
}
// Credential is an instance of a Client's Credential
2021-05-01 12:45:37 +02:00
type Credential struct {
UFrag string
Pwd string
}