prototype resumable send & recv

This commit is contained in:
Christian Schwarz 2017-08-18 23:52:33 +02:00
parent e5b713ce5b
commit d12ab6cd57
7 changed files with 234 additions and 3 deletions

View File

@ -87,6 +87,46 @@ func (h Handler) HandleIncrementalTransferRequest(r rpc.IncrementalTransferReque
} }
func (h Handler) HandleResumeTransferRequest(r rpc.ResumeTransferRequest) (stream io.Reader, err error) {
// decode receive_resume_token: zfs send -nvP <token>
// A) use exit code to determine if could send (exit != 0 means could not send)
// B) check ACL if the filesystem in toname field (it is a snapshot so strip its snapshot name first)
// is allowed for the given client
/*
# zfs send -nvt will print nvlist contents on ZoL and FreeBSD, should be good enough
# will print regardless of whether it can send or not -> good for us
# need clean way to extract that...
# expect 'resume token contents:\nnvlist version: 0\n'
# parse everything after that is tab-indented as key value pairs separated by = sign
# => return dict
zfs send -nv -t 1-c6491acbe-c8-789c636064000310a500c4ec50360710e72765a526973030b0419460caa7a515a79630c001489e0d493ea9b224b5182451885d7f497e7a69660a0343f79b1b9a8a2b3db65b20c97382e5f312735319188a4bf28b12d353f5931293b34b0b8af5ab8a520b72f4d3f2f31d8a53f35220660300c1091dbe
resume token contents:
nvlist version: 0
object = 0x6
offset = 0x0
bytes = 0x7100
toguid = 0xb748a92129d8ec8b
toname = storage/backups/zrepl/foo@send
cannot resume send: 'storage/backups/zrepl/foo@send' used in the initial send no longer exists
zfs send -nvt 1-ebbbbea7e-f0-789c636064000310a501c49c50360710a715e5e7a69766a6304041f79b1b9a8a2b3db62b00d9ec48eaf293b252934b181858a0ea30e4d3d28a534b18e00024cf86249f5459925a0ca44fc861d75f920f71c59c5fdf6f7b3eea32b24092e704cbe725e6a632301497e41725a6a7ea27252667971614eb5715a516e4e8a7e5e73b14a7e6a51881cd06005a222749
resume token contents:
nvlist version: 0
fromguid = 0xb748a92129d8ec8b #NOTE the fromguid field which is only in this one, so don't hardcode
object = 0x4
offset = 0x0
bytes = 0x1ec8
toguid = 0x328ae249dbf7fa9c
toname = storage/backups/zrepl/foo@send2
send from storage/backups/zrepl/foo@send to storage/backups/zrepl/foo@send2 estimated size is 1.02M
*/
panic("not implemented")
return nil, nil
}
func (h Handler) HandlePullMeRequest(r rpc.PullMeRequest, clientIdentity string, client rpc.RPCRequester) (err error) { func (h Handler) HandlePullMeRequest(r rpc.PullMeRequest, clientIdentity string, client rpc.RPCRequester) (err error) {
// Check if we have a sink for this request // Check if we have a sink for this request

View File

@ -325,6 +325,87 @@ func doPull(pull PullContext) (err error) {
log("local filesystem does not exist") log("local filesystem does not exist")
case localState.Placeholder: case localState.Placeholder:
log("local filesystem is marked as placeholder") log("local filesystem is marked as placeholder")
case localState.ResumeToken != "":
log("local filesystem has receive_resume_token")
if false { // TODO Check if managed resume is disabled (explain information leakage in docs!)
log("managed resume tokens are disabled via zrepl config, assuming config change or external administrative action")
log("policy forbids aborting the partial recv automatically, skipping this filesystem")
log("for zrepl to resume replication for this dataset, administrators should finishing the recv manually or abort the recv via `zfs recv -A %s`", m.Local.ToString())
return true // TODO right choice to allow children? ... should be, since the fs exists...
}
log("decoding receive_resume_token")
// TODO, see handler comments
// TODO override logger with fromguid toguid
log("requesting resume of transfer")
r := rpc.ResumeTransferRequest{m.Remote, localState.ResumeToken}
stream, err := remote.ResumeTransferRequest(r)
if err != nil {
log("resume transfer request failed: %s", err)
rre, ok := err.(*rpc.ResumeTransferError)
if !ok {
log("skipping this filesystem, could be temporary issue")
return true // TODO right choice to allow children
}
// Determine if we should clear the resume token
clearAndUseSnaps := false
switch rre.Reason {
case rpc.ResumeTransferErrorReasonNotImplemented:
fallthrough
case rpc.ResumeTransferErrorReasonDisabled:
fallthrough
case rpc.ResumeTransferErrorReasonZFSErrorPermanent:
clearAndUseSnaps = true
case rpc.ResumeTransferErrorReasonZFSErrorMaybeTemporary:
fallthrough
default:
clearAndUseSnaps = false
}
if !clearAndUseSnaps {
log("skipping this filesystem, error identified as temporary")
return true // TODO right choice to allow children
}
log("clearing local receive_resume_token")
if err := zfs.ZFSRecvAbort(m.Local); err != nil {
log("error clearing receive_resume_token: %s", err)
return true // TODO right choice to allow children? the filesystem exists, so it should be ok
}
// TODO go back to top of function (put all this into separate function, then tail recursive call)
// TODO return this_function()
}
// TODO warning code duplication, see below. need to unify this
log("invoking zfs receive")
watcher := util.IOProgressWatcher{Reader: stream}
watcher.KickOff(1*time.Second, func(p util.IOProgress) {
log("progress on receive operation: %v bytes received", p.TotalRX)
})
recvArgs := []string{"-u"}
if localState.Placeholder {
log("receive with forced rollback to replace placeholder filesystem")
recvArgs = append(recvArgs, "-F")
}
if err = zfs.ZFSRecv(m.Local, &watcher, recvArgs...); err != nil {
log("error receiving stream: %s", err)
return false
}
log("finished receiving stream, %v bytes total", watcher.Progress().TotalRX)
// TODO further problem property handling code must be duplicated here...
// TODO tail recursive call to this function, we might not be dony syncing yet
// TODO return this_function()
return true
default: default:
log("local filesystem exists") log("local filesystem exists")
log("requesting local filesystem versions") log("requesting local filesystem versions")

View File

@ -0,0 +1,16 @@
// Code generated by "stringer -type ResumeTransferErrorReason"; DO NOT EDIT.
package rpc
import "fmt"
const _ResumeTransferErrorReason_name = "ResumeTransferErrorReasonNotImplementedResumeTransferErrorReasonDisabledResumeTransferErrorReasonZFSErrorPermanentResumeTransferErrorReasonZFSErrorMaybeTemporary"
var _ResumeTransferErrorReason_index = [...]uint8{0, 39, 72, 114, 161}
func (i ResumeTransferErrorReason) String() string {
if i >= ResumeTransferErrorReason(len(_ResumeTransferErrorReason_index)-1) {
return fmt.Sprintf("ResumeTransferErrorReason(%d)", i)
}
return _ResumeTransferErrorReason_name[_ResumeTransferErrorReason_index[i]:_ResumeTransferErrorReason_index[i+1]]
}

View File

@ -17,6 +17,7 @@ type RPCRequester interface {
FilesystemVersionsRequest(r FilesystemVersionsRequest) (versions []zfs.FilesystemVersion, err error) FilesystemVersionsRequest(r FilesystemVersionsRequest) (versions []zfs.FilesystemVersion, err error)
InitialTransferRequest(r InitialTransferRequest) (io.Reader, error) InitialTransferRequest(r InitialTransferRequest) (io.Reader, error)
IncrementalTransferRequest(r IncrementalTransferRequest) (io.Reader, error) IncrementalTransferRequest(r IncrementalTransferRequest) (io.Reader, error)
ResumeTransferRequest(r ResumeTransferRequest) (io.Reader, error)
PullMeRequest(r PullMeRequest, handler RPCHandler) (err error) PullMeRequest(r PullMeRequest, handler RPCHandler) (err error)
CloseRequest(r CloseRequest) (err error) CloseRequest(r CloseRequest) (err error)
ForceClose() (err error) ForceClose() (err error)
@ -30,6 +31,7 @@ type RPCHandler interface {
HandleInitialTransferRequest(r InitialTransferRequest) (io.Reader, error) HandleInitialTransferRequest(r InitialTransferRequest) (io.Reader, error)
HandleIncrementalTransferRequest(r IncrementalTransferRequest) (io.Reader, error) HandleIncrementalTransferRequest(r IncrementalTransferRequest) (io.Reader, error)
HandleResumeTransferRequest(r ResumeTransferRequest) (io.Reader, error)
// invert roles, i.e. handler becomes server and performs the requested pull using the client connection // invert roles, i.e. handler becomes server and performs the requested pull using the client connection
HandlePullMeRequest(r PullMeRequest, clientIdentity string, client RPCRequester) (err error) HandlePullMeRequest(r PullMeRequest, clientIdentity string, client RPCRequester) (err error)
@ -284,6 +286,37 @@ func (c ByteStreamRPC) serverLoop(handler RPCHandler) error {
log.Printf("finished sending incremental snapshot stream: total %v bytes sent", watcher.Progress().TotalRX) log.Printf("finished sending incremental snapshot stream: total %v bytes sent", watcher.Progress().TotalRX)
} }
case RTResumeTransferRequest:
var rq ResumeTransferRequest
if err := recv(&rq); err != nil {
sendError(EDecodeRequestBody, "")
return conn.Close()
}
stream, err := handler.HandleResumeTransferRequest(rq)
if err != nil {
sendError(EHandler, err.Error())
} else {
r := ResponseHeader{
RequestId: header.Id,
ResponseType: RChunkedStream,
}
send(&r)
chunker := NewChunker(stream)
watcher := IOProgressWatcher{Reader: &chunker}
watcher.KickOff(1*time.Second, func(p IOProgress) {
log.Printf("progress sending resumed stream: %v bytes sent", p.TotalRX)
})
_, err := io.Copy(conn, &watcher)
if err != nil {
panic(err)
}
log.Printf("finished sending resumed stream: total %v bytes sent", watcher.Progress().TotalRX)
}
case RTPullMeRequest: case RTPullMeRequest:
var rq PullMeRequest var rq PullMeRequest
@ -375,6 +408,8 @@ func inferRequestType(v interface{}) (RequestType, error) {
return RTInitialTransferRequest, nil return RTInitialTransferRequest, nil
case IncrementalTransferRequest: case IncrementalTransferRequest:
return RTIncrementalTransferRequest, nil return RTIncrementalTransferRequest, nil
case ResumeTransferRequest:
return RTResumeTransferRequest, nil
case PullMeRequest: case PullMeRequest:
return RTPullMeRequest, nil return RTPullMeRequest, nil
case CloseRequest: case CloseRequest:
@ -491,6 +526,14 @@ func (c ByteStreamRPC) IncrementalTransferRequest(r IncrementalTransferRequest)
return return
} }
func (c ByteStreamRPC) ResumeTransferRequest(r ResumeTransferRequest) (io.Reader, error) {
// TODO reconstruct ResumeTransferError
if err := c.sendRequestReceiveHeader(r, RChunkedStream); err != nil {
return nil, err
}
return NewUnchunker(c.conn), nil
}
func (c ByteStreamRPC) PullMeRequest(r PullMeRequest, handler RPCHandler) (err error) { func (c ByteStreamRPC) PullMeRequest(r PullMeRequest, handler RPCHandler) (err error) {
err = c.sendRequestReceiveHeader(r, ROK) err = c.sendRequestReceiveHeader(r, ROK)
return c.serverLoop(handler) return c.serverLoop(handler)
@ -533,6 +576,10 @@ func (c LocalRPC) IncrementalTransferRequest(r IncrementalTransferRequest) (read
return return
} }
func (c LocalRPC) ResumeTransferRequest(r ResumeTransferRequest) (io.Reader, error) {
return c.handler.HandleResumeTransferRequest(r)
}
func (c LocalRPC) PullMeRequest(r PullMeRequest, handler RPCHandler) (err error) { func (c LocalRPC) PullMeRequest(r PullMeRequest, handler RPCHandler) (err error) {
// The config syntactically only allows local Pulls, hence this is never called // The config syntactically only allows local Pulls, hence this is never called
// In theory, the following line should work: // In theory, the following line should work:

View File

@ -19,6 +19,7 @@ const (
RTFilesystemVersionsRequest = 0x11 RTFilesystemVersionsRequest = 0x11
RTInitialTransferRequest = 0x12 RTInitialTransferRequest = 0x12
RTIncrementalTransferRequest = 0x13 RTIncrementalTransferRequest = 0x13
RTResumeTransferRequest = 0x14
RTPullMeRequest = 0x20 RTPullMeRequest = 0x20
RTCloseRequest = 0xf0 RTCloseRequest = 0xf0
) )
@ -51,6 +52,30 @@ type IncrementalTransferRequest struct {
To zfs.FilesystemVersion To zfs.FilesystemVersion
} }
type ResumeTransferRequest struct {
Filesystem *zfs.DatasetPath
Token string
}
type ResumeTransferError struct {
Reason ResumeTransferErrorReason
ZFSError string
}
func (e *ResumeTransferError) Error() string {
return e.Reason.String()
}
//go:generate stringer -type ResumeTransferErrorReason
type ResumeTransferErrorReason uint8
const (
ResumeTransferErrorReasonNotImplemented ResumeTransferErrorReason = iota
ResumeTransferErrorReasonDisabled
ResumeTransferErrorReasonZFSErrorPermanent
ResumeTransferErrorReasonZFSErrorMaybeTemporary
)
func (r IncrementalTransferRequest) Respond(snapshotReader io.Reader) { func (r IncrementalTransferRequest) Respond(snapshotReader io.Reader) {
} }

View File

@ -189,15 +189,21 @@ const ZREPL_PLACEHOLDER_PROPERTY_NAME string = "zrepl:placeholder"
type FilesystemState struct { type FilesystemState struct {
Placeholder bool Placeholder bool
// TODO extend with resume token when that feature is finally added // If != "", the receive_resume_token found on the receiving side of a resumable send & recv
ResumeToken string
} }
// A somewhat efficient way to determine if a filesystem exists on this host. // A somewhat efficient way to determine if a filesystem exists on this host.
// Particularly useful if exists is called more than once (will only fork exec once and cache the result) // Particularly useful if exists is called more than once (will only fork exec once and cache the result)
func ZFSListFilesystemState() (localState map[string]FilesystemState, err error) { func ZFSListFilesystemState() (localState map[string]FilesystemState, err error) {
properties := []string{"name", ZREPL_PLACEHOLDER_PROPERTY_NAME}
if CLICompat.ResumableSendRecv {
properties = append(properties, "receive_resume_token")
}
var actual [][]string var actual [][]string
if actual, err = ZFSList([]string{"name", ZREPL_PLACEHOLDER_PROPERTY_NAME}, "-t", "filesystem,volume"); err != nil { if actual, err = ZFSList(properties, "-t", "filesystem,volume"); err != nil {
return return
} }
@ -208,8 +214,12 @@ func ZFSListFilesystemState() (localState map[string]FilesystemState, err error)
fmt.Errorf("ZFS does not return parseable dataset path: %s", e[0]) fmt.Errorf("ZFS does not return parseable dataset path: %s", e[0])
} }
placeholder, _ := IsPlaceholder(dp, e[1]) placeholder, _ := IsPlaceholder(dp, e[1])
receive_resume_token := ""
if CLICompat.ResumableSendRecv {
receive_resume_token = e[2]
}
localState[e[0]] = FilesystemState{ localState[e[0]] = FilesystemState{
placeholder, placeholder, receive_resume_token,
} }
} }
return return

View File

@ -13,6 +13,11 @@ import (
"github.com/zrepl/zrepl/util" "github.com/zrepl/zrepl/util"
) )
var CLICompat struct {
// whether the CLI supports resumable send & recv
ResumableSendRecv bool
}
type DatasetPath struct { type DatasetPath struct {
comps []string comps []string
} }
@ -239,6 +244,13 @@ func ZFSRecv(fs *DatasetPath, stream io.Reader, additionalArgs ...string) (err e
return nil return nil
} }
func ZFSRecvAbort(fs *DatasetPath) (err error) {
if !CLICompat.ResumableSendRecv {
panic("decide if this is an error")
}
panic("not implemented")
}
func ZFSSet(fs *DatasetPath, prop, val string) (err error) { func ZFSSet(fs *DatasetPath, prop, val string) (err error) {
if strings.ContainsRune(prop, '=') { if strings.ContainsRune(prop, '=') {