mirror of
https://github.com/zrepl/zrepl.git
synced 2024-11-22 08:23:50 +01:00
rpc: add FilesystemVersionsRequest
This commit is contained in:
parent
f46fb2f34f
commit
43f67d2b7c
@ -24,6 +24,9 @@ func (h Handler) HandleFilesystemRequest(r rpc.FilesystemRequest) (roots []model
|
||||
return
|
||||
}
|
||||
|
||||
func (h Handler) HandleFilesystemVersionsRequest(r rpc.FilesystemVersionsRequest) (versions []zfs.FilesystemVersion, err error) {
|
||||
}
|
||||
|
||||
func (h Handler) HandleInitialTransferRequest(r rpc.InitialTransferRequest) (io.Reader, error) {
|
||||
// TODO ACL
|
||||
return zfs.InitialSend(r.Snapshot)
|
||||
|
47
rpc/rpc.go
47
rpc/rpc.go
@ -6,18 +6,24 @@ import (
|
||||
"fmt"
|
||||
. "github.com/zrepl/zrepl/model"
|
||||
. "github.com/zrepl/zrepl/util"
|
||||
"github.com/zrepl/zrepl/zfs"
|
||||
"io"
|
||||
"reflect"
|
||||
)
|
||||
|
||||
type RPCRequester interface {
|
||||
FilesystemRequest(r FilesystemRequest) (roots []Filesystem, err error)
|
||||
FilesystemVersionsRequest(r FilesystemVersionsRequest) (versions []zfs.FilesystemVersion, err error)
|
||||
InitialTransferRequest(r InitialTransferRequest) (io.Reader, error)
|
||||
IncrementalTransferRequest(r IncrementalTransferRequest) (io.Reader, error)
|
||||
}
|
||||
|
||||
type RPCHandler interface {
|
||||
HandleFilesystemRequest(r FilesystemRequest) (roots []Filesystem, err error)
|
||||
|
||||
// returned versions ordered by birthtime, oldest first
|
||||
HandleFilesystemVersionsRequest(r FilesystemVersionsRequest) (versions []zfs.FilesystemVersion, err error)
|
||||
|
||||
HandleInitialTransferRequest(r InitialTransferRequest) (io.Reader, error)
|
||||
HandleIncrementalTransferRequest(r IncrementalTransferRequest) (io.Reader, error)
|
||||
}
|
||||
@ -116,6 +122,31 @@ func ListenByteStreamRPC(conn io.ReadWriteCloser, handler RPCHandler) error {
|
||||
}
|
||||
}
|
||||
|
||||
case RTFilesystemVersionsRequest:
|
||||
|
||||
var rq FilesystemVersionsRequest
|
||||
if err := decoder.Decode(&rq); err != nil {
|
||||
respondWithError(encoder, EDecodeRequestBody, err)
|
||||
return err
|
||||
}
|
||||
|
||||
diff, err := handler.HandleFilesystemVersionsRequest(rq)
|
||||
if err != nil {
|
||||
respondWithError(encoder, EHandler, err)
|
||||
return err
|
||||
} else {
|
||||
r := ResponseHeader{
|
||||
RequestId: header.Id,
|
||||
ResponseType: RFilesystemDiff,
|
||||
}
|
||||
if err := encoder.Encode(&r); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if err := encoder.Encode(&diff); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
case RTInitialTransferRequest:
|
||||
var rq InitialTransferRequest
|
||||
if err := decoder.Decode(&rq); err != nil {
|
||||
@ -182,6 +213,8 @@ func inferRequestType(v interface{}) (RequestType, error) {
|
||||
return RTProtocolVersionRequest, nil
|
||||
case FilesystemRequest:
|
||||
return RTFilesystemRequest, nil
|
||||
case FilesystemVersionsRequest:
|
||||
return RTFilesystemVersionsRequest, nil
|
||||
case InitialTransferRequest:
|
||||
return RTInitialTransferRequest, nil
|
||||
default:
|
||||
@ -266,6 +299,16 @@ func (c ByteStreamRPC) FilesystemRequest(r FilesystemRequest) (roots []Filesyste
|
||||
return
|
||||
}
|
||||
|
||||
func (c ByteStreamRPC) FilesystemVersionsRequest(r FilesystemVersionsRequest) (versions []zfs.FilesystemVersion, err error) {
|
||||
|
||||
if err = c.sendRequestReceiveHeader(r, RFilesystemDiff); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
err = c.decoder.Decode(&versions)
|
||||
return
|
||||
}
|
||||
|
||||
func (c ByteStreamRPC) InitialTransferRequest(r InitialTransferRequest) (unchunker io.Reader, err error) {
|
||||
|
||||
if err = c.sendRequestReceiveHeader(r, RChunkedStream); err != nil {
|
||||
@ -295,6 +338,10 @@ func (c LocalRPC) FilesystemRequest(r FilesystemRequest) (roots []Filesystem, er
|
||||
return c.handler.HandleFilesystemRequest(r)
|
||||
}
|
||||
|
||||
func (c LocalRPC) FilesystemVersionsRequest(r FilesystemVersionsRequest) (versions []zfs.FilesystemVersion, err error) {
|
||||
return c.handler.HandleFilesystemVersionsRequest(r)
|
||||
}
|
||||
|
||||
func (c LocalRPC) InitialTransferRequest(r InitialTransferRequest) (io.Reader, error) {
|
||||
return c.handler.HandleInitialTransferRequest(r)
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
package rpc
|
||||
|
||||
import "io"
|
||||
import "github.com/zrepl/zrepl/zfs"
|
||||
|
||||
type RequestId [16]byte
|
||||
type RequestType uint8
|
||||
@ -10,6 +11,7 @@ const (
|
||||
RTFilesystemRequest = 16
|
||||
RTInitialTransferRequest = 17
|
||||
RTIncrementalTransferRequest = 18
|
||||
RTFilesystemVersionsRequest = 19
|
||||
)
|
||||
|
||||
type RequestHeader struct {
|
||||
@ -21,6 +23,10 @@ type FilesystemRequest struct {
|
||||
Roots []string
|
||||
}
|
||||
|
||||
type FilesystemVersionsRequest struct {
|
||||
Filesystem zfs.DatasetPath
|
||||
}
|
||||
|
||||
type InitialTransferRequest struct {
|
||||
Snapshot string // tank/my/db@ljlsdjflksdf
|
||||
}
|
||||
@ -60,6 +66,7 @@ const (
|
||||
ROK = 1
|
||||
RFilesystems = 2
|
||||
RChunkedStream = 3
|
||||
RFilesystemDiff = 4
|
||||
)
|
||||
|
||||
type ResponseHeader struct {
|
||||
|
Loading…
Reference in New Issue
Block a user