package signal import ( "context" "fmt" "github.com/cenkalti/backoff/v4" pb "github.com/golang/protobuf/proto" log "github.com/sirupsen/logrus" "github.com/wiretrustee/wiretrustee/signal/proto" "golang.zx2c4.com/wireguard/wgctrl/wgtypes" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "io" "strings" "sync" "time" ) // A set of tools to exchange connection details (Wireguard endpoints) with the remote peer. // Wraps the Signal Exchange Service gRpc client type Client struct { key wgtypes.Key encryptionKey string realClient proto.SignalExchangeClient signalConn *grpc.ClientConn ctx context.Context stream proto.SignalExchange_ConnectStreamClient //waiting group to notify once stream is connected connWg sync.WaitGroup //todo use a channel instead?? } // Closes underlying connections to the Signal Exchange func (c *Client) Close() error { return c.signalConn.Close() } func NewClient(addr string, key wgtypes.Key, ctx context.Context) (*Client, error) { conn, err := grpc.DialContext( ctx, addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithKeepaliveParams(keepalive.ClientParameters{ Time: 3 * time.Second, Timeout: 2 * time.Second, })) if err != nil { log.Errorf("failed to connect to the signalling server %v", err) return nil, err } return &Client{ realClient: proto.NewSignalExchangeClient(conn), ctx: ctx, signalConn: conn, key: key, }, nil } // Connects to the Signal Exchange message stream and starts receiving messages. // The messages will be handled by msgHandler function provided. // This function runs a goroutine underneath and reconnects to the Signal Exchange if errors occur (e.g. Exchange restart) // The key is the identifier of our Peer (could be Wireguard public key) func (c *Client) Receive(msgHandler func(msg *proto.Message) error) { c.connWg.Add(1) go func() { var backOff = &backoff.ExponentialBackOff{ InitialInterval: backoff.DefaultInitialInterval, RandomizationFactor: backoff.DefaultRandomizationFactor, Multiplier: backoff.DefaultMultiplier, MaxInterval: 3 * time.Second, MaxElapsedTime: time.Duration(0), //never stop Stop: backoff.Stop, Clock: backoff.SystemClock, } operation := func() error { err := c.connect(c.key.PublicKey().String(), msgHandler) if err != nil { log.Warnf("disconnected from the Signal Exchange due to an error %s. Retrying ... ", err) c.connWg.Add(1) return err } backOff.Reset() return nil } err := backoff.Retry(operation, backOff) if err != nil { log.Errorf("error while communicating with the Signal Exchange %s ", err) return } }() } func (c *Client) connect(key string, msgHandler func(msg *proto.Message) error) error { c.stream = nil // add key fingerprint to the request header to be identified on the server side md := metadata.New(map[string]string{proto.HeaderId: key}) ctx := metadata.NewOutgoingContext(c.ctx, md) ctx, cancel := context.WithCancel(ctx) defer cancel() stream, err := c.realClient.ConnectStream(ctx) c.stream = stream if err != nil { return err } //connection established we are good to use the stream c.connWg.Done() log.Infof("connected to the Signal Exchange Stream") return c.receive(stream, msgHandler) } // Waits until the client is connected to the message stream func (c *Client) WaitConnected() { c.connWg.Wait() } // Sends a message to the remote Peer through the Signal Exchange using established stream connection to the Signal Server // The Client.Receive method must be called before sending messages to establish initial connection to the Signal Exchange // Client.connWg can be used to wait func (c *Client) SendToStream(msg *proto.EncryptedMessage) error { if c.stream == nil { return fmt.Errorf("connection to the Signal Exchnage has not been established yet. Please call Client.Receive before sending messages") } err := c.stream.Send(msg) if err != nil { log.Errorf("error while sending message to peer [%s] [error: %v]", msg.RemoteKey, err) return err } return nil } // decryptMessage decrypts the body of the msg using Wireguard private key and Remote peer's public key func (c *Client) decryptMessage(msg *proto.EncryptedMessage) (*proto.Message, error) { remoteKey, err := wgtypes.ParseKey(msg.GetKey()) if err != nil { return nil, err } decryptedBody, err := Decrypt(msg.GetBody(), remoteKey, c.key) body := &proto.Body{} err = pb.Unmarshal(decryptedBody, body) if err != nil { return nil, err } return &proto.Message{ Key: msg.Key, RemoteKey: msg.RemoteKey, Body: body, }, nil } // encryptMessage encrypts the body of the msg using Wireguard private key and Remote peer's public key func (c *Client) encryptMessage(msg *proto.Message) (*proto.EncryptedMessage, error) { body, err := pb.Marshal(msg.GetBody()) if err != nil { return nil, err } remoteKey, err := wgtypes.ParseKey(msg.RemoteKey) if err != nil { return nil, err } encryptedBody, err := Encrypt(body, remoteKey, c.key) if err != nil { return nil, err } return &proto.EncryptedMessage{ Key: msg.GetKey(), RemoteKey: msg.GetRemoteKey(), Body: encryptedBody, }, nil } // Sends a message to the remote Peer through the Signal Exchange. func (c *Client) Send(msg *proto.Message) error { encryptedMessage, err := c.encryptMessage(msg) if err != nil { return err } _, err = c.realClient.Send(context.TODO(), encryptedMessage) if err != nil { log.Errorf("error while sending message to peer [%s] [error: %v]", msg.RemoteKey, err) return err } return nil } // Receives messages from other peers coming through the Signal Exchange func (c *Client) receive(stream proto.SignalExchange_ConnectStreamClient, msgHandler func(msg *proto.Message) error) error { for { msg, err := stream.Recv() if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled { log.Warnf("stream canceled (usually indicates shutdown)") return err } else if s.Code() == codes.Unavailable { log.Warnf("server has been stopped") return err } else if err == io.EOF { log.Warnf("stream closed by server") return err } else if err != nil { return err } log.Debugf("received a new message from Peer [fingerprint: %s]", msg.Key) decryptedMessage, err := c.decryptMessage(msg) if err != nil { log.Errorf("failed decrypting message of Peer [key: %s] error: [%s]", msg.Key, err.Error()) } err = msgHandler(decryptedMessage) if err != nil { log.Errorf("error while handling message of Peer [key: %s] error: [%s]", msg.Key, err.Error()) //todo send something?? } } } func UnMarshalCredential(msg *proto.Message) (*Credential, error) { credential := strings.Split(msg.GetBody().GetPayload(), ":") if len(credential) != 2 { return nil, fmt.Errorf("error parsing message body %s", msg.Body) } return &Credential{ UFrag: credential[0], Pwd: credential[1], }, nil } func MarshalCredential(myKey wgtypes.Key, remoteKey wgtypes.Key, credential *Credential, t proto.Body_Type) (*proto.Message, error) { return &proto.Message{ Key: myKey.PublicKey().String(), RemoteKey: remoteKey.String(), Body: &proto.Body{ Type: t, Payload: fmt.Sprintf("%s:%s", credential.UFrag, credential.Pwd), }, }, nil } type Credential struct { UFrag string Pwd string }