[Signal] - when peer disconnects registry keeps broken gRPC stream (#178)

* fix: [signal] - when peer disconnects registry keeps broken gRPC stream. The peer is removed on stream closed.

* chore: [signal] - improve logging

* chore: [signal] - improve logging
This commit is contained in:
Mikhail Bragin 2021-12-31 19:25:44 +01:00 committed by GitHub
parent a67b9a16af
commit 4d2b194570
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 15 additions and 8 deletions

View File

@ -55,7 +55,7 @@ func (registry *Registry) Register(peer *Peer) {
// can be that peer already exists but it is fine (e.g. reconnect) // can be that peer already exists but it is fine (e.g. reconnect)
// todo investigate what happens to the old peer (especially Peer.Stream) when we override it // todo investigate what happens to the old peer (especially Peer.Stream) when we override it
registry.Peers.Store(peer.Id, peer) registry.Peers.Store(peer.Id, peer)
log.Printf("registered peer [%s]", peer.Id) log.Debugf("peer registered [%s]", peer.Id)
} }
@ -63,7 +63,7 @@ func (registry *Registry) Register(peer *Peer) {
func (registry *Registry) Deregister(peer *Peer) { func (registry *Registry) Deregister(peer *Peer) {
_, loaded := registry.Peers.LoadAndDelete(peer.Id) _, loaded := registry.Peers.LoadAndDelete(peer.Id)
if loaded { if loaded {
log.Printf("deregistered peer [%s]", peer.Id) log.Debugf("peer deregistered [%s]", peer.Id)
} else { } else {
log.Warnf("attempted to remove non-existent peer [%s]", peer.Id) log.Warnf("attempted to remove non-existent peer [%s]", peer.Id)
} }

View File

@ -36,11 +36,11 @@ func (s *Server) Send(ctx context.Context, msg *proto.EncryptedMessage) (*proto.
//forward the message to the target peer //forward the message to the target peer
err := dstPeer.Stream.Send(msg) err := dstPeer.Stream.Send(msg)
if err != nil { if err != nil {
log.Errorf("error while forwarding message from peer [%s] to peer [%s]", msg.Key, msg.RemoteKey) log.Errorf("error while forwarding message from peer [%s] to peer [%s] %v", msg.Key, msg.RemoteKey, err)
//todo respond to the sender? //todo respond to the sender?
} }
} else { } else {
log.Warnf("message from peer [%s] can't be forwarded to peer [%s] because destination peer is not connected", msg.Key, msg.RemoteKey) log.Debugf("message from peer [%s] can't be forwarded to peer [%s] because destination peer is not connected", msg.Key, msg.RemoteKey)
//todo respond to the sender? //todo respond to the sender?
} }
return &proto.EncryptedMessage{}, nil return &proto.EncryptedMessage{}, nil
@ -48,11 +48,17 @@ func (s *Server) Send(ctx context.Context, msg *proto.EncryptedMessage) (*proto.
// ConnectStream connects to the exchange stream // ConnectStream connects to the exchange stream
func (s *Server) ConnectStream(stream proto.SignalExchange_ConnectStreamServer) error { func (s *Server) ConnectStream(stream proto.SignalExchange_ConnectStreamServer) error {
p, err := s.connectPeer(stream) p, err := s.connectPeer(stream)
if err != nil { if err != nil {
return err return err
} }
defer func() {
log.Infof("peer disconnected [%s] ", p.Id)
s.registry.Deregister(p)
}()
//needed to confirm that the peer has been registered so that the client can proceed //needed to confirm that the peer has been registered so that the client can proceed
header := metadata.Pairs(proto.HeaderRegistered, "1") header := metadata.Pairs(proto.HeaderRegistered, "1")
err = stream.SendHeader(header) err = stream.SendHeader(header)
@ -60,8 +66,10 @@ func (s *Server) ConnectStream(stream proto.SignalExchange_ConnectStreamServer)
return err return err
} }
log.Infof("peer [%s] has successfully connected", p.Id) log.Infof("peer connected [%s]", p.Id)
for { for {
//read incoming messages
msg, err := stream.Recv() msg, err := stream.Recv()
if err == io.EOF { if err == io.EOF {
break break
@ -74,14 +82,13 @@ func (s *Server) ConnectStream(stream proto.SignalExchange_ConnectStreamServer)
//forward the message to the target peer //forward the message to the target peer
err := dstPeer.Stream.Send(msg) err := dstPeer.Stream.Send(msg)
if err != nil { if err != nil {
log.Errorf("error while forwarding message from peer [%s] to peer [%s]", p.Id, msg.RemoteKey) log.Errorf("error while forwarding message from peer [%s] to peer [%s] %v", p.Id, msg.RemoteKey, err)
//todo respond to the sender? //todo respond to the sender?
} }
} else { } else {
log.Warnf("message from peer [%s] can't be forwarded to peer [%s] because destination peer is not connected", p.Id, msg.RemoteKey) log.Debugf("message from peer [%s] can't be forwarded to peer [%s] because destination peer is not connected", p.Id, msg.RemoteKey)
//todo respond to the sender? //todo respond to the sender?
} }
} }
<-stream.Context().Done() <-stream.Context().Done()
return stream.Context().Err() return stream.Context().Err()