From 4494afe47f9636881a795e84101fff31b6fdf64a Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Sun, 16 Apr 2017 21:38:31 +0200 Subject: [PATCH] Finish implementation of RPC. --- model/model.go | 1 + rpc/rpc.go | 146 ++++++++++++++++++++++++++++++++++++++++++----- rpc/structs.go | 5 +- util/chunking.go | 4 +- 4 files changed, 139 insertions(+), 17 deletions(-) diff --git a/model/model.go b/model/model.go index 2e25d9e..399a12b 100644 --- a/model/model.go +++ b/model/model.go @@ -2,6 +2,7 @@ package model type Filesystem struct { Name string + Parent *Filesystem Children []Filesystem Snapshots []Snapshot } diff --git a/rpc/rpc.go b/rpc/rpc.go index 66afe19..3c17d94 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -6,16 +6,19 @@ import ( . "github.com/zrepl/zrepl/model" . "github.com/zrepl/zrepl/util" + "errors" + "fmt" + "reflect" ) type RPCRequester interface { - FilesystemRequest(r FilesystemRequest) (root Filesystem, err error) + FilesystemRequest(r FilesystemRequest) (roots []Filesystem, err error) InitialTransferRequest(r InitialTransferRequest) (io.Reader, error) IncrementalTransferRequest(r IncrementalTransferRequest) (io.Reader, error) } type RPCHandler interface { - HandleFilesystemRequest(r FilesystemRequest) (root Filesystem, err error) + HandleFilesystemRequest(r FilesystemRequest) (roots []Filesystem, err error) HandleInitialTransferRequest(r InitialTransferRequest) (io.Reader, error) HandleIncrementalTransferRequest(r IncrementalTransferRequest) (io.Reader, error) } @@ -28,7 +31,7 @@ type ByteStreamRPC struct { decoder *json.Decoder } -func ConnectByteStreamRPC(conn io.ReadWriteCloser) (ByteStreamRPC, error) { +func ConnectByteStreamRPC(conn io.ReadWriteCloser) (RPCRequester, error) { // TODO do ssh connection to transport, establish TCP-like communication channel rpc := ByteStreamRPC{ conn: conn, @@ -37,8 +40,11 @@ func ConnectByteStreamRPC(conn io.ReadWriteCloser) (ByteStreamRPC, error) { } // Assert protocol versions are equal - req := NewByteStreamRPCProtocolVersionRequest() - rpc.encoder.Encode(&req) + err := rpc.ProtocolVersionRequest() + if err != nil { + return nil, err + } + return rpc, nil } @@ -116,7 +122,25 @@ func ListenByteStreamRPC(conn io.ReadWriteCloser, handler RPCHandler) error { return err } } - // TODO + + case RTIncrementalTransferRequest: + + var rq IncrementalTransferRequest + if err := decoder.Decode(&rq); err != nil { + respondWithError(conn, EDecodeRequestBody, nil) + } + + snapReader, err := handler.HandleIncrementalTransferRequest(rq) + if err != nil { + respondWithError(conn, EHandler, err) + } else { + chunker := NewChunker(snapReader) + _, err := io.Copy(conn, &chunker) + if err != nil { + return err + } + } + default: respondWithError(conn, EUnknownRequestType, nil) conn.Close() @@ -130,28 +154,122 @@ func respondWithError(conn io.Writer, id ErrorId, err error) error { return nil } +func inferRequestType(v interface{}) (RequestType, error) { + switch v.(type) { + case ByteStreamRPCProtocolVersionRequest: + return RTProtocolVersionRequest, nil + case FilesystemRequest: + return RTFilesystemRequest, nil + case InitialTransferRequest: + return RTInitialTransferRequest, nil + default: + return 0, errors.New(fmt.Sprintf("cannot infer request type for type '%v'", + reflect.TypeOf(v))) + } +} + +func genUUID() [16]byte { + return [16]byte{} // TODO +} + +func (c ByteStreamRPC) sendRequest(v interface{}) (err error) { + + var rt RequestType + + if rt, err = inferRequestType(v); err != nil { + return + } + + h := RequestHeader{ + Type: rt, + Id: genUUID(), + } + + if err = c.encoder.Encode(h); err != nil { + return + } + if err = c.encoder.Encode(v); err != nil { + return + } + + return +} + +func (c ByteStreamRPC) expectResponseType(rt ResponseType) (err error) { + var h ResponseHeader + if err = c.decoder.Decode(&h); err != nil { + return + } + + if h.ResponseType != rt { + return errors.New("unexpected response type in response header") + } + return +} + +func (c ByteStreamRPC) sendRequestReceiveHeader(request interface{}, rt ResponseType) (err error) { + + if err = c.sendRequest(request); err != nil { + return err + } + + if err = c.expectResponseType(rt); err != nil { + return err + } + + return nil +} + +func (c ByteStreamRPC) ProtocolVersionRequest() (err error) { + b := ByteStreamRPCProtocolVersionRequest{ + ClientVersion: ByteStreamRPCProtocolVersion, + } + + // OK response means the remote side can cope with our protocol version + return c.sendRequestReceiveHeader(b, ROK) +} + func (c ByteStreamRPC) FilesystemRequest(r FilesystemRequest) (roots []Filesystem, err error) { - return nil, nil + + if err = c.sendRequestReceiveHeader(r, RFilesystems); err != nil { + return + } + + roots = make([]Filesystem, 0) + + if err = c.decoder.Decode(roots); err != nil { + return + } + + return } -func (c ByteStreamRPC) InitialTransferRequest(r InitialTransferRequest) (io.Reader, error) { - // send request header using protobuf or similar - return nil, nil +func (c ByteStreamRPC) InitialTransferRequest(r InitialTransferRequest) (unchunker io.Reader, err error) { + + if err = c.sendRequestReceiveHeader(r, RChunkedStream); err != nil { + return + } + unchunker = NewUnchunker(c.conn) + return } -func (c ByteStreamRPC) IncrementalTransferRequest(r IncrementalTransferRequest) (io.Reader, error) { - return nil, nil +func (c ByteStreamRPC) IncrementalTransferRequest(r IncrementalTransferRequest) (unchunker io.Reader, err error) { + if err = c.sendRequestReceiveHeader(r, RChunkedStream); err != nil { + return + } + unchunker = NewUnchunker(c.conn) + return } type LocalRPC struct { handler RPCHandler } -func ConnectLocalRPC(handler RPCHandler) LocalRPC { +func ConnectLocalRPC(handler RPCHandler) RPCRequester { return LocalRPC{handler} } -func (c LocalRPC) FilesystemRequest(r FilesystemRequest) (root Filesystem, err error) { +func (c LocalRPC) FilesystemRequest(r FilesystemRequest) (roots []Filesystem, err error) { return c.handler.HandleFilesystemRequest(r) } diff --git a/rpc/structs.go b/rpc/structs.go index 1a501b7..aa4cab2 100644 --- a/rpc/structs.go +++ b/rpc/structs.go @@ -9,6 +9,7 @@ const ( RTProtocolVersionRequest RequestType = 1 RTFilesystemRequest = 16 RTInitialTransferRequest = 17 + RTIncrementalTransferRequest = 18 ) type RequestHeader struct { @@ -55,7 +56,9 @@ const ( type ResponseType uint8 const ( - ROK ResponseType = 0 + ROK ResponseType = 1 + RFilesystems = 2 + RChunkedStream = 3 ) type ResponseHeader struct { diff --git a/util/chunking.go b/util/chunking.go index d34c84b..2ba9cb5 100644 --- a/util/chunking.go +++ b/util/chunking.go @@ -15,8 +15,8 @@ type Unchunker struct { remainingChunkBytes uint32 } -func NewUnchunker(conn io.Reader) Unchunker { - return Unchunker{ +func NewUnchunker(conn io.Reader) *Unchunker { + return &Unchunker{ in: conn, remainingChunkBytes: 0, }