diff --git a/rpc/rpc_client.go b/rpc/rpc_client.go index c34dc3b..018ee3f 100644 --- a/rpc/rpc_client.go +++ b/rpc/rpc_client.go @@ -142,6 +142,11 @@ func (c *Client) WaitForConnectivity(ctx context.Context) error { // dataClient uses transport.Connecter, which doesn't expose FailFast(false) // => we need to mask dial timeouts if err, ok := dataErr.(interface{ Temporary() bool }); ok && err.Temporary() { + // Rate-limit pings here in case Temporary() is a mis-classification + // or returns immediately (this is a tight loop in that case) + // TODO keep this in lockstep with controlClient + // => don't use FailFast for control, but check that both control and data worked + time.Sleep(envconst.Duration("ZREPL_RPC_DATACONN_PING_SLEEP", 1*time.Second)) continue } // it's not a dial timeout, diff --git a/rpc/transportmux/transportmux.go b/rpc/transportmux/transportmux.go index cb6f7ca..f78c1e3 100644 --- a/rpc/transportmux/transportmux.go +++ b/rpc/transportmux/transportmux.go @@ -7,10 +7,10 @@ package transportmux import ( "context" + "fmt" "io" "net" "time" - "fmt" "github.com/zrepl/zrepl/logger" "github.com/zrepl/zrepl/transport" @@ -111,7 +111,7 @@ func Demux(ctx context.Context, rawListener transport.AuthenticatedListener, lab if ctx.Err() != nil { return } - getLog(ctx).WithError(err).Error("accept error") + getLog(ctx).WithError(err).WithField("errType", fmt.Sprintf("%T", err)).Error("accept error") continue } closeConn := func() { diff --git a/rpc/versionhandshake/versionhandshake.go b/rpc/versionhandshake/versionhandshake.go index 3864868..03835ee 100644 --- a/rpc/versionhandshake/versionhandshake.go +++ b/rpc/versionhandshake/versionhandshake.go @@ -26,14 +26,22 @@ type HandshakeError struct { msg string // If not nil, the underlying IO error that caused the handshake to fail. IOError error + isAcceptError bool } var _ net.Error = &HandshakeError{} func (e HandshakeError) Error() string { return e.msg } -// Always true to enable usage in a net.Listener. 
-func (e HandshakeError) Temporary() bool { return true } +// Like with net.OpError (Go issue 6163), a client failing to handshake +// should be a temporary Accept error toward the Listener. +func (e HandshakeError) Temporary() bool { + if e.isAcceptError { + return true + } + te, ok := e.IOError.(interface{ Temporary() bool }); + return ok && te.Temporary() +} // If the underlying IOError was net.Error.Timeout(), Timeout() returns that value. // Otherwise false. @@ -142,14 +150,14 @@ func (m *HandshakeMessage) DecodeReader(r io.Reader, maxLen int) error { return nil } -func DoHandshakeCurrentVersion(conn net.Conn, deadline time.Time) error { +func DoHandshakeCurrentVersion(conn net.Conn, deadline time.Time) *HandshakeError { // current protocol version is hardcoded here return DoHandshakeVersion(conn, deadline, 1) } const HandshakeMessageMaxLen = 16 * 4096 -func DoHandshakeVersion(conn net.Conn, deadline time.Time, version int) error { +func DoHandshakeVersion(conn net.Conn, deadline time.Time, version int) *HandshakeError { ours := HandshakeMessage{ ProtocolVersion: version, Extensions: nil, diff --git a/rpc/versionhandshake/versionhandshake_transport_wrappers.go b/rpc/versionhandshake/versionhandshake_transport_wrappers.go index 660215e..09ead7a 100644 --- a/rpc/versionhandshake/versionhandshake_transport_wrappers.go +++ b/rpc/versionhandshake/versionhandshake_transport_wrappers.go @@ -55,6 +55,7 @@ func (l HandshakeListener) Accept(ctx context.Context) (*transport.AuthConn, err dl = time.Now().Add(l.timeout) // shadowing } if err := DoHandshakeCurrentVersion(conn, dl); err != nil { + err.isAcceptError = true conn.Close() return nil, err }