mirror of
https://github.com/zrepl/zrepl.git
synced 2024-11-22 00:13:52 +01:00
rpc: implement respondWithError
This commit is contained in:
parent
6da2deb96e
commit
08370689c8
33
rpc/rpc.go
33
rpc/rpc.go
@ -64,7 +64,7 @@ func ListenByteStreamRPC(conn io.ReadWriteCloser, handler RPCHandler) error {
|
|||||||
|
|
||||||
var header RequestHeader = RequestHeader{}
|
var header RequestHeader = RequestHeader{}
|
||||||
if err := decoder.Decode(&header); err != nil {
|
if err := decoder.Decode(&header); err != nil {
|
||||||
respondWithError(conn, EDecodeHeader, err)
|
respondWithError(encoder, EDecodeHeader, err)
|
||||||
conn.Close()
|
conn.Close()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -73,12 +73,12 @@ func ListenByteStreamRPC(conn io.ReadWriteCloser, handler RPCHandler) error {
|
|||||||
case RTProtocolVersionRequest:
|
case RTProtocolVersionRequest:
|
||||||
var rq ByteStreamRPCProtocolVersionRequest
|
var rq ByteStreamRPCProtocolVersionRequest
|
||||||
if err := decoder.Decode(&rq); err != nil {
|
if err := decoder.Decode(&rq); err != nil {
|
||||||
respondWithError(conn, EDecodeRequestBody, nil)
|
respondWithError(encoder, EDecodeRequestBody, nil)
|
||||||
conn.Close()
|
conn.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
if rq.ClientVersion != ByteStreamRPCProtocolVersion {
|
if rq.ClientVersion != ByteStreamRPCProtocolVersion {
|
||||||
respondWithError(conn, EProtocolVersionMismatch, nil)
|
respondWithError(encoder, EProtocolVersionMismatch, nil)
|
||||||
conn.Close()
|
conn.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -92,13 +92,13 @@ func ListenByteStreamRPC(conn io.ReadWriteCloser, handler RPCHandler) error {
|
|||||||
case RTFilesystemRequest:
|
case RTFilesystemRequest:
|
||||||
var rq FilesystemRequest
|
var rq FilesystemRequest
|
||||||
if err := decoder.Decode(&rq); err != nil {
|
if err := decoder.Decode(&rq); err != nil {
|
||||||
respondWithError(conn, EDecodeRequestBody, nil)
|
respondWithError(encoder, EDecodeRequestBody, nil)
|
||||||
conn.Close()
|
conn.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
roots, err := handler.HandleFilesystemRequest(rq)
|
roots, err := handler.HandleFilesystemRequest(rq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
respondWithError(conn, EHandler, err)
|
respondWithError(encoder, EHandler, err)
|
||||||
} else {
|
} else {
|
||||||
if err := encoder.Encode(&roots); err != nil {
|
if err := encoder.Encode(&roots); err != nil {
|
||||||
return err
|
return err
|
||||||
@ -108,12 +108,12 @@ func ListenByteStreamRPC(conn io.ReadWriteCloser, handler RPCHandler) error {
|
|||||||
case RTInitialTransferRequest:
|
case RTInitialTransferRequest:
|
||||||
var rq InitialTransferRequest
|
var rq InitialTransferRequest
|
||||||
if err := decoder.Decode(&rq); err != nil {
|
if err := decoder.Decode(&rq); err != nil {
|
||||||
respondWithError(conn, EDecodeRequestBody, nil)
|
respondWithError(encoder, EDecodeRequestBody, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
snapReader, err := handler.HandleInitialTransferRequest(rq)
|
snapReader, err := handler.HandleInitialTransferRequest(rq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
respondWithError(conn, EHandler, err)
|
respondWithError(encoder, EHandler, err)
|
||||||
} else {
|
} else {
|
||||||
chunker := NewChunker(snapReader)
|
chunker := NewChunker(snapReader)
|
||||||
_, err := io.Copy(conn, &chunker)
|
_, err := io.Copy(conn, &chunker)
|
||||||
@ -126,12 +126,12 @@ func ListenByteStreamRPC(conn io.ReadWriteCloser, handler RPCHandler) error {
|
|||||||
|
|
||||||
var rq IncrementalTransferRequest
|
var rq IncrementalTransferRequest
|
||||||
if err := decoder.Decode(&rq); err != nil {
|
if err := decoder.Decode(&rq); err != nil {
|
||||||
respondWithError(conn, EDecodeRequestBody, nil)
|
respondWithError(encoder, EDecodeRequestBody, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
snapReader, err := handler.HandleIncrementalTransferRequest(rq)
|
snapReader, err := handler.HandleIncrementalTransferRequest(rq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
respondWithError(conn, EHandler, err)
|
respondWithError(encoder, EHandler, err)
|
||||||
} else {
|
} else {
|
||||||
chunker := NewChunker(snapReader)
|
chunker := NewChunker(snapReader)
|
||||||
_, err := io.Copy(conn, &chunker)
|
_, err := io.Copy(conn, &chunker)
|
||||||
@ -141,7 +141,7 @@ func ListenByteStreamRPC(conn io.ReadWriteCloser, handler RPCHandler) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
default:
|
default:
|
||||||
respondWithError(conn, EUnknownRequestType, nil)
|
respondWithError(encoder, EUnknownRequestType, nil)
|
||||||
conn.Close()
|
conn.Close()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -149,8 +149,17 @@ func ListenByteStreamRPC(conn io.ReadWriteCloser, handler RPCHandler) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func respondWithError(conn io.Writer, id ErrorId, err error) error {
|
func respondWithError(encoder *json.Encoder, id ErrorId, err error) {
|
||||||
return nil
|
|
||||||
|
r := ResponseHeader{
|
||||||
|
ErrorId: id,
|
||||||
|
ResponseType: RNONE,
|
||||||
|
Message: err.Error(),
|
||||||
|
}
|
||||||
|
if err := encoder.Encode(&r); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func inferRequestType(v interface{}) (RequestType, error) {
|
func inferRequestType(v interface{}) (RequestType, error) {
|
||||||
|
@ -56,7 +56,8 @@ const (
|
|||||||
type ResponseType uint8
|
type ResponseType uint8
|
||||||
|
|
||||||
const (
|
const (
|
||||||
ROK ResponseType = 1
|
RNONE ResponseType = 0
|
||||||
|
ROK = 1
|
||||||
RFilesystems = 2
|
RFilesystems = 2
|
||||||
RChunkedStream = 3
|
RChunkedStream = 3
|
||||||
)
|
)
|
||||||
|
Loading…
Reference in New Issue
Block a user