zrepl/rpc/versionhandshake/versionhandshake.go

201 lines
5.9 KiB
Go
Raw Normal View History

// Package versionhandshake wraps a transport.{Connecter,AuthenticatedListener}
// to add an exchange of protocol version information on connection establishment.
//
// The protocol version information (banner) is plain text, thus making it
// easy to diagnose issues with standard tools.
package versionhandshake
import (
"bytes"
"fmt"
"io"
"net"
"strings"
"time"
"unicode/utf8"
)
type HandshakeMessage struct {
ProtocolVersion int
2019-03-22 19:41:12 +01:00
Extensions []string
}
// A HandshakeError describes what went wrong during the handshake.
// It implements net.Error and is always temporary.
type HandshakeError struct {
msg string
// If not nil, the underlying IO error that caused the handshake to fail.
2019-03-22 19:41:12 +01:00
IOError error
isAcceptError bool
}
var _ net.Error = &HandshakeError{}
func (e HandshakeError) Error() string { return e.msg }
// Like with net.OpErr (Go issue 6163), a client failing to handshake
// should be a temporary Accept error toward the Listener .
func (e HandshakeError) Temporary() bool {
2019-03-22 19:41:12 +01:00
if e.isAcceptError {
return true
}
2019-03-22 19:41:12 +01:00
te, ok := e.IOError.(interface{ Temporary() bool })
return ok && te.Temporary()
}
// If the underlying IOError was net.Error.Timeout(), Timeout() returns that value.
// Otherwise false.
func (e HandshakeError) Timeout() bool {
if neterr, ok := e.IOError.(net.Error); ok {
return neterr.Timeout()
}
return false
}
2019-03-22 19:41:12 +01:00
func hsErr(format string, args ...interface{}) *HandshakeError {
return &HandshakeError{msg: fmt.Sprintf(format, args...)}
}
2019-03-22 19:41:12 +01:00
func hsIOErr(err error, format string, args ...interface{}) *HandshakeError {
return &HandshakeError{IOError: err, msg: fmt.Sprintf(format, args...)}
}
// MaxProtocolVersion is the maximum allowed protocol version.
// This is a protocol constant, changing it may break the wire format.
const MaxProtocolVersion = 9999
// Only returns *HandshakeError as error.
func (m *HandshakeMessage) Encode() ([]byte, error) {
if m.ProtocolVersion <= 0 || m.ProtocolVersion > MaxProtocolVersion {
return nil, hsErr(fmt.Sprintf("protocol version must be in [1, %d]", MaxProtocolVersion))
}
if len(m.Extensions) >= MaxProtocolVersion {
return nil, hsErr(fmt.Sprintf("protocol only supports [0, %d] extensions", MaxProtocolVersion))
}
// EXTENSIONS is a count of subsequent \n separated lines that contain protocol extensions
var extensions strings.Builder
for i, ext := range m.Extensions {
if strings.ContainsAny(ext, "\n") {
return nil, hsErr("Extension #%d contains forbidden newline character", i)
}
if !utf8.ValidString(ext) {
return nil, hsErr("Extension #%d is not valid UTF-8", i)
}
extensions.WriteString(ext)
extensions.WriteString("\n")
}
withoutLen := fmt.Sprintf("ZREPL_ZFS_REPLICATION PROTOVERSION=%04d EXTENSIONS=%04d\n%s",
m.ProtocolVersion, len(m.Extensions), extensions.String())
withLen := fmt.Sprintf("%010d %s", len(withoutLen), withoutLen)
return []byte(withLen), nil
}
func (m *HandshakeMessage) DecodeReader(r io.Reader, maxLen int) error {
var lenAndSpace [11]byte
if _, err := io.ReadFull(r, lenAndSpace[:]); err != nil {
return hsIOErr(err, "error reading protocol banner length: %s", err)
}
if !utf8.Valid(lenAndSpace[:]) {
return hsErr("invalid start of handshake message: not valid UTF-8")
}
var followLen int
n, err := fmt.Sscanf(string(lenAndSpace[:]), "%010d ", &followLen)
if n != 1 || err != nil {
return hsErr("could not parse handshake message length")
}
if followLen > maxLen {
return hsErr("handshake message length exceeds max length (%d vs %d)",
followLen, maxLen)
}
var buf bytes.Buffer
_, err = io.Copy(&buf, io.LimitReader(r, int64(followLen)))
if err != nil {
return hsIOErr(err, "error reading protocol banner body: %s", err)
}
var (
protoVersion, extensionCount int
)
n, err = fmt.Fscanf(&buf, "ZREPL_ZFS_REPLICATION PROTOVERSION=%04d EXTENSIONS=%4d\n",
&protoVersion, &extensionCount)
if n != 2 || err != nil {
return hsErr("could not parse handshake message: %s", err)
}
if protoVersion < 1 {
return hsErr("invalid protocol version %q", protoVersion)
}
m.ProtocolVersion = protoVersion
if extensionCount < 0 {
return hsErr("invalid extension count %q", extensionCount)
}
if extensionCount == 0 {
if buf.Len() != 0 {
return hsErr("unexpected data trailing after header")
}
m.Extensions = nil
return nil
}
s := buf.String()
if strings.Count(s, "\n") != extensionCount {
return hsErr("inconsistent extension count: found %d, header says %d", len(m.Extensions), extensionCount)
}
exts := strings.Split(s, "\n")
if exts[len(exts)-1] != "" {
return hsErr("unexpected data trailing after last extension newline")
}
2019-03-22 19:41:12 +01:00
m.Extensions = exts[0 : len(exts)-1]
return nil
}
func DoHandshakeCurrentVersion(conn net.Conn, deadline time.Time) *HandshakeError {
// current protocol version is hardcoded here
endpoint: refactor, fix stale holds on initial replication failure, zfs-abstractions subcmd, more efficient ZFS queries The motivation for this recatoring are based on two independent issues: - @JMoVS found that the changes merged as part of #259 slowed his OS X based installation down significantly. Analysis of the zfs command logging introduced in #296 showed that `zfs holds` took most of the execution time, and they pointed out that not all of those `zfs holds` invocations were actually necessary. I.e.: zrepl was inefficient about retrieving information from ZFS. - @InsanePrawn found that failures on initial replication would lead to step holds accumulating on the sending side, i.e. they would never be cleaned up in the HintMostRecentCommonAncestor RPC handler. That was because we only sent that RPC if there was a most recent common ancestor detected during replication planning. @InsanePrawn prototyped an implementation of a `zrepl zfs-abstractions release` command to mitigate the situation. As part of that development work and back-and-forth with @problame, it became evident that the abstractions that #259 built on top of zfs in package endpoint (step holds, replication cursor, last-received-hold), were not well-represented for re-use in the `zrepl zfs-abstractions release` subocommand prototype. This commit refactors package endpoint to address both of these issues: - endpoint abstractions now share an interface `Abstraction` that, among other things, provides a uniform `Destroy()` method. However, that method should not be destroyed directly but instead the package-level `BatchDestroy` function should be used in order to allow for a migration to zfs channel programs in the future. - endpoint now has a query facitilty (`ListAbstractions`) which is used to find on-disk - step holds and bookmarks - replication cursors (v1, v2) - last-received-holds By describing the query in a struct, we can centralized the retrieval of information via the ZFS CLI and only have to be clever once. We are "clever" in the following ways: - When asking for hold-based abstractions, we only run `zfs holds` on snapshot that have `userrefs` > 0 - To support this functionality, add field `UserRefs` to zfs.FilesystemVersion and retrieve it anywhere we retrieve zfs.FilesystemVersion from ZFS. - When asking only for bookmark-based abstractions, we only run `zfs list -t bookmark`, not with snapshots. - Currently unused (except for CLI) per-filesystem concurrent lookup - Option to only include abstractions with CreateTXG in a specified range - refactor `endpoint`'s various ZFS info retrieval methods to use `ListAbstractions` - rename the `zrepl holds list` command to `zrepl zfs-abstractions list` - make `zrepl zfs-abstractions list` consume endpoint.ListAbstractions - Add a `ListStale` method which, given a query template, lists stale holds and bookmarks. - it uses replication cursor has different modes - the new `zrepl zfs-abstractions release-{all,stale}` commands can be used to remove abstractions of package endpoint - Adjust HintMostRecentCommonAncestor RPC for stale-holds cleanup: - send it also if no most recent common ancestor exists between sender and receiver - have the sender clean up its abstractions when it receives the RPC with no most recent common ancestor, using `ListStale` - Due to changed semantics, bump the protocol version. - Adjust HintMostRecentCommonAncestor RPC for performance problems encountered by @JMoVS - by default, per (job,fs)-combination, only consider cleaning step holds in the createtxg range `[last replication cursor,conservatively-estimated-receive-side-version)` - this behavior ensures resumability at cost proportional to the time that replication was donw - however, as explained in a comment, we might leak holds if the zrepl daemon stops running - that trade-off is acceptable because in the presumably rare this might happen the user has two tools at their hand: - Tool 1: run `zrepl zfs-abstractions release-stale` - Tool 2: use env var `ZREPL_ENDPOINT_SENDER_HINT_MOST_RECENT_STEP_HOLD_CLEANUP_MODE` to adjust the lower bound of the createtxg range (search for it in the code). The env var can also be used to disable hold-cleanup on the send-side entirely. supersedes closes #293 supersedes closes #282 fixes #280 fixes #278 Additionaly, we fixed a couple of bugs: - zfs: fix half-nil error reporting of dataset-does-not-exist for ZFSListChan and ZFSBookmark - endpoint: Sender's `HintMostRecentCommonAncestor` handler would not check whether access to the specified filesystem was allowed.
2020-03-26 23:43:17 +01:00
return DoHandshakeVersion(conn, deadline, 3)
}
const HandshakeMessageMaxLen = 16 * 4096
func DoHandshakeVersion(conn net.Conn, deadline time.Time, version int) (rErr *HandshakeError) {
ours := HandshakeMessage{
ProtocolVersion: version,
2019-03-22 19:41:12 +01:00
Extensions: nil,
}
hsb, err := ours.Encode()
if err != nil {
return hsErr("could not encode protocol banner: %s", err)
}
err = conn.SetDeadline(deadline)
if err != nil {
return hsErr("could not set deadline for protocol banner handshake: %s", err)
}
defer func() {
if rErr != nil {
return
}
err := conn.SetDeadline(time.Time{})
if err != nil {
rErr = hsErr("could not reset deadline after protocol banner handshake: %s", err)
}
}()
_, err = io.Copy(conn, bytes.NewBuffer(hsb))
if err != nil {
return hsErr("could not send protocol banner: %s", err)
}
theirs := HandshakeMessage{}
if err := theirs.DecodeReader(conn, HandshakeMessageMaxLen); err != nil {
return hsErr("could not decode protocol banner: %s", err)
}
if theirs.ProtocolVersion != ours.ProtocolVersion {
return hsErr("protocol versions do not match: ours is %d, theirs is %d",
ours.ProtocolVersion, theirs.ProtocolVersion)
}
// ignore extensions, we don't use them
return nil
}