[#307] rpc: proper handling of context cancellation for transportmux + dataconn

- prior to this patch, context cancellation would leave rpc.Server open
- did not make problems because context was only cancelled by SIGINT,
  which was immediately followed by os.Exit
This commit is contained in:
Christian Schwarz
2020-04-20 21:24:42 +02:00
parent f772b3d39f
commit bcb5965617
5 changed files with 107 additions and 26 deletions

View File

@@ -5,6 +5,7 @@ import (
"context"
"fmt"
"io"
"sync"
"github.com/golang/protobuf/proto"
@@ -50,16 +51,26 @@ func NewServer(wi WireInterceptor, logger Logger, handler Handler) *Server {
// No accept errors are returned: they are logged to the Logger passed
// to the constructor.
func (s *Server) Serve(ctx context.Context, l transport.AuthenticatedListener) {
var wg sync.WaitGroup
defer wg.Wait()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
wg.Add(1)
go func() {
defer wg.Done()
<-ctx.Done()
s.log.Debug("context done")
s.log.Debug("context done, closing listener")
if err := l.Close(); err != nil {
s.log.WithError(err).Error("cannot close listener")
}
}()
conns := make(chan *transport.AuthConn)
wg.Add(1)
go func() {
defer wg.Done()
defer close(conns)
for {
conn, err := l.Accept(ctx)
if err != nil {
@@ -74,7 +85,11 @@ func (s *Server) Serve(ctx context.Context, l transport.AuthenticatedListener) {
}
}()
for conn := range conns {
go s.serveConn(conn)
wg.Add(1)
go func(conn *transport.AuthConn) {
defer wg.Done()
s.serveConn(conn)
}(conn)
}
}