Further drafting of rpc module.

Also: fix typo in model definitions.
This commit is contained in:
Christian Schwarz 2017-04-15 18:31:14 +02:00
parent 32f07c51c7
commit c1aed10e8b
3 changed files with 183 additions and 47 deletions

View File

@ -1,14 +1,14 @@
package model package model
type Filesytem struct { type Filesystem struct {
Name string Name string
Children []Filesytem Children []Filesystem
Snapshots []Snapshot Snapshots []Snapshot
} }
type FilesytemMapping struct { type FilesytemMapping struct {
From Filesytem From Filesystem
To Filesytem To Filesystem
} }
type Snapshot struct { type Snapshot struct {
@ -16,7 +16,7 @@ type Snapshot struct {
} }
type Pool struct { type Pool struct {
Root Filesytem Root Filesystem
} }
type SSHTransport struct { type SSHTransport struct {

View File

@ -1,76 +1,165 @@
package rpc package rpc
import (
"encoding/json"
"io"
. "github.com/zrepl/zrepl/model"
. "github.com/zrepl/zrepl/util"
)
type RPCRequester interface { type RPCRequester interface {
FilesystemRequest(r FilesystemRequest) (root model.Filesystem, err error) FilesystemRequest(r FilesystemRequest) (root Filesystem, err error)
InitialTransferRequest(r InitialTransferRequest) (io.Read, error) InitialTransferRequest(r InitialTransferRequest) (io.Reader, error)
IncrementalTransferRequest(r IncrementalTransferRequest) (io.Read, error) IncrementalTransferRequest(r IncrementalTransferRequest) (io.Reader, error)
} }
type RPCHandler interface { type RPCHandler interface {
HandleFilesystemRequest(r FilesystemRequest) (root model.Filesystem, err error) HandleFilesystemRequest(r FilesystemRequest) (root Filesystem, err error)
HandleInitialTransferRequest(r InitialTransferRequest) (io.Read, error) HandleInitialTransferRequest(r InitialTransferRequest) (io.Reader, error)
HandleIncrementalTransferRequestRequest(r IncrementalTransferRequest) (io.Read, error) HandleIncrementalTransferRequest(r IncrementalTransferRequest) (io.Reader, error)
} }
const ByteStreamRPCProtocolVersion = 1
type ByteStreamRPC struct { type ByteStreamRPC struct {
conn io.ReadWriteCloser conn io.ReadWriteCloser
encoder *json.Encoder
decoder *json.Decoder
} }
func ConnectByteStreamRPC(conn io.ReadWriteCloser) (ByteStreamRPC, error) { func ConnectByteStreamRPC(conn io.ReadWriteCloser) (ByteStreamRPC, error) {
// TODO do ssh connection to transport, establish TCP-like communication channel // TODO do ssh connection to transport, establish TCP-like communication channel
conn := sshtransport.New()
rpc := ByteStreamRPC{ rpc := ByteStreamRPC{
conn: conn, conn: conn,
encoder: json.NewEncoder(conn),
decoder: json.NewDecoder(conn),
} }
return conn, nil
// Assert protocol versions are equal
req := NewByteStreamRPCProtocolVersionRequest()
rpc.encoder.Encode(&req)
return rpc, nil
} }
func ListenByteStreamRPC(conn io.ReadWriteCloser, handler RPCHandler) (error) { func ListenByteStreamRPC(conn io.ReadWriteCloser, handler RPCHandler) error {
// Read from connection, decode wire protocol, route requests to handler
// A request consists of two subsequent JSON objects
// Object 1: RequestHeader => contains type of Request Body
// Object 2: RequestBody, e.g. IncrementalTransferRequest
// A response is always a ResponseHeader followed by bytes to be interpreted
// as indicated by the ResponseHeader.ResponseType, e.g.
// a) a chunked response
// b) or another JSON object
decoder := json.NewDecoder(conn)
encoder := json.NewEncoder(conn)
for {
var header RequestHeader = RequestHeader{}
if err := decoder.Decode(&header); err != nil {
respondWithError(conn, EDecodeHeader, err)
conn.Close()
return err
}
switch header.Type {
case RTProtocolVersionRequest:
var rq ByteStreamRPCProtocolVersionRequest
if err := decoder.Decode(&rq); err != nil {
respondWithError(conn, EDecodeRequestBody, nil)
conn.Close()
}
if rq.ClientVersion != ByteStreamRPCProtocolVersion {
respondWithError(conn, EProtocolVersionMismatch, nil)
conn.Close()
}
r := ResponseHeader{
RequestId: header.Id,
}
if err := encoder.Encode(&r); err != nil {
return err
}
case RTFilesystemRequest:
var rq FilesystemRequest
if err := decoder.Decode(&rq); err != nil {
respondWithError(conn, EDecodeRequestBody, nil)
conn.Close()
}
roots, err := handler.HandleFilesystemRequest(rq)
if err != nil {
respondWithError(conn, EHandler, err)
} else {
if err := encoder.Encode(&roots); err != nil {
return err
}
}
case RTInitialTransferRequest:
var rq InitialTransferRequest
if err := decoder.Decode(&rq); err != nil {
respondWithError(conn, EDecodeRequestBody, nil)
}
snapReader, err := handler.HandleInitialTransferRequest(rq)
if err != nil {
respondWithError(conn, EHandler, err)
} else {
chunker := NewChunker(snapReader)
_, err := io.Copy(conn, &chunker)
if err != nil {
return err
}
}
// TODO
default:
respondWithError(conn, EUnknownRequestType, nil)
conn.Close()
}
}
return nil return nil
} }
func (c ByteStreamRPC) FilesystemRequest(r FilesystemRequest) (roots []model.Filesystem, err error) { func respondWithError(conn io.Writer, id ErrorId, err error) error {
encodedReader := protobuf.Encode(r) return nil
c.conn.Write(NewChunker(encodedReader))
encodedResponseReader := NewUnchunker(c.conn.Read())
roots = protobuf.Decode(encodedResponse)
return
} }
func (c ByteStreamRPC) InitialTransferRequest(r InitialTransferRequest) (io.Read, error) { func (c ByteStreamRPC) FilesystemRequest(r FilesystemRequest) (roots []Filesystem, err error) {
return nil, nil
}
func (c ByteStreamRPC) InitialTransferRequest(r InitialTransferRequest) (io.Reader, error) {
// send request header using protobuf or similar // send request header using protobuf or similar
encodedReader := protobuf.Encode(r) return nil, nil
c.conn.Write(NewChunker(encodedReader))
// expect chunked response -> use unchunker on c.conn to read snapshot stream
return NewUmainnchunker(c.conn.Read())
} }
func (c ByteStreamRPC) IncrementalTransferRequest(r IncrementalTransferRequest) (io.Read, error) { func (c ByteStreamRPC) IncrementalTransferRequest(r IncrementalTransferRequest) (io.Reader, error) {
return nil, nil
} }
type LocalRPC struct { type LocalRPC struct {
handler RPCHandler handler RPCHandler
} }
func ConnectLocalRPC(handler RPCHandler) LocalRPC { func ConnectLocalRPC(handler RPCHandler) LocalRPC {
return LocalRPC{handler} return LocalRPC{handler}
} }
func (c LocalRPC) FilesystemRequest(r FilesystemRequest) (root model.Filesystem, err error) { func (c LocalRPC) FilesystemRequest(r FilesystemRequest) (root Filesystem, err error) {
return c.handler.HandleFilesystemRequest(r) return c.handler.HandleFilesystemRequest(r)
} }
func (c LocalRPC) InitialTransferRequest(r InitialTransferRequest) (io.Read, error) { func (c LocalRPC) InitialTransferRequest(r InitialTransferRequest) (io.Reader, error) {
return c.handler.HandleInitialTransferRequest(r) return c.handler.HandleInitialTransferRequest(r)
} }
func (c LocalRPC) IncrementalTransferRequest(r IncrementalTransferRequest) (io.Read, error) { func (c LocalRPC) IncrementalTransferRequest(r IncrementalTransferRequest) (reader io.Reader, err error) {
return c.handler.HandleIncrementalTransferRequest(r) reader, err = c.handler.HandleIncrementalTransferRequest(r)
return
} }

View File

@ -1,29 +1,76 @@
package protocol package rpc
type RequestId uint8 import "io"
type Request struct {
RequestType RequestType type RequestId [16]byte
RequestId [16]byte // UUID type RequestType uint8
const (
RTProtocolVersionRequest RequestType = 1
RTFilesystemRequest = 16
RTInitialTransferRequest = 17
)
type RequestHeader struct {
Type RequestType
Id [16]byte // UUID
} }
type FilesystemRequest struct { type FilesystemRequest struct {
Request
Roots []string Roots []string
} }
type InitialTransferRequest struct { type InitialTransferRequest struct {
Request
Snapshot string // tank/my/db@ljlsdjflksdf Snapshot string // tank/my/db@ljlsdjflksdf
} }
func (r InitialTransferRequest) Respond(snapshotReader io.Reader) { func (r InitialTransferRequest) Respond(snapshotReader io.Reader) {
} }
type IncrementalTransferRequest struct { type IncrementalTransferRequest struct {
Request
FromSnapshot string FromSnapshot string
ToSnapshot string ToSnapshot string
} }
func (r IncrementalTransferRequest) Respond(snapshotReader io.Reader) { func (r IncrementalTransferRequest) Respond(snapshotReader io.Reader) {
} }
type ByteStreamRPCProtocolVersionRequest struct {
ClientVersion uint8
}
type ErrorId uint8
const (
ENoError ErrorId = 0
EDecodeHeader = 1
EUnknownRequestType = 2
EDecodeRequestBody = 3
EProtocolVersionMismatch = 4
EHandler = 5
)
type ResponseType uint8
const (
ROK ResponseType = 0
)
type ResponseHeader struct {
RequestId RequestId
ErrorId ErrorId
Message string
ResponseType ResponseType
}
func NewByteStreamRPCProtocolVersionRequest() ByteStreamRPCProtocolVersionRequest {
return ByteStreamRPCProtocolVersionRequest{
ClientVersion: ByteStreamRPCProtocolVersion,
}
}
func newUUID() [16]byte {
return [16]byte{}
}