rpc: close outgoing SSH connection on exit.

This commit is contained in:
Christian Schwarz 2017-05-14 12:28:19 +02:00
parent 04206ebd8b
commit 48a4e8033a
3 changed files with 48 additions and 0 deletions

View File

@ -208,6 +208,8 @@ func doPull(pull Pull, c *cli.Context, log jobrun.Logger) (err error) {
return return
} }
defer remote.Close()
fsr := rpc.FilesystemRequest{ fsr := rpc.FilesystemRequest{
Direction: rpc.DirectionPull, Direction: rpc.DirectionPull,
} }

View File

@ -8,6 +8,7 @@ import (
. "github.com/zrepl/zrepl/util" . "github.com/zrepl/zrepl/util"
"github.com/zrepl/zrepl/zfs" "github.com/zrepl/zrepl/zfs"
"io" "io"
"os"
"reflect" "reflect"
) )
@ -16,6 +17,8 @@ type RPCRequester interface {
FilesystemVersionsRequest(r FilesystemVersionsRequest) (versions []zfs.FilesystemVersion, err error) FilesystemVersionsRequest(r FilesystemVersionsRequest) (versions []zfs.FilesystemVersion, err error)
InitialTransferRequest(r InitialTransferRequest) (io.Reader, error) InitialTransferRequest(r InitialTransferRequest) (io.Reader, error)
IncrementalTransferRequest(r IncrementalTransferRequest) (io.Reader, error) IncrementalTransferRequest(r IncrementalTransferRequest) (io.Reader, error)
CloseRequest(r CloseRequest) (err error)
ForceClose() (err error)
} }
type RPCHandler interface { type RPCHandler interface {
@ -129,6 +132,25 @@ func ListenByteStreamRPC(conn ByteStream, handler RPCHandler, log Logger) error
} }
send(&r) send(&r)
case RTCloseRequest:
var rq CloseRequest
if err := recv(&rq); err != nil {
sendError(EDecodeRequestBody, err.Error())
return conn.Close()
}
if rq.Goodbye != "" {
log.Printf("close request with goodbye: %s", rq.Goodbye)
}
send(&ResponseHeader{
RequestId: header.Id,
ResponseType: ROK,
})
return conn.Close()
case RTFilesystemRequest: case RTFilesystemRequest:
var rq FilesystemRequest var rq FilesystemRequest
@ -271,6 +293,8 @@ func inferRequestType(v interface{}) (RequestType, error) {
return RTInitialTransferRequest, nil return RTInitialTransferRequest, nil
case IncrementalTransferRequest: case IncrementalTransferRequest:
return RTIncrementalTransferRequest, nil return RTIncrementalTransferRequest, nil
case CloseRequest:
return RTCloseRequest, nil
default: default:
return 0, errors.New(fmt.Sprintf("cannot infer request type for type '%v'", return 0, errors.New(fmt.Sprintf("cannot infer request type for type '%v'",
reflect.TypeOf(v))) reflect.TypeOf(v)))
@ -383,6 +407,19 @@ func (c ByteStreamRPC) IncrementalTransferRequest(r IncrementalTransferRequest)
return return
} }
func (c ByteStreamRPC) CloseRequest(r CloseRequest) (err error) {
if err = c.sendRequestReceiveHeader(r, ROK); err != nil {
return
}
os.Stderr.WriteString("close request conn.Close()")
err = c.conn.Close()
return
}
func (c ByteStreamRPC) ForceClose() (err error) {
return c.conn.Close()
}
type LocalRPC struct { type LocalRPC struct {
handler RPCHandler handler RPCHandler
} }
@ -407,3 +444,7 @@ func (c LocalRPC) IncrementalTransferRequest(r IncrementalTransferRequest) (read
reader, err = c.handler.HandleIncrementalTransferRequest(r) reader, err = c.handler.HandleIncrementalTransferRequest(r)
return return
} }
func (c LocalRPC) CloseRequest(r CloseRequest) error { return nil }
func (c LocalRPC) ForceClose() error { return nil }

View File

@ -12,6 +12,7 @@ const (
RTFilesystemVersionsRequest = 0x11 RTFilesystemVersionsRequest = 0x11
RTInitialTransferRequest = 0x12 RTInitialTransferRequest = 0x12
RTIncrementalTransferRequest = 0x13 RTIncrementalTransferRequest = 0x13
RTCloseRequest = 0x20
) )
type RequestHeader struct { type RequestHeader struct {
@ -58,6 +59,10 @@ type ByteStreamRPCProtocolVersionRequest struct {
ClientVersion uint8 ClientVersion uint8
} }
type CloseRequest struct {
Goodbye string
}
type ErrorId uint8 type ErrorId uint8
const ( const (