diff --git a/cmd/handler.go b/cmd/handler.go index 5c9e958..93a08d3 100644 --- a/cmd/handler.go +++ b/cmd/handler.go @@ -24,6 +24,9 @@ func (h Handler) HandleFilesystemRequest(r rpc.FilesystemRequest) (roots []model return } +func (h Handler) HandleFilesystemVersionsRequest(r rpc.FilesystemVersionsRequest) (versions []zfs.FilesystemVersion, err error) { +} + func (h Handler) HandleInitialTransferRequest(r rpc.InitialTransferRequest) (io.Reader, error) { // TODO ACL return zfs.InitialSend(r.Snapshot) diff --git a/rpc/rpc.go b/rpc/rpc.go index cc9a175..4214670 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -6,18 +6,24 @@ import ( "fmt" . "github.com/zrepl/zrepl/model" . "github.com/zrepl/zrepl/util" + "github.com/zrepl/zrepl/zfs" "io" "reflect" ) type RPCRequester interface { FilesystemRequest(r FilesystemRequest) (roots []Filesystem, err error) + FilesystemVersionsRequest(r FilesystemVersionsRequest) (versions []zfs.FilesystemVersion, err error) InitialTransferRequest(r InitialTransferRequest) (io.Reader, error) IncrementalTransferRequest(r IncrementalTransferRequest) (io.Reader, error) } type RPCHandler interface { HandleFilesystemRequest(r FilesystemRequest) (roots []Filesystem, err error) + + // returned versions ordered by birthtime, oldest first + HandleFilesystemVersionsRequest(r FilesystemVersionsRequest) (versions []zfs.FilesystemVersion, err error) + HandleInitialTransferRequest(r InitialTransferRequest) (io.Reader, error) HandleIncrementalTransferRequest(r IncrementalTransferRequest) (io.Reader, error) } @@ -116,6 +122,31 @@ func ListenByteStreamRPC(conn io.ReadWriteCloser, handler RPCHandler) error { } } + case RTFilesystemVersionsRequest: + + var rq FilesystemVersionsRequest + if err := decoder.Decode(&rq); err != nil { + respondWithError(encoder, EDecodeRequestBody, err) + return err + } + + diff, err := handler.HandleFilesystemVersionsRequest(rq) + if err != nil { + respondWithError(encoder, EHandler, err) + return err + } else { + r := ResponseHeader{ + RequestId: header.Id, + ResponseType: RFilesystemDiff, + } + if err := encoder.Encode(&r); err != nil { + panic(err) + } + if err := encoder.Encode(&diff); err != nil { + panic(err) + } + } + case RTInitialTransferRequest: var rq InitialTransferRequest if err := decoder.Decode(&rq); err != nil { @@ -182,6 +213,8 @@ func inferRequestType(v interface{}) (RequestType, error) { return RTProtocolVersionRequest, nil case FilesystemRequest: return RTFilesystemRequest, nil + case FilesystemVersionsRequest: + return RTFilesystemVersionsRequest, nil case InitialTransferRequest: return RTInitialTransferRequest, nil default: @@ -266,6 +299,16 @@ func (c ByteStreamRPC) FilesystemRequest(r FilesystemRequest) (roots []Filesyste return } +func (c ByteStreamRPC) FilesystemVersionsRequest(r FilesystemVersionsRequest) (versions []zfs.FilesystemVersion, err error) { + + if err = c.sendRequestReceiveHeader(r, RFilesystemDiff); err != nil { + return + } + + err = c.decoder.Decode(&versions) + return +} + func (c ByteStreamRPC) InitialTransferRequest(r InitialTransferRequest) (unchunker io.Reader, err error) { if err = c.sendRequestReceiveHeader(r, RChunkedStream); err != nil { @@ -295,6 +338,10 @@ func (c LocalRPC) FilesystemRequest(r FilesystemRequest) (roots []Filesystem, er return c.handler.HandleFilesystemRequest(r) } +func (c LocalRPC) FilesystemVersionsRequest(r FilesystemVersionsRequest) (versions []zfs.FilesystemVersion, err error) { + return c.handler.HandleFilesystemVersionsRequest(r) +} + func (c LocalRPC) InitialTransferRequest(r InitialTransferRequest) (io.Reader, error) { return c.handler.HandleInitialTransferRequest(r) } diff --git a/rpc/structs.go b/rpc/structs.go index 4726141..359ffec 100644 --- a/rpc/structs.go +++ b/rpc/structs.go @@ -1,6 +1,7 @@ package rpc import "io" +import "github.com/zrepl/zrepl/zfs" type RequestId [16]byte type RequestType uint8 @@ -10,6 +11,7 @@ const ( RTFilesystemRequest = 16 RTInitialTransferRequest = 17 RTIncrementalTransferRequest = 18 + RTFilesystemVersionsRequest = 19 ) type RequestHeader struct { @@ -21,6 +23,10 @@ type FilesystemRequest struct { Roots []string } +type FilesystemVersionsRequest struct { + Filesystem zfs.DatasetPath +} + type InitialTransferRequest struct { Snapshot string // tank/my/db@ljlsdjflksdf } @@ -60,6 +66,7 @@ const ( ROK = 1 RFilesystems = 2 RChunkedStream = 3 + RFilesystemDiff = 4 ) type ResponseHeader struct {