mirror of
https://github.com/zrepl/zrepl.git
synced 2024-11-25 01:44:43 +01:00
rpc: bytestream: listen: consistent error handling
This commit is contained in:
parent
ec4284f80c
commit
8bdcdd5ec6
29
rpc/rpc.go
29
rpc/rpc.go
@ -57,6 +57,8 @@ func ListenByteStreamRPC(conn io.ReadWriteCloser, handler RPCHandler) error {
|
|||||||
// a) a chunked response
|
// a) a chunked response
|
||||||
// b) or another JSON object
|
// b) or another JSON object
|
||||||
|
|
||||||
|
defer conn.Close()
|
||||||
|
|
||||||
decoder := json.NewDecoder(conn)
|
decoder := json.NewDecoder(conn)
|
||||||
encoder := json.NewEncoder(conn)
|
encoder := json.NewEncoder(conn)
|
||||||
|
|
||||||
@ -65,8 +67,7 @@ func ListenByteStreamRPC(conn io.ReadWriteCloser, handler RPCHandler) error {
|
|||||||
var header RequestHeader = RequestHeader{}
|
var header RequestHeader = RequestHeader{}
|
||||||
if err := decoder.Decode(&header); err != nil {
|
if err := decoder.Decode(&header); err != nil {
|
||||||
respondWithError(encoder, EDecodeHeader, err)
|
respondWithError(encoder, EDecodeHeader, err)
|
||||||
conn.Close()
|
return conn.Close()
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
switch header.Type {
|
switch header.Type {
|
||||||
@ -74,34 +75,37 @@ func ListenByteStreamRPC(conn io.ReadWriteCloser, handler RPCHandler) error {
|
|||||||
var rq ByteStreamRPCProtocolVersionRequest
|
var rq ByteStreamRPCProtocolVersionRequest
|
||||||
if err := decoder.Decode(&rq); err != nil {
|
if err := decoder.Decode(&rq); err != nil {
|
||||||
respondWithError(encoder, EDecodeRequestBody, nil)
|
respondWithError(encoder, EDecodeRequestBody, nil)
|
||||||
conn.Close()
|
return conn.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
if rq.ClientVersion != ByteStreamRPCProtocolVersion {
|
if rq.ClientVersion != ByteStreamRPCProtocolVersion {
|
||||||
respondWithError(encoder, EProtocolVersionMismatch, nil)
|
respondWithError(encoder, EProtocolVersionMismatch, nil)
|
||||||
conn.Close()
|
return conn.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
r := ResponseHeader{
|
r := ResponseHeader{
|
||||||
RequestId: header.Id,
|
RequestId: header.Id,
|
||||||
|
ResponseType: ROK,
|
||||||
}
|
}
|
||||||
if err := encoder.Encode(&r); err != nil {
|
if err := encoder.Encode(&r); err != nil {
|
||||||
return err
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
case RTFilesystemRequest:
|
case RTFilesystemRequest:
|
||||||
|
|
||||||
var rq FilesystemRequest
|
var rq FilesystemRequest
|
||||||
if err := decoder.Decode(&rq); err != nil {
|
if err := decoder.Decode(&rq); err != nil {
|
||||||
respondWithError(encoder, EDecodeRequestBody, nil)
|
respondWithError(encoder, EDecodeRequestBody, nil)
|
||||||
conn.Close()
|
return conn.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
roots, err := handler.HandleFilesystemRequest(rq)
|
roots, err := handler.HandleFilesystemRequest(rq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
respondWithError(encoder, EHandler, err)
|
respondWithError(encoder, EHandler, err)
|
||||||
|
return conn.Close()
|
||||||
} else {
|
} else {
|
||||||
if err := encoder.Encode(&roots); err != nil {
|
if err := encoder.Encode(&roots); err != nil {
|
||||||
return err
|
panic(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -109,16 +113,18 @@ func ListenByteStreamRPC(conn io.ReadWriteCloser, handler RPCHandler) error {
|
|||||||
var rq InitialTransferRequest
|
var rq InitialTransferRequest
|
||||||
if err := decoder.Decode(&rq); err != nil {
|
if err := decoder.Decode(&rq); err != nil {
|
||||||
respondWithError(encoder, EDecodeRequestBody, nil)
|
respondWithError(encoder, EDecodeRequestBody, nil)
|
||||||
|
return conn.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
snapReader, err := handler.HandleInitialTransferRequest(rq)
|
snapReader, err := handler.HandleInitialTransferRequest(rq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
respondWithError(encoder, EHandler, err)
|
respondWithError(encoder, EHandler, err)
|
||||||
|
return conn.Close()
|
||||||
} else {
|
} else {
|
||||||
chunker := NewChunker(snapReader)
|
chunker := NewChunker(snapReader)
|
||||||
_, err := io.Copy(conn, &chunker)
|
_, err := io.Copy(conn, &chunker)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
panic(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -127,6 +133,7 @@ func ListenByteStreamRPC(conn io.ReadWriteCloser, handler RPCHandler) error {
|
|||||||
var rq IncrementalTransferRequest
|
var rq IncrementalTransferRequest
|
||||||
if err := decoder.Decode(&rq); err != nil {
|
if err := decoder.Decode(&rq); err != nil {
|
||||||
respondWithError(encoder, EDecodeRequestBody, nil)
|
respondWithError(encoder, EDecodeRequestBody, nil)
|
||||||
|
return conn.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
snapReader, err := handler.HandleIncrementalTransferRequest(rq)
|
snapReader, err := handler.HandleIncrementalTransferRequest(rq)
|
||||||
@ -136,13 +143,13 @@ func ListenByteStreamRPC(conn io.ReadWriteCloser, handler RPCHandler) error {
|
|||||||
chunker := NewChunker(snapReader)
|
chunker := NewChunker(snapReader)
|
||||||
_, err := io.Copy(conn, &chunker)
|
_, err := io.Copy(conn, &chunker)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
panic(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
default:
|
default:
|
||||||
respondWithError(encoder, EUnknownRequestType, nil)
|
respondWithError(encoder, EUnknownRequestType, nil)
|
||||||
conn.Close()
|
return conn.Close()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user