From c1aed10e8b4a2986b43d170ef9c455a1ab13a3b1 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Sat, 15 Apr 2017 18:31:14 +0200 Subject: [PATCH] Further drafting of rpc module. Also: fix typo in model definitions. --- model/model.go | 10 ++-- rpc/rpc.go | 155 ++++++++++++++++++++++++++++++++++++++----------- rpc/structs.go | 65 ++++++++++++++++++--- 3 files changed, 183 insertions(+), 47 deletions(-) diff --git a/model/model.go b/model/model.go index 8a930a1..2e25d9e 100644 --- a/model/model.go +++ b/model/model.go @@ -1,14 +1,14 @@ package model -type Filesytem struct { +type Filesystem struct { Name string - Children []Filesytem + Children []Filesystem Snapshots []Snapshot } type FilesytemMapping struct { - From Filesytem - To Filesytem + From Filesystem + To Filesystem } type Snapshot struct { @@ -16,7 +16,7 @@ type Snapshot struct { } type Pool struct { - Root Filesytem + Root Filesystem } type SSHTransport struct { diff --git a/rpc/rpc.go b/rpc/rpc.go index f4ad965..66afe19 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -1,76 +1,165 @@ package rpc +import ( + "encoding/json" + "io" + + . "github.com/zrepl/zrepl/model" + . "github.com/zrepl/zrepl/util" +) + type RPCRequester interface { - FilesystemRequest(r FilesystemRequest) (root model.Filesystem, err error) - InitialTransferRequest(r InitialTransferRequest) (io.Read, error) - IncrementalTransferRequest(r IncrementalTransferRequest) (io.Read, error) + FilesystemRequest(r FilesystemRequest) (root Filesystem, err error) + InitialTransferRequest(r InitialTransferRequest) (io.Reader, error) + IncrementalTransferRequest(r IncrementalTransferRequest) (io.Reader, error) } type RPCHandler interface { - HandleFilesystemRequest(r FilesystemRequest) (root model.Filesystem, err error) - HandleInitialTransferRequest(r InitialTransferRequest) (io.Read, error) - HandleIncrementalTransferRequestRequest(r IncrementalTransferRequest) (io.Read, error) + HandleFilesystemRequest(r FilesystemRequest) (root Filesystem, err error) + HandleInitialTransferRequest(r InitialTransferRequest) (io.Reader, error) + HandleIncrementalTransferRequest(r IncrementalTransferRequest) (io.Reader, error) } +const ByteStreamRPCProtocolVersion = 1 type ByteStreamRPC struct { - conn io.ReadWriteCloser + conn io.ReadWriteCloser + encoder *json.Encoder + decoder *json.Decoder } func ConnectByteStreamRPC(conn io.ReadWriteCloser) (ByteStreamRPC, error) { // TODO do ssh connection to transport, establish TCP-like communication channel - conn := sshtransport.New() rpc := ByteStreamRPC{ - conn: conn, + conn: conn, + encoder: json.NewEncoder(conn), + decoder: json.NewDecoder(conn), } - return conn, nil + + // Assert protocol versions are equal + req := NewByteStreamRPCProtocolVersionRequest() + rpc.encoder.Encode(&req) + return rpc, nil } -func ListenByteStreamRPC(conn io.ReadWriteCloser, handler RPCHandler) (error) { - // Read from connection, decode wire protocol, route requests to handler +func ListenByteStreamRPC(conn io.ReadWriteCloser, handler RPCHandler) error { + + // A request consists of two subsequent JSON objects + // Object 1: RequestHeader => contains type of Request Body + // Object 2: RequestBody, e.g. IncrementalTransferRequest + // A response is always a ResponseHeader followed by bytes to be interpreted + // as indicated by the ResponseHeader.ResponseType, e.g. + // a) a chunked response + // b) or another JSON object + + decoder := json.NewDecoder(conn) + encoder := json.NewEncoder(conn) + + for { + + var header RequestHeader = RequestHeader{} + if err := decoder.Decode(&header); err != nil { + respondWithError(conn, EDecodeHeader, err) + conn.Close() + return err + } + + switch header.Type { + case RTProtocolVersionRequest: + var rq ByteStreamRPCProtocolVersionRequest + if err := decoder.Decode(&rq); err != nil { + respondWithError(conn, EDecodeRequestBody, nil) + conn.Close() + } + + if rq.ClientVersion != ByteStreamRPCProtocolVersion { + respondWithError(conn, EProtocolVersionMismatch, nil) + conn.Close() + } + + r := ResponseHeader{ + RequestId: header.Id, + } + if err := encoder.Encode(&r); err != nil { + return err + } + + case RTFilesystemRequest: + var rq FilesystemRequest + if err := decoder.Decode(&rq); err != nil { + respondWithError(conn, EDecodeRequestBody, nil) + conn.Close() + } + + roots, err := handler.HandleFilesystemRequest(rq) + if err != nil { + respondWithError(conn, EHandler, err) + } else { + if err := encoder.Encode(&roots); err != nil { + return err + } + } + + case RTInitialTransferRequest: + var rq InitialTransferRequest + if err := decoder.Decode(&rq); err != nil { + respondWithError(conn, EDecodeRequestBody, nil) + } + + snapReader, err := handler.HandleInitialTransferRequest(rq) + if err != nil { + respondWithError(conn, EHandler, err) + } else { + chunker := NewChunker(snapReader) + _, err := io.Copy(conn, &chunker) + if err != nil { + return err + } + } + // TODO + default: + respondWithError(conn, EUnknownRequestType, nil) + conn.Close() + } + } + return nil } -func (c ByteStreamRPC) FilesystemRequest(r FilesystemRequest) (roots []model.Filesystem, err error) { - encodedReader := protobuf.Encode(r) - c.conn.Write(NewChunker(encodedReader)) - encodedResponseReader := NewUnchunker(c.conn.Read()) - roots = protobuf.Decode(encodedResponse) - return +func respondWithError(conn io.Writer, id ErrorId, err error) error { + return nil } -func (c ByteStreamRPC) InitialTransferRequest(r InitialTransferRequest) (io.Read, error) { +func (c ByteStreamRPC) FilesystemRequest(r FilesystemRequest) (roots []Filesystem, err error) { + return nil, nil +} + +func (c ByteStreamRPC) InitialTransferRequest(r InitialTransferRequest) (io.Reader, error) { // send request header using protobuf or similar - encodedReader := protobuf.Encode(r) - c.conn.Write(NewChunker(encodedReader)) - // expect chunked response -> use unchunker on c.conn to read snapshot stream - return NewUmainnchunker(c.conn.Read()) + return nil, nil } -func (c ByteStreamRPC) IncrementalTransferRequest(r IncrementalTransferRequest) (io.Read, error) { - +func (c ByteStreamRPC) IncrementalTransferRequest(r IncrementalTransferRequest) (io.Reader, error) { + return nil, nil } - type LocalRPC struct { handler RPCHandler } - func ConnectLocalRPC(handler RPCHandler) LocalRPC { return LocalRPC{handler} } -func (c LocalRPC) FilesystemRequest(r FilesystemRequest) (root model.Filesystem, err error) { +func (c LocalRPC) FilesystemRequest(r FilesystemRequest) (root Filesystem, err error) { return c.handler.HandleFilesystemRequest(r) } -func (c LocalRPC) InitialTransferRequest(r InitialTransferRequest) (io.Read, error) { +func (c LocalRPC) InitialTransferRequest(r InitialTransferRequest) (io.Reader, error) { return c.handler.HandleInitialTransferRequest(r) } -func (c LocalRPC) IncrementalTransferRequest(r IncrementalTransferRequest) (io.Read, error) { - return c.handler.HandleIncrementalTransferRequest(r) +func (c LocalRPC) IncrementalTransferRequest(r IncrementalTransferRequest) (reader io.Reader, err error) { + reader, err = c.handler.HandleIncrementalTransferRequest(r) + return } - - diff --git a/rpc/structs.go b/rpc/structs.go index 455f22c..1a501b7 100644 --- a/rpc/structs.go +++ b/rpc/structs.go @@ -1,29 +1,76 @@ -package protocol +package rpc -type RequestId uint8 -type Request struct { - RequestType RequestType - RequestId [16]byte // UUID +import "io" + +type RequestId [16]byte +type RequestType uint8 + +const ( + RTProtocolVersionRequest RequestType = 1 + RTFilesystemRequest = 16 + RTInitialTransferRequest = 17 +) + +type RequestHeader struct { + Type RequestType + Id [16]byte // UUID } type FilesystemRequest struct { - Request Roots []string } type InitialTransferRequest struct { - Request Snapshot string // tank/my/db@ljlsdjflksdf } + func (r InitialTransferRequest) Respond(snapshotReader io.Reader) { } type IncrementalTransferRequest struct { - Request FromSnapshot string - ToSnapshot string + ToSnapshot string } + func (r IncrementalTransferRequest) Respond(snapshotReader io.Reader) { } + +type ByteStreamRPCProtocolVersionRequest struct { + ClientVersion uint8 +} + +type ErrorId uint8 + +const ( + ENoError ErrorId = 0 + EDecodeHeader = 1 + EUnknownRequestType = 2 + EDecodeRequestBody = 3 + EProtocolVersionMismatch = 4 + EHandler = 5 +) + +type ResponseType uint8 + +const ( + ROK ResponseType = 0 +) + +type ResponseHeader struct { + RequestId RequestId + ErrorId ErrorId + Message string + ResponseType ResponseType +} + +func NewByteStreamRPCProtocolVersionRequest() ByteStreamRPCProtocolVersionRequest { + return ByteStreamRPCProtocolVersionRequest{ + ClientVersion: ByteStreamRPCProtocolVersion, + } +} + +func newUUID() [16]byte { + return [16]byte{} +}