From 8bdcdd5ec6c14b2125a9b7bb7c69cc6fe6ff1068 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Sun, 30 Apr 2017 23:43:09 +0200 Subject: [PATCH] rpc: bytestream: listen: consistent error handling --- rpc/rpc.go | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/rpc/rpc.go b/rpc/rpc.go index 51e3da4..077c7c6 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -57,6 +57,8 @@ func ListenByteStreamRPC(conn io.ReadWriteCloser, handler RPCHandler) error { // a) a chunked response // b) or another JSON object + defer conn.Close() + decoder := json.NewDecoder(conn) encoder := json.NewEncoder(conn) @@ -65,8 +67,7 @@ func ListenByteStreamRPC(conn io.ReadWriteCloser, handler RPCHandler) error { var header RequestHeader = RequestHeader{} if err := decoder.Decode(&header); err != nil { respondWithError(encoder, EDecodeHeader, err) - conn.Close() - return err + return conn.Close() } switch header.Type { @@ -74,34 +75,37 @@ func ListenByteStreamRPC(conn io.ReadWriteCloser, handler RPCHandler) error { var rq ByteStreamRPCProtocolVersionRequest if err := decoder.Decode(&rq); err != nil { respondWithError(encoder, EDecodeRequestBody, nil) - conn.Close() + return conn.Close() } if rq.ClientVersion != ByteStreamRPCProtocolVersion { respondWithError(encoder, EProtocolVersionMismatch, nil) - conn.Close() + return conn.Close() } r := ResponseHeader{ - RequestId: header.Id, + RequestId: header.Id, + ResponseType: ROK, } if err := encoder.Encode(&r); err != nil { - return err + panic(err) } case RTFilesystemRequest: + var rq FilesystemRequest if err := decoder.Decode(&rq); err != nil { respondWithError(encoder, EDecodeRequestBody, nil) - conn.Close() + return conn.Close() } roots, err := handler.HandleFilesystemRequest(rq) if err != nil { respondWithError(encoder, EHandler, err) + return conn.Close() } else { if err := encoder.Encode(&roots); err != nil { - return err + panic(err) } } @@ -109,16 +113,18 @@ func ListenByteStreamRPC(conn io.ReadWriteCloser, handler RPCHandler) error { var rq InitialTransferRequest if err := decoder.Decode(&rq); err != nil { respondWithError(encoder, EDecodeRequestBody, nil) + return conn.Close() } snapReader, err := handler.HandleInitialTransferRequest(rq) if err != nil { respondWithError(encoder, EHandler, err) + return conn.Close() } else { chunker := NewChunker(snapReader) _, err := io.Copy(conn, &chunker) if err != nil { - return err + panic(err) } } @@ -127,6 +133,7 @@ func ListenByteStreamRPC(conn io.ReadWriteCloser, handler RPCHandler) error { var rq IncrementalTransferRequest if err := decoder.Decode(&rq); err != nil { respondWithError(encoder, EDecodeRequestBody, nil) + return conn.Close() } snapReader, err := handler.HandleIncrementalTransferRequest(rq) @@ -136,13 +143,13 @@ func ListenByteStreamRPC(conn io.ReadWriteCloser, handler RPCHandler) error { chunker := NewChunker(snapReader) _, err := io.Copy(conn, &chunker) if err != nil { - return err + panic(err) } } default: respondWithError(encoder, EUnknownRequestType, nil) - conn.Close() + return conn.Close() } }