mirror of
https://github.com/zrepl/zrepl.git
synced 2025-06-20 09:47:50 +02:00
rpc: treat protocol handshake errors as permanent
treat handshake errors as permanent on the client The issue was observed by 100% CPU usage due to lack ofrate-limiting in dataconn.ReqPing retries=> safeguard that
This commit is contained in:
parent
7584c66bdb
commit
ab3e783168
@ -142,6 +142,11 @@ func (c *Client) WaitForConnectivity(ctx context.Context) error {
|
|||||||
// dataClient uses transport.Connecter, which doesn't expose FailFast(false)
|
// dataClient uses transport.Connecter, which doesn't expose FailFast(false)
|
||||||
// => we need to mask dial timeouts
|
// => we need to mask dial timeouts
|
||||||
if err, ok := dataErr.(interface{ Temporary() bool }); ok && err.Temporary() {
|
if err, ok := dataErr.(interface{ Temporary() bool }); ok && err.Temporary() {
|
||||||
|
// Rate-limit pings here in case Temporary() is a mis-classification
|
||||||
|
// or returns immediately (this is a tight loop in that case)
|
||||||
|
// TODO keep this in lockstep with controlClient
|
||||||
|
// => don't use FailFast for control, but check that both control and data worked
|
||||||
|
time.Sleep(envconst.Duration("ZREPL_RPC_DATACONN_PING_SLEEP", 1*time.Second))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// it's not a dial timeout,
|
// it's not a dial timeout,
|
||||||
|
@ -7,10 +7,10 @@ package transportmux
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
"time"
|
"time"
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"github.com/zrepl/zrepl/logger"
|
"github.com/zrepl/zrepl/logger"
|
||||||
"github.com/zrepl/zrepl/transport"
|
"github.com/zrepl/zrepl/transport"
|
||||||
@ -111,7 +111,7 @@ func Demux(ctx context.Context, rawListener transport.AuthenticatedListener, lab
|
|||||||
if ctx.Err() != nil {
|
if ctx.Err() != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
getLog(ctx).WithError(err).Error("accept error")
|
getLog(ctx).WithError(err).WithField("errType", fmt.Sprintf("%T", err)).Error("accept error")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
closeConn := func() {
|
closeConn := func() {
|
||||||
|
@ -26,14 +26,22 @@ type HandshakeError struct {
|
|||||||
msg string
|
msg string
|
||||||
// If not nil, the underlying IO error that caused the handshake to fail.
|
// If not nil, the underlying IO error that caused the handshake to fail.
|
||||||
IOError error
|
IOError error
|
||||||
|
isAcceptError bool
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ net.Error = &HandshakeError{}
|
var _ net.Error = &HandshakeError{}
|
||||||
|
|
||||||
func (e HandshakeError) Error() string { return e.msg }
|
func (e HandshakeError) Error() string { return e.msg }
|
||||||
|
|
||||||
// Always true to enable usage in a net.Listener.
|
// Like with net.OpErr (Go issue 6163), a client failing to handshake
|
||||||
func (e HandshakeError) Temporary() bool { return true }
|
// should be a temporary Accept error toward the Listener .
|
||||||
|
func (e HandshakeError) Temporary() bool {
|
||||||
|
if e.isAcceptError {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
te, ok := e.IOError.(interface{ Temporary() bool });
|
||||||
|
return ok && te.Temporary()
|
||||||
|
}
|
||||||
|
|
||||||
// If the underlying IOError was net.Error.Timeout(), Timeout() returns that value.
|
// If the underlying IOError was net.Error.Timeout(), Timeout() returns that value.
|
||||||
// Otherwise false.
|
// Otherwise false.
|
||||||
@ -142,14 +150,14 @@ func (m *HandshakeMessage) DecodeReader(r io.Reader, maxLen int) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func DoHandshakeCurrentVersion(conn net.Conn, deadline time.Time) error {
|
func DoHandshakeCurrentVersion(conn net.Conn, deadline time.Time) *HandshakeError {
|
||||||
// current protocol version is hardcoded here
|
// current protocol version is hardcoded here
|
||||||
return DoHandshakeVersion(conn, deadline, 1)
|
return DoHandshakeVersion(conn, deadline, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
const HandshakeMessageMaxLen = 16 * 4096
|
const HandshakeMessageMaxLen = 16 * 4096
|
||||||
|
|
||||||
func DoHandshakeVersion(conn net.Conn, deadline time.Time, version int) error {
|
func DoHandshakeVersion(conn net.Conn, deadline time.Time, version int) *HandshakeError {
|
||||||
ours := HandshakeMessage{
|
ours := HandshakeMessage{
|
||||||
ProtocolVersion: version,
|
ProtocolVersion: version,
|
||||||
Extensions: nil,
|
Extensions: nil,
|
||||||
|
@ -55,6 +55,7 @@ func (l HandshakeListener) Accept(ctx context.Context) (*transport.AuthConn, err
|
|||||||
dl = time.Now().Add(l.timeout) // shadowing
|
dl = time.Now().Add(l.timeout) // shadowing
|
||||||
}
|
}
|
||||||
if err := DoHandshakeCurrentVersion(conn, dl); err != nil {
|
if err := DoHandshakeCurrentVersion(conn, dl); err != nil {
|
||||||
|
err.isAcceptError = true
|
||||||
conn.Close()
|
conn.Close()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user