From d12ab6cd57a5f58557c430f5897a83fd829c075f Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 18 Aug 2017 23:52:33 +0200 Subject: [PATCH] prototype resumable send & recv --- cmd/handler.go | 40 ++++++++++++ cmd/replication.go | 81 +++++++++++++++++++++++++ rpc/resumetransfererrorreason_string.go | 16 +++++ rpc/rpc.go | 47 ++++++++++++++ rpc/structs.go | 25 ++++++++ zfs/diff.go | 16 ++++- zfs/zfs.go | 12 ++++ 7 files changed, 234 insertions(+), 3 deletions(-) create mode 100644 rpc/resumetransfererrorreason_string.go diff --git a/cmd/handler.go b/cmd/handler.go index 9ff0eb2..49bb8a8 100644 --- a/cmd/handler.go +++ b/cmd/handler.go @@ -87,6 +87,46 @@ func (h Handler) HandleIncrementalTransferRequest(r rpc.IncrementalTransferReque } +func (h Handler) HandleResumeTransferRequest(r rpc.ResumeTransferRequest) (stream io.Reader, err error) { + // decode receive_resume_token: zfs send -nvP + // A) use exit code to determine if could send (exit != 0 means could not send) + // B) check ACL if the filesystem in toname field (it is a snapshot so strip its snapshot name first) + // is allowed for the given client + /* + + # zfs send -nvt will print nvlist contents on ZoL and FreeBSD, should be good enough + # will print regardless of whether it can send or not -> good for us + # need clean way to extract that... + # expect 'resume token contents:\nnvlist version: 0\n' + # parse everything after that is tab-indented as key value pairs separated by = sign + # => return dict + + zfs send -nv -t 1-c6491acbe-c8-789c636064000310a500c4ec50360710e72765a526973030b0419460caa7a515a79630c001489e0d493ea9b224b5182451885d7f497e7a69660a0343f79b1b9a8a2b3db65b20c97382e5f312735319188a4bf28b12d353f5931293b34b0b8af5ab8a520b72f4d3f2f31d8a53f35220660300c1091dbe + resume token contents: + nvlist version: 0 + object = 0x6 + offset = 0x0 + bytes = 0x7100 + toguid = 0xb748a92129d8ec8b + toname = storage/backups/zrepl/foo@send + cannot resume send: 'storage/backups/zrepl/foo@send' used in the initial send no longer exists + + zfs send -nvt 1-ebbbbea7e-f0-789c636064000310a501c49c50360710a715e5e7a69766a6304041f79b1b9a8a2b3db62b00d9ec48eaf293b252934b181858a0ea30e4d3d28a534b18e00024cf86249f5459925a0ca44fc861d75f920f71c59c5fdf6f7b3eea32b24092e704cbe725e6a632301497e41725a6a7ea27252667971614eb5715a516e4e8a7e5e73b14a7e6a51881cd06005a222749 + resume token contents: + nvlist version: 0 + fromguid = 0xb748a92129d8ec8b #NOTE the fromguid field which is only in this one, so don't hardcode + object = 0x4 + offset = 0x0 + bytes = 0x1ec8 + toguid = 0x328ae249dbf7fa9c + toname = storage/backups/zrepl/foo@send2 + send from storage/backups/zrepl/foo@send to storage/backups/zrepl/foo@send2 estimated size is 1.02M + + */ + panic("not implemented") + return nil, nil +} + func (h Handler) HandlePullMeRequest(r rpc.PullMeRequest, clientIdentity string, client rpc.RPCRequester) (err error) { // Check if we have a sink for this request diff --git a/cmd/replication.go b/cmd/replication.go index 2d3941e..94eefb3 100644 --- a/cmd/replication.go +++ b/cmd/replication.go @@ -325,6 +325,87 @@ func doPull(pull PullContext) (err error) { log("local filesystem does not exist") case localState.Placeholder: log("local filesystem is marked as placeholder") + case localState.ResumeToken != "": + + log("local filesystem has receive_resume_token") + if false { // TODO Check if managed resume is disabled (explain information leakage in docs!) + log("managed resume tokens are disabled via zrepl config, assuming config change or external administrative action") + log("policy forbids aborting the partial recv automatically, skipping this filesystem") + log("for zrepl to resume replication for this dataset, administrators should finishing the recv manually or abort the recv via `zfs recv -A %s`", m.Local.ToString()) + return true // TODO right choice to allow children? ... should be, since the fs exists... + } + + log("decoding receive_resume_token") + // TODO, see handler comments + // TODO override logger with fromguid toguid + + log("requesting resume of transfer") + r := rpc.ResumeTransferRequest{m.Remote, localState.ResumeToken} + stream, err := remote.ResumeTransferRequest(r) + if err != nil { + + log("resume transfer request failed: %s", err) + rre, ok := err.(*rpc.ResumeTransferError) + if !ok { + log("skipping this filesystem, could be temporary issue") + return true // TODO right choice to allow children + } + + // Determine if we should clear the resume token + clearAndUseSnaps := false + switch rre.Reason { + case rpc.ResumeTransferErrorReasonNotImplemented: + fallthrough + case rpc.ResumeTransferErrorReasonDisabled: + fallthrough + case rpc.ResumeTransferErrorReasonZFSErrorPermanent: + clearAndUseSnaps = true + + case rpc.ResumeTransferErrorReasonZFSErrorMaybeTemporary: + fallthrough + default: + clearAndUseSnaps = false + } + + if !clearAndUseSnaps { + log("skipping this filesystem, error identified as temporary") + return true // TODO right choice to allow children + } + + log("clearing local receive_resume_token") + if err := zfs.ZFSRecvAbort(m.Local); err != nil { + log("error clearing receive_resume_token: %s", err) + return true // TODO right choice to allow children? the filesystem exists, so it should be ok + } + // TODO go back to top of function (put all this into separate function, then tail recursive call) + // TODO return this_function() + } + + // TODO warning code duplication, see below. need to unify this + log("invoking zfs receive") + watcher := util.IOProgressWatcher{Reader: stream} + watcher.KickOff(1*time.Second, func(p util.IOProgress) { + log("progress on receive operation: %v bytes received", p.TotalRX) + }) + + recvArgs := []string{"-u"} + if localState.Placeholder { + log("receive with forced rollback to replace placeholder filesystem") + recvArgs = append(recvArgs, "-F") + } + + if err = zfs.ZFSRecv(m.Local, &watcher, recvArgs...); err != nil { + log("error receiving stream: %s", err) + return false + } + log("finished receiving stream, %v bytes total", watcher.Progress().TotalRX) + + // TODO further problem property handling code must be duplicated here... + + // TODO tail recursive call to this function, we might not be dony syncing yet + // TODO return this_function() + return true + default: log("local filesystem exists") log("requesting local filesystem versions") diff --git a/rpc/resumetransfererrorreason_string.go b/rpc/resumetransfererrorreason_string.go new file mode 100644 index 0000000..6f952d0 --- /dev/null +++ b/rpc/resumetransfererrorreason_string.go @@ -0,0 +1,16 @@ +// Code generated by "stringer -type ResumeTransferErrorReason"; DO NOT EDIT. + +package rpc + +import "fmt" + +const _ResumeTransferErrorReason_name = "ResumeTransferErrorReasonNotImplementedResumeTransferErrorReasonDisabledResumeTransferErrorReasonZFSErrorPermanentResumeTransferErrorReasonZFSErrorMaybeTemporary" + +var _ResumeTransferErrorReason_index = [...]uint8{0, 39, 72, 114, 161} + +func (i ResumeTransferErrorReason) String() string { + if i >= ResumeTransferErrorReason(len(_ResumeTransferErrorReason_index)-1) { + return fmt.Sprintf("ResumeTransferErrorReason(%d)", i) + } + return _ResumeTransferErrorReason_name[_ResumeTransferErrorReason_index[i]:_ResumeTransferErrorReason_index[i+1]] +} diff --git a/rpc/rpc.go b/rpc/rpc.go index 99e045c..98104d1 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -17,6 +17,7 @@ type RPCRequester interface { FilesystemVersionsRequest(r FilesystemVersionsRequest) (versions []zfs.FilesystemVersion, err error) InitialTransferRequest(r InitialTransferRequest) (io.Reader, error) IncrementalTransferRequest(r IncrementalTransferRequest) (io.Reader, error) + ResumeTransferRequest(r ResumeTransferRequest) (io.Reader, error) PullMeRequest(r PullMeRequest, handler RPCHandler) (err error) CloseRequest(r CloseRequest) (err error) ForceClose() (err error) @@ -30,6 +31,7 @@ type RPCHandler interface { HandleInitialTransferRequest(r InitialTransferRequest) (io.Reader, error) HandleIncrementalTransferRequest(r IncrementalTransferRequest) (io.Reader, error) + HandleResumeTransferRequest(r ResumeTransferRequest) (io.Reader, error) // invert roles, i.e. handler becomes server and performs the requested pull using the client connection HandlePullMeRequest(r PullMeRequest, clientIdentity string, client RPCRequester) (err error) @@ -284,6 +286,37 @@ func (c ByteStreamRPC) serverLoop(handler RPCHandler) error { log.Printf("finished sending incremental snapshot stream: total %v bytes sent", watcher.Progress().TotalRX) } + case RTResumeTransferRequest: + + var rq ResumeTransferRequest + if err := recv(&rq); err != nil { + sendError(EDecodeRequestBody, "") + return conn.Close() + } + + stream, err := handler.HandleResumeTransferRequest(rq) + if err != nil { + sendError(EHandler, err.Error()) + } else { + r := ResponseHeader{ + RequestId: header.Id, + ResponseType: RChunkedStream, + } + send(&r) + + chunker := NewChunker(stream) + + watcher := IOProgressWatcher{Reader: &chunker} + watcher.KickOff(1*time.Second, func(p IOProgress) { + log.Printf("progress sending resumed stream: %v bytes sent", p.TotalRX) + }) + _, err := io.Copy(conn, &watcher) + if err != nil { + panic(err) + } + log.Printf("finished sending resumed stream: total %v bytes sent", watcher.Progress().TotalRX) + } + case RTPullMeRequest: var rq PullMeRequest @@ -375,6 +408,8 @@ func inferRequestType(v interface{}) (RequestType, error) { return RTInitialTransferRequest, nil case IncrementalTransferRequest: return RTIncrementalTransferRequest, nil + case ResumeTransferRequest: + return RTResumeTransferRequest, nil case PullMeRequest: return RTPullMeRequest, nil case CloseRequest: @@ -491,6 +526,14 @@ func (c ByteStreamRPC) IncrementalTransferRequest(r IncrementalTransferRequest) return } +func (c ByteStreamRPC) ResumeTransferRequest(r ResumeTransferRequest) (io.Reader, error) { + // TODO reconstruct ResumeTransferError + if err := c.sendRequestReceiveHeader(r, RChunkedStream); err != nil { + return nil, err + } + return NewUnchunker(c.conn), nil +} + func (c ByteStreamRPC) PullMeRequest(r PullMeRequest, handler RPCHandler) (err error) { err = c.sendRequestReceiveHeader(r, ROK) return c.serverLoop(handler) @@ -533,6 +576,10 @@ func (c LocalRPC) IncrementalTransferRequest(r IncrementalTransferRequest) (read return } +func (c LocalRPC) ResumeTransferRequest(r ResumeTransferRequest) (io.Reader, error) { + return c.handler.HandleResumeTransferRequest(r) +} + func (c LocalRPC) PullMeRequest(r PullMeRequest, handler RPCHandler) (err error) { // The config syntactically only allows local Pulls, hence this is never called // In theory, the following line should work: diff --git a/rpc/structs.go b/rpc/structs.go index 2a4c7a8..3cafe48 100644 --- a/rpc/structs.go +++ b/rpc/structs.go @@ -19,6 +19,7 @@ const ( RTFilesystemVersionsRequest = 0x11 RTInitialTransferRequest = 0x12 RTIncrementalTransferRequest = 0x13 + RTResumeTransferRequest = 0x14 RTPullMeRequest = 0x20 RTCloseRequest = 0xf0 ) @@ -51,6 +52,30 @@ type IncrementalTransferRequest struct { To zfs.FilesystemVersion } +type ResumeTransferRequest struct { + Filesystem *zfs.DatasetPath + Token string +} + +type ResumeTransferError struct { + Reason ResumeTransferErrorReason + ZFSError string +} + +func (e *ResumeTransferError) Error() string { + return e.Reason.String() +} + +//go:generate stringer -type ResumeTransferErrorReason +type ResumeTransferErrorReason uint8 + +const ( + ResumeTransferErrorReasonNotImplemented ResumeTransferErrorReason = iota + ResumeTransferErrorReasonDisabled + ResumeTransferErrorReasonZFSErrorPermanent + ResumeTransferErrorReasonZFSErrorMaybeTemporary +) + func (r IncrementalTransferRequest) Respond(snapshotReader io.Reader) { } diff --git a/zfs/diff.go b/zfs/diff.go index 2421f64..c383806 100644 --- a/zfs/diff.go +++ b/zfs/diff.go @@ -189,15 +189,21 @@ const ZREPL_PLACEHOLDER_PROPERTY_NAME string = "zrepl:placeholder" type FilesystemState struct { Placeholder bool - // TODO extend with resume token when that feature is finally added + // If != "", the receive_resume_token found on the receiving side of a resumable send & recv + ResumeToken string } // A somewhat efficient way to determine if a filesystem exists on this host. // Particularly useful if exists is called more than once (will only fork exec once and cache the result) func ZFSListFilesystemState() (localState map[string]FilesystemState, err error) { + properties := []string{"name", ZREPL_PLACEHOLDER_PROPERTY_NAME} + if CLICompat.ResumableSendRecv { + properties = append(properties, "receive_resume_token") + } + var actual [][]string - if actual, err = ZFSList([]string{"name", ZREPL_PLACEHOLDER_PROPERTY_NAME}, "-t", "filesystem,volume"); err != nil { + if actual, err = ZFSList(properties, "-t", "filesystem,volume"); err != nil { return } @@ -208,8 +214,12 @@ func ZFSListFilesystemState() (localState map[string]FilesystemState, err error) fmt.Errorf("ZFS does not return parseable dataset path: %s", e[0]) } placeholder, _ := IsPlaceholder(dp, e[1]) + receive_resume_token := "" + if CLICompat.ResumableSendRecv { + receive_resume_token = e[2] + } localState[e[0]] = FilesystemState{ - placeholder, + placeholder, receive_resume_token, } } return diff --git a/zfs/zfs.go b/zfs/zfs.go index 2b8005a..2d89937 100644 --- a/zfs/zfs.go +++ b/zfs/zfs.go @@ -13,6 +13,11 @@ import ( "github.com/zrepl/zrepl/util" ) +var CLICompat struct { + // whether the CLI supports resumable send & recv + ResumableSendRecv bool +} + type DatasetPath struct { comps []string } @@ -239,6 +244,13 @@ func ZFSRecv(fs *DatasetPath, stream io.Reader, additionalArgs ...string) (err e return nil } +func ZFSRecvAbort(fs *DatasetPath) (err error) { + if !CLICompat.ResumableSendRecv { + panic("decide if this is an error") + } + panic("not implemented") +} + func ZFSSet(fs *DatasetPath, prop, val string) (err error) { if strings.ContainsRune(prop, '=') {