rpc: proper handling of context cancellation for transportmux + dataconn

- prior to this patch, context cancellation would leave rpc.Server open
- did not make problems because context was only cancelled by SIGINT,
  which was immediately followed by os.Exit
This commit is contained in:
Christian Schwarz 2020-04-20 21:24:42 +02:00
parent 28e66ca78f
commit 3d91686350
5 changed files with 107 additions and 26 deletions

View File

@ -5,6 +5,7 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"sync"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
@ -50,16 +51,26 @@ func NewServer(wi WireInterceptor, logger Logger, handler Handler) *Server {
// No accept errors are returned: they are logged to the Logger passed // No accept errors are returned: they are logged to the Logger passed
// to the constructor. // to the constructor.
func (s *Server) Serve(ctx context.Context, l transport.AuthenticatedListener) { func (s *Server) Serve(ctx context.Context, l transport.AuthenticatedListener) {
var wg sync.WaitGroup
defer wg.Wait()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
wg.Add(1)
go func() { go func() {
defer wg.Done()
<-ctx.Done() <-ctx.Done()
s.log.Debug("context done") s.log.Debug("context done, closing listener")
if err := l.Close(); err != nil { if err := l.Close(); err != nil {
s.log.WithError(err).Error("cannot close listener") s.log.WithError(err).Error("cannot close listener")
} }
}() }()
conns := make(chan *transport.AuthConn) conns := make(chan *transport.AuthConn)
wg.Add(1)
go func() { go func() {
defer wg.Done()
defer close(conns)
for { for {
conn, err := l.Accept(ctx) conn, err := l.Accept(ctx)
if err != nil { if err != nil {
@ -74,7 +85,11 @@ func (s *Server) Serve(ctx context.Context, l transport.AuthenticatedListener) {
} }
}() }()
for conn := range conns { for conn := range conns {
go s.serveConn(conn) wg.Add(1)
go func(conn *transport.AuthConn) {
defer wg.Done()
s.serveConn(conn)
}(conn)
} }
} }

View File

@ -33,8 +33,12 @@ import (
type Logger = logger.Logger type Logger = logger.Logger
type acceptRes struct {
conn *transport.AuthConn
err error
}
type acceptReq struct { type acceptReq struct {
callback chan net.Conn callback chan acceptRes
} }
type Listener struct { type Listener struct {
@ -64,10 +68,19 @@ func New(authListener transport.AuthenticatedListener, l Logger) *Listener {
// The returned net.Conn is guaranteed to be *transport.AuthConn, i.e., the type of connection // The returned net.Conn is guaranteed to be *transport.AuthConn, i.e., the type of connection
// returned by the wrapped transport.AuthenticatedListener. // returned by the wrapped transport.AuthenticatedListener.
func (a Listener) Accept() (net.Conn, error) { func (a Listener) Accept() (net.Conn, error) {
req := acceptReq{make(chan net.Conn, 1)} req := acceptReq{make(chan acceptRes, 1)}
a.accepts <- req
conn := <-req.callback select {
return conn, nil case a.accepts <- req:
case <-a.stop:
return nil, fmt.Errorf("already closed") // TODO net.Error
}
res, ok := <-req.callback
if !ok {
return nil, fmt.Errorf("already closed") // TODO net.Error
}
return res.conn, res.err
} }
func (a Listener) handleAccept() { func (a Listener) handleAccept() {
@ -77,18 +90,9 @@ func (a Listener) handleAccept() {
a.logger.Debug("handleAccept stop accepting") a.logger.Debug("handleAccept stop accepting")
return return
case req := <-a.accepts: case req := <-a.accepts:
for { a.logger.Debug("accept authListener")
a.logger.Debug("accept authListener") authConn, err := a.al.Accept(context.Background())
authConn, err := a.al.Accept(context.Background()) req.callback <- acceptRes{authConn, err}
if err != nil {
a.logger.WithError(err).Error("accept error")
continue
}
a.logger.WithField("type", fmt.Sprintf("%T", authConn)).
Debug("accept complete")
req.callback <- authConn
break
}
} }
} }
} }

View File

@ -47,8 +47,9 @@ func NewServer(handler Handler, loggers Loggers, ctxInterceptor HandlerContextIn
if dl, ok := ctx.Deadline(); ok { if dl, ok := ctx.Deadline(); ok {
go time.AfterFunc(dl.Sub(dl), controlServer.Stop) go time.AfterFunc(dl.Sub(dl), controlServer.Stop)
} }
loggers.Control.Debug("shutting down control server") loggers.Control.Debug("gracefully shutting down control server")
controlServer.GracefulStop() controlServer.GracefulStop()
loggers.Control.Debug("gracdeful shut down of control server complete")
}() }()
errOut <- serve() errOut <- serve()
@ -84,6 +85,8 @@ func NewServer(handler Handler, loggers Loggers, ctxInterceptor HandlerContextIn
// Serve never returns an error, it logs them to the Server's logger. // Serve never returns an error, it logs them to the Server's logger.
func (s *Server) Serve(ctx context.Context, l transport.AuthenticatedListener) { func (s *Server) Serve(ctx context.Context, l transport.AuthenticatedListener) {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel()
defer s.logger.Debug("rpc.(*Server).Serve done")
l = versionhandshake.Listener(l, envconst.Duration("ZREPL_RPC_SERVER_VERSIONHANDSHAKE_TIMEOUT", 10*time.Second)) l = versionhandshake.Listener(l, envconst.Duration("ZREPL_RPC_SERVER_VERSIONHANDSHAKE_TIMEOUT", 10*time.Second))

View File

@ -7,6 +7,8 @@ package transportmux
import ( import (
"context" "context"
"sync/atomic"
"syscall"
"fmt" "fmt"
"io" "io"
@ -42,12 +44,31 @@ type acceptRes struct {
} }
type demuxListener struct { type demuxListener struct {
conns chan acceptRes closed int32
conns chan acceptRes
}
var ErrClosed = &net.OpError{
Op: "accept",
Net: "demux",
Source: nil,
Addr: nil,
Err: syscall.EINVAL,
} }
func (l *demuxListener) Accept(ctx context.Context) (*transport.AuthConn, error) { func (l *demuxListener) Accept(ctx context.Context) (*transport.AuthConn, error) {
res := <-l.conns if atomic.LoadInt32(&l.closed) != 0 {
return res.conn, res.err return nil, ErrClosed
}
select {
case r, ok := <-l.conns:
if !ok {
return nil, ErrClosed
}
return r.conn, r.err
case <-ctx.Done():
return nil, ctx.Err()
}
} }
type demuxAddr struct{} type demuxAddr struct{}
@ -59,7 +80,10 @@ func (l *demuxListener) Addr() net.Addr {
return demuxAddr{} return demuxAddr{}
} }
func (l *demuxListener) Close() error { return nil } // TODO func (l *demuxListener) Close() error {
atomic.StoreInt32(&l.closed, 1)
return nil
}
// Exact length of a label in bytes (0-byte padded if it is shorter). // Exact length of a label in bytes (0-byte padded if it is shorter).
// This is a protocol constant, changing it breaks the wire protocol. // This is a protocol constant, changing it breaks the wire protocol.
@ -90,7 +114,10 @@ func Demux(ctx context.Context, rawListener transport.AuthenticatedListener, lab
if _, ok := padded[labelPadded]; ok { if _, ok := padded[labelPadded]; ok {
return nil, fmt.Errorf("duplicate label %q", label) return nil, fmt.Errorf("duplicate label %q", label)
} }
dl := &demuxListener{make(chan acceptRes)} dl := &demuxListener{
closed: 0,
conns: make(chan acceptRes, 1),
}
padded[labelPadded] = dl padded[labelPadded] = dl
ret[label] = dl ret[label] = dl
} }
@ -103,10 +130,37 @@ func Demux(ctx context.Context, rawListener transport.AuthenticatedListener, lab
if err := rawListener.Close(); err != nil { if err := rawListener.Close(); err != nil {
getLog(ctx).WithError(err).Error("error closing listener") getLog(ctx).WithError(err).Error("error closing listener")
} }
drainConns := func(ch chan acceptRes) {
for c := range ch {
if c.conn != nil {
if err := c.conn.Close(); err != nil {
getLog(ctx).WithError(err).Error("error closing connection while draining after listener was closed")
}
}
}
}
for _, dl := range ret {
atomic.StoreInt32(&dl.(*demuxListener).closed, 1)
drainConns(dl.(*demuxListener).conns)
}
}() }()
go func() { go func() {
defer func() {
for _, dl := range ret {
close(dl.(*demuxListener).conns)
}
}()
for { for {
select {
case <-ctx.Done():
getLog(ctx).WithError(ctx.Err()).Info("stop accepting new connections after context done")
return
default:
}
rawConn, err := rawListener.Accept(ctx) rawConn, err := rawListener.Accept(ctx)
if err != nil { if err != nil {
if ctx.Err() != nil { if ctx.Err() != nil {
@ -147,7 +201,6 @@ func Demux(ctx context.Context, rawListener transport.AuthenticatedListener, lab
if err != nil { if err != nil {
getLog(ctx).WithError(err).Error("cannot reset deadline") getLog(ctx).WithError(err).Error("cannot reset deadline")
} }
// blocking is intentional
demuxListener.conns <- acceptRes{conn: rawConn, err: nil} demuxListener.conns <- acceptRes{conn: rawConn, err: nil}
} }
}() }()

View File

@ -65,6 +65,12 @@ type TCPAuthListener struct {
} }
func (f *TCPAuthListener) Accept(ctx context.Context) (*transport.AuthConn, error) { func (f *TCPAuthListener) Accept(ctx context.Context) (*transport.AuthConn, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
<-ctx.Done()
cancel()
}()
nc, err := f.TCPListener.AcceptTCP() nc, err := f.TCPListener.AcceptTCP()
if err != nil { if err != nil {
return nil, err return nil, err