reimplement io.ReadWriteCloser-based RPC mechanism

The existing ByteStreamRPC requires writing RPC stub + server code
for each RPC endpoint, which does not scale well.
Goal: adding a new RPC call should
- not require writing an RPC stub / handler
- not require modifications to the RPC lib
The wire format is inspired by HTTP2, the API by net/rpc.
Frames are used for framing messages, i.e. a message is made of multiple
frames which are glued together using a frame-bridging reader / writer.
This roughly corresponds to HTTP2 streams, although we are happy with
just one stream at any time, which eliminates the need for flow control,
etc.
Frames are typed using a header. The two most important types are
'Header' and 'Data'.
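The frame layer itself is not part of this file; as a rough sketch, a
typed, length-delimited frame could look like the following (all names in
this sketch are illustrative assumptions, not the actual frame-layer API):

	// Sketch only: a typed, length-delimited frame.
	// Assumes "encoding/binary" and "io" are imported.
	type FrameType uint8

	const (
		FrameTypeHeader FrameType = 1 // chunk of a serialized RPC header
		FrameTypeData   FrameType = 2 // chunk of a request/response body
	)

	type frameHeader struct {
		Type       FrameType
		PayloadLen uint32
	}

	// writeFrame emits one frame: fixed-size header, then payload.
	func writeFrame(w io.Writer, t FrameType, payload []byte) error {
		hdr := frameHeader{Type: t, PayloadLen: uint32(len(payload))}
		if err := binary.Write(w, binary.BigEndian, hdr); err != nil {
			return err
		}
		_, err := w.Write(payload)
		return err
	}

A frame-bridging reader would then concatenate the payloads of consecutive
frames of the same type into one logical message.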
The RPC protocol is built on top of this:
- Client sends a header => multiple frames of type 'header'
- Client sends request body => multiple frames of type 'data'
- Server reads a header => multiple frames of type 'header'
- Server reads request body => multiple frames of type 'data'
- Server sends response header => ...
- Server sends response body => ...
An RPC header is serialized JSON and always the same structure.
The body is of the type specified in the header.
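For illustration, given the Header fields the server code below touches
(Endpoint, DataType, Accept, Error), a request header might serialize to
something like this; the concrete field names and value encodings are
whatever the Header struct declares, so treat this as an assumption:

	{"Endpoint":"Foo","DataType":2,"Accept":2,"Error":0}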
The RPC server and client use some semi-fancy reflection techniques to
automatically infer the data type of the request/response body based on
the method signature of the server handler or the client parameters,
respectively.
This boils down to a special case for io.Reader, which is dumped
into a series of data frames as efficiently as possible.
All other types are (de)serialized using encoding/json.
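A sketch of how that inference can work (makeEndpointDescr is referenced
but not shown in this file; inferProto below is a hypothetical helper,
not the actual code):

	// Hypothetical: map a handler parameter type to a wire DataType.
	// Handlers have the shape func(req *ReqT, res *ResT) error.
	func inferProto(t reflect.Type) DataType {
		readerType := reflect.TypeOf((*io.Reader)(nil)).Elem()
		if t == readerType || (t.Kind() == reflect.Ptr && t.Elem() == readerType) {
			return DataTypeOctets // io.Reader bodies stream as data frames
		}
		return DataTypeMarshaledJSON // everything else via encoding/json
	}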
The RPC layer and frame layer log some arbitrary messages that proved
useful during debugging. By default, they log to a no-op logger, which
should not have a big impact on performance.
pprof analysis shows the implementation spends its CPU time roughly as follows:
- 60% waiting for syscalls
- 30% in memmove
- 10% ...
On an Intel(R) Core(TM) i7-6600U CPU @ 2.60GHz, Linux 4.12, the
implementation achieved ~3.6GiB/s.
Future optimization may include splice(2) / vmsplice(2) on Linux, although
this doesn't fit so well with the heavy use of io.Reader / io.Writer
throughout the codebase.
The existing hackaround for local calls was re-implemented to fit the
new interface of RPCServer and RPCClient.
The 'R'PC method invocation is a bit slower because reflection is
involved in between, but otherwise performance should be no different.
The RPC code currently does not support multipart requests and thus does
not support the equivalent of a POST.
Thus, the switch to the new rpc code had the following fallout:
- Move request objects + constants from rpc package to main app code
- Sacrifice the hacky 'push = pull me' way of doing push
-> need to further extend RPC to support multipart requests or
something to implement this properly with additional interfaces
-> should be done after replication is abstracted better than separate
algorithms for doPull() and doPush()

package rpc

import (
	"bytes"
	"encoding/json"
	"io"
	"reflect"

	"github.com/pkg/errors"
)

type Server struct {
	ml        *MessageLayer
	logger    Logger
	endpoints map[string]endpointDescr
}

type typeMap struct {
	local reflect.Type
	proto DataType
}

type endpointDescr struct {
	inType  typeMap
	outType typeMap
	handler reflect.Value
}

type MarshaledJSONEndpoint func(bodyJSON interface{})

func NewServer(rwc io.ReadWriteCloser) *Server {
	ml := NewMessageLayer(rwc)
	return &Server{
		ml, noLogger{}, make(map[string]endpointDescr),
	}
}

func (s *Server) SetLogger(logger Logger, logMessageLayer bool) {
	s.logger = logger
	if logMessageLayer {
		s.ml.logger = logger
	} else {
		s.ml.logger = noLogger{}
	}
}

func (s *Server) RegisterEndpoint(name string, handler interface{}) (err error) {
	_, ok := s.endpoints[name]
	if ok {
		return errors.Errorf("already set up an endpoint for '%s'", name)
	}
	s.endpoints[name], err = makeEndpointDescr(handler)
	return
}
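
// Usage sketch (illustrative, not part of this package): FooRequest,
// FooResponse and fooHandler are assumed names. Per the reflection contract
// in ServeRequest below, a handler takes a pointer to its request type and a
// pointer to its response type and returns an error; an *io.Reader response
// parameter is streamed as raw octets instead of JSON.
//
//	type FooRequest struct{ Path string }
//	type FooResponse struct{ Size int64 }
//
//	func fooHandler(req *FooRequest, res *FooResponse) error {
//		res.Size = int64(len(req.Path)) // toy logic
//		return nil
//	}
//
//	func serve(conn io.ReadWriteCloser) error {
//		srv := NewServer(conn)
//		if err := srv.RegisterEndpoint("Foo", fooHandler); err != nil {
//			return err
//		}
//		return srv.Serve() // runs until the client sends the Close control request
//	}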

func checkResponseHeader(h *Header) (err error) {
	var statusNotSet Status
	if h.Error == statusNotSet {
		return errors.Errorf("status has zero-value")
	}
	return nil
}

func (s *Server) writeResponse(h *Header) (err error) {
	// TODO validate
	return s.ml.WriteHeader(h)
}

func (s *Server) recvRequest() (h *Header, err error) {
	h, err = s.ml.ReadHeader()
	if err != nil {
		s.logger.Printf("error reading header: %s", err)
		return nil, err
	}

	s.logger.Printf("validating request")
	err = nil // TODO validate
	if err == nil {
		return h, nil
	}
	s.logger.Printf("request validation error: %s", err)

	r := NewErrorHeader(StatusRequestError, "%s", err)
	return nil, s.writeResponse(r)
}

var doneServeNext error = errors.New("this should not cause a HangUp() in the server")

var doneStopServing error = errors.New("this should cause the server to close the connection")

var ProtocolError error = errors.New("protocol error, server should hang up")

const ControlEndpointClose string = "Close"
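
// Close handshake (see the DataTypeControl branch in ServeRequest): the client
// sends a header with DataType == DataTypeControl and Endpoint == "Close"; the
// server acknowledges with a StatusOK control header and then stops serving.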

// Serve the connection until failure or the client hangs up
func (s *Server) Serve() (err error) {
	for {

		err = s.ServeRequest()

		if err == nil {
			continue
		}
		if err == doneServeNext {
			s.logger.Printf("subroutine returned pseudo-error indicating early-exit")
			err = nil
			continue
		}

		if err == doneStopServing {
			s.logger.Printf("subroutine returned pseudo-error indicating close request")
			err = nil
			break
		}

		break
	}

	if err != nil {
		s.logger.Printf("an error occurred that could not be handled on RPC protocol level: %+v", err)
	}

	s.logger.Printf("closing MessageLayer")
	if mlErr := s.ml.Close(); mlErr != nil {
		s.logger.Printf("error closing MessageLayer: %+v", mlErr)
	}

	return err
}

// Serve a single request
// * wait for request to come in
// * call handler
// * reply
//
// The connection is left open, the next bytes on the conn should be
// the next request header.
//
// Returns an err != nil if the error is bad enough to hang up on the client.
// Examples: protocol version mismatches, protocol errors in general, ...
// Non-Examples: a handler error
func (s *Server) ServeRequest() (err error) {

	ml := s.ml

	s.logger.Printf("reading header")
	h, err := s.recvRequest()
	if err != nil {
		return err
	}

	if h.DataType == DataTypeControl {
		switch h.Endpoint {
		case ControlEndpointClose:
			ack := Header{Error: StatusOK, DataType: DataTypeControl}
			err = s.writeResponse(&ack)
			if err != nil {
				return err
			}
			return doneStopServing
		default:
			r := NewErrorHeader(StatusRequestError, "unregistered control endpoint %s", h.Endpoint)
			return s.writeResponse(r)
		}
		panic("implementation error")
	}

	ep, ok := s.endpoints[h.Endpoint]
	if !ok {
		r := NewErrorHeader(StatusRequestError, "unregistered endpoint %s", h.Endpoint)
		return s.writeResponse(r)
	}

	if ep.inType.proto != h.DataType {
		r := NewErrorHeader(StatusRequestError, "wrong DataType for endpoint %s (has %s, you provided %s)", h.Endpoint, ep.inType.proto, h.DataType)
		return s.writeResponse(r)
	}

	if ep.outType.proto != h.Accept {
		r := NewErrorHeader(StatusRequestError, "wrong Accept for endpoint %s (has %s, you provided %s)", h.Endpoint, ep.outType.proto, h.Accept)
		return s.writeResponse(r)
	}

	dr := ml.ReadData()

	// Determine inval
	var inval reflect.Value
	switch ep.inType.proto {
	case DataTypeMarshaledJSON:
		// Unmarshal input
		inval = reflect.New(ep.inType.local.Elem())
		invalIface := inval.Interface()
		err = json.NewDecoder(dr).Decode(invalIface)
		if err != nil {
			r := NewErrorHeader(StatusRequestError, "cannot decode marshaled JSON: %s", err)
			return s.writeResponse(r)
		}
	case DataTypeOctets:
		// Take data as is
		inval = reflect.ValueOf(dr)
	default:
		panic("not implemented")
	}

	outval := reflect.New(ep.outType.local.Elem()) // outval is a double pointer

	s.logger.Printf("before handler, inval=%v outval=%v", inval, outval)

	// Call the handler
	errs := ep.handler.Call([]reflect.Value{inval, outval})

	if !errs[0].IsNil() {
		he := errs[0].Interface().(error) // we checked that before...
		s.logger.Printf("handler returned error: %s", he)
		r := NewErrorHeader(StatusError, "%s", he.Error())
		return s.writeResponse(r)
	}

	switch ep.outType.proto {

	case DataTypeMarshaledJSON:

		var dataBuf bytes.Buffer
		// Marshal output
		err = json.NewEncoder(&dataBuf).Encode(outval.Interface())
		if err != nil {
			r := NewErrorHeader(StatusServerError, "cannot marshal response: %s", err)
			return s.writeResponse(r)
		}

		replyHeader := Header{
			Error:    StatusOK,
			DataType: ep.outType.proto,
		}
		if err = s.writeResponse(&replyHeader); err != nil {
			return err
		}

		if err = ml.WriteData(&dataBuf); err != nil {
			return
		}

	case DataTypeOctets:

		h := Header{
			Error:    StatusOK,
			DataType: DataTypeOctets,
		}
		if err = s.writeResponse(&h); err != nil {
			return
		}

		reader := outval.Interface().(*io.Reader) // we checked that when adding the endpoint
		err = ml.WriteData(*reader)
		if err != nil {
			return err
		}

	}

	return nil
}