diff --git a/cmd/main.go b/cmd/main.go index bb05a7c..4591dd0 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -208,6 +208,8 @@ func doPull(pull Pull, c *cli.Context, log jobrun.Logger) (err error) { return } + defer remote.Close() + fsr := rpc.FilesystemRequest{ Direction: rpc.DirectionPull, } diff --git a/rpc/rpc.go b/rpc/rpc.go index 787f9f9..0cb7351 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -8,6 +8,7 @@ import ( . "github.com/zrepl/zrepl/util" "github.com/zrepl/zrepl/zfs" "io" + "os" "reflect" ) @@ -16,6 +17,8 @@ type RPCRequester interface { FilesystemVersionsRequest(r FilesystemVersionsRequest) (versions []zfs.FilesystemVersion, err error) InitialTransferRequest(r InitialTransferRequest) (io.Reader, error) IncrementalTransferRequest(r IncrementalTransferRequest) (io.Reader, error) + CloseRequest(r CloseRequest) (err error) + ForceClose() (err error) } type RPCHandler interface { @@ -129,6 +132,25 @@ func ListenByteStreamRPC(conn ByteStream, handler RPCHandler, log Logger) error } send(&r) + case RTCloseRequest: + + var rq CloseRequest + if err := recv(&rq); err != nil { + sendError(EDecodeRequestBody, err.Error()) + return conn.Close() + } + + if rq.Goodbye != "" { + log.Printf("close request with goodbye: %s", rq.Goodbye) + } + + send(&ResponseHeader{ + RequestId: header.Id, + ResponseType: ROK, + }) + + return conn.Close() + case RTFilesystemRequest: var rq FilesystemRequest @@ -271,6 +293,8 @@ func inferRequestType(v interface{}) (RequestType, error) { return RTInitialTransferRequest, nil case IncrementalTransferRequest: return RTIncrementalTransferRequest, nil + case CloseRequest: + return RTCloseRequest, nil default: return 0, errors.New(fmt.Sprintf("cannot infer request type for type '%v'", reflect.TypeOf(v))) @@ -383,6 +407,19 @@ func (c ByteStreamRPC) IncrementalTransferRequest(r IncrementalTransferRequest) return } +func (c ByteStreamRPC) CloseRequest(r CloseRequest) (err error) { + if err = c.sendRequestReceiveHeader(r, ROK); err != nil { + return + } + os.Stderr.WriteString("close request conn.Close()") + err = c.conn.Close() + return +} + +func (c ByteStreamRPC) ForceClose() (err error) { + return c.conn.Close() +} + type LocalRPC struct { handler RPCHandler } @@ -407,3 +444,7 @@ func (c LocalRPC) IncrementalTransferRequest(r IncrementalTransferRequest) (read reader, err = c.handler.HandleIncrementalTransferRequest(r) return } + +func (c LocalRPC) CloseRequest(r CloseRequest) error { return nil } + +func (c LocalRPC) ForceClose() error { return nil } diff --git a/rpc/structs.go b/rpc/structs.go index 831d56b..96d73e3 100644 --- a/rpc/structs.go +++ b/rpc/structs.go @@ -12,6 +12,7 @@ const ( RTFilesystemVersionsRequest = 0x11 RTInitialTransferRequest = 0x12 RTIncrementalTransferRequest = 0x13 + RTCloseRequest = 0x20 ) type RequestHeader struct { @@ -58,6 +59,10 @@ type ByteStreamRPCProtocolVersionRequest struct { ClientVersion uint8 } +type CloseRequest struct { + Goodbye string +} + type ErrorId uint8 const (