zrepl/rpc/client.go
Christian Schwarz fa4d2098a8 rpc: re-architect connection teardown
Tear down occurs on each protocol level, stack-wise.

Open RWC
Open ML (with NewMessageLayer)
Open RPC (with NewServer/ NewClient)
Close RPC (with Close() from Client())
Close ML
* in Server: after error / receive of Close request
* in Client: after getting ACK for Close request from Server
Close RWC

To achieve this, a DataType for RPC control messages was added, which
has a separate set of endpoints. Not exactly pretty, but works for now.

The necessity of the RST frame remains to be determined. However, it is
nice to have a way to signal the other side something went terribly
wrong in the middle of an operation. Example: A frameBridingWriter fails
to read the next chunk of a file it is supposed to send, it can just
send an RST frame to signal this operation failed... Wouldn't trailers
make sense then?
2017-09-11 10:54:56 +02:00

137 lines
2.6 KiB
Go

package rpc
import (
"bytes"
"encoding/json"
"io"
"reflect"
"github.com/pkg/errors"
)
type Client struct {
ml *MessageLayer
logger Logger
}
func NewClient(rwc io.ReadWriteCloser) *Client {
return &Client{NewMessageLayer(rwc), noLogger{}}
}
func (c *Client) SetLogger(logger Logger, logMessageLayer bool) {
c.logger = logger
if logMessageLayer {
c.ml.logger = logger
} else {
c.ml.logger = noLogger{}
}
}
func (c *Client) Close() (err error) {
c.logger.Printf("sending Close request")
header := Header{
DataType: DataTypeControl,
Endpoint: ControlEndpointClose,
Accept: DataTypeControl,
}
err = c.ml.WriteHeader(&header)
if err != nil {
return
}
c.logger.Printf("reading Close ACK")
ack, err := c.ml.ReadHeader()
if err != nil {
return err
}
c.logger.Printf("received Close ACK: %#v", ack)
if ack.Error != StatusOK {
err = errors.Errorf("error hanging up: remote error (%s) %s", ack.Error, ack.ErrorMessage)
return
}
c.logger.Printf("closing MessageLayer")
if err = c.ml.Close(); err != nil {
c.logger.Printf("error closing RWC: %+v", err)
return
}
return err
}
func (c *Client) recvResponse() (h *Header, err error) {
h, err = c.ml.ReadHeader()
if err != nil {
return nil, errors.Wrap(err, "cannot read header")
}
// TODO validate
return
}
func (c *Client) writeRequest(h *Header) (err error) {
// TODO validate
err = c.ml.WriteHeader(h)
if err != nil {
return errors.Wrap(err, "cannot write header")
}
return
}
func (c *Client) Call(endpoint string, in, out interface{}) (err error) {
var accept DataType
{
outType := reflect.TypeOf(out)
if typeIsIOReaderPtr(outType) {
accept = DataTypeOctets
} else {
accept = DataTypeMarshaledJSON
}
}
h := Header{
Endpoint: endpoint,
DataType: DataTypeMarshaledJSON,
Accept: accept,
}
if err = c.writeRequest(&h); err != nil {
return err
}
var buf bytes.Buffer
if err = json.NewEncoder(&buf).Encode(in); err != nil {
panic("cannot encode 'in' parameter")
}
if err = c.ml.WriteData(&buf); err != nil {
return err
}
rh, err := c.recvResponse()
if err != nil {
return err
}
if rh.Error != StatusOK {
return &RPCError{rh}
}
rd := c.ml.ReadData()
switch accept {
case DataTypeOctets:
c.logger.Printf("setting out to ML data reader")
outPtr := out.(*io.Reader) // we checked that above
*outPtr = rd
case DataTypeMarshaledJSON:
c.logger.Printf("decoding marshaled json")
if err = json.NewDecoder(c.ml.ReadData()).Decode(out); err != nil {
return errors.Wrap(err, "cannot decode marshaled reply")
}
default:
panic("implementation error") // accept is controlled by us
}
return
}