diff --git a/signal/peer/peer.go b/signal/peer/peer.go index e0ee8d19d..fb5e1aa1b 100644 --- a/signal/peer/peer.go +++ b/signal/peer/peer.go @@ -55,7 +55,7 @@ func (registry *Registry) Register(peer *Peer) { // can be that peer already exists but it is fine (e.g. reconnect) // todo investigate what happens to the old peer (especially Peer.Stream) when we override it registry.Peers.Store(peer.Id, peer) - log.Printf("registered peer [%s]", peer.Id) + log.Debugf("peer registered [%s]", peer.Id) } @@ -63,7 +63,7 @@ func (registry *Registry) Register(peer *Peer) { func (registry *Registry) Deregister(peer *Peer) { _, loaded := registry.Peers.LoadAndDelete(peer.Id) if loaded { - log.Printf("deregistered peer [%s]", peer.Id) + log.Debugf("peer deregistered [%s]", peer.Id) } else { log.Warnf("attempted to remove non-existent peer [%s]", peer.Id) } diff --git a/signal/server/signal.go b/signal/server/signal.go index fc102c27c..c33287ab0 100644 --- a/signal/server/signal.go +++ b/signal/server/signal.go @@ -36,11 +36,11 @@ func (s *Server) Send(ctx context.Context, msg *proto.EncryptedMessage) (*proto. //forward the message to the target peer err := dstPeer.Stream.Send(msg) if err != nil { - log.Errorf("error while forwarding message from peer [%s] to peer [%s]", msg.Key, msg.RemoteKey) + log.Errorf("error while forwarding message from peer [%s] to peer [%s] %v", msg.Key, msg.RemoteKey, err) //todo respond to the sender? } } else { - log.Warnf("message from peer [%s] can't be forwarded to peer [%s] because destination peer is not connected", msg.Key, msg.RemoteKey) + log.Debugf("message from peer [%s] can't be forwarded to peer [%s] because destination peer is not connected", msg.Key, msg.RemoteKey) //todo respond to the sender? } return &proto.EncryptedMessage{}, nil @@ -48,11 +48,17 @@ func (s *Server) Send(ctx context.Context, msg *proto.EncryptedMessage) (*proto. // ConnectStream connects to the exchange stream func (s *Server) ConnectStream(stream proto.SignalExchange_ConnectStreamServer) error { + p, err := s.connectPeer(stream) if err != nil { return err } + defer func() { + log.Infof("peer disconnected [%s] ", p.Id) + s.registry.Deregister(p) + }() + //needed to confirm that the peer has been registered so that the client can proceed header := metadata.Pairs(proto.HeaderRegistered, "1") err = stream.SendHeader(header) @@ -60,8 +66,10 @@ func (s *Server) ConnectStream(stream proto.SignalExchange_ConnectStreamServer) return err } - log.Infof("peer [%s] has successfully connected", p.Id) + log.Infof("peer connected [%s]", p.Id) + for { + //read incoming messages msg, err := stream.Recv() if err == io.EOF { break @@ -74,14 +82,13 @@ func (s *Server) ConnectStream(stream proto.SignalExchange_ConnectStreamServer) //forward the message to the target peer err := dstPeer.Stream.Send(msg) if err != nil { - log.Errorf("error while forwarding message from peer [%s] to peer [%s]", p.Id, msg.RemoteKey) + log.Errorf("error while forwarding message from peer [%s] to peer [%s] %v", p.Id, msg.RemoteKey, err) //todo respond to the sender? } } else { - log.Warnf("message from peer [%s] can't be forwarded to peer [%s] because destination peer is not connected", p.Id, msg.RemoteKey) + log.Debugf("message from peer [%s] can't be forwarded to peer [%s] because destination peer is not connected", p.Id, msg.RemoteKey) //todo respond to the sender? } - } <-stream.Context().Done() return stream.Context().Err()