zrepl/rpc/rpc.go

410 lines
9.3 KiB
Go
Raw Normal View History

2017-04-14 19:26:32 +02:00
package rpc
import (
"bytes"
"encoding/json"
2017-04-16 21:38:31 +02:00
"errors"
"fmt"
2017-04-26 20:25:53 +02:00
. "github.com/zrepl/zrepl/util"
2017-05-03 17:12:15 +02:00
"github.com/zrepl/zrepl/zfs"
2017-04-26 20:25:53 +02:00
"io"
2017-04-16 21:38:31 +02:00
"reflect"
)
2017-04-14 19:26:32 +02:00
type RPCRequester interface {
FilesystemRequest(r FilesystemRequest) (roots []zfs.DatasetPath, err error)
2017-05-03 17:12:15 +02:00
FilesystemVersionsRequest(r FilesystemVersionsRequest) (versions []zfs.FilesystemVersion, err error)
InitialTransferRequest(r InitialTransferRequest) (io.Reader, error)
IncrementalTransferRequest(r IncrementalTransferRequest) (io.Reader, error)
2017-04-14 19:26:32 +02:00
}
type RPCHandler interface {
HandleFilesystemRequest(r FilesystemRequest) (roots []zfs.DatasetPath, err error)
2017-05-03 17:12:15 +02:00
// returned versions ordered by birthtime, oldest first
HandleFilesystemVersionsRequest(r FilesystemVersionsRequest) (versions []zfs.FilesystemVersion, err error)
HandleInitialTransferRequest(r InitialTransferRequest) (io.Reader, error)
HandleIncrementalTransferRequest(r IncrementalTransferRequest) (io.Reader, error)
2017-04-14 19:26:32 +02:00
}
2017-05-12 20:26:48 +02:00
type Logger interface {
Printf(format string, args ...interface{})
}
const ByteStreamRPCProtocolVersion = 1
2017-04-14 19:26:32 +02:00
type ByteStream interface {
io.ReadWriteCloser
}
2017-04-14 19:26:32 +02:00
type ByteStreamRPC struct {
conn ByteStream
log Logger
2017-04-14 19:26:32 +02:00
}
func ConnectByteStreamRPC(conn ByteStream) (RPCRequester, error) {
2017-04-14 19:26:32 +02:00
rpc := ByteStreamRPC{
conn: conn,
2017-04-14 19:26:32 +02:00
}
// Assert protocol versions are equal
2017-04-16 21:38:31 +02:00
err := rpc.ProtocolVersionRequest()
if err != nil {
return nil, err
}
return rpc, nil
2017-04-14 19:26:32 +02:00
}
type ByteStreamRPCDecodeJSONError struct {
Type reflect.Type
DecoderErr error
}
func (e ByteStreamRPCDecodeJSONError) Error() string {
return fmt.Sprintf("cannot decode %s: %s", e.Type, e.DecoderErr)
}
func ListenByteStreamRPC(conn ByteStream, handler RPCHandler, log Logger) error {
// A request consists of two subsequent chunked JSON objects
// Object 1: RequestHeader => contains type of Request Body
// Object 2: RequestBody, e.g. IncrementalTransferRequest
// A response is always a ResponseHeader followed by bytes to be interpreted
// as indicated by the ResponseHeader.ResponseType, e.g.
// a) a chunked response
// b) or another JSON object
defer conn.Close()
send := func(r interface{}) {
if err := writeChunkedJSON(conn, r); err != nil {
panic(err)
}
}
sendError := func(id ErrorId, msg string) {
r := ResponseHeader{
ErrorId: id,
ResponseType: RNONE,
Message: msg,
}
log.Printf("sending error response: %#v", r)
if err := writeChunkedJSON(conn, r); err != nil {
log.Printf("error sending error response: %#v", err)
panic(err)
}
}
recv := func(r interface{}) (err error) {
return readChunkedJSON(conn, r)
}
for {
var header RequestHeader = RequestHeader{}
if err := recv(&header); err != nil {
sendError(EDecodeHeader, err.Error())
return conn.Close()
}
switch header.Type {
case RTProtocolVersionRequest:
var rq ByteStreamRPCProtocolVersionRequest
if err := recv(&rq); err != nil {
sendError(EDecodeRequestBody, err.Error())
return conn.Close()
}
if rq.ClientVersion != ByteStreamRPCProtocolVersion {
sendError(EProtocolVersionMismatch, "")
return conn.Close()
}
r := ResponseHeader{
RequestId: header.Id,
ResponseType: ROK,
}
send(&r)
case RTFilesystemRequest:
var rq FilesystemRequest
if err := recv(&rq); err != nil {
sendError(EDecodeRequestBody, "")
return conn.Close()
}
roots, err := handler.HandleFilesystemRequest(rq)
if err != nil {
sendError(EHandler, err.Error())
return conn.Close()
} else {
r := ResponseHeader{
RequestId: header.Id,
ResponseType: RFilesystems,
}
send(&r)
send(&roots)
}
2017-05-03 17:12:15 +02:00
case RTFilesystemVersionsRequest:
var rq FilesystemVersionsRequest
if err := recv(&rq); err != nil {
sendError(EDecodeRequestBody, err.Error())
2017-05-03 17:12:15 +02:00
return err
}
diff, err := handler.HandleFilesystemVersionsRequest(rq)
if err != nil {
sendError(EHandler, err.Error())
2017-05-03 17:12:15 +02:00
return err
} else {
r := ResponseHeader{
RequestId: header.Id,
ResponseType: RFilesystemDiff,
}
send(&r)
send(&diff)
2017-05-03 17:12:15 +02:00
}
case RTInitialTransferRequest:
var rq InitialTransferRequest
if err := recv(&rq); err != nil {
sendError(EDecodeRequestBody, "")
return conn.Close()
}
log.Printf("initial transfer request: %#v", rq)
snapReader, err := handler.HandleInitialTransferRequest(rq)
if err != nil {
sendError(EHandler, err.Error())
return conn.Close()
} else {
r := ResponseHeader{
RequestId: header.Id,
ResponseType: RChunkedStream,
}
send(&r)
chunker := NewChunker(snapReader)
_, err := io.Copy(conn, &chunker)
if err != nil {
panic(err)
}
}
2017-04-16 21:38:31 +02:00
case RTIncrementalTransferRequest:
var rq IncrementalTransferRequest
if err := recv(&rq); err != nil {
sendError(EDecodeRequestBody, "")
return conn.Close()
2017-04-16 21:38:31 +02:00
}
snapReader, err := handler.HandleIncrementalTransferRequest(rq)
if err != nil {
sendError(EHandler, err.Error())
2017-04-16 21:38:31 +02:00
} else {
r := ResponseHeader{
RequestId: header.Id,
ResponseType: RChunkedStream,
}
send(&r)
2017-04-16 21:38:31 +02:00
chunker := NewChunker(snapReader)
_, err := io.Copy(conn, &chunker)
if err != nil {
panic(err)
2017-04-16 21:38:31 +02:00
}
}
default:
sendError(EUnknownRequestType, "")
return conn.Close()
}
}
2017-04-14 19:26:32 +02:00
return nil
}
func writeChunkedJSON(conn io.Writer, r interface{}) (err error) {
var buf bytes.Buffer
encoder := json.NewEncoder(&buf)
encoder.Encode(r)
ch := NewChunker(&buf)
_, err = io.Copy(conn, &ch)
return
}
2017-04-30 17:58:39 +02:00
func readChunkedJSON(conn io.ReadWriter, r interface{}) (err error) {
unch := NewUnchunker(conn)
dec := json.NewDecoder(unch)
err = dec.Decode(r)
if err != nil {
err = ByteStreamRPCDecodeJSONError{
Type: reflect.TypeOf(r),
DecoderErr: err,
}
2017-04-30 17:58:39 +02:00
}
closeErr := unch.Close()
if err == nil && closeErr != nil {
err = closeErr
2017-04-30 17:58:39 +02:00
}
return
2017-04-14 19:26:32 +02:00
}
2017-04-16 21:38:31 +02:00
func inferRequestType(v interface{}) (RequestType, error) {
switch v.(type) {
case ByteStreamRPCProtocolVersionRequest:
2017-04-26 20:25:53 +02:00
return RTProtocolVersionRequest, nil
2017-04-16 21:38:31 +02:00
case FilesystemRequest:
2017-04-26 20:25:53 +02:00
return RTFilesystemRequest, nil
2017-05-03 17:12:15 +02:00
case FilesystemVersionsRequest:
return RTFilesystemVersionsRequest, nil
2017-04-16 21:38:31 +02:00
case InitialTransferRequest:
2017-04-26 20:25:53 +02:00
return RTInitialTransferRequest, nil
case IncrementalTransferRequest:
return RTIncrementalTransferRequest, nil
2017-04-16 21:38:31 +02:00
default:
return 0, errors.New(fmt.Sprintf("cannot infer request type for type '%v'",
2017-04-26 20:25:53 +02:00
reflect.TypeOf(v)))
2017-04-16 21:38:31 +02:00
}
}
func genUUID() [16]byte {
return [16]byte{} // TODO
}
func (c ByteStreamRPC) sendRequest(v interface{}) (err error) {
var rt RequestType
if rt, err = inferRequestType(v); err != nil {
return
}
h := RequestHeader{
Type: rt,
2017-04-26 20:25:53 +02:00
Id: genUUID(),
2017-04-16 21:38:31 +02:00
}
if err = writeChunkedJSON(c.conn, h); err != nil {
2017-04-16 21:38:31 +02:00
return
}
if err = writeChunkedJSON(c.conn, v); err != nil {
2017-04-16 21:38:31 +02:00
return
}
return
}
func (c ByteStreamRPC) expectResponseType(rt ResponseType) (err error) {
2017-04-16 21:38:31 +02:00
var h ResponseHeader
if err = readChunkedJSON(c.conn, &h); err != nil {
2017-04-16 21:38:31 +02:00
return
}
if h.ResponseType != rt {
2017-05-06 10:58:23 +02:00
return errors.New(fmt.Sprintf("unexpected response type in response header: got %#v, expected %#v. response header: %#v",
h.ResponseType, rt, h))
2017-04-16 21:38:31 +02:00
}
return
}
func (c ByteStreamRPC) sendRequestReceiveHeader(request interface{}, rt ResponseType) (err error) {
if err = c.sendRequest(request); err != nil {
return err
}
if err = c.expectResponseType(rt); err != nil {
return err
}
return nil
}
func (c ByteStreamRPC) ProtocolVersionRequest() (err error) {
b := ByteStreamRPCProtocolVersionRequest{
ClientVersion: ByteStreamRPCProtocolVersion,
}
// OK response means the remote side can cope with our protocol version
return c.sendRequestReceiveHeader(b, ROK)
}
func (c ByteStreamRPC) FilesystemRequest(r FilesystemRequest) (roots []zfs.DatasetPath, err error) {
2017-04-16 21:38:31 +02:00
if err = c.sendRequestReceiveHeader(r, RFilesystems); err != nil {
return
}
roots = make([]zfs.DatasetPath, 0)
2017-04-16 21:38:31 +02:00
if err = readChunkedJSON(c.conn, &roots); err != nil {
2017-04-16 21:38:31 +02:00
return
}
return
2017-04-14 19:26:32 +02:00
}
2017-05-03 17:12:15 +02:00
func (c ByteStreamRPC) FilesystemVersionsRequest(r FilesystemVersionsRequest) (versions []zfs.FilesystemVersion, err error) {
if err = c.sendRequestReceiveHeader(r, RFilesystemDiff); err != nil {
return
}
err = readChunkedJSON(c.conn, &versions)
2017-05-03 17:12:15 +02:00
return
}
2017-04-16 21:38:31 +02:00
func (c ByteStreamRPC) InitialTransferRequest(r InitialTransferRequest) (unchunker io.Reader, err error) {
if err = c.sendRequestReceiveHeader(r, RChunkedStream); err != nil {
return
}
2017-04-16 21:38:31 +02:00
unchunker = NewUnchunker(c.conn)
return
2017-04-14 19:26:32 +02:00
}
2017-04-16 21:38:31 +02:00
func (c ByteStreamRPC) IncrementalTransferRequest(r IncrementalTransferRequest) (unchunker io.Reader, err error) {
if err = c.sendRequestReceiveHeader(r, RChunkedStream); err != nil {
return
}
unchunker = NewUnchunker(c.conn)
return
}
2017-04-14 19:26:32 +02:00
type LocalRPC struct {
handler RPCHandler
}
2017-04-16 21:38:31 +02:00
func ConnectLocalRPC(handler RPCHandler) RPCRequester {
2017-04-14 19:26:32 +02:00
return LocalRPC{handler}
}
func (c LocalRPC) FilesystemRequest(r FilesystemRequest) (roots []zfs.DatasetPath, err error) {
2017-04-14 19:26:32 +02:00
return c.handler.HandleFilesystemRequest(r)
}
2017-05-03 17:12:15 +02:00
func (c LocalRPC) FilesystemVersionsRequest(r FilesystemVersionsRequest) (versions []zfs.FilesystemVersion, err error) {
return c.handler.HandleFilesystemVersionsRequest(r)
}
func (c LocalRPC) InitialTransferRequest(r InitialTransferRequest) (io.Reader, error) {
2017-04-14 19:26:32 +02:00
return c.handler.HandleInitialTransferRequest(r)
}
func (c LocalRPC) IncrementalTransferRequest(r IncrementalTransferRequest) (reader io.Reader, err error) {
reader, err = c.handler.HandleIncrementalTransferRequest(r)
return
2017-04-14 19:26:32 +02:00
}