mirror of
https://github.com/zrepl/zrepl.git
synced 2025-01-05 13:59:06 +01:00
transport/local: fix "client-provided callback did block on send" crash
fixes #173 (amendment of 0e419c11df2bc45feb6edb7d82a309b0fe174875)
This commit is contained in:
parent
234a327a03
commit
b5dc04e62c
@ -61,7 +61,9 @@ func (l *LocalListener) Connect(dialCtx context.Context, clientIdentity string)
|
|||||||
// place request
|
// place request
|
||||||
req := connectRequest{
|
req := connectRequest{
|
||||||
clientIdentity: clientIdentity,
|
clientIdentity: clientIdentity,
|
||||||
callback: make(chan connectResult),
|
// ensure non-blocking send in Accept, we don't necessarily read from callback before
|
||||||
|
// Accept writes to it
|
||||||
|
callback: make(chan connectResult, 1),
|
||||||
}
|
}
|
||||||
select {
|
select {
|
||||||
case l.connects <- req:
|
case l.connects <- req:
|
||||||
@ -93,19 +95,35 @@ func (l *LocalListener) Addr() net.Addr { return localAddr{"<listening>"} }
|
|||||||
|
|
||||||
func (l *LocalListener) Accept(ctx context.Context) (*transport.AuthConn, error) {
|
func (l *LocalListener) Accept(ctx context.Context) (*transport.AuthConn, error) {
|
||||||
respondToRequest := func(req connectRequest, res connectResult) (err error) {
|
respondToRequest := func(req connectRequest, res connectResult) (err error) {
|
||||||
|
|
||||||
transport.GetLogger(ctx).
|
transport.GetLogger(ctx).
|
||||||
WithField("res.conn", res.conn).WithField("res.err", res.err).
|
WithField("res.conn", res.conn).WithField("res.err", res.err).
|
||||||
Debug("responding to client request")
|
Debug("responding to client request")
|
||||||
|
|
||||||
|
// contract bewteen Connect and Accept is that Connect sends a req.callback
|
||||||
|
// into which we can send one result non-blockingly.
|
||||||
|
// We want to panic if that contract is violated (impl error)
|
||||||
|
//
|
||||||
|
// However, Connect also supports timeouts through context cancellation:
|
||||||
|
// Connect closes the channel into which we might send, which will panic.
|
||||||
|
//
|
||||||
|
// ==> distinguish those cases in defer
|
||||||
|
const clientCallbackBlocked = "client-provided callback did block on send"
|
||||||
defer func() {
|
defer func() {
|
||||||
errv := recover()
|
errv := recover()
|
||||||
|
if errv == clientCallbackBlocked {
|
||||||
|
// this would be a violation of contract betwee Connect and Accept, see above
|
||||||
|
panic(clientCallbackBlocked)
|
||||||
|
} else {
|
||||||
transport.GetLogger(ctx).WithField("recover_err", errv).
|
transport.GetLogger(ctx).WithField("recover_err", errv).
|
||||||
Debug("panic on send to client callback, likely a legitimate client-side timeout")
|
Debug("panic on send to client callback, likely a legitimate client-side timeout")
|
||||||
|
}
|
||||||
}()
|
}()
|
||||||
select {
|
select {
|
||||||
case req.callback <- res:
|
case req.callback <- res:
|
||||||
err = nil
|
err = nil
|
||||||
default:
|
default:
|
||||||
err = fmt.Errorf("client-provided callback did block on send")
|
panic(clientCallbackBlocked)
|
||||||
}
|
}
|
||||||
close(req.callback)
|
close(req.callback)
|
||||||
return err
|
return err
|
||||||
|
Loading…
Reference in New Issue
Block a user