Finish implementation of RPC.

This commit is contained in:
Christian Schwarz 2017-04-16 21:38:31 +02:00
parent c1aed10e8b
commit 4494afe47f
4 changed files with 139 additions and 17 deletions

View File

@ -2,6 +2,7 @@ package model
type Filesystem struct {
Name string
Parent *Filesystem
Children []Filesystem
Snapshots []Snapshot
}

View File

@ -6,16 +6,19 @@ import (
. "github.com/zrepl/zrepl/model"
. "github.com/zrepl/zrepl/util"
"errors"
"fmt"
"reflect"
)
type RPCRequester interface {
FilesystemRequest(r FilesystemRequest) (root Filesystem, err error)
FilesystemRequest(r FilesystemRequest) (roots []Filesystem, err error)
InitialTransferRequest(r InitialTransferRequest) (io.Reader, error)
IncrementalTransferRequest(r IncrementalTransferRequest) (io.Reader, error)
}
type RPCHandler interface {
HandleFilesystemRequest(r FilesystemRequest) (root Filesystem, err error)
HandleFilesystemRequest(r FilesystemRequest) (roots []Filesystem, err error)
HandleInitialTransferRequest(r InitialTransferRequest) (io.Reader, error)
HandleIncrementalTransferRequest(r IncrementalTransferRequest) (io.Reader, error)
}
@ -28,7 +31,7 @@ type ByteStreamRPC struct {
decoder *json.Decoder
}
func ConnectByteStreamRPC(conn io.ReadWriteCloser) (ByteStreamRPC, error) {
func ConnectByteStreamRPC(conn io.ReadWriteCloser) (RPCRequester, error) {
// TODO do ssh connection to transport, establish TCP-like communication channel
rpc := ByteStreamRPC{
conn: conn,
@ -37,8 +40,11 @@ func ConnectByteStreamRPC(conn io.ReadWriteCloser) (ByteStreamRPC, error) {
}
// Assert protocol versions are equal
req := NewByteStreamRPCProtocolVersionRequest()
rpc.encoder.Encode(&req)
err := rpc.ProtocolVersionRequest()
if err != nil {
return nil, err
}
return rpc, nil
}
@ -116,7 +122,25 @@ func ListenByteStreamRPC(conn io.ReadWriteCloser, handler RPCHandler) error {
return err
}
}
// TODO
case RTIncrementalTransferRequest:
var rq IncrementalTransferRequest
if err := decoder.Decode(&rq); err != nil {
respondWithError(conn, EDecodeRequestBody, nil)
}
snapReader, err := handler.HandleIncrementalTransferRequest(rq)
if err != nil {
respondWithError(conn, EHandler, err)
} else {
chunker := NewChunker(snapReader)
_, err := io.Copy(conn, &chunker)
if err != nil {
return err
}
}
default:
respondWithError(conn, EUnknownRequestType, nil)
conn.Close()
@ -130,28 +154,122 @@ func respondWithError(conn io.Writer, id ErrorId, err error) error {
return nil
}
func inferRequestType(v interface{}) (RequestType, error) {
switch v.(type) {
case ByteStreamRPCProtocolVersionRequest:
return RTProtocolVersionRequest, nil
case FilesystemRequest:
return RTFilesystemRequest, nil
case InitialTransferRequest:
return RTInitialTransferRequest, nil
default:
return 0, errors.New(fmt.Sprintf("cannot infer request type for type '%v'",
reflect.TypeOf(v)))
}
}
func genUUID() [16]byte {
return [16]byte{} // TODO
}
func (c ByteStreamRPC) sendRequest(v interface{}) (err error) {
var rt RequestType
if rt, err = inferRequestType(v); err != nil {
return
}
h := RequestHeader{
Type: rt,
Id: genUUID(),
}
if err = c.encoder.Encode(h); err != nil {
return
}
if err = c.encoder.Encode(v); err != nil {
return
}
return
}
func (c ByteStreamRPC) expectResponseType(rt ResponseType) (err error) {
var h ResponseHeader
if err = c.decoder.Decode(&h); err != nil {
return
}
if h.ResponseType != rt {
return errors.New("unexpected response type in response header")
}
return
}
func (c ByteStreamRPC) sendRequestReceiveHeader(request interface{}, rt ResponseType) (err error) {
if err = c.sendRequest(request); err != nil {
return err
}
if err = c.expectResponseType(rt); err != nil {
return err
}
return nil
}
func (c ByteStreamRPC) ProtocolVersionRequest() (err error) {
b := ByteStreamRPCProtocolVersionRequest{
ClientVersion: ByteStreamRPCProtocolVersion,
}
// OK response means the remote side can cope with our protocol version
return c.sendRequestReceiveHeader(b, ROK)
}
func (c ByteStreamRPC) FilesystemRequest(r FilesystemRequest) (roots []Filesystem, err error) {
return nil, nil
if err = c.sendRequestReceiveHeader(r, RFilesystems); err != nil {
return
}
roots = make([]Filesystem, 0)
if err = c.decoder.Decode(roots); err != nil {
return
}
return
}
func (c ByteStreamRPC) InitialTransferRequest(r InitialTransferRequest) (io.Reader, error) {
// send request header using protobuf or similar
return nil, nil
func (c ByteStreamRPC) InitialTransferRequest(r InitialTransferRequest) (unchunker io.Reader, err error) {
if err = c.sendRequestReceiveHeader(r, RChunkedStream); err != nil {
return
}
unchunker = NewUnchunker(c.conn)
return
}
func (c ByteStreamRPC) IncrementalTransferRequest(r IncrementalTransferRequest) (io.Reader, error) {
return nil, nil
func (c ByteStreamRPC) IncrementalTransferRequest(r IncrementalTransferRequest) (unchunker io.Reader, err error) {
if err = c.sendRequestReceiveHeader(r, RChunkedStream); err != nil {
return
}
unchunker = NewUnchunker(c.conn)
return
}
type LocalRPC struct {
handler RPCHandler
}
func ConnectLocalRPC(handler RPCHandler) LocalRPC {
func ConnectLocalRPC(handler RPCHandler) RPCRequester {
return LocalRPC{handler}
}
func (c LocalRPC) FilesystemRequest(r FilesystemRequest) (root Filesystem, err error) {
func (c LocalRPC) FilesystemRequest(r FilesystemRequest) (roots []Filesystem, err error) {
return c.handler.HandleFilesystemRequest(r)
}

View File

@ -9,6 +9,7 @@ const (
RTProtocolVersionRequest RequestType = 1
RTFilesystemRequest = 16
RTInitialTransferRequest = 17
RTIncrementalTransferRequest = 18
)
type RequestHeader struct {
@ -55,7 +56,9 @@ const (
type ResponseType uint8
const (
ROK ResponseType = 0
ROK ResponseType = 1
RFilesystems = 2
RChunkedStream = 3
)
type ResponseHeader struct {

View File

@ -15,8 +15,8 @@ type Unchunker struct {
remainingChunkBytes uint32
}
func NewUnchunker(conn io.Reader) Unchunker {
return Unchunker{
func NewUnchunker(conn io.Reader) *Unchunker {
return &Unchunker{
in: conn,
remainingChunkBytes: 0,
}