zrepl/rpc/frame_layer.go

301 lines
6.7 KiB
Go
Raw Normal View History

reimplement io.ReadWriteCloser based RPC mechanism The existing ByteStreamRPC requires writing RPC stub + server code for each RPC endpoint. Does not scale well. Goal: adding a new RPC call should - not require writing an RPC stub / handler - not require modifications to the RPC lib The wire format is inspired by HTTP2, the API by net/rpc. Frames are used for framing messages, i.e. a message is made of multiple frames which are glued together using a frame-bridging reader / writer. This roughly corresponds to HTTP2 streams, although we're happy with just one stream at any time and the resulting non-need for flow control, etc. Frames are typed using a header. The two most important types are 'Header' and 'Data'. The RPC protocol is built on top of this: - Client sends a header => multiple frames of type 'header' - Client sends request body => mulitiple frames of type 'data' - Server reads a header => multiple frames of type 'header' - Server reads request body => mulitiple frames of type 'data' - Server sends response header => ... - Server sends response body => ... An RPC header is serialized JSON and always the same structure. The body is of the type specified in the header. The RPC server and client use some semi-fancy reflection tequniques to automatically infer the data type of the request/response body based on the method signature of the server handler; or the client parameters, respectively. This boils down to a special-case for io.Reader, which are just dumped into a series of data frames as efficiently as possible. All other types are (de)serialized using encoding/json. The RPC layer and Frame Layer log some arbitrary messages that proved useful during debugging. By default, they log to a non-logger, which should not have a big impact on performance. pprof analysis shows the implementation spends its CPU time 60% waiting for syscalls 30% in memmove 10% ... On a Intel(R) Core(TM) i7-6600U CPU @ 2.60GHz CPU, Linux 4.12, the implementation achieved ~3.6GiB/s. Future optimization may include spice(2) / vmspice(2) on Linux, although this doesn't fit so well with the heavy use of io.Reader / io.Writer throughout the codebase. The existing hackaround for local calls was re-implemented to fit the new interface of PRCServer and RPCClient. The 'R'PC method invocation is a bit slower because reflection is involved inbetween, but otherwise performance should be no different. The RPC code currently does not support multipart requests and thus does not support the equivalent of a POST. Thus, the switch to the new rpc code had the following fallout: - Move request objects + constants from rpc package to main app code - Sacrifice the hacky 'push = pull me' way of doing push -> need to further extend RPC to support multipart requests or something to implement this properly with additional interfaces -> should be done after replication is abstracted better than separate algorithms for doPull() and doPush()
2017-08-19 22:37:14 +02:00
package rpc
import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"github.com/pkg/errors"
)
type Frame struct {
Type FrameType
NoMoreFrames bool
PayloadLength uint32
}
//go:generate stringer -type=FrameType
type FrameType uint8
const (
FrameTypeHeader FrameType = 0x01
FrameTypeData FrameType = 0x02
FrameTypeTrailer FrameType = 0x03
FrameTypeRST FrameType = 0xff
)
type Status uint64
const (
StatusOK Status = 1 + iota
StatusRequestError
StatusServerError
// Returned when an error occurred but the side at fault cannot be determined
StatusError
)
type Header struct {
// Request-only
Endpoint string
// Data type of body (request & reply)
DataType DataType
// Request-only
Accept DataType
// Reply-only
Error Status
// Reply-only
ErrorMessage string
}
func NewErrorHeader(status Status, format string, args ...interface{}) (h *Header) {
h = &Header{}
h.Error = status
h.ErrorMessage = fmt.Sprintf(format, args...)
return
}
type DataType uint8
const (
DataTypeNone DataType = 1 + iota
DataTypeControl
reimplement io.ReadWriteCloser based RPC mechanism The existing ByteStreamRPC requires writing RPC stub + server code for each RPC endpoint. Does not scale well. Goal: adding a new RPC call should - not require writing an RPC stub / handler - not require modifications to the RPC lib The wire format is inspired by HTTP2, the API by net/rpc. Frames are used for framing messages, i.e. a message is made of multiple frames which are glued together using a frame-bridging reader / writer. This roughly corresponds to HTTP2 streams, although we're happy with just one stream at any time and the resulting non-need for flow control, etc. Frames are typed using a header. The two most important types are 'Header' and 'Data'. The RPC protocol is built on top of this: - Client sends a header => multiple frames of type 'header' - Client sends request body => mulitiple frames of type 'data' - Server reads a header => multiple frames of type 'header' - Server reads request body => mulitiple frames of type 'data' - Server sends response header => ... - Server sends response body => ... An RPC header is serialized JSON and always the same structure. The body is of the type specified in the header. The RPC server and client use some semi-fancy reflection tequniques to automatically infer the data type of the request/response body based on the method signature of the server handler; or the client parameters, respectively. This boils down to a special-case for io.Reader, which are just dumped into a series of data frames as efficiently as possible. All other types are (de)serialized using encoding/json. The RPC layer and Frame Layer log some arbitrary messages that proved useful during debugging. By default, they log to a non-logger, which should not have a big impact on performance. pprof analysis shows the implementation spends its CPU time 60% waiting for syscalls 30% in memmove 10% ... On a Intel(R) Core(TM) i7-6600U CPU @ 2.60GHz CPU, Linux 4.12, the implementation achieved ~3.6GiB/s. Future optimization may include spice(2) / vmspice(2) on Linux, although this doesn't fit so well with the heavy use of io.Reader / io.Writer throughout the codebase. The existing hackaround for local calls was re-implemented to fit the new interface of PRCServer and RPCClient. The 'R'PC method invocation is a bit slower because reflection is involved inbetween, but otherwise performance should be no different. The RPC code currently does not support multipart requests and thus does not support the equivalent of a POST. Thus, the switch to the new rpc code had the following fallout: - Move request objects + constants from rpc package to main app code - Sacrifice the hacky 'push = pull me' way of doing push -> need to further extend RPC to support multipart requests or something to implement this properly with additional interfaces -> should be done after replication is abstracted better than separate algorithms for doPull() and doPush()
2017-08-19 22:37:14 +02:00
DataTypeMarshaledJSON
DataTypeOctets
)
const (
MAX_PAYLOAD_LENGTH = 4 * 1024 * 1024
MAX_HEADER_LENGTH = 4 * 1024
)
type frameBridgingReader struct {
l *MessageLayer
frameType FrameType
// < 0 means no limit
bytesLeftToLimit int
f Frame
}
func NewFrameBridgingReader(l *MessageLayer, frameType FrameType, totalLimit int) *frameBridgingReader {
return &frameBridgingReader{l, frameType, totalLimit, Frame{}}
}
func (r *frameBridgingReader) Read(b []byte) (n int, err error) {
if r.bytesLeftToLimit == 0 {
r.l.logger.Printf("limit reached, returning EOF")
reimplement io.ReadWriteCloser based RPC mechanism The existing ByteStreamRPC requires writing RPC stub + server code for each RPC endpoint. Does not scale well. Goal: adding a new RPC call should - not require writing an RPC stub / handler - not require modifications to the RPC lib The wire format is inspired by HTTP2, the API by net/rpc. Frames are used for framing messages, i.e. a message is made of multiple frames which are glued together using a frame-bridging reader / writer. This roughly corresponds to HTTP2 streams, although we're happy with just one stream at any time and the resulting non-need for flow control, etc. Frames are typed using a header. The two most important types are 'Header' and 'Data'. The RPC protocol is built on top of this: - Client sends a header => multiple frames of type 'header' - Client sends request body => mulitiple frames of type 'data' - Server reads a header => multiple frames of type 'header' - Server reads request body => mulitiple frames of type 'data' - Server sends response header => ... - Server sends response body => ... An RPC header is serialized JSON and always the same structure. The body is of the type specified in the header. The RPC server and client use some semi-fancy reflection tequniques to automatically infer the data type of the request/response body based on the method signature of the server handler; or the client parameters, respectively. This boils down to a special-case for io.Reader, which are just dumped into a series of data frames as efficiently as possible. All other types are (de)serialized using encoding/json. The RPC layer and Frame Layer log some arbitrary messages that proved useful during debugging. By default, they log to a non-logger, which should not have a big impact on performance. pprof analysis shows the implementation spends its CPU time 60% waiting for syscalls 30% in memmove 10% ... On a Intel(R) Core(TM) i7-6600U CPU @ 2.60GHz CPU, Linux 4.12, the implementation achieved ~3.6GiB/s. Future optimization may include spice(2) / vmspice(2) on Linux, although this doesn't fit so well with the heavy use of io.Reader / io.Writer throughout the codebase. The existing hackaround for local calls was re-implemented to fit the new interface of PRCServer and RPCClient. The 'R'PC method invocation is a bit slower because reflection is involved inbetween, but otherwise performance should be no different. The RPC code currently does not support multipart requests and thus does not support the equivalent of a POST. Thus, the switch to the new rpc code had the following fallout: - Move request objects + constants from rpc package to main app code - Sacrifice the hacky 'push = pull me' way of doing push -> need to further extend RPC to support multipart requests or something to implement this properly with additional interfaces -> should be done after replication is abstracted better than separate algorithms for doPull() and doPush()
2017-08-19 22:37:14 +02:00
return 0, io.EOF
}
log := r.l.logger
if r.f.PayloadLength == 0 {
if r.f.NoMoreFrames {
r.l.logger.Printf("no more frames flag set, returning EOF")
reimplement io.ReadWriteCloser based RPC mechanism The existing ByteStreamRPC requires writing RPC stub + server code for each RPC endpoint. Does not scale well. Goal: adding a new RPC call should - not require writing an RPC stub / handler - not require modifications to the RPC lib The wire format is inspired by HTTP2, the API by net/rpc. Frames are used for framing messages, i.e. a message is made of multiple frames which are glued together using a frame-bridging reader / writer. This roughly corresponds to HTTP2 streams, although we're happy with just one stream at any time and the resulting non-need for flow control, etc. Frames are typed using a header. The two most important types are 'Header' and 'Data'. The RPC protocol is built on top of this: - Client sends a header => multiple frames of type 'header' - Client sends request body => mulitiple frames of type 'data' - Server reads a header => multiple frames of type 'header' - Server reads request body => mulitiple frames of type 'data' - Server sends response header => ... - Server sends response body => ... An RPC header is serialized JSON and always the same structure. The body is of the type specified in the header. The RPC server and client use some semi-fancy reflection tequniques to automatically infer the data type of the request/response body based on the method signature of the server handler; or the client parameters, respectively. This boils down to a special-case for io.Reader, which are just dumped into a series of data frames as efficiently as possible. All other types are (de)serialized using encoding/json. The RPC layer and Frame Layer log some arbitrary messages that proved useful during debugging. By default, they log to a non-logger, which should not have a big impact on performance. pprof analysis shows the implementation spends its CPU time 60% waiting for syscalls 30% in memmove 10% ... On a Intel(R) Core(TM) i7-6600U CPU @ 2.60GHz CPU, Linux 4.12, the implementation achieved ~3.6GiB/s. Future optimization may include spice(2) / vmspice(2) on Linux, although this doesn't fit so well with the heavy use of io.Reader / io.Writer throughout the codebase. The existing hackaround for local calls was re-implemented to fit the new interface of PRCServer and RPCClient. The 'R'PC method invocation is a bit slower because reflection is involved inbetween, but otherwise performance should be no different. The RPC code currently does not support multipart requests and thus does not support the equivalent of a POST. Thus, the switch to the new rpc code had the following fallout: - Move request objects + constants from rpc package to main app code - Sacrifice the hacky 'push = pull me' way of doing push -> need to further extend RPC to support multipart requests or something to implement this properly with additional interfaces -> should be done after replication is abstracted better than separate algorithms for doPull() and doPush()
2017-08-19 22:37:14 +02:00
err = io.EOF
return
}
log.Printf("reading frame")
r.f, err = r.l.readFrame()
if err != nil {
log.Printf("error reading frame: %+v", err)
reimplement io.ReadWriteCloser based RPC mechanism The existing ByteStreamRPC requires writing RPC stub + server code for each RPC endpoint. Does not scale well. Goal: adding a new RPC call should - not require writing an RPC stub / handler - not require modifications to the RPC lib The wire format is inspired by HTTP2, the API by net/rpc. Frames are used for framing messages, i.e. a message is made of multiple frames which are glued together using a frame-bridging reader / writer. This roughly corresponds to HTTP2 streams, although we're happy with just one stream at any time and the resulting non-need for flow control, etc. Frames are typed using a header. The two most important types are 'Header' and 'Data'. The RPC protocol is built on top of this: - Client sends a header => multiple frames of type 'header' - Client sends request body => mulitiple frames of type 'data' - Server reads a header => multiple frames of type 'header' - Server reads request body => mulitiple frames of type 'data' - Server sends response header => ... - Server sends response body => ... An RPC header is serialized JSON and always the same structure. The body is of the type specified in the header. The RPC server and client use some semi-fancy reflection tequniques to automatically infer the data type of the request/response body based on the method signature of the server handler; or the client parameters, respectively. This boils down to a special-case for io.Reader, which are just dumped into a series of data frames as efficiently as possible. All other types are (de)serialized using encoding/json. The RPC layer and Frame Layer log some arbitrary messages that proved useful during debugging. By default, they log to a non-logger, which should not have a big impact on performance. pprof analysis shows the implementation spends its CPU time 60% waiting for syscalls 30% in memmove 10% ... On a Intel(R) Core(TM) i7-6600U CPU @ 2.60GHz CPU, Linux 4.12, the implementation achieved ~3.6GiB/s. Future optimization may include spice(2) / vmspice(2) on Linux, although this doesn't fit so well with the heavy use of io.Reader / io.Writer throughout the codebase. The existing hackaround for local calls was re-implemented to fit the new interface of PRCServer and RPCClient. The 'R'PC method invocation is a bit slower because reflection is involved inbetween, but otherwise performance should be no different. The RPC code currently does not support multipart requests and thus does not support the equivalent of a POST. Thus, the switch to the new rpc code had the following fallout: - Move request objects + constants from rpc package to main app code - Sacrifice the hacky 'push = pull me' way of doing push -> need to further extend RPC to support multipart requests or something to implement this properly with additional interfaces -> should be done after replication is abstracted better than separate algorithms for doPull() and doPush()
2017-08-19 22:37:14 +02:00
return 0, err
}
log.Printf("read frame: %#v", r.f)
if r.f.Type != r.frameType {
err = errors.Wrapf(err, "expected frame of type %s", r.frameType)
return 0, err
}
}
maxread := len(b)
if maxread > int(r.f.PayloadLength) {
maxread = int(r.f.PayloadLength)
}
if r.bytesLeftToLimit > 0 && maxread > r.bytesLeftToLimit {
maxread = r.bytesLeftToLimit
}
nb, err := r.l.rwc.Read(b[:maxread])
log.Printf("read %v from rwc\n", nb)
if nb < 0 {
panic("should not return negative number of bytes")
}
r.f.PayloadLength -= uint32(nb)
r.bytesLeftToLimit -= nb
return nb, err // TODO io.EOF for maxread = r.f.PayloadLength ?
}
type frameBridgingWriter struct {
l *MessageLayer
frameType FrameType
// < 0 means no limit
bytesLeftToLimit int
payloadLength int
buffer *bytes.Buffer
}
func NewFrameBridgingWriter(l *MessageLayer, frameType FrameType, totalLimit int) *frameBridgingWriter {
return &frameBridgingWriter{l, frameType, totalLimit, MAX_PAYLOAD_LENGTH, bytes.NewBuffer(make([]byte, 0, MAX_PAYLOAD_LENGTH))}
}
func (w *frameBridgingWriter) Write(b []byte) (n int, err error) {
for n = 0; n < len(b); {
i, err := w.writeUntilFrameFull(b[n:])
n += i
if err != nil {
return n, errors.WithStack(err)
}
}
return
}
func (w *frameBridgingWriter) writeUntilFrameFull(b []byte) (n int, err error) {
if len(b) <= 0 {
return
}
if w.bytesLeftToLimit == 0 {
err = errors.Errorf("exceeded limit of total %v bytes for this message")
return
}
maxwrite := len(b)
remainingInFrame := w.payloadLength - w.buffer.Len()
if maxwrite > remainingInFrame {
maxwrite = remainingInFrame
}
if w.bytesLeftToLimit > 0 && maxwrite > w.bytesLeftToLimit {
maxwrite = w.bytesLeftToLimit
}
w.buffer.Write(b[:maxwrite])
w.bytesLeftToLimit -= maxwrite
n = maxwrite
if w.bytesLeftToLimit == 0 {
err = w.flush(true)
} else if w.buffer.Len() == w.payloadLength {
err = w.flush(false)
}
return
}
func (w *frameBridgingWriter) flush(nomore bool) (err error) {
f := Frame{w.frameType, nomore, uint32(w.buffer.Len())}
err = w.l.writeFrame(f)
if err != nil {
errors.WithStack(err)
}
_, err = w.buffer.WriteTo(w.l.rwc)
return
}
func (w *frameBridgingWriter) Close() (err error) {
return w.flush(true)
}
type MessageLayer struct {
rwc io.ReadWriteCloser
logger Logger
}
func NewMessageLayer(rwc io.ReadWriteCloser) *MessageLayer {
return &MessageLayer{rwc, noLogger{}}
}
func (l *MessageLayer) Close() (err error) {
reimplement io.ReadWriteCloser based RPC mechanism The existing ByteStreamRPC requires writing RPC stub + server code for each RPC endpoint. Does not scale well. Goal: adding a new RPC call should - not require writing an RPC stub / handler - not require modifications to the RPC lib The wire format is inspired by HTTP2, the API by net/rpc. Frames are used for framing messages, i.e. a message is made of multiple frames which are glued together using a frame-bridging reader / writer. This roughly corresponds to HTTP2 streams, although we're happy with just one stream at any time and the resulting non-need for flow control, etc. Frames are typed using a header. The two most important types are 'Header' and 'Data'. The RPC protocol is built on top of this: - Client sends a header => multiple frames of type 'header' - Client sends request body => mulitiple frames of type 'data' - Server reads a header => multiple frames of type 'header' - Server reads request body => mulitiple frames of type 'data' - Server sends response header => ... - Server sends response body => ... An RPC header is serialized JSON and always the same structure. The body is of the type specified in the header. The RPC server and client use some semi-fancy reflection tequniques to automatically infer the data type of the request/response body based on the method signature of the server handler; or the client parameters, respectively. This boils down to a special-case for io.Reader, which are just dumped into a series of data frames as efficiently as possible. All other types are (de)serialized using encoding/json. The RPC layer and Frame Layer log some arbitrary messages that proved useful during debugging. By default, they log to a non-logger, which should not have a big impact on performance. pprof analysis shows the implementation spends its CPU time 60% waiting for syscalls 30% in memmove 10% ... On a Intel(R) Core(TM) i7-6600U CPU @ 2.60GHz CPU, Linux 4.12, the implementation achieved ~3.6GiB/s. Future optimization may include spice(2) / vmspice(2) on Linux, although this doesn't fit so well with the heavy use of io.Reader / io.Writer throughout the codebase. The existing hackaround for local calls was re-implemented to fit the new interface of PRCServer and RPCClient. The 'R'PC method invocation is a bit slower because reflection is involved inbetween, but otherwise performance should be no different. The RPC code currently does not support multipart requests and thus does not support the equivalent of a POST. Thus, the switch to the new rpc code had the following fallout: - Move request objects + constants from rpc package to main app code - Sacrifice the hacky 'push = pull me' way of doing push -> need to further extend RPC to support multipart requests or something to implement this properly with additional interfaces -> should be done after replication is abstracted better than separate algorithms for doPull() and doPush()
2017-08-19 22:37:14 +02:00
f := Frame{
Type: FrameTypeRST,
NoMoreFrames: true,
}
if err = l.writeFrame(f); err != nil {
l.logger.Printf("error sending RST frame: %s", err)
return errors.WithStack(err)
reimplement io.ReadWriteCloser based RPC mechanism The existing ByteStreamRPC requires writing RPC stub + server code for each RPC endpoint. Does not scale well. Goal: adding a new RPC call should - not require writing an RPC stub / handler - not require modifications to the RPC lib The wire format is inspired by HTTP2, the API by net/rpc. Frames are used for framing messages, i.e. a message is made of multiple frames which are glued together using a frame-bridging reader / writer. This roughly corresponds to HTTP2 streams, although we're happy with just one stream at any time and the resulting non-need for flow control, etc. Frames are typed using a header. The two most important types are 'Header' and 'Data'. The RPC protocol is built on top of this: - Client sends a header => multiple frames of type 'header' - Client sends request body => mulitiple frames of type 'data' - Server reads a header => multiple frames of type 'header' - Server reads request body => mulitiple frames of type 'data' - Server sends response header => ... - Server sends response body => ... An RPC header is serialized JSON and always the same structure. The body is of the type specified in the header. The RPC server and client use some semi-fancy reflection tequniques to automatically infer the data type of the request/response body based on the method signature of the server handler; or the client parameters, respectively. This boils down to a special-case for io.Reader, which are just dumped into a series of data frames as efficiently as possible. All other types are (de)serialized using encoding/json. The RPC layer and Frame Layer log some arbitrary messages that proved useful during debugging. By default, they log to a non-logger, which should not have a big impact on performance. pprof analysis shows the implementation spends its CPU time 60% waiting for syscalls 30% in memmove 10% ... On a Intel(R) Core(TM) i7-6600U CPU @ 2.60GHz CPU, Linux 4.12, the implementation achieved ~3.6GiB/s. Future optimization may include spice(2) / vmspice(2) on Linux, although this doesn't fit so well with the heavy use of io.Reader / io.Writer throughout the codebase. The existing hackaround for local calls was re-implemented to fit the new interface of PRCServer and RPCClient. The 'R'PC method invocation is a bit slower because reflection is involved inbetween, but otherwise performance should be no different. The RPC code currently does not support multipart requests and thus does not support the equivalent of a POST. Thus, the switch to the new rpc code had the following fallout: - Move request objects + constants from rpc package to main app code - Sacrifice the hacky 'push = pull me' way of doing push -> need to further extend RPC to support multipart requests or something to implement this properly with additional interfaces -> should be done after replication is abstracted better than separate algorithms for doPull() and doPush()
2017-08-19 22:37:14 +02:00
}
return nil
reimplement io.ReadWriteCloser based RPC mechanism The existing ByteStreamRPC requires writing RPC stub + server code for each RPC endpoint. Does not scale well. Goal: adding a new RPC call should - not require writing an RPC stub / handler - not require modifications to the RPC lib The wire format is inspired by HTTP2, the API by net/rpc. Frames are used for framing messages, i.e. a message is made of multiple frames which are glued together using a frame-bridging reader / writer. This roughly corresponds to HTTP2 streams, although we're happy with just one stream at any time and the resulting non-need for flow control, etc. Frames are typed using a header. The two most important types are 'Header' and 'Data'. The RPC protocol is built on top of this: - Client sends a header => multiple frames of type 'header' - Client sends request body => mulitiple frames of type 'data' - Server reads a header => multiple frames of type 'header' - Server reads request body => mulitiple frames of type 'data' - Server sends response header => ... - Server sends response body => ... An RPC header is serialized JSON and always the same structure. The body is of the type specified in the header. The RPC server and client use some semi-fancy reflection tequniques to automatically infer the data type of the request/response body based on the method signature of the server handler; or the client parameters, respectively. This boils down to a special-case for io.Reader, which are just dumped into a series of data frames as efficiently as possible. All other types are (de)serialized using encoding/json. The RPC layer and Frame Layer log some arbitrary messages that proved useful during debugging. By default, they log to a non-logger, which should not have a big impact on performance. pprof analysis shows the implementation spends its CPU time 60% waiting for syscalls 30% in memmove 10% ... On a Intel(R) Core(TM) i7-6600U CPU @ 2.60GHz CPU, Linux 4.12, the implementation achieved ~3.6GiB/s. Future optimization may include spice(2) / vmspice(2) on Linux, although this doesn't fit so well with the heavy use of io.Reader / io.Writer throughout the codebase. The existing hackaround for local calls was re-implemented to fit the new interface of PRCServer and RPCClient. The 'R'PC method invocation is a bit slower because reflection is involved inbetween, but otherwise performance should be no different. The RPC code currently does not support multipart requests and thus does not support the equivalent of a POST. Thus, the switch to the new rpc code had the following fallout: - Move request objects + constants from rpc package to main app code - Sacrifice the hacky 'push = pull me' way of doing push -> need to further extend RPC to support multipart requests or something to implement this properly with additional interfaces -> should be done after replication is abstracted better than separate algorithms for doPull() and doPush()
2017-08-19 22:37:14 +02:00
}
var RST error = fmt.Errorf("reset frame observed on connection")
func (l *MessageLayer) readFrame() (f Frame, err error) {
err = binary.Read(l.rwc, binary.LittleEndian, &f.Type)
if err != nil {
err = errors.WithStack(err)
return
}
err = binary.Read(l.rwc, binary.LittleEndian, &f.NoMoreFrames)
if err != nil {
err = errors.WithStack(err)
return
}
err = binary.Read(l.rwc, binary.LittleEndian, &f.PayloadLength)
if err != nil {
err = errors.WithStack(err)
return
}
if f.Type == FrameTypeRST {
l.logger.Printf("read RST frame")
reimplement io.ReadWriteCloser based RPC mechanism The existing ByteStreamRPC requires writing RPC stub + server code for each RPC endpoint. Does not scale well. Goal: adding a new RPC call should - not require writing an RPC stub / handler - not require modifications to the RPC lib The wire format is inspired by HTTP2, the API by net/rpc. Frames are used for framing messages, i.e. a message is made of multiple frames which are glued together using a frame-bridging reader / writer. This roughly corresponds to HTTP2 streams, although we're happy with just one stream at any time and the resulting non-need for flow control, etc. Frames are typed using a header. The two most important types are 'Header' and 'Data'. The RPC protocol is built on top of this: - Client sends a header => multiple frames of type 'header' - Client sends request body => mulitiple frames of type 'data' - Server reads a header => multiple frames of type 'header' - Server reads request body => mulitiple frames of type 'data' - Server sends response header => ... - Server sends response body => ... An RPC header is serialized JSON and always the same structure. The body is of the type specified in the header. The RPC server and client use some semi-fancy reflection tequniques to automatically infer the data type of the request/response body based on the method signature of the server handler; or the client parameters, respectively. This boils down to a special-case for io.Reader, which are just dumped into a series of data frames as efficiently as possible. All other types are (de)serialized using encoding/json. The RPC layer and Frame Layer log some arbitrary messages that proved useful during debugging. By default, they log to a non-logger, which should not have a big impact on performance. pprof analysis shows the implementation spends its CPU time 60% waiting for syscalls 30% in memmove 10% ... On a Intel(R) Core(TM) i7-6600U CPU @ 2.60GHz CPU, Linux 4.12, the implementation achieved ~3.6GiB/s. Future optimization may include spice(2) / vmspice(2) on Linux, although this doesn't fit so well with the heavy use of io.Reader / io.Writer throughout the codebase. The existing hackaround for local calls was re-implemented to fit the new interface of PRCServer and RPCClient. The 'R'PC method invocation is a bit slower because reflection is involved inbetween, but otherwise performance should be no different. The RPC code currently does not support multipart requests and thus does not support the equivalent of a POST. Thus, the switch to the new rpc code had the following fallout: - Move request objects + constants from rpc package to main app code - Sacrifice the hacky 'push = pull me' way of doing push -> need to further extend RPC to support multipart requests or something to implement this properly with additional interfaces -> should be done after replication is abstracted better than separate algorithms for doPull() and doPush()
2017-08-19 22:37:14 +02:00
err = RST
return
}
if f.PayloadLength > MAX_PAYLOAD_LENGTH {
err = errors.Errorf("frame exceeds max payload length")
return
}
return
}
func (l *MessageLayer) writeFrame(f Frame) (err error) {
err = binary.Write(l.rwc, binary.LittleEndian, &f.Type)
if err != nil {
return errors.WithStack(err)
}
err = binary.Write(l.rwc, binary.LittleEndian, &f.NoMoreFrames)
if err != nil {
return errors.WithStack(err)
}
err = binary.Write(l.rwc, binary.LittleEndian, &f.PayloadLength)
if err != nil {
return errors.WithStack(err)
}
if f.PayloadLength > MAX_PAYLOAD_LENGTH {
err = errors.Errorf("frame exceeds max payload length")
return
}
return
}
func (l *MessageLayer) ReadHeader() (h *Header, err error) {
r := NewFrameBridgingReader(l, FrameTypeHeader, MAX_HEADER_LENGTH)
h = &Header{}
if err = json.NewDecoder(r).Decode(&h); err != nil {
l.logger.Printf("cannot decode marshaled header: %s", err)
return nil, err
}
return h, nil
}
func (l *MessageLayer) WriteHeader(h *Header) (err error) {
w := NewFrameBridgingWriter(l, FrameTypeHeader, MAX_HEADER_LENGTH)
err = json.NewEncoder(w).Encode(h)
if err != nil {
return errors.Wrap(err, "cannot encode header, probably fatal")
}
w.Close()
return
}
func (l *MessageLayer) ReadData() (reader io.Reader) {
r := NewFrameBridgingReader(l, FrameTypeData, -1)
return r
}
func (l *MessageLayer) WriteData(source io.Reader) (err error) {
w := NewFrameBridgingWriter(l, FrameTypeData, -1)
_, err = io.Copy(w, source)
if err != nil {
return errors.WithStack(err)
}
err = w.Close()
return
}